From 3d1281ac3f96db1e6beb0d29bda0b3c0bca1b74a Mon Sep 17 00:00:00 2001 From: Erik Grinaker Date: Sun, 20 Feb 2022 12:07:09 +0000 Subject: [PATCH] rangefeed: fix panic due to rangefeed stopper race This patch fixes a race condition that could cause an `unexpected Stopped processor` panic if a rangefeed registration was attempted while a store was stopping. Registering a rangefeed panics if a newly created rangefeed processor is unexpectedly stopped and the store's stopper is not quiescing. However, the stopper has two distinct states that it transitions through: stopping and quiescing. It's possible for the processor to fail to start because the stopper is stopping, but before the stopper has transitioned to quiescing, which would trigger this panic. This patch propagates the processor startup error to the rangefeed registration and through to the caller, returning before attempting the registration at all and avoiding the panic. This was confirmed with 50000 stress runs of `TestPGTest/pgjdbc`, all of which succeeded. Release note (bug fix): Fixed a race condition that in rare circumstances could cause a node to panic with `unexpected Stopped processor` during shutdown. --- pkg/kv/kvserver/rangefeed/processor.go | 7 +++--- pkg/kv/kvserver/rangefeed/processor_test.go | 26 ++++++++++++--------- pkg/kv/kvserver/replica_rangefeed.go | 10 +++++++- 3 files changed, 28 insertions(+), 15 deletions(-) diff --git a/pkg/kv/kvserver/rangefeed/processor.go b/pkg/kv/kvserver/rangefeed/processor.go index ba6526bf2e44..12c553928786 100644 --- a/pkg/kv/kvserver/rangefeed/processor.go +++ b/pkg/kv/kvserver/rangefeed/processor.go @@ -198,15 +198,16 @@ type CatchUpIteratorConstructor func() *CatchUpIterator // calling its Close method when it is finished. If the iterator is nil then // no initialization scan will be performed and the resolved timestamp will // immediately be considered initialized. -func (p *Processor) Start(stopper *stop.Stopper, rtsIterFunc IntentScannerConstructor) { +func (p *Processor) Start(stopper *stop.Stopper, rtsIterFunc IntentScannerConstructor) error { ctx := p.AnnotateCtx(context.Background()) if err := stopper.RunAsyncTask(ctx, "rangefeed.Processor", func(ctx context.Context) { p.run(ctx, p.RangeID, rtsIterFunc, stopper) }); err != nil { - pErr := roachpb.NewError(err) - p.reg.DisconnectWithErr(all, pErr) + p.reg.DisconnectWithErr(all, roachpb.NewError(err)) close(p.stoppedC) + return err } + return nil } // run is called from Start and runs the rangefeed. diff --git a/pkg/kv/kvserver/rangefeed/processor_test.go b/pkg/kv/kvserver/rangefeed/processor_test.go index 327a7b2d6219..ebc77919fdab 100644 --- a/pkg/kv/kvserver/rangefeed/processor_test.go +++ b/pkg/kv/kvserver/rangefeed/processor_test.go @@ -134,8 +134,9 @@ func rangeFeedCheckpoint(span roachpb.Span, ts hlc.Timestamp) *roachpb.RangeFeed const testProcessorEventCCap = 16 func newTestProcessorWithTxnPusher( - rtsIter storage.SimpleMVCCIterator, txnPusher TxnPusher, + t *testing.T, rtsIter storage.SimpleMVCCIterator, txnPusher TxnPusher, ) (*Processor, *stop.Stopper) { + t.Helper() stopper := stop.NewStopper() var pushTxnInterval, pushTxnAge time.Duration = 0, 0 // disable @@ -154,7 +155,7 @@ func newTestProcessorWithTxnPusher( EventChanCap: testProcessorEventCCap, CheckStreamsInterval: 10 * time.Millisecond, }) - p.Start(stopper, makeIntentScannerConstructor(rtsIter)) + require.NoError(t, p.Start(stopper, makeIntentScannerConstructor(rtsIter))) return p, stopper } @@ -165,13 +166,16 @@ func makeIntentScannerConstructor(rtsIter storage.SimpleMVCCIterator) IntentScan return func() IntentScanner { return NewLegacyIntentScanner(rtsIter) } } -func newTestProcessor(rtsIter storage.SimpleMVCCIterator) (*Processor, *stop.Stopper) { - return newTestProcessorWithTxnPusher(rtsIter, nil /* pusher */) +func newTestProcessor( + t *testing.T, rtsIter storage.SimpleMVCCIterator, +) (*Processor, *stop.Stopper) { + t.Helper() + return newTestProcessorWithTxnPusher(t, rtsIter, nil /* pusher */) } func TestProcessorBasic(t *testing.T) { defer leaktest.AfterTest(t)() - p, stopper := newTestProcessor(nil /* rtsIter */) + p, stopper := newTestProcessor(t, nil /* rtsIter */) defer stopper.Stop(context.Background()) // Test processor without registrations. @@ -434,13 +438,13 @@ func TestNilProcessor(t *testing.T) { // to call on a nil Processor. stopper := stop.NewStopper() defer stopper.Stop(context.Background()) - require.Panics(t, func() { p.Start(stopper, nil) }) + require.Panics(t, func() { _ = p.Start(stopper, nil) }) require.Panics(t, func() { p.Register(roachpb.RSpan{}, hlc.Timestamp{}, nil, false, nil, nil) }) } func TestProcessorSlowConsumer(t *testing.T) { defer leaktest.AfterTest(t)() - p, stopper := newTestProcessor(nil /* rtsIter */) + p, stopper := newTestProcessor(t, nil /* rtsIter */) defer stopper.Stop(context.Background()) // Add a registration. @@ -566,7 +570,7 @@ func TestProcessorInitializeResolvedTimestamp(t *testing.T) { }, nil) rtsIter.block = make(chan struct{}) - p, stopper := newTestProcessor(rtsIter) + p, stopper := newTestProcessor(t, rtsIter) defer stopper.Stop(context.Background()) // The resolved timestamp should not be initialized. @@ -728,7 +732,7 @@ func TestProcessorTxnPushAttempt(t *testing.T) { return nil }) - p, stopper := newTestProcessorWithTxnPusher(nil /* rtsIter */, &tp) + p, stopper := newTestProcessorWithTxnPusher(t, nil /* rtsIter */, &tp) defer stopper.Stop(context.Background()) // Add a few intents and move the closed timestamp forward. @@ -818,7 +822,7 @@ func TestProcessorConcurrentStop(t *testing.T) { defer leaktest.AfterTest(t)() const trials = 10 for i := 0; i < trials; i++ { - p, stopper := newTestProcessor(nil /* rtsIter */) + p, stopper := newTestProcessor(t, nil /* rtsIter */) var wg sync.WaitGroup wg.Add(6) @@ -864,7 +868,7 @@ func TestProcessorConcurrentStop(t *testing.T) { // observes only operations that are consumed after it has registered. func TestProcessorRegistrationObservesOnlyNewEvents(t *testing.T) { defer leaktest.AfterTest(t)() - p, stopper := newTestProcessor(nil /* rtsIter */) + p, stopper := newTestProcessor(t, nil /* rtsIter */) defer stopper.Stop(context.Background()) firstC := make(chan int64) diff --git a/pkg/kv/kvserver/replica_rangefeed.go b/pkg/kv/kvserver/replica_rangefeed.go index ce4cac40bdc5..32574d9bac5f 100644 --- a/pkg/kv/kvserver/replica_rangefeed.go +++ b/pkg/kv/kvserver/replica_rangefeed.go @@ -411,7 +411,15 @@ func (r *Replica) registerWithRangefeedRaftMuLocked( return rangefeed.NewLegacyIntentScanner(iter) } - p.Start(r.store.Stopper(), rtsIter) + // NB: This only errors if the stopper is stopping, and we have to return here + // in that case. We do check ShouldQuiesce() below, but that's not sufficient + // because the stopper has two states: stopping and quiescing. If this errors + // due to stopping, but before it enters the quiescing state, then the select + // below will fall through to the panic. + if err := p.Start(r.store.Stopper(), rtsIter); err != nil { + errC <- roachpb.NewError(err) + return nil + } // Register with the processor *before* we attach its reference to the // Replica struct. This ensures that the registration is in place before