Skip to content

Commit

Permalink
Addressed comments, and added multiple failure tests for awaitAnyTerm…
Browse files Browse the repository at this point in the history
…ination
  • Loading branch information
tdas committed Feb 10, 2016
1 parent 5c3c690 commit d0003cf
Show file tree
Hide file tree
Showing 7 changed files with 91 additions and 45 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ trait ContinuousQuery {
* If the query has terminated with an exception, then the exception will be thrown.
*
* If the query has terminated, then all subsequent calls to this method will either return
* `true` immediately (if the query was terminated by `stop()`), or throw the exception
* immediately (if the query was terminated by `stop()`), or throw the exception
* immediately (if the query has terminated with exception).
*
* @throws ContinuousQueryException, if `this` query has terminated with an exception.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,13 @@
package org.apache.spark.sql

import org.apache.spark.annotation.Experimental
import org.apache.spark.sql.execution.streaming.Offset
import org.apache.spark.sql.execution.streaming.{StreamExecution, Offset}
import org.apache.spark.util.Utils

/**
* :: Experimental ::
* Exception that stopped a [[ContinuousQuery]].
* @paaram query Query that caused the exception
* @param query Query that caused the exception
* @param message Message of this exception
* @param cause Internal cause of this exception
* @param startOffset Starting offset (if known) of the range of data in which exception occurred
Expand All @@ -46,15 +47,9 @@ class ContinuousQueryException private[sql](
val causeStr =
s"${cause.getMessage} ${cause.getStackTrace.take(10).mkString("", "\n|\t", "\n")}"
s"""
|$message
|
|=== Error ===
|$causeStr
|
|=== Offset range ===
|Start: ${startOffset.map { _.toString }.getOrElse("-")}
|End: ${endOffset.map { _.toString }.getOrElse("-")}
|
|${query.asInstanceOf[StreamExecution].toDebugString}
""".stripMargin
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -62,17 +62,21 @@ class ContinuousQueryManager(sqlContext: SQLContext) {

/**
* Wait until any of the queries on the associated SQLContext has terminated since the
* creation of the context, or since `clearTermination()` was called. If any query was terminated
* creation of the context, or since `resetTerminated()` was called. If any query was terminated
* with an exception, then the exception will be thrown.
*
* If a query has terminated, then subsequent calls to `awaitAnyTermination()` will either
* return immediately (if the query was terminated by `query.stop()`),
* or throw the exception immediately (if the query was terminated with exception). Use
* `resetTerminated()` to clear past terminations and wait for new terminations.
*
* Note that if multiple queries have terminated
* @throws ContinuousQueryException, if any query has terminated with an exception without
* `timeoutMs` milliseconds.
* In the case where multiple queries have terminated since `resetTermination()` was called,
* if any query has terminated with exception, then `awaitAnyTermination()` will
* throw any of the exception. For correctly documenting exceptions across multiple queries,
* users need to stop all of them after any of them terminates with exception, and then check the
* `query.exception()` for each query.
*
* @throws ContinuousQueryException, if any query has terminated with an exception
*
* @since 2.0.0
*/
Expand All @@ -89,25 +93,32 @@ class ContinuousQueryManager(sqlContext: SQLContext) {

/**
* Wait until any of the queries on the associated SQLContext has terminated since the
* creation of the context, or since `clearTermination()` was called. Returns whether the query
* has terminated or not. If the query has terminated with an exception,
* then the exception will be thrown.
* creation of the context, or since `resetTerminated()` was called. Returns whether any query
* has terminated or not (multiple may have terminated). If any query has terminated with an
* exception, then the exception will be thrown.
*
* If a query has terminated, then subsequent calls to `awaitAnyTermination()` will either
* return `true` immediately (if the query was terminated by `query.stop()`),
* or throw the exception immediately (if the query was terminated with exception). Use
* `resetTerminated()` to clear past terminations and wait for new terminations.
*
* In the case where multiple queries have terminated since `resetTermination()` was called,
* if any query has terminated with exception, then `awaitAnyTermination()` will
* throw any of the exception. For correctly documenting exceptions across multiple queries,
* users need to stop all of them after any of them terminates with exception, and then check the
* `query.exception()` for each query.
*
* @throws ContinuousQueryException, if any query has terminated with an exception
*
* @since 2.0.0
*/
def awaitAnyTermination(timeoutMs: Long): Boolean = {
val endTime = System.currentTimeMillis + timeoutMs
def timeLeft = math.max(endTime - System.currentTimeMillis, 0)

val startTime = System.currentTimeMillis
def isTimedout = System.currentTimeMillis - startTime >= timeoutMs

awaitTerminationLock.synchronized {
while (timeLeft > 0 && lastTerminatedQuery == null) {
while (!isTimedout && lastTerminatedQuery == null) {
awaitTerminationLock.wait(10)
}
if (lastTerminatedQuery != null && lastTerminatedQuery.exception.nonEmpty) {
Expand Down Expand Up @@ -173,7 +184,9 @@ class ContinuousQueryManager(sqlContext: SQLContext) {
activeQueries -= terminatedQuery.name
}
awaitTerminationLock.synchronized {
lastTerminatedQuery = terminatedQuery
if (lastTerminatedQuery == null || terminatedQuery.exception.nonEmpty) {
lastTerminatedQuery = terminatedQuery
}
awaitTerminationLock.notifyAll()
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ class StreamExecution(
case NonFatal(e) =>
streamDeathCause = new ContinuousQueryException(
this,
s"Query terminated with exception",
s"Query $name terminated with exception: ${e.getMessage}",
e,
Some(streamProgress.toCompositeOffset(sources)))
logError(s"Query $name terminated with error", e)
Expand Down Expand Up @@ -286,16 +286,28 @@ class StreamExecution(
}
}

override def toString: String =
override def toString: String = {
s"Continuous Query - $name [state = $state]"
}

def toDebugString: String = {
val deathCauseStr = if (streamDeathCause != null) {
"Error:\n" + stackTraceToString(streamDeathCause.cause)
} else ""
s"""
|=== Streaming Query ===
|=== Continuous Query ===
|Name: $name
|Current Offsets: $streamProgress
|
|Current State: $state
|Thread State: ${microBatchThread.getState}
|${if (streamDeathCause != null) stackTraceToString(streamDeathCause) else ""}
|
|Logical Plan:
|$logicalPlan
|
|$deathCauseStr
""".stripMargin
}

trait State
case object INITIALIZED extends State
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ abstract class ContinuousQueryListener {
@Experimental
object ContinuousQueryListener {

/** Base type of [[ContinuousQueryListener]] events */
/** Base type of [[ContinuousQueryListener]] events */
trait Event

/** Event representing the start of a query */
Expand Down
11 changes: 6 additions & 5 deletions sql/core/src/test/scala/org/apache/spark/sql/StreamTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -484,13 +484,14 @@ trait StreamTest extends QueryTest with Timeouts {
}

case e: ExpectException[_] =>
val thrownException = withClue("Did not throw exception when expected.") {
intercept[ContinuousQueryException] {
failAfter(testTimeout) {
awaitTermFunc()
val thrownException =
withClue(s"Did not throw ${e.t.runtimeClass.getSimpleName} when expected.") {
intercept[ContinuousQueryException] {
failAfter(testTimeout) {
awaitTermFunc()
}
}
}
}
assert(thrownException.cause.getClass === e.t.runtimeClass,
"exception of incorrect type was throw")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import org.scalatest.time.Span
import org.scalatest.time.SpanSugar._

import org.apache.spark.SparkException
import org.apache.spark.sql.{ContinuousQuery, Dataset, StreamTest}
import org.apache.spark.sql.{ContinuousQueryException, ContinuousQuery, Dataset, StreamTest}
import org.apache.spark.sql.execution.streaming.{MemorySink, MemoryStream, StreamExecution, StreamingRelation}
import org.apache.spark.sql.test.SharedSQLContext

Expand All @@ -37,6 +37,8 @@ class ContinuousQueryManagerSuite extends StreamTest with SharedSQLContext with
import AwaitTerminationTester._
import testImplicits._

override val streamingTimeout = 20.seconds

before {
assert(sqlContext.streams.active.isEmpty)
sqlContext.streams.resetTerminated()
Expand Down Expand Up @@ -91,8 +93,8 @@ class ContinuousQueryManagerSuite extends StreamTest with SharedSQLContext with
}
}

test("awaitAnyTermination without timeout and reset") {
val datasets = Seq.fill(3)(makeDataset._2)
test("awaitAnyTermination without timeout and resetTerminated") {
val datasets = Seq.fill(5)(makeDataset._2)
withQueriesOn(datasets: _*) { queries =>
require(queries.size === datasets.size)
assert(sqlContext.streams.active.toSet === queries.toSet)
Expand All @@ -101,9 +103,9 @@ class ContinuousQueryManagerSuite extends StreamTest with SharedSQLContext with
testAwaitAnyTermination(ExpectBlocked)

// Stop a query asynchronously and see if it is reported through awaitAnyTermination
val q1 = stopRandomQueryAsync(stopAfter = 500 milliseconds, withError = false)
val q1 = stopRandomQueryAsync(stopAfter = 100 milliseconds, withError = false)
testAwaitAnyTermination(ExpectNotBlocked)
require(!q1.isActive) // should be marked active by the time the prev awaitAnyTerm returned
require(!q1.isActive) // should be inactive by the time the prev awaitAnyTerm returned

// All subsequent calls to awaitAnyTermination should be non-blocking
testAwaitAnyTermination(ExpectNotBlocked)
Expand All @@ -114,21 +116,31 @@ class ContinuousQueryManagerSuite extends StreamTest with SharedSQLContext with

// Terminate a query asynchronously with exception and see awaitAnyTermination throws
// the exception
val q2 = stopRandomQueryAsync(500 milliseconds, withError = true)
val q2 = stopRandomQueryAsync(100 milliseconds, withError = true)
testAwaitAnyTermination(ExpectException[SparkException])
require(!q2.isActive) // should be marked active by the time the prev awaitAnyTerm returned
require(!q2.isActive) // should be inactive by the time the prev awaitAnyTerm returned

// All subsequent calls to awaitAnyTermination should throw the exception
testAwaitAnyTermination(ExpectException[SparkException])

// Resetting termination should make awaitAnyTermination() blocking again
sqlContext.streams.resetTerminated()
testAwaitAnyTermination(ExpectBlocked)

// Terminate multiple queries, one with failure and see whether awaitAnyTermination throws
// the exception
val q3 = stopRandomQueryAsync(10 milliseconds, withError = false)
testAwaitAnyTermination(ExpectNotBlocked)
require(!q3.isActive)
val q4 = stopRandomQueryAsync(10 milliseconds, withError = true)
eventually(Timeout(streamingTimeout)) { require(!q4.isActive) }
// After q4 terminates with exception, awaitAnyTerm should start throwing exception
testAwaitAnyTermination(ExpectException[SparkException])
}
}

test("awaitTermination with timeout") {
val datasets = Seq.fill(5)(makeDataset._2)
test("awaitAnyTermination with timeout and resetTerminated") {
val datasets = Seq.fill(6)(makeDataset._2)
withQueriesOn(datasets: _*) { queries =>
require(queries.size === datasets.size)
assert(sqlContext.streams.active.toSet === queries.toSet)
Expand All @@ -153,7 +165,7 @@ class ContinuousQueryManagerSuite extends StreamTest with SharedSQLContext with
awaitTimeout = 1 second,
expectedReturnedValue = true,
testBehaviorFor = 2 seconds)
require(!q1.isActive) // should be marked active by the time the prev awaitAnyTerm returned
require(!q1.isActive) // should be inactive by the time the prev awaitAnyTerm returned

// All subsequent calls to awaitAnyTermination should be non-blocking even if timeout is high
testAwaitAnyTermination(
Expand All @@ -169,12 +181,12 @@ class ContinuousQueryManagerSuite extends StreamTest with SharedSQLContext with

// Terminate a query asynchronously with exception within timeout, awaitAnyTermination should
// throws the exception
val q2 = stopRandomQueryAsync(500 milliseconds, withError = true)
val q2 = stopRandomQueryAsync(100 milliseconds, withError = true)
testAwaitAnyTermination(
ExpectException[SparkException],
awaitTimeout = 1 second,
testBehaviorFor = 2 seconds)
require(!q2.isActive) // should be marked active by the time the prev awaitAnyTerm returned
require(!q2.isActive) // should be inactive by the time the prev awaitAnyTerm returned

// All subsequent calls to awaitAnyTermination should throw the exception
testAwaitAnyTermination(
Expand All @@ -197,6 +209,20 @@ class ContinuousQueryManagerSuite extends StreamTest with SharedSQLContext with
ExpectException[SparkException],
awaitTimeout = 100 milliseconds,
testBehaviorFor = 2 seconds)


// Terminate multiple queries, one with failure and see whether awaitAnyTermination throws
// the exception
sqlContext.streams.resetTerminated()

val q4 = stopRandomQueryAsync(10 milliseconds, withError = false)
testAwaitAnyTermination(
ExpectNotBlocked, awaitTimeout = 1 second, expectedReturnedValue = true)
require(!q4.isActive)
val q5 = stopRandomQueryAsync(10 milliseconds, withError = true)
eventually(Timeout(streamingTimeout)) { require(!q5.isActive) }
// After q4 terminates with exception, awaitAnyTerm should start throwing exception
//testAwaitAnyTermination(ExpectException[SparkException], awaitTimeout = 100 milliseconds)
}
}

Expand Down Expand Up @@ -234,7 +260,7 @@ class ContinuousQueryManagerSuite extends StreamTest with SharedSQLContext with
expectedBehavior: ExpectedBehavior,
expectedReturnedValue: Boolean = false,
awaitTimeout: Span = null,
testBehaviorFor: Span = 1000 milliseconds
testBehaviorFor: Span = 2 seconds
): Unit = {

def awaitTermFunc(): Unit = {
Expand Down Expand Up @@ -268,7 +294,6 @@ class ContinuousQueryManagerSuite extends StreamTest with SharedSQLContext with
logDebug(s"Stopping query ${queryToStop.name}")
queryToStop.stop()
}

}
queryToStop
}
Expand Down

0 comments on commit d0003cf

Please sign in to comment.