From d179372ae1d93adf67f8cd7ea818c443c584a8a0 Mon Sep 17 00:00:00 2001 From: Jesper Lundgren Date: Sun, 25 Jan 2015 22:31:19 +0800 Subject: [PATCH 1/5] Add graceful shutdown unit test covering slow receiver onStop Signed-off-by: Jesper Lundgren --- .../streaming/StreamingContextSuite.scala | 59 +++++++++++++++++++ 1 file changed, 59 insertions(+) diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala index 9f352bdcb0893..cfb04840c1c39 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala @@ -205,6 +205,33 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w } } + test("stop slow receiver gracefully") { + val conf = new SparkConf().setMaster(master).setAppName(appName) + conf.set("spark.cleaner.ttl", "3600") + conf.set("spark.streaming.gracefulStopTimeout", "20000") + sc = new SparkContext(conf) + logInfo("==================================\n\n\n") + ssc = new StreamingContext(sc, Milliseconds(100)) + var runningCount = 0 + SlowTestReceiver.receivedAllRecords = false + //Create test receiver that sleeps in onStop() + val totalNumRecords = 15 + val recordsPerSecond = 1 + val input = ssc.receiverStream(new SlowTestReceiver(totalNumRecords, recordsPerSecond)) + input.count().foreachRDD { rdd => + val count = rdd.first() + runningCount += count.toInt + logInfo("Count = " + count + ", Running count = " + runningCount) + } + ssc.start() + ssc.awaitTermination(500) + ssc.stop(stopSparkContext = false, stopGracefully = true) + logInfo("Running count = " + runningCount) + assert(runningCount > 0) + assert(runningCount == totalNumRecords) + Thread.sleep(100) + } + test("awaitTermination") { ssc = new StreamingContext(master, appName, batchDuration) val inputStream = addInputStream(ssc) @@ -319,6 +346,38 
@@ object TestReceiver { val counter = new AtomicInteger(1) } +/** Custom receiver for testing whether a slow receiver can be shutdown gracefully or not */ +class SlowTestReceiver(totalRecords: Int, recordsPerSecond: Int) extends Receiver[Int](StorageLevel.MEMORY_ONLY) with Logging { + + var receivingThreadOption: Option[Thread] = None + + def onStart() { + val thread = new Thread() { + override def run() { + logInfo("Receiving started") + for(i <- 1 to totalRecords) { + Thread.sleep(recordsPerSecond * 1000) + store(i) + } + SlowTestReceiver.receivedAllRecords = true + logInfo(s"Received all $totalRecords records") + } + } + receivingThreadOption = Some(thread) + thread.start() + } + + def onStop() { + // Simulate slow receiver by waiting for all records to be produced + while(!SlowTestReceiver.receivedAllRecords) Thread.sleep(100) + // no cleanup to be done, the receiving thread should stop on its own + } +} + +object SlowTestReceiver { + var receivedAllRecords = false +} + /** Streaming application for testing DStream and RDD creation sites */ package object testPackage extends Assertions { def test() { From 9a9ff884d878824548f9a08d724f60b0c14f8310 Mon Sep 17 00:00:00 2001 From: Jesper Lundgren Date: Fri, 30 Jan 2015 08:34:03 +0100 Subject: [PATCH 2/5] wait for receivers to shutdown and receiver job to terminate Signed-off-by: Jesper Lundgren --- .../streaming/scheduler/JobScheduler.scala | 2 +- .../streaming/scheduler/ReceiverTracker.scala | 19 ++++++++++++++++--- 2 files changed, 17 insertions(+), 4 deletions(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala index 0e0f5bd3b9db4..b3ffc71904c76 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala @@ -73,7 +73,7 @@ class JobScheduler(val ssc: StreamingContext)
extends Logging { logDebug("Stopping JobScheduler") // First, stop receiving - receiverTracker.stop() + receiverTracker.stop(processAllReceivedData) // Second, stop generating jobs. If it has to process all received data, // then this will wait for all the processing through JobScheduler to be over. diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala index 4f998869731ed..3703f82d5f19e 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala @@ -86,10 +86,10 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false } /** Stop the receiver execution thread. */ - def stop() = synchronized { + def stop(graceful: Boolean) = synchronized { if (!receiverInputStreams.isEmpty && actor != null) { // First, stop the receivers - if (!skipReceiverLaunch) receiverExecutor.stop() + if (!skipReceiverLaunch) receiverExecutor.stop(graceful) // Finally, stop the actor ssc.env.actorSystem.stop(actor) @@ -218,6 +218,7 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false /** This thread class runs all the receivers on the cluster. */ class ReceiverLauncher { @transient val env = ssc.env + @volatile @transient private var running = true @transient val thread = new Thread() { override def run() { try { @@ -233,7 +234,7 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false thread.start() } - def stop() { + def stop(graceful: Boolean) { // Send the stop signal to all the receivers stopReceivers() @@ -241,6 +242,16 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false // That is, for the receivers to quit gracefully. 
thread.join(10000) + if (graceful) { + val pollTime = 100 + def done = { receiverInfo.isEmpty && !running } + logInfo("Waiting for receiver job to terminate gracefully") + while(!done) { + Thread.sleep(pollTime) + } + logInfo("Waited for receiver job to terminate gracefully") + } + // Check if all the receivers have been deregistered or not if (!receiverInfo.isEmpty) { logWarning("All of the receivers have not deregistered, " + receiverInfo) @@ -295,7 +306,9 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false // Distribute the receivers and start them logInfo("Starting " + receivers.length + " receivers") + running = false ssc.sparkContext.runJob(tempRDD, ssc.sparkContext.clean(startReceiver)) + running = true logInfo("All of the receivers have been terminated") } From 3d0bd3501d24a8ab5e10e1867b464d471b3bbb67 Mon Sep 17 00:00:00 2001 From: Jesper Lundgren Date: Fri, 30 Jan 2015 08:41:00 +0100 Subject: [PATCH 3/5] switch booleans to match running status instead of terminated Signed-off-by: Jesper Lundgren --- .../apache/spark/streaming/scheduler/ReceiverTracker.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala index 3703f82d5f19e..00456ab2a0c92 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala @@ -218,7 +218,7 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false /** This thread class runs all the receivers on the cluster.
*/ class ReceiverLauncher { @transient val env = ssc.env - @volatile @transient private var running = true + @volatile @transient private var running = false @transient val thread = new Thread() { override def run() { try { @@ -306,9 +306,9 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false // Distribute the receivers and start them logInfo("Starting " + receivers.length + " receivers") - running = false - ssc.sparkContext.runJob(tempRDD, ssc.sparkContext.clean(startReceiver)) running = true + ssc.sparkContext.runJob(tempRDD, ssc.sparkContext.clean(startReceiver)) + running = false logInfo("All of the receivers have been terminated") } From f969b6ea410875d61c2f6269ecb0474a64f17af2 Mon Sep 17 00:00:00 2001 From: Jesper Lundgren Date: Tue, 3 Feb 2015 21:42:01 +0100 Subject: [PATCH 4/5] fix inversed logic in unit test --- .../org/apache/spark/streaming/StreamingContextSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala index cfb04840c1c39..36fe8ea05324a 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala @@ -356,7 +356,7 @@ class SlowTestReceiver(totalRecords: Int, recordsPerSecond: Int) extends Receive override def run() { logInfo("Receiving started") for(i <- 1 to totalRecords) { - Thread.sleep(recordsPerSecond * 1000) + Thread.sleep(1000 / recordsPerSecond) store(i) } SlowTestReceiver.receivedAllRecords = true From a9cf223c8a6a7c195defbc4e9c240fe0c3c763f3 Mon Sep 17 00:00:00 2001 From: Jesper Lundgren Date: Tue, 3 Feb 2015 21:45:51 +0100 Subject: [PATCH 5/5] remove cleaner.ttl config --- .../scala/org/apache/spark/streaming/StreamingContextSuite.scala | 1 - 1 file changed, 1 deletion(-) diff --git 
a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala index 36fe8ea05324a..0b5af25e0f7cc 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala @@ -207,7 +207,6 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w test("stop slow receiver gracefully") { val conf = new SparkConf().setMaster(master).setAppName(appName) - conf.set("spark.cleaner.ttl", "3600") conf.set("spark.streaming.gracefulStopTimeout", "20000") sc = new SparkContext(conf) logInfo("==================================\n\n\n")