package dkron

import (
"fmt"
"net"
"sync"
"time"
metrics "github.com/armon/go-metrics"
"github.com/hashicorp/raft"
"github.com/hashicorp/serf/serf"
)

const (
// barrierWriteTimeout is used to give Raft a chance to process a
// possible loss of leadership event if we are unable to get a barrier
// while leader.
barrierWriteTimeout = 2 * time.Minute
)

// monitorLeadership is used to monitor if we acquire or lose our role
// as the leader in the Raft cluster. There is some work the leader is
// expected to do, so we must react to changes.
func (a *Agent) monitorLeadership() {
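// weAreLeaderCh signals the currently running leader loop to exit, and
// the WaitGroup lets us block until that loop has fully stopped.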
var weAreLeaderCh chan struct{}
var leaderLoop sync.WaitGroup
for {
a.logger.Info("dkron: monitoring leadership")
select {
case isLeader := <-a.leaderCh:
switch {
case isLeader:
if weAreLeaderCh != nil {
a.logger.Error("dkron: attempted to start the leader loop while running")
continue
}
weAreLeaderCh = make(chan struct{})
leaderLoop.Add(1)
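// Run the leader loop in its own goroutine so that closing
// weAreLeaderCh (and waiting on the WaitGroup) can stop it cleanly
// when leadership is lost.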
go func(ch chan struct{}) {
defer leaderLoop.Done()
a.leaderLoop(ch)
}(weAreLeaderCh)
a.logger.Info("dkron: cluster leadership acquired")
default:
if weAreLeaderCh == nil {
a.logger.Error("dkron: attempted to stop the leader loop while not running")
continue
}
a.logger.Debug("dkron: shutting down leader loop")
close(weAreLeaderCh)
leaderLoop.Wait()
weAreLeaderCh = nil
a.logger.Info("dkron: cluster leadership lost")
}
case <-a.shutdownCh:
return
}
}
}

func (a *Agent) leadershipTransfer() error {
retryCount := 3
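// Leadership transfer can fail transiently (for example while a
// follower is catching up), so retry a few times before giving up.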
for i := 0; i < retryCount; i++ {
err := a.raft.LeadershipTransfer().Error()
if err == nil {
// Stop the scheduler; running jobs will continue to finish, but we
// cannot actively wait for them without blocking execution here.
a.sched.Stop()
a.logger.Info("dkron: successfully transferred leadership")
return nil
}
// Don't retry if the Raft version doesn't support leadership transfer
// since this will never succeed.
if err == raft.ErrUnsupportedProtocol {
return fmt.Errorf("leadership transfer not supported with Raft version lower than 3")
}
a.logger.Error("failed to transfer leadership attempt, will retry",
"attempt", i,
"retry_limit", retryCount,
"error", err,
)
}
return fmt.Errorf("failed to transfer leadership in %d attempts", retryCount)
}

// leaderLoop runs as long as we are the leader, performing various
// maintenance activities.
func (a *Agent) leaderLoop(stopCh chan struct{}) {
var reconcileCh chan serf.Member
establishedLeader := false
RECONCILE:
// Set up a reconciliation timer
reconcileCh = nil
interval := time.After(a.config.ReconcileInterval)
// Apply a raft barrier to ensure our FSM is caught up
start := time.Now()
barrier := a.raft.Barrier(barrierWriteTimeout)
if err := barrier.Error(); err != nil {
a.logger.WithError(err).Error("dkron: failed to wait for barrier")
goto WAIT
}
metrics.MeasureSince([]string{"dkron", "leader", "barrier"}, start)
// Check if we need to handle initial leadership actions
if !establishedLeader {
if err := a.establishLeadership(stopCh); err != nil {
a.logger.WithError(err).Error("dkron: failed to establish leadership")
// Immediately revoke leadership since we didn't successfully
// establish leadership.
if err := a.revokeLeadership(); err != nil {
a.logger.WithError(err).Error("dkron: failed to revoke leadership")
}
// Attempt to transfer leadership. If successful, leave the
// leaderLoop since this node is no longer the leader. Otherwise
// try to establish leadership again after 5 seconds.
if err := a.leadershipTransfer(); err != nil {
a.logger.Error("failed to transfer leadership", "error", err)
interval = time.After(5 * time.Second)
goto WAIT
}
return
}
establishedLeader = true
defer func() {
if err := a.revokeLeadership(); err != nil {
a.logger.WithError(err).Error("dkron: failed to revoke leadership")
}
}()
}
// Reconcile any missing data
if err := a.reconcile(); err != nil {
a.logger.WithError(err).Error("dkron: failed to reconcile")
goto WAIT
}
// The initial reconcile worked, so now we can process channel updates.
reconcileCh = a.reconcileCh
// Poll the stop channel to give it priority so we don't waste time
// trying to perform the other operations if we have been asked to shut
// down.
select {
case <-stopCh:
return
default:
}
WAIT:
// Wait until leadership is lost, periodically reconciling while we
// remain the leader and handling Serf events as they arrive.
for {
select {
case <-stopCh:
// Lost leadership.
return
case <-a.shutdownCh:
return
case <-interval:
goto RECONCILE
case member := <-reconcileCh:
if err := a.reconcileMember(member); err != nil {
a.logger.WithError(err).Error("dkron: failed to reconcile member")
}
}
}
}

// reconcile is used to reconcile the differences between Serf
// membership and what is reflected in our strongly consistent store.
func (a *Agent) reconcile() error {
defer metrics.MeasureSince([]string{"dkron", "leader", "reconcile"}, time.Now())
members := a.serf.Members()
for _, member := range members {
if err := a.reconcileMember(member); err != nil {
return err
}
}
return nil
}

// reconcileMember is used to reconcile a single Serf member against the
// Raft configuration.
func (a *Agent) reconcileMember(member serf.Member) error {
// Check if this is a member we should handle
valid, parts := isServer(member)
if !valid || parts.Region != a.config.Region {
return nil
}
defer metrics.MeasureSince([]string{"dkron", "leader", "reconcileMember"}, time.Now())
var err error
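// Members in any other state (failed, leaving, etc.) are left alone;
// only alive and cleanly departed servers change the Raft configuration.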
switch member.Status {
case serf.StatusAlive:
err = a.addRaftPeer(member, parts)
case serf.StatusLeft:
err = a.removeRaftPeer(member, parts)
}
if err != nil {
a.logger.WithError(err).WithField("member", member).Error("failed to reconcile member")
return err
}
return nil
}

// establishLeadership is invoked once we become leader and are able
// to invoke an initial barrier. The barrier is used to ensure any
// previously inflight transactions have been committed and that our
// state is up-to-date.
func (a *Agent) establishLeadership(stopCh chan struct{}) error {
defer metrics.MeasureSince([]string{"dkron", "leader", "establish_leadership"}, time.Now())
a.logger.Info("agent: Starting scheduler")
jobs, err := a.Store.GetJobs(nil)
if err != nil {
return err
}
return a.sched.Start(jobs, a)
}

// revokeLeadership is invoked once we step down as leader.
// This is used to cleanup any state that may be specific to a leader.
func (a *Agent) revokeLeadership() error {
defer metrics.MeasureSince([]string{"dkron", "leader", "revoke_leadership"}, time.Now())
// Stop the scheduler; running jobs will continue to finish, but we
// cannot actively wait for them without blocking execution here.
a.sched.Stop()
return nil
}

// addRaftPeer is used to add a new Raft peer when a dkron server joins
func (a *Agent) addRaftPeer(m serf.Member, parts *ServerParts) error {
// Check for possibility of multiple bootstrap nodes
members := a.serf.Members()
if parts.Bootstrap {
for _, member := range members {
valid, p := isServer(member)
if valid && member.Name != m.Name && p.Bootstrap {
a.logger.Errorf("dkron: '%v' and '%v' are both in bootstrap mode. Only one node should be in bootstrap mode, not adding Raft peer.", m.Name, member.Name)
return nil
}
}
}
// Processing ourselves could result in trying to remove ourselves to
// fix up our address, which would make us step down. This is only
// safe to attempt if there are multiple servers available.
addr := (&net.TCPAddr{IP: m.Addr, Port: parts.Port}).String()
configFuture := a.raft.GetConfiguration()
if err := configFuture.Error(); err != nil {
a.logger.WithError(err).Error("dkron: failed to get raft configuration")
return err
}
if m.Name == a.config.NodeName {
if l := len(configFuture.Configuration().Servers); l < 3 {
a.logger.WithField("peer", m.Name).Debug("dkron: Skipping self join check since the cluster is too small")
return nil
}
}
// See if it's already in the configuration. It's harmless to re-add it
// but we want to avoid doing that if possible to prevent useless Raft
// log entries. If the address is the same but the ID changed, remove the
// old server before adding the new one.
for _, server := range configFuture.Configuration().Servers {
// If the address or ID matches an existing server, see if we need to remove the old one first
if server.Address == raft.ServerAddress(addr) || server.ID == raft.ServerID(parts.ID) {
// Exit with no-op if this is being called on an existing server and both the ID and address match
if server.Address == raft.ServerAddress(addr) && server.ID == raft.ServerID(parts.ID) {
return nil
}
future := a.raft.RemoveServer(server.ID, 0, 0)
if server.Address == raft.ServerAddress(addr) {
if err := future.Error(); err != nil {
return fmt.Errorf("error removing server with duplicate address %q: %s", server.Address, err)
}
a.logger.WithField("server", server.Address).Info("dkron: removed server with duplicate address")
} else {
if err := future.Error(); err != nil {
return fmt.Errorf("error removing server with duplicate ID %q: %s", server.ID, err)
}
a.logger.WithField("server", server.ID).Info("dkron: removed server with duplicate ID")
}
}
}
// Attempt to add as a peer
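// Note: when minRaftProtocol is below 3 no case matches, so the switch
// is a no-op and the member is not added as a voter here.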
switch {
case minRaftProtocol >= 3:
addFuture := a.raft.AddVoter(raft.ServerID(parts.ID), raft.ServerAddress(addr), 0, 0)
if err := addFuture.Error(); err != nil {
a.logger.WithError(err).Error("dkron: failed to add raft peer")
return err
}
}
return nil
}

// removeRaftPeer is used to remove a Raft peer when a dkron server leaves
// or is reaped
func (a *Agent) removeRaftPeer(m serf.Member, parts *ServerParts) error {
// See if it's already in the configuration. It's harmless to re-remove it
// but we want to avoid doing that if possible to prevent useless Raft
// log entries.
configFuture := a.raft.GetConfiguration()
if err := configFuture.Error(); err != nil {
a.logger.WithError(err).Error("dkron: failed to get raft configuration")
return err
}
// Pick which remove API to use based on how the server was added.
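// Note: when minRaftProtocol is below 2 nothing matches the check below
// and the peer is left in the configuration.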
for _, server := range configFuture.Configuration().Servers {
// If we understand the new add/remove APIs and the server was added by ID, use the new remove API
if minRaftProtocol >= 2 && server.ID == raft.ServerID(parts.ID) {
a.logger.WithField("server", server.ID).Info("dkron: removing server by ID")
future := a.raft.RemoveServer(raft.ServerID(parts.ID), 0, 0)
if err := future.Error(); err != nil {
a.logger.WithError(err).WithField("server", server.ID).Error("dkron: failed to remove raft peer")
return err
}
break
}
}
return nil
}