From d01e7698e39d0c08456cd3e850217f6e6551d516 Mon Sep 17 00:00:00 2001 From: "mergify[bot]" <37929162+mergify[bot]@users.noreply.github.com> Date: Thu, 22 Jul 2021 11:14:07 -0500 Subject: [PATCH] Fix timer failure (#27006) (#27017) Fixes #26205 Fixes deadlocks in heartbeat scheduler. No changelog needed because this bug only exists in go 1.16+ and v7.14.0 is the first affected beats version The underlying cause is a likely bug in golang, reported here by myself here: golang/go#47329 (cherry picked from commit 2f62e8fbd7cecd8c2cda3c76bad42136b54caafd) Co-authored-by: Andrew Cholakian --- heartbeat/scheduler/timerqueue/queue.go | 19 +++++++---- heartbeat/scheduler/timerqueue/queue_test.go | 33 +++++++++++++++++--- 2 files changed, 41 insertions(+), 11 deletions(-) diff --git a/heartbeat/scheduler/timerqueue/queue.go b/heartbeat/scheduler/timerqueue/queue.go index f545c43be8e..61d4e0ac933 100644 --- a/heartbeat/scheduler/timerqueue/queue.go +++ b/heartbeat/scheduler/timerqueue/queue.go @@ -88,7 +88,7 @@ func (tq *TimerQueue) Start() { if tq.th.Len() > 0 { nr := tq.th[0].runAt tq.nextRunAt = &nr - tq.timer.Reset(nr.Sub(time.Now())) + tq.timer = time.NewTimer(time.Until(nr)) } else { tq.timer.Stop() tq.nextRunAt = nil @@ -104,14 +104,21 @@ func (tq *TimerQueue) pushInternal(tt *timerTask) { heap.Push(&tq.th, tt) if tq.nextRunAt == nil || tq.nextRunAt.After(tt.runAt) { - // Stop and drain the timer prior to reset per https://golang.org/pkg/time/#Timer.Reset - // Only drain if nextRunAt is set, otherwise the timer channel has already been stopped the - // channel is empty (and thus would block) if tq.nextRunAt != nil && !tq.timer.Stop() { <-tq.timer.C } - tq.timer.Reset(tt.runAt.Sub(time.Now())) - + // Originally the line below this comment was + // + // tq.timer.Reset(time.Until(tt.runAt)) + // + // however this broke in go1.16rc1, specifically on the commit b4b014465216790e01aa66f9120d03230e4aff46 + //, specifically on this line: + // https://github.com/golang/go/commit/b4b014465216790e01aa66f9120d03230e4aff46#diff-73699b6edfe5dbb3f6824e66bb3566bce9405e9a8c810cac55c8199459f0ac19R652 + // where some nice new optimizations don't actually work reliably + // This can be worked around by instantiating a new timer rather than resetting the timer. + // since that internally calls deltimer in runtime/timer.go rather than modtimer, + // I suspect that the problem is in modtimer's setting of &pp.timerModifiedEarliest + tq.timer = time.NewTimer(time.Until(tt.runAt)) tq.nextRunAt = &tt.runAt } } diff --git a/heartbeat/scheduler/timerqueue/queue_test.go b/heartbeat/scheduler/timerqueue/queue_test.go index 2e5a2e59eb8..26235390bb4 100644 --- a/heartbeat/scheduler/timerqueue/queue_test.go +++ b/heartbeat/scheduler/timerqueue/queue_test.go @@ -20,6 +20,8 @@ package timerqueue import ( "context" "math/rand" + "os" + "runtime/pprof" "sort" "testing" "time" @@ -27,11 +29,32 @@ import ( "github.com/stretchr/testify/require" ) -func TestQueueRunsInOrder(t *testing.T) { - t.Skip("flaky test on windows: https://github.com/elastic/beats/issues/26205") - // Bugs can show up only occasionally - for i := 0; i < 100; i++ { - testQueueRunsInOrderOnce(t) +func TestRunsInOrder(t *testing.T) { + testQueueRunsInOrderOnce(t) +} + +// TestStress tries to figure out if we have any deadlocks that show up under concurrency +func TestStress(t *testing.T) { + for i := 0; i < 120000; i++ { + failed := make(chan bool) + succeeded := make(chan bool) + + watchdog := time.AfterFunc(time.Second*5, func() { + failed <- true + }) + + go func() { + testQueueRunsInOrderOnce(t) + succeeded <- true + }() + + select { + case <-failed: + pprof.Lookup("goroutine").WriteTo(os.Stdout, 1) + require.FailNow(t, "Scheduler test iteration timed out, deadlock issue?") + case <-succeeded: + watchdog.Stop() + } } }