Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

requestbatcher,intentresolver: default to no inflight backpressure #109899

Merged
merged 1 commit into from Sep 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
29 changes: 23 additions & 6 deletions pkg/internal/client/requestbatcher/batcher.go
Expand Up @@ -178,7 +178,7 @@ type Config struct {
// batches which are queued to send but not yet in flight will still send.
// Note that values less than or equal to zero will result in the use of
// DefaultInFlightBackpressureLimit.
InFlightBackpressureLimit int
InFlightBackpressureLimit func() int

// NowFunc is used to determine the current time. It defaults to timeutil.Now.
NowFunc func() time.Time
Expand Down Expand Up @@ -252,14 +252,24 @@ func validateConfig(cfg *Config) {
} else if cfg.Sender == nil {
panic("cannot construct a Batcher with a nil Sender")
}
if cfg.InFlightBackpressureLimit <= 0 {
cfg.InFlightBackpressureLimit = DefaultInFlightBackpressureLimit
if cfg.InFlightBackpressureLimit == nil {
cfg.InFlightBackpressureLimit = func() int {
return DefaultInFlightBackpressureLimit
}
}
if cfg.NowFunc == nil {
cfg.NowFunc = timeutil.Now
}
}

func normalizedInFlightBackPressureLimit(cfg *Config) int {
limit := cfg.InFlightBackpressureLimit()
if limit <= 0 {
limit = DefaultInFlightBackpressureLimit
}
return limit
}

// SendWithChan sends a request with a client provided response channel. The
// client is responsible for ensuring that the passed respChan has a buffer at
// least as large as the number of responses it expects to receive. Using an
Expand Down Expand Up @@ -457,9 +467,12 @@ func (b *RequestBatcher) run(ctx context.Context) {
// inBackPressure indicates whether the reqChan is enabled.
// It becomes true when inFlight exceeds b.cfg.InFlightBackpressureLimit.
inBackPressure = false
// curInFlightBackpressureLimit is the current limit on in flight
// requests.
curInFlightBackpressureLimit = normalizedInFlightBackPressureLimit(&b.cfg)
// recoveryThreshold is the number of in flight requests below which the
// the inBackPressure state should exit.
recoveryThreshold = backpressureRecoveryThreshold(b.cfg.InFlightBackpressureLimit)
// inBackPressure state should exit.
recoveryThreshold = backpressureRecoveryThreshold(curInFlightBackpressureLimit)
// reqChan consults inBackPressure to determine whether the goroutine is
// accepting new requests.
reqChan = func() <-chan *request {
Expand All @@ -470,7 +483,11 @@ func (b *RequestBatcher) run(ctx context.Context) {
}
sendBatch = func(ba *batch) {
inFlight++
if inFlight >= b.cfg.InFlightBackpressureLimit {
// Sample the backpressure limit.
curInFlightBackpressureLimit = normalizedInFlightBackPressureLimit(&b.cfg)
recoveryThreshold = backpressureRecoveryThreshold(curInFlightBackpressureLimit)

if inFlight >= curInFlightBackpressureLimit {
inBackPressure = true
}
b.sendBatch(sendCtx, ba)
Expand Down
44 changes: 40 additions & 4 deletions pkg/internal/client/requestbatcher/batcher_test.go
Expand Up @@ -160,13 +160,14 @@ func TestBackpressure(t *testing.T) {
stopper := stop.NewStopper()
defer stopper.Stop(context.Background())
sc := make(chanSender)
backpressureLimit := 3
b := New(Config{
MaxIdle: 50 * time.Millisecond,
MaxWait: 50 * time.Millisecond,
MaxMsgsPerBatch: 1,
Sender: sc,
Stopper: stopper,
InFlightBackpressureLimit: 3,
InFlightBackpressureLimit: func() int { return backpressureLimit },
})

// These 3 should all send without blocking but should put the batcher into
Expand All @@ -190,10 +191,12 @@ func TestBackpressure(t *testing.T) {
for i := 0; i < 3; i++ {
bs := <-sc
go reply(bs)
// We don't expect either of the calls to send to have finished yet.
// We don't expect any of the calls to send to have finished yet.
assert.Equal(t, int64(0), atomic.LoadInt64(&sent))
}
// Allow one reply to fly which should not unblock the requests.
// Allow one reply to fly which should not unblock the requests since the
// threshold to stop backpressuring is < 2, and there are still 2 in-flight
// requests.
canReply <- struct{}{}
runtime.Gosched() // tickle the runtime in case there might be a timing bug
assert.Equal(t, int64(0), atomic.LoadInt64(&sent))
Expand All @@ -209,9 +212,42 @@ func TestBackpressure(t *testing.T) {
}
return nil
})
go reply(<-sc)
go reply(<-sc)
// Now we have 3 outstanding reply() calls that we need to unblock.
canReply <- struct{}{}
canReply <- struct{}{}
canReply <- struct{}{}
// Now consume all the responses on sendChan.
for i := 0; i < 5; i++ {
<-sendChan
}

// Lower backpressureLimit to 1.
backpressureLimit = 1
atomic.StoreInt64(&sent, 0)
send()
// This should block.
go send()
// Try to reply to first, but reply will not happen yet.
go reply(<-sc)
runtime.Gosched() // tickle the runtime in case there might be a timing bug
assert.Equal(t, int64(1), atomic.LoadInt64(&sent))
// Allow one reply, which will unblock the request.
canReply <- struct{}{}
runtime.Gosched() // tickle the runtime in case there might be a timing bug
testutils.SucceedsSoon(t, func() error {
if numSent := atomic.LoadInt64(&sent); numSent != 2 {
return fmt.Errorf("expected %d to have been sent, so far %d", 2, numSent)
}
return nil
})
// Allow second reply too.
close(canReply)
reply(<-sc)
reply(<-sc)
// Now consume all the responses on sendChan.
<-sendChan
<-sendChan
}

func TestBatcherSend(t *testing.T) {
Expand Down
2 changes: 2 additions & 0 deletions pkg/kv/kvserver/intentresolver/BUILD.bazel
Expand Up @@ -18,6 +18,8 @@ go_library(
"//pkg/kv/kvserver/kvserverbase",
"//pkg/kv/kvserver/txnwait",
"//pkg/roachpb",
"//pkg/settings",
"//pkg/settings/cluster",
"//pkg/storage/enginepb",
"//pkg/util/hlc",
"//pkg/util/log",
Expand Down
70 changes: 59 additions & 11 deletions pkg/kv/kvserver/intentresolver/intent_resolver.go
Expand Up @@ -13,6 +13,7 @@ package intentresolver
import (
"bytes"
"context"
"math"
"sort"
"time"

Expand All @@ -25,6 +26,8 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/txnwait"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/storage/enginepb"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
Expand Down Expand Up @@ -121,6 +124,7 @@ type Config struct {
Clock *hlc.Clock
DB *kv.DB
Stopper *stop.Stopper
Settings *cluster.Settings
AmbientCtx log.AmbientContext
TestingKnobs kvserverbase.IntentResolverTestingKnobs
RangeDescriptorCache RangeCache
Expand All @@ -147,6 +151,7 @@ type IntentResolver struct {
db *kv.DB
stopper *stop.Stopper
testingKnobs kvserverbase.IntentResolverTestingKnobs
settings *cluster.Settings
ambientCtx log.AmbientContext
sem *quotapool.IntPool // semaphore to limit async goroutines

Expand Down Expand Up @@ -213,6 +218,7 @@ func New(c Config) *IntentResolver {
Metrics: makeMetrics(),
rdc: c.RangeDescriptorCache,
testingKnobs: c.TestingKnobs,
settings: c.Settings,
}
c.Stopper.AddCloser(ir.sem.Closer("stopper"))
ir.mu.inFlightPushes = map[uuid.UUID]int{}
Expand All @@ -221,22 +227,24 @@ func New(c Config) *IntentResolver {
if c.TestingKnobs.MaxIntentResolutionSendBatchTimeout != 0 {
intentResolutionSendBatchTimeout = c.TestingKnobs.MaxIntentResolutionSendBatchTimeout
}
inFlightBackpressureLimit := requestbatcher.DefaultInFlightBackpressureLimit
inFlightGCBackpressureLimit := requestbatcher.DefaultInFlightBackpressureLimit
if c.TestingKnobs.InFlightBackpressureLimit != 0 {
inFlightBackpressureLimit = c.TestingKnobs.InFlightBackpressureLimit
inFlightGCBackpressureLimit = c.TestingKnobs.InFlightBackpressureLimit
}
gcBatchSize := gcBatchSize
if c.TestingKnobs.MaxIntentResolutionBatchSize > 0 {
gcBatchSize = c.TestingKnobs.MaxGCBatchSize
}
ir.gcBatcher = requestbatcher.New(requestbatcher.Config{
AmbientCtx: c.AmbientCtx,
Name: "intent_resolver_gc_batcher",
MaxMsgsPerBatch: gcBatchSize,
MaxWait: c.MaxGCBatchWait,
MaxIdle: c.MaxGCBatchIdle,
MaxTimeout: intentResolutionSendBatchTimeout,
InFlightBackpressureLimit: inFlightBackpressureLimit,
AmbientCtx: c.AmbientCtx,
Name: "intent_resolver_gc_batcher",
MaxMsgsPerBatch: gcBatchSize,
MaxWait: c.MaxGCBatchWait,
MaxIdle: c.MaxGCBatchIdle,
MaxTimeout: intentResolutionSendBatchTimeout,
// NB: async GC work is not limited by ir.sem, so we do need an in-flight
// backpressure limit.
InFlightBackpressureLimit: func() int { return inFlightGCBackpressureLimit },
Stopper: c.Stopper,
Sender: c.DB.NonTransactionalSender(),
})
Expand All @@ -246,6 +254,10 @@ func New(c Config) *IntentResolver {
intentResolutionBatchSize = c.TestingKnobs.MaxIntentResolutionBatchSize
intentResolutionRangeBatchSize = c.TestingKnobs.MaxIntentResolutionBatchSize
}
inFlightLimit := inFlightLimitProvider{
settings: c.Settings,
testingInFlightBackpressureLimit: c.TestingKnobs.InFlightBackpressureLimit,
}
ir.irBatcher = requestbatcher.New(requestbatcher.Config{
AmbientCtx: c.AmbientCtx,
Name: "intent_resolver_ir_batcher",
Expand All @@ -254,7 +266,7 @@ func New(c Config) *IntentResolver {
MaxWait: c.MaxIntentResolutionBatchWait,
MaxIdle: c.MaxIntentResolutionBatchIdle,
MaxTimeout: intentResolutionSendBatchTimeout,
InFlightBackpressureLimit: inFlightBackpressureLimit,
InFlightBackpressureLimit: inFlightLimit.limit,
Stopper: c.Stopper,
Sender: c.DB.NonTransactionalSender(),
})
Expand All @@ -267,7 +279,7 @@ func New(c Config) *IntentResolver {
MaxWait: c.MaxIntentResolutionBatchWait,
MaxIdle: c.MaxIntentResolutionBatchIdle,
MaxTimeout: intentResolutionSendBatchTimeout,
InFlightBackpressureLimit: inFlightBackpressureLimit,
InFlightBackpressureLimit: inFlightLimit.limit,
Stopper: c.Stopper,
Sender: c.DB.NonTransactionalSender(),
})
Expand Down Expand Up @@ -1092,3 +1104,39 @@ func (s intentsByTxn) Swap(i, j int) { s[i], s[j] = s[j], s[i] }
func (s intentsByTxn) Less(i, j int) bool {
return bytes.Compare(s[i].Txn.ID[:], s[j].Txn.ID[:]) < 0
}

// inFlightBackpressureLimitEnabled controls whether the intent resolving
// requestbatcher.RequestBatchers created by the IntentResolver use an
// in-flight backpressure limit of DefaultInFlightBackpressureLimit. The
// default is false, i.e., there is no limit on in-flight requests. A limit on
// in-flight requests is considered superfluous since we have two limits on
// the number of active goroutines waiting to get their intent resolution
// requests processed: the async goroutines, limited to 1000
// (defaultTaskLimit), and the workload goroutines. Each waiter produces work
// for a single range, and that work can typically be batched into a single
// RPC (since requestbatcher.Config.MaxMsgsPerBatch is quite generous). In
// the rare case where a single waiter produces numerous concurrent RPCs,
// because of a large number of kvpb.Requests (note that wide
// ResolveIntentRange cause pagination, and not concurrent RPCs), we have
// already paid the memory cost of buffering these numerous kvpb.Requests, so
// we may as well send them out.
var inFlightBackpressureLimitEnabled = settings.RegisterBoolSetting(
settings.SystemOnly,
"kv.intent_resolver.batcher.in_flight_backpressure_limit.enabled",
"set to true to enable the use of DefaultInFlightBackpressureLimit",
false)

type inFlightLimitProvider struct {
settings *cluster.Settings
testingInFlightBackpressureLimit int
}

func (p inFlightLimitProvider) limit() int {
if p.testingInFlightBackpressureLimit != 0 {
return p.testingInFlightBackpressureLimit
}
if p.settings == nil || inFlightBackpressureLimitEnabled.Get(&p.settings.SV) {
return requestbatcher.DefaultInFlightBackpressureLimit
}
return math.MaxInt32
}
Expand Up @@ -363,6 +363,14 @@ func forceScanOnAllReplicationQueues(tc *testcluster.TestCluster) (err error) {
// the intent for t1 and intent resolution is clogged up on the store
// containing t1, unless the intent resolution for the "unavailable" t2 times
// out.
//
// TODO(sumeer): this test clogs up batched intent resolution via an inflight
// backpressure limit, which by default in no longer limited. But an inflight
// backpressure limit does exist for GC of txn records. This test should
// continue to exist until we have production experience with no inflight
// backpressure for intent resolution. And after that we should create an
// equivalent test for inflight backpressure for GC of txn records and remove
// this test.
func TestIntentResolutionUnavailableRange(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
Expand Down
8 changes: 8 additions & 0 deletions pkg/kv/kvserver/intentresolver/intent_resolver_test.go
Expand Up @@ -762,6 +762,14 @@ func TestCleanupIntents(t *testing.T) {
// TestIntentResolutionTimeout tests that running intent resolution with an
// unavailable range eventually times out and finishes, and does not block
// intent resolution on another available range.
//
// TODO(sumeer): this test clogs up batched intent resolution via an inflight
// backpressure limit, which by default in no longer limited. But an inflight
// backpressure limit does exist for GC of txn records. This test should
// continue to exist until we have production experience with no inflight
// backpressure for intent resolution. And after that we should create an
// equivalent test for inflight backpressure for GC of txn records and remove
// this test.
func TestIntentResolutionTimeout(t *testing.T) {
defer leaktest.AfterTest(t)()

Expand Down
14 changes: 9 additions & 5 deletions pkg/kv/kvserver/replica_read.go
Expand Up @@ -186,11 +186,15 @@ func (r *Replica) executeReadOnlyBatch(
g = nil
}

// Semi-synchronously process any intents that need resolving here in
// order to apply back pressure on the client which generated them. The
// resolution is semi-synchronous in that there is a limited number of
// outstanding asynchronous resolution tasks allowed after which
// further calls will block.
// Semi-synchronously process any intents that need resolving here in order
// to apply back pressure on the client which generated them. The resolution
// is semi-synchronous in that there is a limited number of outstanding
// asynchronous resolution tasks allowed after which further calls will
// block. The limited number of asynchronous resolution tasks ensures that
// the number of goroutines doing intent resolution does not diverge from
// the number of workload goroutines (see
// https://github.com/cockroachdb/cockroach/issues/4925#issuecomment-193015586
// for an old problem predating such a limit).
if len(intents) > 0 {
log.Eventf(ctx, "submitting %d intents to asynchronous processing", len(intents))
// We only allow synchronous intent resolution for consistent requests.
Expand Down
8 changes: 6 additions & 2 deletions pkg/kv/kvserver/replica_write.go
Expand Up @@ -207,8 +207,12 @@ func (r *Replica) executeWriteBatch(
// Semi-synchronously process any intents that need resolving here in
// order to apply back pressure on the client which generated them. The
// resolution is semi-synchronous in that there is a limited number of
// outstanding asynchronous resolution tasks allowed after which
// further calls will block.
// outstanding asynchronous resolution tasks allowed after which further
// calls will block. The limited number of asynchronous resolution tasks
// ensures that the number of goroutines doing intent resolution does
// not diverge from the number of workload goroutines (see
// https://github.com/cockroachdb/cockroach/issues/4925#issuecomment-193015586
// for an old problem predating such a limit).
if len(propResult.EndTxns) > 0 {
if err := r.store.intentResolver.CleanupTxnIntentsAsync(
ctx, r.RangeID, propResult.EndTxns, true, /* allowSync */
Expand Down
1 change: 1 addition & 0 deletions pkg/kv/kvserver/store.go
Expand Up @@ -2000,6 +2000,7 @@ func (s *Store) Start(ctx context.Context, stopper *stop.Stopper) error {
Clock: s.cfg.Clock,
DB: s.db,
Stopper: stopper,
Settings: s.cfg.Settings,
TaskLimit: s.cfg.IntentResolverTaskLimit,
AmbientCtx: s.cfg.AmbientCtx,
TestingKnobs: s.cfg.TestingKnobs.IntentResolverKnobs,
Expand Down