
[SPARK-21377][YARN] Make jars specified with --jars/--packages loadable in AM's credential renewer #18616

Closed
wants to merge 4 commits

Conversation

jerryshao
Contributor

What changes were proposed in this pull request?

In this issue we have a long-running Spark application working with secure HBase, which requires HBaseCredentialProvider to fetch tokens periodically. We specify the HBase-related jars with --packages, but these dependencies are not added to the AM classpath, so when HBaseCredentialProvider tries to initialize HBase connections to get tokens, it fails.

Currently, because jars specified with --jars or --packages are not added to the AM classpath, the only way to extend it is "spark.driver.extraClassPath", which is supposed to be used only in YARN cluster mode.

So in this fix, we propose to use/reuse a classloader for AMCredentialRenewer so that it can acquire new tokens.
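As a rough illustration of the approach (a minimal sketch, not the exact patch; userJars and the thread body are placeholders for what the real change derives from the Spark/YARN config):

import java.io.File
import java.net.{URL, URLClassLoader}

// Hypothetical list of --jars/--packages entries localized into the AM's
// working directory by YARN.
val userJars: Seq[String] = Seq("hbase-client.jar", "hbase-common.jar")

val urls = userJars.map(entry => new URL("file:" + new File(entry).getAbsolutePath))
val userClassLoader = new URLClassLoader(urls.toArray, getClass.getClassLoader)

// Start the credential renewer from a short-lived thread whose context
// classloader includes the user jars, so classes loaded via reflection
// (e.g. the HBase classes needed by HBaseCredentialProvider) resolve.
val starter = new Thread {
  setName("AMCredentialRenewerStarter")
  setContextClassLoader(userClassLoader)
  override def run(): Unit = {
    // start AMCredentialRenewer here; threads it spawns inherit this classloader
  }
}
starter.start()
starter.join()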

Also in this patch, we fixed an issue where the AM cannot get tokens from HDFS: the FileSystem object was obtained before the Kerberos login, so using this FS to get tokens throws an exception.

How was this patch tested?

Manual verification.

@SparkQA

SparkQA commented Jul 13, 2017

Test build #79575 has finished for PR 18616 at commit c49cbab.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@jerryshao
Contributor Author

CC @vanzin @tgravescs, can you please help review? Thanks!

val urls = classpath.map { entry =>
  new URL("file:" + new File(entry.getPath()).getAbsolutePath())
}
if (Client.isUserClassPathFirst(sparkConf, isDriver = true)) {
Contributor

Now spark.driver.userClassPathFirst will apply to the client AM, is that what you want? Shouldn't it be a separate option?

private def startAMCredentialRenewer(): Unit = {
  // If the credentials file config is present, we must periodically renew tokens. So create
  // a new AMDelegationTokenRenewer
  if (sparkConf.contains(CREDENTIALS_FILE_PATH.key)) {
Contributor

Haven't you already checked this in L445?

  }
}

private def startAMCredentialRenewerThread(): Thread = {
Contributor

Can you add comments explaining this is done so that AMCredentialRenewer picks up the custom class loader? This thread will be very short-lived, so it would be nice to explain why it's needed.

@@ -42,7 +42,7 @@ import org.apache.spark.internal.Logging
 private[spark] class HadoopDelegationTokenManager(
     sparkConf: SparkConf,
     hadoopConf: Configuration,
-    fileSystems: Set[FileSystem])
+    fileSystems: () => Set[FileSystem])
Contributor

Can you explain better what these changes are for? You only mention there is an issue, but don't really explain why it's an issue.

"FileSystem is gotten before kerberos logged", for all I understand, shouldn't be an issue, because the code should have valid tokens distributed by YARN when the container was started.

Contributor Author

Hi @vanzin, according to my debugging, although AMCredentialRenewer has already logged in to the KDC, the FS is obtained before the KDC login, so as you can see from the log, the auth used to access the FS is still "SIMPLE":

17/07/13 03:13:37 INFO AMCredentialRenewer: Attempting to login to KDC using principal: ambari-qa@EXAMPLE.COM
17/07/13 03:13:38 INFO AMCredentialRenewer: Successfully logged into KDC.
17/07/13 03:13:38 INFO HadoopFSDelegationTokenProvider: getting token for: DFS[DFSClient[clientName=DFSClient_NONMAPREDUCE_1195648233_1, ugi=ambari-qa (auth:SIMPLE)]]

And in the Hadoop code, the delegation token operation check fails when the auth is SIMPLE:

  private boolean isAllowedDelegationTokenOp() throws IOException {
    AuthenticationMethod authMethod = getConnectionAuthenticationMethod();
    if (UserGroupInformation.isSecurityEnabled()
        && (authMethod != AuthenticationMethod.KERBEROS)
        && (authMethod != AuthenticationMethod.KERBEROS_SSL)
        && (authMethod != AuthenticationMethod.CERTIFICATE)) {
      return false;
    }
    return true;
  }

That's why Spark throws an exception like this:

WARN AMCredentialRenewer: Failed to write out new credentials to HDFS, will try again in an hour! If this happens too often tasks will fail.
org.apache.hadoop.ipc.RemoteException(java.io.IOException): Delegation Token can be issued only with kerberos or web authentication
	at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getDelegationToken(FSNamesystem.java:7087)
	at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.getDelegationToken(NameNodeRpcServer.java:676)
	at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.getDelegationToken(ClientNamenodeProtocolServerSideTranslatorPB.java:998)
	at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
	at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:640)
	at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:982)
	at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2351)
	at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2347)
	at java.security.AccessController.doPrivileged(Native Method)
	at javax.security.auth.Subject.doAs(Subject.java:422)
	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1866)
	at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2345)

	at org.apache.hadoop.ipc.Client.getRpcResponse(Client.java:1554)
	at org.apache.hadoop.ipc.Client.call(Client.java:1498)
	at org.apache.hadoop.ipc.Client.call(Client.java:1398)
	at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:233)
	at com.sun.proxy.$Proxy10.getDelegationToken(Unknown Source)
	at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getDelegationToken(ClientNamenodeProtocolTranslatorPB.java:980)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:291)
	at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:203)
	at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:185)
	at com.sun.proxy.$Proxy11.getDelegationToken(Unknown Source)
	at org.apache.hadoop.hdfs.DFSClient.getDelegationToken(DFSClient.java:1041)
	at org.apache.hadoop.hdfs.DistributedFileSystem.getDelegationToken(DistributedFileSystem.java:1688)
	at org.apache.hadoop.fs.FileSystem.collectDelegationTokens(FileSystem.java:549)
	at org.apache.hadoop.fs.FileSystem.addDelegationTokens(FileSystem.java:527)
	at org.apache.hadoop.hdfs.DistributedFileSystem.addDelegationTokens(DistributedFileSystem.java:2400)
	at org.apache.spark.deploy.security.HadoopFSDelegationTokenProvider$$anonfun$fetchDelegationTokens$1.apply(HadoopFSDelegationTokenProvider.scala:97)
	at org.apache.spark.deploy.security.HadoopFSDelegationTokenProvider$$anonfun$fetchDelegationTokens$1.apply(HadoopFSDelegationTokenProvider.scala:95)
	at scala.collection.immutable.Set$Set1.foreach(Set.scala:94)
	at org.apache.spark.deploy.security.HadoopFSDelegationTokenProvider.fetchDelegationTokens(HadoopFSDelegationTokenProvider.scala:95)
	at org.apache.spark.deploy.security.HadoopFSDelegationTokenProvider.obtainDelegationTokens(HadoopFSDelegationTokenProvider.scala:46)
	at org.apache.spark.deploy.security.HadoopDelegationTokenManager$$anonfun$obtainDelegationTokens$2.apply(HadoopDelegationTokenManager.scala:111)
	at org.apache.spark.deploy.security.HadoopDelegationTokenManager$$anonfun$obtainDelegationTokens$2.apply(HadoopDelegationTokenManager.scala:109)
	at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
	at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
	at scala.collection.Iterator$class.foreach(Iterator.scala:893)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
	at scala.collection.MapLike$DefaultValuesIterable.foreach(MapLike.scala:206)
	at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
	at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104)
	at org.apache.spark.deploy.security.HadoopDelegationTokenManager.obtainDelegationTokens(HadoopDelegationTokenManager.scala:109)
	at org.apache.spark.deploy.yarn.security.YARNHadoopDelegationTokenManager.obtainDelegationTokens(YARNHadoopDelegationTokenManager.scala:56)
	at org.apache.spark.deploy.yarn.security.AMCredentialRenewer$$anon$2.run(AMCredentialRenewer.scala:177)
	at org.apache.spark.deploy.yarn.security.AMCredentialRenewer$$anon$2.run(AMCredentialRenewer.scala:174)
	at java.security.AccessController.doPrivileged(Native Method)
	at javax.security.auth.Subject.doAs(Subject.java:422)
	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1866)
	at org.apache.spark.deploy.yarn.security.AMCredentialRenewer.org$apache$spark$deploy$yarn$security$AMCredentialRenewer$$writeNewCredentialsToHDFS(AMCredentialRenewer.scala:174)
	at org.apache.spark.deploy.yarn.security.AMCredentialRenewer$$anon$1.run(AMCredentialRenewer.scala:107)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)

Contributor

the auth used to access the FS is still "SIMPLE"

That sounds like your HDFS configuration is missing something. The yarnConf object passed to hadoopFSsToAccess() should have Kerberos configured as the auth mechanism, and because the container has valid delegation tokens, the HDFS library should use them.

Otherwise, I don't see how this feature would work at all; when submitting an app in cluster mode, the driver does not do a kerberos login (until tokens need to be renewed), and yet applications are able to talk to HDFS.

Contributor Author

I think this issue only happens when getting delegation tokens, which requires a Kerberos-logged-in UGI, not a token-authenticated one. For normal HDFS operations, a token-authenticated UGI should be enough. That's why the issue only shows up when getting new tokens from HDFS.

Contributor Author

And the difference in HadoopFSDelegationTokenProvider before and after this change is whether the FS is obtained inside the keytab-logged-in UGI or not.
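A minimal sketch of why fileSystems becomes () => Set[FileSystem] (loggedInUGI and the principal/keytab paths below are illustrative assumptions): since the FS set is now produced lazily, FileSystem.get() runs inside the Kerberos-logged-in UGI instead of before the login.

import java.security.PrivilegedExceptionAction

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.security.UserGroupInformation

val hadoopConf = new Configuration()

// Assumed principal/keytab, for illustration only.
val loggedInUGI = UserGroupInformation.loginUserFromKeytabAndReturnUGI(
  "ambari-qa@EXAMPLE.COM", "/etc/security/keytabs/smokeuser.headless.keytab")

// Deferred: nothing touches HDFS yet.
val fileSystems: () => Set[FileSystem] = () => Set(FileSystem.get(hadoopConf))

loggedInUGI.doAs(new PrivilegedExceptionAction[Unit] {
  override def run(): Unit = {
    // Evaluated here, under the Kerberos-authenticated UGI, so the
    // delegation-token request passes isAllowedDelegationTokenOp().
    val fss = fileSystems()
  }
})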

Contributor

Ok, sounds like this is a regression caused by #17723. I remember testing that change and not running into this, but the behavior has changed as you describe.

Do you mind fixing this problem separately so that we keep track of the history of how it happened?

Contributor Author

Yes, that's a regression from #17723. OK, I will create a separate patch for this issue.

@SparkQA

SparkQA commented Jul 14, 2017

Test build #79625 has finished for PR 18616 at commit a9e1a21.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Contributor

@vanzin left a comment

LGTM, just a couple of nits.

@@ -438,6 +441,24 @@ private[spark] class ApplicationMaster(
     registerAM(sparkConf, rpcEnv, driverRef, sparkConf.getOption("spark.driver.appUIAddress"),
       securityMgr)

     // If the credentials file config is present, we must periodically renew tokens. So create
     // a new AMDelegationTokenRenewer
     if (sparkConf.contains(CREDENTIALS_FILE_PATH.key)) {
Contributor

nit: .key is not necessary (also in the other call)

// Start a short-lived thread for AMCredentialRenewer; its only purpose is to set the
// context classloader so that the main jar and secondary jars can be used by AMCredentialRenewer.
val credentialRenewerThread = new Thread {
  setName("AMCredentialRenewerThread")
Contributor

nit: s/Thread/Starter

@jerryshao
Contributor Author

Thanks @vanzin for your review; I will update it soon.

@SparkQA

SparkQA commented Jul 14, 2017

Test build #79629 has finished for PR 18616 at commit 545ae9a.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Contributor

@vanzin left a comment

Sorry, one more suggestion...

@@ -438,6 +441,24 @@ private[spark] class ApplicationMaster(
     registerAM(sparkConf, rpcEnv, driverRef, sparkConf.getOption("spark.driver.appUIAddress"),
       securityMgr)

     // If the credentials file config is present, we must periodically renew tokens. So create
Contributor

Hmm, I know I already said "LGTM", but I think these two paths could be consolidated into one.

Basically, move this code to the run() method before either driver or executor launcher is started; then there's a single place where the token renewer is started, and then both client and cluster mode start it the same way.
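A rough sketch of the suggested shape (assuming ApplicationMaster's existing runDriver()/runExecutorLauncher() entry points; startCredentialRenewer() is a hypothetical helper wrapping the starter thread, not the final patch):

final def run(): Int = {
  // ... common AM setup ...

  // Single startup path for the token renewer, shared by both modes.
  if (sparkConf.contains(CREDENTIALS_FILE_PATH)) {
    startCredentialRenewer()
  }

  if (isClusterMode) {
    runDriver(securityMgr)
  } else {
    runExecutorLauncher(securityMgr)
  }
  exitCode
}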

}

credentialRenewerThread.start()
credentialRenewerThread.join()
Contributor

Minor, but this is not capturing errors from the thread you started. It's unlikely an exception would occur at that point, but if it does, it should be re-thrown by this thread.
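One way to propagate the failure (a sketch; the variable name and rethrow site are assumptions, and userClassLoader is assumed to be built earlier): capture any throwable inside the starter thread and rethrow it after join().

// join() below establishes happens-before, so a plain var is safe here.
var renewerFailure: Throwable = null

val credentialRenewerThread = new Thread {
  setName("AMCredentialRenewerStarter")
  setContextClassLoader(userClassLoader)
  override def run(): Unit = {
    try {
      // start the credential renewer here
    } catch {
      case e: Throwable => renewerFailure = e
    }
  }
}

credentialRenewerThread.start()
credentialRenewerThread.join()
if (renewerFailure != null) {
  throw renewerFailure  // surface the error in the AM's main thread
}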

Contributor Author

Sure, I will change the code.

@SparkQA

SparkQA commented Jul 17, 2017

Test build #79679 has finished for PR 18616 at commit 7dbc726.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@vanzin
Contributor

vanzin commented Jul 17, 2017

Merging to master.

@asfgit closed this in 5346507 on Jul 17, 2017