29 changes: 29 additions & 0 deletions docs/running-on-kubernetes.md
@@ -670,6 +670,16 @@ See the [configuration page](configuration.html) for information on Spark config
</td>
<td>2.3.0</td>
</tr>
<tr>
<td><code>spark.kubernetes.authenticate.submission.oauthTokenProvider</code></td>
<td>(none)</td>
<td>
The name of the class that implements the OAuthTokenProvider interface to provide an OAuth token refresh mechanism for
long-running jobs. The class will be used to fetch the OAuth token to use when authenticating against the Kubernetes
API server when starting the driver. The class must be on the driver's classpath.
</td>
<td>3.3.0</td>
</tr>
<tr>
<td><code>spark.kubernetes.authenticate.driver.caCertFile</code></td>
<td>(none)</td>
@@ -725,6 +735,16 @@ See the [configuration page](configuration.html) for information on Spark config
</td>
<td>2.3.0</td>
</tr>
<tr>
<td><code>spark.kubernetes.authenticate.driver.oauthTokenProvider</code></td>
<td>(none)</td>
<td>
The name of the class that implements the OAuthTokenProvider interface to provide an OAuth token refresh mechanism for
long-running jobs. The class will be used to fetch the OAuth token to use when authenticating against the Kubernetes
API server from the driver pod when requesting executors. The class must be on the driver's classpath.
</td>
<td>3.3.0</td>
</tr>
<tr>
<td><code>spark.kubernetes.authenticate.driver.mounted.caCertFile</code></td>
<td>(none)</td>
@@ -834,6 +854,15 @@ See the [configuration page](configuration.html) for information on Spark config
</td>
<td>2.4.0</td>
</tr>
<tr>
<td><code>spark.kubernetes.authenticate.oauthTokenProvider</code></td>
<td>(none)</td>
<td>
In client mode, the name of a class that implements the OAuthTokenProvider interface to provide an OAuth token refresh
mechanism for long-running jobs. The class must be on the driver's classpath.
</td>
<td>3.3.0</td>
</tr>
<tr>
<td><code>spark.kubernetes.driver.label.[LabelName]</code></td>
<td>(none)</td>
@@ -191,6 +191,7 @@ private[spark] object Config extends Logging {
val KUBERNETES_AUTH_CLIENT_MODE_PREFIX = "spark.kubernetes.authenticate"
val OAUTH_TOKEN_CONF_SUFFIX = "oauthToken"
val OAUTH_TOKEN_FILE_CONF_SUFFIX = "oauthTokenFile"
val OAUTH_TOKEN_PROVIDER_CONF_SUFFIX = "oauthTokenProvider"
val CLIENT_KEY_FILE_CONF_SUFFIX = "clientKeyFile"
val CLIENT_CERT_FILE_CONF_SUFFIX = "clientCertFile"
val CA_CERT_FILE_CONF_SUFFIX = "caCertFile"
@@ -21,7 +21,7 @@ import java.io.File
import com.fasterxml.jackson.databind.ObjectMapper
import com.google.common.base.Charsets
import com.google.common.io.Files
import io.fabric8.kubernetes.client.{ConfigBuilder, DefaultKubernetesClient, KubernetesClient}
import io.fabric8.kubernetes.client.{ConfigBuilder, DefaultKubernetesClient, KubernetesClient, OAuthTokenProvider}
import io.fabric8.kubernetes.client.Config.KUBERNETES_REQUEST_RETRY_BACKOFFLIMIT_SYSTEM_PROPERTY
import io.fabric8.kubernetes.client.Config.autoConfigure
import io.fabric8.kubernetes.client.okhttp.OkHttpClientFactory
@@ -33,7 +33,7 @@ import org.apache.spark.SparkConf
import org.apache.spark.deploy.k8s.Config._
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config.ConfigEntry
import org.apache.spark.util.ThreadUtils
import org.apache.spark.util.{ThreadUtils, Utils}

/**
* Spark-opinionated builder for Kubernetes clients. It uses a prefix plus common suffixes to
@@ -56,11 +56,18 @@ private[spark] object SparkKubernetesClientFactory extends Logging {
.map(new File(_))
.orElse(defaultServiceAccountToken)
val oauthTokenValue = sparkConf.getOption(oauthTokenConf)
KubernetesUtils.requireNandDefined(
oauthTokenFile,
oauthTokenValue,
s"Cannot specify OAuth token through both a file $oauthTokenFileConf and a " +
s"value $oauthTokenConf.")
val oauthTokenProviderConf = s"$kubernetesAuthConfPrefix.$OAUTH_TOKEN_PROVIDER_CONF_SUFFIX"
val oauthTokenProviderInstance = sparkConf.getOption(oauthTokenProviderConf)
.map(Utils.classForName(_)
.getDeclaredConstructor()
.newInstance()
.asInstanceOf[OAuthTokenProvider])

require(
Seq(oauthTokenFile, oauthTokenValue, oauthTokenProviderInstance).count(_.isDefined) <= 1,
s"OAuth token should be specified via only one of $oauthTokenFileConf, $oauthTokenConf " +
s"or $oauthTokenProviderConf."
)

val caCertFile = sparkConf
.getOption(s"$kubernetesAuthConfPrefix.$CA_CERT_FILE_CONF_SUFFIX")
@@ -94,7 +101,9 @@ private[spark] object SparkKubernetesClientFactory extends Logging {
.withRequestTimeout(clientType.requestTimeout(sparkConf))
.withConnectionTimeout(clientType.connectionTimeout(sparkConf))
.withTrustCerts(sparkConf.get(KUBERNETES_TRUST_CERTIFICATES))
.withOption(oauthTokenValue) {
.withOption(oauthTokenProviderInstance) {
(provider, configBuilder) => configBuilder.withOauthTokenProvider(provider)
}.withOption(oauthTokenValue) {

@dongjoon-hyun (Member), Aug 11, 2021

To be clear, if both configurations are mistakenly set at the same time, do we invoke OAuthTokenProvider and then override it with oauthTokenValue?

Member

Could you check what happens when we call configBuilder.withOauthTokenProvider(provider) and configBuilder.withOauthToken(token) together?

@haodemon (Author), Aug 11, 2021

The token specified via configBuilder.withOauthToken(token) stays unused, and the class that implements OAuthTokenProvider is used instead. I confirmed this with tests on my cluster by simultaneously:

1. Adding a debug log to my class and providing it via

--conf spark.kubernetes.client.oauth.token.provider.class=<myClass>

2. Specifying tokens via

--conf spark.kubernetes.authenticate.submission.oauthToken=<token>
--conf spark.kubernetes.authenticate.driver.oauthToken=<token>
--conf spark.kubernetes.authenticate.oauthToken=<token>

My log:

[2021-08-11 12:51:58,012] DEBUG (OAuthGoogleTokenProvider.java:62) - Refreshing kubernetes oauth token via [/usr/lib/google-cloud-sdk/bin/gcloud, config, config-helper, --format=json]
[2021-08-11 12:51:58,498] DEBUG (OAuthGoogleTokenProvider.java:73) - New token expiry time is 3511s

From the log we can conclude that OAuthTokenProvider takes precedence over the token specified via configBuilder.withOauthToken(token). This can also be confirmed by looking at fabric8's code in
https://github.com/fabric8io/kubernetes-client/blob/74cc63df9b6333d083ee24a6ff3455eaad0a6da8/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/RequestConfig.java#L136
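
For readers who want to reproduce this kind of check, here is a minimal Scala sketch of a refreshing provider in the spirit of the one described above. It is not the author's OAuthGoogleTokenProvider. The local `OAuthTokenProvider` trait is only a stand-in that mirrors fabric8's single-method interface (`getToken()`) so the sketch compiles without the dependency. `FetchedToken`, `CachingTokenProvider`, `fetch`, and `refreshMargin` are hypothetical names introduced for illustration.

```scala
import java.time.{Clock, Duration, Instant}

// Stand-in for io.fabric8.kubernetes.client.OAuthTokenProvider, redefined
// here only so the sketch is self-contained (assumption: one method, getToken()).
trait OAuthTokenProvider {
  def getToken(): String
}

// Hypothetical value type: a token together with its lifetime.
final case class FetchedToken(value: String, ttl: Duration)

// Hypothetical caching provider. `fetch` is whatever obtains a fresh token
// (for example, shelling out to a credential helper); it is injected so the
// refresh logic can be exercised without a real cluster.
class CachingTokenProvider(
    fetch: () => FetchedToken,
    clock: Clock = Clock.systemUTC(),
    refreshMargin: Duration = Duration.ofSeconds(60))
  extends OAuthTokenProvider {

  // Last token handed out and the instant at which it expires.
  private var cached: Option[(String, Instant)] = None

  override def getToken(): String = synchronized {
    val now = clock.instant()
    cached match {
      // Reuse the cached token while it is comfortably inside its lifetime.
      case Some((token, expiry)) if now.isBefore(expiry.minus(refreshMargin)) =>
        token
      // Otherwise fetch a new token and remember when it expires.
      case _ =>
        val fresh = fetch()
        cached = Some((fresh.value, now.plus(fresh.ttl)))
        fresh.value
    }
  }
}
```

A class loaded through the reflection path in this diff (`getDeclaredConstructor().newInstance()`) would additionally need a public zero-argument constructor, for example a small subclass that hard-wires `fetch`.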

Member

Thank you for checking. In that case, the current situation may cause confusion for Spark users. From Spark's perspective, we have two options.

  • Option A: We can revise the description of this new configuration to document that precedence.
  • Option B: We can ignore spark.kubernetes.client.oauth.token.provider.class on the Spark side when oauthToken is set.

Which one do you prefer?
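
The precedence under discussion, and what Option B would change, can be modelled with a small Scala sketch. This is a simplified model of the behaviour reported in this thread, not fabric8 or Spark code; `TokenPrecedence`, `effectiveToken`, and `optionB` are hypothetical names, and the provider is represented as a plain `() => String` function.

```scala
object TokenPrecedence {
  // Simplified model of the reported behaviour: when a provider is
  // configured it supplies the token, and the static token is only a fallback.
  def effectiveToken(
      staticToken: Option[String],
      provider: Option[() => String]): Option[String] =
    provider.map(fetch => fetch()).orElse(staticToken)

  // Option B from the comment above: Spark drops the provider whenever a
  // static token is set, so the static token is the one that gets used.
  def optionB(
      staticToken: Option[String],
      provider: Option[() => String]): Option[String] =
    effectiveToken(staticToken, if (staticToken.isDefined) None else provider)
}
```

Under Option A the first function stays as is and only the documentation changes; under Option B the second function describes what users would observe.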

Author

"the current situation may cause confusion for Spark users."

Indeed. Thanks for making me realize this. The patch is missing a way for a Spark user to specify how they would like Spark to authenticate with Kubernetes when running in client or cluster mode. There are several options for oauthToken:

spark.kubernetes.authenticate.submission.oauthToken
spark.kubernetes.authenticate.driver.oauthToken 
spark.kubernetes.authenticate.oauthToken

I think we need the same for the token provider, like spark.kubernetes.authenticate.*.oauthTokenProvider.

This change would:

  • Make the new options consistent with the rest of the Kubernetes options.
  • Make the existing options mutually exclusive. For every mode, only one of oauthToken, oauthTokenFile, and oauthTokenProvider would be allowed.

If we do this, we won't have to add anything about precedence to the docs, and there will be no need to ignore anything in the code.
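
The mutual-exclusivity idea can be sketched in a few lines of Scala, operating on a plain `Map` instead of a `SparkConf` so it runs without Spark. `OAuthConfCheck`, `definedKeys`, and `validate` are hypothetical names; the prefixes and suffixes are the ones listed in this comment.

```scala
object OAuthConfCheck {
  // The three ways to supply a token that would become mutually exclusive.
  val Suffixes: Seq[String] = Seq("oauthToken", "oauthTokenFile", "oauthTokenProvider")

  // Returns the OAuth-related keys under `prefix` that are set in `conf`.
  def definedKeys(conf: Map[String, String], prefix: String): Seq[String] =
    Suffixes.map(suffix => s"$prefix.$suffix").filter(conf.contains)

  // Throws IllegalArgumentException if more than one key is set for `prefix`.
  def validate(conf: Map[String, String], prefix: String): Unit = {
    val keys = definedKeys(conf, prefix)
    require(
      keys.size <= 1,
      s"OAuth token should be specified via only one of: ${keys.mkString(", ")}")
  }
}
```

Because the check runs per prefix, setting `oauthToken` for the driver and `oauthTokenProvider` for submission would still be accepted in this sketch; only two options under the same prefix are rejected.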

@dongjoon-hyun, sorry for a lot of text.

Contributor

I think either precedence or doing an assertion that only one of these is set is fine, but let's just pick one and do it so we can get this in.

Author

I prepared the changes, so now we have:

  • OAuthTokenProvider options that are consistent with the other two options, OAuthToken and OAuthTokenFile.
  • An assertion that only one of the options is set (OAuthToken, OAuthTokenFile, or OAuthTokenProvider).
  • Docs for the new options.

I have tested this in client mode on Kubernetes.

(token, configBuilder) => configBuilder.withOauthToken(token)
}.withOption(oauthTokenFile) {
(file, configBuilder) =>