kvclient: separate out proxy sending
Previously in DistSender, a proxy request was handled using the same
logic within sendPartialBatch and sendToReplicas. There were short
circuits to handle the different retry cases, but when range boundaries
changed, this could return the wrong error. This change intercepts proxy
requests at the start of DistSender.Send and sends them directly through
the transport, bypassing the rest of the DistSender retry logic. This
simplifies the code for proxy requests.
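
In sketch form (condensed from the diff below), the new control flow in
DistSender.Send is:

    func (ds *DistSender) Send(
        ctx context.Context, ba *kvpb.BatchRequest,
    ) (*kvpb.BatchResponse, *kvpb.Error) {
        // New: proxy requests bypass descriptor lookup and the retry loop.
        if ba.ProxyRangeInfo != nil {
            return ds.sendProxyRequest(ctx, ba)
        }
        // ... existing splitting, routing, and retry logic ...
    }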

Epic: none
Fixes: #123965
Informs: #123146

Release note: None
andrewbaptist committed Jun 16, 2024
1 parent efed5b6 commit 39f5191
Showing 1 changed file with 80 additions and 128 deletions: pkg/kv/kvclient/kvcoord/dist_sender.go
@@ -1195,6 +1195,11 @@ func (ds *DistSender) Send(
ctx, sp := tracing.EnsureChildSpan(ctx, ds.AmbientContext.Tracer, "dist sender send")
defer sp.Finish()

// Send proxy requests directly through the transport.
if ba.ProxyRangeInfo != nil {
return ds.sendProxyRequest(ctx, ba)
}

splitET := false
var require1PC bool
lastReq := ba.Requests[len(ba.Requests)-1].GetInner()
@@ -1298,6 +1303,79 @@ func (ds *DistSender) Send(
return reply, nil
}

// sendProxyRequest sends a request that has ProxyRangeInfo set directly to
// the transport, without retries. Proxy requests are retried by the initial
// client, not the proxy node. If the response has an error with updated
// information, the client will update its cache and routing information and
// retry locally. The proxy node inserts any newer range information into its
// cache but doesn't use it for routing decisions; outdated information is
// ignored. The transport is built based on the information in the request,
// not the local cache. As an example, if a range is split and the proxy node
// knows about the split, it will still route using the information from the
// original sender. This is done to ensure the original sender gets the
// updated information and updates its cache. The invariant is that, since
// the cache always moves forward, eventually all nodes in the system will
// see the updated information.
func (ds *DistSender) sendProxyRequest(
ctx context.Context, ba *kvpb.BatchRequest,
) (*kvpb.BatchResponse, *kvpb.Error) {
log.VEventf(ctx, 3, "processing a proxy request to %v", ba.ProxyRangeInfo.Lease)
ds.metrics.ProxyForwardSentCount.Inc(1)
// If the client has a more recent range cache than the proxy node, update
// our cache with its information.
ds.rangeCache.Insert(ctx, *ba.ProxyRangeInfo)

// Use the same connection class as we normally use for this request.
opts := SendOptions{
class: rpc.ConnectionClassForKey(ba.ProxyRangeInfo.Desc.RSpan().Key, ba.ConnectionClass),
metrics: &ds.metrics,
}

// We construct a replica slice containing only the leaseholder, since we're
// only going to contact this replica and try once.
transport := ds.transportFactory(opts, ReplicaSlice{{ReplicaDescriptor: ba.ProxyRangeInfo.Lease.Replica}})

requestToSend := ba.ShallowCopy()
requestToSend.ProxyRangeInfo = nil
requestToSend.Replica = transport.NextReplica()

// Determine whether this part of the BatchRequest contains a committing
// EndTxn request.
var withCommit bool
if etArg, ok := ba.GetArg(kvpb.EndTxn); ok {
withCommit = etArg.(*kvpb.EndTxnRequest).Commit
}

// Verify the circuit breaker is not tripped and use a context that is
// cancelled if the lease moves while the request is still in flight.
// NB: The replicaCircuitBreakerToken is ignored since a proxy request is
// never evaluated locally.
sendCtx, _, cbErr := ds.circuitBreakers.
ForReplica(&ba.ProxyRangeInfo.Desc, &ba.ProxyRangeInfo.Lease.Replica).
Track(ctx, ba, withCommit, timeutil.Now().UnixNano())
if cbErr != nil {
// Circuit breaker is tripped. Return immediately.
reply := &kvpb.BatchResponse{}
reply.Error = kvpb.NewError(kvpb.NewProxyFailedError(cbErr))
return reply, nil
}

br, err := transport.SendNext(sendCtx, requestToSend)

log.VEventf(ctx, 3, "proxy result is %v %v", br, err)

if err != nil {
// Convert the error from an error on the transport to a
// ProxyFailedError. This can then be sent over the wire and decoded by
// the client.
ds.metrics.ProxyForwardErrCount.Inc(1)
log.VEventf(ctx, 2, "failing proxy request after error %s", err)
reply := &kvpb.BatchResponse{}
reply.Error = kvpb.NewError(kvpb.NewProxyFailedError(err))
return reply, nil
}
return br, nil
}

// incrementBatchCounters increments the appropriate counters to track the
// batch and its composite request methods.
func (ds *DistSender) incrementBatchCounters(ba *kvpb.BatchRequest) {
@@ -1989,11 +2067,6 @@ func slowReplicaRPCWarningStr(
dur.Seconds(), attempts, ba, ba.Replica, resp)
}

// ProxyFailedWithSendError is a marker to indicate a proxy request failed with
// a sendError. Node.maybeProxyRequest specifically excludes this error from
// propagation over the wire and instead return the NLHE from local evaluation.
var ProxyFailedWithSendError = kvpb.NewErrorf("proxy request failed with send error")

// sendPartialBatch sends the supplied batch to the range specified by the
// routing token.
//
@@ -2039,27 +2112,6 @@ func (ds *DistSender) sendPartialBatch(
pErr = nil
// If we've invalidated the descriptor on a send failure, re-lookup.

// On a proxy request, update our routing information with what the
// client sent us if the client had newer information. We have already
// validated the client request against our local replica state in
// node.go and reject requests with stale information. Here we ensure
// our RangeCache has the same information as both the client request
// and our local replica before attempting the request. If the sync
// makes our token invalid, we handle it similarly to a RangeNotFound or
// NotLeaseHolderError from a remote server.
// NB: The routingTok is usually valid when we get to this line on a
// proxy request since we never retry the outer for loop, however there
// is an edge case where we invalidate the token once here and then
// invalidate it a second time in the statement below and hit a
retriable range lookup error.
// TODO(baptist): Consider splitting out the handling in this method for
// proxy requests vs non-proxy requests. Currently it is hard to follow
the invariants when this is called. Alternatively move this call to be
// done immediately when the routingTok is created as it will always be
// valid at that point.
if ba.ProxyRangeInfo != nil && routingTok.Valid() {
routingTok.SyncTokenAndMaybeUpdateCache(ctx, &ba.ProxyRangeInfo.Lease, &ba.ProxyRangeInfo.Desc)
}
if !routingTok.Valid() {
var descKey roachpb.RKey
if isReverse {
@@ -2129,43 +2181,6 @@ func (ds *DistSender) sendPartialBatch(
tBegin = time.Time{} // prevent reentering branch for this RPC
}

if ba.ProxyRangeInfo != nil {
// On a proxy request (when ProxyRangeInfo is set) we always return
// the response immediately without retries. Proxy requests are
// retried by the remote client, not the proxy node. If the error
contains updated range information from the leaseholder and would
normally be retried, the client will update its cache and routing
// information and decide how to proceed. If it decides to retry
// against this proxy node, the proxy node will see the updated
// range information on the retried request, and apply it to its
// cache in the SyncTokenAndMaybeUpdateCache call a few lines above.
//
// TODO(baptist): Update the cache on a RangeKeyMismatchError before
// returning the error.
if err != nil {
log.VEventf(ctx, 2, "failing proxy request after error %s", err)
reply = &kvpb.BatchResponse{}
if IsSendError(err) {
// sendErrors shouldn't[1] escape from the DistSender. If
// routing the request resulted in a sendError, we intercept
// it here and replace it with a ProxyFailedWithSendError.
//
// [1] sendErrors are a mechanism for sendToReplicas to
// communicate back that the routing information used to
// route the request was stale. Therefore, the cache needs
// to be flushed and the request should be retried with
// fresher information. However, this is specific to the
// proxy node's range cache -- not the remote client's range
// cache. As such, returning the sendError back to the
// remote client is nonsensical.
return response{pErr: ProxyFailedWithSendError}
} else {
reply.Error = kvpb.NewError(kvpb.NewProxyFailedError(err))
}
}
return response{reply: reply}
}

if err != nil {
// Set pErr so that, if we don't perform any more retries, the
// deduceRetryEarlyExitError() call below the loop includes this error.
@@ -2513,57 +2528,6 @@ func (ds *DistSender) sendToReplicas(
return nil, err
}

// This client requested we proxy this request. Only proxy if we can
// determine the leaseholder and it agrees with the ProxyRangeInfo from
// the client. We don't support a proxy request to a non-leaseholder
// replica. If we decide to proxy this request, we will reduce our replica
// list to only the requested replica. If we fail on that request we fail back
// to the caller so they can try something else.
if ba.ProxyRangeInfo != nil {
log.VEventf(ctx, 3, "processing a proxy request to %v", ba.ProxyRangeInfo)
ds.metrics.ProxyForwardSentCount.Inc(1)
// We don't know who the leaseholder is, and it is likely that the
// client had stale information. Return our information to them through
// a NLHE and let them retry.
if routing.Lease().Empty() {
log.VEventf(ctx, 2, "proxy failed, unknown leaseholder %v", routing)
br := kvpb.BatchResponse{}
br.Error = kvpb.NewError(
kvpb.NewNotLeaseHolderError(roachpb.Lease{},
0, /* proposerStoreID */
routing.Desc(),
"client requested a proxy but we can't figure out the leaseholder"),
)
ds.metrics.ProxyForwardErrCount.Inc(1)
return &br, nil
}
if ba.ProxyRangeInfo.Lease.Sequence != routing.Lease().Sequence ||
ba.ProxyRangeInfo.Desc.Generation != routing.Desc().Generation {
log.VEventf(ctx, 2, "proxy failed, update client information %v != %v", ba.ProxyRangeInfo, routing)
br := kvpb.BatchResponse{}
br.Error = kvpb.NewError(
kvpb.NewNotLeaseHolderError(
*routing.Lease(),
0, /* proposerStoreID */
routing.Desc(),
fmt.Sprintf("proxy failed, update client information %v != %v", ba.ProxyRangeInfo, routing)),
)
ds.metrics.ProxyForwardErrCount.Inc(1)
return &br, nil
}

// On a proxy request, we only send the request to the leaseholder. If we
// are here then the client and server agree on the routing information, so
// use the leaseholder as our only replica to send to.
idx := replicas.Find(routing.Leaseholder().ReplicaID)
// This should never happen. We validated the routing above and the token
// is still valid.
if idx == -1 {
return nil, errors.AssertionFailedf("inconsistent routing %v %v", desc, *routing.Leaseholder())
}
replicas = replicas[idx : idx+1]
log.VEventf(ctx, 2, "sender requested proxy to leaseholder %v", replicas)
}
// Rearrange the replicas so that they're ordered according to the routing
// policy.
var routeToLeaseholder bool
@@ -2725,14 +2689,6 @@
requestToSend := ba
if !ProxyBatchRequest.Get(&ds.st.SV) {
// The setting is disabled, so we don't proxy this request.
} else if ba.ProxyRangeInfo != nil {
// Clear out the proxy information to prevent the recipient from
// sending this request onwards. This is necessary to prevent proxy
// chaining. We want the recipient to process the request or fail
// immediately. This is an extra safety measure to prevent any types
// of routing loops.
requestToSend = ba.ShallowCopy()
requestToSend.ProxyRangeInfo = nil
} else if !routeToLeaseholder {
// This request isn't intended for the leaseholder so we don't proxy it.
} else if routing.Leaseholder() == nil {
@@ -2803,12 +2759,8 @@

ds.metrics.updateCrossLocalityMetricsOnReplicaAddressedBatchResponse(comparisonResult, int64(br.Size()))
ds.maybeIncrementErrCounters(br, err)
if err != nil {
if ba.ProxyRangeInfo != nil {
ds.metrics.ProxyErrCount.Inc(1)
} else if requestToSend.ProxyRangeInfo != nil {
ds.metrics.ProxyForwardErrCount.Inc(1)
}
if err != nil && requestToSend.ProxyRangeInfo != nil {
ds.metrics.ProxyErrCount.Inc(1)
}

if cbErr != nil {
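
For context on the protocol described in the sendProxyRequest comment above,
here is a hypothetical sketch of the sender side, which is not part of this
commit: the original client, not the proxy, owns all retries. The error-type
check and the eviction call below are illustrative assumptions about the
client code, not excerpts from it.

    // Illustrative only: a sender reacting to a ProxyFailedError returned
    // by sendProxyRequest. The real handling lives in the client's retry
    // loop, not in this commit.
    br, err := transport.SendNext(ctx, ba)
    if err == nil && br.Error != nil &&
        errors.HasType(br.Error.GoError(), (*kvpb.ProxyFailedError)(nil)) { // assumed type name
        // Treat the failure like a local send error: evict the stale
        // routing entry and retry with fresher information.
        routingTok.Evict(ctx) // illustrative cache eviction
    }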
