Skip to content

Commit

Permalink
SPARK-26170 Add missing metrics in FlatMapGroupsWithState
Browse files Browse the repository at this point in the history
  • Loading branch information
HeartSaVioR committed Nov 26, 2018
1 parent 6339c8c commit 56f39cc
Showing 1 changed file with 27 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -111,28 +111,41 @@ case class FlatMapGroupsWithStateExec(
indexOrdinal = None,
sqlContext.sessionState,
Some(sqlContext.streams.stateStoreCoordinator)) { case (store, iter) =>
val numOutputRows = longMetric("numOutputRows")
val allUpdatesTimeMs = longMetric("allUpdatesTimeMs")
val commitTimeMs = longMetric("commitTimeMs")

val processor = new InputProcessor(store)

// If timeout is based on event time, then filter late data based on watermark
val filteredIter = watermarkPredicateForData match {
case Some(predicate) if timeoutConf == EventTimeTimeout =>
iter.filter(row => !predicate.eval(row))
case _ =>
iter
// `outputIterator` is always assigned inside the timed block below, so initializing it to null is safe
var outputIterator: Iterator[InternalRow] = null
allUpdatesTimeMs += timeTakenMs {
// If timeout is based on event time, then filter late data based on watermark
val filteredIter = watermarkPredicateForData match {
case Some(predicate) if timeoutConf == EventTimeTimeout =>
iter.filter(row => !predicate.eval(row))
case _ =>
iter
}
// Generate an iterator that returns the rows grouped by the grouping function
// Note that this code ensures that the filtering for timeout occurs only after
// all the data has been processed. This is to ensure that the timeout information of all
// the keys with data is updated before they are processed for timeouts.
outputIterator = processor.processNewData(filteredIter) ++
processor.processTimedOutState()
}
// Generate an iterator that returns the rows grouped by the grouping function
// Note that this code ensures that the filtering for timeout occurs only after
// all the data has been processed. This is to ensure that the timeout information of all
// the keys with data is updated before they are processed for timeouts.
val outputIterator =
processor.processNewData(filteredIter) ++ processor.processTimedOutState()

// Return an iterator of all the rows generated by all the keys, such that when fully
// consumed, all the state updates will be committed by the state store
CompletionIterator[InternalRow, Iterator[InternalRow]](
outputIterator,
outputIterator.map { row =>
numOutputRows += 1
row
},
{
store.commit()
commitTimeMs += timeTakenMs {
store.commit()
}
setStoreMetrics(store)
}
)
Expand Down

0 comments on commit 56f39cc

Please sign in to comment.