From 6857f5eba6671016d7c2277902cb9f9488835565 Mon Sep 17 00:00:00 2001
From: Stavros Kontopoulos
Date: Sun, 25 Nov 2018 22:18:42 +0200
Subject: [PATCH] add config option to keep executor pods

---
 docs/running-on-kubernetes.md                    |  7 +++++++
 .../org/apache/spark/deploy/k8s/Config.scala     |  7 +++++++
 .../cluster/k8s/ExecutorPodsAllocator.scala      | 15 ++++++++++-----
 .../k8s/ExecutorPodsLifecycleManager.scala       |  8 ++++++--
 .../k8s/KubernetesClusterSchedulerBackend.scala  | 15 ++++++++++-----
 .../k8s/ExecutorPodsLifecycleManagerSuite.scala  | 14 +++++++++++++-
 6 files changed, 53 insertions(+), 13 deletions(-)

diff --git a/docs/running-on-kubernetes.md b/docs/running-on-kubernetes.md
index a9d448820e700..3b31772dd03f3 100644
--- a/docs/running-on-kubernetes.md
+++ b/docs/running-on-kubernetes.md
@@ -926,6 +926,13 @@ specific to Spark on Kubernetes.
   spark.kubernetes.executor.podTemplateFile=/path/to/executor-pod-template.yaml`
   </td>
 </tr>
+<tr>
+  <td><code>spark.kubernetes.executor.deleteOnTermination</code></td>
+  <td>true</td>
+  <td>
+  Specify whether executor pods should be deleted in case of failure or normal termination.
+  </td>
+</tr>
 </table>
 
 #### Pod template properties
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
index a32bd93bb65bc..a64124f47ee40 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
@@ -281,6 +281,13 @@ private[spark] object Config extends Logging {
 
   val KUBERNETES_NODE_SELECTOR_PREFIX = "spark.kubernetes.node.selector."
 
+  val KUBERNETES_DELETE_EXECUTORS =
+    ConfigBuilder("spark.kubernetes.executor.deleteOnTermination")
+      .doc("If set to false then executor pods will not be deleted in case " +
+        "of failure or normal termination.")
+      .booleanConf
+      .createWithDefault(true)
+
   val KUBERNETES_DRIVER_LABEL_PREFIX = "spark.kubernetes.driver.label."
   val KUBERNETES_DRIVER_ANNOTATION_PREFIX = "spark.kubernetes.driver.annotation."
   val KUBERNETES_DRIVER_SECRETS_PREFIX = "spark.kubernetes.driver.secrets."
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala
index 77bb9c3fcc9f4..ef4cbdf162c6c 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala
@@ -51,6 +51,8 @@ private[spark] class ExecutorPodsAllocator(
   private val kubernetesDriverPodName = conf
     .get(KUBERNETES_DRIVER_POD_NAME)
 
+  private val shouldDeleteExecutors = conf.get(KUBERNETES_DELETE_EXECUTORS)
+
   private val driverPod = kubernetesDriverPodName
     .map(name => Option(kubernetesClient.pods()
       .withName(name)
@@ -86,11 +88,14 @@
           s" cluster after $podCreationTimeout milliseconds despite the fact that a" +
           " previous allocation attempt tried to create it. The executor may have been" +
           " deleted but the application missed the deletion event.")
-        Utils.tryLogNonFatalError {
-          kubernetesClient
-            .pods()
-            .withLabel(SPARK_EXECUTOR_ID_LABEL, execId.toString)
-            .delete()
+
+        if (shouldDeleteExecutors) {
+          Utils.tryLogNonFatalError {
+            kubernetesClient
+              .pods()
+              .withLabel(SPARK_EXECUTOR_ID_LABEL, execId.toString)
+              .delete()
+          }
         }
         newlyCreatedExecutors -= execId
       } else {
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManager.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManager.scala
index 77a1d6cfae3bd..95e1ba8362a02 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManager.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManager.scala
@@ -30,7 +30,7 @@ import org.apache.spark.scheduler.ExecutorExited
 import org.apache.spark.util.Utils
 
 private[spark] class ExecutorPodsLifecycleManager(
-    conf: SparkConf,
+    val conf: SparkConf,
     kubernetesClient: KubernetesClient,
     snapshotsStore: ExecutorPodsSnapshotsStore,
     // Use a best-effort to track which executors have been removed already. It's not generally
@@ -43,6 +43,8 @@ private[spark] class ExecutorPodsLifecycleManager(
 
   private val eventProcessingInterval = conf.get(KUBERNETES_EXECUTOR_EVENT_PROCESSING_INTERVAL)
 
+  private lazy val shouldDeleteExecutors = conf.get(KUBERNETES_DELETE_EXECUTORS)
+
   def start(schedulerBackend: KubernetesClusterSchedulerBackend): Unit = {
     snapshotsStore.addSubscriber(eventProcessingInterval) {
       onNewSnapshots(schedulerBackend, _)
@@ -112,7 +114,9 @@ private[spark] class ExecutorPodsLifecycleManager(
       schedulerBackend: KubernetesClusterSchedulerBackend,
       execIdsRemovedInRound: mutable.Set[Long]): Unit = {
     removeExecutorFromSpark(schedulerBackend, podState, execId)
-    removeExecutorFromK8s(podState.pod)
+    if (shouldDeleteExecutors) {
+      removeExecutorFromK8s(podState.pod)
+    }
     execIdsRemovedInRound += execId
   }
 
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala
index fa6dc2c479bbf..6356b58645806 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala
@@ -21,6 +21,7 @@ import java.util.concurrent.ExecutorService
 import io.fabric8.kubernetes.client.KubernetesClient
 import scala.concurrent.{ExecutionContext, Future}
 
+import org.apache.spark.deploy.k8s.Config._
 import org.apache.spark.deploy.k8s.Constants._
 import org.apache.spark.rpc.{RpcAddress, RpcEnv}
 import org.apache.spark.scheduler.{ExecutorLossReason, TaskSchedulerImpl}
@@ -51,6 +52,8 @@ private[spark] class KubernetesClusterSchedulerBackend(
 
   private val initialExecutors = SchedulerBackendUtils.getInitialTargetExecutorNumber(conf)
 
+  private val shouldDeleteExecutors = conf.get(KUBERNETES_DELETE_EXECUTORS)
+
   // Allow removeExecutor to be accessible by ExecutorPodsLifecycleEventHandler
   private[k8s] def doRemoveExecutor(executorId: String, reason: ExecutorLossReason): Unit = {
     removeExecutor(executorId, reason)
@@ -82,11 +85,13 @@ private[spark] class KubernetesClusterSchedulerBackend(
       pollEvents.stop()
     }
 
-    Utils.tryLogNonFatalError {
-      kubernetesClient.pods()
-        .withLabel(SPARK_APP_ID_LABEL, applicationId())
-        .withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE)
-        .delete()
+    if (shouldDeleteExecutors) {
+      Utils.tryLogNonFatalError {
+        kubernetesClient.pods()
+          .withLabel(SPARK_APP_ID_LABEL, applicationId())
+          .withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE)
+          .delete()
+      }
     }
 
     Utils.tryLogNonFatalError {
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManagerSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManagerSuite.scala
index 3995b2afe7c45..7411f8f9d69e9 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManagerSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManagerSuite.scala
@@ -22,7 +22,7 @@ import io.fabric8.kubernetes.client.KubernetesClient
 import io.fabric8.kubernetes.client.dsl.PodResource
 import org.mockito.{Mock, MockitoAnnotations}
 import org.mockito.Matchers.any
-import org.mockito.Mockito.{mock, times, verify, when}
+import org.mockito.Mockito.{mock, never, times, verify, when}
 import org.mockito.invocation.InvocationOnMock
 import org.mockito.stubbing.Answer
 import org.scalatest.BeforeAndAfter
@@ -30,6 +30,7 @@ import scala.collection.JavaConverters._
 import scala.collection.mutable
 
 import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.deploy.k8s.Config
 import org.apache.spark.deploy.k8s.Fabric8Aliases._
 import org.apache.spark.deploy.k8s.KubernetesUtils._
 import org.apache.spark.scheduler.ExecutorExited
@@ -100,6 +101,17 @@ class ExecutorPodsLifecycleManagerSuite extends SparkFunSuite with BeforeAndAfter
     verify(schedulerBackend).doRemoveExecutor("1", expectedLossReason)
   }
 
+  test("Keep executor pods in k8s if configured.") {
+    val failedPod = failedExecutorWithoutDeletion(1)
+    eventHandlerUnderTest.conf.set(Config.KUBERNETES_DELETE_EXECUTORS, false)
+    snapshotsStore.updatePod(failedPod)
+    snapshotsStore.notifySubscribers()
+    val msg = exitReasonMessage(1, failedPod)
+    val expectedLossReason = ExecutorExited(1, exitCausedByApp = true, msg)
+    verify(schedulerBackend).doRemoveExecutor("1", expectedLossReason)
+    verify(podOperations, never()).delete()
+  }
+
   private def exitReasonMessage(failedExecutorId: Int, failedPod: Pod): String = {
     val reason = Option(failedPod.getStatus.getReason)
     val message = Option(failedPod.getStatus.getMessage)
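
Reviewer note, not part of the patch: a minimal usage sketch of how an application
could opt into the new behavior once this change lands, e.g. to inspect the logs of
failed executors after the fact. Only the `spark.kubernetes.executor.deleteOnTermination`
property comes from this patch; the object name, app name, and the kubectl command in
the comments are illustrative assumptions.

    // KeepExecutorPodsExample.scala -- usage sketch, assumes this patch is applied.
    import org.apache.spark.sql.SparkSession

    object KeepExecutorPodsExample {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .appName("keep-executor-pods-example")
          // New in this patch: keep executor pods after failure or normal
          // termination so their logs and statuses stay inspectable.
          .config("spark.kubernetes.executor.deleteOnTermination", "false")
          .getOrCreate()

        spark.range(100).count()
        spark.stop()
        // The executor pods now remain in the cluster and must be cleaned up
        // manually once debugging is done, for example with (label assumed
        // from SPARK_APP_ID_LABEL):
        //   kubectl delete pods -l spark-app-selector=<app-id>
      }
    }

With the flag set to false, all three deletion paths touched by this patch are skipped:
the allocator's cleanup of executors that never registered, the lifecycle manager's
removeExecutorFromK8s call, and the scheduler backend's bulk delete on stop().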