Skip to content

Commit

Permalink
Rename numDroppedRowsByWatermark to numRowsDroppedByWatermark
Browse files Browse the repository at this point in the history
  • Loading branch information
HeartSaVioR committed Jun 15, 2020
1 parent a712b4d commit 75d12d3
Show file tree
Hide file tree
Showing 6 changed files with 24 additions and 24 deletions.
4 changes: 2 additions & 2 deletions docs/structured-streaming-programming-guide.md
Expand Up @@ -1678,9 +1678,9 @@ emits late rows if the operator uses Append mode.
Spark provides two ways to check the number of late rows on stateful operators which would help you identify the issue:

1. On Spark UI: check the metrics in stateful operator nodes in query execution details page in SQL tab
2. On Streaming Query Listener: check "numDroppedRowsByWatermark" in "stateOperators" in QueryProcessEvent.
2. On Streaming Query Listener: check "numRowsDroppedByWatermark" in "stateOperators" in QueryProgressEvent.

Please note that "numDroppedRowsByWatermark" represents the number of "dropped" rows by watermark, which is not always same as the count of "late input rows" for the operator.
Please note that "numRowsDroppedByWatermark" represents the number of "dropped" rows by watermark, which is not always the same as the count of "late input rows" for the operator.
It depends on the implementation of the operator - e.g. streaming aggregation pre-aggregates input rows and checks for late inputs against the pre-aggregated rows,
hence the number is not the same as the number of original input rows. In practice, you only need to check whether the value is zero or non-zero.

