[SPARK-27997][K8S] Add support for kubernetes OAuth Token refresh #33675

Closed
haodemon wants to merge 11 commits into apache:master from haodemon:SPARK-27997

Conversation

@haodemon commented Aug 7, 2021

What changes were proposed in this pull request?

This change allows a Spark user to provide a class that implements fabric8's OAuthTokenProvider interface to refresh tokens throughout the life of the Spark application.

spark.kubernetes.authenticate.submission.oauthTokenProvider=<class>
spark.kubernetes.authenticate.driver.oauthTokenProvider=<class>
spark.kubernetes.authenticate.oauthTokenProvider=<class>

https://javadoc.io/doc/io.fabric8/kubernetes-client/5.6.0/io/fabric8/kubernetes/client/OAuthTokenProvider.html
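For reference, the interface linked above exposes a single method, getToken(), which the client invokes whenever it needs a token, so an implementation can return a fresh value on every call. A minimal sketch of that contract, using a stand-in interface of the same shape (the real one lives in io.fabric8.kubernetes.client and is not reproduced here):

```java
// Stand-in with the same shape as fabric8's OAuthTokenProvider: a single
// getToken() method, called by the client before requests so a fresh
// token can be returned each time. Names here are illustrative.
interface OAuthTokenProviderShape {
    String getToken();
}

// Trivial implementation returning a fixed token; a real provider would
// fetch or refresh a credential here (e.g. by shelling out to gcloud).
class StaticTokenProvider implements OAuthTokenProviderShape {
    private final String token;

    StaticTokenProvider(String token) {
        this.token = token;
    }

    @Override
    public String getToken() {
        return token;
    }
}
```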

Why are the changes needed?

Currently, when running Spark on Kubernetes, one must specify OAuth tokens via configuration before starting an application.

spark.kubernetes.authenticate.submission.oauthToken=<token>
spark.kubernetes.authenticate.driver.oauthToken=<token>
spark.kubernetes.authenticate.oauthToken=<token>

The token has an expiration time (usually an hour on GKE), and there is no way to refresh it at runtime, so the Spark app eventually starts throwing exceptions:

io.fabric8.kubernetes.client.KubernetesClientException: Unauthorized
	at io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager$1.onFailure(WatchConnectionManager.java:202)
	at okhttp3.internal.ws.RealWebSocket.failWebSocket(RealWebSocket.java:571)
	at okhttp3.internal.ws.RealWebSocket$2.onResponse(RealWebSocket.java:198)
	at okhttp3.RealCall$AsyncCall.execute(RealCall.java:203)
	at okhttp3.internal.NamedRunnable.run(NamedRunnable.java:32)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:829)

Does this PR introduce any user-facing change?

No, this is a new feature.

How was this patch tested?

A class which implements the OAuthTokenProvider interface [0] was added to the classpath on the driver node, with no other Spark token options specified.
It was also tested with expired tokens specified, and the token was refreshed via the user-provided class:

--conf spark.kubernetes.authenticate.submission.oauthToken=<expired>
--conf spark.kubernetes.authenticate.driver.oauthToken=<expired> 
--conf spark.kubernetes.authenticate.oauthToken=<expired>

There is no need to use any other token-related configuration options if this class is provided.

An example of the user-provided class for GKE:
[0] https://gist.github.com/haodemon/5490fefdb258275c1f805d584319090b

import com.fasterxml.jackson.databind.ObjectMapper
import io.fabric8.kubernetes.client.OAuthTokenProvider

import scala.sys.process._

class OAuthGoogleTokenProvider extends OAuthTokenProvider {
  private val binary = "gcloud"
  private val args = "config config-helper --format=json"

  // Shell out to gcloud and extract the access token from its JSON output.
  override def getToken: String = {
    val response = (binary + " " + args).!!
    new ObjectMapper().readTree(response)
      .get("credential")
      .get("access_token")
      .asText()
  }
}

ConfigBuilder("spark.kubernetes.client.oauth.token.provider.class")
  .doc("A class that implements fabric's OAuthTokenProvider interface to " +
    "provide a token refresh for long running jobs.")
  .version("3.1.3")
Member

Could you use 3.3.0 because the master branch is 3.3.0-SNAPSHOT?

Author

Thank you for the review. Changed to 3.3.0.

Member

@dongjoon-hyun left a comment

Thank you for making a PR, @haodemon .

@dongjoon-hyun
Member

ok to test

@SparkQA

SparkQA commented Aug 8, 2021

Test build #142193 has finished for PR 33675 at commit 7b0f6c3.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • .doc(\"A class that implements fabric's OAuthTokenProvider interface to \" +

@SparkQA

SparkQA commented Aug 8, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/46705/

@SparkQA

SparkQA commented Aug 8, 2021

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/46705/

@SparkQA

SparkQA commented Aug 8, 2021

Test build #142197 has finished for PR 33675 at commit a20229b.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • .doc(\"A class that implements OAuthTokenProvider interface to \" +

@SparkQA

SparkQA commented Aug 8, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/46709/

@SparkQA

SparkQA commented Aug 8, 2021

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/46709/

@haodemon
Author

haodemon commented Aug 8, 2021

@dongjoon-hyun, answering inline.

  • Apache Spark mainly runs the unit tests on GitHub Actions. Could you enable the GitHub Actions build_and_test job in your Spark fork?

    • We also have a Jenkins server. I'll trigger it though.

I'm not sure why they didn't start before, but it seems they're passing now.

Updated, thank you.

.withOption(oauthTokenProviderInstance) {
  (provider, configBuilder) => configBuilder.withOauthTokenProvider(provider)
}.withOption(oauthTokenValue) {
Member

@dongjoon-hyun commented Aug 11, 2021

To be clear, if we mistakenly have both configurations at the same time, do we invoke OAuthTokenProvider and then override it with oauthTokenValue?

Member

Could you check what happens when we call configBuilder.withOauthTokenProvider(provider) and configBuilder.withOauthToken(token) together?

Author

@haodemon commented Aug 11, 2021

The token that was specified in configBuilder.withOauthToken(token) stays unused, and the class implementing OAuthTokenProvider is used instead. I confirmed this via tests on my cluster by simultaneously:

1. Adding a debug log to my class and providing it via

   --conf spark.kubernetes.client.oauth.token.provider.class=<myClass>

2. Specifying tokens via

   --conf spark.kubernetes.authenticate.submission.oauthToken=<token>
   --conf spark.kubernetes.authenticate.driver.oauthToken=<token>
   --conf spark.kubernetes.authenticate.oauthToken=<token>

My log:

[2021-08-11 12:51:58,012] DEBUG (OAuthGoogleTokenProvider.java:62) - Refreshing kubernetes oauth token via [/usr/lib/google-cloud-sdk/bin/gcloud, config, config-helper, --format=json]
[2021-08-11 12:51:58,498] DEBUG (OAuthGoogleTokenProvider.java:73) - New token expiry time is 3511s

From the log it can be concluded that OAuthTokenProvider takes precedence over the token specified via configBuilder.withOauthToken(token). This can also be confirmed from fabric8's code:
https://github.com/fabric8io/kubernetes-client/blob/74cc63df9b6333d083ee24a6ff3455eaad0a6da8/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/RequestConfig.java#L136
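The precedence observed above can be sketched as follows. This is a simplified stand-in for the resolution logic, not fabric8's actual RequestConfig code; the class and method names are illustrative:

```java
import java.util.function.Supplier;

// Simplified sketch of the token-resolution precedence described above:
// when both a token provider and a static token are configured, the
// provider's freshly fetched token wins. Names are illustrative.
class TokenResolutionSketch {
    final Supplier<String> oauthTokenProvider; // may be null
    final String oauthToken;                   // may be null

    TokenResolutionSketch(Supplier<String> provider, String token) {
        this.oauthTokenProvider = provider;
        this.oauthToken = token;
    }

    String resolveToken() {
        // The provider takes precedence over the statically configured token.
        return oauthTokenProvider != null ? oauthTokenProvider.get() : oauthToken;
    }
}
```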

Member

Thank you for checking. In that case, the current behavior may confuse Spark users. From the Spark perspective, we have two options.

  • Option A: We can revise the description of this new configuration about that precedence.
  • Option B: We can ignore spark.kubernetes.client.oauth.token.provider.class on the Spark side when oauthToken exists.

Which one do you prefer?

Author

the current situation may cause a confusion to the Spark users.

Indeed, thanks for making me realize this. The patch is missing a way for a Spark user to specify how Spark should authenticate to Kubernetes when running in client or cluster mode. There are several options for oauthToken:

spark.kubernetes.authenticate.submission.oauthToken
spark.kubernetes.authenticate.driver.oauthToken 
spark.kubernetes.authenticate.oauthToken

And I think we need the same for the token provider, e.g. spark.kubernetes.authenticate.*.oauthTokenProvider.

This change would:

  • Make the new options consistent with the rest of the Kubernetes options.
  • Make the existing options mutually exclusive: for every mode, only one of oauthToken, oauthTokenFile, and oauthTokenProvider would be allowed.

If we try this, we won't have to add anything about precedence to the docs, and there would be no need to ignore anything in the code.

@dongjoon-hyun, sorry for a lot of text.
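The mutual-exclusivity idea above could be enforced with a check along these lines. This is a hypothetical sketch, not Spark's actual validation code; requireAtMostOne and the parameter names are invented for illustration:

```java
import java.util.Objects;
import java.util.stream.Stream;

// Hypothetical validation sketch: for a given authentication namespace,
// at most one of oauthToken, oauthTokenFile, and oauthTokenProvider may
// be set; otherwise submission fails fast with a clear error.
class OAuthOptionCheck {
    static void requireAtMostOne(String oauthToken, String oauthTokenFile, String providerClass) {
        long configured = Stream.of(oauthToken, oauthTokenFile, providerClass)
            .filter(Objects::nonNull)
            .count();
        if (configured > 1) {
            throw new IllegalArgumentException(
                "Set at most one of oauthToken, oauthTokenFile, oauthTokenProvider");
        }
    }
}
```

With a check like this, no precedence rules need documenting: a misconfigured submission fails immediately instead of silently preferring one option over another.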

Contributor

I think either precedence or doing an assertion that only one of these is set is fine, but let's just pick one and do it so we can get this in.

Author

I prepared the changes, so now we have:

  • OAuthTokenProvider options are now consistent with the other two options, OAuthToken and OAuthTokenFile.
  • Added an assertion that only one of OAuthToken, OAuthTokenFile, or OAuthTokenProvider is set.
  • Added docs.

I have tested this in client mode on Kubernetes.

val CLIENT_CERT_FILE_CONF_SUFFIX = "clientCertFile"
val CA_CERT_FILE_CONF_SUFFIX = "caCertFile"


Member

nit. Shall we remove the redundant empty line?

Author

Fixed.

@dongjoon-hyun
Member

BTW, I embedded your example into the PR description to provide it as a commit message, @haodemon . Please let me know if that is not what you want.

@SparkQA

SparkQA commented Aug 11, 2021

Test build #142330 has finished for PR 33675 at commit 5830123.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Aug 11, 2021

Kubernetes integration test unable to build dist.

exiting with code: 1
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/46838/

@SparkQA

SparkQA commented Aug 11, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/46841/

@SparkQA

SparkQA commented Aug 11, 2021

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/46841/

@holdenk
Contributor

holdenk commented Sep 13, 2021

Jenkins retest this please.
@haodemon have you run the K8s integration tests locally on your env?

@SparkQA

SparkQA commented Sep 13, 2021

Test build #143220 has finished for PR 33675 at commit 5830123.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Sep 13, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/47722/

@SparkQA

SparkQA commented Sep 13, 2021

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/47722/

@SparkQA

SparkQA commented Sep 15, 2021

Test build #143289 has finished for PR 33675 at commit 56fc132.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Sep 15, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/47792/

@SparkQA

SparkQA commented Sep 15, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/47796/

@SparkQA

SparkQA commented Sep 15, 2021

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/47796/

@SparkQA

SparkQA commented Oct 7, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/48455/

@SparkQA

SparkQA commented Oct 7, 2021

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/48455/

@SparkQA

SparkQA commented Oct 7, 2021

Test build #143997 has started for PR 33675 at commit 58ce2f4.

@SparkQA

SparkQA commented Oct 28, 2021

Test build #144684 has finished for PR 33675 at commit b69ddc2.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • public class JavaModuleOptions
  • class HistogramPlotBase(NumericPlotBase):
  • class KdePlotBase(NumericPlotBase):
  • class IndexNameTypeHolder(object):
  • new_class = type(NameTypeHolder.short_name, (NameTypeHolder,),
  • new_class = param.type if isinstance(param, np.dtype) else param
  • class Database(NamedTuple):
  • class Table(NamedTuple):
  • class Column(NamedTuple):
  • class Function(NamedTuple):
  • class SparkUpgradeException(CapturedException):
  • protected class YarnSchedulerEndpoint(override val rpcEnv: RpcEnv)
  • public class ExpressionImplUtils
  • public final class TableIndex
  • public abstract class Filter implements Expression, Serializable
  • public class NumericHistogram
  • * The Coord class defines a histogram bin, which is just an (x,y) pair.
  • public static class Coord implements Comparable
  • public class ColumnarBatch implements AutoCloseable
  • public final class ColumnarBatchRow extends InternalRow
  • class IndexAlreadyExistsException(message: String, cause: Option[Throwable] = None)
  • class NoSuchIndexException(message: String, cause: Option[Throwable] = None)
  • trait FunctionExpressionBuilder
  • case class HistogramNumeric(
  • trait ExtractValue extends Expression with NullIntolerant
  • case class Sec(child: Expression)
  • case class Csc(child: Expression)
  • case class AesEncrypt(input: Expression, key: Expression, child: Expression)
  • case class AesDecrypt(input: Expression, key: Expression, child: Expression)
  • trait OperationHelper extends AliasHelper with PredicateHelper
  • case class AsOfJoin(
  • case class SetCatalogAndNamespace(child: LogicalPlan) extends UnaryCommand
  • case class CreateFunction(
  • case class CreateView(
  • case class CreateIndex(
  • case class SkewJoinChildWrapper(plan: SparkPlan) extends LeafExecNode
  • case class SetCatalogCommand(catalogName: String) extends LeafRunnableCommand
  • case class SetNamespaceCommand(namespace: Seq[String]) extends LeafRunnableCommand
  • case class ShowCatalogsCommand(pattern: Option[String]) extends LeafRunnableCommand
  • case class ShowCurrentNamespaceCommand() extends LeafRunnableCommand
  • case class WriterBucketSpec(
  • case class CreateIndexExec(
  • class SparkUDFExpressionBuilder extends FunctionExpressionBuilder

@SparkQA

SparkQA commented Oct 28, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/49153/

@SparkQA

SparkQA commented Oct 28, 2021

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/49153/

@SparkQA

SparkQA commented Dec 14, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/50658/

@SparkQA

SparkQA commented Dec 14, 2021

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/50658/

@github-actions

We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable.
If you'd like to revive this PR, please reopen it and ask a committer to remove the Stale tag!

@github-actions github-actions bot added the Stale label Mar 25, 2022
@github-actions github-actions bot closed this Mar 26, 2022
@tiagovrtr

@haodemon Can we reopen this and I'll help with anything needed to get the tests to pass?

@dongjoon-hyun
Member

This is reopened according to @tiagovrtr 's request.

@haodemon
Author

@haodemon Can we reopen this and I'll help with anything needed to get the tests to pass?

Thank you, @tiagovrtr. I'll take a look tomorrow and will try to update the PR. In any case, I'll post an update here.

@AmplabJenkins

Can one of the admins verify this patch?

@tiagovrtr

This patch seems only to bring in the latest changes from master; is there anything else to do here?

@tiagovrtr

tiagovrtr commented Jun 4, 2022

Looping in @renato-farias as I'm leaving the @bbc and he'll very much welcome this fix in a future release 🙂

@github-actions

We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable.
If you'd like to revive this PR, please reopen it and ask a committer to remove the Stale tag!

@github-actions github-actions bot added the Stale label Sep 30, 2022
@github-actions github-actions bot closed this Oct 1, 2022
@dongjoon-hyun
Member

Hi, All.
I'm going to take over this PR for Apache Spark 4.0.0 while keeping the authorship of @haodemon .

