This repository has been archived by the owner on Jul 19, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 71
/
ring.go
123 lines (101 loc) · 3.65 KB
/
ring.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
// SPDX-License-Identifier: AGPL-3.0-only
package servicediscovery
import (
"context"
"sort"
"time"
"github.com/grafana/dskit/ring"
"github.com/grafana/dskit/services"
"github.com/pkg/errors"
)
// activeRingOp is the ring operation passed to GetAllHealthy when polling the
// ring. ACTIVE is the only instance state it lists.
var activeRingOp = ring.NewOp([]ring.InstanceState{ring.ACTIVE}, nil)
// ringServiceDiscovery is a service that polls a ring on a fixed period and
// reports added, changed and removed instances to a Notifications receiver.
type ringServiceDiscovery struct {
	services.Service

	// Ring that is polled, and the interval between two polls.
	ringClient      *ring.Ring
	ringCheckPeriod time.Duration

	// Maximum number of instances flagged as in use. A value <= 0 disables
	// the limit, so every discovered instance is flagged as in use.
	maxUsedInstances int

	// Destination of the added/changed/removed notifications.
	receiver Notifications

	// Watches the ring client; a failure it reports terminates the running loop.
	subservicesWatcher *services.FailureWatcher

	// Instances discovered and notified so far, keyed by address.
	notifiedByAddress map[string]Instance
}
// NewRing returns a service that discovers instances by polling ringClient
// every ringCheckPeriod and forwards additions, changes and removals to
// receiver. At most maxUsedInstances instances are reported as in use; a value
// <= 0 means no limit. The returned service starts ringClient when it starts
// and stops it when it stops.
func NewRing(ringClient *ring.Ring, ringCheckPeriod time.Duration, maxUsedInstances int, receiver Notifications) services.Service {
	discovery := &ringServiceDiscovery{
		ringClient:         ringClient,
		ringCheckPeriod:    ringCheckPeriod,
		maxUsedInstances:   maxUsedInstances,
		receiver:           receiver,
		subservicesWatcher: services.NewFailureWatcher(),
		notifiedByAddress:  make(map[string]Instance),
	}

	// The lifecycle callbacks are bound to the instance, so the basic service
	// can only be created once the struct exists.
	discovery.Service = services.NewBasicService(discovery.starting, discovery.running, discovery.stopping)

	return discovery
}
// starting registers the ring client with the failure watcher, then starts the
// client and waits until it is running. Any startup error is returned wrapped.
func (r *ringServiceDiscovery) starting(ctx context.Context) error {
	r.subservicesWatcher.WatchService(r.ringClient)

	if err := services.StartAndAwaitRunning(ctx, r.ringClient); err != nil {
		return errors.Wrap(err, "failed to start ring client")
	}
	return nil
}
// stopping stops the ring client and waits for it to terminate. The failure
// that caused the stop, if any, is ignored. A background context is used, so
// the wait is not bound to any caller-supplied context.
func (r *ringServiceDiscovery) stopping(_ error) error {
	if err := services.StopAndAwaitTerminated(context.Background(), r.ringClient); err != nil {
		return errors.Wrap(err, "failed to stop ring client")
	}
	return nil
}
// running reports the ring state to the receiver once immediately and then
// again on every tick of ringCheckPeriod. It returns nil when ctx is done, or
// a wrapped error when the failure watcher reports a subservice failure.
func (r *ringServiceDiscovery) running(ctx context.Context) error {
	ticker := time.NewTicker(r.ringCheckPeriod)
	defer ticker.Stop()

	for {
		// The first pass publishes the initial state; later passes run after a
		// tick. The lookup error is deliberately ignored, so whatever
		// replication set is returned alongside it is what gets processed.
		healthy, _ := r.ringClient.GetAllHealthy(activeRingOp) // nolint:errcheck
		r.notifyChanges(healthy)

		select {
		case <-ticker.C:
			// Fall through to the next iteration to refresh the state.
		case <-ctx.Done():
			return nil
		case err := <-r.subservicesWatcher.Chan():
			return errors.Wrap(err, "a subservice of ring-based service discovery has failed")
		}
	}
}
// notifyChanges compares the instances in all with the ones notified so far
// and tells the receiver about every difference: additions first, then
// changes, then removals. Afterwards the current view becomes the new
// baseline.
//
// It is not concurrency safe: it reads and replaces r.notifiedByAddress without
// any locking. all.Instances may be re-ordered as a side effect, because
// selecting the in-use instances sorts the slice in place.
func (r *ringServiceDiscovery) notifyChanges(all ring.ReplicationSet) {
	inUse := selectInUseInstances(all.Instances, r.maxUsedInstances)

	// Build the current view: start with every instance marked as not in use,
	// then flip the selected ones.
	current := make(map[string]Instance, len(all.Instances))
	for _, desc := range all.Instances {
		current[desc.Addr] = Instance{Address: desc.Addr, InUse: false}
	}
	for _, desc := range inUse {
		current[desc.Addr] = Instance{Address: desc.Addr, InUse: true}
	}

	// Phase 1: instances seen for the first time.
	for addr, inst := range current {
		if _, known := r.notifiedByAddress[addr]; known {
			continue
		}
		r.receiver.InstanceAdded(inst)
	}

	// Phase 2: instances already notified whose state differs now.
	for addr, inst := range current {
		previous, known := r.notifiedByAddress[addr]
		if known && !previous.Equal(inst) {
			r.receiver.InstanceChanged(inst)
		}
	}

	// Phase 3: instances notified earlier that are no longer present.
	for addr, previous := range r.notifiedByAddress {
		if _, present := current[addr]; present {
			continue
		}
		r.receiver.InstanceRemoved(previous)
	}

	r.notifiedByAddress = current
}
// selectInUseInstances returns the instances that should be flagged as in use.
// When maxInstances is positive and smaller than len(instances), the input is
// sorted by address in place and its first maxInstances entries are returned.
// Otherwise the input is returned unchanged.
func selectInUseInstances(instances []ring.InstanceDesc, maxInstances int) []ring.InstanceDesc {
	if maxInstances > 0 && len(instances) > maxInstances {
		// Sorting by address makes the chosen subset deterministic.
		sort.Sort(ring.ByAddr(instances))
		return instances[:maxInstances]
	}

	// No limit configured, or the limit is not exceeded: everything is in use.
	return instances
}