-
Notifications
You must be signed in to change notification settings - Fork 786
/
replication_set.go
125 lines (107 loc) · 2.93 KB
/
replication_set.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
package ring
import (
"context"
"sort"
"time"
)
// ReplicationSet is the group of ingesters responsible for a given key,
// together with the number of failures that can be tolerated when talking
// to them.
type ReplicationSet struct {
	// Ingesters are the instances that make up the set.
	Ingesters []IngesterDesc

	// MaxErrors is how many failed calls are tolerated before an operation
	// on the set is considered failed.
	MaxErrors int
}
// Do calls f concurrently, once per ingester in the set, and gathers the
// results.
//
// It returns as soon as len(r.Ingesters)-r.MaxErrors calls have succeeded,
// yielding their results in completion order. If more than r.MaxErrors calls
// fail, the error that exceeded the limit is returned instead; if ctx is done
// first, ctx.Err() is returned.
//
// When delay is positive, only the first len(r.Ingesters)-r.MaxErrors
// ingesters are contacted immediately. Each remaining ingester is contacted
// once delay has elapsed, or earlier if one of the in-flight calls fails.
//
// The context handed to f is cancelled when Do returns, so calls that are
// still outstanding are signalled to stop.
func (r ReplicationSet) Do(ctx context.Context, delay time.Duration, f func(context.Context, *IngesterDesc) (interface{}, error)) ([]interface{}, error) {
	total := len(r.Ingesters)

	// Every goroutine sends at most one value, so buffering for the whole set
	// lets them finish without blocking even after Do has returned.
	failures := make(chan error, total)
	successes := make(chan interface{}, total)
	minSuccess := total - r.MaxErrors
	// At most MaxErrors tolerated failures each release one delayed request,
	// so sends on this channel never block.
	forceStart := make(chan struct{}, r.MaxErrors)

	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	for idx := range r.Ingesters {
		go func(idx int, ing *IngesterDesc) {
			// Requests beyond the minimum needed for success are held back
			// until the delay expires or a failure forces them to start.
			if delay > 0 && idx >= minSuccess {
				timer := time.NewTimer(delay)
				defer timer.Stop()

				select {
				case <-timer.C:
				case <-forceStart:
				case <-ctx.Done():
					return
				}
			}

			res, err := f(ctx, ing)
			if err != nil {
				failures <- err
				return
			}
			successes <- res
		}(idx, &r.Ingesters[idx])
	}

	failed := 0
	results := make([]interface{}, 0, total)

	for len(results) < minSuccess {
		select {
		case <-ctx.Done():
			return nil, ctx.Err()

		case res := <-successes:
			results = append(results, res)

		case err := <-failures:
			failed++
			if failed > r.MaxErrors {
				return nil, err
			}
			// A tolerated failure: let one of the delayed requests start now.
			forceStart <- struct{}{}
		}
	}

	return results, nil
}
// Includes reports whether any instance in the replication set has the
// given address.
func (r ReplicationSet) Includes(addr string) bool {
	for i := range r.Ingesters {
		if r.Ingesters[i].GetAddr() != addr {
			continue
		}
		return true
	}

	return false
}
// GetAddresses collects the address of every instance in the replication
// set. Callers must not rely on the order of the returned slice.
func (r ReplicationSet) GetAddresses() []string {
	out := make([]string, len(r.Ingesters))
	for i := range r.Ingesters {
		out[i] = r.Ingesters[i].Addr
	}

	return out
}
// HasReplicationSetChanged reports whether two replication sets differ.
//
// It returns false when both sets contain the same instances, ignoring the
// heartbeat timestamps, and true when they differ in any other way: a
// different number of instances, or any pair of instances (matched up in
// ByAddr order) that Equal reports as different.
//
// The comparison works on copies of the instance slices, so neither
// before.Ingesters nor after.Ingesters is reordered by this call.
func HasReplicationSetChanged(before, after ReplicationSet) bool {
	if len(before.Ingesters) != len(after.Ingesters) {
		return true
	}

	// Sort copies rather than the callers' backing arrays: sorting in place
	// would silently reorder (and potentially race on) slices the callers
	// still use.
	beforeInstances := append(before.Ingesters[:0:0], before.Ingesters...)
	afterInstances := append(after.Ingesters[:0:0], after.Ingesters...)

	sort.Sort(ByAddr(beforeInstances))
	sort.Sort(ByAddr(afterInstances))

	for i := range beforeInstances {
		b := beforeInstances[i]
		a := afterInstances[i]

		// Exclude the heartbeat timestamp from the comparison. b and a are
		// local copies, so the stored instances keep their timestamps.
		b.Timestamp = 0
		a.Timestamp = 0

		if !b.Equal(a) {
			return true
		}
	}

	return false
}