diff --git a/pkg/kv/kvclient/rangefeed/rangefeed_external_test.go b/pkg/kv/kvclient/rangefeed/rangefeed_external_test.go index db5f8d1a7ef3..ccf64deaef37 100644 --- a/pkg/kv/kvclient/rangefeed/rangefeed_external_test.go +++ b/pkg/kv/kvclient/rangefeed/rangefeed_external_test.go @@ -62,7 +62,11 @@ type feedProcessorType struct { } func (t feedProcessorType) String() string { - return fmt.Sprintf("mux=%t_scheduler=%t", t.feed == rangefeed.MuxRangefeedEnabled, t.useScheduler) + if t.feed == rangefeed.MuxRangefeedEnabled { + return fmt.Sprintf("mux/scheduler=%t", t.useScheduler) + } else { + return fmt.Sprintf("single/scheduler=%t", t.useScheduler) + } } var procTypes = []feedProcessorType{ diff --git a/pkg/kv/kvserver/rangefeed/processor.go b/pkg/kv/kvserver/rangefeed/processor.go index c0bfef6b8fb1..87ee481656b1 100644 --- a/pkg/kv/kvserver/rangefeed/processor.go +++ b/pkg/kv/kvserver/rangefeed/processor.go @@ -27,10 +27,10 @@ import ( ) const ( - // defaultPushTxnsInterval is the default interval at which a Processor will + // DefaultPushTxnsInterval is the default interval at which a Processor will // push all transactions in the unresolvedIntentQueue that are above the age // specified by PushTxnsAge. - defaultPushTxnsInterval = 250 * time.Millisecond + DefaultPushTxnsInterval = 250 * time.Millisecond // defaultPushTxnsAge is the default age at which a Processor will begin to // consider a transaction old enough to push. defaultPushTxnsAge = 10 * time.Second @@ -57,6 +57,9 @@ type Config struct { // PushTxnsInterval specifies the interval at which a Processor will push // all transactions in the unresolvedIntentQueue that are above the age // specified by PushTxnsAge. + // + // This option only applies to LegacyProcessor since ScheduledProcessor is + // relying on store to push events to scheduler to initiate transaction push. PushTxnsInterval time.Duration // PushTxnsAge specifies the age at which a Processor will begin to consider // a transaction old enough to push. @@ -93,7 +96,7 @@ func (sc *Config) SetDefaults() { } } else { if sc.PushTxnsInterval == 0 { - sc.PushTxnsInterval = defaultPushTxnsInterval + sc.PushTxnsInterval = DefaultPushTxnsInterval } if sc.PushTxnsAge == 0 { sc.PushTxnsAge = defaultPushTxnsAge @@ -196,6 +199,13 @@ type Processor interface { // EventChanTimeout configuration. If the method returns false, the processor // will have been stopped, so calling Stop is not necessary. ForwardClosedTS(ctx context.Context, closedTS hlc.Timestamp) bool + + // External notification integration. + + // ID returns scheduler ID of the processor that can be used to notify it + // to do some type of work. If ID is 0 then processor doesn't support + // external event scheduling. + ID() int64 } // NewProcessor creates a new rangefeed Processor. The corresponding processing @@ -204,8 +214,6 @@ func NewProcessor(cfg Config) Processor { cfg.SetDefaults() cfg.AmbientContext.AddLogTag("rangefeed", nil) if cfg.Scheduler != nil { - // TODO(oleg): remove logging - log.Infof(context.Background(), "creating new style processor for range feed on r%d", cfg.RangeID) return NewScheduledProcessor(cfg) } return NewLegacyProcessor(cfg) @@ -479,7 +487,9 @@ func (p *LegacyProcessor) run( // Launch an async transaction push attempt that pushes the // timestamp of all transactions beneath the push offset. // Ignore error if quiescing. 
- pushTxns := newTxnPushAttempt(p.Span, p.TxnPusher, p, toPush, now, txnPushAttemptC) + pushTxns := newTxnPushAttempt(p.Span, p.TxnPusher, p, toPush, now, func() { + close(txnPushAttemptC) + }) err := stopper.RunAsyncTask(ctx, "rangefeed: pushing old txns", pushTxns.Run) if err != nil { pushTxns.Cancel() @@ -922,6 +932,11 @@ func (p *LegacyProcessor) newCheckpointEvent() *kvpb.RangeFeedEvent { return &event } +// ID implements Processor interface. +func (p *LegacyProcessor) ID() int64 { + return 0 +} + // calculateDateEventSize returns estimated size of the event that contain actual // data. We only account for logical ops and sst's. Those events come from raft // and are budgeted. Other events come from processor jobs and update timestamps diff --git a/pkg/kv/kvserver/rangefeed/processor_test.go b/pkg/kv/kvserver/rangefeed/processor_test.go index c28f527f5b5f..5e1d339744d8 100644 --- a/pkg/kv/kvserver/rangefeed/processor_test.go +++ b/pkg/kv/kvserver/rangefeed/processor_test.go @@ -149,6 +149,7 @@ type processorTestHelper struct { rts *resolvedTimestamp syncEventC func() sendSpanSync func(*roachpb.Span) + scheduler *ClientScheduler } // syncEventAndRegistrations waits for all previously sent events to be @@ -165,6 +166,28 @@ func (h *processorTestHelper) syncEventAndRegistrationsSpan(span roachpb.Span) { h.sendSpanSync(&span) } +func (h *processorTestHelper) triggerTxnPushUntilPushed( + t *testing.T, ackC chan struct{}, timeout time.Duration, +) { + if h.scheduler != nil { + deadline := time.After(timeout) + for { + h.scheduler.Schedule(PushTxn) + select { + case <-deadline: + t.Fatal("failed to get txn push notification") + case ackC <- struct{}{}: + return + case <-time.After(100 * time.Millisecond): + // We keep sending events to avoid the situation where event arrives + // but flag indicating that push is still running is not reset. + } + } + } else { + ackC <- struct{}{} + } +} + type procType bool const ( @@ -301,6 +324,7 @@ func newTestProcessor(t tHelper, opts ...option) (Processor, *processorTestHelpe h.sendSpanSync = func(span *roachpb.Span) { p.syncSendAndWait(&syncEvent{c: make(chan struct{}), testRegCatchupSpan: span}) } + h.scheduler = &p.scheduler default: panic("unknown processor type") } @@ -890,186 +914,187 @@ func TestProcessorInitializeResolvedTimestamp(t *testing.T) { }) } -// TODO(oleg): write test for scheduler where push would be scheduled externally. func TestProcessorTxnPushAttempt(t *testing.T) { defer leaktest.AfterTest(t)() - ts10 := hlc.Timestamp{WallTime: 10} - ts20 := hlc.Timestamp{WallTime: 20} - ts25 := hlc.Timestamp{WallTime: 25} - ts30 := hlc.Timestamp{WallTime: 30} - ts50 := hlc.Timestamp{WallTime: 50} - ts60 := hlc.Timestamp{WallTime: 60} - ts70 := hlc.Timestamp{WallTime: 70} - ts90 := hlc.Timestamp{WallTime: 90} - - // Create a set of transactions. 
- txn1, txn2, txn3 := uuid.MakeV4(), uuid.MakeV4(), uuid.MakeV4() - txn1Meta := enginepb.TxnMeta{ID: txn1, Key: keyA, IsoLevel: isolation.Serializable, WriteTimestamp: ts10, MinTimestamp: ts10} - txn2Meta := enginepb.TxnMeta{ID: txn2, Key: keyB, IsoLevel: isolation.Snapshot, WriteTimestamp: ts20, MinTimestamp: ts20} - txn3Meta := enginepb.TxnMeta{ID: txn3, Key: keyC, IsoLevel: isolation.ReadCommitted, WriteTimestamp: ts30, MinTimestamp: ts30} - txn1Proto := &roachpb.Transaction{TxnMeta: txn1Meta, Status: roachpb.PENDING} - txn2Proto := &roachpb.Transaction{TxnMeta: txn2Meta, Status: roachpb.PENDING} - txn3Proto := &roachpb.Transaction{TxnMeta: txn3Meta, Status: roachpb.PENDING} - - // Modifications for test 2. - txn1MetaT2Pre := enginepb.TxnMeta{ID: txn1, Key: keyA, IsoLevel: isolation.Serializable, WriteTimestamp: ts25, MinTimestamp: ts10} - txn1MetaT2Post := enginepb.TxnMeta{ID: txn1, Key: keyA, IsoLevel: isolation.Serializable, WriteTimestamp: ts50, MinTimestamp: ts10} - txn2MetaT2Post := enginepb.TxnMeta{ID: txn2, Key: keyB, IsoLevel: isolation.Snapshot, WriteTimestamp: ts60, MinTimestamp: ts20} - txn3MetaT2Post := enginepb.TxnMeta{ID: txn3, Key: keyC, IsoLevel: isolation.ReadCommitted, WriteTimestamp: ts70, MinTimestamp: ts30} - txn1ProtoT2 := &roachpb.Transaction{TxnMeta: txn1MetaT2Post, Status: roachpb.COMMITTED} - txn2ProtoT2 := &roachpb.Transaction{TxnMeta: txn2MetaT2Post, Status: roachpb.PENDING} - txn3ProtoT2 := &roachpb.Transaction{TxnMeta: txn3MetaT2Post, Status: roachpb.PENDING} - - // Modifications for test 3. - txn2MetaT3Post := enginepb.TxnMeta{ID: txn2, Key: keyB, IsoLevel: isolation.Snapshot, WriteTimestamp: ts60, MinTimestamp: ts20} - txn3MetaT3Post := enginepb.TxnMeta{ID: txn3, Key: keyC, IsoLevel: isolation.ReadCommitted, WriteTimestamp: ts90, MinTimestamp: ts30} - txn2ProtoT3 := &roachpb.Transaction{TxnMeta: txn2MetaT3Post, Status: roachpb.ABORTED} - txn3ProtoT3 := &roachpb.Transaction{TxnMeta: txn3MetaT3Post, Status: roachpb.PENDING} - - testNum := 0 - pausePushAttemptsC := make(chan struct{}) - resumePushAttemptsC := make(chan struct{}) - defer close(pausePushAttemptsC) - defer close(resumePushAttemptsC) - - // Create a TxnPusher that performs assertions during the first 3 uses. - var tp testTxnPusher - tp.mockPushTxns(func(txns []enginepb.TxnMeta, ts hlc.Timestamp) ([]*roachpb.Transaction, error) { - // The txns are not in a sorted order. Enforce one. - sort.Slice(txns, func(i, j int) bool { - return bytes.Compare(txns[i].Key, txns[j].Key) < 0 - }) + testutils.RunValues(t, "proc type", testTypes, func(t *testing.T, pt procType) { + ts10 := hlc.Timestamp{WallTime: 10} + ts20 := hlc.Timestamp{WallTime: 20} + ts25 := hlc.Timestamp{WallTime: 25} + ts30 := hlc.Timestamp{WallTime: 30} + ts50 := hlc.Timestamp{WallTime: 50} + ts60 := hlc.Timestamp{WallTime: 60} + ts70 := hlc.Timestamp{WallTime: 70} + ts90 := hlc.Timestamp{WallTime: 90} + + // Create a set of transactions. 
+ txn1, txn2, txn3 := uuid.MakeV4(), uuid.MakeV4(), uuid.MakeV4() + txn1Meta := enginepb.TxnMeta{ID: txn1, Key: keyA, IsoLevel: isolation.Serializable, WriteTimestamp: ts10, MinTimestamp: ts10} + txn2Meta := enginepb.TxnMeta{ID: txn2, Key: keyB, IsoLevel: isolation.Snapshot, WriteTimestamp: ts20, MinTimestamp: ts20} + txn3Meta := enginepb.TxnMeta{ID: txn3, Key: keyC, IsoLevel: isolation.ReadCommitted, WriteTimestamp: ts30, MinTimestamp: ts30} + txn1Proto := &roachpb.Transaction{TxnMeta: txn1Meta, Status: roachpb.PENDING} + txn2Proto := &roachpb.Transaction{TxnMeta: txn2Meta, Status: roachpb.PENDING} + txn3Proto := &roachpb.Transaction{TxnMeta: txn3Meta, Status: roachpb.PENDING} + + // Modifications for test 2. + txn1MetaT2Pre := enginepb.TxnMeta{ID: txn1, Key: keyA, IsoLevel: isolation.Serializable, WriteTimestamp: ts25, MinTimestamp: ts10} + txn1MetaT2Post := enginepb.TxnMeta{ID: txn1, Key: keyA, IsoLevel: isolation.Serializable, WriteTimestamp: ts50, MinTimestamp: ts10} + txn2MetaT2Post := enginepb.TxnMeta{ID: txn2, Key: keyB, IsoLevel: isolation.Snapshot, WriteTimestamp: ts60, MinTimestamp: ts20} + txn3MetaT2Post := enginepb.TxnMeta{ID: txn3, Key: keyC, IsoLevel: isolation.ReadCommitted, WriteTimestamp: ts70, MinTimestamp: ts30} + txn1ProtoT2 := &roachpb.Transaction{TxnMeta: txn1MetaT2Post, Status: roachpb.COMMITTED} + txn2ProtoT2 := &roachpb.Transaction{TxnMeta: txn2MetaT2Post, Status: roachpb.PENDING} + txn3ProtoT2 := &roachpb.Transaction{TxnMeta: txn3MetaT2Post, Status: roachpb.PENDING} + + // Modifications for test 3. + txn2MetaT3Post := enginepb.TxnMeta{ID: txn2, Key: keyB, IsoLevel: isolation.Snapshot, WriteTimestamp: ts60, MinTimestamp: ts20} + txn3MetaT3Post := enginepb.TxnMeta{ID: txn3, Key: keyC, IsoLevel: isolation.ReadCommitted, WriteTimestamp: ts90, MinTimestamp: ts30} + txn2ProtoT3 := &roachpb.Transaction{TxnMeta: txn2MetaT3Post, Status: roachpb.ABORTED} + txn3ProtoT3 := &roachpb.Transaction{TxnMeta: txn3MetaT3Post, Status: roachpb.PENDING} + + testNum := 0 + pausePushAttemptsC := make(chan struct{}) + resumePushAttemptsC := make(chan struct{}) + defer close(pausePushAttemptsC) + defer close(resumePushAttemptsC) + + // Create a TxnPusher that performs assertions during the first 3 uses. + var tp testTxnPusher + tp.mockPushTxns(func(txns []enginepb.TxnMeta, ts hlc.Timestamp) ([]*roachpb.Transaction, error) { + // The txns are not in a sorted order. Enforce one. + sort.Slice(txns, func(i, j int) bool { + return bytes.Compare(txns[i].Key, txns[j].Key) < 0 + }) + + testNum++ + switch testNum { + case 1: + assert.Equal(t, 3, len(txns)) + assert.Equal(t, txn1Meta, txns[0]) + assert.Equal(t, txn2Meta, txns[1]) + assert.Equal(t, txn3Meta, txns[2]) + if t.Failed() { + return nil, errors.New("test failed") + } - testNum++ - switch testNum { - case 1: - assert.Equal(t, 3, len(txns)) - assert.Equal(t, txn1Meta, txns[0]) - assert.Equal(t, txn2Meta, txns[1]) - assert.Equal(t, txn3Meta, txns[2]) - if t.Failed() { - return nil, errors.New("test failed") - } + // Push does not succeed. Protos not at larger ts. + return []*roachpb.Transaction{txn1Proto, txn2Proto, txn3Proto}, nil + case 2: + assert.Equal(t, 3, len(txns)) + assert.Equal(t, txn1MetaT2Pre, txns[0]) + assert.Equal(t, txn2Meta, txns[1]) + assert.Equal(t, txn3Meta, txns[2]) + if t.Failed() { + return nil, errors.New("test failed") + } - // Push does not succeed. Protos not at larger ts. 
- return []*roachpb.Transaction{txn1Proto, txn2Proto, txn3Proto}, nil - case 2: - assert.Equal(t, 3, len(txns)) - assert.Equal(t, txn1MetaT2Pre, txns[0]) - assert.Equal(t, txn2Meta, txns[1]) - assert.Equal(t, txn3Meta, txns[2]) - if t.Failed() { - return nil, errors.New("test failed") - } + // Push succeeds. Return new protos. + return []*roachpb.Transaction{txn1ProtoT2, txn2ProtoT2, txn3ProtoT2}, nil + case 3: + assert.Equal(t, 2, len(txns)) + assert.Equal(t, txn2MetaT2Post, txns[0]) + assert.Equal(t, txn3MetaT2Post, txns[1]) + if t.Failed() { + return nil, errors.New("test failed") + } - // Push succeeds. Return new protos. - return []*roachpb.Transaction{txn1ProtoT2, txn2ProtoT2, txn3ProtoT2}, nil - case 3: - assert.Equal(t, 2, len(txns)) - assert.Equal(t, txn2MetaT2Post, txns[0]) - assert.Equal(t, txn3MetaT2Post, txns[1]) - if t.Failed() { - return nil, errors.New("test failed") + // Push succeeds. Return new protos. + return []*roachpb.Transaction{txn2ProtoT3, txn3ProtoT3}, nil + default: + return nil, nil } + }) + tp.mockResolveIntentsFn(func(ctx context.Context, intents []roachpb.LockUpdate) error { + // There's nothing to assert here. We expect the intents to correspond to + // transactions that had their LockSpans populated when we pushed them. This + // test doesn't simulate that. - // Push succeeds. Return new protos. - return []*roachpb.Transaction{txn2ProtoT3, txn3ProtoT3}, nil - default: - return nil, nil - } - }) - tp.mockResolveIntentsFn(func(ctx context.Context, intents []roachpb.LockUpdate) error { - // There's nothing to assert here. We expect the intents to correspond to - // transactions that had their LockSpans populated when we pushed them. This - // test doesn't simulate that. + if testNum > 3 { + return nil + } - if testNum > 3 { + <-pausePushAttemptsC + <-resumePushAttemptsC return nil + }) + + p, h, stopper := newTestProcessor(t, withPusher(&tp), withProcType(pt)) + ctx := context.Background() + defer stopper.Stop(ctx) + + // Add a few intents and move the closed timestamp forward. + writeIntentOpFromMeta := func(txn enginepb.TxnMeta) enginepb.MVCCLogicalOp { + return writeIntentOpWithDetails(txn.ID, txn.Key, txn.IsoLevel, txn.MinTimestamp, + txn.WriteTimestamp) } + p.ConsumeLogicalOps(ctx, + writeIntentOpFromMeta(txn1Meta), + writeIntentOpFromMeta(txn2Meta), + writeIntentOpFromMeta(txn2Meta), + writeIntentOpFromMeta(txn3Meta), + ) + p.ForwardClosedTS(ctx, hlc.Timestamp{WallTime: 40}) + h.syncEventC() + require.Equal(t, hlc.Timestamp{WallTime: 9}, h.rts.Get()) - <-pausePushAttemptsC - <-resumePushAttemptsC - return nil - }) + // Wait for the first txn push attempt to complete. + h.triggerTxnPushUntilPushed(t, pausePushAttemptsC, 30*time.Second) - p, h, stopper := newTestProcessor(t, withPusher(&tp), withProcType(legacyProcessor)) - ctx := context.Background() - defer stopper.Stop(ctx) + // The resolved timestamp hasn't moved. + h.syncEventC() + require.Equal(t, hlc.Timestamp{WallTime: 9}, h.rts.Get()) - // Add a few intents and move the closed timestamp forward. 
- writeIntentOpFromMeta := func(txn enginepb.TxnMeta) enginepb.MVCCLogicalOp { - return writeIntentOpWithDetails(txn.ID, txn.Key, txn.IsoLevel, txn.MinTimestamp, - txn.WriteTimestamp) - } - p.ConsumeLogicalOps(ctx, - writeIntentOpFromMeta(txn1Meta), - writeIntentOpFromMeta(txn2Meta), - writeIntentOpFromMeta(txn2Meta), - writeIntentOpFromMeta(txn3Meta), - ) - p.ForwardClosedTS(ctx, hlc.Timestamp{WallTime: 40}) - h.syncEventC() - require.Equal(t, hlc.Timestamp{WallTime: 9}, h.rts.Get()) - - // Wait for the first txn push attempt to complete. - pausePushAttemptsC <- struct{}{} - - // The resolved timestamp hasn't moved. - h.syncEventC() - require.Equal(t, hlc.Timestamp{WallTime: 9}, h.rts.Get()) - - // Write another intent for one of the txns. This moves the resolved - // timestamp forward. - p.ConsumeLogicalOps(ctx, writeIntentOpFromMeta(txn1MetaT2Pre)) - h.syncEventC() - require.Equal(t, hlc.Timestamp{WallTime: 19}, h.rts.Get()) - - // Unblock the second txn push attempt and wait for it to complete. - resumePushAttemptsC <- struct{}{} - pausePushAttemptsC <- struct{}{} - - // The resolved timestamp should have moved forwards to the closed - // timestamp. - h.syncEventC() - require.Equal(t, hlc.Timestamp{WallTime: 40}, h.rts.Get()) - - // Forward the closed timestamp. - p.ForwardClosedTS(ctx, hlc.Timestamp{WallTime: 80}) - h.syncEventC() - require.Equal(t, hlc.Timestamp{WallTime: 49}, h.rts.Get()) - - // Txn1's first intent is committed. Resolved timestamp doesn't change. - p.ConsumeLogicalOps(ctx, commitIntentOp(txn1MetaT2Post.ID, txn1MetaT2Post.WriteTimestamp)) - h.syncEventC() - require.Equal(t, hlc.Timestamp{WallTime: 49}, h.rts.Get()) - - // Txn1's second intent is committed. Resolved timestamp moves forward. - p.ConsumeLogicalOps(ctx, commitIntentOp(txn1MetaT2Post.ID, txn1MetaT2Post.WriteTimestamp)) - h.syncEventC() - require.Equal(t, hlc.Timestamp{WallTime: 59}, h.rts.Get()) - - // Unblock the third txn push attempt and wait for it to complete. - resumePushAttemptsC <- struct{}{} - pausePushAttemptsC <- struct{}{} - - // The resolved timestamp should have moved forwards to the closed - // timestamp. - h.syncEventC() - require.Equal(t, hlc.Timestamp{WallTime: 80}, h.rts.Get()) - - // Forward the closed timestamp. - p.ForwardClosedTS(ctx, hlc.Timestamp{WallTime: 100}) - h.syncEventC() - require.Equal(t, hlc.Timestamp{WallTime: 89}, h.rts.Get()) - - // Commit txn3's only intent. Resolved timestamp moves forward. - p.ConsumeLogicalOps(ctx, commitIntentOp(txn3MetaT3Post.ID, txn3MetaT3Post.WriteTimestamp)) - h.syncEventC() - require.Equal(t, hlc.Timestamp{WallTime: 100}, h.rts.Get()) - - // Release push attempt to avoid deadlock. - resumePushAttemptsC <- struct{}{} + // Write another intent for one of the txns. This moves the resolved + // timestamp forward. + p.ConsumeLogicalOps(ctx, writeIntentOpFromMeta(txn1MetaT2Pre)) + h.syncEventC() + require.Equal(t, hlc.Timestamp{WallTime: 19}, h.rts.Get()) + + // Unblock the second txn push attempt and wait for it to complete. + resumePushAttemptsC <- struct{}{} + h.triggerTxnPushUntilPushed(t, pausePushAttemptsC, 30*time.Second) + + // The resolved timestamp should have moved forwards to the closed + // timestamp. + h.syncEventC() + require.Equal(t, hlc.Timestamp{WallTime: 40}, h.rts.Get()) + + // Forward the closed timestamp. + p.ForwardClosedTS(ctx, hlc.Timestamp{WallTime: 80}) + h.syncEventC() + require.Equal(t, hlc.Timestamp{WallTime: 49}, h.rts.Get()) + + // Txn1's first intent is committed. Resolved timestamp doesn't change. 
+ p.ConsumeLogicalOps(ctx, commitIntentOp(txn1MetaT2Post.ID, txn1MetaT2Post.WriteTimestamp)) + h.syncEventC() + require.Equal(t, hlc.Timestamp{WallTime: 49}, h.rts.Get()) + + // Txn1's second intent is committed. Resolved timestamp moves forward. + p.ConsumeLogicalOps(ctx, commitIntentOp(txn1MetaT2Post.ID, txn1MetaT2Post.WriteTimestamp)) + h.syncEventC() + require.Equal(t, hlc.Timestamp{WallTime: 59}, h.rts.Get()) + + // Unblock the third txn push attempt and wait for it to complete. + resumePushAttemptsC <- struct{}{} + h.triggerTxnPushUntilPushed(t, pausePushAttemptsC, 30*time.Second) + + // The resolved timestamp should have moved forwards to the closed + // timestamp. + h.syncEventC() + require.Equal(t, hlc.Timestamp{WallTime: 80}, h.rts.Get()) + + // Forward the closed timestamp. + p.ForwardClosedTS(ctx, hlc.Timestamp{WallTime: 100}) + h.syncEventC() + require.Equal(t, hlc.Timestamp{WallTime: 89}, h.rts.Get()) + + // Commit txn3's only intent. Resolved timestamp moves forward. + p.ConsumeLogicalOps(ctx, commitIntentOp(txn3MetaT3Post.ID, txn3MetaT3Post.WriteTimestamp)) + h.syncEventC() + require.Equal(t, hlc.Timestamp{WallTime: 100}, h.rts.Get()) + + // Release push attempt to avoid deadlock. + resumePushAttemptsC <- struct{}{} + }) } // TestProcessorConcurrentStop tests that all methods in Processor's API diff --git a/pkg/kv/kvserver/rangefeed/scheduled_processor.go b/pkg/kv/kvserver/rangefeed/scheduled_processor.go index 41d0bfae170e..8de7bf0926a0 100644 --- a/pkg/kv/kvserver/rangefeed/scheduled_processor.go +++ b/pkg/kv/kvserver/rangefeed/scheduled_processor.go @@ -48,6 +48,9 @@ type ScheduledProcessor struct { // Processor startup runs background tasks to scan intents. If processor is // stopped early, this task needs to be terminated to avoid resource waste. startupCancel func() + // stopper passed by start that is used for firing up async work from scheduler. + stopper *stop.Stopper + txnPushActive bool } // NewScheduledProcessor creates a new scheduler based rangefeed Processor. @@ -82,6 +85,7 @@ func (p *ScheduledProcessor) Start( ) error { ctx := p.Config.AmbientContext.AnnotateCtx(context.Background()) ctx, p.startupCancel = context.WithCancel(ctx) + p.stopper = stopper // Note that callback registration must be performed before starting resolved // timestamp init because resolution posts resolvedTS event when it is done. @@ -150,6 +154,36 @@ func (p *ScheduledProcessor) process(e processorEventType) processorEventType { if e&Stopped != 0 { p.cleanup() } + if e&PushTxn != 0 { + if !p.txnPushActive && p.rts.IsInit() { + now := p.Clock.Now() + before := now.Add(-p.PushTxnsAge.Nanoseconds(), 0) + oldTxns := p.rts.intentQ.Before(before) + + if len(oldTxns) > 0 { + toPush := make([]enginepb.TxnMeta, len(oldTxns)) + for i, txn := range oldTxns { + toPush[i] = txn.asTxnMeta() + } + + // Launch an async transaction push attempt that pushes the + // timestamp of all transactions beneath the push offset. + // Ignore error if quiescing. + pushTxns := newTxnPushAttempt(p.Span, p.TxnPusher, p, toPush, now, func() { + p.enqueueRequest(func(ctx context.Context) { + p.txnPushActive = false + }) + }) + p.txnPushActive = true + // TODO(oleg): we need to cap number of tasks that we can fire up across + // all feeds as they could potentially generate O(n) tasks for push. 
+ err := p.stopper.RunAsyncTask(ctx, "rangefeed: pushing old txns", pushTxns.Run) + if err != nil { + pushTxns.Cancel() + } + } + } + } return 0 } @@ -263,7 +297,7 @@ func (p *ScheduledProcessor) Register( // Add the new registration to the registry. p.reg.Register(&r) - // Publish an updated filter that includes the new registration. + // Prep response with filter that includes the new registration. f := p.reg.NewFilter() // Immediately publish a checkpoint event to the registry. This will be the first event @@ -720,3 +754,8 @@ func (p *ScheduledProcessor) newCheckpointEvent() *kvpb.RangeFeedEvent { func (p *ScheduledProcessor) scheduleEvent(e processorEventType) { p.scheduler.Schedule(e) } + +// ID implements Processor interface. +func (p *ScheduledProcessor) ID() int64 { + return p.scheduler.ID() +} diff --git a/pkg/kv/kvserver/rangefeed/scheduler.go b/pkg/kv/kvserver/rangefeed/scheduler.go index 916ea7572d6e..6abb238b585b 100644 --- a/pkg/kv/kvserver/rangefeed/scheduler.go +++ b/pkg/kv/kvserver/rangefeed/scheduler.go @@ -57,6 +57,9 @@ const ( // ReqEvent is scheduled when request function id put into rangefeed request // queue. ReqEvent processorEventType = 1 << 3 + // PushTxn is scheduled externally on ranges to push transaction with intents + // that block resolved timestamp advancing. + PushTxn processorEventType = 1 << 4 // numProcessorEventTypes is total number of event types. numProcessorEventTypes int = iota ) @@ -66,6 +69,7 @@ var eventNames = map[processorEventType]string{ Stopped: "Stopped", QueueData: "Data", ReqEvent: "Request", + PushTxn: "PushTxn", } func (e processorEventType) String() string { diff --git a/pkg/kv/kvserver/rangefeed/task.go b/pkg/kv/kvserver/rangefeed/task.go index 45992b984570..038c8bd545b0 100644 --- a/pkg/kv/kvserver/rangefeed/task.go +++ b/pkg/kv/kvserver/rangefeed/task.go @@ -263,7 +263,7 @@ type txnPushAttempt struct { p processorTaskHelper txns []enginepb.TxnMeta ts hlc.Timestamp - doneC chan struct{} + done func() } func newTxnPushAttempt( @@ -272,7 +272,7 @@ func newTxnPushAttempt( p processorTaskHelper, txns []enginepb.TxnMeta, ts hlc.Timestamp, - doneC chan struct{}, + done func(), ) runnable { return &txnPushAttempt{ span: span, @@ -280,7 +280,7 @@ func newTxnPushAttempt( p: p, txns: txns, ts: ts, - doneC: doneC, + done: done, } } @@ -377,7 +377,7 @@ func (a *txnPushAttempt) pushOldTxns(ctx context.Context) error { } func (a *txnPushAttempt) Cancel() { - close(a.doneC) + a.done() } // intentsInBound returns LockUpdates for the provided transaction's LockSpans diff --git a/pkg/kv/kvserver/rangefeed/task_test.go b/pkg/kv/kvserver/rangefeed/task_test.go index 07c7230e3b75..c74f790d6ade 100644 --- a/pkg/kv/kvserver/rangefeed/task_test.go +++ b/pkg/kv/kvserver/rangefeed/task_test.go @@ -502,7 +502,9 @@ func TestTxnPushAttempt(t *testing.T) { txns := []enginepb.TxnMeta{txn1Meta, txn2Meta, txn3Meta, txn4Meta} doneC := make(chan struct{}) pushAttempt := newTxnPushAttempt(p.Span, p.TxnPusher, &p, txns, hlc.Timestamp{WallTime: 15}, - doneC) + func() { + close(doneC) + }) pushAttempt.Run(context.Background()) <-doneC // check if closed diff --git a/pkg/kv/kvserver/replica_rangefeed.go b/pkg/kv/kvserver/replica_rangefeed.go index afc3f1c156c4..0f671b05946a 100644 --- a/pkg/kv/kvserver/replica_rangefeed.go +++ b/pkg/kv/kvserver/replica_rangefeed.go @@ -301,7 +301,7 @@ func (r *Replica) setRangefeedProcessor(p rangefeed.Processor) { r.rangefeedMu.Lock() defer r.rangefeedMu.Unlock() r.rangefeedMu.proc = p - r.store.addReplicaWithRangefeed(r.RangeID) + 
r.store.addReplicaWithRangefeed(r.RangeID, p.ID()) } func (r *Replica) unsetRangefeedProcessorLocked(p rangefeed.Processor) { diff --git a/pkg/kv/kvserver/store.go b/pkg/kv/kvserver/store.go index 8a73600a6bbf..40620f0d7865 100644 --- a/pkg/kv/kvserver/store.go +++ b/pkg/kv/kvserver/store.go @@ -1016,7 +1016,10 @@ type Store struct { // The subset of replicas with active rangefeeds. rangefeedReplicas struct { syncutil.Mutex - m map[roachpb.RangeID]struct{} + // m contains mapping from rangeID that could be used to retrieve replicas + // with associated rangefeed processor scheduler IDs that allow enqueueing + // periodic events directly. + m map[roachpb.RangeID]int64 } rangefeedScheduler *rangefeed.Scheduler @@ -1449,7 +1452,7 @@ func NewStore( s.unquiescedReplicas.Unlock() s.rangefeedReplicas.Lock() - s.rangefeedReplicas.m = map[roachpb.RangeID]struct{}{} + s.rangefeedReplicas.m = map[roachpb.RangeID]int64{} s.rangefeedReplicas.Unlock() s.tsCache = tscache.New(cfg.Clock) @@ -2190,6 +2193,8 @@ func (s *Store) Start(ctx context.Context, stopper *stop.Stopper) error { // Connect rangefeeds to closed timestamp updates. s.startRangefeedUpdater(ctx) + s.startRangefeedTxnPushNotifier(ctx) + if s.replicateQueue != nil { s.storeRebalancer = NewStoreRebalancer( s.cfg.AmbientCtx, s.cfg.Settings, s.replicateQueue, s.replRankings, s.rebalanceObjManager) @@ -2367,9 +2372,49 @@ func (s *Store) startRangefeedUpdater(ctx context.Context) { }) } -func (s *Store) addReplicaWithRangefeed(rangeID roachpb.RangeID) { +// startRangefeedTxnPushNotifier starts a worker that would periodically +// enqueue txn push event for rangefeed processors to let them push lagging +// transactions. +// Note that this is only affecting scheduler based rangefeeds. +func (s *Store) startRangefeedTxnPushNotifier(ctx context.Context) { + _ /* err */ = s.stopper.RunAsyncTaskEx(ctx, stop.TaskOpts{ + TaskName: "transaction-rangefeed-push-notifier", + SpanOpt: stop.SterileRootSpan, + }, func(ctx context.Context) { + ctx, cancel := s.stopper.WithCancelOnQuiesce(ctx) + defer cancel() + + var schedulerIDs []int64 + updateSchedulerIDs := func() []int64 { + schedulerIDs = schedulerIDs[:0] + s.rangefeedReplicas.Lock() + for _, id := range s.rangefeedReplicas.m { + if id != 0 { + // Only process ranges that use scheduler. + schedulerIDs = append(schedulerIDs, id) + } + } + s.rangefeedReplicas.Unlock() + return schedulerIDs + } + + ticker := time.NewTicker(rangefeed.DefaultPushTxnsInterval) + for { + select { + case <-ticker.C: + activeIDs := updateSchedulerIDs() + s.rangefeedScheduler.EnqueueAll(activeIDs, rangefeed.PushTxn) + case <-ctx.Done(): + ticker.Stop() + return + } + } + }) +} + +func (s *Store) addReplicaWithRangefeed(rangeID roachpb.RangeID, schedulerID int64) { s.rangefeedReplicas.Lock() - s.rangefeedReplicas.m[rangeID] = struct{}{} + s.rangefeedReplicas.m[rangeID] = schedulerID s.rangefeedReplicas.Unlock() }
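
Illustrative note (not part of the patch): the store.go change above wires a periodic fan-out from the store to every scheduler-backed rangefeed processor, replacing the per-processor push timer that only LegacyProcessor keeps. Below is a minimal, self-contained Go sketch of that pattern under simplified, hypothetical types — scheduler, store, and pushTxnEvent are stand-ins invented for this example, not the real rangefeed.Scheduler or kvserver.Store APIs. A ticker snapshots the scheduler IDs of replicas with active rangefeeds, skips legacy processors (ID 0, which still drive pushes from their own interval), and enqueues a PushTxn-style event for the rest.

package main

import (
	"fmt"
	"sync"
	"time"
)

// eventType mirrors the bitmask-style processor event types used by the
// scheduler. The value below is a hypothetical stand-in for PushTxn.
type eventType int

const pushTxnEvent eventType = 1 << 4

// scheduler is a toy stand-in for rangefeed.Scheduler: it only records which
// processor IDs were enqueued with which event.
type scheduler struct {
	mu      sync.Mutex
	pending map[int64]eventType
}

func (s *scheduler) EnqueueAll(ids []int64, ev eventType) {
	s.mu.Lock()
	defer s.mu.Unlock()
	for _, id := range ids {
		s.pending[id] |= ev
	}
}

// store is a toy stand-in for kvserver.Store, holding the
// rangeID -> scheduler ID map that addReplicaWithRangefeed populates.
type store struct {
	mu         sync.Mutex
	rangefeeds map[int64]int64 // rangeID -> scheduler ID (0 = legacy processor)
	sched      *scheduler
}

// startPushNotifier runs the periodic fan-out: on every tick it snapshots the
// scheduler IDs of ranges with active rangefeeds and enqueues a push event for
// each, skipping legacy processors (ID 0) that push on their own timer.
func (st *store) startPushNotifier(interval time.Duration, stopC <-chan struct{}) {
	go func() {
		ticker := time.NewTicker(interval)
		defer ticker.Stop()
		var ids []int64
		for {
			select {
			case <-ticker.C:
				ids = ids[:0]
				st.mu.Lock()
				for _, id := range st.rangefeeds {
					if id != 0 {
						ids = append(ids, id)
					}
				}
				st.mu.Unlock()
				st.sched.EnqueueAll(ids, pushTxnEvent)
			case <-stopC:
				return
			}
		}
	}()
}

func main() {
	st := &store{
		rangefeeds: map[int64]int64{1: 7, 2: 0, 3: 9}, // range 2 uses the legacy processor
		sched:      &scheduler{pending: map[int64]eventType{}},
	}
	stopC := make(chan struct{})
	st.startPushNotifier(10*time.Millisecond, stopC)
	time.Sleep(50 * time.Millisecond)
	close(stopC)
	st.sched.mu.Lock()
	fmt.Println(st.sched.pending) // e.g. map[7:16 9:16]; range 2 is never notified
	st.sched.mu.Unlock()
}

Centralizing the timer in the store is what lets ScheduledProcessor avoid a per-processor PushTxnsInterval timer; the cost is that the store must be able to address each processor, hence the new Processor.ID method and the change of rangefeedReplicas.m from a set to a rangeID -> scheduler ID map.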