Skip to content

Commit

Permalink
[SPARK-32975][K8S] Add config for driver readiness timeout before exe…
Browse files Browse the repository at this point in the history
…cutors start

### What changes were proposed in this pull request?
Add a new config that controls the timeout of waiting for driver pod's readiness before allocating executor pods. This wait only happens once on application start.

### Why are the changes needed?
The driver's headless service can be resolved by DNS only after the driver pod is ready. If the executor tries to connect to the headless service before driver pod is ready, it will hit UnkownHostException and get into error state but will not be restarted. **This case usually happens when the driver pod has sidecar containers but hasn't finished their creation when executors start.** So basically there is a race condition. This issue can be mitigated by tweaking this config.

### Does this PR introduce _any_ user-facing change?
A new config `spark.kubernetes.allocation.driver.readinessTimeout` added.

### How was this patch tested?
Exisiting tests.

Closes #32752 from cchriswu/SPARK-32975-fix.

Lead-authored-by: Chris Wu <wucaowei19@gmail.com>
Co-authored-by: Chris Wu <wcaowei@vmware.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
(cherry picked from commit 497c80a)
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
  • Loading branch information
2 people authored and dongjoon-hyun committed Jun 4, 2021
1 parent e75fabf commit 252dfd9
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,17 @@ private[spark] object Config extends Logging {
.checkValue(value => value > 0, "Allocation batch delay must be a positive time value.")
.createWithDefaultString("1s")

val KUBERNETES_ALLOCATION_DRIVER_READINESS_TIMEOUT =
ConfigBuilder("spark.kubernetes.allocation.driver.readinessTimeout")
.doc("Time to wait for driver pod to get ready before creating executor pods. This wait " +
"only happens on application start. If timeout happens, executor pods will still be " +
"created.")
.version("3.1.3")
.timeConf(TimeUnit.SECONDS)
.checkValue(value => value > 0, "Allocation driver readiness timeout must be a positive "
+ "time value.")
.createWithDefaultString("1s")

val KUBERNETES_ALLOCATION_EXECUTOR_TIMEOUT =
ConfigBuilder("spark.kubernetes.allocation.executor.timeout")
.doc("Time to wait before a newly created executor POD request, which does not reached " +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package org.apache.spark.scheduler.cluster.k8s

import java.time.Instant
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.{AtomicInteger, AtomicLong}

import scala.collection.JavaConverters._
Expand Down Expand Up @@ -61,6 +62,8 @@ private[spark] class ExecutorPodsAllocator(
podAllocationDelay * 5,
conf.get(KUBERNETES_ALLOCATION_EXECUTOR_TIMEOUT))

private val driverPodReadinessTimeout = conf.get(KUBERNETES_ALLOCATION_DRIVER_READINESS_TIMEOUT)

private val executorIdleTimeout = conf.get(DYN_ALLOCATION_EXECUTOR_IDLE_TIMEOUT) * 1000

private val namespace = conf.get(KUBERNETES_NAMESPACE)
Expand Down Expand Up @@ -99,6 +102,14 @@ private[spark] class ExecutorPodsAllocator(
@volatile private var deletedExecutorIds = Set.empty[Long]

def start(applicationId: String, schedulerBackend: KubernetesClusterSchedulerBackend): Unit = {
// Wait until the driver pod is ready before starting executors, as the headless service won't
// be resolvable by DNS until the driver pod is ready.
Utils.tryLogNonFatalError {
kubernetesClient
.pods()
.withName(kubernetesDriverPodName.get)
.waitUntilReady(driverPodReadinessTimeout, TimeUnit.SECONDS)
}
snapshotsStore.addSubscriber(podAllocationDelay) {
onNewSnapshots(applicationId, schedulerBackend, _)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter {
when(kubernetesClient.pods()).thenReturn(podOperations)
when(podOperations.withName(driverPodName)).thenReturn(driverPodOperations)
when(driverPodOperations.get).thenReturn(driverPod)
when(driverPodOperations.waitUntilReady(any(), any())).thenReturn(driverPod)
when(executorBuilder.buildFromFeatures(any(classOf[KubernetesExecutorConf]), meq(secMgr),
meq(kubernetesClient), any(classOf[ResourceProfile]))).thenAnswer(executorPodAnswer())
snapshotsStore = new DeterministicExecutorPodsSnapshotsStore()
Expand Down

0 comments on commit 252dfd9

Please sign in to comment.