Skip to content
Permalink
Browse files
IGNITE-15116 Test of SQL query replication added. (#70)
  • Loading branch information
nizhikov committed Jul 14, 2021
1 parent 826b6a1 commit 41991eb6a733d52d7f47c423a0142e3c12d5fb0e
Showing 4 changed files with 77 additions and 14 deletions.
@@ -85,7 +85,7 @@ protected void apply(Iterable<CdcEvent> evts) throws IgniteCheckedException {
// IgniteEx#cachex(String) will return null if cache not initialized with regular Ignite#cache(String) call.
ignite().cache(cacheName);

return ignite().cachex(cacheName);
return ignite().cachex(cacheName).keepBinary();
}
}

@@ -105,7 +105,12 @@ protected void apply(Iterable<CdcEvent> evts) throws IgniteCheckedException {
if (evt.value() != null) {
applyIf(currCache, () -> isApplyBatch(updBatch, key), hasRemoves);

CacheObject val = new CacheObjectImpl(evt.value(), null);
CacheObject val;

if (evt.value() instanceof CacheObject)
val = (CacheObject)evt.value();
else
val = new CacheObjectImpl(evt.value(), null);

updBatch.put(key, new GridCacheDrInfo(val,
new GridCacheVersion(order.topologyVersion(), order.order(), order.nodeOrder(), order.clusterId())));
@@ -24,10 +24,12 @@
import java.util.EnumSet;
import java.util.HashSet;
import java.util.List;
import java.util.function.Function;
import java.util.stream.IntStream;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.cache.CacheAtomicityMode;
import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.cache.query.SqlFieldsQuery;
import org.apache.ignite.cdc.conflictresolve.CacheVersionConflictResolverPluginProvider;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.DataRegionConfiguration;
@@ -36,6 +38,7 @@
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.util.lang.GridAbsPredicate;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
@@ -144,13 +147,19 @@ private enum WaitDataMode {
}}));

