[SPARK-13055] SQLHistoryListener throws ClassCastException
This is an existing issue uncovered recently by #10835. The exception occurred because the `SQLHistoryListener` receives all sorts of accumulators, not just the ones that represent SQL metrics. For example, the listener receives `internal.metrics.shuffleRead.remoteBlocksFetched`, whose value is an Int, and then tries to cast it to a Long, which fails.

The fix is to mark accumulators that represent SQL metrics with internal metadata, so the `SQLHistoryListener` can identify them and process only those.
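In outline, the change works as in the following sketch (simplified, not the actual Spark code: `metadata`, `ACCUM_IDENTIFIER`, and the filter condition mirror the diff below, while the surrounding names are illustrative only):

// An optional metadata field is added to AccumulableInfo; the default value keeps
// every existing call site source-compatible.
case class AccumulableInfo(
    id: Long,
    name: Option[String],
    update: Option[Any],
    value: Option[Any],
    internal: Boolean,
    countFailedValues: Boolean,
    metadata: Option[String] = None)

object SQLMetrics {
  // Identifier attached (via SQLMetric.toInfo) to accumulators that back SQL metrics
  val ACCUM_IDENTIFIER = "sql"
}

object HistoryListenerSketch {
  // Instead of casting every accumulator update to Long, keep only the updates that
  // were explicitly tagged as SQL metrics.
  def sqlMetricUpdates(accums: Seq[AccumulableInfo]): Seq[AccumulableInfo] =
    accums.filter(_.metadata.exists(_ == SQLMetrics.ACCUM_IDENTIFIER))
}

Because `metadata` defaults to `None`, existing `AccumulableInfo` call sites and event logs written by older versions are unaffected.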

Author: Andrew Or <andrew@databricks.com>

Closes #10971 from andrewor14/fix-sql-history.
Andrew Or authored and zsxwing committed Jan 29, 2016
1 parent 2b027e9 commit e38b0ba
Showing 13 changed files with 133 additions and 45 deletions.
8 changes: 8 additions & 0 deletions core/src/main/scala/org/apache/spark/Accumulable.scala
@@ -22,6 +22,7 @@ import java.io.{ObjectInputStream, Serializable}
import scala.collection.generic.Growable
import scala.reflect.ClassTag

import org.apache.spark.scheduler.AccumulableInfo
import org.apache.spark.serializer.JavaSerializer
import org.apache.spark.util.Utils

@@ -187,6 +188,13 @@ class Accumulable[R, T] private (
*/
private[spark] def setValueAny(newValue: Any): Unit = { setValue(newValue.asInstanceOf[R]) }

/**
* Create an [[AccumulableInfo]] representation of this [[Accumulable]] with the provided values.
*/
private[spark] def toInfo(update: Option[Any], value: Option[Any]): AccumulableInfo = {
new AccumulableInfo(id, name, update, value, internal, countFailedValues)
}

// Called by Java when deserializing an object
private def readObject(in: ObjectInputStream): Unit = Utils.tryOrIOException {
in.defaultReadObject()
@@ -323,8 +323,8 @@ class TaskMetrics(initialAccums: Seq[Accumulator[_]]) extends Serializable {
* field is always empty, since this represents the partial updates recorded in this task,
* not the aggregated value across multiple tasks.
*/
def accumulatorUpdates(): Seq[AccumulableInfo] = accums.map { a =>
new AccumulableInfo(a.id, a.name, Some(a.localValue), None, a.isInternal, a.countFailedValues)
def accumulatorUpdates(): Seq[AccumulableInfo] = {
accums.map { a => a.toInfo(Some(a.localValue), None) }
}

// If we are reconstructing this TaskMetrics on the driver, some metrics may already be set.
@@ -35,6 +35,7 @@ import org.apache.spark.annotation.DeveloperApi
* @param value total accumulated value so far, maybe None if used on executors to describe a task
* @param internal whether this accumulator was internal
* @param countFailedValues whether to count this accumulator's partial value if the task failed
* @param metadata internal metadata associated with this accumulator, if any
*/
@DeveloperApi
case class AccumulableInfo private[spark] (
@@ -43,7 +44,9 @@ case class AccumulableInfo private[spark] (
update: Option[Any], // represents a partial update within a task
value: Option[Any],
private[spark] val internal: Boolean,
private[spark] val countFailedValues: Boolean)
private[spark] val countFailedValues: Boolean,
// TODO: use this to identify internal task metrics instead of encoding it in the name
private[spark] val metadata: Option[String] = None)


/**
@@ -1101,11 +1101,8 @@ class DAGScheduler(
acc ++= partialValue
// To avoid UI cruft, ignore cases where value wasn't updated
if (acc.name.isDefined && partialValue != acc.zero) {
val name = acc.name
stage.latestInfo.accumulables(id) = new AccumulableInfo(
id, name, None, Some(acc.value), acc.isInternal, acc.countFailedValues)
event.taskInfo.accumulables += new AccumulableInfo(
id, name, Some(partialValue), Some(acc.value), acc.isInternal, acc.countFailedValues)
stage.latestInfo.accumulables(id) = acc.toInfo(None, Some(acc.value))
event.taskInfo.accumulables += acc.toInfo(Some(partialValue), Some(acc.value))
}
}
} catch {
6 changes: 4 additions & 2 deletions core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -290,7 +290,8 @@ private[spark] object JsonProtocol {
("Update" -> accumulableInfo.update.map { v => accumValueToJson(name, v) }) ~
("Value" -> accumulableInfo.value.map { v => accumValueToJson(name, v) }) ~
("Internal" -> accumulableInfo.internal) ~
("Count Failed Values" -> accumulableInfo.countFailedValues)
("Count Failed Values" -> accumulableInfo.countFailedValues) ~
("Metadata" -> accumulableInfo.metadata)
}

/**
@@ -728,7 +729,8 @@ private[spark] object JsonProtocol {
val value = Utils.jsonOption(json \ "Value").map { v => accumValueFromJson(name, v) }
val internal = (json \ "Internal").extractOpt[Boolean].getOrElse(false)
val countFailedValues = (json \ "Count Failed Values").extractOpt[Boolean].getOrElse(false)
new AccumulableInfo(id, name, update, value, internal, countFailedValues)
val metadata = (json \ "Metadata").extractOpt[String]
new AccumulableInfo(id, name, update, value, internal, countFailedValues, metadata)
}
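Taken together, these two hunks add one field to the serialized form and read it back leniently. A hedged illustration of the relevant fragment of the JSON for a tagged SQL metric update (values are made up; fields untouched by this commit, such as the accumulator ID and name, are elided):

  "Update" : "10",
  "Internal" : true,
  "Count Failed Values" : false,
  "Metadata" : "sql"

As the new SQLMetricsSuite test below also shows, SQL metric values survive the JSON round trip only as strings. Logs written by older Spark versions simply lack the "Metadata" field, and `extractOpt` turns the missing field into `None`, matching the other backward-compatibility defaults.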

/**
@@ -551,8 +551,6 @@ private[spark] object TaskMetricsSuite extends Assertions {
* Make an [[AccumulableInfo]] out of an [[Accumulable]] with the intent to use the
* info as an accumulator update.
*/
def makeInfo(a: Accumulable[_, _]): AccumulableInfo = {
new AccumulableInfo(a.id, a.name, Some(a.value), None, a.isInternal, a.countFailedValues)
}
def makeInfo(a: Accumulable[_, _]): AccumulableInfo = a.toInfo(Some(a.value), None)

}
@@ -1581,12 +1581,9 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou
assert(Accumulators.get(acc1.id).isDefined)
assert(Accumulators.get(acc2.id).isDefined)
assert(Accumulators.get(acc3.id).isDefined)
val accInfo1 = new AccumulableInfo(
acc1.id, acc1.name, Some(15L), None, internal = false, countFailedValues = false)
val accInfo2 = new AccumulableInfo(
acc2.id, acc2.name, Some(13L), None, internal = false, countFailedValues = false)
val accInfo3 = new AccumulableInfo(
acc3.id, acc3.name, Some(18L), None, internal = false, countFailedValues = false)
val accInfo1 = acc1.toInfo(Some(15L), None)
val accInfo2 = acc2.toInfo(Some(13L), None)
val accInfo3 = acc3.toInfo(Some(18L), None)
val accumUpdates = Seq(accInfo1, accInfo2, accInfo3)
val exceptionFailure = new ExceptionFailure(new SparkException("fondue?"), accumUpdates)
submit(new MyRDD(sc, 1, Nil), Array(0))
@@ -1954,10 +1951,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou
extraAccumUpdates: Seq[AccumulableInfo] = Seq.empty[AccumulableInfo],
taskInfo: TaskInfo = createFakeTaskInfo()): CompletionEvent = {
val accumUpdates = reason match {
case Success =>
task.initialAccumulators.map { a =>
new AccumulableInfo(a.id, a.name, Some(a.zero), None, a.isInternal, a.countFailedValues)
}
case Success => task.initialAccumulators.map { a => a.toInfo(Some(a.zero), None) }
case ef: ExceptionFailure => ef.accumUpdates
case _ => Seq.empty[AccumulableInfo]
}
@@ -165,9 +165,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
val taskSet = FakeTask.createTaskSet(1)
val clock = new ManualClock
val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock)
val accumUpdates = taskSet.tasks.head.initialAccumulators.map { a =>
new AccumulableInfo(a.id, a.name, Some(0L), None, a.isInternal, a.countFailedValues)
}
val accumUpdates = taskSet.tasks.head.initialAccumulators.map { a => a.toInfo(Some(0L), None) }

// Offer a host with NO_PREF as the constraint,
// we should get a nopref task immediately since that's what we only have
@@ -186,9 +184,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
val taskSet = FakeTask.createTaskSet(3)
val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES)
val accumUpdatesByTask: Array[Seq[AccumulableInfo]] = taskSet.tasks.map { task =>
task.initialAccumulators.map { a =>
new AccumulableInfo(a.id, a.name, Some(0L), None, a.isInternal, a.countFailedValues)
}
task.initialAccumulators.map { a => a.toInfo(Some(0L), None) }
}

// First three offers should all find tasks
16 changes: 10 additions & 6 deletions core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
@@ -374,15 +374,18 @@ class JsonProtocolSuite extends SparkFunSuite {
test("AccumulableInfo backward compatibility") {
// "Internal" property of AccumulableInfo was added in 1.5.1
val accumulableInfo = makeAccumulableInfo(1, internal = true, countFailedValues = true)
val oldJson = JsonProtocol.accumulableInfoToJson(accumulableInfo)
.removeField({ _._1 == "Internal" })
val accumulableInfoJson = JsonProtocol.accumulableInfoToJson(accumulableInfo)
val oldJson = accumulableInfoJson.removeField({ _._1 == "Internal" })
val oldInfo = JsonProtocol.accumulableInfoFromJson(oldJson)
assert(!oldInfo.internal)
// "Count Failed Values" property of AccumulableInfo was added in 2.0.0
val oldJson2 = JsonProtocol.accumulableInfoToJson(accumulableInfo)
.removeField({ _._1 == "Count Failed Values" })
val oldJson2 = accumulableInfoJson.removeField({ _._1 == "Count Failed Values" })
val oldInfo2 = JsonProtocol.accumulableInfoFromJson(oldJson2)
assert(!oldInfo2.countFailedValues)
// "Metadata" property of AccumulableInfo was added in 2.0.0
val oldJson3 = accumulableInfoJson.removeField({ _._1 == "Metadata" })
val oldInfo3 = JsonProtocol.accumulableInfoFromJson(oldJson3)
assert(oldInfo3.metadata.isEmpty)
}

test("ExceptionFailure backward compatibility: accumulator updates") {
@@ -820,9 +823,10 @@ private[spark] object JsonProtocolSuite extends Assertions {
private def makeAccumulableInfo(
id: Int,
internal: Boolean = false,
countFailedValues: Boolean = false): AccumulableInfo =
countFailedValues: Boolean = false,
metadata: Option[String] = None): AccumulableInfo =
new AccumulableInfo(id, Some(s"Accumulable$id"), Some(s"delta$id"), Some(s"val$id"),
internal, countFailedValues)
internal, countFailedValues, metadata)

/**
* Creates a TaskMetrics object describing a task that read data from Hadoop (if hasHadoopInput is
@@ -18,6 +18,7 @@
package org.apache.spark.sql.execution.metric

import org.apache.spark.{Accumulable, AccumulableParam, Accumulators, SparkContext}
import org.apache.spark.scheduler.AccumulableInfo
import org.apache.spark.util.Utils

/**
@@ -27,9 +28,16 @@ import org.apache.spark.util.Utils
* An implementation of SQLMetric should override `+=` and `add` to avoid boxing.
*/
private[sql] abstract class SQLMetric[R <: SQLMetricValue[T], T](
name: String, val param: SQLMetricParam[R, T])
name: String,
val param: SQLMetricParam[R, T])
extends Accumulable[R, T](param.zero, param, Some(name), internal = true) {

// Provide special identifier as metadata so we can tell that this is a `SQLMetric` later
override def toInfo(update: Option[Any], value: Option[Any]): AccumulableInfo = {
new AccumulableInfo(id, Some(name), update, value, isInternal, countFailedValues,
Some(SQLMetrics.ACCUM_IDENTIFIER))
}

def reset(): Unit = {
this.value = param.zero
}
@@ -73,6 +81,14 @@ private[sql] class LongSQLMetricValue(private var _value : Long) extends SQLMetr

// Although there is a boxing here, it's fine because it's only called in SQLListener
override def value: Long = _value

// Needed for SQLListenerSuite
override def equals(other: Any): Boolean = {
other match {
case o: LongSQLMetricValue => value == o.value
case _ => false
}
}
}

/**
@@ -126,6 +142,9 @@ private object StaticsLongSQLMetricParam extends LongSQLMetricParam(

private[sql] object SQLMetrics {

// Identifier for distinguishing SQL metrics from other accumulators
private[sql] val ACCUM_IDENTIFIER = "sql"

private def createLongMetric(
sc: SparkContext,
name: String,
@@ -23,7 +23,7 @@ import org.apache.spark.{JobExecutionStatus, Logging, SparkConf}
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.scheduler._
import org.apache.spark.sql.execution.{SparkPlanInfo, SQLExecution}
import org.apache.spark.sql.execution.metric.{LongSQLMetricValue, SQLMetricParam, SQLMetricValue}
import org.apache.spark.sql.execution.metric._
import org.apache.spark.ui.SparkUI

@DeveloperApi
@@ -314,24 +314,33 @@ private[sql] class SQLListener(conf: SparkConf) extends SparkListener with Loggi

}


/**
* A [[SQLListener]] for rendering the SQL UI in the history server.
*/
private[spark] class SQLHistoryListener(conf: SparkConf, sparkUI: SparkUI)
extends SQLListener(conf) {

private var sqlTabAttached = false

override def onExecutorMetricsUpdate(
executorMetricsUpdate: SparkListenerExecutorMetricsUpdate): Unit = synchronized {
// Do nothing
override def onExecutorMetricsUpdate(u: SparkListenerExecutorMetricsUpdate): Unit = {
// Do nothing; these events are not logged
}

override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = synchronized {
updateTaskAccumulatorValues(
taskEnd.taskInfo.taskId,
taskEnd.stageId,
taskEnd.stageAttemptId,
taskEnd.taskInfo.accumulables.map { a =>
val newValue = new LongSQLMetricValue(a.update.map(_.asInstanceOf[Long]).getOrElse(0L))
a.copy(update = Some(newValue))
taskEnd.taskInfo.accumulables.flatMap { a =>
// Filter out accumulators that are not SQL metrics
// For now we assume all SQL metrics are Long's that have been JSON serialized as String's
if (a.metadata.exists(_ == SQLMetrics.ACCUM_IDENTIFIER)) {
val newValue = new LongSQLMetricValue(a.update.map(_.toString.toLong).getOrElse(0L))
Some(a.copy(update = Some(newValue)))
} else {
None
}
},
finishTask = true)
}
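The `flatMap` above is where the original ClassCastException came from; a brief hedged sketch of the difference, with made-up values:

// Illustrative only; these are not the listener's actual variables.
object ClassCastSketch {
  // An accumulator update replayed from the event log can be a boxed Int, e.g. the
  // value of internal.metrics.shuffleRead.remoteBlocksFetched.
  val nonSqlUpdate: Any = 3
  // Old code: nonSqlUpdate.asInstanceOf[Long] tries to unbox a java.lang.Integer as a
  // java.lang.Long and throws ClassCastException.
  // New code: an update without the "sql" metadata is dropped by the flatMap, and a SQL
  // metric update (deserialized from JSON as a string) is parsed explicitly instead:
  val sqlUpdate: Any = "10"
  val parsed: Long = sqlUpdate.toString.toLong  // 10L, then wrapped in a LongSQLMetricValue
}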
@@ -30,7 +30,7 @@ import org.apache.spark.sql.execution.SparkPlanInfo
import org.apache.spark.sql.execution.ui.SparkPlanGraph
import org.apache.spark.sql.functions._
import org.apache.spark.sql.test.SharedSQLContext
import org.apache.spark.util.Utils
import org.apache.spark.util.{JsonProtocol, Utils}


class SQLMetricsSuite extends SparkFunSuite with SharedSQLContext {
@@ -356,6 +356,28 @@ class SQLMetricsSuite extends SparkFunSuite with SharedSQLContext {
}
}

test("metrics can be loaded by history server") {
val metric = new LongSQLMetric("zanzibar", LongSQLMetricParam)
metric += 10L
val metricInfo = metric.toInfo(Some(metric.localValue), None)
metricInfo.update match {
case Some(v: LongSQLMetricValue) => assert(v.value === 10L)
case Some(v) => fail(s"metric value was not a LongSQLMetricValue: ${v.getClass.getName}")
case _ => fail("metric update is missing")
}
assert(metricInfo.metadata === Some(SQLMetrics.ACCUM_IDENTIFIER))
// After serializing to JSON, the original value type is lost, but we can still
// identify that it's a SQL metric from the metadata
val metricInfoJson = JsonProtocol.accumulableInfoToJson(metricInfo)
val metricInfoDeser = JsonProtocol.accumulableInfoFromJson(metricInfoJson)
metricInfoDeser.update match {
case Some(v: String) => assert(v.toLong === 10L)
case Some(v) => fail(s"deserialized metric value was not a string: ${v.getClass.getName}")
case _ => fail("deserialized metric update is missing")
}
assert(metricInfoDeser.metadata === Some(SQLMetrics.ACCUM_IDENTIFIER))
}

}

private case class MethodIdentifier[T](cls: Class[T], name: String, desc: String)
@@ -26,8 +26,9 @@ import org.apache.spark.executor.TaskMetrics
import org.apache.spark.scheduler._
import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.sql.execution.{SparkPlanInfo, SQLExecution}
import org.apache.spark.sql.execution.metric.LongSQLMetricValue
import org.apache.spark.sql.execution.metric.{LongSQLMetricValue, SQLMetrics}
import org.apache.spark.sql.test.SharedSQLContext
import org.apache.spark.ui.SparkUI

class SQLListenerSuite extends SparkFunSuite with SharedSQLContext {
import testImplicits._
@@ -335,8 +336,43 @@ class SQLListenerSuite extends SparkFunSuite with SharedSQLContext {
assert(sqlContext.listener.stageIdToStageMetrics.size == previousStageNumber + 1)
}

test("SPARK-13055: history listener only tracks SQL metrics") {
val listener = new SQLHistoryListener(sparkContext.conf, mock(classOf[SparkUI]))
// We need to post other events for the listener to track our accumulators.
// These are largely just boilerplate unrelated to what we're trying to test.
val df = createTestDataFrame
val executionStart = SparkListenerSQLExecutionStart(
0, "", "", "", SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan), 0)
val stageInfo = createStageInfo(0, 0)
val jobStart = SparkListenerJobStart(0, 0, Seq(stageInfo), createProperties(0))
val stageSubmitted = SparkListenerStageSubmitted(stageInfo)
// This task has both accumulators that are SQL metrics and accumulators that are not.
// The listener should only track the ones that are actually SQL metrics.
val sqlMetric = SQLMetrics.createLongMetric(sparkContext, "beach umbrella")
val nonSqlMetric = sparkContext.accumulator[Int](0, "baseball")
val sqlMetricInfo = sqlMetric.toInfo(Some(sqlMetric.localValue), None)
val nonSqlMetricInfo = nonSqlMetric.toInfo(Some(nonSqlMetric.localValue), None)
val taskInfo = createTaskInfo(0, 0)
taskInfo.accumulables ++= Seq(sqlMetricInfo, nonSqlMetricInfo)
val taskEnd = SparkListenerTaskEnd(0, 0, "just-a-task", null, taskInfo, null)
listener.onOtherEvent(executionStart)
listener.onJobStart(jobStart)
listener.onStageSubmitted(stageSubmitted)
// Before SPARK-13055, this throws ClassCastException because the history listener would
// assume that the accumulator value is of type Long, but this may not be true for
// accumulators that are not SQL metrics.
listener.onTaskEnd(taskEnd)
val trackedAccums = listener.stageIdToStageMetrics.values.flatMap { stageMetrics =>
stageMetrics.taskIdToMetricUpdates.values.flatMap(_.accumulatorUpdates)
}
// Listener tracks only SQL metrics, not other accumulators
assert(trackedAccums.size === 1)
assert(trackedAccums.head === sqlMetricInfo)
}

}


class SQLListenerMemoryLeakSuite extends SparkFunSuite {

test("no memory leak") {
