Skip to content

Commit

Permalink
SPARK-24634 Add a new metric regarding number of rows later than wate…
Browse files Browse the repository at this point in the history
…rmark

* This adds a new metric to count the number of rows arrived later than watermark
  • Loading branch information
HeartSaVioR committed Jun 23, 2018
1 parent 4e7d867 commit ff1b895
Show file tree
Hide file tree
Showing 9 changed files with 76 additions and 42 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ case class FlatMapGroupsWithStateExec(
// If timeout is based on event time, then filter late data based on watermark
val filteredIter = watermarkPredicateForData match {
case Some(predicate) if timeoutConf == EventTimeTimeout =>
iter.filter(row => !predicate.eval(row))
applyRemovingRowsOlderThanWatermark(iter, predicate)
case _ =>
iter
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -432,7 +432,7 @@ case class StreamingSymmetricHashJoinExec(
WatermarkSupport.watermarkExpression(watermarkAttribute, eventTimeWatermark) match {
case Some(watermarkExpr) =>
val predicate = newPredicate(watermarkExpr, inputAttributes)
inputIter.filter { row => !predicate.eval(row) }
applyRemovingRowsOlderThanWatermark(inputIter, predicate)
case None =>
inputIter
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,8 @@ trait StateStoreWriter extends StatefulOperator { self: SparkPlan =>

override lazy val metrics = Map(
"numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
"numLateRows" -> SQLMetrics.createMetric(sparkContext,
"number of rows which are later than watermark"),
"numTotalStateRows" -> SQLMetrics.createMetric(sparkContext, "number of total state rows"),
"numUpdatedStateRows" -> SQLMetrics.createMetric(sparkContext, "number of updated state rows"),
"allUpdatesTimeMs" -> SQLMetrics.createTimingMetric(sparkContext, "total time to update rows"),
Expand All @@ -93,7 +95,9 @@ trait StateStoreWriter extends StatefulOperator { self: SparkPlan =>
new StateOperatorProgress(
numRowsTotal = longMetric("numTotalStateRows").value,
numRowsUpdated = longMetric("numUpdatedStateRows").value,
memoryUsedBytes = longMetric("stateMemory").value)
memoryUsedBytes = longMetric("stateMemory").value,
numLateInputRows = longMetric("numLateRows").value
)
}

/** Records the duration of running `body` for the next query progress update. */
Expand Down Expand Up @@ -122,6 +126,15 @@ trait StateStoreWriter extends StatefulOperator { self: SparkPlan =>
}.toMap
}

protected def applyRemovingRowsOlderThanWatermark(iter: Iterator[InternalRow],
predicate: Predicate): Iterator[InternalRow] = {
iter.filter { row =>
val filteredIn = !predicate.eval(row)
if (!filteredIn) longMetric("numLateRows") += 1
filteredIn
}
}

/**
* Should the MicroBatchExecution run another batch based on this stateful operator and the
* current updated metadata.
Expand Down Expand Up @@ -301,7 +314,8 @@ case class StateStoreSaveExec(
// Assumption: watermark predicates must be non-empty if append mode is allowed
case Some(Append) =>
allUpdatesTimeMs += timeTakenMs {
val filteredIter = iter.filter(row => !watermarkPredicateForData.get.eval(row))
val filteredIter = applyRemovingRowsOlderThanWatermark(iter,
watermarkPredicateForData.get)
while (filteredIter.hasNext) {
val row = filteredIter.next().asInstanceOf[UnsafeRow]
val key = getKey(row)
Expand Down Expand Up @@ -344,7 +358,7 @@ case class StateStoreSaveExec(
new NextIterator[InternalRow] {
// Filter late date using watermark if specified
private[this] val baseIterator = watermarkPredicateForData match {
case Some(predicate) => iter.filter((row: InternalRow) => !predicate.eval(row))
case Some(predicate) => applyRemovingRowsOlderThanWatermark(iter, predicate)
case None => iter
}
private val updatesStartTimeNs = System.nanoTime
Expand Down Expand Up @@ -421,14 +435,13 @@ case class StreamingDeduplicateExec(
Some(sqlContext.streams.stateStoreCoordinator)) { (store, iter) =>
val getKey = GenerateUnsafeProjection.generate(keyExpressions, child.output)
val numOutputRows = longMetric("numOutputRows")
val numTotalStateRows = longMetric("numTotalStateRows")
val numUpdatedStateRows = longMetric("numUpdatedStateRows")
val allUpdatesTimeMs = longMetric("allUpdatesTimeMs")
val allRemovalsTimeMs = longMetric("allRemovalsTimeMs")
val commitTimeMs = longMetric("commitTimeMs")

val baseIterator = watermarkPredicateForData match {
case Some(predicate) => iter.filter(row => !predicate.eval(row))
case Some(predicate) => applyRemovingRowsOlderThanWatermark(iter, predicate)
case None => iter
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@ import org.apache.spark.annotation.InterfaceStability
class StateOperatorProgress private[sql](
val numRowsTotal: Long,
val numRowsUpdated: Long,
val memoryUsedBytes: Long
val memoryUsedBytes: Long,
val numLateInputRows: Long
) extends Serializable {

/** The compact JSON representation of this progress. */
Expand All @@ -48,12 +49,13 @@ class StateOperatorProgress private[sql](
def prettyJson: String = pretty(render(jsonValue))

private[sql] def copy(newNumRowsUpdated: Long): StateOperatorProgress =
new StateOperatorProgress(numRowsTotal, newNumRowsUpdated, memoryUsedBytes)
new StateOperatorProgress(numRowsTotal, newNumRowsUpdated, memoryUsedBytes, numLateInputRows)

private[sql] def jsonValue: JValue = {
("numRowsTotal" -> JInt(numRowsTotal)) ~
("numRowsUpdated" -> JInt(numRowsUpdated)) ~
("memoryUsedBytes" -> JInt(memoryUsedBytes))
("memoryUsedBytes" -> JInt(memoryUsedBytes)) ~
("numLateInputRows" -> JInt(numLateInputRows))
}

override def toString: String = prettyJson
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -164,9 +164,11 @@ class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Matche
AddData(inputData, 25), // Advance watermark to 15 seconds
CheckNewAnswer((10, 5)),
assertNumStateRows(2),
assertNumLateInputRows(0),
AddData(inputData, 10), // Should not emit anything as data less than watermark
CheckNewAnswer(),
assertNumStateRows(2)
assertNumStateRows(2),
assertNumLateInputRows(1)
)
}

Expand All @@ -187,12 +189,15 @@ class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Matche
AddData(inputData, 25), // Advance watermark to 15 seconds
CheckNewAnswer((25, 1)),
assertNumStateRows(2),
assertNumLateInputRows(0),
AddData(inputData, 10, 25), // Ignore 10 as its less than watermark
CheckNewAnswer((25, 2)),
assertNumStateRows(2),
assertNumLateInputRows(1),
AddData(inputData, 10), // Should not emit anything as data less than watermark
CheckNewAnswer(),
assertNumStateRows(2)
assertNumStateRows(2),
assertNumLateInputRows(1)
)
}

Expand Down Expand Up @@ -491,6 +496,13 @@ class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Matche
true
}

