Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

IGNITE-8474 skipForExchangeMerge property changed to 'true' #3990

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -217,6 +217,9 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
/** For tests only. */
private volatile AffinityTopologyVersion exchMergeTestWaitVer;

/** For tests only. */
private volatile List mergedEvtsForTest;

/** Distributed latch manager. */
private ExchangeLatchManager latchMgr;

Expand Down Expand Up @@ -1879,9 +1882,14 @@ private void dumpDiagnosticInfo(IgniteInternalFuture<?> fut,
* For testing only.
*
* @param exchMergeTestWaitVer Version to wait for.
* @param mergedEvtsForTest List to collect discovery events with merged exchanges.
*/
public void mergeExchangesTestWaitVersion(AffinityTopologyVersion exchMergeTestWaitVer) {
public void mergeExchangesTestWaitVersion(
AffinityTopologyVersion exchMergeTestWaitVer,
@Nullable List mergedEvtsForTest
) {
this.exchMergeTestWaitVer = exchMergeTestWaitVer;
this.mergedEvtsForTest = mergedEvtsForTest;
}

/**
Expand Down Expand Up @@ -1968,46 +1976,8 @@ public boolean mergeExchangesOnCoordinator(GridDhtPartitionsExchangeFuture curFu

AffinityTopologyVersion exchMergeTestWaitVer = this.exchMergeTestWaitVer;

if (exchMergeTestWaitVer != null) {
if (log.isInfoEnabled()) {
log.info("Exchange merge test, waiting for version [exch=" + curFut.initialVersion() +
", waitVer=" + exchMergeTestWaitVer + ']');
}

long end = U.currentTimeMillis() + 10_000;

while (U.currentTimeMillis() < end) {
boolean found = false;

for (CachePartitionExchangeWorkerTask task : exchWorker.futQ) {
if (task instanceof GridDhtPartitionsExchangeFuture) {
GridDhtPartitionsExchangeFuture fut = (GridDhtPartitionsExchangeFuture)task;

if (exchMergeTestWaitVer.equals(fut.initialVersion())) {
if (log.isInfoEnabled())
log.info("Exchange merge test, found awaited version: " + exchMergeTestWaitVer);

found = true;

break;
}
}
}

if (found)
break;
else {
try {
U.sleep(100);
}
catch (IgniteInterruptedCheckedException e) {
break;
}
}
}

this.exchMergeTestWaitVer = null;
}
if (exchMergeTestWaitVer != null)
waitForTestVersion(exchMergeTestWaitVer, curFut);

synchronized (curFut.mutex()) {
int awaited = 0;
Expand Down Expand Up @@ -2048,6 +2018,8 @@ public boolean mergeExchangesOnCoordinator(GridDhtPartitionsExchangeFuture curFu
", evtNodeClient=" + CU.clientNode(fut.firstEvent().eventNode())+ ']');
}

addDiscoEvtForTest(fut.firstEvent());

curFut.context().events().addEvent(fut.initialVersion(),
fut.firstEvent(),
fut.firstEventCache());
Expand All @@ -2071,6 +2043,67 @@ public boolean mergeExchangesOnCoordinator(GridDhtPartitionsExchangeFuture curFu
}
}


/**
* For testing purposes. Stores discovery events with merged exchanges to enable examining them later.
*
* @param discoEvt Discovery event.
*/
private void addDiscoEvtForTest(DiscoveryEvent discoEvt) {
List mergedEvtsForTest = this.mergedEvtsForTest;

if (mergedEvtsForTest != null)
mergedEvtsForTest.add(discoEvt);
}

/**
* For testing purposes. Method allows to wait for an exchange future of specific version
* to appear in exchange worker queue.
*
* @param exchMergeTestWaitVer Topology Version to wait for.
* @param curFut Current Exchange Future.
*/
private void waitForTestVersion(AffinityTopologyVersion exchMergeTestWaitVer, GridDhtPartitionsExchangeFuture curFut) {
if (log.isInfoEnabled()) {
log.info("Exchange merge test, waiting for version [exch=" + curFut.initialVersion() +
", waitVer=" + exchMergeTestWaitVer + ']');
}

long end = U.currentTimeMillis() + 10_000;

while (U.currentTimeMillis() < end) {
boolean found = false;

for (CachePartitionExchangeWorkerTask task : exchWorker.futQ) {
if (task instanceof GridDhtPartitionsExchangeFuture) {
GridDhtPartitionsExchangeFuture fut = (GridDhtPartitionsExchangeFuture)task;

if (exchMergeTestWaitVer.equals(fut.initialVersion())) {
if (log.isInfoEnabled())
log.info("Exchange merge test, found awaited version: " + exchMergeTestWaitVer);

found = true;

break;
}
}
}

if (found)
break;
else {
try {
U.sleep(100);
}
catch (IgniteInterruptedCheckedException e) {
break;
}
}
}

this.exchMergeTestWaitVer = null;
}

/**
* Exchange future thread. All exchanges happen only by one thread and next
* exchange will not start until previous one completes.
Expand Down
Expand Up @@ -47,7 +47,7 @@ public ClusterNode node() {

/** {@inheritDoc} */
@Override public boolean skipForExchangeMerge() {
return false;
return true;
}

/** {@inheritDoc} */
Expand Down
Expand Up @@ -22,6 +22,7 @@
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
Expand Down Expand Up @@ -775,17 +776,37 @@ public void testMergeServersFail1_2() throws Exception {
* @throws Exception If failed.
*/
private void mergeServersFail1(boolean waitRebalance) throws Exception {
final Ignite srv0 = startGrids(4);
final Ignite srv0 = startGrids(5);

if (waitRebalance)
awaitPartitionMapExchange();

mergeExchangeWaitVersion(srv0, 6);
final List<DiscoveryEvent> mergedEvts = new ArrayList<>();

mergeExchangeWaitVersion(srv0, 8, mergedEvts);

UUID grid3Id = grid(3).localNode().id();
UUID grid2Id = grid(2).localNode().id();

stopGrid(getTestIgniteInstanceName(4), true, false);
stopGrid(getTestIgniteInstanceName(3), true, false);
stopGrid(getTestIgniteInstanceName(2), true, false);

checkCaches();

awaitPartitionMapExchange();

assertTrue("Unexpected number of merged disco events: " + mergedEvts.size(), mergedEvts.size() == 2);

for (DiscoveryEvent discoEvt : mergedEvts) {
ClusterNode evtNode = discoEvt.eventNode();

assertTrue("eventNode is null for DiscoEvent " + discoEvt, evtNode != null);

assertTrue("Unexpected eventNode ID: "
+ evtNode.id() + " while expecting " + grid2Id + " or " + grid3Id,
evtNode.id().equals(grid2Id) || evtNode.id().equals(grid3Id));
}
}

/**
Expand Down
Expand Up @@ -1949,6 +1949,15 @@ public static String randomString(Random rnd, int maxLen) {
*/
public static void mergeExchangeWaitVersion(Ignite node, long topVer) {
((IgniteEx)node).context().cache().context().exchange().mergeExchangesTestWaitVersion(
new AffinityTopologyVersion(topVer, 0));
new AffinityTopologyVersion(topVer, 0), null);
}

/**
* @param node Node.
* @param topVer Ready exchange version to wait for before trying to merge exchanges.
*/
public static void mergeExchangeWaitVersion(Ignite node, long topVer, List mergedEvts) {
((IgniteEx)node).context().cache().context().exchange().mergeExchangesTestWaitVersion(
new AffinityTopologyVersion(topVer, 0), mergedEvts);
}
}