This repository has been archived by the owner on May 16, 2023. It is now read-only.
/
README.worker_set_consensus
380 lines (307 loc) · 19.2 KB
/
README.worker_set_consensus
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
For a brief foreword, see "Scheduler startup and initial wait" in
if/README.worker_protocol.
= Why is the initial wait necessary? =
A reliable task scheduler:
- *Never* starts a second copy of a task that is already running.
- Eventually retries tasks that were running on a worker that died.
This requires distinguishing between tasks that are "probably running" and
those "certainly not running", and some kind of timeout mechanism to
identify tasks that stopped running without notice (e.g. due to a kernel
crash).
Typically, remote task schedulers use of persistent store, like a database
or ZooKeeper, as the source of truth for what tasks are running, and to
store the "last heard from" value for timeouts. This has a few
disadvantages:
- The overall QPS of the system becomes bottlenecked by your storage
latency, which can be challenging to keep low.
- The system becomes more complex to operate and deploy, since storage
services are notoriously labor-intensive to keep up and responsive.
- The possibility of 3-way conflicts between scheduler, DB, and the workers
themselves, creates more failure modes, and substantial code complexity
to mitigate them.
Bistro takes an approach with fewer moving parts and bottlenecks: its
workers are the sole source of truth about the currently running tasks. See
if/README.worker_protocol for the protocol details, but in a nutshell:
- A scheduler associates with new workers by accepting their heartbeats.
- It then polls them for running tasks to populate its in-RAM model
of what tasks are running.
- The scheduler's model errs on the side of assuming that a task is running
until proven otherwise.
- The worker-scheduler protocol keeps the scheduler's models up to date.
- The worker & scheduler both use the same state machine for worker
health, ensuring that network-partitioned workers kills their tasks
just as the scheduler decides they would be lost.
At startup, the scheduler does not know the set of workers, nor their
running tasks. It is imperative to find the running tasks *before* starting
new tasks, since we would otherwise certainly start duplicates of
already-running tasks. So, the scheduler must wait for workers to connect.
In the simplest implementation, the wait is just as long as it would take
for a healthy, but network-partitioned worker to commit suicide. That is
sure to prevent double-starting tasks, but makes it inconvenient to use a
large timeout for "worker becomes lost" -- each scheduler restart leads to a
long downtime.
Why not just give the scheduler a full list of workers at startup? Manually
curating one is too laborious with big deployments. Automatically
maintaining one is tricky, since one has to watch a worker's health status
using Bistro's state machine, and remove workers that exceed their timeout.
In effect, the automatic curator would still have to wait the same amount of
time whenever a worker goes AWOL, and it would be another moving part in the
system. This additional complexity is not clearly better than starting with
an unknown set of workers.
= How to shorten the initial wait? =
One can sacrifice reliability to manually shorten the initial wait via
--CAUTION_startup_wait_for_workers. This will mean that each scheduler
restart exposes you to a risk of double-starting tasks. Network partitions
are not all that rare in production systems (Facebook has had plenty). So,
basically, this is a bad idea if you care about reliability. And if you do
not, why not just make your "--lose_unhealthy_worker_after" and other
timeouts shorter?
Instead of the Faustian bargain of using an unsafe intitial wait, Bistro
has a mechanism to shorten the initial wait in the common case. Imagine
that your scheduler restarts when:
- All workers are healthy / responsive.
- Workers are neither being removed nor added.
In this steady-state scenario, there is a simple fix:
- Each worker stores a hash of the entire worker set, regularly
updated by the scheduler.
- When the workers reconnect, the scheduler can check whether all
the workers agree on the same worker set, and exit initial wait
the moment that this worker set is reached.
This is why each worker heartbeat carries the WorkerSetID it currently
knows, to which the scheduler responds with its latest WorkerSetID.
= What makes a worker set consensus robust? =
While the fix above sounds simple, our ultimate goal is to have workers'
WorkerSetIDs be set up in such a way that if the scheduler restarts, and
workers connect to the new scheduler at some arbitrary speed, and in an
arbitrary order, it will never be the case that the scheduler detects a
worker consensus before *all* workers that might be running tasks have
connected.
Such guaranteed robustness takes a bit of extra care.
== It can be unsafe to start running tasks before enough consensus emerges ==
Here is a simple example that shows why tasks must not be run before enough
consensus exists:
- Start with one unhealthy worker w1 whose WorkerSetID is itself.
- A second worker w2 connects. A choice is forced -- either:
(a) Wait for the unhealthy w1 to add w2 to its WorkerSetID.
(b) Immediately start running tasks on w2.
Choosing (b) is unsafe:
* A task starts on w2.
* The scheduler restarts.
* w1, still unaware of w2, is first to reconnect to the scheduler.
* The scheduler instantly achieves consensus, and starts duplicate tasks on
the formerly unhealthy w1.
The problem is that w1's WorkerSetID is enough for a spurious consensus,
while w2 has running tasks. We cannot prevent the spurious consensus -- a
consensus set of workers can become transiently unhealthy, and new workers
can connect in the meanwhile. However, we **can** avoid starting new tasks
until the spurious consensus disappears.
Bistro achieves this via `consensusPermitsWorkerToBecomeHealthy()`, a pure
function that examines a newly added worker, and decides whether it's safe
to start running tasks on it. Once a worker `hasBeenHealthy_`, this
function's output is no longer used. In effect, this adds an extra state to
Bistro's state diagram, located between NEW and HEALTHY, but it was easier
to implement the hysteresis as an extra boolean.
This initial state of "UNHEALTHY due to lack of consensus" has an extra
wrinkle: workers will NOT become MUST_DIE solely due to lack of consensus.
This is important when we have a high turnover of workers -- any incoming or
departing worker delays consensus, so without blocking MUST_DIE here, it is
possible to reach a pathological steady state where workers are lost because
they have no consensus, thus preventing other workers from reaching
consensus. See also "Future: dealing with high worker turnover" below.
The simplest implementation of `consensusPermitsWorkerToBecomeHealthy()`
would be to always wait for all non-MUST_DIE workers (healthy or not) to add
a new worker to their WorkerSetID **before** running tasks on the new
worker. Unfortunately, this means that any time you have unhealthy workers,
you will be unable to use additional workers until the unhealthy ones are
lost. This is very inconvenient -- such a "consensus" cure for long initial
waits would be just as bad as the disease.
Instead, Bistro only waits for as long as there are workers, which do not --
either directly, or through other workers in their WorkerSetID -- require
the newly added workers to achieve consensus. In other words, instead of
asking each worker's WorkerSetID to include the new worker, we take the
transitive closure through all available WorkerSetIDs. This precludes the
existence of worker set that can achieve consensus on scheduler restart, but
that does **not** include the new worker. In a setup with many workers,
this mechanism requires far fewer WorkerSetIDs to actually contain the new
worker. Instead of waiting for O(# workers) WorkerSetIDs to include the new
worker, we generically only need to wait for 1 WorkerSetID update before it
is safe to run tasks on a new worker.
For the careful reader: yes, Bistro's mechanism is equivalent to "wait for
w1" in the contrived scenario above. Its strength lies in that it works
very well when there are many workers, while "wait for all workers" works
terribly.
A naive implementation of the "transitive closure of WorkerSetIDs" idea
would store for every worker the set of workers it requires. With N
workers, this would lead to O(N^2) memory usage and O(N^2) update duration.
These are not appealing at Bistro's scale. However, there is a smarter
implementation.
= Efficiently maintaining a robust worker set consensus =
The rest of this README is dedicated to the nitty-gritty details of Bistro's
implementation of the above idea, but does not add much conceptual depth.
Read on if you are:
- seeking to understand the corner cases, and what can go wrong,
- modifying the relevant RemoteWorkers code.
== Why are UNHEALTHY workers required for initial-wait-ending consensus? ==
The downside of including UNHEALTHY workers is clear: if a `w` becomes
UNHEALTHY, and the scheduler restarts, the scheduler will incur the maximum
initial wait unless the worker comes back sooner.
However, it is unsafe to exclude such workers, for the same reason -- the
network partition may be transient, the UNHEALTHY workers may actually be
running tasks, and if the scheduler exits initial wait before those workers
reconnect, it can easily start second copies of those tasks.
There is no simple way to wiggle out of this scenario. Not waiting for
UNHEALTHY workers to get lost **and** not double-starting tasks would
require a secondary way of learning about what workers were running which
tasks -- either a DB, or consensus storage among the workers themselves.
Bistro avoids the complexity, and accepts the occasional startup delay.
== Why must UNHEALTHY workers indirectly require new workers? =
This section reiterates the point of the above "It can be unsafe to start
running tasks before enough consensus emerges" in slightly different words.
We insist that all non-MUST_DIE workers indirectly require `w` before
allowing tasks to be run on `w`. Why is that?
The downside is that we can end waiting for UNHEALTHY workers to get lost
before we can start tasks on new workers. Unfortunately, failing to wait is
unsafe.
The following scenario demonstrates what goes wrong if we do not require
UNHEALTHY workers to participate in the consensus -- we will end up
double-starting tasks:
- The scheduler is empty.
- w1 connects
- w1's WorkerSetID becomes "just itself"
- w1 stops sending heartbeats
- scheduler leaves initial wait
- w2 connects, becomes healthy
- scheduler starts task T on w2 -- **we did not wait for w1 to learn of w2**
- scheduler restarts
- ex-unhealthy w1 sends a heartbeat, registering a WorkerSetID of {w1}.
- scheduler detects consensus and exits initial wait
- scheduler starts task T on w1 (UH-OH, double-start!)
- w2 connects, *kaboom*
We don't *have* to require NEW workers to be part of the consensus. For
the scheduler to start tasks, while having a NEW worker, the worker must
have first connected after the scheduler exited initial wait -- it's
pretty safe to assume that such a NEW worker has an empty WorkerSetID and
thus could not trigger the initial WorkerSetID consensus. However, there
is no harm in including NEW workers, so we do.
== How can Bistro's consensus mechanism fail? ==
Excluding UNHEALTHY workers from the consensus is not the only thing that
can cause problems, but it is the only one we can do something about. Here
are the other cases to consider:
MUST_DIE worker: it would be pretty pathological, though not impossible, for
such a worker to both be alive *and* have a dangerous WorkerSetID. For
example, in the scenario above, w1 might have become MUST_DIE due to a w1a
connecting with the same shard ID **and** the suicide request might have
gotten lost. However, we have to take this chance, since otherwise losing a
worker will prevent worker set consensus from working on the next scheduler
restart. The good news is that this particular scenario can be prevented by
**not** using --allow_bump_unhealthy_worker, which would guarantee that w1
would suicide due to its own timeout.
Unknown worker: in some macabre circumstances (e.g. multiple rapid scheduler
restarts), we can end up with a worker like w1 above, which is unknown to
the scheduler *and* has a WorkerSetID e.g. equal to {w1}. If this worker
is first to connect to a just-started scheduler, it **will** exit initial
wait, and **will** double-start tasks.
== Tracking which workers require each other to achieve consensus ==
Remember our goal: never to achieve a consensus on startup, which excludes
any workers that might already be running tasks. We achieve it by only
starting to run tasks on a worker `w` when (a) `w` requires every
non-MUST_DIE worker, and (b) `w` is **indirectly** required by every other
non-MUST_DIE worker.
Definition: `w` indirectly requires `wN` if there is a chain such that `w`
requires `w1`, which requires ..., which requires `wN`.
We could further relax the above condition (and thus start running tasks
sooner), since any worker that indirectly requires a `MUST_DIE` worker
cannot participate in any kind of consensus. Clearly, `w` does not have to
wait to be required by any such workers. However, computing or maintaining
whether a given worker indirectly requires a MUST_DIE worker is
unnecessarily complex. To implement it efficiently, one would likely
forward-and-back-propagate additional types of updates through the graph of
"w1 requires w2". Bistro neglects this optimization, and compromises on
starting to some tasks a bit later than necessary by waiting for `w` to be
indirectly required even by workers that also indirectly require MUST_DIE
workers.
== Efficiently maintaining `indirectWorkerSetID_` ==
Conceptually, the scheduler has a graph of workers, whose directed edges
indicate that one worker's workerSetID includes the other worker. The size
of the edge set is quadratic in the number of workers. Fortunately, we do
not have to explicitly store the edges or even iterate over them.
Every relevant WorkerSetID originates from this scheduler, and since
RemoteWorkers is synchronized, we have a linear view of history: w1 was
added, w2 was added, w1 was lost, etc. Every associated worker's set can be
uniquely identified by a single version number. So, that's what we do.
For each worker, we iteratively maintain a lower bound on the set of workers
that it indirectly requires in `indirectWorkerSetID_`. This is an
ever-increasing version pointing into the worker set history. Its update
strategy is a simple form of label propagation -- note that it may require
multiple passes to converge:
- Initially, it is not set. This means "worker requires unknown other
workers" -- unsafe for consensus!
- Resets to `workerSetID_` any time that version is newer than ours.
- On every `RemoteWorkers::updateState`, updates to the highest-versioned
`indirectWorkerSetID_` of all of the current set's workers.
Note that some of the workers may be MUST_DIE -- either MUST_DIE because
they timed out and were lost, or because they were bumped (possibly with
the unsafe flag of --allow_bump_unhealthy_worker), or both. For the ones
that were not bumped, we will use (and update) their
`indirectWorkerSetID_` as if they were not MUST_DIE. It must be safe to
either use or not use these (since we can neither "unpropagate" through
workers that became MUST_DIE after we propagated through them, nor can we
propagate through bumped ones) -- see ** below. The decision to propagate
trades off a little more computation for the occasional chance to start
running on new workers sooner.
To do this propagation, we must use the history to materialize each
version as an actual worker set, and consider all the workers in the set
-- we can do so efficiently by sorting the RemoteWorkers by their version,
and maintaining a sorted set of versions as we iterate through history.
- Run this algorithm every `updateState`, so that `indirectWorkerSetID_`
eventually captures all the workers that the current worker indirectly
requires.
== Why is it okay to always pick the highest version when propagating? ==
Let's say we're trying to find the transitive closure of indirectly required
workers for `w`. Each propagation will pick the highest `workerSetID_`
version among the workers in its `indirectWorkerSetID_`. This new will
contain some newly-added workers, but they all have the property that `w`
cannot be part of an initial-wait-ending consensus without those workers.
So the added workers cause no problems. On the other hand, using a higher
version will also forget that `w` indirectly requires some workers that
became MUST_DIE after its current version of `indirectWorkerSetID_` (and,
recursively, all of their requirements). As a result, `w`'s
`indirectWorkerSetID_` value ends up conservative -- a new worker `w` may be
blocked from running tasks while waiting for worker `w_older` to indirectly
require it, even though `w_older` indirectly requires some MUST_DIE workers
(and thus is irrelevant for the purposes of consensus safety).
**This subsection also explains why either choice is safe: to propagate, or
not to propagate worker sets through MUST_DIE workers.
= Future: dealing with high worker turnover =
If new workers connect, or existing workers are lost, with a high enough
frequency, the existing workers will NEVER reach consensus. This is a bit
of an adversarial case, but there is a fairly simple trigger -- a small pool
of workers which connect, become unhealthy, and then lost.
The required number of misbehaving workers to trigger a pathology can be
estimated as N = (time to lose a worker) / (time to establish consensus).
If we have many more than N crash-looping workers, and their restarts are
spread uniformly through time, then consensus will never emerge.
With the current scheduler & worker defaults, this N is approximately:
(60 + 60 + 500 + 2*5) / (2 * 15) = 21
Here is an idea for counteracting this. Suppose we haven't reached consensus
after some number K of "new worker" (but not "updated", and probably not
'lost') WorkerSetID changes. Then, we temporarily queue up any incoming
"new worker" updates to the consensus state, and establish a slightly stale
consensus. In the worst case, where we gradually lose all workers just
before achieving consensus, the time to reach consensus is bounded by
(number of workers) * (time to reach consensus). In practice, this should
converge pretty fast.
Then, we replay the batched queue of "new worker" updates and reset the
counter of "updates without consensus" to K.
Aside: I **think** that it's not a good idea to also queue "lost" updates,
since doing that could block consensus from emerging. It's also harder,
since a lost RemoteWorker can get replaced, complicating our bookkeeping.
This idea is a definite improvement in that it gives guaranteed convergence.
The minor complication is that the current "new worker" callback is
synchronous, and enabling it to be queued up would require some care. The
resulting RemoteWorker would, for all intents and purposes, appear as any
other "never been healthy, blocked by consensus" worker. The only
difference is that it would not be included in any consensus calculations.
If such a worker is lost, or its WorkerSetID state updated, we have to take
care not to touch the consensus state either, and instead just update the
queue.