Skip to content
Branch: master
Find file Copy path
Find file Copy path
Fetching contributors…
Cannot retrieve contributors at this time
315 lines (257 sloc) 15.8 KB
= Worker state transitions =
A scheduler talks to multiple workers, each identified by a shard ID -- a
string which logically identifies a worker so as to allow smooth failovers
(and per-worker customization). The scheduler tracks each worker's state
according to the diagram below. Each worker also models its own state, with
the same timeouts. A worker will not run tasks unless it's HEALTHY, and
will commit suicide upon MUST_DIE.
NEW -> UNHEALTHY -> MUST_DIE (lost / suicide requested)
When a worker becomes MUST_DIE on the scheduler, its "running" and "unsure
if running" tasks are forgotten, and a suicide request is sent. The
identifying information is retained, so that if the worker, due to network
partitions or bugs, comes back, it gets a suicide request ASAP.
A worker is truly forgotten when another connects with the same shard ID. A
worker can be replaced when it is not HEALTHY, but by default, it can be
replaced only when it is MUST_DIE (see --allow_bump_unhealthy_worker). As
above, the worker is sent a suicide request, its "running" and "unsure if
running" tasks are forgotten. But this time, the scheduler retains no
memory of its identity, as it is overwritten by the new worker.
One can construct a pathological situation when a displaced or lost worker
comes back to life, and successfully re-associates with the scheduler. For
example, there is a network partition, a new worker with the same shard ID
is spawned, then the partition resolves, the old worker reconnects, and we
discover its tasks were running all along. This cannot be prevented, so the
remediation is that one of the workers ends up getting a suicide request
(see RemoteWorker::processHeartbeat).
Aside from the above pathology, a running worker instance cannot return to
the NEW state, nor can it leave the MUST_DIE state.
= Scheduler startup and initial wait =
On startup, the scheduler does not know the set of available workers, and
does not know what tasks are already running. It simply waits for workers
to connect, and polls them for running tasks. Eventually, enough time
passes that it can conclude that any workers that were up, and separated by
a network partition, would have had to commit suicide by now. At that
point, the list of already-connected workers is known to be complete.
In setups where the "time to lose a worker" is long (e.g. to protect
fragile, long-running tasks in HPC-like settings), this initial wait
can be inconveniently long.
See remote/README.worker_set_consensus for a description of Bistro's current
solution to this problem. The 'worker set consensus' mechanism drastically
shortens this initial wait in the common case by having each workers record
a hash of the most recently seen set of workers.
= Core calls =
The core calls of the scheduler-worker protocol maintain the
scheduler-worker association, worker health information, and running tasks
for both the scheduler and the worker. They also update the scheduler-only
list of "unsure if running" tasks.
Of these, "running tasks" is the most fragile and important state in the
system, maintained with a lot of care to mitigate the usual distributed-RPC
issues. The block comments of the core calls have the details, but in a
nutshell, a task is running:
- From the worker's point of view -- from the receipt of runTask, until
updateStatus succeeds completely.
- From the scheduler's point of view -- from before runTask is sent, until
the worker is lost or until an updateStatus is received (either a normal
"task ended", or a "was not running" in response to a
notifyIfTasksNotRunning query).
A scheduler will only runTask with a given invocationID once.
Worker to scheduler (in scheduler.thrift):
- processHeartbeat: Associate with a scheduler, maintain health.
- updateStatus: Comes in two varieties -- (i) "task ended", when the
task just stopped running, and (ii) "was not running" in reply to
notifyIfTasksNotRunning. The worker retries sending the update until it
succeeds. For "task ended", the worker only stops considering a task
"running" once the updateStatus succeeds.
Scheduler to worker (below):
- getRunningTasks: After a new worker associates with the scheduler, and
before the scheduler runs any tasks, discover the tasks that the were
already running on the worker.
- runTask: The scheduler marks the task as running, then tells the worker
to start it too.
- notifyIfTasksNotRunning: If runTask fails in such a way that the
scheduler is unsure if the worker started it, this call asks the worker
for an updateStatus for the tasks that are not running. The scheduler
retries this call until it succeeds, or until the task are found to be
not running through updateStatus or worker loss. Once this call succeds,
the scheduler treats the tasks as "running" until further notice.
After the initial getRunningTasks, the scheduler & worker only synchronize
their lists of running tasks through the iterative updates of runTask and
updateStatus (helped by rare notifyIfTasksNotRunning calls when the
scheduler might have gotten out of sync).
Therefore, the protocol embodied by the above calls has to be 100% robust,
or the scheduler & worker's states will drift apart. The docblocks of the
individual calls are a great place to address five of of the six the
standard distributed-RPC issues:
- failure: Both on the sender and the receiver
- partial failure: Failed on the sender but succeeded on the receiver
- delay: Call received much later than expected, including self-ordering
- replay: Can the call be received twice? What happens if it does?
- identity: Calls from the wrong process (i.e. authentication)
However, the sixth issue of multi-call ordering is better covered in a
central place because pairwise interactions are hard to pin to a single
call's docblock.
For the 5 calls, pairwise interactions have to be considered both on from
the scheduler's and worker's perspective. That means we ought to discuss a
total of (5 * 4 / 2) * 2 = 20 interactions. Some will be implicit.
= processHeartbeat interactions (8/20 so far) =
== Health state ==
When a worker sends its first processHeartbeat to a new scheduler, its
health state is NEW on both sides. On the scheduler, it leaves the NEW
state immediately after getRunningTasks succeeds. The worker leaves the NEW
state upon receiving a response its first processHeartbeat that the
scheduler receives *after* getRunningTasks succeeds.
processHeartbeat barely interacts with the other core calls during normal
operation. Rather, it is is the main call that maintains health state.
Healthchecks' runTask and updateStatus also have an impact on healh, but
their ordering with respect to processHeartbeat does not matter. Even
transient delays, failures, and partial failures in health-related calls can
only cause minor health state discrepancies between the worker and the
scheduler. These last until the next successful heartbeat & healthcheck
exchange. Such discrepancies are not dangerous, because new tasks can only
start if both the scheduler & worker believe the worker is healthy, while a
worker will become lost if either side marks it MUST_DIE.
== Task-related state ==
Only when workers switch schedulers, or schedulers switch workers, does
processHeartbeat modify state that is relevant to the other calls.
When a worker associates with a new scheduler, the worker resets the
notifyIfTasksNotRunning sequence number. Since the worker validates the
scheduler ID in the two calls using the sequence number (runTask and
notifyIfTasksNotRunning), this state change is safe.
When a new worker replaces the old, the scheduler:
- discards old running tasks
- discards old "unsure if running" tasks
- resets the notifyIfTasksNotRunning sequence number
This is safe, since the scheduler will not send any requests to the old
worker aside from requestSuicide, and will not accept old updateStatus since
the worker ID won't match. In the unlikely event that it accepts another
processHeartbeat from the old worker, the worker would be treated as new.
Poetic aside: If the scheduler loses a worker (marking it MUST_DIE), but it
is not replaced, and the worker for some reason survives the requestSuicide,
and does not itself enter MUST_DIE, it is theoretically possible that a
much-delayed runTask would cause a task to start on this worker. Once the
task finishes, the scheduler would still accept its updateStatus, so not
much harm is done. This is also not a big issue because sane health
timeouts make such a scenario exceedingly unlikely.
= notifyIfTasksNotRunning interactions (14/20 so far) =
All interactions with processHeartbeat were discussed above.
This call does not interact with getRunningTasks in any way, because the
latter is only called for workers the NEW state, while
notifyIfTasksNotRunning may only be called after a runTask, which is never
used in the NEW state. Now, let's check the remaining 2 calls.
This call does not modify "running tasks" state, and only inspects it on the
worker. In terms of state changes, it increments its own per-worker
sequence number (on both scheduler & worker), and updates the scheduler's
"unsure if running" task list. It also causes the worker to call
updateStatus(was not running) for the tasks that are not running at the time
this call was received.
On the worker, notifyIfTasksNotRunning simultaneously locks state_, to
update the sequence number, and runningTasks_, to inspect it.
== Interactions on the worker ==
- With runTask: On the worker, a mutex protects runningTasks_, so
notifyIfTasksNotRunning and runTask are strictly ordered, and the
updateStatus(was not running) is always issued truthfully. But, there is
a catch -- once a worker claims that a task "was not running", it must
reject any attempts to later start the same task (as might happen if a
runTask hits networks delays). By starting such a task, the worker would
retroactively make itself a liar.
To spare the worker from having to store all its past "was not running"
tasks, the worker and the scheduler track a "notifyIfTasksNotRunning
sequence number". It prevents the worker from executing runTasks that
were sent *before* the most recently received notifyIfTasksNotRunning.
The exact evolution of this state is documented in the comment for
- With updateStatus: (i) There are no interactions with updateStatus(was
not running); (ii) updateStatus(task ended) locks the runningTasks_
mutex, so the "running->not running" transition is strictly ordered with
respect to to notifyIfTasksNotRunning. If the matching updateStatus runs
earlier than notifyIfTasksNotRunning, the worker will issue a _redundant_
updateStatus(was not running). This is not a problem because the
scheduler handles it correctly (see the updateStatus docblock). If the
matching updateStatus runs later, then notifyIfTasksNotRunning will see a
running task, and issue no additional updateStatus.
== Interactions on the scheduler ==
notifyIfTasksNotRunning on the scheduler does not access runningTasks_, so
any inter-call interactions can only concern the "unsure if running" tasks
list, or the notifyIfTasksNotRunning sequence number.
- With runTask: Obviously, notifyIfTasksNotRunning can only query task
invocations after their runTask call failed, and appended them to "unsure
if running" list. This list is protected by the workers_ mutex, so those
accesses are safe.
runTask, by design, sends the current notifyIfTasksNotRunning sequence
number, while notifyIfTasksNotRunning increments it and then sends it.
This number is protected by the workers_ mutex, so those accesses are
safely ordered. The invariant that must be honored is that the sequence
number sent with notifyIfTasksNotRunning be greater than that of any of
the failed runTasks' sequence numbers. This is trivially true, since
notifyIfTasksNotRunning increments the sequence number
*after* it has the list of tasks to query.
- With updateStatus: Whenever the scheduler receives a valid
updateStatus (either "was not running" or "task ended"), it locks the
workers_ mutex and removes the task from the "unsure if running" list.
This might happen after the task list for notifyIfTasksNotRunning is
composed, in which case the query will include some tasks that the
scheduler actually knows not to be running. This is fine, since any
consequent updateStatus(was not running) calls will be ignored -- see the
updateStatus docblock.
The scheduler's updateStatus does not interact with the
notifyIfTasksNotRunning sequence number.
== Aside: Tracking the "unsure if running" state ==
The scheduler sends notifyIfTasksNotRunning queries about tasks in the
"unsure if running" list. Since this list is scheduler-only (not
distributed), and is protected by a mutex, keeping it correct is pretty
- runTask may add to the list.
- A successful notifyIfTasksNotRunning removes from the list, taking care
only to remove the tasks & invocation IDs it queried.
- Worker loss clears the list.
- updateStatus removes the updated item from the list.
So long as this list is non-empty, the scheduler periodically calls
notifyIfTasksNotRunning to try to clear it out.
= runTask interactions (18/20 so far) =
All interactions with processHeartbeat and notifyIfTasksNotRunning were
discussed above.
The scheduler cannot call runTask until after getRunningTasks succeeds. The
worker will not accept runTask until it leaves the NEW state (i.e. after it
knows that the scheduler's getRunningTasks succeeded). So, these two calls
do not interact.
This leaves just the interactions with updateStatus:
- On the worker: (i) updateStatus(was not running); (ii)
updateStatus(task ended)
- On the worker, the updateStatus(task ended) for a particular task
invocation can obviously only be sent after its runTask was received. As
for updateStatus(was not running), it might be sent if a
notifyIfTasksNotRunning arrives before the runTask, in which case the
runTask will be rejected (see notifyIfTasksNotRunning's interactions on
the worker). notifyIfTasksNotRunning checks task invocation IDs, while
runTask prohibits even two tasks with the same task ID. Thus, no
cross-task interactions are possible.
- On the scheduler: For a given task invocation, these are intrinsically
ordered -- the scheduler can only receive updateStatus (of either kind)
after sending the corresponding runTask. Interactions between different
invocations can't happen, since updateStatus checks the ID.
= updateStatus interactions (20/20 so far) =
All interactions with processHeartbeat, notifyIfTasksNotRunning, and runTask
were discussed above. The interactions between updateStatus(was not
running) and updateStatus(task ended) are described in the updateStatus
docblock. That leaves only getRunningTasks:
- The worker makes updateStatus calls completely indepentently of
getRunningTasks, so it may send updates while still in NEW state.
- The scheduler rejects updates from a worker in NEW state, so no
updateStatus changes will be received until after getRunningTasks is
received and processed. Otherwise, the two calls could race to modify the
running tasks state.
= Non-core calls =
The following calls do not interact with any of the scheduler or worker
state above, so they can be excluded from the interaction analysis:
- getJobLogsByID: Return log lines for the UI.
- killTask: Kill a specific running task instance, but do not change the
worker's or scheduler's internal state -- instead, the task's death will
be registered just as if it had quit on its own.
- requestSuicide: The scheduler sends this if it decides that the worker
should not be running (e.g. another has replaced it, or it has been
unhealthy too long and became lost). The worker quits immediately, from
a system perspective, this is equivalent to the worker crashing.
You can’t perform that action at this time.