diff --git a/docs/running-on-kubernetes.md b/docs/running-on-kubernetes.md
index e9e1f3e280609..a4b2b98b0b649 100644
--- a/docs/running-on-kubernetes.md
+++ b/docs/running-on-kubernetes.md
@@ -140,6 +140,12 @@ namespace as that of the driver and executor pods. For example, to mount a secre
--conf spark.kubernetes.executor.secrets.spark-secret=/etc/secrets
```
+To use a secret through an environment variable use the following options to the `spark-submit` command:
+```
+--conf spark.kubernetes.driver.secretKeyRef.ENV_NAME=name:key
+--conf spark.kubernetes.executor.secretKeyRef.ENV_NAME=name:key
+```
+
## Introspection and Debugging
These are the different ways in which you can investigate a running/completed Spark application, monitor progress, and
@@ -602,4 +608,20 @@ specific to Spark on Kubernetes.
spark.kubernetes.executor.secrets.spark-secret=/etc/secrets
.
+
+ spark.kubernetes.driver.secretKeyRef.[EnvName] |
+ (none) |
+
+ Add as an environment variable to the driver container with name EnvName (case sensitive), the value referenced by key key in the data of the referenced Kubernetes Secret. For example,
+ spark.kubernetes.driver.secretKeyRef.ENV_VAR=spark-secret:key .
+ |
+
+
+ spark.kubernetes.executor.secretKeyRef.[EnvName] |
+ (none) |
+
+ Add as an environment variable to the executor container with name EnvName (case sensitive), the value referenced by key key in the data of the referenced Kubernetes Secret. For example,
+ spark.kubernetes.executor.secrets.ENV_VAR=spark-secret:key .
+ |
+
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
index 4086970ffb256..560dedf431b08 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
@@ -162,10 +162,12 @@ private[spark] object Config extends Logging {
val KUBERNETES_DRIVER_LABEL_PREFIX = "spark.kubernetes.driver.label."
val KUBERNETES_DRIVER_ANNOTATION_PREFIX = "spark.kubernetes.driver.annotation."
val KUBERNETES_DRIVER_SECRETS_PREFIX = "spark.kubernetes.driver.secrets."
+ val KUBERNETES_DRIVER_SECRET_KEY_REF_PREFIX = "spark.kubernetes.driver.secretKeyRef."
val KUBERNETES_EXECUTOR_LABEL_PREFIX = "spark.kubernetes.executor.label."
val KUBERNETES_EXECUTOR_ANNOTATION_PREFIX = "spark.kubernetes.executor.annotation."
val KUBERNETES_EXECUTOR_SECRETS_PREFIX = "spark.kubernetes.executor.secrets."
+ val KUBERNETES_EXECUTOR_SECRET_KEY_REF_PREFIX = "spark.kubernetes.executor.secretKeyRef."
val KUBERNETES_DRIVER_ENV_PREFIX = "spark.kubernetes.driverEnv."
}
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala
index 77b634ddfabcc..5a944187a7096 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala
@@ -54,6 +54,7 @@ private[spark] case class KubernetesConf[T <: KubernetesRoleSpecificConf](
roleLabels: Map[String, String],
roleAnnotations: Map[String, String],
roleSecretNamesToMountPaths: Map[String, String],
+ roleSecretEnvNamesToKeyRefs: Map[String, String],
roleEnvs: Map[String, String]) {
def namespace(): String = sparkConf.get(KUBERNETES_NAMESPACE)
@@ -129,6 +130,8 @@ private[spark] object KubernetesConf {
sparkConf, KUBERNETES_DRIVER_ANNOTATION_PREFIX)
val driverSecretNamesToMountPaths = KubernetesUtils.parsePrefixedKeyValuePairs(
sparkConf, KUBERNETES_DRIVER_SECRETS_PREFIX)
+ val driverSecretEnvNamesToKeyRefs = KubernetesUtils.parsePrefixedKeyValuePairs(
+ sparkConf, KUBERNETES_DRIVER_SECRET_KEY_REF_PREFIX)
val driverEnvs = KubernetesUtils.parsePrefixedKeyValuePairs(
sparkConf, KUBERNETES_DRIVER_ENV_PREFIX)
@@ -140,6 +143,7 @@ private[spark] object KubernetesConf {
driverLabels,
driverAnnotations,
driverSecretNamesToMountPaths,
+ driverSecretEnvNamesToKeyRefs,
driverEnvs)
}
@@ -167,8 +171,10 @@ private[spark] object KubernetesConf {
executorCustomLabels
val executorAnnotations = KubernetesUtils.parsePrefixedKeyValuePairs(
sparkConf, KUBERNETES_EXECUTOR_ANNOTATION_PREFIX)
- val executorSecrets = KubernetesUtils.parsePrefixedKeyValuePairs(
+ val executorMountSecrets = KubernetesUtils.parsePrefixedKeyValuePairs(
sparkConf, KUBERNETES_EXECUTOR_SECRETS_PREFIX)
+ val executorEnvSecrets = KubernetesUtils.parsePrefixedKeyValuePairs(
+ sparkConf, KUBERNETES_EXECUTOR_SECRET_KEY_REF_PREFIX)
val executorEnv = sparkConf.getExecutorEnv.toMap
KubernetesConf(
@@ -178,7 +184,8 @@ private[spark] object KubernetesConf {
appId,
executorLabels,
executorAnnotations,
- executorSecrets,
+ executorMountSecrets,
+ executorEnvSecrets,
executorEnv)
}
}
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/EnvSecretsFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/EnvSecretsFeatureStep.scala
new file mode 100644
index 0000000000000..03ff7d48420ff
--- /dev/null
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/EnvSecretsFeatureStep.scala
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.features
+
+import scala.collection.JavaConverters._
+
+import io.fabric8.kubernetes.api.model.{ContainerBuilder, EnvVarBuilder, HasMetadata}
+
+import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesRoleSpecificConf, SparkPod}
+
+private[spark] class EnvSecretsFeatureStep(
+ kubernetesConf: KubernetesConf[_ <: KubernetesRoleSpecificConf])
+ extends KubernetesFeatureConfigStep {
+ override def configurePod(pod: SparkPod): SparkPod = {
+ val addedEnvSecrets = kubernetesConf
+ .roleSecretEnvNamesToKeyRefs
+ .map{ case (envName, keyRef) =>
+ // Keyref parts
+ val keyRefParts = keyRef.split(":")
+ require(keyRefParts.size == 2, "SecretKeyRef must be in the form name:key.")
+ val name = keyRefParts(0)
+ val key = keyRefParts(1)
+ new EnvVarBuilder()
+ .withName(envName)
+ .withNewValueFrom()
+ .withNewSecretKeyRef()
+ .withKey(key)
+ .withName(name)
+ .endSecretKeyRef()
+ .endValueFrom()
+ .build()
+ }
+
+ val containerWithEnvVars = new ContainerBuilder(pod.container)
+ .addAllToEnv(addedEnvSecrets.toSeq.asJava)
+ .build()
+ SparkPod(pod.pod, containerWithEnvVars)
+ }
+
+ override def getAdditionalPodSystemProperties(): Map[String, String] = Map.empty
+
+ override def getAdditionalKubernetesResources(): Seq[HasMetadata] = Seq.empty
+}
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilder.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilder.scala
index 10b0154466a3a..fdc5eb0d75832 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilder.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilder.scala
@@ -17,7 +17,7 @@
package org.apache.spark.deploy.k8s.submit
import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesDriverSpec, KubernetesDriverSpecificConf, KubernetesRoleSpecificConf}
-import org.apache.spark.deploy.k8s.features.{BasicDriverFeatureStep, DriverKubernetesCredentialsFeatureStep, DriverServiceFeatureStep, LocalDirsFeatureStep, MountSecretsFeatureStep}
+import org.apache.spark.deploy.k8s.features._
private[spark] class KubernetesDriverBuilder(
provideBasicStep: (KubernetesConf[KubernetesDriverSpecificConf]) => BasicDriverFeatureStep =
@@ -30,6 +30,9 @@ private[spark] class KubernetesDriverBuilder(
provideSecretsStep: (KubernetesConf[_ <: KubernetesRoleSpecificConf]
=> MountSecretsFeatureStep) =
new MountSecretsFeatureStep(_),
+ provideEnvSecretsStep: (KubernetesConf[_ <: KubernetesRoleSpecificConf]
+ => EnvSecretsFeatureStep) =
+ new EnvSecretsFeatureStep(_),
provideLocalDirsStep: (KubernetesConf[_ <: KubernetesRoleSpecificConf])
=> LocalDirsFeatureStep =
new LocalDirsFeatureStep(_)) {
@@ -41,10 +44,14 @@ private[spark] class KubernetesDriverBuilder(
provideCredentialsStep(kubernetesConf),
provideServiceStep(kubernetesConf),
provideLocalDirsStep(kubernetesConf))
- val allFeatures = if (kubernetesConf.roleSecretNamesToMountPaths.nonEmpty) {
+ var allFeatures = if (kubernetesConf.roleSecretNamesToMountPaths.nonEmpty) {
baseFeatures ++ Seq(provideSecretsStep(kubernetesConf))
} else baseFeatures
+ allFeatures = if (kubernetesConf.roleSecretEnvNamesToKeyRefs.nonEmpty) {
+ allFeatures ++ Seq(provideEnvSecretsStep(kubernetesConf))
+ } else allFeatures
+
var spec = KubernetesDriverSpec.initialSpec(kubernetesConf.sparkConf.getAll.toMap)
for (feature <- allFeatures) {
val configuredPod = feature.configurePod(spec.pod)
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilder.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilder.scala
index d8f63d57574fb..d5e1de36a58df 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilder.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilder.scala
@@ -17,7 +17,7 @@
package org.apache.spark.scheduler.cluster.k8s
import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesExecutorSpecificConf, KubernetesRoleSpecificConf, SparkPod}
-import org.apache.spark.deploy.k8s.features.{BasicExecutorFeatureStep, LocalDirsFeatureStep, MountSecretsFeatureStep}
+import org.apache.spark.deploy.k8s.features.{BasicExecutorFeatureStep, EnvSecretsFeatureStep, LocalDirsFeatureStep, MountSecretsFeatureStep}
private[spark] class KubernetesExecutorBuilder(
provideBasicStep: (KubernetesConf[KubernetesExecutorSpecificConf]) => BasicExecutorFeatureStep =
@@ -25,6 +25,9 @@ private[spark] class KubernetesExecutorBuilder(
provideSecretsStep:
(KubernetesConf[_ <: KubernetesRoleSpecificConf]) => MountSecretsFeatureStep =
new MountSecretsFeatureStep(_),
+ provideEnvSecretsStep:
+ (KubernetesConf[_ <: KubernetesRoleSpecificConf] => EnvSecretsFeatureStep) =
+ new EnvSecretsFeatureStep(_),
provideLocalDirsStep: (KubernetesConf[_ <: KubernetesRoleSpecificConf])
=> LocalDirsFeatureStep =
new LocalDirsFeatureStep(_)) {
@@ -32,9 +35,14 @@ private[spark] class KubernetesExecutorBuilder(
def buildFromFeatures(
kubernetesConf: KubernetesConf[KubernetesExecutorSpecificConf]): SparkPod = {
val baseFeatures = Seq(provideBasicStep(kubernetesConf), provideLocalDirsStep(kubernetesConf))
- val allFeatures = if (kubernetesConf.roleSecretNamesToMountPaths.nonEmpty) {
+ var allFeatures = if (kubernetesConf.roleSecretNamesToMountPaths.nonEmpty) {
baseFeatures ++ Seq(provideSecretsStep(kubernetesConf))
} else baseFeatures
+
+ allFeatures = if (kubernetesConf.roleSecretEnvNamesToKeyRefs.nonEmpty) {
+ allFeatures ++ Seq(provideEnvSecretsStep(kubernetesConf))
+ } else allFeatures
+
var executorPod = SparkPod.initialPod()
for (feature <- allFeatures) {
executorPod = feature.configurePod(executorPod)
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesConfSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesConfSuite.scala
index f10202f7a3546..3d23e1cb90fd2 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesConfSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesConfSuite.scala
@@ -40,6 +40,9 @@ class KubernetesConfSuite extends SparkFunSuite {
private val SECRET_NAMES_TO_MOUNT_PATHS = Map(
"secret1" -> "/mnt/secrets/secret1",
"secret2" -> "/mnt/secrets/secret2")
+ private val SECRET_ENV_VARS = Map(
+ "envName1" -> "name1:key1",
+ "envName2" -> "name2:key2")
private val CUSTOM_ENVS = Map(
"customEnvKey1" -> "customEnvValue1",
"customEnvKey2" -> "customEnvValue2")
@@ -103,6 +106,9 @@ class KubernetesConfSuite extends SparkFunSuite {
SECRET_NAMES_TO_MOUNT_PATHS.foreach { case (key, value) =>
sparkConf.set(s"$KUBERNETES_DRIVER_SECRETS_PREFIX$key", value)
}
+ SECRET_ENV_VARS.foreach { case (key, value) =>
+ sparkConf.set(s"$KUBERNETES_DRIVER_SECRET_KEY_REF_PREFIX$key", value)
+ }
CUSTOM_ENVS.foreach { case (key, value) =>
sparkConf.set(s"$KUBERNETES_DRIVER_ENV_PREFIX$key", value)
}
@@ -121,6 +127,7 @@ class KubernetesConfSuite extends SparkFunSuite {
CUSTOM_LABELS)
assert(conf.roleAnnotations === CUSTOM_ANNOTATIONS)
assert(conf.roleSecretNamesToMountPaths === SECRET_NAMES_TO_MOUNT_PATHS)
+ assert(conf.roleSecretEnvNamesToKeyRefs === SECRET_ENV_VARS)
assert(conf.roleEnvs === CUSTOM_ENVS)
}
@@ -155,6 +162,9 @@ class KubernetesConfSuite extends SparkFunSuite {
CUSTOM_ANNOTATIONS.foreach { case (key, value) =>
sparkConf.set(s"$KUBERNETES_EXECUTOR_ANNOTATION_PREFIX$key", value)
}
+ SECRET_ENV_VARS.foreach { case (key, value) =>
+ sparkConf.set(s"$KUBERNETES_EXECUTOR_SECRET_KEY_REF_PREFIX$key", value)
+ }
SECRET_NAMES_TO_MOUNT_PATHS.foreach { case (key, value) =>
sparkConf.set(s"$KUBERNETES_EXECUTOR_SECRETS_PREFIX$key", value)
}
@@ -170,6 +180,6 @@ class KubernetesConfSuite extends SparkFunSuite {
SPARK_ROLE_LABEL -> SPARK_POD_EXECUTOR_ROLE) ++ CUSTOM_LABELS)
assert(conf.roleAnnotations === CUSTOM_ANNOTATIONS)
assert(conf.roleSecretNamesToMountPaths === SECRET_NAMES_TO_MOUNT_PATHS)
+ assert(conf.roleSecretEnvNamesToKeyRefs === SECRET_ENV_VARS)
}
-
}
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala
index eee85b8baa730..b2813d8b3265d 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala
@@ -69,6 +69,7 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite {
DRIVER_LABELS,
DRIVER_ANNOTATIONS,
Map.empty,
+ Map.empty,
DRIVER_ENVS)
val featureStep = new BasicDriverFeatureStep(kubernetesConf)
@@ -138,6 +139,7 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite {
DRIVER_LABELS,
DRIVER_ANNOTATIONS,
Map.empty,
+ Map.empty,
Map.empty)
val step = new BasicDriverFeatureStep(kubernetesConf)
val additionalProperties = step.getAdditionalPodSystemProperties()
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala
index a764f7630b5c8..9182134b3337c 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala
@@ -87,6 +87,7 @@ class BasicExecutorFeatureStepSuite
LABELS,
ANNOTATIONS,
Map.empty,
+ Map.empty,
Map.empty))
val executor = step.configurePod(SparkPod.initialPod())
@@ -124,6 +125,7 @@ class BasicExecutorFeatureStepSuite
LABELS,
ANNOTATIONS,
Map.empty,
+ Map.empty,
Map.empty))
assert(step.configurePod(SparkPod.initialPod()).pod.getSpec.getHostname.length === 63)
}
@@ -142,6 +144,7 @@ class BasicExecutorFeatureStepSuite
LABELS,
ANNOTATIONS,
Map.empty,
+ Map.empty,
Map("qux" -> "quux")))
val executor = step.configurePod(SparkPod.initialPod())
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverKubernetesCredentialsFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverKubernetesCredentialsFeatureStepSuite.scala
index 9f817d3bfc79a..f81894f8055f1 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverKubernetesCredentialsFeatureStepSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverKubernetesCredentialsFeatureStepSuite.scala
@@ -59,6 +59,7 @@ class DriverKubernetesCredentialsFeatureStepSuite extends SparkFunSuite with Bef
Map.empty,
Map.empty,
Map.empty,
+ Map.empty,
Map.empty)
val kubernetesCredentialsStep = new DriverKubernetesCredentialsFeatureStep(kubernetesConf)
assert(kubernetesCredentialsStep.configurePod(BASE_DRIVER_POD) === BASE_DRIVER_POD)
@@ -88,6 +89,7 @@ class DriverKubernetesCredentialsFeatureStepSuite extends SparkFunSuite with Bef
Map.empty,
Map.empty,
Map.empty,
+ Map.empty,
Map.empty)
val kubernetesCredentialsStep = new DriverKubernetesCredentialsFeatureStep(kubernetesConf)
@@ -124,6 +126,7 @@ class DriverKubernetesCredentialsFeatureStepSuite extends SparkFunSuite with Bef
Map.empty,
Map.empty,
Map.empty,
+ Map.empty,
Map.empty)
val kubernetesCredentialsStep = new DriverKubernetesCredentialsFeatureStep(kubernetesConf)
val resolvedProperties = kubernetesCredentialsStep.getAdditionalPodSystemProperties()
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverServiceFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverServiceFeatureStepSuite.scala
index c299d56865ec0..f265522a8823a 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverServiceFeatureStepSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverServiceFeatureStepSuite.scala
@@ -65,6 +65,7 @@ class DriverServiceFeatureStepSuite extends SparkFunSuite with BeforeAndAfter {
DRIVER_LABELS,
Map.empty,
Map.empty,
+ Map.empty,
Map.empty))
assert(configurationStep.configurePod(SparkPod.initialPod()) === SparkPod.initialPod())
assert(configurationStep.getAdditionalKubernetesResources().size === 1)
@@ -94,6 +95,7 @@ class DriverServiceFeatureStepSuite extends SparkFunSuite with BeforeAndAfter {
DRIVER_LABELS,
Map.empty,
Map.empty,
+ Map.empty,
Map.empty))
val expectedServiceName = SHORT_RESOURCE_NAME_PREFIX +
DriverServiceFeatureStep.DRIVER_SVC_POSTFIX
@@ -113,6 +115,7 @@ class DriverServiceFeatureStepSuite extends SparkFunSuite with BeforeAndAfter {
DRIVER_LABELS,
Map.empty,
Map.empty,
+ Map.empty,
Map.empty))
val resolvedService = configurationStep
.getAdditionalKubernetesResources()
@@ -141,6 +144,7 @@ class DriverServiceFeatureStepSuite extends SparkFunSuite with BeforeAndAfter {
DRIVER_LABELS,
Map.empty,
Map.empty,
+ Map.empty,
Map.empty),
clock)
val driverService = configurationStep
@@ -166,6 +170,7 @@ class DriverServiceFeatureStepSuite extends SparkFunSuite with BeforeAndAfter {
DRIVER_LABELS,
Map.empty,
Map.empty,
+ Map.empty,
Map.empty),
clock)
fail("The driver bind address should not be allowed.")
@@ -189,6 +194,7 @@ class DriverServiceFeatureStepSuite extends SparkFunSuite with BeforeAndAfter {
DRIVER_LABELS,
Map.empty,
Map.empty,
+ Map.empty,
Map.empty),
clock)
fail("The driver host address should not be allowed.")
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/EnvSecretsFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/EnvSecretsFeatureStepSuite.scala
new file mode 100644
index 0000000000000..8b0b2d0739c76
--- /dev/null
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/EnvSecretsFeatureStepSuite.scala
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.features
+
+import io.fabric8.kubernetes.api.model.PodBuilder
+
+import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.deploy.k8s._
+
+class EnvSecretsFeatureStepSuite extends SparkFunSuite{
+ private val KEY_REF_NAME_FOO = "foo"
+ private val KEY_REF_NAME_BAR = "bar"
+ private val KEY_REF_KEY_FOO = "key_foo"
+ private val KEY_REF_KEY_BAR = "key_bar"
+ private val ENV_NAME_FOO = "MY_FOO"
+ private val ENV_NAME_BAR = "MY_bar"
+
+ test("sets up all keyRefs") {
+ val baseDriverPod = SparkPod.initialPod()
+ val envVarsToKeys = Map(
+ ENV_NAME_BAR -> s"${KEY_REF_NAME_BAR}:${KEY_REF_KEY_BAR}",
+ ENV_NAME_FOO -> s"${KEY_REF_NAME_FOO}:${KEY_REF_KEY_FOO}")
+ val sparkConf = new SparkConf(false)
+ val kubernetesConf = KubernetesConf(
+ sparkConf,
+ KubernetesExecutorSpecificConf("1", new PodBuilder().build()),
+ "resource-name-prefix",
+ "app-id",
+ Map.empty,
+ Map.empty,
+ Map.empty,
+ envVarsToKeys,
+ Map.empty)
+
+ val step = new EnvSecretsFeatureStep(kubernetesConf)
+ val driverContainerWithEnvSecrets = step.configurePod(baseDriverPod).container
+
+ val expectedVars =
+ Seq(s"${ENV_NAME_BAR}", s"${ENV_NAME_FOO}")
+
+ expectedVars.foreach { envName =>
+ assert(KubernetesFeaturesTestUtils.containerHasEnvVar(driverContainerWithEnvSecrets, envName))
+ }
+ }
+}
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/KubernetesFeaturesTestUtils.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/KubernetesFeaturesTestUtils.scala
index 27bff74ce38af..f90380e30e52a 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/KubernetesFeaturesTestUtils.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/KubernetesFeaturesTestUtils.scala
@@ -16,7 +16,9 @@
*/
package org.apache.spark.deploy.k8s.features
-import io.fabric8.kubernetes.api.model.{HasMetadata, PodBuilder, SecretBuilder}
+import scala.collection.JavaConverters._
+
+import io.fabric8.kubernetes.api.model.{Container, HasMetadata, PodBuilder, SecretBuilder}
import org.mockito.Matchers
import org.mockito.Mockito._
import org.mockito.invocation.InvocationOnMock
@@ -58,4 +60,7 @@ object KubernetesFeaturesTestUtils {
.build())
}
+ def containerHasEnvVar(container: Container, envVarName: String): Boolean = {
+ container.getEnv.asScala.exists(envVar => envVar.getName == envVarName)
+ }
}
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/LocalDirsFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/LocalDirsFeatureStepSuite.scala
index 91e184b84b86e..2542a02d37766 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/LocalDirsFeatureStepSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/LocalDirsFeatureStepSuite.scala
@@ -43,6 +43,7 @@ class LocalDirsFeatureStepSuite extends SparkFunSuite with BeforeAndAfter {
Map.empty,
Map.empty,
Map.empty,
+ Map.empty,
Map.empty)
}
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/MountSecretsFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/MountSecretsFeatureStepSuite.scala
index 9d02f56cc206d..9155793774123 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/MountSecretsFeatureStepSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/MountSecretsFeatureStepSuite.scala
@@ -41,6 +41,7 @@ class MountSecretsFeatureStepSuite extends SparkFunSuite {
Map.empty,
Map.empty,
secretNamesToMountPaths,
+ Map.empty,
Map.empty)
val step = new MountSecretsFeatureStep(kubernetesConf)
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/ClientSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/ClientSuite.scala
index c1b203e03a357..0775338098a13 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/ClientSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/ClientSuite.scala
@@ -142,6 +142,7 @@ class ClientSuite extends SparkFunSuite with BeforeAndAfter {
Map.empty,
Map.empty,
Map.empty,
+ Map.empty,
Map.empty)
when(driverBuilder.buildFromFeatures(kubernetesConf)).thenReturn(BUILT_KUBERNETES_SPEC)
when(kubernetesClient.pods()).thenReturn(podOperations)
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilderSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilderSuite.scala
index a511d254d2175..cb724068ea4f3 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilderSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilderSuite.scala
@@ -18,7 +18,7 @@ package org.apache.spark.deploy.k8s.submit
import org.apache.spark.{SparkConf, SparkFunSuite}
import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesDriverSpec, KubernetesDriverSpecificConf}
-import org.apache.spark.deploy.k8s.features.{BasicDriverFeatureStep, DriverKubernetesCredentialsFeatureStep, DriverServiceFeatureStep, KubernetesFeaturesTestUtils, LocalDirsFeatureStep, MountSecretsFeatureStep}
+import org.apache.spark.deploy.k8s.features.{BasicDriverFeatureStep, DriverKubernetesCredentialsFeatureStep, DriverServiceFeatureStep, EnvSecretsFeatureStep, KubernetesFeaturesTestUtils, LocalDirsFeatureStep, MountSecretsFeatureStep}
class KubernetesDriverBuilderSuite extends SparkFunSuite {
@@ -27,6 +27,7 @@ class KubernetesDriverBuilderSuite extends SparkFunSuite {
private val SERVICE_STEP_TYPE = "service"
private val LOCAL_DIRS_STEP_TYPE = "local-dirs"
private val SECRETS_STEP_TYPE = "mount-secrets"
+ private val ENV_SECRETS_STEP_TYPE = "env-secrets"
private val basicFeatureStep = KubernetesFeaturesTestUtils.getMockConfigStepForStepType(
BASIC_STEP_TYPE, classOf[BasicDriverFeatureStep])
@@ -43,12 +44,16 @@ class KubernetesDriverBuilderSuite extends SparkFunSuite {
private val secretsStep = KubernetesFeaturesTestUtils.getMockConfigStepForStepType(
SECRETS_STEP_TYPE, classOf[MountSecretsFeatureStep])
+ private val envSecretsStep = KubernetesFeaturesTestUtils.getMockConfigStepForStepType(
+ ENV_SECRETS_STEP_TYPE, classOf[EnvSecretsFeatureStep])
+
private val builderUnderTest: KubernetesDriverBuilder =
new KubernetesDriverBuilder(
_ => basicFeatureStep,
_ => credentialsStep,
_ => serviceStep,
_ => secretsStep,
+ _ => envSecretsStep,
_ => localDirsStep)
test("Apply fundamental steps all the time.") {
@@ -64,6 +69,7 @@ class KubernetesDriverBuilderSuite extends SparkFunSuite {
Map.empty,
Map.empty,
Map.empty,
+ Map.empty,
Map.empty)
validateStepTypesApplied(
builderUnderTest.buildFromFeatures(conf),
@@ -86,6 +92,7 @@ class KubernetesDriverBuilderSuite extends SparkFunSuite {
Map.empty,
Map.empty,
Map("secret" -> "secretMountPath"),
+ Map("EnvName" -> "SecretName:secretKey"),
Map.empty)
validateStepTypesApplied(
builderUnderTest.buildFromFeatures(conf),
@@ -93,7 +100,9 @@ class KubernetesDriverBuilderSuite extends SparkFunSuite {
CREDENTIALS_STEP_TYPE,
SERVICE_STEP_TYPE,
LOCAL_DIRS_STEP_TYPE,
- SECRETS_STEP_TYPE)
+ SECRETS_STEP_TYPE,
+ ENV_SECRETS_STEP_TYPE
+ )
}
private def validateStepTypesApplied(resolvedSpec: KubernetesDriverSpec, stepTypes: String*)
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilderSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilderSuite.scala
index 9ee86b5a423a9..753cd30a237f3 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilderSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilderSuite.scala
@@ -20,23 +20,27 @@ import io.fabric8.kubernetes.api.model.PodBuilder
import org.apache.spark.{SparkConf, SparkFunSuite}
import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesExecutorSpecificConf, SparkPod}
-import org.apache.spark.deploy.k8s.features.{BasicExecutorFeatureStep, KubernetesFeaturesTestUtils, LocalDirsFeatureStep, MountSecretsFeatureStep}
+import org.apache.spark.deploy.k8s.features.{BasicExecutorFeatureStep, EnvSecretsFeatureStep, KubernetesFeaturesTestUtils, LocalDirsFeatureStep, MountSecretsFeatureStep}
class KubernetesExecutorBuilderSuite extends SparkFunSuite {
private val BASIC_STEP_TYPE = "basic"
private val SECRETS_STEP_TYPE = "mount-secrets"
+ private val ENV_SECRETS_STEP_TYPE = "env-secrets"
private val LOCAL_DIRS_STEP_TYPE = "local-dirs"
private val basicFeatureStep = KubernetesFeaturesTestUtils.getMockConfigStepForStepType(
BASIC_STEP_TYPE, classOf[BasicExecutorFeatureStep])
private val mountSecretsStep = KubernetesFeaturesTestUtils.getMockConfigStepForStepType(
SECRETS_STEP_TYPE, classOf[MountSecretsFeatureStep])
+ private val envSecretsStep = KubernetesFeaturesTestUtils.getMockConfigStepForStepType(
+ ENV_SECRETS_STEP_TYPE, classOf[EnvSecretsFeatureStep])
private val localDirsStep = KubernetesFeaturesTestUtils.getMockConfigStepForStepType(
LOCAL_DIRS_STEP_TYPE, classOf[LocalDirsFeatureStep])
private val builderUnderTest = new KubernetesExecutorBuilder(
_ => basicFeatureStep,
_ => mountSecretsStep,
+ _ => envSecretsStep,
_ => localDirsStep)
test("Basic steps are consistently applied.") {
@@ -49,6 +53,7 @@ class KubernetesExecutorBuilderSuite extends SparkFunSuite {
Map.empty,
Map.empty,
Map.empty,
+ Map.empty,
Map.empty)
validateStepTypesApplied(
builderUnderTest.buildFromFeatures(conf), BASIC_STEP_TYPE, LOCAL_DIRS_STEP_TYPE)
@@ -64,12 +69,14 @@ class KubernetesExecutorBuilderSuite extends SparkFunSuite {
Map.empty,
Map.empty,
Map("secret" -> "secretMountPath"),
+ Map("secret-name" -> "secret-key"),
Map.empty)
validateStepTypesApplied(
builderUnderTest.buildFromFeatures(conf),
BASIC_STEP_TYPE,
LOCAL_DIRS_STEP_TYPE,
- SECRETS_STEP_TYPE)
+ SECRETS_STEP_TYPE,
+ ENV_SECRETS_STEP_TYPE)
}
private def validateStepTypesApplied(resolvedPod: SparkPod, stepTypes: String*): Unit = {