From e6f77e066c0780352e036aa19173508d02b836ef Mon Sep 17 00:00:00 2001
From: Chen Yuechen
Date: Thu, 4 May 2017 21:21:27 +0800
Subject: [PATCH] allow standby namenodes in spark.yarn.access.namenodes

Change-Id: Id0eedfbd594b24d2a3c283a9b5febdb6042c4dd1
---
 .../security/HDFSCredentialProvider.scala     | 26 +++++++++++++++----
 1 file changed, 21 insertions(+), 5 deletions(-)

diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/HDFSCredentialProvider.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/HDFSCredentialProvider.scala
index ebb176bc95caa..b102b7b891365 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/HDFSCredentialProvider.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/HDFSCredentialProvider.scala
@@ -24,6 +24,8 @@ import scala.collection.JavaConverters._
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier
+import org.apache.hadoop.ipc.RemoteException
+import org.apache.hadoop.ipc.StandbyException
 import org.apache.hadoop.mapred.Master
 import org.apache.hadoop.security.Credentials
@@ -45,9 +47,16 @@ private[security] class HDFSCredentialProvider extends ServiceCredentialProvider
       creds: Credentials): Option[Long] = {
     // NameNode to access, used to get tokens from different FileSystems
     nnsToAccess(hadoopConf, sparkConf).foreach { dst =>
-      val dstFs = dst.getFileSystem(hadoopConf)
-      logInfo("getting token for namenode: " + dst)
-      dstFs.addDelegationTokens(getTokenRenewer(hadoopConf), creds)
+      try {
+        val dstFs = dst.getFileSystem(hadoopConf)
+        logInfo("getting token for namenode: " + dst)
+        dstFs.addDelegationTokens(getTokenRenewer(hadoopConf), creds)
+      } catch {
+        case e: StandbyException =>
+          logWarning(s"Namenode ${dst} is in state standby", e)
+        case e: RemoteException =>
+          logWarning(s"Namenode ${dst} is in state standby", e)
+      }
     }
 
     // Get the token renewal interval if it is not set. It will only be called once.
@@ -75,8 +84,15 @@ private[security] class HDFSCredentialProvider extends ServiceCredentialProvider
     sparkConf.get(PRINCIPAL).flatMap { renewer =>
       val creds = new Credentials()
       nnsToAccess(hadoopConf, sparkConf).foreach { dst =>
-        val dstFs = dst.getFileSystem(hadoopConf)
-        dstFs.addDelegationTokens(renewer, creds)
+        try {
+          val dstFs = dst.getFileSystem(hadoopConf)
+          dstFs.addDelegationTokens(renewer, creds)
+        } catch {
+          case e: StandbyException =>
+            logWarning(s"Namenode ${dst} is in state standby", e)
+          case e: RemoteException =>
+            logWarning(s"Namenode ${dst} is in state standby", e)
+        }
       }
       val hdfsToken = creds.getAllTokens.asScala
         .find(_.getKind == DelegationTokenIdentifier.HDFS_DELEGATION_KIND)
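
Note for reviewers: in an HDFS HA deployment, a delegation-token request sent to the
standby NameNode is rejected with a StandbyException, which the Hadoop RPC layer
typically delivers to the client wrapped in a RemoteException; catching both turns a
standby endpoint into a logged warning instead of a failed submission. Below is a
minimal, hypothetical sketch of how both HA endpoints might then be listed in
spark.yarn.access.namenodes (host names and port are invented for illustration):

    import org.apache.spark.SparkConf

    // Hypothetical HA pair: nn1 is currently active, nn2 is standby.
    // With this patch, requesting tokens from the standby endpoint only
    // logs a warning instead of aborting the YARN submission.
    val conf = new SparkConf()
      .set("spark.yarn.access.namenodes",
        "hdfs://nn1.example.com:8020,hdfs://nn2.example.com:8020")

The RemoteException case is deliberately broad here; a stricter variant could call
e.unwrapRemoteException(classOf[StandbyException]) and rethrow anything that is not
actually a standby rejection.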