[SPARK-23416][SS] Add a specific stop method for ContinuousExecution.

## What changes were proposed in this pull request?

Add a specific stop method for ContinuousExecution. The previous StreamExecution.stop() method had a race condition when used for continuous processing: it cancelled the job group before interrupting the query execution thread, and if that cancellation round-tripped to the driver before the interrupt arrived, the generic SparkException it caused would be reported as the query's cause of death. We decided earlier that SparkException should not be added to the StreamExecution.isInterruptionException() whitelist, so we instead need to ensure that this ordering can never happen.
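
As a rough illustration of the interrupt-only approach, the sketch below models a query thread with plain JVM threads and no Spark dependency. `ToyQuery`, `deathCause`, and the `State` objects are hypothetical names made up for this example, not Spark APIs, and the `catch` clause is only a stand-in for the idea of a whitelist of interruption exceptions.

```scala
import java.util.concurrent.CountDownLatch
import java.util.concurrent.atomic.AtomicReference

// Hypothetical stand-in for a continuous query; none of these names are Spark APIs.
object StopSketch {
  sealed trait State
  case object ACTIVE extends State
  case object TERMINATED extends State

  final class ToyQuery {
    val state = new AtomicReference[State](ACTIVE)
    // Set only when the query dies for a reason other than an intentional stop().
    val deathCause = new AtomicReference[Option[Throwable]](None)
    private val started = new CountDownLatch(1)

    private val thread = new Thread(new Runnable {
      override def run(): Unit = {
        try {
          started.countDown()
          // Stand-in for the long-running continuous job: blocks until interrupted.
          new CountDownLatch(1).await()
        } catch {
          // An interrupt that arrives after stop() set TERMINATED is intentional.
          case _: InterruptedException if state.get == TERMINATED => ()
          // Anything else is recorded as the reason the query died.
          case e: Throwable => deathCause.set(Some(e))
        }
      }
    })

    def start(): Unit = { thread.start(); started.await() }

    // Interrupt only: no separate cancellation that could surface a generic exception first.
    def stop(): Unit = {
      state.set(TERMINATED)
      if (thread.isAlive) {
        thread.interrupt()
        thread.join()
      }
    }
  }

  def main(args: Array[String]): Unit = {
    val q = new ToyQuery
    q.start()
    q.stop()
    println(s"death cause after stop(): ${q.deathCause.get}")
  }
}
```

Because the only signal the toy thread ever receives is the interrupt, the exception it sees is always one the `catch` clause recognizes as intentional, so no death cause is recorded.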

## How was this patch tested?

Existing tests. I could consistently reproduce the previous flakiness by putting Thread.sleep(1000) between the first job cancellation and thread interruption in StreamExecution.stop().
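
The reproduction above can be modelled without Spark. In this hedged sketch a `RuntimeException` stands in for the generic SparkException, a latch stands in for job cancellation, and `ToyQuery` and `cancelThenInterrupt` are hypothetical names invented for the example. The delay plays the role of the `Thread.sleep(1000)` and gives the cancellation time to surface before the interrupt.

```scala
import java.util.concurrent.CountDownLatch
import java.util.concurrent.atomic.AtomicReference

// Hypothetical model of the race; RuntimeException stands in for the generic SparkException.
object RaceSketch {
  final class ToyQuery {
    @volatile var terminated = false
    val deathCause = new AtomicReference[Option[Throwable]](None)
    private val jobCancelled = new CountDownLatch(1)

    private val thread = new Thread(new Runnable {
      override def run(): Unit = {
        try {
          // Blocks until the "job" is cancelled or the thread is interrupted.
          jobCancelled.await()
          // Cancellation surfaced first: the thread sees a generic failure.
          throw new RuntimeException("job cancelled")
        } catch {
          // Only an interrupt after an intentional stop is treated as benign.
          case _: InterruptedException if terminated => ()
          case e: Throwable => deathCause.set(Some(e))
        }
      }
    })

    def start(): Unit = thread.start()

    // Cancel first, interrupt later; delayMs widens the window between the two.
    def cancelThenInterrupt(delayMs: Long): Unit = {
      terminated = true
      jobCancelled.countDown()
      Thread.sleep(delayMs)
      thread.interrupt()
      thread.join()
    }
  }

  def main(args: Array[String]): Unit = {
    val q = new ToyQuery
    q.start()
    q.cancelThenInterrupt(1000)
    println(q.deathCause.get.map(_.getMessage))
  }
}
```

With the delay in place the toy thread observes the generic exception before the interrupt, so the stop is recorded as a failure even though it was intentional. With no delay the outcome depends on thread scheduling, which is the kind of flakiness described above.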

Author: Jose Torres <torres.joseph.f+github@gmail.com>

Closes #21384 from jose-torres/fixKafka.
jose-torres authored and maropu committed May 24, 2018
1 parent 7f13fd0 commit 55d5a19c8e01de945c4c9e42752ed132df4b9110
@@ -110,6 +110,24 @@ class MicroBatchExecution(
     _logicalPlan
   }

+  /**
+   * Signals to the thread executing micro-batches that it should stop running after the next
+   * batch. This method blocks until the thread stops running.
+   */
+  override def stop(): Unit = {
+    // Set the state to TERMINATED so that the batching thread knows that it was interrupted
+    // intentionally
+    state.set(TERMINATED)
+    if (queryExecutionThread.isAlive) {
+      sparkSession.sparkContext.cancelJobGroup(runId.toString)
+      queryExecutionThread.interrupt()
+      queryExecutionThread.join()
+      // microBatchThread may spawn new jobs, so we need to cancel again to prevent a leak
+      sparkSession.sparkContext.cancelJobGroup(runId.toString)
+    }
+    logInfo(s"Query $prettyIdString was stopped")
+  }
+
   /**
    * Repeatedly attempts to run batches as data arrives.
    */
@@ -378,24 +378,6 @@ abstract class StreamExecution(
     }
   }

-  /**
-   * Signals to the thread executing micro-batches that it should stop running after the next
-   * batch. This method blocks until the thread stops running.
-   */
-  override def stop(): Unit = {
-    // Set the state to TERMINATED so that the batching thread knows that it was interrupted
-    // intentionally
-    state.set(TERMINATED)
-    if (queryExecutionThread.isAlive) {
-      sparkSession.sparkContext.cancelJobGroup(runId.toString)
-      queryExecutionThread.interrupt()
-      queryExecutionThread.join()
-      // microBatchThread may spawn new jobs, so we need to cancel again to prevent a leak
-      sparkSession.sparkContext.cancelJobGroup(runId.toString)
-    }
-    logInfo(s"Query $prettyIdString was stopped")
-  }
-
   /**
    * Blocks the current thread until processing for data from the given `source` has reached at
    * least the given `Offset`. This method is intended for use primarily when writing tests.
@@ -362,6 +362,22 @@ class ContinuousExecution(
       }
     }
   }
+
+  /**
+   * Stops the query execution thread to terminate the query.
+   */
+  override def stop(): Unit = {
+    // Set the state to TERMINATED so that the batching thread knows that it was interrupted
+    // intentionally
+    state.set(TERMINATED)
+    if (queryExecutionThread.isAlive) {
+      // The query execution thread will clean itself up in the finally clause of runContinuous.
+      // We just need to interrupt the long running job.
+      queryExecutionThread.interrupt()
+      queryExecutionThread.join()
+    }
+    logInfo(s"Query $prettyIdString was stopped")
+  }
 }

 object ContinuousExecution {
