
[SPARK-23361][yarn] Allow AM to restart after initial tokens expire. #20657

Closed. Wants to merge 6 commits.

Conversation

@vanzin (Contributor) commented Feb 22, 2018

Currently, the Spark AM relies on the initial set of tokens created by
the submission client to be able to talk to HDFS and other services that
require delegation tokens. This means that after those tokens expire, a
new AM will fail to start (e.g. when there is an application failure and
re-attempts are enabled).

This PR makes it so that the first thing the AM does when the user provides
a principal and keytab is to create new delegation tokens for use. This
makes sure that the AM can be started irrespective of how old the original
token set is. It also allows all of the token management to be done by the
AM - there is no need for the submission client to set configuration values
to tell the AM when to renew tokens.
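
For readers less familiar with the UGI APIs, a minimal sketch of that flow in Scala (illustrative names, not the patch code; only the Hadoop calls, loginUserFromKeytabAndReturnUGI, doAs and FileSystem.addDelegationTokens, are real APIs):

    import java.security.PrivilegedExceptionAction

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.FileSystem
    import org.apache.hadoop.security.{Credentials, UserGroupInformation}

    object FreshTokensSketch {
      // Log in from the keytab and obtain a brand-new set of HDFS delegation tokens,
      // independent of whatever (possibly expired) tokens the submission client created.
      def obtainFreshTokens(principal: String, keytab: String, conf: Configuration): Credentials = {
        // Fresh Kerberos login that does not replace the process-wide current user.
        val keytabUgi = UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal, keytab)
        keytabUgi.doAs(new PrivilegedExceptionAction[Credentials] {
          override def run(): Credentials = {
            val creds = new Credentials()
            // Ask HDFS for new tokens (renewer simplified here); Hive/HBase providers
            // would be contacted the same way.
            FileSystem.get(conf).addDelegationTokens(principal, creds)
            creds
          }
        })
      }
    }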

Note that even though in this case the AM will not be using the delegation
tokens created by the submission client, those tokens still need to be provided
to YARN, since they are used to do log aggregation.

To be able to re-use the code in AMCredentialRenewer for the above
purposes, I refactored that class a bit so that it can fetch tokens into
a pre-defined UGI, instead of always logging in.

Another issue with re-attempts is that, after the fix that allows the AM
to restart correctly, new executors would get confused about when to
update credentials, because the credential updater used the update time
initially set up by the submission code. This could make the executor
fail to update credentials in time, since that value would be very out
of date in the situation described in the bug.

To fix that, I changed the YARN code to use the new RPC-based mechanism
for distributing tokens to executors. This allowed the old credential
updater code to be removed, and a lot of code in the renewer to be
simplified.
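
Roughly, the executor side of that RPC path looks like the sketch below (again illustrative, not the actual scheduler messages; only the Hadoop Credentials/UGI calls are real):

    import java.io.{ByteArrayInputStream, DataInputStream}

    import org.apache.hadoop.security.{Credentials, UserGroupInformation}

    object TokenUpdateSketch {
      // Hypothetical stand-in for the internal "update delegation tokens" RPC message.
      case class UpdateDelegationTokens(tokens: Array[Byte])

      // Executor-side handler: deserialize the credentials the driver/AM pushed over RPC
      // and merge them into the current UGI so new HDFS/Hive/HBase connections use them.
      def handleUpdateDelegationTokens(msg: UpdateDelegationTokens): Unit = {
        val creds = new Credentials()
        creds.readTokenStorageStream(new DataInputStream(new ByteArrayInputStream(msg.tokens)))
        UserGroupInformation.getCurrentUser.addCredentials(creds)
      }
    }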

I also made two currently hardcoded values (the renewal time ratio, and
the retry wait) configurable; while these probably never need to be set
by anyone in a production environment, they help with testing; that's also
why they're not documented.
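
For testing, they can be overridden like any other Spark conf; a sketch under stated assumptions:

    import org.apache.spark.SparkConf

    object CredentialConfOverrides {
      // Testing-only overrides for the two new knobs. The retry wait key matches the
      // diff reviewed below (default "1h"); the renewal ratio key name here is an
      // assumption and may not match the merged config entry exactly.
      val conf: SparkConf = new SparkConf()
        .set("spark.security.credentials.renewalRatio", "0.5")
        .set("spark.security.credentials.retryWait", "5m")
    }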

Tested on a real cluster with a specially crafted application to exercise this
functionality: checked proper access to HDFS, Hive and HBase in cluster
mode with token renewal on and AM restarts. Tested that things still work in
client mode too.

@vanzin (Contributor, Author) commented Feb 22, 2018

@jerryshao @tgravescs

SparkQA commented Feb 22, 2018

Test build #87605 has finished for PR 20657 at commit 2c3448d.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@vanzin (Contributor, Author) commented Feb 22, 2018

Known flaky (SPARK-23458).

@vanzin (Contributor, Author) commented Feb 22, 2018

retest this please

SparkQA commented Feb 22, 2018

Test build #87621 has finished for PR 20657 at commit 2c3448d.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@jerryshao (Contributor) commented:

Will review it soon. 😄

@tgravescs (Contributor) commented:

At a high level this looks good. I'm glad to see us passing these to the executors over RPC now. I haven't looked in detail at the Mesos side.

@vanzin (Contributor, Author) commented Mar 5, 2018

Ping (also adding @squito to try to move this forward).

@squito (Contributor) left a comment

will probably take me a few passes to properly understand all the changes -- seems fine so far.

logWarning("Error while attempting to cleanup old credentials. If you are seeing many " +
"such warnings there may be an issue with your HDFS cluster.", e)
val delay = TimeUnit.SECONDS.toMillis(sparkConf.get(CREDENTIALS_RENEWAL_RETRY_WAIT))
logWarning(s"Failed to update tokens, will try again ${UIUtils.formatDuration(delay)}! " +
Contributor:

nit: will try again in ...

@@ -1009,7 +987,7 @@ private[spark] class Client(
}

def setupCredentials(): Unit = {
-  loginFromKeytab = sparkConf.contains(PRINCIPAL.key)
+  loginFromKeytab = sparkConf.contains(PRINCIPAL)
Contributor:

if a user only specifies keytab, but no principal, I don't think this will fail in a friendly way. This will be a no-op, so it'll succeed, and then in ApplicationMaster / AMCredentialRenewer, you'll get an error trying to do sparkConf.get(PRINCIPAL).get.
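
One possible friendlier guard, sketched here purely as an illustration (not from the patch; the key names assume the YARN-era spark.yarn.principal / spark.yarn.keytab entries):

    import org.apache.spark.SparkConf

    object KerberosArgCheck {
      // Fail fast at submission time if only one of principal/keytab is set, instead of
      // surfacing a confusing error later in the AM.
      def validate(sparkConf: SparkConf): Unit = {
        val principal = sparkConf.getOption("spark.yarn.principal")
        val keytab = sparkConf.getOption("spark.yarn.keytab")
        require(principal.isDefined == keytab.isDefined,
          "spark.yarn.principal and spark.yarn.keytab must be set together (or both left unset).")
      }
    }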

ConfigBuilder("spark.security.credentials.retryWait")
  .doc("How long to wait before retrying to fetch new credentials after a failure.")
  .timeConf(TimeUnit.SECONDS)
  .createWithDefaultString("1h")
Contributor:

.internal() for both

@vanzin (Contributor, Author):

They're not really internal, just not documented.

Contributor:

I thought that is what internal meant ... a user could specify them, but we don't document them at all, so not a stable part of the api etc.

@vanzin (Contributor, Author):

To me internal is something that is for internal Spark use, e.g. the configs I'm removing which are set by Spark itself.

Contributor:

It would be better to make them undocumented, so that developers can still adjust them for testing, but end users don't need to touch them.

Contributor:

Is this "1h" too big if the token expiry time is small, for example 8 hours or even less? That would make the next retry fail outright.


val timeToWait = SparkHadoopUtil.nextCredentialRenewalTime(nextRenewal, sparkConf) -
  System.currentTimeMillis()
scheduleRenewal(timeToWait)
Contributor:

the old code had more guards against bad renewal times:

if (loginFromKeytab && nearestTimeOfNextRenewal > System.currentTimeMillis() &&
    nearestTimeOfNextRenewal != Long.MaxValue) {

mostly just an observation; I don't think those extra checks would actually help (a bad renewer could still schedule renewal every 1ms).

@vanzin (Contributor, Author):

The new code has pretty much the same check in scheduleRenewal:

    val _delay = math.max(0, delay)
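
For context, the clamp amounts to something like the following (a sketch of the shape of the logic, not the exact patch code):

    import java.util.concurrent.{ScheduledExecutorService, TimeUnit}

    // A negative or stale delay is clamped to zero, so a bad renewal time simply means
    // "renew immediately" rather than an error or a missed schedule.
    class RenewalScheduler(executor: ScheduledExecutorService, renew: Runnable) {
      def scheduleRenewal(delay: Long): Unit = {
        val _delay = math.max(0, delay)
        executor.schedule(renew, _delay, TimeUnit.MILLISECONDS)
      }
    }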

@@ -144,7 +145,8 @@ class SparkHadoopUtil extends Logging {
private[spark] def addDelegationTokens(tokens: Array[Byte], sparkConf: SparkConf) {
UserGroupInformation.setConfiguration(newConfiguration(sparkConf))
val creds = deserialize(tokens)
logInfo(s"Adding/updating delegation tokens ${dumpTokens(creds)}")
logInfo("Updating delegation tokens for current user.")
Contributor:

just a thought -- rather than just serializing the Credentials, would it be helpful to also serialize a timestamp for when the tokens were obtained and when they will be refreshed, so it could be logged here?
You have spent more time debugging cases with problems, so you will probably have a better idea of whether that would be helpful.

@vanzin (Contributor, Author):

That information is already logged by the AM; that's enough for debugging.

Contributor:

yeah I was thinking it might be handy to have it logged in the executors and driver as well, sort of as an RPC id, so you could correlate the log lines, in case there was ever a delay in propagation or a failure to get to one executor or something, since you're choosing to always log something here. Still, your call.
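
If someone did want to propagate that, a hypothetical wrapper could look like the sketch below (purely illustrative; the patch does not do this):

    import java.io.{ByteArrayOutputStream, DataOutputStream}

    import org.apache.hadoop.security.Credentials

    object TimestampedTokensSketch {
      // Tag the serialized credentials with when they were obtained and when the next
      // renewal is expected, so driver and executor log lines could be correlated.
      case class TimestampedTokens(obtainedAt: Long, nextRenewal: Long, tokens: Array[Byte])

      def serialize(creds: Credentials, obtainedAt: Long, nextRenewal: Long): TimestampedTokens = {
        val buffer = new ByteArrayOutputStream()
        val out = new DataOutputStream(buffer)
        creds.writeTokenStorageToStream(out)  // standard Hadoop Credentials serialization
        out.close()
        TimestampedTokens(obtainedAt, nextRenewal, buffer.toByteArray)
      }
    }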

SparkQA commented Mar 6, 2018

Test build #87983 has finished for PR 20657 at commit 3294596.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@squito (Contributor) left a comment

I think I understand this decently now. My only concern is about the changes to the "defensive copy" of the credentials. I don't fully understand the purpose of that in the original code, but I see some places where that may have changed so want to double check.

}

// Defensive copy of the credentials
private val credentials = new Credentials(UserGroupInformation.getCurrentUser.getCredentials)
Contributor:

This appears to be unused. Did you mean to use this in setupSecurityToken()? I'm not really sure what you're defending against with the copy; perhaps that should go in the comment as well ... I see it was just in the old code though.

* This manager is meant to make sure long-running apps (such as Spark Streaming apps) can run
* without interruption while accessing secured services. It periodically logs in to the KDC with
* user-provided credentials, and contacts all the configured secure services to obtain delegation
* tokens to be distributed to the rest of the application.
Contributor:

for folks like me less familiar with this, this seems like a good spot to explain the overall flow a little bit more. E.g.:

The KDC provides a ticket granting ticket (TGT), which is then used to obtain delegation tokens for each service. The KDC does not expose the TGT's expiry time, so renewal is controlled by a conf (by default 1m, much more frequent than usual expiry times). Each service's delegation token provider should determine the expiry time of its delegation tokens, so they can be renewed appropriately.

(in particular I needed an extra read to figure out why the TGT had its own renewal mechanism)
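
A minimal sketch of those two cadences (illustrative only; checkTGTAndReloginFromKeytab is the real Hadoop call, everything else here is made up):

    import java.util.concurrent.{Executors, TimeUnit}

    import org.apache.hadoop.security.UserGroupInformation

    object TgtReloginSketch {
      // The TGT is refreshed on a fixed, configured period because the KDC does not
      // expose its expiry; delegation tokens are renewed separately, based on the
      // expiry times reported by each token provider.
      def start(ugi: UserGroupInformation, reloginPeriodMinutes: Long): Unit = {
        val executor = Executors.newSingleThreadScheduledExecutor()
        executor.scheduleAtFixedRate(new Runnable {
          // No-op if the TGT is still fresh; otherwise re-logs in from the keytab.
          override def run(): Unit = ugi.checkTGTAndReloginFromKeytab()
        }, reloginPeriodMinutes, reloginPeriodMinutes, TimeUnit.MINUTES)
      }
    }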

@squito (Contributor) commented Mar 7, 2018

btw I took a look at the code in MesosHadoopDelegationTokenManager; there seems to be a lot of duplication that could probably be factored out, and I wonder if the things that are different really should be the same. E.g. Mesos doesn't expose a conf for KERBEROS_RELOGIN, it's just using the renewal time from the delegation tokens. Seems pretty easy for that to be wrong.

I can open a separate ticket for that but wanted to see if this makes sense.

@vanzin (Contributor, Author) commented Mar 7, 2018

I plan to open a bug for cleaning things up after this code goes in.

SparkQA commented Mar 8, 2018

Test build #88066 has finished for PR 20657 at commit 3eb6a32.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@jerryshao (Contributor) left a comment

Basically looks fine to me. Need another deep look.


// Keytab is copied by YARN to the working directory of the AM, so full path is
// not needed.

// HACK:
Contributor:

Does this hack still work here?

@vanzin (Contributor, Author):

Not sure I understand the question. This comment talks about a lot of things. The only thing that still applies is using a new UGI to get new delegation tokens. That's not really a hack, that's just how the API works...

private val credentialsFile = sparkConf.get(CREDENTIALS_FILE_PATH)
private val daysToKeepFiles = sparkConf.get(CREDENTIALS_FILE_MAX_RETENTION)
private val numFilesToKeep = sparkConf.get(CREDENTIAL_FILE_MAX_COUNT)
private val freshHadoopConf =
Contributor:

Why don't we need to create a new Hadoop configuration with the FS cache disabled anymore?

@vanzin (Contributor, Author):

Because the new code doesn't need to write to HDFS at all.

@vanzin (Contributor, Author) commented Mar 19, 2018

Ping

@squito (Contributor) commented Mar 19, 2018

@jerryshao I know you said you wanted to take a deeper look, but it's been a while. Otherwise I'll merge in the next day or two.

@jerryshao (Contributor) commented:

I'm really sorry about the delay @vanzin @squito. I will take another look today and get back to you.

@@ -105,7 +105,8 @@ private[spark] class MesosHadoopDelegationTokenManager(
case e: Exception =>
// Log the error and try to write new tokens back in an hour
logWarning("Couldn't broadcast tokens, trying again in an hour", e)
Contributor:

Shall we update the log message to reflect the configured wait time?
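
Something along these lines, sketched as an illustration (assumes the same seconds-based retry wait conf as the YARN side; not the merged Mesos code):

    import java.util.concurrent.TimeUnit

    object RetryMessageSketch {
      // Build the warning text from the configured retry wait instead of the
      // hardcoded "an hour".
      def retryMessage(retryWaitSecs: Long): String = {
        val minutes = TimeUnit.SECONDS.toMinutes(retryWaitSecs)
        s"Couldn't broadcast tokens, trying again in $minutes minutes"
      }
    }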

@jerryshao (Contributor) commented:

LGTM, just one small comment.

SparkQA commented Mar 20, 2018

Test build #88427 has finished for PR 20657 at commit ab60dda.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@jerryshao (Contributor) left a comment

LGTM.

@jerryshao (Contributor) commented:

Thanks, merging to master branch!

@asfgit closed this in 5fa4384 on Mar 23, 2018
@vanzin deleted the SPARK-23361 branch on April 9, 2018 at 21:02
turboFei pushed a commit to turboFei/spark that referenced this pull request Apr 24, 2019

Author: Marcelo Vanzin <vanzin@cloudera.com>

Closes apache#20657 from vanzin/SPARK-23361.
pan3793 pushed a commit to NetEase/spark that referenced this pull request Feb 9, 2022

Author: Marcelo Vanzin <vanzin@cloudera.com>

Closes apache#20657 from vanzin/SPARK-23361.