Skip to content

Commit

Permalink
IGNITE-9935 Removed redundant multithreaded grid startup. - Fixes #5032.
Browse files Browse the repository at this point in the history
Signed-off-by: Dmitriy Pavlov <dpavlov@apache.org>
  • Loading branch information
xtern authored and dspavlov committed Oct 30, 2018
1 parent 4e628a9 commit cb76ced
Showing 1 changed file with 41 additions and 80 deletions.
Expand Up @@ -18,10 +18,9 @@


import java.util.Map; import java.util.Map;
import java.util.UUID; import java.util.UUID;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.Future; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import junit.framework.AssertionFailedError;
import org.apache.ignite.Ignite; import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache; import org.apache.ignite.IgniteCache;
import org.apache.ignite.binary.BinaryObject; import org.apache.ignite.binary.BinaryObject;
Expand All @@ -39,8 +38,11 @@
import org.apache.ignite.internal.managers.communication.GridMessageListener; import org.apache.ignite.internal.managers.communication.GridMessageListener;
import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage; import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
import org.apache.ignite.internal.util.IgniteUtils; import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.typedef.PA;
import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteCallable; import org.apache.ignite.lang.IgniteCallable;
import org.apache.ignite.lang.IgniteFuture;
import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage; import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage;
import org.apache.ignite.spi.discovery.DiscoverySpiListener; import org.apache.ignite.spi.discovery.DiscoverySpiListener;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
Expand Down Expand Up @@ -74,6 +76,9 @@ public class GridCacheBinaryObjectMetadataExchangeMultinodeTest extends GridComm
/** */ /** */
private static final int BINARY_TYPE_ID = 708045005; private static final int BINARY_TYPE_ID = 708045005;


/** */
private static final long MAX_AWAIT = 9_000;

/** */ /** */
private static final AtomicInteger metadataReqsCounter = new AtomicInteger(0); private static final AtomicInteger metadataReqsCounter = new AtomicInteger(0);


Expand Down Expand Up @@ -107,81 +112,35 @@ public class GridCacheBinaryObjectMetadataExchangeMultinodeTest extends GridComm
return cfg; return cfg;
} }


/**
*
*/
private static final class ErrorHolder {
/** */
private volatile Error e;

/**
* @param e Exception.
*/
void error(Error e) {
this.e = e;
}

/**
*
*/
void fail() {
throw e;
}

/**
*
*/
boolean isEmpty() {
return e == null;
}
}

/** {@inheritDoc} */ /** {@inheritDoc} */
@Override protected void afterTest() throws Exception { @Override protected void afterTest() throws Exception {
super.afterTest(); super.afterTest();


stopAllGrids(); stopAllGrids();
} }


/** */
private static final CountDownLatch LATCH1 = new CountDownLatch(1);

