From 5745f2a9f0537487e554131711b7b5f9bb26d53b Mon Sep 17 00:00:00 2001
From: Kent Yao
Date: Mon, 4 Sep 2017 16:57:23 +0800
Subject: [PATCH] not switch ugi to itself

---
 .../CoarseGrainedExecutorBackend.scala        | 90 +++++++++----
 .../spark/deploy/yarn/ApplicationMaster.scala |  6 +-
 2 files changed, 45 insertions(+), 51 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
index a5d60e90210f1..45231e30b3dde 100644
--- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -26,8 +26,6 @@ import scala.collection.mutable
 import scala.util.{Failure, Success}
 import scala.util.control.NonFatal
 
-import org.apache.hadoop.security.{Credentials, UserGroupInformation}
-
 import org.apache.spark._
 import org.apache.spark.TaskState.TaskState
 import org.apache.spark.deploy.SparkHadoopUtil
@@ -187,56 +185,54 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
 
     Utils.initDaemon(log)
 
-    SparkHadoopUtil.get.runAsSparkUser { () =>
-      // Debug code
-      Utils.checkHost(hostname)
-
-      // Bootstrap to fetch the driver's Spark properties.
-      val executorConf = new SparkConf
-      val fetcher = RpcEnv.create(
-        "driverPropsFetcher",
-        hostname,
-        -1,
-        executorConf,
-        new SecurityManager(executorConf),
-        clientMode = true)
-      val driver = fetcher.setupEndpointRefByURI(driverUrl)
-      val cfg = driver.askSync[SparkAppConfig](RetrieveSparkAppConfig)
-      val props = cfg.sparkProperties ++ Seq[(String, String)](("spark.app.id", appId))
-      fetcher.shutdown()
-
-      // Create SparkEnv using properties we fetched from the driver.
-      val driverConf = new SparkConf()
-      for ((key, value) <- props) {
-        // this is required for SSL in standalone mode
-        if (SparkConf.isExecutorStartupConf(key)) {
-          driverConf.setIfMissing(key, value)
-        } else {
-          driverConf.set(key, value)
-        }
-      }
-      if (driverConf.contains("spark.yarn.credentials.file")) {
-        logInfo("Will periodically update credentials from: " +
-          driverConf.get("spark.yarn.credentials.file"))
-        SparkHadoopUtil.get.startCredentialUpdater(driverConf)
+    // Debug code
+    Utils.checkHost(hostname)
+
+    // Bootstrap to fetch the driver's Spark properties.
+    val executorConf = new SparkConf
+    val fetcher = RpcEnv.create(
+      "driverPropsFetcher",
+      hostname,
+      -1,
+      executorConf,
+      new SecurityManager(executorConf),
+      clientMode = true)
+    val driver = fetcher.setupEndpointRefByURI(driverUrl)
+    val cfg = driver.askSync[SparkAppConfig](RetrieveSparkAppConfig)
+    val props = cfg.sparkProperties ++ Seq[(String, String)](("spark.app.id", appId))
+    fetcher.shutdown()
+
+    // Create SparkEnv using properties we fetched from the driver.
+    val driverConf = new SparkConf()
+    for ((key, value) <- props) {
+      // this is required for SSL in standalone mode
+      if (SparkConf.isExecutorStartupConf(key)) {
+        driverConf.setIfMissing(key, value)
+      } else {
+        driverConf.set(key, value)
       }
+    }
+    if (driverConf.contains("spark.yarn.credentials.file")) {
+      logInfo("Will periodically update credentials from: " +
+        driverConf.get("spark.yarn.credentials.file"))
+      SparkHadoopUtil.get.startCredentialUpdater(driverConf)
+    }
 
-      cfg.hadoopDelegationCreds.foreach { hadoopCreds =>
-        val creds = SparkHadoopUtil.get.deserialize(hadoopCreds)
-        SparkHadoopUtil.get.addCurrentUserCredentials(creds)
-      }
+    cfg.hadoopDelegationCreds.foreach { hadoopCreds =>
+      val creds = SparkHadoopUtil.get.deserialize(hadoopCreds)
+      SparkHadoopUtil.get.addCurrentUserCredentials(creds)
+    }
 
-      val env = SparkEnv.createExecutorEnv(
-        driverConf, executorId, hostname, cores, cfg.ioEncryptionKey, isLocal = false)
+    val env = SparkEnv.createExecutorEnv(
+      driverConf, executorId, hostname, cores, cfg.ioEncryptionKey, isLocal = false)
 
-      env.rpcEnv.setupEndpoint("Executor", new CoarseGrainedExecutorBackend(
-        env.rpcEnv, driverUrl, executorId, hostname, cores, userClassPath, env))
-      workerUrl.foreach { url =>
-        env.rpcEnv.setupEndpoint("WorkerWatcher", new WorkerWatcher(env.rpcEnv, url))
-      }
-      env.rpcEnv.awaitTermination()
-      SparkHadoopUtil.get.stopCredentialUpdater()
+    env.rpcEnv.setupEndpoint("Executor", new CoarseGrainedExecutorBackend(
+      env.rpcEnv, driverUrl, executorId, hostname, cores, userClassPath, env))
+    workerUrl.foreach { url =>
+      env.rpcEnv.setupEndpoint("WorkerWatcher", new WorkerWatcher(env.rpcEnv, url))
     }
+    env.rpcEnv.awaitTermination()
+    SparkHadoopUtil.get.stopCredentialUpdater()
   }
 
   def main(args: Array[String]) {
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index e227bff88f71d..097b493132519 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -775,10 +775,8 @@ object ApplicationMaster extends Logging {
         sys.props(k) = v
       }
     }
-    SparkHadoopUtil.get.runAsSparkUser { () =>
-      master = new ApplicationMaster(amArgs, new YarnRMClient)
-      System.exit(master.run())
-    }
+    master = new ApplicationMaster(amArgs, new YarnRMClient)
+    System.exit(master.run())
   }
 
   private[spark] def sparkContextInitialized(sc: SparkContext): Unit = {
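
Note for reviewers: below is a minimal stand-alone sketch of what a runAsSparkUser-style wrapper roughly does; it is not Spark's actual SparkHadoopUtil code, and the names RunAsSketch and runAs are hypothetical. The wrapper builds a UGI for the target user, copies the caller's Hadoop credentials into it, and runs the body under doAs. Per the patch subject, when the target user is already the current user, the wrapper only switches the UGI to an equivalent of itself, so the executor backend and the YARN ApplicationMaster can run their startup code directly.

// Illustration only; RunAsSketch and runAs are hypothetical names, not Spark APIs.
import java.security.PrivilegedExceptionAction

import org.apache.hadoop.security.UserGroupInformation

object RunAsSketch {
  // Roughly what a runAsSparkUser-style wrapper does: create a UGI for the
  // target user, copy the caller's credentials into it, and run the body
  // under that UGI via doAs.
  def runAs(user: String)(body: => Unit): Unit = {
    val ugi = UserGroupInformation.createRemoteUser(user)
    ugi.addCredentials(UserGroupInformation.getCurrentUser.getCredentials)
    ugi.doAs(new PrivilegedExceptionAction[Unit] {
      override def run(): Unit = body
    })
  }

  def main(args: Array[String]): Unit = {
    val current = UserGroupInformation.getCurrentUser.getShortUserName
    // When the target user equals the current user, the doAs only swaps the
    // UGI for an equivalent one; the body could run directly without it.
    runAs(current) {
      println(s"running as ${UserGroupInformation.getCurrentUser.getShortUserName}")
    }
  }
}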