Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-13604][Core]Sync worker's state after registering with master #11455

Closed
wants to merge 2 commits into from
Closed

[SPARK-13604][Core]Sync worker's state after registering with master #11455

wants to merge 2 commits into from

Conversation

zsxwing
Copy link
Member

@zsxwing zsxwing commented Mar 2, 2016

What changes were proposed in this pull request?

Here lists all cases that Master cannot talk with Worker for a while and then network is back.

  1. Master doesn't know the network issue (not yet timeout)

    a. Worker doesn't know the network issue (onDisconnected is not called)

    • Worker keeps sending Heartbeat. Both Worker and Master don't know the network issue. Nothing to do. (Finally, Master will notice the heartbeat timeout if network is not recovered)

    b. Worker knows the network issue (onDisconnected is called)

    • Worker stops sending Heartbeat and sends RegisterWorker to master. Master will reply RegisterWorkerFailed("Duplicate worker ID"). Worker calls "System.exit(1)" (Finally, Master will notice the heartbeat timeout if network is not recovered) (May leak driver processes. See SPARK-13602)
  2. Worker timeout (Master knows the network issue). In such case, master removes Worker and its executors and drivers.

    a. Worker doesn't know the network issue (onDisconnected is not called)

    • Worker keeps sending Heartbeat.
    • If the network is back, say Master receives Heartbeat, Master sends ReconnectWorker to Worker
    • Worker send RegisterWorker to master.
    • Master accepts RegisterWorker but doesn't know executors and drivers in Worker. (may leak executors)

    b. Worker knows the network issue (onDisconnected is called)

    • Worker stop sending Heartbeat. Worker will send "RegisterWorker" to master.
    • Master accepts RegisterWorker but doesn't know executors and drivers in Worker. (may leak executors)

This PR fixes executors and drivers leak in 2.a and 2.b when Worker reregisters with Master. The approach is making Worker send WorkerLatestState to sync the state after registering with master successfully. Then Master will ask Worker to kill unknown executors and drivers.

Note: Worker cannot just kill executors after registering with master because in the worker, LaunchExecutor and RegisteredWorker are processed in two threads. If LaunchExecutor happens before RegisteredWorker, Worker's executor list will contain new executors after Master accepts RegisterWorker. We should not kill these executors. So sending the list to Master and let Master tell Worker which executors should be killed.

How was this patch tested?

test("SPARK-13604: Master should ask Worker kill unknown executors and drivers")

@zsxwing zsxwing changed the title Sync worker's state after registering with master [SPARK-13604][Core]Sync worker's state after registering with master Mar 2, 2016
@zsxwing
Copy link
Member Author

zsxwing commented Mar 2, 2016

cc @andrewor14

@SparkQA
Copy link

SparkQA commented Mar 2, 2016

Test build #52279 has finished for PR 11455 at commit 7e0b2a2.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class WorkerState(

@SparkQA
Copy link

SparkQA commented Mar 2, 2016

Test build #52272 has finished for PR 11455 at commit 6c13702.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Mar 2, 2016

Test build #52275 has finished for PR 11455 at commit 97002e4.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Mar 2, 2016

Test build #52339 has finished for PR 11455 at commit 1b95f5b.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class WorkerLatestState(

@zsxwing
Copy link
Member Author

zsxwing commented Mar 9, 2016

ping @andrewor14

case Some(worker) =>
for (exec <- executors) {
if (!worker.executors.exists(
e => e._2.application.id == exec.appId && e._2.id == exec.execId)) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

style: can you use .exists { case (_, something) => something.application.id ... } and store it in a variable? e.g.

for (exec <- executors) {
  val executorMatches = worker.executors.exists { ... }
  if (!executorMatches) {
    worker.endpoint.send(...)
  }
}

@andrewor14
Copy link
Contributor

LGTM, just style nits.

@SparkQA
Copy link

SparkQA commented Mar 10, 2016

Test build #52838 has finished for PR 11455 at commit 51ac6dd.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@zsxwing
Copy link
Member Author

zsxwing commented Mar 10, 2016

retest this please

@SparkQA
Copy link

SparkQA commented Mar 10, 2016

Test build #52853 has finished for PR 11455 at commit 51ac6dd.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@zsxwing
Copy link
Member Author

zsxwing commented Mar 10, 2016

retest this please

@SparkQA
Copy link

SparkQA commented Mar 11, 2016

Test build #52861 has finished for PR 11455 at commit 51ac6dd.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@andrewor14
Copy link
Contributor

Merged into master.

@asfgit asfgit closed this in 27fe6ba Mar 11, 2016
@zsxwing zsxwing deleted the orphan-executors branch March 11, 2016 01:02
val driverMatches = worker.drivers.exists { case (id, _) => id == driverId }
if (!driverMatches) {
// master doesn't recognize this driver. So just tell worker to kill it.
worker.endpoint.send(KillDriver(driverId))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like there may be scenario that Executor gets killed but driver gets kept, vice versa.

Is that desirable ?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't get it. Here just compare them with the executors and drivers of a worker stored in the master. If we find any mismatch, just kill it.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think so. Which part of the code leads you to believe that?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let me look at other parts of Master.scala and see if I can find anything.

roygao94 pushed a commit to roygao94/spark that referenced this pull request Mar 22, 2016
## What changes were proposed in this pull request?

Here lists all cases that Master cannot talk with Worker for a while and then network is back.

1. Master doesn't know the network issue (not yet timeout)

  a. Worker doesn't know the network issue (onDisconnected is not called)
    - Worker keeps sending Heartbeat. Both Worker and Master don't know the network issue. Nothing to do. (Finally, Master will notice the heartbeat timeout if network is not recovered)

  b. Worker knows the network issue (onDisconnected is called)
    - Worker stops sending Heartbeat and sends `RegisterWorker` to master. Master will reply `RegisterWorkerFailed("Duplicate worker ID")`. Worker calls "System.exit(1)" (Finally, Master will notice the heartbeat timeout if network is not recovered) (May leak driver processes. See [SPARK-13602](https://issues.apache.org/jira/browse/SPARK-13602))

2. Worker timeout (Master knows the network issue). In such case,  master removes Worker and its executors and drivers.

  a. Worker doesn't know the network issue (onDisconnected is not called)
    - Worker keeps sending Heartbeat.
    - If the network is back, say Master receives Heartbeat, Master sends `ReconnectWorker` to Worker
    - Worker send `RegisterWorker` to master.
    - Master accepts `RegisterWorker` but doesn't know executors and drivers in Worker. (may leak executors)

  b. Worker knows the network issue (onDisconnected is called)
    - Worker stop sending `Heartbeat`. Worker will send "RegisterWorker" to master.
    - Master accepts `RegisterWorker` but doesn't know executors and drivers in Worker. (may leak executors)

This PR fixes executors and drivers leak in 2.a and 2.b when Worker reregisters with Master. The approach is making Worker send `WorkerLatestState` to sync the state after registering with master successfully. Then Master will ask Worker to kill unknown executors and drivers.

Note:  Worker cannot just kill executors after registering with master because in the worker, `LaunchExecutor` and `RegisteredWorker` are processed in two threads. If `LaunchExecutor` happens before `RegisteredWorker`, Worker's executor list will contain new executors after Master accepts `RegisterWorker`. We should not kill these executors. So sending the list to Master and let Master tell Worker which executors should be killed.

## How was this patch tested?

test("SPARK-13604: Master should ask Worker kill unknown executors and drivers")

Author: Shixiong Zhu <shixiong@databricks.com>

Closes apache#11455 from zsxwing/orphan-executors.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
4 participants