-
Notifications
You must be signed in to change notification settings - Fork 2k
/
scheduler.go
120 lines (99 loc) · 3.23 KB
/
scheduler.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
/*
Copyright 2020 The cert-manager Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package scheduler
import (
"sync"
"time"
"k8s.io/utils/clock"
)
// We are writing our own time.AfterFunc to be able to mock the clock. The
// cancel function can be called concurrently.
func afterFunc(c clock.Clock, d time.Duration, f func()) (cancel func()) {
t := c.NewTimer(d)
cancelCh := make(chan struct{})
cancelOnce := sync.Once{}
cancel = func() {
t.Stop()
cancelOnce.Do(func() {
close(cancelCh)
})
}
go func() {
defer cancel()
select {
case <-t.C():
// We don't need to check whether the channel has returned a zero
// value since t.C is never closed as per the timer.Stop
// documentation.
f()
case <-cancelCh:
return
}
}()
return cancel
}
// ProcessFunc is a function to process an item in the work queue.
type ProcessFunc func(interface{})
// ScheduledWorkQueue is an interface to describe a queue that will execute the
// given ProcessFunc with the object given to Add once the time.Duration is up,
// since the time of calling Add.
type ScheduledWorkQueue interface {
// Add will add an item to this queue, executing the ProcessFunc after the
// Duration has come (since the time Add was called). If an existing Timer
// for obj already exists, the previous timer will be cancelled.
Add(interface{}, time.Duration)
// Forget will cancel the timer for the given object, if the timer exists.
Forget(interface{})
}
type scheduledWorkQueue struct {
processFunc ProcessFunc
clock clock.Clock
work map[interface{}]func()
workLock sync.Mutex
// Testing purposes.
afterFunc func(clock.Clock, time.Duration, func()) func()
}
// NewScheduledWorkQueue will create a new workqueue with the given processFunc
func NewScheduledWorkQueue(clock clock.Clock, processFunc ProcessFunc) ScheduledWorkQueue {
return &scheduledWorkQueue{
processFunc: processFunc,
clock: clock,
work: make(map[interface{}]func()),
workLock: sync.Mutex{},
afterFunc: afterFunc,
}
}
// Add will add an item to this queue, executing the ProcessFunc after the
// Duration has come (since the time Add was called). If an existing Timer for
// obj already exists, the previous timer will be cancelled.
func (s *scheduledWorkQueue) Add(obj interface{}, duration time.Duration) {
s.workLock.Lock()
defer s.workLock.Unlock()
if cancel, ok := s.work[obj]; ok {
cancel()
delete(s.work, obj)
}
s.work[obj] = afterFunc(s.clock, duration, func() {
defer s.Forget(obj)
s.processFunc(obj)
})
}
// Forget will cancel the timer for the given object, if the timer exists.
func (s *scheduledWorkQueue) Forget(obj interface{}) {
s.workLock.Lock()
defer s.workLock.Unlock()
if cancel, ok := s.work[obj]; ok {
cancel()
delete(s.work, obj)
}
}