From cdc820bf2a93a3d187b5f5961719c7707c94ba52 Mon Sep 17 00:00:00 2001 From: zsxwing Date: Fri, 23 Jan 2015 14:39:56 +0800 Subject: [PATCH 1/3] Add awaitTerminationOrTimeout --- .../apache/spark/streaming/StreamingContext.scala | 13 +++++++++++++ .../streaming/api/java/JavaStreamingContext.scala | 13 +++++++++++++ 2 files changed, 26 insertions(+) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala index 8ef0787137845..54aa7346edcc8 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala @@ -469,10 +469,23 @@ class StreamingContext private[streaming] ( * will be thrown in this thread. * @param timeout time to wait in milliseconds */ + @deprecated("Use awaitTerminationOrTimeout(Long) instead", "1.3.0") def awaitTermination(timeout: Long) { waiter.waitForStopOrError(timeout) } + /** + * Wait for the execution to stop. Any exceptions that occurs during the execution + * will be thrown in this thread. + * + * @param timeout time to wait in milliseconds + * @return `true` if it's stopped; or throw the reported error during the execution; or `false` + * if the waiting time elapsed before returning from the method. + */ + def awaitTerminationOrTimeout(timeout: Long): Boolean = { + waiter.waitForStopOrError(timeout) + } + /** * Stop the execution of the streams immediately (does not wait for all received data * to be processed). diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala index 9a2254bcdc1f7..35d260d0cc0fe 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala @@ -547,10 +547,23 @@ class JavaStreamingContext(val ssc: StreamingContext) extends Closeable { * will be thrown in this thread. * @param timeout time to wait in milliseconds */ + @deprecated("Use awaitTerminationOrTimeout(Long) instead", "1.3.0") def awaitTermination(timeout: Long): Unit = { ssc.awaitTermination(timeout) } + /** + * Wait for the execution to stop. Any exceptions that occurs during the execution + * will be thrown in this thread. + * + * @param timeout time to wait in milliseconds + * @return `true` if it's stopped; or throw the reported error during the execution; or `false` + * if the waiting time elapsed before returning from the method. + */ + def awaitTerminationOrTimeout(timeout: Long): Boolean = { + ssc.awaitTerminationOrTimeout(timeout) + } + /** * Stop the execution of the streams. Will stop the associated JavaSparkContext as well. */ From 8a89f920c2e427a4da6815b73f915375e3a1d546 Mon Sep 17 00:00:00 2001 From: zsxwing Date: Wed, 4 Feb 2015 13:55:35 +0800 Subject: [PATCH 2/3] Add awaitTerminationOrTimeout to python --- python/pyspark/streaming/context.py | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/python/pyspark/streaming/context.py b/python/pyspark/streaming/context.py index d48f3598e33b2..16953f05e5291 100644 --- a/python/pyspark/streaming/context.py +++ b/python/pyspark/streaming/context.py @@ -191,6 +191,15 @@ def awaitTermination(self, timeout=None): else: self._jssc.awaitTermination(int(timeout * 1000)) + def awaitTerminationOrTimeout(self, timeout): + """ + Wait for the execution to stop. Return `true` if it's stopped; or + throw the reported error during the execution; or `false` if the + waiting time elapsed before returning from the method. + @param timeout: time to wait in seconds + """ + self._jssc.awaitTerminationOrTimeout(int(timeout * 1000)) + def stop(self, stopSparkContext=True, stopGraceFully=False): """ Stop the execution of the streams, with option of ensuring all From c9e660b4c8e4547a16c00364fa7baa2a40536345 Mon Sep 17 00:00:00 2001 From: zsxwing Date: Wed, 4 Feb 2015 14:11:57 +0800 Subject: [PATCH 3/3] Add a unit test for awaitTerminationOrTimeout --- .../streaming/StreamingContextSuite.scala | 24 +++++++++++++++++++ 1 file changed, 24 insertions(+) diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala index 9f352bdcb0893..cebc22d0590b7 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala @@ -278,6 +278,30 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w assert(exception.getMessage.contains("transform"), "Expected exception not thrown") } + test("awaitTerminationOrTimeout") { + ssc = new StreamingContext(master, appName, batchDuration) + val inputStream = addInputStream(ssc) + inputStream.map(x => x).register() + + ssc.start() + + // test whether awaitTerminationOrTimeout() return false after give amount of time + failAfter(1000 millis) { + assert(ssc.awaitTerminationOrTimeout(500) === false) + } + + // test whether awaitTerminationOrTimeout() return true if context is stopped + failAfter(10000 millis) { // 10 seconds because spark takes a long time to shutdown + new Thread() { + override def run() { + Thread.sleep(500) + ssc.stop() + } + }.start() + assert(ssc.awaitTerminationOrTimeout(10000) === true) + } + } + test("DStream and generated RDD creation sites") { testPackage.test() }