[SPARK-28556][SQL] QueryExecutionListener should also notify Error
## What changes were proposed in this pull request?

Right now, `Error` is not sent to `QueryExecutionListener.onFailure`. If any `Error` (such as `AssertionError`) occurs while running a query, `QueryExecutionListener.onFailure` is never triggered.

This PR changes `onFailure` to accept a `Throwable` instead.
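
For anyone maintaining a listener, the practical effect is that `onFailure` overrides must now take a `Throwable`. Below is a minimal sketch of a listener written against the new signature; the class name and the logging bodies are illustrative only, not part of this patch:

```scala
import org.apache.spark.sql.execution.QueryExecution
import org.apache.spark.sql.util.QueryExecutionListener

// Illustrative listener: `error` is typed as Throwable, so fatal errors such
// as AssertionError now reach this callback as well.
class LoggingQueryExecutionListener extends QueryExecutionListener {
  override def onSuccess(funcName: String, qe: QueryExecution, duration: Long): Unit = {
    // duration is reported in nanoseconds
    println(s"$funcName succeeded in ${duration / 1e6} ms")
  }

  override def onFailure(funcName: String, qe: QueryExecution, error: Throwable): Unit = {
    // Before this change the parameter was typed as Exception, so an Error
    // thrown while running the query never reached the listener.
    println(s"$funcName failed: $error")
  }
}

// Registration is unchanged:
// spark.listenerManager.register(new LoggingQueryExecutionListener)
```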

## How was this patch tested?

Jenkins

Closes #25292 from zsxwing/fix-QueryExecutionListener.

Authored-by: Shixiong Zhu <zsxwing@gmail.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
zsxwing authored and HyukjinKwon committed Jul 30, 2019
1 parent caa23e3 commit 196a4d7
Showing 11 changed files with 30 additions and 26 deletions.
6 changes: 5 additions & 1 deletion project/MimaExcludes.scala
@@ -376,7 +376,11 @@ object MimaExcludes {

// [SPARK-28199][SS] Remove deprecated ProcessingTime
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.streaming.ProcessingTime"),
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.streaming.ProcessingTime$")
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.streaming.ProcessingTime$"),

// [SPARK-28556][SQL] QueryExecutionListener should also notify Error
ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.sql.util.QueryExecutionListener.onFailure"),
ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.util.QueryExecutionListener.onFailure")
)

// Exclude rules for 2.4.x
@@ -85,7 +85,7 @@ object SQLExecution {
}.getOrElse(callSite.shortForm)

withSQLConfPropagated(sparkSession) {
var ex: Option[Exception] = None
var ex: Option[Throwable] = None
val startTime = System.nanoTime()
try {
sc.listenerBus.post(SparkListenerSQLExecutionStart(
@@ -99,7 +99,7 @@
time = System.currentTimeMillis()))
body
} catch {
case e: Exception =>
case e: Throwable =>
ex = Some(e)
throw e
} finally {
@@ -60,7 +60,7 @@ case class SparkListenerSQLExecutionEnd(executionId: Long, time: Long)
@JsonIgnore private[sql] var qe: QueryExecution = null

// The exception object that caused this execution to fail. None if the execution doesn't fail.
@JsonIgnore private[sql] var executionFailure: Option[Exception] = None
@JsonIgnore private[sql] var executionFailure: Option[Throwable] = None
}

/**
@@ -58,12 +58,12 @@ trait QueryExecutionListener {
* @param funcName the name of the action that triggered this query.
* @param qe the QueryExecution object that carries detail information like logical plan,
* physical plan, etc.
* @param exception the exception that failed this query.
* @param error the error that failed this query.
*
* @note This can be invoked by multiple different threads.
*/
@DeveloperApi
def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit
def onFailure(funcName: String, qe: QueryExecution, error: Throwable): Unit

marmbrus (Contributor) commented on Mar 11, 2020:

Aren't we breaking compatibility here for anyone that has implemented this? I know this is a developer API, but I'm worried there are a lot of implementations of this.

marmbrus (Contributor) commented on Mar 11, 2020:

Could we instead wrap the errors in a runtime exception or something so we don't need to change the signature?
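
For context on the second suggestion: one way to keep the old `Exception`-typed signature would be to wrap non-`Exception` throwables before notifying listeners. The sketch below only illustrates that alternative; the wrapper class and helper are hypothetical, and it is not what this patch does:

```scala
// Hypothetical sketch of the "wrap in a runtime exception" alternative:
// keep onFailure(funcName, qe, exception: Exception) and wrap anything that
// is not already an Exception (e.g. AssertionError) before delivering it.
class QueryExecutionErrorWrapper(cause: Throwable) extends RuntimeException(cause)

def toListenerException(t: Throwable): Exception = t match {
  case e: Exception => e
  case other        => new QueryExecutionErrorWrapper(other)
}
```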

}


@@ -135,7 +135,7 @@ class SessionStateSuite extends SparkFunSuite {
test("fork new session and inherit listener manager") {
class CommandCollector extends QueryExecutionListener {
val commands: ArrayBuffer[String] = ArrayBuffer.empty[String]
override def onFailure(funcName: String, qe: QueryExecution, ex: Exception) : Unit = {}
override def onFailure(funcName: String, qe: QueryExecution, error: Throwable) : Unit = {}
override def onSuccess(funcName: String, qe: QueryExecution, duration: Long): Unit = {
commands += funcName
}
@@ -28,7 +28,7 @@ class TestQueryExecutionListener extends QueryExecutionListener {
OnSuccessCall.isOnSuccessCalled.set(true)
}

override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit = { }
override def onFailure(funcName: String, qe: QueryExecution, error: Throwable): Unit = { }
}

/**
@@ -338,7 +338,7 @@ class UDFSuite extends QueryTest with SharedSQLContext {
withTempPath { path =>
var numTotalCachedHit = 0
val listener = new QueryExecutionListener {
override def onFailure(f: String, qe: QueryExecution, e: Exception): Unit = {}
override def onFailure(f: String, qe: QueryExecution, e: Throwable): Unit = {}

override def onSuccess(funcName: String, qe: QueryExecution, duration: Long): Unit = {
qe.withCachedData match {
@@ -180,13 +180,13 @@ class FileDataSourceV2FallBackSuite extends QueryTest with SharedSQLContext {
withSQLConf(SQLConf.USE_V1_SOURCE_READER_LIST.key -> format,
SQLConf.USE_V1_SOURCE_WRITER_LIST.key -> format) {
val commands = ArrayBuffer.empty[(String, LogicalPlan)]
val exceptions = ArrayBuffer.empty[(String, Exception)]
val errors = ArrayBuffer.empty[(String, Throwable)]
val listener = new QueryExecutionListener {
override def onFailure(
funcName: String,
qe: QueryExecution,
exception: Exception): Unit = {
exceptions += funcName -> exception
error: Throwable): Unit = {
errors += funcName -> error
}

override def onSuccess(funcName: String, qe: QueryExecution, duration: Long): Unit = {
@@ -267,7 +267,7 @@ class DataFrameReaderWriterSuite extends QueryTest with SharedSQLContext with Be
plan = qe.analyzed

}
override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit = {}
override def onFailure(funcName: String, qe: QueryExecution, error: Throwable): Unit = {}
}

spark.listenerManager.register(listener)
@@ -36,7 +36,7 @@ class DataFrameCallbackSuite extends QueryTest with SharedSQLContext {
val metrics = ArrayBuffer.empty[(String, QueryExecution, Long)]
val listener = new QueryExecutionListener {
// Only test successful case here, so no need to implement `onFailure`
override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit = {}
override def onFailure(funcName: String, qe: QueryExecution, error: Throwable): Unit = {}

override def onSuccess(funcName: String, qe: QueryExecution, duration: Long): Unit = {
metrics += ((funcName, qe, duration))
@@ -63,10 +63,10 @@ class DataFrameCallbackSuite extends QueryTest with SharedSQLContext {
}

testQuietly("execute callback functions when a DataFrame action failed") {
val metrics = ArrayBuffer.empty[(String, QueryExecution, Exception)]
val metrics = ArrayBuffer.empty[(String, QueryExecution, Throwable)]
val listener = new QueryExecutionListener {
override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit = {
metrics += ((funcName, qe, exception))
override def onFailure(funcName: String, qe: QueryExecution, error: Throwable): Unit = {
metrics += ((funcName, qe, error))
}

// Only test failed case here, so no need to implement `onSuccess`
@@ -92,7 +92,7 @@ class DataFrameCallbackSuite extends QueryTest with SharedSQLContext {
val metrics = ArrayBuffer.empty[Long]
val listener = new QueryExecutionListener {
// Only test successful case here, so no need to implement `onFailure`
override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit = {}
override def onFailure(funcName: String, qe: QueryExecution, error: Throwable): Unit = {}

override def onSuccess(funcName: String, qe: QueryExecution, duration: Long): Unit = {
val metric = qe.executedPlan match {
@@ -132,7 +132,7 @@ class DataFrameCallbackSuite extends QueryTest with SharedSQLContext {
val metrics = ArrayBuffer.empty[Long]
val listener = new QueryExecutionListener {
// Only test successful case here, so no need to implement `onFailure`
override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit = {}
override def onFailure(funcName: String, qe: QueryExecution, error: Throwable): Unit = {}

override def onSuccess(funcName: String, qe: QueryExecution, duration: Long): Unit = {
metrics += qe.executedPlan.longMetric("dataSize").value
@@ -172,10 +172,10 @@ class DataFrameCallbackSuite extends QueryTest with SharedSQLContext {

test("execute callback functions for DataFrameWriter") {
val commands = ArrayBuffer.empty[(String, LogicalPlan)]
val exceptions = ArrayBuffer.empty[(String, Exception)]
val errors = ArrayBuffer.empty[(String, Throwable)]
val listener = new QueryExecutionListener {
override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit = {
exceptions += funcName -> exception
override def onFailure(funcName: String, qe: QueryExecution, error: Throwable): Unit = {
errors += funcName -> error
}

override def onSuccess(funcName: String, qe: QueryExecution, duration: Long): Unit = {
@@ -221,9 +221,9 @@ class DataFrameCallbackSuite extends QueryTest with SharedSQLContext {
spark.range(10).select($"id", $"id").write.insertInto("tab")
}
sparkContext.listenerBus.waitUntilEmpty(1000)
assert(exceptions.length == 1)
assert(exceptions.head._1 == "insertInto")
assert(exceptions.head._2 == e)
assert(errors.length == 1)
assert(errors.head._1 == "insertInto")
assert(errors.head._2 == e)
}
}
}
@@ -57,7 +57,7 @@ private class CountingQueryExecutionListener extends QueryExecutionListener {
CALLBACK_COUNT.incrementAndGet()
}

override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit = {
override def onFailure(funcName: String, qe: QueryExecution, error: Throwable): Unit = {
CALLBACK_COUNT.incrementAndGet()
}

