This repository has been archived by the owner on Mar 16, 2021. It is now read-only.
/
mock.go
309 lines (251 loc) · 7.01 KB
/
mock.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
package glock
import (
"runtime"
"sort"
"sync"
"time"
)
type (
// MockClock is an implementation of Clock that can be moved forward in time
// in increments for testing code that relies on timeouts or other time-sensitive
// constructs.
MockClock struct {
fakeTime time.Time
triggers mockTriggers
tickers mockTickers
afterArgs []time.Duration
tickerArgs []time.Duration
nowLock sync.RWMutex
afterLock sync.RWMutex
tickerLock sync.Mutex
}
mockTrigger struct {
trigger time.Time
ch chan time.Time
}
mockTriggers []*mockTrigger
mockTickers []*mockTicker
)
// Make sure MockClock conforms to the interfaces
var _ Clock = &MockClock{}
// NewMockClock creates a new MockClock with the internal time set
// to time.Now()
func NewMockClock() *MockClock {
return NewMockClockAt(time.Now())
}
// NewMockClockAt creates a new MockClick with the internal time set
// to the provided time.
func NewMockClockAt(now time.Time) *MockClock {
return &MockClock{
fakeTime: now,
tickers: make([]*mockTicker, 0),
afterArgs: make([]time.Duration, 0),
tickerArgs: make([]time.Duration, 0),
}
}
// SetCurrent sets the internal MockClock time to the supplied time.
func (mc *MockClock) SetCurrent(current time.Time) {
mc.nowLock.Lock()
defer mc.nowLock.Unlock()
mc.fakeTime = current
}
// Advance will advance the internal MockClock time by the supplied time.
func (mc *MockClock) Advance(duration time.Duration) {
mc.nowLock.Lock()
now := mc.fakeTime.Add(duration)
mc.fakeTime = now
mc.nowLock.Unlock()
mc.processTriggers(now)
mc.processTickers(now)
}
func (mc *MockClock) processTriggers(now time.Time) {
mc.afterLock.Lock()
defer mc.afterLock.Unlock()
triggered := 0
for _, trigger := range mc.triggers {
if trigger.trigger.Before(now) || trigger.trigger.Equal(now) {
trigger.ch <- trigger.trigger
triggered++
}
}
mc.triggers = mc.triggers[triggered:]
}
func (mc *MockClock) processTickers(now time.Time) {
mc.tickerLock.Lock()
defer mc.tickerLock.Unlock()
for _, ticker := range mc.tickers {
ticker.publish(now)
}
}
// BlockingAdvance will call Advance but only after there is another routine
// which is blocking on the channel result of a call to After.
func (mc *MockClock) BlockingAdvance(duration time.Duration) {
for mc.BlockedOnAfter() == 0 {
runtime.Gosched()
}
mc.Advance(duration)
}
// Now returns the current time internal to the MockClock
func (mc *MockClock) Now() time.Time {
mc.nowLock.RLock()
defer mc.nowLock.RUnlock()
return mc.fakeTime
}
// After returns a channel that will be sent the current internal MockClock
// time once the MockClock's internal time is at or past the provided duration
func (mc *MockClock) After(duration time.Duration) <-chan time.Time {
mc.nowLock.RLock()
triggerTime := mc.fakeTime.Add(duration)
mc.nowLock.RUnlock()
mc.afterLock.Lock()
defer mc.afterLock.Unlock()
trigger := &mockTrigger{
trigger: triggerTime,
ch: make(chan time.Time, 1),
}
mc.triggers = append(mc.triggers, trigger)
mc.afterArgs = append(mc.afterArgs, duration)
sort.Slice(mc.triggers, func(i, j int) bool {
return mc.triggers[i].trigger.Before(mc.triggers[j].trigger)
})
return trigger.ch
}
// BlockedOnAfter returns the number of calls to After that are blocked
// waiting for a call to Advance to trigger them.
func (mc *MockClock) BlockedOnAfter() int {
mc.afterLock.RLock()
defer mc.afterLock.RUnlock()
return len(mc.triggers)
}
// Sleep will block until the internal MockClock time is at or past the
// provided duration
func (mc *MockClock) Sleep(duration time.Duration) {
<-mc.After(duration)
}
// Since returns the time elapsed since t.
func (mc *MockClock) Since(t time.Time) time.Duration {
return mc.Now().Sub(t)
}
// Until returns the duration until t.
func (mc *MockClock) Until(t time.Time) time.Duration {
return t.Sub(mc.Now())
}
// GetAfterArgs returns the duration of each call to After in the
// same order as they were called. The list is cleared each time
// GetAfterArgs is called.
func (mc *MockClock) GetAfterArgs() []time.Duration {
mc.afterLock.Lock()
defer mc.afterLock.Unlock()
args := mc.afterArgs
mc.afterArgs = mc.afterArgs[:0]
return args
}
// GetTickerArgs returns the duration of each call to create a new
// ticker in the same order as they were called. The list is cleared
// each time GetTickerArgs is called.
func (mc *MockClock) GetTickerArgs() []time.Duration {
mc.tickerLock.Lock()
defer mc.tickerLock.Unlock()
args := mc.tickerArgs
mc.tickerArgs = mc.tickerArgs[:0]
return args
}
type mockTicker struct {
clock *MockClock
duration time.Duration
started time.Time
nextTick time.Time
processQueue []time.Time
ch chan time.Time
wakeup chan struct{}
stopped bool
processLock sync.Mutex
stoppedLock sync.RWMutex
}
// NewTicker creates a new Ticker tied to the internal MockClock time that ticks
// at intervals similar to time.NewTicker(). It will also skip or drop ticks
// for slow readers similar to time.NewTicker() as well.
func (mc *MockClock) NewTicker(duration time.Duration) Ticker {
if duration == 0 {
panic("duration cannot be 0")
}
now := mc.Now()
ticker := &mockTicker{
clock: mc,
duration: duration,
started: now,
nextTick: now.Add(duration),
processQueue: make([]time.Time, 0),
ch: make(chan time.Time),
wakeup: make(chan struct{}, 1),
}
mc.tickerLock.Lock()
defer mc.tickerLock.Unlock()
mc.tickers = append(mc.tickers, ticker)
mc.tickerArgs = append(mc.tickerArgs, duration)
go ticker.process()
return ticker
}
// Chan returns a channel which will receive the MockClock's internal time
// at the interval given when creating the ticker.
func (mt *mockTicker) Chan() <-chan time.Time {
return mt.ch
}
// Stop will stop the ticker from ticking
func (mt *mockTicker) Stop() {
mt.stoppedLock.Lock()
defer mt.stoppedLock.Unlock()
mt.stopped = true
mt.wakeup <- struct{}{}
}
func (mt *mockTicker) publish(now time.Time) {
if mt.isStopped() {
return
}
mt.processLock.Lock()
mt.processQueue = append(mt.processQueue, now)
mt.processLock.Unlock()
select {
case mt.wakeup <- struct{}{}:
default:
}
}
func (mt *mockTicker) process() {
defer close(mt.wakeup)
for !mt.isStopped() {
for {
first, ok := mt.pop()
if !ok {
break
}
if mt.nextTick.After(first) {
continue
}
mt.ch <- mt.nextTick
durationMod := first.Sub(mt.started) % mt.duration
if durationMod == 0 {
mt.nextTick = first.Add(mt.duration)
} else if first.Sub(mt.nextTick) > mt.duration {
mt.nextTick = first.Add(mt.duration - durationMod)
} else {
mt.nextTick = mt.nextTick.Add(mt.duration)
}
}
<-mt.wakeup
}
}
func (mt *mockTicker) pop() (time.Time, bool) {
mt.processLock.Lock()
defer mt.processLock.Unlock()
if len(mt.processQueue) == 0 {
return time.Unix(0, 0), false
}
first := mt.processQueue[0]
mt.processQueue = mt.processQueue[1:]
return first, true
}
func (mt *mockTicker) isStopped() bool {
mt.stoppedLock.RLock()
defer mt.stoppedLock.RUnlock()
return mt.stopped
}