diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala
index 0b24fe2894f9e..e3aba7eb33e93 100644
--- a/core/src/main/scala/org/apache/spark/SparkConf.scala
+++ b/core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -610,14 +610,14 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging with Seria
     require(!encryptionEnabled || get(NETWORK_AUTH_ENABLED),
       s"${NETWORK_AUTH_ENABLED.key} must be enabled when enabling encryption.")
 
-    val executorTimeoutThreshold = getTimeAsSeconds("spark.network.timeout", "120s")
-    val executorHeartbeatInterval =
-      getTimeAsMs("spark.executor.heartbeatInterval", "10s").millis.toSeconds
+    val executorTimeoutThresholdMs =
+      getTimeAsSeconds("spark.network.timeout", "120s") * 1000
+    val executorHeartbeatIntervalMs = get(EXECUTOR_HEARTBEAT_INTERVAL)
     // If spark.executor.heartbeatInterval bigger than spark.network.timeout,
     // it will almost always cause ExecutorLostFailure. See SPARK-22754.
-    require(executorTimeoutThreshold > executorHeartbeatInterval, "The value of " +
-      s"spark.network.timeout=${executorTimeoutThreshold}s must be no less than the value of " +
-      s"spark.executor.heartbeatInterval=${executorHeartbeatInterval}s.")
+    require(executorTimeoutThresholdMs > executorHeartbeatIntervalMs, "The value of " +
+      s"spark.network.timeout=${executorTimeoutThresholdMs}ms must be no less than the value of " +
+      s"spark.executor.heartbeatInterval=${executorHeartbeatIntervalMs}ms.")
   }
 
   /**
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index 9ac8063739984..334806753085f 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -171,6 +171,11 @@ private[spark] class Executor(
   // Maintains the list of running tasks.
   private val runningTasks = new ConcurrentHashMap[Long, TaskRunner]
 
+  /**
+   * Interval to send heartbeats, in milliseconds
+   */
+  private val HEARTBEAT_INTERVAL_MS = conf.get(EXECUTOR_HEARTBEAT_INTERVAL)
+
   // Executor for the heartbeat task.
   private val heartbeater = ThreadUtils.newDaemonSingleThreadScheduledExecutor("driver-heartbeater")
 
@@ -832,11 +837,9 @@ private[spark] class Executor(
     }
 
     val message = Heartbeat(executorId, accumUpdates.toArray, env.blockManager.blockManagerId)
-    val heartbeatIntervalInSec =
-      conf.getTimeAsMs("spark.executor.heartbeatInterval", "10s").millis.toSeconds.seconds
     try {
       val response = heartbeatReceiverRef.askSync[HeartbeatResponse](
-        message, new RpcTimeout(heartbeatIntervalInSec, "spark.executor.heartbeatInterval"))
+        message, new RpcTimeout(HEARTBEAT_INTERVAL_MS.millis, EXECUTOR_HEARTBEAT_INTERVAL.key))
       if (response.reregisterBlockManager) {
         logInfo("Told to re-register on heartbeat")
         env.blockManager.reregister()
@@ -858,7 +861,7 @@ private[spark] class Executor(
    * Schedules a task to report heartbeat and partial metrics for active tasks to driver.
    */
   private def startDriverHeartbeater(): Unit = {
-    val intervalMs = conf.getTimeAsMs("spark.executor.heartbeatInterval", "10s")
+    val intervalMs = HEARTBEAT_INTERVAL_MS
 
     // Wait a random interval so the heartbeats don't end up in sync
     val initialDelay = intervalMs + (math.random * intervalMs).asInstanceOf[Int]
diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index 5c17b9b3a3207..559b0e10cbf64 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -79,6 +79,11 @@ package object config {
   private[spark] val EXECUTOR_CLASS_PATH =
     ConfigBuilder(SparkLauncher.EXECUTOR_EXTRA_CLASSPATH).stringConf.createOptional
 
+  private[spark] val EXECUTOR_HEARTBEAT_INTERVAL =
+    ConfigBuilder("spark.executor.heartbeatInterval")
+      .timeConf(TimeUnit.MILLISECONDS)
+      .createWithDefaultString("10s")
+
   private[spark] val EXECUTOR_JAVA_OPTIONS =
     ConfigBuilder(SparkLauncher.EXECUTOR_EXTRA_JAVA_OPTIONS).stringConf.createOptional
 
diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
index 178de30f0f381..bac0246b7ddc5 100644
--- a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
+++ b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
@@ -33,6 +33,7 @@ import org.apache.mesos.SchedulerDriver
 import org.apache.spark.{SecurityManager, SparkConf, SparkContext, SparkException, TaskState}
 import org.apache.spark.deploy.mesos.config._
 import org.apache.spark.internal.config
+import org.apache.spark.internal.config.EXECUTOR_HEARTBEAT_INTERVAL
 import org.apache.spark.launcher.{LauncherBackend, SparkAppHandle}
 import org.apache.spark.network.netty.SparkTransportConf
 import org.apache.spark.network.shuffle.mesos.MesosExternalShuffleClient
@@ -635,7 +636,7 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
               externalShufflePort,
               sc.conf.getTimeAsMs("spark.storage.blockManagerSlaveTimeoutMs",
                 s"${sc.conf.getTimeAsSeconds("spark.network.timeout", "120s")}s"),
-              sc.conf.getTimeAsMs("spark.executor.heartbeatInterval", "10s"))
+              sc.conf.get(EXECUTOR_HEARTBEAT_INTERVAL))
             slave.shuffleRegistered = true
           }
 
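Note on units: before this patch, SparkConf compared `spark.network.timeout` in seconds against a hand-converted `spark.executor.heartbeatInterval`, and each call site re-parsed the raw string with its own default. The patch routes the interval through a typed `ConfigEntry` declared with `timeConf(TimeUnit.MILLISECONDS)`, so every reader gets the same millisecond value. Below is a minimal sketch of the resulting behavior, not part of the patch: it assumes it compiles inside the `org.apache.spark` package (both `SparkConf.get(ConfigEntry)` and `EXECUTOR_HEARTBEAT_INTERVAL` are `private[spark]`), and the object name `HeartbeatIntervalCheck` is invented for illustration.

```scala
package org.apache.spark

import org.apache.spark.internal.config.EXECUTOR_HEARTBEAT_INTERVAL

// Hypothetical demo object (not in the patch): shows the unit handling
// that the new require() in SparkConf.validateSettings relies on.
object HeartbeatIntervalCheck {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf(false)
      .set("spark.network.timeout", "120s")
      .set(EXECUTOR_HEARTBEAT_INTERVAL.key, "10s")

    // The entry is declared with timeConf(TimeUnit.MILLISECONDS), so this
    // resolves "10s" to 10000L no matter which unit suffix the user wrote.
    val heartbeatMs: Long = conf.get(EXECUTOR_HEARTBEAT_INTERVAL)

    // The timeout is still read in seconds and scaled, mirroring the patch:
    val timeoutMs = conf.getTimeAsSeconds("spark.network.timeout", "120s") * 1000

    // Both operands are now in milliseconds, so the check compares
    // like with like instead of seconds against a manual conversion.
    assert(timeoutMs > heartbeatMs)
  }
}
```

Declaring the entry with `createWithDefaultString("10s")` keeps the user-facing default identical to the old hard-coded `"10s"` fallback while centralizing the parsing in one place.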