From 9b91cecdd2cc2fc62a6d8b2eb87f7cd2240f7703 Mon Sep 17 00:00:00 2001 From: Alexander Paschenko Date: Wed, 26 Jul 2017 02:01:34 +0300 Subject: [PATCH] IGNITE-5837 Test logic fix. --- ...ynamicIndexAbstractConcurrentSelfTest.java | 162 ++++++++++-------- 1 file changed, 91 insertions(+), 71 deletions(-) diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/DynamicIndexAbstractConcurrentSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/DynamicIndexAbstractConcurrentSelfTest.java index 913d724ceb5c7..c3e169d66338c 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/DynamicIndexAbstractConcurrentSelfTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/DynamicIndexAbstractConcurrentSelfTest.java @@ -49,6 +49,9 @@ import org.apache.ignite.internal.processors.query.schema.SchemaIndexCacheVisitor; import org.apache.ignite.internal.processors.query.schema.SchemaOperationException; import org.apache.ignite.internal.util.typedef.T3; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; import org.jetbrains.annotations.NotNull; import static org.apache.ignite.internal.IgniteClientReconnectAbstractTest.TestTcpDiscoverySpi; @@ -58,6 +61,9 @@ */ @SuppressWarnings("unchecked") public abstract class DynamicIndexAbstractConcurrentSelfTest extends DynamicIndexAbstractSelfTest { + /** IP finder. */ + private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); + /** Test duration. */ private static final long TEST_DUR = 10_000L; @@ -65,7 +71,8 @@ public abstract class DynamicIndexAbstractConcurrentSelfTest extends DynamicInde private static final int LARGE_CACHE_SIZE = 100_000; /** Latches to block certain index operations. */ - private static final ConcurrentHashMap> BLOCKS = new ConcurrentHashMap<>(); + private static final ConcurrentHashMap> BLOCKS = + new ConcurrentHashMap<>(); /** Cache mode. */ private final CacheMode cacheMode; @@ -120,7 +127,7 @@ public abstract class DynamicIndexAbstractConcurrentSelfTest extends DynamicInde /** {@inheritDoc} */ @Override protected IgniteConfiguration commonConfiguration(int idx) throws Exception { - return super.commonConfiguration(idx).setDiscoverySpi(new TestTcpDiscoverySpi()); + return super.commonConfiguration(idx).setDiscoverySpi(new TestTcpDiscoverySpi().setIpFinder(IP_FINDER)); } /** @@ -130,16 +137,16 @@ public abstract class DynamicIndexAbstractConcurrentSelfTest extends DynamicInde */ public void testCoordinatorChange() throws Exception { // Start servers. - Ignite srv1 = Ignition.start(serverConfiguration(1)); - Ignite srv2 = Ignition.start(serverConfiguration(2)); - Ignition.start(serverConfiguration(3, true)); - Ignition.start(serverConfiguration(4)); + Ignite srv1 = ignitionStart(serverConfiguration(1)); + Ignite srv2 = ignitionStart(serverConfiguration(2)); + ignitionStart(serverConfiguration(3, true)); + ignitionStart(serverConfiguration(4)); UUID srv1Id = srv1.cluster().localNode().id(); UUID srv2Id = srv2.cluster().localNode().id(); // Start client which will execute operations. - Ignite cli = Ignition.start(clientConfiguration(5)); + Ignite cli = ignitionStart(clientConfiguration(5)); createSqlCache(cli); @@ -151,9 +158,9 @@ public void testCoordinatorChange() throws Exception { QueryIndex idx1 = index(IDX_NAME_1, field(FIELD_NAME_1)); IgniteInternalFuture idxFut1 = queryProcessor(cli).dynamicIndexCreate(CACHE_NAME, CACHE_NAME, TBL_NAME, - idx1, false); + idx1, false); - idxLatch.countDown(); + U.await(idxLatch); //srv1.close(); Ignition.stop(srv1.name(), true); @@ -172,9 +179,9 @@ public void testCoordinatorChange() throws Exception { QueryIndex idx2 = index(IDX_NAME_2, field(aliasUnescaped(FIELD_NAME_2))); IgniteInternalFuture idxFut2 = - queryProcessor(cli).dynamicIndexCreate(CACHE_NAME, CACHE_NAME, TBL_NAME, idx2, false); + queryProcessor(cli).dynamicIndexCreate(CACHE_NAME, CACHE_NAME, TBL_NAME, idx2, false); - idxLatch.countDown(); + U.await(idxLatch); //srv2.close(); Ignition.stop(srv2.name(), true); @@ -194,11 +201,11 @@ public void testCoordinatorChange() throws Exception { * @throws Exception If failed. */ public void testOperationChaining() throws Exception { - Ignite srv1 = Ignition.start(serverConfiguration(1)); + Ignite srv1 = ignitionStart(serverConfiguration(1)); - Ignition.start(serverConfiguration(2)); - Ignition.start(serverConfiguration(3, true)); - Ignition.start(clientConfiguration(4)); + ignitionStart(serverConfiguration(2)); + ignitionStart(serverConfiguration(3, true)); + ignitionStart(clientConfiguration(4)); createSqlCache(srv1); @@ -208,15 +215,15 @@ public void testOperationChaining() throws Exception { QueryIndex idx2 = index(IDX_NAME_2, field(aliasUnescaped(FIELD_NAME_2))); IgniteInternalFuture idxFut1 = - queryProcessor(srv1).dynamicIndexCreate(CACHE_NAME, CACHE_NAME, TBL_NAME, idx1, false); + queryProcessor(srv1).dynamicIndexCreate(CACHE_NAME, CACHE_NAME, TBL_NAME, idx1, false); IgniteInternalFuture idxFut2 = - queryProcessor(srv1).dynamicIndexCreate(CACHE_NAME, CACHE_NAME, TBL_NAME, idx2, false); + queryProcessor(srv1).dynamicIndexCreate(CACHE_NAME, CACHE_NAME, TBL_NAME, idx2, false); // Start even more nodes of different flavors - Ignition.start(serverConfiguration(5)); - Ignition.start(serverConfiguration(6, true)); - Ignition.start(clientConfiguration(7)); + ignitionStart(serverConfiguration(5)); + ignitionStart(serverConfiguration(6, true)); + ignitionStart(clientConfiguration(7)); assert !idxFut1.isDone(); assert !idxFut2.isDone(); @@ -229,7 +236,7 @@ public void testOperationChaining() throws Exception { assertIndex(CACHE_NAME, TBL_NAME, IDX_NAME_1, field(FIELD_NAME_1)); assertIndex(CACHE_NAME, TBL_NAME, IDX_NAME_2, field(aliasUnescaped(FIELD_NAME_2))); - idxLatch.countDown(); + U.await(idxLatch); put(srv1, 0, KEY_AFTER); @@ -246,7 +253,7 @@ public void testOperationChaining() throws Exception { * @throws Exception If failed. */ public void testNodeJoinOnPendingOperation() throws Exception { - Ignite srv1 = Ignition.start(serverConfiguration(1)); + Ignite srv1 = ignitionStart(serverConfiguration(1)); createSqlCache(srv1); @@ -255,11 +262,11 @@ public void testNodeJoinOnPendingOperation() throws Exception { QueryIndex idx = index(IDX_NAME_1, field(FIELD_NAME_1)); IgniteInternalFuture idxFut = - queryProcessor(srv1).dynamicIndexCreate(CACHE_NAME, CACHE_NAME, TBL_NAME, idx, false); + queryProcessor(srv1).dynamicIndexCreate(CACHE_NAME, CACHE_NAME, TBL_NAME, idx, false); - Ignition.start(serverConfiguration(2)); - Ignition.start(serverConfiguration(3, true)); - Ignition.start(clientConfiguration(4)); + ignitionStart(serverConfiguration(2)); + ignitionStart(serverConfiguration(3, true)); + ignitionStart(clientConfiguration(4)); assert !idxFut.isDone(); @@ -267,7 +274,7 @@ public void testNodeJoinOnPendingOperation() throws Exception { idxFut.get(); - idxLatch.countDown(); + U.await(idxLatch); assertIndex(CACHE_NAME, TBL_NAME, IDX_NAME_1, field(FIELD_NAME_1)); @@ -284,10 +291,10 @@ public void testNodeJoinOnPendingOperation() throws Exception { */ public void testConcurrentPutRemove() throws Exception { // Start several nodes. - Ignite srv1 = Ignition.start(serverConfiguration(1)); - Ignition.start(serverConfiguration(2)); - Ignition.start(serverConfiguration(3)); - Ignition.start(serverConfiguration(4)); + Ignite srv1 = ignitionStart(serverConfiguration(1)); + ignitionStart(serverConfiguration(2)); + ignitionStart(serverConfiguration(3)); + ignitionStart(serverConfiguration(4)); awaitPartitionMapExchange(); @@ -359,7 +366,7 @@ public void testConcurrentPutRemove() throws Exception { List> res = nodeCache.query(qry).getAll(); assertEquals("Cache size mismatch [exp=" + expKeys.size() + ", actual=" + res.size() + ']', - expKeys.size(), res.size()); + expKeys.size(), res.size()); for (Cache.Entry entry : res) { long key = entry.getKey().field(FIELD_KEY); @@ -368,7 +375,7 @@ public void testConcurrentPutRemove() throws Exception { assertTrue("Expected key is not in result set: " + key, expKeys.containsKey(key)); assertEquals("Unexpected value [key=" + key + ", expVal=" + expKeys.get(key) + - ", actualVal=" + fieldVal + ']', expKeys.get(key), fieldVal); + ", actualVal=" + fieldVal + ']', expKeys.get(key), fieldVal); } } @@ -381,8 +388,8 @@ public void testConcurrentPutRemove() throws Exception { */ public void testConcurrentRebalance() throws Exception { // Start cache and populate it with data. - Ignite srv1 = Ignition.start(serverConfiguration(1)); - Ignite srv2 = Ignition.start(serverConfiguration(2)); + Ignite srv1 = ignitionStart(serverConfiguration(1)); + Ignite srv2 = ignitionStart(serverConfiguration(2)); createSqlCache(srv1); @@ -397,18 +404,18 @@ public void testConcurrentRebalance() throws Exception { QueryIndex idx = index(IDX_NAME_1, field(FIELD_NAME_1)); final IgniteInternalFuture idxFut = - queryProcessor(srv1).dynamicIndexCreate(CACHE_NAME, CACHE_NAME, TBL_NAME, idx, false); + queryProcessor(srv1).dynamicIndexCreate(CACHE_NAME, CACHE_NAME, TBL_NAME, idx, false); - idxLatch1.countDown(); - idxLatch2.countDown(); + U.await(idxLatch1); + U.await(idxLatch2); // Start two more nodes and unblock index operation in the middle. - Ignition.start(serverConfiguration(3)); + ignitionStart(serverConfiguration(3)); unblockIndexing(srv1); unblockIndexing(srv2); - Ignition.start(serverConfiguration(4)); + ignitionStart(serverConfiguration(4)); awaitPartitionMapExchange(); @@ -428,12 +435,12 @@ public void testConcurrentRebalance() throws Exception { */ public void testConcurrentCacheDestroy() throws Exception { // Start complex topology. - Ignite srv1 = Ignition.start(serverConfiguration(1)); + Ignite srv1 = ignitionStart(serverConfiguration(1)); - Ignition.start(serverConfiguration(2)); - Ignition.start(serverConfiguration(3, true)); + ignitionStart(serverConfiguration(2)); + ignitionStart(serverConfiguration(3, true)); - Ignite cli = Ignition.start(clientConfiguration(4)); + Ignite cli = ignitionStart(clientConfiguration(4)); // Start cache and populate it with data. createSqlCache(cli); @@ -446,9 +453,9 @@ public void testConcurrentCacheDestroy() throws Exception { QueryIndex idx = index(IDX_NAME_1, field(FIELD_NAME_1)); final IgniteInternalFuture idxFut = - queryProcessor(srv1).dynamicIndexCreate(CACHE_NAME, CACHE_NAME, TBL_NAME, idx, false); + queryProcessor(srv1).dynamicIndexCreate(CACHE_NAME, CACHE_NAME, TBL_NAME, idx, false); - idxLatch.countDown(); + U.await(idxLatch); // Destroy cache (drop table). destroySqlCache(cli); @@ -473,11 +480,11 @@ public void testConcurrentCacheDestroy() throws Exception { */ public void testConcurrentOperationsMultithreaded() throws Exception { // Start complex topology. - Ignition.start(serverConfiguration(1)); - Ignition.start(serverConfiguration(2)); - Ignition.start(serverConfiguration(3, true)); + ignitionStart(serverConfiguration(1)); + ignitionStart(serverConfiguration(2)); + ignitionStart(serverConfiguration(3, true)); - Ignite cli = Ignition.start(clientConfiguration(4)); + Ignite cli = ignitionStart(clientConfiguration(4)); createSqlCache(cli); @@ -547,11 +554,11 @@ public void testConcurrentOperationsMultithreaded() throws Exception { */ public void testQueryConsistencyMultithreaded() throws Exception { // Start complex topology. - Ignition.start(serverConfiguration(1)); - Ignition.start(serverConfiguration(2)); - Ignition.start(serverConfiguration(3, true)); + ignitionStart(serverConfiguration(1)); + ignitionStart(serverConfiguration(2)); + ignitionStart(serverConfiguration(3, true)); - Ignite cli = Ignition.start(clientConfiguration(4)); + Ignite cli = ignitionStart(clientConfiguration(4)); createSqlCache(cli); @@ -645,11 +652,11 @@ public void testClientReconnectWithCacheRestart() throws Exception { */ private void checkClientReconnect(final boolean restartCache) throws Exception { // Start complex topology. - final Ignite srv = Ignition.start(serverConfiguration(1)); - Ignition.start(serverConfiguration(2)); - Ignition.start(serverConfiguration(3, true)); + final Ignite srv = ignitionStart(serverConfiguration(1)); + ignitionStart(serverConfiguration(2)); + ignitionStart(serverConfiguration(3, true)); - final Ignite cli = Ignition.start(clientConfiguration(4)); + final Ignite cli = ignitionStart(clientConfiguration(4)); createSqlCache(cli); @@ -709,7 +716,7 @@ private void checkClientReconnect(final boolean restartCache) throws Exception { * @throws Exception If failed. */ private void reconnectClientNode(final Ignite srvNode, final Ignite cliNode, final boolean restart, - final RunnableX clo) throws Exception { + final RunnableX clo) throws Exception { IgniteClientReconnectAbstractTest.reconnectClientNode(log, cliNode, srvNode, new Runnable() { @Override public void run() { if (restart) { @@ -743,11 +750,11 @@ private void reconnectClientNode(final Ignite srvNode, final Ignite cliNode, fin */ public void testConcurrentOperationsAndNodeStartStopMultithreaded() throws Exception { // Start several stable nodes. - Ignition.start(serverConfiguration(1)); - Ignition.start(serverConfiguration(2)); - Ignition.start(serverConfiguration(3, true)); + ignitionStart(serverConfiguration(1)); + ignitionStart(serverConfiguration(2)); + ignitionStart(serverConfiguration(3, true)); - final Ignite cli = Ignition.start(clientConfiguration(4)); + final Ignite cli = ignitionStart(clientConfiguration(4)); createSqlCache(cli); @@ -789,7 +796,7 @@ public void testConcurrentOperationsAndNodeStartStopMultithreaded() throws Excep cfg = clientConfiguration(lastIdx); } - Ignition.start(cfg); + ignitionStart(cfg); exists = true; } @@ -868,11 +875,11 @@ public void testConcurrentOperationsAndNodeStartStopMultithreaded() throws Excep */ public void testConcurrentOperationsAndCacheStartStopMultithreaded() throws Exception { // Start complex topology. - Ignition.start(serverConfiguration(1)); - Ignition.start(serverConfiguration(2)); - Ignition.start(serverConfiguration(3, true)); + ignitionStart(serverConfiguration(1)); + ignitionStart(serverConfiguration(2)); + ignitionStart(serverConfiguration(3, true)); - Ignite cli = Ignition.start(clientConfiguration(4)); + Ignite cli = ignitionStart(clientConfiguration(4)); final AtomicBoolean stopped = new AtomicBoolean(); @@ -1058,8 +1065,8 @@ private static String aliasUnescaped(String field) { private static class BlockingIndexing extends IgniteH2Indexing { /** {@inheritDoc} */ @Override public void dynamicIndexCreate(@NotNull String schemaName, String tblName, - QueryIndexDescriptorImpl idxDesc, boolean ifNotExists, SchemaIndexCacheVisitor cacheVisitor) - throws IgniteCheckedException { + QueryIndexDescriptorImpl idxDesc, boolean ifNotExists, SchemaIndexCacheVisitor cacheVisitor) + throws IgniteCheckedException { awaitIndexing(ctx.localNodeId()); super.dynamicIndexCreate(schemaName, tblName, idxDesc, ifNotExists, cacheVisitor); @@ -1067,7 +1074,7 @@ private static class BlockingIndexing extends IgniteH2Indexing { /** {@inheritDoc} */ @Override public void dynamicIndexDrop(@NotNull String schemaName, String idxName, boolean ifExists) - throws IgniteCheckedException{ + throws IgniteCheckedException{ awaitIndexing(ctx.localNodeId()); super.dynamicIndexDrop(schemaName, idxName, ifExists); @@ -1082,4 +1089,17 @@ private static class BlockingIndexing extends IgniteH2Indexing { private IgniteCache createSqlCache(Ignite node) throws IgniteCheckedException { return createSqlCache(node, cacheConfiguration()); } + + /** + * Spoof blocking indexing class and start new node. + * @param cfg Node configuration. + * @return New node. + */ + private static Ignite ignitionStart(IgniteConfiguration cfg) { + // Have to do this for each starting node - see GridQueryProcessor ctor, it nulls + // idxCls static field on each call. + GridQueryProcessor.idxCls = BlockingIndexing.class; + + return Ignition.start(cfg); + } }