Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP

Loading…

SPARK-1171: when executor is removed, we should minus totalCores instead of just freeCores on that executor #63

Closed
wants to merge 5 commits into from

6 participants

@CodingCat

https://spark-project.atlassian.net/browse/SPARK-1171

When the executor is removed, the current implementation will only minus the freeCores of that executor. Actually we should minus the totalCores...

@AmplabJenkins

Can one of the admins verify this patch?

@rxin

Jenkins, add to whitelist.

@AmplabJenkins

Merged build triggered.

@AmplabJenkins

Merged build started.

@AmplabJenkins

Merged build finished.

@AmplabJenkins

One or more automated tests failed
Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/12964/

@AmplabJenkins

Merged build triggered.

@AmplabJenkins

Merged build started.

@AmplabJenkins

Merged build finished.

@AmplabJenkins

All automated tests passed.
Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/12971/

...in/scala/org/apache/spark/scheduler/WorkerOffer.scala
@@ -21,4 +21,6 @@ package org.apache.spark.scheduler
* Represents free resources available on an executor.
*/
private[spark]
-class WorkerOffer(val executorId: String, val host: String, val cores: Int)
+class WorkerOffer(val executorId: String, val host: String, var cores: Int) {
+ @transient val totalcores = cores

Why does this need to be transient? also use camelcase for naming (totalCores)

Actually on second thought, can CoarseGrainedSchedulerBackend just store the total cores for each worker in a hash map? I'd prefer that solution since other classes use WorkerOffer and don't use it to keep track of the total cores on each worker.

+1 I'd also like to see WorkerOffer remain more like an immutable message type, with derived, mutable structures created only locally within the implementations that need it.

On that note, it seems to me that WorkerOffer should just be a case class, since all the constructor parameters are public vals anyway.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
@AmplabJenkins

Merged build triggered.

@AmplabJenkins

Merged build started.

@AmplabJenkins

Merged build finished.

@AmplabJenkins

All automated tests passed.
Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/12982/

@CodingCat

@kayousterhout @markhamstra @andrewor14 Thank you for your comments,

I updated the code, how about this?

...scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -125,14 +126,17 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
// Make fake resource offers on all executors
def makeOffers() {
- launchTasks(scheduler.resourceOffers(
- executorHost.toArray.map {case (id, host) => new WorkerOffer(id, host, freeCores(id))}))
+ // reconstruct workerOffers
+ workerOffers.foreach(o => workerOffers(o._1) =
+ new WorkerOffer(o._1, o._2.host, freeCores(o._1)))

Now that WorkerOffer is a case class, you can do this and the one in makeOffers with the copy idiom:

workerOffers.keys.foreach { executorId => 
  workerOffers(executorId) = workerOffers(executorId).copy(cores = freeCores(executorId))
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
@AmplabJenkins

Merged build triggered.

@AmplabJenkins

Merged build started.

@AmplabJenkins

Merged build finished.

@AmplabJenkins

All automated tests passed.
Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/12994/

...in/scala/org/apache/spark/scheduler/WorkerOffer.scala
@@ -21,4 +21,4 @@ package org.apache.spark.scheduler
* Represents free resources available on an executor.
*/
private[spark]
-class WorkerOffer(val executorId: String, val host: String, val cores: Int)
+case class WorkerOffer(executorId: String, host: String, cores: Int);

superfluous ';'

oops, sorry, fixed

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
@AmplabJenkins

Merged build triggered.

@AmplabJenkins

Merged build started.

@kayousterhout

This new version of the change doesn't look any simpler to me than the current version of the code and I think is a slightly confusing way of using worker offers to store info about the executors. Can you just remove executorAddress, the unused variable, and fix the bug, but keep the original way of generating worker offers?

@AmplabJenkins

Merged build finished.

@AmplabJenkins

All automated tests passed.
Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/12995/

@AmplabJenkins

Merged build triggered.

@AmplabJenkins

Merged build started.

@AmplabJenkins

Merged build finished.

@AmplabJenkins

All automated tests passed.
Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/12996/

@CodingCat

How about this?

@kayousterhout

This looks good -- I've merged this into master.

@CodingCat

@kayousterhout Thank you very much!

@asfgit asfgit closed this pull request from a commit
@CodingCat CodingCat SPARK-1171: when executor is removed, we should minus totalCores inst…
…ead of just freeCores on that executor


https://spark-project.atlassian.net/browse/SPARK-1171

When the executor is removed, the current implementation will only minus the freeCores of that executor. Actually we should minus the totalCores...

Author: CodingCat <zhunansjtu@gmail.com>
Author: Nan Zhu <CodingCat@users.noreply.github.com>

Closes #63 from CodingCat/simplify_CoarseGrainedSchedulerBackend and squashes the following commits:

f6bf93f [Nan Zhu] code clean
19c2bb4 [CodingCat] use copy idiom to reconstruct the workerOffers
43c13e9 [CodingCat] keep WorkerOffer immutable
af470d3 [CodingCat] style fix
0c0e409 [CodingCat] simplify the implementation of CoarseGrainedSchedulerBackend
a3da508
@asfgit asfgit closed this in a3da508
@CodingCat CodingCat deleted the branch
@CrazyJvm CrazyJvm referenced this pull request from a commit in CrazyJvm/spark
@CodingCat CodingCat SPARK-1171: when executor is removed, we should minus totalCores inst…
…ead of just freeCores on that executor


https://spark-project.atlassian.net/browse/SPARK-1171

When the executor is removed, the current implementation will only minus the freeCores of that executor. Actually we should minus the totalCores...

Author: CodingCat <zhunansjtu@gmail.com>
Author: Nan Zhu <CodingCat@users.noreply.github.com>

Closes #63 from CodingCat/simplify_CoarseGrainedSchedulerBackend and squashes the following commits:

f6bf93f [Nan Zhu] code clean
19c2bb4 [CodingCat] use copy idiom to reconstruct the workerOffers
43c13e9 [CodingCat] keep WorkerOffer immutable
af470d3 [CodingCat] style fix
0c0e409 [CodingCat] simplify the implementation of CoarseGrainedSchedulerBackend
d0b8fad
@gzm55 gzm55 referenced this pull request from a commit in MediaV/spark
@CodingCat CodingCat SPARK-1171: when executor is removed, we should minus totalCores inst…
…ead of just freeCores on that executor


https://spark-project.atlassian.net/browse/SPARK-1171

When the executor is removed, the current implementation will only minus the freeCores of that executor. Actually we should minus the totalCores...

Author: CodingCat <zhunansjtu@gmail.com>
Author: Nan Zhu <CodingCat@users.noreply.github.com>

Closes #63 from CodingCat/simplify_CoarseGrainedSchedulerBackend and squashes the following commits:

f6bf93f [Nan Zhu] code clean
19c2bb4 [CodingCat] use copy idiom to reconstruct the workerOffers
43c13e9 [CodingCat] keep WorkerOffer immutable
af470d3 [CodingCat] style fix
0c0e409 [CodingCat] simplify the implementation of CoarseGrainedSchedulerBackend
a9e41c7
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Commits on Mar 3, 2014
  1. @CodingCat
  2. @CodingCat

    style fix

    CodingCat authored
Commits on Mar 4, 2014
  1. @CodingCat

    keep WorkerOffer immutable

    CodingCat authored
  2. @CodingCat
  3. @CodingCat

    code clean

    CodingCat authored CodingCat committed
This page is out of date. Refresh to see the latest.
View
2  core/src/main/scala/org/apache/spark/scheduler/WorkerOffer.scala
@@ -21,4 +21,4 @@ package org.apache.spark.scheduler
* Represents free resources available on an executor.
*/
private[spark]
-class WorkerOffer(val executorId: String, val host: String, val cores: Int)
+case class WorkerOffer(executorId: String, host: String, cores: Int)
View
8 core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -54,6 +54,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
private val executorAddress = new HashMap[String, Address]
private val executorHost = new HashMap[String, String]
private val freeCores = new HashMap[String, Int]
+ private val totalCores = new HashMap[String, Int]
private val addressToExecutorId = new HashMap[Address, String]
override def preStart() {
@@ -76,6 +77,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
sender ! RegisteredExecutor(sparkProperties)
executorActor(executorId) = sender
executorHost(executorId) = Utils.parseHostPort(hostPort)._1
+ totalCores(executorId) = cores
freeCores(executorId) = cores
executorAddress(executorId) = sender.path.address
addressToExecutorId(sender.path.address) = executorId
@@ -147,10 +149,12 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
def removeExecutor(executorId: String, reason: String) {
if (executorActor.contains(executorId)) {
logInfo("Executor " + executorId + " disconnected, so removing it")
- val numCores = freeCores(executorId)
- addressToExecutorId -= executorAddress(executorId)
+ val numCores = totalCores(executorId)
executorActor -= executorId
executorHost -= executorId
+ addressToExecutorId -= executorAddress(executorId)
+ executorAddress -= executorId
+ totalCores -= executorId
freeCores -= executorId
totalCoreCount.addAndGet(-numCores)
scheduler.executorLost(executorId, SlaveLost(reason))
Something went wrong with that request. Please try again.