Skip to content

Commit

Permalink
[SPARK-22094][SS] processAllAvailable should check the query state
Browse files Browse the repository at this point in the history
## What changes were proposed in this pull request?

`processAllAvailable` should also check the query state and if the query is stopped, it should return.

## How was this patch tested?

The new unit test.

Author: Shixiong Zhu <zsxwing@gmail.com>

Closes #19314 from zsxwing/SPARK-22094.
  • Loading branch information
zsxwing committed Sep 22, 2017
1 parent f32a842 commit fedf696
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -840,7 +840,7 @@ class StreamExecution(
if (streamDeathCause != null) {
throw streamDeathCause
}
if (noNewData) {
if (noNewData || !isActive) {
return
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -640,6 +640,18 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi
}
}

test("processAllAvailable should not block forever when a query is stopped") {
val input = MemoryStream[Int]
input.addData(1)
val query = input.toDF().writeStream
.trigger(Trigger.Once())
.format("console")
.start()
failAfter(streamingTimeout) {
query.processAllAvailable()
}
}

/** Create a streaming DF that only execute one batch in which it returns the given static DF */
private def createSingleTriggerStreamingDF(triggerDF: DataFrame): DataFrame = {
require(!triggerDF.isStreaming)
Expand Down

0 comments on commit fedf696

Please sign in to comment.