From 1e4ad6756253914f2bf3c5b25dfb4c3efbf2deb9 Mon Sep 17 00:00:00 2001 From: Chris Wu Date: Tue, 1 Jun 2021 21:54:51 -0700 Subject: [PATCH 1/6] Ensure driver is ready before executors start Before creating executor pods, wait until the driver pod is ready. The driver's headless service can be resolved by DNS only after the driver pod is ready. If an executor tries to connect to the headless service before the driver pod is ready, it will hit UnknownHostException and enter an error state, but it will not be restarted. --- .../cluster/k8s/ExecutorPodsAllocator.scala | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala index 3349e0c14771d..3b70870a15afe 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala @@ -18,6 +18,7 @@ package org.apache.spark.scheduler.cluster.k8s import java.time.Instant import java.util.concurrent.ConcurrentHashMap +import java.util.concurrent.TimeUnit import java.util.concurrent.atomic.{AtomicInteger, AtomicLong} import scala.collection.JavaConverters._ @@ -61,6 +62,8 @@ private[spark] class ExecutorPodsAllocator( podAllocationDelay * 5, conf.get(KUBERNETES_ALLOCATION_EXECUTOR_TIMEOUT)) + private val driverPodReadinessTimeout = 5 + private val executorIdleTimeout = conf.get(DYN_ALLOCATION_EXECUTOR_IDLE_TIMEOUT) * 1000 private val namespace = conf.get(KUBERNETES_NAMESPACE) @@ -345,6 +348,16 @@ private[spark] class ExecutorPodsAllocator( s"ResourceProfile Id: $resourceProfileId, target: $expected running: $running.") // Check reusable PVCs for this executor allocation batch val reusablePVCs = getReusablePVCs(applicationId, 
pvcsInUse) + // wait until the driver pod is ready to ensure executors can connect to driver svc + if (numExecutorsToAllocate > 0) { + try { + kubernetesClient.pods().inNamespace(namespace).withName(kubernetesDriverPodName.get). + waitUntilReady(driverPodReadinessTimeout, TimeUnit.MINUTES) + } catch { + case e: InterruptedException => + logWarning(s"Timeout waiting for driver pod ${kubernetesDriverPodName.get} get ready") + } + } for ( _ <- 0 until numExecutorsToAllocate) { val newExecutorId = EXECUTOR_ID_COUNTER.incrementAndGet() val executorConf = KubernetesConf.createExecutorConf( From 66981fe3d536bab334f2080d9eac73ae409b667a Mon Sep 17 00:00:00 2001 From: Chris Wu Date: Wed, 2 Jun 2021 10:51:02 -0700 Subject: [PATCH 2/6] Move the wait for driver readiness Move the wait from onNewSnapshots() to start() in ExecutorPodsAllocator, so that this check will be only done once on start. Mock waitUntilReady() to not break existing tests --- .../cluster/k8s/ExecutorPodsAllocator.scala | 20 +++++++++---------- .../k8s/ExecutorPodsAllocatorSuite.scala | 5 ++++- 2 files changed, 14 insertions(+), 11 deletions(-) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala index 3b70870a15afe..07fb789b9f729 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala @@ -102,6 +102,16 @@ private[spark] class ExecutorPodsAllocator( @volatile private var deletedExecutorIds = Set.empty[Long] def start(applicationId: String, schedulerBackend: KubernetesClusterSchedulerBackend): Unit = { + // wait until the driver pod is ready to ensure executors can connect to driver svc + try { + kubernetesClient.pods() + 
.withName(kubernetesDriverPodName.get) + .waitUntilReady(driverPodReadinessTimeout, TimeUnit.MINUTES) + } catch { + case e: InterruptedException => + logWarning(s"Timeout waiting for driver pod ${kubernetesDriverPodName.get} get ready in " + + s"namespace $namespace") + } snapshotsStore.addSubscriber(podAllocationDelay) { onNewSnapshots(applicationId, schedulerBackend, _) } @@ -348,16 +358,6 @@ private[spark] class ExecutorPodsAllocator( s"ResourceProfile Id: $resourceProfileId, target: $expected running: $running.") // Check reusable PVCs for this executor allocation batch val reusablePVCs = getReusablePVCs(applicationId, pvcsInUse) - // wait until the driver pod is ready to ensure executors can connect to driver svc - if (numExecutorsToAllocate > 0) { - try { - kubernetesClient.pods().inNamespace(namespace).withName(kubernetesDriverPodName.get). - waitUntilReady(driverPodReadinessTimeout, TimeUnit.MINUTES) - } catch { - case e: InterruptedException => - logWarning(s"Timeout waiting for driver pod ${kubernetesDriverPodName.get} get ready") - } - } for ( _ <- 0 until numExecutorsToAllocate) { val newExecutorId = EXECUTOR_ID_COUNTER.incrementAndGet() val executorConf = KubernetesConf.createExecutorConf( diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala index 17bdb1d2bce7c..a16ccd10438a8 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala @@ -17,6 +17,7 @@ package org.apache.spark.scheduler.cluster.k8s import java.time.Instant +import java.util.concurrent.TimeUnit import scala.collection.JavaConverters._ @@ -24,7 +25,7 @@ import 
io.fabric8.kubernetes.api.model._ import io.fabric8.kubernetes.client.KubernetesClient import io.fabric8.kubernetes.client.dsl.PodResource import org.mockito.{Mock, MockitoAnnotations} -import org.mockito.ArgumentMatchers.{any, eq => meq} +import org.mockito.ArgumentMatchers.{any, anyInt, eq => meq} import org.mockito.Mockito.{never, times, verify, when} import org.mockito.invocation.InvocationOnMock import org.mockito.stubbing.Answer @@ -104,6 +105,8 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter { when(kubernetesClient.pods()).thenReturn(podOperations) when(podOperations.withName(driverPodName)).thenReturn(driverPodOperations) when(driverPodOperations.get).thenReturn(driverPod) + when(driverPodOperations.waitUntilReady(anyInt(), any(classOf[TimeUnit]))) + .thenReturn(driverPod) when(executorBuilder.buildFromFeatures(any(classOf[KubernetesExecutorConf]), meq(secMgr), meq(kubernetesClient), any(classOf[ResourceProfile]))).thenAnswer(executorPodAnswer()) snapshotsStore = new DeterministicExecutorPodsSnapshotsStore() From 02cf0f3daf2bc727ddb8adc13b9629cf78840b80 Mon Sep 17 00:00:00 2001 From: Chris Wu Date: Wed, 2 Jun 2021 19:25:10 -0700 Subject: [PATCH 3/6] Improve comment and code style --- .../spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala | 3 ++- .../scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala | 6 ++---- 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala index 07fb789b9f729..7f5aa86a863c2 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala @@ -102,7 +102,8 @@ private[spark] 
class ExecutorPodsAllocator( @volatile private var deletedExecutorIds = Set.empty[Long] def start(applicationId: String, schedulerBackend: KubernetesClusterSchedulerBackend): Unit = { - // wait until the driver pod is ready to ensure executors can connect to driver svc + // Wait until the driver pod is ready before starting executors, as the headless service won't + // be resolvable by DNS until the driver pod is ready. try { kubernetesClient.pods() .withName(kubernetesDriverPodName.get) diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala index a16ccd10438a8..9a04d53b3d63d 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala @@ -17,7 +17,6 @@ package org.apache.spark.scheduler.cluster.k8s import java.time.Instant -import java.util.concurrent.TimeUnit import scala.collection.JavaConverters._ @@ -25,7 +24,7 @@ import io.fabric8.kubernetes.api.model._ import io.fabric8.kubernetes.client.KubernetesClient import io.fabric8.kubernetes.client.dsl.PodResource import org.mockito.{Mock, MockitoAnnotations} -import org.mockito.ArgumentMatchers.{any, anyInt, eq => meq} +import org.mockito.ArgumentMatchers.{any, eq => meq} import org.mockito.Mockito.{never, times, verify, when} import org.mockito.invocation.InvocationOnMock import org.mockito.stubbing.Answer @@ -105,8 +104,7 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter { when(kubernetesClient.pods()).thenReturn(podOperations) when(podOperations.withName(driverPodName)).thenReturn(driverPodOperations) when(driverPodOperations.get).thenReturn(driverPod) - 
when(driverPodOperations.waitUntilReady(anyInt(), any(classOf[TimeUnit]))) - .thenReturn(driverPod) + when(driverPodOperations.waitUntilReady(any(), any())).thenReturn(driverPod) when(executorBuilder.buildFromFeatures(any(classOf[KubernetesExecutorConf]), meq(secMgr), meq(kubernetesClient), any(classOf[ResourceProfile]))).thenAnswer(executorPodAnswer()) snapshotsStore = new DeterministicExecutorPodsSnapshotsStore() From 9fafec335e674f87a98f4e8f90dccc6800b17b80 Mon Sep 17 00:00:00 2001 From: Chris Wu Date: Thu, 3 Jun 2021 20:12:00 -0700 Subject: [PATCH 4/6] Add config for the timeout --- .../scala/org/apache/spark/deploy/k8s/Config.scala | 10 ++++++++++ .../cluster/k8s/ExecutorPodsAllocator.scala | 13 +++++-------- 2 files changed, 15 insertions(+), 8 deletions(-) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala index 8df8e66912799..370704c89eff1 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala @@ -300,6 +300,16 @@ private[spark] object Config extends Logging { .checkValue(value => value > 0, "Allocation batch delay must be a positive time value.") .createWithDefaultString("1s") + val KUBERNETES_ALLOCATION_DRIVER_READINESS_TIMEOUT = + ConfigBuilder("spark.kubernetes.allocation.driver.readinessTimeout") + .doc("Time to wait for driver pod to get ready before creating executor pods. This wait " + + "only happens on application start. 
If timeout happens, executor pods will still be " + + "created.") + .version("3.2.0") + .timeConf(TimeUnit.SECONDS) + .checkValue(value => value > 0, "Allocation driver readiness timeout must be a positive time value.") + .createWithDefaultString("1s") + val KUBERNETES_ALLOCATION_EXECUTOR_TIMEOUT = ConfigBuilder("spark.kubernetes.allocation.executor.timeout") .doc("Time to wait before a newly created executor POD request, which does not reached " + diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala index 7f5aa86a863c2..e8ca7ea873f6b 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala @@ -62,7 +62,7 @@ private[spark] class ExecutorPodsAllocator( podAllocationDelay * 5, conf.get(KUBERNETES_ALLOCATION_EXECUTOR_TIMEOUT)) - private val driverPodReadinessTimeout = 5 + private val driverPodReadinessTimeout = conf.get(KUBERNETES_ALLOCATION_DRIVER_READINESS_TIMEOUT) private val executorIdleTimeout = conf.get(DYN_ALLOCATION_EXECUTOR_IDLE_TIMEOUT) * 1000 @@ -104,14 +104,11 @@ private[spark] class ExecutorPodsAllocator( def start(applicationId: String, schedulerBackend: KubernetesClusterSchedulerBackend): Unit = { // Wait until the driver pod is ready before starting executors, as the headless service won't // be resolvable by DNS until the driver pod is ready. 
- try { - kubernetesClient.pods() + Utils.tryLogNonFatalError { + kubernetesClient + .pods() .withName(kubernetesDriverPodName.get) - .waitUntilReady(driverPodReadinessTimeout, TimeUnit.MINUTES) - } catch { - case e: InterruptedException => - logWarning(s"Timeout waiting for driver pod ${kubernetesDriverPodName.get} get ready in " + - s"namespace $namespace") + .waitUntilReady(driverPodReadinessTimeout, TimeUnit.SECONDS) } snapshotsStore.addSubscriber(podAllocationDelay) { onNewSnapshots(applicationId, schedulerBackend, _) From 20509d147102819d1b2a28d02c1c2437ab989668 Mon Sep 17 00:00:00 2001 From: Chris Wu Date: Thu, 3 Jun 2021 20:49:27 -0700 Subject: [PATCH 5/6] Fix lint issue --- .../src/main/scala/org/apache/spark/deploy/k8s/Config.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala index 370704c89eff1..2019b7d0353c5 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala @@ -307,7 +307,8 @@ private[spark] object Config extends Logging { "created.") .version("3.2.0") .timeConf(TimeUnit.SECONDS) - .checkValue(value => value > 0, "Allocation driver readiness timeout must be a positive time value.") + .checkValue(value => value > 0, "Allocation driver readiness timeout must be a positive " + + "time value.") .createWithDefaultString("1s") val KUBERNETES_ALLOCATION_EXECUTOR_TIMEOUT = From 4dbae0a7280471df9903c9a4b2a998f915968bc9 Mon Sep 17 00:00:00 2001 From: Chris Wu Date: Thu, 3 Jun 2021 23:01:31 -0700 Subject: [PATCH 6/6] Update config version to 3.1.3 This config needs to be backported to branch-3.1. 
--- .../src/main/scala/org/apache/spark/deploy/k8s/Config.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala index 2019b7d0353c5..9fceca783ae0c 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala @@ -305,7 +305,7 @@ private[spark] object Config extends Logging { .doc("Time to wait for driver pod to get ready before creating executor pods. This wait " + "only happens on application start. If timeout happens, executor pods will still be " + "created.") - .version("3.2.0") + .version("3.1.3") .timeConf(TimeUnit.SECONDS) .checkValue(value => value > 0, "Allocation driver readiness timeout must be a positive " + "time value.")