if (!cfg.isClientMode()) {
CacheVersionConflictResolverPluginProvider<?> cfgPlugin = new CacheVersionConflictResolverPluginProvider<>();
CacheVersionConflictResolverPluginProvider<?> cfgPlugin1 = new CacheVersionConflictResolverPluginProvider<>();

cfgPlugin.setClusterId(clusterId);
cfgPlugin.setCaches(new HashSet<>(Arrays.asList(ACTIVE_PASSIVE_CACHE, ACTIVE_ACTIVE_CACHE)));
cfgPlugin.setConflictResolveField("reqId");
cfgPlugin1.setClusterId(clusterId);
cfgPlugin1.setCaches(new HashSet<>(Arrays.asList(ACTIVE_PASSIVE_CACHE, ACTIVE_ACTIVE_CACHE)));
cfgPlugin1.setConflictResolveField("reqId");

cfg.setPluginProviders(cfgPlugin);
CacheVersionConflictResolverPluginProvider<?> cfgPlugin2 = new CacheVersionConflictResolverPluginProvider<>();

cfgPlugin2.setClusterId(clusterId);
cfgPlugin2.setCaches(new HashSet<>(Arrays.asList("T1")));
cfgPlugin2.setConflictResolveField("ID");

cfg.setPluginProviders(cfgPlugin1);

cfg.setDataStorageConfiguration(new DataStorageConfiguration()
.setDefaultDataRegionConfiguration(new DataRegionConfiguration()
@@ -223,7 +232,7 @@ private IgniteBiTuple<IgniteEx[], IgniteConfiguration[]> setupCluster(
/** Active/Passive mode means changes made only in one cluster. */
@Test
public void testActivePassiveReplication() throws Exception {
List<IgniteInternalFuture<?>> futs = startActivePassiveCdc();
List<IgniteInternalFuture<?>> futs = startActivePassiveCdc(ACTIVE_PASSIVE_CACHE);

try {
IgniteCache<Integer, ConflictResolvableTestData> destCache = createCache(destCluster[0], ACTIVE_PASSIVE_CACHE);
@@ -252,6 +261,50 @@ public void testActivePassiveReplication() throws Exception {
}
}

/** Active/Passive mode means changes made only in one cluster. */
@Test
public void testActivePassiveSqlDataReplication() throws Exception {
String createTbl = "CREATE TABLE T1(ID BIGINT PRIMARY KEY, NAME VARCHAR) WITH \"CACHE_NAME=T1,VALUE_TYPE=T1Type\"";
String insertQry = "INSERT INTO T1 VALUES(?, ?)";
String deleteQry = "DELETE FROM T1";

executeSql(srcCluster[0], createTbl);
executeSql(destCluster[0], createTbl);

executeSql(destCluster[0], insertQry, -1, "Name-1");
executeSql(destCluster[0], deleteQry);

IntStream.range(0, KEYS_CNT).forEach(id -> executeSql(srcCluster[0], insertQry, id, "Name" + id));

List<IgniteInternalFuture<?>> futs = startActivePassiveCdc("T1");

try {
Function<Integer, GridAbsPredicate> waitForTblSz = expSz -> () -> {
long cnt = (Long)executeSql(destCluster[0], "SELECT COUNT(*) FROM T1").get(0).get(0);

return cnt == expSz;
};

assertTrue(waitForCondition(waitForTblSz.apply(KEYS_CNT), getTestTimeout()));


List<List<?>> data = executeSql(destCluster[0], "SELECT ID, NAME FROM T1 ORDER BY ID");

for (int i = 0; i < KEYS_CNT; i++) {
assertEquals((long)i, data.get(i).get(0));
assertEquals("Name" + i, data.get(i).get(1));
}

executeSql(srcCluster[0], deleteQry);

assertTrue(waitForCondition(waitForTblSz.apply(0), getTestTimeout()));
}
finally {
for (IgniteInternalFuture<?> fut : futs)
fut.cancel();
}
}

/** Active/Active mode means changes made in both clusters. */
@Test
public void testActiveActiveReplication() throws Exception {
@@ -346,7 +399,12 @@ private IgniteCache<Integer, ConflictResolvableTestData> createCache(IgniteEx ig
}

/** */
protected abstract List<IgniteInternalFuture<?>> startActivePassiveCdc();
private List<List<?>> executeSql(IgniteEx node, String sqlText, Object... args) {
return node.context().query().querySqlFields(new SqlFieldsQuery(sqlText).setArgs(args), true).getAll();
}

/** */
protected abstract List<IgniteInternalFuture<?>> startActivePassiveCdc(String cache);

/** */
protected abstract List<IgniteInternalFuture<?>> startActiveActiveCdc();
@@ -29,11 +29,11 @@
/** */
public class CdcIgniteToIgniteReplicationTest extends AbstractReplicationTest {
/** {@inheritDoc} */
@Override protected List<IgniteInternalFuture<?>> startActivePassiveCdc() {
@Override protected List<IgniteInternalFuture<?>> startActivePassiveCdc(String cache) {
List<IgniteInternalFuture<?>> futs = new ArrayList<>();

for (int i = 0; i < srcCluster.length; i++)
futs.add(igniteToIgnite(srcCluster[i].configuration(), destClusterCliCfg[i], ACTIVE_PASSIVE_CACHE));
futs.add(igniteToIgnite(srcCluster[i].configuration(), destClusterCliCfg[i], cache));

return futs;
}
@@ -80,15 +80,15 @@ public class CdcKafkaReplicationTest extends AbstractReplicationTest {
}

/** {@inheritDoc} */
@Override protected List<IgniteInternalFuture<?>> startActivePassiveCdc() {
@Override protected List<IgniteInternalFuture<?>> startActivePassiveCdc(String cache) {
List<IgniteInternalFuture<?>> futs = new ArrayList<>();

for (IgniteEx ex : srcCluster)
futs.add(igniteToKafka(ex.configuration(), DFLT_TOPIC, ACTIVE_PASSIVE_CACHE));
futs.add(igniteToKafka(ex.configuration(), DFLT_TOPIC, cache));

for (int i = 0; i < destCluster.length; i++) {
futs.add(kafkaToIgnite(
ACTIVE_PASSIVE_CACHE,
cache,
DFLT_TOPIC,
destClusterCliCfg[i],
i * (DFLT_PARTS / 2),

0 comments on commit 41991eb

Please sign in to comment.