Skip to content

Commit

Permalink
[SPARK-27419][CORE] Avoid casting heartbeat interval to seconds (2.4)
Browse files Browse the repository at this point in the history
## What changes were proposed in this pull request?

Right now, because we cast the heartbeat interval to seconds, any value less than 1 second is cast to 0. This PR just backports the heartbeat interval changes in #22473 from master.

## How was this patch tested?

Jenkins

Closes #24329 from zsxwing/SPARK-27419.

Authored-by: Shixiong Zhu <zsxwing@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
  • Loading branch information
zsxwing authored and cloud-fan committed Apr 10, 2019
1 parent baadfc8 commit 53658ab
Show file tree
Hide file tree
Showing 4 changed files with 20 additions and 11 deletions.
12 changes: 6 additions & 6 deletions core/src/main/scala/org/apache/spark/SparkConf.scala
Expand Up @@ -610,14 +610,14 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging with Seria
require(!encryptionEnabled || get(NETWORK_AUTH_ENABLED),
s"${NETWORK_AUTH_ENABLED.key} must be enabled when enabling encryption.")

val executorTimeoutThreshold = getTimeAsSeconds("spark.network.timeout", "120s")
val executorHeartbeatInterval =
getTimeAsMs("spark.executor.heartbeatInterval", "10s").millis.toSeconds
val executorTimeoutThresholdMs =
getTimeAsSeconds("spark.network.timeout", "120s") * 1000
val executorHeartbeatIntervalMs = get(EXECUTOR_HEARTBEAT_INTERVAL)
// If spark.executor.heartbeatInterval is bigger than spark.network.timeout,
// it will almost always cause ExecutorLostFailure. See SPARK-22754.
require(executorTimeoutThreshold > executorHeartbeatInterval, "The value of " +
s"spark.network.timeout=${executorTimeoutThreshold}s must be no less than the value of " +
s"spark.executor.heartbeatInterval=${executorHeartbeatInterval}s.")
require(executorTimeoutThresholdMs > executorHeartbeatIntervalMs, "The value of " +
s"spark.network.timeout=${executorTimeoutThresholdMs}ms must be no less than the value of " +
s"spark.executor.heartbeatInterval=${executorHeartbeatIntervalMs}ms.")
}

/**
Expand Down
11 changes: 7 additions & 4 deletions core/src/main/scala/org/apache/spark/executor/Executor.scala
Expand Up @@ -171,6 +171,11 @@ private[spark] class Executor(
// Maintains the list of running tasks.
private val runningTasks = new ConcurrentHashMap[Long, TaskRunner]

/**
* Interval to send heartbeats, in milliseconds
*/
private val HEARTBEAT_INTERVAL_MS = conf.get(EXECUTOR_HEARTBEAT_INTERVAL)

// Executor for the heartbeat task.
private val heartbeater = ThreadUtils.newDaemonSingleThreadScheduledExecutor("driver-heartbeater")

Expand Down Expand Up @@ -832,11 +837,9 @@ private[spark] class Executor(
}

val message = Heartbeat(executorId, accumUpdates.toArray, env.blockManager.blockManagerId)
val heartbeatIntervalInSec =
conf.getTimeAsMs("spark.executor.heartbeatInterval", "10s").millis.toSeconds.seconds
try {
val response = heartbeatReceiverRef.askSync[HeartbeatResponse](
message, new RpcTimeout(heartbeatIntervalInSec, "spark.executor.heartbeatInterval"))
message, new RpcTimeout(HEARTBEAT_INTERVAL_MS.millis, EXECUTOR_HEARTBEAT_INTERVAL.key))
if (response.reregisterBlockManager) {
logInfo("Told to re-register on heartbeat")
env.blockManager.reregister()
Expand All @@ -858,7 +861,7 @@ private[spark] class Executor(
* Schedules a task to report heartbeat and partial metrics for active tasks to driver.
*/
private def startDriverHeartbeater(): Unit = {
val intervalMs = conf.getTimeAsMs("spark.executor.heartbeatInterval", "10s")
val intervalMs = HEARTBEAT_INTERVAL_MS

// Wait a random interval so the heartbeats don't end up in sync
val initialDelay = intervalMs + (math.random * intervalMs).asInstanceOf[Int]
Expand Down
Expand Up @@ -79,6 +79,11 @@ package object config {
private[spark] val EXECUTOR_CLASS_PATH =
ConfigBuilder(SparkLauncher.EXECUTOR_EXTRA_CLASSPATH).stringConf.createOptional

// Config entry for "spark.executor.heartbeatInterval". It is declared as a time
// config in milliseconds with a default string of "10s", so sub-second settings
// are not rounded down to whole seconds.
private[spark] val EXECUTOR_HEARTBEAT_INTERVAL =
ConfigBuilder("spark.executor.heartbeatInterval")
.timeConf(TimeUnit.MILLISECONDS)
.createWithDefaultString("10s")

private[spark] val EXECUTOR_JAVA_OPTIONS =
ConfigBuilder(SparkLauncher.EXECUTOR_EXTRA_JAVA_OPTIONS).stringConf.createOptional

Expand Down
Expand Up @@ -33,6 +33,7 @@ import org.apache.mesos.SchedulerDriver
import org.apache.spark.{SecurityManager, SparkConf, SparkContext, SparkException, TaskState}
import org.apache.spark.deploy.mesos.config._
import org.apache.spark.internal.config
import org.apache.spark.internal.config.EXECUTOR_HEARTBEAT_INTERVAL
import org.apache.spark.launcher.{LauncherBackend, SparkAppHandle}
import org.apache.spark.network.netty.SparkTransportConf
import org.apache.spark.network.shuffle.mesos.MesosExternalShuffleClient
Expand Down Expand Up @@ -635,7 +636,7 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
externalShufflePort,
sc.conf.getTimeAsMs("spark.storage.blockManagerSlaveTimeoutMs",
s"${sc.conf.getTimeAsSeconds("spark.network.timeout", "120s")}s"),
sc.conf.getTimeAsMs("spark.executor.heartbeatInterval", "10s"))
sc.conf.get(EXECUTOR_HEARTBEAT_INTERVAL))
slave.shuffleRegistered = true
}

Expand Down

0 comments on commit 53658ab

Please sign in to comment.