From d0003cfd9756106e53a757bdb5353cc0c67f3d9e Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Tue, 9 Feb 2016 18:23:05 -0800 Subject: [PATCH] Addressed comments, and added multiple failure tests for awaitAnyTermination --- .../apache/spark/sql/ContinuousQuery.scala | 2 +- .../spark/sql/ContinuousQueryException.scala | 13 ++--- .../spark/sql/ContinuousQueryManager.scala | 35 ++++++++---- .../execution/streaming/StreamExecution.scala | 20 +++++-- .../sql/util/ContinuousQueryListener.scala | 2 +- .../org/apache/spark/sql/StreamTest.scala | 11 ++-- .../ContinuousQueryManagerSuite.scala | 53 ++++++++++++++----- 7 files changed, 91 insertions(+), 45 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/ContinuousQuery.scala b/sql/core/src/main/scala/org/apache/spark/sql/ContinuousQuery.scala index a02a45fa8becf..eb69804c39b5d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/ContinuousQuery.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/ContinuousQuery.scala @@ -66,7 +66,7 @@ trait ContinuousQuery { * If the query has terminated with an exception, then the exception will be thrown. * * If the query has terminated, then all subsequent calls to this method will either return - * `true` immediately (if the query was terminated by `stop()`), or throw the exception + * immediately (if the query was terminated by `stop()`), or throw the exception * immediately (if the query has terminated with exception). * * @throws ContinuousQueryException, if `this` query has terminated with an exception. diff --git a/sql/core/src/main/scala/org/apache/spark/sql/ContinuousQueryException.scala b/sql/core/src/main/scala/org/apache/spark/sql/ContinuousQueryException.scala index 66910b6c6e826..ee2369e2e9051 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/ContinuousQueryException.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/ContinuousQueryException.scala @@ -18,12 +18,13 @@ package org.apache.spark.sql import org.apache.spark.annotation.Experimental -import org.apache.spark.sql.execution.streaming.Offset +import org.apache.spark.sql.execution.streaming.{StreamExecution, Offset} +import org.apache.spark.util.Utils /** * :: Experimental :: * Exception that stopped a [[ContinuousQuery]]. - * @paaram query Query that caused the exception + * @param query Query that caused the exception * @param message Message of this exception * @param cause Internal cause of this exception * @param startOffset Starting offset (if known) of the range of data in which exception occurred @@ -46,15 +47,9 @@ class ContinuousQueryException private[sql]( val causeStr = s"${cause.getMessage} ${cause.getStackTrace.take(10).mkString("", "\n|\t", "\n")}" s""" - |$message - | - |=== Error === |$causeStr | - |=== Offset range === - |Start: ${startOffset.map { _.toString }.getOrElse("-")} - |End: ${endOffset.map { _.toString }.getOrElse("-")} - | + |${query.asInstanceOf[StreamExecution].toDebugString} """.stripMargin } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/ContinuousQueryManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/ContinuousQueryManager.scala index 7ead4bcfa6923..13142d0e61f71 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/ContinuousQueryManager.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/ContinuousQueryManager.scala @@ -62,7 +62,7 @@ class ContinuousQueryManager(sqlContext: SQLContext) { /** * Wait until any of the queries on the associated SQLContext has terminated since the - * creation of the context, or since `clearTermination()` was called. If any query was terminated + * creation of the context, or since `resetTerminated()` was called. If any query was terminated * with an exception, then the exception will be thrown. * * If a query has terminated, then subsequent calls to `awaitAnyTermination()` will either @@ -70,9 +70,13 @@ class ContinuousQueryManager(sqlContext: SQLContext) { * or throw the exception immediately (if the query was terminated with exception). Use * `resetTerminated()` to clear past terminations and wait for new terminations. * - * Note that if multiple queries have terminated - * @throws ContinuousQueryException, if any query has terminated with an exception without - * `timeoutMs` milliseconds. + * In the case where multiple queries have terminated since `resetTermination()` was called, + * if any query has terminated with exception, then `awaitAnyTermination()` will + * throw any of the exception. For correctly documenting exceptions across multiple queries, + * users need to stop all of them after any of them terminates with exception, and then check the + * `query.exception()` for each query. + * + * @throws ContinuousQueryException, if any query has terminated with an exception * * @since 2.0.0 */ @@ -89,25 +93,32 @@ class ContinuousQueryManager(sqlContext: SQLContext) { /** * Wait until any of the queries on the associated SQLContext has terminated since the - * creation of the context, or since `clearTermination()` was called. Returns whether the query - * has terminated or not. If the query has terminated with an exception, - * then the exception will be thrown. + * creation of the context, or since `resetTerminated()` was called. Returns whether any query + * has terminated or not (multiple may have terminated). If any query has terminated with an + * exception, then the exception will be thrown. * * If a query has terminated, then subsequent calls to `awaitAnyTermination()` will either * return `true` immediately (if the query was terminated by `query.stop()`), * or throw the exception immediately (if the query was terminated with exception). Use * `resetTerminated()` to clear past terminations and wait for new terminations. * + * In the case where multiple queries have terminated since `resetTermination()` was called, + * if any query has terminated with exception, then `awaitAnyTermination()` will + * throw any of the exception. For correctly documenting exceptions across multiple queries, + * users need to stop all of them after any of them terminates with exception, and then check the + * `query.exception()` for each query. + * * @throws ContinuousQueryException, if any query has terminated with an exception * * @since 2.0.0 */ def awaitAnyTermination(timeoutMs: Long): Boolean = { - val endTime = System.currentTimeMillis + timeoutMs - def timeLeft = math.max(endTime - System.currentTimeMillis, 0) + + val startTime = System.currentTimeMillis + def isTimedout = System.currentTimeMillis - startTime >= timeoutMs awaitTerminationLock.synchronized { - while (timeLeft > 0 && lastTerminatedQuery == null) { + while (!isTimedout && lastTerminatedQuery == null) { awaitTerminationLock.wait(10) } if (lastTerminatedQuery != null && lastTerminatedQuery.exception.nonEmpty) { @@ -173,7 +184,9 @@ class ContinuousQueryManager(sqlContext: SQLContext) { activeQueries -= terminatedQuery.name } awaitTerminationLock.synchronized { - lastTerminatedQuery = terminatedQuery + if (lastTerminatedQuery == null || terminatedQuery.exception.nonEmpty) { + lastTerminatedQuery = terminatedQuery + } awaitTerminationLock.notifyAll() } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala index c842b6db8f8bc..bc7c520930f9a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala @@ -128,7 +128,7 @@ class StreamExecution( case NonFatal(e) => streamDeathCause = new ContinuousQueryException( this, - s"Query terminated with exception", + s"Query $name terminated with exception: ${e.getMessage}", e, Some(streamProgress.toCompositeOffset(sources))) logError(s"Query $name terminated with error", e) @@ -286,16 +286,28 @@ class StreamExecution( } } - override def toString: String = + override def toString: String = { + s"Continuous Query - $name [state = $state]" + } + + def toDebugString: String = { + val deathCauseStr = if (streamDeathCause != null) { + "Error:\n" + stackTraceToString(streamDeathCause.cause) + } else "" s""" - |=== Streaming Query === + |=== Continuous Query === + |Name: $name |Current Offsets: $streamProgress + | |Current State: $state |Thread State: ${microBatchThread.getState} - |${if (streamDeathCause != null) stackTraceToString(streamDeathCause) else ""} | + |Logical Plan: |$logicalPlan + | + |$deathCauseStr """.stripMargin + } trait State case object INITIALIZED extends State diff --git a/sql/core/src/main/scala/org/apache/spark/sql/util/ContinuousQueryListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/util/ContinuousQueryListener.scala index c7a897c00df56..73c78d1b62bbc 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/util/ContinuousQueryListener.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/util/ContinuousQueryListener.scala @@ -53,7 +53,7 @@ abstract class ContinuousQueryListener { @Experimental object ContinuousQueryListener { - /** Base type of [[ContinuousQueryListener]] events */ + /** Base type of [[ContinuousQueryListener]] events */ trait Event /** Event representing the start of a query */ diff --git a/sql/core/src/test/scala/org/apache/spark/sql/StreamTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/StreamTest.scala index fa4ccab39678a..a2cfe92448a6f 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/StreamTest.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/StreamTest.scala @@ -484,13 +484,14 @@ trait StreamTest extends QueryTest with Timeouts { } case e: ExpectException[_] => - val thrownException = withClue("Did not throw exception when expected.") { - intercept[ContinuousQueryException] { - failAfter(testTimeout) { - awaitTermFunc() + val thrownException = + withClue(s"Did not throw ${e.t.runtimeClass.getSimpleName} when expected.") { + intercept[ContinuousQueryException] { + failAfter(testTimeout) { + awaitTermFunc() + } } } - } assert(thrownException.cause.getClass === e.t.runtimeClass, "exception of incorrect type was throw") } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQueryManagerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQueryManagerSuite.scala index 8fd0a003aac0e..c25e2f38f5719 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQueryManagerSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQueryManagerSuite.scala @@ -28,7 +28,7 @@ import org.scalatest.time.Span import org.scalatest.time.SpanSugar._ import org.apache.spark.SparkException -import org.apache.spark.sql.{ContinuousQuery, Dataset, StreamTest} +import org.apache.spark.sql.{ContinuousQueryException, ContinuousQuery, Dataset, StreamTest} import org.apache.spark.sql.execution.streaming.{MemorySink, MemoryStream, StreamExecution, StreamingRelation} import org.apache.spark.sql.test.SharedSQLContext @@ -37,6 +37,8 @@ class ContinuousQueryManagerSuite extends StreamTest with SharedSQLContext with import AwaitTerminationTester._ import testImplicits._ + override val streamingTimeout = 20.seconds + before { assert(sqlContext.streams.active.isEmpty) sqlContext.streams.resetTerminated() @@ -91,8 +93,8 @@ class ContinuousQueryManagerSuite extends StreamTest with SharedSQLContext with } } - test("awaitAnyTermination without timeout and reset") { - val datasets = Seq.fill(3)(makeDataset._2) + test("awaitAnyTermination without timeout and resetTerminated") { + val datasets = Seq.fill(5)(makeDataset._2) withQueriesOn(datasets: _*) { queries => require(queries.size === datasets.size) assert(sqlContext.streams.active.toSet === queries.toSet) @@ -101,9 +103,9 @@ class ContinuousQueryManagerSuite extends StreamTest with SharedSQLContext with testAwaitAnyTermination(ExpectBlocked) // Stop a query asynchronously and see if it is reported through awaitAnyTermination - val q1 = stopRandomQueryAsync(stopAfter = 500 milliseconds, withError = false) + val q1 = stopRandomQueryAsync(stopAfter = 100 milliseconds, withError = false) testAwaitAnyTermination(ExpectNotBlocked) - require(!q1.isActive) // should be marked active by the time the prev awaitAnyTerm returned + require(!q1.isActive) // should be inactive by the time the prev awaitAnyTerm returned // All subsequent calls to awaitAnyTermination should be non-blocking testAwaitAnyTermination(ExpectNotBlocked) @@ -114,9 +116,9 @@ class ContinuousQueryManagerSuite extends StreamTest with SharedSQLContext with // Terminate a query asynchronously with exception and see awaitAnyTermination throws // the exception - val q2 = stopRandomQueryAsync(500 milliseconds, withError = true) + val q2 = stopRandomQueryAsync(100 milliseconds, withError = true) testAwaitAnyTermination(ExpectException[SparkException]) - require(!q2.isActive) // should be marked active by the time the prev awaitAnyTerm returned + require(!q2.isActive) // should be inactive by the time the prev awaitAnyTerm returned // All subsequent calls to awaitAnyTermination should throw the exception testAwaitAnyTermination(ExpectException[SparkException]) @@ -124,11 +126,21 @@ class ContinuousQueryManagerSuite extends StreamTest with SharedSQLContext with // Resetting termination should make awaitAnyTermination() blocking again sqlContext.streams.resetTerminated() testAwaitAnyTermination(ExpectBlocked) + + // Terminate multiple queries, one with failure and see whether awaitAnyTermination throws + // the exception + val q3 = stopRandomQueryAsync(10 milliseconds, withError = false) + testAwaitAnyTermination(ExpectNotBlocked) + require(!q3.isActive) + val q4 = stopRandomQueryAsync(10 milliseconds, withError = true) + eventually(Timeout(streamingTimeout)) { require(!q4.isActive) } + // After q4 terminates with exception, awaitAnyTerm should start throwing exception + testAwaitAnyTermination(ExpectException[SparkException]) } } - test("awaitTermination with timeout") { - val datasets = Seq.fill(5)(makeDataset._2) + test("awaitAnyTermination with timeout and resetTerminated") { + val datasets = Seq.fill(6)(makeDataset._2) withQueriesOn(datasets: _*) { queries => require(queries.size === datasets.size) assert(sqlContext.streams.active.toSet === queries.toSet) @@ -153,7 +165,7 @@ class ContinuousQueryManagerSuite extends StreamTest with SharedSQLContext with awaitTimeout = 1 second, expectedReturnedValue = true, testBehaviorFor = 2 seconds) - require(!q1.isActive) // should be marked active by the time the prev awaitAnyTerm returned + require(!q1.isActive) // should be inactive by the time the prev awaitAnyTerm returned // All subsequent calls to awaitAnyTermination should be non-blocking even if timeout is high testAwaitAnyTermination( @@ -169,12 +181,12 @@ class ContinuousQueryManagerSuite extends StreamTest with SharedSQLContext with // Terminate a query asynchronously with exception within timeout, awaitAnyTermination should // throws the exception - val q2 = stopRandomQueryAsync(500 milliseconds, withError = true) + val q2 = stopRandomQueryAsync(100 milliseconds, withError = true) testAwaitAnyTermination( ExpectException[SparkException], awaitTimeout = 1 second, testBehaviorFor = 2 seconds) - require(!q2.isActive) // should be marked active by the time the prev awaitAnyTerm returned + require(!q2.isActive) // should be inactive by the time the prev awaitAnyTerm returned // All subsequent calls to awaitAnyTermination should throw the exception testAwaitAnyTermination( @@ -197,6 +209,20 @@ class ContinuousQueryManagerSuite extends StreamTest with SharedSQLContext with ExpectException[SparkException], awaitTimeout = 100 milliseconds, testBehaviorFor = 2 seconds) + + + // Terminate multiple queries, one with failure and see whether awaitAnyTermination throws + // the exception + sqlContext.streams.resetTerminated() + + val q4 = stopRandomQueryAsync(10 milliseconds, withError = false) + testAwaitAnyTermination( + ExpectNotBlocked, awaitTimeout = 1 second, expectedReturnedValue = true) + require(!q4.isActive) + val q5 = stopRandomQueryAsync(10 milliseconds, withError = true) + eventually(Timeout(streamingTimeout)) { require(!q5.isActive) } + // After q4 terminates with exception, awaitAnyTerm should start throwing exception + //testAwaitAnyTermination(ExpectException[SparkException], awaitTimeout = 100 milliseconds) } } @@ -234,7 +260,7 @@ class ContinuousQueryManagerSuite extends StreamTest with SharedSQLContext with expectedBehavior: ExpectedBehavior, expectedReturnedValue: Boolean = false, awaitTimeout: Span = null, - testBehaviorFor: Span = 1000 milliseconds + testBehaviorFor: Span = 2 seconds ): Unit = { def awaitTermFunc(): Unit = { @@ -268,7 +294,6 @@ class ContinuousQueryManagerSuite extends StreamTest with SharedSQLContext with logDebug(s"Stopping query ${queryToStop.name}") queryToStop.stop() } - } queryToStop }