Skip to content

Commit

Permalink
kvcoord: add DistSender circuit breakers
Browse files Browse the repository at this point in the history
This patch adds an initial implementation of DistSender replica circuit
breakers. Their primary purpose is to prevent the DistSender getting
stuck on non-functional replicas. In particular, the DistSender relies
on receiving a NLHE from the replica to update its range cache and try
other replicas, otherwise it will keep sending requests to the same
broken replica which will continue to get stuck, giving the appearance
of an unavailable range. This can happen if:

- The replica stalls, e.g. with a disk stall or mutex deadlock.

- Clients time out before the replica lease acquisition attempt times out,
  e.g. if the replica is partitioned away from the leader.

If a replica has returned only errors in the past few seconds, or hasn't
returned any responses at all, the circuit breaker will probe the
replica by sending a `LeaseInfo` request. This must either return
success or a NLHE pointing to a leaseholder.  Otherwise, the circuit
breaker trips, and the DistSender will skip it for future requests,
optionally also cancelling in-flight requests.

Currently, only replica-level circuit breakers are implemented. If a
range is unavailable, the DistSender will continue to retry replicas as
today. Range-level circuit breakers can be added later if needed, but
are considered out of scope here.

The circuit breakers are disabled by default for now. Some follow-up
work is likely needed before they can be enabled by default:

* Consider always enabling cancellation of in-flight requests. This
  has a significant performance cost, and may need optimization.

* Improve probe scalability. Currently, a goroutine is spawned per
  replica probe, which is likely too expensive at large scales.
  We should consider batching probes to nodes/stores, and using
  a bounded worker pool.

* Consider follower read handling, e.g. by tracking the replica's
  closed timestamp and allowing requests that may still be served
  by it even if it's partitioned away from the leaseholder.

* Improve observability, with metrics, tracing, and logging.

* Comprehensive testing and benchmarking.

This will be addressed separately.

Epic: none
Release note: None
  • Loading branch information
erikgrinaker committed Feb 21, 2024
1 parent 31acb7a commit c749999
Show file tree
Hide file tree
Showing 4 changed files with 1,109 additions and 11 deletions.
3 changes: 3 additions & 0 deletions pkg/kv/kvclient/kvcoord/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ go_library(
"batch.go",
"condensable_span_set.go",
"dist_sender.go",
"dist_sender_circuit_breaker.go",
"dist_sender_mux_rangefeed.go",
"dist_sender_rangefeed.go",
"dist_sender_rangefeed_canceler.go",
Expand Down Expand Up @@ -66,6 +67,7 @@ go_library(
"//pkg/util",
"//pkg/util/admission/admissionpb",
"//pkg/util/buildutil",
"//pkg/util/circuit",
"//pkg/util/ctxgroup",
"//pkg/util/envutil",
"//pkg/util/errorutil/unimplemented",
Expand Down Expand Up @@ -122,6 +124,7 @@ go_test(
"batch_test.go",
"condensable_span_set_test.go",
"dist_sender_ambiguous_test.go",
"dist_sender_circuit_breaker_test.go",
"dist_sender_rangefeed_canceler_test.go",
"dist_sender_rangefeed_mock_test.go",
"dist_sender_rangefeed_test.go",
Expand Down
45 changes: 34 additions & 11 deletions pkg/kv/kvclient/kvcoord/dist_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -545,6 +545,7 @@ type DistSender struct {
transportFactory TransportFactory
rpcRetryOptions retry.Options
asyncSenderSem *quotapool.IntPool
circuitBreakers *DistSenderCircuitBreakers

// batchInterceptor is set for tenants; when set, information about all
// BatchRequests and BatchResponses are passed through this interceptor, which
Expand Down Expand Up @@ -726,6 +727,13 @@ func NewDistSender(cfg DistSenderConfig) *DistSender {
})
}

// Set up circuit breakers and spawn the manager goroutine, which runs until
// the stopper stops. This can only error if the server is shutting down, so
// ignore the returned error.
ds.circuitBreakers = NewDistSenderCircuitBreakers(
ds.stopper, ds.st, ds.transportFactory, ds.metrics)
_ = ds.circuitBreakers.Start()

if cfg.TestingKnobs.LatencyFunc != nil {
ds.latencyFunc = cfg.TestingKnobs.LatencyFunc
}
Expand Down Expand Up @@ -2454,23 +2462,38 @@ func (ds *DistSender) sendToReplicas(
ds.metrics.updateCrossLocalityMetricsOnReplicaAddressedBatchRequest(comparisonResult, int64(ba.Size()))

tBegin := timeutil.Now() // for slow log message
br, err = transport.SendNext(ctx, ba)
if dur := timeutil.Since(tBegin); dur > slowDistSenderReplicaThreshold {
var s redact.StringBuilder
slowReplicaRPCWarningStr(&s, ba, dur, attempts, err, br)
if admissionpb.WorkPriority(ba.AdmissionHeader.Priority) >= admissionpb.NormalPri {
// Note that these RPC may or may not have succeeded. Errors are counted separately below.
ds.metrics.SlowReplicaRPCs.Inc(1)
log.Warningf(ctx, "slow replica RPC: %v", &s)
} else {
log.Eventf(ctx, "slow replica RPC: %v", &s)
sendCtx, cbToken, cbErr := ds.circuitBreakers.ForReplica(desc, &curReplica).
Track(ctx, ba, tBegin.UnixNano())
if cbErr != nil {
// Circuit breaker is tripped. err will be handled below.
err = cbErr
transport.SkipReplica()
} else {
br, err = transport.SendNext(sendCtx, ba)
tEnd := timeutil.Now()
cbToken.Done(br, err, tEnd.UnixNano())

if dur := tEnd.Sub(tBegin); dur > slowDistSenderReplicaThreshold {
var s redact.StringBuilder
slowReplicaRPCWarningStr(&s, ba, dur, attempts, err, br)
if admissionpb.WorkPriority(ba.AdmissionHeader.Priority) >= admissionpb.NormalPri {
// Note that these RPC may or may not have succeeded. Errors are counted separately below.
ds.metrics.SlowReplicaRPCs.Inc(1)
log.Warningf(ctx, "slow replica RPC: %v", &s)
} else {
log.Eventf(ctx, "slow replica RPC: %v", &s)
}
}
}

ds.metrics.updateCrossLocalityMetricsOnReplicaAddressedBatchResponse(comparisonResult, int64(br.Size()))
ds.maybeIncrementErrCounters(br, err)

if err != nil {
if cbErr != nil {
log.VErrEventf(ctx, 2, "circuit breaker error: %s", cbErr)
// We know the request did not start, so the error is not ambiguous.

} else if err != nil {
log.VErrEventf(ctx, 2, "RPC error: %s", err)

if grpcutil.IsAuthError(err) {
Expand Down

0 comments on commit c749999

Please sign in to comment.