diff --git a/pkg/kv/kvserver/replica_batch_updates.go b/pkg/kv/kvserver/replica_batch_updates.go
index 63705f695205..d82f84534415 100644
--- a/pkg/kv/kvserver/replica_batch_updates.go
+++ b/pkg/kv/kvserver/replica_batch_updates.go
@@ -186,23 +186,23 @@ func maybeStripInFlightWrites(ba *kvpb.BatchRequest) (*kvpb.BatchRequest, error)
 // like this because they've already acquired timestamp-aware latches.
 func maybeBumpReadTimestampToWriteTimestamp(
 	ctx context.Context, ba *kvpb.BatchRequest, g *concurrency.Guard,
-) bool {
+) (*kvpb.BatchRequest, bool) {
 	if ba.Txn == nil {
-		return false
+		return ba, false
 	}
 	if !ba.CanForwardReadTimestamp {
-		return false
+		return ba, false
 	}
 	if ba.Txn.ReadTimestamp == ba.Txn.WriteTimestamp {
-		return false
+		return ba, false
 	}
 	arg, ok := ba.GetArg(kvpb.EndTxn)
 	if !ok {
-		return false
+		return ba, false
 	}
 	et := arg.(*kvpb.EndTxnRequest)
 	if batcheval.IsEndTxnExceedingDeadline(ba.Txn.WriteTimestamp, et.Deadline) {
-		return false
+		return ba, false
 	}
 	return tryBumpBatchTimestamp(ctx, ba, g, ba.Txn.WriteTimestamp)
 }
@@ -221,9 +221,9 @@ func maybeBumpReadTimestampToWriteTimestamp(
 // not be bumped.
 func tryBumpBatchTimestamp(
 	ctx context.Context, ba *kvpb.BatchRequest, g *concurrency.Guard, ts hlc.Timestamp,
-) bool {
+) (*kvpb.BatchRequest, bool) {
 	if g != nil && !g.IsolatedAtLaterTimestamps() {
-		return false
+		return ba, false
 	}
 	if ts.Less(ba.Timestamp) {
 		log.Fatalf(ctx, "trying to bump to %s <= ba.Timestamp: %s", ts, ba.Timestamp)
 	}
@@ -231,7 +231,7 @@ func tryBumpBatchTimestamp(
 	if ba.Txn == nil {
 		log.VEventf(ctx, 2, "bumping batch timestamp to %s from %s", ts, ba.Timestamp)
 		ba.Timestamp = ts
-		return true
+		return ba, true
 	}
 	if ts.Less(ba.Txn.ReadTimestamp) || ts.Less(ba.Txn.WriteTimestamp) {
 		log.Fatalf(ctx, "trying to bump to %s inconsistent with ba.Txn.ReadTimestamp: %s, "+
@@ -239,8 +239,11 @@ func tryBumpBatchTimestamp(
 	}
 	log.VEventf(ctx, 2, "bumping batch timestamp to: %s from read: %s, write: %s",
 		ts, ba.Txn.ReadTimestamp, ba.Txn.WriteTimestamp)
-	ba.Txn = ba.Txn.Clone()
-	ba.Txn.BumpReadTimestamp(ts)
-	ba.Timestamp = ba.Txn.ReadTimestamp // Refresh just updated ReadTimestamp
-	return true
+	txn := ba.Txn.Clone()
+	txn.BumpReadTimestamp(ts)
+	readTs := txn.ReadTimestamp
+	shallowCopy := ba.ShallowCopy()
+	shallowCopy.Txn = txn
+	shallowCopy.Timestamp = readTs // Refresh just updated ReadTimestamp
+	return shallowCopy, true
 }
diff --git a/pkg/kv/kvserver/replica_evaluate.go b/pkg/kv/kvserver/replica_evaluate.go
index cef33b7d08ed..4f3dcea0bcc3 100644
--- a/pkg/kv/kvserver/replica_evaluate.go
+++ b/pkg/kv/kvserver/replica_evaluate.go
@@ -544,13 +544,13 @@ func canDoServersideRetry(
 	ba *kvpb.BatchRequest,
 	g *concurrency.Guard,
 	deadline hlc.Timestamp,
-) bool {
+) (*kvpb.BatchRequest, bool) {
 	if pErr == nil {
 		log.Fatalf(ctx, "canDoServersideRetry called without error")
 	}
 	if ba.Txn != nil {
 		if !ba.CanForwardReadTimestamp {
-			return false
+			return ba, false
 		}
 		if !deadline.IsEmpty() {
 			log.Fatal(ctx, "deadline passed for transactional request")
 		}
@@ -566,7 +566,7 @@ func canDoServersideRetry(
 		var ok bool
 		ok, newTimestamp = kvpb.TransactionRefreshTimestamp(pErr)
 		if !ok {
-			return false
+			return ba, false
 		}
 	} else {
 		switch tErr := pErr.GetDetail().(type) {
@@ -577,12 +577,12 @@ func canDoServersideRetry(
 			newTimestamp = tErr.RetryTimestamp()
 
 		default:
-			return false
+			return ba, false
 		}
 	}
 
 	if batcheval.IsEndTxnExceedingDeadline(newTimestamp, deadline) {
-		return false
+		return ba, false
 	}
 	return tryBumpBatchTimestamp(ctx, ba, g, newTimestamp)
 }
diff --git a/pkg/kv/kvserver/replica_read.go b/pkg/kv/kvserver/replica_read.go
index 3755c2093a1b..1697b6a985af 100644
--- a/pkg/kv/kvserver/replica_read.go
+++ b/pkg/kv/kvserver/replica_read.go
@@ -489,7 +489,9 @@ func (r *Replica) executeReadOnlyBatchWithServersideRefreshes(
 		// retry at a higher timestamp because it is not isolated at higher
 		// timestamps.
 		latchesHeld := g != nil
-		if !latchesHeld || !canDoServersideRetry(ctx, pErr, ba, g, hlc.Timestamp{}) {
+		var canRetry bool
+		ba, canRetry = canDoServersideRetry(ctx, pErr, ba, g, hlc.Timestamp{})
+		if !latchesHeld || !canRetry {
 			// TODO(aayush,arul): These metrics are incorrect at the moment since
 			// hitting this branch does not mean that we won't serverside retry, it
 			// just means that we will have to reacquire latches.
diff --git a/pkg/kv/kvserver/replica_send.go b/pkg/kv/kvserver/replica_send.go
index 752ce7c259f0..96fd1403303d 100644
--- a/pkg/kv/kvserver/replica_send.go
+++ b/pkg/kv/kvserver/replica_send.go
@@ -809,7 +809,8 @@ func (r *Replica) handleReadWithinUncertaintyIntervalError(
 	// Attempt a server-side retry of the request. Note that we pass nil for
 	// latchSpans, because we have already released our latches and plan to
 	// re-acquire them if the retry is allowed.
-	if !canDoServersideRetry(ctx, pErr, ba, nil /* g */, hlc.Timestamp{} /* deadline */) {
+	var ok bool
+	if ba, ok = canDoServersideRetry(ctx, pErr, ba, nil /* g */, hlc.Timestamp{} /* deadline */); !ok {
 		r.store.Metrics().ReadWithinUncertaintyIntervalErrorServerSideRetryFailure.Inc(1)
 		return nil, pErr
 	}
diff --git a/pkg/kv/kvserver/replica_write.go b/pkg/kv/kvserver/replica_write.go
index 1d01cb431b3d..d795fb07eaf3 100644
--- a/pkg/kv/kvserver/replica_write.go
+++ b/pkg/kv/kvserver/replica_write.go
@@ -369,9 +369,9 @@ func (r *Replica) executeWriteBatch(
 // executed as 1PC.
 func (r *Replica) canAttempt1PCEvaluation(
 	ctx context.Context, ba *kvpb.BatchRequest, g *concurrency.Guard,
-) bool {
+) (*kvpb.BatchRequest, bool) {
 	if !isOnePhaseCommit(ba) {
-		return false
+		return ba, false
 	}
 
 	// isOnePhaseCommit ensured that the transaction has a non-skewed read/write
@@ -393,7 +393,7 @@ func (r *Replica) canAttempt1PCEvaluation(
 	// to check for an existing record.
 	ok, _ := r.CanCreateTxnRecord(ctx, ba.Txn.ID, ba.Txn.Key, ba.Txn.MinTimestamp)
 	if !ok {
-		return false
+		return ba, false
 	}
 	minCommitTS := r.MinTxnCommitTS(ctx, ba.Txn.ID, ba.Txn.Key)
 	if ba.Timestamp.Less(minCommitTS) {
@@ -402,7 +402,7 @@ func (r *Replica) canAttempt1PCEvaluation(
 		// timestamp.
 		return maybeBumpReadTimestampToWriteTimestamp(ctx, ba, g)
 	}
-	return true
+	return ba, true
 }
 
 // evaluateWriteBatch evaluates the supplied batch.
@@ -428,11 +428,11 @@ func (r *Replica) evaluateWriteBatch(
 	// timestamp, let's evaluate the batch at the bumped timestamp. This will
 	// allow serializable transactions to commit. It will also allow transactions
 	// with any isolation level to attempt the 1PC code path.
-	maybeBumpReadTimestampToWriteTimestamp(ctx, ba, g)
-
+	ba, _ = maybeBumpReadTimestampToWriteTimestamp(ctx, ba, g)
+	var onePCOk bool
 	// Attempt 1PC execution, if applicable. If not transactional or there are
 	// indications that the batch's txn will require retry, execute as normal.
-	if r.canAttempt1PCEvaluation(ctx, ba, g) {
+	if ba, onePCOk = r.canAttempt1PCEvaluation(ctx, ba, g); onePCOk {
 		res := r.evaluate1PC(ctx, idKey, ba, g, st)
 		switch res.success {
 		case onePCSucceeded:
@@ -707,8 +707,9 @@ func (r *Replica) evaluateWriteBatchWithServersideRefreshes(
 		if pErr == nil || retries > 0 {
 			break
 		}
+		var serverSideRetryOk bool
 		// If we can retry, set a higher batch timestamp and continue.
-		if !canDoServersideRetry(ctx, pErr, ba, g, deadline) {
+		if ba, serverSideRetryOk = canDoServersideRetry(ctx, pErr, ba, g, deadline); !serverSideRetryOk {
 			r.store.Metrics().WriteEvaluationServerSideRetryFailure.Inc(1)
 			break
 		} else {
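For context on the convention these signature changes introduce: the helpers no longer mutate the caller's *kvpb.BatchRequest in place. When a timestamp bump is possible they return a shallow copy whose Txn is a freshly cloned, bumped transaction, and every call site reassigns ba from the return value. The sketch below is a minimal, self-contained illustration of that copy-on-write contract; the batch, txn, shallowCopy, and tryBump names are stand-ins invented for this example, not the real kvpb API.

```go
package main

import "fmt"

// batch and txn are hypothetical stand-ins for kvpb.BatchRequest and its
// transaction, used only to illustrate the copy-on-write contract.
type txn struct {
	ReadTimestamp  int64
	WriteTimestamp int64
}

type batch struct {
	Timestamp int64
	Txn       *txn
}

// shallowCopy plays the role of BatchRequest.ShallowCopy: value fields are
// copied, while pointer fields (Txn) still alias the original until they are
// explicitly replaced.
func (b *batch) shallowCopy() *batch {
	cp := *b
	return &cp
}

// tryBump mirrors the new tryBumpBatchTimestamp contract: it never mutates its
// input. If the bump is possible, it returns a shallow copy carrying a cloned,
// bumped transaction; otherwise it returns the input batch unchanged.
func tryBump(b *batch, ts int64) (*batch, bool) {
	if b.Txn == nil || ts < b.Txn.ReadTimestamp {
		return b, false
	}
	bumped := *b.Txn // clone the transaction before touching it
	bumped.ReadTimestamp = ts
	cp := b.shallowCopy()
	cp.Txn = &bumped
	cp.Timestamp = bumped.ReadTimestamp // use the bumped read timestamp
	return cp, true
}

func main() {
	ba := &batch{Timestamp: 5, Txn: &txn{ReadTimestamp: 5, WriteTimestamp: 10}}
	// Callers must reassign from the return value, as every call site in the
	// diff now does (ba, ok = ...); the original batch is left untouched.
	ba2, ok := tryBump(ba, 10)
	fmt.Println(ok, ba.Timestamp, ba.Txn.ReadTimestamp) // true 5 5
	fmt.Println(ba2.Timestamp, ba2.Txn.ReadTimestamp)   // 10 10
}
```

The reassignment at each call site is what keeps the original request, still referenced further up the stack, unchanged when the bump fails or the server-side retry is abandoned.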