From d22768a6be42bdd8af147112a01ec0910d8d0931 Mon Sep 17 00:00:00 2001
From: angerszhu
Date: Mon, 23 Sep 2019 05:47:25 -0700
Subject: [PATCH] [SPARK-29036][SQL] SparkThriftServer cancel job after
 execute() thread interrupted

### What changes were proposed in this pull request?
Discussed in https://github.com/apache/spark/pull/25611.

If cancel() and close() are called very quickly after the query is started,
they may both call cleanup() before any Spark jobs have been started. In that
case sqlContext.sparkContext.cancelJobGroup(statementId) does nothing. The
execute thread can then start the jobs, and only afterwards get interrupted
and exit through the catch block. At that point no one cancels these jobs,
and they keep running even though this execution has exited. So when
execute() is interrupted by `cancel()` and enters the catch block, we should
call cancelJobGroup again to make sure the jobs are cancelled.

### Why are the changes needed?

### Does this PR introduce any user-facing change?
NO

### How was this patch tested?
MT

Closes #25743 from AngersZhuuuu/SPARK-29036.

Authored-by: angerszhu
Signed-off-by: Yuming Wang
---
 .../hive/thriftserver/SparkExecuteStatementOperation.scala | 7 +++++++
 1 file changed, 7 insertions(+)

diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
index f246f43435c75..ce3cbc3a2fc41 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
@@ -267,6 +267,13 @@ private[hive] class SparkExecuteStatementOperation(
       // Actually do need to catch Throwable as some failures don't inherit from Exception and
       // HiveServer will silently swallow them.
       case e: Throwable =>
+        // When cancel() or close() is called very quickly after the query is started, they may
+        // both call cleanup() before any Spark jobs are started. However, the background task
+        // may have started some Spark jobs before it was interrupted, so cancel the job group
+        // again here to make sure those jobs are cancelled once the thread is interrupted.
+        if (statementId != null) {
+          sqlContext.sparkContext.cancelJobGroup(statementId)
+        }
        val currentState = getStatus().getState()
        if (currentState.isTerminal) {
          // This may happen if the execution was cancelled, and then closed from another thread.
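
For readers skimming the diff, below is a minimal, self-contained sketch of the
race described above. It is illustrative only and not part of the patch:
CancelRaceSketch, jobsStarted, and the thread wiring are hypothetical stand-ins
for SparkExecuteStatementOperation's background execution, and the commented-out
cancelJobGroup call mirrors the line this patch adds to the catch block.

    // Illustrative sketch of the cancel()-vs-execute() race (hypothetical names).
    import java.util.concurrent.CountDownLatch

    object CancelRaceSketch {
      @volatile var jobsStarted = false

      def main(args: Array[String]): Unit = {
        val started = new CountDownLatch(1)

        // Background "execute" thread: submits jobs, then may be interrupted.
        val execThread = new Thread(() => {
          try {
            jobsStarted = true        // jobs submitted under the statement's job group
            started.countDown()
            Thread.sleep(10000)       // long-running query
          } catch {
            case _: InterruptedException =>
              // Without the patch we would just exit here and the already-submitted
              // jobs would keep running. The patch cancels the job group again:
              //   if (statementId != null) sqlContext.sparkContext.cancelJobGroup(statementId)
              println(s"interrupted; jobsStarted = $jobsStarted, cancelling job group again")
          }
        })

        execThread.start()
        // cancel() racing with query start: a cancelJobGroup issued before any job
        // is submitted is a no-op, so interrupting here used to leak running jobs.
        started.await()
        execThread.interrupt()
        execThread.join()
      }
    }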