From e142b23463365e0ac7a35a40c7f1917d7dccef33 Mon Sep 17 00:00:00 2001 From: Shixiong Zhu Date: Tue, 4 Oct 2016 16:04:45 -0700 Subject: [PATCH 1/2] Report NoClassDefFoundError in StreamExecution --- .../execution/streaming/StreamExecution.scala | 2 +- .../spark/sql/streaming/StreamSuite.scala | 20 +++++++++++++++++++ 2 files changed, 21 insertions(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala index 9825f19b86a55..bf448072b3688 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala @@ -207,7 +207,7 @@ class StreamExecution( }) } catch { case _: InterruptedException if state == TERMINATED => // interrupted by stop() - case NonFatal(e) => + case e if NonFatal(e) || e.isInstanceOf[NoClassDefFoundError] => streamDeathCause = new StreamingQueryException( this, s"Query $name terminated with exception: ${e.getMessage}", diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala index 1caafb9d74440..7dfb535006cc3 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala @@ -236,6 +236,26 @@ class StreamSuite extends StreamTest { } } + test("NoClassDefFoundError from an incompatible source") { + val brokenSource = new Source { + override def getOffset: Option[Offset] = { + throw new NoClassDefFoundError + } + + override def getBatch(start: Option[Offset], end: Offset): DataFrame = { + throw new NoClassDefFoundError + } + + override def schema: StructType = StructType(Array(StructField("value", IntegerType))) + + override def stop(): Unit = {} + } + val df = Dataset[Int](sqlContext.sparkSession, StreamingExecutionRelation(brokenSource)) + testStream(df)( + ExpectFailure[NoClassDefFoundError]() + ) + } + test("output mode API in Scala") { val o1 = OutputMode.Append assert(o1 === InternalOutputModes.Append) From 5c6210867c57b9c6a5b47d702520d89410259af6 Mon Sep 17 00:00:00 2001 From: Shixiong Zhu Date: Wed, 5 Oct 2016 13:20:14 -0700 Subject: [PATCH 2/2] Always catch Throwable and report it --- .../execution/streaming/StreamExecution.scala | 7 +++- .../spark/sql/streaming/StreamSuite.scala | 39 ++++++++++++------- .../spark/sql/streaming/StreamTest.scala | 3 +- 3 files changed, 31 insertions(+), 18 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala index bf448072b3688..e4db3a929a231 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala @@ -207,13 +207,18 @@ class StreamExecution( }) } catch { case _: InterruptedException if state == TERMINATED => // interrupted by stop() - case e if NonFatal(e) || e.isInstanceOf[NoClassDefFoundError] => + case e: Throwable => streamDeathCause = new StreamingQueryException( this, s"Query $name terminated with exception: ${e.getMessage}", e, Some(committedOffsets.toCompositeOffset(sources))) logError(s"Query $name terminated with error", e) + // Rethrow the fatal errors to allow the user using `Thread.UncaughtExceptionHandler` to + // handle them + if (!NonFatal(e)) { + throw e + } } finally { state = TERMINATED sparkSession.streams.notifyQueryTermination(StreamExecution.this) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala index 7dfb535006cc3..cdbad901dba8e 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala @@ -17,10 +17,12 @@ package org.apache.spark.sql.streaming +import scala.reflect.ClassTag +import scala.util.control.ControlThrowable + import org.apache.spark.sql._ import org.apache.spark.sql.execution.streaming._ import org.apache.spark.sql.sources.StreamSourceProvider -import org.apache.spark.sql.test.SharedSQLContext import org.apache.spark.sql.types.{IntegerType, StructField, StructType} import org.apache.spark.util.ManualClock @@ -236,24 +238,31 @@ class StreamSuite extends StreamTest { } } - test("NoClassDefFoundError from an incompatible source") { - val brokenSource = new Source { - override def getOffset: Option[Offset] = { - throw new NoClassDefFoundError - } + testQuietly("fatal errors from a source should be sent to the user") { + for (e <- Seq( + new VirtualMachineError {}, + new ThreadDeath, + new LinkageError, + new ControlThrowable {} + )) { + val source = new Source { + override def getOffset: Option[Offset] = { + throw e + } - override def getBatch(start: Option[Offset], end: Offset): DataFrame = { - throw new NoClassDefFoundError - } + override def getBatch(start: Option[Offset], end: Offset): DataFrame = { + throw e + } - override def schema: StructType = StructType(Array(StructField("value", IntegerType))) + override def schema: StructType = StructType(Array(StructField("value", IntegerType))) - override def stop(): Unit = {} + override def stop(): Unit = {} + } + val df = Dataset[Int](sqlContext.sparkSession, StreamingExecutionRelation(source)) + testStream(df)( + ExpectFailure()(ClassTag(e.getClass)) + ) } - val df = Dataset[Int](sqlContext.sparkSession, StreamingExecutionRelation(brokenSource)) - testStream(df)( - ExpectFailure[NoClassDefFoundError]() - ) } test("output mode API in Scala") { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala index aa6515bc7a909..4774f6a398c63 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala @@ -167,7 +167,7 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts { /** Signals that a failure is expected and should not kill the test. */ case class ExpectFailure[T <: Throwable : ClassTag]() extends StreamAction { val causeClass: Class[T] = implicitly[ClassTag[T]].runtimeClass.asInstanceOf[Class[T]] - override def toString(): String = s"ExpectFailure[${causeClass.getCanonicalName}]" + override def toString(): String = s"ExpectFailure[${causeClass.getName}]" } /** Assert that a body is true */ @@ -322,7 +322,6 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts { new UncaughtExceptionHandler { override def uncaughtException(t: Thread, e: Throwable): Unit = { streamDeathCause = e - testThread.interrupt() } })