Skip to content

Commit

Permalink
report number of messages to be processed via metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
Volodymyr Burenin committed Aug 1, 2022
1 parent c72d895 commit 719c8ec
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 0 deletions.
Expand Up @@ -101,6 +101,12 @@ public void updateDeltaStreamerSyncMetrics(long syncEpochTimeInMs) {
}
}

public void updateDeltaStreamerKafkaAvroMessagesToProcess(long totalNewMsgs) {
if (config.isMetricsOn()) {
Metrics.registerGauge(getMetricsName("deltastreamer", "kafka_avro_messages_in"), totalNewMsgs);
}
}

public long getDurationInMs(long ctxDuration) {
return ctxDuration / 1000000;
}
Expand Down
Expand Up @@ -95,8 +95,10 @@ protected InputBatch<JavaRDD<GenericRecord>> fetchNewData(Option<String> lastChe
long totalNewMsgs = CheckpointUtils.totalNewMessages(offsetRanges);
LOG.info("About to read " + totalNewMsgs + " from Kafka for topic :" + offsetGen.getTopicName());
if (totalNewMsgs <= 0) {
metrics.updateDeltaStreamerKafkaAvroMessagesToProcess(0);
return new InputBatch<>(Option.empty(), CheckpointUtils.offsetsToStr(offsetRanges));
}
metrics.updateDeltaStreamerKafkaAvroMessagesToProcess(totalNewMsgs);
JavaRDD<GenericRecord> newDataRDD = toRDD(offsetRanges);
return new InputBatch<>(Option.of(newDataRDD), CheckpointUtils.offsetsToStr(offsetRanges));
} catch (org.apache.kafka.common.errors.TimeoutException e) {
Expand Down

0 comments on commit 719c8ec

Please sign in to comment.