From 0b1a649a9d38a466e6a288f1ce5aafd54822eca5 Mon Sep 17 00:00:00 2001 From: Peter Mattis Date: Wed, 19 Dec 2018 09:59:59 -0500 Subject: [PATCH] ccl/changefeedccl: deflake TestChangefeedSchemaChangeAllowBackfill Any goroutine started via `Stopper.RunAsyncTask` needs to watch `Stopper.ShouldQuiesce()` to determine if it should stop processing. This wasn't being done properly for the "changefeed-poller" task causing the test to fail to exit in rare circumstances. Fixes #33166 Release note: None --- .../changefeedccl/changefeed_processors.go | 7 +++--- pkg/ccl/changefeedccl/helpers_test.go | 2 +- pkg/ccl/changefeedccl/poller.go | 24 +++++++++++++++++-- 3 files changed, 27 insertions(+), 6 deletions(-) diff --git a/pkg/ccl/changefeedccl/changefeed_processors.go b/pkg/ccl/changefeedccl/changefeed_processors.go index 047ac21215ae..079e09d0151d 100644 --- a/pkg/ccl/changefeedccl/changefeed_processors.go +++ b/pkg/ccl/changefeedccl/changefeed_processors.go @@ -166,13 +166,14 @@ func (ca *changeAggregator) Start(ctx context.Context) context.Context { // but only the first one is ever used. ca.errCh = make(chan error, 2) ca.pollerDoneCh = make(chan struct{}) - if err := ca.flowCtx.Stopper().RunAsyncTask(ctx, "changefeed-poller", func(ctx context.Context) { + stopper := ca.flowCtx.Stopper() + if err := stopper.RunAsyncTask(ctx, "changefeed-poller", func(ctx context.Context) { defer close(ca.pollerDoneCh) var err error if storage.RangefeedEnabled.Get(&ca.flowCtx.Settings.SV) { - err = ca.poller.RunUsingRangefeeds(ctx) + err = ca.poller.RunUsingRangefeeds(ctx, stopper) } else { - err = ca.poller.Run(ctx) + err = ca.poller.Run(ctx, stopper) } // Trying to call MoveToDraining here is racy (`MoveToDraining called in diff --git a/pkg/ccl/changefeedccl/helpers_test.go b/pkg/ccl/changefeedccl/helpers_test.go index ca30c27e5d4b..d56014a20f0b 100644 --- a/pkg/ccl/changefeedccl/helpers_test.go +++ b/pkg/ccl/changefeedccl/helpers_test.go @@ -142,7 +142,7 @@ func createBenchmarkChangefeed( s.ClusterSettings(), details, encoder, sink, rowsFn, TestingKnobs{}, metrics) ctx, cancel := context.WithCancel(ctx) - go func() { _ = poller.Run(ctx) }() + go func() { _ = poller.Run(ctx, nil) }() go func() { _ = thUpdater.PollTableDescs(ctx) }() errCh := make(chan error, 1) diff --git a/pkg/ccl/changefeedccl/poller.go b/pkg/ccl/changefeedccl/poller.go index cf89e8d7b438..605680aabecd 100644 --- a/pkg/ccl/changefeedccl/poller.go +++ b/pkg/ccl/changefeedccl/poller.go @@ -31,6 +31,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/ctxgroup" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/stop" "github.com/cockroachdb/cockroach/pkg/util/syncutil" "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/pkg/errors" @@ -119,7 +120,12 @@ func makePoller( // ExportRequests. It backpressures sending the requests such that some maximum // number are inflight or being inserted into the buffer. Finally, after each // poll completes, a resolved timestamp notification is added to the buffer. -func (p *poller) Run(ctx context.Context) error { +func (p *poller) Run(ctx context.Context, stopper *stop.Stopper) error { + var shouldQuiesce <-chan struct{} + if stopper != nil { + shouldQuiesce = stopper.ShouldQuiesce() + } + for { // Wait for polling interval p.mu.Lock() @@ -134,6 +140,10 @@ func (p *poller) Run(ctx context.Context) error { case <-ctx.Done(): return ctx.Err() case <-time.After(pollDuration): + case <-shouldQuiesce: + // NB: We have to return an error in order tell the ctxgroup to + // initiate cancellation. Any error will do. + return context.Canceled } } @@ -190,12 +200,22 @@ func (p *poller) Run(ctx context.Context) error { // RunUsingRangeFeeds performs the same task as the normal Run method, but uses // the experimental Rangefeed system to capture changes rather than the // poll-and-export method. Note -func (p *poller) RunUsingRangefeeds(ctx context.Context) error { +func (p *poller) RunUsingRangefeeds(ctx context.Context, stopper *stop.Stopper) error { // Start polling tablehistory, which must be done concurrently with // the individual rangefeed routines. g := ctxgroup.WithContext(ctx) g.GoCtx(p.pollTableHistory) g.GoCtx(p.rangefeedImpl) + g.GoCtx(func(ctx context.Context) error { + select { + case <-ctx.Done(): + return ctx.Err() + case <-stopper.ShouldQuiesce(): + // NB: We have to return an error in order tell the ctxgroup to initiate + // cancellation. Any error will do. + return context.Canceled + } + }) return g.Wait() }