# [SPARK-24232][K8S] Add support for secret env vars
## What changes were proposed in this pull request?

* Allows referencing a secret as an environment variable.
* Introduces new config properties of the form `spark.kubernetes.{driver,executor}.secretKeyRef.ENV_NAME=name:key`,
  where `ENV_NAME` is case sensitive.

* Updates docs.
* Adds required unit tests.

## How was this patch tested?
Manually tested and confirmed that the secrets exist in the driver's and executor's container environment, and that the job finished successfully.
First created a secret with the following YAML:
```
apiVersion: v1
kind: Secret
metadata:
  name: test-secret
data:
  username: c3RhdnJvcw==
  password: Mzk1MjgkdmRnN0pi
```
The `data` values are the base64-encoded credentials:
```
$ echo -n 'stavros' | base64
c3RhdnJvcw==
$ echo -n '39528$vdg7Jb' | base64
Mzk1MjgkdmRnN0pi
```
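
Equivalently, the secret can be created from literals, letting Kubernetes do the base64 encoding (a sketch, assuming `kubectl` is configured for the target cluster):
```
$ kubectl create secret generic test-secret \
    --from-literal=username=stavros \
    --from-literal=password='39528$vdg7Jb'
```
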
Then ran a job as follows:
```
./bin/spark-submit \
      --master k8s://http://localhost:9000 \
      --deploy-mode cluster \
      --name spark-pi \
      --class org.apache.spark.examples.SparkPi \
      --conf spark.executor.instances=1 \
      --conf spark.kubernetes.container.image=skonto/spark:k8envs3 \
      --conf spark.kubernetes.driver.secretKeyRef.MY_USERNAME=test-secret:username \
      --conf spark.kubernetes.driver.secretKeyRef.My_password=test-secret:password \
      --conf spark.kubernetes.executor.secretKeyRef.MY_USERNAME=test-secret:username \
      --conf spark.kubernetes.executor.secretKeyRef.My_password=test-secret:password \
      local:///opt/spark/examples/jars/spark-examples_2.11-2.4.0-SNAPSHOT.jar 10000
```

The secret is loaded correctly in the driver container:
![image](https://user-images.githubusercontent.com/7945591/40174346-7fee70c8-59dd-11e8-8705-995a5472716f.png)
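
The same check works from the command line (a sketch; the driver pod name `spark-pi-1526555613156-driver` is inferred from the driver service name visible in the executor environment below):
```
$ kubectl exec spark-pi-1526555613156-driver -- env | grep -e MY_USERNAME -e My_password
```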

Also, if I log into the executor container:

```
$ kubectl exec -it spark-pi-1526555613156-exec-1 bash
bash-4.4# env
```

> SPARK_EXECUTOR_MEMORY=1g
> SPARK_EXECUTOR_CORES=1
> LANG=C.UTF-8
> HOSTNAME=spark-pi-1526555613156-exec-1
> SPARK_APPLICATION_ID=spark-application-1526555618626
> **MY_USERNAME=stavros**
>
> JAVA_HOME=/usr/lib/jvm/java-1.8-openjdk
> KUBERNETES_PORT_443_TCP_PROTO=tcp
> KUBERNETES_PORT_443_TCP_ADDR=10.100.0.1
> JAVA_VERSION=8u151
> KUBERNETES_PORT=tcp://10.100.0.1:443
> PWD=/opt/spark/work-dir
> HOME=/root
> SPARK_LOCAL_DIRS=/var/data/spark-b569b0ae-b7ef-4f91-bcd5-0f55535d3564
> KUBERNETES_SERVICE_PORT_HTTPS=443
> KUBERNETES_PORT_443_TCP_PORT=443
> SPARK_HOME=/opt/spark
> SPARK_DRIVER_URL=spark://CoarseGrainedScheduler@spark-pi-1526555613156-driver-svc.default.svc:7078
> KUBERNETES_PORT_443_TCP=tcp://10.100.0.1:443
> SPARK_EXECUTOR_POD_IP=9.0.9.77
> TERM=xterm
> SPARK_EXECUTOR_ID=1
> SHLVL=1
> KUBERNETES_SERVICE_PORT=443
> SPARK_CONF_DIR=/opt/spark/conf
> PATH=/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/usr/lib/jvm/java-1.8-openjdk/jre/bin:/usr/lib/jvm/java-1.8-openjdk/bin
> JAVA_ALPINE_VERSION=8.151.12-r0
> KUBERNETES_SERVICE_HOST=10.100.0.1
> **My_password=39528$vdg7Jb**
> _=/usr/bin/env
>

Author: Stavros Kontopoulos <stavros.kontopoulos@lightbend.com>

Closes #21317 from skonto/k8s-fix-env-secrets.
Stavros Kontopoulos authored and foxish committed May 31, 2018
1 parent cc976f6 commit 21e1fc7
Showing 18 changed files with 222 additions and 12 deletions.
22 changes: 22 additions & 0 deletions docs/running-on-kubernetes.md
@@ -140,6 +140,12 @@ namespace as that of the driver and executor pods. For example, to mount a secre
--conf spark.kubernetes.executor.secrets.spark-secret=/etc/secrets
```

To use a secret through an environment variable, pass the following options to the `spark-submit` command:
```
--conf spark.kubernetes.driver.secretKeyRef.ENV_NAME=name:key
--conf spark.kubernetes.executor.secretKeyRef.ENV_NAME=name:key
```
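
Each such option becomes a `valueFrom`/`secretKeyRef` entry in the corresponding container spec, i.e. roughly the following pod fragment (a sketch of what the new `EnvSecretsFeatureStep` in this commit generates):
```
env:
  - name: ENV_NAME
    valueFrom:
      secretKeyRef:
        name: name
        key: key
```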

## Introspection and Debugging

These are the different ways in which you can investigate a running/completed Spark application, monitor progress, and
@@ -602,4 +608,20 @@ specific to Spark on Kubernetes.
<code>spark.kubernetes.executor.secrets.spark-secret=/etc/secrets</code>.
</td>
</tr>
<tr>
<td><code>spark.kubernetes.driver.secretKeyRef.[EnvName]</code></td>
<td>(none)</td>
<td>
Add the value referenced by key <code>key</code> in the data of the referenced <a href="https://kubernetes.io/docs/concepts/configuration/secret/#using-secrets-as-environment-variables">Kubernetes Secret</a> to the driver container as an environment variable named <code>EnvName</code> (case sensitive). For example,
<code>spark.kubernetes.driver.secretKeyRef.ENV_VAR=spark-secret:key</code>.
</td>
</tr>
<tr>
<td><code>spark.kubernetes.executor.secretKeyRef.[EnvName]</code></td>
<td>(none)</td>
<td>
Add the value referenced by key <code>key</code> in the data of the referenced <a href="https://kubernetes.io/docs/concepts/configuration/secret/#using-secrets-as-environment-variables">Kubernetes Secret</a> to the executor container as an environment variable named <code>EnvName</code> (case sensitive). For example,
<code>spark.kubernetes.executor.secretKeyRef.ENV_VAR=spark-secret:key</code>.
</td>
</tr>
</table>
@@ -162,10 +162,12 @@ private[spark] object Config extends Logging {
val KUBERNETES_DRIVER_LABEL_PREFIX = "spark.kubernetes.driver.label."
val KUBERNETES_DRIVER_ANNOTATION_PREFIX = "spark.kubernetes.driver.annotation."
val KUBERNETES_DRIVER_SECRETS_PREFIX = "spark.kubernetes.driver.secrets."
val KUBERNETES_DRIVER_SECRET_KEY_REF_PREFIX = "spark.kubernetes.driver.secretKeyRef."

val KUBERNETES_EXECUTOR_LABEL_PREFIX = "spark.kubernetes.executor.label."
val KUBERNETES_EXECUTOR_ANNOTATION_PREFIX = "spark.kubernetes.executor.annotation."
val KUBERNETES_EXECUTOR_SECRETS_PREFIX = "spark.kubernetes.executor.secrets."
val KUBERNETES_EXECUTOR_SECRET_KEY_REF_PREFIX = "spark.kubernetes.executor.secretKeyRef."

val KUBERNETES_DRIVER_ENV_PREFIX = "spark.kubernetes.driverEnv."
}
@@ -54,6 +54,7 @@ private[spark] case class KubernetesConf[T <: KubernetesRoleSpecificConf](
roleLabels: Map[String, String],
roleAnnotations: Map[String, String],
roleSecretNamesToMountPaths: Map[String, String],
roleSecretEnvNamesToKeyRefs: Map[String, String],
roleEnvs: Map[String, String]) {

def namespace(): String = sparkConf.get(KUBERNETES_NAMESPACE)
@@ -129,6 +130,8 @@ private[spark] object KubernetesConf {
sparkConf, KUBERNETES_DRIVER_ANNOTATION_PREFIX)
val driverSecretNamesToMountPaths = KubernetesUtils.parsePrefixedKeyValuePairs(
sparkConf, KUBERNETES_DRIVER_SECRETS_PREFIX)
val driverSecretEnvNamesToKeyRefs = KubernetesUtils.parsePrefixedKeyValuePairs(
sparkConf, KUBERNETES_DRIVER_SECRET_KEY_REF_PREFIX)
val driverEnvs = KubernetesUtils.parsePrefixedKeyValuePairs(
sparkConf, KUBERNETES_DRIVER_ENV_PREFIX)

@@ -140,6 +143,7 @@
driverLabels,
driverAnnotations,
driverSecretNamesToMountPaths,
driverSecretEnvNamesToKeyRefs,
driverEnvs)
}

@@ -167,8 +171,10 @@ private[spark] object KubernetesConf {
executorCustomLabels
val executorAnnotations = KubernetesUtils.parsePrefixedKeyValuePairs(
sparkConf, KUBERNETES_EXECUTOR_ANNOTATION_PREFIX)
val executorSecrets = KubernetesUtils.parsePrefixedKeyValuePairs(
val executorMountSecrets = KubernetesUtils.parsePrefixedKeyValuePairs(
sparkConf, KUBERNETES_EXECUTOR_SECRETS_PREFIX)
val executorEnvSecrets = KubernetesUtils.parsePrefixedKeyValuePairs(
sparkConf, KUBERNETES_EXECUTOR_SECRET_KEY_REF_PREFIX)
val executorEnv = sparkConf.getExecutorEnv.toMap

KubernetesConf(
Expand All @@ -178,7 +184,8 @@ private[spark] object KubernetesConf {
appId,
executorLabels,
executorAnnotations,
executorSecrets,
executorMountSecrets,
executorEnvSecrets,
executorEnv)
}
}
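
For intuition, `KubernetesUtils.parsePrefixedKeyValuePairs` strips the prefix and keeps the remainder of the property name as the map key, so the two driver properties from the test run above parse as follows (a hypothetical illustration, assuming `sparkConf` carries those properties):
```
// Given:
//   spark.kubernetes.driver.secretKeyRef.MY_USERNAME=test-secret:username
//   spark.kubernetes.driver.secretKeyRef.My_password=test-secret:password
val parsed = KubernetesUtils.parsePrefixedKeyValuePairs(
  sparkConf, KUBERNETES_DRIVER_SECRET_KEY_REF_PREFIX)
// parsed == Map(
//   "MY_USERNAME" -> "test-secret:username",
//   "My_password" -> "test-secret:password")
// This is exactly the roleSecretEnvNamesToKeyRefs map the new feature step consumes.
```
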
@@ -0,0 +1,57 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.deploy.k8s.features

import scala.collection.JavaConverters._

import io.fabric8.kubernetes.api.model.{ContainerBuilder, EnvVarBuilder, HasMetadata}

import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesRoleSpecificConf, SparkPod}

private[spark] class EnvSecretsFeatureStep(
    kubernetesConf: KubernetesConf[_ <: KubernetesRoleSpecificConf])
  extends KubernetesFeatureConfigStep {
  override def configurePod(pod: SparkPod): SparkPod = {
    val addedEnvSecrets = kubernetesConf
      .roleSecretEnvNamesToKeyRefs
      .map { case (envName, keyRef) =>
        // A keyRef has the form "name:key": one key in the data of one secret.
        val keyRefParts = keyRef.split(":")
        require(keyRefParts.size == 2, "SecretKeyRef must be in the form name:key.")
        val name = keyRefParts(0)
        val key = keyRefParts(1)
        // Build an env var whose value comes from the referenced secret key.
        new EnvVarBuilder()
          .withName(envName)
          .withNewValueFrom()
            .withNewSecretKeyRef()
              .withKey(key)
              .withName(name)
            .endSecretKeyRef()
          .endValueFrom()
          .build()
      }

    val containerWithEnvVars = new ContainerBuilder(pod.container)
      .addAllToEnv(addedEnvSecrets.toSeq.asJava)
      .build()
    SparkPod(pod.pod, containerWithEnvVars)
  }

  override def getAdditionalPodSystemProperties(): Map[String, String] = Map.empty

  override def getAdditionalKubernetesResources(): Seq[HasMetadata] = Seq.empty
}
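
A minimal sketch of exercising the new step on an initial pod (hypothetical snippet modeled on the suites further down; the `(executorId, driverPod)` shape of `KubernetesExecutorSpecificConf` is assumed, as it is not shown in this diff):
```
import io.fabric8.kubernetes.api.model.PodBuilder

import org.apache.spark.SparkConf
import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesExecutorSpecificConf, SparkPod}
import org.apache.spark.deploy.k8s.features.EnvSecretsFeatureStep

val conf = KubernetesConf(
  new SparkConf(false),
  KubernetesExecutorSpecificConf("1", new PodBuilder().build()),
  "resource-prefix",
  "app-id",
  Map.empty,                                      // roleLabels
  Map.empty,                                      // roleAnnotations
  Map.empty,                                      // roleSecretNamesToMountPaths
  Map("MY_USERNAME" -> "test-secret:username"),   // roleSecretEnvNamesToKeyRefs
  Map.empty)                                      // roleEnvs

val pod = new EnvSecretsFeatureStep(conf).configurePod(SparkPod.initialPod())
// pod.container.getEnv now contains MY_USERNAME with a secretKeyRef pointing
// at key "username" of secret "test-secret".
```
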
@@ -17,7 +17,7 @@
package org.apache.spark.deploy.k8s.submit

import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesDriverSpec, KubernetesDriverSpecificConf, KubernetesRoleSpecificConf}
import org.apache.spark.deploy.k8s.features.{BasicDriverFeatureStep, DriverKubernetesCredentialsFeatureStep, DriverServiceFeatureStep, LocalDirsFeatureStep, MountSecretsFeatureStep}
import org.apache.spark.deploy.k8s.features._

private[spark] class KubernetesDriverBuilder(
provideBasicStep: (KubernetesConf[KubernetesDriverSpecificConf]) => BasicDriverFeatureStep =
@@ -30,6 +30,9 @@ private[spark] class KubernetesDriverBuilder(
provideSecretsStep: (KubernetesConf[_ <: KubernetesRoleSpecificConf]
=> MountSecretsFeatureStep) =
new MountSecretsFeatureStep(_),
provideEnvSecretsStep: (KubernetesConf[_ <: KubernetesRoleSpecificConf]
=> EnvSecretsFeatureStep) =
new EnvSecretsFeatureStep(_),
provideLocalDirsStep: (KubernetesConf[_ <: KubernetesRoleSpecificConf])
=> LocalDirsFeatureStep =
new LocalDirsFeatureStep(_)) {
@@ -41,10 +44,14 @@ private[spark] class KubernetesDriverBuilder(
provideCredentialsStep(kubernetesConf),
provideServiceStep(kubernetesConf),
provideLocalDirsStep(kubernetesConf))
val allFeatures = if (kubernetesConf.roleSecretNamesToMountPaths.nonEmpty) {
var allFeatures = if (kubernetesConf.roleSecretNamesToMountPaths.nonEmpty) {
baseFeatures ++ Seq(provideSecretsStep(kubernetesConf))
} else baseFeatures

allFeatures = if (kubernetesConf.roleSecretEnvNamesToKeyRefs.nonEmpty) {
allFeatures ++ Seq(provideEnvSecretsStep(kubernetesConf))
} else allFeatures

var spec = KubernetesDriverSpec.initialSpec(kubernetesConf.sparkConf.getAll.toMap)
for (feature <- allFeatures) {
val configuredPod = feature.configurePod(spec.pod)
@@ -17,24 +17,32 @@
package org.apache.spark.scheduler.cluster.k8s

import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesExecutorSpecificConf, KubernetesRoleSpecificConf, SparkPod}
import org.apache.spark.deploy.k8s.features.{BasicExecutorFeatureStep, LocalDirsFeatureStep, MountSecretsFeatureStep}
import org.apache.spark.deploy.k8s.features.{BasicExecutorFeatureStep, EnvSecretsFeatureStep, LocalDirsFeatureStep, MountSecretsFeatureStep}

private[spark] class KubernetesExecutorBuilder(
provideBasicStep: (KubernetesConf[KubernetesExecutorSpecificConf]) => BasicExecutorFeatureStep =
new BasicExecutorFeatureStep(_),
provideSecretsStep:
(KubernetesConf[_ <: KubernetesRoleSpecificConf]) => MountSecretsFeatureStep =
new MountSecretsFeatureStep(_),
provideEnvSecretsStep:
(KubernetesConf[_ <: KubernetesRoleSpecificConf] => EnvSecretsFeatureStep) =
new EnvSecretsFeatureStep(_),
provideLocalDirsStep: (KubernetesConf[_ <: KubernetesRoleSpecificConf])
=> LocalDirsFeatureStep =
new LocalDirsFeatureStep(_)) {

def buildFromFeatures(
kubernetesConf: KubernetesConf[KubernetesExecutorSpecificConf]): SparkPod = {
val baseFeatures = Seq(provideBasicStep(kubernetesConf), provideLocalDirsStep(kubernetesConf))
val allFeatures = if (kubernetesConf.roleSecretNamesToMountPaths.nonEmpty) {
var allFeatures = if (kubernetesConf.roleSecretNamesToMountPaths.nonEmpty) {
baseFeatures ++ Seq(provideSecretsStep(kubernetesConf))
} else baseFeatures

allFeatures = if (kubernetesConf.roleSecretEnvNamesToKeyRefs.nonEmpty) {
allFeatures ++ Seq(provideEnvSecretsStep(kubernetesConf))
} else allFeatures

var executorPod = SparkPod.initialPod()
for (feature <- allFeatures) {
executorPod = feature.configurePod(executorPod)
@@ -40,6 +40,9 @@ class KubernetesConfSuite extends SparkFunSuite {
private val SECRET_NAMES_TO_MOUNT_PATHS = Map(
"secret1" -> "/mnt/secrets/secret1",
"secret2" -> "/mnt/secrets/secret2")
private val SECRET_ENV_VARS = Map(
"envName1" -> "name1:key1",
"envName2" -> "name2:key2")
private val CUSTOM_ENVS = Map(
"customEnvKey1" -> "customEnvValue1",
"customEnvKey2" -> "customEnvValue2")
@@ -103,6 +106,9 @@ class KubernetesConfSuite extends SparkFunSuite {
SECRET_NAMES_TO_MOUNT_PATHS.foreach { case (key, value) =>
sparkConf.set(s"$KUBERNETES_DRIVER_SECRETS_PREFIX$key", value)
}
SECRET_ENV_VARS.foreach { case (key, value) =>
sparkConf.set(s"$KUBERNETES_DRIVER_SECRET_KEY_REF_PREFIX$key", value)
}
CUSTOM_ENVS.foreach { case (key, value) =>
sparkConf.set(s"$KUBERNETES_DRIVER_ENV_PREFIX$key", value)
}
@@ -121,6 +127,7 @@ class KubernetesConfSuite extends SparkFunSuite {
CUSTOM_LABELS)
assert(conf.roleAnnotations === CUSTOM_ANNOTATIONS)
assert(conf.roleSecretNamesToMountPaths === SECRET_NAMES_TO_MOUNT_PATHS)
assert(conf.roleSecretEnvNamesToKeyRefs === SECRET_ENV_VARS)
assert(conf.roleEnvs === CUSTOM_ENVS)
}

@@ -155,6 +162,9 @@ class KubernetesConfSuite extends SparkFunSuite {
CUSTOM_ANNOTATIONS.foreach { case (key, value) =>
sparkConf.set(s"$KUBERNETES_EXECUTOR_ANNOTATION_PREFIX$key", value)
}
SECRET_ENV_VARS.foreach { case (key, value) =>
sparkConf.set(s"$KUBERNETES_EXECUTOR_SECRET_KEY_REF_PREFIX$key", value)
}
SECRET_NAMES_TO_MOUNT_PATHS.foreach { case (key, value) =>
sparkConf.set(s"$KUBERNETES_EXECUTOR_SECRETS_PREFIX$key", value)
}
@@ -170,6 +180,6 @@
SPARK_ROLE_LABEL -> SPARK_POD_EXECUTOR_ROLE) ++ CUSTOM_LABELS)
assert(conf.roleAnnotations === CUSTOM_ANNOTATIONS)
assert(conf.roleSecretNamesToMountPaths === SECRET_NAMES_TO_MOUNT_PATHS)
assert(conf.roleSecretEnvNamesToKeyRefs === SECRET_ENV_VARS)
}

}
@@ -69,6 +69,7 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite {
DRIVER_LABELS,
DRIVER_ANNOTATIONS,
Map.empty,
Map.empty,
DRIVER_ENVS)

val featureStep = new BasicDriverFeatureStep(kubernetesConf)
@@ -138,6 +139,7 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite {
DRIVER_LABELS,
DRIVER_ANNOTATIONS,
Map.empty,
Map.empty,
Map.empty)
val step = new BasicDriverFeatureStep(kubernetesConf)
val additionalProperties = step.getAdditionalPodSystemProperties()
@@ -87,6 +87,7 @@ class BasicExecutorFeatureStepSuite
LABELS,
ANNOTATIONS,
Map.empty,
Map.empty,
Map.empty))
val executor = step.configurePod(SparkPod.initialPod())

@@ -124,6 +125,7 @@ class BasicExecutorFeatureStepSuite
LABELS,
ANNOTATIONS,
Map.empty,
Map.empty,
Map.empty))
assert(step.configurePod(SparkPod.initialPod()).pod.getSpec.getHostname.length === 63)
}
@@ -142,6 +144,7 @@ class BasicExecutorFeatureStepSuite
LABELS,
ANNOTATIONS,
Map.empty,
Map.empty,
Map("qux" -> "quux")))
val executor = step.configurePod(SparkPod.initialPod())

@@ -59,6 +59,7 @@ class DriverKubernetesCredentialsFeatureStepSuite extends SparkFunSuite with Bef
Map.empty,
Map.empty,
Map.empty,
Map.empty,
Map.empty)
val kubernetesCredentialsStep = new DriverKubernetesCredentialsFeatureStep(kubernetesConf)
assert(kubernetesCredentialsStep.configurePod(BASE_DRIVER_POD) === BASE_DRIVER_POD)
@@ -88,6 +89,7 @@ class DriverKubernetesCredentialsFeatureStepSuite extends SparkFunSuite with Bef
Map.empty,
Map.empty,
Map.empty,
Map.empty,
Map.empty)

val kubernetesCredentialsStep = new DriverKubernetesCredentialsFeatureStep(kubernetesConf)
@@ -124,6 +126,7 @@ class DriverKubernetesCredentialsFeatureStepSuite extends SparkFunSuite with Bef
Map.empty,
Map.empty,
Map.empty,
Map.empty,
Map.empty)
val kubernetesCredentialsStep = new DriverKubernetesCredentialsFeatureStep(kubernetesConf)
val resolvedProperties = kubernetesCredentialsStep.getAdditionalPodSystemProperties()
@@ -65,6 +65,7 @@ class DriverServiceFeatureStepSuite extends SparkFunSuite with BeforeAndAfter {
DRIVER_LABELS,
Map.empty,
Map.empty,
Map.empty,
Map.empty))
assert(configurationStep.configurePod(SparkPod.initialPod()) === SparkPod.initialPod())
assert(configurationStep.getAdditionalKubernetesResources().size === 1)
@@ -94,6 +95,7 @@ class DriverServiceFeatureStepSuite extends SparkFunSuite with BeforeAndAfter {
DRIVER_LABELS,
Map.empty,
Map.empty,
Map.empty,
Map.empty))
val expectedServiceName = SHORT_RESOURCE_NAME_PREFIX +
DriverServiceFeatureStep.DRIVER_SVC_POSTFIX
@@ -113,6 +115,7 @@
DRIVER_LABELS,
Map.empty,
Map.empty,
Map.empty,
Map.empty))
val resolvedService = configurationStep
.getAdditionalKubernetesResources()
@@ -141,6 +144,7 @@ class DriverServiceFeatureStepSuite extends SparkFunSuite with BeforeAndAfter {
DRIVER_LABELS,
Map.empty,
Map.empty,
Map.empty,
Map.empty),
clock)
val driverService = configurationStep
@@ -166,6 +170,7 @@
DRIVER_LABELS,
Map.empty,
Map.empty,
Map.empty,
Map.empty),
clock)
fail("The driver bind address should not be allowed.")
@@ -189,6 +194,7 @@
DRIVER_LABELS,
Map.empty,
Map.empty,
Map.empty,
Map.empty),
clock)
fail("The driver host address should not be allowed.")