Skip to content

Commit

Permalink
[HUDI-4873] Report number of messages to be processed via metrics (ap…
Browse files Browse the repository at this point in the history
…ache#6271)

Co-authored-by: Volodymyr Burenin <volodymyr.burenin@cloudkitchens.com>
Co-authored-by: Y Ethan Guo <ethan.guoyihua@gmail.com>
  • Loading branch information
3 people authored and fengjian committed Apr 5, 2023
1 parent b46bd20 commit fed4744
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 0 deletions.
Expand Up @@ -101,6 +101,12 @@ public void updateDeltaStreamerSyncMetrics(long syncEpochTimeInMs) {
}
}

/**
 * Publishes the supplied message count as the {@code kafkaMessageInCount} gauge
 * under the {@code deltastreamer} action, provided metrics are turned on.
 *
 * <p>When {@code config.isMetricsOn()} is false this method does nothing.
 *
 * @param totalNewMsgCount value to register for the gauge
 */
public void updateDeltaStreamerKafkaMessageInCount(long totalNewMsgCount) {
  // Nothing to report when metrics are disabled.
  if (!config.isMetricsOn()) {
    return;
  }
  Metrics.registerGauge(
      getMetricsName("deltastreamer", "kafkaMessageInCount"),
      totalNewMsgCount);
}

/**
 * Scales a duration down by a factor of 1,000,000 using integer division,
 * so any fractional remainder is truncated toward zero.
 *
 * <p>NOTE(review): given the method name, the input is presumably in
 * nanoseconds and the result in milliseconds; confirm against callers.
 *
 * @param ctxDuration the raw duration to convert
 * @return {@code ctxDuration} divided by 1,000,000
 */
public long getDurationInMs(long ctxDuration) {
  final long divisor = 1_000_000L;
  final long scaled = ctxDuration / divisor;
  return scaled;
}
Expand Down
Expand Up @@ -57,8 +57,10 @@ protected InputBatch<JavaRDD<T>> fetchNewData(Option<String> lastCheckpointStr,
long totalNewMsgs = KafkaOffsetGen.CheckpointUtils.totalNewMessages(offsetRanges);
LOG.info("About to read " + totalNewMsgs + " from Kafka for topic :" + offsetGen.getTopicName());
if (totalNewMsgs <= 0) {
metrics.updateDeltaStreamerKafkaMessageInCount(0);
return new InputBatch<>(Option.empty(), KafkaOffsetGen.CheckpointUtils.offsetsToStr(offsetRanges));
}
metrics.updateDeltaStreamerKafkaMessageInCount(totalNewMsgs);
JavaRDD<T> newDataRDD = toRDD(offsetRanges);
return new InputBatch<>(Option.of(newDataRDD), KafkaOffsetGen.CheckpointUtils.offsetsToStr(offsetRanges));
} catch (org.apache.kafka.common.errors.TimeoutException e) {
Expand Down

0 comments on commit fed4744

Please sign in to comment.