From 4c9769e232a5d028f48235260bde682a1d3b059a Mon Sep 17 00:00:00 2001
From: Karthik Palaniappan
Date: Fri, 8 Sep 2017 15:10:13 -0700
Subject: [PATCH] [SPARK-21960][Streaming] Spark Streaming Dynamic Allocation
 should respect spark.executor.instances

---
 .../scheduler/ExecutorAllocationManager.scala | 13 ++++++-------
 1 file changed, 6 insertions(+), 7 deletions(-)

diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManager.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManager.scala
index 7b29b40668def..8717555dea491 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManager.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManager.scala
@@ -26,7 +26,7 @@ import org.apache.spark.streaming.util.RecurringTimer
 import org.apache.spark.util.{Clock, Utils}
 
 /**
- * Class that manages executor allocated to a StreamingContext, and dynamically request or kill
+ * Class that manages executors allocated to a StreamingContext, and dynamically requests or kills
  * executors based on the statistics of the streaming computation. This is different from the core
  * dynamic allocation policy; the core policy relies on executors being idle for a while, but the
  * micro-batch model of streaming prevents any particular executors from being idle for a long
@@ -43,6 +43,10 @@ import org.apache.spark.util.{Clock, Utils}
  *
  * This features should ideally be used in conjunction with backpressure, as backpressure ensures
  * system stability, while executors are being readjusted.
+ *
+ * Note that an initial set of executors (spark.executor.instances) was allocated when the
+ * SparkContext was created. This class scales executors up/down after the StreamingContext
+ * has started.
  */
 private[streaming] class ExecutorAllocationManager(
     client: ExecutorAllocationClient,
@@ -202,12 +206,7 @@ private[streaming] object ExecutorAllocationManager extends Logging {
   val MAX_EXECUTORS_KEY = "spark.streaming.dynamicAllocation.maxExecutors"
 
   def isDynamicAllocationEnabled(conf: SparkConf): Boolean = {
-    val numExecutor = conf.getInt("spark.executor.instances", 0)
     val streamingDynamicAllocationEnabled = conf.getBoolean(ENABLED_KEY, false)
-    if (numExecutor != 0 && streamingDynamicAllocationEnabled) {
-      throw new IllegalArgumentException(
-        "Dynamic Allocation for streaming cannot be enabled while spark.executor.instances is set.")
-    }
     if (Utils.isDynamicAllocationEnabled(conf) && streamingDynamicAllocationEnabled) {
       throw new IllegalArgumentException(
         """
@@ -217,7 +216,7 @@ private[streaming] object ExecutorAllocationManager extends Logging {
       """.stripMargin)
     }
     val testing = conf.getBoolean("spark.streaming.dynamicAllocation.testing", false)
-    numExecutor == 0 && streamingDynamicAllocationEnabled && (!Utils.isLocalMaster(conf) || testing)
+    streamingDynamicAllocationEnabled && (!Utils.isLocalMaster(conf) || testing)
   }
 
   def createIfEnabled(
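
A minimal sketch of the configuration this patch enables: spark.executor.instances sets the
initial executor count when the SparkContext is created, and streaming dynamic allocation then
scales the count once the StreamingContext starts; before this patch, the same combination made
isDynamicAllocationEnabled throw an IllegalArgumentException. The app name, batch interval,
socket source, and executor counts below are illustrative, and
spark.streaming.dynamicAllocation.minExecutors is assumed to be the companion key to the
MAX_EXECUTORS_KEY shown in the diff.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object StreamingDraSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("StreamingDraSketch") // illustrative name
      // Initial executors, allocated when the SparkContext is created;
      // honored (no longer rejected) after this patch.
      .set("spark.executor.instances", "4")
      // Streaming dynamic allocation; core spark.dynamicAllocation.enabled
      // must stay false, or isDynamicAllocationEnabled still throws.
      .set("spark.streaming.dynamicAllocation.enabled", "true")
      // Assumed companion key to MAX_EXECUTORS_KEY above.
      .set("spark.streaming.dynamicAllocation.minExecutors", "2")
      .set("spark.streaming.dynamicAllocation.maxExecutors", "10")

    val ssc = new StreamingContext(conf, Seconds(5)) // illustrative batch interval

    // A trivial input stream and output operation so the context has work to run.
    val lines = ssc.socketTextStream("localhost", 9999) // illustrative source
    lines.count().print()

    ssc.start()
    ssc.awaitTermination()
  }
}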