From 612ba0e0018378204bd5eebeec8fe77c29659f72 Mon Sep 17 00:00:00 2001 From: Chris Wu Date: Tue, 1 Jun 2021 21:54:51 -0700 Subject: [PATCH] Wait until driver is ready before executors start Before creating executor pods, wait until the driver is ready. The driver's headless service can be resolved by DNS only after the driver pod is ready. If the executor tries to connect to the headless service before the driver pod is ready, it will hit UnknownHostException and get into an error state but will not be restarted. --- .../cluster/k8s/ExecutorPodsAllocator.scala | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala index 3349e0c14771d..3b70870a15afe 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala @@ -18,6 +18,7 @@ package org.apache.spark.scheduler.cluster.k8s import java.time.Instant import java.util.concurrent.ConcurrentHashMap +import java.util.concurrent.TimeUnit import java.util.concurrent.atomic.{AtomicInteger, AtomicLong} import scala.collection.JavaConverters._ @@ -61,6 +62,8 @@ private[spark] class ExecutorPodsAllocator( podAllocationDelay * 5, conf.get(KUBERNETES_ALLOCATION_EXECUTOR_TIMEOUT)) + private val driverPodReadinessTimeout = 5 + private val executorIdleTimeout = conf.get(DYN_ALLOCATION_EXECUTOR_IDLE_TIMEOUT) * 1000 private val namespace = conf.get(KUBERNETES_NAMESPACE) @@ -345,6 +348,16 @@ private[spark] class ExecutorPodsAllocator( s"ResourceProfile Id: $resourceProfileId, target: $expected running: $running.") // Check reusable PVCs for this executor allocation batch val reusablePVCs = getReusablePVCs(applicationId, 
pvcsInUse) + // wait until the driver pod is ready to ensure executors can connect to driver svc + if (numExecutorsToAllocate > 0) { + try { + kubernetesClient.pods().inNamespace(namespace).withName(kubernetesDriverPodName.get). + waitUntilReady(driverPodReadinessTimeout, TimeUnit.MINUTES) + } catch { + case e: InterruptedException => + logWarning(s"Timeout waiting for driver pod ${kubernetesDriverPodName.get} get ready") + } + } for ( _ <- 0 until numExecutorsToAllocate) { val newExecutorId = EXECUTOR_ID_COUNTER.incrementAndGet() val executorConf = KubernetesConf.createExecutorConf(