-
Notifications
You must be signed in to change notification settings - Fork 786
/
basic_lifecycler_delegates.go
152 lines (124 loc) · 5.7 KB
/
basic_lifecycler_delegates.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
package ring
import (
"context"
"os"
"time"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
)
type LeaveOnStoppingDelegate struct {
next BasicLifecyclerDelegate
logger log.Logger
}
func NewLeaveOnStoppingDelegate(next BasicLifecyclerDelegate, logger log.Logger) *LeaveOnStoppingDelegate {
return &LeaveOnStoppingDelegate{
next: next,
logger: logger,
}
}
func (d *LeaveOnStoppingDelegate) OnRingInstanceRegister(lifecycler *BasicLifecycler, ringDesc Desc, instanceExists bool, instanceID string, instanceDesc InstanceDesc) (InstanceState, Tokens) {
return d.next.OnRingInstanceRegister(lifecycler, ringDesc, instanceExists, instanceID, instanceDesc)
}
func (d *LeaveOnStoppingDelegate) OnRingInstanceTokens(lifecycler *BasicLifecycler, tokens Tokens) {
d.next.OnRingInstanceTokens(lifecycler, tokens)
}
func (d *LeaveOnStoppingDelegate) OnRingInstanceStopping(lifecycler *BasicLifecycler) {
if err := lifecycler.changeState(context.Background(), LEAVING); err != nil {
level.Error(d.logger).Log("msg", "failed to change instance state to LEAVING in the ring", "err", err)
}
d.next.OnRingInstanceStopping(lifecycler)
}
func (d *LeaveOnStoppingDelegate) OnRingInstanceHeartbeat(lifecycler *BasicLifecycler, ringDesc *Desc, instanceDesc *InstanceDesc) {
d.next.OnRingInstanceHeartbeat(lifecycler, ringDesc, instanceDesc)
}
type TokensPersistencyDelegate struct {
next BasicLifecyclerDelegate
logger log.Logger
tokensPath string
loadState InstanceState
}
func NewTokensPersistencyDelegate(path string, state InstanceState, next BasicLifecyclerDelegate, logger log.Logger) *TokensPersistencyDelegate {
return &TokensPersistencyDelegate{
next: next,
logger: logger,
tokensPath: path,
loadState: state,
}
}
func (d *TokensPersistencyDelegate) OnRingInstanceRegister(lifecycler *BasicLifecycler, ringDesc Desc, instanceExists bool, instanceID string, instanceDesc InstanceDesc) (InstanceState, Tokens) {
// Skip if no path has been configured.
if d.tokensPath == "" {
level.Info(d.logger).Log("msg", "not loading tokens from file, tokens file path is empty")
return d.next.OnRingInstanceRegister(lifecycler, ringDesc, instanceExists, instanceID, instanceDesc)
}
// Do not load tokens from disk if the instance is already in the ring and has some tokens.
if instanceExists && len(instanceDesc.GetTokens()) > 0 {
level.Info(d.logger).Log("msg", "not loading tokens from file, instance already in the ring")
return d.next.OnRingInstanceRegister(lifecycler, ringDesc, instanceExists, instanceID, instanceDesc)
}
tokensFromFile, err := LoadTokensFromFile(d.tokensPath)
if err != nil {
if !os.IsNotExist(err) {
level.Error(d.logger).Log("msg", "error loading tokens from file", "err", err)
}
return d.next.OnRingInstanceRegister(lifecycler, ringDesc, instanceExists, instanceID, instanceDesc)
}
// Signal the next delegate that the tokens have been loaded, miming the
// case the instance exist in the ring (which is OK because the lifecycler
// will correctly reconcile this case too).
return d.next.OnRingInstanceRegister(lifecycler, ringDesc, true, lifecycler.GetInstanceID(), InstanceDesc{
Addr: lifecycler.GetInstanceAddr(),
Timestamp: time.Now().Unix(),
RegisteredTimestamp: lifecycler.GetRegisteredAt().Unix(),
State: d.loadState,
Tokens: tokensFromFile,
Zone: lifecycler.GetInstanceZone(),
})
}
func (d *TokensPersistencyDelegate) OnRingInstanceTokens(lifecycler *BasicLifecycler, tokens Tokens) {
if d.tokensPath != "" {
if err := tokens.StoreToFile(d.tokensPath); err != nil {
level.Error(d.logger).Log("msg", "error storing tokens to disk", "path", d.tokensPath, "err", err)
}
}
d.next.OnRingInstanceTokens(lifecycler, tokens)
}
func (d *TokensPersistencyDelegate) OnRingInstanceStopping(lifecycler *BasicLifecycler) {
d.next.OnRingInstanceStopping(lifecycler)
}
func (d *TokensPersistencyDelegate) OnRingInstanceHeartbeat(lifecycler *BasicLifecycler, ringDesc *Desc, instanceDesc *InstanceDesc) {
d.next.OnRingInstanceHeartbeat(lifecycler, ringDesc, instanceDesc)
}
// AutoForgetDelegate automatically remove an instance from the ring if the last
// heartbeat is older than a configured period.
type AutoForgetDelegate struct {
next BasicLifecyclerDelegate
logger log.Logger
forgetPeriod time.Duration
}
func NewAutoForgetDelegate(forgetPeriod time.Duration, next BasicLifecyclerDelegate, logger log.Logger) *AutoForgetDelegate {
return &AutoForgetDelegate{
next: next,
logger: logger,
forgetPeriod: forgetPeriod,
}
}
func (d *AutoForgetDelegate) OnRingInstanceRegister(lifecycler *BasicLifecycler, ringDesc Desc, instanceExists bool, instanceID string, instanceDesc InstanceDesc) (InstanceState, Tokens) {
return d.next.OnRingInstanceRegister(lifecycler, ringDesc, instanceExists, instanceID, instanceDesc)
}
func (d *AutoForgetDelegate) OnRingInstanceTokens(lifecycler *BasicLifecycler, tokens Tokens) {
d.next.OnRingInstanceTokens(lifecycler, tokens)
}
func (d *AutoForgetDelegate) OnRingInstanceStopping(lifecycler *BasicLifecycler) {
d.next.OnRingInstanceStopping(lifecycler)
}
func (d *AutoForgetDelegate) OnRingInstanceHeartbeat(lifecycler *BasicLifecycler, ringDesc *Desc, instanceDesc *InstanceDesc) {
for id, instance := range ringDesc.Ingesters {
lastHeartbeat := time.Unix(instance.GetTimestamp(), 0)
if time.Since(lastHeartbeat) > d.forgetPeriod {
level.Warn(d.logger).Log("msg", "auto-forgetting instance from the ring because it is unhealthy for a long time", "instance", id, "last_heartbeat", lastHeartbeat.String(), "forget_period", d.forgetPeriod)
ringDesc.RemoveIngester(id)
}
}
d.next.OnRingInstanceHeartbeat(lifecycler, ringDesc, instanceDesc)
}