From 77df0ce8885c84e5e6e418774d0bcd8897e788cd Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Wed, 25 Nov 2015 16:15:21 -0800 Subject: [PATCH 1/2] Allow recovery from partial stop of StreamingContext. --- .../streaming/StreamingContextState.java | 6 +++ .../spark/streaming/StreamingContext.scala | 52 +++++++++++-------- .../spark/streaming/dstream/DStream.scala | 2 +- 3 files changed, 37 insertions(+), 23 deletions(-) diff --git a/streaming/src/main/java/org/apache/spark/streaming/StreamingContextState.java b/streaming/src/main/java/org/apache/spark/streaming/StreamingContextState.java index d7b639383ee34..752d7e97b7373 100644 --- a/streaming/src/main/java/org/apache/spark/streaming/StreamingContextState.java +++ b/streaming/src/main/java/org/apache/spark/streaming/StreamingContextState.java @@ -38,6 +38,12 @@ public enum StreamingContextState { */ ACTIVE, + /** + * The context was in the process of stopping but was interrupted. + * It is no longer usable and needs to be completely stopped by calling `stop()` again. + */ + PARTIALLY_STOPPED, + /** * The context has been stopped and cannot be used any more. */ diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala index aee172a4f549a..bc0249b8ed16c 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala @@ -627,6 +627,8 @@ class StreamingContext private[streaming] ( logInfo("StreamingContext started") case ACTIVE => logWarning("StreamingContext has already been started") + case PARTIALLY_STOPPED => + throw new IllegalStateException("StreamingContext is partially stopped") case STOPPED => throw new IllegalStateException("StreamingContext has already been stopped") } @@ -698,28 +700,34 @@ class StreamingContext private[streaming] ( " AsynchronousListenerBus") } synchronized { - try { - state match { - case INITIALIZED => - logWarning("StreamingContext has not been started yet") - case STOPPED => - logWarning("StreamingContext has already been stopped") - case ACTIVE => - scheduler.stop(stopGracefully) - // Removing the streamingSource to de-register the metrics on stop() - env.metricsSystem.removeSource(streamingSource) - uiTab.foreach(_.detach()) - StreamingContext.setActiveContext(null) - waiter.notifyStop() - if (shutdownHookRef != null) { - shutdownHookRefToRemove = shutdownHookRef - shutdownHookRef = null - } - logInfo("StreamingContext stopped successfully") - } - } finally { - // The state should always be Stopped after calling `stop()`, even if we haven't started yet - state = STOPPED + // The state should always be Stopped after calling `stop()`, even if we haven't started yet + state match { + case INITIALIZED => + logWarning("StreamingContext has not been started yet") + state = STOPPED + case STOPPED => + logWarning("StreamingContext has already been stopped") + state = STOPPED + case ACTIVE | PARTIALLY_STOPPED => + // It's important that we don't set state = STOPPED until the very end of this case, + // since we need to ensure that we're still able to call `stop()` to recover from + // a partially-stopped StreamingContext which resulted from this `stop()` call being + // interrupted. See SPARK-12001 for more details. Instead, we record that we're in the + // process of stopping. Because the body of this case can be executed twice in the case + // of a partial stop, all methods called here need to be idempotent. + state = PARTIALLY_STOPPED + scheduler.stop(stopGracefully) + // Removing the streamingSource to de-register the metrics on stop() + env.metricsSystem.removeSource(streamingSource) + uiTab.foreach(_.detach()) + StreamingContext.setActiveContext(null) + waiter.notifyStop() + if (shutdownHookRef != null) { + shutdownHookRefToRemove = shutdownHookRef + shutdownHookRef = null + } + logInfo("StreamingContext stopped successfully") + state = STOPPED } } if (shutdownHookRefToRemove != null) { diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala index 1a6edf9473d84..ca8b0bf878ad5 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala @@ -220,7 +220,7 @@ abstract class DStream[T: ClassTag] ( throw new IllegalStateException( "Adding new inputs, transformations, and output operations after " + "starting a context is not supported") - case StreamingContextState.STOPPED => + case StreamingContextState.PARTIALLY_STOPPED | StreamingContextState.STOPPED => throw new IllegalStateException( "Adding new inputs, transformations, and output operations after " + "stopping a context is not supported") From 787acc424b88da80f80c7ef158f90779f59b0efc Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Tue, 1 Dec 2015 15:46:05 -0800 Subject: [PATCH 2/2] Back out of PARTIALLY_STOPPED changes. --- .../apache/spark/streaming/StreamingContextState.java | 6 ------ .../org/apache/spark/streaming/StreamingContext.scala | 11 ++++------- .../org/apache/spark/streaming/dstream/DStream.scala | 2 +- 3 files changed, 5 insertions(+), 14 deletions(-) diff --git a/streaming/src/main/java/org/apache/spark/streaming/StreamingContextState.java b/streaming/src/main/java/org/apache/spark/streaming/StreamingContextState.java index 752d7e97b7373..d7b639383ee34 100644 --- a/streaming/src/main/java/org/apache/spark/streaming/StreamingContextState.java +++ b/streaming/src/main/java/org/apache/spark/streaming/StreamingContextState.java @@ -38,12 +38,6 @@ public enum StreamingContextState { */ ACTIVE, - /** - * The context was in the process of stopping but was interrupted. - * It is no longer usable and needs to be completely stopped by calling `stop()` again. - */ - PARTIALLY_STOPPED, - /** * The context has been stopped and cannot be used any more. */ diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala index bc0249b8ed16c..8871afe8bf031 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala @@ -627,8 +627,6 @@ class StreamingContext private[streaming] ( logInfo("StreamingContext started") case ACTIVE => logWarning("StreamingContext has already been started") - case PARTIALLY_STOPPED => - throw new IllegalStateException("StreamingContext is partially stopped") case STOPPED => throw new IllegalStateException("StreamingContext has already been stopped") } @@ -708,14 +706,13 @@ class StreamingContext private[streaming] ( case STOPPED => logWarning("StreamingContext has already been stopped") state = STOPPED - case ACTIVE | PARTIALLY_STOPPED => + case ACTIVE => // It's important that we don't set state = STOPPED until the very end of this case, // since we need to ensure that we're still able to call `stop()` to recover from // a partially-stopped StreamingContext which resulted from this `stop()` call being - // interrupted. See SPARK-12001 for more details. Instead, we record that we're in the - // process of stopping. Because the body of this case can be executed twice in the case - // of a partial stop, all methods called here need to be idempotent. - state = PARTIALLY_STOPPED + // interrupted. See SPARK-12001 for more details. Because the body of this case can be + // executed twice in the case of a partial stop, all methods called here need to be + // idempotent. scheduler.stop(stopGracefully) // Removing the streamingSource to de-register the metrics on stop() env.metricsSystem.removeSource(streamingSource) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala index ca8b0bf878ad5..1a6edf9473d84 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala @@ -220,7 +220,7 @@ abstract class DStream[T: ClassTag] ( throw new IllegalStateException( "Adding new inputs, transformations, and output operations after " + "starting a context is not supported") - case StreamingContextState.PARTIALLY_STOPPED | StreamingContextState.STOPPED => + case StreamingContextState.STOPPED => throw new IllegalStateException( "Adding new inputs, transformations, and output operations after " + "stopping a context is not supported")