From 648083bb1640c0d2f3a10926fda914fc7dce3d55 Mon Sep 17 00:00:00 2001
From: Marcelo Vanzin
Date: Fri, 15 Feb 2019 15:57:45 -0800
Subject: [PATCH] [SPARK-26788][yarn] Remove SchedulerExtensionService.

Since the yarn module is actually private to Spark, this interface was
never actually "public". It also has no use inside of Spark, so rather
than keep a yarn-specific extension point that isn't public, remove it
and point any potential users at more general solutions (like using a
SparkListener).
---
 .../scala/org/apache/spark/SparkConf.scala    |   3 +-
 .../org/apache/spark/deploy/yarn/config.scala |   6 -
 .../cluster/SchedulerExtensionService.scala   | 143 ------------------
 .../cluster/YarnSchedulerBackend.scala        |   7 -
 .../ExtensionServiceIntegrationSuite.scala    |  72 ---------
 .../cluster/SimpleExtensionService.scala      |  34 -----
 .../cluster/StubApplicationAttemptId.scala    |  48 ------
 .../scheduler/cluster/StubApplicationId.scala |  42 -----
 8 files changed, 2 insertions(+), 353 deletions(-)
 delete mode 100644 resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/SchedulerExtensionService.scala
 delete mode 100644 resource-managers/yarn/src/test/scala/org/apache/spark/scheduler/cluster/ExtensionServiceIntegrationSuite.scala
 delete mode 100644 resource-managers/yarn/src/test/scala/org/apache/spark/scheduler/cluster/SimpleExtensionService.scala
 delete mode 100644 resource-managers/yarn/src/test/scala/org/apache/spark/scheduler/cluster/StubApplicationAttemptId.scala
 delete mode 100644 resource-managers/yarn/src/test/scala/org/apache/spark/scheduler/cluster/StubApplicationId.scala

diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala
index e686e079d1a19..529ca3faac130 100644
--- a/core/src/main/scala/org/apache/spark/SparkConf.scala
+++ b/core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -638,7 +638,8 @@ private[spark] object SparkConf extends Logging {
       DeprecatedConfig("spark.shuffle.service.index.cache.entries", "2.3.0",
        "Not used anymore. Please use spark.shuffle.service.index.cache.size"),
       DeprecatedConfig("spark.yarn.credentials.file.retention.count", "2.4.0", "Not used anymore."),
-      DeprecatedConfig("spark.yarn.credentials.file.retention.days", "2.4.0", "Not used anymore.")
+      DeprecatedConfig("spark.yarn.credentials.file.retention.days", "2.4.0", "Not used anymore."),
+      DeprecatedConfig("spark.yarn.services", "3.0.0", "Feature no longer available.")
     )

     Map(configs.map { cfg => (cfg.key -> cfg) } : _*)
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala
index bd5e136151650..5f8739b4097ab 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala
@@ -181,12 +181,6 @@ package object config {
     .timeConf(TimeUnit.MILLISECONDS)
     .createWithDefaultString("200ms")

-  private[spark] val SCHEDULER_SERVICES = ConfigBuilder("spark.yarn.services")
-    .doc("A comma-separated list of class names of services to add to the scheduler.")
-    .stringConf
-    .toSequence
-    .createWithDefault(Nil)
-
   private[spark] val AM_FINAL_MSG_LIMIT = ConfigBuilder("spark.yarn.am.finalMessageLimit")
     .doc("The limit size of final diagnostic message for our ApplicationMaster to unregister from" +
       " the ResourceManager.")
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/SchedulerExtensionService.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/SchedulerExtensionService.scala
deleted file mode 100644
index 7d15f0e2fbac8..0000000000000
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/SchedulerExtensionService.scala
+++ /dev/null
@@ -1,143 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.scheduler.cluster
-
-import java.util.concurrent.atomic.AtomicBoolean
-
-import org.apache.hadoop.yarn.api.records.{ApplicationAttemptId, ApplicationId}
-
-import org.apache.spark.SparkContext
-import org.apache.spark.deploy.yarn.config._
-import org.apache.spark.internal.Logging
-import org.apache.spark.util.Utils
-
-/**
- * An extension service that can be loaded into a Spark YARN scheduler.
- * A Service that can be started and stopped.
- *
- * 1. For implementations to be loadable by `SchedulerExtensionServices`,
- * they must provide an empty constructor.
- * 2. The `stop()` operation MUST be idempotent, and succeed even if `start()` was
- * never invoked.
- */
-trait SchedulerExtensionService {
-
-  /**
-   * Start the extension service. This should be a no-op if
-   * called more than once.
-   * @param binding binding to the spark application and YARN
-   */
-  def start(binding: SchedulerExtensionServiceBinding): Unit
-
-  /**
-   * Stop the service
-   * The `stop()` operation MUST be idempotent, and succeed even if `start()` was
-   * never invoked.
-   */
-  def stop(): Unit
-}
-
-/**
- * Binding information for a [[SchedulerExtensionService]].
- *
- * The attempt ID will be set if the service is started within a YARN application master;
- * there is then a different attempt ID for every time that AM is restarted.
- * When the service binding is instantiated in client mode, there's no attempt ID, as it lacks
- * this information.
- * @param sparkContext current spark context
- * @param applicationId YARN application ID
- * @param attemptId YARN attemptID. This will always be unset in client mode, and always set in
- *                  cluster mode.
- */
-case class SchedulerExtensionServiceBinding(
-    sparkContext: SparkContext,
-    applicationId: ApplicationId,
-    attemptId: Option[ApplicationAttemptId] = None)
-
-/**
- * Container for [[SchedulerExtensionService]] instances.
- *
- * Loads Extension Services from the configuration property
- * `"spark.yarn.services"`, instantiates and starts them.
- * When stopped, it stops all child entries.
- *
- * The order in which child extension services are started and stopped
- * is undefined.
- */
-private[spark] class SchedulerExtensionServices extends SchedulerExtensionService
-    with Logging {
-  private var serviceOption: Option[String] = None
-  private var services: List[SchedulerExtensionService] = Nil
-  private val started = new AtomicBoolean(false)
-  private var binding: SchedulerExtensionServiceBinding = _
-
-  /**
-   * Binding operation will load the named services and call bind on them too; the
-   * entire set of services are then ready for `init()` and `start()` calls.
-   *
-   * @param binding binding to the spark application and YARN
-   */
-  def start(binding: SchedulerExtensionServiceBinding): Unit = {
-    if (started.getAndSet(true)) {
-      logWarning("Ignoring re-entrant start operation")
-      return
-    }
-    require(binding.sparkContext != null, "Null context parameter")
-    require(binding.applicationId != null, "Null appId parameter")
-    this.binding = binding
-    val sparkContext = binding.sparkContext
-    val appId = binding.applicationId
-    val attemptId = binding.attemptId
-    logInfo(s"Starting Yarn extension services with app $appId and attemptId $attemptId")
-
-    services = sparkContext.conf.get(SCHEDULER_SERVICES).map { sClass =>
-      val instance = Utils.classForName(sClass)
-        .getConstructor().newInstance()
-        .asInstanceOf[SchedulerExtensionService]
-      // bind this service
-      instance.start(binding)
-      logInfo(s"Service $sClass started")
-      instance
-    }.toList
-  }
-
-  /**
-   * Get the list of services.
-   *
-   * @return a list of services; Nil until the service is started
-   */
-  def getServices: List[SchedulerExtensionService] = services
-
-  /**
-   * Stop the services; idempotent.
-   *
-   */
-  override def stop(): Unit = {
-    if (started.getAndSet(false)) {
-      logInfo(s"Stopping $this")
-      services.foreach { s =>
-        Utils.tryLogNonFatalError(s.stop())
-      }
-    }
-  }
-
-  override def toString(): String = s"""SchedulerExtensionServices
-    |(serviceOption=$serviceOption,
-    | services=$services,
-    | started=$started)""".stripMargin
-}
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
index 78cd6a200ac2c..a8472b49ae278 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
@@ -73,10 +73,6 @@ private[spark] abstract class YarnSchedulerBackend(
   /** Attempt ID. This is unset for client-mode schedulers */
   private var attemptId: Option[ApplicationAttemptId] = None

-  /** Scheduler extension services. */
-  private val services: SchedulerExtensionServices = new SchedulerExtensionServices()
-
-
   /**
    * Bind to YARN. This *must* be done before calling [[start()]].
    *
@@ -90,8 +86,6 @@ private[spark] abstract class YarnSchedulerBackend(

   protected def startBindings(): Unit = {
     require(appId.isDefined, "application ID unset")
-    val binding = SchedulerExtensionServiceBinding(sc, appId.get, attemptId)
-    services.start(binding)
   }

   override def stop(): Unit = {
@@ -102,7 +96,6 @@
       super.stop()
     } finally {
       stopped.set(true)
-      services.stop()
     }
   }

diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/scheduler/cluster/ExtensionServiceIntegrationSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/scheduler/cluster/ExtensionServiceIntegrationSuite.scala
deleted file mode 100644
index 6ea7984c64514..0000000000000
--- a/resource-managers/yarn/src/test/scala/org/apache/spark/scheduler/cluster/ExtensionServiceIntegrationSuite.scala
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.scheduler.cluster
-
-import org.scalatest.BeforeAndAfter
-
-import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkFunSuite}
-import org.apache.spark.deploy.yarn.config._
-import org.apache.spark.internal.Logging
-
-/**
- * Test the integration with [[SchedulerExtensionServices]]
- */
-class ExtensionServiceIntegrationSuite extends SparkFunSuite
-  with LocalSparkContext with BeforeAndAfter
-  with Logging {
-
-  val applicationId = new StubApplicationId(0, 1111L)
-  val attemptId = new StubApplicationAttemptId(applicationId, 1)
-
-  /*
-   * Setup phase creates the spark context
-   */
-  before {
-    val sparkConf = new SparkConf()
-    sparkConf.set(SCHEDULER_SERVICES, Seq(classOf[SimpleExtensionService].getName()))
-    sparkConf.setMaster("local").setAppName("ExtensionServiceIntegrationSuite")
-    sc = new SparkContext(sparkConf)
-  }
-
-  test("Instantiate") {
-    val services = new SchedulerExtensionServices()
-    assertResult(Nil, "non-nil service list") {
-      services.getServices
-    }
-    services.start(SchedulerExtensionServiceBinding(sc, applicationId))
-    services.stop()
-  }
-
-  test("Contains SimpleExtensionService Service") {
-    val services = new SchedulerExtensionServices()
-    try {
-      services.start(SchedulerExtensionServiceBinding(sc, applicationId))
-      val serviceList = services.getServices
-      assert(serviceList.nonEmpty, "empty service list")
-      val (service :: Nil) = serviceList
-      val simpleService = service.asInstanceOf[SimpleExtensionService]
-      assert(simpleService.started.get, "service not started")
-      services.stop()
-      assert(!simpleService.started.get, "service not stopped")
-    } finally {
-      services.stop()
-    }
-  }
-}
-
-
diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/scheduler/cluster/SimpleExtensionService.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/scheduler/cluster/SimpleExtensionService.scala
deleted file mode 100644
index 9b8c98cda8da8..0000000000000
--- a/resource-managers/yarn/src/test/scala/org/apache/spark/scheduler/cluster/SimpleExtensionService.scala
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.scheduler.cluster
-
-import java.util.concurrent.atomic.AtomicBoolean
-
-private[spark] class SimpleExtensionService extends SchedulerExtensionService {
-
-  /** started flag; set in the `start()` call, stopped in `stop()`. */
-  val started = new AtomicBoolean(false)
-
-  override def start(binding: SchedulerExtensionServiceBinding): Unit = {
-    started.set(true)
-  }
-
-  override def stop(): Unit = {
-    started.set(false)
-  }
-}
diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/scheduler/cluster/StubApplicationAttemptId.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/scheduler/cluster/StubApplicationAttemptId.scala
deleted file mode 100644
index 4b57b9509a655..0000000000000
--- a/resource-managers/yarn/src/test/scala/org/apache/spark/scheduler/cluster/StubApplicationAttemptId.scala
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.scheduler.cluster
-
-import org.apache.hadoop.yarn.api.records.{ApplicationAttemptId, ApplicationId}
-
-/**
- * A stub application ID; can be set in constructor and/or updated later.
- * @param applicationId application ID
- * @param attempt an attempt counter
- */
-class StubApplicationAttemptId(var applicationId: ApplicationId, var attempt: Int)
-  extends ApplicationAttemptId {
-
-  override def setApplicationId(appID: ApplicationId): Unit = {
-    applicationId = appID
-  }
-
-  override def getAttemptId: Int = {
-    attempt
-  }
-
-  override def setAttemptId(attemptId: Int): Unit = {
-    attempt = attemptId
-  }
-
-  override def getApplicationId: ApplicationId = {
-    applicationId
-  }
-
-  override def build(): Unit = {
-  }
-}
diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/scheduler/cluster/StubApplicationId.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/scheduler/cluster/StubApplicationId.scala
deleted file mode 100644
index bffa0e09befd2..0000000000000
--- a/resource-managers/yarn/src/test/scala/org/apache/spark/scheduler/cluster/StubApplicationId.scala
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.scheduler.cluster
-
-import org.apache.hadoop.yarn.api.records.ApplicationId
-
-/**
- * Simple Testing Application Id; ID and cluster timestamp are set in constructor
- * and cannot be updated.
- * @param id app id
- * @param clusterTimestamp timestamp
- */
-private[spark] class StubApplicationId(id: Int, clusterTimestamp: Long) extends ApplicationId {
-  override def getId: Int = {
-    id
-  }
-
-  override def getClusterTimestamp: Long = {
-    clusterTimestamp
-  }
-
-  override def setId(id: Int): Unit = {}
-
-  override def setClusterTimestamp(clusterTimestamp: Long): Unit = {}
-
-  override def build(): Unit = {}
-}
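
A note for potential users of the removed "spark.yarn.services" hook: a
SparkListener covers the same start/stop lifecycle. The sketch below is a
minimal illustration, not part of the patch itself; the class name
ExtensionLifecycleListener and its log lines are made up, and it assumes the
extension only needs application start/end notifications. The YARN application
ID and (in cluster mode) the attempt ID arrive on the start event itself.

import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd, SparkListenerApplicationStart}

// Hypothetical stand-in for a former SchedulerExtensionService implementation.
class ExtensionLifecycleListener extends SparkListener {

  // Plays the role of SchedulerExtensionService.start(); event.appId and
  // event.appAttemptId are Options, with the attempt ID unset in client mode.
  override def onApplicationStart(event: SparkListenerApplicationStart): Unit = {
    println(s"app ${event.appId.getOrElse("unknown")} started" +
      s" (attempt ${event.appAttemptId.getOrElse("none")})")
  }

  // Plays the role of SchedulerExtensionService.stop(); invoked once when
  // the application finishes.
  override def onApplicationEnd(event: SparkListenerApplicationEnd): Unit = {
    println("app ended")
  }
}

Such a listener can be registered through the "spark.extraListeners"
configuration (which, like the removed interface, needs a no-arg constructor)
or programmatically via SparkContext.addSparkListener().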