diff --git a/docs/running-on-kubernetes.md b/docs/running-on-kubernetes.md index 707a76196f3ab..2a8e36e98dd80 100644 --- a/docs/running-on-kubernetes.md +++ b/docs/running-on-kubernetes.md @@ -721,6 +721,16 @@ See the [configuration page](configuration.html) for information on Spark config 2.3.0 + + spark.kubernetes.authenticate.submission.oauthTokenProvider + (none) + + The name of the class that implements OAuthTokenProvider interface to provide an OAuth token refresh mechanism for + long-running jobs. The class will be used to fetch the OAuth token to use when authenticating against the Kubernetes + API server when starting the driver. The class must be in a driver's classpath. + + 4.0.0 + spark.kubernetes.authenticate.driver.caCertFile (none) @@ -776,6 +786,16 @@ See the [configuration page](configuration.html) for information on Spark config 2.3.0 + + spark.kubernetes.authenticate.driver.oauthTokenProvider + (none) + + The name of the class that implements OAuthTokenProvider interface to provide an OAuth token refresh mechanism for + long running jobs. The class will be used to fetch the OAuth token to use when authenticating against the Kubernetes + API server from the driver pod when requesting executors. The class must be in a driver's classpath. + + 4.0.0 + spark.kubernetes.authenticate.driver.mounted.caCertFile (none) @@ -885,6 +905,15 @@ See the [configuration page](configuration.html) for information on Spark config 2.4.0 + + spark.kubernetes.authenticate.oauthTokenProvider + (none) + + In client mode, the name of a class that implements OAuthTokenProvider interface to provide an OAuth token refresh + mechanism for long running jobs. The class must be in a driver's classpath. + + 4.0.0 + spark.kubernetes.driver.label.[LabelName] (none) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala index 0c54191fb10d5..d742be3d750ab 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala @@ -225,6 +225,7 @@ private[spark] object Config extends Logging { val KUBERNETES_AUTH_CLIENT_MODE_PREFIX = "spark.kubernetes.authenticate" val OAUTH_TOKEN_CONF_SUFFIX = "oauthToken" val OAUTH_TOKEN_FILE_CONF_SUFFIX = "oauthTokenFile" + val OAUTH_TOKEN_PROVIDER_CONF_SUFFIX = "oauthTokenProvider" val CLIENT_KEY_FILE_CONF_SUFFIX = "clientKeyFile" val CLIENT_CERT_FILE_CONF_SUFFIX = "clientCertFile" val CA_CERT_FILE_CONF_SUFFIX = "caCertFile" diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkKubernetesClientFactory.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkKubernetesClientFactory.scala index 0b806f046402e..245cc924fbd9a 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkKubernetesClientFactory.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkKubernetesClientFactory.scala @@ -21,7 +21,7 @@ import java.io.File import com.fasterxml.jackson.databind.ObjectMapper import com.google.common.base.Charsets import com.google.common.io.Files -import io.fabric8.kubernetes.client.{ConfigBuilder, KubernetesClient, KubernetesClientBuilder} +import io.fabric8.kubernetes.client.{ConfigBuilder, KubernetesClient, KubernetesClientBuilder, OAuthTokenProvider} import io.fabric8.kubernetes.client.Config.KUBERNETES_REQUEST_RETRY_BACKOFFLIMIT_SYSTEM_PROPERTY import io.fabric8.kubernetes.client.Config.autoConfigure import io.fabric8.kubernetes.client.okhttp.OkHttpClientFactory @@ -33,7 +33,7 @@ import org.apache.spark.SparkConf import org.apache.spark.deploy.k8s.Config._ import org.apache.spark.internal.Logging import org.apache.spark.internal.config.ConfigEntry -import org.apache.spark.util.ThreadUtils +import org.apache.spark.util.{ThreadUtils, Utils} /** * Spark-opinionated builder for Kubernetes clients. It uses a prefix plus common suffixes to @@ -56,11 +56,18 @@ private[spark] object SparkKubernetesClientFactory extends Logging { .map(new File(_)) .orElse(defaultServiceAccountToken) val oauthTokenValue = sparkConf.getOption(oauthTokenConf) - KubernetesUtils.requireNandDefined( - oauthTokenFile, - oauthTokenValue, - s"Cannot specify OAuth token through both a file $oauthTokenFileConf and a " + - s"value $oauthTokenConf.") + val oauthTokenProviderConf = s"$kubernetesAuthConfPrefix.$OAUTH_TOKEN_PROVIDER_CONF_SUFFIX" + val oauthTokenProvider = sparkConf.getOption(oauthTokenProviderConf) + .map(Utils.classForName(_) + .getDeclaredConstructor() + .newInstance() + .asInstanceOf[OAuthTokenProvider]) + + require( + Seq(oauthTokenFile, oauthTokenValue, oauthTokenProvider).count(_.isDefined) <= 1, + s"OAuth token should be specified via only one of $oauthTokenFileConf, $oauthTokenConf " + + s"or $oauthTokenProviderConf." + ) val caCertFile = sparkConf .getOption(s"$kubernetesAuthConfPrefix.$CA_CERT_FILE_CONF_SUFFIX") @@ -94,7 +101,9 @@ private[spark] object SparkKubernetesClientFactory extends Logging { .withRequestTimeout(clientType.requestTimeout(sparkConf)) .withConnectionTimeout(clientType.connectionTimeout(sparkConf)) .withTrustCerts(sparkConf.get(KUBERNETES_TRUST_CERTIFICATES)) - .withOption(oauthTokenValue) { + .withOption(oauthTokenProvider) { + (provider, configBuilder) => configBuilder.withOauthTokenProvider(provider) + }.withOption(oauthTokenValue) { (token, configBuilder) => configBuilder.withOauthToken(token) }.withOption(oauthTokenFile) { (file, configBuilder) =>