Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

kvclient: separate out proxy sending #124066

Merged
merged 2 commits into from
Jun 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
232 changes: 104 additions & 128 deletions pkg/kv/kvclient/kvcoord/dist_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -1195,6 +1195,12 @@ func (ds *DistSender) Send(
ctx, sp := tracing.EnsureChildSpan(ctx, ds.AmbientContext.Tracer, "dist sender send")
defer sp.Finish()

// Send proxy requests directly through the transport. Any errors are
// encoded in the response.
if ba.ProxyRangeInfo != nil {
return ds.sendProxyRequest(ctx, ba), nil
}

splitET := false
var require1PC bool
lastReq := ba.Requests[len(ba.Requests)-1].GetInner()
Expand Down Expand Up @@ -1298,6 +1304,102 @@ func (ds *DistSender) Send(
return reply, nil
}

// sendProxyRequest will send a proxy request directly through its transport and
// return the response without any local retries. A proxy request is a request
// which has a non-nil ProxyRangeInfo header. Errors on proxy requests are
// retried by the initial client, not by the proxy node. Proxy requests are used
// to route around network partitions when the original client can not talk to
// its desired final recipient.
//
// The request carries range information in the ProxyRangeInfo header, which may
// be newer, older, or the same as that in the proxy node's range cache. If the
// request carries newer information than what is in the cache, the proxy node's
// cache is updated. However, even if the request carries stale information
// relative to the cache on the proxy node, the proxy node's cache is never used
// to inform routing decisions. Instead the stale routing information from the
// request is used to create the Transport. If the response has an error with
// updated routing information, it is passed back to the initial client which
// will updates its cache and retry.
//
// NB: The two decisions of not retrying on the proxy node and using the clients
// routing information for sending are linked. A normal, non-proxy, request may
// be split into multiple parts based on routing information, and each
// independent request is retried independently until it reaches a terminal
// state. Importantly, each sub-request can not terminate with a SendError.
// However, a proxy request will not retry and therefore may result in a
// SendError. DistSender can not return from Send with a SendError and
// DistSender can not combine a SendError with any other responses. Therefore
// if a proxy request is split due to stale routing information differences, the
// responses can not be merged together on the proxy node.
func (ds *DistSender) sendProxyRequest(
ctx context.Context, ba *kvpb.BatchRequest,
) *kvpb.BatchResponse {
log.VEventf(ctx, 3, "processing a proxy request to %v", ba.ProxyRangeInfo.Lease)
ds.metrics.ProxyForwardSentCount.Inc(1)
// If the proxy node has a more recent range cache than the client, update
// our cache.
ds.rangeCache.Insert(ctx, *ba.ProxyRangeInfo)

// Use the same connection class as we normally use for this request.
opts := SendOptions{
class: rpc.ConnectionClassForKey(ba.ProxyRangeInfo.Desc.RSpan().Key, ba.ConnectionClass),
metrics: &ds.metrics,
}

// We construct a replica slice containing only the leaseholder, since we're
// only going to contact this replica and try once.
transport := ds.transportFactory(opts, ReplicaSlice{{ReplicaDescriptor: ba.ProxyRangeInfo.Lease.Replica}})

requestToSend := ba.ShallowCopy()
requestToSend.ProxyRangeInfo = nil
requestToSend.Replica = transport.NextReplica()

// Determine whether this part of the BatchRequest contains a committing
// EndTxn request.
var withCommit bool
if etArg, ok := ba.GetArg(kvpb.EndTxn); ok {
withCommit = etArg.(*kvpb.EndTxnRequest).Commit
}

// Verify the circuit breaker is not tripped and use a context which will
// cancel if the lease moves and the request has not responded.
// NB: The replicaCircuitBreakerToken is ignored since a proxy request is
// never evaluated locally.
sendCtx, cbToken, cbErr := ds.circuitBreakers.
ForReplica(&ba.ProxyRangeInfo.Desc, &ba.ProxyRangeInfo.Lease.Replica).
Track(ctx, ba, withCommit, timeutil.Now().UnixNano())
if cbErr != nil {
ds.metrics.ProxyForwardErrCount.Inc(1)
// Circuit breaker is tripped. Return immediately.
log.VEventf(ctx, 2, "failing proxy request for tripped circuit breaker %s", cbErr)
reply := &kvpb.BatchResponse{}
reply.Error = kvpb.NewError(kvpb.NewProxyFailedError(cbErr))
return reply
}

br, err := transport.SendNext(sendCtx, requestToSend)
log.VEventf(ctx, 3, "proxy result is %v %v", br, err)

// If the request failed because the circuit breaker tripped, change our
// error to the circuit breaker's error.
if cancelErr := cbToken.Done(br, err, timeutil.Now().UnixNano()); cancelErr != nil {
log.VEventf(ctx, 2, "failing proxy request after cb cancellation %s", err)
br, err = nil, cancelErr
}

if err != nil {
// Convert the error from an error on the transport to a
// ProxyFailedError. This can then be sent over the wire and decoded by
// the client.
ds.metrics.ProxyForwardErrCount.Inc(1)
log.VEventf(ctx, 2, "failing proxy request after error %s", err)
reply := &kvpb.BatchResponse{}
reply.Error = kvpb.NewError(kvpb.NewProxyFailedError(err))
return reply
}
return br
}

// incrementBatchCounters increments the appropriate counters to track the
// batch and its composite request methods.
func (ds *DistSender) incrementBatchCounters(ba *kvpb.BatchRequest) {
Expand Down Expand Up @@ -1989,11 +2091,6 @@ func slowReplicaRPCWarningStr(
dur.Seconds(), attempts, ba, ba.Replica, resp)
}

// ProxyFailedWithSendError is a marker to indicate a proxy request failed with
// a sendError. Node.maybeProxyRequest specifically excludes this error from
// propagation over the wire and instead return the NLHE from local evaluation.
var ProxyFailedWithSendError = kvpb.NewErrorf("proxy request failed with send error")

// sendPartialBatch sends the supplied batch to the range specified by the
// routing token.
//
Expand Down Expand Up @@ -2039,27 +2136,6 @@ func (ds *DistSender) sendPartialBatch(
pErr = nil
// If we've invalidated the descriptor on a send failure, re-lookup.

// On a proxy request, update our routing information with what the
// client sent us if the client had newer information. We have already
// validated the client request against our local replica state in
// node.go and reject requests with stale information. Here we ensure
// our RangeCache has the same information as both the client request
// and our local replica before attempting the request. If the sync
// makes our token invalid, we handle it similarly to a RangeNotFound or
// NotLeaseHolderError from a remote server.
// NB: The routingTok is usually valid when we get to this line on a
// proxy request since we never retry the outer for loop, however there
// is an edge case where we invalidate the token once here and then
// invalidate it a second time in the statement below and hit a
// retriable range loopup error.
// TODO(baptist): Consider splitting out the handling in this method for
// proxy requests vs non-proxy requests. Currently it is hard to follow
// the invariants when this is called. Alternativly move this call to be
// done immediately when the routingTok is created as it will always be
// valid at that point.
if ba.ProxyRangeInfo != nil && routingTok.Valid() {
routingTok.SyncTokenAndMaybeUpdateCache(ctx, &ba.ProxyRangeInfo.Lease, &ba.ProxyRangeInfo.Desc)
}
if !routingTok.Valid() {
var descKey roachpb.RKey
if isReverse {
Expand Down Expand Up @@ -2129,43 +2205,6 @@ func (ds *DistSender) sendPartialBatch(
tBegin = time.Time{} // prevent reentering branch for this RPC
}

if ba.ProxyRangeInfo != nil {
// On a proxy request (when ProxyRangeInfo is set) we always return
// the response immediately without retries. Proxy requests are
// retried by the remote client, not the proxy node. If the error
// contains updated range information from the leaseholder and is
// normally retry, the client will update its cache and routing
// information and decide how to proceed. If it decides to retry
// against this proxy node, the proxy node will see the updated
// range information on the retried request, and apply it to its
// cache in the SyncTokenAndMaybeUpdateCache call a few lines above.
//
// TODO(baptist): Update the cache on a RangeKeyMismatchError before
// returning the error.
if err != nil {
log.VEventf(ctx, 2, "failing proxy request after error %s", err)
reply = &kvpb.BatchResponse{}
if IsSendError(err) {
// sendErrors shouldn't[1] escape from the DistSender. If
// routing the request resulted in a sendError, we intercept
// it here and replace it with a ProxyFailedWithSendError.
//
// [1] sendErrors are a mechanism for sendToReplicas to
// communicate back that the routing information used to
// route the request was stale. Therefore, the cache needs
// to be flushed and the request should be retried with
// fresher information. However, this is specific to the
// proxy node's range cache -- not the remote client's range
// cache. As such, returning the sendError back to the
// remote client is nonsensical.
return response{pErr: ProxyFailedWithSendError}
} else {
reply.Error = kvpb.NewError(kvpb.NewProxyFailedError(err))
}
}
return response{reply: reply}
}

if err != nil {
// Set pErr so that, if we don't perform any more retries, the
// deduceRetryEarlyExitError() call below the loop includes this error.
Expand Down Expand Up @@ -2513,57 +2552,6 @@ func (ds *DistSender) sendToReplicas(
return nil, err
}

// This client requested we proxy this request. Only proxy if we can
// determine the leaseholder and it agrees with the ProxyRangeInfo from
// the client. We don't support a proxy request to a non-leaseholder
// replica. If we decide to proxy this request, we will reduce our replica
// list to only the requested replica. If we fail on that request we fail back
// to the caller so they can try something else.
if ba.ProxyRangeInfo != nil {
log.VEventf(ctx, 3, "processing a proxy request to %v", ba.ProxyRangeInfo)
ds.metrics.ProxyForwardSentCount.Inc(1)
// We don't know who the leaseholder is, and it is likely that the
// client had stale information. Return our information to them through
// a NLHE and let them retry.
if routing.Lease().Empty() {
log.VEventf(ctx, 2, "proxy failed, unknown leaseholder %v", routing)
br := kvpb.BatchResponse{}
br.Error = kvpb.NewError(
kvpb.NewNotLeaseHolderError(roachpb.Lease{},
0, /* proposerStoreID */
routing.Desc(),
"client requested a proxy but we can't figure out the leaseholder"),
)
ds.metrics.ProxyForwardErrCount.Inc(1)
return &br, nil
}
if ba.ProxyRangeInfo.Lease.Sequence != routing.Lease().Sequence ||
ba.ProxyRangeInfo.Desc.Generation != routing.Desc().Generation {
log.VEventf(ctx, 2, "proxy failed, update client information %v != %v", ba.ProxyRangeInfo, routing)
br := kvpb.BatchResponse{}
br.Error = kvpb.NewError(
kvpb.NewNotLeaseHolderError(
*routing.Lease(),
0, /* proposerStoreID */
routing.Desc(),
fmt.Sprintf("proxy failed, update client information %v != %v", ba.ProxyRangeInfo, routing)),
)
ds.metrics.ProxyForwardErrCount.Inc(1)
return &br, nil
}

// On a proxy request, we only send the request to the leaseholder. If we
// are here then the client and server agree on the routing information, so
// use the leaseholder as our only replica to send to.
idx := replicas.Find(routing.Leaseholder().ReplicaID)
// This should never happen. We validated the routing above and the token
// is still valid.
if idx == -1 {
return nil, errors.AssertionFailedf("inconsistent routing %v %v", desc, *routing.Leaseholder())
}
replicas = replicas[idx : idx+1]
log.VEventf(ctx, 2, "sender requested proxy to leaseholder %v", replicas)
}
// Rearrange the replicas so that they're ordered according to the routing
// policy.
var routeToLeaseholder bool
Expand Down Expand Up @@ -2725,14 +2713,6 @@ func (ds *DistSender) sendToReplicas(
requestToSend := ba
if !ProxyBatchRequest.Get(&ds.st.SV) {
// The setting is disabled, so we don't proxy this request.
} else if ba.ProxyRangeInfo != nil {
// Clear out the proxy information to prevent the recipient from
// sending this request onwards. This is necessary to prevent proxy
// chaining. We want the recipient to process the request or fail
// immediately. This is an extra safety measure to prevent any types
// of routing loops.
requestToSend = ba.ShallowCopy()
requestToSend.ProxyRangeInfo = nil
} else if !routeToLeaseholder {
// This request isn't intended for the leaseholder so we don't proxy it.
} else if routing.Leaseholder() == nil {
Expand Down Expand Up @@ -2803,12 +2783,8 @@ func (ds *DistSender) sendToReplicas(

ds.metrics.updateCrossLocalityMetricsOnReplicaAddressedBatchResponse(comparisonResult, int64(br.Size()))
ds.maybeIncrementErrCounters(br, err)
if err != nil {
if ba.ProxyRangeInfo != nil {
ds.metrics.ProxyErrCount.Inc(1)
} else if requestToSend.ProxyRangeInfo != nil {
ds.metrics.ProxyForwardErrCount.Inc(1)
}
if err != nil && requestToSend.ProxyRangeInfo != nil {
ds.metrics.ProxyErrCount.Inc(1)
}

if cbErr != nil {
Expand Down
Loading
Loading