Skip to content

Commit

Permalink
Fix
Browse files Browse the repository at this point in the history
  • Loading branch information
zsxwing committed Mar 8, 2019
1 parent 14b1312 commit c7f2536
Showing 1 changed file with 23 additions and 7 deletions.
Expand Up @@ -268,13 +268,29 @@ class ContinuousExecution(
logInfo(s"Query $id ignoring exception from reconfiguring: $t")
// interrupted by reconfiguration - swallow exception so we can restart the query
} finally {
epochEndpoint.askSync[Unit](StopContinuousExecutionWrites)
SparkEnv.get.rpcEnv.stop(epochEndpoint)

epochUpdateThread.interrupt()
epochUpdateThread.join()

sparkSession.sparkContext.cancelJobGroup(runId.toString)
// The above execution may finish before getting interrupted, for example, a Spark job having
// 0 partitions will complete immediately. Then the interrupted status will sneak here.
//
// To handle this case, we do the two things here:
//
// 1. Clean up the resources in `queryExecutionThread.runUninterruptibly`. This may increase
// the waiting time of `stop` but should be minor because the operations here are very fast
// (just sending an RPC message in the same process and stopping a very simple thread).
// 2. Clear the interrupted status at the end so that it won't impact the `runContinuous`
// call. We may clear the interrupted status set by `stop`, but it doesn't affect the query
// termination because `runActivatedStream` will check `state` and exit accordingly.
queryExecutionThread.runUninterruptibly {
try {
epochEndpoint.askSync[Unit](StopContinuousExecutionWrites)
} finally {
SparkEnv.get.rpcEnv.stop(epochEndpoint)
epochUpdateThread.interrupt()
epochUpdateThread.join()
// The following line must be the last line because it may fail if SparkContext is stopped
sparkSession.sparkContext.cancelJobGroup(runId.toString)
}
}
Thread.interrupted()
}
}

Expand Down

0 comments on commit c7f2536

Please sign in to comment.