allocator.go (forked from asonawalla/gazette)
package consensus
import (
"context"
"errors"
"math/rand"
"os"
"os/signal"
"syscall"
"time"
etcd "github.com/coreos/etcd/client"
log "github.com/sirupsen/logrus"
)
//go:generate mockery -inpkg -name=Allocator
const (
MemberPrefix = "members" // Directory root for member announcements.
ItemsPrefix = "items" // Directory root for allocated items.
lockDuration = time.Minute * 5 // Duration of held locks.
allocErrSleepInterval = time.Second * 5 // Sleep cool-off on errors.
// Maximum sleep interval. This is a tighter bound than that required
// for lock resets, to ensure that ItemStates are polled and updated
// into Etcd with sufficient frequency.
allocMaxSleepInterval = time.Second * 5
)
var ErrAllocatorInstanceExists = errors.New("Allocator member key exists")
// Allocator is an interface which performs distributed allocation of items.
type Allocator interface {
// KeysAPI returns the etcd KeysAPI through which Allocator state is read and written.
KeysAPI() etcd.KeysAPI
// Etcd path which roots shared state for this Allocator.
PathRoot() string
// A key uniquely identifying this Allocator within shared state.
InstanceKey() string
// The required number of replicas. Except in cases of failure, allocation
// changes will not be made which would violate having at least this many
// ready replicas of an item at all times.
Replicas() int
// Items which will be created if they do not exist. May be empty, and
// additional items may be added at any time out-of-band (via creation of
// a corresponding Etcd directory).
FixedItems() []string
// For |item| which is currently a local replica or master, returns a
// representation of the local item processing state. State is shared with
// other Allocators via this Allocator's |item| announcement in Etcd.
ItemState(item string) string
// For |state| of an item, which may be processed by another Allocator,
// returns whether the item can safely be promoted at this time.
ItemIsReadyForPromotion(item, state string) bool
// Notifies Allocator of |route| for |item|. If |index| == -1, then Allocator
// has no entry for |item|. Otherwise, |route.Entries[index]| is the entry
// of this Allocator (and will have basename InstanceKey()). |tree| is given
// as context: ItemRoute() will often wish to inspect other state
// within |tree| in response to a route change. Note that |route| or |tree|
// must be copied if retained beyond this call.
ItemRoute(item string, route Route, index int, tree *etcd.Node)
}
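// The following type is an editorial sketch and is not part of the original
// allocator.go: it shows one minimal way the Allocator interface might be
// implemented, assuming a service whose items are always ready for promotion.
// All names and constants here (exampleAllocator, "ready", the fixed item
// list) are illustrative assumptions.
type exampleAllocator struct {
	keys     etcd.KeysAPI
	root     string // Eg, "/example/root".
	instance string // Eg, a unique per-process name.
}

func (a exampleAllocator) KeysAPI() etcd.KeysAPI { return a.keys }
func (a exampleAllocator) PathRoot() string      { return a.root }
func (a exampleAllocator) InstanceKey() string   { return a.instance }
func (a exampleAllocator) Replicas() int         { return 2 }
func (a exampleAllocator) FixedItems() []string  { return []string{"item-000", "item-001"} }

// ItemState reports a constant state. A real implementation would summarize
// local processing progress of |item|.
func (a exampleAllocator) ItemState(item string) string { return "ready" }

// ItemIsReadyForPromotion trusts the state advertised for the item.
func (a exampleAllocator) ItemIsReadyForPromotion(item, state string) bool {
	return state == "ready"
}

// ItemRoute logs route changes. A real implementation would begin or halt
// local replication of |item| in response.
func (a exampleAllocator) ItemRoute(item string, route Route, index int, tree *etcd.Node) {
	log.WithFields(log.Fields{"item": item, "index": index}).Debug("route updated")
}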
// Inspector is an optional interface of an Allocator which allows for
// inspections of the Allocator state tree.
type Inspector interface {
// InspectChan returns a channel which may request invocations to inspect
// state in between allocator actions. The callback is invoked from the
// Allocator's goroutine and has exclusive read access to the |tree| for the
// call duration. Because of this, callbacks must be non-blocking. The
// callback must not modify |tree|.
InspectChan() chan func(tree *etcd.Node)
}
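// An editorial sketch (not in the original file) of the caller side of
// InspectChan: the callback runs on the Allocator's goroutine, so it copies
// out what it needs without blocking and signals completion to the caller.
// exampleCountItems is an illustrative name.
func exampleCountItems(ins Inspector) int {
	var count int
	var done = make(chan struct{})
	ins.InspectChan() <- func(tree *etcd.Node) {
		if items := Child(tree, ItemsPrefix); items != nil {
			count = len(items.Nodes)
		}
		close(done)
	}
	<-done
	return count
}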
// Create attempts to create an Allocator member lock reflecting instance
// |alloc|. If the member lock already exists, returns
// ErrAllocatorInstanceExists. An Allocator member lock should be obtained
// prior to an Allocate call.
func Create(alloc Allocator) error {
_, err := alloc.KeysAPI().Set(context.Background(), memberKey(alloc), "",
&etcd.SetOptions{PrevExist: etcd.PrevNoExist, TTL: lockDuration})
if err, ok := err.(etcd.Error); ok && err.Code == etcd.ErrorCodeNodeExist {
return ErrAllocatorInstanceExists
}
return err
}
// Allocate acts on behalf of |alloc| to achieve distributed allocation of
// items. This is a long-lived call, which will exit only after |alloc|'s
// member announcement is removed (eg, by Cancel(alloc)) and all allocated
// items have been safely handed off to ready replicas.
//
// Allocate acts on behalf of an existing member lock. If such a lock does not
// exist, Allocate will take no action. If it exists but is owned by another
// process, Allocate will duplicate the item allocations of that process. It is
// the caller's responsibility to obtain and verify uniqueness of the member
// lock (eg, via a preceding Create).
func Allocate(alloc Allocator) error {
// Channels for receiving & cancelling watched tree updates.
var watchCh = make(chan *etcd.Response)
var cancelWatch = make(chan struct{})
var refreshTicker = time.NewTicker(time.Minute * 10)
defer func() {
close(cancelWatch)
refreshTicker.Stop()
}()
var tree *etcd.Node // Watched tree rooted at alloc.PathRoot().
watcher := RetryWatcher(alloc.KeysAPI(), alloc.PathRoot(),
&etcd.GetOptions{Recursive: true, Sort: true},
&etcd.WatcherOptions{Recursive: true},
refreshTicker.C)
// Load initial tree. Fail-fast on any error.
if r, err := watcher.Next(context.Background()); err != nil {
return err
} else {
tree = r.Node
}
// Begin monitoring alloc.PathRoot() for changes.
go func() {
for {
if r, err := watcher.Next(context.Background()); err != nil {
log.WithField("err", err).Warn("allocator watch")
select {
case <-cancelWatch:
return
case <-time.After(allocErrSleepInterval):
}
} else {
select {
case <-cancelWatch:
return
case watchCh <- r:
}
}
}
}()
// Test support hooks.
testNotifier, _ := alloc.(interface {
IdleAt(uint64) // Allocate() is idle at the given |modifiedIndex|.
ActedAt(uint64) // Allocate() acted at |modifiedIndex|.
})
var now time.Time // Current timepoint.
var modifiedIndex uint64 // Current Etcd ModifiedIndex.
var inspectCh chan func(tree *etcd.Node)
if inspector, ok := alloc.(Inspector); ok {
inspectCh = inspector.InspectChan()
}
// When idle, manages deadline at which we must wake for next lock refresh.
var deadlineTimer = time.NewTimer(0)
var deadlineCh = deadlineTimer.C
for {
// Repeatedly wait for either |deadlineCh| or |watchCh| to select.
select {
case now = <-deadlineCh:
case response := <-watchCh:
for done := false; !done; {
var err error
if tree, err = PatchTree(tree, response); err != nil {
log.WithFields(log.Fields{"err": err, "resp": response}).Error("patch failed")
}
// Process further queued watch updates without blocking.
select {
case response = <-watchCh:
default:
done = true
}
}
if modifiedIndex > response.Node.ModifiedIndex {
// We are waiting for a previous allocation action to be reflected
// in our watched tree. Don't attempt another action until it is.
continue
}
modifiedIndex = response.Node.ModifiedIndex
case callback := <-inspectCh:
callback(tree)
continue
}
// Disable timer notifications until explicitly re-enabled.
deadlineTimer.Stop()
deadlineCh = nil
var params = allocParams{Allocator: alloc}
params.Input.Time = now
params.Input.Tree = tree
params.Input.Index = modifiedIndex
allocExtract(&params)
desiredMaster, desiredTotal := targetCounts(&params)
log.WithFields(log.Fields{
"allocParams": params,
"desiredMaster": desiredMaster,
"desiredTotal": desiredTotal,
}).Debug("allocator params")
if response, err := allocAction(&params, desiredMaster, desiredTotal); err != nil {
log.WithField("err", err).Warn("failed to apply allocation action")
// Action is implicitly retried the next iteration, which will occur
// on the next watch update or after cool-off.
deadlineTimer.Reset(allocErrSleepInterval)
deadlineCh = deadlineTimer.C
continue
} else if response != nil {
// Action was applied. We expect to see |response| again via our Etcd
// watch, and defer further processing or actions until we do.
modifiedIndex = response.Node.ModifiedIndex
if testNotifier != nil {
testNotifier.ActedAt(modifiedIndex)
}
continue
}
// No action is possible at this time.
if nextDeadline := nextDeadline(&params); nextDeadline.IsZero() {
// Termination condition: no Etcd entries remain.
return nil
} else {
// Arrange to sleep until the earlier of |nextDeadline| or
// |allocMaxSleepInterval|.
if d := nextDeadline.Sub(time.Now()); d < allocMaxSleepInterval {
deadlineTimer.Reset(d)
} else {
deadlineTimer.Reset(allocMaxSleepInterval)
}
deadlineCh = deadlineTimer.C
if testNotifier != nil {
testNotifier.IdleAt(modifiedIndex)
}
}
}
}
// Cancel cancels |alloc| by deleting its member announcement. The matching
// Allocate() invocation will begin an orderly release of held items. When all
// items are released, Allocate() will exit. Note that mastered items will be
// released only once they have a sufficient number of ready replicas for
// hand-off.
func Cancel(alloc Allocator) error {
_, err := alloc.KeysAPI().Delete(context.Background(), memberKey(alloc), nil)
return err
}
// CancelItem cancels |item| by deleting its announcement. This should be
// undertaken only under exceptional circumstances, where the local Allocator
// is unable to service the allocated |item| (eg, because of an unrecoverable
// local error).
func CancelItem(alloc Allocator, item string) error {
_, err := alloc.KeysAPI().Delete(context.Background(), itemKey(alloc, item), nil)
return err
}
// CreateAndAllocateWithSignalHandling composes Create and Allocate to run an
// Allocator which will additionally use an installed signal handler to
// gracefully Cancel itself on a SIGTERM or SIGINT. It performs a polled retry
// of Create on ErrAllocatorInstanceExists, until acquired or signalled.
// Top-level programs implementing an Allocator will generally want to use this.
func CreateAndAllocateWithSignalHandling(alloc Allocator) error {
signalCh := make(chan os.Signal, 1)
signal.Notify(signalCh, syscall.SIGTERM, syscall.SIGINT)
shutdownCh := make(chan struct{})
go func() {
sig, ok := <-signalCh
if ok {
log.WithField("signal", sig).Info("caught signal")
close(shutdownCh)
}
}()
// Obtain Allocator lock. If it exists, retry until signalled.
for {
err := Create(alloc)
if err == nil {
break
} else if err != ErrAllocatorInstanceExists {
return err
}
log.WithField("key", alloc.InstanceKey()).
Warn("waiting for prior Allocator to expire")
select {
case <-time.After(time.Second * 10):
continue
case <-shutdownCh:
return err
}
}
// Arrange to Cancel Allocate on signal, allowing it to gracefully tear down.
go func() {
<-shutdownCh
if err := Cancel(alloc); err != nil {
log.WithField("err", err).Error("allocator cancel failed")
}
}()
return Allocate(alloc)
}
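// An editorial usage sketch (not in the original file): a top-level program
// supplies its own Allocator implementation and hands it to
// CreateAndAllocateWithSignalHandling. exampleAllocator is the illustrative
// type sketched after the Allocator interface above.
func exampleMain(keys etcd.KeysAPI) {
	var alloc = exampleAllocator{
		keys:     keys,
		root:     "/example/root",
		instance: "instance-0",
	}
	if err := CreateAndAllocateWithSignalHandling(alloc); err != nil {
		log.WithField("err", err).Error("allocator exited with error")
	}
}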
// memberKey returns the member announcement key for |alloc|.
// Ex: /path/root/members/my-alloc-key
func memberKey(alloc Allocator) string {
return alloc.PathRoot() + "/" + MemberPrefix + "/" + alloc.InstanceKey()
}
// itemKey returns the item entry key for |item| held by |alloc|.
// Ex: /path/root/items/an-item/my-alloc-key
func itemKey(alloc Allocator, item string) string {
return alloc.PathRoot() + "/" + ItemsPrefix + "/" + item + "/" + alloc.InstanceKey()
}
// itemOfItemKey returns the item name represented by item entry |key| held by
// |alloc|. Ex: /path/root/items/an-item/my-alloc-key => an-item
func itemOfItemKey(alloc Allocator, key string) string {
lstrip := len(alloc.PathRoot()) + 1 + len(ItemsPrefix) + 1
rstrip := len(key) - len(alloc.InstanceKey()) - 1
return key[lstrip:rstrip]
}
// POD type built by individual iterations of the Allocate() protocol,
// to succinctly describe global allocator state.
type allocParams struct {
Allocator `json:"-"`
Input struct {
Time time.Time
Tree *etcd.Node
Index uint64 // Current Etcd ModifiedIndex.
}
Item struct {
Master []*etcd.Node // Items for which we're master.
Replica []*etcd.Node // Items for which we're a replica.
Extra []*etcd.Node // Items for which we hold an extra lock.
Releaseable []*etcd.Node // Mastered items we may release.
OpenMasters []string // Names of items in need of a master.
OpenReplicas []string // Names of items in need of a replica.
Count int // Total number of items.
}
Member struct {
Entry *etcd.Node // Our member entry.
Count int // Total number of allocator members.
}
}
// WalkItems performs a zipped, outer-join iteration of items under ItemsPrefix
// of |tree|, and |fixedItems| (which must be ordered). The argument callback
// |cb| is invoked for each item, and must not retain |route| after each call.
func WalkItems(tree *etcd.Node, fixedItems []string, cb func(name string, route Route)) {
var dir etcd.Node
if d := Child(tree, ItemsPrefix); d != nil {
dir = *d
} else {
// Fabricate items directory if it doesn't exist.
dir = etcd.Node{Key: tree.Key + "/" + ItemsPrefix, Dir: true}
}
// Perform a zipped, outer-join iteration of |dir| items and |fixedItems|.
var scratch [8]*etcd.Node // Re-usable buffer for building Route.Entries.
forEachChild(&dir, fixedItems, func(name string, node *etcd.Node) {
// Bypass NewRoute to avoid extra deep copies and because we know (per the
// callback contract) that |route| will not be retained.
var route = Route{
Item: node,
Entries: append(scratch[:0], node.Nodes...),
}
route.init()
cb(name, route)
})
}
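// An editorial sketch (not in the original file) of WalkItems usage: count the
// items which currently have no master entry. |tree| is assumed to be a
// recursively fetched node rooted at the Allocator's PathRoot, and
// |fixedItems| must be ordered, as WalkItems requires.
func exampleCountOpenMasters(tree *etcd.Node, fixedItems []string) int {
	var open int
	WalkItems(tree, fixedItems, func(name string, route Route) {
		if len(route.Entries) == 0 {
			open++
		}
	})
	return open
}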
// allocExtract builds |p.Item| and |p.Member| descriptions of allocParams from
// |p.Input|.
func allocExtract(p *allocParams) {
WalkItems(p.Input.Tree, p.FixedItems(), func(name string, route Route) {
p.Item.Count += 1
var index = route.Index(p.InstanceKey())
p.ItemRoute(name, route, index, p.Input.Tree)
if index == -1 {
// We do not hold a lock on this item.
if len(route.Entries) == 0 {
p.Item.OpenMasters = append(p.Item.OpenMasters, name)
} else if len(route.Entries) < p.Replicas()+1 {
p.Item.OpenReplicas = append(p.Item.OpenReplicas, name)
}
} else if index == 0 {
// We act as item master.
p.Item.Master = append(p.Item.Master, route.Entries[0])
// We always require that mastered items be ready for hand-off
// before we may release them, even if our member lock is gone.
if route.IsReadyForHandoff(p) {
p.Item.Releaseable = append(p.Item.Releaseable, route.Entries[0])
}
} else if index < p.Replicas()+1 {
// We act as an item replica.
p.Item.Replica = append(p.Item.Replica, route.Entries[index])
} else {
// We hold an extra lock (we lost a race to become a replica).
p.Item.Extra = append(p.Item.Extra, route.Entries[index])
}
})
if membersDir := Child(p.Input.Tree, MemberPrefix); membersDir != nil {
p.Member.Entry = Child(membersDir, p.InstanceKey())
p.Member.Count = len(membersDir.Nodes)
}
}
// allocAction selects and attempts an action (state transition) given the
// current parameters, as an Etcd operation. Etcd response and error code are
// passed through. If both are nil, no action was available to be attempted.
func allocAction(p *allocParams, desiredMaster, desiredTotal int) (*etcd.Response, error) {
// Locks are refreshed when less than 1/2 of their TTL remains.
var horizon = p.Input.Time.Add(lockDuration / 2)
// Helper which CASs |node| to |value| with TTL.
var compareAndSet = func(node *etcd.Node, value string) (*etcd.Response, error) {
return p.KeysAPI().Set(context.Background(), node.Key, value,
&etcd.SetOptions{PrevIndex: node.ModifiedIndex, TTL: lockDuration})
}
// Helper which CADs |node|.
var compareAndDelete = func(node *etcd.Node) (*etcd.Response, error) {
return p.KeysAPI().Delete(context.Background(), node.Key,
&etcd.DeleteOptions{PrevIndex: node.ModifiedIndex})
}
// Helper which creates |key| with TTL.
var create = func(key string) (*etcd.Response, error) {
return p.KeysAPI().Set(context.Background(), key, "",
&etcd.SetOptions{PrevExist: etcd.PrevNoExist, TTL: lockDuration})
}
// 1) Refresh the member lock.
if p.Member.Entry != nil {
if p.Member.Entry.Expiration.Before(horizon) {
log.WithField("key", p.Member.Entry.Key).Debug("refreshing member lock")
return compareAndSet(p.Member.Entry, "")
}
}
// An item lock is updated if it's beyond |horizon|, *or* if Allocator state
// for the item no longer matches persisted item state. For example, the item
// may now be ready for promotion, which must be published via Etcd.
// 2) Refresh or update a master lock
for _, entry := range p.Item.Master {
value := p.ItemState(itemOfItemKey(p, entry.Key))
if entry.Expiration.Before(horizon) || value != entry.Value {
log.WithFields(log.Fields{"key": entry.Key, "value": value}).
Debug("refreshing allocated master lock")
return compareAndSet(entry, value)
}
}
// 3) Refresh or update a replica lock.
for _, entry := range p.Item.Replica {
value := p.ItemState(itemOfItemKey(p, entry.Key))
if entry.Expiration.Before(horizon) || value != entry.Value {
log.WithFields(log.Fields{"key": entry.Key, "value": value}).
Debug("refreshing allocated replica lock")
return compareAndSet(entry, value)
}
}
// 4) Release a spurious lock from a lost acquisition race.
for _, entry := range p.Item.Extra {
log.WithField("key", entry.Key).Debug("deleting lost-race item lock")
return compareAndDelete(entry)
}
// 5) Select a random master item to release. This may occur iff:
// * We are currently the item master.
// * The item has the required number of ready replicas.
// * We'd like to release a mastered item.
if len(p.Item.Master) > desiredMaster && len(p.Item.Releaseable) != 0 {
entry := p.Item.Releaseable[rand.Int()%len(p.Item.Releaseable)]
log.WithField("key", entry.Key).Debug("releasing mastered item lock")
return compareAndDelete(entry)
}
// 6) Select a random replica to release. In normal operation we never
// release a replica we hold. However, iff we have no member lock (we're in
// the process of shutdown), then we may release held replicas (not masters).
if p.Member.Entry == nil && len(p.Item.Replica) != 0 {
entry := p.Item.Replica[rand.Int()%len(p.Item.Replica)]
log.WithField("key", entry.Key).Debug("releasing replica item lock")
return compareAndDelete(entry)
}
// 7) Select a random item to master. This may occur iff:
// * We don't hold an entry for the item.
// * The item has an open master slot.
// * We'd like to have another master.
if len(p.Item.Master) < desiredMaster && len(p.Item.OpenMasters) != 0 {
name := p.Item.OpenMasters[rand.Int()%len(p.Item.OpenMasters)]
key := itemKey(p, name)
log.WithField("key", key).Debug("aquiring item master lock")
return create(key)
}
// 8) Select a random item to replicate. This may occur iff:
// * We don't hold an entry for the item.
// * The item has an open replica slot.
// * We'd like to have another replica.
if len(p.Item.Master)+len(p.Item.Replica) < desiredTotal && len(p.Item.OpenReplicas) != 0 {
name := p.Item.OpenReplicas[rand.Int()%len(p.Item.OpenReplicas)]
key := itemKey(p, name)
log.WithField("key", key).Debug("aquiring item replica lock")
return create(key)
}
// 9) Deadlock avoidance: Select a random master to release, iff:
// * We are currently the item master.
// * The item has the required number of ready replicas.
// * We hold exactly as many master slots as we'd like.
// * We have too many items overall.
if len(p.Item.Master) == desiredMaster &&
len(p.Item.Master)+len(p.Item.Replica) > desiredTotal &&
len(p.Item.Releaseable) != 0 {
var entry = p.Item.Releaseable[rand.Int()%len(p.Item.Releaseable)]
log.WithField("key", entry.Key).Debug("releasing EXTRA mastered item lock")
return compareAndDelete(entry)
}
// 10) Deadlock avoidance: Select a random item to replicate with delay, iff:
// * We don't hold an entry for the item.
// * The item has an open replica slot.
// * We have the exact right number of items overall (we'll be going over).
// * We are not actively seeking to exit.
//
// Note that this case means an allocator can potentially fail to converge.
// We resolve this in practice by sleeping for a period of time: if there's
// another allocator that actively seeks more replicas, we'd prefer that they
// win. Sleeping is safe because we've already asserted that all held keys
// have at least 1/2 of their TTL remaining.
if len(p.Item.Master)+len(p.Item.Replica) == desiredTotal &&
len(p.Item.Master) == desiredMaster &&
len(p.Item.OpenReplicas) != 0 &&
p.Member.Entry != nil {
var name = p.Item.OpenReplicas[rand.Int()%len(p.Item.OpenReplicas)]
var key = itemKey(p, name)
time.Sleep(100 * time.Millisecond)
log.WithField("key", key).Debug("aquiring EXTRA item replica lock")
return create(key)
}
return nil, nil
}
// targetCounts returns the desired number of mastered and total (mastered +
// replica) items.
func targetCounts(p *allocParams) (desiredMaster, desiredTotal int) {
// If we do not hold a member lock, our target is zero. Otherwise, it's
// p.Item.Count / p.Member.Count rounded up.
if p.Member.Entry != nil {
desiredMaster = p.Item.Count / p.Member.Count
if p.Item.Count%p.Member.Count != 0 {
desiredMaster += 1
}
desiredTotal = desiredMaster * (p.Replicas() + 1)
}
return
}
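// As an illustrative worked example (numbers are assumptions, not from the
// original file): with p.Item.Count == 10, p.Member.Count == 4 and
// Replicas() == 2, desiredMaster is ceil(10/4) == 3 and desiredTotal is
// 3 * (2+1) == 9 item entries held by this member.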
// nextDeadline computes the next deadline by finding the minimum Expiration of
// all held Etcd entries, and subtracting 1/2 of lockDuration. Eg, we wish to
// refresh a held entry once its remaining TTL is less than 1/2 of
// lockDuration.
func nextDeadline(p *allocParams) time.Time {
var firstExpire time.Time
if p.Member.Entry != nil {
firstExpire = *p.Member.Entry.Expiration
}
for _, entry := range p.Item.Master {
if firstExpire.IsZero() || entry.Expiration.Before(firstExpire) {
firstExpire = *entry.Expiration
}
}
for _, entry := range p.Item.Replica {
if firstExpire.IsZero() || entry.Expiration.Before(firstExpire) {
firstExpire = *entry.Expiration
}
}
if firstExpire.IsZero() {
return time.Time{}
}
return firstExpire.Add(-lockDuration / 2)
}
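// As an illustrative example (times are assumptions): with lockDuration of
// five minutes, a sole held entry expiring at 12:05:00 yields a deadline of
// 12:02:30. Allocate sleeps toward that deadline in steps bounded by
// allocMaxSleepInterval, and refreshes the entry once the deadline passes.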
func init() {
rand.Seed(time.Now().UnixNano())
}