/
instance_count.go
53 lines (42 loc) · 2.15 KB
/
instance_count.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
// SPDX-License-Identifier: AGPL-3.0-only
package distributor
import (
"time"
"github.com/grafana/dskit/ring"
"go.uber.org/atomic"
)
// healthyInstanceDelegate counts the number of healthy instances that are part of the ring
// and stores the count to the provided atomic integer. Used here to count the number of
// distributors in the ring to determine how to enforce rate limiting.
//
// It is a ring.BasicLifecyclerDelegate that wraps another delegate: every callback is
// forwarded to next, and the heartbeat callback additionally refreshes count first.
type healthyInstanceDelegate struct {
	// count receives the number of healthy instances; it is overwritten on
	// every call to OnRingInstanceHeartbeat.
	count *atomic.Uint32
	// heartbeatTimeout is handed to InstanceDesc.IsHeartbeatHealthy when
	// deciding whether an instance is counted.
	heartbeatTimeout time.Duration
	// next is the wrapped delegate that every callback is forwarded to.
	next ring.BasicLifecyclerDelegate
}
// newHealthyInstanceDelegate builds a healthyInstanceDelegate that writes the healthy
// instance count into count, passes heartbeatTimeout to the heartbeat health check, and
// forwards every lifecycler callback to next.
func newHealthyInstanceDelegate(count *atomic.Uint32, heartbeatTimeout time.Duration, next ring.BasicLifecyclerDelegate) *healthyInstanceDelegate {
	delegate := new(healthyInstanceDelegate)
	delegate.count = count
	delegate.heartbeatTimeout = heartbeatTimeout
	delegate.next = next
	return delegate
}
// OnRingInstanceRegister implements the ring.BasicLifecyclerDelegate interface.
// It adds no behavior of its own: all arguments are handed to the wrapped delegate
// and that delegate's state and tokens are returned unchanged.
func (d *healthyInstanceDelegate) OnRingInstanceRegister(lifecycler *ring.BasicLifecycler, ringDesc ring.Desc, instanceExists bool, instanceID string, instanceDesc ring.InstanceDesc) (ring.InstanceState, ring.Tokens) {
	state, tokens := d.next.OnRingInstanceRegister(
		lifecycler,
		ringDesc,
		instanceExists,
		instanceID,
		instanceDesc,
	)
	return state, tokens
}
// OnRingInstanceTokens implements the ring.BasicLifecyclerDelegate interface.
// The call is passed straight through to the wrapped delegate.
func (d *healthyInstanceDelegate) OnRingInstanceTokens(lifecycler *ring.BasicLifecycler, tokens ring.Tokens) {
	wrapped := d.next
	wrapped.OnRingInstanceTokens(lifecycler, tokens)
}
// OnRingInstanceStopping implements the ring.BasicLifecyclerDelegate interface.
// The call is passed straight through to the wrapped delegate.
func (d *healthyInstanceDelegate) OnRingInstanceStopping(lifecycler *ring.BasicLifecycler) {
	wrapped := d.next
	wrapped.OnRingInstanceStopping(lifecycler)
}
// OnRingInstanceHeartbeat implements the ring.BasicLifecyclerDelegate interface.
//
// On each call it walks ringDesc.Ingesters and counts the entries that are in the
// ring.ACTIVE state and whose heartbeat is healthy with respect to d.heartbeatTimeout
// at the current time. The resulting total replaces the value held by d.count, after
// which the call is forwarded to the wrapped delegate.
func (d *healthyInstanceDelegate) OnRingInstanceHeartbeat(lifecycler *ring.BasicLifecycler, ringDesc *ring.Desc, instanceDesc *ring.InstanceDesc) {
	var healthy uint32
	// Take a single timestamp so every instance is judged against the same instant.
	checkedAt := time.Now()

	for _, member := range ringDesc.Ingesters {
		if member.State != ring.ACTIVE {
			continue
		}
		if !member.IsHeartbeatHealthy(d.heartbeatTimeout, checkedAt) {
			continue
		}
		healthy++
	}

	// Publish the fresh count before handing over to the wrapped delegate.
	d.count.Store(healthy)
	d.next.OnRingInstanceHeartbeat(lifecycler, ringDesc, instanceDesc)
}