private def assertNumLateInputRows(numLateInputRows: Long): AssertOnQuery = AssertOnQuery { q =>
q.processAllAvailable()
val progressWithData = q.recentProgress.lastOption.get
assert(progressWithData.stateOperators(0).numLateInputRows === numLateInputRows)
true
}

private def assertEventStats(body: ju.Map[String, String] => Unit): AssertOnQuery = {
AssertOnQuery { q =>
body(q.recentProgress.filter(_.numInputRows > 0).lastOption.get.eventTime)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,10 @@ trait StateStoreMetricsTest extends StreamTest {
lastCheckedRecentProgressIndex = -1
}

def assertNumStateRows(total: Seq[Long], updated: Seq[Long]): AssertOnQuery =
AssertOnQuery(s"Check total state rows = $total, updated state rows = $updated") { q =>
def assertNumStateRows(total: Seq[Long], updated: Seq[Long],
lateInputRows: Seq[Long]): AssertOnQuery =
AssertOnQuery(s"Check total state rows = $total, updated state rows = $updated" +
s", late input rows = $lateInputRows") { q =>
val recentProgress = q.recentProgress
require(recentProgress.nonEmpty, "No progress made, cannot check num state rows")
require(recentProgress.length < spark.sessionState.conf.streamingProgressRetention,
Expand All @@ -57,12 +59,15 @@ trait StateStoreMetricsTest extends StreamTest {
val numUpdatedRows = arraySum(allNumUpdatedRowsSinceLastCheck, numStateOperators)
assert(numUpdatedRows === updated, s"incorrect updates rows, $debugString")

val numLateInputRows = recentProgress.last.stateOperators.map(_.numLateInputRows)
assert(numLateInputRows === lateInputRows, s"incorrect late input rows, $debugString")

lastCheckedRecentProgressIndex = recentProgress.length - 1
true
}

def assertNumStateRows(total: Long, updated: Long): AssertOnQuery =
assertNumStateRows(Seq(total), Seq(updated))
def assertNumStateRows(total: Long, updated: Long, lateInputRows: Long = 0): AssertOnQuery =
assertNumStateRows(Seq(total), Seq(updated), Seq(lateInputRows))

def arraySum(arraySeq: Seq[Array[Long]], arrayLength: Int): Seq[Long] = {
if (arraySeq.isEmpty) return Seq.fill(arrayLength)(0L)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,15 +76,15 @@ class StreamingDeduplicationSuite extends StateStoreMetricsTest with BeforeAndAf
testStream(result, Append)(
AddData(inputData, "a" -> 1),
CheckLastBatch("a" -> 1),
assertNumStateRows(total = Seq(1L, 1L), updated = Seq(1L, 1L)),
assertNumStateRows(total = Seq(1L, 1L), updated = Seq(1L, 1L), lateInputRows = Seq(0L, 0L)),

AddData(inputData, "a" -> 2), // Dropped from the second `dropDuplicates`
CheckLastBatch(),
assertNumStateRows(total = Seq(1L, 2L), updated = Seq(0L, 1L)),
assertNumStateRows(total = Seq(1L, 2L), updated = Seq(0L, 1L), lateInputRows = Seq(0L, 0L)),

AddData(inputData, "b" -> 1),
CheckLastBatch("b" -> 1),
assertNumStateRows(total = Seq(2L, 3L), updated = Seq(1L, 1L))
assertNumStateRows(total = Seq(2L, 3L), updated = Seq(1L, 1L), lateInputRows = Seq(0L, 0L))
)
}

Expand All @@ -107,7 +107,7 @@ class StreamingDeduplicationSuite extends StateStoreMetricsTest with BeforeAndAf

AddData(inputData, 10), // Should not emit anything as data less than watermark
CheckNewAnswer(),
assertNumStateRows(total = 1, updated = 0),
assertNumStateRows(total = 1, updated = 0, lateInputRows = 1),

AddData(inputData, 45), // Advance watermark to 35 seconds, no-data-batch drops row 25
CheckNewAnswer(45),
Expand All @@ -131,23 +131,23 @@ class StreamingDeduplicationSuite extends StateStoreMetricsTest with BeforeAndAf
CheckLastBatch(),
// states in aggregate in [10, 14), [15, 20) (2 windows)
// states in deduplicate is 10 to 15
assertNumStateRows(total = Seq(2L, 6L), updated = Seq(2L, 6L)),
assertNumStateRows(total = Seq(2L, 6L), updated = Seq(2L, 6L), lateInputRows = Seq(0L, 0L)),

AddData(inputData, 25), // Advance watermark to 15 seconds
CheckLastBatch((10 -> 5)), // 5 items (10 to 14) after deduplicate, emitted with no-data-batch
// states in aggregate in [15, 20) and [25, 30); no-data-batch removed [10, 14)
// states in deduplicate is 25, no-data-batch removed 10 to 14
assertNumStateRows(total = Seq(2L, 1L), updated = Seq(1L, 1L)),
assertNumStateRows(total = Seq(2L, 1L), updated = Seq(1L, 1L), lateInputRows = Seq(0L, 0L)),

AddData(inputData, 10), // Should not emit anything as data less than watermark
CheckLastBatch(),
assertNumStateRows(total = Seq(2L, 1L), updated = Seq(0L, 0L)),
assertNumStateRows(total = Seq(2L, 1L), updated = Seq(0L, 0L), lateInputRows = Seq(0L, 1L)),

AddData(inputData, 40), // Advance watermark to 30 seconds
CheckLastBatch((15 -> 1), (25 -> 1)),
// states in aggregate is [40, 45); no-data-batch removed [15, 20) and [25, 30)
// states in deduplicate is 40; no-data-batch removed 25
assertNumStateRows(total = Seq(1L, 1L), updated = Seq(1L, 1L))
assertNumStateRows(total = Seq(1L, 1L), updated = Seq(1L, 1L), lateInputRows = Seq(0L, 0L))
)
}

Expand All @@ -163,16 +163,16 @@ class StreamingDeduplicationSuite extends StateStoreMetricsTest with BeforeAndAf
testStream(result, Update)(
AddData(inputData, "a" -> 1),
CheckLastBatch("a" -> 1L),
assertNumStateRows(total = Seq(1L, 1L), updated = Seq(1L, 1L)),
assertNumStateRows(total = Seq(1L, 1L), updated = Seq(1L, 1L), lateInputRows = Seq(0L, 0L)),
AddData(inputData, "a" -> 1), // Dropped
CheckLastBatch(),
assertNumStateRows(total = Seq(1L, 1L), updated = Seq(0L, 0L)),
assertNumStateRows(total = Seq(1L, 1L), updated = Seq(0L, 0L), lateInputRows = Seq(0L, 0L)),
AddData(inputData, "a" -> 2),
CheckLastBatch("a" -> 3L),
assertNumStateRows(total = Seq(1L, 2L), updated = Seq(1L, 1L)),
assertNumStateRows(total = Seq(1L, 2L), updated = Seq(1L, 1L), lateInputRows = Seq(0L, 0L)),
AddData(inputData, "b" -> 1),
CheckLastBatch("b" -> 1L),
assertNumStateRows(total = Seq(2L, 3L), updated = Seq(1L, 1L))
assertNumStateRows(total = Seq(2L, 3L), updated = Seq(1L, 1L), lateInputRows = Seq(0L, 0L))
)
}

Expand All @@ -188,16 +188,16 @@ class StreamingDeduplicationSuite extends StateStoreMetricsTest with BeforeAndAf
testStream(result, Complete)(
AddData(inputData, "a" -> 1),
CheckLastBatch("a" -> 1L),
assertNumStateRows(total = Seq(1L, 1L), updated = Seq(1L, 1L)),
assertNumStateRows(total = Seq(1L, 1L), updated = Seq(1L, 1L), lateInputRows = Seq(0L, 0L)),
AddData(inputData, "a" -> 1), // Dropped
CheckLastBatch("a" -> 1L),
assertNumStateRows(total = Seq(1L, 1L), updated = Seq(0L, 0L)),
assertNumStateRows(total = Seq(1L, 1L), updated = Seq(0L, 0L), lateInputRows = Seq(0L, 0L)),
AddData(inputData, "a" -> 2),
CheckLastBatch("a" -> 3L),
assertNumStateRows(total = Seq(1L, 2L), updated = Seq(1L, 1L)),
assertNumStateRows(total = Seq(1L, 2L), updated = Seq(1L, 1L), lateInputRows = Seq(0L, 0L)),
AddData(inputData, "b" -> 1),
CheckLastBatch("a" -> 3L, "b" -> 1L),
assertNumStateRows(total = Seq(2L, 3L), updated = Seq(1L, 1L))
assertNumStateRows(total = Seq(2L, 3L), updated = Seq(1L, 1L), lateInputRows = Seq(0L, 0L))
)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,11 +159,11 @@ class StreamingInnerJoinSuite extends StreamTest with StateStoreMetricsTest with

AddData(input2, 1),
CheckNewAnswer(), // Should not join as < 15 removed
assertNumStateRows(total = 2, updated = 0), // row not add as 1 < state key watermark = 15
assertNumStateRows(total = 2, updated = 0), // row not add as 1 < state key watermark = 15

AddData(input1, 5),
CheckNewAnswer(), // Same reason as above
assertNumStateRows(total = 2, updated = 0)
assertNumStateRows(total = 2, updated = 0, lateInputRows = 1) // row later than watermark
)
}

