forked from kubernetes/kubernetes
-
Notifications
You must be signed in to change notification settings - Fork 0
/
backoff_utils.go
144 lines (121 loc) · 4 KB
/
backoff_utils.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package util
import (
"fmt"
"sync"
"sync/atomic"
"time"
ktypes "k8s.io/apimachinery/pkg/types"
"k8s.io/klog"
)
// clock is the source of the current time used by PodBackoff. It exists so
// that the time source can be supplied by the caller (see
// CreatePodBackoffWithClock) instead of always reading the system clock.
type clock interface {
	// Now reports the current time.
	Now() time.Time
}

// realClock is the clock implementation backed by the system clock.
type realClock struct{}

// Compile-time check that realClock satisfies clock.
var _ clock = realClock{}

// Now returns the current time as reported by time.Now.
func (realClock) Now() time.Time { return time.Now() }
// BackoffEntry holds the exponential back-off state for a single key.
//
// An entry is single threaded: it allows only one caller at a time to be
// waiting on the back-off. A second, concurrent TryWait does not queue; it
// returns false immediately. Callers are expected to use only the public
// TryWait method. It is not safe to copy a BackoffEntry.
type BackoffEntry struct {
	// backoff is the delay the next successful TryWait will sleep for.
	backoff time.Duration
	// lastUpdate is stamped by PodBackoff.GetEntry each time the entry is
	// handed out and is read by PodBackoff.Gc to expire idle entries.
	lastUpdate time.Time
	// reqInFlight is 1 while a caller holds the entry and 0 otherwise. It is
	// only read and written through sync/atomic.
	reqInFlight int32
}

// tryLock attempts to take ownership of the entry with an atomic
// compare-and-swap of reqInFlight from 0 to 1. It never blocks and reports
// whether the lock was acquired.
func (b *BackoffEntry) tryLock() bool {
	return atomic.CompareAndSwapInt32(&b.reqInFlight, 0, 1)
}

// unlock releases the lock taken by tryLock. It panics if the lock is not
// currently held.
func (b *BackoffEntry) unlock() {
	if atomic.CompareAndSwapInt32(&b.reqInFlight, 1, 0) {
		return
	}
	panic(fmt.Sprintf("unexpected state on unlocking: %+v", b))
}

// TryWait tries to acquire the back-off lock and, on success, sleeps for the
// entry's current back-off before releasing the lock again.
//
// maxDuration is the maximum allowed period to wait for; it also caps the
// back-off stored for the next call. TryWait returns false immediately,
// without sleeping, when another caller already holds the entry, and true
// once the wait has completed.
func (b *BackoffEntry) TryWait(maxDuration time.Duration) bool {
	if !b.tryLock() {
		return false
	}
	defer b.unlock()
	b.wait(maxDuration)
	return true
}

// getBackoff returns the delay to apply now and advances the stored back-off
// to twice that delay, capped at maxDuration.
//
// The returned delay is itself clamped to maxDuration. Without the clamp an
// entry whose stored back-off already exceeds the cap (for example when the
// initial back-off is larger than maxDuration) would make TryWait sleep for
// longer than the maximum it promises.
//
// getBackoff mutates b.backoff without synchronization, so it must only be
// called while the entry lock is held.
func (b *BackoffEntry) getBackoff(maxDuration time.Duration) time.Duration {
	delay := b.backoff
	if delay > maxDuration {
		delay = maxDuration
	}

	next := delay * 2
	if next > maxDuration {
		next = maxDuration
	}
	b.backoff = next

	klog.V(4).Infof("Backing off %s", delay)
	return delay
}

// wait blocks the calling goroutine for the delay returned by getBackoff. It
// is called by TryWait with the entry lock held.
func (b *BackoffEntry) wait(maxDuration time.Duration) {
	time.Sleep(b.getBackoff(maxDuration))
}
// PodBackoff is used to restart a pod with back-off delay.
// It keeps one BackoffEntry per pod, keyed by the pod's namespaced name.
type PodBackoff struct {
// perPodBackoff maps each pod to its back-off entry. GetEntry adds entries
// and Gc removes them.
perPodBackoff map[ktypes.NamespacedName]*BackoffEntry
// lock is held by GetEntry and Gc while they access perPodBackoff.
lock sync.Mutex
// clock supplies the current time for lastUpdate stamps and expiry checks.
clock clock
// defaultDuration is the initial back-off given to a newly created entry.
defaultDuration time.Duration
// maxDuration is the value returned by MaxDuration and the idle period after
// which Gc drops an entry.
maxDuration time.Duration
}
// MaxDuration returns the maximum back-off duration this PodBackoff was
// created with.
func (p *PodBackoff) MaxDuration() time.Duration { return p.maxDuration }
// CreateDefaultPodBackoff creates a pod back-off object with the default
// settings: an initial back-off of one second and a maximum of sixty seconds.
func CreateDefaultPodBackoff() *PodBackoff {
	const (
		initialBackoff = 1 * time.Second
		backoffCeiling = 60 * time.Second
	)
	return CreatePodBackoff(initialBackoff, backoffCeiling)
}
// CreatePodBackoff creates a pod back-off object from the given default and
// maximum durations, using the system clock as its time source.
func CreatePodBackoff(defaultDuration, maxDuration time.Duration) *PodBackoff {
	var systemClock clock = realClock{}
	return CreatePodBackoffWithClock(defaultDuration, maxDuration, systemClock)
}
// CreatePodBackoffWithClock creates a pod back-off object from the given
// default duration, maximum duration and clock. The returned object starts
// with no per-pod entries.
func CreatePodBackoffWithClock(defaultDuration, maxDuration time.Duration, clock clock) *PodBackoff {
	created := PodBackoff{
		perPodBackoff:   make(map[ktypes.NamespacedName]*BackoffEntry),
		clock:           clock,
		defaultDuration: defaultDuration,
		maxDuration:     maxDuration,
	}
	return &created
}
// GetEntry returns the back-off entry for the given pod ID, creating one with
// the default back-off if the pod has none yet. Every call stamps the entry's
// lastUpdate with the current time. The lookup, creation and stamp all happen
// under the PodBackoff lock.
func (p *PodBackoff) GetEntry(podID ktypes.NamespacedName) *BackoffEntry {
	p.lock.Lock()
	defer p.lock.Unlock()

	entry, known := p.perPodBackoff[podID]
	if !known {
		// First request for this pod: start it at the default back-off.
		entry = new(BackoffEntry)
		entry.backoff = p.defaultDuration
		p.perPodBackoff[podID] = entry
	}

	entry.lastUpdate = p.clock.Now()
	return entry
}
// Gc garbage-collects the pod back-off: it removes every entry whose
// lastUpdate is more than maxDuration in the past. The sweep runs under the
// PodBackoff lock.
func (p *PodBackoff) Gc() {
	p.lock.Lock()
	defer p.lock.Unlock()

	now := p.clock.Now()
	for podID, entry := range p.perPodBackoff {
		if idle := now.Sub(entry.lastUpdate); idle <= p.maxDuration {
			// Still fresh enough to keep.
			continue
		}
		delete(p.perPodBackoff, podID)
	}
}