Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

release-21.2: rangefeed: fix panic due to rangefeed stopper race #76827

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions pkg/kv/kvserver/rangefeed/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,15 +198,16 @@ type CatchUpIteratorConstructor func() *CatchUpIterator
// calling its Close method when it is finished. If the iterator is nil then
// no initialization scan will be performed and the resolved timestamp will
// immediately be considered initialized.
func (p *Processor) Start(stopper *stop.Stopper, rtsIterFunc IntentScannerConstructor) {
// NOTE(review): diff residue — the line above is the removed signature; the
// line below is the added one, which reports a failure to start to the caller.
func (p *Processor) Start(stopper *stop.Stopper, rtsIterFunc IntentScannerConstructor) error {
ctx := p.AnnotateCtx(context.Background())
// Hand the run loop to the stopper as an async task named
// "rangefeed.Processor"; a non-nil error means the task was not started.
if err := stopper.RunAsyncTask(ctx, "rangefeed.Processor", func(ctx context.Context) {
p.run(ctx, p.RangeID, rtsIterFunc, stopper)
}); err != nil {
// The run loop never started, so do its shutdown work here: disconnect the
// registrations selected by `all` with the error and close stoppedC.
// NOTE(review): the next three lines mix the removed form (pErr temporary)
// with the added form (inline roachpb.NewError); only one applies per side
// of the diff.
pErr := roachpb.NewError(err)
p.reg.DisconnectWithErr(all, pErr)
p.reg.DisconnectWithErr(all, roachpb.NewError(err))
close(p.stoppedC)
// Added by this change: surface the start failure to the caller.
return err
}
// Added by this change: the async task was started successfully.
return nil
}

// run is called from Start and runs the rangefeed.
Expand Down
26 changes: 15 additions & 11 deletions pkg/kv/kvserver/rangefeed/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,8 +134,9 @@ func rangeFeedCheckpoint(span roachpb.Span, ts hlc.Timestamp) *roachpb.RangeFeed
const testProcessorEventCCap = 16

func newTestProcessorWithTxnPusher(
rtsIter storage.SimpleMVCCIterator, txnPusher TxnPusher,
t *testing.T, rtsIter storage.SimpleMVCCIterator, txnPusher TxnPusher,
) (*Processor, *stop.Stopper) {
t.Helper()
stopper := stop.NewStopper()

var pushTxnInterval, pushTxnAge time.Duration = 0, 0 // disable
Expand All @@ -154,7 +155,7 @@ func newTestProcessorWithTxnPusher(
EventChanCap: testProcessorEventCCap,
CheckStreamsInterval: 10 * time.Millisecond,
})
p.Start(stopper, makeIntentScannerConstructor(rtsIter))
require.NoError(t, p.Start(stopper, makeIntentScannerConstructor(rtsIter)))
return p, stopper
}

Expand All @@ -165,13 +166,16 @@ func makeIntentScannerConstructor(rtsIter storage.SimpleMVCCIterator) IntentScan
return func() IntentScanner { return NewLegacyIntentScanner(rtsIter) }
}

// newTestProcessor is a test helper that builds a Processor and its Stopper
// by delegating to newTestProcessorWithTxnPusher with a nil TxnPusher.
// NOTE(review): diff residue — the first two lines below are the removed
// form (no *testing.T); the lines after them are the added form, which
// threads t through and marks the function as a test helper.
func newTestProcessor(rtsIter storage.SimpleMVCCIterator) (*Processor, *stop.Stopper) {
return newTestProcessorWithTxnPusher(rtsIter, nil /* pusher */)
func newTestProcessor(
t *testing.T, rtsIter storage.SimpleMVCCIterator,
) (*Processor, *stop.Stopper) {
// t.Helper makes failures reported through t point at the caller's line.
t.Helper()
return newTestProcessorWithTxnPusher(t, rtsIter, nil /* pusher */)
}

func TestProcessorBasic(t *testing.T) {
defer leaktest.AfterTest(t)()
p, stopper := newTestProcessor(nil /* rtsIter */)
p, stopper := newTestProcessor(t, nil /* rtsIter */)
defer stopper.Stop(context.Background())

// Test processor without registrations.
Expand Down Expand Up @@ -434,13 +438,13 @@ func TestNilProcessor(t *testing.T) {
// to call on a nil Processor.
stopper := stop.NewStopper()
defer stopper.Stop(context.Background())
require.Panics(t, func() { p.Start(stopper, nil) })
require.Panics(t, func() { _ = p.Start(stopper, nil) })
require.Panics(t, func() { p.Register(roachpb.RSpan{}, hlc.Timestamp{}, nil, false, nil, nil) })
}

func TestProcessorSlowConsumer(t *testing.T) {
defer leaktest.AfterTest(t)()
p, stopper := newTestProcessor(nil /* rtsIter */)
p, stopper := newTestProcessor(t, nil /* rtsIter */)
defer stopper.Stop(context.Background())

// Add a registration.
Expand Down Expand Up @@ -566,7 +570,7 @@ func TestProcessorInitializeResolvedTimestamp(t *testing.T) {
}, nil)
rtsIter.block = make(chan struct{})

p, stopper := newTestProcessor(rtsIter)
p, stopper := newTestProcessor(t, rtsIter)
defer stopper.Stop(context.Background())

// The resolved timestamp should not be initialized.
Expand Down Expand Up @@ -728,7 +732,7 @@ func TestProcessorTxnPushAttempt(t *testing.T) {
return nil
})

p, stopper := newTestProcessorWithTxnPusher(nil /* rtsIter */, &tp)
p, stopper := newTestProcessorWithTxnPusher(t, nil /* rtsIter */, &tp)
defer stopper.Stop(context.Background())

// Add a few intents and move the closed timestamp forward.
Expand Down Expand Up @@ -818,7 +822,7 @@ func TestProcessorConcurrentStop(t *testing.T) {
defer leaktest.AfterTest(t)()
const trials = 10
for i := 0; i < trials; i++ {
p, stopper := newTestProcessor(nil /* rtsIter */)
p, stopper := newTestProcessor(t, nil /* rtsIter */)

var wg sync.WaitGroup
wg.Add(6)
Expand Down Expand Up @@ -864,7 +868,7 @@ func TestProcessorConcurrentStop(t *testing.T) {
// observes only operations that are consumed after it has registered.
func TestProcessorRegistrationObservesOnlyNewEvents(t *testing.T) {
defer leaktest.AfterTest(t)()
p, stopper := newTestProcessor(nil /* rtsIter */)
p, stopper := newTestProcessor(t, nil /* rtsIter */)
defer stopper.Stop(context.Background())

firstC := make(chan int64)
Expand Down
10 changes: 9 additions & 1 deletion pkg/kv/kvserver/replica_rangefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -411,7 +411,15 @@ func (r *Replica) registerWithRangefeedRaftMuLocked(
return rangefeed.NewLegacyIntentScanner(iter)
}

p.Start(r.store.Stopper(), rtsIter)
// NB: This only errors if the stopper is stopping, and we have to return here
// in that case. We do check ShouldQuiesce() below, but that's not sufficient
// because the stopper has two states: stopping and quiescing. If this errors
// due to stopping, but before it enters the quiescing state, then the select
// below will fall through to the panic.
if err := p.Start(r.store.Stopper(), rtsIter); err != nil {
errC <- roachpb.NewError(err)
return nil
}

// Register with the processor *before* we attach its reference to the
// Replica struct. This ensures that the registration is in place before
Expand Down