From b54a0c4700e9811dc1cdd2e0ecc60549d7900460 Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Mon, 3 Nov 2014 12:18:28 -0800 Subject: [PATCH 01/19] Initial skeleton for Yarn shuffle service This includes the necessary changes in making the Yarn shuffle service a module of Spark that core doesn't depend on. This is included in the network-yarn module, which depends on the network-shuffle module. --- network/yarn/pom.xml | 78 +++++++++++++++++++ .../network/yarn/YarnShuffleService.java | 63 +++++++++++++++ pom.xml | 1 + project/SparkBuild.scala | 6 +- 4 files changed, 145 insertions(+), 3 deletions(-) create mode 100644 network/yarn/pom.xml create mode 100644 network/yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java diff --git a/network/yarn/pom.xml b/network/yarn/pom.xml new file mode 100644 index 0000000000000..f5eb088e88919 --- /dev/null +++ b/network/yarn/pom.xml @@ -0,0 +1,78 @@ + + + + + 4.0.0 + + org.apache.spark + spark-parent + 1.2.0-SNAPSHOT + ../../pom.xml + + + org.apache.spark + spark-network-yarn_2.10 + jar + Spark Project Yarn Shuffle Service Code + http://spark.apache.org/ + + network-yarn + + + + + + org.apache.spark + spark-network-shuffle_2.10 + ${project.version} + + + + + org.apache.hadoop + hadoop-yarn-api + provided + + + org.apache.hadoop + hadoop-yarn-common + provided + + + org.apache.hadoop + hadoop-yarn-server-web-proxy + provided + + + org.apache.hadoop + hadoop-yarn-client + provided + + + org.apache.hadoop + hadoop-client + provided + + + + + target/scala-${scala.binary.version}/classes + target/scala-${scala.binary.version}/test-classes + + diff --git a/network/yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java b/network/yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java new file mode 100644 index 0000000000000..7734cfadbb160 --- /dev/null +++ b/network/yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java @@ -0,0 +1,63 @@ + +package org.apache.spark.network.yarn; + +import java.nio.ByteBuffer; +import java.sql.Timestamp; +import java.util.Date; + +import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.server.api.AuxiliaryService; +import org.apache.hadoop.yarn.server.api.ApplicationInitializationContext; +import org.apache.hadoop.yarn.server.api.ApplicationTerminationContext; +import org.apache.hadoop.yarn.server.api.ContainerInitializationContext; +import org.apache.hadoop.yarn.server.api.ContainerTerminationContext; + +/** + * External shuffle service used by Spark on Yarn. 
+ */ +public class YarnShuffleService extends AuxiliaryService { + + private static final JobTokenSecretManager secretManager = new JobTokenSecretManager(); + + public YarnShuffleService() { + super("sparkshuffleservice"); + log("--- [ Welcome to YarnShuffleService v0.1 ] ---"); + } + + @Override + public void initializeApplication(ApplicationInitializationContext context) { + ApplicationId appId = context.getApplicationId(); + log("Initializing application " + appId + "!"); + } + + @Override + public void stopApplication(ApplicationTerminationContext context) { + ApplicationId appId = context.getApplicationId(); + log("Stopping application " + appId + "!"); + } + + @Override + public ByteBuffer getMetaData() { + log("Getting meta data"); + return ByteBuffer.wrap("".getBytes()); + } + + @Override + public void initializeContainer(ContainerInitializationContext context) { + ContainerId containerId = context.getContainerId(); + log("Initializing container " + containerId + "!"); + } + + @Override + public void stopContainer(ContainerTerminationContext context) { + ContainerId containerId = context.getContainerId(); + log("Stopping container " + containerId + "!"); + } + + private void log(String msg) { + Timestamp timestamp = new Timestamp(new Date().getTime()); + System.out.println("* org.apache.spark.YarnShuffleService " + timestamp + ": " + msg); + } +} diff --git a/pom.xml b/pom.xml index 6191cd3a541e2..eeb65d1e25540 100644 --- a/pom.xml +++ b/pom.xml @@ -93,6 +93,7 @@ tools network/common network/shuffle + network/yarn streaming sql/catalyst sql/core diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala index 33618f5401768..2ac82f515a53d 100644 --- a/project/SparkBuild.scala +++ b/project/SparkBuild.scala @@ -42,8 +42,8 @@ object BuildCommons { Seq("yarn", "yarn-stable", "yarn-alpha", "java8-tests", "ganglia-lgpl", "kinesis-asl") .map(ProjectRef(buildLocation, _)) - val assemblyProjects@Seq(assembly, examples) = Seq("assembly", "examples") - .map(ProjectRef(buildLocation, _)) + val assemblyProjects@Seq(assembly, examples, networkYarn) = + Seq("assembly", "examples", "network-yarn").map(ProjectRef(buildLocation, _)) val tools = ProjectRef(buildLocation, "tools") // Root project. 
@@ -143,7 +143,7 @@ object SparkBuild extends PomBuild { // TODO: Add Sql to mima checks allProjects.filterNot(x => Seq(spark, sql, hive, hiveThriftServer, catalyst, repl, - streamingFlumeSink, networkCommon, networkShuffle).contains(x)).foreach { + streamingFlumeSink, networkCommon, networkShuffle, networkYarn).contains(x)).foreach { x => enable(MimaBuild.mimaSettings(sparkHome, x))(x) } From 43dcb969f597b1eca6c765e05f212cabcab92b71 Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Mon, 3 Nov 2014 15:00:19 -0800 Subject: [PATCH 02/19] First cut integration of shuffle service with Yarn aux service --- .../network/yarn/YarnShuffleService.java | 55 +++++++++++++------ .../spark/deploy/yarn/ExecutorRunnable.scala | 2 + 2 files changed, 41 insertions(+), 16 deletions(-) diff --git a/network/yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java b/network/yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java index 7734cfadbb160..27cb6241445e3 100644 --- a/network/yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java +++ b/network/yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java @@ -1,10 +1,16 @@ package org.apache.spark.network.yarn; +import java.lang.Override; import java.nio.ByteBuffer; -import java.sql.Timestamp; -import java.util.Date; +import org.apache.spark.network.TransportContext; +import org.apache.spark.network.server.RpcHandler; +import org.apache.spark.network.shuffle.ExternalShuffleBlockHandler; +import org.apache.spark.network.util.TransportConf; +import org.apache.spark.network.util.SystemPropertyConfigProvider; + +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerId; @@ -13,51 +19,68 @@ import org.apache.hadoop.yarn.server.api.ApplicationTerminationContext; import org.apache.hadoop.yarn.server.api.ContainerInitializationContext; import org.apache.hadoop.yarn.server.api.ContainerTerminationContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * External shuffle service used by Spark on Yarn. */ public class YarnShuffleService extends AuxiliaryService { - + private final Logger logger = LoggerFactory.getLogger(YarnShuffleService.class); private static final JobTokenSecretManager secretManager = new JobTokenSecretManager(); + private static final String SPARK_SHUFFLE_SERVICE_PORT_KEY = "spark.shuffle.service.port"; + private static final int DEFAULT_SPARK_SHUFFLE_SERVICE_PORT = 7337; + public YarnShuffleService() { - super("sparkshuffleservice"); - log("--- [ Welcome to YarnShuffleService v0.1 ] ---"); + super("spark_shuffle"); + logger.info("Initializing Yarn shuffle service for Spark"); + } + + /** + * Start the shuffle server with the given configuration. 
+ */ + @Override + protected void serviceInit(Configuration conf) { + try { + int port = conf.getInt( + SPARK_SHUFFLE_SERVICE_PORT_KEY, DEFAULT_SPARK_SHUFFLE_SERVICE_PORT); + TransportConf transportConf = new TransportConf(new SystemPropertyConfigProvider()); + RpcHandler rpcHandler = new ExternalShuffleBlockHandler(); + TransportContext transportContext = new TransportContext(transportConf, rpcHandler); + transportContext.createServer(port); + } catch (Exception e) { + logger.error("Exception in starting Yarn shuffle service for Spark!", e); + } } @Override public void initializeApplication(ApplicationInitializationContext context) { ApplicationId appId = context.getApplicationId(); - log("Initializing application " + appId + "!"); + logger.debug("Initializing application " + appId + "!"); } @Override public void stopApplication(ApplicationTerminationContext context) { ApplicationId appId = context.getApplicationId(); - log("Stopping application " + appId + "!"); + logger.debug("Stopping application " + appId + "!"); } @Override public ByteBuffer getMetaData() { - log("Getting meta data"); - return ByteBuffer.wrap("".getBytes()); + logger.debug("Getting meta data"); + return ByteBuffer.allocate(0); } @Override public void initializeContainer(ContainerInitializationContext context) { ContainerId containerId = context.getContainerId(); - log("Initializing container " + containerId + "!"); + logger.debug("Initializing container " + containerId + "!"); } @Override public void stopContainer(ContainerTerminationContext context) { ContainerId containerId = context.getContainerId(); - log("Stopping container " + containerId + "!"); - } - - private void log(String msg) { - Timestamp timestamp = new Timestamp(new Date().getTime()); - System.out.println("* org.apache.spark.YarnShuffleService " + timestamp + ": " + msg); + logger.debug("Stopping container " + containerId + "!"); } } diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala index 0b5a92d87d722..cdaec9e3be6f6 100644 --- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala +++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala @@ -89,6 +89,8 @@ class ExecutorRunnable( ctx.setApplicationACLs(YarnSparkHadoopUtil.getApplicationAclsForYarn(securityMgr)) + ctx.setServiceData(Map[String, ByteBuffer]("spark_shuffle" -> ByteBuffer.allocate(0))) + // Send the start request to the ContainerManager nmClient.startContainer(container, ctx) } From b4b1f0c4b6d693a9ab4c0d9950c70201d81fc0c2 Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Mon, 3 Nov 2014 15:30:44 -0800 Subject: [PATCH 03/19] 4 tabs -> 2 tabs --- .../network/yarn/YarnShuffleService.java | 96 +++++++++---------- 1 file changed, 48 insertions(+), 48 deletions(-) diff --git a/network/yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java b/network/yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java index 27cb6241445e3..5f8a40d31266b 100644 --- a/network/yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java +++ b/network/yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java @@ -26,61 +26,61 @@ * External shuffle service used by Spark on Yarn. 
*/ public class YarnShuffleService extends AuxiliaryService { - private final Logger logger = LoggerFactory.getLogger(YarnShuffleService.class); - private static final JobTokenSecretManager secretManager = new JobTokenSecretManager(); + private final Logger logger = LoggerFactory.getLogger(YarnShuffleService.class); + private static final JobTokenSecretManager secretManager = new JobTokenSecretManager(); - private static final String SPARK_SHUFFLE_SERVICE_PORT_KEY = "spark.shuffle.service.port"; - private static final int DEFAULT_SPARK_SHUFFLE_SERVICE_PORT = 7337; + private static final String SPARK_SHUFFLE_SERVICE_PORT_KEY = "spark.shuffle.service.port"; + private static final int DEFAULT_SPARK_SHUFFLE_SERVICE_PORT = 7337; - public YarnShuffleService() { - super("spark_shuffle"); - logger.info("Initializing Yarn shuffle service for Spark"); - } + public YarnShuffleService() { + super("spark_shuffle"); + logger.info("Initializing Yarn shuffle service for Spark"); + } - /** - * Start the shuffle server with the given configuration. - */ - @Override - protected void serviceInit(Configuration conf) { - try { - int port = conf.getInt( - SPARK_SHUFFLE_SERVICE_PORT_KEY, DEFAULT_SPARK_SHUFFLE_SERVICE_PORT); - TransportConf transportConf = new TransportConf(new SystemPropertyConfigProvider()); - RpcHandler rpcHandler = new ExternalShuffleBlockHandler(); - TransportContext transportContext = new TransportContext(transportConf, rpcHandler); - transportContext.createServer(port); - } catch (Exception e) { - logger.error("Exception in starting Yarn shuffle service for Spark!", e); - } + /** + * Start the shuffle server with the given configuration. + */ + @Override + protected void serviceInit(Configuration conf) { + try { + int port = conf.getInt( + SPARK_SHUFFLE_SERVICE_PORT_KEY, DEFAULT_SPARK_SHUFFLE_SERVICE_PORT); + TransportConf transportConf = new TransportConf(new SystemPropertyConfigProvider()); + RpcHandler rpcHandler = new ExternalShuffleBlockHandler(); + TransportContext transportContext = new TransportContext(transportConf, rpcHandler); + transportContext.createServer(port); + } catch (Exception e) { + logger.error("Exception in starting Yarn shuffle service for Spark", e); } + } - @Override - public void initializeApplication(ApplicationInitializationContext context) { - ApplicationId appId = context.getApplicationId(); - logger.debug("Initializing application " + appId + "!"); - } + @Override + public void initializeApplication(ApplicationInitializationContext context) { + ApplicationId appId = context.getApplicationId(); + logger.debug("Initializing application " + appId + "!"); + } - @Override - public void stopApplication(ApplicationTerminationContext context) { - ApplicationId appId = context.getApplicationId(); - logger.debug("Stopping application " + appId + "!"); - } + @Override + public void stopApplication(ApplicationTerminationContext context) { + ApplicationId appId = context.getApplicationId(); + logger.debug("Stopping application " + appId + "!"); + } - @Override - public ByteBuffer getMetaData() { - logger.debug("Getting meta data"); - return ByteBuffer.allocate(0); - } + @Override + public ByteBuffer getMetaData() { + logger.debug("Getting meta data"); + return ByteBuffer.allocate(0); + } - @Override - public void initializeContainer(ContainerInitializationContext context) { - ContainerId containerId = context.getContainerId(); - logger.debug("Initializing container " + containerId + "!"); - } + @Override + public void initializeContainer(ContainerInitializationContext 
context) { + ContainerId containerId = context.getContainerId(); + logger.debug("Initializing container " + containerId + "!"); + } - @Override - public void stopContainer(ContainerTerminationContext context) { - ContainerId containerId = context.getContainerId(); - logger.debug("Stopping container " + containerId + "!"); - } + @Override + public void stopContainer(ContainerTerminationContext context) { + ContainerId containerId = context.getContainerId(); + logger.debug("Stopping container " + containerId + "!"); + } } From 1bf5109dea8e3ef706ba10bbf95b6ed351facc1d Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Mon, 3 Nov 2014 16:08:28 -0800 Subject: [PATCH 04/19] Use the shuffle service port specified through hadoop config --- .../org/apache/spark/storage/BlockManager.scala | 15 ++++++++++++++- .../spark/network/yarn/YarnShuffleService.java | 1 + 2 files changed, 15 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index 5f5dd0dc1c63f..2408d1b044087 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -30,6 +30,7 @@ import akka.actor.{ActorSystem, Props} import sun.nio.ch.DirectBuffer import org.apache.spark._ +import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.executor._ import org.apache.spark.io.CompressionCodec import org.apache.spark.network._ @@ -92,7 +93,19 @@ private[spark] class BlockManager( private[spark] val externalShuffleServiceEnabled = conf.getBoolean("spark.shuffle.service.enabled", false) - private val externalShuffleServicePort = conf.getInt("spark.shuffle.service.port", 7337) + + // In Yarn, the shuffle service port maybe set through the Hadoop config + private val shuffleServicePortKey = "spark.shuffle.service.port" + private val externalShuffleServicePort = { + val sparkPort = conf.getInt(shuffleServicePortKey, 7337) + if (SparkHadoopUtil.get.isYarnMode) { + val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf) + Option(hadoopConf.get(shuffleServicePortKey)).map(_.toInt).getOrElse(sparkPort) + } else { + sparkPort + } + } + // Check that we're not using external shuffle service with consolidated shuffle files. 
if (externalShuffleServiceEnabled && conf.getBoolean("spark.shuffle.consolidateFiles", false) diff --git a/network/yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java b/network/yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java index 5f8a40d31266b..c3c26b5253981 100644 --- a/network/yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java +++ b/network/yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java @@ -49,6 +49,7 @@ protected void serviceInit(Configuration conf) { RpcHandler rpcHandler = new ExternalShuffleBlockHandler(); TransportContext transportContext = new TransportContext(transportConf, rpcHandler); transportContext.createServer(port); + logger.info("Started Yarn shuffle service for Spark on port " + port); } catch (Exception e) { logger.error("Exception in starting Yarn shuffle service for Spark", e); } From ea764e0015bbcf4a9e20ee1dbd909f10f93304e0 Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Mon, 3 Nov 2014 16:23:18 -0800 Subject: [PATCH 05/19] Connect to Yarn shuffle service only if it's enabled --- .../org/apache/spark/deploy/yarn/ExecutorRunnable.scala | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala index cdaec9e3be6f6..088122227e018 100644 --- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala +++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala @@ -89,7 +89,11 @@ class ExecutorRunnable( ctx.setApplicationACLs(YarnSparkHadoopUtil.getApplicationAclsForYarn(securityMgr)) - ctx.setServiceData(Map[String, ByteBuffer]("spark_shuffle" -> ByteBuffer.allocate(0))) + // If external shuffle service is enabled, register with the + // Yarn shuffle service already started on the node manager + if (sparkConf.getBoolean("spark.shuffle.service.enabled", false)) { + ctx.setServiceData(Map[String, ByteBuffer]("spark_shuffle" -> ByteBuffer.allocate(0))) + } // Send the start request to the ContainerManager nmClient.startContainer(container, ctx) From cd076a4ebbd71813e447cb4d16c720ea3c6f2466 Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Mon, 3 Nov 2014 16:34:22 -0800 Subject: [PATCH 06/19] Require external shuffle service for dynamic allocation --- .../spark/ExecutorAllocationManager.scala | 32 +++++++++++++++---- 1 file changed, 25 insertions(+), 7 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala index c11f1db0064fd..d6f0dea8c482b 100644 --- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala +++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala @@ -66,7 +66,6 @@ private[spark] class ExecutorAllocationManager(sc: SparkContext) extends Logging // Lower and upper bounds on the number of executors. These are required. 
private val minNumExecutors = conf.getInt("spark.dynamicAllocation.minExecutors", -1) private val maxNumExecutors = conf.getInt("spark.dynamicAllocation.maxExecutors", -1) - verifyBounds() // How long there must be backlogged tasks for before an addition is triggered private val schedulerBacklogTimeout = conf.getLong( @@ -77,9 +76,11 @@ private[spark] class ExecutorAllocationManager(sc: SparkContext) extends Logging "spark.dynamicAllocation.sustainedSchedulerBacklogTimeout", schedulerBacklogTimeout) // How long an executor must be idle for before it is removed - private val removeThresholdSeconds = conf.getLong( + private val executorIdleTimeout = conf.getLong( "spark.dynamicAllocation.executorIdleTimeout", 600) + validateSettings() + // Number of executors to add in the next round private var numExecutorsToAdd = 1 @@ -110,10 +111,11 @@ private[spark] class ExecutorAllocationManager(sc: SparkContext) extends Logging private var clock: Clock = new RealClock /** - * Verify that the lower and upper bounds on the number of executors are valid. + * Verify that the settings specified through the config are valid. * If not, throw an appropriate exception. */ - private def verifyBounds(): Unit = { + private def validateSettings(): Unit = { + // Verify that bounds are valid if (minNumExecutors < 0 || maxNumExecutors < 0) { throw new SparkException("spark.dynamicAllocation.{min/max}Executors must be set!") } @@ -124,6 +126,22 @@ private[spark] class ExecutorAllocationManager(sc: SparkContext) extends Logging throw new SparkException(s"spark.dynamicAllocation.minExecutors ($minNumExecutors) must " + s"be less than or equal to spark.dynamicAllocation.maxExecutors ($maxNumExecutors)!") } + // Verify that timeouts are positive + if (schedulerBacklogTimeout <= 0) { + throw new SparkException(s"spark.dynamicAllocation.schedulerBacklogTimeout must be > 0!") + } + if (sustainedSchedulerBacklogTimeout <= 0) { + throw new SparkException( + s"spark.dynamicAllocation.sustainedSchedulerBacklogTimeout must be > 0!") + } + if (executorIdleTimeout <= 0) { + throw new SparkException(s"spark.dynamicAllocation.executorIdleTimeout must be > 0!") + } + // Verify that external shuffle service is enabled + if (!conf.getBoolean("spark.shuffle.service.enabled", false)) { + throw new SparkException(s"Dynamic allocation of executors requires the external " + + s"shuffle service. 
You may enable this through spark.shuffle.service.enabled.") + } } /** @@ -254,7 +272,7 @@ private[spark] class ExecutorAllocationManager(sc: SparkContext) extends Logging val removeRequestAcknowledged = testing || sc.killExecutor(executorId) if (removeRequestAcknowledged) { logInfo(s"Removing executor $executorId because it has been idle for " + - s"$removeThresholdSeconds seconds (new desired total will be ${numExistingExecutors - 1})") + s"$executorIdleTimeout seconds (new desired total will be ${numExistingExecutors - 1})") executorsPendingToRemove.add(executorId) true } else { @@ -329,8 +347,8 @@ private[spark] class ExecutorAllocationManager(sc: SparkContext) extends Logging private def onExecutorIdle(executorId: String): Unit = synchronized { if (!removeTimes.contains(executorId) && !executorsPendingToRemove.contains(executorId)) { logDebug(s"Starting idle timer for $executorId because there are no more tasks " + - s"scheduled to run on the executor (to expire in $removeThresholdSeconds seconds)") - removeTimes(executorId) = clock.getTimeMillis + removeThresholdSeconds * 1000 + s"scheduled to run on the executor (to expire in $executorIdleTimeout seconds)") + removeTimes(executorId) = clock.getTimeMillis + executorIdleTimeout * 1000 } } From 804e7ffb952519dd2a4c400607c3ef432ae29321 Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Mon, 3 Nov 2014 16:38:48 -0800 Subject: [PATCH 07/19] Include the Yarn shuffle service jar in the distribution --- make-distribution.sh | 1 + 1 file changed, 1 insertion(+) diff --git a/make-distribution.sh b/make-distribution.sh index 0bc839e1dbe4d..ad5e4f60e234e 100755 --- a/make-distribution.sh +++ b/make-distribution.sh @@ -181,6 +181,7 @@ echo "Spark $VERSION$GITREVSTRING built for Hadoop $SPARK_HADOOP_VERSION" > "$DI # Copy jars cp "$FWDIR"/assembly/target/scala*/*assembly*hadoop*.jar "$DISTDIR/lib/" cp "$FWDIR"/examples/target/scala*/spark-examples*.jar "$DISTDIR/lib/" +cp "$FWDIR"/network/yarn/target/scala*/spark-network-yarn*.jar "$DISTDIR/lib/" # Copy example sources (needed for python and SQL) mkdir -p "$DISTDIR/examples/src/main" From 5b419b82652fea74d535a58fa50f789c8e8ebabb Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Mon, 3 Nov 2014 16:43:50 -0800 Subject: [PATCH 08/19] Add missing license header --- .../spark/network/yarn/YarnShuffleService.java | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/network/yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java b/network/yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java index c3c26b5253981..849f58d9d75d1 100644 --- a/network/yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java +++ b/network/yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java @@ -1,3 +1,19 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.spark.network.yarn; From 5bf9b7ec3f910e961678af25347ce4c811cdd4eb Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Mon, 3 Nov 2014 17:45:07 -0800 Subject: [PATCH 09/19] Address a few minor comments --- .../apache/spark/ExecutorAllocationManager.scala | 15 +++++++-------- .../org/apache/spark/storage/BlockManager.scala | 2 +- .../spark/network/yarn/YarnShuffleService.java | 15 ++++++++++++--- 3 files changed, 20 insertions(+), 12 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala index d6f0dea8c482b..f03913bbfd273 100644 --- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala +++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala @@ -115,7 +115,6 @@ private[spark] class ExecutorAllocationManager(sc: SparkContext) extends Logging * If not, throw an appropriate exception. */ private def validateSettings(): Unit = { - // Verify that bounds are valid if (minNumExecutors < 0 || maxNumExecutors < 0) { throw new SparkException("spark.dynamicAllocation.{min/max}Executors must be set!") } @@ -126,21 +125,21 @@ private[spark] class ExecutorAllocationManager(sc: SparkContext) extends Logging throw new SparkException(s"spark.dynamicAllocation.minExecutors ($minNumExecutors) must " + s"be less than or equal to spark.dynamicAllocation.maxExecutors ($maxNumExecutors)!") } - // Verify that timeouts are positive if (schedulerBacklogTimeout <= 0) { - throw new SparkException(s"spark.dynamicAllocation.schedulerBacklogTimeout must be > 0!") + throw new SparkException("spark.dynamicAllocation.schedulerBacklogTimeout must be > 0!") } if (sustainedSchedulerBacklogTimeout <= 0) { throw new SparkException( - s"spark.dynamicAllocation.sustainedSchedulerBacklogTimeout must be > 0!") + "spark.dynamicAllocation.sustainedSchedulerBacklogTimeout must be > 0!") } if (executorIdleTimeout <= 0) { - throw new SparkException(s"spark.dynamicAllocation.executorIdleTimeout must be > 0!") + throw new SparkException("spark.dynamicAllocation.executorIdleTimeout must be > 0!") } - // Verify that external shuffle service is enabled + // Require external shuffle service for dynamic allocation + // Otherwise, we may lose shuffle files when killing executors if (!conf.getBoolean("spark.shuffle.service.enabled", false)) { - throw new SparkException(s"Dynamic allocation of executors requires the external " + - s"shuffle service. You may enable this through spark.shuffle.service.enabled.") + throw new SparkException("Dynamic allocation of executors requires the external " + + "shuffle service. 
You may enable this through spark.shuffle.service.enabled.") } } diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index 2408d1b044087..68aeceffd7a85 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -100,7 +100,7 @@ private[spark] class BlockManager( val sparkPort = conf.getInt(shuffleServicePortKey, 7337) if (SparkHadoopUtil.get.isYarnMode) { val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf) - Option(hadoopConf.get(shuffleServicePortKey)).map(_.toInt).getOrElse(sparkPort) + hadoopConf.getInt(shuffleServicePortKey, sparkPort) } else { sparkPort } diff --git a/network/yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java b/network/yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java index 849f58d9d75d1..18f23a1680375 100644 --- a/network/yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java +++ b/network/yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java @@ -22,12 +22,12 @@ import org.apache.spark.network.TransportContext; import org.apache.spark.network.server.RpcHandler; +import org.apache.spark.network.server.TransportServer; import org.apache.spark.network.shuffle.ExternalShuffleBlockHandler; import org.apache.spark.network.util.TransportConf; import org.apache.spark.network.util.SystemPropertyConfigProvider; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.server.api.AuxiliaryService; @@ -43,11 +43,13 @@ */ public class YarnShuffleService extends AuxiliaryService { private final Logger logger = LoggerFactory.getLogger(YarnShuffleService.class); - private static final JobTokenSecretManager secretManager = new JobTokenSecretManager(); private static final String SPARK_SHUFFLE_SERVICE_PORT_KEY = "spark.shuffle.service.port"; private static final int DEFAULT_SPARK_SHUFFLE_SERVICE_PORT = 7337; + // Actual server that serves the shuffle files + private TransportServer shuffleServer = null; + public YarnShuffleService() { super("spark_shuffle"); logger.info("Initializing Yarn shuffle service for Spark"); @@ -64,7 +66,7 @@ protected void serviceInit(Configuration conf) { TransportConf transportConf = new TransportConf(new SystemPropertyConfigProvider()); RpcHandler rpcHandler = new ExternalShuffleBlockHandler(); TransportContext transportContext = new TransportContext(transportConf, rpcHandler); - transportContext.createServer(port); + shuffleServer = transportContext.createServer(port); logger.info("Started Yarn shuffle service for Spark on port " + port); } catch (Exception e) { logger.error("Exception in starting Yarn shuffle service for Spark", e); @@ -100,4 +102,11 @@ public void stopContainer(ContainerTerminationContext context) { ContainerId containerId = context.getContainerId(); logger.debug("Stopping container " + containerId + "!"); } + + @Override + protected void serviceStop() { + if (shuffleServer != null) { + shuffleServer.close(); + } + } } From baff916ca0336b83334602e940842e083789ab4e Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Mon, 3 Nov 2014 17:53:04 -0800 Subject: [PATCH 10/19] Fix tests --- .../main/scala/org/apache/spark/ExecutorAllocationManager.scala | 2 +- 1 file changed, 1 
insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala index f03913bbfd273..f9590c032c0a5 100644 --- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala +++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala @@ -137,7 +137,7 @@ private[spark] class ExecutorAllocationManager(sc: SparkContext) extends Logging } // Require external shuffle service for dynamic allocation // Otherwise, we may lose shuffle files when killing executors - if (!conf.getBoolean("spark.shuffle.service.enabled", false)) { + if (!conf.getBoolean("spark.shuffle.service.enabled", false) && !testing) { throw new SparkException("Dynamic allocation of executors requires the external " + "shuffle service. You may enable this through spark.shuffle.service.enabled.") } From 15a5b37567b30918a1d9b209019b87f76f47f0c7 Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Mon, 3 Nov 2014 17:56:38 -0800 Subject: [PATCH 11/19] Fix build for Hadoop 1.x --- pom.xml | 3 ++- .../org/apache/spark/deploy/yarn/ExecutorRunnable.scala | 6 ++++++ 2 files changed, 8 insertions(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index eeb65d1e25540..6333415dfe196 100644 --- a/pom.xml +++ b/pom.xml @@ -93,7 +93,6 @@ tools network/common network/shuffle - network/yarn streaming sql/catalyst sql/core @@ -1230,6 +1229,7 @@ yarn-alpha yarn + network/yarn @@ -1237,6 +1237,7 @@ yarn yarn + network/yarn diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala index 7ee4b5c842df1..c782363147bc2 100644 --- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala +++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala @@ -90,6 +90,12 @@ class ExecutorRunnable( ctx.setApplicationACLs(YarnSparkHadoopUtil.getApplicationAclsForYarn(securityMgr)) + // If external shuffle service is enabled, register with the + // Yarn shuffle service already started on the node manager + if (sparkConf.getBoolean("spark.shuffle.service.enabled", false)) { + ctx.setServiceData(Map[String, ByteBuffer]("spark_shuffle" -> ByteBuffer.allocate(0))) + } + // Send the start request to the ContainerManager val startReq = Records.newRecord(classOf[StartContainerRequest]) .asInstanceOf[StartContainerRequest] From f39daa63a24c832557099d08b4dfe0784865b8a0 Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Mon, 3 Nov 2014 18:35:57 -0800 Subject: [PATCH 12/19] Do not make network-yarn an assembly module Currently the user will have to add all three network jars to the NM class path. A future PR will use a maven plugin to merge these into one. 
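For reference, once these jars are on the NodeManager's classpath, the shuffle
service still has to be registered as a Yarn auxiliary service. A minimal
yarn-site.xml sketch, assuming the "spark_shuffle" service name used in this
patch series (the exact aux-service keys may vary across Hadoop versions, and a
real config would typically also keep mapreduce_shuffle in the list):

  <property>
    <name>yarn.nodemanager.aux-services</name>
    <value>spark_shuffle</value>
  </property>
  <property>
    <name>yarn.nodemanager.aux-services.spark_shuffle.class</name>
    <value>org.apache.spark.network.yarn.YarnShuffleService</value>
  </property>
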
--- make-distribution.sh | 2 ++ project/SparkBuild.scala | 10 +++++----- 2 files changed, 7 insertions(+), 5 deletions(-) diff --git a/make-distribution.sh b/make-distribution.sh index ad5e4f60e234e..fac7f7e284be4 100755 --- a/make-distribution.sh +++ b/make-distribution.sh @@ -182,6 +182,8 @@ echo "Spark $VERSION$GITREVSTRING built for Hadoop $SPARK_HADOOP_VERSION" > "$DI cp "$FWDIR"/assembly/target/scala*/*assembly*hadoop*.jar "$DISTDIR/lib/" cp "$FWDIR"/examples/target/scala*/spark-examples*.jar "$DISTDIR/lib/" cp "$FWDIR"/network/yarn/target/scala*/spark-network-yarn*.jar "$DISTDIR/lib/" +cp "$FWDIR"/network/yarn/target/scala*/spark-network-shuffle*.jar "$DISTDIR/lib/" +cp "$FWDIR"/network/yarn/target/scala*/spark-network-common*.jar "$DISTDIR/lib/" # Copy example sources (needed for python and SQL) mkdir -p "$DISTDIR/examples/src/main" diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala index 2ac82f515a53d..657e4b4432775 100644 --- a/project/SparkBuild.scala +++ b/project/SparkBuild.scala @@ -38,12 +38,12 @@ object BuildCommons { "streaming-flume", "streaming-kafka", "streaming-mqtt", "streaming-twitter", "streaming-zeromq").map(ProjectRef(buildLocation, _)) - val optionallyEnabledProjects@Seq(yarn, yarnStable, yarnAlpha, java8Tests, sparkGangliaLgpl, sparkKinesisAsl) = - Seq("yarn", "yarn-stable", "yarn-alpha", "java8-tests", "ganglia-lgpl", "kinesis-asl") - .map(ProjectRef(buildLocation, _)) + val optionallyEnabledProjects@Seq(yarn, yarnStable, yarnAlpha, networkYarn, java8Tests, + sparkGangliaLgpl, sparkKinesisAsl) = Seq("yarn", "yarn-stable", "yarn-alpha", "network-yarn", + "java8-tests", "ganglia-lgpl", "kinesis-asl").map(ProjectRef(buildLocation, _)) - val assemblyProjects@Seq(assembly, examples, networkYarn) = - Seq("assembly", "examples", "network-yarn").map(ProjectRef(buildLocation, _)) + val assemblyProjects@Seq(assembly, examples) = Seq("assembly", "examples") + .map(ProjectRef(buildLocation, _)) val tools = ProjectRef(buildLocation, "tools") // Root project. From f48b20c8cc698ca38d57458d6280bbb8e7b194f0 Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Mon, 3 Nov 2014 22:52:31 -0800 Subject: [PATCH 13/19] Fix tests again --- .../scala/org/apache/spark/ExecutorAllocationManager.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala index f9590c032c0a5..4819cf7783323 100644 --- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala +++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala @@ -79,6 +79,9 @@ private[spark] class ExecutorAllocationManager(sc: SparkContext) extends Logging private val executorIdleTimeout = conf.getLong( "spark.dynamicAllocation.executorIdleTimeout", 600) + // Whether we are testing this class. This should only be used internally. + private val testing = conf.getBoolean("spark.dynamicAllocation.testing", false) + validateSettings() // Number of executors to add in the next round @@ -104,9 +107,6 @@ private[spark] class ExecutorAllocationManager(sc: SparkContext) extends Logging // Polling loop interval (ms) private val intervalMillis: Long = 100 - // Whether we are testing this class. This should only be used internally. 
- private val testing = conf.getBoolean("spark.dynamicAllocation.testing", false) - // Clock used to schedule when executors should be added and removed private var clock: Clock = new RealClock From 9b6e0587555cada043cad4cf264d454e46feca57 Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Tue, 4 Nov 2014 11:44:42 -0800 Subject: [PATCH 14/19] Address various feedback This includes moving the Yarn logic out of BM into Utils, updating a few test comments, adding a HadoopConfigProvider and using it in the Yarn shuffle service. --- .../spark/ExecutorAllocationManager.scala | 2 +- .../apache/spark/storage/BlockManager.scala | 15 +------ .../scala/org/apache/spark/util/Utils.scala | 16 +++++++ .../network/yarn/YarnShuffleService.java | 16 +++---- .../yarn/util/HadoopConfigProvider.java | 42 +++++++++++++++++++ 5 files changed, 68 insertions(+), 23 deletions(-) create mode 100644 network/yarn/src/main/java/org/apache/spark/network/yarn/util/HadoopConfigProvider.java diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala index 4819cf7783323..ef93009a074e7 100644 --- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala +++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala @@ -79,7 +79,7 @@ private[spark] class ExecutorAllocationManager(sc: SparkContext) extends Logging private val executorIdleTimeout = conf.getLong( "spark.dynamicAllocation.executorIdleTimeout", 600) - // Whether we are testing this class. This should only be used internally. + // During testing, the methods to actually kill and add executors are mocked out private val testing = conf.getBoolean("spark.dynamicAllocation.testing", false) validateSettings() diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index 68aeceffd7a85..428eafbec9756 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -30,7 +30,6 @@ import akka.actor.{ActorSystem, Props} import sun.nio.ch.DirectBuffer import org.apache.spark._ -import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.executor._ import org.apache.spark.io.CompressionCodec import org.apache.spark.network._ @@ -41,7 +40,6 @@ import org.apache.spark.network.util.{ConfigProvider, TransportConf} import org.apache.spark.serializer.Serializer import org.apache.spark.shuffle.ShuffleManager import org.apache.spark.shuffle.hash.HashShuffleManager -import org.apache.spark.shuffle.sort.SortShuffleManager import org.apache.spark.util._ private[spark] sealed trait BlockValues @@ -93,18 +91,7 @@ private[spark] class BlockManager( private[spark] val externalShuffleServiceEnabled = conf.getBoolean("spark.shuffle.service.enabled", false) - - // In Yarn, the shuffle service port maybe set through the Hadoop config - private val shuffleServicePortKey = "spark.shuffle.service.port" - private val externalShuffleServicePort = { - val sparkPort = conf.getInt(shuffleServicePortKey, 7337) - if (SparkHadoopUtil.get.isYarnMode) { - val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf) - hadoopConf.getInt(shuffleServicePortKey, sparkPort) - } else { - sparkPort - } - } + private val externalShuffleServicePort = Utils.getExternalShuffleServicePort(conf) // Check that we're not using external shuffle service with consolidated shuffle files. 
if (externalShuffleServiceEnabled diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index a33046d2040d8..8a4267a1ae179 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -44,6 +44,7 @@ import org.json4s._ import tachyon.client.{TachyonFile,TachyonFS} import org.apache.spark._ +import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.serializer.{DeserializationStream, SerializationStream, SerializerInstance} /** CallSite represents a place in user code. It can have a short and a long form. */ @@ -1767,6 +1768,21 @@ private[spark] object Utils extends Logging { val manifest = new JarManifest(manifestUrl.openStream()) manifest.getMainAttributes.getValue(Name.IMPLEMENTATION_VERSION) }.getOrElse("Unknown") + + /** + * Return the port used in the external shuffle service as specified through + * `spark.shuffle.service.port`. In Yarn, this is set in the Hadoop configuration. + */ + def getExternalShuffleServicePort(conf: SparkConf): Int = { + val shuffleServicePortKey = "spark.shuffle.service.port" + val sparkPort = conf.getInt(shuffleServicePortKey, 7337) + if (SparkHadoopUtil.get.isYarnMode) { + val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf) + hadoopConf.getInt(shuffleServicePortKey, sparkPort) + } else { + sparkPort + } + } } /** diff --git a/network/yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java b/network/yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java index 18f23a1680375..79caac164ec1e 100644 --- a/network/yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java +++ b/network/yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java @@ -20,13 +20,6 @@ import java.lang.Override; import java.nio.ByteBuffer; -import org.apache.spark.network.TransportContext; -import org.apache.spark.network.server.RpcHandler; -import org.apache.spark.network.server.TransportServer; -import org.apache.spark.network.shuffle.ExternalShuffleBlockHandler; -import org.apache.spark.network.util.TransportConf; -import org.apache.spark.network.util.SystemPropertyConfigProvider; - import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerId; @@ -38,6 +31,13 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.spark.network.TransportContext; +import org.apache.spark.network.server.RpcHandler; +import org.apache.spark.network.server.TransportServer; +import org.apache.spark.network.shuffle.ExternalShuffleBlockHandler; +import org.apache.spark.network.util.TransportConf; +import org.apache.spark.network.yarn.util.HadoopConfigProvider; + /** * External shuffle service used by Spark on Yarn. 
*/ @@ -63,7 +63,7 @@ protected void serviceInit(Configuration conf) { try { int port = conf.getInt( SPARK_SHUFFLE_SERVICE_PORT_KEY, DEFAULT_SPARK_SHUFFLE_SERVICE_PORT); - TransportConf transportConf = new TransportConf(new SystemPropertyConfigProvider()); + TransportConf transportConf = new TransportConf(new HadoopConfigProvider(conf)); RpcHandler rpcHandler = new ExternalShuffleBlockHandler(); TransportContext transportContext = new TransportContext(transportConf, rpcHandler); shuffleServer = transportContext.createServer(port); diff --git a/network/yarn/src/main/java/org/apache/spark/network/yarn/util/HadoopConfigProvider.java b/network/yarn/src/main/java/org/apache/spark/network/yarn/util/HadoopConfigProvider.java new file mode 100644 index 0000000000000..884861752e80d --- /dev/null +++ b/network/yarn/src/main/java/org/apache/spark/network/yarn/util/HadoopConfigProvider.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.network.yarn.util; + +import java.util.NoSuchElementException; + +import org.apache.hadoop.conf.Configuration; + +import org.apache.spark.network.util.ConfigProvider; + +/** Use the Hadoop configuration to obtain config values. */ +public class HadoopConfigProvider extends ConfigProvider { + private final Configuration conf; + + public HadoopConfigProvider(Configuration conf) { + this.conf = conf; + } + + @Override + public String get(String name) { + String value = conf.get(name); + if (value == null) { + throw new NoSuchElementException(name); + } + return value; + } +} From d1124e413590a9b74f31c317ab919fcea4ca0118 Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Tue, 4 Nov 2014 23:39:59 -0800 Subject: [PATCH 15/19] Add security to shuffle service (INCOMPLETE) This commit adds logic that allows the shuffle server to use SASL to authenticate shuffle requests. This is currently not working yet for two reasons: (1) The ExternalShuffleClient doesn't actually have the authentication logic yet. This will be implemented shortly in a separate PR. (2) All supported Yarn versions use a different version of guava that does not have the base encoding util functions that the SASL server uses. This will also be fixed in a separate PR. 
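For context, a minimal sketch of how the server-side pieces added here are meant
to fit together once spark.authenticate is enabled. Only classes introduced or
touched in this patch series are used; the application id and secret below are
made up for illustration:

  import java.nio.ByteBuffer;

  import org.apache.spark.network.sasl.SaslRpcHandler;
  import org.apache.spark.network.sasl.ShuffleSecretManager;
  import org.apache.spark.network.server.RpcHandler;
  import org.apache.spark.network.shuffle.ExternalShuffleBlockHandler;

  public class ShuffleAuthSketch {
    public static void main(String[] args) {
      // Wrap the block handler so unauthenticated fetch requests are rejected
      ShuffleSecretManager secretManager = new ShuffleSecretManager();
      RpcHandler handler =
        new SaslRpcHandler(new ExternalShuffleBlockHandler(), secretManager);

      // Yarn invokes initializeApplication() with the secret that the driver
      // shipped through the container's service data (see ExecutorRunnable)
      ByteBuffer secret = ShuffleSecretManager.stringToBytes("app-shuffle-secret");
      secretManager.registerApp("application_1415000000000_0001", secret);

      // SaslRpcHandler authenticates each connection by looking up the secret
      assert secretManager.getSecretKey("application_1415000000000_0001") != null;

      // When the application terminates, stopApplication() clears the entry
      secretManager.unregisterApp("application_1415000000000_0001");
    }
  }
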
--- .../apache/spark/storage/BlockManager.scala | 6 +- .../scala/org/apache/spark/util/Utils.scala | 16 +-- .../network/sasl/ShuffleSecretManager.java | 117 ++++++++++++++++++ .../network/yarn/YarnShuffleService.java | 69 ++++++++--- .../spark/deploy/yarn/ExecutorRunnable.scala | 11 +- .../spark/deploy/yarn/ExecutorRunnable.scala | 11 +- 6 files changed, 205 insertions(+), 25 deletions(-) create mode 100644 network/shuffle/src/main/java/org/apache/spark/network/sasl/ShuffleSecretManager.java diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index 488ba78b194e5..8859111cbe9c9 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -95,7 +95,11 @@ private[spark] class BlockManager( private[spark] val externalShuffleServiceEnabled = conf.getBoolean("spark.shuffle.service.enabled", false) - private val externalShuffleServicePort = Utils.getExternalShuffleServicePort(conf) + + // Port used by the external shuffle service. In Yarn mode, this may be already be + // set through the Hadoop configuration as the server is launched in the Yarn NM. + private val externalShuffleServicePort = + Utils.getSparkOrYarnConfig(conf, "spark.shuffle.service.port", "7337").toInt // Check that we're not using external shuffle service with consolidated shuffle files. if (externalShuffleServiceEnabled diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index 5f64e5bc59852..7caf6bcf94ef3 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -1783,19 +1783,19 @@ private[spark] object Utils extends Logging { }.getOrElse("Unknown") /** - * Return the port used in the external shuffle service as specified through - * `spark.shuffle.service.port`. In Yarn, this is set in the Hadoop configuration. + * Return the value of a config either through the SparkConf or the Hadoop configuration + * if this is Yarn mode. In the latter case, this defaults to the value set through SparkConf + * if the key is not set in the Hadoop configuration. */ - def getExternalShuffleServicePort(conf: SparkConf): Int = { - val shuffleServicePortKey = "spark.shuffle.service.port" - val sparkPort = conf.getInt(shuffleServicePortKey, 7337) + def getSparkOrYarnConfig(conf: SparkConf, key: String, default: String): String = { + val sparkValue = conf.get(key, default) if (SparkHadoopUtil.get.isYarnMode) { - val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf) - hadoopConf.getInt(shuffleServicePortKey, sparkPort) + SparkHadoopUtil.get.newConfiguration(conf).get(key, sparkValue) } else { - sparkPort + sparkValue } } + } /** diff --git a/network/shuffle/src/main/java/org/apache/spark/network/sasl/ShuffleSecretManager.java b/network/shuffle/src/main/java/org/apache/spark/network/sasl/ShuffleSecretManager.java new file mode 100644 index 0000000000000..4ffdb050f09a9 --- /dev/null +++ b/network/shuffle/src/main/java/org/apache/spark/network/sasl/ShuffleSecretManager.java @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.network.sasl; + +import java.lang.Override; +import java.nio.ByteBuffer; +import java.nio.charset.Charset; +import java.util.concurrent.ConcurrentHashMap; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.spark.network.sasl.SecretKeyHolder; + +/** + * A class that manages shuffle secret used by the external shuffle service. + */ +public class ShuffleSecretManager implements SecretKeyHolder { + private final Logger logger = LoggerFactory.getLogger(ShuffleSecretManager.class); + private final ConcurrentHashMap shuffleSecretMap; + + private static final Charset UTF8_CHARSET = Charset.forName("UTF-8"); + + // Spark user used for authenticating SASL connections + // Note that this should match the value in org.apache.spark.SecurityManager + private static final String SPARK_SASL_USER = "sparkSaslUser"; + + /** + * Convert the given string to a byte buffer that can be converted back to a string + * through {@link #bytesToString(ByteBuffer)}. This is used if the external shuffle + * service represents shuffle secrets as bytes buffers instead of strings. + */ + public static ByteBuffer stringToBytes(String s) { + return ByteBuffer.wrap(s.getBytes(UTF8_CHARSET)); + } + + /** + * Convert the given byte buffer to a string that can be converted back to a byte + * buffer through {@link #stringToBytes(String)}. This is used if the external shuffle + * service represents shuffle secrets as bytes buffers instead of strings. + */ + public static String bytesToString(ByteBuffer b) { + return new String(b.array(), UTF8_CHARSET); + } + + public ShuffleSecretManager() { + shuffleSecretMap = new ConcurrentHashMap(); + } + + /** + * Register the specified application with its secret. + * Executors need to first authenticate themselves with the same secret before + * the fetching shuffle files written by other executors in this application. + */ + public void registerApp(String appId, String shuffleSecret) { + if (!shuffleSecretMap.contains(appId)) { + shuffleSecretMap.put(appId, shuffleSecret); + logger.info("Registered shuffle secret for application {}", appId); + } else { + logger.debug("Application {} already registered", appId); + } + } + + /** + * Register the specified application with its secret specified as a byte buffer. + */ + public void registerApp(String appId, ByteBuffer shuffleSecret) { + registerApp(appId, bytesToString(shuffleSecret)); + } + + /** + * Unregister the specified application along with its secret. + * This is called when an application terminates. + */ + public void unregisterApp(String appId) { + if (shuffleSecretMap.contains(appId)) { + shuffleSecretMap.remove(appId); + logger.info("Unregistered shuffle secret for application {}", appId); + } else { + logger.warn("Attempted to unregister application {} when it is not registered", appId); + } + } + + /** + * Return the Spark user for authenticating SASL connections. 
+ */ + @Override + public String getSaslUser(String appId) { + return SPARK_SASL_USER; + } + + /** + * Return the secret key registered with the specified application. + * This key is used to authenticate the executors in the application + * before they can fetch shuffle files from the external shuffle service. + * If the application is not registered, return null. + */ + @Override + public String getSecretKey(String appId) { + return shuffleSecretMap.get(appId); + } +} diff --git a/network/yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java b/network/yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java index 79caac164ec1e..84d6fdd9b7d72 100644 --- a/network/yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java +++ b/network/yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java @@ -32,6 +32,8 @@ import org.slf4j.LoggerFactory; import org.apache.spark.network.TransportContext; +import org.apache.spark.network.sasl.SaslRpcHandler; +import org.apache.spark.network.sasl.ShuffleSecretManager; import org.apache.spark.network.server.RpcHandler; import org.apache.spark.network.server.TransportServer; import org.apache.spark.network.shuffle.ExternalShuffleBlockHandler; @@ -44,9 +46,18 @@ public class YarnShuffleService extends AuxiliaryService { private final Logger logger = LoggerFactory.getLogger(YarnShuffleService.class); + // Port on which the shuffle server listens for fetch requests private static final String SPARK_SHUFFLE_SERVICE_PORT_KEY = "spark.shuffle.service.port"; private static final int DEFAULT_SPARK_SHUFFLE_SERVICE_PORT = 7337; + // Whether the shuffle server should authenticate fetch requests + private static final String SPARK_AUTHENTICATE_KEY = "spark.authenticate"; + private static final boolean DEFAULT_SPARK_AUTHENTICATE = false; + + // An entity that manages the shuffle secret per application + // This is used only if authentication is enabled + private ShuffleSecretManager secretManager; + // Actual server that serves the shuffle files private TransportServer shuffleServer = null; @@ -55,19 +66,38 @@ public YarnShuffleService() { logger.info("Initializing Yarn shuffle service for Spark"); } + /** + * Return whether authentication is enabled as specified by the configuration. + * If so, fetch requests will fail unless the appropriate authentication secret + * for the application is provided. + */ + private boolean isAuthenticationEnabled() { + return secretManager != null; + } + /** * Start the shuffle server with the given configuration. 
    */
   @Override
   protected void serviceInit(Configuration conf) {
     try {
+      // If authentication is enabled, set up the shuffle server to use a
+      // special RPC handler that filters out unauthenticated fetch requests
+      boolean authEnabled = conf.getBoolean(SPARK_AUTHENTICATE_KEY, DEFAULT_SPARK_AUTHENTICATE);
+      RpcHandler rpcHandler = new ExternalShuffleBlockHandler();
+      if (authEnabled) {
+        secretManager = new ShuffleSecretManager();
+        rpcHandler = new SaslRpcHandler(rpcHandler, secretManager);
+      }
+
       int port = conf.getInt(
         SPARK_SHUFFLE_SERVICE_PORT_KEY, DEFAULT_SPARK_SHUFFLE_SERVICE_PORT);
       TransportConf transportConf = new TransportConf(new HadoopConfigProvider(conf));
-      RpcHandler rpcHandler = new ExternalShuffleBlockHandler();
       TransportContext transportContext = new TransportContext(transportConf, rpcHandler);
       shuffleServer = transportContext.createServer(port);
-      logger.info("Started Yarn shuffle service for Spark on port " + port);
+      String authEnabledString = authEnabled ? "enabled" : "not enabled";
+      logger.info("Started Yarn shuffle service for Spark on port {}. " +
+        "Authentication is {}.", port, authEnabledString);
     } catch (Exception e) {
       logger.error("Exception in starting Yarn shuffle service for Spark", e);
     }
@@ -75,38 +105,49 @@ protected void serviceInit(Configuration conf) {
 
   @Override
   public void initializeApplication(ApplicationInitializationContext context) {
-    ApplicationId appId = context.getApplicationId();
-    logger.debug("Initializing application " + appId + "!");
+    String appId = context.getApplicationId().toString();
+    ByteBuffer shuffleSecret = context.getApplicationDataForService();
+    logger.debug("Initializing application {}", appId);
+    if (isAuthenticationEnabled()) {
+      secretManager.registerApp(appId, shuffleSecret);
+    }
   }
 
   @Override
   public void stopApplication(ApplicationTerminationContext context) {
-    ApplicationId appId = context.getApplicationId();
-    logger.debug("Stopping application " + appId + "!");
-  }
-
-  @Override
-  public ByteBuffer getMetaData() {
-    logger.debug("Getting meta data");
-    return ByteBuffer.allocate(0);
+    String appId = context.getApplicationId().toString();
+    logger.debug("Stopping application {}", appId);
+    if (isAuthenticationEnabled()) {
+      secretManager.unregisterApp(appId);
+    }
   }
 
   @Override
   public void initializeContainer(ContainerInitializationContext context) {
     ContainerId containerId = context.getContainerId();
-    logger.debug("Initializing container " + containerId + "!");
+    logger.debug("Initializing container {}", containerId);
   }
 
   @Override
   public void stopContainer(ContainerTerminationContext context) {
     ContainerId containerId = context.getContainerId();
-    logger.debug("Stopping container " + containerId + "!");
+    logger.debug("Stopping container {}", containerId);
   }
 
+  /**
+   * Close the shuffle server to clean up any associated state.
+   */
   @Override
   protected void serviceStop() {
     if (shuffleServer != null) {
       shuffleServer.close();
     }
   }
+
+  // Not currently used
+  @Override
+  public ByteBuffer getMetaData() {
+    return ByteBuffer.allocate(0);
+  }
+
 }
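The change above threads an optional SaslRpcHandler between the transport
layer and the ExternalShuffleBlockHandler, a classic decorator. A
self-contained sketch of the pattern follows; the interface and class names
are illustrative, not Spark's actual RpcHandler API.

    import java.util.Collections;
    import java.util.Set;
    import java.util.concurrent.ConcurrentHashMap;

    interface Handler {
      void receive(String clientId, byte[] message);
    }

    // The base handler that actually serves shuffle blocks
    class ShuffleHandlerSketch implements Handler {
      public void receive(String clientId, byte[] message) {
        System.out.println("serving shuffle request from " + clientId);
      }
    }

    // Decorator that rejects clients that have not completed authentication,
    // then delegates everything else to the wrapped handler
    class SaslHandlerSketch implements Handler {
      private final Handler delegate;
      private final Set<String> authenticated =
        Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>());

      SaslHandlerSketch(Handler delegate) { this.delegate = delegate; }

      void markAuthenticated(String clientId) { authenticated.add(clientId); }

      public void receive(String clientId, byte[] message) {
        if (!authenticated.contains(clientId)) {
          throw new SecurityException("Client " + clientId + " is not authenticated");
        }
        delegate.receive(clientId, message);
      }
    }
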
diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
index c782363147bc2..b848205513953 100644
--- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
+++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
@@ -36,6 +36,7 @@ import org.apache.hadoop.yarn.ipc.YarnRPC
 import org.apache.hadoop.yarn.util.{Apps, ConverterUtils, Records, ProtoUtils}
 
 import org.apache.spark.{SecurityManager, SparkConf, Logging}
+import org.apache.spark.network.sasl.ShuffleSecretManager
 
 @deprecated("use yarn/stable", "1.2.0")
 class ExecutorRunnable(
@@ -93,7 +94,15 @@ class ExecutorRunnable(
     // If external shuffle service is enabled, register with the
     // Yarn shuffle service already started on the node manager
     if (sparkConf.getBoolean("spark.shuffle.service.enabled", false)) {
-      ctx.setServiceData(Map[String, ByteBuffer]("spark_shuffle" -> ByteBuffer.allocate(0)))
+      val secretString = securityMgr.getSecretKey()
+      val secretBytes =
+        if (secretString != null) {
+          ShuffleSecretManager.stringToBytes(secretString)
+        } else {
+          // Authentication is not enabled, so just provide dummy metadata
+          ByteBuffer.allocate(0)
+        }
+      ctx.setServiceData(Map[String, ByteBuffer]("spark_shuffle" -> secretBytes))
     }
 
     // Send the start request to the ContainerManager
diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
index 088122227e018..533a322878be5 100644
--- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
+++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
@@ -36,6 +36,7 @@ import org.apache.hadoop.yarn.ipc.YarnRPC
 import org.apache.hadoop.yarn.util.{Apps, ConverterUtils, Records}
 
 import org.apache.spark.{SecurityManager, SparkConf, Logging}
+import org.apache.spark.network.sasl.ShuffleSecretManager
 
 
 class ExecutorRunnable(
@@ -92,7 +93,15 @@ class ExecutorRunnable(
     // If external shuffle service is enabled, register with the
     // Yarn shuffle service already started on the node manager
     if (sparkConf.getBoolean("spark.shuffle.service.enabled", false)) {
-      ctx.setServiceData(Map[String, ByteBuffer]("spark_shuffle" -> ByteBuffer.allocate(0)))
+      val secretString = securityMgr.getSecretKey()
+      val secretBytes =
+        if (secretString != null) {
+          ShuffleSecretManager.stringToBytes(secretString)
+        } else {
+          // Authentication is not enabled, so just provide dummy metadata
+          ByteBuffer.allocate(0)
+        }
+      ctx.setServiceData(Map[String, ByteBuffer]("spark_shuffle" -> secretBytes))
     }
 
     // Send the start request to the ContainerManager
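Both ExecutorRunnable variants above hand the secret to the NodeManager
through YARN's auxiliary-service data map, keyed by the service name
"spark_shuffle" that YarnShuffleService registers under; the service reads it
back via getApplicationDataForService. A small Java sketch of the client-side
choice, with a stand-in for securityMgr.getSecretKey() (the real call sites
are in the Scala code above):

    import java.nio.ByteBuffer;
    import java.nio.charset.Charset;
    import java.util.Collections;
    import java.util.Map;

    public class ServiceDataSketch {
      public static void main(String[] args) {
        // Stand-in for securityMgr.getSecretKey(): null when spark.authenticate is off
        String secret = System.getProperty("spark.test.secret"); // hypothetical source

        ByteBuffer secretBytes = (secret != null)
          ? ByteBuffer.wrap(secret.getBytes(Charset.forName("UTF-8")))
          : ByteBuffer.allocate(0); // dummy metadata when authentication is off

        // The key must match the auxiliary service name registered on the NodeManager
        Map<String, ByteBuffer> serviceData =
          Collections.singletonMap("spark_shuffle", secretBytes);
        System.out.println("spark_shuffle payload: "
          + serviceData.get("spark_shuffle").remaining() + " bytes");
      }
    }
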
From 7b71d8f937840a955cb9efd5a2a3cf7256abc13f Mon Sep 17 00:00:00 2001
From: Andrew Or
Date: Wed, 5 Nov 2014 13:41:30 -0800
Subject: [PATCH 16/19] Add detailed java docs + reword a few comments

---
 .../network/sasl/ShuffleSecretManager.java    | 30 +++++++++----------
 .../network/yarn/YarnShuffleService.java      | 25 ++++++++++++----
 2 files changed, 35 insertions(+), 20 deletions(-)

diff --git a/network/shuffle/src/main/java/org/apache/spark/network/sasl/ShuffleSecretManager.java b/network/shuffle/src/main/java/org/apache/spark/network/sasl/ShuffleSecretManager.java
index 4ffdb050f09a9..0c1fa9903601e 100644
--- a/network/shuffle/src/main/java/org/apache/spark/network/sasl/ShuffleSecretManager.java
+++ b/network/shuffle/src/main/java/org/apache/spark/network/sasl/ShuffleSecretManager.java
@@ -41,18 +41,18 @@ public class ShuffleSecretManager implements SecretKeyHolder {
   private static final String SPARK_SASL_USER = "sparkSaslUser";
 
   /**
-   * Convert the given string to a byte buffer that can be converted back to a string
-   * through {@link #bytesToString(ByteBuffer)}. This is used if the external shuffle
-   * service represents shuffle secrets as byte buffers instead of strings.
+   * Convert the given string to a byte buffer. The resulting buffer can be converted back to
+   * the same string through {@link #bytesToString(ByteBuffer)}. This is used if the external
+   * shuffle service represents shuffle secrets as byte buffers instead of strings.
    */
   public static ByteBuffer stringToBytes(String s) {
     return ByteBuffer.wrap(s.getBytes(UTF8_CHARSET));
   }
 
   /**
-   * Convert the given byte buffer to a string that can be converted back to a byte
-   * buffer through {@link #stringToBytes(String)}. This is used if the external shuffle
-   * service represents shuffle secrets as byte buffers instead of strings.
+   * Convert the given byte buffer to a string. The resulting string can be converted back to
+   * the same byte buffer through {@link #stringToBytes(String)}. This is used if the external
+   * shuffle service represents shuffle secrets as byte buffers instead of strings.
    */
   public static String bytesToString(ByteBuffer b) {
     return new String(b.array(), UTF8_CHARSET);
@@ -63,9 +63,9 @@ public ShuffleSecretManager() {
   }
 
   /**
-   * Register the specified application with its secret.
+   * Register an application with its secret.
    * Executors need to first authenticate themselves with the same secret before
-   * the fetching shuffle files written by other executors in this application.
+   * fetching shuffle files written by other executors in this application.
    */
   public void registerApp(String appId, String shuffleSecret) {
     if (!shuffleSecretMap.containsKey(appId)) {
@@ -77,15 +77,15 @@ public void registerApp(String appId, String shuffleSecret) {
   }
 
   /**
-   * Register the specified application with its secret specified as a byte buffer.
+   * Register an application with its secret specified as a byte buffer.
    */
   public void registerApp(String appId, ByteBuffer shuffleSecret) {
     registerApp(appId, bytesToString(shuffleSecret));
   }
 
   /**
-   * Unregister the specified application along with its secret.
-   * This is called when an application terminates.
+   * Unregister an application along with its secret.
+   * This is called when the application terminates.
    */
   public void unregisterApp(String appId) {
     if (shuffleSecretMap.containsKey(appId)) {
@@ -105,10 +105,10 @@ public String getSaslUser(String appId) {
   }
 
   /**
-   * Return the secret key registered with the specified application.
-   * This key is used to authenticate the executors in the application
-   * before they can fetch shuffle files from the external shuffle service.
-   * If the application is not registered, return null.
+   * Return the secret key registered with the given application.
+   * This key is used to authenticate the executors before they can fetch shuffle files
+   * written by this application from the external shuffle service. If the specified
+   * application is not registered, return null.
    */
   @Override
   public String getSecretKey(String appId) {
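The reworded javadocs pin down the SecretKeyHolder contract: one well-known
SASL user for every application, a per-application secret, and null from
getSecretKey when the application never registered. A minimal sketch of a
caller honoring that null contract (all names here are ours):

    import java.util.concurrent.ConcurrentHashMap;

    public class NullContractSketch {
      private static final ConcurrentHashMap<String, String> secrets =
        new ConcurrentHashMap<String, String>();

      // Mirrors getSecretKey's contract: null means "application not registered",
      // which callers are expected to treat as an authentication failure
      static String getSecretKey(String appId) {
        return secrets.get(appId);
      }

      public static void main(String[] args) {
        secrets.put("app_1", "secret");
        String key = getSecretKey("app_2");
        if (key == null) {
          System.out.println("rejecting connection: app_2 never registered a secret");
        }
      }
    }
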
diff --git a/network/yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java b/network/yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java
index 84d6fdd9b7d72..f6be343360001 100644
--- a/network/yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java
+++ b/network/yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java
@@ -41,7 +41,22 @@
 import org.apache.spark.network.yarn.util.HadoopConfigProvider;
 
 /**
- * External shuffle service used by Spark on Yarn.
+ * An external shuffle service used by Spark on Yarn.
+ *
+ * This is intended to be a long-running auxiliary service that runs in the NodeManager process.
+ * A Spark application may connect to this service by setting `spark.shuffle.service.enabled`.
+ * The application also automatically derives the service port through `spark.shuffle.service.port`
+ * specified in the Yarn configuration. This is so that both the clients and the server agree on
+ * the same port to communicate on.
+ *
+ * The service also optionally supports authentication. This ensures that executors from one
+ * application cannot read the shuffle files written by those from another. This feature can be
+ * enabled by setting `spark.authenticate` in the Yarn configuration before starting the NM.
+ * Note that the Spark application must also set `spark.authenticate` manually and, unlike in
+ * the case of the service port, will not inherit this setting from the Yarn configuration. This
+ * is because an application running on the same Yarn cluster may choose to not use the external
+ * shuffle service, in which case its setting of `spark.authenticate` should be independent of
+ * the service's.
  */
 public class YarnShuffleService extends AuxiliaryService {
   private final Logger logger = LoggerFactory.getLogger(YarnShuffleService.class);
@@ -58,12 +73,12 @@ public class YarnShuffleService extends AuxiliaryService {
   // This is used only if authentication is enabled
   private ShuffleSecretManager secretManager;
 
-  // Actual server that serves the shuffle files
+  // The actual server that serves shuffle files
   private TransportServer shuffleServer = null;
 
   public YarnShuffleService() {
     super("spark_shuffle");
-    logger.info("Initializing Yarn shuffle service for Spark");
+    logger.info("Initializing YARN shuffle service for Spark");
   }
 
   /**
@@ -96,10 +111,10 @@ protected void serviceInit(Configuration conf) {
       TransportContext transportContext = new TransportContext(transportConf, rpcHandler);
       shuffleServer = transportContext.createServer(port);
       String authEnabledString = authEnabled ? "enabled" : "not enabled";
-      logger.info("Started Yarn shuffle service for Spark on port {}. " +
+      logger.info("Started YARN shuffle service for Spark on port {}. " +
         "Authentication is {}.", port, authEnabledString);
     } catch (Exception e) {
-      logger.error("Exception in starting Yarn shuffle service for Spark", e);
+      logger.error("Exception in starting YARN shuffle service for Spark", e);
     }
   }
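The expanded class comment above describes how the two settings are resolved:
both are read from the NodeManager's Hadoop configuration, the port is shared
with applications, while spark.authenticate must be set on both sides
independently. A minimal sketch of that resolution using Hadoop's
Configuration API, with the same defaults as the patch:

    import org.apache.hadoop.conf.Configuration;

    public class ShuffleServiceConfSketch {
      public static void main(String[] args) {
        // Picks up yarn-site.xml and friends from the classpath
        Configuration conf = new Configuration();
        int port = conf.getInt("spark.shuffle.service.port", 7337);
        boolean authEnabled = conf.getBoolean("spark.authenticate", false);
        System.out.println("port=" + port + ", authentication="
          + (authEnabled ? "enabled" : "not enabled"));
      }
    }
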
From 6489db5396c4974fbb4a0888cef1bc4604a92e0d Mon Sep 17 00:00:00 2001
From: Andrew Or
Date: Wed, 5 Nov 2014 13:50:19 -0800
Subject: [PATCH 17/19] Try catch at the right places

On service init, we want to fail fast if the server fails to start for any
reason (e.g. a port bind exception). However, once the NM has been running
for a while, it should be super robust, so we need to add try-catches around
the application initialization and stopping code.

---
 .../network/yarn/YarnShuffleService.java      | 70 +++++++++++--------
 1 file changed, 39 insertions(+), 31 deletions(-)
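The policy in this commit message can be summarized as: propagate failures
during service initialization, but contain them in the per-application
callbacks. A compressed sketch of that split follows (all names are ours,
not the patch's):

    public class LifecycleSketch {
      private Object server;

      // Init: let any exception propagate so a misconfigured service fails fast
      protected void serviceInit() {
        server = startServer(); // may throw, e.g. on a port bind failure
      }

      // Per-application callback: never let one bad event kill the NodeManager
      public void initializeApplication(String appId) {
        try {
          register(appId);
        } catch (Exception e) {
          System.err.println("Exception when initializing " + appId + ": " + e);
        }
      }

      private Object startServer() { return new Object(); }
      private void register(String appId) { /* record the app */ }
    }
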
" + + "Authentication is {}.", port, authEnabledString); } @Override public void initializeApplication(ApplicationInitializationContext context) { String appId = context.getApplicationId().toString(); - ByteBuffer shuffleSecret = context.getApplicationDataForService(); - logger.debug("Initializing application {}", appId); - if (isAuthenticationEnabled()) { - secretManager.registerApp(appId, shuffleSecret); + try { + ByteBuffer shuffleSecret = context.getApplicationDataForService(); + logger.info("Initializing application {}", appId); + if (isAuthenticationEnabled()) { + secretManager.registerApp(appId, shuffleSecret); + } + } catch (Exception e) { + logger.error("Exception when initializing application {}", appId, e); } } @Override public void stopApplication(ApplicationTerminationContext context) { String appId = context.getApplicationId().toString(); - logger.debug("Stopping application {}", appId); - if (isAuthenticationEnabled()) { - secretManager.unregisterApp(appId); + try { + logger.info("Stopping application {}", appId); + if (isAuthenticationEnabled()) { + secretManager.unregisterApp(appId); + } + } catch (Exception e) { + logger.error("Exception when stopping application {}", appId, e); } } @Override public void initializeContainer(ContainerInitializationContext context) { ContainerId containerId = context.getContainerId(); - logger.debug("Initializing container {}", containerId); + logger.info("Initializing container {}", containerId); } @Override public void stopContainer(ContainerTerminationContext context) { ContainerId containerId = context.getContainerId(); - logger.debug("Stopping container {}", containerId); + logger.info("Stopping container {}", containerId); } /** @@ -154,8 +158,12 @@ public void stopContainer(ContainerTerminationContext context) { */ @Override protected void serviceStop() { - if (shuffleServer != null) { - shuffleServer.close(); + try { + if (shuffleServer != null) { + shuffleServer.close(); + } + } catch (Exception e) { + logger.error("Exception when stopping service", e); } } From 1c66046906f9b06a668bc44867b0211c46d7b197 Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Wed, 5 Nov 2014 14:22:35 -0800 Subject: [PATCH 18/19] Remove unused provided dependencies --- network/yarn/pom.xml | 20 -------------------- 1 file changed, 20 deletions(-) diff --git a/network/yarn/pom.xml b/network/yarn/pom.xml index f5eb088e88919..e60d8c1f7876c 100644 --- a/network/yarn/pom.xml +++ b/network/yarn/pom.xml @@ -44,26 +44,6 @@ - - org.apache.hadoop - hadoop-yarn-api - provided - - - org.apache.hadoop - hadoop-yarn-common - provided - - - org.apache.hadoop - hadoop-yarn-server-web-proxy - provided - - - org.apache.hadoop - hadoop-yarn-client - provided - org.apache.hadoop hadoop-client From 0ee67a229f05b33ab4dc561ab6368e039188f04f Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Wed, 5 Nov 2014 14:41:01 -0800 Subject: [PATCH 19/19] Minor wording suggestions --- .../org/apache/spark/network/sasl/ShuffleSecretManager.java | 2 +- .../org/apache/spark/deploy/yarn/ExecutorRunnable.scala | 5 +++-- .../org/apache/spark/deploy/yarn/ExecutorRunnable.scala | 5 +++-- 3 files changed, 7 insertions(+), 5 deletions(-) diff --git a/network/shuffle/src/main/java/org/apache/spark/network/sasl/ShuffleSecretManager.java b/network/shuffle/src/main/java/org/apache/spark/network/sasl/ShuffleSecretManager.java index 0c1fa9903601e..e66c4af0f1ebd 100644 --- a/network/shuffle/src/main/java/org/apache/spark/network/sasl/ShuffleSecretManager.java +++ 
From 0ee67a229f05b33ab4dc561ab6368e039188f04f Mon Sep 17 00:00:00 2001
From: Andrew Or
Date: Wed, 5 Nov 2014 14:41:01 -0800
Subject: [PATCH 19/19] Minor wording suggestions

---
 .../org/apache/spark/network/sasl/ShuffleSecretManager.java  | 2 +-
 .../org/apache/spark/deploy/yarn/ExecutorRunnable.scala      | 5 +++--
 .../org/apache/spark/deploy/yarn/ExecutorRunnable.scala      | 5 +++--
 3 files changed, 7 insertions(+), 5 deletions(-)

diff --git a/network/shuffle/src/main/java/org/apache/spark/network/sasl/ShuffleSecretManager.java b/network/shuffle/src/main/java/org/apache/spark/network/sasl/ShuffleSecretManager.java
index 0c1fa9903601e..e66c4af0f1ebd 100644
--- a/network/shuffle/src/main/java/org/apache/spark/network/sasl/ShuffleSecretManager.java
+++ b/network/shuffle/src/main/java/org/apache/spark/network/sasl/ShuffleSecretManager.java
@@ -37,7 +37,7 @@ public class ShuffleSecretManager implements SecretKeyHolder {
   private static final Charset UTF8_CHARSET = Charset.forName("UTF-8");
 
   // Spark user used for authenticating SASL connections
-  // Note that this should match the value in org.apache.spark.SecurityManager
+  // Note that this must match the value in org.apache.spark.SecurityManager
   private static final String SPARK_SASL_USER = "sparkSaslUser";
 
   /**
diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
index b848205513953..5f47c79cabaee 100644
--- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
+++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
@@ -91,8 +91,9 @@ class ExecutorRunnable(
 
     ctx.setApplicationACLs(YarnSparkHadoopUtil.getApplicationAclsForYarn(securityMgr))
 
-    // If external shuffle service is enabled, register with the
-    // Yarn shuffle service already started on the node manager
+    // If external shuffle service is enabled, register with the Yarn shuffle service already
+    // started on the NodeManager and, if authentication is enabled, provide it with our secret
+    // key for fetching shuffle files later
     if (sparkConf.getBoolean("spark.shuffle.service.enabled", false)) {
       val secretString = securityMgr.getSecretKey()
       val secretBytes =
diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
index 533a322878be5..18f48b4b6caf6 100644
--- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
+++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
@@ -90,8 +90,9 @@ class ExecutorRunnable(
 
     ctx.setApplicationACLs(YarnSparkHadoopUtil.getApplicationAclsForYarn(securityMgr))
 
-    // If external shuffle service is enabled, register with the
-    // Yarn shuffle service already started on the node manager
+    // If external shuffle service is enabled, register with the Yarn shuffle service already
+    // started on the NodeManager and, if authentication is enabled, provide it with our secret
+    // key for fetching shuffle files later
     if (sparkConf.getBoolean("spark.shuffle.service.enabled", false)) {
       val secretString = securityMgr.getSecretKey()
       val secretBytes =