diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetricsReporter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetricsReporter.scala index 66b11ecddf233..699bc177d4dda 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetricsReporter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetricsReporter.scala @@ -17,10 +17,13 @@ package org.apache.spark.sql.execution.streaming +import java.text.SimpleDateFormat + import com.codahale.metrics.{Gauge, MetricRegistry} import org.apache.spark.internal.Logging import org.apache.spark.metrics.source.{Source => CodahaleSource} +import org.apache.spark.sql.catalyst.util.DateTimeUtils import org.apache.spark.sql.streaming.StreamingQueryProgress /** @@ -39,6 +42,23 @@ class MetricsReporter( registerGauge("processingRate-total", _.processedRowsPerSecond, 0.0) registerGauge("latency", _.durationMs.get("triggerExecution").longValue(), 0L) + private val timestampFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'") // ISO8601 + timestampFormat.setTimeZone(DateTimeUtils.getTimeZone("UTC")) + + registerGauge("eventTime-watermark", + s => convertStringDateToMillis(s.eventTime.get("watermark")), 0L) + + registerGauge("states-rowsTotal", _.stateOperators.map(_.numRowsTotal).sum, 0L) + registerGauge("states-usedBytes", _.stateOperators.map(_.memoryUsedBytes).sum, 0L) + + private def convertStringDateToMillis(isoUtcDateStr: String) = { + if (isoUtcDateStr != null) { + timestampFormat.parse(isoUtcDateStr).getTime + } else { + 0L + } + } + private def registerGauge[T]( name: String, f: StreamingQueryProgress => T, diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala index dcf6cb5d609ee..a649e75f9aab6 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala @@ -462,6 +462,9 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi assert(gauges.get("latency").getValue.asInstanceOf[Long] == 0) assert(gauges.get("processingRate-total").getValue.asInstanceOf[Double] == 0.0) assert(gauges.get("inputRate-total").getValue.asInstanceOf[Double] == 0.0) + assert(gauges.get("eventTime-watermark").getValue.asInstanceOf[Long] == 0) + assert(gauges.get("states-rowsTotal").getValue.asInstanceOf[Long] == 0) + assert(gauges.get("states-usedBytes").getValue.asInstanceOf[Long] == 0) sq.stop() } }