From 76f2718199f64aad5101965790931d3ee2967a7d Mon Sep 17 00:00:00 2001 From: hyukjinkwon Date: Sun, 13 Nov 2016 05:16:10 +0900 Subject: [PATCH 1/3] Fix ReceiverTracker --- .../spark/streaming/scheduler/ReceiverTracker.scala | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala index b9d898a72362e..d48b8ee55aae0 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala @@ -165,6 +165,15 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false /** Stop the receiver execution thread. */ def stop(graceful: Boolean): Unit = synchronized { + if (isTrackerInitialized) { + trackerState = Stopping + // `ReceivedBlockTracker` is open when this instance is created. We should + // close this even if this `ReceiverTracker` is not started. + receivedBlockTracker.stop() + logInfo("ReceiverTracker stopped") + trackerState = Stopped + } + if (isTrackerStarted) { // First, stop the receivers trackerState = Stopping @@ -446,6 +455,9 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false endpoint.send(StartAllReceivers(receivers)) } + /** Check if tracker has been marked for starting */ + private def isTrackerInitialized: Boolean = trackerState == Initialized + /** Check if tracker has been marked for starting */ private def isTrackerStarted: Boolean = trackerState == Started From 8b05ba7cc88d7d5115b97ebb14a0786099c5fe9f Mon Sep 17 00:00:00 2001 From: hyukjinkwon Date: Sun, 13 Nov 2016 05:17:01 +0900 Subject: [PATCH 2/3] Fix comment better --- .../org/apache/spark/streaming/scheduler/ReceiverTracker.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala index d48b8ee55aae0..886ee8818f00a 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala @@ -455,7 +455,7 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false endpoint.send(StartAllReceivers(receivers)) } - /** Check if tracker has been marked for starting */ + /** Check if tracker has been marked for initiated */ private def isTrackerInitialized: Boolean = trackerState == Initialized /** Check if tracker has been marked for starting */ From 4fe4c1bbfea9b87e8f6e86a7ee5505aa1471ed15 Mon Sep 17 00:00:00 2001 From: hyukjinkwon Date: Mon, 14 Nov 2016 21:23:00 +0900 Subject: [PATCH 3/3] clean up --- .../streaming/scheduler/ReceiverTracker.scala | 16 +++++++--------- 1 file changed, 7 insertions(+), 9 deletions(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala index 886ee8818f00a..8f55d982a904c 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala @@ -165,15 +165,6 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false /** Stop the receiver execution thread. */ def stop(graceful: Boolean): Unit = synchronized { - if (isTrackerInitialized) { - trackerState = Stopping - // `ReceivedBlockTracker` is open when this instance is created. We should - // close this even if this `ReceiverTracker` is not started. - receivedBlockTracker.stop() - logInfo("ReceiverTracker stopped") - trackerState = Stopped - } - if (isTrackerStarted) { // First, stop the receivers trackerState = Stopping @@ -206,6 +197,13 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false receivedBlockTracker.stop() logInfo("ReceiverTracker stopped") trackerState = Stopped + } else if (isTrackerInitialized) { + trackerState = Stopping + // `ReceivedBlockTracker` is open when this instance is created. We should + // close this even if this `ReceiverTracker` is not started. + receivedBlockTracker.stop() + logInfo("ReceiverTracker stopped") + trackerState = Stopped } }