From e06bd4fdc7d052ef55e2d98e68441586fe9d2026 Mon Sep 17 00:00:00 2001 From: zsxwing Date: Wed, 10 Dec 2014 16:25:39 +0800 Subject: [PATCH 1/3] Fix the issue that ContextWaiter didn't handle 'spurious wakeup' --- .../spark/streaming/ContextWaiter.scala | 74 +++++++++++++++---- 1 file changed, 59 insertions(+), 15 deletions(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ContextWaiter.scala b/streaming/src/main/scala/org/apache/spark/streaming/ContextWaiter.scala index a0aeacbc733bd..c7caa973b8e54 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/ContextWaiter.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/ContextWaiter.scala @@ -17,30 +17,74 @@ package org.apache.spark.streaming +import java.util.concurrent.{TimeoutException, TimeUnit} +import java.util.concurrent.locks.ReentrantLock +import javax.annotation.concurrent.GuardedBy + private[streaming] class ContextWaiter { + + private val lock = new ReentrantLock() + private val condition = lock.newCondition() + + @GuardedBy("lock") private var error: Throwable = null + + @GuardedBy("lock") private var stopped: Boolean = false - def notifyError(e: Throwable) = synchronized { - error = e - notifyAll() + def notifyError(e: Throwable) = { + lock.lock() + try { + error = e + condition.signalAll() + } finally { + lock.unlock() + } } - def notifyStop() = synchronized { - stopped = true - notifyAll() + def notifyStop() = { + lock.lock() + try { + stopped = true + condition.signalAll() + } finally { + lock.unlock() + } } - def waitForStopOrError(timeout: Long = -1) = synchronized { - // If already had error, then throw it - if (error != null) { - throw error - } + /** + * Return `true` if it's stopped; or throw the reported error if `notifyError` has been called; or + * `false` if the waiting time detectably elapsed before return from the method. + */ + def waitForStopOrError(timeout: Long = -1): Boolean = { + lock.lock() + try { + if (timeout < 0) { + while (true) { + // If already stopped, then exit + if (stopped) return true + // If already had error, then throw it + if (error != null) throw error + + condition.await() + } + } else { + var nanos = TimeUnit.MILLISECONDS.toNanos(timeout) + while (true) { + // If already stopped, then exit + if (stopped) return true + // If already had error, then throw it + if (error != null) throw error + // If no time remains, then exit + if (nanos <= 0) return false - // If not already stopped, then wait - if (!stopped) { - if (timeout < 0) wait() else wait(timeout) - if (error != null) throw error + nanos = condition.awaitNanos(nanos) + } + } + // Never reached. Make the compiler happy. + true + } finally { + lock.unlock() } } } From be42bcfaa38a3f3fbe4fc759656a61c72f0fb556 Mon Sep 17 00:00:00 2001 From: zsxwing Date: Wed, 10 Dec 2014 17:57:48 +0800 Subject: [PATCH 2/3] Update as per review suggestion --- .../spark/streaming/ContextWaiter.scala | 29 ++++++------------- 1 file changed, 9 insertions(+), 20 deletions(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ContextWaiter.scala b/streaming/src/main/scala/org/apache/spark/streaming/ContextWaiter.scala index c7caa973b8e54..76c71fac115b4 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/ContextWaiter.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/ContextWaiter.scala @@ -17,19 +17,18 @@ package org.apache.spark.streaming -import java.util.concurrent.{TimeoutException, TimeUnit} +import java.util.concurrent.TimeUnit import java.util.concurrent.locks.ReentrantLock -import javax.annotation.concurrent.GuardedBy private[streaming] class ContextWaiter { private val lock = new ReentrantLock() private val condition = lock.newCondition() - @GuardedBy("lock") + // Guarded by "lock" private var error: Throwable = null - @GuardedBy("lock") + // Guarded by "lock" private var stopped: Boolean = false def notifyError(e: Throwable) = { @@ -60,29 +59,19 @@ private[streaming] class ContextWaiter { lock.lock() try { if (timeout < 0) { - while (true) { - // If already stopped, then exit - if (stopped) return true - // If already had error, then throw it - if (error != null) throw error - + while (!stopped && error == null) { condition.await() } } else { var nanos = TimeUnit.MILLISECONDS.toNanos(timeout) - while (true) { - // If already stopped, then exit - if (stopped) return true - // If already had error, then throw it - if (error != null) throw error - // If no time remains, then exit - if (nanos <= 0) return false - + while (!stopped && error == null && nanos > 0) { nanos = condition.awaitNanos(nanos) } } - // Never reached. Make the compiler happy. - true + // If already had error, then throw it + if (error != null) throw error + // already stopped or timeout + stopped } finally { lock.unlock() } From 52247f5ff48f1fdf285daac20846c7587a30f340 Mon Sep 17 00:00:00 2001 From: zsxwing Date: Wed, 17 Dec 2014 09:32:56 +0800 Subject: [PATCH 3/3] Add explicit unit type --- .../main/scala/org/apache/spark/streaming/ContextWaiter.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ContextWaiter.scala b/streaming/src/main/scala/org/apache/spark/streaming/ContextWaiter.scala index 76c71fac115b4..fdbbe2aa6ef08 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/ContextWaiter.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/ContextWaiter.scala @@ -31,7 +31,7 @@ private[streaming] class ContextWaiter { // Guarded by "lock" private var stopped: Boolean = false - def notifyError(e: Throwable) = { + def notifyError(e: Throwable): Unit = { lock.lock() try { error = e @@ -41,7 +41,7 @@ private[streaming] class ContextWaiter { } } - def notifyStop() = { + def notifyStop(): Unit = { lock.lock() try { stopped = true