Skip to content

Commit

Permalink
Fixed bug
Browse files Browse the repository at this point in the history
  • Loading branch information
tdas committed Feb 9, 2016
1 parent 2697218 commit 144adbb
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import org.apache.spark.sql.execution.streaming.Offset
/**
* :: Experimental ::
* Exception that stopped a [[ContinuousQuery]].
*
* @paaram query Query that caused the exception
* @param message Message of this exception
* @param cause Internal cause of this exception
* @param startOffset Starting offset (if known) of the range of data in which exception occurred
Expand All @@ -32,6 +32,7 @@ import org.apache.spark.sql.execution.streaming.Offset
*/
@Experimental
class ContinuousQueryException private[sql](
val query: ContinuousQuery,
val message: String,
val cause: Throwable,
val startOffset: Option[Offset] = None,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,10 @@ class StreamExecution(
case _: InterruptedException if state == TERMINATED => // interrupted by stop()
case NonFatal(e) =>
streamDeathCause = new ContinuousQueryException(
s"Query terminated with exception", e, Some(streamProgress.toCompositeOffset(sources)))
this,
s"Query terminated with exception",
e,
Some(streamProgress.toCompositeOffset(sources)))
logError(s"Query $name terminated with error", e)
} finally {
state = TERMINATED
Expand Down Expand Up @@ -261,7 +264,7 @@ class StreamExecution(
}

override def awaitTermination(): Unit = {
if (state != INITIALIZED) {
if (state == INITIALIZED) {
throw new IllegalStateException("Cannot wait for termination on a query that has not started")
}
terminationLatch.await()
Expand All @@ -271,7 +274,7 @@ class StreamExecution(
}

override def awaitTermination(timeoutMs: Long): Boolean = {
if (state != INITIALIZED) {
if (state == INITIALIZED) {
throw new IllegalStateException("Cannot wait for termination on a query that has not started")
}
require(timeoutMs > 0, "Timeout has to be positive")
Expand Down
15 changes: 13 additions & 2 deletions sql/core/src/test/scala/org/apache/spark/sql/StreamTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -246,9 +246,18 @@ trait StreamTest extends QueryTest with Timeouts {
}

def failTest(message: String, cause: Throwable = null) = {
val c = Option(cause).map { e =>
e.getMessage + e.getStackTrace.take(10).mkString("\n\t", "\n|\t", "\n")

// Recursively pretty print a exception with truncated stacktrace and internal cause
def exceptionToString(e: Throwable, prefix: String = ""): String = {
val base = s"$prefix${e.getMessage}" +
e.getStackTrace.take(10).mkString(s"\n$prefix", s"\n$prefix\t", "\n")
if (e.getCause != null) {
base + s"\n$prefix\tCaused by: " + exceptionToString(e.getCause, s"$prefix\t")
} else {
base
}
}
val c = Option(cause).map(exceptionToString(_))
val m = if (message != null && message.size > 0) Some(message) else None
fail(
s"""
Expand Down Expand Up @@ -312,6 +321,8 @@ trait StreamTest extends QueryTest with Timeouts {
eventually("microbatch thread not stopped after termination with failure") {
assert(!currentStream.microBatchThread.isAlive)
}
verify(thrownException.query.eq(currentStream),
s"incorrect query reference in exception")
verify(currentStream.exception === Some(thrownException),
s"incorrect exception returned by query.exception()")

Expand Down

0 comments on commit 144adbb

Please sign in to comment.