Skip to content

Commit

Permalink
ccl/changefeedccl: deflake TestChangefeedSchemaChangeAllowBackfill
Browse files Browse the repository at this point in the history
Any goroutine started via `Stopper.RunAsyncTask` needs to watch
`Stopper.ShouldQuiesce()` to determine if it should stop
processing. This wasn't being done properly for the "changefeed-poller"
task causing the test to fail to exit in rare circumstances.

Fixes #33166

Release note: None
  • Loading branch information
petermattis committed Dec 19, 2018
1 parent eff9b52 commit 0b1a649
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 6 deletions.
7 changes: 4 additions & 3 deletions pkg/ccl/changefeedccl/changefeed_processors.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,13 +166,14 @@ func (ca *changeAggregator) Start(ctx context.Context) context.Context {
// but only the first one is ever used.
ca.errCh = make(chan error, 2)
ca.pollerDoneCh = make(chan struct{})
if err := ca.flowCtx.Stopper().RunAsyncTask(ctx, "changefeed-poller", func(ctx context.Context) {
stopper := ca.flowCtx.Stopper()
if err := stopper.RunAsyncTask(ctx, "changefeed-poller", func(ctx context.Context) {
defer close(ca.pollerDoneCh)
var err error
if storage.RangefeedEnabled.Get(&ca.flowCtx.Settings.SV) {
err = ca.poller.RunUsingRangefeeds(ctx)
err = ca.poller.RunUsingRangefeeds(ctx, stopper)
} else {
err = ca.poller.Run(ctx)
err = ca.poller.Run(ctx, stopper)
}

// Trying to call MoveToDraining here is racy (`MoveToDraining called in
Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/changefeedccl/helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ func createBenchmarkChangefeed(
s.ClusterSettings(), details, encoder, sink, rowsFn, TestingKnobs{}, metrics)

ctx, cancel := context.WithCancel(ctx)
go func() { _ = poller.Run(ctx) }()
go func() { _ = poller.Run(ctx, nil) }()
go func() { _ = thUpdater.PollTableDescs(ctx) }()

errCh := make(chan error, 1)
Expand Down
24 changes: 22 additions & 2 deletions pkg/ccl/changefeedccl/poller.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/pkg/errors"
Expand Down Expand Up @@ -119,7 +120,12 @@ func makePoller(
// ExportRequests. It backpressures sending the requests such that some maximum
// number are inflight or being inserted into the buffer. Finally, after each
// poll completes, a resolved timestamp notification is added to the buffer.
func (p *poller) Run(ctx context.Context) error {
func (p *poller) Run(ctx context.Context, stopper *stop.Stopper) error {
var shouldQuiesce <-chan struct{}
if stopper != nil {
shouldQuiesce = stopper.ShouldQuiesce()
}

for {
// Wait for polling interval
p.mu.Lock()
Expand All @@ -134,6 +140,10 @@ func (p *poller) Run(ctx context.Context) error {
case <-ctx.Done():
return ctx.Err()
case <-time.After(pollDuration):
case <-shouldQuiesce:
// NB: We have to return an error in order tell the ctxgroup to
// initiate cancellation. Any error will do.
return context.Canceled
}
}

Expand Down Expand Up @@ -190,12 +200,22 @@ func (p *poller) Run(ctx context.Context) error {
// RunUsingRangeFeeds performs the same task as the normal Run method, but uses
// the experimental Rangefeed system to capture changes rather than the
// poll-and-export method. Note
func (p *poller) RunUsingRangefeeds(ctx context.Context) error {
func (p *poller) RunUsingRangefeeds(ctx context.Context, stopper *stop.Stopper) error {
// Start polling tablehistory, which must be done concurrently with
// the individual rangefeed routines.
g := ctxgroup.WithContext(ctx)
g.GoCtx(p.pollTableHistory)
g.GoCtx(p.rangefeedImpl)
g.GoCtx(func(ctx context.Context) error {
select {
case <-ctx.Done():
return ctx.Err()
case <-stopper.ShouldQuiesce():
// NB: We have to return an error in order tell the ctxgroup to initiate
// cancellation. Any error will do.
return context.Canceled
}
})
return g.Wait()
}

Expand Down

0 comments on commit 0b1a649

Please sign in to comment.