-
Notifications
You must be signed in to change notification settings - Fork 168
/
limits.go
228 lines (199 loc) · 6.99 KB
/
limits.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
package workflowrun
import (
"fmt"
"sync"
"time"
log "github.com/sirupsen/logrus"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"github.com/caicloud/cyclone/pkg/apis/cyclone/v1alpha1"
"github.com/caicloud/cyclone/pkg/k8s/clientset"
"github.com/caicloud/cyclone/pkg/workflow/common"
)
// LimitedQueues manages one WorkflowRun queue per Workflow. Each queue is capped at a maximum
// size: when adding a WorkflowRun pushes a queue over the cap, the oldest WorkflowRuns are
// popped from the queue and deleted through Client.
//
// NOTE(review): the Queues map is not guarded by any lock, yet it is written by AddOrRefresh
// and iterated by the AutoScan goroutine; confirm how callers serialize access.
type LimitedQueues struct {
// MaxQueueSize is the maximum number of WorkflowRuns to retain for each Workflow.
MaxQueueSize int
// Queues maps a Workflow key ("<namespace>/<name>", built by key) to the queue that holds
// the WorkflowRuns of that Workflow.
Queues map[string]*LimitedSortedQueue
// Client is the k8s client used to delete old WorkflowRuns.
Client clientset.Interface
}
// NewLimitedQueues builds a LimitedQueues that retains at most maxSize WorkflowRuns per
// Workflow and uses client to delete the ones that overflow. Before returning, it starts
// AutoScan in a background goroutine.
func NewLimitedQueues(client clientset.Interface, maxSize int) *LimitedQueues {
	log.WithField("max", maxSize).Info("Create limited queues")

	lq := new(LimitedQueues)
	lq.Client = client
	lq.MaxQueueSize = maxSize
	lq.Queues = map[string]*LimitedSortedQueue{}

	go lq.AutoScan()
	return lq
}
// key builds the queue key for the Workflow that wfr refers to, in the form
// "<namespace>/<name>" taken from wfr.Spec.WorkflowRef.
func key(wfr *v1alpha1.WorkflowRun) string {
	ref := wfr.Spec.WorkflowRef
	return fmt.Sprintf("%s/%s", ref.Namespace, ref.Name)
}
// Refresh updates the refresh time of wfr inside the queue of its Workflow. When no queue
// exists for that Workflow, a warning is logged and nothing else happens.
func (w *LimitedQueues) Refresh(wfr *v1alpha1.WorkflowRun) {
	k := key(wfr)
	if q, found := w.Queues[k]; found {
		q.Refresh(wfr)
		return
	}
	log.WithField("key", k).Warn("Queue not exist")
}
// AddOrRefresh adds wfr to the queue of its Workflow, creating that queue on first use. If
// wfr is already in the queue, only its refresh time is updated.
//
// After the push, the queue is trimmed: while it holds more than MaxQueueSize entries, the
// oldest entry is popped and the corresponding WorkflowRun is deleted through Client. A
// "not found" error from the delete is treated as success; any other error is logged and
// trimming continues with the next entry (the popped entry is not put back).
func (w *LimitedQueues) AddOrRefresh(wfr *v1alpha1.WorkflowRun) {
	k := key(wfr)
	q, ok := w.Queues[k]
	if !ok {
		q = NewQueue(k, w.MaxQueueSize)
		w.Queues[k] = q
	}

	// PushOrRefresh pushes the WorkflowRun to the queue. If it already exists in the queue,
	// its refresh time is updated to now.
	q.PushOrRefresh(wfr)

	for q.size > w.MaxQueueSize {
		old := q.Pop()
		if old == nil {
			// Pop returns nil when the queue has nothing left to give, e.g. when
			// MaxQueueSize is negative or scanQueue emptied the queue concurrently.
			// Stop here instead of dereferencing a nil node.
			break
		}
		log.WithField("max", w.MaxQueueSize).Debug("Max WorkflowRun exceeded, delete the oldest one")

		err := w.Client.CycloneV1alpha1().WorkflowRuns(old.namespace).Delete(old.wfr, &metav1.DeleteOptions{})
		if err != nil && !errors.IsNotFound(err) {
			log.WithField("wfr", old.wfr).Error("Delete old WorkflowRun error: ", err)
			continue
		}
		log.WithField("wfr", old.wfr).Info("Old WorkflowRun deleted")
	}
}
// AutoScan runs scanQueue over every queue once per hour, dropping entries whose refresh
// time is too old. It loops forever and never returns, so it is meant to be run in its own
// goroutine (NewLimitedQueues does this); the ticker is never stopped.
func (w *LimitedQueues) AutoScan() {
	ticker := time.NewTicker(time.Hour)
	for range ticker.C {
		for _, q := range w.Queues {
			scanQueue(q)
		}
	}
}
// scanQueue walks the queue while holding its lock and unlinks every node whose refresh time
// is more than twice common.ResyncPeriod in the past. Such a stale node means the WorkflowRun
// was deleted in k8s without the Workflow Controller noticing (which seldom happens), so it
// is dropped from the queue and the queue size is decremented.
func scanQueue(q *LimitedSortedQueue) {
	q.lock.Lock()
	defer q.lock.Unlock()

	prev := q.head
	for prev.next != nil {
		cur := prev.next
		// A live WorkflowRun is refreshed on every resync of the WorkflowRun Controller,
		// so a node is only considered stale after two resync periods without refresh.
		expiry := cur.refresh.Add(common.ResyncPeriod * 2)
		if !expiry.Before(time.Now()) {
			prev = cur
			continue
		}

		log.WithField("wfr", cur.wfr).Info("remove wfr with outdated refresh time from queue")
		// Unlink cur but keep prev in place, so the node that follows is examined next.
		prev.next = cur.next
		q.size--
	}
}
// LimitedSortedQueue is a sorted queue implemented as a singly linked list, kept in ascending
// order of WorkflowRun creation time so the oldest entry is always at the front.
// Each queue has a sentinel (dummy) node to simplify insertion and removal; it is not counted
// in the queue size. An empty queue therefore has head pointing to the dummy node and size 0.
type LimitedSortedQueue struct {
// Key of the Workflow, it's generated from the namespace and the workflow name
key string
// Lock for concurrency control; taken by PushOrRefresh, Pop and scanQueue
lock sync.Mutex
// Maximum queue size. It is recorded by NewQueue; in this file the trimming itself is
// performed by LimitedQueues.AddOrRefresh using its own MaxQueueSize.
max int
// Current size of the queue (the sentinel node excluded)
size int
// Head of the queue; always the sentinel node, the first real entry is head.next
head *Node
}
// NewQueue returns an empty LimitedSortedQueue for the Workflow identified by key, with max
// recorded as its maximum size. The returned queue has size 0 and its head points to a fresh
// sentinel node.
func NewQueue(key string, max int) *LimitedSortedQueue {
	q := &LimitedSortedQueue{key: key, max: max}
	q.head = &Node{}
	return q
}
// Node represents a WorkflowRun in the queue. 'next' links to the next node in the queue, and
// 'refresh' is the last time this node was refreshed.
//
// 'refresh' is used to deal with a rarely occurring case: a WorkflowRun got deleted in etcd,
// but the workflow controller didn't get notified. The workflow controller performs a resync
// with etcd regularly (5 minutes by default), and during resync every WorkflowRun in the queue
// is refreshed. If a WorkflowRun was deleted in etcd abnormally, it doesn't get refreshed in the
// queue, so this case can be detected from the refresh time.
//
// When a node hasn't been refreshed for a long time (for example, twice the resync period),
// it is removed from the queue.
type Node struct {
// Next node in the queue, nil for the last node
next *Node
// Name of the WorkflowRun
wfr string
// Namespace of the WorkflowRun
namespace string
// Time when the WorkflowRun was created, as Unix seconds; the queue is sorted by this value
created int64
// Time when the node was last refreshed
refresh time.Time
}
// PushOrRefresh makes sure wfr is present in the queue, holding the queue lock for the whole
// call. If wfr is already queued, only its refresh time is updated. Otherwise a new node is
// linked in at the position that keeps the list ordered by ascending creation time (a new node
// goes in front of existing nodes with the same creation time) and the size is incremented.
func (q *LimitedSortedQueue) PushOrRefresh(wfr *v1alpha1.WorkflowRun) {
	q.lock.Lock()
	defer q.lock.Unlock()

	fresh := &Node{
		wfr:       wfr.Name,
		namespace: wfr.Namespace,
		created:   wfr.ObjectMeta.CreationTimestamp.Time.Unix(),
		refresh:   time.Now(),
	}

	// Already queued: Refresh has bumped the refresh time, nothing to insert.
	if found := q.Refresh(wfr); found {
		return
	}

	// Advance to the last node that was created strictly before the new one.
	prev := q.head
	for cur := prev.next; cur != nil && cur.created < fresh.created; cur = cur.next {
		prev = cur
	}

	fresh.next = prev.next
	prev.next = fresh
	q.size++
}
// Refresh looks for the node matching wfr's namespace and name and, when found, sets its
// refresh time to now and returns true. It returns false when wfr is not in the queue.
// This method does not take the queue lock itself; PushOrRefresh calls it with the lock held.
func (q *LimitedSortedQueue) Refresh(wfr *v1alpha1.WorkflowRun) bool {
	for n := q.head.next; n != nil; n = n.next {
		if n.namespace != wfr.Namespace || n.wfr != wfr.Name {
			continue
		}
		log.WithField("queue", q.key).WithField("wfr", wfr.Name).Debug("Update refresh time")
		n.refresh = time.Now()
		return true
	}
	return false
}
// Pop removes and returns the node at the front of the queue, which is the WorkflowRun with
// the oldest creation time. It returns nil when the queue is empty.
//
// The emptiness check is done while holding the queue lock: scanQueue can shrink the queue
// from another goroutine, so checking the size before locking could let Pop dereference a
// nil head.next.
func (q *LimitedSortedQueue) Pop() *Node {
	q.lock.Lock()
	defer q.lock.Unlock()

	oldest := q.head.next
	if q.size <= 0 || oldest == nil {
		return nil
	}

	// Unlink the first real node; the sentinel head stays in place.
	q.head.next = oldest.next
	q.size--
	return oldest
}