From 84d533f6607a29120cd4cb66b081628f4383139f Mon Sep 17 00:00:00 2001 From: astralidea Date: Sat, 22 Oct 2016 00:23:01 +0800 Subject: [PATCH] add maxRegisteredWaitingTime conf for receiver --- .../streaming/scheduler/ReceiverTracker.scala | 25 +++++++------------ 1 file changed, 9 insertions(+), 16 deletions(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala index b9d898a72362e..cf3ab68b15d42 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala @@ -148,6 +148,11 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false */ private val receiverPreferredLocations = new HashMap[Int, Option[String]] + // start receiver after maxRegisteredWaitingTime milliseconds + private val maxRegisteredWaitingTimeMs = + ssc.conf.getTimeAsMs("spark.scheduler.maxRegisteredResourcesWaitingTime", "30s") + private val createTime = System.currentTimeMillis() + /** Start the endpoint and receiver execution thread. */ def start(): Unit = synchronized { if (isTrackerStarted) { @@ -414,21 +419,6 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false } } - /** - * Run the dummy Spark job to ensure that all slaves have registered. This avoids all the - * receivers to be scheduled on the same node. - * - * TODO Should poll the executor number and wait for executors according to - * "spark.scheduler.minRegisteredResourcesRatio" and - * "spark.scheduler.maxRegisteredResourcesWaitingTime" rather than running a dummy job. - */ - private def runDummySparkJob(): Unit = { - if (!ssc.sparkContext.isLocal) { - ssc.sparkContext.makeRDD(1 to 50, 50).map(x => (x, 1)).reduceByKey(_ + _, 20).collect() - } - assert(getExecutors.nonEmpty) - } - /** * Get the receivers from the ReceiverInputDStreams, distributes them to the * worker nodes as a parallel collection, and runs them. @@ -440,7 +430,10 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false rcvr } - runDummySparkJob() + while ((System.currentTimeMillis() - createTime) < maxRegisteredWaitingTimeMs) {} + + logInfo("Receiver is ready for scheduling beginning after waiting " + + s"maxRegisteredResourcesWaitingTime: $maxRegisteredWaitingTimeMs(ms)") logInfo("Starting " + receivers.length + " receivers") endpoint.send(StartAllReceivers(receivers))