Expand Down Expand Up @@ -216,12 +216,12 @@ class StreamingInnerJoinSuite extends StreamTest with StateStoreMetricsTest with
// (1, 28) ==> passed filter, matched with left (1, 3) and (1, 5), added to state
AddData(rightInput, (1, 20), (1, 21), (1, 28)),
CheckNewAnswer((1, 3, 21), (1, 5, 21), (1, 3, 28), (1, 5, 28)),
assertNumStateRows(total = 5, updated = 1),
assertNumStateRows(total = 5, updated = 1, lateInputRows = 1),

// New data to left input with leftTime <= 20 should be filtered due to event time watermark
AddData(leftInput, (1, 20), (1, 21)),
CheckNewAnswer((1, 21, 28)),
assertNumStateRows(total = 6, updated = 1)
assertNumStateRows(total = 6, updated = 1, lateInputRows = 1)
)
}

Expand Down Expand Up @@ -290,7 +290,7 @@ class StreamingInnerJoinSuite extends StreamTest with StateStoreMetricsTest with

AddData(leftInput, (1, 30), (1, 31)), // 30 should not be processed or added to state
CheckNewAnswer((1, 31, 26), (1, 31, 30), (1, 31, 31)),
assertNumStateRows(total = 11, updated = 1), // only 31 added
assertNumStateRows(total = 11, updated = 1, lateInputRows = 1), // only 31 added

