From 75d12d31814cf774f35368bb487cf0f1ecea904d Mon Sep 17 00:00:00 2001 From: "Jungtaek Lim (HeartSaVioR)" Date: Mon, 15 Jun 2020 16:45:30 +0900 Subject: [PATCH] Rename numDroppedRowsByWatermark to numRowsDroppedByWatermark --- docs/structured-streaming-programming-guide.md | 4 ++-- .../streaming/statefulOperators.scala | 6 +++--- .../apache/spark/sql/streaming/progress.scala | 4 ++-- .../streaming/EventTimeWatermarkSuite.scala | 18 +++++++++--------- .../sql/streaming/StateStoreMetricsTest.scala | 8 ++++---- .../StreamingQueryStatusAndProgressSuite.scala | 8 ++++---- 6 files changed, 24 insertions(+), 24 deletions(-) diff --git a/docs/structured-streaming-programming-guide.md b/docs/structured-streaming-programming-guide.md index 18fb67279f1d5..a371f4f50f9f0 100644 --- a/docs/structured-streaming-programming-guide.md +++ b/docs/structured-streaming-programming-guide.md @@ -1678,9 +1678,9 @@ emits late rows if the operator uses Append mode. Spark provides two ways to check the number of late rows on stateful operators which would help you identify the issue: 1. On Spark UI: check the metrics in stateful operator nodes in query execution details page in SQL tab -2. On Streaming Query Listener: check "numDroppedRowsByWatermark" in "stateOperators" in QueryProcessEvent. +2. On Streaming Query Listener: check "numRowsDroppedByWatermark" in "stateOperators" in QueryProgressEvent. -Please note that "numDroppedRowsByWatermark" represents the number of "dropped" rows by watermark, which is not always same as the count of "late input rows" for the operator. +Please note that "numRowsDroppedByWatermark" represents the number of "dropped" rows by watermark, which is not always the same as the count of "late input rows" for the operator. It depends on the implementation of the operator - e.g. streaming aggregation does pre-aggregate input rows and checks the late inputs against pre-aggregated inputs, hence the number is not same as the number of original input rows. 
You'd like to just check the fact whether the value is zero or non-zero. diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala index 52550b6d1943c..bf606d2782431 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala @@ -77,7 +77,7 @@ trait StateStoreWriter extends StatefulOperator { self: SparkPlan => override lazy val metrics = Map( "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"), - "numDroppedRowsByWatermark" -> SQLMetrics.createMetric(sparkContext, + "numRowsDroppedByWatermark" -> SQLMetrics.createMetric(sparkContext, "number of rows which are dropped by watermark"), "numTotalStateRows" -> SQLMetrics.createMetric(sparkContext, "number of total state rows"), "numUpdatedStateRows" -> SQLMetrics.createMetric(sparkContext, "number of updated state rows"), @@ -102,7 +102,7 @@ trait StateStoreWriter extends StatefulOperator { self: SparkPlan => numRowsTotal = longMetric("numTotalStateRows").value, numRowsUpdated = longMetric("numUpdatedStateRows").value, memoryUsedBytes = longMetric("stateMemory").value, - numDroppedRowsByWatermark = longMetric("numDroppedRowsByWatermark").value, + numRowsDroppedByWatermark = longMetric("numRowsDroppedByWatermark").value, javaConvertedCustomMetrics ) } @@ -140,7 +140,7 @@ trait StateStoreWriter extends StatefulOperator { self: SparkPlan => predicateDropRowByWatermark: BasePredicate): Iterator[InternalRow] = { iter.filterNot { row => val lateInput = predicateDropRowByWatermark.eval(row) - if (lateInput) longMetric("numDroppedRowsByWatermark") += 1 + if (lateInput) longMetric("numRowsDroppedByWatermark") += 1 lateInput } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala index 3265244808b56..67c4d968511a1 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala @@ -43,7 +43,7 @@ class StateOperatorProgress private[sql]( val numRowsTotal: Long, val numRowsUpdated: Long, val memoryUsedBytes: Long, - val numDroppedRowsByWatermark: Long, + val numRowsDroppedByWatermark: Long, val customMetrics: ju.Map[String, JLong] = new ju.HashMap() ) extends Serializable { @@ -63,7 +63,7 @@ class StateOperatorProgress private[sql]( ("numRowsTotal" -> JInt(numRowsTotal)) ~ ("numRowsUpdated" -> JInt(numRowsUpdated)) ~ ("memoryUsedBytes" -> JInt(memoryUsedBytes)) ~ - ("numDroppedRowsByWatermark" -> JInt(numDroppedRowsByWatermark)) ~ + ("numRowsDroppedByWatermark" -> JInt(numRowsDroppedByWatermark)) ~ ("customMetrics" -> { if (!customMetrics.isEmpty) { val keys = customMetrics.keySet.asScala.toSeq.sorted diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala index b5e2ab5ed4cc3..b5e313d2e107c 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala @@ -298,11 +298,11 @@ class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Matche AddData(inputData, 25), // Advance watermark to 15 seconds CheckNewAnswer((10, 5)), assertNumStateRows(2), - assertNumDroppedRowsByWatermark(0), + assertNumRowsDroppedByWatermark(0), AddData(inputData, 10), // Should not emit anything as data less than watermark CheckNewAnswer(), assertNumStateRows(2), - assertNumDroppedRowsByWatermark(1) + assertNumRowsDroppedByWatermark(1) ) } @@ -323,15 +323,15 @@ class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Matche 
AddData(inputData, 25), // Advance watermark to 15 seconds CheckNewAnswer((25, 1)), assertNumStateRows(2), - assertNumDroppedRowsByWatermark(0), + assertNumRowsDroppedByWatermark(0), AddData(inputData, 10, 25), // Ignore 10 as its less than watermark CheckNewAnswer((25, 2)), assertNumStateRows(2), - assertNumDroppedRowsByWatermark(1), + assertNumRowsDroppedByWatermark(1), AddData(inputData, 10), // Should not emit anything as data less than watermark CheckNewAnswer(), assertNumStateRows(2), - assertNumDroppedRowsByWatermark(1) + assertNumRowsDroppedByWatermark(1) ) } @@ -788,8 +788,8 @@ class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Matche true } - private def assertNumDroppedRowsByWatermark( - numDroppedRowsByWatermark: Long): AssertOnQuery = AssertOnQuery { q => + private def assertNumRowsDroppedByWatermark( + numRowsDroppedByWatermark: Long): AssertOnQuery = AssertOnQuery { q => q.processAllAvailable() val progressWithData = q.recentProgress.filterNot { p => // filter out batches which are falling into one of types: @@ -797,8 +797,8 @@ class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Matche // 2) empty input batch p.inputRowsPerSecond == 0 }.lastOption.get - assert(progressWithData.stateOperators(0).numDroppedRowsByWatermark - === numDroppedRowsByWatermark) + assert(progressWithData.stateOperators(0).numRowsDroppedByWatermark + === numRowsDroppedByWatermark) true } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StateStoreMetricsTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StateStoreMetricsTest.scala index e28a1b77d2270..662e2fae0d8a4 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StateStoreMetricsTest.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StateStoreMetricsTest.scala @@ -55,8 +55,8 @@ trait StateStoreMetricsTest extends StreamTest { val allNumUpdatedRowsSinceLastCheck = 
progressesSinceLastCheck.map(_.stateOperators.map(_.numRowsUpdated)) - val allnumDroppedRowsByWatermarkSinceLastCheck = - progressesSinceLastCheck.map(_.stateOperators.map(_.numDroppedRowsByWatermark)) + val allNumRowsDroppedByWatermarkSinceLastCheck = + progressesSinceLastCheck.map(_.stateOperators.map(_.numRowsDroppedByWatermark)) lazy val debugString = "recent progresses:\n" + progressesSinceLastCheck.map(_.prettyJson).mkString("\n\n") @@ -67,9 +67,9 @@ trait StateStoreMetricsTest extends StreamTest { val numUpdatedRows = arraySum(allNumUpdatedRowsSinceLastCheck, numStateOperators) assert(numUpdatedRows === updated, s"incorrect updates rows, $debugString") - val numDroppedRowsByWatermark = arraySum(allnumDroppedRowsByWatermarkSinceLastCheck, + val numRowsDroppedByWatermark = arraySum(allNumRowsDroppedByWatermarkSinceLastCheck, numStateOperators) - assert(numDroppedRowsByWatermark === lateInputs, + assert(numRowsDroppedByWatermark === lateInputs, s"incorrect dropped rows by watermark, $debugString") lastCheckedRecentProgressIndex = recentProgress.length - 1 diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala index 6ddc376c37a2c..98e2342c78e56 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala @@ -64,7 +64,7 @@ class StreamingQueryStatusAndProgressSuite extends StreamTest with Eventually { | "numRowsTotal" : 0, | "numRowsUpdated" : 1, | "memoryUsedBytes" : 3, - | "numDroppedRowsByWatermark" : 0, + | "numRowsDroppedByWatermark" : 0, | "customMetrics" : { | "loadedMapCacheHitCount" : 1, | "loadedMapCacheMissCount" : 0, @@ -115,7 +115,7 @@ class StreamingQueryStatusAndProgressSuite extends StreamTest with Eventually { | "numRowsTotal" : 0, | 
"numRowsUpdated" : 1, | "memoryUsedBytes" : 2, - | "numDroppedRowsByWatermark" : 0 + | "numRowsDroppedByWatermark" : 0 | } ], | "sources" : [ { | "description" : "source", @@ -323,7 +323,7 @@ object StreamingQueryStatusAndProgressSuite { "avg" -> "2016-12-05T20:54:20.827Z", "watermark" -> "2016-12-05T20:54:20.827Z").asJava), stateOperators = Array(new StateOperatorProgress( - numRowsTotal = 0, numRowsUpdated = 1, memoryUsedBytes = 3, numDroppedRowsByWatermark = 0, + numRowsTotal = 0, numRowsUpdated = 1, memoryUsedBytes = 3, numRowsDroppedByWatermark = 0, customMetrics = new java.util.HashMap(Map("stateOnCurrentVersionSizeBytes" -> 2L, "loadedMapCacheHitCount" -> 1L, "loadedMapCacheMissCount" -> 0L) .mapValues(long2Long).asJava) @@ -355,7 +355,7 @@ object StreamingQueryStatusAndProgressSuite { // empty maps should be handled correctly eventTime = new java.util.HashMap(Map.empty[String, String].asJava), stateOperators = Array(new StateOperatorProgress( - numRowsTotal = 0, numRowsUpdated = 1, memoryUsedBytes = 2, numDroppedRowsByWatermark = 0)), + numRowsTotal = 0, numRowsUpdated = 1, memoryUsedBytes = 2, numRowsDroppedByWatermark = 0)), sources = Array( new SourceProgress( description = "source",