diff --git a/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java b/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java index 548a5ccd1385b..cb6d5d0ca2037 100644 --- a/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java +++ b/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java @@ -19,6 +19,7 @@ import java.io.File; import java.io.IOException; +import java.net.URL; import java.nio.charset.StandardCharsets; import java.nio.ByteBuffer; import java.util.List; @@ -75,6 +76,20 @@ * is because an application running on the same Yarn cluster may choose to not use the external * shuffle service, in which case its setting of `spark.authenticate` should be independent of * the service's. + * + * The shuffle service will produce metrics via the YARN NodeManager's {@code metrics2} system + * under a namespace specified by the {@value SPARK_SHUFFLE_SERVICE_METRICS_NAMESPACE_KEY} config. + * + * By default, all configurations for the shuffle service will be taken directly from the + * Hadoop {@link Configuration} passed by the YARN NodeManager. It is also possible to configure + * the shuffle service by placing a resource named + * {@value SHUFFLE_SERVICE_CONF_OVERLAY_RESOURCE_NAME} into the classpath, which should be an + * XML file in the standard Hadoop Configuration resource format. Note that when the shuffle + * service is loaded in the default manner, without configuring + * {@code yarn.nodemanager.aux-services..classpath}, this file must be on the classpath + * of the NodeManager itself. When using the {@code classpath} configuration, it can be present + * either on the NodeManager's classpath, or specified in the classpath configuration. + * This {@code classpath} configuration is only supported on YARN versions >= 2.9.0. */ public class YarnShuffleService extends AuxiliaryService { private static final Logger logger = LoggerFactory.getLogger(YarnShuffleService.class); @@ -83,6 +98,14 @@ public class YarnShuffleService extends AuxiliaryService { private static final String SPARK_SHUFFLE_SERVICE_PORT_KEY = "spark.shuffle.service.port"; private static final int DEFAULT_SPARK_SHUFFLE_SERVICE_PORT = 7337; + /** + * The namespace to use for the metrics record which will contain all metrics produced by the + * shuffle service. + */ + static final String SPARK_SHUFFLE_SERVICE_METRICS_NAMESPACE_KEY = + "spark.yarn.shuffle.service.metrics.namespace"; + private static final String DEFAULT_SPARK_SHUFFLE_SERVICE_METRICS_NAME = "sparkShuffleService"; + // Whether the shuffle server should authenticate fetch requests private static final String SPARK_AUTHENTICATE_KEY = "spark.authenticate"; private static final boolean DEFAULT_SPARK_AUTHENTICATE = false; @@ -103,6 +126,13 @@ public class YarnShuffleService extends AuxiliaryService { private static final LevelDBProvider.StoreVersion CURRENT_VERSION = new LevelDBProvider .StoreVersion(1, 0); + /** + * The name of the resource to search for on the classpath to find a shuffle service-specific + * configuration overlay. If found, this will be parsed as a standard Hadoop + * {@link Configuration config} file and will override the configs passed from the NodeManager. + */ + static final String SHUFFLE_SERVICE_CONF_OVERLAY_RESOURCE_NAME = "spark-shuffle-site.xml"; + // just for integration tests that want to look at this file -- in general not sensible as // a static @VisibleForTesting @@ -139,6 +169,13 @@ public class YarnShuffleService extends AuxiliaryService { private DB db; public YarnShuffleService() { + // The name of the auxiliary service configured within the NodeManager + // (`yarn.nodemanager.aux-services`) is treated as the source-of-truth, so this one can be + // arbitrary. The NodeManager will log a warning if the configured name doesn't match this name, + // to inform operators of a potential misconfiguration, but this name is otherwise not used. + // It is hard-coded instead of using the value of the `spark.shuffle.service.name` configuration + // because at this point in instantiation there is no Configuration object; it is not passed + // until `serviceInit` is called, at which point it's too late to adjust the name. super("spark_shuffle"); logger.info("Initializing YARN shuffle service for Spark"); instance = this; @@ -157,10 +194,18 @@ private boolean isAuthenticationEnabled() { * Start the shuffle server with the given configuration. */ @Override - protected void serviceInit(Configuration conf) throws Exception { - _conf = conf; + protected void serviceInit(Configuration externalConf) throws Exception { + _conf = new Configuration(externalConf); + URL confOverlayUrl = Thread.currentThread().getContextClassLoader() + .getResource(SHUFFLE_SERVICE_CONF_OVERLAY_RESOURCE_NAME); + if (confOverlayUrl != null) { + logger.info("Initializing Spark YARN shuffle service with configuration overlay from {}", + confOverlayUrl); + _conf.addResource(confOverlayUrl); + } + super.serviceInit(_conf); - boolean stopOnFailure = conf.getBoolean(STOP_ON_FAILURE_KEY, DEFAULT_STOP_ON_FAILURE); + boolean stopOnFailure = _conf.getBoolean(STOP_ON_FAILURE_KEY, DEFAULT_STOP_ON_FAILURE); try { // In case this NM was killed while there were running spark applications, we need to restore @@ -172,7 +217,7 @@ protected void serviceInit(Configuration conf) throws Exception { registeredExecutorFile = initRecoveryDb(RECOVERY_FILE_NAME); } - TransportConf transportConf = new TransportConf("shuffle", new HadoopConfigProvider(conf)); + TransportConf transportConf = new TransportConf("shuffle", new HadoopConfigProvider(_conf)); MergedShuffleFileManager shuffleMergeManager = newMergedShuffleFileManagerInstance( transportConf); blockHandler = new ExternalBlockHandler( @@ -181,7 +226,7 @@ protected void serviceInit(Configuration conf) throws Exception { // If authentication is enabled, set up the shuffle server to use a // special RPC handler that filters out unauthenticated fetch requests List bootstraps = Lists.newArrayList(); - boolean authEnabled = conf.getBoolean(SPARK_AUTHENTICATE_KEY, DEFAULT_SPARK_AUTHENTICATE); + boolean authEnabled = _conf.getBoolean(SPARK_AUTHENTICATE_KEY, DEFAULT_SPARK_AUTHENTICATE); if (authEnabled) { secretManager = new ShuffleSecretManager(); if (_recoveryPath != null) { @@ -190,7 +235,7 @@ protected void serviceInit(Configuration conf) throws Exception { bootstraps.add(new AuthServerBootstrap(transportConf, secretManager)); } - int port = conf.getInt( + int port = _conf.getInt( SPARK_SHUFFLE_SERVICE_PORT_KEY, DEFAULT_SPARK_SHUFFLE_SERVICE_PORT); transportContext = new TransportContext(transportConf, blockHandler, true); shuffleServer = transportContext.createServer(port, bootstraps); @@ -203,13 +248,16 @@ protected void serviceInit(Configuration conf) throws Exception { blockHandler.getAllMetrics().getMetrics().put("numRegisteredConnections", shuffleServer.getRegisteredConnections()); blockHandler.getAllMetrics().getMetrics().putAll(shuffleServer.getAllMetrics().getMetrics()); + String metricsNamespace = _conf.get(SPARK_SHUFFLE_SERVICE_METRICS_NAMESPACE_KEY, + DEFAULT_SPARK_SHUFFLE_SERVICE_METRICS_NAME); YarnShuffleServiceMetrics serviceMetrics = - new YarnShuffleServiceMetrics(blockHandler.getAllMetrics()); + new YarnShuffleServiceMetrics(metricsNamespace, blockHandler.getAllMetrics()); MetricsSystemImpl metricsSystem = (MetricsSystemImpl) DefaultMetricsSystem.instance(); metricsSystem.register( - "sparkShuffleService", "Metrics on the Spark Shuffle Service", serviceMetrics); - logger.info("Registered metrics with Hadoop's DefaultMetricsSystem"); + metricsNamespace, "Metrics on the Spark Shuffle Service", serviceMetrics); + logger.info("Registered metrics with Hadoop's DefaultMetricsSystem using namespace '{}'", + metricsNamespace); logger.info("Started YARN shuffle service for Spark on port {}. " + "Authentication is {}. Registered executor file is {}", port, authEnabledString, diff --git a/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleServiceMetrics.java b/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleServiceMetrics.java index 81be6e8036ffe..f30abbd0f7fcd 100644 --- a/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleServiceMetrics.java +++ b/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleServiceMetrics.java @@ -32,9 +32,11 @@ */ class YarnShuffleServiceMetrics implements MetricsSource { + private final String metricsNamespace; private final MetricSet metricSet; - YarnShuffleServiceMetrics(MetricSet metricSet) { + YarnShuffleServiceMetrics(String metricsNamespace, MetricSet metricSet) { + this.metricsNamespace = metricsNamespace; this.metricSet = metricSet; } @@ -46,7 +48,7 @@ class YarnShuffleServiceMetrics implements MetricsSource { */ @Override public void getMetrics(MetricsCollector collector, boolean all) { - MetricsRecordBuilder metricsRecordBuilder = collector.addRecord("sparkShuffleService"); + MetricsRecordBuilder metricsRecordBuilder = collector.addRecord(metricsNamespace); for (Map.Entry entry : metricSet.getMetrics().entrySet()) { collectMetric(metricsRecordBuilder, entry.getKey(), entry.getValue()); diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala index d988e522c3df7..1a18856e4156c 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/package.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala @@ -680,6 +680,16 @@ package object config { private[spark] val SHUFFLE_SERVICE_PORT = ConfigBuilder("spark.shuffle.service.port").version("1.2.0").intConf.createWithDefault(7337) + private[spark] val SHUFFLE_SERVICE_NAME = + ConfigBuilder("spark.shuffle.service.name") + .doc("The configured name of the Spark shuffle service the client should communicate with. " + + "This must match the name used to configure the Shuffle within the YARN NodeManager " + + "configuration (`yarn.nodemanager.aux-services`). Only takes effect when " + + s"$SHUFFLE_SERVICE_ENABLED is set to true.") + .version("3.2.0") + .stringConf + .createWithDefault("spark_shuffle") + private[spark] val KEYTAB = ConfigBuilder("spark.kerberos.keytab") .doc("Location of user's keytab.") .version("3.0.0") diff --git a/docs/running-on-yarn.md b/docs/running-on-yarn.md index 72df64b3efc01..73bb76af65660 100644 --- a/docs/running-on-yarn.md +++ b/docs/running-on-yarn.md @@ -773,8 +773,28 @@ The following extra configuration options are available when the shuffle service NodeManagers where the Spark Shuffle Service is not running. + + spark.yarn.shuffle.service.metrics.namespace + sparkShuffleService + + The namespace to use when emitting shuffle service metrics into Hadoop metrics2 system of the + NodeManager. + + +Please note that the instructions above assume that the default shuffle service name, +`spark_shuffle`, has been used. It is possible to use any name here, but the values used in the +YARN NodeManager configurations must match the value of `spark.shuffle.service.name` in the +Spark application. + +The shuffle service will, by default, take all of its configurations from the Hadoop Configuration +used by the NodeManager (e.g. `yarn-site.xml`). However, it is also possible to configure the +shuffle service independently using a file named `spark-shuffle-site.xml` which should be placed +onto the classpath of the shuffle service (which is, by default, shared with the classpath of the +NodeManager). The shuffle service will treat this as a standard Hadoop Configuration resource and +overlay it on top of the NodeManager's configuration. + # Launching your application with Apache Oozie Apache Oozie can launch Spark applications as part of a workflow. @@ -823,3 +843,54 @@ do the following: to the list of filters in the spark.ui.filters configuration. Be aware that the history server information may not be up-to-date with the application's state. + +# Running multiple versions of the Spark Shuffle Service + +Please note that this section only applies when running on YARN versions >= 2.9.0. + +In some cases it may be desirable to run multiple instances of the Spark Shuffle Service which are +using different versions of Spark. This can be helpful, for example, when running a YARN cluster +with a mixed workload of applications running multiple Spark versions, since a given version of +the shuffle service is not always compatible with other versions of Spark. YARN versions since 2.9.0 +support the ability to run shuffle services within an isolated classloader +(see [YARN-4577](https://issues.apache.org/jira/browse/YARN-4577)), meaning multiple Spark versions +can coexist within a single NodeManager. The +`yarn.nodemanager.aux-services..classpath` and, starting from YARN 2.10.2/3.1.1/3.2.0, +`yarn.nodemanager.aux-services..remote-classpath` options can be used to configure +this. In addition to setting up separate classpaths, it's necessary to ensure the two versions +advertise to different ports. This can be achieved using the `spark-shuffle-site.xml` file described +above. For example, you may have configuration like: + +```properties + yarn.nodemanager.aux-services = spark_shuffle_x,spark_shuffle_y + yarn.nodemanager.aux-services.spark_shuffle_x.classpath = /path/to/spark-x-yarn-shuffle.jar,/path/to/spark-x-config + yarn.nodemanager.aux-services.spark_shuffle_y.classpath = /path/to/spark-y-yarn-shuffle.jar,/path/to/spark-y-config +``` + +The two `spark-*-config` directories each contain one file, `spark-shuffle-site.xml`. These are XML +files in the [Hadoop Configuration format](https://hadoop.apache.org/docs/r3.2.2/api/org/apache/hadoop/conf/Configuration.html) +which each contain a few configurations to adjust the port number and metrics name prefix used: +```xml + + + spark.shuffle.service.port + 7001 + + + spark.yarn.shuffle.service.metrics.namespace + sparkShuffleServiceX + + +``` +The values should both be different for the two different services. + +Then, in the configuration of the Spark applications, one should be configured with: +```properties + spark.shuffle.service.name = spark_shuffle_x + spark.shuffle.service.port = 7001 +``` +and one should be configured with: +```properties + spark.shuffle.service.name = spark_shuffle_y + spark.shuffle.service.port = +``` diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala index ede39063cf1bd..717ce57b902c1 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala @@ -115,7 +115,9 @@ private[yarn] class ExecutorRunnable( // Authentication is not enabled, so just provide dummy metadata ByteBuffer.allocate(0) } - ctx.setServiceData(Collections.singletonMap("spark_shuffle", secretBytes)) + val serviceName = sparkConf.get(SHUFFLE_SERVICE_NAME) + logInfo(s"Initializing service data for shuffle service using name '$serviceName'") + ctx.setServiceData(Collections.singletonMap(serviceName, secretBytes)) } // Send the start request to the ContainerManager diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnShuffleAlternateNameConfigSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnShuffleAlternateNameConfigSuite.scala new file mode 100644 index 0000000000000..db001a946fddf --- /dev/null +++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnShuffleAlternateNameConfigSuite.scala @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.deploy.yarn + +import java.net.URLClassLoader + +import org.apache.hadoop.yarn.conf.YarnConfiguration + +import org.apache.spark._ +import org.apache.spark.internal.config._ +import org.apache.spark.network.yarn.{YarnShuffleService, YarnTestAccessor} +import org.apache.spark.tags.ExtendedYarnTest + +/** + * SPARK-34828: Integration test for the external shuffle service with an alternate name and + * configs (by using a configuration overlay) + */ +@ExtendedYarnTest +class YarnShuffleAlternateNameConfigSuite extends YarnShuffleIntegrationSuite { + + private[this] val shuffleServiceName = "custom_shuffle_service_name" + + override def newYarnConfig(): YarnConfiguration = { + val yarnConfig = super.newYarnConfig() + yarnConfig.set(YarnConfiguration.NM_AUX_SERVICES, shuffleServiceName) + yarnConfig.set(YarnConfiguration.NM_AUX_SERVICE_FMT.format(shuffleServiceName), + classOf[YarnShuffleService].getCanonicalName) + val overlayConf = new YarnConfiguration() + // Enable authentication in the base NodeManager conf but not in the client. This would break + // shuffle, unless the shuffle service conf overlay overrides to turn off authentication. + overlayConf.setBoolean(NETWORK_AUTH_ENABLED.key, true) + // Add the authentication conf to a separate config object used as an overlay rather than + // setting it directly. This is necessary because a config overlay will override previous + // config overlays, but not configs which were set directly on the config object. + yarnConfig.addResource(overlayConf) + yarnConfig + } + + override protected def extraSparkConf(): Map[String, String] = + super.extraSparkConf() ++ Map(SHUFFLE_SERVICE_NAME.key -> shuffleServiceName) + + override def beforeAll(): Unit = { + val configFileContent = + s""" + | + | + | ${NETWORK_AUTH_ENABLED.key} + | false + | + | + |""".stripMargin + val jarFile = TestUtils.createJarWithFiles(Map( + YarnTestAccessor.getShuffleServiceConfOverlayResourceName -> configFileContent + )) + // Configure a custom classloader which includes the conf overlay as a resource + val oldClassLoader = Thread.currentThread().getContextClassLoader + Thread.currentThread().setContextClassLoader(new URLClassLoader(Array(jarFile))) + try { + super.beforeAll() + } finally { + Thread.currentThread().setContextClassLoader(oldClassLoader) + } + } +} diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala index 188a48509212d..d6d1715223e35 100644 --- a/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala +++ b/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala @@ -56,6 +56,9 @@ class YarnShuffleServiceSuite extends SparkFunSuite with Matchers with BeforeAnd override def beforeEach(): Unit = { super.beforeEach() + // Ensure that each test uses a fresh metrics system + DefaultMetricsSystem.shutdown() + DefaultMetricsSystem.setInstance(new MetricsSystemImpl()) yarnConfig = new YarnConfiguration() yarnConfig.set(YarnConfiguration.NM_AUX_SERVICES, "spark_shuffle") yarnConfig.set(YarnConfiguration.NM_AUX_SERVICE_FMT.format("spark_shuffle"), @@ -413,6 +416,16 @@ class YarnShuffleServiceSuite extends SparkFunSuite with Matchers with BeforeAnd )) } + test("SPARK-34828: metrics should be registered with configured name") { + s1 = new YarnShuffleService + yarnConfig.set(YarnShuffleService.SPARK_SHUFFLE_SERVICE_METRICS_NAMESPACE_KEY, "fooMetrics") + s1.init(yarnConfig) + + assert(DefaultMetricsSystem.instance.getSource("sparkShuffleService") === null) + assert(DefaultMetricsSystem.instance.getSource("fooMetrics") + .isInstanceOf[YarnShuffleServiceMetrics]) + } + test("create default merged shuffle file manager instance") { val mockConf = mock(classOf[TransportConf]) when(mockConf.mergedShuffleFileManagerImpl).thenReturn( diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnTestAccessor.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnTestAccessor.scala index db322cd18e150..d87cc26384729 100644 --- a/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnTestAccessor.scala +++ b/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnTestAccessor.scala @@ -34,4 +34,7 @@ object YarnTestAccessor { service.registeredExecutorFile } + def getShuffleServiceConfOverlayResourceName: String = { + YarnShuffleService.SHUFFLE_SERVICE_CONF_OVERLAY_RESOURCE_NAME + } }