Skip to content
Permalink
Browse files

[SPARK-23491][SS] Remove explicit job cancellation from ContinuousExe…

…cution reconfiguring

## What changes were proposed in this pull request?

Remove queryExecutionThread.interrupt() from ContinuousExecution. As detailed in the JIRA, interrupting the thread is only relevant in the microbatch case; for continuous processing the query execution can quickly clean itself up without.

## How was this patch tested?

existing tests

Author: Jose Torres <jose@databricks.com>

Closes #20622 from jose-torres/SPARK-23441.
  • Loading branch information...
jose-torres authored and maropu committed Feb 26, 2018
1 parent abce846 commit 7f13fd0c5a79ab21c4ace2445127e6c69a7f745c
@@ -236,9 +236,7 @@ class ContinuousExecution(
startTrigger()

if (reader.needsReconfiguration() && state.compareAndSet(ACTIVE, RECONFIGURING)) {
stopSources()
if (queryExecutionThread.isAlive) {
sparkSession.sparkContext.cancelJobGroup(runId.toString)
queryExecutionThread.interrupt()
}
false
@@ -266,12 +264,20 @@ class ContinuousExecution(
SQLExecution.withNewExecutionId(
sparkSessionForQuery, lastExecution)(lastExecution.toRdd)
}
} catch {
case t: Throwable
if StreamExecution.isInterruptionException(t) && state.get() == RECONFIGURING =>
logInfo(s"Query $id ignoring exception from reconfiguring: $t")
// interrupted by reconfiguration - swallow exception so we can restart the query
} finally {
epochEndpoint.askSync[Unit](StopContinuousExecutionWrites)
SparkEnv.get.rpcEnv.stop(epochEndpoint)

epochUpdateThread.interrupt()
epochUpdateThread.join()

stopSources()
sparkSession.sparkContext.cancelJobGroup(runId.toString)
}
}

0 comments on commit 7f13fd0

Please sign in to comment.
You can’t perform that action at this time.