diff --git a/pkg/kv/kvclient/kvcoord/dist_sender.go b/pkg/kv/kvclient/kvcoord/dist_sender.go
index dc06fa4bf4f7..dc0178759e9c 100644
--- a/pkg/kv/kvclient/kvcoord/dist_sender.go
+++ b/pkg/kv/kvclient/kvcoord/dist_sender.go
@@ -1196,6 +1196,11 @@ func (ds *DistSender) Send(
 	ctx, sp := tracing.EnsureChildSpan(ctx, ds.AmbientContext.Tracer, "dist sender send")
 	defer sp.Finish()
 
+	// Send proxy requests directly through the transport.
+	if ba.ProxyRangeInfo != nil {
+		return ds.sendProxyRequest(ctx, ba)
+	}
+
 	splitET := false
 	var require1PC bool
 	lastReq := ba.Requests[len(ba.Requests)-1].GetInner()
@@ -1299,6 +1304,50 @@ func (ds *DistSender) Send(
 	return reply, nil
 }
 
+// sendProxyRequest sends a request with ProxyRangeInfo set directly to the
+// transport, without retries. Proxy requests are retried by the initial
+// client, not the proxy node. If the response has an error with updated
+// information, the client will update its cache and routing information and
+// retry locally. The proxy node inserts any range information but doesn't use
+// it for routing decisions.
+func (ds *DistSender) sendProxyRequest(
+	ctx context.Context, ba *kvpb.BatchRequest,
+) (*kvpb.BatchResponse, *kvpb.Error) {
+	log.VEventf(ctx, 3, "processing a proxy request to %v", ba.ProxyRangeInfo.Lease)
+	ds.metrics.ProxyForwardSentCount.Inc(1)
+	// If the client has a more recent range cache than the proxy node,
+	// update our cache.
+	ds.rangeCache.Insert(ctx, *ba.ProxyRangeInfo)
+
+	// Use the same connection class as we normally use for this request.
+	opts := SendOptions{
+		class:   rpc.ConnectionClassForKey(ba.ProxyRangeInfo.Desc.RSpan().Key, ba.ConnectionClass),
+		metrics: &ds.metrics,
+	}
+
+	// We construct a replica slice containing only the leaseholder, since we're
+	// only going to contact this replica and try once.
+	transport := ds.transportFactory(opts, ReplicaSlice{{ReplicaDescriptor: ba.ProxyRangeInfo.Lease.Replica}})
+
+	requestToSend := ba.ShallowCopy()
+	requestToSend.ProxyRangeInfo = nil
+	requestToSend.Replica = transport.NextReplica()
+	br, err := transport.SendNext(ctx, requestToSend)
+	log.VEventf(ctx, 3, "proxy result is %v %v", br, err)
+
+	if err != nil {
+		// Convert the error from an error on the transport to a
+		// ProxyFailedError. This can then be sent over the wire and decoded by
+		// the client.
+		ds.metrics.ProxyForwardErrCount.Inc(1)
+		log.VEventf(ctx, 2, "failing proxy request after error %s", err)
+		reply := &kvpb.BatchResponse{}
+		reply.Error = kvpb.NewError(kvpb.NewProxyFailedError(err))
+		return reply, nil
+	}
+	return br, nil
+}
+
 // incrementBatchCounters increments the appropriate counters to track the
 // batch and its composite request methods.
 func (ds *DistSender) incrementBatchCounters(ba *kvpb.BatchRequest) {
@@ -2040,27 +2089,6 @@ func (ds *DistSender) sendPartialBatch(
 		pErr = nil
 
 		// If we've invalidated the descriptor on a send failure, re-lookup.
-		// On a proxy request, update our routing information with what the
-		// client sent us if the client had newer information. We have already
-		// validated the client request against our local replica state in
-		// node.go and reject requests with stale information. Here we ensure
-		// our RangeCache has the same information as both the client request
-		// and our local replica before attempting the request. If the sync
-		// makes our token invalid, we handle it similarly to a RangeNotFound or
-		// NotLeaseHolderError from a remote server.
-		// NB: The routingTok is usually valid when we get to this line on a
-		// proxy request since we never retry the outer for loop, however there
-		// is an edge case where we invalidate the token once here and then
-		// invalidate it a second time in the statement below and hit a
-		// retriable range loopup error.
-		// TODO(baptist): Consider splitting out the handling in this method for
-		// proxy requests vs non-proxy requests. Currently it is hard to follow
-		// the invariants when this is called. Alternativly move this call to be
-		// done immediately when the routingTok is created as it will always be
-		// valid at that point.
-		if ba.ProxyRangeInfo != nil && routingTok.Valid() {
-			routingTok.SyncTokenAndMaybeUpdateCache(ctx, &ba.ProxyRangeInfo.Lease, &ba.ProxyRangeInfo.Desc)
-		}
 		if !routingTok.Valid() {
 			var descKey roachpb.RKey
 			if isReverse {
@@ -2130,43 +2158,6 @@ func (ds *DistSender) sendPartialBatch(
 			tBegin = time.Time{} // prevent reentering branch for this RPC
 		}
 
-		if ba.ProxyRangeInfo != nil {
-			// On a proxy request (when ProxyRangeInfo is set) we always return
-			// the response immediately without retries. Proxy requests are
-			// retried by the remote client, not the proxy node. If the error
-			// contains updated range information from the leaseholder and is
-			// normally retry, the client will update its cache and routing
-			// information and decide how to proceed. If it decides to retry
-			// against this proxy node, the proxy node will see the updated
-			// range information on the retried request, and apply it to its
-			// cache in the SyncTokenAndMaybeUpdateCache call a few lines above.
-			//
-			// TODO(baptist): Update the cache on a RangeKeyMismatchError before
-			// returning the error.
-			if err != nil {
-				log.VEventf(ctx, 2, "failing proxy request after error %s", err)
-				reply = &kvpb.BatchResponse{}
-				if IsSendError(err) {
-					// sendErrors shouldn't[1] escape from the DistSender. If
-					// routing the request resulted in a sendError, we intercept
-					// it here and replace it with a ProxyFailedWithSendError.
-					//
-					// [1] sendErrors are a mechanism for sendToReplicas to
-					// communicate back that the routing information used to
-					// route the request was stale. Therefore, the cache needs
-					// to be flushed and the request should be retried with
-					// fresher information. However, this is specific to the
-					// proxy node's range cache -- not the remote client's range
-					// cache. As such, returning the sendError back to the
-					// remote client is nonsensical.
-					return response{pErr: ProxyFailedWithSendError}
-				} else {
-					reply.Error = kvpb.NewError(kvpb.NewProxyFailedError(err))
-				}
-			}
-			return response{reply: reply}
-		}
-
 		if err != nil {
 			// Set pErr so that, if we don't perform any more retries, the
 			// deduceRetryEarlyExitError() call below the loop includes this error.
@@ -2514,57 +2505,6 @@ func (ds *DistSender) sendToReplicas(
 		return nil, err
 	}
 
-	// This client requested we proxy this request. Only proxy if we can
-	// determine the leaseholder and it agrees with the ProxyRangeInfo from
-	// the client. We don't support a proxy request to a non-leaseholder
-	// replica. If we decide to proxy this request, we will reduce our replica
-	// list to only the requested replica. If we fail on that request we fail back
-	// to the caller so they can try something else.
-	if ba.ProxyRangeInfo != nil {
-		log.VEventf(ctx, 3, "processing a proxy request to %v", ba.ProxyRangeInfo)
-		ds.metrics.ProxyForwardSentCount.Inc(1)
-		// We don't know who the leaseholder is, and it is likely that the
-		// client had stale information. Return our information to them through
-		// a NLHE and let them retry.
-		if routing.Lease().Empty() {
-			log.VEventf(ctx, 2, "proxy failed, unknown leaseholder %v", routing)
-			br := kvpb.BatchResponse{}
-			br.Error = kvpb.NewError(
-				kvpb.NewNotLeaseHolderError(roachpb.Lease{},
-					0, /* proposerStoreID */
-					routing.Desc(),
-					"client requested a proxy but we can't figure out the leaseholder"),
-			)
-			ds.metrics.ProxyForwardErrCount.Inc(1)
-			return &br, nil
-		}
-		if ba.ProxyRangeInfo.Lease.Sequence != routing.Lease().Sequence ||
-			ba.ProxyRangeInfo.Desc.Generation != routing.Desc().Generation {
-			log.VEventf(ctx, 2, "proxy failed, update client information %v != %v", ba.ProxyRangeInfo, routing)
-			br := kvpb.BatchResponse{}
-			br.Error = kvpb.NewError(
-				kvpb.NewNotLeaseHolderError(
-					*routing.Lease(),
-					0, /* proposerStoreID */
-					routing.Desc(),
-					fmt.Sprintf("proxy failed, update client information %v != %v", ba.ProxyRangeInfo, routing)),
-			)
-			ds.metrics.ProxyForwardErrCount.Inc(1)
-			return &br, nil
-		}
-
-		// On a proxy request, we only send the request to the leaseholder. If we
-		// are here then the client and server agree on the routing information, so
-		// use the leaseholder as our only replica to send to.
-		idx := replicas.Find(routing.Leaseholder().ReplicaID)
-		// This should never happen. We validated the routing above and the token
-		// is still valid.
-		if idx == -1 {
-			return nil, errors.AssertionFailedf("inconsistent routing %v %v", desc, *routing.Leaseholder())
-		}
-		replicas = replicas[idx : idx+1]
-		log.VEventf(ctx, 2, "sender requested proxy to leaseholder %v", replicas)
-	}
 	// Rearrange the replicas so that they're ordered according to the routing
 	// policy.
 	var routeToLeaseholder bool
@@ -2726,14 +2666,6 @@ func (ds *DistSender) sendToReplicas(
 			requestToSend := ba
 			if !ProxyBatchRequest.Get(&ds.st.SV) {
 				// The setting is disabled, so we don't proxy this request.
-			} else if ba.ProxyRangeInfo != nil {
-				// Clear out the proxy information to prevent the recipient from
-				// sending this request onwards. This is necessary to prevent proxy
-				// chaining. We want the recipient to process the request or fail
-				// immediately. This is an extra safety measure to prevent any types
-				// of routing loops.
-				requestToSend = ba.ShallowCopy()
-				requestToSend.ProxyRangeInfo = nil
 			} else if !routeToLeaseholder {
 				// This request isn't intended for the leaseholder so we don't proxy it.
 			} else if routing.Leaseholder() == nil {
@@ -2804,12 +2736,8 @@ func (ds *DistSender) sendToReplicas(
 
 			ds.metrics.updateCrossLocalityMetricsOnReplicaAddressedBatchResponse(comparisonResult, int64(br.Size()))
 			ds.maybeIncrementErrCounters(br, err)
-			if err != nil {
-				if ba.ProxyRangeInfo != nil {
-					ds.metrics.ProxyErrCount.Inc(1)
-				} else if requestToSend.ProxyRangeInfo != nil {
-					ds.metrics.ProxyForwardErrCount.Inc(1)
-				}
+			if err != nil && requestToSend.ProxyRangeInfo != nil {
+				ds.metrics.ProxyErrCount.Inc(1)
 			}
 
 			if cbErr != nil {
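
A minimal sketch of the contract this change encodes, for reviewers who want it spelled out: the proxy node makes exactly one attempt against the replica named in ProxyRangeInfo and wraps any transport failure so that the originating client, not the proxy, decides whether and where to retry. The names below (proxyOnce, transport, batch, proxyFailedError, flakyTransport) are illustrative stand-ins, not CockroachDB's actual API; proxyFailedError plays the role kvpb.ProxyFailedError plays in the diff.

package main

import (
	"context"
	"errors"
	"fmt"
)

// proxyFailedError mirrors the role of kvpb.ProxyFailedError: it carries a
// transport-level failure back over the wire instead of letting the proxy
// node retry locally.
type proxyFailedError struct{ cause error }

func (e *proxyFailedError) Error() string { return "proxy failed: " + e.cause.Error() }
func (e *proxyFailedError) Unwrap() error { return e.cause }

type batch struct{ payload string }
type response struct{ result string }

// transport abstracts a single connection to one replica, like the Transport
// returned by ds.transportFactory in the diff.
type transport interface {
	SendNext(ctx context.Context, ba batch) (*response, error)
}

// proxyOnce is the shape of sendProxyRequest: one attempt, no retry loop,
// errors converted into a wire-encodable form for the real client.
func proxyOnce(ctx context.Context, t transport, ba batch) (*response, error) {
	br, err := t.SendNext(ctx, ba)
	if err != nil {
		// Do not retry here; hand the failure back to the originating
		// client so it can refresh its range cache and re-route.
		return nil, &proxyFailedError{cause: err}
	}
	return br, nil
}

// flakyTransport simulates a replica that may refuse the connection.
type flakyTransport struct{ fail bool }

func (t *flakyTransport) SendNext(ctx context.Context, ba batch) (*response, error) {
	if t.fail {
		return nil, errors.New("connection refused")
	}
	return &response{result: "ok: " + ba.payload}, nil
}

func main() {
	ctx := context.Background()
	// The client side sees either a result or a proxyFailedError it can
	// unwrap to drive its own retry decision.
	_, err := proxyOnce(ctx, &flakyTransport{fail: true}, batch{payload: "put k=v"})
	var pfe *proxyFailedError
	if errors.As(err, &pfe) {
		fmt.Println("client retries with refreshed routing after:", pfe)
	}
}

Keeping the retry loop on the client side is also what makes clearing ProxyRangeInfo before forwarding safe: the recipient either processes the request or fails immediately, so proxy chains and routing loops cannot form.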