From fedf6961be4e99139eb7ab08d5e6e29187ea5ccf Mon Sep 17 00:00:00 2001
From: Shixiong Zhu
Date: Thu, 21 Sep 2017 21:55:07 -0700
Subject: [PATCH] [SPARK-22094][SS] processAllAvailable should check the query state

## What changes were proposed in this pull request?

`processAllAvailable` should also check the query state and if the query is stopped, it should return.

## How was this patch tested?

The new unit test.

Author: Shixiong Zhu

Closes #19314 from zsxwing/SPARK-22094.
---
 .../sql/execution/streaming/StreamExecution.scala |  2 +-
 .../spark/sql/streaming/StreamingQuerySuite.scala | 12 ++++++++++++
 2 files changed, 13 insertions(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index b2d6c6082b025..406560c260f07 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -840,7 +840,7 @@ class StreamExecution(
         if (streamDeathCause != null) {
           throw streamDeathCause
         }
-        if (noNewData) {
+        if (noNewData || !isActive) {
           return
         }
       }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
index 3823e336d0b64..ab35079dca23f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
@@ -640,6 +640,18 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi
     }
   }
 
+  test("processAllAvailable should not block forever when a query is stopped") {
+    val input = MemoryStream[Int]
+    input.addData(1)
+    val query = input.toDF().writeStream
+      .trigger(Trigger.Once())
+      .format("console")
+      .start()
+    failAfter(streamingTimeout) {
+      query.processAllAvailable()
+    }
+  }
+
   /** Create a streaming DF that only execute one batch in which it returns the given static DF */
   private def createSingleTriggerStreamingDF(triggerDF: DataFrame): DataFrame = {
     require(!triggerDF.isStreaming)