Skip to content

Commit

Permalink
IGNITE-8474 Fixed WalStateNodeLeaveExchangeTask preventing exchange m…
Browse files Browse the repository at this point in the history
…erge - Fixes apache#3990.

Signed-off-by: Alexey Goncharuk <alexey.goncharuk@gmail.com>

(cherry-picked from commit #ebd669e4c53cfd66708ff18dd59071e4aace38ae)
  • Loading branch information
sergey-chugunov-1985 committed May 17, 2018
1 parent b9136ec commit b8ec950
Show file tree
Hide file tree
Showing 4 changed files with 108 additions and 45 deletions.
Expand Up @@ -217,6 +217,9 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
/** For tests only. */
private volatile AffinityTopologyVersion exchMergeTestWaitVer;

/** For tests only. */
private volatile List mergedEvtsForTest;

/** Distributed latch manager. */
private ExchangeLatchManager latchMgr;

Expand Down Expand Up @@ -1879,9 +1882,14 @@ private void dumpDiagnosticInfo(IgniteInternalFuture<?> fut,
* For testing only.
*
* @param exchMergeTestWaitVer Version to wait for.
* @param mergedEvtsForTest List to collect discovery events with merged exchanges.
*/
public void mergeExchangesTestWaitVersion(AffinityTopologyVersion exchMergeTestWaitVer) {
public void mergeExchangesTestWaitVersion(
AffinityTopologyVersion exchMergeTestWaitVer,
@Nullable List mergedEvtsForTest
) {
this.exchMergeTestWaitVer = exchMergeTestWaitVer;
this.mergedEvtsForTest = mergedEvtsForTest;
}

/**
Expand Down Expand Up @@ -1968,46 +1976,8 @@ public boolean mergeExchangesOnCoordinator(GridDhtPartitionsExchangeFuture curFu

AffinityTopologyVersion exchMergeTestWaitVer = this.exchMergeTestWaitVer;

if (exchMergeTestWaitVer != null) {
if (log.isInfoEnabled()) {
log.info("Exchange merge test, waiting for version [exch=" + curFut.initialVersion() +
", waitVer=" + exchMergeTestWaitVer + ']');
}

long end = U.currentTimeMillis() + 10_000;

while (U.currentTimeMillis() < end) {
boolean found = false;

for (CachePartitionExchangeWorkerTask task : exchWorker.futQ) {
if (task instanceof GridDhtPartitionsExchangeFuture) {
GridDhtPartitionsExchangeFuture fut = (GridDhtPartitionsExchangeFuture)task;

if (exchMergeTestWaitVer.equals(fut.initialVersion())) {
if (log.isInfoEnabled())
log.info("Exchange merge test, found awaited version: " + exchMergeTestWaitVer);

found = true;

break;
}
}
}

if (found)
break;
else {
try {
U.sleep(100);
}
catch (IgniteInterruptedCheckedException e) {
break;
}
}
}

this.exchMergeTestWaitVer = null;
}
if (exchMergeTestWaitVer != null)
waitForTestVersion(exchMergeTestWaitVer, curFut);

synchronized (curFut.mutex()) {
int awaited = 0;
Expand Down Expand Up @@ -2048,6 +2018,8 @@ public boolean mergeExchangesOnCoordinator(GridDhtPartitionsExchangeFuture curFu
", evtNodeClient=" + CU.clientNode(fut.firstEvent().eventNode())+ ']');
}

addDiscoEvtForTest(fut.firstEvent());

curFut.context().events().addEvent(fut.initialVersion(),
fut.firstEvent(),
fut.firstEventCache());
Expand All @@ -2071,6 +2043,67 @@ public boolean mergeExchangesOnCoordinator(GridDhtPartitionsExchangeFuture curFu
}
}


/**
* For testing purposes. Stores discovery events with merged exchanges to enable examining them later.
*
* @param discoEvt Discovery event.
*/
private void addDiscoEvtForTest(DiscoveryEvent discoEvt) {
List mergedEvtsForTest = this.mergedEvtsForTest;

if (mergedEvtsForTest != null)
mergedEvtsForTest.add(discoEvt);
}

/**
* For testing purposes. Method allows to wait for an exchange future of specific version
* to appear in exchange worker queue.
*
* @param exchMergeTestWaitVer Topology Version to wait for.
* @param curFut Current Exchange Future.
*/
private void waitForTestVersion(AffinityTopologyVersion exchMergeTestWaitVer, GridDhtPartitionsExchangeFuture curFut) {
if (log.isInfoEnabled()) {
log.info("Exchange merge test, waiting for version [exch=" + curFut.initialVersion() +
", waitVer=" + exchMergeTestWaitVer + ']');
}

long end = U.currentTimeMillis() + 10_000;

while (U.currentTimeMillis() < end) {
boolean found = false;

for (CachePartitionExchangeWorkerTask task : exchWorker.futQ) {
if (task instanceof GridDhtPartitionsExchangeFuture) {
GridDhtPartitionsExchangeFuture fut = (GridDhtPartitionsExchangeFuture)task;

if (exchMergeTestWaitVer.equals(fut.initialVersion())) {
if (log.isInfoEnabled())
log.info("Exchange merge test, found awaited version: " + exchMergeTestWaitVer);

found = true;

break;
}
}
}

if (found)
break;
else {
try {
U.sleep(100);
}
catch (IgniteInterruptedCheckedException e) {
break;
}
}
}

this.exchMergeTestWaitVer = null;
}

/**
* Exchange future thread. All exchanges happen only by one thread and next
* exchange will not start until previous one completes.
Expand Down
Expand Up @@ -47,7 +47,7 @@ public ClusterNode node() {

/** {@inheritDoc} */
@Override public boolean skipForExchangeMerge() {
return false;
return true;
}

/** {@inheritDoc} */
Expand Down
Expand Up @@ -22,6 +22,7 @@
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
Expand Down Expand Up @@ -775,17 +776,37 @@ public void testMergeServersFail1_2() throws Exception {
* @throws Exception If failed.
*/
private void mergeServersFail1(boolean waitRebalance) throws Exception {
final Ignite srv0 = startGrids(4);
final Ignite srv0 = startGrids(5);

if (waitRebalance)
awaitPartitionMapExchange();

mergeExchangeWaitVersion(srv0, 6);
final List<DiscoveryEvent> mergedEvts = new ArrayList<>();

mergeExchangeWaitVersion(srv0, 8, mergedEvts);

UUID grid3Id = grid(3).localNode().id();
UUID grid2Id = grid(2).localNode().id();

stopGrid(getTestIgniteInstanceName(4), true, false);
stopGrid(getTestIgniteInstanceName(3), true, false);
stopGrid(getTestIgniteInstanceName(2), true, false);

checkCaches();

awaitPartitionMapExchange();

assertTrue("Unexpected number of merged disco events: " + mergedEvts.size(), mergedEvts.size() == 2);

for (DiscoveryEvent discoEvt : mergedEvts) {
ClusterNode evtNode = discoEvt.eventNode();

assertTrue("eventNode is null for DiscoEvent " + discoEvt, evtNode != null);

assertTrue("Unexpected eventNode ID: "
+ evtNode.id() + " while expecting " + grid2Id + " or " + grid3Id,
evtNode.id().equals(grid2Id) || evtNode.id().equals(grid3Id));
}
}

/**
Expand Down
Expand Up @@ -1949,6 +1949,15 @@ public static String randomString(Random rnd, int maxLen) {
*/
public static void mergeExchangeWaitVersion(Ignite node, long topVer) {
((IgniteEx)node).context().cache().context().exchange().mergeExchangesTestWaitVersion(
new AffinityTopologyVersion(topVer, 0));
new AffinityTopologyVersion(topVer, 0), null);
}

/**
* @param node Node.
* @param topVer Ready exchange version to wait for before trying to merge exchanges.
*/
public static void mergeExchangeWaitVersion(Ignite node, long topVer, List mergedEvts) {
((IgniteEx)node).context().cache().context().exchange().mergeExchangesTestWaitVersion(
new AffinityTopologyVersion(topVer, 0), mergedEvts);
}
}

0 comments on commit b8ec950

Please sign in to comment.