Skip to content
Permalink
Browse files

[SPARK-27347][MESOS] Fix supervised driver retry logic for outdated t…

…asks

## What changes were proposed in this pull request?

This patch fixes a bug where `--supervised` Spark jobs would retry multiple times whenever an agent would crash, come back, and re-register even when those jobs had already relaunched on a different agent.

That is:
```
- supervised driver is running on agent1
- agent1 crashes
- driver is relaunched on another agent as `<task-id>-retry-1`
- agent1 comes back online and re-registers with scheduler
- spark relaunches the same job as `<task-id>-retry-2`
- now there are two jobs running simultaneously
```

This is because when an agent would come back and re-register it would send a status update `TASK_FAILED` for its old driver-task. Previous logic would indiscriminately remove the `submissionId` from Zookeeper's `launchedDrivers` node and add it to `retryList` node. Then, when a new offer came in, it would relaunch another `-retry-`  task even though one was previously running.

For example logs, scroll to bottom

## How was this patch tested?

- Added a unit test to simulate behavior described above
- Tested manually on a DC/OS cluster by
  ```
  - launching a --supervised spark job
  - dcos node ssh <to the agent with the running spark-driver>
  - systemctl stop dcos-mesos-slave
  - docker kill <driver-container-id>
  - [ wait until spark job is relaunched ]
  - systemctl start dcos-mesos-slave
  - [ observe spark driver is not relaunched as `-retry-2` ]
  ```

Log snippets included below. Notice the `-retry-1` task is running when status update for the old task comes in afterward:
```
19/01/15 19:21:38 TRACE MesosClusterScheduler: Received offers from Mesos:
... [offers] ...
19/01/15 19:21:39 TRACE MesosClusterScheduler: Using offer 5d421001-0630-4214-9ecb-d5838a2ec149-O2532 to launch driver driver-20190115192138-0001 with taskId: value: "driver-20190115192138-0001"
...
19/01/15 19:21:42 INFO MesosClusterScheduler: Received status update: taskId=driver-20190115192138-0001 state=TASK_STARTING message=''
19/01/15 19:21:43 INFO MesosClusterScheduler: Received status update: taskId=driver-20190115192138-0001 state=TASK_RUNNING message=''
...
19/01/15 19:29:12 INFO MesosClusterScheduler: Received status update: taskId=driver-20190115192138-0001 state=TASK_LOST message='health check timed out' reason=REASON_SLAVE_REMOVED
...
19/01/15 19:31:12 TRACE MesosClusterScheduler: Using offer 5d421001-0630-4214-9ecb-d5838a2ec149-O2681 to launch driver driver-20190115192138-0001 with taskId: value: "driver-20190115192138-0001-retry-1"
...
19/01/15 19:31:15 INFO MesosClusterScheduler: Received status update: taskId=driver-20190115192138-0001-retry-1 state=TASK_STARTING message=''
19/01/15 19:31:16 INFO MesosClusterScheduler: Received status update: taskId=driver-20190115192138-0001-retry-1 state=TASK_RUNNING message=''
...
19/01/15 19:33:45 INFO MesosClusterScheduler: Received status update: taskId=driver-20190115192138-0001 state=TASK_FAILED message='Unreachable agent re-reregistered'
...
19/01/15 19:33:45 INFO MesosClusterScheduler: Received status update: taskId=driver-20190115192138-0001 state=TASK_FAILED message='Abnormal executor termination: unknown container' reason=REASON_EXECUTOR_TERMINATED
19/01/15 19:33:45 ERROR MesosClusterScheduler: Unable to find driver with driver-20190115192138-0001 in status update
...
19/01/15 19:33:47 TRACE MesosClusterScheduler: Using offer 5d421001-0630-4214-9ecb-d5838a2ec149-O2729 to launch driver driver-20190115192138-0001 with taskId: value: "driver-20190115192138-0001-retry-2"
...
19/01/15 19:33:50 INFO MesosClusterScheduler: Received status update: taskId=driver-20190115192138-0001-retry-2 state=TASK_STARTING message=''
19/01/15 19:33:51 INFO MesosClusterScheduler: Received status update: taskId=driver-20190115192138-0001-retry-2 state=TASK_RUNNING message=''
```

Closes #24276 from samvantran/SPARK-27347-duplicate-retries.

Authored-by: Sam Tran <stran@mesosphere.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
(cherry picked from commit bcd3b61)
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
  • Loading branch information...
