diff --git a/.github/ci.yaml b/.github/ci.yaml index 5911295..ba12379 100644 --- a/.github/ci.yaml +++ b/.github/ci.yaml @@ -7,6 +7,6 @@ jobs: - uses: actions/checkout@v2 - uses: actions/setup-go@v4 with: - go-version: "^1.21" + go-version: "^1.23" - name: test run: go test . \ No newline at end of file diff --git a/go.mod b/go.mod index 6a82bfe..2ac6d39 100644 --- a/go.mod +++ b/go.mod @@ -1,3 +1,3 @@ module github.com/coder/quartz -go 1.21.8 +go 1.23.9 diff --git a/mock.go b/mock.go index bd65ddb..d168849 100644 --- a/mock.go +++ b/mock.go @@ -55,6 +55,9 @@ func (m *Mock) TickerFunc(ctx context.Context, d time.Duration, f func() error, return t } +// NewTicker creates a mocked ticker attached to this Mock. Note that it will cease sending ticks on its channel at the +// end of the test, to avoid leaking any goroutines. Ticks are suppressed even if the mock clock is advanced after the +// test completes. Best practice is to only manipulate the mock time in the main goroutine of the test. func (m *Mock) NewTicker(d time.Duration, tags ...string) *Ticker { if d <= 0 { panic("NewTicker called with negative or zero duration") @@ -64,17 +67,7 @@ func (m *Mock) NewTicker(d time.Duration, tags ...string) *Ticker { c := newCall(clockFunctionNewTicker, tags, withDuration(d)) m.matchCallLocked(c) defer close(c.complete) - // 1 element buffer follows standard library implementation - ticks := make(chan time.Time, 1) - t := &Ticker{ - C: ticks, - c: ticks, - d: d, - nxt: m.cur.Add(d), - mock: m, - } - m.addEventLocked(t) - return t + return newMockTickerLocked(m, d) } func (m *Mock) NewTimer(d time.Duration, tags ...string) *Timer { @@ -83,7 +76,7 @@ func (m *Mock) NewTimer(d time.Duration, tags ...string) *Timer { c := newCall(clockFunctionNewTimer, tags, withDuration(d)) defer close(c.complete) m.matchCallLocked(c) - ch := make(chan time.Time, 1) + ch := make(chan time.Time) t := &Timer{ C: ch, c: ch, @@ -277,8 +270,8 @@ func (m *Mock) Advance(d time.Duration) AdvanceWaiter { return w } if fin.After(m.nextTime) { - m.tb.Errorf(fmt.Sprintf("cannot advance %s which is beyond next timer/ticker event in %s", - d.String(), m.nextTime.Sub(m.cur))) + m.tb.Errorf("cannot advance %s which is beyond next timer/ticker event in %s", + d.String(), m.nextTime.Sub(m.cur)) m.mu.Unlock() close(w.ch) return w diff --git a/mock_test.go b/mock_test.go index ae3d14c..79e58e0 100644 --- a/mock_test.go +++ b/mock_test.go @@ -380,9 +380,8 @@ func Test_MultipleTrapsDeadlock(t *testing.T) { trap1 := mClock.Trap().Now("1") defer trap1.Close() - timeCh := make(chan time.Time) go func() { - timeCh <- mClock.Now("0", "1") + mClock.Now("0", "1") }() c0 := trap0.MustWait(testCtx) @@ -501,3 +500,76 @@ func (l *testLogger) Log(args ...any) { func (l *testLogger) Logf(format string, args ...any) { l.calls = append(l.calls, fmt.Sprintf(format, args...)) } + +func TestTimerStop_Go123(t *testing.T) { + t.Parallel() + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + mClock := quartz.NewMock(t) + + tmr := mClock.NewTimer(1 * time.Second) + mClock.Advance(1 * time.Second).MustWait(ctx) + if tmr.Stop() { + t.Fatal("timer hadn't already been active") + } + + select { + case tme := <-tmr.C: + t.Fatalf("got channel read after stop: %s", tme) + default: + // OK! + } +} + +func TestTimerReset_Go123(t *testing.T) { + t.Parallel() + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + mClock := quartz.NewMock(t) + + tmr := mClock.NewTimer(1 * time.Second) + mClock.Advance(1 * time.Second).MustWait(ctx) + if tmr.Reset(1 * time.Second) { + t.Fatal("timer hadn't already been active") + } + + select { + case tme := <-tmr.C: + t.Fatalf("got channel read after stop: %s", tme) + default: + // OK! + } +} + +func TestNoLeak_Go123(t *testing.T) { + t.Parallel() + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + mClock := quartz.NewMock(t) + + _ = mClock.NewTimer(1 * time.Second) + _ = mClock.NewTicker(1 * time.Second) + mClock.Advance(1 * time.Second).MustWait(ctx) +} + +func TestTickerStop_Go123(t *testing.T) { + t.Parallel() + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + mClock := quartz.NewMock(t) + + tkr := mClock.NewTicker(1 * time.Second) + mClock.Advance(1 * time.Second).MustWait(ctx) + tkr.Stop() + + select { + case tme := <-tkr.C: + t.Fatalf("got channel read after stop: %s", tme) + default: + // OK! + } +} diff --git a/ticker.go b/ticker.go index 5506750..f4a4b06 100644 --- a/ticker.go +++ b/ticker.go @@ -6,12 +6,25 @@ import "time" type Ticker struct { C <-chan time.Time //nolint: revive - c chan time.Time - ticker *time.Ticker // realtime impl, if set - d time.Duration // period, if set - nxt time.Time // next tick time - mock *Mock // mock clock, if set - stopped bool // true if the ticker is not running + c chan time.Time + ticker *time.Ticker // realtime impl, if set + d time.Duration // period, if set + nxt time.Time // next tick time + mock *Mock // mock clock, if set + stopped bool // true if the ticker is not running + internalTicks chan time.Time // used to deliver ticks to the runLoop goroutine + + // As of Go 1.23, ticker channels are unbuffered and guaranteed to block forever after a call to stop. + // + // When a mocked ticker fires, we don't want to block on a channel write, because it's fine for the code under test + // not to be reading. That means we need to start a new goroutine to do the channel write (runLoop) if we are a + // channel-based ticker. + // + // They also are not supposed to leak even if they are never read or stopped (Go runtime can garbage collect them). + // We can't garbage-collect because we can't check if any other code besides the mock references, but we can ensure + // that we don't leak goroutines so that the garbage collector can do its job when the mock is no longer + // referenced. The channels below allow us to interrupt the runLoop goroutine. + interrupt chan struct{} } func (t *Ticker) fire(tt time.Time) { @@ -24,9 +37,8 @@ func (t *Ticker) fire(tt time.Time) { t.nxt = t.nxt.Add(t.d) } t.mock.recomputeNextLocked() - select { - case t.c <- tt: - default: + if t.interrupt != nil { // implies runLoop is still going. + t.internalTicks <- tt } } @@ -49,6 +61,11 @@ func (t *Ticker) Stop(tags ...string) { defer close(c.complete) t.mock.removeEventLocked(t) t.stopped = true + // check if we've already fired, and if so, interrupt it. + if t.interrupt != nil { + <-t.interrupt + t.interrupt = nil + } } // Reset stops a ticker and resets its period to the specified duration. The @@ -72,4 +89,63 @@ func (t *Ticker) Reset(d time.Duration, tags ...string) { } else { t.mock.recomputeNextLocked() } + if t.interrupt == nil { + t.startRunLoopLocked() + } +} + +func (t *Ticker) runLoop(interrupt chan struct{}) { + defer close(interrupt) +outer: + for { + select { + case tt := <-t.internalTicks: + for { + select { + case t.c <- tt: + continue outer + case <-t.internalTicks: + // Discard future ticks until we can send this one. + case interrupt <- struct{}{}: + return + } + } + case interrupt <- struct{}{}: + return + } + } +} + +func (t *Ticker) startRunLoopLocked() { + // assert some assumptions. If these fire, it is a bug in Quartz itself. + if t.interrupt != nil { + t.mock.tb.Error("called startRunLoopLocked when interrupt suggests we are already running") + } + interrupt := make(chan struct{}) + t.interrupt = interrupt + go t.runLoop(interrupt) +} + +func newMockTickerLocked(m *Mock, d time.Duration) *Ticker { + // no buffer follows Go 1.23+ behavior + ticks := make(chan time.Time) + t := &Ticker{ + C: ticks, + c: ticks, + d: d, + nxt: m.cur.Add(d), + mock: m, + internalTicks: make(chan time.Time), + } + m.addEventLocked(t) + m.tb.Cleanup(func() { + m.mu.Lock() + defer m.mu.Unlock() + if t.interrupt != nil { + <-t.interrupt + t.interrupt = nil + } + }) + t.startRunLoopLocked() + return t } diff --git a/timer.go b/timer.go index 8d928ac..8f571eb 100644 --- a/timer.go +++ b/timer.go @@ -1,6 +1,8 @@ package quartz -import "time" +import ( + "time" +) // The Timer type represents a single event. When the Timer expires, the current time will be sent // on C, unless the Timer was created by AfterFunc. A Timer must be created with NewTimer or @@ -14,14 +16,43 @@ type Timer struct { mock *Mock // mock clock, if set fn func() // AfterFunc function, if set stopped bool // True if stopped, false if running + + // As of Go 1.23, timer channels are unbuffered and guaranteed to block forever after a call to stop. + // + // When a mocked timer fires, we don't want to block on a channel write, because it's fine for the code under test + // not to be reading. That means we need to start a new goroutine to do the channel write if we are a channel-based + // timer. + // + // They also are not supposed to leak even if they are never read or stopped (Go runtime can garbage collect them). + // We can't garbage-collect because we can't check if any other code besides the mock references, but we can ensure + // that we don't leak goroutines so that the garbage collector can do its job when the mock is no longer + // referenced. The channels below allow us to interrupt the channel write goroutine. + interrupt chan struct{} } func (t *Timer) fire(tt time.Time) { - t.mock.removeTimer(t) + t.mock.mu.Lock() + t.mock.removeTimerLocked(t) if t.fn != nil { + t.mock.mu.Unlock() t.fn() + return } else { - t.c <- tt + interrupt := make(chan struct{}) + // Prevents the goroutine from leaking beyond the test. Side effect is that timer channels cannot be read + // after the test exits. + t.mock.tb.Cleanup(func() { + <-interrupt + }) + t.interrupt = interrupt + t.mock.mu.Unlock() + go func() { + defer close(interrupt) + select { + case t.c <- tt: + case interrupt <- struct{}{}: + } + }() } } @@ -45,6 +76,11 @@ func (t *Timer) Stop(tags ...string) bool { defer close(c.complete) result := !t.stopped t.mock.removeTimerLocked(t) + // check if we've already fired, and if so, interrupt it. + if t.interrupt != nil { + <-t.interrupt + t.interrupt = nil + } return result } @@ -62,9 +98,10 @@ func (t *Timer) Reset(d time.Duration, tags ...string) bool { t.mock.matchCallLocked(c) defer close(c.complete) result := !t.stopped - select { - case <-t.c: - default: + // check if we've already fired, and if so, interrupt it. + if t.interrupt != nil { + <-t.interrupt + t.interrupt = nil } if d <= 0 { // zero or negative duration timer means we should immediately re-fire