diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala index 8df8e66912799..9fceca783ae0c 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala @@ -300,6 +300,17 @@ private[spark] object Config extends Logging { .checkValue(value => value > 0, "Allocation batch delay must be a positive time value.") .createWithDefaultString("1s") + val KUBERNETES_ALLOCATION_DRIVER_READINESS_TIMEOUT = + ConfigBuilder("spark.kubernetes.allocation.driver.readinessTimeout") + .doc("Time to wait for driver pod to get ready before creating executor pods. This wait " + + "only happens on application start. If timeout happens, executor pods will still be " + + "created.") + .version("3.1.3") + .timeConf(TimeUnit.SECONDS) + .checkValue(value => value > 0, "Allocation driver readiness timeout must be a positive " + + "time value.") + .createWithDefaultString("1s") + val KUBERNETES_ALLOCATION_EXECUTOR_TIMEOUT = ConfigBuilder("spark.kubernetes.allocation.executor.timeout") .doc("Time to wait before a newly created executor POD request, which does not reached " + diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala index 3349e0c14771d..e8ca7ea873f6b 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala @@ -18,6 +18,7 @@ package org.apache.spark.scheduler.cluster.k8s import java.time.Instant import java.util.concurrent.ConcurrentHashMap +import java.util.concurrent.TimeUnit import java.util.concurrent.atomic.{AtomicInteger, AtomicLong} import scala.collection.JavaConverters._ @@ -61,6 +62,8 @@ private[spark] class ExecutorPodsAllocator( podAllocationDelay * 5, conf.get(KUBERNETES_ALLOCATION_EXECUTOR_TIMEOUT)) + private val driverPodReadinessTimeout = conf.get(KUBERNETES_ALLOCATION_DRIVER_READINESS_TIMEOUT) + private val executorIdleTimeout = conf.get(DYN_ALLOCATION_EXECUTOR_IDLE_TIMEOUT) * 1000 private val namespace = conf.get(KUBERNETES_NAMESPACE) @@ -99,6 +102,14 @@ private[spark] class ExecutorPodsAllocator( @volatile private var deletedExecutorIds = Set.empty[Long] def start(applicationId: String, schedulerBackend: KubernetesClusterSchedulerBackend): Unit = { + // Wait until the driver pod is ready before starting executors, as the headless service won't + // be resolvable by DNS until the driver pod is ready. + Utils.tryLogNonFatalError { + kubernetesClient + .pods() + .withName(kubernetesDriverPodName.get) + .waitUntilReady(driverPodReadinessTimeout, TimeUnit.SECONDS) + } snapshotsStore.addSubscriber(podAllocationDelay) { onNewSnapshots(applicationId, schedulerBackend, _) } diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala index 17bdb1d2bce7c..9a04d53b3d63d 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala @@ -104,6 +104,7 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter { when(kubernetesClient.pods()).thenReturn(podOperations) when(podOperations.withName(driverPodName)).thenReturn(driverPodOperations) when(driverPodOperations.get).thenReturn(driverPod) + when(driverPodOperations.waitUntilReady(any(), any())).thenReturn(driverPod) when(executorBuilder.buildFromFeatures(any(classOf[KubernetesExecutorConf]), meq(secMgr), meq(kubernetesClient), any(classOf[ResourceProfile]))).thenAnswer(executorPodAnswer()) snapshotsStore = new DeterministicExecutorPodsSnapshotsStore()