Skip to content

Commit

Permalink
[SPARK-36509][CORE] Fix the issue that executors are never re-schedul…
Browse files Browse the repository at this point in the history
…ed if the worker stops with standalone cluster

### What changes were proposed in this pull request?

This PR fixes an issue that executors are never re-scheduled if the worker which the executors run on stops.
As a result, the application stucks.
You can easily reproduce this issue by the following procedures.

```
# Run master
$ sbin/start-master.sh

# Run worker 1
$ SPARK_LOG_DIR=/tmp/worker1 SPARK_PID_DIR=/tmp/worker1/ sbin/start-worker.sh -c 1 -h localhost -d /tmp/worker1 --webui-port 8081 spark://<hostname>:7077

# Run worker 2
$ SPARK_LOG_DIR=/tmp/worker2 SPARK_PID_DIR=/tmp/worker2/ sbin/start-worker.sh -c 1 -h localhost -d /tmp/worker2 --webui-port 8082 spark://<hostname>:7077

# Run Spark Shell
$ bin/spark-shell --master spark://<hostname>:7077 --executor-cores 1 --total-executor-cores 1

# Check which worker the executor runs on and then kill the worker.
$ kill <worker pid>
```

With the procedure above, we will expect that the executor is re-scheduled on the other worker but it won't.

The reason seems that `Master.schedule` cannot be called after the worker is marked as `WorkerState.DEAD`.
So, the solution this PR proposes is to call `Master.schedule` whenever `Master.removeWorker` is called.

This PR also fixes an issue that `ExecutorRunner` can send `ExecutorStateChanged` message without changing its state.
This issue causes assertion error.
```
2021-08-13 14:05:37,991 [dispatcher-event-loop-9] ERROR: Ignoring errorjava.lang.AssertionError: assertion failed: executor 0 state transfer from RUNNING to RUNNING is illegal
```

### Why are the changes needed?

It's a critical bug.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Manually tested with the procedure shown above and confirmed the executor is re-scheduled.

Closes #33818 from sarutak/fix-scheduling-stuck.

Authored-by: Kousuke Saruta <sarutak@oss.nttdata.com>
Signed-off-by: Kousuke Saruta <sarutak@oss.nttdata.com>
(cherry picked from commit ea8c31e)
Signed-off-by: Kousuke Saruta <sarutak@oss.nttdata.com>
  • Loading branch information
sarutak committed Aug 28, 2021
1 parent 9b28c2b commit 93f2b00
Show file tree
Hide file tree
Showing 2 changed files with 2 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -965,6 +965,7 @@ private[deploy] class Master(
app.driver.send(WorkerRemoved(worker.id, worker.host, msg))
}
persistenceEngine.removeWorker(worker)
schedule()
}

private def relaunchDriver(driver: DriverInfo): Unit = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ private[deploy] class ExecutorRunner(
shutdownHook = ShutdownHookManager.addShutdownHook { () =>
// It's possible that we arrive here before calling `fetchAndRunExecutor`, then `state` will
// be `ExecutorState.LAUNCHING`. In this case, we should set `state` to `FAILED`.
if (state == ExecutorState.LAUNCHING) {
if (state == ExecutorState.LAUNCHING || state == ExecutorState.RUNNING) {
state = ExecutorState.FAILED
}
killProcess(Some("Worker shutting down")) }
Expand Down

0 comments on commit 93f2b00

Please sign in to comment.