Skip to content
Permalink
Browse files
IGNITE-15064 Added tests for replicated caches (#67)
  • Loading branch information
nizhikov committed Jul 7, 2021
1 parent c478ce1 commit 826b6a1ab70d0886013a3a6b9d38b0193dab8799
Showing 1 changed file with 53 additions and 19 deletions.
@@ -18,6 +18,7 @@
package org.apache.ignite.cdc;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
@@ -26,7 +27,9 @@
import java.util.stream.IntStream;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.cache.CacheAtomicityMode;
import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.cdc.conflictresolve.CacheVersionConflictResolverPluginProvider;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.DataRegionConfiguration;
import org.apache.ignite.configuration.DataStorageConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
@@ -44,6 +47,8 @@

import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
import static org.apache.ignite.cache.CacheMode.PARTITIONED;
import static org.apache.ignite.cache.CacheMode.REPLICATED;
import static org.apache.ignite.cluster.ClusterState.ACTIVE;
import static org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi.DFLT_PORT_RANGE;
import static org.apache.ignite.testframework.GridTestUtils.runAsync;
@@ -52,22 +57,34 @@
/** */
@RunWith(Parameterized.class)
public abstract class AbstractReplicationTest extends GridCommonAbstractTest {
/** Cache mode. */
/** Cache atomicity mode. */
@Parameterized.Parameter
public CacheAtomicityMode cacheMode;
public CacheAtomicityMode atomicity;

/** */
/** Cache replication mode. */
@Parameterized.Parameter(1)
public int backupCnt;
public CacheMode mode;

/** */
@Parameterized.Parameter(2)
public int backups;

/** @return Test parameters. */
@Parameterized.Parameters(name = "cacheMode={0},backupCnt={1}")
@Parameterized.Parameters(name = "atomicity={0}, mode={1}, backupCnt={2}")
public static Collection<?> parameters() {
List<Object[]> params = new ArrayList<>();

for (CacheAtomicityMode mode : EnumSet.of(ATOMIC, TRANSACTIONAL))
for (int i = 0; i < 2; i++)
params.add(new Object[] {mode, i});
for (CacheAtomicityMode atomicity : EnumSet.of(ATOMIC, TRANSACTIONAL)) {
for (CacheMode mode : EnumSet.of(PARTITIONED, REPLICATED)) {
for (int backups = 0; backups < 2; backups++) {
// backupCount ignored for REPLICATED caches.
if (backups > 0 && mode == REPLICATED)
continue;

params.add(new Object[] {atomicity, mode, backups});
}
}
}

return params;
}
@@ -97,7 +114,7 @@ private enum WaitDataMode {
}

/** */
public static final int KEYS_CNT = 1000;
public static final int KEYS_CNT = 500;

/** */
protected static IgniteEx[] srcCluster;
@@ -130,7 +147,7 @@ private enum WaitDataMode {
CacheVersionConflictResolverPluginProvider<?> cfgPlugin = new CacheVersionConflictResolverPluginProvider<>();

cfgPlugin.setClusterId(clusterId);
cfgPlugin.setCaches(new HashSet<>(Collections.singletonList(ACTIVE_ACTIVE_CACHE)));
cfgPlugin.setCaches(new HashSet<>(Arrays.asList(ACTIVE_PASSIVE_CACHE, ACTIVE_ACTIVE_CACHE)));
cfgPlugin.setConflictResolveField("reqId");

cfg.setPluginProviders(cfgPlugin);
@@ -175,7 +192,11 @@ private enum WaitDataMode {
}

/** */
private IgniteBiTuple<IgniteEx[], IgniteConfiguration[]> setupCluster(String clusterTag, String clientPrefix, int idx) throws Exception {
private IgniteBiTuple<IgniteEx[], IgniteConfiguration[]> setupCluster(
String clusterTag,
String clientPrefix,
int idx
) throws Exception {
IgniteEx[] cluster = new IgniteEx[] {
startGrid(idx + 1),
startGrid(idx + 2)
@@ -205,17 +226,17 @@ public void testActivePassiveReplication() throws Exception {
List<IgniteInternalFuture<?>> futs = startActivePassiveCdc();

try {
IgniteCache<Integer, ConflictResolvableTestData> destCache = destCluster[0].createCache(ACTIVE_PASSIVE_CACHE);
IgniteCache<Integer, ConflictResolvableTestData> destCache = createCache(destCluster[0], ACTIVE_PASSIVE_CACHE);

destCache.put(1, ConflictResolvableTestData.create());
destCache.remove(1);
destCache.put(KEYS_CNT + 1, ConflictResolvableTestData.create());
destCache.remove(KEYS_CNT + 1);

// Updates for "ignored-cache" should be ignored because of CDC consume configuration.
runAsync(generateData(IGNORED_CACHE, srcCluster[srcCluster.length - 1], IntStream.range(0, KEYS_CNT)));
runAsync(generateData(ACTIVE_PASSIVE_CACHE, srcCluster[srcCluster.length - 1], IntStream.range(0, KEYS_CNT)));

IgniteCache<Integer, ConflictResolvableTestData> srcCache =
srcCluster[srcCluster.length - 1].getOrCreateCache(ACTIVE_PASSIVE_CACHE);
createCache(srcCluster[srcCluster.length - 1], ACTIVE_PASSIVE_CACHE);

waitForSameData(srcCache, destCache, KEYS_CNT, WaitDataMode.EXISTS, futs);

@@ -234,8 +255,8 @@ public void testActivePassiveReplication() throws Exception {
/** Active/Active mode means changes made in both clusters. */
@Test
public void testActiveActiveReplication() throws Exception {
IgniteCache<Integer, ConflictResolvableTestData> srcCache = srcCluster[0].getOrCreateCache(ACTIVE_ACTIVE_CACHE);
IgniteCache<Integer, ConflictResolvableTestData> destCache = destCluster[0].getOrCreateCache(ACTIVE_ACTIVE_CACHE);
IgniteCache<Integer, ConflictResolvableTestData> srcCache = createCache(srcCluster[0], ACTIVE_ACTIVE_CACHE);
IgniteCache<Integer, ConflictResolvableTestData> destCache = createCache(destCluster[0], ACTIVE_ACTIVE_CACHE);

// Even keys goes to src cluster.
runAsync(generateData(ACTIVE_ACTIVE_CACHE, srcCluster[srcCluster.length - 1],
@@ -262,9 +283,9 @@ public void testActiveActiveReplication() throws Exception {
}

/** */
public static Runnable generateData(String cacheName, IgniteEx ign, IntStream keys) {
public Runnable generateData(String cacheName, IgniteEx ign, IntStream keys) {
return () -> {
IgniteCache<Integer, ConflictResolvableTestData> cache = ign.getOrCreateCache(cacheName);
IgniteCache<Integer, ConflictResolvableTestData> cache = createCache(ign, cacheName);

keys.forEach(i -> cache.put(i, ConflictResolvableTestData.create()));
};
@@ -311,6 +332,19 @@ private boolean checkFuts(boolean res, List<IgniteInternalFuture<?>> futs) {
return res;
}

/** */
private IgniteCache<Integer, ConflictResolvableTestData> createCache(IgniteEx ignite, String name) {
CacheConfiguration<Integer, ConflictResolvableTestData> ccfg = new CacheConfiguration<Integer, ConflictResolvableTestData>()
.setName(name)
.setCacheMode(mode)
.setAtomicityMode(atomicity);

if (mode != REPLICATED)
ccfg.setBackups(backups);

return ignite.getOrCreateCache(ccfg);
}

/** */
protected abstract List<IgniteInternalFuture<?>> startActivePassiveCdc();

0 comments on commit 826b6a1

Please sign in to comment.