diff --git a/pkg/kv/kvserver/replica_batch_updates.go b/pkg/kv/kvserver/replica_batch_updates.go
index 63705f695205..d9bf61b16c68 100644
--- a/pkg/kv/kvserver/replica_batch_updates.go
+++ b/pkg/kv/kvserver/replica_batch_updates.go
@@ -179,30 +179,31 @@ func maybeStripInFlightWrites(ba *kvpb.BatchRequest) (*kvpb.BatchRequest, error)
 // diverged and where bumping is possible. When possible, this allows the
 // transaction to commit without having to retry.
 //
-// Returns true if the timestamp was bumped.
+// Returns true if the timestamp was bumped. Also returns the possibly updated
+// batch request, which is shallow-copied on write.
 //
 // Note that this, like all the server-side bumping of the read timestamp, only
 // works for batches that exclusively contain writes; reads cannot be bumped
 // like this because they've already acquired timestamp-aware latches.
 func maybeBumpReadTimestampToWriteTimestamp(
 	ctx context.Context, ba *kvpb.BatchRequest, g *concurrency.Guard,
-) bool {
+) (*kvpb.BatchRequest, bool) {
 	if ba.Txn == nil {
-		return false
+		return ba, false
 	}
 	if !ba.CanForwardReadTimestamp {
-		return false
+		return ba, false
 	}
 	if ba.Txn.ReadTimestamp == ba.Txn.WriteTimestamp {
-		return false
+		return ba, false
 	}
 	arg, ok := ba.GetArg(kvpb.EndTxn)
 	if !ok {
-		return false
+		return ba, false
 	}
 	et := arg.(*kvpb.EndTxnRequest)
 	if batcheval.IsEndTxnExceedingDeadline(ba.Txn.WriteTimestamp, et.Deadline) {
-		return false
+		return ba, false
 	}
 	return tryBumpBatchTimestamp(ctx, ba, g, ba.Txn.WriteTimestamp)
 }
@@ -217,21 +218,23 @@ func maybeBumpReadTimestampToWriteTimestamp(
 // more freely adjust its timestamp because it will re-acquire latches at
 // whatever timestamp the batch is bumped to.
 //
-// Returns true if the timestamp was bumped. Returns false if the timestamp could
-// not be bumped.
+// Returns true if the timestamp was bumped. Returns false if the timestamp
+// could not be bumped. Also returns the possibly updated batch request, which
+// is shallow-copied on write.
 func tryBumpBatchTimestamp(
 	ctx context.Context, ba *kvpb.BatchRequest, g *concurrency.Guard, ts hlc.Timestamp,
-) bool {
+) (*kvpb.BatchRequest, bool) {
 	if g != nil && !g.IsolatedAtLaterTimestamps() {
-		return false
+		return ba, false
 	}
 	if ts.Less(ba.Timestamp) {
 		log.Fatalf(ctx, "trying to bump to %s <= ba.Timestamp: %s", ts, ba.Timestamp)
 	}
 	if ba.Txn == nil {
 		log.VEventf(ctx, 2, "bumping batch timestamp to %s from %s", ts, ba.Timestamp)
+		ba = ba.ShallowCopy()
 		ba.Timestamp = ts
-		return true
+		return ba, true
 	}
 	if ts.Less(ba.Txn.ReadTimestamp) || ts.Less(ba.Txn.WriteTimestamp) {
 		log.Fatalf(ctx, "trying to bump to %s inconsistent with ba.Txn.ReadTimestamp: %s, "+
@@ -239,8 +242,9 @@ func tryBumpBatchTimestamp(
 	}
 	log.VEventf(ctx, 2, "bumping batch timestamp to: %s from read: %s, write: %s",
 		ts, ba.Txn.ReadTimestamp, ba.Txn.WriteTimestamp)
+	ba = ba.ShallowCopy()
 	ba.Txn = ba.Txn.Clone()
 	ba.Txn.BumpReadTimestamp(ts)
 	ba.Timestamp = ba.Txn.ReadTimestamp // Refresh just updated ReadTimestamp
-	return true
+	return ba, true
 }
diff --git a/pkg/kv/kvserver/replica_evaluate.go b/pkg/kv/kvserver/replica_evaluate.go
index cef33b7d08ed..4f3dcea0bcc3 100644
--- a/pkg/kv/kvserver/replica_evaluate.go
+++ b/pkg/kv/kvserver/replica_evaluate.go
@@ -544,13 +544,13 @@ func canDoServersideRetry(
 	ba *kvpb.BatchRequest,
 	g *concurrency.Guard,
 	deadline hlc.Timestamp,
-) bool {
+) (*kvpb.BatchRequest, bool) {
 	if pErr == nil {
 		log.Fatalf(ctx, "canDoServersideRetry called without error")
 	}
 	if ba.Txn != nil {
 		if !ba.CanForwardReadTimestamp {
-			return false
+			return ba, false
 		}
 		if !deadline.IsEmpty() {
 			log.Fatal(ctx, "deadline passed for transactional request")
@@ -566,7 +566,7 @@ func canDoServersideRetry(
 		var ok bool
 		ok, newTimestamp = kvpb.TransactionRefreshTimestamp(pErr)
 		if !ok {
-			return false
+			return ba, false
 		}
 	} else {
 		switch tErr := pErr.GetDetail().(type) {
@@ -577,12 +577,12 @@ func canDoServersideRetry(
 			newTimestamp = tErr.RetryTimestamp()
 
 		default:
-			return false
+			return ba, false
 		}
 	}
 
 	if batcheval.IsEndTxnExceedingDeadline(newTimestamp, deadline) {
-		return false
+		return ba, false
 	}
 	return tryBumpBatchTimestamp(ctx, ba, g, newTimestamp)
 }
diff --git a/pkg/kv/kvserver/replica_proposal.go b/pkg/kv/kvserver/replica_proposal.go
index 71286df03a29..f4a47589af05 100644
--- a/pkg/kv/kvserver/replica_proposal.go
+++ b/pkg/kv/kvserver/replica_proposal.go
@@ -939,9 +939,9 @@ func (r *Replica) evaluateProposal(
 	g *concurrency.Guard,
 	st *kvserverpb.LeaseStatus,
 	ui uncertainty.Interval,
-) (*result.Result, bool, *kvpb.Error) {
+) (*kvpb.BatchRequest, *result.Result, bool, *kvpb.Error) {
 	if ba.Timestamp.IsEmpty() {
-		return nil, false, kvpb.NewErrorf("can't propose Raft command with zero timestamp")
+		return ba, nil, false, kvpb.NewErrorf("can't propose Raft command with zero timestamp")
 	}
 
 	// Evaluate the commands. If this returns without an error, the batch should
@@ -954,7 +954,7 @@ func (r *Replica) evaluateProposal(
 	//
 	// TODO(tschottdorf): absorb all returned values in `res` below this point
 	// in the call stack as well.
-	batch, ms, br, res, pErr := r.evaluateWriteBatch(ctx, idKey, ba, g, st, ui)
+	ba, batch, ms, br, res, pErr := r.evaluateWriteBatch(ctx, idKey, ba, g, st, ui)
 
 	// Note: reusing the proposer's batch when applying the command on the
 	// proposer was explored as an optimization but resulted in no performance
@@ -965,7 +965,7 @@ func (r *Replica) evaluateProposal(
 
 	if pErr != nil {
 		if _, ok := pErr.GetDetail().(*kvpb.ReplicaCorruptionError); ok {
-			return &res, false /* needConsensus */, pErr
+			return ba, &res, false /* needConsensus */, pErr
 		}
 
 		txn := pErr.GetTxn()
@@ -981,7 +981,7 @@ func (r *Replica) evaluateProposal(
 			Metrics:            res.Local.Metrics,
 		}
 		res.Replicated.Reset()
-		return &res, false /* needConsensus */, pErr
+		return ba, &res, false /* needConsensus */, pErr
 	}
 
 	// Set the local reply, which is held only on the proposing replica and is
@@ -1036,7 +1036,7 @@ func (r *Replica) evaluateProposal(
 		}
 	}
 
-	return &res, needConsensus, nil
+	return ba, &res, needConsensus, nil
 }
 
 // requestToProposal converts a BatchRequest into a ProposalData, by
@@ -1051,7 +1051,7 @@ func (r *Replica) requestToProposal(
 	st *kvserverpb.LeaseStatus,
 	ui uncertainty.Interval,
 ) (*ProposalData, *kvpb.Error) {
-	res, needConsensus, pErr := r.evaluateProposal(ctx, idKey, ba, g, st, ui)
+	ba, res, needConsensus, pErr := r.evaluateProposal(ctx, idKey, ba, g, st, ui)
 
 	// Fill out the results even if pErr != nil; we'll return the error below.
 	proposal := &ProposalData{
diff --git a/pkg/kv/kvserver/replica_raft.go b/pkg/kv/kvserver/replica_raft.go
index a956398e5546..657f70944922 100644
--- a/pkg/kv/kvserver/replica_raft.go
+++ b/pkg/kv/kvserver/replica_raft.go
@@ -124,6 +124,7 @@ func (r *Replica) evalAndPropose(
 	defer tok.DoneIfNotMoved(ctx)
 	idKey := raftlog.MakeCmdIDKey()
 	proposal, pErr := r.requestToProposal(ctx, idKey, ba, g, st, ui)
+	ba = proposal.Request // may have been updated
 	log.Event(proposal.ctx, "evaluated request")
 
 	// If the request hit a server-side concurrency retry error, immediately
@@ -304,7 +305,7 @@ func (r *Replica) evalAndPropose(
 			Cmd:        proposal.command,
 			QuotaAlloc: proposal.quotaAlloc,
 			CmdID:      idKey,
-			Req:        ba,
+			Req:        proposal.Request,
 			// SeedID not set, since this is not a reproposal.
 		}
 		if pErr = filter(filterArgs); pErr != nil {
diff --git a/pkg/kv/kvserver/replica_read.go b/pkg/kv/kvserver/replica_read.go
index 3755c2093a1b..4d223754c839 100644
--- a/pkg/kv/kvserver/replica_read.go
+++ b/pkg/kv/kvserver/replica_read.go
@@ -133,7 +133,7 @@ func (r *Replica) executeReadOnlyBatch(
 	}
 
 	var result result.Result
-	br, result, pErr = r.executeReadOnlyBatchWithServersideRefreshes(ctx, rw, rec, ba, g, &st, ui, evalPath)
+	ba, br, result, pErr = r.executeReadOnlyBatchWithServersideRefreshes(ctx, rw, rec, ba, g, &st, ui, evalPath)
 
 	// If the request hit a server-side concurrency retry error, immediately
 	// propagate the error. Don't assume ownership of the concurrency guard.
@@ -422,7 +422,7 @@ func (r *Replica) executeReadOnlyBatchWithServersideRefreshes(
 	st *kvserverpb.LeaseStatus,
 	ui uncertainty.Interval,
 	evalPath batchEvalPath,
-) (br *kvpb.BatchResponse, res result.Result, pErr *kvpb.Error) {
+) (_ *kvpb.BatchRequest, br *kvpb.BatchResponse, res result.Result, pErr *kvpb.Error) {
 	log.Event(ctx, "executing read-only batch")
 
 	var rootMonitor *mon.BytesMonitor
@@ -489,7 +489,11 @@ func (r *Replica) executeReadOnlyBatchWithServersideRefreshes(
 		// retry at a higher timestamp because it is not isolated at higher
 		// timestamps.
 		latchesHeld := g != nil
-		if !latchesHeld || !canDoServersideRetry(ctx, pErr, ba, g, hlc.Timestamp{}) {
+		var ok bool
+		if latchesHeld {
+			ba, ok = canDoServersideRetry(ctx, pErr, ba, g, hlc.Timestamp{})
+		}
+		if !ok {
 			// TODO(aayush,arul): These metrics are incorrect at the moment since
 			// hitting this branch does not mean that we won't serverside retry, it
 			// just means that we will have to reacquire latches.
@@ -507,9 +511,9 @@ func (r *Replica) executeReadOnlyBatchWithServersideRefreshes(
 			EncounteredIntents: res.Local.DetachEncounteredIntents(),
 			Metrics:            res.Local.Metrics,
 		}
-		return nil, res, pErr
+		return ba, nil, res, pErr
 	}
-	return br, res, nil
+	return ba, br, res, nil
 }
 
 func (r *Replica) handleReadOnlyLocalEvalResult(
diff --git a/pkg/kv/kvserver/replica_send.go b/pkg/kv/kvserver/replica_send.go
index 752ce7c259f0..ae03da1d78a6 100644
--- a/pkg/kv/kvserver/replica_send.go
+++ b/pkg/kv/kvserver/replica_send.go
@@ -809,7 +809,9 @@ func (r *Replica) handleReadWithinUncertaintyIntervalError(
 	// Attempt a server-side retry of the request. Note that we pass nil for
 	// latchSpans, because we have already released our latches and plan to
 	// re-acquire them if the retry is allowed.
-	if !canDoServersideRetry(ctx, pErr, ba, nil /* g */, hlc.Timestamp{} /* deadline */) {
+	var ok bool
+	ba, ok = canDoServersideRetry(ctx, pErr, ba, nil /* g */, hlc.Timestamp{} /* deadline */)
+	if !ok {
 		r.store.Metrics().ReadWithinUncertaintyIntervalErrorServerSideRetryFailure.Inc(1)
 		return nil, pErr
 	}
diff --git a/pkg/kv/kvserver/replica_test.go b/pkg/kv/kvserver/replica_test.go
index 480e818ab26c..a92f094344bb 100644
--- a/pkg/kv/kvserver/replica_test.go
+++ b/pkg/kv/kvserver/replica_test.go
@@ -467,7 +467,7 @@ func TestIsOnePhaseCommit(t *testing.T) {
 			// Emulate what a server actually does and bump the write timestamp when
 			// possible. This makes some batches with diverged read and write
 			// timestamps pass isOnePhaseCommit().
-			maybeBumpReadTimestampToWriteTimestamp(ctx, ba, allSpansGuard())
+			ba, _ = maybeBumpReadTimestampToWriteTimestamp(ctx, ba, allSpansGuard())
 
 			if is1PC := isOnePhaseCommit(ba); is1PC != c.exp1PC {
 				t.Errorf("expected 1pc=%t; got %t", c.exp1PC, is1PC)
@@ -8890,7 +8890,7 @@ func TestReplicaEvaluationNotTxnMutation(t *testing.T) {
 	assignSeqNumsForReqs(txn, &txnPut, &txnPut2)
 	origTxn := txn.Clone()
 
-	batch, _, _, _, pErr := tc.repl.evaluateWriteBatch(ctx, raftlog.MakeCmdIDKey(), ba, allSpansGuard(), nil, uncertainty.Interval{})
+	_, batch, _, _, _, pErr := tc.repl.evaluateWriteBatch(ctx, raftlog.MakeCmdIDKey(), ba, allSpansGuard(), nil, uncertainty.Interval{})
 	defer batch.Close()
 	if pErr != nil {
 		t.Fatal(pErr)
diff --git a/pkg/kv/kvserver/replica_write.go b/pkg/kv/kvserver/replica_write.go
index 1d01cb431b3d..7907b83671c6 100644
--- a/pkg/kv/kvserver/replica_write.go
+++ b/pkg/kv/kvserver/replica_write.go
@@ -367,11 +367,15 @@ func (r *Replica) executeWriteBatch(
 
 // canAttempt1PCEvaluation looks at the batch and decides whether it can be
 // executed as 1PC.
+//
+// The function may need to adjust the batch's timestamps in order to make it
+// possible to evaluate the batch as 1PC. If it does so, it will return the
+// updated batch request, which is shallow-copied on write.
 func (r *Replica) canAttempt1PCEvaluation(
 	ctx context.Context, ba *kvpb.BatchRequest, g *concurrency.Guard,
-) bool {
+) (*kvpb.BatchRequest, bool) {
 	if !isOnePhaseCommit(ba) {
-		return false
+		return ba, false
 	}
 
 	// isOnePhaseCommit ensured that the transaction has a non-skewed read/write
@@ -393,16 +397,17 @@ func (r *Replica) canAttempt1PCEvaluation(
 	// to check for an existing record.
 	ok, _ := r.CanCreateTxnRecord(ctx, ba.Txn.ID, ba.Txn.Key, ba.Txn.MinTimestamp)
 	if !ok {
-		return false
+		return ba, false
 	}
 	minCommitTS := r.MinTxnCommitTS(ctx, ba.Txn.ID, ba.Txn.Key)
 	if ba.Timestamp.Less(minCommitTS) {
+		ba = ba.ShallowCopy()
 		ba.Txn.WriteTimestamp = minCommitTS
 		// We can only evaluate at the new timestamp if we manage to bump the read
 		// timestamp.
 		return maybeBumpReadTimestampToWriteTimestamp(ctx, ba, g)
 	}
-	return true
+	return ba, true
 }
 
 // evaluateWriteBatch evaluates the supplied batch.
@@ -421,27 +426,34 @@ func (r *Replica) evaluateWriteBatch(
 	g *concurrency.Guard,
 	st *kvserverpb.LeaseStatus,
 	ui uncertainty.Interval,
-) (storage.Batch, enginepb.MVCCStats, *kvpb.BatchResponse, result.Result, *kvpb.Error) {
+) (
+	*kvpb.BatchRequest,
+	storage.Batch,
+	enginepb.MVCCStats,
+	*kvpb.BatchResponse,
+	result.Result,
+	*kvpb.Error,
+) {
 	log.Event(ctx, "executing read-write batch")
 
 	// If the transaction has been pushed but it can be forwarded to the higher
 	// timestamp, let's evaluate the batch at the bumped timestamp. This will
 	// allow serializable transactions to commit. It will also allow transactions
 	// with any isolation level to attempt the 1PC code path.
-	maybeBumpReadTimestampToWriteTimestamp(ctx, ba, g)
-
+	ba, _ = maybeBumpReadTimestampToWriteTimestamp(ctx, ba, g)
+	var ok bool
 	// Attempt 1PC execution, if applicable. If not transactional or there are
 	// indications that the batch's txn will require retry, execute as normal.
-	if r.canAttempt1PCEvaluation(ctx, ba, g) {
+	if ba, ok = r.canAttempt1PCEvaluation(ctx, ba, g); ok {
 		res := r.evaluate1PC(ctx, idKey, ba, g, st)
 		switch res.success {
 		case onePCSucceeded:
-			return res.batch, res.stats, res.br, res.res, nil
+			return ba, res.batch, res.stats, res.br, res.res, nil
 		case onePCFailed:
 			if res.pErr == nil {
 				log.Fatalf(ctx, "1PC failed but no err. ba: %s", ba.String())
 			}
-			return nil, enginepb.MVCCStats{}, nil, result.Result{}, res.pErr
+			return ba, nil, enginepb.MVCCStats{}, nil, result.Result{}, res.pErr
 		case onePCFallbackToTransactionalEvaluation:
 			// Fallthrough to transactional evaluation.
 		}
@@ -452,7 +464,7 @@ func (r *Replica) evaluateWriteBatch(
 		// terminate this request early.
 		arg, ok := ba.GetArg(kvpb.EndTxn)
 		if ok && arg.(*kvpb.EndTxnRequest).Require1PC {
-			return nil, enginepb.MVCCStats{}, nil, result.Result{}, kvpb.NewError(kv.OnePCNotAllowedError{})
+			return ba, nil, enginepb.MVCCStats{}, nil, result.Result{}, kvpb.NewError(kv.OnePCNotAllowedError{})
 		}
 	}
 
@@ -469,9 +481,9 @@ func (r *Replica) evaluateWriteBatch(
 	// For non-transactional writes, omitInRangefeeds should always be false.
 	// For transactional writes, we propagate the flag from the txn.
 	omitInRangefeeds := ba.Txn != nil && ba.Txn.OmitInRangefeeds
-	batch, br, res, pErr := r.evaluateWriteBatchWithServersideRefreshes(
+	ba, batch, br, res, pErr := r.evaluateWriteBatchWithServersideRefreshes(
 		ctx, idKey, rec, ms, ba, g, st, ui, hlc.Timestamp{} /* deadline */, omitInRangefeeds)
-	return batch, *ms, br, res, pErr
+	return ba, batch, *ms, br, res, pErr
 }
 
 type onePCSuccess int
@@ -554,7 +566,7 @@ func (r *Replica) evaluate1PC(
 	ms := newMVCCStats()
 	defer releaseMVCCStats(ms)
 	if ba.CanForwardReadTimestamp {
-		batch, br, res, pErr = r.evaluateWriteBatchWithServersideRefreshes(
+		_, batch, br, res, pErr = r.evaluateWriteBatchWithServersideRefreshes(
 			ctx, idKey, rec, ms, &strippedBa, g, st, ui, etArg.Deadline, ba.Txn.OmitInRangefeeds)
 	} else {
 		batch, br, res, pErr = r.evaluateWriteBatchWrapper(
@@ -688,7 +700,13 @@ func (r *Replica) evaluateWriteBatchWithServersideRefreshes(
 	ui uncertainty.Interval,
 	deadline hlc.Timestamp,
 	omitInRangefeeds bool,
-) (batch storage.Batch, br *kvpb.BatchResponse, res result.Result, pErr *kvpb.Error) {
+) (
+	_ *kvpb.BatchRequest,
+	batch storage.Batch,
+	br *kvpb.BatchResponse,
+	res result.Result,
+	pErr *kvpb.Error,
+) {
 	goldenMS := *ms
 	for retries := 0; ; retries++ {
 		if retries > 0 {
@@ -708,14 +726,16 @@ func (r *Replica) evaluateWriteBatchWithServersideRefreshes(
 			break
 		}
 		// If we can retry, set a higher batch timestamp and continue.
-		if !canDoServersideRetry(ctx, pErr, ba, g, deadline) {
+		var ok bool
+		ba, ok = canDoServersideRetry(ctx, pErr, ba, g, deadline)
+		if !ok {
 			r.store.Metrics().WriteEvaluationServerSideRetryFailure.Inc(1)
 			break
 		} else {
 			r.store.Metrics().WriteEvaluationServerSideRetrySuccess.Inc(1)
 		}
 	}
-	return batch, br, res, pErr
+	return ba, batch, br, res, pErr
 }
 
 // evaluateWriteBatchWrapper is a wrapper on top of evaluateBatch() which deals
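
The diff adopts a copy-on-write convention for *kvpb.BatchRequest: helpers such as tryBumpBatchTimestamp and canDoServersideRetry now shallow-copy the batch (via ShallowCopy) before the first mutation and return the possibly new pointer, so every caller must re-assign ba to the returned value. Below is a minimal standalone sketch of that convention; the request type and bumpTimestamp helper are hypothetical stand-ins for kvpb.BatchRequest and the helpers above, not CockroachDB APIs.

package main

import "fmt"

// request stands in for kvpb.BatchRequest in this illustration.
type request struct {
	Timestamp int64
	Payload   string
}

// shallowCopy mirrors the role of BatchRequest.ShallowCopy: it returns a copy
// of the top-level struct so the caller's pointer is never written through.
func (r *request) shallowCopy() *request {
	cp := *r
	return &cp
}

// bumpTimestamp follows the convention adopted in the diff: instead of
// mutating its argument in place, it returns the possibly updated request
// (copy-on-write) together with a bool indicating whether it changed anything.
func bumpTimestamp(r *request, ts int64) (*request, bool) {
	if ts <= r.Timestamp {
		return r, false // no change; hand back the original pointer
	}
	r = r.shallowCopy() // copy before the first write
	r.Timestamp = ts
	return r, true
}

func main() {
	orig := &request{Timestamp: 10, Payload: "write"}
	shared := orig // e.g. a proposal that retains the original request

	// Callers must re-assign, just as the diff does with
	// `ba, ok = canDoServersideRetry(...)`.
	bumped, ok := bumpTimestamp(orig, 20)
	fmt.Println(ok, bumped.Timestamp, shared.Timestamp) // true 20 10
}

Returning the batch rather than mutating it in place keeps earlier references to the request, such as the one captured in proposal.Request, stable once they have been handed off.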