Skip to content
Permalink
Browse files
IGNITE-14355 CDC Consumer metrics added (#73)
  • Loading branch information
nizhikov committed Oct 7, 2021
1 parent 4667513 commit 8053e3d8844deff2a40a54339171f90ffb16834f
Showing 8 changed files with 213 additions and 32 deletions.
@@ -32,7 +32,6 @@

<properties>
<kafka.version>2.7.0</kafka.version>
<test.containers.version>1.15.1</test.containers.version>
<slf4j.version>1.7.30</slf4j.version>
</properties>

@@ -19,7 +19,6 @@

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BooleanSupplier;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteLogger;
@@ -58,9 +57,6 @@ public abstract class CdcEventsApplier {
/** */
private final BooleanSupplier hasRemoves = () -> !F.isEmpty(rmvBatch);

/** */
protected final AtomicLong evtsApplied = new AtomicLong();

/**
* @param maxBatchSize Maximum batch size.
*/
@@ -70,11 +66,14 @@ public CdcEventsApplier(int maxBatchSize) {

/**
* @param evts Events to process.
* @return Number of applied events.
* @throws IgniteCheckedException If failed.
*/
protected void apply(Iterable<CdcEvent> evts) throws IgniteCheckedException {
protected int apply(Iterable<CdcEvent> evts) throws IgniteCheckedException {
IgniteInternalCache<BinaryObject, BinaryObject> currCache = null;

int evtsApplied = 0;

for (CdcEvent evt : evts) {
if (log().isDebugEnabled())
log().debug("Event received [key=" + evt.key() + ']');
@@ -93,7 +92,7 @@ protected void apply(Iterable<CdcEvent> evts) throws IgniteCheckedException {
});

if (cache != currCache) {
applyIf(currCache, hasUpdates, hasRemoves);
evtsApplied += applyIf(currCache, hasUpdates, hasRemoves);

currCache = cache;
}
@@ -103,7 +102,7 @@ protected void apply(Iterable<CdcEvent> evts) throws IgniteCheckedException {
KeyCacheObject key = new KeyCacheObjectImpl(evt.key(), null, evt.partition());

if (evt.value() != null) {
applyIf(currCache, () -> isApplyBatch(updBatch, key), hasRemoves);
evtsApplied += applyIf(currCache, () -> isApplyBatch(updBatch, key), hasRemoves);

CacheObject val;

@@ -116,17 +115,17 @@ protected void apply(Iterable<CdcEvent> evts) throws IgniteCheckedException {
new GridCacheVersion(order.topologyVersion(), order.order(), order.nodeOrder(), order.clusterId())));
}
else {
applyIf(currCache, hasUpdates, () -> isApplyBatch(rmvBatch, key));
evtsApplied += applyIf(currCache, hasUpdates, () -> isApplyBatch(rmvBatch, key));

rmvBatch.put(key,
new GridCacheVersion(order.topologyVersion(), order.order(), order.nodeOrder(), order.clusterId()));
}

evtsApplied.incrementAndGet();
}

if (currCache != null)
applyIf(currCache, hasUpdates, hasRemoves);
evtsApplied += applyIf(currCache, hasUpdates, hasRemoves);

return evtsApplied;
}

/**
@@ -135,19 +134,24 @@ protected void apply(Iterable<CdcEvent> evts) throws IgniteCheckedException {
* @param cache Current cache.
* @param applyUpd Apply update batch flag supplier.
* @param applyRmv Apply remove batch flag supplier.
* @return Number of applied events.
* @throws IgniteCheckedException In case of error.
*/
private void applyIf(
private int applyIf(
IgniteInternalCache<BinaryObject, BinaryObject> cache,
BooleanSupplier applyUpd,
BooleanSupplier applyRmv
) throws IgniteCheckedException {
int evtsApplied = 0;

if (applyUpd.getAsBoolean()) {
if (log().isDebugEnabled())
log().debug("Applying put batch [cache=" + cache.name() + ']');

cache.putAllConflict(updBatch);

evtsApplied += updBatch.size();

updBatch.clear();
}

@@ -157,8 +161,12 @@ private void applyIf(

cache.removeAllConflict(rmvBatch);

evtsApplied += rmvBatch.size();

rmvBatch.clear();
}

return evtsApplied;
}

/** @return {@code True} if update batch should be applied. */
@@ -29,6 +29,8 @@
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.cdc.CdcMain;
import org.apache.ignite.internal.processors.metric.MetricRegistry;
import org.apache.ignite.internal.processors.metric.impl.AtomicLongMetric;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.resources.LoggerResource;
@@ -48,6 +50,18 @@
* @see CacheVersionConflictResolverImpl
*/
public class IgniteToIgniteCdcStreamer extends CdcEventsApplier implements CdcConsumer {
/** */
public static final String EVTS_CNT = "EventsCount";

/** */
public static final String EVTS_CNT_DESC = "Count of messages applied to destination cluster";

/** */
public static final String LAST_EVT_TIME = "LastEventTime";

/** */
public static final String LAST_EVT_TIME_DESC = "Timestamp of last applied event";

/** Destination cluster client configuration. */
private final IgniteConfiguration destIgniteCfg;

@@ -57,6 +71,12 @@ public class IgniteToIgniteCdcStreamer extends CdcEventsApplier implements CdcCo
/** Destination Ignite cluster client */
private IgniteEx dest;

/** Timestamp of last sent message. */
private AtomicLongMetric lastEvtTs;

/** Count of events applied to destination cluster. */
protected AtomicLongMetric evtsCnt;

/** Logger. */
@LoggerResource
private IgniteLogger log;
@@ -83,26 +103,34 @@ public IgniteToIgniteCdcStreamer(IgniteConfiguration destIgniteCfg, boolean only
}

/** {@inheritDoc} */
@Override public void start() {
@Override public void start(MetricRegistry mreg) {
if (log.isInfoEnabled())
log.info("Ignite To Ignite Streamer [cacheIds=" + cachesIds + ']');

dest = (IgniteEx)Ignition.start(destIgniteCfg);

this.evtsCnt = mreg.longMetric(EVTS_CNT, EVTS_CNT_DESC);
this.lastEvtTs = mreg.longMetric(LAST_EVT_TIME, LAST_EVT_TIME_DESC);
}

/** {@inheritDoc} */
@Override public boolean onEvents(Iterator<CdcEvent> evts) {
try {
apply(() -> F.iterator(
long msgsSnt = apply(() -> F.iterator(
evts,
F.identity(),
true,
evt -> !onlyPrimary || evt.primary(),
evt -> F.isEmpty(cachesIds) || cachesIds.contains(evt.cacheId()),
evt -> evt.version().otherClusterVersion() == null));

if (log.isInfoEnabled())
log.info("Events applied [evtsApplied=" + evtsApplied.get() + ']');
if (msgsSnt > 0) {
evtsCnt.add(msgsSnt);
lastEvtTs.value(System.currentTimeMillis());

if (log.isInfoEnabled())
log.info("Events applied [evtsApplied=" + evtsCnt.value() + ']');
}

return true;
}
@@ -32,6 +32,8 @@
import org.apache.ignite.cdc.CdcEvent;
import org.apache.ignite.cdc.conflictresolve.CacheVersionConflictResolverImpl;
import org.apache.ignite.internal.cdc.CdcMain;
import org.apache.ignite.internal.processors.metric.MetricRegistry;
import org.apache.ignite.internal.processors.metric.impl.AtomicLongMetric;
import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.resources.LoggerResource;
@@ -41,6 +43,10 @@
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.IntegerSerializer;

import static org.apache.ignite.cdc.IgniteToIgniteCdcStreamer.EVTS_CNT;
import static org.apache.ignite.cdc.IgniteToIgniteCdcStreamer.EVTS_CNT_DESC;
import static org.apache.ignite.cdc.IgniteToIgniteCdcStreamer.LAST_EVT_TIME;
import static org.apache.ignite.cdc.IgniteToIgniteCdcStreamer.LAST_EVT_TIME_DESC;
import static org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG;
import static org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG;

@@ -64,6 +70,12 @@ public class IgniteToKafkaCdcStreamer implements CdcConsumer {
/** Default kafka request timeout in seconds. */
public static final int DFLT_REQ_TIMEOUT = 5;

/** Bytes sent metric name. */
public static final String BYTES_SENT = "BytesSent";

/** Bytes sent metric description. */
public static final String BYTES_SENT_DESCRIPTION = "Count of bytes sent.";

/** Log. */
@LoggerResource
private IgniteLogger log;
@@ -89,8 +101,14 @@ public class IgniteToKafkaCdcStreamer implements CdcConsumer {
/** Max batch size. */
private final int maxBatchSize;

/** Timestamp of last sent message. */
private AtomicLongMetric lastMsgTs;

/** Count of bytes sent to the Kafka. */
private AtomicLongMetric bytesSnt;

/** Count of sent messages. */
private long msgCnt;
private AtomicLongMetric msgsSnt;

/**
* @param topic Topic name.
@@ -158,35 +176,43 @@ public IgniteToKafkaCdcStreamer(
continue;
}

msgCnt++;
byte[] bytes = IgniteUtils.toBytes(evt);

bytesSnt.add(bytes.length);

futs.add(producer.send(new ProducerRecord<>(
topic,
evt.partition() % kafkaParts,
evt.cacheId(),
IgniteUtils.toBytes(evt)
bytes
)));

if (log.isDebugEnabled())
log.debug("Event sent asynchronously [evt=" + evt + ']');
}

try {
for (Future<RecordMetadata> fut : futs)
fut.get(DFLT_REQ_TIMEOUT, TimeUnit.SECONDS);
}
catch (InterruptedException | ExecutionException | TimeoutException e) {
throw new RuntimeException(e);
}
if (!futs.isEmpty()) {
try {
for (Future<RecordMetadata> fut : futs)
fut.get(DFLT_REQ_TIMEOUT, TimeUnit.SECONDS);

msgsSnt.add(futs.size());

lastMsgTs.value(System.currentTimeMillis());
}
catch (InterruptedException | ExecutionException | TimeoutException e) {
throw new RuntimeException(e);
}

if (log.isInfoEnabled())
log.info("Events processed [sentMessagesCount=" + msgCnt + ']');
if (log.isInfoEnabled())
log.info("Events processed [sentMessagesCount=" + msgsSnt.value() + ']');
}

return true;
}

/** {@inheritDoc} */
@Override public void start() {
@Override public void start(MetricRegistry mreg) {
try {
producer = new KafkaProducer<>(kafkaProps);

@@ -196,6 +222,10 @@ public IgniteToKafkaCdcStreamer(
catch (Exception e) {
throw new RuntimeException(e);
}

this.msgsSnt = mreg.longMetric(EVTS_CNT, EVTS_CNT_DESC);
this.lastMsgTs = mreg.longMetric(LAST_EVT_TIME, LAST_EVT_TIME_DESC);
this.bytesSnt = mreg.longMetric(BYTES_SENT, BYTES_SENT_DESCRIPTION);
}

/** {@inheritDoc} */

0 comments on commit 8053e3d

Please sign in to comment.