IGNITE-16664 IgniteToIgniteCdcStreamer named parameters added (#100)
nizhikov committed Mar 5, 2022
1 parent 2ed8662 commit 3288f2d39c3bca056e3a33a922e6adc6d9426da0
Showing 4 changed files with 82 additions and 27 deletions.
@@ -40,7 +40,7 @@
*/
public abstract class CdcEventsApplier {
/** Maximum batch size. */
private final int maxBatchSize;
protected int maxBatchSize;

/** Caches. */
private final Map<Integer, IgniteInternalCache<BinaryObject, BinaryObject>> ignCaches = new HashMap<>();
@@ -32,9 +32,14 @@
import org.apache.ignite.internal.processors.metric.MetricRegistry;
import org.apache.ignite.internal.processors.metric.impl.AtomicLongMetric;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.A;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.lang.IgniteExperimental;
import org.apache.ignite.resources.LoggerResource;

import static org.apache.ignite.cdc.kafka.IgniteToKafkaCdcStreamer.DFLT_IS_ONLY_PRIMARY;
import static org.apache.ignite.cdc.kafka.KafkaToIgniteCdcStreamerConfiguration.DFLT_MAX_BATCH_SIZE;

/**
* Change Data Consumer that streams all data changes to the provided {@link #dest} Ignite cluster.
* The consumer fails on any error during write. A consumer failure causes the {@code ignite-cdc} application to fail.
@@ -49,6 +54,7 @@
* @see CdcMain
* @see CacheVersionConflictResolverImpl
*/
@IgniteExperimental
public class IgniteToIgniteCdcStreamer extends CdcEventsApplier implements CdcConsumer {
/** */
public static final String EVTS_CNT = "EventsCount";
@@ -63,14 +69,20 @@ public class IgniteToIgniteCdcStreamer extends CdcEventsApplier implements CdcCo
public static final String LAST_EVT_TIME_DESC = "Timestamp of last applied event";

/** Destination cluster client configuration. */
private final IgniteConfiguration destIgniteCfg;
private IgniteConfiguration destIgniteCfg;

/** Handle only primary entry flag. */
private final boolean onlyPrimary;
private boolean onlyPrimary = DFLT_IS_ONLY_PRIMARY;

/** Destination Ignite cluster client. */
private IgniteEx dest;

/** Cache names. */
private Set<String> caches;

/** Cache IDs. */
private Set<Integer> cachesIds;

/** Timestamp of last sent message. */
private AtomicLongMetric lastEvtTs;

@@ -81,31 +93,23 @@ public class IgniteToIgniteCdcStreamer extends CdcEventsApplier implements CdcCo
@LoggerResource
private IgniteLogger log;

/** Cache IDs. */
private final Set<Integer> cachesIds;
/** */
public IgniteToIgniteCdcStreamer() {
super(DFLT_MAX_BATCH_SIZE);
}

/**
* @param destIgniteCfg Configuration of the destination Ignite node.
* @param onlyPrimary Only primary flag.
* @param caches Cache names.
* @param maxBatchSize Maximum batch size.
*/
public IgniteToIgniteCdcStreamer(IgniteConfiguration destIgniteCfg, boolean onlyPrimary, Set<String> caches, int maxBatchSize) {
super(maxBatchSize);
/** {@inheritDoc} */
@Override public void start(MetricRegistry mreg) {
A.notNull(destIgniteCfg, "Destination ignite configuration");
A.notEmpty(caches, "caches");

this.destIgniteCfg = destIgniteCfg;
this.onlyPrimary = onlyPrimary;
if (log.isInfoEnabled())
log.info("Ignite To Ignite Streamer [cacheIds=" + cachesIds + ']');

cachesIds = caches.stream()
.mapToInt(CU::cacheId)
.boxed()
.collect(Collectors.toSet());
}

/** {@inheritDoc} */
@Override public void start(MetricRegistry mreg) {
if (log.isInfoEnabled())
log.info("Ignite To Ignite Streamer [cacheIds=" + cachesIds + ']');

dest = (IgniteEx)Ignition.start(destIgniteCfg);

@@ -153,4 +157,51 @@ public IgniteToIgniteCdcStreamer(IgniteConfiguration destIgniteCfg, boolean only
@Override protected IgniteLogger log() {
return log;
}

/**
* Sets Ignite client node configuration that will connect to destination cluster.
* @param destIgniteCfg Ignite client node configuration that will connect to destination cluster.
* @return {@code this} for chaining.
*/
public IgniteToIgniteCdcStreamer setDestinationIgniteConfiguration(IgniteConfiguration destIgniteCfg) {
this.destIgniteCfg = destIgniteCfg;

return this;
}

/**
* Sets whether entries only from primary nodes should be handled.
*
* @param onlyPrimary Whether entries only from primary nodes should be handled.
* @return {@code this} for chaining.
*/
public IgniteToIgniteCdcStreamer setOnlyPrimary(boolean onlyPrimary) {
this.onlyPrimary = onlyPrimary;

return this;
}

/**
* Sets cache names that participate in CDC.
*
* @param caches Cache names.
* @return {@code this} for chaining.
*/
public IgniteToIgniteCdcStreamer setCaches(Set<String> caches) {
this.caches = caches;

return this;
}

/**
* Sets maximum batch size that will be applied to destination cluster.
*
* @param maxBatchSize Maximum batch size.
* @return {@code this} for chaining.
*/
public IgniteToIgniteCdcStreamer setMaxBatchSize(int maxBatchSize) {
this.maxBatchSize = maxBatchSize;

return this;
}
}
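Taken together, the new no-argument constructor and the chainable setters above replace the removed positional constructor. A minimal usage sketch follows; it is not part of this commit, the package names of the CDC classes are assumed, and the destination configuration, cache name and batch size are illustrative placeholders:

import java.util.Collections;

import org.apache.ignite.cdc.CdcConfiguration;
import org.apache.ignite.cdc.IgniteToIgniteCdcStreamer;
import org.apache.ignite.configuration.IgniteConfiguration;

/** Illustrative sketch only; not part of this commit. */
public class IgniteToIgniteStreamerSketch {
    /** Builds a CDC configuration around the consumer using the new named, chainable parameters. */
    public static CdcConfiguration cdcConfiguration(IgniteConfiguration destCfg) {
        IgniteToIgniteCdcStreamer streamer = new IgniteToIgniteCdcStreamer()
            .setDestinationIgniteConfiguration(destCfg)    // client configuration of the destination cluster
            .setOnlyPrimary(false)                         // handle backup entries as well
            .setCaches(Collections.singleton("my-cache"))  // caches that participate in CDC
            .setMaxBatchSize(1024);                        // entries applied to the destination per batch

        CdcConfiguration cdcCfg = new CdcConfiguration();

        cdcCfg.setConsumer(streamer);

        return cdcCfg;
    }
}

Because the required values are no longer constructor arguments, their validation (A.notNull / A.notEmpty) moves into start(), matching the builder-style configuration that IgniteToKafkaCdcStreamer already uses.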
@@ -105,7 +105,7 @@ public class IgniteToKafkaCdcStreamer implements CdcConsumer {
private Set<Integer> cachesIds;

/** Cache names. */
private Collection<String> cacheNames;
private Collection<String> caches;

/** Max batch size. */
private int maxBatchSize = DFLT_MAX_BATCH_SIZE;
@@ -194,14 +194,14 @@ public class IgniteToKafkaCdcStreamer implements CdcConsumer {
@Override public void start(MetricRegistry mreg) {
A.notNull(kafkaProps, "Kafka properties");
A.notNull(topic, "Kafka topic");
A.notEmpty(cacheNames, "caches");
A.notEmpty(caches, "caches");
A.ensure(kafkaParts > 0, "The number of Kafka partitions must be explicitly set to a value greater than zero.");
A.ensure(kafkaReqTimeout >= 0, "The Kafka request timeout cannot be negative.");

kafkaProps.setProperty(KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName());
kafkaProps.setProperty(VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());

cachesIds = cacheNames.stream()
cachesIds = caches.stream()
.map(CU::cacheId)
.collect(Collectors.toSet());

@@ -228,7 +228,7 @@ public class IgniteToKafkaCdcStreamer implements CdcConsumer {
/**
* Sets whether entries only from primary nodes should be handled.
*
* @param onlyPrimary Kafka whether entries only from primary nodes should be handled.
* @param onlyPrimary Whether entries only from primary nodes should be handled.
* @return {@code this} for chaining.
*/
public IgniteToKafkaCdcStreamer setOnlyPrimary(boolean onlyPrimary) {
@@ -268,7 +268,7 @@ public IgniteToKafkaCdcStreamer setKafkaPartitions(int kafkaParts) {
* @return {@code this} for chaining.
*/
public IgniteToKafkaCdcStreamer setCaches(Collection<String> caches) {
this.cacheNames = caches;
this.caches = caches;

return this;
}
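The Kafka-bound consumer keeps the same chainable style. Below is a partial sketch using only the setters that appear in this diff; the Kafka topic and producer properties that start() validates must also be configured through their own setters (not shown here), and all values are placeholders:

import java.util.Collections;

import org.apache.ignite.cdc.kafka.IgniteToKafkaCdcStreamer;

/** Partial, illustrative sketch only; not part of this commit. */
public class IgniteToKafkaStreamerSketch {
    /** Chains the setters shown in the diff; partition count and cache name are placeholders. */
    public static IgniteToKafkaCdcStreamer streamer() {
        return new IgniteToKafkaCdcStreamer()
            .setOnlyPrimary(false)                          // handle backup entries as well
            .setKafkaPartitions(16)                         // must be greater than zero, as enforced in start()
            .setCaches(Collections.singleton("my-cache"));  // caches that participate in CDC
    }
}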
@@ -69,7 +69,11 @@ protected IgniteInternalFuture<?> igniteToIgnite(IgniteConfiguration srcCfg, Ign
return runAsync(() -> {
CdcConfiguration cdcCfg = new CdcConfiguration();

cdcCfg.setConsumer(new IgniteToIgniteCdcStreamer(destCfg, false, Collections.singleton(cache), KEYS_CNT));
cdcCfg.setConsumer(new IgniteToIgniteCdcStreamer()
.setMaxBatchSize(KEYS_CNT)
.setDestinationIgniteConfiguration(destCfg)
.setCaches(Collections.singleton(cache)));

cdcCfg.setMetricExporterSpi(new JmxMetricExporterSpi());

CdcMain cdc = new CdcMain(srcCfg, null, cdcCfg);
