Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

kvcoord: add DistSender circuit breakers #118943

Merged
merged 1 commit into from
Mar 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/generated/settings/settings-for-tenants.txt
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,11 @@ kv.closed_timestamp.follower_reads.enabled boolean true allow (all) replicas to
kv.closed_timestamp.lead_for_global_reads_override duration 0s if nonzero, overrides the lead time that global_read ranges use to publish closed timestamps system-visible
kv.closed_timestamp.side_transport_interval duration 200ms the interval at which the closed timestamp side-transport attempts to advance each range's closed timestamp; set to 0 to disable the side-transport system-visible
kv.closed_timestamp.target_duration duration 3s if nonzero, attempt to provide closed timestamp notifications for timestamps trailing cluster time by approximately this duration system-visible
kv.dist_sender.circuit_breaker.cancellation.enabled boolean true when enabled, in-flight requests will be cancelled when the circuit breaker trips application
kv.dist_sender.circuit_breaker.enabled boolean true enable circuit breakers for failing or stalled replicas application
kv.dist_sender.circuit_breaker.probe.interval duration 3s interval between replica probes application
kv.dist_sender.circuit_breaker.probe.threshold duration 3s duration of errors or stalls after which a replica will be probed application
kv.dist_sender.circuit_breaker.probe.timeout duration 3s timeout for replica probes application
kv.protectedts.reconciliation.interval duration 5m0s the frequency for reconciling jobs with protected timestamp records system-visible
kv.rangefeed.client.stream_startup_rate integer 100 controls the rate per second the client will initiate new rangefeed stream for a single range; 0 implies unlimited application
kv.rangefeed.closed_timestamp_refresh_interval duration 3s the interval at which closed-timestamp updatesare delivered to rangefeeds; set to 0 to use kv.closed_timestamp.side_transport_interval system-visible
Expand Down
5 changes: 5 additions & 0 deletions docs/generated/settings/settings.html
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,11 @@
<tr><td><div id="setting-kv-closed-timestamp-lead-for-global-reads-override" class="anchored"><code>kv.closed_timestamp.lead_for_global_reads_override</code></div></td><td>duration</td><td><code>0s</code></td><td>if nonzero, overrides the lead time that global_read ranges use to publish closed timestamps</td><td>Serverless/Dedicated/Self-Hosted (read-only)</td></tr>
<tr><td><div id="setting-kv-closed-timestamp-side-transport-interval" class="anchored"><code>kv.closed_timestamp.side_transport_interval</code></div></td><td>duration</td><td><code>200ms</code></td><td>the interval at which the closed timestamp side-transport attempts to advance each range&#39;s closed timestamp; set to 0 to disable the side-transport</td><td>Serverless/Dedicated/Self-Hosted (read-only)</td></tr>
<tr><td><div id="setting-kv-closed-timestamp-target-duration" class="anchored"><code>kv.closed_timestamp.target_duration</code></div></td><td>duration</td><td><code>3s</code></td><td>if nonzero, attempt to provide closed timestamp notifications for timestamps trailing cluster time by approximately this duration</td><td>Serverless/Dedicated/Self-Hosted (read-only)</td></tr>
<tr><td><div id="setting-kv-dist-sender-circuit-breaker-cancellation-enabled" class="anchored"><code>kv.dist_sender.circuit_breaker.cancellation.enabled</code></div></td><td>boolean</td><td><code>true</code></td><td>when enabled, in-flight requests will be cancelled when the circuit breaker trips</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-kv-dist-sender-circuit-breaker-enabled" class="anchored"><code>kv.dist_sender.circuit_breaker.enabled</code></div></td><td>boolean</td><td><code>true</code></td><td>enable circuit breakers for failing or stalled replicas</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-kv-dist-sender-circuit-breaker-probe-interval" class="anchored"><code>kv.dist_sender.circuit_breaker.probe.interval</code></div></td><td>duration</td><td><code>3s</code></td><td>interval between replica probes</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-kv-dist-sender-circuit-breaker-probe-threshold" class="anchored"><code>kv.dist_sender.circuit_breaker.probe.threshold</code></div></td><td>duration</td><td><code>3s</code></td><td>duration of errors or stalls after which a replica will be probed</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-kv-dist-sender-circuit-breaker-probe-timeout" class="anchored"><code>kv.dist_sender.circuit_breaker.probe.timeout</code></div></td><td>duration</td><td><code>3s</code></td><td>timeout for replica probes</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-kv-lease-transfer-read-summary-global-budget" class="anchored"><code>kv.lease_transfer_read_summary.global_budget</code></div></td><td>byte size</td><td><code>0 B</code></td><td>controls the maximum number of bytes that will be used to summarize the global segment of the timestamp cache during lease transfers and range merges. A smaller budget will result in loss of precision.</td><td>Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-kv-lease-transfer-read-summary-local-budget" class="anchored"><code>kv.lease_transfer_read_summary.local_budget</code></div></td><td>byte size</td><td><code>4.0 MiB</code></td><td>controls the maximum number of bytes that will be used to summarize the local segment of the timestamp cache during lease transfers and range merges. A smaller budget will result in loss of precision.</td><td>Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-kv-log-range-and-node-events-enabled" class="anchored"><code>kv.log_range_and_node_events.enabled</code></div></td><td>boolean</td><td><code>true</code></td><td>set to true to transactionally log range events (e.g., split, merge, add/remove voter/non-voter) into system.rangelogand node join and restart events into system.eventolog</td><td>Dedicated/Self-Hosted</td></tr>
Expand Down
3 changes: 3 additions & 0 deletions pkg/kv/kvclient/kvcoord/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ go_library(
"batch.go",
"condensable_span_set.go",
"dist_sender.go",
"dist_sender_circuit_breaker.go",
"dist_sender_mux_rangefeed.go",
"dist_sender_rangefeed.go",
"dist_sender_rangefeed_canceler.go",
Expand Down Expand Up @@ -66,6 +67,7 @@ go_library(
"//pkg/util",
"//pkg/util/admission/admissionpb",
"//pkg/util/buildutil",
"//pkg/util/circuit",
"//pkg/util/ctxgroup",
"//pkg/util/envutil",
"//pkg/util/errorutil/unimplemented",
Expand Down Expand Up @@ -122,6 +124,7 @@ go_test(
"batch_test.go",
"condensable_span_set_test.go",
"dist_sender_ambiguous_test.go",
"dist_sender_circuit_breaker_test.go",
"dist_sender_rangefeed_canceler_test.go",
"dist_sender_rangefeed_mock_test.go",
"dist_sender_rangefeed_test.go",
Expand Down
45 changes: 34 additions & 11 deletions pkg/kv/kvclient/kvcoord/dist_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -545,6 +545,7 @@ type DistSender struct {
transportFactory TransportFactory
rpcRetryOptions retry.Options
asyncSenderSem *quotapool.IntPool
circuitBreakers *DistSenderCircuitBreakers

// batchInterceptor is set for tenants; when set, information about all
// BatchRequests and BatchResponses are passed through this interceptor, which
Expand Down Expand Up @@ -726,6 +727,13 @@ func NewDistSender(cfg DistSenderConfig) *DistSender {
})
}

// Set up circuit breakers and spawn the manager goroutine, which runs until
// the stopper stops. This can only error if the server is shutting down, so
// ignore the returned error.
ds.circuitBreakers = NewDistSenderCircuitBreakers(
ds.stopper, ds.st, ds.transportFactory, ds.metrics)
_ = ds.circuitBreakers.Start()

if cfg.TestingKnobs.LatencyFunc != nil {
ds.latencyFunc = cfg.TestingKnobs.LatencyFunc
}
Expand Down Expand Up @@ -2454,23 +2462,38 @@ func (ds *DistSender) sendToReplicas(
ds.metrics.updateCrossLocalityMetricsOnReplicaAddressedBatchRequest(comparisonResult, int64(ba.Size()))

tBegin := timeutil.Now() // for slow log message
br, err = transport.SendNext(ctx, ba)
if dur := timeutil.Since(tBegin); dur > slowDistSenderReplicaThreshold {
var s redact.StringBuilder
slowReplicaRPCWarningStr(&s, ba, dur, attempts, err, br)
if admissionpb.WorkPriority(ba.AdmissionHeader.Priority) >= admissionpb.NormalPri {
// Note that these RPC may or may not have succeeded. Errors are counted separately below.
ds.metrics.SlowReplicaRPCs.Inc(1)
log.Warningf(ctx, "slow replica RPC: %v", &s)
} else {
log.Eventf(ctx, "slow replica RPC: %v", &s)
sendCtx, cbToken, cbErr := ds.circuitBreakers.ForReplica(desc, &curReplica).
Track(ctx, ba, tBegin.UnixNano())
if cbErr != nil {
// Circuit breaker is tripped. err will be handled below.
err = cbErr
transport.SkipReplica()
} else {
br, err = transport.SendNext(sendCtx, ba)
tEnd := timeutil.Now()
cbToken.Done(br, err, tEnd.UnixNano())

if dur := tEnd.Sub(tBegin); dur > slowDistSenderReplicaThreshold {
var s redact.StringBuilder
slowReplicaRPCWarningStr(&s, ba, dur, attempts, err, br)
if admissionpb.WorkPriority(ba.AdmissionHeader.Priority) >= admissionpb.NormalPri {
// Note that these RPC may or may not have succeeded. Errors are counted separately below.
ds.metrics.SlowReplicaRPCs.Inc(1)
log.Warningf(ctx, "slow replica RPC: %v", &s)
} else {
log.Eventf(ctx, "slow replica RPC: %v", &s)
}
}
}

ds.metrics.updateCrossLocalityMetricsOnReplicaAddressedBatchResponse(comparisonResult, int64(br.Size()))
ds.maybeIncrementErrCounters(br, err)

if err != nil {
if cbErr != nil {
log.VErrEventf(ctx, 2, "circuit breaker error: %s", cbErr)
// We know the request did not start, so the error is not ambiguous.

} else if err != nil {
log.VErrEventf(ctx, 2, "RPC error: %s", err)

if grpcutil.IsAuthError(err) {
Expand Down