Expand Down
Expand Up @@ -77,7 +77,7 @@ trait StateStoreWriter extends StatefulOperator { self: SparkPlan =>

override lazy val metrics = Map(
"numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
"numDroppedRowsByWatermark" -> SQLMetrics.createMetric(sparkContext,
"numRowsDroppedByWatermark" -> SQLMetrics.createMetric(sparkContext,
"number of rows which are dropped by watermark"),
"numTotalStateRows" -> SQLMetrics.createMetric(sparkContext, "number of total state rows"),
"numUpdatedStateRows" -> SQLMetrics.createMetric(sparkContext, "number of updated state rows"),
Expand All @@ -102,7 +102,7 @@ trait StateStoreWriter extends StatefulOperator { self: SparkPlan =>
numRowsTotal = longMetric("numTotalStateRows").value,
numRowsUpdated = longMetric("numUpdatedStateRows").value,
memoryUsedBytes = longMetric("stateMemory").value,
numDroppedRowsByWatermark = longMetric("numDroppedRowsByWatermark").value,
numRowsDroppedByWatermark = longMetric("numRowsDroppedByWatermark").value,
javaConvertedCustomMetrics
)
}
Expand Down Expand Up @@ -140,7 +140,7 @@ trait StateStoreWriter extends StatefulOperator { self: SparkPlan =>
predicateDropRowByWatermark: BasePredicate): Iterator[InternalRow] = {
iter.filterNot { row =>
val lateInput = predicateDropRowByWatermark.eval(row)
if (lateInput) longMetric("numDroppedRowsByWatermark") += 1
if (lateInput) longMetric("numRowsDroppedByWatermark") += 1
lateInput
}
}
Expand Down
Expand Up @@ -43,7 +43,7 @@ class StateOperatorProgress private[sql](
val numRowsTotal: Long,
val numRowsUpdated: Long,
val memoryUsedBytes: Long,
val numDroppedRowsByWatermark: Long,
val numRowsDroppedByWatermark: Long,
val customMetrics: ju.Map[String, JLong] = new ju.HashMap()
) extends Serializable {

Expand All @@ -63,7 +63,7 @@ class StateOperatorProgress private[sql](
("numRowsTotal" -> JInt(numRowsTotal)) ~
("numRowsUpdated" -> JInt(numRowsUpdated)) ~
("memoryUsedBytes" -> JInt(memoryUsedBytes)) ~
("numDroppedRowsByWatermark" -> JInt(numDroppedRowsByWatermark)) ~
("numRowsDroppedByWatermark" -> JInt(numRowsDroppedByWatermark)) ~
("customMetrics" -> {
if (!customMetrics.isEmpty) {
val keys = customMetrics.keySet.asScala.toSeq.sorted
Expand Down
Expand Up @@ -298,11 +298,11 @@ class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Matche
AddData(inputData, 25), // Advance watermark to 15 seconds
CheckNewAnswer((10, 5)),
assertNumStateRows(2),
assertNumDroppedRowsByWatermark(0),
assertNumRowsDroppedByWatermark(0),
AddData(inputData, 10), // Should not emit anything as data less than watermark
CheckNewAnswer(),
assertNumStateRows(2),
assertNumDroppedRowsByWatermark(1)
assertNumRowsDroppedByWatermark(1)
)
}

Expand All @@ -323,15 +323,15 @@ class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Matche
AddData(inputData, 25), // Advance watermark to 15 seconds
CheckNewAnswer((25, 1)),
assertNumStateRows(2),
assertNumDroppedRowsByWatermark(0),
assertNumRowsDroppedByWatermark(0),
AddData(inputData, 10, 25), // Ignore 10 as its less than watermark
CheckNewAnswer((25, 2)),
assertNumStateRows(2),
assertNumDroppedRowsByWatermark(1),
assertNumRowsDroppedByWatermark(1),
AddData(inputData, 10), // Should not emit anything as data less than watermark
CheckNewAnswer(),
assertNumStateRows(2),
assertNumDroppedRowsByWatermark(1)
assertNumRowsDroppedByWatermark(1)
)
}

Expand Down Expand Up @@ -788,17 +788,17 @@ class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Matche
true
}

private def assertNumDroppedRowsByWatermark(
numDroppedRowsByWatermark: Long): AssertOnQuery = AssertOnQuery { q =>
private def assertNumRowsDroppedByWatermark(
numRowsDroppedByWatermark: Long): AssertOnQuery = AssertOnQuery { q =>
q.processAllAvailable()
val progressWithData = q.recentProgress.filterNot { p =>
// filter out batches which are falling into one of types:
// 1) doesn't execute the batch run
// 2) empty input batch
p.inputRowsPerSecond == 0
}.lastOption.get
assert(progressWithData.stateOperators(0).numDroppedRowsByWatermark
=== numDroppedRowsByWatermark)
assert(progressWithData.stateOperators(0).numRowsDroppedByWatermark
=== numRowsDroppedByWatermark)
true
}

Expand Down
Expand Up @@ -55,8 +55,8 @@ trait StateStoreMetricsTest extends StreamTest {
val allNumUpdatedRowsSinceLastCheck =
progressesSinceLastCheck.map(_.stateOperators.map(_.numRowsUpdated))

val allnumDroppedRowsByWatermarkSinceLastCheck =
progressesSinceLastCheck.map(_.stateOperators.map(_.numDroppedRowsByWatermark))
val allNumRowsDroppedByWatermarkSinceLastCheck =
progressesSinceLastCheck.map(_.stateOperators.map(_.numRowsDroppedByWatermark))

lazy val debugString = "recent progresses:\n" +
progressesSinceLastCheck.map(_.prettyJson).mkString("\n\n")
Expand All @@ -67,9 +67,9 @@ trait StateStoreMetricsTest extends StreamTest {
val numUpdatedRows = arraySum(allNumUpdatedRowsSinceLastCheck, numStateOperators)
assert(numUpdatedRows === updated, s"incorrect updates rows, $debugString")

val numDroppedRowsByWatermark = arraySum(allnumDroppedRowsByWatermarkSinceLastCheck,
val numRowsDroppedByWatermark = arraySum(allNumRowsDroppedByWatermarkSinceLastCheck,
numStateOperators)
assert(numDroppedRowsByWatermark === lateInputs,
assert(numRowsDroppedByWatermark === lateInputs,
s"incorrect dropped rows by watermark, $debugString")

lastCheckedRecentProgressIndex = recentProgress.length - 1
Expand Down
Expand Up @@ -64,7 +64,7 @@ class StreamingQueryStatusAndProgressSuite extends StreamTest with Eventually {
| "numRowsTotal" : 0,
| "numRowsUpdated" : 1,
| "memoryUsedBytes" : 3,
| "numDroppedRowsByWatermark" : 0,
| "numRowsDroppedByWatermark" : 0,
| "customMetrics" : {
| "loadedMapCacheHitCount" : 1,
| "loadedMapCacheMissCount" : 0,
Expand Down Expand Up @@ -115,7 +115,7 @@ class StreamingQueryStatusAndProgressSuite extends StreamTest with Eventually {
| "numRowsTotal" : 0,
| "numRowsUpdated" : 1,
| "memoryUsedBytes" : 2,
| "numDroppedRowsByWatermark" : 0
| "numRowsDroppedByWatermark" : 0
| } ],
| "sources" : [ {
| "description" : "source",
Expand Down Expand Up @@ -323,7 +323,7 @@ object StreamingQueryStatusAndProgressSuite {
"avg" -> "2016-12-05T20:54:20.827Z",
"watermark" -> "2016-12-05T20:54:20.827Z").asJava),
stateOperators = Array(new StateOperatorProgress(
numRowsTotal = 0, numRowsUpdated = 1, memoryUsedBytes = 3, numDroppedRowsByWatermark = 0,
numRowsTotal = 0, numRowsUpdated = 1, memoryUsedBytes = 3, numRowsDroppedByWatermark = 0,
customMetrics = new java.util.HashMap(Map("stateOnCurrentVersionSizeBytes" -> 2L,
"loadedMapCacheHitCount" -> 1L, "loadedMapCacheMissCount" -> 0L)
.mapValues(long2Long).asJava)
Expand Down Expand Up @@ -355,7 +355,7 @@ object StreamingQueryStatusAndProgressSuite {
// empty maps should be handled correctly
eventTime = new java.util.HashMap(Map.empty[String, String].asJava),
stateOperators = Array(new StateOperatorProgress(
numRowsTotal = 0, numRowsUpdated = 1, memoryUsedBytes = 2, numDroppedRowsByWatermark = 0)),
numRowsTotal = 0, numRowsUpdated = 1, memoryUsedBytes = 2, numRowsDroppedByWatermark = 0)),
sources = Array(
new SourceProgress(
description = "source",
Expand Down

0 comments on commit 75d12d3

Please sign in to comment.