forked from openshift/origin
-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathwriterlease.go
340 lines (299 loc) · 8.44 KB
/
writerlease.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
package writerlease
import (
"fmt"
"math/rand"
"sync"
"time"
"github.com/golang/glog"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/util/workqueue"
)
// Lease performs the equivalent of leader election by competing to perform work (such as
// updating a contended resource). Every successful work unit is considered a lease renewal,
// while work that is observed from others or that fails is treated as renewing another process's
// lease. When a lease expires (no work is detected within the lease term) the writer competes
// to perform work. When competing for the lease, exponential backoff is used.
type Lease interface {
// Wait waits for the first work function to complete and then returns whether the current
// process is the leader. This function will block forever if no work has been requested or if the
// work retries forever.
Wait() bool
// WaitUntil waits at most the provided duration for the first work function to complete.
// If the duration expires without work completing it will return false for ok, otherwise
// it will return whether the lease is held by this process.
WaitUntil(t time.Duration) (leader bool, ok bool)
// Try queues the provided function to be run for key while this process holds the lease or is
// competing for it. It retries work until the work func indicates retry is not necessary.
Try(key string, fn WorkFunc)
// Extend indicates that the caller has observed another writer performing work against
// the specified key. This will clear the work remaining for the lease and extend the lease
// interval.
Extend(key string)
// Remove clears any pending work for the provided key.
Remove(key string)
}
// WorkFunc is a retriable unit of work. It returns a WorkResult describing how the lease
// state should change, and retry, which is true when the work should be attempted again
// after the retry interval instead of being marked complete.
type WorkFunc func() (result WorkResult, retry bool)
// WorkResult is returned by a WorkFunc to report the effect of the work on the lease.
type WorkResult int
const (
// None leaves the lease state untouched.
None WorkResult = iota
// Extend makes this process the leader and renews the lease for the full lease duration.
Extend
// Release makes this process a follower and sets the lease to expire after the next
// backoff interval.
Release
)
// LimitRetries wraps fn so that it may request a retry at most retries times. The counter is
// shared by every invocation of the returned function and only advances on invocations where
// fn itself asked to be retried; the WorkResult from fn is always passed through unchanged.
func LimitRetries(retries int, fn WorkFunc) WorkFunc {
	used := 0
	return func() (WorkResult, bool) {
		result, wantsRetry := fn()
		if !wantsRetry {
			return result, false
		}
		allowed := used < retries
		used++
		return result, allowed
	}
}
// State is the state of the lease.
type State int
const (
// Election is before a work unit has been completed.
Election State = iota
// Leader is entered when a work unit reports Extend.
Leader
// Follower is entered when a work unit reports Release.
Follower
)
// work is a queued unit of work for a single key.
type work struct {
// id is the sequence number assigned when the work was queued; it is compared on completion
// so that work replaced by a newer Try for the same key is not removed by mistake.
id int
// fn is the function to run.
fn WorkFunc
}
// WriterLease tracks lease state for a single writer and runs queued work functions
// according to that state.
type WriterLease struct {
// name identifies this lease in log messages.
name string
// backoff configures the interval a follower waits before competing for the lease.
backoff wait.Backoff
// maxBackoff is the lease duration; it also caps every computed backoff interval.
maxBackoff time.Duration
// retryInterval is the delay before work that requested a retry is queued again.
retryInterval time.Duration
// once is closed when the first work unit resolves the initial election.
once chan struct{}
// nowFn returns the current time; the constructors set it to time.Now.
nowFn func() time.Time
// lock guards id, queued, state, expires and tick.
lock sync.Mutex
// id is the sequence number handed to the most recently queued work.
id int
// queued holds the pending work for each key.
queued map[string]*work
// queue schedules keys for the worker.
queue workqueue.DelayingInterface
// state is the current lease state.
state State
// expires is when the current lease ends.
expires time.Time
// tick counts consecutive lease extensions observed while following and selects the
// backoff step.
tick int
}
// New returns a WriterLease with a randomly generated name and the default backoff
// (20ms base, factor 4, 5 steps, 0.5 jitter). leaseDuration is how long a lease is held and
// is also the upper bound on backoff; retryInterval is the delay before failed work is retried.
func New(leaseDuration, retryInterval time.Duration) *WriterLease {
	lease := &WriterLease{
		name:          fmt.Sprintf("%08d", rand.Int31()),
		maxBackoff:    leaseDuration,
		retryInterval: retryInterval,
		nowFn:         time.Now,
		queued:        make(map[string]*work),
		queue:         workqueue.NewDelayingQueue(),
		once:          make(chan struct{}),
	}
	lease.backoff = wait.Backoff{
		Duration: 20 * time.Millisecond,
		Factor:   4,
		Steps:    5,
		Jitter:   0.5,
	}
	return lease
}
// NewWithBackoff returns a WriterLease that uses the given name (also used to name its work
// queue) and the caller-supplied backoff. leaseDuration is how long a lease is held and is
// also the upper bound on backoff; retryInterval is the delay before failed work is retried.
func NewWithBackoff(name string, leaseDuration, retryInterval time.Duration, backoff wait.Backoff) *WriterLease {
	lease := new(WriterLease)
	lease.name = name
	lease.backoff = backoff
	lease.maxBackoff = leaseDuration
	lease.retryInterval = retryInterval
	lease.nowFn = time.Now
	lease.queued = make(map[string]*work)
	lease.queue = workqueue.NewNamedDelayingQueue(name)
	lease.once = make(chan struct{})
	return lease
}
// Run starts a single worker goroutine that processes queued keys and then blocks until
// stopCh is closed. When Run returns the queue is shut down, which causes the worker loop
// to exit the next time it asks the queue for an item.
func (l *WriterLease) Run(stopCh <-chan struct{}) {
defer utilruntime.HandleCrash()
defer l.queue.ShutDown()
go func() {
defer utilruntime.HandleCrash()
// work returns false only after the queue reports shutdown
for l.work() {
}
glog.V(4).Infof("[%s] Worker stopped", l.name)
}()
<-stopCh
}
// Expire resets the lease expiration to the zero time, so any remaining lease time is
// treated as already elapsed.
func (l *WriterLease) Expire() {
	var expired time.Time

	l.lock.Lock()
	l.expires = expired
	l.lock.Unlock()
}
// Wait blocks until the first work unit has resolved the initial election and then reports
// whether this process is currently the leader.
func (l *WriterLease) Wait() bool {
	<-l.once
	switch current, _, _ := l.leaseState(); current {
	case Leader:
		return true
	default:
		return false
	}
}
// WaitUntil waits at most t for the first work unit to resolve the initial election. It
// returns (false, false) if t elapses first; otherwise ok is true and leader reports whether
// this process currently holds the lease.
func (l *WriterLease) WaitUntil(t time.Duration) (bool, bool) {
	select {
	case <-l.once:
		// Already resolved: answer immediately so that a zero or very small t cannot race
		// with the timer and spuriously report a timeout.
	default:
		// Use an explicit timer rather than time.After so it is released as soon as the
		// election resolves instead of lingering until t elapses.
		timer := time.NewTimer(t)
		defer timer.Stop()
		select {
		case <-l.once:
		case <-timer.C:
			return false, false
		}
	}
	state, _, _ := l.leaseState()
	return state == Leader, true
}
// Try records fn as the pending work for key, replacing any work already pending for that
// key, and schedules the key for processing. When following, the key is delayed until the
// lease is due to expire; otherwise it is queued immediately.
func (l *WriterLease) Try(key string, fn WorkFunc) {
	l.lock.Lock()
	defer l.lock.Unlock()

	l.id++
	l.queued[key] = &work{id: l.id, fn: fn}

	if l.state != Follower {
		l.queue.Add(key)
		return
	}

	// no matter what, always wait at least some amount of time as a follower to give the
	// nominal leader a chance to win
	floor := l.backoff.Duration * 2
	holdoff := l.expires.Sub(l.nowFn())
	if holdoff < floor {
		holdoff = floor
	}
	l.queue.AddAfter(key, holdoff)
}
// Extend records that another writer was observed doing the work for key. If work is pending
// for key it is dropped, and when this process is a follower the backoff step is advanced and
// the lease expiry is pushed out by the next backoff interval. It does nothing when no work
// is pending for key.
func (l *WriterLease) Extend(key string) {
	l.lock.Lock()
	defer l.lock.Unlock()

	if _, pending := l.queued[key]; !pending {
		return
	}
	delete(l.queued, key)

	if l.state != Follower {
		return
	}
	l.tick++
	extension := l.nextBackoff()
	glog.V(4).Infof("[%s] Clearing work for %s and extending lease by %s", l.name, key, extension)
	l.expires = l.nowFn().Add(extension)
}
// Len reports how many keys currently have pending work.
func (l *WriterLease) Len() int {
	l.lock.Lock()
	pending := len(l.queued)
	l.lock.Unlock()
	return pending
}
// Remove drops any pending work recorded for key. It does not touch the lease state.
func (l *WriterLease) Remove(key string) {
	l.lock.Lock()
	delete(l.queued, key)
	l.lock.Unlock()
}
// get returns the pending work for key, or nil when none is recorded.
func (l *WriterLease) get(key string) *work {
	l.lock.Lock()
	pending := l.queued[key]
	l.lock.Unlock()
	return pending
}
// leaseState returns a consistent snapshot of the lease state, its expiry and the current
// backoff tick.
func (l *WriterLease) leaseState() (State, time.Time, int) {
	l.lock.Lock()
	state, expires, tick := l.state, l.expires, l.tick
	l.lock.Unlock()
	return state, expires, tick
}
// work takes one key from the queue and processes it. It returns false only when the queue
// has been shut down, and true otherwise so the worker loop keeps running.
func (l *WriterLease) work() bool {
item, shutdown := l.queue.Get()
if shutdown {
return false
}
// only string keys are added to the queue by this type, so the assertion is not checked
key := item.(string)
work := l.get(key)
if work == nil {
// the pending work was removed after the key was scheduled; nothing to run
glog.V(4).Infof("[%s] Work item %s was cleared, done", l.name, key)
l.queue.Done(key)
return true
}
leaseState, leaseExpires, _ := l.leaseState()
if leaseState == Follower {
// if we are following, continue to defer work until the lease expires
if remaining := leaseExpires.Sub(l.nowFn()); remaining > 0 {
glog.V(4).Infof("[%s] Follower, %s remaining in lease", l.name, remaining)
// NOTE(review): this sleep blocks the only worker goroutine for the rest of the lease,
// so no other key is processed and shutdown is not observed until it returns.
time.Sleep(remaining)
// re-add the key so the lease state is re-evaluated on the next pass
l.queue.Add(key)
l.queue.Done(key)
return true
}
glog.V(4).Infof("[%s] Lease expired, running %s", l.name, key)
} else {
glog.V(4).Infof("[%s] Lease owner or electing, running %s", l.name, key)
}
// run without holding the lock; the result is applied to the lease state afterwards
result, retry := work.fn()
if retry {
l.retryKey(key, result)
return true
}
l.finishKey(key, result, work.id)
return true
}
// retryKey schedules the key for a retry in the future. It applies result to the lease
// state, re-adds key to the queue after retryInterval, and marks the current processing of
// key as done. The pending work for key is left in place so the retry can find it.
func (l *WriterLease) retryKey(key string, result WorkResult) {
l.lock.Lock()
defer l.lock.Unlock()
l.nextState(result)
l.queue.AddAfter(key, l.retryInterval)
l.queue.Done(key)
glog.V(4).Infof("[%s] Retrying work for %s in state=%d tick=%d expires=%s", l.name, key, l.state, l.tick, l.expires)
}
// finishKey applies result to the lease state and marks the current processing of key as
// done. The pending work for key is removed only if it is still the work identified by id,
// so work queued by a newer Try for the same key is preserved.
func (l *WriterLease) finishKey(key string, result WorkResult, id int) {
	l.lock.Lock()
	defer l.lock.Unlock()

	l.nextState(result)

	current, found := l.queued[key]
	if found && current.id == id {
		delete(l.queued, key)
	}

	l.queue.Done(key)
	glog.V(4).Infof("[%s] Completed work for %s in state=%d tick=%d expires=%s", l.name, key, l.state, l.tick, l.expires)
}
// nextState applies the outcome of a work unit to the lease. It must be called while
// holding the lock.
//
// Extend makes this process the leader (resetting the tick if it was not already leader) and
// renews the lease for maxBackoff. Release makes it a follower, or advances the tick if it
// already was one, and sets the lease to expire after the next backoff. Any other result
// leaves the lease untouched. The first Extend or Release seen during the initial election
// closes the once channel.
func (l *WriterLease) nextState(result WorkResult) {
	wasElecting := l.state == Election

	switch result {
	case Extend:
		if l.state == Election || l.state == Follower {
			l.tick = 0
			l.state = Leader
		}
		l.expires = l.nowFn().Add(l.maxBackoff)
	case Release:
		if l.state == Follower {
			l.tick++
		} else if l.state == Election || l.state == Leader {
			l.tick = 0
			l.state = Follower
		}
		l.expires = l.nowFn().Add(l.nextBackoff())
	default:
		// nothing was decided, so the election (if any) is still unresolved
		return
	}

	// close the channel before we remove the key from the queue to prevent races in Wait
	if wasElecting {
		close(l.once)
	}
}
// nextBackoff returns the backoff interval for the current tick. Starting from the
// configured base duration, each tick applies jitter (when configured) and then multiplies
// by the backoff factor. The result never exceeds maxBackoff, and maxBackoff is returned
// outright once the tick exceeds the configured number of steps.
func (l *WriterLease) nextBackoff() time.Duration {
	cfg := l.backoff
	if l.tick > cfg.Steps {
		return l.maxBackoff
	}

	jittered := cfg.Jitter > 0.0
	current := cfg.Duration
	for remaining := l.tick; remaining > 0; remaining-- {
		base := current
		if jittered {
			base = wait.Jitter(current, cfg.Jitter)
		}
		current = time.Duration(float64(base) * cfg.Factor)
		if current > l.maxBackoff {
			return l.maxBackoff
		}
	}
	return current
}