[SPARK-31394][K8S] Adds support for Kubernetes NFS volume mounts
### What changes were proposed in this pull request?
This PR (SPARK-31394) adds support for mounting Kubernetes NFS volumes. Most of the code consists of slight modifications of the existing code for the emptyDir/hostPath/PVC volume types.

### Why are the changes needed?
Kubernetes supports many kinds of volumes, but Spark on Kubernetes currently supports only emptyDir/hostPath/PVC. Adding NFS support lets Spark on Kubernetes work with NFS storage directly.

To use NFS with current Spark via PVC, the user first has to create a new, empty PVC backed by NFS. Kubernetes' NFS provisioner then creates a new empty directory on the NFS server under a pre-configured directory for that PVC, for example `/nfs/k8s/sjcho-my-notebook-pvc-dce84888-7a9d-11e6-b1ee-5254001e0c1b`. The user then has to populate the PVC with the files to process using a separate file-copying job, run the desired Spark job against the populated PVC, and finally run yet another file-copying job to get the results back out.

This works in theory, but it is quite cumbersome for data-analysis tasks. With this change, one can simply read existing files on NFS, say `/nfs/home/sjcho/myfiles10.sstable`, directly from the Spark job, and write the results directly to an existing NFS directory such as `/nfs/home/sjcho/output`.

This PR uses no features beyond what Kubernetes itself already provides, so there should be no compatibility issues (beyond those inherent to Kubernetes) across the wide variety of NFS implementations. It merely enables the existing `nfs` volume type that Kubernetes supports officially, just as Spark already supports `hostPath` and `persistentVolumeClaim`.

### Does this PR introduce any user-facing change?
Users can now mount NFS volumes into the driver pod (and, with the corresponding `spark.kubernetes.executor.volumes.nfs.*` keys, into executor pods) by running commands like:
```
spark-submit \
--conf spark.kubernetes.driver.volumes.nfs.myshare.mount.path=/myshare \
--conf spark.kubernetes.driver.volumes.nfs.myshare.mount.readOnly=false \
--conf spark.kubernetes.driver.volumes.nfs.myshare.options.server=nfs.example.com \
--conf spark.kubernetes.driver.volumes.nfs.myshare.options.path=/storage/myshare \
...
```

### How was this patch tested?
Test cases were added, mirroring the existing emptyDir support.

The code was tested on minikube using the following script:
https://gist.github.com/w4-sjcho/4ba48f8c35a9685f5307fbd46b2c0656#file-run-test-sh

The script creates a new minikube cluster, launches an NFS server inside the cluster, copies the `README.md` file to the NFS share, and runs the `JavaWordCount` example against the file on the share.

Closes #27364 from w4-sjcho/master.

Authored-by: Seongjin Cho <sjcho@wisefour.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
Seongjin Cho authored and dongjoon-hyun committed Apr 15, 2020
1 parent 744c248 commit 7699f76
Showing 7 changed files with 121 additions and 0 deletions.
Config.scala
@@ -432,13 +432,15 @@ private[spark] object Config extends Logging {
  val KUBERNETES_VOLUMES_HOSTPATH_TYPE = "hostPath"
  val KUBERNETES_VOLUMES_PVC_TYPE = "persistentVolumeClaim"
  val KUBERNETES_VOLUMES_EMPTYDIR_TYPE = "emptyDir"
  val KUBERNETES_VOLUMES_NFS_TYPE = "nfs"
  val KUBERNETES_VOLUMES_MOUNT_PATH_KEY = "mount.path"
  val KUBERNETES_VOLUMES_MOUNT_SUBPATH_KEY = "mount.subPath"
  val KUBERNETES_VOLUMES_MOUNT_READONLY_KEY = "mount.readOnly"
  val KUBERNETES_VOLUMES_OPTIONS_PATH_KEY = "options.path"
  val KUBERNETES_VOLUMES_OPTIONS_CLAIM_NAME_KEY = "options.claimName"
  val KUBERNETES_VOLUMES_OPTIONS_MEDIUM_KEY = "options.medium"
  val KUBERNETES_VOLUMES_OPTIONS_SIZE_LIMIT_KEY = "options.sizeLimit"
  val KUBERNETES_VOLUMES_OPTIONS_SERVER_KEY = "options.server"

  val KUBERNETES_DRIVER_ENV_PREFIX = "spark.kubernetes.driverEnv."
}
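
For reference, these key fragments compose with a per-role prefix into the full configuration keys shown in the user-facing example above. A minimal sketch (the `volumeConfKey` helper is hypothetical, not part of this commit):

```
// Hypothetical helper illustrating the key scheme:
// spark.kubernetes.{driver,executor}.volumes.<type>.<name>.<key>
def volumeConfKey(role: String, volumeType: String, volumeName: String, key: String): String =
  s"spark.kubernetes.$role.volumes.$volumeType.$volumeName.$key"

volumeConfKey("driver", KUBERNETES_VOLUMES_NFS_TYPE, "myshare", KUBERNETES_VOLUMES_OPTIONS_SERVER_KEY)
// => "spark.kubernetes.driver.volumes.nfs.myshare.options.server"
```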

KubernetesVolumeSpec.scala
@@ -29,6 +29,11 @@ private[spark] case class KubernetesEmptyDirVolumeConf(
    sizeLimit: Option[String])
  extends KubernetesVolumeSpecificConf

private[spark] case class KubernetesNFSVolumeConf(
    path: String,
    server: String)
  extends KubernetesVolumeSpecificConf

private[spark] case class KubernetesVolumeSpec(
    volumeName: String,
    mountPath: String,
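
For illustration, the user-facing example above parses into a spec shaped roughly like this (a sketch; the `KubernetesVolumeSpec` fields beyond `volumeName` and `mountPath` are truncated from the hunk above and assumed from the test suites below):

```
// Sketch of the parsed form of the user-facing `myshare` example
// (mountSubPath/mountReadOnly/volumeConf field names are assumptions).
val spec = KubernetesVolumeSpec(
  volumeName = "myshare",
  mountPath = "/myshare",
  mountSubPath = "",
  mountReadOnly = false,
  volumeConf = KubernetesNFSVolumeConf(path = "/storage/myshare", server = "nfs.example.com"))
```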

KubernetesVolumeUtils.scala
@@ -78,6 +78,13 @@ private[spark] object KubernetesVolumeUtils {
        val sizeLimitKey = s"$volumeType.$volumeName.$KUBERNETES_VOLUMES_OPTIONS_SIZE_LIMIT_KEY"
        KubernetesEmptyDirVolumeConf(options.get(mediumKey), options.get(sizeLimitKey))

      case KUBERNETES_VOLUMES_NFS_TYPE =>
        val pathKey = s"$volumeType.$volumeName.$KUBERNETES_VOLUMES_OPTIONS_PATH_KEY"
        val serverKey = s"$volumeType.$volumeName.$KUBERNETES_VOLUMES_OPTIONS_SERVER_KEY"
        KubernetesNFSVolumeConf(
          options(pathKey),
          options(serverKey))

      case _ =>
        throw new IllegalArgumentException(s"Kubernetes Volume type `$volumeType` is not supported")
    }
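
A usage sketch of the parser with the new type. Note that `options(pathKey)` and `options(serverKey)` use `Map.apply`, so a missing required option fails fast with a `NoSuchElementException` naming the key, which is exactly what the suite below asserts:

```
import org.apache.spark.SparkConf

// Sketch only: KubernetesVolumeUtils is private[spark], so this compiles
// only inside Spark's own packages. The prefix is the driver-volume prefix
// from the user-facing example.
val sparkConf = new SparkConf(false)
  .set("spark.kubernetes.driver.volumes.nfs.myshare.mount.path", "/myshare")
  .set("spark.kubernetes.driver.volumes.nfs.myshare.mount.readOnly", "false")
  .set("spark.kubernetes.driver.volumes.nfs.myshare.options.path", "/storage/myshare")
  .set("spark.kubernetes.driver.volumes.nfs.myshare.options.server", "nfs.example.com")

val spec = KubernetesVolumeUtils
  .parseVolumesWithPrefix(sparkConf, "spark.kubernetes.driver.volumes.")
  .head
// spec.volumeConf == KubernetesNFSVolumeConf("/storage/myshare", "nfs.example.com")
```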

MountVolumesFeatureStep.scala
@@ -66,6 +66,10 @@ private[spark] class MountVolumesFeatureStep(conf: KubernetesConf)
          .withEmptyDir(
            new EmptyDirVolumeSource(medium.getOrElse(""),
              sizeLimit.map(new Quantity(_)).orNull))

        case KubernetesNFSVolumeConf(path, server) =>
          new VolumeBuilder()
            .withNfs(new NFSVolumeSource(path, null, server))
      }

      val volume = volumeBuilder.withName(spec.volumeName).build()
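
The fabric8 `NFSVolumeSource` constructor used above takes `(path, readOnly, server)`; `readOnly` is left `null` on the volume source because read-only is enforced on the container's volume mount instead (see the suites below). A sketch of what the step builds for the user-facing `myshare` example:

```
import io.fabric8.kubernetes.api.model.{NFSVolumeSource, VolumeBuilder}

// Sketch: the pod volume produced for the `myshare` example above.
val volume = new VolumeBuilder()
  .withName("myshare")
  .withNfs(new NFSVolumeSource("/storage/myshare", null, "nfs.example.com"))
  .build()
// volume.getNfs.getServer == "nfs.example.com"
// volume.getNfs.getReadOnly == null (read-only lives on the VolumeMount)
```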

KubernetesTestConf.scala
@@ -120,6 +120,11 @@ object KubernetesTestConf {
          val mconf = medium.map { m => (KUBERNETES_VOLUMES_OPTIONS_MEDIUM_KEY, m) }.toMap
          val lconf = sizeLimit.map { l => (KUBERNETES_VOLUMES_OPTIONS_SIZE_LIMIT_KEY, l) }.toMap
          (KUBERNETES_VOLUMES_EMPTYDIR_TYPE, mconf ++ lconf)

        case KubernetesNFSVolumeConf(path, server) =>
          (KUBERNETES_VOLUMES_NFS_TYPE, Map(
            KUBERNETES_VOLUMES_OPTIONS_PATH_KEY -> path,
            KUBERNETES_VOLUMES_OPTIONS_SERVER_KEY -> server))
      }

      conf.set(key(vtype, spec.volumeName, KUBERNETES_VOLUMES_MOUNT_PATH_KEY), spec.mountPath)
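
This test helper is the inverse of the parser: it serializes a volume spec back into `options.*` entries so a suite can build a full driver conf in one call. A round-trip sketch (assuming `spark.kubernetes.driver.volumes.` as the driver prefix and a `sparkConf` accessor on the returned conf):

```
// Round-trip sketch: build a conf from a spec, then parse it back.
val spec = KubernetesVolumeSpec("myshare", "/myshare", "", false,
  KubernetesNFSVolumeConf("/storage/myshare", "nfs.example.com"))
val kconf = KubernetesTestConf.createDriverConf(volumes = Seq(spec))
val parsed = KubernetesVolumeUtils
  .parseVolumesWithPrefix(kconf.sparkConf, "spark.kubernetes.driver.volumes.")
  .head
// parsed == spec
```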

KubernetesVolumeUtilsSuite.scala
@@ -117,4 +117,58 @@ class KubernetesVolumeUtilsSuite extends SparkFunSuite {
    }
    assert(e.getMessage.contains("hostPath.volumeName.options.path"))
  }

  test("Parses read-only nfs volumes correctly") {
    val sparkConf = new SparkConf(false)
    sparkConf.set("test.nfs.volumeName.mount.path", "/path")
    sparkConf.set("test.nfs.volumeName.mount.readOnly", "true")
    sparkConf.set("test.nfs.volumeName.options.path", "/share")
    sparkConf.set("test.nfs.volumeName.options.server", "nfs.example.com")

    val volumeSpec = KubernetesVolumeUtils.parseVolumesWithPrefix(sparkConf, "test.").head
    assert(volumeSpec.volumeName === "volumeName")
    assert(volumeSpec.mountPath === "/path")
    assert(volumeSpec.mountReadOnly === true)
    assert(volumeSpec.volumeConf.asInstanceOf[KubernetesNFSVolumeConf] ===
      KubernetesNFSVolumeConf("/share", "nfs.example.com"))
  }

  test("Parses read/write nfs volumes correctly") {
    val sparkConf = new SparkConf(false)
    sparkConf.set("test.nfs.volumeName.mount.path", "/path")
    sparkConf.set("test.nfs.volumeName.mount.readOnly", "false")
    sparkConf.set("test.nfs.volumeName.options.path", "/share")
    sparkConf.set("test.nfs.volumeName.options.server", "nfs.example.com")

    val volumeSpec = KubernetesVolumeUtils.parseVolumesWithPrefix(sparkConf, "test.").head
    assert(volumeSpec.volumeName === "volumeName")
    assert(volumeSpec.mountPath === "/path")
    assert(volumeSpec.mountReadOnly === false)
    assert(volumeSpec.volumeConf.asInstanceOf[KubernetesNFSVolumeConf] ===
      KubernetesNFSVolumeConf("/share", "nfs.example.com"))
  }

  test("Fails on missing path option") {
    val sparkConf = new SparkConf(false)
    sparkConf.set("test.nfs.volumeName.mount.path", "/path")
    sparkConf.set("test.nfs.volumeName.mount.readOnly", "true")
    sparkConf.set("test.nfs.volumeName.options.server", "nfs.example.com")

    val e = intercept[NoSuchElementException] {
      KubernetesVolumeUtils.parseVolumesWithPrefix(sparkConf, "test.")
    }
    assert(e.getMessage.contains("nfs.volumeName.options.path"))
  }

  test("Fails on missing server option") {
    val sparkConf = new SparkConf(false)
    sparkConf.set("test.nfs.volumeName.mount.path", "/path")
    sparkConf.set("test.nfs.volumeName.mount.readOnly", "true")
    sparkConf.set("test.nfs.volumeName.options.path", "/share")

    val e = intercept[NoSuchElementException] {
      KubernetesVolumeUtils.parseVolumesWithPrefix(sparkConf, "test.")
    }
    assert(e.getMessage.contains("nfs.volumeName.options.server"))
  }
}

MountVolumesFeatureStepSuite.scala
@@ -109,6 +109,50 @@ class MountVolumesFeatureStepSuite extends SparkFunSuite {
    assert(configuredPod.container.getVolumeMounts.get(0).getReadOnly === false)
  }

  test("Mounts read/write nfs volumes") {
    val volumeConf = KubernetesVolumeSpec(
      "testVolume",
      "/tmp",
      "",
      false,
      KubernetesNFSVolumeConf("/share/name", "nfs.example.com")
    )
    val kubernetesConf = KubernetesTestConf.createDriverConf(volumes = Seq(volumeConf))
    val step = new MountVolumesFeatureStep(kubernetesConf)
    val configuredPod = step.configurePod(SparkPod.initialPod())

    assert(configuredPod.pod.getSpec.getVolumes.size() === 1)
    assert(configuredPod.pod.getSpec.getVolumes.get(0).getNfs.getPath === "/share/name")
    assert(configuredPod.pod.getSpec.getVolumes.get(0).getNfs.getReadOnly === null)
    assert(configuredPod.pod.getSpec.getVolumes.get(0).getNfs.getServer === "nfs.example.com")
    assert(configuredPod.container.getVolumeMounts.size() === 1)
    assert(configuredPod.container.getVolumeMounts.get(0).getMountPath === "/tmp")
    assert(configuredPod.container.getVolumeMounts.get(0).getName === "testVolume")
    assert(configuredPod.container.getVolumeMounts.get(0).getReadOnly === false)
  }

  test("Mounts read-only nfs volumes") {
    val volumeConf = KubernetesVolumeSpec(
      "testVolume",
      "/tmp",
      "",
      true,
      KubernetesNFSVolumeConf("/share/name", "nfs.example.com")
    )
    val kubernetesConf = KubernetesTestConf.createDriverConf(volumes = Seq(volumeConf))
    val step = new MountVolumesFeatureStep(kubernetesConf)
    val configuredPod = step.configurePod(SparkPod.initialPod())

    assert(configuredPod.pod.getSpec.getVolumes.size() === 1)
    assert(configuredPod.pod.getSpec.getVolumes.get(0).getNfs.getPath === "/share/name")
    assert(configuredPod.pod.getSpec.getVolumes.get(0).getNfs.getReadOnly === null)
    assert(configuredPod.pod.getSpec.getVolumes.get(0).getNfs.getServer === "nfs.example.com")
    assert(configuredPod.container.getVolumeMounts.size() === 1)
    assert(configuredPod.container.getVolumeMounts.get(0).getMountPath === "/tmp")
    assert(configuredPod.container.getVolumeMounts.get(0).getName === "testVolume")
    assert(configuredPod.container.getVolumeMounts.get(0).getReadOnly === true)
  }

  test("Mounts multiple volumes") {
    val hpVolumeConf = KubernetesVolumeSpec(
      "hpVolume",
