Skip to content

Conversation

@SaintBacchus
Copy link
Contributor

As discussed with @tgravescs and @harishreedharan at the 8867, if the SaslRpcClient's authentication is TOKEN, it will have the token expired exception.
But if the authentication is KERBEROS`, it will renew the token automatically.
This modify can change to authentication from *TOKEN * into *KERBEROS *.

@harishreedharan
Copy link
Contributor

Why would #8867 not be sufficient?It looks like that should be enough.

@SaintBacchus
Copy link
Contributor Author

@harishreedharan The evenLog will still be stopped by the token exception.
The event log was a long-running output stream, #8867 can't update its inner token.

java.lang.reflect.InvocationTargetException
        at sun.reflect.GeneratedMethodAccessor19.invoke(Unknown Source)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:497)
        at org.apache.spark.scheduler.EventLoggingListener$$anonfun$logEvent$3.apply(EventLoggingListener.scala:153)
        at org.apache.spark.scheduler.EventLoggingListener$$anonfun$logEvent$3.apply(EventLoggingListener.scala:153)
        at scala.Option.foreach(Option.scala:236)
        at org.apache.spark.scheduler.EventLoggingListener.logEvent(EventLoggingListener.scala:153)
        at org.apache.spark.scheduler.EventLoggingListener.onStageCompleted(EventLoggingListener.scala:176)
        at org.apache.spark.scheduler.SparkListenerBus$class.onPostEvent(SparkListenerBus.scala:32)
        at org.apache.spark.scheduler.LiveListenerBus.onPostEvent(LiveListenerBus.scala:32)
        at org.apache.spark.scheduler.LiveListenerBus.onPostEvent(LiveListenerBus.scala:32)
        at org.apache.spark.util.ListenerBus$class.postToAll(ListenerBus.scala:56)
        at org.apache.spark.util.AsynchronousListenerBus.postToAll(AsynchronousListenerBus.scala:37)
        at org.apache.spark.util.AsynchronousListenerBus$$anon$1$$anonfun$run$1.apply$mcV$sp(AsynchronousListenerBus.scala:82)
        at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1217)
        at org.apache.spark.util.AsynchronousListenerBus$$anon$1.run(AsynchronousListenerBus.scala:66)
Caused by: org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.token.SecretManager$InvalidToken): token (HDFS_DELEGATION_TOKEN token 2339 for spark) can't be found in cache
        at org.apache.hadoop.ipc.Client.call(Client.java:1511)
        at org.apache.hadoop.ipc.Client.call(Client.java:1442)
        at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:229)
        at com.sun.proxy.$Proxy15.addBlock(Unknown Source)
        at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.addBlock(ClientNamenodeProtocolTranslatorPB.java:416)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:497)
        at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:187)
        at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
        at com.sun.proxy.$Proxy16.addBlock(Unknown Source)
        at org.apache.hadoop.hdfs.DataStreamer.locateFollowingBlock(DataStreamer.java:1652)
        at org.apache.hadoop.hdfs.DataStreamer.nextBlockOutputStream(DataStreamer.java:1453)
        at org.apache.hadoop.hdfs.DataStreamer.run(DataStreamer.java:579)

@harishreedharan
Copy link
Contributor

Hmm, this might be due to the cached token being missed? So it looks like the token got replaced alright, but it seems like the file could not be written with the new token?

@tgravescs might know more about this. I am not sure why this would cause an issue. It looks like the new token cannot be used to write an old file?

@SparkQA
Copy link

SparkQA commented Sep 30, 2015

Test build #43114 has finished for PR 8942 at commit fd1f735.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@tgravescs
Copy link
Contributor

Yeah I'm confused why 8867 didn't work.

The only time you need a new token is when the connection goes down and needs to be re-established. If you have an existing connection it will continue to stay up based on the old token. It looks like from the exception that it must have been dropping the connection to the namenode and needs a new one. Based on the exception I'm assuming that the new token isn't be adding properly or propogated to where it needs to be (if someone did a doas for instance and addCredentials isn't updating it for that ugi.

Can you tell from the log (HDFS_DELEGATION_TOKEN token 2339 for spark), if 2339 was the original token or the new token? Can you tell that a new token was properly added and is valid? What is your token timeout set at, hopefully its not to low that you are hitting a race with the code that waits a minute to get the new token.

I'm actually fine with doing it either way (token or from keytab), but if we do it from keytab I would rather see it more of a conditional where it doesn't add Tokens to the current users UGI if the keytab was supplied. That way it should be in "KERBEROS" mode and just login from the keytab for you. It would also be more obvious in the future what is going on and less prone to being broken by order.

You are running in yarn client mode?

@SaintBacchus
Copy link
Contributor Author

Yeah @tgravescs I'm running in yarn client mode. I'm sure that HDFS_DELEGATION_TOKEN token 2339 for spark is the original token gained by the driver. But I don't know which is the valid token used for the event-log writer. I set dfs.namenode.delegation.token.max-lifetime to be 5 minutes.
In our test, the event log will work fine if the login again.

@SaintBacchus
Copy link
Contributor Author

retest this please

@SparkQA
Copy link

SparkQA commented Oct 8, 2015

Test build #43383 has finished for PR 8942 at commit fd1f735.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@harishreedharan
Copy link
Contributor

Hmm, I think the real issue is that the event logging does not doAs.

I think in yarn-cluster, since the SparkContext is created in the AM, the updated credentials actually are in the cache of the user writing to the event logs (since we are already running as that user and don't do a doAs).

In yarn-client though, because we don'd do a doAs - is it possible that the new tokens are not being used to write to the event log?

@SaintBacchus Let me open a PR that does the doAs and combine it with your previous one #8867 and can you test it and see if it works? Or you can do it yourself - just add a doAs here: https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala#L66 , https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala#L143 and https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala#L48
(basically anywhere HDFS is accessed)

@SaintBacchus
Copy link
Contributor Author

I'm not very clear about how to use doAs for the EventLoggingListener. You can open a PR and I will help to test.

@harishreedharan
Copy link
Contributor

Actually that is not right..I posted an explanation on your other PR.

@harishreedharan
Copy link
Contributor

Here it is:

OK, I think I know the issue - the reason is probably that the credentials are cached in the FileSystem instance using which the write happens. Since we are replacing the credentials but not the FileSystem instance itself this might not work, which is why #8942 works. We can do with either that approach or we can replace the FileSystem instance which would require a close and reopen of the file.

@SaintBacchus
Copy link
Contributor Author

We had considered to the way to reopen the file. In that way it may have to consider the synchronization problem between event log producer and consumer with more codes.
Later, I found this way and it's more clear.

@harishreedharan
Copy link
Contributor

Agreed - synchronization is painful and we could end up missing events.

@tgravescs
Copy link
Contributor

@harishreedharan Can you clarify what is going on? You should not have to replace the FileSystem.

The EventLogListener is created in SparkContext and just does a FileSystem.get(). Which should get the cached filesystem or create a new one if one doesn't exist. In updateCredentialsIfRequired it just adds the new credentials to the current user.
The filesystem just uses the current user (even the cached one - this.ugi = UserGroupInformation.getCurrentUser();) . Although I do see some stuff about using the ticket cache in the code if that config is set but I didn't think it generally was.

@harishreedharan
Copy link
Contributor

So this is my theory (I don't have anything to back this up really). My assumption is based on the fact that if we don't set hadoop.fs.hdfs.impl.disable.cache=true, then the update of tokens seems to fail on the cached FileSystem instance (we needed to add that config in a PR at some point to ensure the update of tokens worked). So if that config needed to be set so the FileSystem.get() method to work, it likely means (again my theory), that a FileSystem object created using older tokens does not seem to know about the new ones. I can't be sure of this but that could explain why even updating the tokens locally using the ExecutorDelegationTokenRenewer does not fix the event log writes.

@tgravescs
Copy link
Contributor

Hmm, so looking at the FileSystem cache code it creates a new key that stores:

    this.ugi = UserGroupInformation.getCurrentUser();

So if the current user changes from when first filesystem is created to when we run then it would have this problem.

on the client side I don't think we do any specific runAsUser calls but perhaps its an issue with the keytab login and the user already logged in.

So perhaps it makes sense to do something like this PR but I would rather see it a if/else type thing where in the code we either login from keytab or we grab the tokens that way we don't end up with 2 different methods of login and we don't accidentally break things if order of things change.

@harishreedharan
Copy link
Contributor

If the current user's ugi is what is used by the FileSystem cache, this should not really be an issue no? Because we actually do update the current user's credentials. I am ok with doing something like this, but I'd rather know why before adding this. Why would new tokens not work? That seems like an HDFS issue no? Let me test this out with @SaintBacchus's other PR.

In client mode, that would really mean that tokens are used by executors and keytab used by AM and the driver. I am in half a mind to suggest not supporting long-running apps in client mode on secure HDFS.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants