From 7075b0888e8e46fbf7dcb94446e88678bc53bd95 Mon Sep 17 00:00:00 2001 From: Shixiong Zhu Date: Wed, 28 Jun 2017 16:02:41 -0700 Subject: [PATCH 1/3] The clean up codes in StreamExecution should not be interrupted --- .../spark/util/UninterruptibleThread.scala | 10 +------- .../util/UninterruptibleThreadSuite.scala | 25 ------------------- .../execution/streaming/StreamExecution.scala | 2 +- 3 files changed, 2 insertions(+), 35 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/util/UninterruptibleThread.scala b/core/src/main/scala/org/apache/spark/util/UninterruptibleThread.scala index 27922b31949b6..6a58ec142dd7f 100644 --- a/core/src/main/scala/org/apache/spark/util/UninterruptibleThread.scala +++ b/core/src/main/scala/org/apache/spark/util/UninterruptibleThread.scala @@ -55,9 +55,6 @@ private[spark] class UninterruptibleThread( * Run `f` uninterruptibly in `this` thread. The thread won't be interrupted before returning * from `f`. * - * If this method finds that `interrupt` is called before calling `f` and it's not inside another - * `runUninterruptibly`, it will throw `InterruptedException`. - * * Note: this method should be called only in `this` thread. */ def runUninterruptibly[T](f: => T): T = { @@ -73,12 +70,7 @@ private[spark] class UninterruptibleThread( uninterruptibleLock.synchronized { // Clear the interrupted status if it's set. - if (Thread.interrupted() || shouldInterruptThread) { - shouldInterruptThread = false - // Since it's interrupted, we don't need to run `f` which may be a long computation. - // Throw InterruptedException as we don't have a T to return. - throw new InterruptedException() - } + shouldInterruptThread = Thread.interrupted() || shouldInterruptThread uninterruptible = true } try { diff --git a/core/src/test/scala/org/apache/spark/util/UninterruptibleThreadSuite.scala b/core/src/test/scala/org/apache/spark/util/UninterruptibleThreadSuite.scala index 39b31f8ddeaba..4d51519a28888 100644 --- a/core/src/test/scala/org/apache/spark/util/UninterruptibleThreadSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/UninterruptibleThreadSuite.scala @@ -59,31 +59,6 @@ class UninterruptibleThreadSuite extends SparkFunSuite { assert(interruptStatusBeforeExit === true) } - test("interrupt before runUninterruptibly runs") { - val interruptLatch = new CountDownLatch(1) - @volatile var hasInterruptedException = false - @volatile var interruptStatusBeforeExit = false - val t = new UninterruptibleThread("test") { - override def run(): Unit = { - Uninterruptibles.awaitUninterruptibly(interruptLatch, 10, TimeUnit.SECONDS) - try { - runUninterruptibly { - assert(false, "Should not reach here") - } - } catch { - case _: InterruptedException => hasInterruptedException = true - } - interruptStatusBeforeExit = Thread.interrupted() - } - } - t.start() - t.interrupt() - interruptLatch.countDown() - t.join() - assert(hasInterruptedException === true) - assert(interruptStatusBeforeExit === false) - } - test("nested runUninterruptibly") { val enterRunUninterruptibly = new CountDownLatch(1) val interruptLatch = new CountDownLatch(1) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala index d5f8d2acba92b..0f6c928ae3787 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala @@ -357,7 +357,7 @@ class StreamExecution( if (!NonFatal(e)) { throw e } - } finally { + } finally microBatchThread.runUninterruptibly { // Release latches to unblock the user codes since exception can happen in any place and we // may not get a chance to release them startLatch.countDown() From 0079fca3a949a127700cf638a532d742684808f2 Mon Sep 17 00:00:00 2001 From: Shixiong Zhu Date: Wed, 28 Jun 2017 16:10:59 -0700 Subject: [PATCH 2/3] Update test --- .../util/UninterruptibleThreadSuite.scala | 24 +++++++++++++++++++ 1 file changed, 24 insertions(+) diff --git a/core/src/test/scala/org/apache/spark/util/UninterruptibleThreadSuite.scala b/core/src/test/scala/org/apache/spark/util/UninterruptibleThreadSuite.scala index 4d51519a28888..6a190f63ac9d0 100644 --- a/core/src/test/scala/org/apache/spark/util/UninterruptibleThreadSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/UninterruptibleThreadSuite.scala @@ -59,6 +59,30 @@ class UninterruptibleThreadSuite extends SparkFunSuite { assert(interruptStatusBeforeExit === true) } + test("interrupt before runUninterruptibly runs") { + val interruptLatch = new CountDownLatch(1) + @volatile var hasInterruptedException = false + @volatile var interruptStatusBeforeExit = false + val t = new UninterruptibleThread("test") { + override def run(): Unit = { + Uninterruptibles.awaitUninterruptibly(interruptLatch, 10, TimeUnit.SECONDS) + try { + runUninterruptibly { + } + } catch { + case _: InterruptedException => hasInterruptedException = true + } + interruptStatusBeforeExit = Thread.interrupted() + } + } + t.start() + t.interrupt() + interruptLatch.countDown() + t.join() + assert(hasInterruptedException === false) + assert(interruptStatusBeforeExit === true) + } + test("nested runUninterruptibly") { val enterRunUninterruptibly = new CountDownLatch(1) val interruptLatch = new CountDownLatch(1) From b7f45d4bad20892c5c88e4902fbaacee7e5ca44c Mon Sep 17 00:00:00 2001 From: Shixiong Zhu Date: Thu, 29 Jun 2017 11:21:13 -0700 Subject: [PATCH 3/3] Add comments --- .../spark/sql/execution/streaming/StreamExecution.scala | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala index 0f6c928ae3787..10c42a7338e85 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala @@ -358,6 +358,10 @@ class StreamExecution( throw e } } finally microBatchThread.runUninterruptibly { + // The whole `finally` block must run inside `runUninterruptibly` to avoid being interrupted + // when a query is stopped by the user. We need to make sure the following codes finish + // otherwise it may throw `InterruptedException` to `UncaughtExceptionHandler` (SPARK-21248). + // Release latches to unblock the user codes since exception can happen in any place and we // may not get a chance to release them startLatch.countDown()