IGNITE-16176 Adds configurable Kafka requests timeout. (#99)
ololo3000 committed Mar 1, 2022
1 parent e534200 commit 2ed86621333be0af69224f33e08feeca96934a46
Showing 12 changed files with 214 additions and 79 deletions.
@@ -96,7 +96,7 @@ public CacheVersionConflictResolverCachePluginProvider(String conflictResolveFie
}

/** {@inheritDoc} */
- @Nullable public <T, K2, V2> T unwrapCacheEntry(Cache.Entry<K2, V2> entry, Class<T> cls) {
+ @Nullable @Override public <T, K2, V2> T unwrapCacheEntry(Cache.Entry<K2, V2> entry, Class<T> cls) {
return null;
}
}
@@ -167,7 +167,7 @@ public void setName(String name) {
}

/** {@inheritDoc} */
- @Nullable public <T> T createComponent(PluginContext ctx, Class<T> cls) {
+ @Nullable @Override public <T> T createComponent(PluginContext ctx, Class<T> cls) {
return null;
}
}
@@ -18,6 +18,7 @@
package org.apache.ignite.cdc.kafka;

import java.util.ArrayList;
+ import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
@@ -35,7 +36,9 @@
import org.apache.ignite.internal.processors.metric.MetricRegistry;
import org.apache.ignite.internal.processors.metric.impl.AtomicLongMetric;
import org.apache.ignite.internal.util.IgniteUtils;
+ import org.apache.ignite.internal.util.typedef.internal.A;
import org.apache.ignite.internal.util.typedef.internal.CU;
+ import org.apache.ignite.lang.IgniteExperimental;
import org.apache.ignite.resources.LoggerResource;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
@@ -47,6 +50,8 @@
import static org.apache.ignite.cdc.IgniteToIgniteCdcStreamer.EVTS_CNT_DESC;
import static org.apache.ignite.cdc.IgniteToIgniteCdcStreamer.LAST_EVT_TIME;
import static org.apache.ignite.cdc.IgniteToIgniteCdcStreamer.LAST_EVT_TIME_DESC;
+ import static org.apache.ignite.cdc.kafka.KafkaToIgniteCdcStreamerConfiguration.DFLT_KAFKA_REQ_TIMEOUT;
+ import static org.apache.ignite.cdc.kafka.KafkaToIgniteCdcStreamerConfiguration.DFLT_MAX_BATCH_SIZE;
import static org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG;
import static org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG;

@@ -66,9 +71,10 @@
* @see KafkaToIgniteCdcStreamer
* @see CacheVersionConflictResolverImpl
*/
+ @IgniteExperimental
public class IgniteToKafkaCdcStreamer implements CdcConsumer {
- /** Default kafka request timeout in seconds. */
- public static final int DFLT_REQ_TIMEOUT = 5;
+ /** Default value for the flag that indicates whether entries only from primary nodes should be handled. */
+ public static final boolean DFLT_IS_ONLY_PRIMARY = false;

/** Bytes sent metric name. */
public static final String BYTES_SENT = "BytesSent";
@@ -84,22 +90,28 @@ public class IgniteToKafkaCdcStreamer implements CdcConsumer {
private KafkaProducer<Integer, byte[]> producer;

/** Handle only primary entry flag. */
- private final boolean onlyPrimary;
+ private boolean onlyPrimary = DFLT_IS_ONLY_PRIMARY;

/** Topic name. */
- private final String topic;
+ private String topic;

/** Kafka topic partitions count. */
- private final int kafkaParts;
+ private int kafkaParts;

/** Kafka properties. */
- private final Properties kafkaProps;
+ private Properties kafkaProps;

/** Cache IDs. */
- private final Set<Integer> cachesIds;
+ private Set<Integer> cachesIds;

+ /** Cache names. */
+ private Collection<String> cacheNames;
+
/** Max batch size. */
- private final int maxBatchSize;
+ private int maxBatchSize = DFLT_MAX_BATCH_SIZE;

+ /** The maximum time to complete Kafka related requests, in milliseconds. */
+ private long kafkaReqTimeout = DFLT_KAFKA_REQ_TIMEOUT;
+
/** Timestamp of last sent message. */
private AtomicLongMetric lastMsgTs;
@@ -110,39 +122,6 @@ public class IgniteToKafkaCdcStreamer implements CdcConsumer {
/** Count of sent messages. */
private AtomicLongMetric msgsSnt;

- /**
- * @param topic Topic name.
- * @param kafkaParts Kafka partitions count.
- * @param caches Cache names.
- * @param maxBatchSize Maximum size of records concurrently sent to Kafka.
- * @param onlyPrimary If {@code true} then stream only events from primaries.
- * @param kafkaProps Kafka properties.
- */
- public IgniteToKafkaCdcStreamer(
- String topic,
- int kafkaParts,
- Set<String> caches,
- int maxBatchSize,
- boolean onlyPrimary,
- Properties kafkaProps
- ) {
- assert caches != null && !caches.isEmpty();
-
- this.topic = topic;
- this.kafkaParts = kafkaParts;
- this.onlyPrimary = onlyPrimary;
- this.kafkaProps = kafkaProps;
- this.maxBatchSize = maxBatchSize;
-
- cachesIds = caches.stream()
- .mapToInt(CU::cacheId)
- .boxed()
- .collect(Collectors.toSet());
-
- kafkaProps.setProperty(KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName());
- kafkaProps.setProperty(VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
- }
-
/** {@inheritDoc} */
@Override public boolean onEvents(Iterator<CdcEvent> evts) {
List<Future<RecordMetadata>> futs = new ArrayList<>();
@@ -169,7 +148,7 @@ public IgniteToKafkaCdcStreamer(
continue;
}

- if (!cachesIds.isEmpty() && !cachesIds.contains(evt.cacheId())) {
+ if (!cachesIds.contains(evt.cacheId())) {
if (log.isDebugEnabled())
log.debug("Event skipped because of cacheId [evt=" + evt + ']');

@@ -194,7 +173,7 @@ public IgniteToKafkaCdcStreamer(
if (!futs.isEmpty()) {
try {
for (Future<RecordMetadata> fut : futs)
- fut.get(DFLT_REQ_TIMEOUT, TimeUnit.SECONDS);
+ fut.get(kafkaReqTimeout, TimeUnit.MILLISECONDS);

msgsSnt.add(futs.size());

@@ -213,6 +192,19 @@ public IgniteToKafkaCdcStreamer(

/** {@inheritDoc} */
@Override public void start(MetricRegistry mreg) {
+ A.notNull(kafkaProps, "Kafka properties");
+ A.notNull(topic, "Kafka topic");
+ A.notEmpty(cacheNames, "caches");
+ A.ensure(kafkaParts > 0, "The number of Kafka partitions must be explicitly set to a value greater than zero.");
+ A.ensure(kafkaReqTimeout >= 0, "The Kafka request timeout cannot be negative.");
+
+ kafkaProps.setProperty(KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName());
+ kafkaProps.setProperty(VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
+
+ cachesIds = cacheNames.stream()
+ .map(CU::cacheId)
+ .collect(Collectors.toSet());
+
try {
producer = new KafkaProducer<>(kafkaProps);

@@ -232,4 +224,88 @@ public IgniteToKafkaCdcStreamer(
@Override public void stop() {
producer.close();
}

+ /**
+ * Sets whether entries only from primary nodes should be handled.
+ *
+ * @param onlyPrimary Whether entries only from primary nodes should be handled.
+ * @return {@code this} for chaining.
+ */
+ public IgniteToKafkaCdcStreamer setOnlyPrimary(boolean onlyPrimary) {
+ this.onlyPrimary = onlyPrimary;
+
+ return this;
+ }
+
+ /**
+ * Sets topic that is used to send data to Kafka.
+ *
+ * @param topic Kafka topic.
+ * @return {@code this} for chaining.
+ */
+ public IgniteToKafkaCdcStreamer setTopic(String topic) {
+ this.topic = topic;
+
+ return this;
+ }
+
+ /**
+ * Sets number of Kafka partitions.
+ *
+ * @param kafkaParts Number of Kafka partitions.
+ * @return {@code this} for chaining.
+ */
+ public IgniteToKafkaCdcStreamer setKafkaPartitions(int kafkaParts) {
+ this.kafkaParts = kafkaParts;
+
+ return this;
+ }
+
+ /**
+ * Sets cache names that participate in CDC.
+ *
+ * @param caches Cache names.
+ * @return {@code this} for chaining.
+ */
+ public IgniteToKafkaCdcStreamer setCaches(Collection<String> caches) {
+ this.cacheNames = caches;
+
+ return this;
+ }
+
+ /**
+ * Sets maximum batch size.
+ *
+ * @param maxBatchSize Maximum batch size.
+ * @return {@code this} for chaining.
+ */
+ public IgniteToKafkaCdcStreamer setMaxBatchSize(int maxBatchSize) {
+ this.maxBatchSize = maxBatchSize;
+
+ return this;
+ }
+
+ /**
+ * Sets properties that are used to initiate connection to Kafka.
+ *
+ * @param kafkaProps Properties that are used to initiate connection to Kafka.
+ * @return {@code this} for chaining.
+ */
+ public IgniteToKafkaCdcStreamer setKafkaProperties(Properties kafkaProps) {
+ this.kafkaProps = kafkaProps;
+
+ return this;
+ }
+
+ /**
+ * Sets the maximum time to complete Kafka related requests, in milliseconds.
+ *
+ * @param kafkaReqTimeout Timeout value.
+ * @return {@code this} for chaining.
+ */
+ public IgniteToKafkaCdcStreamer setKafkaRequestTimeout(long kafkaReqTimeout) {
+ this.kafkaReqTimeout = kafkaReqTimeout;
+
+ return this;
+ }
}
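
With the all-args constructor removed, IgniteToKafkaCdcStreamer is now configured through the fluent setters above and validated only when start() runs its A.notNull/A.ensure checks. Below is a minimal usage sketch, not part of this commit: the topic name, partition count, batch size, and broker address are illustrative placeholders, and it assumes the class is left with the implicit no-arg constructor.

import java.util.Collections;
import java.util.Properties;
import org.apache.ignite.cdc.kafka.IgniteToKafkaCdcStreamer;

public class IgniteToKafkaConfigSketch {
    public static void main(String[] args) {
        Properties kafkaProps = new Properties();
        // Placeholder broker address; key/value serializers are set by start() itself.
        kafkaProps.setProperty("bootstrap.servers", "localhost:9092");

        IgniteToKafkaCdcStreamer streamer = new IgniteToKafkaCdcStreamer()
            .setTopic("ignite-cdc")                       // required, checked by A.notNull
            .setKafkaPartitions(16)                       // must be > 0
            .setCaches(Collections.singleton("my-cache")) // required, checked by A.notEmpty
            .setMaxBatchSize(256)                         // optional, defaults to DFLT_MAX_BATCH_SIZE
            .setOnlyPrimary(false)                        // optional, defaults to DFLT_IS_ONLY_PRIMARY
            .setKafkaProperties(kafkaProps)               // required, checked by A.notNull
            .setKafkaRequestTimeout(10_000);              // new in this commit: 10 s, in milliseconds
    }
}

The configured instance would then be handed to the CDC application as its CdcConsumer; the timeout now bounds each fut.get() on in-flight producer sends in milliseconds, replacing the fixed five-second DFLT_REQ_TIMEOUT.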
@@ -35,8 +35,10 @@
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.cdc.CdcMain;
import org.apache.ignite.internal.util.typedef.F;
+ import org.apache.ignite.internal.util.typedef.internal.A;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.U;
+ import org.apache.ignite.lang.IgniteExperimental;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.IntegerDeserializer;
@@ -83,6 +85,7 @@
* @see KafkaToIgniteCdcStreamerApplier
* @see CacheConflictResolutionManagerImpl
*/
+ @IgniteExperimental
public class KafkaToIgniteCdcStreamer implements Runnable {
/** Ignite configuration. */
private final IgniteConfiguration igniteCfg;
@@ -109,6 +112,22 @@ public KafkaToIgniteCdcStreamer(
Properties kafkaProps,
KafkaToIgniteCdcStreamerConfiguration streamerCfg
) {
+ A.notNull(streamerCfg.getTopic(), "Kafka topic");
+ A.ensure(
+ streamerCfg.getKafkaPartsFrom() >= 0,
+ "The Kafka partitions lower bound must be explicitly set to a value greater than or equal to zero.");
+ A.ensure(
+ streamerCfg.getKafkaPartsTo() > 0,
+ "The Kafka partitions upper bound must be explicitly set to a value greater than zero.");
+ A.ensure(
+ streamerCfg.getKafkaPartsTo() > streamerCfg.getKafkaPartsFrom(),
+ "The Kafka partitions upper bound must be greater than the lower bound.");
+ A.ensure(streamerCfg.getKafkaRequestTimeout() >= 0, "The Kafka request timeout cannot be negative.");
+ A.ensure(streamerCfg.getThreadCount() > 0, "Threads count value must be greater than zero.");
+ A.ensure(
+ streamerCfg.getKafkaPartsTo() - streamerCfg.getKafkaPartsFrom() >= streamerCfg.getThreadCount(),
+ "Threads count must be less than or equal to the total Kafka partitions count.");
+
this.igniteCfg = igniteCfg;
this.kafkaProps = kafkaProps;
this.streamerCfg = streamerCfg;
@@ -158,9 +177,6 @@ private void runx() throws Exception {
int kafkaParts = streamerCfg.getKafkaPartsTo() - kafkaPartsFrom;
int threadCnt = streamerCfg.getThreadCount();

- assert kafkaParts >= threadCnt
- : "Threads count bigger then kafka partitions count [kafkaParts=" + kafkaParts + ",threadCount=" + threadCnt + ']';
-
int partPerApplier = kafkaParts / threadCnt;

for (int i = 0; i < threadCnt; i++) {
@@ -179,7 +195,8 @@ private void runx() throws Exception {
kafkaPartsFrom + to,
caches,
streamerCfg.getMaxBatchSize(),
- stopped
+ stopped,
+ streamerCfg.getKafkaRequestTimeout()
);

appliers.add(applier);
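
On the apply side, the timeout travels from KafkaToIgniteCdcStreamerConfiguration through the KafkaToIgniteCdcStreamer constructor (validated above) into each KafkaToIgniteCdcStreamerApplier. A hedged wiring sketch follows, assuming the configuration class exposes setters mirroring the getters used in this diff (setTopic, setKafkaPartsFrom, setKafkaPartsTo, setThreadCount, setKafkaRequestTimeout) and using placeholder values:

import java.util.Properties;
import org.apache.ignite.cdc.kafka.KafkaToIgniteCdcStreamer;
import org.apache.ignite.cdc.kafka.KafkaToIgniteCdcStreamerConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;

public class KafkaToIgniteConfigSketch {
    public static void main(String[] args) {
        KafkaToIgniteCdcStreamerConfiguration streamerCfg = new KafkaToIgniteCdcStreamerConfiguration();
        streamerCfg.setTopic("ignite-cdc");        // required, checked by A.notNull
        streamerCfg.setKafkaPartsFrom(0);          // lower bound, must be >= 0
        streamerCfg.setKafkaPartsTo(16);           // upper bound, must exceed the lower bound
        streamerCfg.setThreadCount(4);             // must not exceed the partition range width
        streamerCfg.setKafkaRequestTimeout(5_000); // falls back to DFLT_KAFKA_REQ_TIMEOUT if unset

        Properties kafkaProps = new Properties();
        kafkaProps.setProperty("bootstrap.servers", "localhost:9092"); // placeholder broker

        // Runnable: blocks the calling thread while appliers poll Kafka.
        new KafkaToIgniteCdcStreamer(new IgniteConfiguration(), kafkaProps, streamerCfg).run();
    }
}

Each applier then uses the same value for consumer poll, commitSync, and close calls, so a single setting bounds every Kafka-facing request.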
@@ -81,9 +81,6 @@
* @see CacheEntryVersion
*/
class KafkaToIgniteCdcStreamerApplier extends CdcEventsApplier implements Runnable, AutoCloseable {
- /** */
- public static final int DFLT_REQ_TIMEOUT = 3;
-
/** Ignite instance. */
private final IgniteEx ign;

@@ -114,6 +111,9 @@ class KafkaToIgniteCdcStreamerApplier extends CdcEventsApplier implements Runnab
/** */
private final AtomicLong rcvdEvts = new AtomicLong();

+ /** The maximum time to complete Kafka related requests, in milliseconds. */
+ private final long kafkaReqTimeout;
+
/**
* @param ign Ignite instance.
* @param log Logger.
@@ -124,6 +124,7 @@ class KafkaToIgniteCdcStreamerApplier extends CdcEventsApplier implements Runnab
* @param caches Cache ids.
* @param maxBatchSize Maximum batch size.
* @param stopped Stopped flag.
+ * @param kafkaReqTimeout The maximum time to complete Kafka related requests, in milliseconds.
*/
public KafkaToIgniteCdcStreamerApplier(
IgniteEx ign,
@@ -134,7 +135,8 @@ public KafkaToIgniteCdcStreamerApplier(
int kafkaPartTo,
Set<Integer> caches,
int maxBatchSize,
- AtomicBoolean stopped
+ AtomicBoolean stopped,
+ long kafkaReqTimeout
) {
super(maxBatchSize);

@@ -146,6 +148,7 @@ public KafkaToIgniteCdcStreamerApplier(
this.caches = caches;
this.stopped = stopped;
this.log = log.getLogger(KafkaToIgniteCdcStreamerApplier.class);
+ this.kafkaReqTimeout = kafkaReqTimeout;
}

/** {@inheritDoc} */
@@ -182,7 +185,7 @@ public KafkaToIgniteCdcStreamerApplier(
finally {
for (KafkaConsumer<Integer, byte[]> consumer : cnsmrs) {
try {
- consumer.close(Duration.ofSeconds(DFLT_REQ_TIMEOUT));
+ consumer.close(Duration.ofMillis(kafkaReqTimeout));
}
catch (Exception e) {
log.warning("Close error!", e);
@@ -201,7 +204,7 @@ public KafkaToIgniteCdcStreamerApplier(
* @param cnsmr Data consumer.
*/
private void poll(KafkaConsumer<Integer, byte[]> cnsmr) throws IgniteCheckedException {
- ConsumerRecords<Integer, byte[]> recs = cnsmr.poll(Duration.ofSeconds(DFLT_REQ_TIMEOUT));
+ ConsumerRecords<Integer, byte[]> recs = cnsmr.poll(Duration.ofMillis(kafkaReqTimeout));

if (log.isDebugEnabled()) {
log.debug(
@@ -211,7 +214,7 @@ private void poll(KafkaConsumer<Integer, byte[]> cnsmr) throws IgniteCheckedExce

apply(F.iterator(recs, this::deserialize, true, rec -> F.isEmpty(caches) || caches.contains(rec.key())));

- cnsmr.commitSync(Duration.ofSeconds(DFLT_REQ_TIMEOUT));
+ cnsmr.commitSync(Duration.ofMillis(kafkaReqTimeout));
}

/**