// Advance the watermark
AddData(rightInput, (1, 80)),
Expand All @@ -304,7 +304,7 @@ class StreamingInnerJoinSuite extends StreamTest with StateStoreMetricsTest with

AddData(rightInput, (1, 46), (1, 50)), // 46 should not be processed or added to state
CheckNewAnswer((1, 49, 50), (1, 50, 50)),
assertNumStateRows(total = 7, updated = 1) // 50 added
assertNumStateRows(total = 7, updated = 1, lateInputRows = 1) // 50 added
)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,8 @@ class StreamingQueryStatusAndProgressSuite extends StreamTest with Eventually {
| "stateOperators" : [ {
| "numRowsTotal" : 0,
| "numRowsUpdated" : 1,
| "memoryUsedBytes" : 2
| "memoryUsedBytes" : 2,
| "numLateInputRows" : 0
| } ],
| "sources" : [ {
| "description" : "source",
Expand Down Expand Up @@ -91,7 +92,8 @@ class StreamingQueryStatusAndProgressSuite extends StreamTest with Eventually {
| "stateOperators" : [ {
| "numRowsTotal" : 0,
| "numRowsUpdated" : 1,
| "memoryUsedBytes" : 2
| "memoryUsedBytes" : 2,
| "numLateInputRows" : 0
| } ],
| "sources" : [ {
| "description" : "source",
Expand Down Expand Up @@ -230,7 +232,7 @@ object StreamingQueryStatusAndProgressSuite {
"avg" -> "2016-12-05T20:54:20.827Z",
"watermark" -> "2016-12-05T20:54:20.827Z").asJava),
stateOperators = Array(new StateOperatorProgress(
numRowsTotal = 0, numRowsUpdated = 1, memoryUsedBytes = 2)),
numRowsTotal = 0, numRowsUpdated = 1, memoryUsedBytes = 2, numLateInputRows = 0)),
sources = Array(
new SourceProgress(
description = "source",
Expand All @@ -254,7 +256,7 @@ object StreamingQueryStatusAndProgressSuite {
// empty maps should be handled correctly
eventTime = new java.util.HashMap(Map.empty[String, String].asJava),
stateOperators = Array(new StateOperatorProgress(
numRowsTotal = 0, numRowsUpdated = 1, memoryUsedBytes = 2)),
numRowsTotal = 0, numRowsUpdated = 1, memoryUsedBytes = 2, numLateInputRows = 0)),
sources = Array(
new SourceProgress(
description = "source",
Expand Down

0 comments on commit ff1b895

Please sign in to comment.