samvantran authored and dongjoon-hyun committed May 10, 2019
1 parent 0d5533c commit fd177267e185c493a076f8e245bd6d576f7bebf2
@@ -747,6 +747,10 @@ private[spark] class MesosClusterScheduler(
val state = launchedDrivers(subId)
// Check if the driver is supervise enabled and can be relaunched.
if (state.driverDescription.supervise && shouldRelaunch(status.getState)) {
if (isTaskOutdated(taskId, state)) {
// Prevent outdated task from overwriting a more recent status
return
}
removeFromLaunchedDrivers(subId)
state.finishDate = Some(new Date())
val retryState: Option[MesosClusterRetryState] = state.driverDescription.retryState
@@ -767,6 +771,16 @@ private[spark] class MesosClusterScheduler(
}
}

/**
* Check if the task is outdated i.e. has already been launched or is pending
* If neither, the taskId is outdated and should be ignored
* This is to avoid scenarios where an outdated status update arrives
* after a supervised driver has already been relaunched
*/
private def isTaskOutdated(taskId: String, state: MesosClusterSubmissionState): Boolean =
taskId != state.taskId.getValue &&
!pendingRetryDrivers.exists(_.submissionId == state.driverDescription.submissionId)

private def retireDriver(
submissionId: String,
state: MesosClusterSubmissionState) = {
@@ -423,6 +423,86 @@ class MesosClusterSchedulerSuite extends SparkFunSuite with LocalSparkContext wi
assert(state.finishedDrivers.size == 1)
}

test("SPARK-27347: do not restart outdated supervised drivers") {
// Covers scenario where:
// - agent goes down
// - supervised job is relaunched on another agent
// - first agent re-registers and sends status update: TASK_FAILED
// - job should NOT be relaunched again
val conf = new SparkConf()
conf.setMaster("mesos://localhost:5050")
conf.setAppName("SparkMesosDriverRetries")
setScheduler(conf.getAll.toMap)

val mem = 1000
val cpu = 1
val offers = List(
Utils.createOffer("o1", "s1", mem, cpu, None),
Utils.createOffer("o2", "s2", mem, cpu, None),
Utils.createOffer("o3", "s1", mem, cpu, None))

val response = scheduler.submitDriver(
new MesosDriverDescription("d1", "jar", 100, 1, true, command,
Map(("spark.mesos.executor.home", "test"), ("spark.app.name", "test")), "sub1", new Date()))
assert(response.success)

// Offer a resource to launch the submitted driver
scheduler.resourceOffers(driver, Collections.singletonList(offers.head))
var state = scheduler.getSchedulerState()
assert(state.launchedDrivers.size == 1)

// Signal agent lost with status with TASK_LOST
val agent1 = SlaveID.newBuilder().setValue("s1").build()
var taskStatus = TaskStatus.newBuilder()
.setTaskId(TaskID.newBuilder().setValue(response.submissionId).build())
.setSlaveId(agent1)
.setReason(TaskStatus.Reason.REASON_SLAVE_REMOVED)
.setState(MesosTaskState.TASK_LOST)
.build()

scheduler.statusUpdate(driver, taskStatus)
state = scheduler.getSchedulerState()
assert(state.pendingRetryDrivers.size == 1)
assert(state.pendingRetryDrivers.head.submissionId == taskStatus.getTaskId.getValue)
assert(state.launchedDrivers.isEmpty)

// Offer new resource to retry driver on a new agent
Thread.sleep(1500) // sleep to cover nextRetry's default wait time of 1s
scheduler.resourceOffers(driver, Collections.singletonList(offers(1)))

val agent2 = SlaveID.newBuilder().setValue("s2").build()
taskStatus = TaskStatus.newBuilder()
.setTaskId(TaskID.newBuilder().setValue(response.submissionId).build())
.setSlaveId(agent2)
.setState(MesosTaskState.TASK_RUNNING)
.build()

scheduler.statusUpdate(driver, taskStatus)
state = scheduler.getSchedulerState()
assert(state.pendingRetryDrivers.isEmpty)
assert(state.launchedDrivers.size == 1)
assert(state.launchedDrivers.head.taskId.getValue.endsWith("-retry-1"))
assert(state.launchedDrivers.head.taskId.getValue != taskStatus.getTaskId.getValue)

// Agent1 comes back online and sends status update: TASK_FAILED
taskStatus = TaskStatus.newBuilder()
.setTaskId(TaskID.newBuilder().setValue(response.submissionId).build())
.setSlaveId(agent1)
.setState(MesosTaskState.TASK_FAILED)
.setMessage("Abnormal executor termination")
.setReason(TaskStatus.Reason.REASON_EXECUTOR_TERMINATED)
.build()

scheduler.statusUpdate(driver, taskStatus)
scheduler.resourceOffers(driver, Collections.singletonList(offers.last))

// Assert driver does not restart 2nd time
state = scheduler.getSchedulerState()
assert(state.pendingRetryDrivers.isEmpty)
assert(state.launchedDrivers.size == 1)
assert(state.launchedDrivers.head.taskId.getValue.endsWith("-retry-1"))
}

test("Declines offer with refuse seconds = 120.") {
setScheduler()

0 comments on commit fd17726

Please sign in to comment.
You can’t perform that action at this time.