Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 29 additions & 0 deletions docs/running-on-kubernetes.md
Original file line number Diff line number Diff line change
Expand Up @@ -721,6 +721,16 @@ See the [configuration page](configuration.html) for information on Spark config
</td>
<td>2.3.0</td>
</tr>
<tr>
<td><code>spark.kubernetes.authenticate.submission.oauthTokenProvider</code></td>
<td>(none)</td>
<td>
The name of the class that implements OAuthTokenProvider interface to provide an OAuth token refresh mechanism for
long-running jobs. The class will be used to fetch the OAuth token to use when authenticating against the Kubernetes
API server when starting the driver. The class must be in a driver's classpath.
</td>
<td>4.0.0</td>
</tr>
<tr>
<td><code>spark.kubernetes.authenticate.driver.caCertFile</code></td>
<td>(none)</td>
Expand Down Expand Up @@ -776,6 +786,16 @@ See the [configuration page](configuration.html) for information on Spark config
</td>
<td>2.3.0</td>
</tr>
<tr>
<td><code>spark.kubernetes.authenticate.driver.oauthTokenProvider</code></td>
<td>(none)</td>
<td>
The name of the class that implements OAuthTokenProvider interface to provide an OAuth token refresh mechanism for
long running jobs. The class will be used to fetch the OAuth token to use when authenticating against the Kubernetes
API server from the driver pod when requesting executors. The class must be in a driver's classpath.
</td>
<td>4.0.0</td>
</tr>
<tr>
<td><code>spark.kubernetes.authenticate.driver.mounted.caCertFile</code></td>
<td>(none)</td>
Expand Down Expand Up @@ -885,6 +905,15 @@ See the [configuration page](configuration.html) for information on Spark config
</td>
<td>2.4.0</td>
</tr>
<tr>
<td><code>spark.kubernetes.authenticate.oauthTokenProvider</code></td>
<td>(none)</td>
<td>
In client mode, the name of a class that implements OAuthTokenProvider interface to provide an OAuth token refresh
mechanism for long running jobs. The class must be in a driver's classpath.
</td>
<td>4.0.0</td>
</tr>
<tr>
<td><code>spark.kubernetes.driver.label.[LabelName]</code></td>
<td>(none)</td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,7 @@ private[spark] object Config extends Logging {
val KUBERNETES_AUTH_CLIENT_MODE_PREFIX = "spark.kubernetes.authenticate"
val OAUTH_TOKEN_CONF_SUFFIX = "oauthToken"
val OAUTH_TOKEN_FILE_CONF_SUFFIX = "oauthTokenFile"
val OAUTH_TOKEN_PROVIDER_CONF_SUFFIX = "oauthTokenProvider"
val CLIENT_KEY_FILE_CONF_SUFFIX = "clientKeyFile"
val CLIENT_CERT_FILE_CONF_SUFFIX = "clientCertFile"
val CA_CERT_FILE_CONF_SUFFIX = "caCertFile"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import java.io.File
import com.fasterxml.jackson.databind.ObjectMapper
import com.google.common.base.Charsets
import com.google.common.io.Files
import io.fabric8.kubernetes.client.{ConfigBuilder, KubernetesClient, KubernetesClientBuilder}
import io.fabric8.kubernetes.client.{ConfigBuilder, KubernetesClient, KubernetesClientBuilder, OAuthTokenProvider}
import io.fabric8.kubernetes.client.Config.KUBERNETES_REQUEST_RETRY_BACKOFFLIMIT_SYSTEM_PROPERTY
import io.fabric8.kubernetes.client.Config.autoConfigure
import io.fabric8.kubernetes.client.okhttp.OkHttpClientFactory
Expand All @@ -33,7 +33,7 @@ import org.apache.spark.SparkConf
import org.apache.spark.deploy.k8s.Config._
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config.ConfigEntry
import org.apache.spark.util.ThreadUtils
import org.apache.spark.util.{ThreadUtils, Utils}

/**
* Spark-opinionated builder for Kubernetes clients. It uses a prefix plus common suffixes to
Expand All @@ -56,11 +56,18 @@ private[spark] object SparkKubernetesClientFactory extends Logging {
.map(new File(_))
.orElse(defaultServiceAccountToken)
val oauthTokenValue = sparkConf.getOption(oauthTokenConf)
KubernetesUtils.requireNandDefined(
oauthTokenFile,
oauthTokenValue,
s"Cannot specify OAuth token through both a file $oauthTokenFileConf and a " +
s"value $oauthTokenConf.")
val oauthTokenProviderConf = s"$kubernetesAuthConfPrefix.$OAUTH_TOKEN_PROVIDER_CONF_SUFFIX"
val oauthTokenProvider = sparkConf.getOption(oauthTokenProviderConf)
.map(Utils.classForName(_)
.getDeclaredConstructor()
.newInstance()
.asInstanceOf[OAuthTokenProvider])

require(
Seq(oauthTokenFile, oauthTokenValue, oauthTokenProvider).count(_.isDefined) <= 1,
s"OAuth token should be specified via only one of $oauthTokenFileConf, $oauthTokenConf " +
s"or $oauthTokenProviderConf."
)

val caCertFile = sparkConf
.getOption(s"$kubernetesAuthConfPrefix.$CA_CERT_FILE_CONF_SUFFIX")
Expand Down Expand Up @@ -94,7 +101,9 @@ private[spark] object SparkKubernetesClientFactory extends Logging {
.withRequestTimeout(clientType.requestTimeout(sparkConf))
.withConnectionTimeout(clientType.connectionTimeout(sparkConf))
.withTrustCerts(sparkConf.get(KUBERNETES_TRUST_CERTIFICATES))
.withOption(oauthTokenValue) {
.withOption(oauthTokenProvider) {
(provider, configBuilder) => configBuilder.withOauthTokenProvider(provider)
}.withOption(oauthTokenValue) {
(token, configBuilder) => configBuilder.withOauthToken(token)
}.withOption(oauthTokenFile) {
(file, configBuilder) =>
Expand Down