Skip to content

Commit

Permalink
[SPARK-19674][SQL] Ignore driver accumulator updates don't belong to …
Browse files Browse the repository at this point in the history
…the execution when merging all accumulator updates

## What changes were proposed in this pull request?
In SQLListener.getExecutionMetrics, driver accumulator updates don't belong to the execution should be ignored when merging all accumulator updates to prevent NoSuchElementException.

## How was this patch tested?
Updated unit test.

Author: Carson Wang <carson.wang@intel.com>

Closes #17009 from carsonwang/FixSQLMetrics.
  • Loading branch information
carsonwang authored and cloud-fan committed Feb 23, 2017
1 parent f87a6a5 commit eff7b40
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -343,10 +343,13 @@ class SQLListener(conf: SparkConf) extends SparkListener with Logging {
accumulatorUpdate <- taskMetrics.accumulatorUpdates) yield {
(accumulatorUpdate._1, accumulatorUpdate._2)
}
}.filter { case (id, _) => executionUIData.accumulatorMetrics.contains(id) }
}

val driverUpdates = executionUIData.driverAccumUpdates.toSeq
mergeAccumulatorUpdates(accumulatorUpdates ++ driverUpdates, accumulatorId =>
val totalUpdates = (accumulatorUpdates ++ driverUpdates).filter {
case (id, _) => executionUIData.accumulatorMetrics.contains(id)
}
mergeAccumulatorUpdates(totalUpdates, accumulatorId =>
executionUIData.accumulatorMetrics(accumulatorId).metricType)
case None =>
// This execution has been dropped
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,11 @@ class SQLListenerSuite extends SparkFunSuite with SharedSQLContext with JsonTest

checkAnswer(listener.getExecutionMetrics(0), accumulatorUpdates.mapValues(_ * 2))

// Driver accumulator updates don't belong to this execution should be filtered and no
// exception will be thrown.
listener.onOtherEvent(SparkListenerDriverAccumUpdates(0, Seq((999L, 2L))))
checkAnswer(listener.getExecutionMetrics(0), accumulatorUpdates.mapValues(_ * 2))

listener.onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate("", Seq(
// (task id, stage id, stage attempt, accum updates)
(0L, 0, 0, createTaskMetrics(accumulatorUpdates).accumulators().map(makeInfo)),
Expand Down

0 comments on commit eff7b40

Please sign in to comment.