-
Notifications
You must be signed in to change notification settings - Fork 796
/
basic_lifecycler.go
486 lines (387 loc) · 14.1 KB
/
basic_lifecycler.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
package ring
import (
"context"
"fmt"
"sort"
"sync"
"time"
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/cortexproject/cortex/pkg/ring/kv"
"github.com/cortexproject/cortex/pkg/util"
"github.com/cortexproject/cortex/pkg/util/services"
)
// BasicLifecyclerDelegate is the set of hooks a BasicLifecycler invokes at the
// different stages of its lifecycle. Every hook receives the lifecycler that is
// calling it.
type BasicLifecyclerDelegate interface {
// OnRingInstanceRegister is called while the lifecycler is registering the
// instance within the ring and should return the state and set of tokens to
// use for the instance itself. instanceExists reports whether an entry for
// instanceID was already present in ringDesc; when it was not, instanceDesc
// is the zero value. The returned tokens are sorted by the lifecycler before
// being stored.
// NOTE(review): the hook runs inside the KV store CAS callback, so it may
// presumably be invoked more than once if the store retries — confirm.
OnRingInstanceRegister(lifecycler *BasicLifecycler, ringDesc Desc, instanceExists bool, instanceID string, instanceDesc IngesterDesc) (IngesterState, Tokens)
// OnRingInstanceTokens is called once the instance tokens are set and are
// stable within the ring (honoring the observe period, if set). It is not
// called when the instance has no tokens.
OnRingInstanceTokens(lifecycler *BasicLifecycler, tokens Tokens)
// OnRingInstanceStopping is called while the lifecycler is stopping. The lifecycler
// will continue to heartbeat the ring while this function is executing and will proceed
// to unregister the instance from the ring only after this function has returned.
OnRingInstanceStopping(lifecycler *BasicLifecycler)
// OnRingInstanceHeartbeat is called while the instance is updating its heartbeat
// in the ring. It is invoked before the lifecycler refreshes the timestamp of
// instanceDesc, and changes made to instanceDesc are written back to the ring.
OnRingInstanceHeartbeat(lifecycler *BasicLifecycler, ringDesc *Desc, instanceDesc *IngesterDesc)
}
// BasicLifecyclerConfig holds the settings of a BasicLifecycler.
type BasicLifecyclerConfig struct {
// ID is the instance unique ID.
ID string
// Addr is the instance address, in the form "address:port".
Addr string
// Zone is the instance availability zone. Can be an empty string
// if zone awareness is unused.
Zone string
// HeartbeatPeriod is the interval between two heartbeats sent to the ring.
// It is passed as-is to time.NewTicker, which panics on a non-positive
// duration, so it must be greater than zero.
HeartbeatPeriod time.Duration
// TokensObservePeriod is how long the lifecycler waits, at startup, before
// verifying that the registered tokens still belong to the instance. The
// wait is skipped when the period is zero or the instance has no tokens.
TokensObservePeriod time.Duration
// NumTokens is the number of tokens the instance should own. It is exported
// through the "tokens to own" metric and used as the target count when
// tokens have to be regenerated during verification.
NumTokens int
}
// BasicLifecycler is a basic ring lifecycler which allows to hook custom
// logic at different stages of the lifecycle. This lifecycler should be
// used to build higher level lifecyclers.
//
// This lifecycler never changes the instance state on its own. It's the
// delegate's responsibility to call ChangeState().
type BasicLifecycler struct {
*services.BasicService
cfg BasicLifecyclerConfig
logger log.Logger
// KV store client holding the ring; every ring update goes through its CAS.
store kv.Client
delegate BasicLifecyclerDelegate
metrics *BasicLifecyclerMetrics
// Channel used to execute logic within the lifecycler loop. It is unbuffered
// and only drained by the running phase.
actorChan chan func()
// These values are initialised at startup, and never change
ringName string
ringKey string
// The current instance state. currState guards currInstanceDesc, which is
// nil until the instance has been registered and again after it has been
// unregistered from the ring.
currState sync.RWMutex
currInstanceDesc *IngesterDesc
}
// NewBasicLifecycler builds a BasicLifecycler for the ring identified by
// ringName and stored under ringKey in the given KV store. The delegate
// receives the lifecycle hooks, and metrics are created for ringName against
// reg. The "tokens to own" metric is initialised from cfg.NumTokens.
//
// The returned service is not started. The returned error is currently
// always nil.
func NewBasicLifecycler(cfg BasicLifecyclerConfig, ringName, ringKey string, store kv.Client, delegate BasicLifecyclerDelegate, logger log.Logger, reg prometheus.Registerer) (*BasicLifecycler, error) {
	metrics := NewBasicLifecyclerMetrics(ringName, reg)
	metrics.tokensToOwn.Set(float64(cfg.NumTokens))

	lifecycler := &BasicLifecycler{
		cfg:       cfg,
		logger:    logger,
		store:     store,
		delegate:  delegate,
		metrics:   metrics,
		actorChan: make(chan func()),
		ringName:  ringName,
		ringKey:   ringKey,
	}

	// The service callbacks are bound methods, so the service can only be
	// created once the lifecycler value exists.
	lifecycler.BasicService = services.NewBasicService(lifecycler.starting, lifecycler.running, lifecycler.stopping)

	return lifecycler, nil
}
// GetInstanceID returns the instance ID this lifecycler was configured with.
func (l *BasicLifecycler) GetInstanceID() string {
return l.cfg.ID
}
// GetInstanceAddr returns the instance address this lifecycler was configured with.
func (l *BasicLifecycler) GetInstanceAddr() string {
return l.cfg.Addr
}
// GetInstanceZone returns the availability zone this lifecycler was configured
// with, which may be an empty string.
func (l *BasicLifecycler) GetInstanceZone() string {
return l.cfg.Zone
}
// GetState returns the state of the locally cached instance descriptor, or
// PENDING while the instance is not registered in the ring.
func (l *BasicLifecycler) GetState() IngesterState {
	l.currState.RLock()
	defer l.currState.RUnlock()

	if desc := l.currInstanceDesc; desc != nil {
		return desc.GetState()
	}
	return PENDING
}
// GetTokens returns the tokens of the locally cached instance descriptor, or
// an empty (non-nil) set while the instance is not registered in the ring.
func (l *BasicLifecycler) GetTokens() Tokens {
	l.currState.RLock()
	defer l.currState.RUnlock()

	if desc := l.currInstanceDesc; desc != nil {
		return desc.GetTokens()
	}
	return Tokens{}
}
// GetRegisteredAt returns the time at which the instance was registered in the
// ring. The zero time is returned when the lifecycler has not been started yet,
// or when the instance was already in the ring with an unknown registration
// timestamp.
//
// NOTE(review): the cached descriptor is nil while the instance is not
// registered, so this relies on IngesterDesc.GetRegisteredAt tolerating a nil
// receiver — confirm.
func (l *BasicLifecycler) GetRegisteredAt() time.Time {
	l.currState.RLock()
	defer l.currState.RUnlock()

	registeredAt := l.currInstanceDesc.GetRegisteredAt()
	return registeredAt
}
// IsRegistered reports whether the lifecycler currently holds a descriptor for
// the instance, i.e. it has been registered in the ring and not yet
// unregistered.
func (l *BasicLifecycler) IsRegistered() bool {
	l.currState.RLock()
	defer l.currState.RUnlock()

	registered := l.currInstanceDesc != nil
	return registered
}
// ChangeState sets the instance state in the ring to the given one. The update
// is handed over to the lifecycler loop and executed there; the call blocks
// until it has completed and returns its error. An error is returned without
// touching the ring if the lifecycler is not running.
func (l *BasicLifecycler) ChangeState(ctx context.Context, state IngesterState) error {
	apply := func() error {
		return l.changeState(ctx, state)
	}
	return l.run(apply)
}
// starting is the service starting function. It registers the instance in the
// ring, optionally waits for its tokens to become stable, and finally hands the
// tokens over to the delegate.
func (l *BasicLifecycler) starting(ctx context.Context) error {
	err := l.registerInstance(ctx)
	if err != nil {
		return errors.Wrap(err, "register instance in the ring")
	}

	// The observe period only makes sense when the instance has been
	// registered with some tokens: wait until they're "stable" within the ring.
	if len(l.GetTokens()) > 0 && l.cfg.TokensObservePeriod > 0 {
		err = l.waitStableTokens(ctx, l.cfg.TokensObservePeriod)
		if err != nil {
			return errors.Wrap(err, "wait stable tokens in the ring")
		}
	}

	// Tokens are re-read because the stabilisation step may have replaced them.
	tokens := l.GetTokens()
	if len(tokens) == 0 {
		return nil
	}

	// Whatever tokens are set at this point are stable: publish the metric
	// and notify the delegate.
	l.metrics.tokensOwned.Set(float64(len(tokens)))
	l.delegate.OnRingInstanceTokens(l, tokens)
	return nil
}
// running is the service main loop. Until ctx is cancelled it heartbeats the
// ring every HeartbeatPeriod and executes the functions received on actorChan,
// one at a time. It always returns nil.
//
// NOTE(review): the shutdown message goes through the global util.Logger while
// the rest of this file logs through l.logger; this looks like an oversight.
// Switching it would leave the util import unused, so the import has to be
// dropped in the same change.
func (l *BasicLifecycler) running(ctx context.Context) error {
	ticker := time.NewTicker(l.cfg.HeartbeatPeriod)
	defer ticker.Stop()

	shutdown := ctx.Done()
	for {
		select {
		case <-shutdown:
			level.Info(util.Logger).Log("msg", "ring lifecycler is shutting down", "ring", l.ringName)
			return nil

		case fn := <-l.actorChan:
			fn()

		case <-ticker.C:
			l.heartbeat(ctx)
		}
	}
}
// stopping is the service stopping function. It runs the delegate's
// OnRingInstanceStopping hook while still heartbeating the ring and, once the
// hook has returned, removes the instance from the ring.
//
// When the running phase ended with an error, nothing is done: the delegate is
// not notified and the instance is left in the ring.
func (l *BasicLifecycler) stopping(runningError error) error {
	if runningError != nil {
		return nil
	}

	// Run the delegate hook in the background, so it can change the instance
	// state (ie. to LEAVING) and handle any state transferring / flushing while
	// this goroutine keeps the heartbeat alive.
	delegateDone := make(chan struct{})
	go func() {
		defer close(delegateDone)
		l.delegate.OnRingInstanceStopping(l)
	}()

	ticker := time.NewTicker(l.cfg.HeartbeatPeriod)
	defer ticker.Stop()

	// Keep heartbeating until the delegate hook has returned.
	for finished := false; !finished; {
		select {
		case <-ticker.C:
			l.heartbeat(context.Background())
		case <-delegateDone:
			finished = true
		}
	}

	// The delegate is done: the instance can now leave the ring.
	if err := l.unregisterInstance(context.Background()); err != nil {
		return errors.Wrapf(err, "failed to unregister instance from the ring (ring: %s)", l.ringName)
	}

	level.Info(l.logger).Log("msg", "instance removed from the ring", "ring", l.ringName)
	return nil
}
// registerInstance adds (or overwrites) this instance in the ring. The state
// and the tokens to register are decided by the delegate's
// OnRingInstanceRegister() function. On success the resulting descriptor
// becomes the locally cached one.
func (l *BasicLifecycler) registerInstance(ctx context.Context) error {
	var instanceDesc IngesterDesc

	register := func(in interface{}) (out interface{}, retry bool, err error) {
		ringDesc := GetOrCreateRingDesc(in)

		var exists bool
		instanceDesc, exists = ringDesc.Ingesters[l.cfg.ID]
		if !exists {
			level.Info(l.logger).Log("msg", "instance not found in the ring", "instance", l.cfg.ID, "ring", l.ringName)
		} else {
			level.Info(l.logger).Log("msg", "instance found in the ring", "instance", l.cfg.ID, "ring", l.ringName, "state", instanceDesc.GetState(), "tokens", len(instanceDesc.GetTokens()), "registered_at", instanceDesc.GetRegisteredAt().String())
		}

		// Ask the delegate which state and tokens the instance should start
		// with, and make sure tokens are sorted before storing them.
		state, tokens := l.delegate.OnRingInstanceRegister(l, *ringDesc, exists, l.cfg.ID, instanceDesc)
		sort.Sort(tokens)

		// A brand new instance is registered "now". For an existing one the
		// previous value is honored, even if it's zero: zero means the
		// timestamp is unknown, but it's definitely not "now".
		var registeredAt time.Time
		if !exists {
			registeredAt = time.Now()
		} else {
			registeredAt = instanceDesc.GetRegisteredAt()
		}

		// The entry is written unconditionally, whether or not the instance
		// already exists: some properties may have changed (state, tokens,
		// zone, address) and even if they didn't the heartbeat at least did.
		instanceDesc = ringDesc.AddIngester(l.cfg.ID, l.cfg.Addr, l.cfg.Zone, tokens, state, registeredAt)
		return ringDesc, true, nil
	}

	if err := l.store.CAS(ctx, l.ringKey, register); err != nil {
		return err
	}

	l.currState.Lock()
	l.currInstanceDesc = &instanceDesc
	l.currState.Unlock()

	return nil
}
// waitStableTokens blocks until a token verification succeeds, heartbeating the
// ring in the meantime. Tokens are verified every period, the first time after
// one full period has elapsed. It returns the context error if ctx is done
// before the tokens have been verified.
func (l *BasicLifecycler) waitStableTokens(ctx context.Context, period time.Duration) error {
	ticker := time.NewTicker(l.cfg.HeartbeatPeriod)
	defer ticker.Stop()

	level.Info(l.logger).Log("msg", "waiting stable tokens", "ring", l.ringName)
	nextObservation := time.After(period)

	for {
		select {
		case <-ctx.Done():
			return ctx.Err()

		case <-ticker.C:
			l.heartbeat(ctx)

		case <-nextObservation:
			if l.verifyTokens(ctx) {
				level.Info(l.logger).Log("msg", "tokens verification succeeded", "ring", l.ringName)
				return nil
			}

			// Not stable yet: schedule another observation a full period from now.
			level.Info(l.logger).Log("msg", "tokens verification failed, keep observing", "ring", l.ringName)
			nextObservation = time.After(period)
		}
	}
}
// verifyTokens checks that the tokens this instance has registered to the ring
// still belong to it, and reports whether they do. A gossiping ring may change
// the ownership of tokens in case of conflicts: when that has happened, the
// missing tokens are replaced with newly generated ones, the new set is stored
// to the ring, and false is returned so that the caller verifies again later.
// False is also returned if the ring update fails.
func (l *BasicLifecycler) verifyTokens(ctx context.Context) bool {
	verified := false

	check := func(ringDesc *Desc, instanceDesc *IngesterDesc) bool {
		// The ring should report exactly the tokens we have registered before.
		ownedTokens, takenTokens := ringDesc.TokensFor(l.cfg.ID)
		if !ownedTokens.Equals(l.GetTokens()) {
			// Some of our tokens are gone: top up what is still ours with
			// freshly generated tokens, avoiding the ones already taken.
			missing := l.cfg.NumTokens - len(ownedTokens)
			level.Info(l.logger).Log("msg", "generating new tokens", "count", missing, "ring", l.ringName)

			ownedTokens = append(ownedTokens, GenerateTokens(missing, takenTokens)...)
			sort.Sort(ownedTokens)

			instanceDesc.Tokens = ownedTokens
			return true
		}

		// Tokens match, so there's nothing to write back to the ring.
		verified = true
		return false
	}

	if err := l.updateInstance(ctx, check); err != nil {
		level.Error(l.logger).Log("msg", "failed to verify tokens", "ring", l.ringName, "err", err)
		return false
	}

	return verified
}
// unregisterInstance removes this instance's entry from the ring held in the
// KV store. On success it also clears the locally cached instance descriptor
// and resets the "tokens to own" and "tokens owned" metrics to zero.
//
// An error is returned, and the local state is left untouched, when the ring
// is empty, when the stored value is not a ring descriptor, or when the CAS
// operation itself fails.
func (l *BasicLifecycler) unregisterInstance(ctx context.Context) error {
	level.Info(l.logger).Log("msg", "unregistering instance from ring", "ring", l.ringName)

	err := l.store.CAS(ctx, l.ringKey, func(in interface{}) (out interface{}, retry bool, err error) {
		if in == nil {
			return nil, false, fmt.Errorf("found empty ring when trying to unregister")
		}

		// Use the checked form of the type assertion: an unexpected value in
		// the store must surface as an error rather than panic inside the
		// store's CAS callback.
		ringDesc, ok := in.(*Desc)
		if !ok {
			return nil, false, fmt.Errorf("unexpected ring value of type %T when trying to unregister", in)
		}

		// A nil *Desc wrapped in the interface is not caught by the `in == nil`
		// check above. Treat it as an empty ring instead of calling a method
		// on a nil descriptor.
		if ringDesc == nil {
			return nil, false, fmt.Errorf("found empty ring when trying to unregister")
		}

		ringDesc.RemoveIngester(l.cfg.ID)
		return ringDesc, true, nil
	})

	if err != nil {
		return err
	}

	// The instance is gone from the ring: forget its descriptor.
	l.currState.Lock()
	l.currInstanceDesc = nil
	l.currState.Unlock()

	// No tokens are owned (nor expected to be owned) anymore.
	l.metrics.tokensToOwn.Set(0)
	l.metrics.tokensOwned.Set(0)
	return nil
}
// updateInstance applies the update function to this instance's entry in the
// ring through a CAS operation, then refreshes the locally cached descriptor.
//
// The update function receives the ring and a pointer to a copy of the
// instance descriptor, and returns whether it has changed anything. If the
// instance is missing from the ring it is added back, using the locally known
// tokens, state and registration time, before update is called. The ring is
// written back unless the instance was already there and update reported no
// change.
func (l *BasicLifecycler) updateInstance(ctx context.Context, update func(*Desc, *IngesterDesc) bool) error {
	var instanceDesc IngesterDesc

	apply := func(in interface{}) (out interface{}, retry bool, err error) {
		ringDesc := GetOrCreateRingDesc(in)

		var found bool
		if instanceDesc, found = ringDesc.Ingesters[l.cfg.ID]; !found {
			// This could happen if the backend store restarted (and content
			// deleted) or the instance has been forgotten: re-insert it.
			level.Warn(l.logger).Log("msg", "instance missing in the ring, adding it back", "ring", l.ringName)
			instanceDesc = ringDesc.AddIngester(l.cfg.ID, l.cfg.Addr, l.cfg.Zone, l.GetTokens(), l.GetState(), l.GetRegisteredAt())
		}

		timestampBefore := instanceDesc.Timestamp

		// Nothing to store if the instance was already in the ring and the
		// update function left it as is. A re-inserted instance is always
		// stored.
		if changed := update(ringDesc, &instanceDesc); found && !changed {
			return nil, false, nil
		}

		// Memberlist requires that the timestamp always change, so refresh it
		// here unless the update function has already done so.
		if instanceDesc.Timestamp == timestampBefore {
			instanceDesc.Timestamp = time.Now().Unix()
		}

		ringDesc.Ingesters[l.cfg.ID] = instanceDesc
		return ringDesc, true, nil
	}

	if err := l.store.CAS(ctx, l.ringKey, apply); err != nil {
		return err
	}

	l.currState.Lock()
	l.currInstanceDesc = &instanceDesc
	l.currState.Unlock()

	return nil
}
// heartbeat refreshes the instance timestamp within the ring, giving the
// delegate a chance to amend the instance through OnRingInstanceHeartbeat
// first. Failures are only logged; the heartbeats counter is incremented on
// success. In this file it is called from the starting, running and stopping
// phases of the lifecycler.
func (l *BasicLifecycler) heartbeat(ctx context.Context) {
	touch := func(ringDesc *Desc, instanceDesc *IngesterDesc) bool {
		l.delegate.OnRingInstanceHeartbeat(l, ringDesc, instanceDesc)
		instanceDesc.Timestamp = time.Now().Unix()
		return true
	}

	if err := l.updateInstance(ctx, touch); err != nil {
		level.Warn(l.logger).Log("msg", "failed to heartbeat instance in the ring", "ring", l.ringName, "err", err)
		return
	}

	l.metrics.heartbeats.Inc()
}
// changeState sets the state of the instance within the ring, doing nothing if
// the instance is already in the requested state. A failure is logged and
// returned. ChangeState() executes this function through run(), i.e. within
// the lifecycler loop.
func (l *BasicLifecycler) changeState(ctx context.Context, state IngesterState) error {
	err := l.updateInstance(ctx, func(_ *Desc, instanceDesc *IngesterDesc) bool {
		if instanceDesc.State != state {
			instanceDesc.State = state
			return true
		}

		// Already in the requested state: report no change.
		return false
	})

	if err == nil {
		return nil
	}

	level.Warn(l.logger).Log("msg", "failed to change instance state in the ring", "from", l.GetState(), "to", state, "err", err)
	return err
}
// run executes fn within the lifecycler loop and returns its result, blocking
// until fn has completed. It fails with "lifecycler not running" when the
// service has no context yet, or when the service context is done before the
// loop has picked the function up.
func (l *BasicLifecycler) run(fn func() error) error {
	serviceCtx := l.ServiceContext()
	if serviceCtx == nil {
		return errors.New("lifecycler not running")
	}

	// The result travels back over an unbuffered channel, so the loop is held
	// until the caller below has received it.
	resultCh := make(chan error)
	task := func() {
		resultCh <- fn()
	}

	select {
	case l.actorChan <- task:
		return <-resultCh
	case <-serviceCtx.Done():
		return errors.New("lifecycler not running")
	}
}