/** /**
* Verifies that if thread tries to read metadata with ongoing update it gets blocked * Verifies that if thread tries to read metadata with ongoing update it gets blocked
* until acknowledge message arrives. * until acknowledge message arrives.
*/ */
public void testReadRequestBlockedOnUpdatingMetadata() throws Exception { public void testReadRequestBlockedOnUpdatingMetadata() throws Exception {
applyDiscoveryHook = true; final CyclicBarrier barrier = new CyclicBarrier(2);
discoveryHook = new DiscoveryHook() {
@Override public void handleDiscoveryMessage(DiscoverySpiCustomMessage msg) {
DiscoveryCustomMessage customMsg = msg == null ? null
: (DiscoveryCustomMessage) IgniteUtils.field(msg, "delegate");

if (customMsg instanceof MetadataUpdateAcceptedMessage) {
if (((MetadataUpdateAcceptedMessage)customMsg).typeId() == BINARY_TYPE_ID)
try {
Thread.sleep(300);
}
catch (InterruptedException ignored) {
// No-op.
}
}
}
};

final IgniteEx ignite0 = startGrid(0);


applyDiscoveryHook = false; applyDiscoveryHook = false;


final IgniteEx ignite1 = startGrid(1); final Ignite ignite0 = startGrid(0);
final Ignite ignite1 = startGrid(1);


final ErrorHolder errorHolder = new ErrorHolder(); final GridFutureAdapter finishFut = new GridFutureAdapter();


applyDiscoveryHook = true; applyDiscoveryHook = true;
discoveryHook = new DiscoveryHook() { discoveryHook = new DiscoveryHook() {
private volatile IgniteEx ignite; private volatile IgniteEx ignite;


@Override public void handleDiscoveryMessage(DiscoverySpiCustomMessage msg) { @Override public void handleDiscoveryMessage(DiscoverySpiCustomMessage msg) {
if (finishFut.isDone())
return;

DiscoveryCustomMessage customMsg = msg == null ? null DiscoveryCustomMessage customMsg = msg == null ? null
: (DiscoveryCustomMessage) IgniteUtils.field(msg, "delegate"); : (DiscoveryCustomMessage) IgniteUtils.field(msg, "delegate");


Expand All @@ -192,10 +151,17 @@ public void testReadRequestBlockedOnUpdatingMetadata() throws Exception {
Object transport = U.field(binaryProc, "transport"); Object transport = U.field(binaryProc, "transport");


try { try {
barrier.await(MAX_AWAIT, TimeUnit.MILLISECONDS);

Map syncMap = U.field(transport, "syncMap"); Map syncMap = U.field(transport, "syncMap");


int size = syncMap.size(); GridTestUtils.waitForCondition(new PA() {
assertEquals("unexpected size of syncMap: ", 1, size); @Override public boolean apply() {
return syncMap.size() == 1;
}
}, MAX_AWAIT);

assertEquals("unexpected size of syncMap: ", 1, syncMap.size());


Object syncKey = syncMap.keySet().iterator().next(); Object syncKey = syncMap.keySet().iterator().next();


Expand All @@ -204,9 +170,11 @@ public void testReadRequestBlockedOnUpdatingMetadata() throws Exception {


int ver = U.field(syncKey, "ver"); int ver = U.field(syncKey, "ver");
assertEquals("unexpected pendingVersion: ", 2, ver); assertEquals("unexpected pendingVersion: ", 2, ver);

finishFut.onDone();
} }
catch (AssertionFailedError err) { catch (Throwable t) {
errorHolder.error(err); finishFut.onDone(t);
} }
} }
} }
Expand All @@ -220,42 +188,35 @@ public void testReadRequestBlockedOnUpdatingMetadata() throws Exception {
final IgniteEx ignite2 = startGrid(2); final IgniteEx ignite2 = startGrid(2);
discoveryHook.ignite(ignite2); discoveryHook.ignite(ignite2);


ignite0.executorService().submit(new Runnable() { // Unfinished PME may affect max await timeout.
awaitPartitionMapExchange();

// Update metadata (version 1).
ignite0.executorService(ignite0.cluster().forLocal()).submit(new Runnable() {
@Override public void run() { @Override public void run() {
addIntField(ignite0, "f1", 101, 1); addIntField(ignite0, "f1", 101, 1);
} }
}).get(); }).get();


UUID id2 = ignite2.localNode().id(); // Update metadata (version 2).

ignite1.executorService(ignite1.cluster().forLocal()).submit(new Runnable() {
ClusterGroup cg2 = ignite2.cluster().forNodeId(id2);

Future<?> fut = ignite1.executorService().submit(new Runnable() {
@Override public void run() { @Override public void run() {
LATCH1.countDown();
addStringField(ignite1, "f2", "str", 2); addStringField(ignite1, "f2", "str", 2);
} }
}); });


ignite2.compute(cg2).withAsync().call(new IgniteCallable<Object>() { // Read metadata.
IgniteFuture readFut = ignite2.compute(ignite2.cluster().forLocal()).callAsync(new IgniteCallable<Object>() {
@Override public Object call() throws Exception { @Override public Object call() throws Exception {
try { barrier.await(MAX_AWAIT, TimeUnit.MILLISECONDS);
LATCH1.await();
}
catch (InterruptedException ignored) {
// No-op.
}

Object fieldVal = ((BinaryObject) ignite2.cache(DEFAULT_CACHE_NAME).withKeepBinary().get(1)).field("f1");


return fieldVal; return ((BinaryObject) ignite2.cache(DEFAULT_CACHE_NAME).withKeepBinary().get(1)).field("f1");
} }
}); });


fut.get(); finishFut.get(MAX_AWAIT);


if (!errorHolder.isEmpty()) assertEquals(101, readFut.get(MAX_AWAIT));
errorHolder.fail();
} }


/** /**
Expand Down

0 comments on commit cb76ced

Please sign in to comment.