From be56b70c0ed9d665d075757152b5a6e118f2bcf8 Mon Sep 17 00:00:00 2001 From: jerryshao Date: Thu, 7 Apr 2016 16:30:00 +0800 Subject: [PATCH 1/3] Fix NPE in ReceiverTracker#allocatedExecutors when calling in receiver-less scenario --- .../spark/streaming/scheduler/ReceiverTracker.scala | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala index d67f70732d0e1..b35a79f6dd0f2 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala @@ -241,8 +241,14 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false * @return a map containing receiver ids to optional executor ids. */ def allocatedExecutors(): Map[Int, Option[String]] = { - endpoint.askWithRetry[Map[Int, ReceiverTrackingInfo]](GetAllReceiverInfo).mapValues { - _.runningExecutor.map { _.executorId } + if (isTrackerStarted) { + endpoint.askWithRetry[Map[Int, ReceiverTrackingInfo]](GetAllReceiverInfo).mapValues { + _.runningExecutor.map { + _.executorId + } + } + } else { + Map.empty } } From 499128cecda363e075c230866eb8da8c4adb367c Mon Sep 17 00:00:00 2001 From: jerryshao Date: Fri, 8 Apr 2016 09:06:35 +0800 Subject: [PATCH 2/3] Add the unit test --- .../streaming/scheduler/ReceiverTracker.scala | 2 +- .../scheduler/ReceiverTrackerSuite.scala | 27 ++++++++++++++++++- 2 files changed, 27 insertions(+), 2 deletions(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala index b35a79f6dd0f2..3b33a979df882 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala @@ -240,7 +240,7 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false * Get the executors allocated to each receiver. * @return a map containing receiver ids to optional executor ids. */ - def allocatedExecutors(): Map[Int, Option[String]] = { + def allocatedExecutors(): Map[Int, Option[String]] = synchronized { if (isTrackerStarted) { endpoint.askWithRetry[Map[Int, ReceiverTrackingInfo]](GetAllReceiverInfo).mapValues { _.runningExecutor.map { diff --git a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala index 7654bb2d03b43..1c26d7e98a4bc 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala @@ -26,7 +26,7 @@ import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskStart, TaskLo import org.apache.spark.scheduler.TaskLocality.TaskLocality import org.apache.spark.storage.{StorageLevel, StreamBlockId} import org.apache.spark.streaming._ -import org.apache.spark.streaming.dstream.ReceiverInputDStream +import org.apache.spark.streaming.dstream.{ConstantInputDStream, ReceiverInputDStream} import org.apache.spark.streaming.receiver._ /** Testsuite for receiver scheduling */ @@ -102,6 +102,31 @@ class ReceiverTrackerSuite extends TestSuiteBase { } } } + + test("get allocated executors") { + // Test get allocated executors when 1 receiver is registered + withStreamingContext(new StreamingContext(conf, Milliseconds(100))) { ssc => + val input = ssc.receiverStream(new TestReceiver) + val output = new TestOutputStream(input) + output.register() + ssc.start() + eventually(timeout(10 seconds), interval(10 millis)) { + assert(ssc.scheduler.receiverTracker.allocatedExecutors().size === 1) + } + } + + // Test get allocated executors when there's no receiver registered + withStreamingContext(new StreamingContext(conf, Milliseconds(100))) { ssc => + val rdd = ssc.sc.parallelize(1 to 10) + val input = new ConstantInputDStream(ssc, rdd) + val output = new TestOutputStream(input) + output.register() + ssc.start() + eventually(timeout(10 seconds), interval(10 millis)) { + assert(ssc.scheduler.receiverTracker.allocatedExecutors() === Map.empty) + } + } + } } /** An input DStream with for testing rate controlling */ From 4440605c9e0b73968f29e305fb95e00bb54151ee Mon Sep 17 00:00:00 2001 From: jerryshao Date: Sat, 9 Apr 2016 10:21:43 +0800 Subject: [PATCH 3/3] Address the comments --- .../spark/streaming/scheduler/ReceiverTrackerSuite.scala | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala index 1c26d7e98a4bc..df122ac090c3e 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala @@ -110,9 +110,7 @@ class ReceiverTrackerSuite extends TestSuiteBase { val output = new TestOutputStream(input) output.register() ssc.start() - eventually(timeout(10 seconds), interval(10 millis)) { - assert(ssc.scheduler.receiverTracker.allocatedExecutors().size === 1) - } + assert(ssc.scheduler.receiverTracker.allocatedExecutors().size === 1) } // Test get allocated executors when there's no receiver registered @@ -122,9 +120,7 @@ class ReceiverTrackerSuite extends TestSuiteBase { val output = new TestOutputStream(input) output.register() ssc.start() - eventually(timeout(10 seconds), interval(10 millis)) { - assert(ssc.scheduler.receiverTracker.allocatedExecutors() === Map.empty) - } + assert(ssc.scheduler.receiverTracker.allocatedExecutors() === Map.empty) } } }