
kvclient: separate out proxy sending
Previously a proxy request was handled using the same logic within
sendPartialBatch and sendToReplicas. There were short circuits to handle
the different retry cases, but this was still incorrect in some places.
Instead, this change intercepts proxy requests at the start of Send,
creates a transport for the leaseholder, and sends to it directly. This
greatly simplifies the code for proxy requests and additionally fixes
complex cases where the routing information changes.

Epic: none
Fixes: #123965
Informs: #123146

Release note: None
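
In outline, the new path looks like the sketch below. This is an editor's
distillation of the added lines in the diff that follows, not part of the
commit itself; logging and metrics are elided, and the surrounding
DistSender types and helpers are assumed from the CockroachDB tree.

    func (ds *DistSender) Send(
        ctx context.Context, ba *kvpb.BatchRequest,
    ) (*kvpb.BatchResponse, *kvpb.Error) {
        // A request carrying ProxyRangeInfo bypasses the normal routing and
        // retry machinery entirely.
        if ba.ProxyRangeInfo != nil {
            return ds.sendProxyRequest(ctx, ba)
        }
        // ... the usual Send path: splitting, routing, sendPartialBatch ...
    }

    func (ds *DistSender) sendProxyRequest(
        ctx context.Context, ba *kvpb.BatchRequest,
    ) (*kvpb.BatchResponse, *kvpb.Error) {
        // Adopt the client's routing information if it is newer than ours.
        ds.rangeCache.Insert(ctx, *ba.ProxyRangeInfo)
        opts := SendOptions{
            class:   rpc.ConnectionClassForKey(ba.ProxyRangeInfo.Desc.RSpan().Key, ba.ConnectionClass),
            metrics: &ds.metrics,
        }
        // Contact only the leaseholder the client named, and never proxy a
        // proxied request a second time.
        transport := ds.transportFactory(opts,
            ReplicaSlice{{ReplicaDescriptor: ba.ProxyRangeInfo.Lease.Replica}})
        requestToSend := ba.ShallowCopy()
        requestToSend.ProxyRangeInfo = nil
        requestToSend.Replica = transport.NextReplica()
        br, err := transport.SendNext(ctx, requestToSend)
        if err != nil {
            // Transport failures are wrapped in a ProxyFailedError; the
            // remote client decides whether and where to retry.
            reply := &kvpb.BatchResponse{}
            reply.Error = kvpb.NewError(kvpb.NewProxyFailedError(err))
            return reply, nil
        }
        return br, nil
    }
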
andrewbaptist committed May 13, 2024
1 parent c179d3f commit 83b9103
Showing 1 changed file with 59 additions and 123 deletions.
182 changes: 59 additions & 123 deletions pkg/kv/kvclient/kvcoord/dist_sender.go
@@ -1196,6 +1196,12 @@ func (ds *DistSender) Send(
ctx, sp := tracing.EnsureChildSpan(ctx, ds.AmbientContext.Tracer, "dist sender send")
defer sp.Finish()

// If the request is a proxy request, send it through the local DistSender
// stack.
if ba.ProxyRangeInfo != nil {
return ds.sendProxyRequest(ctx, ba)
}

splitET := false
var require1PC bool
lastReq := ba.Requests[len(ba.Requests)-1].GetInner()
@@ -1299,6 +1305,57 @@ func (ds *DistSender) Send(
return reply, nil
}

// sendProxyRequest handles a proxy request (ba.ProxyRangeInfo is set) and
// always returns the response immediately, without retries. Proxy requests
// are retried by the remote client, not by the proxy node. If the error
// contains updated range information from the leaseholder and would
// normally be retried, the client updates its cache and routing
// information and decides how to proceed. If it decides to retry through
// this proxy node, the proxy node will see the updated range information
// on the retried request and apply it to its own cache via the
// rangeCache.Insert call at the top of this function.
func (ds *DistSender) sendProxyRequest(
ctx context.Context, ba *kvpb.BatchRequest,
) (*kvpb.BatchResponse, *kvpb.Error) {
// We don't use the range cache to route a proxy request, but if the client's
// ProxyRangeInfo contains newer information, add it to our cache.
ds.rangeCache.Insert(ctx, *ba.ProxyRangeInfo)
// The client requested that we proxy this request. We only proxy to the
// leaseholder named in the client's ProxyRangeInfo; proxying to a
// non-leaseholder replica is not supported, so the replica list is reduced
// to that single replica. If the send to it fails, we return the failure to
// the caller so it can try something else.
log.VEventf(ctx, 3, "processing a proxy request to %v", ba.ProxyRangeInfo.Lease)
ds.metrics.ProxyForwardSentCount.Inc(1)

// NB: upgrade the connection class to SYSTEM, for critical ranges. Set it to
// DEFAULT if the class is unknown, to handle mixed-version states gracefully.
// Other kinds of overrides are possible, see rpc.ConnectionClassForKey().
opts := SendOptions{
class: rpc.ConnectionClassForKey(ba.ProxyRangeInfo.Desc.RSpan().Key, ba.ConnectionClass),
metrics: &ds.metrics,
}
// We only want to contact the leaseholder based on the client's view, so we
// construct a replica slice containing only that replica.
transport := ds.transportFactory(opts, ReplicaSlice{{ReplicaDescriptor: ba.ProxyRangeInfo.Lease.Replica}})
// We don't want this to be proxied a second time, so clear the proxy info.
requestToSend := ba.ShallowCopy()
requestToSend.ProxyRangeInfo = nil
requestToSend.Replica = transport.NextReplica()
br, err := transport.SendNext(ctx, requestToSend)
log.VEventf(ctx, 3, "proxy result is %v %v", br, err)

if err != nil {
ds.metrics.ProxyForwardErrCount.Inc(1)
log.VEventf(ctx, 2, "failing proxy request after error %s", err)
reply := &kvpb.BatchResponse{}
reply.Error = kvpb.NewError(kvpb.NewProxyFailedError(err))
return reply, nil
}
return br, nil
}

// incrementBatchCounters increments the appropriate counters to track the
// batch and its composite request methods.
func (ds *DistSender) incrementBatchCounters(ba *kvpb.BatchRequest) {
@@ -2040,27 +2097,6 @@ func (ds *DistSender) sendPartialBatch(
pErr = nil
// If we've invalidated the descriptor on a send failure, re-lookup.

// On a proxy request, update our routing information with what the
// client sent us if the client had newer information. We have already
// validated the client request against our local replica state in
// node.go and reject requests with stale information. Here we ensure
// our RangeCache has the same information as both the client request
// and our local replica before attempting the request. If the sync
// makes our token invalid, we handle it similarly to a RangeNotFound or
// NotLeaseHolderError from a remote server.
// NB: The routingTok is usually valid when we get to this line on a
// proxy request since we never retry the outer for loop, however there
// is an edge case where we invalidate the token once here and then
// invalidate it a second time in the statement below and hit a
// retriable range loopup error.
// TODO(baptist): Consider splitting out the handling in this method for
// proxy requests vs non-proxy requests. Currently it is hard to follow
// the invariants when this is called. Alternativly move this call to be
// done immediately when the routingTok is created as it will always be
// valid at that point.
if ba.ProxyRangeInfo != nil && routingTok.Valid() {
routingTok.SyncTokenAndMaybeUpdateCache(ctx, &ba.ProxyRangeInfo.Lease, &ba.ProxyRangeInfo.Desc)
}
if !routingTok.Valid() {
var descKey roachpb.RKey
if isReverse {
@@ -2130,43 +2166,6 @@ func (ds *DistSender) sendPartialBatch(
tBegin = time.Time{} // prevent reentering branch for this RPC
}

if ba.ProxyRangeInfo != nil {
// On a proxy request (when ProxyRangeInfo is set) we always return
// the response immediately without retries. Proxy requests are
// retried by the remote client, not the proxy node. If the error
// contains updated range information from the leaseholder and is
// normally retry, the client will update its cache and routing
// information and decide how to proceed. If it decides to retry
// against this proxy node, the proxy node will see the updated
// range information on the retried request, and apply it to its
// cache in the SyncTokenAndMaybeUpdateCache call a few lines above.
//
// TODO(baptist): Update the cache on a RangeKeyMismatchError before
// returning the error.
if err != nil {
log.VEventf(ctx, 2, "failing proxy request after error %s", err)
reply = &kvpb.BatchResponse{}
if IsSendError(err) {
// sendErrors shouldn't[1] escape from the DistSender. If
// routing the request resulted in a sendError, we intercept
// it here and replace it with a ProxyFailedWithSendError.
//
// [1] sendErrors are a mechanism for sendToReplicas to
// communicate back that the routing information used to
// route the request was stale. Therefore, the cache needs
// to be flushed and the request should be retried with
// fresher information. However, this is specific to the
// proxy node's range cache -- not the remote client's range
// cache. As such, returning the sendError back to the
// remote client is nonsensical.
return response{pErr: ProxyFailedWithSendError}
} else {
reply.Error = kvpb.NewError(kvpb.NewProxyFailedError(err))
}
}
return response{reply: reply}
}

if err != nil {
// Set pErr so that, if we don't perform any more retries, the
// deduceRetryEarlyExitError() call below the loop includes this error.
@@ -2514,57 +2513,6 @@ func (ds *DistSender) sendToReplicas(
return nil, err
}

// This client requested we proxy this request. Only proxy if we can
// determine the leaseholder and it agrees with the ProxyRangeInfo from
// the client. We don't support a proxy request to a non-leaseholder
// replica. If we decide to proxy this request, we will reduce our replica
// list to only the requested replica. If we fail on that request we fail back
// to the caller so they can try something else.
if ba.ProxyRangeInfo != nil {
log.VEventf(ctx, 3, "processing a proxy request to %v", ba.ProxyRangeInfo)
ds.metrics.ProxyForwardSentCount.Inc(1)
// We don't know who the leaseholder is, and it is likely that the
// client had stale information. Return our information to them through
// a NLHE and let them retry.
if routing.Lease().Empty() {
log.VEventf(ctx, 2, "proxy failed, unknown leaseholder %v", routing)
br := kvpb.BatchResponse{}
br.Error = kvpb.NewError(
kvpb.NewNotLeaseHolderError(roachpb.Lease{},
0, /* proposerStoreID */
routing.Desc(),
"client requested a proxy but we can't figure out the leaseholder"),
)
ds.metrics.ProxyForwardErrCount.Inc(1)
return &br, nil
}
if ba.ProxyRangeInfo.Lease.Sequence != routing.Lease().Sequence ||
ba.ProxyRangeInfo.Desc.Generation != routing.Desc().Generation {
log.VEventf(ctx, 2, "proxy failed, update client information %v != %v", ba.ProxyRangeInfo, routing)
br := kvpb.BatchResponse{}
br.Error = kvpb.NewError(
kvpb.NewNotLeaseHolderError(
*routing.Lease(),
0, /* proposerStoreID */
routing.Desc(),
fmt.Sprintf("proxy failed, update client information %v != %v", ba.ProxyRangeInfo, routing)),
)
ds.metrics.ProxyForwardErrCount.Inc(1)
return &br, nil
}

// On a proxy request, we only send the request to the leaseholder. If we
// are here then the client and server agree on the routing information, so
// use the leaseholder as our only replica to send to.
idx := replicas.Find(routing.Leaseholder().ReplicaID)
// This should never happen. We validated the routing above and the token
// is still valid.
if idx == -1 {
return nil, errors.AssertionFailedf("inconsistent routing %v %v", desc, *routing.Leaseholder())
}
replicas = replicas[idx : idx+1]
log.VEventf(ctx, 2, "sender requested proxy to leaseholder %v", replicas)
}
// Rearrange the replicas so that they're ordered according to the routing
// policy.
var routeToLeaseholder bool
@@ -2726,14 +2674,6 @@ func (ds *DistSender) sendToReplicas(
requestToSend := ba
if !ProxyBatchRequest.Get(&ds.st.SV) {
// The setting is disabled, so we don't proxy this request.
} else if ba.ProxyRangeInfo != nil {
// Clear out the proxy information to prevent the recipient from
// sending this request onwards. This is necessary to prevent proxy
// chaining. We want the recipient to process the request or fail
// immediately. This is an extra safety measure to prevent any types
// of routing loops.
requestToSend = ba.ShallowCopy()
requestToSend.ProxyRangeInfo = nil
} else if !routeToLeaseholder {
// This request isn't intended for the leaseholder so we don't proxy it.
} else if routing.Leaseholder() == nil {
@@ -2804,12 +2744,8 @@ func (ds *DistSender) sendToReplicas(

ds.metrics.updateCrossLocalityMetricsOnReplicaAddressedBatchResponse(comparisonResult, int64(br.Size()))
ds.maybeIncrementErrCounters(br, err)
if err != nil {
if ba.ProxyRangeInfo != nil {
ds.metrics.ProxyErrCount.Inc(1)
} else if requestToSend.ProxyRangeInfo != nil {
ds.metrics.ProxyForwardErrCount.Inc(1)
}
if err != nil && requestToSend.ProxyRangeInfo != nil {
ds.metrics.ProxyErrCount.Inc(1)
}

if cbErr != nil {
