-
Notifications
You must be signed in to change notification settings - Fork 1
/
clock.go
153 lines (126 loc) · 3.15 KB
/
clock.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
package pulse
import (
"sync"
"sync/atomic"
"time"
)
// Clock is a simple abstraction to allow for time based assertions in tests.
type Clock interface {
Now() time.Time
NewTicker(d time.Duration) (<-chan time.Time, func())
NewTimer(d time.Duration) (<-chan time.Time, func() bool)
GetTime() int64
}
type RealClock struct{}
func NewClock() *RealClock {
return &RealClock{}
}
func (c *RealClock) Now() time.Time {
return time.Now()
}
func (c *RealClock) NewTicker(d time.Duration) (<-chan time.Time, func()) {
t := time.NewTicker(d)
return t.C, t.Stop
}
func (c *RealClock) NewTimer(d time.Duration) (<-chan time.Time, func() bool) {
t := time.NewTimer(d)
return t.C, t.Stop
}
func (c *RealClock) GetTime() int64 {
return time.Now().UTC().UnixMilli()
}
type testTimer struct {
deadline time.Time
ch chan time.Time
stopped *atomic.Bool
}
type testTicker struct {
nextTick time.Time
interval time.Duration
ch chan time.Time
stopped *atomic.Bool
}
type TestClock struct {
mu sync.Mutex
time time.Time
timers []*testTimer
tickers []*testTicker
}
func NewTestClock(time time.Time) *TestClock {
var c TestClock
c.time = time
c.timers = make([]*testTimer, 0)
c.tickers = make([]*testTicker, 0)
return &c
}
func (c *TestClock) Set(t time.Time) {
c.mu.Lock()
defer c.mu.Unlock()
if t.Before(c.time) {
panic("can't go back in time")
}
c.time = t
for _, ticker := range c.tickers {
if !ticker.stopped.Load() && !ticker.nextTick.Add(ticker.interval).After(c.time) {
//nolint: durationcheck // This is a test clock where we can ignore overflows.
nextTick := (c.time.Sub(ticker.nextTick) / ticker.interval) * ticker.interval
ticker.nextTick = ticker.nextTick.Add(nextTick)
select {
case ticker.ch <- c.time:
default:
}
}
}
unfiredTimers := make([]*testTimer, 0)
for i, timer := range c.timers {
if timer.deadline.After(c.time) && !timer.stopped.Load() {
unfiredTimers = append(unfiredTimers, c.timers[i])
continue
}
timer.stopped.Store(true)
timer.ch <- c.time
}
c.timers = unfiredTimers
}
func (c *TestClock) Add(d time.Duration) {
c.Set(c.time.Add(d))
}
func (c *TestClock) Now() time.Time {
c.mu.Lock()
defer c.mu.Unlock()
return c.time
}
func (c *TestClock) GetTime() int64 {
c.mu.Lock()
defer c.mu.Unlock()
return c.time.UnixMilli()
}
func (c *TestClock) NewTicker(d time.Duration) (<-chan time.Time, func()) {
c.mu.Lock()
defer c.mu.Unlock()
ch := make(chan time.Time, 1)
stopped := &atomic.Bool{}
ticker := &testTicker{nextTick: c.time, interval: d, ch: ch, stopped: stopped}
c.tickers = append(c.tickers, ticker)
stop := func() {
stopped.Store(true)
}
return ch, stop
}
func (c *TestClock) NewTimer(d time.Duration) (<-chan time.Time, func() bool) {
c.mu.Lock()
defer c.mu.Unlock()
ch := make(chan time.Time, 1)
stopped := &atomic.Bool{}
// Fire the timer straight away if the duration is less than zero.
if d <= 0 {
ch <- c.time
return ch, func() bool { return false }
}
timer := &testTimer{deadline: c.time.Add(d), ch: ch, stopped: stopped}
c.timers = append(c.timers, timer)
stop := func() bool {
return stopped.CompareAndSwap(false, true)
}
return ch, stop
}