From 6cfb9ece6bdc608d198b013510e53e2b8dc6e965 Mon Sep 17 00:00:00 2001
From: li-zhihui
Date: Fri, 20 Jun 2014 13:04:27 +0800
Subject: [PATCH] Code style, revert default minRegisteredRatio of yarn to 0, driver get --num-executors in yarn/alpha

---
 .../scala/org/apache/spark/scheduler/TaskScheduler.scala  | 2 +-
 .../org/apache/spark/scheduler/TaskSchedulerImpl.scala    | 9 +++++----
 .../cluster/CoarseGrainedSchedulerBackend.scala           | 4 ++--
 .../org/apache/spark/deploy/yarn/ApplicationMaster.scala  | 1 +
 .../scheduler/cluster/YarnClientSchedulerBackend.scala    | 5 -----
 .../scheduler/cluster/YarnClusterSchedulerBackend.scala   | 5 -----
 .../org/apache/spark/deploy/yarn/ApplicationMaster.scala  | 2 +-
 7 files changed, 10 insertions(+), 18 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala
index 3f42f4b989673..324dd0f0acdfd 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala
@@ -54,5 +54,5 @@ private[spark] trait TaskScheduler {
   // Get the default level of parallelism to use in the cluster, as a hint for sizing jobs.
   def defaultParallelism(): Int
 
-  def waitBackendReady(): Unit = {return}
+  def waitBackendReady(): Unit = { return }
 }
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index 3566fc7f3107f..8189b622d38ba 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -431,12 +431,13 @@ private[spark] class TaskSchedulerImpl(
   // By default, rack is unknown
   def getRackForHost(value: String): Option[String] = None
 
-  override def waitBackendReady():Unit={
-    if(backend.isReady){
+
+  override def waitBackendReady(): Unit = {
+    if (backend.isReady) {
       return
     }
-    while(!backend.isReady){
-      synchronized{
+    while (!backend.isReady) {
+      synchronized {
         this.wait(100)
       }
     }
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index ff57372dcb72d..a2a4d9a65729c 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -254,14 +254,14 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
   }
 
   override def isReady(): Boolean = {
-    if (ready){
+    if (ready) {
       return true
     }
     if ((System.currentTimeMillis() - createTime) >= maxRegisteredWaitingTime) {
       ready = true
       return true
     }
-    return false
+    false
   }
 }
 
diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 1cc9c33cd2d02..c0f0595c94a0c 100644
--- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -184,6 +184,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
 
   private def startUserClass(): Thread = {
     logInfo("Starting the user JAR in a separate Thread")
+    System.setProperty("spark.executor.instances", args.numExecutors.toString)
     val mainMethod = Class.forName(
       args.userClass,
       false /* initialize */ ,
diff --git a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
index 1e49a34a0b02f..0110bc857e9bd 100644
--- a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
@@ -78,11 +78,6 @@ private[spark] class YarnClientSchedulerBackend(
     logDebug("ClientArguments called with: " + argsArrayBuf)
     val args = new ClientArguments(argsArrayBuf.toArray, conf)
     totalExecutors.set(args.numExecutors)
-    // reset default minRegisteredRatio for yarn mode
-    if (minRegisteredRatio == 0) {
-      minRegisteredRatio = 0.9
-      ready = false
-    }
     client = new Client(args, conf)
     appId = client.runApp()
     waitForApp()
diff --git a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala
index f8cc4e8517f02..3165b456c1663 100644
--- a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala
@@ -47,10 +47,5 @@ private[spark] class YarnClusterSchedulerBackend(
       .foreach { case (optName, envVar, sysProp) => addArg(optName, envVar, sysProp, argsArrayBuf) }
     val args = new ApplicationMasterArguments(argsArrayBuf.toArray)
     totalExecutors.set(args.numExecutors)
-    // reset default minRegisteredRatio for yarn mode
-    if (minRegisteredRatio == 0) {
-      minRegisteredRatio = 0.9
-      ready = false
-    }
   }
 }
diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index bd9eed57301a5..797c7895847df 100644
--- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -164,7 +164,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
 
   private def startUserClass(): Thread = {
     logInfo("Starting the user JAR in a separate Thread")
-    System.setProperty("spark.executor.instances",args.numExecutors.toString)
+    System.setProperty("spark.executor.instances", args.numExecutors.toString)
     val mainMethod = Class.forName(
       args.userClass,
       false,
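
Note (not part of the patch): a minimal standalone Scala sketch of the wait-until-ready pattern the hunks above touch. The driver polls the backend every 100 ms until enough executors have registered or a maximum waiting time has elapsed, which is the shape of TaskSchedulerImpl.waitBackendReady and CoarseGrainedSchedulerBackend.isReady in the diff. ReadyWaitSketch, ToyBackend, maxWaitMs and registrationDelayMs are hypothetical names for illustration only, not Spark APIs.

    // Sketch only: mirrors the polling shape of the patched waitBackendReady/isReady.
    object ReadyWaitSketch {
      // Stand-in for a scheduler backend: "ready" once executors have registered
      // or once the maximum waiting time has elapsed, whichever comes first.
      class ToyBackend(registrationDelayMs: Long, maxWaitMs: Long) {
        private val createTime = System.currentTimeMillis()
        @volatile private var registered = false

        // Simulate executors finishing registration after registrationDelayMs.
        new Thread(() => { Thread.sleep(registrationDelayMs); registered = true }).start()

        def isReady: Boolean =
          registered || (System.currentTimeMillis() - createTime) >= maxWaitMs
      }

      // Same structure as the patched TaskSchedulerImpl: check, then wait 100 ms and retry.
      def waitBackendReady(backend: ToyBackend): Unit = {
        while (!backend.isReady) {
          synchronized { this.wait(100) }
        }
      }

      def main(args: Array[String]): Unit = {
        val backend = new ToyBackend(registrationDelayMs = 300, maxWaitMs = 30000)
        waitBackendReady(backend)
        println("backend ready; the scheduler can start submitting tasks")
      }
    }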