From 0ac901bea82474b1a472064a458936089111af21 Mon Sep 17 00:00:00 2001 From: Rob Vesse Date: Tue, 28 Aug 2018 14:52:07 +0100 Subject: [PATCH 1/5] [SPARK-25262][K8S] Configurability of local dirs on K8S - Skip creating an emptyDir volume if the pod spec already defines an appropriate volume. This change is in preparation for SPARK-24434 changes - Provide the ability to specify that local dirs on K8S should be backed by tmpfs --- .../org/apache/spark/deploy/k8s/Config.scala | 9 +++++++++ .../k8s/features/LocalDirsFeatureStep.scala | 18 ++++++++++++++++++ 2 files changed, 27 insertions(+) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala index 1b582fe53624a..5a8fe3ad4ba9d 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala @@ -225,6 +225,15 @@ private[spark] object Config extends Logging { "Ensure that major Python version is either Python2 or Python3") .createWithDefault("2") + val KUBERNETES_LOCAL_DIRS_TMPFS = + ConfigBuilder("spark.kubernetes.local.dirs.tmpfs") + .doc("If set to true then emptyDir volumes created to back spark.local.dirs will have their " + + "medium set to Memory so that they will be created as tmpfs (i.e. RAM) backed volumes. 
" + + "This may improve performance but scratch space usage will count towards your pods memory " + + "limit so you may wish to request more memory.") + .booleanConf + .createWithDefault(false) + val KUBERNETES_AUTH_SUBMISSION_CONF_PREFIX = "spark.kubernetes.authenticate.submission" diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/LocalDirsFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/LocalDirsFeatureStep.scala index 70b307303d149..86ee306eeb9e9 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/LocalDirsFeatureStep.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/LocalDirsFeatureStep.scala @@ -19,8 +19,11 @@ package org.apache.spark.deploy.k8s.features import java.nio.file.Paths import java.util.UUID +import collection.JavaConverters._ + import io.fabric8.kubernetes.api.model.{ContainerBuilder, HasMetadata, PodBuilder, VolumeBuilder, VolumeMountBuilder} +import org.apache.spark.deploy.k8s.Config._ import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesDriverSpecificConf, KubernetesRoleSpecificConf, SparkPod} private[spark] class LocalDirsFeatureStep( @@ -37,17 +40,28 @@ private[spark] class LocalDirsFeatureStep( .orElse(conf.getOption("spark.local.dir")) .getOrElse(defaultLocalDir) .split(",") + private val useLocalDirTmpFs = conf.get(KUBERNETES_LOCAL_DIRS_TMPFS) override def configurePod(pod: SparkPod): SparkPod = { val localDirVolumes = resolvedLocalDirs .zipWithIndex + // To allow customisation of local dirs backing volumes we should avoid creating + // emptyDir volumes if the volume is already defined in the pod spec + .filter { + case (localDir, index) => !hasVolume(pod, s"spark-local-dir-${index + 1}") + } .map { case (localDir, index) => new VolumeBuilder() .withName(s"spark-local-dir-${index + 1}") .withNewEmptyDir() + 
.withMedium(useLocalDirTmpFs match { + case true => "Memory" // Use tmpfs + case false => null // Default - use nodes backing storage + }) .endEmptyDir() .build() } + val localDirVolumeMounts = localDirVolumes .zip(resolvedLocalDirs) .map { case (localDirVolume, localDirPath) => @@ -74,4 +88,8 @@ private[spark] class LocalDirsFeatureStep( override def getAdditionalPodSystemProperties(): Map[String, String] = Map.empty override def getAdditionalKubernetesResources(): Seq[HasMetadata] = Seq.empty + + def hasVolume(pod: SparkPod, name: String): Boolean = { + pod.pod.getSpec().getVolumes().asScala.exists(v => v.getName.equals(name)) + } } From 6bddc63a61f45db724986b1d3269d4304aad609e Mon Sep 17 00:00:00 2001 From: Rob Vesse Date: Tue, 28 Aug 2018 16:44:23 +0100 Subject: [PATCH 2/5] [SPARK-25262][K8S] Tests for local dir configurability Adds unit tests to the LocalDirsFeatureStepSuite to account for changes made to make local dirs configurable. Also adjusts parts of the logic in LocalDirsFeatureStep to make sure that we appropriately mount pre-defined volumes. 
--- .../k8s/features/LocalDirsFeatureStep.scala | 37 ++++++---- .../features/LocalDirsFeatureStepSuite.scala | 71 ++++++++++++++++++- 2 files changed, 92 insertions(+), 16 deletions(-) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/LocalDirsFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/LocalDirsFeatureStep.scala index 86ee306eeb9e9..5996ce2c85256 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/LocalDirsFeatureStep.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/LocalDirsFeatureStep.scala @@ -45,21 +45,26 @@ private[spark] class LocalDirsFeatureStep( override def configurePod(pod: SparkPod): SparkPod = { val localDirVolumes = resolvedLocalDirs .zipWithIndex - // To allow customisation of local dirs backing volumes we should avoid creating - // emptyDir volumes if the volume is already defined in the pod spec - .filter { - case (localDir, index) => !hasVolume(pod, s"spark-local-dir-${index + 1}") - } .map { case (localDir, index) => - new VolumeBuilder() - .withName(s"spark-local-dir-${index + 1}") - .withNewEmptyDir() - .withMedium(useLocalDirTmpFs match { - case true => "Memory" // Use tmpfs - case false => null // Default - use nodes backing storage - }) - .endEmptyDir() - .build() + val name = s"spark-local-dir-${index + 1}" + // To allow customisation of local dirs backing volumes we should avoid creating + // emptyDir volumes if the volume is already defined in the pod spec + hasVolume(pod, name) match { + case true => + // For pre-existing volume definitions just re-use the volume + pod.pod.getSpec().getVolumes().asScala.find(v => v.getName.equals(name)).get + case false => + // Create new emptyDir volume + new VolumeBuilder() + .withName(name) + .withNewEmptyDir() + .withMedium(useLocalDirTmpFs match { + case true => "Memory" // Use tmpfs + case 
false => null // Default - use nodes backing storage + }) + .endEmptyDir() + .build() + } } val localDirVolumeMounts = localDirVolumes @@ -72,7 +77,9 @@ private[spark] class LocalDirsFeatureStep( } val podWithLocalDirVolumes = new PodBuilder(pod.pod) .editSpec() - .addToVolumes(localDirVolumes: _*) + // Don't want to re-add volume mounts that already existed in the incoming spec + // as duplicate definitions will lead to K8S API errors + .addToVolumes(localDirVolumes.filter(v => !hasVolume(pod, v.getName)): _*) .endSpec() .build() val containerWithLocalDirVolumeMounts = new ContainerBuilder(pod.container) diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/LocalDirsFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/LocalDirsFeatureStepSuite.scala index a339827b819a9..3b122efb15355 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/LocalDirsFeatureStepSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/LocalDirsFeatureStepSuite.scala @@ -16,11 +16,12 @@ */ package org.apache.spark.deploy.k8s.features -import io.fabric8.kubernetes.api.model.{EnvVarBuilder, VolumeBuilder, VolumeMountBuilder} +import io.fabric8.kubernetes.api.model.{ContainerBuilder, EnvVarBuilder, PodBuilder, VolumeBuilder, VolumeMountBuilder} import org.mockito.Mockito import org.scalatest.BeforeAndAfter import org.apache.spark.{SparkConf, SparkFunSuite} +import org.apache.spark.deploy.k8s.Config._ import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesDriverSpecificConf, KubernetesRoleSpecificConf, SparkPod} class LocalDirsFeatureStepSuite extends SparkFunSuite with BeforeAndAfter { @@ -111,4 +112,72 @@ class LocalDirsFeatureStepSuite extends SparkFunSuite with BeforeAndAfter { .withValue("/var/data/my-local-dir-1,/var/data/my-local-dir-2") .build()) } + + test("Use predefined volume for 
local dirs") { + Mockito.doReturn(null).when(sparkConf).get("spark.local.dir") + Mockito.doReturn(null).when(sparkConf).getenv("SPARK_LOCAL_DIRS") + val stepUnderTest = new LocalDirsFeatureStep(kubernetesConf, defaultLocalDir) + val initialPod = + new PodBuilder(SparkPod.initialPod().pod) + .editSpec() + .addToVolumes(new VolumeBuilder() + .withName("spark-local-dir-1") + .withNewConfigMap() + .withName("foo") + .endConfigMap() + .build()) + .endSpec() + .build() + + val configuredPod = stepUnderTest.configurePod(new SparkPod(initialPod, + new ContainerBuilder().build())) + assert(configuredPod.pod.getSpec.getVolumes.size === 1) + assert(configuredPod.pod.getSpec.getVolumes.get(0) === + new VolumeBuilder() + .withName(s"spark-local-dir-1") + .withNewConfigMap() + .withName("foo") + .endConfigMap() + .build()) + assert(configuredPod.container.getVolumeMounts.size === 1) + assert(configuredPod.container.getVolumeMounts.get(0) === + new VolumeMountBuilder() + .withName(s"spark-local-dir-1") + .withMountPath(defaultLocalDir) + .build()) + assert(configuredPod.container.getEnv.size === 1) + assert(configuredPod.container.getEnv.get(0) === + new EnvVarBuilder() + .withName("SPARK_LOCAL_DIRS") + .withValue(defaultLocalDir) + .build()) + } + + test("Use tmpfs to back default local dir") { + Mockito.doReturn(null).when(sparkConf).get("spark.local.dir") + Mockito.doReturn(null).when(sparkConf).getenv("SPARK_LOCAL_DIRS") + Mockito.doReturn(true).when(sparkConf).get(KUBERNETES_LOCAL_DIRS_TMPFS) + val stepUnderTest = new LocalDirsFeatureStep(kubernetesConf, defaultLocalDir) + val configuredPod = stepUnderTest.configurePod(SparkPod.initialPod()) + assert(configuredPod.pod.getSpec.getVolumes.size === 1) + assert(configuredPod.pod.getSpec.getVolumes.get(0) === + new VolumeBuilder() + .withName(s"spark-local-dir-1") + .withNewEmptyDir() + .withMedium("Memory") + .endEmptyDir() + .build()) + assert(configuredPod.container.getVolumeMounts.size === 1) + 
assert(configuredPod.container.getVolumeMounts.get(0) === + new VolumeMountBuilder() + .withName(s"spark-local-dir-1") + .withMountPath(defaultLocalDir) + .build()) + assert(configuredPod.container.getEnv.size === 1) + assert(configuredPod.container.getEnv.get(0) === + new EnvVarBuilder() + .withName("SPARK_LOCAL_DIRS") + .withValue(defaultLocalDir) + .build()) + } } From 70338f1c3133b6be732248dbe27402d28e7739f6 Mon Sep 17 00:00:00 2001 From: Rob Vesse Date: Wed, 29 Aug 2018 12:13:52 +0100 Subject: [PATCH 3/5] [SPARK-25262][K8S] Handle local dirs corner cases - Avoid creating volume mounts if pod template already defines them - Refuse to create pod if template has conflicting volume mount definitions present - Unit tests for the above - Scalastyle corrections --- .../org/apache/spark/deploy/k8s/Config.scala | 8 +- .../k8s/features/LocalDirsFeatureStep.scala | 54 ++++++++++--- .../features/LocalDirsFeatureStepSuite.scala | 78 ++++++++++++++++++- 3 files changed, 126 insertions(+), 14 deletions(-) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala index 5a8fe3ad4ba9d..cc8da906f976d 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala @@ -227,10 +227,10 @@ private[spark] object Config extends Logging { val KUBERNETES_LOCAL_DIRS_TMPFS = ConfigBuilder("spark.kubernetes.local.dirs.tmpfs") - .doc("If set to true then emptyDir volumes created to back spark.local.dirs will have their " + - "medium set to Memory so that they will be created as tmpfs (i.e. RAM) backed volumes. 
" + - "This may improve performance but scratch space usage will count towards your pods memory " + - "limit so you may wish to request more memory.") + .doc("If set to true then emptyDir volumes created to back spark.local.dirs will have " + + "their medium set to Memory so that they will be created as tmpfs (i.e. RAM) backed " + + "volumes. This may improve performance but scratch space usage will count towards " + + "your pods memory limit so you may wish to request more memory.") .booleanConf .createWithDefault(false) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/LocalDirsFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/LocalDirsFeatureStep.scala index 5996ce2c85256..31ba97ea562bd 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/LocalDirsFeatureStep.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/LocalDirsFeatureStep.scala @@ -20,11 +20,11 @@ import java.nio.file.Paths import java.util.UUID import collection.JavaConverters._ +import io.fabric8.kubernetes.api.model.{ContainerBuilder, HasMetadata, PodBuilder, VolumeBuilder, VolumeMount, VolumeMountBuilder} -import io.fabric8.kubernetes.api.model.{ContainerBuilder, HasMetadata, PodBuilder, VolumeBuilder, VolumeMountBuilder} - -import org.apache.spark.deploy.k8s.Config._ +import org.apache.spark.SparkException import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesDriverSpecificConf, KubernetesRoleSpecificConf, SparkPod} +import org.apache.spark.deploy.k8s.Config._ private[spark] class LocalDirsFeatureStep( conf: KubernetesConf[_ <: KubernetesRoleSpecificConf], @@ -70,14 +70,34 @@ private[spark] class LocalDirsFeatureStep( val localDirVolumeMounts = localDirVolumes .zip(resolvedLocalDirs) .map { case (localDirVolume, localDirPath) => - new VolumeMountBuilder() - .withName(localDirVolume.getName) - 
.withMountPath(localDirPath) - .build() + hasVolumeMount(pod, localDirVolume.getName, localDirPath) match { + case true => + // For pre-existing volume mounts just re-use the mount + pod.container.getVolumeMounts().asScala + .find(m => m.getName.equals(localDirVolume.getName) + && m.getMountPath.equals(localDirPath)) + .get + case false => + // Create new volume mount + new VolumeMountBuilder() + .withName (localDirVolume.getName) + .withMountPath (localDirPath) + .build() + } } + + // Check for conflicting volume mounts + for (m: VolumeMount <- localDirVolumeMounts) { + if (hasConflictingVolumeMount(pod, m.getName, m.getMountPath).size > 0) { + throw new SparkException(s"Conflicting volume mounts defined, pod template attempted to " + + s"mount SPARK_LOCAL_DIRS volume ${m.getName} multiple times or at an alternative path " + + s"than the expected ${m.getMountPath}") + } + } + val podWithLocalDirVolumes = new PodBuilder(pod.pod) .editSpec() - // Don't want to re-add volume mounts that already existed in the incoming spec + // Don't want to re-add volumes that already existed in the incoming spec // as duplicate definitions will lead to K8S API errors .addToVolumes(localDirVolumes.filter(v => !hasVolume(pod, v.getName)): _*) .endSpec() @@ -87,7 +107,10 @@ private[spark] class LocalDirsFeatureStep( .withName("SPARK_LOCAL_DIRS") .withValue(resolvedLocalDirs.mkString(",")) .endEnv() - .addToVolumeMounts(localDirVolumeMounts: _*) + // Don't want to re-add volume mounts that already existed in the incoming spec + // as duplicate definitions will lead to K8S API errors + .addToVolumeMounts(localDirVolumeMounts + .filter(m => !hasVolumeMount(pod, m.getName, m.getMountPath)): _*) .build() SparkPod(podWithLocalDirVolumes, containerWithLocalDirVolumeMounts) } @@ -99,4 +122,17 @@ private[spark] class LocalDirsFeatureStep( def hasVolume(pod: SparkPod, name: String): Boolean = { pod.pod.getSpec().getVolumes().asScala.exists(v => v.getName.equals(name)) } + + def hasVolumeMount(pod: 
SparkPod, name: String, path: String): Boolean = { + pod.container.getVolumeMounts().asScala + .exists(m => m.getName.equals(name) && m.getMountPath.equals(path)) + } + + def hasConflictingVolumeMount(pod: SparkPod, name: String, path: String): Seq[VolumeMount] = { + // A volume mount is considered conflicting if it matches one, and only one of, the name/path + // of a volume mount we are creating + pod.container.getVolumeMounts().asScala + .filter(m => (m.getName.equals(name) && !m.getMountPath.equals(path)) || + (m.getMountPath.equals(path) && !m.getName.equals(name))) + } } diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/LocalDirsFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/LocalDirsFeatureStepSuite.scala index 3b122efb15355..10286df24125b 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/LocalDirsFeatureStepSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/LocalDirsFeatureStepSuite.scala @@ -18,9 +18,11 @@ package org.apache.spark.deploy.k8s.features import io.fabric8.kubernetes.api.model.{ContainerBuilder, EnvVarBuilder, PodBuilder, VolumeBuilder, VolumeMountBuilder} import org.mockito.Mockito +import org.scalatest._ import org.scalatest.BeforeAndAfter +import Matchers._ -import org.apache.spark.{SparkConf, SparkFunSuite} +import org.apache.spark.{SparkConf, SparkException, SparkFunSuite} import org.apache.spark.deploy.k8s.Config._ import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesDriverSpecificConf, KubernetesRoleSpecificConf, SparkPod} @@ -153,6 +155,80 @@ class LocalDirsFeatureStepSuite extends SparkFunSuite with BeforeAndAfter { .build()) } + test("Use predefined volume and mount for local dirs") { + Mockito.doReturn(null).when(sparkConf).get("spark.local.dir") + Mockito.doReturn(null).when(sparkConf).getenv("SPARK_LOCAL_DIRS") 
+ val stepUnderTest = new LocalDirsFeatureStep(kubernetesConf, defaultLocalDir) + val initialPod = + new PodBuilder(SparkPod.initialPod().pod) + .editSpec() + .addToVolumes(new VolumeBuilder() + .withName("spark-local-dir-1") + .withNewConfigMap() + .withName("foo") + .endConfigMap() + .build()) + .endSpec() + .build() + val initialContainer = + new ContainerBuilder() + .withVolumeMounts(new VolumeMountBuilder() + .withName("spark-local-dir-1") + .withMountPath(defaultLocalDir) + .build()) + .build() + + val configuredPod = stepUnderTest.configurePod(new SparkPod(initialPod, + initialContainer)) + assert(configuredPod.pod.getSpec.getVolumes.size === 1) + assert(configuredPod.pod.getSpec.getVolumes.get(0) === + new VolumeBuilder() + .withName(s"spark-local-dir-1") + .withNewConfigMap() + .withName("foo") + .endConfigMap() + .build()) + assert(configuredPod.container.getVolumeMounts.size === 1) + assert(configuredPod.container.getVolumeMounts.get(0) === + new VolumeMountBuilder() + .withName(s"spark-local-dir-1") + .withMountPath(defaultLocalDir) + .build()) + assert(configuredPod.container.getEnv.size === 1) + assert(configuredPod.container.getEnv.get(0) === + new EnvVarBuilder() + .withName("SPARK_LOCAL_DIRS") + .withValue(defaultLocalDir) + .build()) + } + + test("Using conflicting volume mount for local dirs should error") { + Mockito.doReturn(null).when(sparkConf).get("spark.local.dir") + Mockito.doReturn(null).when(sparkConf).getenv("SPARK_LOCAL_DIRS") + val stepUnderTest = new LocalDirsFeatureStep(kubernetesConf, defaultLocalDir) + val initialPod = + new PodBuilder(SparkPod.initialPod().pod) + .editSpec() + .addToVolumes(new VolumeBuilder() + .withName("spark-local-dir-1") + .withNewConfigMap() + .withName("foo") + .endConfigMap() + .build()) + .endSpec() + .build() + val initialContainer = + new ContainerBuilder() + .withVolumeMounts(new VolumeMountBuilder() + .withName("spark-local-dir-1") + .withMountPath("/bad/path") + .build()) + .build() + + an 
[SparkException] should be thrownBy stepUnderTest.configurePod(new SparkPod(initialPod, + initialContainer)) + } + test("Use tmpfs to back default local dir") { Mockito.doReturn(null).when(sparkConf).get("spark.local.dir") Mockito.doReturn(null).when(sparkConf).getenv("SPARK_LOCAL_DIRS") From 8762ac1edb3bd8b9a645ef59f900b9fa2a5a27d6 Mon Sep 17 00:00:00 2001 From: Rob Vesse Date: Wed, 29 Aug 2018 17:21:22 +0100 Subject: [PATCH 4/5] [SPARK-25262][K8S] Document K8S local storage config Adds documentation of how K8S uses local storage and how to configure it for different environments. --- docs/running-on-kubernetes.md | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/docs/running-on-kubernetes.md b/docs/running-on-kubernetes.md index c83dad6df1e7b..238d725954e8f 100644 --- a/docs/running-on-kubernetes.md +++ b/docs/running-on-kubernetes.md @@ -215,6 +215,22 @@ spark.kubernetes.driver.volumes.persistentVolumeClaim.checkpointpvc.options.clai The configuration properties for mounting volumes into the executor pods use prefix `spark.kubernetes.executor.` instead of `spark.kubernetes.driver.`. For a complete list of available options for each supported type of volumes, please refer to the [Spark Properties](#spark-properties) section below. +## Local Storage + +Spark uses temporary scratch space to spill data to disk during shuffles and other operations. When using Kubernetes as the resource manager the pods will be created with an [emptyDir](https://kubernetes.io/docs/concepts/storage/volumes/#emptydir) volume mounted for each directory listed in `SPARK_LOCAL_DIRS`. If no directories are explicitly specified then a default directory is created and configured appropriately. + +`emptyDir` volumes use the ephemeral storage feature of Kubernetes and do not persist beyond the life of the pod. 
+ +### Using RAM for local storage + +As `emptyDir` volumes use the node's backing storage for ephemeral storage, this default behaviour may not be appropriate for some compute environments. For example, if you have diskless nodes with remote storage mounted over a network, having lots of executors doing IO to this remote storage may actually degrade performance. + +In this case it may be desirable to set `spark.kubernetes.local.dirs.tmpfs=true` in your configuration, which will cause the `emptyDir` volumes to be configured as `tmpfs`, i.e. RAM backed volumes. When configured like this, Spark's local storage usage will count towards your pod's memory usage, therefore you may wish to increase your memory requests via the normal `spark.driver.memory` and `spark.executor.memory` configuration properties. + +### Using arbitrary volumes for local storage + +Alternatively, if using the pod template feature, you can provide a volume named `spark-local-dir-N` in your specification, where N is a 1-based index to the entries in your `SPARK_LOCAL_DIRS` variable, and that volume will be used for local storage. This enables you to use a volume type that is appropriate to your compute environment. 
+ ## Introspection and Debugging These are the different ways in which you can investigate a running/completed Spark application, monitor progress, and From 90a5ae5e8529e546b14fd9e7f9ae8a21b648ed8e Mon Sep 17 00:00:00 2001 From: Rob Vesse Date: Thu, 30 Aug 2018 18:17:13 +0100 Subject: [PATCH 5/5] [SPARK-25262][K8S] Fix test scala style issues --- .../spark/deploy/k8s/features/LocalDirsFeatureStepSuite.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/LocalDirsFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/LocalDirsFeatureStepSuite.scala index 10286df24125b..d9c66f4a3f325 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/LocalDirsFeatureStepSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/LocalDirsFeatureStepSuite.scala @@ -19,12 +19,12 @@ package org.apache.spark.deploy.k8s.features import io.fabric8.kubernetes.api.model.{ContainerBuilder, EnvVarBuilder, PodBuilder, VolumeBuilder, VolumeMountBuilder} import org.mockito.Mockito import org.scalatest._ -import org.scalatest.BeforeAndAfter import Matchers._ +import org.scalatest.BeforeAndAfter import org.apache.spark.{SparkConf, SparkException, SparkFunSuite} -import org.apache.spark.deploy.k8s.Config._ import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesDriverSpecificConf, KubernetesRoleSpecificConf, SparkPod} +import org.apache.spark.deploy.k8s.Config._ class LocalDirsFeatureStepSuite extends SparkFunSuite with BeforeAndAfter { private val defaultLocalDir = "/var/data/default-local-dir"