Skip to content

Commit

Permalink
rangefeed: add transaction pusher to scheduler rangefeed
Browse files Browse the repository at this point in the history
This commit adds slow transaction pusher to scheduler based rangefeed.
Store will trigger bulk enqueueing of relevant push events to
eligible processors.

Epic: none

Release note: None
  • Loading branch information
aliher1911 committed Aug 4, 2023
1 parent d3d0566 commit ef9a442
Show file tree
Hide file tree
Showing 9 changed files with 297 additions and 192 deletions.
6 changes: 5 additions & 1 deletion pkg/kv/kvclient/rangefeed/rangefeed_external_test.go
Expand Up @@ -58,7 +58,11 @@ type feedProcessorType struct {
}

func (t feedProcessorType) String() string {
return fmt.Sprintf("mux=%t_scheduler=%t", t.feed == rangefeed.MuxRangefeedEnabled, t.useScheduler)
if t.feed == rangefeed.MuxRangefeedEnabled {
return fmt.Sprintf("mux/scheduler=%t", t.useScheduler)
} else {
return fmt.Sprintf("single/scheduler=%t", t.useScheduler)
}
}

var procTypes = []feedProcessorType{
Expand Down
4 changes: 3 additions & 1 deletion pkg/kv/kvserver/rangefeed/legacy_processor.go
Expand Up @@ -220,7 +220,9 @@ func (p *LegacyProcessor) run(
// Launch an async transaction push attempt that pushes the
// timestamp of all transactions beneath the push offset.
// Ignore error if quiescing.
pushTxns := newTxnPushAttempt(p, toPush, now, txnPushAttemptC)
pushTxns := newTxnPushAttempt(p, toPush, now, func() {
close(txnPushAttemptC)
})
err := stopper.RunAsyncTask(ctx, "rangefeed: pushing old txns", pushTxns.Run)
if err != nil {
pushTxns.Cancel()
Expand Down
6 changes: 3 additions & 3 deletions pkg/kv/kvserver/rangefeed/processor.go
Expand Up @@ -26,10 +26,10 @@ import (
)

const (
// defaultPushTxnsInterval is the default interval at which a Processor will
// DefaultPushTxnsInterval is the default interval at which a Processor will
// push all transactions in the unresolvedIntentQueue that are above the age
// specified by PushTxnsAge.
defaultPushTxnsInterval = 250 * time.Millisecond
DefaultPushTxnsInterval = 250 * time.Millisecond
// defaultPushTxnsAge is the default age at which a Processor will begin to
// consider a transaction old enough to push.
defaultPushTxnsAge = 10 * time.Second
Expand Down Expand Up @@ -92,7 +92,7 @@ func (sc *Config) SetDefaults() {
}
} else {
if sc.PushTxnsInterval == 0 {
sc.PushTxnsInterval = defaultPushTxnsInterval
sc.PushTxnsInterval = DefaultPushTxnsInterval
}
if sc.PushTxnsAge == 0 {
sc.PushTxnsAge = defaultPushTxnsAge
Expand Down

0 comments on commit ef9a442

Please sign in to comment.