Skip to content

Commit

Permalink
IGNITE-8210 Fixed custom event handling for baseline topology change -
Browse files Browse the repository at this point in the history
…Fixes apache#3814.

Signed-off-by: Alexey Goncharuk <alexey.goncharuk@gmail.com>

(cherry picked from commit d79c640)
  • Loading branch information
slukyano committed Apr 17, 2018
1 parent d1d813e commit 7cfc737
Show file tree
Hide file tree
Showing 2 changed files with 107 additions and 2 deletions.
Expand Up @@ -304,7 +304,8 @@ public List<List<ClusterNode>> calculate(AffinityTopologyVersion topVer, @Nullab
List<List<ClusterNode>> assignment;

if (prevAssignment != null && discoEvt != null) {
boolean affNode = CU.affinityNode(discoEvt.eventNode(), nodeFilter);
boolean affNode = CU.affinityNode(discoEvt.eventNode(), nodeFilter)
|| discoEvt.type() == EVT_DISCOVERY_CUSTOM_EVT;

if (!affNode)
assignment = prevAssignment;
Expand Down
Expand Up @@ -32,6 +32,7 @@
import org.apache.ignite.IgniteCache;
import org.apache.ignite.cache.CacheAtomicityMode;
import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.cache.CachePeekMode;
import org.apache.ignite.cache.affinity.AffinityFunction;
import org.apache.ignite.cache.affinity.AffinityFunctionContext;
import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
Expand All @@ -53,7 +54,10 @@
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionFullMap;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessage;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgnitePredicate;
import org.apache.ignite.plugin.extensions.communication.Message;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;

Expand All @@ -75,7 +79,16 @@ public class CacheBaselineTopologyTest extends GridCommonAbstractTest {
private static final int NODE_COUNT = 4;

/** */
private static boolean delayRebalance = false;
private boolean delayRebalance;

/** */
private Map<String, Object> userAttrs;

/** */
private static final String DATA_NODE = "dataNodeUserAttr";

/** */
private static final TcpDiscoveryVmIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);

/** {@inheritDoc} */
@Override protected void beforeTest() throws Exception {
Expand All @@ -99,6 +112,11 @@ public class CacheBaselineTopologyTest extends GridCommonAbstractTest {
@Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);

TcpDiscoverySpi discoSpi = new TcpDiscoverySpi();
discoSpi.setIpFinder(IP_FINDER);

cfg.setDiscoverySpi(discoSpi);

cfg.setConsistentId(igniteInstanceName);

cfg.setDataStorageConfiguration(
Expand All @@ -118,6 +136,9 @@ public class CacheBaselineTopologyTest extends GridCommonAbstractTest {
.setWalMode(WALMode.LOG_ONLY)
);

if (userAttrs != null)
cfg.setUserAttributes(userAttrs);

if (client)
cfg.setClientMode(true);

Expand All @@ -127,6 +148,89 @@ public class CacheBaselineTopologyTest extends GridCommonAbstractTest {
return cfg;
}

/**
* Verifies that rebalance on cache with Node Filter happens when BaselineTopology changes.
*
* @throws Exception
*/
public void testRebalanceForCacheWithNodeFilter() throws Exception {
try {
final int EMPTY_NODE_IDX = 2;

userAttrs = U.newHashMap(1);
userAttrs.put(DATA_NODE, true);

startGrids(2);

userAttrs.put(DATA_NODE, false);

IgniteEx ignite = startGrid(2);

ignite.cluster().active(true);

awaitPartitionMapExchange();

IgniteCache<Integer, Integer> cache =
ignite.createCache(
new CacheConfiguration<Integer, Integer>()
.setName(CACHE_NAME)
.setCacheMode(PARTITIONED)
.setBackups(1)
.setPartitionLossPolicy(READ_ONLY_SAFE)
.setAffinity(new RendezvousAffinityFunction(32, null))
.setNodeFilter(new DataNodeFilter())
);

for (int k = 0; k < 10_000; k++)
cache.put(k, k);

Thread.sleep(500);

printSizesDataNodes(NODE_COUNT - 1, EMPTY_NODE_IDX);

userAttrs.put(DATA_NODE, true);

startGrid(3);

ignite.cluster().setBaselineTopology(ignite.cluster().topologyVersion());

awaitPartitionMapExchange();

Thread.sleep(500);

printSizesDataNodes(NODE_COUNT, EMPTY_NODE_IDX);
}
finally {
userAttrs = null;
}
}

/** */
private void printSizesDataNodes(int nodesCnt, int emptyNodeIdx) {
for (int i = 0; i < nodesCnt; i++) {
IgniteEx ig = grid(i);

int locSize = ig.cache(CACHE_NAME).localSize(CachePeekMode.PRIMARY);

if (i == emptyNodeIdx)
assertEquals("Cache local size on "
+ i
+ " node is expected to be zero", 0, locSize);
else
assertTrue("Cache local size on "
+ i
+ " node is expected to be non zero", locSize > 0);
}
}

/** */
private static class DataNodeFilter implements IgnitePredicate<ClusterNode> {

@Override public boolean apply(ClusterNode clusterNode) {
return clusterNode.attribute(DATA_NODE);
}
}

/**
* @throws Exception If failed.
*/
Expand Down

0 comments on commit 7cfc737

Please sign in to comment.