From 301b7f2fa2a4f758eba911c260df1b83a95e9272 Mon Sep 17 00:00:00 2001 From: Sergey Chugunov Date: Thu, 12 Apr 2018 12:41:34 +0300 Subject: [PATCH 1/3] IGNITE-8210 test reproducing the problem is added --- .../CacheBaselineTopologyTest.java | 82 +++++++++++++++++++ 1 file changed, 82 insertions(+) diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheBaselineTopologyTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheBaselineTopologyTest.java index 26502ed62391e..83afd1984bbed 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheBaselineTopologyTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheBaselineTopologyTest.java @@ -32,6 +32,7 @@ import org.apache.ignite.IgniteCache; import org.apache.ignite.cache.CacheAtomicityMode; import org.apache.ignite.cache.CacheMode; +import org.apache.ignite.cache.CachePeekMode; import org.apache.ignite.cache.affinity.AffinityFunction; import org.apache.ignite.cache.affinity.AffinityFunctionContext; import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction; @@ -54,6 +55,7 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionFullMap; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessage; import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgnitePredicate; import org.apache.ignite.plugin.extensions.communication.Message; import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; @@ -80,6 +82,12 @@ public class CacheBaselineTopologyTest extends GridCommonAbstractTest { /** */ private boolean delayRebalance; + /** */ + private Map userAttrs; + + /** */ + private static final String DATA_NODE = "dataNodeUserAttr"; + /** */ private static final TcpDiscoveryVmIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); @@ -129,6 +137,9 @@ public class CacheBaselineTopologyTest extends GridCommonAbstractTest { .setWalMode(WALMode.LOG_ONLY) ); + if (userAttrs != null) + cfg.setUserAttributes(userAttrs); + if (client) cfg.setClientMode(true); @@ -138,6 +149,77 @@ public class CacheBaselineTopologyTest extends GridCommonAbstractTest { return cfg; } + /** + * @throws Exception + */ + public void testRebalanceForCacheWithNodeFilter() throws Exception { + try { + userAttrs = U.newHashMap(1); + userAttrs.put(DATA_NODE, true); + + startGrids(2); + + userAttrs.put(DATA_NODE, false); + + IgniteEx ignite = startGrid(2); + + ignite.cluster().active(true); + + awaitPartitionMapExchange(); + + IgniteCache cache = + ignite.createCache( + new CacheConfiguration() + .setName(CACHE_NAME) + .setCacheMode(PARTITIONED) + .setBackups(1) + .setPartitionLossPolicy(READ_ONLY_SAFE) + .setAffinity(new RendezvousAffinityFunction(32, null)) + .setNodeFilter(new DataNodeFilter()) + ); + + for (int k = 0; k < 10_000; k++) + cache.put(k, k); + + Thread.sleep(500); + + printSizesDataNodes(NODE_COUNT - 1); + + userAttrs.put(DATA_NODE, true); + + startGrid(3); + + ignite.cluster().setBaselineTopology(ignite.cluster().topologyVersion()); + + awaitPartitionMapExchange(); + + Thread.sleep(500); + printSizesDataNodes(NODE_COUNT); + } + finally { + userAttrs = null; + } + } + + /** */ + private void printSizesDataNodes(int nodesCount) { + for (int i = 0; i < nodesCount; i++) { + IgniteEx ig = grid(i); + + System.out.println("Cache size on i-th node: " + + i + + "->" + ig.cache(CACHE_NAME).localSize(CachePeekMode.PRIMARY)); + } + } + + /** */ + private static class DataNodeFilter implements IgnitePredicate { + + @Override public boolean apply(ClusterNode clusterNode) { + return clusterNode.attribute(DATA_NODE); + } + } + /** * @throws Exception If failed. */ From 769c6bb71896c13e45ca7de7c1db2b4680fa7115 Mon Sep 17 00:00:00 2001 From: Ilya Lantukh Date: Fri, 6 Apr 2018 13:49:10 +0300 Subject: [PATCH 2/3] ignite-8210 : Fixed skipping of affinity calculation in case when eventNode is not affinityNode. --- .../processors/affinity/GridAffinityAssignmentCache.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java index b1899e352c02b..9d5ce051f7076 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java @@ -322,7 +322,7 @@ public List> calculate( for (DiscoveryEvent event : events.events()) { boolean affinityNode = CU.affinityNode(event.eventNode(), nodeFilter); - if (affinityNode) { + if (affinityNode || event.type() == EVT_DISCOVERY_CUSTOM_EVT) { skipCalculation = false; break; From 91f0b9085ed4405c5a411b8c1e6ed0e56ecee57c Mon Sep 17 00:00:00 2001 From: Sergey Chugunov Date: Fri, 13 Apr 2018 11:55:49 +0300 Subject: [PATCH 3/3] IGNITE-8210 test was improved to performs actual checks instead of dumping info --- .../CacheBaselineTopologyTest.java | 26 ++++++++++++++----- 1 file changed, 19 insertions(+), 7 deletions(-) diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheBaselineTopologyTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheBaselineTopologyTest.java index 83afd1984bbed..0d59a2d79a13d 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheBaselineTopologyTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheBaselineTopologyTest.java @@ -150,10 +150,14 @@ public class CacheBaselineTopologyTest extends GridCommonAbstractTest { } /** + * Verifies that rebalance on cache with Node Filter happens when BaselineTopology changes. + * * @throws Exception */ public void testRebalanceForCacheWithNodeFilter() throws Exception { try { + final int EMPTY_NODE_IDX = 2; + userAttrs = U.newHashMap(1); userAttrs.put(DATA_NODE, true); @@ -183,7 +187,7 @@ public void testRebalanceForCacheWithNodeFilter() throws Exception { Thread.sleep(500); - printSizesDataNodes(NODE_COUNT - 1); + printSizesDataNodes(NODE_COUNT - 1, EMPTY_NODE_IDX); userAttrs.put(DATA_NODE, true); @@ -194,7 +198,8 @@ public void testRebalanceForCacheWithNodeFilter() throws Exception { awaitPartitionMapExchange(); Thread.sleep(500); - printSizesDataNodes(NODE_COUNT); + + printSizesDataNodes(NODE_COUNT, EMPTY_NODE_IDX); } finally { userAttrs = null; @@ -202,13 +207,20 @@ public void testRebalanceForCacheWithNodeFilter() throws Exception { } /** */ - private void printSizesDataNodes(int nodesCount) { - for (int i = 0; i < nodesCount; i++) { + private void printSizesDataNodes(int nodesCnt, int emptyNodeIdx) { + for (int i = 0; i < nodesCnt; i++) { IgniteEx ig = grid(i); - System.out.println("Cache size on i-th node: " - + i - + "->" + ig.cache(CACHE_NAME).localSize(CachePeekMode.PRIMARY)); + int locSize = ig.cache(CACHE_NAME).localSize(CachePeekMode.PRIMARY); + + if (i == emptyNodeIdx) + assertEquals("Cache local size on " + + i + + " node is expected to be zero", 0, locSize); + else + assertTrue("Cache local size on " + + i + + " node is expected to be non zero", locSize > 0); } }