[SPARK-25877][K8S] Move all feature logic to feature classes.
This change makes the driver and executor builders a lot simpler
by encapsulating almost all feature logic in the respective
feature classes. The only logic that remains is the creation of
the initial pod, which needs to happen before anything else and
is therefore better left in the builder class.
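
In sketch form (condensed from the KubernetesDriverBuilder diff below;
the executor builder follows the same pattern), a builder now
unconditionally instantiates every feature step and folds the spec
through the list:

    // Every feature step is created whether or not its config is present.
    val features = Seq(
      new BasicDriverFeatureStep(conf),
      new DriverServiceFeatureStep(conf),
      // ...all remaining feature steps...
      new PodTemplateConfigMapStep(conf))

    // Each step contributes its pod mutations, system properties, and extra
    // Kubernetes resources; unconfigured steps simply contribute nothing.
    features.foldLeft(initialSpec) { case (spec, feature) =>
      KubernetesDriverSpec(
        feature.configurePod(spec.pod),
        spec.driverKubernetesResources ++ feature.getAdditionalKubernetesResources(),
        spec.systemProperties ++ feature.getAdditionalPodSystemProperties())
    }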

Most feature classes already behave correctly when the config has
nothing for them to handle, but a few minor tweaks had to be added.
Unit tests were also updated or added to account for these.
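
The tweak is essentially a guard of the following shape, where
SOME_FEATURE_SETTING and applyFeature are hypothetical placeholders
rather than real Spark identifiers: each step inspects its own config
and becomes a no-op when that config is absent, instead of failing a
require():

    override def configurePod(pod: SparkPod): SparkPod = {
      conf.getOption(SOME_FEATURE_SETTING) match {
        // Setting present: apply this feature's pod changes.
        case Some(value) => applyFeature(value, pod)
        // Setting absent: feature does not apply; return the pod as-is.
        case None => pod
      }
    }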

The builder test suites were simplified considerably and now only
exercise the pod-related code that remains in the builders themselves.

Author: Marcelo Vanzin <vanzin@cloudera.com>

Closes #23220 from vanzin/SPARK-25877.
Marcelo Vanzin authored and mccheah committed Dec 12, 2018
1 parent 570b8f3 commit a63e7b2
Showing 16 changed files with 343 additions and 676 deletions.
HadoopConfExecutorFeatureStep.scala
@@ -31,10 +31,10 @@ private[spark] class HadoopConfExecutorFeatureStep(conf: KubernetesExecutorConf)
 
   override def configurePod(pod: SparkPod): SparkPod = {
     val hadoopConfDirCMapName = conf.getOption(HADOOP_CONFIG_MAP_NAME)
-    require(hadoopConfDirCMapName.isDefined,
-      "Ensure that the env `HADOOP_CONF_DIR` is defined either in the client or " +
-        " using pre-existing ConfigMaps")
-    logInfo("HADOOP_CONF_DIR defined")
-    HadoopBootstrapUtil.bootstrapHadoopConfDir(None, None, hadoopConfDirCMapName, pod)
+    if (hadoopConfDirCMapName.isDefined) {
+      HadoopBootstrapUtil.bootstrapHadoopConfDir(None, None, hadoopConfDirCMapName, pod)
+    } else {
+      pod
+    }
   }
 }
HadoopSparkUserExecutorFeatureStep.scala
@@ -28,7 +28,8 @@ private[spark] class HadoopSparkUserExecutorFeatureStep(conf: KubernetesExecutorConf)
   extends KubernetesFeatureConfigStep {
 
   override def configurePod(pod: SparkPod): SparkPod = {
-    val sparkUserName = conf.get(KERBEROS_SPARK_USER_NAME)
-    HadoopBootstrapUtil.bootstrapSparkUserPod(sparkUserName, pod)
+    conf.getOption(KERBEROS_SPARK_USER_NAME).map { user =>
+      HadoopBootstrapUtil.bootstrapSparkUserPod(user, pod)
+    }.getOrElse(pod)
   }
 }
KerberosConfExecutorFeatureStep.scala
@@ -27,18 +27,20 @@ import org.apache.spark.internal.Logging
 private[spark] class KerberosConfExecutorFeatureStep(conf: KubernetesExecutorConf)
   extends KubernetesFeatureConfigStep with Logging {
 
-  private val maybeKrb5CMap = conf.getOption(KRB5_CONFIG_MAP_NAME)
-  require(maybeKrb5CMap.isDefined, "HADOOP_CONF_DIR ConfigMap not found")
-
   override def configurePod(pod: SparkPod): SparkPod = {
-    logInfo(s"Mounting Resources for Kerberos")
-    HadoopBootstrapUtil.bootstrapKerberosPod(
-      conf.get(KERBEROS_DT_SECRET_NAME),
-      conf.get(KERBEROS_DT_SECRET_KEY),
-      conf.get(KERBEROS_SPARK_USER_NAME),
-      None,
-      None,
-      maybeKrb5CMap,
-      pod)
+    val maybeKrb5CMap = conf.getOption(KRB5_CONFIG_MAP_NAME)
+    if (maybeKrb5CMap.isDefined) {
+      logInfo(s"Mounting Resources for Kerberos")
+      HadoopBootstrapUtil.bootstrapKerberosPod(
+        conf.get(KERBEROS_DT_SECRET_NAME),
+        conf.get(KERBEROS_DT_SECRET_KEY),
+        conf.get(KERBEROS_SPARK_USER_NAME),
+        None,
+        None,
+        maybeKrb5CMap,
+        pod)
+    } else {
+      pod
+    }
   }
 }
PodTemplateConfigMapStep.scala
@@ -28,44 +28,60 @@ import org.apache.spark.deploy.k8s.Constants._
 
 private[spark] class PodTemplateConfigMapStep(conf: KubernetesConf)
   extends KubernetesFeatureConfigStep {
+
+  private val hasTemplate = conf.contains(KUBERNETES_EXECUTOR_PODTEMPLATE_FILE)
+
   def configurePod(pod: SparkPod): SparkPod = {
-    val podWithVolume = new PodBuilder(pod.pod)
-      .editSpec()
-        .addNewVolume()
-          .withName(POD_TEMPLATE_VOLUME)
-          .withNewConfigMap()
-            .withName(POD_TEMPLATE_CONFIGMAP)
-            .addNewItem()
-              .withKey(POD_TEMPLATE_KEY)
-              .withPath(EXECUTOR_POD_SPEC_TEMPLATE_FILE_NAME)
-              .endItem()
-            .endConfigMap()
-          .endVolume()
-        .endSpec()
-      .build()
+    if (hasTemplate) {
+      val podWithVolume = new PodBuilder(pod.pod)
+        .editSpec()
+          .addNewVolume()
+            .withName(POD_TEMPLATE_VOLUME)
+            .withNewConfigMap()
+              .withName(POD_TEMPLATE_CONFIGMAP)
+              .addNewItem()
+                .withKey(POD_TEMPLATE_KEY)
+                .withPath(EXECUTOR_POD_SPEC_TEMPLATE_FILE_NAME)
+                .endItem()
+              .endConfigMap()
+            .endVolume()
+          .endSpec()
+        .build()
 
-    val containerWithVolume = new ContainerBuilder(pod.container)
-      .addNewVolumeMount()
-        .withName(POD_TEMPLATE_VOLUME)
-        .withMountPath(EXECUTOR_POD_SPEC_TEMPLATE_MOUNTPATH)
-        .endVolumeMount()
-      .build()
-    SparkPod(podWithVolume, containerWithVolume)
+      val containerWithVolume = new ContainerBuilder(pod.container)
+        .addNewVolumeMount()
+          .withName(POD_TEMPLATE_VOLUME)
+          .withMountPath(EXECUTOR_POD_SPEC_TEMPLATE_MOUNTPATH)
+          .endVolumeMount()
+        .build()
+      SparkPod(podWithVolume, containerWithVolume)
+    } else {
+      pod
+    }
   }
 
-  override def getAdditionalPodSystemProperties(): Map[String, String] = Map[String, String](
-    KUBERNETES_EXECUTOR_PODTEMPLATE_FILE.key ->
-      (EXECUTOR_POD_SPEC_TEMPLATE_MOUNTPATH + "/" + EXECUTOR_POD_SPEC_TEMPLATE_FILE_NAME))
+  override def getAdditionalPodSystemProperties(): Map[String, String] = {
+    if (hasTemplate) {
+      Map[String, String](
+        KUBERNETES_EXECUTOR_PODTEMPLATE_FILE.key ->
+          (EXECUTOR_POD_SPEC_TEMPLATE_MOUNTPATH + "/" + EXECUTOR_POD_SPEC_TEMPLATE_FILE_NAME))
+    } else {
+      Map.empty
+    }
+  }
 
   override def getAdditionalKubernetesResources(): Seq[HasMetadata] = {
-    require(conf.get(KUBERNETES_EXECUTOR_PODTEMPLATE_FILE).isDefined)
-    val podTemplateFile = conf.get(KUBERNETES_EXECUTOR_PODTEMPLATE_FILE).get
-    val podTemplateString = Files.toString(new File(podTemplateFile), StandardCharsets.UTF_8)
-    Seq(new ConfigMapBuilder()
-      .withNewMetadata()
-        .withName(POD_TEMPLATE_CONFIGMAP)
-        .endMetadata()
-      .addToData(POD_TEMPLATE_KEY, podTemplateString)
-      .build())
+    if (hasTemplate) {
+      val podTemplateFile = conf.get(KUBERNETES_EXECUTOR_PODTEMPLATE_FILE).get
+      val podTemplateString = Files.toString(new File(podTemplateFile), StandardCharsets.UTF_8)
+      Seq(new ConfigMapBuilder()
+        .withNewMetadata()
+          .withName(POD_TEMPLATE_CONFIGMAP)
+          .endMetadata()
+        .addToData(POD_TEMPLATE_KEY, podTemplateString)
+        .build())
+    } else {
+      Nil
+    }
   }
 }
KubernetesClientApplication.scala
@@ -104,7 +104,7 @@ private[spark] class Client(
     watcher: LoggingPodStatusWatcher) extends Logging {
 
   def run(): Unit = {
-    val resolvedDriverSpec = builder.buildFromFeatures(conf)
+    val resolvedDriverSpec = builder.buildFromFeatures(conf, kubernetesClient)
     val configMapName = s"${conf.resourceNamePrefix}-driver-conf-map"
     val configMap = buildConfigMap(configMapName, resolvedDriverSpec.systemProperties)
     // The include of the ENV_VAR for "SPARK_CONF_DIR" is to allow for the
@@ -232,7 +232,7 @@ private[spark] class KubernetesClientApplication extends SparkApplication {
       None)) { kubernetesClient =>
         val client = new Client(
           kubernetesConf,
-          KubernetesDriverBuilder(kubernetesClient, kubernetesConf.sparkConf),
+          new KubernetesDriverBuilder(),
           kubernetesClient,
           waitForAppCompletion,
           watcher)
KubernetesDriverBuilder.scala
@@ -20,90 +20,49 @@ import java.io.File
 
 import io.fabric8.kubernetes.client.KubernetesClient
 
-import org.apache.spark.SparkConf
 import org.apache.spark.deploy.k8s._
 import org.apache.spark.deploy.k8s.features._
 
-private[spark] class KubernetesDriverBuilder(
-    provideBasicStep: (KubernetesDriverConf => BasicDriverFeatureStep) =
-      new BasicDriverFeatureStep(_),
-    provideCredentialsStep: (KubernetesDriverConf => DriverKubernetesCredentialsFeatureStep) =
-      new DriverKubernetesCredentialsFeatureStep(_),
-    provideServiceStep: (KubernetesDriverConf => DriverServiceFeatureStep) =
-      new DriverServiceFeatureStep(_),
-    provideSecretsStep: (KubernetesConf => MountSecretsFeatureStep) =
-      new MountSecretsFeatureStep(_),
-    provideEnvSecretsStep: (KubernetesConf => EnvSecretsFeatureStep) =
-      new EnvSecretsFeatureStep(_),
-    provideLocalDirsStep: (KubernetesConf => LocalDirsFeatureStep) =
-      new LocalDirsFeatureStep(_),
-    provideVolumesStep: (KubernetesConf => MountVolumesFeatureStep) =
-      new MountVolumesFeatureStep(_),
-    provideDriverCommandStep: (KubernetesDriverConf => DriverCommandFeatureStep) =
-      new DriverCommandFeatureStep(_),
-    provideHadoopGlobalStep: (KubernetesDriverConf => KerberosConfDriverFeatureStep) =
-      new KerberosConfDriverFeatureStep(_),
-    providePodTemplateConfigMapStep: (KubernetesConf => PodTemplateConfigMapStep) =
-      new PodTemplateConfigMapStep(_),
-    provideInitialPod: () => SparkPod = () => SparkPod.initialPod) {
+private[spark] class KubernetesDriverBuilder {
 
-  def buildFromFeatures(kubernetesConf: KubernetesDriverConf): KubernetesDriverSpec = {
-    val baseFeatures = Seq(
-      provideBasicStep(kubernetesConf),
-      provideCredentialsStep(kubernetesConf),
-      provideServiceStep(kubernetesConf),
-      provideLocalDirsStep(kubernetesConf))
-
-    val secretFeature = if (kubernetesConf.secretNamesToMountPaths.nonEmpty) {
-      Seq(provideSecretsStep(kubernetesConf))
-    } else Nil
-    val envSecretFeature = if (kubernetesConf.secretEnvNamesToKeyRefs.nonEmpty) {
-      Seq(provideEnvSecretsStep(kubernetesConf))
-    } else Nil
-    val volumesFeature = if (kubernetesConf.volumes.nonEmpty) {
-      Seq(provideVolumesStep(kubernetesConf))
-    } else Nil
-    val podTemplateFeature = if (
-      kubernetesConf.get(Config.KUBERNETES_EXECUTOR_PODTEMPLATE_FILE).isDefined) {
-      Seq(providePodTemplateConfigMapStep(kubernetesConf))
-    } else Nil
-
-    val driverCommandStep = provideDriverCommandStep(kubernetesConf)
-
-    val hadoopConfigStep = Some(provideHadoopGlobalStep(kubernetesConf))
+  def buildFromFeatures(
+      conf: KubernetesDriverConf,
+      client: KubernetesClient): KubernetesDriverSpec = {
+    val initialPod = conf.get(Config.KUBERNETES_DRIVER_PODTEMPLATE_FILE)
+      .map { file =>
+        KubernetesUtils.loadPodFromTemplate(
+          client,
+          new File(file),
+          conf.get(Config.KUBERNETES_DRIVER_PODTEMPLATE_CONTAINER_NAME))
+      }
+      .getOrElse(SparkPod.initialPod())
 
-    val allFeatures: Seq[KubernetesFeatureConfigStep] =
-      baseFeatures ++ Seq(driverCommandStep) ++
-      secretFeature ++ envSecretFeature ++ volumesFeature ++
-      hadoopConfigStep ++ podTemplateFeature
+    val features = Seq(
+      new BasicDriverFeatureStep(conf),
+      new DriverKubernetesCredentialsFeatureStep(conf),
+      new DriverServiceFeatureStep(conf),
+      new MountSecretsFeatureStep(conf),
+      new EnvSecretsFeatureStep(conf),
+      new LocalDirsFeatureStep(conf),
+      new MountVolumesFeatureStep(conf),
+      new DriverCommandFeatureStep(conf),
+      new KerberosConfDriverFeatureStep(conf),
+      new PodTemplateConfigMapStep(conf))
 
-    var spec = KubernetesDriverSpec(
-      provideInitialPod(),
+    val spec = KubernetesDriverSpec(
+      initialPod,
       driverKubernetesResources = Seq.empty,
-      kubernetesConf.sparkConf.getAll.toMap)
-    for (feature <- allFeatures) {
+      conf.sparkConf.getAll.toMap)
+
+    features.foldLeft(spec) { case (spec, feature) =>
       val configuredPod = feature.configurePod(spec.pod)
       val addedSystemProperties = feature.getAdditionalPodSystemProperties()
      val addedResources = feature.getAdditionalKubernetesResources()
-      spec = KubernetesDriverSpec(
+      KubernetesDriverSpec(
        configuredPod,
        spec.driverKubernetesResources ++ addedResources,
        spec.systemProperties ++ addedSystemProperties)
     }
-    spec
   }
 }
-
-private[spark] object KubernetesDriverBuilder {
-  def apply(kubernetesClient: KubernetesClient, conf: SparkConf): KubernetesDriverBuilder = {
-    conf.get(Config.KUBERNETES_DRIVER_PODTEMPLATE_FILE)
-      .map(new File(_))
-      .map(file => new KubernetesDriverBuilder(provideInitialPod = () =>
-        KubernetesUtils.loadPodFromTemplate(
-          kubernetesClient,
-          file,
-          conf.get(Config.KUBERNETES_DRIVER_PODTEMPLATE_CONTAINER_NAME))
-      ))
-      .getOrElse(new KubernetesDriverBuilder())
-  }
-}
ExecutorPodsAllocator.scala
@@ -136,7 +136,8 @@ private[spark] class ExecutorPodsAllocator(
             newExecutorId.toString,
             applicationId,
             driverPod)
-          val executorPod = executorBuilder.buildFromFeatures(executorConf, secMgr)
+          val executorPod = executorBuilder.buildFromFeatures(executorConf, secMgr,
+            kubernetesClient)
           val podWithAttachedContainer = new PodBuilder(executorPod.pod)
             .editOrNewSpec()
             .addToContainers(executorPod.container)
KubernetesClusterManager.scala
@@ -95,7 +95,7 @@ private[spark] class KubernetesClusterManager extends ExternalClusterManager with Logging
     val executorPodsAllocator = new ExecutorPodsAllocator(
       sc.conf,
       sc.env.securityManager,
-      KubernetesExecutorBuilder(kubernetesClient, sc.conf),
+      new KubernetesExecutorBuilder(),
       kubernetesClient,
       snapshotsStore,
       new SystemClock())
