diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/binary/GridCacheBinaryObjectMetadataExchangeMultinodeTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/binary/GridCacheBinaryObjectMetadataExchangeMultinodeTest.java index 01ef73d167e9c..03f0628ad1437 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/binary/GridCacheBinaryObjectMetadataExchangeMultinodeTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/binary/GridCacheBinaryObjectMetadataExchangeMultinodeTest.java @@ -18,10 +18,9 @@ import java.util.Map; import java.util.UUID; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.Future; +import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; -import junit.framework.AssertionFailedError; import org.apache.ignite.Ignite; import org.apache.ignite.IgniteCache; import org.apache.ignite.binary.BinaryObject; @@ -39,8 +38,11 @@ import org.apache.ignite.internal.managers.communication.GridMessageListener; import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage; import org.apache.ignite.internal.util.IgniteUtils; +import org.apache.ignite.internal.util.future.GridFutureAdapter; +import org.apache.ignite.internal.util.typedef.PA; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteCallable; +import org.apache.ignite.lang.IgniteFuture; import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage; import org.apache.ignite.spi.discovery.DiscoverySpiListener; import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; @@ -74,6 +76,9 @@ public class GridCacheBinaryObjectMetadataExchangeMultinodeTest extends GridComm /** */ private static final int BINARY_TYPE_ID = 708045005; + /** */ + private static final long MAX_AWAIT = 9_000; + /** */ private static final AtomicInteger metadataReqsCounter = new AtomicInteger(0); @@ -107,35 +112,6 @@ public class GridCacheBinaryObjectMetadataExchangeMultinodeTest extends GridComm return cfg; } - /** - * - */ - private static final class ErrorHolder { - /** */ - private volatile Error e; - - /** - * @param e Exception. - */ - void error(Error e) { - this.e = e; - } - - /** - * - */ - void fail() { - throw e; - } - - /** - * - */ - boolean isEmpty() { - return e == null; - } - } - /** {@inheritDoc} */ @Override protected void afterTest() throws Exception { super.afterTest(); @@ -143,45 +119,28 @@ boolean isEmpty() { stopAllGrids(); } - /** */ - private static final CountDownLatch LATCH1 = new CountDownLatch(1); - /** * Verifies that if thread tries to read metadata with ongoing update it gets blocked * until acknowledge message arrives. */ public void testReadRequestBlockedOnUpdatingMetadata() throws Exception { - applyDiscoveryHook = true; - discoveryHook = new DiscoveryHook() { - @Override public void handleDiscoveryMessage(DiscoverySpiCustomMessage msg) { - DiscoveryCustomMessage customMsg = msg == null ? null - : (DiscoveryCustomMessage) IgniteUtils.field(msg, "delegate"); - - if (customMsg instanceof MetadataUpdateAcceptedMessage) { - if (((MetadataUpdateAcceptedMessage)customMsg).typeId() == BINARY_TYPE_ID) - try { - Thread.sleep(300); - } - catch (InterruptedException ignored) { - // No-op. - } - } - } - }; - - final IgniteEx ignite0 = startGrid(0); + final CyclicBarrier barrier = new CyclicBarrier(2); applyDiscoveryHook = false; - final IgniteEx ignite1 = startGrid(1); + final Ignite ignite0 = startGrid(0); + final Ignite ignite1 = startGrid(1); - final ErrorHolder errorHolder = new ErrorHolder(); + final GridFutureAdapter finishFut = new GridFutureAdapter(); applyDiscoveryHook = true; discoveryHook = new DiscoveryHook() { private volatile IgniteEx ignite; @Override public void handleDiscoveryMessage(DiscoverySpiCustomMessage msg) { + if (finishFut.isDone()) + return; + DiscoveryCustomMessage customMsg = msg == null ? null : (DiscoveryCustomMessage) IgniteUtils.field(msg, "delegate"); @@ -192,10 +151,17 @@ public void testReadRequestBlockedOnUpdatingMetadata() throws Exception { Object transport = U.field(binaryProc, "transport"); try { + barrier.await(MAX_AWAIT, TimeUnit.MILLISECONDS); + Map syncMap = U.field(transport, "syncMap"); - int size = syncMap.size(); - assertEquals("unexpected size of syncMap: ", 1, size); + GridTestUtils.waitForCondition(new PA() { + @Override public boolean apply() { + return syncMap.size() == 1; + } + }, MAX_AWAIT); + + assertEquals("unexpected size of syncMap: ", 1, syncMap.size()); Object syncKey = syncMap.keySet().iterator().next(); @@ -204,9 +170,11 @@ public void testReadRequestBlockedOnUpdatingMetadata() throws Exception { int ver = U.field(syncKey, "ver"); assertEquals("unexpected pendingVersion: ", 2, ver); + + finishFut.onDone(); } - catch (AssertionFailedError err) { - errorHolder.error(err); + catch (Throwable t) { + finishFut.onDone(t); } } } @@ -220,42 +188,35 @@ public void testReadRequestBlockedOnUpdatingMetadata() throws Exception { final IgniteEx ignite2 = startGrid(2); discoveryHook.ignite(ignite2); - ignite0.executorService().submit(new Runnable() { + // Unfinished PME may affect max await timeout. + awaitPartitionMapExchange(); + + // Update metadata (version 1). + ignite0.executorService(ignite0.cluster().forLocal()).submit(new Runnable() { @Override public void run() { addIntField(ignite0, "f1", 101, 1); } }).get(); - UUID id2 = ignite2.localNode().id(); - - ClusterGroup cg2 = ignite2.cluster().forNodeId(id2); - - Future fut = ignite1.executorService().submit(new Runnable() { + // Update metadata (version 2). + ignite1.executorService(ignite1.cluster().forLocal()).submit(new Runnable() { @Override public void run() { - LATCH1.countDown(); addStringField(ignite1, "f2", "str", 2); } }); - ignite2.compute(cg2).withAsync().call(new IgniteCallable() { + // Read metadata. + IgniteFuture readFut = ignite2.compute(ignite2.cluster().forLocal()).callAsync(new IgniteCallable() { @Override public Object call() throws Exception { - try { - LATCH1.await(); - } - catch (InterruptedException ignored) { - // No-op. - } - - Object fieldVal = ((BinaryObject) ignite2.cache(DEFAULT_CACHE_NAME).withKeepBinary().get(1)).field("f1"); + barrier.await(MAX_AWAIT, TimeUnit.MILLISECONDS); - return fieldVal; + return ((BinaryObject) ignite2.cache(DEFAULT_CACHE_NAME).withKeepBinary().get(1)).field("f1"); } }); - fut.get(); + finishFut.get(MAX_AWAIT); - if (!errorHolder.isEmpty()) - errorHolder.fail(); + assertEquals(101, readFut.get(MAX_AWAIT)); } /**