-
Notifications
You must be signed in to change notification settings - Fork 795
/
replication_set_tracker.go
96 lines (78 loc) · 2.42 KB
/
replication_set_tracker.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
package ring
// replicationSetResultTracker collects the outcome of an execution run against
// a set of instances and reports whether the overall success or failure
// threshold has been crossed.
type replicationSetResultTracker interface {
	// done signals that the given instance has finished its execution, either
	// successfully (err is nil) or with a failure (err is non-nil).
	done(instance *IngesterDesc, err error)
	// succeeded returns true once the minimum number of successful results
	// has been received.
	succeeded() bool
	// failed returns true once more failures have been recorded than the
	// tracker tolerates. Both implementations in this file compare with a
	// strict "greater than", so merely reaching the limit is not a failure.
	failed() bool
}
// defaultResultTracker counts successful and failed executions without
// distinguishing which instance reported them.
type defaultResultTracker struct {
	// minSucceeded is the number of successes at which succeeded() turns
	// true; numSucceeded is the number of successes recorded so far.
	minSucceeded, numSucceeded int
	// numErrors is the number of failures recorded so far; failed() turns
	// true once it exceeds maxErrors.
	numErrors, maxErrors int
}
// newDefaultResultTracker builds a tracker for the given instances that
// tolerates up to maxErrors failures. The number of successes required is the
// number of instances minus maxErrors; both counters start at zero.
func newDefaultResultTracker(instances []IngesterDesc, maxErrors int) *defaultResultTracker {
	// The success and error counters rely on their zero values.
	tracker := &defaultResultTracker{maxErrors: maxErrors}
	tracker.minSucceeded = len(instances) - maxErrors
	return tracker
}
// done records the outcome of one execution. The instance is ignored: only
// whether err is nil matters. A nil err bumps the success counter, anything
// else bumps the error counter.
func (t *defaultResultTracker) done(_ *IngesterDesc, err error) {
	if err != nil {
		t.numErrors++
		return
	}
	t.numSucceeded++
}
// succeeded reports whether the number of successes recorded so far has
// reached the required minimum.
func (t *defaultResultTracker) succeeded() bool {
	return t.minSucceeded <= t.numSucceeded
}
// failed reports whether more errors have been recorded than the tracker
// tolerates. Hitting exactly maxErrors is still within budget.
func (t *defaultResultTracker) failed() bool {
	return t.maxErrors < t.numErrors
}
// zoneAwareResultTracker keeps per-zone bookkeeping of execution results.
// A zone only counts as successful once every one of its instances has
// reported back without error.
type zoneAwareResultTracker struct {
	// waitingByZone holds, per zone, the number of instances that have not
	// reported yet; failuresByZone holds, per zone, the number of failed
	// executions (a zone gets an entry only after its first failure).
	waitingByZone, failuresByZone map[string]int
	// minSuccessfulZones is the number of successful zones needed for
	// succeeded() to return true; failed() returns true once the number of
	// zones with failures exceeds maxUnavailableZones.
	minSuccessfulZones, maxUnavailableZones int
}
// newZoneAwareResultTracker builds a tracker for the given instances that
// tolerates up to maxUnavailableZones zones with failures. Each instance is
// counted as pending under its zone, and the number of zones that must succeed
// is the number of distinct zones minus maxUnavailableZones.
func newZoneAwareResultTracker(instances []IngesterDesc, maxUnavailableZones int) *zoneAwareResultTracker {
	// Count how many instances are expected to report back from each zone.
	pending := make(map[string]int)
	for i := range instances {
		pending[instances[i].Zone]++
	}

	return &zoneAwareResultTracker{
		waitingByZone:       pending,
		failuresByZone:      make(map[string]int),
		minSuccessfulZones:  len(pending) - maxUnavailableZones,
		maxUnavailableZones: maxUnavailableZones,
	}
}
// done records the outcome of the execution on the given instance. The
// pending count of the instance's zone is always decremented; a non-nil err
// additionally increments that zone's failure count.
func (t *zoneAwareResultTracker) done(instance *IngesterDesc, err error) {
	zone := instance.Zone

	t.waitingByZone[zone]--
	if err == nil {
		return
	}
	t.failuresByZone[zone]++
}
// succeeded reports whether enough zones have completed successfully, i.e. at
// least "all zones - max unavailable zones" of them. A zone counts as
// successful only when none of its instances is still pending and none of them
// has failed.
func (t *zoneAwareResultTracker) succeeded() bool {
	completed := 0

	for zone, pending := range t.waitingByZone {
		// Skip zones that are still waiting on results or saw a failure.
		if pending != 0 || t.failuresByZone[zone] != 0 {
			continue
		}
		completed++
	}

	return completed >= t.minSuccessfulZones
}
// failed reports whether the number of zones with a failure entry exceeds
// the number of zones allowed to be unavailable.
func (t *zoneAwareResultTracker) failed() bool {
	return len(t.failuresByZone) > t.maxUnavailableZones
}