Skip to content

Commit

Permalink
rangefeed: add transaction pusher to scheduler rangefeed
Browse files Browse the repository at this point in the history
This commit adds slow transaction pusher to scheduler based rangefeed.
Store will trigger bulk enqueueing of relevant push events to
eligible processors.

Epic: none

Release note: None
  • Loading branch information
aliher1911 committed Sep 7, 2023
1 parent ac3a0df commit 161f2df
Show file tree
Hide file tree
Showing 9 changed files with 322 additions and 184 deletions.
6 changes: 5 additions & 1 deletion pkg/kv/kvclient/rangefeed/rangefeed_external_test.go
Expand Up @@ -62,7 +62,11 @@ type feedProcessorType struct {
}

func (t feedProcessorType) String() string {
return fmt.Sprintf("mux=%t_scheduler=%t", t.useMuxRangefeed, t.useScheduler)
if t.useMuxRangefeed {
return fmt.Sprintf("mux/scheduler=%t", t.useScheduler)
} else {
return fmt.Sprintf("single/scheduler=%t", t.useScheduler)
}
}

var procTypes = []feedProcessorType{
Expand Down
27 changes: 21 additions & 6 deletions pkg/kv/kvserver/rangefeed/processor.go
Expand Up @@ -27,10 +27,10 @@ import (
)

const (
// defaultPushTxnsInterval is the default interval at which a Processor will
// DefaultPushTxnsInterval is the default interval at which a Processor will
// push all transactions in the unresolvedIntentQueue that are above the age
// specified by PushTxnsAge.
defaultPushTxnsInterval = 250 * time.Millisecond
DefaultPushTxnsInterval = 250 * time.Millisecond
// defaultPushTxnsAge is the default age at which a Processor will begin to
// consider a transaction old enough to push.
defaultPushTxnsAge = 10 * time.Second
Expand All @@ -57,6 +57,9 @@ type Config struct {
// PushTxnsInterval specifies the interval at which a Processor will push
// all transactions in the unresolvedIntentQueue that are above the age
// specified by PushTxnsAge.
//
// This option only applies to LegacyProcessor since ScheduledProcessor is
// relying on store to push events to scheduler to initiate transaction push.
PushTxnsInterval time.Duration
// PushTxnsAge specifies the age at which a Processor will begin to consider
// a transaction old enough to push.
Expand Down Expand Up @@ -93,7 +96,7 @@ func (sc *Config) SetDefaults() {
}
} else {
if sc.PushTxnsInterval == 0 {
sc.PushTxnsInterval = defaultPushTxnsInterval
sc.PushTxnsInterval = DefaultPushTxnsInterval
}
if sc.PushTxnsAge == 0 {
sc.PushTxnsAge = defaultPushTxnsAge
Expand Down Expand Up @@ -196,6 +199,13 @@ type Processor interface {
// EventChanTimeout configuration. If the method returns false, the processor
// will have been stopped, so calling Stop is not necessary.
ForwardClosedTS(ctx context.Context, closedTS hlc.Timestamp) bool

// External notification integration.

// ID returns scheduler ID of the processor that can be used to notify it
// to do some type of work. If ID is 0 then processor doesn't support
// external event scheduling.
ID() int64
}

// NewProcessor creates a new rangefeed Processor. The corresponding processing
Expand All @@ -204,8 +214,6 @@ func NewProcessor(cfg Config) Processor {
cfg.SetDefaults()
cfg.AmbientContext.AddLogTag("rangefeed", nil)
if cfg.Scheduler != nil {
// TODO(oleg): remove logging
log.Infof(context.Background(), "creating new style processor for range feed on r%d", cfg.RangeID)
return NewScheduledProcessor(cfg)
}
return NewLegacyProcessor(cfg)
Expand Down Expand Up @@ -479,7 +487,9 @@ func (p *LegacyProcessor) run(
// Launch an async transaction push attempt that pushes the
// timestamp of all transactions beneath the push offset.
// Ignore error if quiescing.
pushTxns := newTxnPushAttempt(p.Span, p.TxnPusher, p, toPush, now, txnPushAttemptC)
pushTxns := newTxnPushAttempt(p.Span, p.TxnPusher, p, toPush, now, func() {
close(txnPushAttemptC)
})
err := stopper.RunAsyncTask(ctx, "rangefeed: pushing old txns", pushTxns.Run)
if err != nil {
pushTxns.Cancel()
Expand Down Expand Up @@ -922,6 +932,11 @@ func (p *LegacyProcessor) newCheckpointEvent() *kvpb.RangeFeedEvent {
return &event
}

// ID implements Processor interface.
func (p *LegacyProcessor) ID() int64 {
return 0
}

// calculateDateEventSize returns estimated size of the event that contain actual
// data. We only account for logical ops and sst's. Those events come from raft
// and are budgeted. Other events come from processor jobs and update timestamps
Expand Down

0 comments on commit 161f2df

Please sign in to comment.