Add last processed event timestamp to streaming job (#120)
* Add last processed event timestamp

Signed-off-by: Terence Lim <terencelimxp@gmail.com>

* Fix dependency

Signed-off-by: Terence Lim <terencelimxp@gmail.com>
terryyylim committed Mar 2, 2022
1 parent 1c35e78 commit 4fcf740
Showing 2 changed files with 22 additions and 0 deletions.
12 changes: 12 additions & 0 deletions spark/ingestion/pom.xml
@@ -120,6 +120,12 @@
       <groupId>org.apache.hbase</groupId>
       <artifactId>hbase-mapreduce</artifactId>
       <version>${hbase.version}</version>
+      <exclusions>
+        <exclusion>
+          <groupId>org.glassfish</groupId>
+          <artifactId>javax.el</artifactId>
+        </exclusion>
+      </exclusions>
     </dependency>

     <dependency>
@@ -235,6 +241,12 @@
       <version>2.27.2</version>
       <scope>test</scope>
     </dependency>
+
+    <dependency>
+      <groupId>org.glassfish</groupId>
+      <artifactId>javax.el</artifactId>
+      <version>3.0.1-b12</version>
+    </dependency>
   </dependencies>
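Note on the dependency fix: the two pom.xml hunks work together. hbase-mapreduce pulls in org.glassfish:javax.el transitively; excluding it there and declaring the artifact directly pins the EL implementation at 3.0.1-b12, so the module resolves one known version instead of whichever one the HBase dependency tree happens to carry.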


10 changes: 10 additions & 0 deletions spark/ingestion/src/main/scala/org/apache/spark/metrics/source/StreamingMetricSource.scala
@@ -19,6 +19,8 @@ package org.apache.spark.metrics.source
 import org.apache.spark.metrics.AtomicLongGauge
 import org.apache.spark.sql.streaming.StreamingQueryProgress
 
+import java.time.Instant
+
 class StreamingMetricSource extends BaseMetricSource {
   override val sourceName: String = StreamingMetricSource.sourceName
 
@@ -30,11 +32,19 @@ class StreamingMetricSource extends BaseMetricSource
metricRegistry.register(gaugeWithLabels("processed_rows_per_second"), new AtomicLongGauge())
private val LAST_CONSUMED_KAFKA_TIMESTAMP_GAUGE =
metricRegistry.register(gaugeWithLabels("last_consumed_kafka_timestamp"), new AtomicLongGauge())
private val LAST_PROCESSED_EVENT_TIMESTAMP_GAUGE =
metricRegistry.register(
gaugeWithLabels("last_processed_event_timestamp"),
new AtomicLongGauge()
)

def updateStreamingProgress(progress: StreamingQueryProgress): Unit = {
BATCH_DURATION_GAUGE.value.set(progress.batchDuration)
INPUT_ROWS_PER_SECOND_GAUGE.value.set(progress.inputRowsPerSecond.toLong)
PROCESSED_ROWS_PER_SECOND_GAUGE.value.set(progress.processedRowsPerSecond.toLong)

val epochTimestamp = Instant.parse(progress.timestamp).getEpochSecond
LAST_PROCESSED_EVENT_TIMESTAMP_GAUGE.value.set(epochTimestamp)
}

def updateKafkaTimestamp(timestamp: Long): Unit = {
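The gauge added above only moves when updateStreamingProgress is called with a fresh StreamingQueryProgress. Spark delivers that object through its standard StreamingQueryListener API, so a minimal wiring sketch looks like the following; MetricForwardingListener and the metricSource instance are hypothetical names for illustration, not part of this commit. (progress.timestamp is an ISO-8601 string, e.g. Instant.parse("2022-03-02T10:15:30.123Z").getEpochSecond yields 1646216130.)

import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener.{QueryProgressEvent, QueryStartedEvent, QueryTerminatedEvent}

// Forward every micro-batch's progress report to the metric source.
class MetricForwardingListener(metricSource: StreamingMetricSource) extends StreamingQueryListener {
  override def onQueryStarted(event: QueryStartedEvent): Unit = ()

  override def onQueryProgress(event: QueryProgressEvent): Unit =
    // event.progress is the StreamingQueryProgress whose ISO-8601 `timestamp`
    // field updateStreamingProgress parses into epoch seconds.
    metricSource.updateStreamingProgress(event.progress)

  override def onQueryTerminated(event: QueryTerminatedEvent): Unit = ()
}

// Registration, given an active SparkSession `spark`:
// spark.streams.addListener(new MetricForwardingListener(metricSource))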

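AtomicLongGauge itself is imported from org.apache.spark.metrics and not shown in this diff; given the GAUGE.value.set(...) call sites above and the Dropwizard metricRegistry.register API, a plausible sketch of it is:

import java.util.concurrent.atomic.AtomicLong
import com.codahale.metrics.Gauge

// Sketch only: a gauge backed by an AtomicLong, so the streaming listener
// thread can set the value while the metrics reporter reads it concurrently.
class AtomicLongGauge extends Gauge[Long] {
  val value = new AtomicLong(0L)
  override def getValue: Long = value.get()
}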