From 58b1969ae4efc0c995ffa817f54e370721f4058f Mon Sep 17 00:00:00 2001 From: Hari Shreedharan Date: Fri, 10 Jul 2015 16:19:51 -0700 Subject: [PATCH 1/8] Simple attempt 1. --- .../scala/org/apache/spark/deploy/SparkSubmit.scala | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala index 4cec9017b8adb..f81bd55a734bf 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala @@ -481,8 +481,14 @@ object SparkSubmit { } // Let YARN know it's a pyspark app, so it distributes needed libraries. - if (clusterManager == YARN && args.isPython) { - sysProps.put("spark.yarn.isPython", "true") + if (clusterManager == YARN) { + if (args.isPython) { + sysProps.put("spark.yarn.isPython", "true") + } + if (!isYarnCluster && args.principal != null) { + require(args.keytab != null, "Keytab must be specified when the keytab is specified") + UserGroupInformation.loginUserFromKeytab(args.principal, args.keytab) + } } // In yarn-cluster mode, use yarn.Client as a wrapper around the user class From 26f8bfa20f15b2a1be62d23b9b1d09a7b4b874d1 Mon Sep 17 00:00:00 2001 From: Hari Shreedharan Date: Tue, 14 Jul 2015 00:47:42 -0700 Subject: [PATCH 2/8] [SPARK-8851][YARN] In Client mode, make sure the client logs in and updates tokens. In client side, the flow is SparkSubmit -> SparkContext -> yarn/Client. Since the yarn client only gets a cloned config and the staging dir is set here, it is not really possible to do re-logins in the SparkContext. So, do the initial logins in Spark Submit and do re-logins as we do now in the AM, but the Client behaves like an executor in this specific context and reads the credentials file to update the tokens. This way, even if the streaming context is started up from checkpoint - it is fine since we have logged in from SparkSubmit itself itself. --- .../apache/spark/deploy/SparkHadoopUtil.scala | 30 ++++++++++------ .../org/apache/spark/deploy/SparkSubmit.scala | 2 +- .../org/apache/spark/deploy/yarn/Client.scala | 36 +++++++++++++------ 3 files changed, 46 insertions(+), 22 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala index 6d14590a1d192..15fad45f3453d 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala @@ -39,6 +39,8 @@ import org.apache.spark.annotation.DeveloperApi import org.apache.spark.util.Utils import org.apache.spark.{Logging, SparkConf, SparkException} +import scala.util.control.NonFatal + /** * :: DeveloperApi :: * Contains util methods to interact with Hadoop from Spark. @@ -248,19 +250,25 @@ class SparkHadoopUtil extends Logging { dir: Path, prefix: String, exclusionSuffix: String): Array[FileStatus] = { - val fileStatuses = remoteFs.listStatus(dir, - new PathFilter { - override def accept(path: Path): Boolean = { - val name = path.getName - name.startsWith(prefix) && !name.endsWith(exclusionSuffix) + try { + val fileStatuses = remoteFs.listStatus(dir, + new PathFilter { + override def accept(path: Path): Boolean = { + val name = path.getName + name.startsWith(prefix) && !name.endsWith(exclusionSuffix) + } + }) + Arrays.sort(fileStatuses, new Comparator[FileStatus] { + override def compare(o1: FileStatus, o2: FileStatus): Int = { + Longs.compare(o1.getModificationTime, o2.getModificationTime) } }) - Arrays.sort(fileStatuses, new Comparator[FileStatus] { - override def compare(o1: FileStatus, o2: FileStatus): Int = { - Longs.compare(o1.getModificationTime, o2.getModificationTime) - } - }) - fileStatuses + fileStatuses + } catch { + case NonFatal(e) => + logWarning("Error while attempting to list files from application straging dir", e) + Array.empty + } } /** diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala index f81bd55a734bf..9fb3e4b8b73e7 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala @@ -485,7 +485,7 @@ object SparkSubmit { if (args.isPython) { sysProps.put("spark.yarn.isPython", "true") } - if (!isYarnCluster && args.principal != null) { + if (args.principal != null) { require(args.keytab != null, "Keytab must be specified when the keytab is specified") UserGroupInformation.loginUserFromKeytab(args.principal, args.keytab) } diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index f0af6f875f523..3dffa579d9d62 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -80,9 +80,14 @@ private[spark] class Client( private val isClusterMode = args.isClusterMode private var loginFromKeytab = false + private var principal: String = null + private var keytab: String = null + private val fireAndForget = isClusterMode && !sparkConf.getBoolean("spark.yarn.submit.waitAppCompletion", true) + private var delegationTokenUpdater: Option[ExecutorDelegationTokenUpdater] = None + def stop(): Unit = yarnClient.stop() @@ -120,6 +125,10 @@ private[spark] class Client( // Finally, submit and monitor the application logInfo(s"Submitting application ${appId.getId} to ResourceManager") yarnClient.submitApplication(appContext) + if (loginFromKeytab && !isClusterMode) { + delegationTokenUpdater = Some(new ExecutorDelegationTokenUpdater(sparkConf, yarnConf)) + delegationTokenUpdater.foreach(_.updateCredentialsIfRequired()) + } appId } catch { case e: Throwable => @@ -339,7 +348,7 @@ private[spark] class Client( if (loginFromKeytab) { logInfo("To enable the AM to login from keytab, credentials are being copied over to the AM" + " via the YARN Secure Distributed Cache.") - val (_, localizedPath) = distribute(args.keytab, + val (_, localizedPath) = distribute(keytab, destName = Some(sparkConf.get("spark.yarn.keytab")), appMasterOnly = true) require(localizedPath != null, "Keytab file already distributed.") @@ -785,19 +794,26 @@ private[spark] class Client( } def setupCredentials(): Unit = { - if (args.principal != null) { - require(args.keytab != null, "Keytab must be specified when principal is specified.") + loginFromKeytab = args.principal != null || sparkConf.contains("spark.yarn.principal") + if (loginFromKeytab) { + principal = { + if (args.principal != null) { + keytab = args.keytab + args.principal + } else { + keytab = sparkConf.getOption("spark.yarn.keytab").orNull + sparkConf.get("spark.yarn.principal") + } + } + require(keytab != null, "Keytab must be specified when principal is specified.") logInfo("Attempting to login to the Kerberos" + - s" using principal: ${args.principal} and keytab: ${args.keytab}") - val f = new File(args.keytab) + s" using principal: $principal and keytab: $keytab") + val f = new File(keytab) // Generate a file name that can be used for the keytab file, that does not conflict // with any user file. val keytabFileName = f.getName + "-" + UUID.randomUUID().toString - UserGroupInformation.loginUserFromKeytab(args.principal, args.keytab) - loginFromKeytab = true sparkConf.set("spark.yarn.keytab", keytabFileName) - sparkConf.set("spark.yarn.principal", args.principal) - logInfo("Successfully logged into the KDC.") + sparkConf.set("spark.yarn.principal", principal) } credentials = UserGroupInformation.getCurrentUser.getCredentials } @@ -1162,7 +1178,7 @@ object Client extends Logging { * * If not a "local:" file and no alternate name, the environment is not modified. * - * @parma conf Spark configuration. + * @param conf Spark configuration. * @param uri URI to add to classpath (optional). * @param fileName Alternate name for the file (optional). * @param env Map holding the environment variables. From 0c48ac2eb0971aa57862bf06531d021093f3abd0 Mon Sep 17 00:00:00 2001 From: Hari Shreedharan Date: Tue, 14 Jul 2015 11:55:59 -0700 Subject: [PATCH 3/8] Remove direct use of ExecutorDelegationTokenUpdater in Client. --- .../scala/org/apache/spark/deploy/yarn/Client.scala | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index 3dffa579d9d62..d76a6e292d2a6 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -86,10 +86,10 @@ private[spark] class Client( private val fireAndForget = isClusterMode && !sparkConf.getBoolean("spark.yarn.submit.waitAppCompletion", true) - private var delegationTokenUpdater: Option[ExecutorDelegationTokenUpdater] = None - - - def stop(): Unit = yarnClient.stop() + def stop(): Unit = { + SparkHadoopUtil.get.stopExecutorDelegationTokenRenewer() + yarnClient.stop() + } /** * Submit an application running our ApplicationMaster to the ResourceManager. @@ -126,8 +126,7 @@ private[spark] class Client( logInfo(s"Submitting application ${appId.getId} to ResourceManager") yarnClient.submitApplication(appContext) if (loginFromKeytab && !isClusterMode) { - delegationTokenUpdater = Some(new ExecutorDelegationTokenUpdater(sparkConf, yarnConf)) - delegationTokenUpdater.foreach(_.updateCredentialsIfRequired()) + SparkHadoopUtil.get.startExecutorDelegationTokenRenewer(sparkConf) } appId } catch { From 2c805404cbde576165ee190ca4b4954144232eaa Mon Sep 17 00:00:00 2001 From: Hari Shreedharan Date: Tue, 14 Jul 2015 12:52:40 -0700 Subject: [PATCH 4/8] Move token renewal to YarnClientSchedulerBackend. --- .../main/scala/org/apache/spark/deploy/yarn/Client.scala | 8 +------- .../scheduler/cluster/YarnClientSchedulerBackend.scala | 7 +++++-- 2 files changed, 6 insertions(+), 9 deletions(-) diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index d76a6e292d2a6..d0e9acd4b0f1c 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -86,10 +86,7 @@ private[spark] class Client( private val fireAndForget = isClusterMode && !sparkConf.getBoolean("spark.yarn.submit.waitAppCompletion", true) - def stop(): Unit = { - SparkHadoopUtil.get.stopExecutorDelegationTokenRenewer() - yarnClient.stop() - } + def stop(): Unit = yarnClient.stop() /** * Submit an application running our ApplicationMaster to the ResourceManager. @@ -125,9 +122,6 @@ private[spark] class Client( // Finally, submit and monitor the application logInfo(s"Submitting application ${appId.getId} to ResourceManager") yarnClient.submitApplication(appContext) - if (loginFromKeytab && !isClusterMode) { - SparkHadoopUtil.get.startExecutorDelegationTokenRenewer(sparkConf) - } appId } catch { case e: Throwable => diff --git a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala index 3a0b9443d2d7b..bd8bf200dab37 100644 --- a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala +++ b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala @@ -20,10 +20,9 @@ package org.apache.spark.scheduler.cluster import scala.collection.mutable.ArrayBuffer import org.apache.hadoop.yarn.api.records.{ApplicationId, YarnApplicationState} -import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException import org.apache.spark.{SparkException, Logging, SparkContext} -import org.apache.spark.deploy.yarn.{Client, ClientArguments} +import org.apache.spark.deploy.yarn.{Client, ClientArguments, YarnSparkHadoopUtil} import org.apache.spark.scheduler.TaskSchedulerImpl private[spark] class YarnClientSchedulerBackend( @@ -61,6 +60,9 @@ private[spark] class YarnClientSchedulerBackend( // to the executors super.start() + if (conf.contains("spark.yarn.credentials.file")) { + YarnSparkHadoopUtil.get.startExecutorDelegationTokenRenewer(conf) + } waitForApplication() monitorThread = asyncMonitorApplication() monitorThread.start() @@ -158,6 +160,7 @@ private[spark] class YarnClientSchedulerBackend( } super.stop() client.stop() + YarnSparkHadoopUtil.get.stopExecutorDelegationTokenRenewer() logInfo("Stopped") } From c872caa0e0a40096407164fd3d85c33914a54de1 Mon Sep 17 00:00:00 2001 From: Hari Shreedharan Date: Tue, 14 Jul 2015 13:54:53 -0700 Subject: [PATCH 5/8] Fix typo in log message. --- .../main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala index 15fad45f3453d..97ac07abf84d4 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala @@ -266,7 +266,7 @@ class SparkHadoopUtil extends Logging { fileStatuses } catch { case NonFatal(e) => - logWarning("Error while attempting to list files from application straging dir", e) + logWarning("Error while attempting to list files from application staging dir", e) Array.empty } } From 5c4fa637f4ef186cf02458b7e7888313f864e935 Mon Sep 17 00:00:00 2001 From: Hari Shreedharan Date: Tue, 14 Jul 2015 14:59:04 -0700 Subject: [PATCH 6/8] Add a comment explaining what is being done in YarnClientSchedulerBackend. --- .../spark/scheduler/cluster/YarnClientSchedulerBackend.scala | 3 +++ 1 file changed, 3 insertions(+) diff --git a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala index bd8bf200dab37..dd9d0f47b5518 100644 --- a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala +++ b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala @@ -60,6 +60,9 @@ private[spark] class YarnClientSchedulerBackend( // to the executors super.start() + // SPARK-8851: In yarn-client mode, the AM still does the credentials refresh. The driver + // reads the credentials from HDFS, just like the executors and updates its own credentials + // cache. if (conf.contains("spark.yarn.credentials.file")) { YarnSparkHadoopUtil.get.startExecutorDelegationTokenRenewer(conf) } From de08f5797c350eaf53205c85babab9dd582c49e8 Mon Sep 17 00:00:00 2001 From: Hari Shreedharan Date: Tue, 14 Jul 2015 18:15:34 -0700 Subject: [PATCH 7/8] Fix import order. --- .../main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala index 97ac07abf84d4..befa8b0862b1a 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala @@ -25,6 +25,7 @@ import java.util.{Arrays, Comparator} import scala.collection.JavaConversions._ import scala.concurrent.duration._ import scala.language.postfixOps +import scala.util.control.NonFatal import com.google.common.primitives.Longs import org.apache.hadoop.conf.Configuration @@ -39,8 +40,6 @@ import org.apache.spark.annotation.DeveloperApi import org.apache.spark.util.Utils import org.apache.spark.{Logging, SparkConf, SparkException} -import scala.util.control.NonFatal - /** * :: DeveloperApi :: * Contains util methods to interact with Hadoop from Spark. From 9a2166f71c04e189d08ae44a97fcd976a7f861ac Mon Sep 17 00:00:00 2001 From: Hari Shreedharan Date: Thu, 16 Jul 2015 11:57:10 -0700 Subject: [PATCH 8/8] make it possible to use command line args and config parameters together. --- .../scala/org/apache/spark/deploy/yarn/Client.scala | 13 +++++++------ .../cluster/YarnClientSchedulerBackend.scala | 3 ++- 2 files changed, 9 insertions(+), 7 deletions(-) diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index d0e9acd4b0f1c..a2b243ac40e94 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -789,15 +789,16 @@ private[spark] class Client( def setupCredentials(): Unit = { loginFromKeytab = args.principal != null || sparkConf.contains("spark.yarn.principal") if (loginFromKeytab) { - principal = { - if (args.principal != null) { - keytab = args.keytab - args.principal + principal = + if (args.principal != null) args.principal else sparkConf.get("spark.yarn.principal") + keytab = { + if (args.keytab != null) { + args.keytab } else { - keytab = sparkConf.getOption("spark.yarn.keytab").orNull - sparkConf.get("spark.yarn.principal") + sparkConf.getOption("spark.yarn.keytab").orNull } } + require(keytab != null, "Keytab must be specified when principal is specified.") logInfo("Attempting to login to the Kerberos" + s" using principal: $principal and keytab: $keytab") diff --git a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala index dd9d0f47b5518..d97fa2e2151bc 100644 --- a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala +++ b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala @@ -60,13 +60,14 @@ private[spark] class YarnClientSchedulerBackend( // to the executors super.start() + waitForApplication() + // SPARK-8851: In yarn-client mode, the AM still does the credentials refresh. The driver // reads the credentials from HDFS, just like the executors and updates its own credentials // cache. if (conf.contains("spark.yarn.credentials.file")) { YarnSparkHadoopUtil.get.startExecutorDelegationTokenRenewer(conf) } - waitForApplication() monitorThread = asyncMonitorApplication() monitorThread.start() }