diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/HadoopConfExecutorFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/HadoopConfExecutorFeatureStep.scala index bca66759d586e..da332881ae1a2 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/HadoopConfExecutorFeatureStep.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/HadoopConfExecutorFeatureStep.scala @@ -31,10 +31,10 @@ private[spark] class HadoopConfExecutorFeatureStep(conf: KubernetesExecutorConf) override def configurePod(pod: SparkPod): SparkPod = { val hadoopConfDirCMapName = conf.getOption(HADOOP_CONFIG_MAP_NAME) - require(hadoopConfDirCMapName.isDefined, - "Ensure that the env `HADOOP_CONF_DIR` is defined either in the client or " + - " using pre-existing ConfigMaps") - logInfo("HADOOP_CONF_DIR defined") - HadoopBootstrapUtil.bootstrapHadoopConfDir(None, None, hadoopConfDirCMapName, pod) + if (hadoopConfDirCMapName.isDefined) { + HadoopBootstrapUtil.bootstrapHadoopConfDir(None, None, hadoopConfDirCMapName, pod) + } else { + pod + } } } diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/HadoopSparkUserExecutorFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/HadoopSparkUserExecutorFeatureStep.scala index e342110763196..c038e75491ca5 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/HadoopSparkUserExecutorFeatureStep.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/HadoopSparkUserExecutorFeatureStep.scala @@ -28,7 +28,8 @@ private[spark] class HadoopSparkUserExecutorFeatureStep(conf: KubernetesExecutor extends KubernetesFeatureConfigStep { override def configurePod(pod: SparkPod): SparkPod = { - val sparkUserName = conf.get(KERBEROS_SPARK_USER_NAME) - HadoopBootstrapUtil.bootstrapSparkUserPod(sparkUserName, pod) + conf.getOption(KERBEROS_SPARK_USER_NAME).map { user => + HadoopBootstrapUtil.bootstrapSparkUserPod(user, pod) + }.getOrElse(pod) } } diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/KerberosConfExecutorFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/KerberosConfExecutorFeatureStep.scala index 32bb6a5d2bcbb..907271b1cb483 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/KerberosConfExecutorFeatureStep.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/KerberosConfExecutorFeatureStep.scala @@ -27,18 +27,20 @@ import org.apache.spark.internal.Logging private[spark] class KerberosConfExecutorFeatureStep(conf: KubernetesExecutorConf) extends KubernetesFeatureConfigStep with Logging { - private val maybeKrb5CMap = conf.getOption(KRB5_CONFIG_MAP_NAME) - require(maybeKrb5CMap.isDefined, "HADOOP_CONF_DIR ConfigMap not found") - override def configurePod(pod: SparkPod): SparkPod = { - logInfo(s"Mounting Resources for Kerberos") - HadoopBootstrapUtil.bootstrapKerberosPod( - conf.get(KERBEROS_DT_SECRET_NAME), - conf.get(KERBEROS_DT_SECRET_KEY), - conf.get(KERBEROS_SPARK_USER_NAME), - None, - None, - maybeKrb5CMap, - pod) + val maybeKrb5CMap = conf.getOption(KRB5_CONFIG_MAP_NAME) + if (maybeKrb5CMap.isDefined) { + logInfo(s"Mounting Resources for Kerberos") + HadoopBootstrapUtil.bootstrapKerberosPod( + conf.get(KERBEROS_DT_SECRET_NAME), + conf.get(KERBEROS_DT_SECRET_KEY), + conf.get(KERBEROS_SPARK_USER_NAME), + None, + None, + maybeKrb5CMap, + pod) + } else { + pod + } } } diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/PodTemplateConfigMapStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/PodTemplateConfigMapStep.scala index 09dcf93a54f8e..7f41ca43589b6 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/PodTemplateConfigMapStep.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/PodTemplateConfigMapStep.scala @@ -28,44 +28,60 @@ import org.apache.spark.deploy.k8s.Constants._ private[spark] class PodTemplateConfigMapStep(conf: KubernetesConf) extends KubernetesFeatureConfigStep { + + private val hasTemplate = conf.contains(KUBERNETES_EXECUTOR_PODTEMPLATE_FILE) + def configurePod(pod: SparkPod): SparkPod = { - val podWithVolume = new PodBuilder(pod.pod) - .editSpec() - .addNewVolume() - .withName(POD_TEMPLATE_VOLUME) - .withNewConfigMap() - .withName(POD_TEMPLATE_CONFIGMAP) - .addNewItem() - .withKey(POD_TEMPLATE_KEY) - .withPath(EXECUTOR_POD_SPEC_TEMPLATE_FILE_NAME) - .endItem() - .endConfigMap() - .endVolume() - .endSpec() - .build() + if (hasTemplate) { + val podWithVolume = new PodBuilder(pod.pod) + .editSpec() + .addNewVolume() + .withName(POD_TEMPLATE_VOLUME) + .withNewConfigMap() + .withName(POD_TEMPLATE_CONFIGMAP) + .addNewItem() + .withKey(POD_TEMPLATE_KEY) + .withPath(EXECUTOR_POD_SPEC_TEMPLATE_FILE_NAME) + .endItem() + .endConfigMap() + .endVolume() + .endSpec() + .build() - val containerWithVolume = new ContainerBuilder(pod.container) - .addNewVolumeMount() - .withName(POD_TEMPLATE_VOLUME) - .withMountPath(EXECUTOR_POD_SPEC_TEMPLATE_MOUNTPATH) - .endVolumeMount() - .build() - SparkPod(podWithVolume, containerWithVolume) + val containerWithVolume = new ContainerBuilder(pod.container) + .addNewVolumeMount() + .withName(POD_TEMPLATE_VOLUME) + .withMountPath(EXECUTOR_POD_SPEC_TEMPLATE_MOUNTPATH) + .endVolumeMount() + .build() + SparkPod(podWithVolume, containerWithVolume) + } else { + pod + } } - override def getAdditionalPodSystemProperties(): Map[String, String] = Map[String, String]( - KUBERNETES_EXECUTOR_PODTEMPLATE_FILE.key -> - (EXECUTOR_POD_SPEC_TEMPLATE_MOUNTPATH + "/" + EXECUTOR_POD_SPEC_TEMPLATE_FILE_NAME)) + override def getAdditionalPodSystemProperties(): Map[String, String] = { + if (hasTemplate) { + Map[String, String]( + KUBERNETES_EXECUTOR_PODTEMPLATE_FILE.key -> + (EXECUTOR_POD_SPEC_TEMPLATE_MOUNTPATH + "/" + EXECUTOR_POD_SPEC_TEMPLATE_FILE_NAME)) + } else { + Map.empty + } + } override def getAdditionalKubernetesResources(): Seq[HasMetadata] = { - require(conf.get(KUBERNETES_EXECUTOR_PODTEMPLATE_FILE).isDefined) - val podTemplateFile = conf.get(KUBERNETES_EXECUTOR_PODTEMPLATE_FILE).get - val podTemplateString = Files.toString(new File(podTemplateFile), StandardCharsets.UTF_8) - Seq(new ConfigMapBuilder() - .withNewMetadata() - .withName(POD_TEMPLATE_CONFIGMAP) - .endMetadata() - .addToData(POD_TEMPLATE_KEY, podTemplateString) - .build()) + if (hasTemplate) { + val podTemplateFile = conf.get(KUBERNETES_EXECUTOR_PODTEMPLATE_FILE).get + val podTemplateString = Files.toString(new File(podTemplateFile), StandardCharsets.UTF_8) + Seq(new ConfigMapBuilder() + .withNewMetadata() + .withName(POD_TEMPLATE_CONFIGMAP) + .endMetadata() + .addToData(POD_TEMPLATE_KEY, podTemplateString) + .build()) + } else { + Nil + } } } diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala index 70a93c968795e..3888778bf84ca 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala @@ -104,7 +104,7 @@ private[spark] class Client( watcher: LoggingPodStatusWatcher) extends Logging { def run(): Unit = { - val resolvedDriverSpec = builder.buildFromFeatures(conf) + val resolvedDriverSpec = builder.buildFromFeatures(conf, kubernetesClient) val configMapName = s"${conf.resourceNamePrefix}-driver-conf-map" val configMap = buildConfigMap(configMapName, resolvedDriverSpec.systemProperties) // The include of the ENV_VAR for "SPARK_CONF_DIR" is to allow for the @@ -232,7 +232,7 @@ private[spark] class KubernetesClientApplication extends SparkApplication { None)) { kubernetesClient => val client = new Client( kubernetesConf, - KubernetesDriverBuilder(kubernetesClient, kubernetesConf.sparkConf), + new KubernetesDriverBuilder(), kubernetesClient, waitForAppCompletion, watcher) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilder.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilder.scala index a5ad9729aee9a..d2c0ced9fa2f4 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilder.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilder.scala @@ -20,90 +20,49 @@ import java.io.File import io.fabric8.kubernetes.client.KubernetesClient -import org.apache.spark.SparkConf import org.apache.spark.deploy.k8s._ import org.apache.spark.deploy.k8s.features._ -private[spark] class KubernetesDriverBuilder( - provideBasicStep: (KubernetesDriverConf => BasicDriverFeatureStep) = - new BasicDriverFeatureStep(_), - provideCredentialsStep: (KubernetesDriverConf => DriverKubernetesCredentialsFeatureStep) = - new DriverKubernetesCredentialsFeatureStep(_), - provideServiceStep: (KubernetesDriverConf => DriverServiceFeatureStep) = - new DriverServiceFeatureStep(_), - provideSecretsStep: (KubernetesConf => MountSecretsFeatureStep) = - new MountSecretsFeatureStep(_), - provideEnvSecretsStep: (KubernetesConf => EnvSecretsFeatureStep) = - new EnvSecretsFeatureStep(_), - provideLocalDirsStep: (KubernetesConf => LocalDirsFeatureStep) = - new LocalDirsFeatureStep(_), - provideVolumesStep: (KubernetesConf => MountVolumesFeatureStep) = - new MountVolumesFeatureStep(_), - provideDriverCommandStep: (KubernetesDriverConf => DriverCommandFeatureStep) = - new DriverCommandFeatureStep(_), - provideHadoopGlobalStep: (KubernetesDriverConf => KerberosConfDriverFeatureStep) = - new KerberosConfDriverFeatureStep(_), - providePodTemplateConfigMapStep: (KubernetesConf => PodTemplateConfigMapStep) = - new PodTemplateConfigMapStep(_), - provideInitialPod: () => SparkPod = () => SparkPod.initialPod) { +private[spark] class KubernetesDriverBuilder { - def buildFromFeatures(kubernetesConf: KubernetesDriverConf): KubernetesDriverSpec = { - val baseFeatures = Seq( - provideBasicStep(kubernetesConf), - provideCredentialsStep(kubernetesConf), - provideServiceStep(kubernetesConf), - provideLocalDirsStep(kubernetesConf)) - - val secretFeature = if (kubernetesConf.secretNamesToMountPaths.nonEmpty) { - Seq(provideSecretsStep(kubernetesConf)) - } else Nil - val envSecretFeature = if (kubernetesConf.secretEnvNamesToKeyRefs.nonEmpty) { - Seq(provideEnvSecretsStep(kubernetesConf)) - } else Nil - val volumesFeature = if (kubernetesConf.volumes.nonEmpty) { - Seq(provideVolumesStep(kubernetesConf)) - } else Nil - val podTemplateFeature = if ( - kubernetesConf.get(Config.KUBERNETES_EXECUTOR_PODTEMPLATE_FILE).isDefined) { - Seq(providePodTemplateConfigMapStep(kubernetesConf)) - } else Nil - - val driverCommandStep = provideDriverCommandStep(kubernetesConf) - - val hadoopConfigStep = Some(provideHadoopGlobalStep(kubernetesConf)) + def buildFromFeatures( + conf: KubernetesDriverConf, + client: KubernetesClient): KubernetesDriverSpec = { + val initialPod = conf.get(Config.KUBERNETES_DRIVER_PODTEMPLATE_FILE) + .map { file => + KubernetesUtils.loadPodFromTemplate( + client, + new File(file), + conf.get(Config.KUBERNETES_DRIVER_PODTEMPLATE_CONTAINER_NAME)) + } + .getOrElse(SparkPod.initialPod()) - val allFeatures: Seq[KubernetesFeatureConfigStep] = - baseFeatures ++ Seq(driverCommandStep) ++ - secretFeature ++ envSecretFeature ++ volumesFeature ++ - hadoopConfigStep ++ podTemplateFeature + val features = Seq( + new BasicDriverFeatureStep(conf), + new DriverKubernetesCredentialsFeatureStep(conf), + new DriverServiceFeatureStep(conf), + new MountSecretsFeatureStep(conf), + new EnvSecretsFeatureStep(conf), + new LocalDirsFeatureStep(conf), + new MountVolumesFeatureStep(conf), + new DriverCommandFeatureStep(conf), + new KerberosConfDriverFeatureStep(conf), + new PodTemplateConfigMapStep(conf)) - var spec = KubernetesDriverSpec( - provideInitialPod(), + val spec = KubernetesDriverSpec( + initialPod, driverKubernetesResources = Seq.empty, - kubernetesConf.sparkConf.getAll.toMap) - for (feature <- allFeatures) { + conf.sparkConf.getAll.toMap) + + features.foldLeft(spec) { case (spec, feature) => val configuredPod = feature.configurePod(spec.pod) val addedSystemProperties = feature.getAdditionalPodSystemProperties() val addedResources = feature.getAdditionalKubernetesResources() - spec = KubernetesDriverSpec( + KubernetesDriverSpec( configuredPod, spec.driverKubernetesResources ++ addedResources, spec.systemProperties ++ addedSystemProperties) } - spec } -} -private[spark] object KubernetesDriverBuilder { - def apply(kubernetesClient: KubernetesClient, conf: SparkConf): KubernetesDriverBuilder = { - conf.get(Config.KUBERNETES_DRIVER_PODTEMPLATE_FILE) - .map(new File(_)) - .map(file => new KubernetesDriverBuilder(provideInitialPod = () => - KubernetesUtils.loadPodFromTemplate( - kubernetesClient, - file, - conf.get(Config.KUBERNETES_DRIVER_PODTEMPLATE_CONTAINER_NAME)) - )) - .getOrElse(new KubernetesDriverBuilder()) - } } diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala index ac42554b1334b..da3edfeca9b1f 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala @@ -136,7 +136,8 @@ private[spark] class ExecutorPodsAllocator( newExecutorId.toString, applicationId, driverPod) - val executorPod = executorBuilder.buildFromFeatures(executorConf, secMgr) + val executorPod = executorBuilder.buildFromFeatures(executorConf, secMgr, + kubernetesClient) val podWithAttachedContainer = new PodBuilder(executorPod.pod) .editOrNewSpec() .addToContainers(executorPod.container) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala index b31fbb420ed6d..809bdf8ca8c27 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala @@ -95,7 +95,7 @@ private[spark] class KubernetesClusterManager extends ExternalClusterManager wit val executorPodsAllocator = new ExecutorPodsAllocator( sc.conf, sc.env.securityManager, - KubernetesExecutorBuilder(kubernetesClient, sc.conf), + new KubernetesExecutorBuilder(), kubernetesClient, snapshotsStore, new SystemClock()) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilder.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilder.scala index ba273cad6a8e5..0b74966fe8685 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilder.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilder.scala @@ -20,86 +20,36 @@ import java.io.File import io.fabric8.kubernetes.client.KubernetesClient -import org.apache.spark.{SecurityManager, SparkConf} +import org.apache.spark.SecurityManager import org.apache.spark.deploy.k8s._ -import org.apache.spark.deploy.k8s.Constants._ import org.apache.spark.deploy.k8s.features._ -private[spark] class KubernetesExecutorBuilder( - provideBasicStep: (KubernetesExecutorConf, SecurityManager) => BasicExecutorFeatureStep = - new BasicExecutorFeatureStep(_, _), - provideSecretsStep: (KubernetesConf => MountSecretsFeatureStep) = - new MountSecretsFeatureStep(_), - provideEnvSecretsStep: (KubernetesConf => EnvSecretsFeatureStep) = - new EnvSecretsFeatureStep(_), - provideLocalDirsStep: (KubernetesConf => LocalDirsFeatureStep) = - new LocalDirsFeatureStep(_), - provideVolumesStep: (KubernetesConf => MountVolumesFeatureStep) = - new MountVolumesFeatureStep(_), - provideHadoopConfStep: (KubernetesExecutorConf => HadoopConfExecutorFeatureStep) = - new HadoopConfExecutorFeatureStep(_), - provideKerberosConfStep: (KubernetesExecutorConf => KerberosConfExecutorFeatureStep) = - new KerberosConfExecutorFeatureStep(_), - provideHadoopSparkUserStep: (KubernetesExecutorConf => HadoopSparkUserExecutorFeatureStep) = - new HadoopSparkUserExecutorFeatureStep(_), - provideInitialPod: () => SparkPod = () => SparkPod.initialPod()) { +private[spark] class KubernetesExecutorBuilder { def buildFromFeatures( - kubernetesConf: KubernetesExecutorConf, - secMgr: SecurityManager): SparkPod = { - val sparkConf = kubernetesConf.sparkConf - val maybeHadoopConfigMap = sparkConf.getOption(HADOOP_CONFIG_MAP_NAME) - val maybeDTSecretName = sparkConf.getOption(KERBEROS_DT_SECRET_NAME) - val maybeDTDataItem = sparkConf.getOption(KERBEROS_DT_SECRET_KEY) - - val baseFeatures = Seq(provideBasicStep(kubernetesConf, secMgr), - provideLocalDirsStep(kubernetesConf)) - val secretFeature = if (kubernetesConf.secretNamesToMountPaths.nonEmpty) { - Seq(provideSecretsStep(kubernetesConf)) - } else Nil - val secretEnvFeature = if (kubernetesConf.secretEnvNamesToKeyRefs.nonEmpty) { - Seq(provideEnvSecretsStep(kubernetesConf)) - } else Nil - val volumesFeature = if (kubernetesConf.volumes.nonEmpty) { - Seq(provideVolumesStep(kubernetesConf)) - } else Nil - - val maybeHadoopConfFeatureSteps = maybeHadoopConfigMap.map { _ => - val maybeKerberosStep = - if (maybeDTSecretName.isDefined && maybeDTDataItem.isDefined) { - provideKerberosConfStep(kubernetesConf) - } else { - provideHadoopSparkUserStep(kubernetesConf) - } - Seq(provideHadoopConfStep(kubernetesConf)) :+ - maybeKerberosStep - }.getOrElse(Seq.empty[KubernetesFeatureConfigStep]) - - val allFeatures: Seq[KubernetesFeatureConfigStep] = - baseFeatures ++ - secretFeature ++ - secretEnvFeature ++ - volumesFeature ++ - maybeHadoopConfFeatureSteps - - var executorPod = provideInitialPod() - for (feature <- allFeatures) { - executorPod = feature.configurePod(executorPod) - } - executorPod + conf: KubernetesExecutorConf, + secMgr: SecurityManager, + client: KubernetesClient): SparkPod = { + val initialPod = conf.get(Config.KUBERNETES_EXECUTOR_PODTEMPLATE_FILE) + .map { file => + KubernetesUtils.loadPodFromTemplate( + client, + new File(file), + conf.get(Config.KUBERNETES_EXECUTOR_PODTEMPLATE_CONTAINER_NAME)) + } + .getOrElse(SparkPod.initialPod()) + + val features = Seq( + new BasicExecutorFeatureStep(conf, secMgr), + new MountSecretsFeatureStep(conf), + new EnvSecretsFeatureStep(conf), + new LocalDirsFeatureStep(conf), + new MountVolumesFeatureStep(conf), + new HadoopConfExecutorFeatureStep(conf), + new KerberosConfExecutorFeatureStep(conf), + new HadoopSparkUserExecutorFeatureStep(conf)) + + features.foldLeft(initialPod) { case (pod, feature) => feature.configurePod(pod) } } -} -private[spark] object KubernetesExecutorBuilder { - def apply(kubernetesClient: KubernetesClient, conf: SparkConf): KubernetesExecutorBuilder = { - conf.get(Config.KUBERNETES_EXECUTOR_PODTEMPLATE_FILE) - .map(new File(_)) - .map(file => new KubernetesExecutorBuilder(provideInitialPod = () => - KubernetesUtils.loadPodFromTemplate( - kubernetesClient, - file, - conf.get(Config.KUBERNETES_EXECUTOR_PODTEMPLATE_CONTAINER_NAME)) - )) - .getOrElse(new KubernetesExecutorBuilder()) - } } diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/PodBuilderSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/PodBuilderSuite.scala new file mode 100644 index 0000000000000..7dde0c1377168 --- /dev/null +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/PodBuilderSuite.scala @@ -0,0 +1,177 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.deploy.k8s + +import java.io.File + +import io.fabric8.kubernetes.api.model.{Config => _, _} +import io.fabric8.kubernetes.client.KubernetesClient +import io.fabric8.kubernetes.client.dsl.{MixedOperation, PodResource} +import org.mockito.Matchers.any +import org.mockito.Mockito.{mock, never, verify, when} +import scala.collection.JavaConverters._ + +import org.apache.spark.{SparkConf, SparkException, SparkFunSuite} +import org.apache.spark.deploy.k8s._ +import org.apache.spark.internal.config.ConfigEntry + +abstract class PodBuilderSuite extends SparkFunSuite { + + protected def templateFileConf: ConfigEntry[_] + + protected def buildPod(sparkConf: SparkConf, client: KubernetesClient): SparkPod + + private val baseConf = new SparkConf(false) + .set(Config.CONTAINER_IMAGE, "spark-executor:latest") + + test("use empty initial pod if template is not specified") { + val client = mock(classOf[KubernetesClient]) + buildPod(baseConf.clone(), client) + verify(client, never()).pods() + } + + test("load pod template if specified") { + val client = mockKubernetesClient() + val sparkConf = baseConf.clone().set(templateFileConf.key, "template-file.yaml") + val pod = buildPod(sparkConf, client) + verifyPod(pod) + } + + test("complain about misconfigured pod template") { + val client = mockKubernetesClient( + new PodBuilder() + .withNewMetadata() + .addToLabels("test-label-key", "test-label-value") + .endMetadata() + .build()) + val sparkConf = baseConf.clone().set(templateFileConf.key, "template-file.yaml") + val exception = intercept[SparkException] { + buildPod(sparkConf, client) + } + assert(exception.getMessage.contains("Could not load pod from template file.")) + } + + private def mockKubernetesClient(pod: Pod = podWithSupportedFeatures()): KubernetesClient = { + val kubernetesClient = mock(classOf[KubernetesClient]) + val pods = + mock(classOf[MixedOperation[Pod, PodList, DoneablePod, PodResource[Pod, DoneablePod]]]) + val podResource = mock(classOf[PodResource[Pod, DoneablePod]]) + when(kubernetesClient.pods()).thenReturn(pods) + when(pods.load(any(classOf[File]))).thenReturn(podResource) + when(podResource.get()).thenReturn(pod) + kubernetesClient + } + + private def verifyPod(pod: SparkPod): Unit = { + val metadata = pod.pod.getMetadata + assert(metadata.getLabels.containsKey("test-label-key")) + assert(metadata.getAnnotations.containsKey("test-annotation-key")) + assert(metadata.getNamespace === "namespace") + assert(metadata.getOwnerReferences.asScala.exists(_.getName == "owner-reference")) + val spec = pod.pod.getSpec + assert(!spec.getContainers.asScala.exists(_.getName == "executor-container")) + assert(spec.getDnsPolicy === "dns-policy") + assert(spec.getHostAliases.asScala.exists(_.getHostnames.asScala.exists(_ == "hostname"))) + assert(spec.getImagePullSecrets.asScala.exists(_.getName == "local-reference")) + assert(spec.getInitContainers.asScala.exists(_.getName == "init-container")) + assert(spec.getNodeName == "node-name") + assert(spec.getNodeSelector.get("node-selector-key") === "node-selector-value") + assert(spec.getSchedulerName === "scheduler") + assert(spec.getSecurityContext.getRunAsUser === 1000L) + assert(spec.getServiceAccount === "service-account") + assert(spec.getSubdomain === "subdomain") + assert(spec.getTolerations.asScala.exists(_.getKey == "toleration-key")) + assert(spec.getVolumes.asScala.exists(_.getName == "test-volume")) + val container = pod.container + assert(container.getName === "executor-container") + assert(container.getArgs.contains("arg")) + assert(container.getCommand.equals(List("command").asJava)) + assert(container.getEnv.asScala.exists(_.getName == "env-key")) + assert(container.getResources.getLimits.get("gpu") === + new QuantityBuilder().withAmount("1").build()) + assert(container.getSecurityContext.getRunAsNonRoot) + assert(container.getStdin) + assert(container.getTerminationMessagePath === "termination-message-path") + assert(container.getTerminationMessagePolicy === "termination-message-policy") + assert(pod.container.getVolumeMounts.asScala.exists(_.getName == "test-volume")) + } + + private def podWithSupportedFeatures(): Pod = { + new PodBuilder() + .withNewMetadata() + .addToLabels("test-label-key", "test-label-value") + .addToAnnotations("test-annotation-key", "test-annotation-value") + .withNamespace("namespace") + .addNewOwnerReference() + .withController(true) + .withName("owner-reference") + .endOwnerReference() + .endMetadata() + .withNewSpec() + .withDnsPolicy("dns-policy") + .withHostAliases(new HostAliasBuilder().withHostnames("hostname").build()) + .withImagePullSecrets( + new LocalObjectReferenceBuilder().withName("local-reference").build()) + .withInitContainers(new ContainerBuilder().withName("init-container").build()) + .withNodeName("node-name") + .withNodeSelector(Map("node-selector-key" -> "node-selector-value").asJava) + .withSchedulerName("scheduler") + .withNewSecurityContext() + .withRunAsUser(1000L) + .endSecurityContext() + .withServiceAccount("service-account") + .withSubdomain("subdomain") + .withTolerations(new TolerationBuilder() + .withKey("toleration-key") + .withOperator("Equal") + .withEffect("NoSchedule") + .build()) + .addNewVolume() + .withNewHostPath() + .withPath("/test") + .endHostPath() + .withName("test-volume") + .endVolume() + .addNewContainer() + .withArgs("arg") + .withCommand("command") + .addNewEnv() + .withName("env-key") + .withValue("env-value") + .endEnv() + .withImagePullPolicy("Always") + .withName("executor-container") + .withNewResources() + .withLimits(Map("gpu" -> new QuantityBuilder().withAmount("1").build()).asJava) + .endResources() + .withNewSecurityContext() + .withRunAsNonRoot(true) + .endSecurityContext() + .withStdin(true) + .withTerminationMessagePath("termination-message-path") + .withTerminationMessagePolicy("termination-message-policy") + .addToVolumeMounts( + new VolumeMountBuilder() + .withName("test-volume") + .withMountPath("/test") + .build()) + .endContainer() + .endSpec() + .build() + } + +} diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/PodTemplateConfigMapStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/PodTemplateConfigMapStepSuite.scala index 7295b82ca4799..5e7388dc8e672 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/PodTemplateConfigMapStepSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/PodTemplateConfigMapStepSuite.scala @@ -20,25 +20,32 @@ import java.io.{File, PrintWriter} import java.nio.file.Files import io.fabric8.kubernetes.api.model.ConfigMap -import org.scalatest.BeforeAndAfter import org.apache.spark.{SparkConf, SparkFunSuite} import org.apache.spark.deploy.k8s._ -class PodTemplateConfigMapStepSuite extends SparkFunSuite with BeforeAndAfter { - private var kubernetesConf : KubernetesConf = _ - private var templateFile: File = _ +class PodTemplateConfigMapStepSuite extends SparkFunSuite { - before { - templateFile = Files.createTempFile("pod-template", "yml").toFile + test("Do nothing when executor template is not specified") { + val conf = KubernetesTestConf.createDriverConf() + val step = new PodTemplateConfigMapStep(conf) + + val initialPod = SparkPod.initialPod() + val configuredPod = step.configurePod(initialPod) + assert(configuredPod === initialPod) + + assert(step.getAdditionalKubernetesResources().isEmpty) + assert(step.getAdditionalPodSystemProperties().isEmpty) + } + + test("Mounts executor template volume if config specified") { + val templateFile = Files.createTempFile("pod-template", "yml").toFile templateFile.deleteOnExit() val sparkConf = new SparkConf(false) .set(Config.KUBERNETES_EXECUTOR_PODTEMPLATE_FILE, templateFile.getAbsolutePath) - kubernetesConf = KubernetesTestConf.createDriverConf(sparkConf = sparkConf) - } + val kubernetesConf = KubernetesTestConf.createDriverConf(sparkConf = sparkConf) - test("Mounts executor template volume if config specified") { val writer = new PrintWriter(templateFile) writer.write("pod-template-contents") writer.close() diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/ClientSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/ClientSuite.scala index e9c05fef6f5db..1bb926cbca23d 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/ClientSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/ClientSuite.scala @@ -126,7 +126,7 @@ class ClientSuite extends SparkFunSuite with BeforeAndAfter { MockitoAnnotations.initMocks(this) kconf = KubernetesTestConf.createDriverConf( resourceNamePrefix = Some(KUBERNETES_RESOURCE_PREFIX)) - when(driverBuilder.buildFromFeatures(kconf)).thenReturn(BUILT_KUBERNETES_SPEC) + when(driverBuilder.buildFromFeatures(kconf, kubernetesClient)).thenReturn(BUILT_KUBERNETES_SPEC) when(kubernetesClient.pods()).thenReturn(podOperations) when(podOperations.withName(POD_NAME)).thenReturn(namedPods) diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilderSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilderSuite.scala index 7e7dc4763c2e7..6518c91a1a1fd 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilderSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilderSuite.scala @@ -16,201 +16,21 @@ */ package org.apache.spark.deploy.k8s.submit -import io.fabric8.kubernetes.api.model.PodBuilder import io.fabric8.kubernetes.client.KubernetesClient -import org.mockito.Mockito._ -import org.apache.spark.{SparkConf, SparkException, SparkFunSuite} +import org.apache.spark.SparkConf import org.apache.spark.deploy.k8s._ -import org.apache.spark.deploy.k8s.Config.{CONTAINER_IMAGE, KUBERNETES_DRIVER_PODTEMPLATE_FILE, KUBERNETES_EXECUTOR_PODTEMPLATE_FILE} -import org.apache.spark.deploy.k8s.features._ +import org.apache.spark.internal.config.ConfigEntry -class KubernetesDriverBuilderSuite extends SparkFunSuite { +class KubernetesDriverBuilderSuite extends PodBuilderSuite { - private val BASIC_STEP_TYPE = "basic" - private val CREDENTIALS_STEP_TYPE = "credentials" - private val SERVICE_STEP_TYPE = "service" - private val LOCAL_DIRS_STEP_TYPE = "local-dirs" - private val SECRETS_STEP_TYPE = "mount-secrets" - private val DRIVER_CMD_STEP_TYPE = "driver-command" - private val ENV_SECRETS_STEP_TYPE = "env-secrets" - private val HADOOP_GLOBAL_STEP_TYPE = "hadoop-global" - private val MOUNT_VOLUMES_STEP_TYPE = "mount-volumes" - private val TEMPLATE_VOLUME_STEP_TYPE = "template-volume" - - private val basicFeatureStep = KubernetesFeaturesTestUtils.getMockConfigStepForStepType( - BASIC_STEP_TYPE, classOf[BasicDriverFeatureStep]) - - private val credentialsStep = KubernetesFeaturesTestUtils.getMockConfigStepForStepType( - CREDENTIALS_STEP_TYPE, classOf[DriverKubernetesCredentialsFeatureStep]) - - private val serviceStep = KubernetesFeaturesTestUtils.getMockConfigStepForStepType( - SERVICE_STEP_TYPE, classOf[DriverServiceFeatureStep]) - - private val localDirsStep = KubernetesFeaturesTestUtils.getMockConfigStepForStepType( - LOCAL_DIRS_STEP_TYPE, classOf[LocalDirsFeatureStep]) - - private val secretsStep = KubernetesFeaturesTestUtils.getMockConfigStepForStepType( - SECRETS_STEP_TYPE, classOf[MountSecretsFeatureStep]) - - private val driverCommandStep = KubernetesFeaturesTestUtils.getMockConfigStepForStepType( - DRIVER_CMD_STEP_TYPE, classOf[DriverCommandFeatureStep]) - - private val envSecretsStep = KubernetesFeaturesTestUtils.getMockConfigStepForStepType( - ENV_SECRETS_STEP_TYPE, classOf[EnvSecretsFeatureStep]) - - private val hadoopGlobalStep = KubernetesFeaturesTestUtils.getMockConfigStepForStepType( - HADOOP_GLOBAL_STEP_TYPE, classOf[KerberosConfDriverFeatureStep]) - - private val mountVolumesStep = KubernetesFeaturesTestUtils.getMockConfigStepForStepType( - MOUNT_VOLUMES_STEP_TYPE, classOf[MountVolumesFeatureStep]) - - private val templateVolumeStep = KubernetesFeaturesTestUtils.getMockConfigStepForStepType( - TEMPLATE_VOLUME_STEP_TYPE, classOf[PodTemplateConfigMapStep] - ) - - private val builderUnderTest: KubernetesDriverBuilder = - new KubernetesDriverBuilder( - _ => basicFeatureStep, - _ => credentialsStep, - _ => serviceStep, - _ => secretsStep, - _ => envSecretsStep, - _ => localDirsStep, - _ => mountVolumesStep, - _ => driverCommandStep, - _ => hadoopGlobalStep, - _ => templateVolumeStep) - - test("Apply fundamental steps all the time.") { - val conf = KubernetesTestConf.createDriverConf() - validateStepTypesApplied( - builderUnderTest.buildFromFeatures(conf), - BASIC_STEP_TYPE, - CREDENTIALS_STEP_TYPE, - SERVICE_STEP_TYPE, - LOCAL_DIRS_STEP_TYPE, - DRIVER_CMD_STEP_TYPE, - HADOOP_GLOBAL_STEP_TYPE) + override protected def templateFileConf: ConfigEntry[_] = { + Config.KUBERNETES_DRIVER_PODTEMPLATE_FILE } - test("Apply secrets step if secrets are present.") { - val conf = KubernetesTestConf.createDriverConf( - secretEnvNamesToKeyRefs = Map("EnvName" -> "SecretName:secretKey"), - secretNamesToMountPaths = Map("secret" -> "secretMountPath")) - validateStepTypesApplied( - builderUnderTest.buildFromFeatures(conf), - BASIC_STEP_TYPE, - CREDENTIALS_STEP_TYPE, - SERVICE_STEP_TYPE, - LOCAL_DIRS_STEP_TYPE, - SECRETS_STEP_TYPE, - ENV_SECRETS_STEP_TYPE, - DRIVER_CMD_STEP_TYPE, - HADOOP_GLOBAL_STEP_TYPE) - } - - test("Apply volumes step if mounts are present.") { - val volumeSpec = KubernetesVolumeSpec( - "volume", - "/tmp", - "", - false, - KubernetesHostPathVolumeConf("/path")) - val conf = KubernetesTestConf.createDriverConf(volumes = Seq(volumeSpec)) - validateStepTypesApplied( - builderUnderTest.buildFromFeatures(conf), - BASIC_STEP_TYPE, - CREDENTIALS_STEP_TYPE, - SERVICE_STEP_TYPE, - LOCAL_DIRS_STEP_TYPE, - MOUNT_VOLUMES_STEP_TYPE, - DRIVER_CMD_STEP_TYPE, - HADOOP_GLOBAL_STEP_TYPE) - } - - test("Apply volumes step if a mount subpath is present.") { - val volumeSpec = KubernetesVolumeSpec( - "volume", - "/tmp", - "foo", - false, - KubernetesHostPathVolumeConf("/path")) - val conf = KubernetesTestConf.createDriverConf(volumes = Seq(volumeSpec)) - validateStepTypesApplied( - builderUnderTest.buildFromFeatures(conf), - BASIC_STEP_TYPE, - CREDENTIALS_STEP_TYPE, - SERVICE_STEP_TYPE, - LOCAL_DIRS_STEP_TYPE, - MOUNT_VOLUMES_STEP_TYPE, - DRIVER_CMD_STEP_TYPE, - HADOOP_GLOBAL_STEP_TYPE) - } - - test("Apply template volume step if executor template is present.") { - val sparkConf = new SparkConf(false) - .set(KUBERNETES_EXECUTOR_PODTEMPLATE_FILE, "filename") + override protected def buildPod(sparkConf: SparkConf, client: KubernetesClient): SparkPod = { val conf = KubernetesTestConf.createDriverConf(sparkConf = sparkConf) - validateStepTypesApplied( - builderUnderTest.buildFromFeatures(conf), - BASIC_STEP_TYPE, - CREDENTIALS_STEP_TYPE, - SERVICE_STEP_TYPE, - LOCAL_DIRS_STEP_TYPE, - DRIVER_CMD_STEP_TYPE, - HADOOP_GLOBAL_STEP_TYPE, - TEMPLATE_VOLUME_STEP_TYPE) - } - - private def validateStepTypesApplied(resolvedSpec: KubernetesDriverSpec, stepTypes: String*) - : Unit = { - val addedProperties = resolvedSpec.systemProperties - .filter { case (k, _) => !k.startsWith("spark.") } - .toMap - assert(addedProperties.keys.toSet === stepTypes.toSet) - stepTypes.foreach { stepType => - assert(resolvedSpec.pod.pod.getMetadata.getLabels.get(stepType) === stepType) - assert(resolvedSpec.driverKubernetesResources.containsSlice( - KubernetesFeaturesTestUtils.getSecretsForStepType(stepType))) - assert(resolvedSpec.systemProperties(stepType) === stepType) - } - } - - test("Start with empty pod if template is not specified") { - val kubernetesClient = mock(classOf[KubernetesClient]) - val driverBuilder = KubernetesDriverBuilder.apply(kubernetesClient, new SparkConf()) - verify(kubernetesClient, never()).pods() + new KubernetesDriverBuilder().buildFromFeatures(conf, client).pod } - test("Starts with template if specified") { - val kubernetesClient = PodBuilderSuiteUtils.loadingMockKubernetesClient() - val sparkConf = new SparkConf(false) - .set(CONTAINER_IMAGE, "spark-driver:latest") - .set(KUBERNETES_DRIVER_PODTEMPLATE_FILE, "template-file.yaml") - val kubernetesConf = KubernetesTestConf.createDriverConf(sparkConf = sparkConf) - val driverSpec = KubernetesDriverBuilder - .apply(kubernetesClient, sparkConf) - .buildFromFeatures(kubernetesConf) - PodBuilderSuiteUtils.verifyPodWithSupportedFeatures(driverSpec.pod) - } - - test("Throws on misconfigured pod template") { - val kubernetesClient = PodBuilderSuiteUtils.loadingMockKubernetesClient( - new PodBuilder() - .withNewMetadata() - .addToLabels("test-label-key", "test-label-value") - .endMetadata() - .build()) - val sparkConf = new SparkConf(false) - .set(CONTAINER_IMAGE, "spark-driver:latest") - .set(KUBERNETES_DRIVER_PODTEMPLATE_FILE, "template-file.yaml") - val kubernetesConf = KubernetesTestConf.createDriverConf(sparkConf = sparkConf) - val exception = intercept[SparkException] { - KubernetesDriverBuilder - .apply(kubernetesClient, sparkConf) - .buildFromFeatures(kubernetesConf) - } - assert(exception.getMessage.contains("Could not load pod from template file.")) - } } diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/PodBuilderSuiteUtils.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/PodBuilderSuiteUtils.scala deleted file mode 100644 index c92e9e6e3b6b3..0000000000000 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/PodBuilderSuiteUtils.scala +++ /dev/null @@ -1,142 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.spark.deploy.k8s.submit - -import java.io.File - -import io.fabric8.kubernetes.api.model._ -import io.fabric8.kubernetes.client.KubernetesClient -import io.fabric8.kubernetes.client.dsl.{MixedOperation, PodResource} -import org.mockito.Matchers.any -import org.mockito.Mockito.{mock, when} -import org.scalatest.FlatSpec -import scala.collection.JavaConverters._ - -import org.apache.spark.deploy.k8s.SparkPod - -object PodBuilderSuiteUtils extends FlatSpec { - - def loadingMockKubernetesClient(pod: Pod = podWithSupportedFeatures()): KubernetesClient = { - val kubernetesClient = mock(classOf[KubernetesClient]) - val pods = - mock(classOf[MixedOperation[Pod, PodList, DoneablePod, PodResource[Pod, DoneablePod]]]) - val podResource = mock(classOf[PodResource[Pod, DoneablePod]]) - when(kubernetesClient.pods()).thenReturn(pods) - when(pods.load(any(classOf[File]))).thenReturn(podResource) - when(podResource.get()).thenReturn(pod) - kubernetesClient - } - - def verifyPodWithSupportedFeatures(pod: SparkPod): Unit = { - val metadata = pod.pod.getMetadata - assert(metadata.getLabels.containsKey("test-label-key")) - assert(metadata.getAnnotations.containsKey("test-annotation-key")) - assert(metadata.getNamespace === "namespace") - assert(metadata.getOwnerReferences.asScala.exists(_.getName == "owner-reference")) - val spec = pod.pod.getSpec - assert(!spec.getContainers.asScala.exists(_.getName == "executor-container")) - assert(spec.getDnsPolicy === "dns-policy") - assert(spec.getHostAliases.asScala.exists(_.getHostnames.asScala.exists(_ == "hostname"))) - assert(spec.getImagePullSecrets.asScala.exists(_.getName == "local-reference")) - assert(spec.getInitContainers.asScala.exists(_.getName == "init-container")) - assert(spec.getNodeName == "node-name") - assert(spec.getNodeSelector.get("node-selector-key") === "node-selector-value") - assert(spec.getSchedulerName === "scheduler") - assert(spec.getSecurityContext.getRunAsUser === 1000L) - assert(spec.getServiceAccount === "service-account") - assert(spec.getSubdomain === "subdomain") - assert(spec.getTolerations.asScala.exists(_.getKey == "toleration-key")) - assert(spec.getVolumes.asScala.exists(_.getName == "test-volume")) - val container = pod.container - assert(container.getName === "executor-container") - assert(container.getArgs.contains("arg")) - assert(container.getCommand.equals(List("command").asJava)) - assert(container.getEnv.asScala.exists(_.getName == "env-key")) - assert(container.getResources.getLimits.get("gpu") === - new QuantityBuilder().withAmount("1").build()) - assert(container.getSecurityContext.getRunAsNonRoot) - assert(container.getStdin) - assert(container.getTerminationMessagePath === "termination-message-path") - assert(container.getTerminationMessagePolicy === "termination-message-policy") - assert(pod.container.getVolumeMounts.asScala.exists(_.getName == "test-volume")) - - } - - - def podWithSupportedFeatures(): Pod = new PodBuilder() - .withNewMetadata() - .addToLabels("test-label-key", "test-label-value") - .addToAnnotations("test-annotation-key", "test-annotation-value") - .withNamespace("namespace") - .addNewOwnerReference() - .withController(true) - .withName("owner-reference") - .endOwnerReference() - .endMetadata() - .withNewSpec() - .withDnsPolicy("dns-policy") - .withHostAliases(new HostAliasBuilder().withHostnames("hostname").build()) - .withImagePullSecrets( - new LocalObjectReferenceBuilder().withName("local-reference").build()) - .withInitContainers(new ContainerBuilder().withName("init-container").build()) - .withNodeName("node-name") - .withNodeSelector(Map("node-selector-key" -> "node-selector-value").asJava) - .withSchedulerName("scheduler") - .withNewSecurityContext() - .withRunAsUser(1000L) - .endSecurityContext() - .withServiceAccount("service-account") - .withSubdomain("subdomain") - .withTolerations(new TolerationBuilder() - .withKey("toleration-key") - .withOperator("Equal") - .withEffect("NoSchedule") - .build()) - .addNewVolume() - .withNewHostPath() - .withPath("/test") - .endHostPath() - .withName("test-volume") - .endVolume() - .addNewContainer() - .withArgs("arg") - .withCommand("command") - .addNewEnv() - .withName("env-key") - .withValue("env-value") - .endEnv() - .withImagePullPolicy("Always") - .withName("executor-container") - .withNewResources() - .withLimits(Map("gpu" -> new QuantityBuilder().withAmount("1").build()).asJava) - .endResources() - .withNewSecurityContext() - .withRunAsNonRoot(true) - .endSecurityContext() - .withStdin(true) - .withTerminationMessagePath("termination-message-path") - .withTerminationMessagePolicy("termination-message-policy") - .addToVolumeMounts( - new VolumeMountBuilder() - .withName("test-volume") - .withMountPath("/test") - .build()) - .endContainer() - .endSpec() - .build() - -} diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala index d4fa31af3d5ce..278a3821a6f3d 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala @@ -80,8 +80,8 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter { when(kubernetesClient.pods()).thenReturn(podOperations) when(podOperations.withName(driverPodName)).thenReturn(driverPodOperations) when(driverPodOperations.get).thenReturn(driverPod) - when(executorBuilder.buildFromFeatures(any(classOf[KubernetesExecutorConf]), meq(secMgr))) - .thenAnswer(executorPodAnswer()) + when(executorBuilder.buildFromFeatures(any(classOf[KubernetesExecutorConf]), meq(secMgr), + meq(kubernetesClient))).thenAnswer(executorPodAnswer()) snapshotsStore = new DeterministicExecutorPodsSnapshotsStore() waitForExecutorPodsClock = new ManualClock(0L) podsAllocatorUnderTest = new ExecutorPodsAllocator( diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilderSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilderSuite.scala index ef521fd801e97..bd716174a8271 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilderSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilderSuite.scala @@ -16,147 +16,23 @@ */ package org.apache.spark.scheduler.cluster.k8s -import scala.collection.JavaConverters._ - -import io.fabric8.kubernetes.api.model.{Config => _, _} import io.fabric8.kubernetes.client.KubernetesClient -import org.mockito.Mockito.{mock, never, verify} -import org.apache.spark.{SecurityManager, SparkConf, SparkFunSuite} +import org.apache.spark.{SecurityManager, SparkConf} import org.apache.spark.deploy.k8s._ -import org.apache.spark.deploy.k8s.Constants._ -import org.apache.spark.deploy.k8s.features._ -import org.apache.spark.deploy.k8s.submit.PodBuilderSuiteUtils -import org.apache.spark.util.SparkConfWithEnv - -class KubernetesExecutorBuilderSuite extends SparkFunSuite { - private val BASIC_STEP_TYPE = "basic" - private val SECRETS_STEP_TYPE = "mount-secrets" - private val ENV_SECRETS_STEP_TYPE = "env-secrets" - private val LOCAL_DIRS_STEP_TYPE = "local-dirs" - private val HADOOP_CONF_STEP_TYPE = "hadoop-conf-step" - private val HADOOP_SPARK_USER_STEP_TYPE = "hadoop-spark-user" - private val KERBEROS_CONF_STEP_TYPE = "kerberos-step" - private val MOUNT_VOLUMES_STEP_TYPE = "mount-volumes" - - private val secMgr = new SecurityManager(new SparkConf(false)) - - private val basicFeatureStep = KubernetesFeaturesTestUtils.getMockConfigStepForStepType( - BASIC_STEP_TYPE, classOf[BasicExecutorFeatureStep]) - private val mountSecretsStep = KubernetesFeaturesTestUtils.getMockConfigStepForStepType( - SECRETS_STEP_TYPE, classOf[MountSecretsFeatureStep]) - private val envSecretsStep = KubernetesFeaturesTestUtils.getMockConfigStepForStepType( - ENV_SECRETS_STEP_TYPE, classOf[EnvSecretsFeatureStep]) - private val localDirsStep = KubernetesFeaturesTestUtils.getMockConfigStepForStepType( - LOCAL_DIRS_STEP_TYPE, classOf[LocalDirsFeatureStep]) - private val hadoopConfStep = KubernetesFeaturesTestUtils.getMockConfigStepForStepType( - HADOOP_CONF_STEP_TYPE, classOf[HadoopConfExecutorFeatureStep]) - private val hadoopSparkUser = KubernetesFeaturesTestUtils.getMockConfigStepForStepType( - HADOOP_SPARK_USER_STEP_TYPE, classOf[HadoopSparkUserExecutorFeatureStep]) - private val kerberosConf = KubernetesFeaturesTestUtils.getMockConfigStepForStepType( - KERBEROS_CONF_STEP_TYPE, classOf[KerberosConfExecutorFeatureStep]) - private val mountVolumesStep = KubernetesFeaturesTestUtils.getMockConfigStepForStepType( - MOUNT_VOLUMES_STEP_TYPE, classOf[MountVolumesFeatureStep]) +import org.apache.spark.internal.config.ConfigEntry - private val builderUnderTest = new KubernetesExecutorBuilder( - (_, _) => basicFeatureStep, - _ => mountSecretsStep, - _ => envSecretsStep, - _ => localDirsStep, - _ => mountVolumesStep, - _ => hadoopConfStep, - _ => kerberosConf, - _ => hadoopSparkUser) - - test("Basic steps are consistently applied.") { - val conf = KubernetesTestConf.createExecutorConf() - validateStepTypesApplied( - builderUnderTest.buildFromFeatures(conf, secMgr), BASIC_STEP_TYPE, LOCAL_DIRS_STEP_TYPE) - } - - test("Apply secrets step if secrets are present.") { - val conf = KubernetesTestConf.createExecutorConf( - secretEnvNamesToKeyRefs = Map("secret-name" -> "secret-key"), - secretNamesToMountPaths = Map("secret" -> "secretMountPath")) - validateStepTypesApplied( - builderUnderTest.buildFromFeatures(conf, secMgr), - BASIC_STEP_TYPE, - LOCAL_DIRS_STEP_TYPE, - SECRETS_STEP_TYPE, - ENV_SECRETS_STEP_TYPE) - } +class KubernetesExecutorBuilderSuite extends PodBuilderSuite { - test("Apply volumes step if mounts are present.") { - val volumeSpec = KubernetesVolumeSpec( - "volume", - "/tmp", - "", - false, - KubernetesHostPathVolumeConf("/checkpoint")) - val conf = KubernetesTestConf.createExecutorConf( - volumes = Seq(volumeSpec)) - validateStepTypesApplied( - builderUnderTest.buildFromFeatures(conf, secMgr), - BASIC_STEP_TYPE, - LOCAL_DIRS_STEP_TYPE, - MOUNT_VOLUMES_STEP_TYPE) + override protected def templateFileConf: ConfigEntry[_] = { + Config.KUBERNETES_EXECUTOR_PODTEMPLATE_FILE } - test("Apply basicHadoop step if HADOOP_CONF_DIR is defined") { - // HADOOP_DELEGATION_TOKEN - val conf = KubernetesTestConf.createExecutorConf( - sparkConf = new SparkConfWithEnv(Map("HADOOP_CONF_DIR" -> "/var/hadoop-conf")) - .set(HADOOP_CONFIG_MAP_NAME, "hadoop-conf-map-name") - .set(KRB5_CONFIG_MAP_NAME, "krb5-conf-map-name")) - validateStepTypesApplied( - builderUnderTest.buildFromFeatures(conf, secMgr), - BASIC_STEP_TYPE, - LOCAL_DIRS_STEP_TYPE, - HADOOP_CONF_STEP_TYPE, - HADOOP_SPARK_USER_STEP_TYPE) + override protected def buildPod(sparkConf: SparkConf, client: KubernetesClient): SparkPod = { + sparkConf.set("spark.driver.host", "https://driver.host.com") + val conf = KubernetesTestConf.createExecutorConf(sparkConf = sparkConf) + val secMgr = new SecurityManager(sparkConf) + new KubernetesExecutorBuilder().buildFromFeatures(conf, secMgr, client) } - test("Apply kerberos step if DT secrets created") { - val conf = KubernetesTestConf.createExecutorConf( - sparkConf = new SparkConf(false) - .set(HADOOP_CONFIG_MAP_NAME, "hadoop-conf-map-name") - .set(KRB5_CONFIG_MAP_NAME, "krb5-conf-map-name") - .set(KERBEROS_SPARK_USER_NAME, "spark-user") - .set(KERBEROS_DT_SECRET_NAME, "dt-secret") - .set(KERBEROS_DT_SECRET_KEY, "dt-key" )) - validateStepTypesApplied( - builderUnderTest.buildFromFeatures(conf, secMgr), - BASIC_STEP_TYPE, - LOCAL_DIRS_STEP_TYPE, - HADOOP_CONF_STEP_TYPE, - KERBEROS_CONF_STEP_TYPE) - } - - private def validateStepTypesApplied(resolvedPod: SparkPod, stepTypes: String*): Unit = { - assert(resolvedPod.pod.getMetadata.getLabels.asScala.keys.toSet === stepTypes.toSet) - } - - test("Starts with empty executor pod if template is not specified") { - val kubernetesClient = mock(classOf[KubernetesClient]) - val executorBuilder = KubernetesExecutorBuilder.apply(kubernetesClient, new SparkConf()) - verify(kubernetesClient, never()).pods() - } - - test("Starts with executor template if specified") { - val kubernetesClient = PodBuilderSuiteUtils.loadingMockKubernetesClient() - val sparkConf = new SparkConf(false) - .set("spark.driver.host", "https://driver.host.com") - .set(Config.CONTAINER_IMAGE, "spark-executor:latest") - .set(Config.KUBERNETES_EXECUTOR_PODTEMPLATE_FILE, "template-file.yaml") - val kubernetesConf = KubernetesTestConf.createExecutorConf( - sparkConf = sparkConf, - driverPod = Some(new PodBuilder() - .withNewMetadata() - .withName("driver") - .endMetadata() - .build())) - val sparkPod = KubernetesExecutorBuilder(kubernetesClient, sparkConf) - .buildFromFeatures(kubernetesConf, secMgr) - PodBuilderSuiteUtils.verifyPodWithSupportedFeatures(sparkPod) - } }