From f5943410efd2f8f0cc82493eee5c5a4c30f7ebe3 Mon Sep 17 00:00:00 2001 From: xueyu <278006819@qq.com> Date: Fri, 15 Jun 2018 13:32:33 +0800 Subject: [PATCH 1/5] blockManagerSlaveTimeoutMs default config --- core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala b/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala index ff960b396dbf1..71f739719eb24 100644 --- a/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala +++ b/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala @@ -75,7 +75,8 @@ private[spark] class HeartbeatReceiver(sc: SparkContext, clock: Clock) // "spark.network.timeout" uses "seconds", while `spark.storage.blockManagerSlaveTimeoutMs` uses // "milliseconds" private val slaveTimeoutMs = - sc.conf.getTimeAsMs("spark.storage.blockManagerSlaveTimeoutMs", "120s") + sc.conf.getTimeAsMs("spark.storage.blockManagerSlaveTimeoutMs", + s"${sc.conf.getTimeAsSeconds("spark.network.timeout", "120s") * 1000L}ms") private val executorTimeoutMs = sc.conf.getTimeAsSeconds("spark.network.timeout", s"${slaveTimeoutMs}ms") * 1000 From 10c221efc67fe6cdadf4f867ee150f69004484db Mon Sep 17 00:00:00 2001 From: xueyu <278006819@qq.com> Date: Fri, 15 Jun 2018 19:26:49 +0800 Subject: [PATCH 2/5] use DurationConversions --- core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala b/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala index 71f739719eb24..24a534c17fb56 100644 --- a/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala +++ b/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala @@ -20,6 +20,7 @@ package org.apache.spark import java.util.concurrent.{ScheduledFuture, TimeUnit} import scala.collection.mutable +import scala.concurrent.duration._ import 
scala.concurrent.Future import org.apache.spark.internal.Logging @@ -76,9 +77,9 @@ private[spark] class HeartbeatReceiver(sc: SparkContext, clock: Clock) // "milliseconds" private val slaveTimeoutMs = sc.conf.getTimeAsMs("spark.storage.blockManagerSlaveTimeoutMs", - s"${sc.conf.getTimeAsSeconds("spark.network.timeout", "120s") * 1000L}ms") + s"${sc.conf.getTimeAsSeconds("spark.network.timeout", "120s").seconds.toMillis}ms") private val executorTimeoutMs = - sc.conf.getTimeAsSeconds("spark.network.timeout", s"${slaveTimeoutMs}ms") * 1000 + sc.conf.getTimeAsSeconds("spark.network.timeout", s"${slaveTimeoutMs}ms").seconds.toMillis // "spark.network.timeoutInterval" uses "seconds", while // "spark.storage.blockManagerTimeoutIntervalMs" uses "milliseconds" From 967361302682ad86437261e704c1b14b933d3a86 Mon Sep 17 00:00:00 2001 From: xueyu <278006819@qq.com> Date: Sat, 16 Jun 2018 12:47:30 +0800 Subject: [PATCH 3/5] add tests --- .../org/apache/spark/HeartbeatReceiver.scala | 5 ++-- .../org/apache/spark/SparkConfSuite.scala | 23 +++++++++++++++++++ 2 files changed, 26 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala b/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala index 24a534c17fb56..27b32e2b82843 100644 --- a/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala +++ b/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala @@ -20,8 +20,8 @@ package org.apache.spark import java.util.concurrent.{ScheduledFuture, TimeUnit} import scala.collection.mutable -import scala.concurrent.duration._ import scala.concurrent.Future +import scala.concurrent.duration._ import org.apache.spark.internal.Logging import org.apache.spark.rpc.{RpcCallContext, RpcEnv, ThreadSafeRpcEndpoint} @@ -86,7 +86,8 @@ private[spark] class HeartbeatReceiver(sc: SparkContext, clock: Clock) private val timeoutIntervalMs = sc.conf.getTimeAsMs("spark.storage.blockManagerTimeoutIntervalMs", "60s") private val 
checkTimeoutIntervalMs = - sc.conf.getTimeAsSeconds("spark.network.timeoutInterval", s"${timeoutIntervalMs}ms") * 1000 + sc.conf.getTimeAsSeconds("spark.network.timeoutInterval", + s"${timeoutIntervalMs}ms").seconds.toMillis private var timeoutCheckingTask: ScheduledFuture[_] = null diff --git a/core/src/test/scala/org/apache/spark/SparkConfSuite.scala b/core/src/test/scala/org/apache/spark/SparkConfSuite.scala index 0d06b02e74e34..7e134db4ec008 100644 --- a/core/src/test/scala/org/apache/spark/SparkConfSuite.scala +++ b/core/src/test/scala/org/apache/spark/SparkConfSuite.scala @@ -371,6 +371,29 @@ class SparkConfSuite extends SparkFunSuite with LocalSparkContext with ResetSyst assert(thrown.getMessage.contains(key)) } } + + test("SPARK-24566") { + val conf = new SparkConf() + conf.set("spark.network.timeout", "110") + val defaultSlaveTimeoutMs = + conf.getTimeAsMs("spark.storage.blockManagerSlaveTimeoutMs", + s"${conf.getTimeAsSeconds("spark.network.timeout", "120s").seconds.toMillis}ms") + assert(defaultSlaveTimeoutMs === 110000) + conf.set("spark.storage.blockManagerSlaveTimeoutMs", "13000ms") + val slaveTimeoutMs = + conf.getTimeAsMs("spark.storage.blockManagerSlaveTimeoutMs", + s"${conf.getTimeAsSeconds("spark.network.timeout", "120s").seconds.toMillis}ms") + assert(slaveTimeoutMs === 13000) + conf.remove("spark.network.timeout") + val executorTimeoutMs = + conf.getTimeAsSeconds("spark.network.timeout", s"${slaveTimeoutMs}ms").seconds.toMillis + assert(executorTimeoutMs === 13000) + val timeoutIntervalMs = 60000 + conf.set("spark.network.timeoutInterval", "130") + val checkTimeoutIntervalMs = conf.getTimeAsSeconds("spark.network.timeoutInterval", + s"${timeoutIntervalMs}ms").seconds.toMillis + assert(checkTimeoutIntervalMs === 130000) + } } class Class1 {} From 025bfb32648a4cbd2e98d5a7948449b063df28f4 Mon Sep 17 00:00:00 2001 From: xueyu <278006819@qq.com> Date: Thu, 21 Jun 2018 14:54:22 +0800 Subject: [PATCH 4/5] remove temp val slaveTimeoutMs and 
timeoutIntervalMs --- .../org/apache/spark/HeartbeatReceiver.scala | 12 ++++----- .../org/apache/spark/SparkConfSuite.scala | 26 +++++++------------ 2 files changed, 15 insertions(+), 23 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala b/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala index 27b32e2b82843..c68b6beb0747f 100644 --- a/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala +++ b/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala @@ -75,19 +75,17 @@ private[spark] class HeartbeatReceiver(sc: SparkContext, clock: Clock) // "spark.network.timeout" uses "seconds", while `spark.storage.blockManagerSlaveTimeoutMs` uses // "milliseconds" - private val slaveTimeoutMs = - sc.conf.getTimeAsMs("spark.storage.blockManagerSlaveTimeoutMs", - s"${sc.conf.getTimeAsSeconds("spark.network.timeout", "120s").seconds.toMillis}ms") private val executorTimeoutMs = - sc.conf.getTimeAsSeconds("spark.network.timeout", s"${slaveTimeoutMs}ms").seconds.toMillis + sc.conf.getTimeAsSeconds("spark.network.timeout", + s"${sc.conf.getTimeAsMs("spark.storage.blockManagerSlaveTimeoutMs", + "120s")}ms").seconds.toMillis // "spark.network.timeoutInterval" uses "seconds", while // "spark.storage.blockManagerTimeoutIntervalMs" uses "milliseconds" - private val timeoutIntervalMs = - sc.conf.getTimeAsMs("spark.storage.blockManagerTimeoutIntervalMs", "60s") private val checkTimeoutIntervalMs = sc.conf.getTimeAsSeconds("spark.network.timeoutInterval", - s"${timeoutIntervalMs}ms").seconds.toMillis + s"${sc.conf.getTimeAsMs("spark.storage.blockManagerTimeoutIntervalMs", + "60s")}ms").seconds.toMillis private var timeoutCheckingTask: ScheduledFuture[_] = null diff --git a/core/src/test/scala/org/apache/spark/SparkConfSuite.scala b/core/src/test/scala/org/apache/spark/SparkConfSuite.scala index 7e134db4ec008..f7a2086d8899d 100644 --- a/core/src/test/scala/org/apache/spark/SparkConfSuite.scala +++ 
b/core/src/test/scala/org/apache/spark/SparkConfSuite.scala @@ -374,25 +374,19 @@ class SparkConfSuite extends SparkFunSuite with LocalSparkContext with ResetSyst test("SPARK-24566") { val conf = new SparkConf() - conf.set("spark.network.timeout", "110") - val defaultSlaveTimeoutMs = - conf.getTimeAsMs("spark.storage.blockManagerSlaveTimeoutMs", - s"${conf.getTimeAsSeconds("spark.network.timeout", "120s").seconds.toMillis}ms") - assert(defaultSlaveTimeoutMs === 110000) conf.set("spark.storage.blockManagerSlaveTimeoutMs", "13000ms") - val slaveTimeoutMs = - conf.getTimeAsMs("spark.storage.blockManagerSlaveTimeoutMs", - s"${conf.getTimeAsSeconds("spark.network.timeout", "120s").seconds.toMillis}ms") - assert(slaveTimeoutMs === 13000) - conf.remove("spark.network.timeout") + conf.set("spark.storage.blockManagerTimeoutIntervalMs", "13000ms") + val executorTimeoutMs = - conf.getTimeAsSeconds("spark.network.timeout", s"${slaveTimeoutMs}ms").seconds.toMillis + conf.getTimeAsSeconds("spark.network.timeout", + s"${conf.getTimeAsMs("spark.storage.blockManagerSlaveTimeoutMs", + "120s")}ms").seconds.toMillis assert(executorTimeoutMs === 13000) - val timeoutIntervalMs = 60000 - conf.set("spark.network.timeoutInterval", "130") - val checkTimeoutIntervalMs = conf.getTimeAsSeconds("spark.network.timeoutInterval", - s"${timeoutIntervalMs}ms").seconds.toMillis - assert(checkTimeoutIntervalMs === 130000) + val checkTimeoutIntervalMs = + conf.getTimeAsSeconds("spark.network.timeoutInterval", + s"${conf.getTimeAsMs("spark.storage.blockManagerTimeoutIntervalMs", + "60s")}ms").seconds.toMillis + assert(checkTimeoutIntervalMs === 13000) } } From 536025d98cf5f1a11415c67b385e689ec6074699 Mon Sep 17 00:00:00 2001 From: xueyu <278006819@qq.com> Date: Fri, 29 Jun 2018 20:27:01 +0800 Subject: [PATCH 5/5] fix format --- .../org/apache/spark/HeartbeatReceiver.scala | 12 +++++------- .../scala/org/apache/spark/SparkConfSuite.scala | 17 ----------------- .../MesosCoarseGrainedSchedulerBackend.scala 
| 2 +- 3 files changed, 6 insertions(+), 25 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala b/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala index c68b6beb0747f..bcbc8df0d5865 100644 --- a/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala +++ b/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala @@ -21,7 +21,6 @@ import java.util.concurrent.{ScheduledFuture, TimeUnit} import scala.collection.mutable import scala.concurrent.Future -import scala.concurrent.duration._ import org.apache.spark.internal.Logging import org.apache.spark.rpc.{RpcCallContext, RpcEnv, ThreadSafeRpcEndpoint} @@ -76,16 +75,15 @@ private[spark] class HeartbeatReceiver(sc: SparkContext, clock: Clock) // "spark.network.timeout" uses "seconds", while `spark.storage.blockManagerSlaveTimeoutMs` uses // "milliseconds" private val executorTimeoutMs = - sc.conf.getTimeAsSeconds("spark.network.timeout", - s"${sc.conf.getTimeAsMs("spark.storage.blockManagerSlaveTimeoutMs", - "120s")}ms").seconds.toMillis + sc.conf.getTimeAsMs("spark.storage.blockManagerSlaveTimeoutMs", + s"${sc.conf.getTimeAsSeconds("spark.network.timeout", "120s")}s") // "spark.network.timeoutInterval" uses "seconds", while // "spark.storage.blockManagerTimeoutIntervalMs" uses "milliseconds" + private val timeoutIntervalMs = + sc.conf.getTimeAsMs("spark.storage.blockManagerTimeoutIntervalMs", "60s") private val checkTimeoutIntervalMs = - sc.conf.getTimeAsSeconds("spark.network.timeoutInterval", - s"${sc.conf.getTimeAsMs("spark.storage.blockManagerTimeoutIntervalMs", - "60s")}ms").seconds.toMillis + sc.conf.getTimeAsSeconds("spark.network.timeoutInterval", s"${timeoutIntervalMs}ms") * 1000 private var timeoutCheckingTask: ScheduledFuture[_] = null diff --git a/core/src/test/scala/org/apache/spark/SparkConfSuite.scala b/core/src/test/scala/org/apache/spark/SparkConfSuite.scala index f7a2086d8899d..0d06b02e74e34 100644 --- 
a/core/src/test/scala/org/apache/spark/SparkConfSuite.scala +++ b/core/src/test/scala/org/apache/spark/SparkConfSuite.scala @@ -371,23 +371,6 @@ class SparkConfSuite extends SparkFunSuite with LocalSparkContext with ResetSyst assert(thrown.getMessage.contains(key)) } } - - test("SPARK-24566") { - val conf = new SparkConf() - conf.set("spark.storage.blockManagerSlaveTimeoutMs", "13000ms") - conf.set("spark.storage.blockManagerTimeoutIntervalMs", "13000ms") - - val executorTimeoutMs = - conf.getTimeAsSeconds("spark.network.timeout", - s"${conf.getTimeAsMs("spark.storage.blockManagerSlaveTimeoutMs", - "120s")}ms").seconds.toMillis - assert(executorTimeoutMs === 13000) - val checkTimeoutIntervalMs = - conf.getTimeAsSeconds("spark.network.timeoutInterval", - s"${conf.getTimeAsMs("spark.storage.blockManagerTimeoutIntervalMs", - "60s")}ms").seconds.toMillis - assert(checkTimeoutIntervalMs === 13000) - } } class Class1 {} diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala index d35bea4aca311..1ce2f816dffb2 100644 --- a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala +++ b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala @@ -634,7 +634,7 @@ private[spark] class MesosCoarseGrainedSchedulerBackend( slave.hostname, externalShufflePort, sc.conf.getTimeAsMs("spark.storage.blockManagerSlaveTimeoutMs", - s"${sc.conf.getTimeAsSeconds("spark.network.timeout", "120s") * 1000L}ms"), + s"${sc.conf.getTimeAsSeconds("spark.network.timeout", "120s")}s"), sc.conf.getTimeAsMs("spark.executor.heartbeatInterval", "10s")) slave.shuffleRegistered = true }