[SPARK-23529][K8s] Support mounting hostPath volumes #21095

Closed
wants to merge 18 commits
@@ -162,10 +162,18 @@ private[spark] object Config extends Logging {
val KUBERNETES_DRIVER_LABEL_PREFIX = "spark.kubernetes.driver.label."
val KUBERNETES_DRIVER_ANNOTATION_PREFIX = "spark.kubernetes.driver.annotation."
val KUBERNETES_DRIVER_SECRETS_PREFIX = "spark.kubernetes.driver.secrets."
val KUBERNETES_DRIVER_VOLUMES_PREFIX = "spark.kubernetes.driver.volumes."

val KUBERNETES_EXECUTOR_LABEL_PREFIX = "spark.kubernetes.executor.label."
val KUBERNETES_EXECUTOR_ANNOTATION_PREFIX = "spark.kubernetes.executor.annotation."
val KUBERNETES_EXECUTOR_SECRETS_PREFIX = "spark.kubernetes.executor.secrets."
val KUBERNETES_EXECUTOR_VOLUMES_PREFIX = "spark.kubernetes.executor.volumes."

val KUBERNETES_VOLUMES_HOSTPATH_KEY = "hostPath"
val KUBERNETES_VOLUMES_MOUNT_KEY = "mount"
val KUBERNETES_VOLUMES_PATH_KEY = "path"
val KUBERNETES_VOLUMES_READONLY_KEY = "readOnly"
val KUBERNETES_VOLUMES_OPTIONS_KEY = "options"

val KUBERNETES_DRIVER_ENV_PREFIX = "spark.kubernetes.driverEnv."
}
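
For reference, these prefixes and keys compose into user-facing properties of the following shape; the volume name myVolume and the paths are illustrative, and the executor prefix follows the same pattern:

  spark.kubernetes.driver.volumes.hostPath.myVolume.mount.path=/opt/mount
  spark.kubernetes.driver.volumes.hostPath.myVolume.mount.readOnly=true
  spark.kubernetes.driver.volumes.hostPath.myVolume.options.path=/tmp/mount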
@@ -16,8 +16,6 @@
*/
package org.apache.spark.deploy.k8s

import io.fabric8.kubernetes.api.model.LocalObjectReference

import org.apache.spark.SparkConf
import org.apache.spark.util.Utils

@@ -0,0 +1,28 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.deploy.k8s

import scala.collection.mutable.Map

private[spark] case class KubernetesVolumeSpec(
var mountPath: Option[String],
var mountReadOnly: Option[Boolean],
var optionsSpec: Map[String, String])

private[spark] object KubernetesVolumeSpec {
def emptySpec(): KubernetesVolumeSpec = new KubernetesVolumeSpec(None, None, Map())
}
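
For illustration, a spec for a read-only hostPath mount (values borrowed from the tests later in this PR) would be populated as:

  // Mount the host directory /tmp/mount read-only at /opt/mount in the container.
  val spec = KubernetesVolumeSpec(
    mountPath = Some("/opt/mount"),
    mountReadOnly = Some(true),
    optionsSpec = scala.collection.mutable.Map("path" -> "/tmp/mount"))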
@@ -0,0 +1,142 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.deploy.k8s

import scala.collection.mutable.HashMap

import io.fabric8.kubernetes.api.model._

import org.apache.spark.SparkConf
import org.apache.spark.deploy.k8s.Config._

private[spark] object KubernetesVolumeUtils {

/**
* Given hostPath volume specs, add volume to pod and volume mount to container.
*
* @param pod original specification of the pod
* @param container original specification of the container
* @param sparkConf Spark configuration
* @param prefix the prefix for volume configuration
* @return a tuple of (pod with the volume(s) added, container with mount(s) added)
*/
def addVolumes(
Contributor comment: The Scaladoc should not mention hostPath, as this function is not exclusive to hostPath.

pod: Pod,
container: Container,
sparkConf: SparkConf,
prefix : String): (Pod, Container) = {
val hostPathVolumeSpecs = parseHostPathVolumesWithPrefix(sparkConf, prefix)
addHostPathVolumes(pod, container, hostPathVolumeSpecs)
}

/**
* Extract Spark volume configuration properties with a given name prefix.
*
* @param sparkConf Spark configuration
* @param prefix the given property name prefix
* @param volumeTypeKey the volume type key that follows the prefix (e.g., hostPath)
* @return a Map with volume names as keys and specs as values
*/
def parseVolumesWithPrefix(
sparkConf: SparkConf,
prefix: String,
volumeTypeKey: String): Map[String, KubernetesVolumeSpec] = {
val volumes = HashMap[String, KubernetesVolumeSpec]()
val properties = sparkConf.getAllWithPrefix(s"$prefix$volumeTypeKey.").toList
// Extract volume names
properties.foreach {
k =>
val keys = k._1.split("\\.")
if (keys.nonEmpty && !volumes.contains(keys(0))) {
volumes.update(keys(0), KubernetesVolumeSpec.emptySpec())
}
}
// Populate spec
volumes.foreach {
case (name, spec) =>
Contributor comment: The case line should be merged into the previous line per the Spark code convention, e.g., volumes.foreach { case (name, spec) =>.

properties.foreach {
k =>
Contributor comment: Ditto.

k._1.split("\\.") match {
case Array(`name`, KUBERNETES_VOLUMES_MOUNT_KEY, KUBERNETES_VOLUMES_PATH_KEY) =>
spec.mountPath = Some(k._2)
case Array(`name`, KUBERNETES_VOLUMES_MOUNT_KEY, KUBERNETES_VOLUMES_READONLY_KEY) =>
spec.mountReadOnly = Some(k._2.toBoolean)
case Array(`name`, KUBERNETES_VOLUMES_OPTIONS_KEY, option) =>
spec.optionsSpec.update(option, k._2)
case _ =>
None
}
}
}
volumes.toMap
}
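
As a sketch of the parsing behavior (property values borrowed from the tests in this PR), the driver prefix combined with the hostPath type key yields:

  // Assuming these properties are set on sparkConf:
  //   spark.kubernetes.driver.volumes.hostPath.test.mount.path   = /opt/mount
  //   spark.kubernetes.driver.volumes.hostPath.test.options.path = /tmp/mount
  val specs = KubernetesVolumeUtils.parseVolumesWithPrefix(
    sparkConf, "spark.kubernetes.driver.volumes.", "hostPath")
  // specs("test") == KubernetesVolumeSpec(Some("/opt/mount"), None, Map("path" -> "/tmp/mount"))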

/**
* Extract Spark hostPath volume configuration properties with a given name prefix and
* return the result as a Map.
*
* @param sparkConf Spark configuration
* @param prefix the given property name prefix
* @return a Map with volume names as keys and specs as values
*/
def parseHostPathVolumesWithPrefix(
Contributor comment: Looks like you don't really need this function, as it's just a wrapper of parseVolumesWithPrefix.

sparkConf: SparkConf,
prefix: String): Map[String, KubernetesVolumeSpec] = {
parseVolumesWithPrefix(sparkConf, prefix, KUBERNETES_VOLUMES_HOSTPATH_KEY)
}

/**
* Given hostPath volume specs, add volume to pod and volume mount to container.
*
* @param pod original specification of the pod
* @param container original specification of the container
* @param volumes list of named volume specs
* @return a tuple of (pod with the volume(s) added, container with mount(s) added)
*/
def addHostPathVolumes(
pod: Pod,
container: Container,
volumes: Map[String, KubernetesVolumeSpec]): (Pod, Container) = {
val podBuilder = new PodBuilder(pod).editOrNewSpec()
val containerBuilder = new ContainerBuilder(container)
volumes foreach {
case (name, spec) =>
Contributor comment: Ditto.

var hostPath: Option[String] = None
if (spec.optionsSpec.contains(KUBERNETES_VOLUMES_PATH_KEY)) {
hostPath = Some(spec.optionsSpec(KUBERNETES_VOLUMES_PATH_KEY))
}
if (hostPath.isDefined && spec.mountPath.isDefined) {
Contributor comment: Looks like this if block can be combined with the previous if block, e.g., if (spec.mountPath.isDefined && spec.optionsSpec.contains(KUBERNETES_VOLUMES_PATH_KEY)) {...}.

podBuilder.addToVolumes(new VolumeBuilder()
.withHostPath(new HostPathVolumeSource(hostPath.get))
.withName(name)
.build())
val volumeBuilder = new VolumeMountBuilder()
Contributor comment: s/volumeBuilder/mountBuilder.

.withMountPath(spec.mountPath.get)
.withName(name)
if (spec.mountReadOnly.isDefined) {
containerBuilder
.addToVolumeMounts(volumeBuilder
.withReadOnly(spec.mountReadOnly.get)
.build())
} else {
containerBuilder.addToVolumeMounts(volumeBuilder.build())
}
}
}
(podBuilder.endSpec().build(), containerBuilder.build())
}
}
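
A minimal usage sketch of addHostPathVolumes (basePod and baseContainer stand in for pre-built fabric8 objects; the names and paths are illustrative):

  val (pod, container) = KubernetesVolumeUtils.addHostPathVolumes(
    basePod,
    baseContainer,
    Map("test" -> KubernetesVolumeSpec(
      Some("/opt/mount"), Some(true),
      scala.collection.mutable.Map("path" -> "/tmp/mount"))))
  // pod.getSpec.getVolumes now holds a volume named "test" backed by hostPath /tmp/mount;
  // container.getVolumeMounts holds a read-only mount of "test" at /opt/mount.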
@@ -19,10 +19,10 @@ package org.apache.spark.deploy.k8s.features
import scala.collection.JavaConverters._
import scala.collection.mutable

import io.fabric8.kubernetes.api.model.{ContainerBuilder, EnvVarBuilder, EnvVarSourceBuilder, HasMetadata, PodBuilder, QuantityBuilder}
import io.fabric8.kubernetes.api.model._

import org.apache.spark.SparkException
import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesDriverSpecificConf, KubernetesUtils, SparkPod}
import org.apache.spark.deploy.k8s._
import org.apache.spark.deploy.k8s.Config._
import org.apache.spark.deploy.k8s.Constants._
import org.apache.spark.internal.config._
@@ -109,7 +109,15 @@ private[spark] class BasicDriverFeatureStep(
.addToImagePullSecrets(conf.imagePullSecrets(): _*)
.endSpec()
.build()
SparkPod(driverPod, driverContainer)

val (driverPodWithVolumes, driverContainerVolumes) =
Contributor comment: Instead of putting the logic of volume mounting in BasicDriverFeatureStep and BasicExecutorFeatureStep, we should add a new step for mounting volumes, similar to how we handle secrets, e.g., a MountVolumesFeatureStep where the addVolumes logic should live. This feature step can be used for both the driver and executors. (A hypothetical sketch appears at the end of this file's diff.)

KubernetesVolumeUtils.addVolumes(
driverPod,
driverContainer,
conf.sparkConf,
KUBERNETES_DRIVER_VOLUMES_PREFIX)

SparkPod(driverPodWithVolumes, driverContainerVolumes)
}

override def getAdditionalPodSystemProperties(): Map[String, String] = {
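
Following up on the review comment above, a hypothetical sketch of the suggested MountVolumesFeatureStep. The class name comes from the comment; the trait name, constructor shape, and the way the role-specific prefix is passed in are assumptions modeled on how the existing feature steps are written:

  import io.fabric8.kubernetes.api.model.HasMetadata

  import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesRoleSpecificConf, KubernetesVolumeUtils, SparkPod}

  private[spark] class MountVolumesFeatureStep(
      kubernetesConf: KubernetesConf[_ <: KubernetesRoleSpecificConf],
      volumesPrefix: String)  // e.g., KUBERNETES_DRIVER_VOLUMES_PREFIX or the executor equivalent
    extends KubernetesFeatureConfigStep {

    // Move the addVolumes logic here so both the driver and executors can reuse it.
    override def configurePod(pod: SparkPod): SparkPod = {
      val (podWithVolumes, containerWithMounts) = KubernetesVolumeUtils.addVolumes(
        pod.pod, pod.container, kubernetesConf.sparkConf, volumesPrefix)
      SparkPod(podWithVolumes, containerWithMounts)
    }

    override def getAdditionalPodSystemProperties(): Map[String, String] = Map.empty

    override def getAdditionalKubernetesResources(): Seq[HasMetadata] = Seq.empty
  }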
@@ -18,10 +18,10 @@ package org.apache.spark.deploy.k8s.features

import scala.collection.JavaConverters._

import io.fabric8.kubernetes.api.model.{ContainerBuilder, ContainerPortBuilder, EnvVar, EnvVarBuilder, EnvVarSourceBuilder, HasMetadata, PodBuilder, QuantityBuilder}
import io.fabric8.kubernetes.api.model._

import org.apache.spark.SparkException
import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesExecutorSpecificConf, SparkPod}
import org.apache.spark.deploy.k8s._
import org.apache.spark.deploy.k8s.Config._
import org.apache.spark.deploy.k8s.Constants._
import org.apache.spark.internal.config.{EXECUTOR_CLASS_PATH, EXECUTOR_JAVA_OPTIONS, EXECUTOR_MEMORY, EXECUTOR_MEMORY_OVERHEAD}
@@ -170,7 +170,14 @@ private[spark] class BasicExecutorFeatureStep(
.addToImagePullSecrets(kubernetesConf.imagePullSecrets(): _*)
.endSpec()
.build()
SparkPod(executorPod, containerWithLimitCores)

val (executorPodWithVolumes, executorContainerWithVolumes) =
KubernetesVolumeUtils.addVolumes(executorPod,
containerWithLimitCores,
kubernetesConf.sparkConf,
KUBERNETES_EXECUTOR_VOLUMES_PREFIX)

SparkPod(executorPodWithVolumes, executorContainerWithVolumes)
}

override def getAdditionalPodSystemProperties(): Map[String, String] = Map.empty
@@ -150,4 +150,63 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite {
"spark.files" -> "https://localhost:9000/file1.txt,/opt/spark/file2.txt")
assert(additionalProperties === expectedSparkConf)
}

test("single driver hostPath volume gets mounted") {
hostPathVolumeTest(1, false)
}

test("multiple driver hostPath volumes get mounted") {
hostPathVolumeTest(2, false)
}

test("single driver hostPath volume gets mounted w/ readOnly option") {
hostPathVolumeTest(1, true)
}

test("multiple driver hostPath volumes get mounted w/ readOnly option") {
hostPathVolumeTest(2, true)
}

private def hostPathVolumeTest(numVolumes: Int, readOnly: Boolean): Unit = {
val sparkConf = new SparkConf()
.set(KUBERNETES_DRIVER_POD_NAME, "spark-driver-pod")
.set(CONTAINER_IMAGE, "spark-driver:latest")
for (i <- 0 until numVolumes) {
sparkConf.set(s"spark.kubernetes.driver.volumes.hostPath.hostPath-$i.mount.path",
s"/opt/mount$i")
sparkConf.set(s"spark.kubernetes.driver.volumes.hostPath.hostPath-$i.options.path",
s"/tmp/mount$i")
if (readOnly) {
sparkConf.set(s"spark.kubernetes.driver.volumes.hostPath.hostPath-$i.mount.readOnly",
"true")
}
}
val kubernetesConf = KubernetesConf(
sparkConf,
KubernetesDriverSpecificConf(
None,
APP_NAME,
MAIN_CLASS,
APP_ARGS),
RESOURCE_NAME_PREFIX,
APP_ID,
DRIVER_LABELS,
DRIVER_ANNOTATIONS,
Map.empty,
Map.empty)
val step = new BasicDriverFeatureStep(kubernetesConf)
val driver = step.configurePod(SparkPod.initialPod())

assert(driver.container.getVolumeMounts.size() === numVolumes)
assert(driver.pod.getSpec.getVolumes.size() === numVolumes)
for (i <- 0 until numVolumes) {
assert(driver.container.getVolumeMounts.asScala
.exists(v => (v.getName == s"hostPath-$i" && v.getMountPath == s"/opt/mount$i")))
assert(driver.pod.getSpec.getVolumes.asScala
.exists(v => (v.getName == s"hostPath-$i" && v.getHostPath.getPath == s"/tmp/mount$i")))
if (readOnly) {
assert(driver.container.getVolumeMounts.get(i).getReadOnly == true)
}
}
}
}
@@ -152,6 +152,62 @@ class BasicExecutorFeatureStepSuite
checkOwnerReferences(executor.pod, DRIVER_POD_UID)
}

test("single executor hostPath volume gets mounted") {
hostPathVolumeTest(1, false)
}

test("multiple executor hostPath volumes get mounted") {
hostPathVolumeTest(2, false)
}

test("single executor hostPath volume gets mounted w/ readOnly option") {
hostPathVolumeTest(1, true)
}

test("multiple executor hostPath volumes get mounted w/ readOnly option") {
hostPathVolumeTest(2, true)
}

private def hostPathVolumeTest(numVolumes: Int, readOnly: Boolean): Unit = {
val conf = baseConf.clone()
for (i <- 0 until numVolumes) {
conf.set(s"spark.kubernetes.executor.volumes.hostPath.hostPath-$i.mount.path",
s"/opt/mount$i")
conf.set(s"spark.kubernetes.executor.volumes.hostPath.hostPath-$i.options.path",
s"/tmp/mount$i")
if (readOnly) {
conf.set(s"spark.kubernetes.executor.volumes.hostPath.hostPath-$i.mount.readOnly",
"true")
}
}
val step = new BasicExecutorFeatureStep(
KubernetesConf(
conf,
KubernetesExecutorSpecificConf("1", DRIVER_POD),
RESOURCE_NAME_PREFIX,
APP_ID,
LABELS,
ANNOTATIONS,
Map.empty,
Map.empty))
val executor = step.configurePod(SparkPod.initialPod())

assert(executor.container.getImage === EXECUTOR_IMAGE)
assert(executor.container.getVolumeMounts.size() === numVolumes)
assert(executor.pod.getSpec.getVolumes.size() === numVolumes)
for (i <- 0 until numVolumes) {
assert(executor.container.getVolumeMounts.asScala
.exists(v => (v.getName == s"hostPath-$i" && v.getMountPath == s"/opt/mount$i")))
assert(executor.pod.getSpec.getVolumes.asScala
.exists(v => (v.getName == s"hostPath-$i" && v.getHostPath.getPath == s"/tmp/mount$i")))
if (readOnly) {
assert(executor.container.getVolumeMounts.get(i).getReadOnly == true)
}
}

checkOwnerReferences(executor.pod, DRIVER_POD_UID)
}

// There is always exactly one controller reference, and it points to the driver pod.
private def checkOwnerReferences(executor: Pod, driverPodUid: String): Unit = {
assert(executor.getMetadata.getOwnerReferences.size() === 1)