Skip to content

Commit

Permalink
kvclient: separate out proxy sending
Browse files Browse the repository at this point in the history
Previously in DistSender, a proxy request was handled using the same
logic within sendPartialBatch and sendToReplicas. There were short
circuits to handle the different cases of retries, but in cases with
changing range boundaries, it could return the wrong error. This change
intercepts proxy requests at the start of DistSender.Send and sends them
directly to the transport, bypassing the rest of the DistSender retry
logic. This simplifies the code for proxy requests.

Epic: none
Fixes: cockroachdb#123965
Informs: cockroachdb#123146

Release note: None
  • Loading branch information
andrewbaptist committed May 14, 2024
1 parent 622c49c commit a97a68f
Showing 1 changed file with 51 additions and 123 deletions.
174 changes: 51 additions & 123 deletions pkg/kv/kvclient/kvcoord/dist_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -1196,6 +1196,11 @@ func (ds *DistSender) Send(
ctx, sp := tracing.EnsureChildSpan(ctx, ds.AmbientContext.Tracer, "dist sender send")
defer sp.Finish()

// Send proxy requests directly through the transport.
if ba.ProxyRangeInfo != nil {
return ds.sendProxyRequest(ctx, ba)
}

splitET := false
var require1PC bool
lastReq := ba.Requests[len(ba.Requests)-1].GetInner()
Expand Down Expand Up @@ -1299,6 +1304,50 @@ func (ds *DistSender) Send(
return reply, nil
}

// sendProxyRequest sends a request that has ProxyRangeInfo set directly to the
// Transport, making a single attempt with no retries. Proxy requests are
// retried by the initial client, not the proxy node. If the response has an
// error with updated information, the client will update its cache and routing
// information and retry locally. The proxy node inserts the supplied range
// information into its cache but doesn't use it for routing decisions.
//
// A transport-level failure is not returned as a *kvpb.Error. Instead it is
// wrapped in a ProxyFailedError and attached to an otherwise empty
// BatchResponse, so the second return value is always nil.
func (ds *DistSender) sendProxyRequest(
	ctx context.Context, ba *kvpb.BatchRequest,
) (*kvpb.BatchResponse, *kvpb.Error) {
	proxyInfo := ba.ProxyRangeInfo
	log.VEventf(ctx, 3, "processing a proxy request to %v", proxyInfo.Lease)
	ds.metrics.ProxyForwardSentCount.Inc(1)

	// The client may know about a newer descriptor or lease than we do, so
	// fold whatever it sent into our range cache.
	ds.rangeCache.Insert(ctx, *proxyInfo)

	// Pick the connection class the same way a regular request would.
	var opts SendOptions
	opts.class = rpc.ConnectionClassForKey(proxyInfo.Desc.RSpan().Key, ba.ConnectionClass)
	opts.metrics = &ds.metrics

	// Only the leaseholder is contacted, and only once, so the replica slice
	// handed to the transport holds that single replica.
	//
	// NOTE(review): the transport is never explicitly released in this
	// function; confirm whether Transport requires a release/cleanup call.
	leaseholderOnly := ReplicaSlice{{ReplicaDescriptor: proxyInfo.Lease.Replica}}
	transport := ds.transportFactory(opts, leaseholderOnly)

	// Strip the proxy info from a shallow copy so the caller's batch is left
	// untouched while the outgoing request no longer carries it.
	outgoing := ba.ShallowCopy()
	outgoing.ProxyRangeInfo = nil
	outgoing.Replica = transport.NextReplica()

	br, err := transport.SendNext(ctx, outgoing)
	log.VEventf(ctx, 3, "proxy result is %v %v", br, err)
	if err == nil {
		return br, nil
	}

	// Translate the transport error into a ProxyFailedError, which can be
	// sent over the wire and decoded by the client.
	ds.metrics.ProxyForwardErrCount.Inc(1)
	log.VEventf(ctx, 2, "failing proxy request after error %s", err)
	failed := new(kvpb.BatchResponse)
	failed.Error = kvpb.NewError(kvpb.NewProxyFailedError(err))
	return failed, nil
}

// incrementBatchCounters increments the appropriate counters to track the
// batch and its composite request methods.
func (ds *DistSender) incrementBatchCounters(ba *kvpb.BatchRequest) {
Expand Down Expand Up @@ -2040,27 +2089,6 @@ func (ds *DistSender) sendPartialBatch(
pErr = nil
// If we've invalidated the descriptor on a send failure, re-lookup.

// On a proxy request, update our routing information with what the
// client sent us if the client had newer information. We have already
// validated the client request against our local replica state in
// node.go and reject requests with stale information. Here we ensure
// our RangeCache has the same information as both the client request
// and our local replica before attempting the request. If the sync
// makes our token invalid, we handle it similarly to a RangeNotFound or
// NotLeaseHolderError from a remote server.
// NB: The routingTok is usually valid when we get to this line on a
// proxy request since we never retry the outer for loop, however there
// is an edge case where we invalidate the token once here and then
// invalidate it a second time in the statement below and hit a
// retriable range lookup error.
// TODO(baptist): Consider splitting out the handling in this method for
// proxy requests vs non-proxy requests. Currently it is hard to follow
// the invariants when this is called. Alternatively, move this call to be
// done immediately when the routingTok is created as it will always be
// valid at that point.
if ba.ProxyRangeInfo != nil && routingTok.Valid() {
routingTok.SyncTokenAndMaybeUpdateCache(ctx, &ba.ProxyRangeInfo.Lease, &ba.ProxyRangeInfo.Desc)
}
if !routingTok.Valid() {
var descKey roachpb.RKey
if isReverse {
Expand Down Expand Up @@ -2130,43 +2158,6 @@ func (ds *DistSender) sendPartialBatch(
tBegin = time.Time{} // prevent reentering branch for this RPC
}

if ba.ProxyRangeInfo != nil {
// On a proxy request (when ProxyRangeInfo is set) we always return
// the response immediately without retries. Proxy requests are
// retried by the remote client, not the proxy node. If the error
// contains updated range information from the leaseholder and is
// normally retriable, the client will update its cache and routing
// information and decide how to proceed. If it decides to retry
// against this proxy node, the proxy node will see the updated
// range information on the retried request, and apply it to its
// cache in the SyncTokenAndMaybeUpdateCache call a few lines above.
//
// TODO(baptist): Update the cache on a RangeKeyMismatchError before
// returning the error.
if err != nil {
log.VEventf(ctx, 2, "failing proxy request after error %s", err)
reply = &kvpb.BatchResponse{}
if IsSendError(err) {
// sendErrors shouldn't[1] escape from the DistSender. If
// routing the request resulted in a sendError, we intercept
// it here and replace it with a ProxyFailedWithSendError.
//
// [1] sendErrors are a mechanism for sendToReplicas to
// communicate back that the routing information used to
// route the request was stale. Therefore, the cache needs
// to be flushed and the request should be retried with
// fresher information. However, this is specific to the
// proxy node's range cache -- not the remote client's range
// cache. As such, returning the sendError back to the
// remote client is nonsensical.
return response{pErr: ProxyFailedWithSendError}
} else {
reply.Error = kvpb.NewError(kvpb.NewProxyFailedError(err))
}
}
return response{reply: reply}
}

if err != nil {
// Set pErr so that, if we don't perform any more retries, the
// deduceRetryEarlyExitError() call below the loop includes this error.
Expand Down Expand Up @@ -2514,57 +2505,6 @@ func (ds *DistSender) sendToReplicas(
return nil, err
}

// This client requested we proxy this request. Only proxy if we can
// determine the leaseholder and it agrees with the ProxyRangeInfo from
// the client. We don't support a proxy request to a non-leaseholder
// replica. If we decide to proxy this request, we will reduce our replica
// list to only the requested replica. If we fail on that request we fail back
// to the caller so they can try something else.
if ba.ProxyRangeInfo != nil {
log.VEventf(ctx, 3, "processing a proxy request to %v", ba.ProxyRangeInfo)
ds.metrics.ProxyForwardSentCount.Inc(1)
// We don't know who the leaseholder is, and it is likely that the
// client had stale information. Return our information to them through
// a NLHE and let them retry.
if routing.Lease().Empty() {
log.VEventf(ctx, 2, "proxy failed, unknown leaseholder %v", routing)
br := kvpb.BatchResponse{}
br.Error = kvpb.NewError(
kvpb.NewNotLeaseHolderError(roachpb.Lease{},
0, /* proposerStoreID */
routing.Desc(),
"client requested a proxy but we can't figure out the leaseholder"),
)
ds.metrics.ProxyForwardErrCount.Inc(1)
return &br, nil
}
if ba.ProxyRangeInfo.Lease.Sequence != routing.Lease().Sequence ||
ba.ProxyRangeInfo.Desc.Generation != routing.Desc().Generation {
log.VEventf(ctx, 2, "proxy failed, update client information %v != %v", ba.ProxyRangeInfo, routing)
br := kvpb.BatchResponse{}
br.Error = kvpb.NewError(
kvpb.NewNotLeaseHolderError(
*routing.Lease(),
0, /* proposerStoreID */
routing.Desc(),
fmt.Sprintf("proxy failed, update client information %v != %v", ba.ProxyRangeInfo, routing)),
)
ds.metrics.ProxyForwardErrCount.Inc(1)
return &br, nil
}

// On a proxy request, we only send the request to the leaseholder. If we
// are here then the client and server agree on the routing information, so
// use the leaseholder as our only replica to send to.
idx := replicas.Find(routing.Leaseholder().ReplicaID)
// This should never happen. We validated the routing above and the token
// is still valid.
if idx == -1 {
return nil, errors.AssertionFailedf("inconsistent routing %v %v", desc, *routing.Leaseholder())
}
replicas = replicas[idx : idx+1]
log.VEventf(ctx, 2, "sender requested proxy to leaseholder %v", replicas)
}
// Rearrange the replicas so that they're ordered according to the routing
// policy.
var routeToLeaseholder bool
Expand Down Expand Up @@ -2726,14 +2666,6 @@ func (ds *DistSender) sendToReplicas(
requestToSend := ba
if !ProxyBatchRequest.Get(&ds.st.SV) {
// The setting is disabled, so we don't proxy this request.
} else if ba.ProxyRangeInfo != nil {
// Clear out the proxy information to prevent the recipient from
// sending this request onwards. This is necessary to prevent proxy
// chaining. We want the recipient to process the request or fail
// immediately. This is an extra safety measure to prevent any types
// of routing loops.
requestToSend = ba.ShallowCopy()
requestToSend.ProxyRangeInfo = nil
} else if !routeToLeaseholder {
// This request isn't intended for the leaseholder so we don't proxy it.
} else if routing.Leaseholder() == nil {
Expand Down Expand Up @@ -2804,12 +2736,8 @@ func (ds *DistSender) sendToReplicas(

ds.metrics.updateCrossLocalityMetricsOnReplicaAddressedBatchResponse(comparisonResult, int64(br.Size()))
ds.maybeIncrementErrCounters(br, err)
if err != nil {
if ba.ProxyRangeInfo != nil {
ds.metrics.ProxyErrCount.Inc(1)
} else if requestToSend.ProxyRangeInfo != nil {
ds.metrics.ProxyForwardErrCount.Inc(1)
}
if err != nil && requestToSend.ProxyRangeInfo != nil {
ds.metrics.ProxyErrCount.Inc(1)
}

if cbErr != nil {
Expand Down

0 comments on commit a97a68f

Please sign in to comment.