Skip to content

Commit

Permalink
kv: shallow copy BatchRequest on mutate in tryBumpBatchTimestamp
Browse files Browse the repository at this point in the history
This avoids a data race on tryBumpBatchTimestamp, which was fallout from the
new logging introduced in ba13697.

Fixes: #124553

Release note: None
  • Loading branch information
lyang24 authored and nvanbenschoten committed Jun 12, 2024
1 parent d146ecf commit 3b5b022
Show file tree
Hide file tree
Showing 8 changed files with 82 additions and 51 deletions.
30 changes: 17 additions & 13 deletions pkg/kv/kvserver/replica_batch_updates.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,30 +179,31 @@ func maybeStripInFlightWrites(ba *kvpb.BatchRequest) (*kvpb.BatchRequest, error)
// diverged and where bumping is possible. When possible, this allows the
// transaction to commit without having to retry.
//
// Returns true if the timestamp was bumped.
// Returns true if the timestamp was bumped. Also returns the possibly updated
// batch request, which is shallow-copied on write.
//
// Note that this, like all the server-side bumping of the read timestamp, only
// works for batches that exclusively contain writes; reads cannot be bumped
// like this because they've already acquired timestamp-aware latches.
func maybeBumpReadTimestampToWriteTimestamp(
ctx context.Context, ba *kvpb.BatchRequest, g *concurrency.Guard,
) bool {
) (*kvpb.BatchRequest, bool) {
if ba.Txn == nil {
return false
return ba, false
}
if !ba.CanForwardReadTimestamp {
return false
return ba, false
}
if ba.Txn.ReadTimestamp == ba.Txn.WriteTimestamp {
return false
return ba, false
}
arg, ok := ba.GetArg(kvpb.EndTxn)
if !ok {
return false
return ba, false
}
et := arg.(*kvpb.EndTxnRequest)
if batcheval.IsEndTxnExceedingDeadline(ba.Txn.WriteTimestamp, et.Deadline) {
return false
return ba, false
}
return tryBumpBatchTimestamp(ctx, ba, g, ba.Txn.WriteTimestamp)
}
Expand All @@ -217,30 +218,33 @@ func maybeBumpReadTimestampToWriteTimestamp(
// more freely adjust its timestamp because it will re-acquire latches at
// whatever timestamp the batch is bumped to.
//
// Returns true if the timestamp was bumped. Returns false if the timestamp could
// not be bumped.
// Returns true if the timestamp was bumped. Returns false if the timestamp
// could not be bumped. Also returns the possibly updated batch request, which
// is shallow-copied on write.
func tryBumpBatchTimestamp(
ctx context.Context, ba *kvpb.BatchRequest, g *concurrency.Guard, ts hlc.Timestamp,
) bool {
) (*kvpb.BatchRequest, bool) {
if g != nil && !g.IsolatedAtLaterTimestamps() {
return false
return ba, false
}
if ts.Less(ba.Timestamp) {
log.Fatalf(ctx, "trying to bump to %s <= ba.Timestamp: %s", ts, ba.Timestamp)
}
if ba.Txn == nil {
log.VEventf(ctx, 2, "bumping batch timestamp to %s from %s", ts, ba.Timestamp)
ba = ba.ShallowCopy()
ba.Timestamp = ts
return true
return ba, true
}
if ts.Less(ba.Txn.ReadTimestamp) || ts.Less(ba.Txn.WriteTimestamp) {
log.Fatalf(ctx, "trying to bump to %s inconsistent with ba.Txn.ReadTimestamp: %s, "+
"ba.Txn.WriteTimestamp: %s", ts, ba.Txn.ReadTimestamp, ba.Txn.WriteTimestamp)
}
log.VEventf(ctx, 2, "bumping batch timestamp to: %s from read: %s, write: %s",
ts, ba.Txn.ReadTimestamp, ba.Txn.WriteTimestamp)
ba = ba.ShallowCopy()
ba.Txn = ba.Txn.Clone()
ba.Txn.BumpReadTimestamp(ts)
ba.Timestamp = ba.Txn.ReadTimestamp // Refresh just updated ReadTimestamp
return true
return ba, true
}
10 changes: 5 additions & 5 deletions pkg/kv/kvserver/replica_evaluate.go
Original file line number Diff line number Diff line change
Expand Up @@ -544,13 +544,13 @@ func canDoServersideRetry(
ba *kvpb.BatchRequest,
g *concurrency.Guard,
deadline hlc.Timestamp,
) bool {
) (*kvpb.BatchRequest, bool) {
if pErr == nil {
log.Fatalf(ctx, "canDoServersideRetry called without error")
}
if ba.Txn != nil {
if !ba.CanForwardReadTimestamp {
return false
return ba, false
}
if !deadline.IsEmpty() {
log.Fatal(ctx, "deadline passed for transactional request")
Expand All @@ -566,7 +566,7 @@ func canDoServersideRetry(
var ok bool
ok, newTimestamp = kvpb.TransactionRefreshTimestamp(pErr)
if !ok {
return false
return ba, false
}
} else {
switch tErr := pErr.GetDetail().(type) {
Expand All @@ -577,12 +577,12 @@ func canDoServersideRetry(
newTimestamp = tErr.RetryTimestamp()

default:
return false
return ba, false
}
}

if batcheval.IsEndTxnExceedingDeadline(newTimestamp, deadline) {
return false
return ba, false
}
return tryBumpBatchTimestamp(ctx, ba, g, newTimestamp)
}
Expand Down
14 changes: 7 additions & 7 deletions pkg/kv/kvserver/replica_proposal.go
Original file line number Diff line number Diff line change
Expand Up @@ -939,9 +939,9 @@ func (r *Replica) evaluateProposal(
g *concurrency.Guard,
st *kvserverpb.LeaseStatus,
ui uncertainty.Interval,
) (*result.Result, bool, *kvpb.Error) {
) (*kvpb.BatchRequest, *result.Result, bool, *kvpb.Error) {
if ba.Timestamp.IsEmpty() {
return nil, false, kvpb.NewErrorf("can't propose Raft command with zero timestamp")
return ba, nil, false, kvpb.NewErrorf("can't propose Raft command with zero timestamp")
}

// Evaluate the commands. If this returns without an error, the batch should
Expand All @@ -954,7 +954,7 @@ func (r *Replica) evaluateProposal(
//
// TODO(tschottdorf): absorb all returned values in `res` below this point
// in the call stack as well.
batch, ms, br, res, pErr := r.evaluateWriteBatch(ctx, idKey, ba, g, st, ui)
ba, batch, ms, br, res, pErr := r.evaluateWriteBatch(ctx, idKey, ba, g, st, ui)

// Note: reusing the proposer's batch when applying the command on the
// proposer was explored as an optimization but resulted in no performance
Expand All @@ -965,7 +965,7 @@ func (r *Replica) evaluateProposal(

if pErr != nil {
if _, ok := pErr.GetDetail().(*kvpb.ReplicaCorruptionError); ok {
return &res, false /* needConsensus */, pErr
return ba, &res, false /* needConsensus */, pErr
}

txn := pErr.GetTxn()
Expand All @@ -981,7 +981,7 @@ func (r *Replica) evaluateProposal(
Metrics: res.Local.Metrics,
}
res.Replicated.Reset()
return &res, false /* needConsensus */, pErr
return ba, &res, false /* needConsensus */, pErr
}

// Set the local reply, which is held only on the proposing replica and is
Expand Down Expand Up @@ -1036,7 +1036,7 @@ func (r *Replica) evaluateProposal(
}
}

return &res, needConsensus, nil
return ba, &res, needConsensus, nil
}

// requestToProposal converts a BatchRequest into a ProposalData, by
Expand All @@ -1051,7 +1051,7 @@ func (r *Replica) requestToProposal(
st *kvserverpb.LeaseStatus,
ui uncertainty.Interval,
) (*ProposalData, *kvpb.Error) {
res, needConsensus, pErr := r.evaluateProposal(ctx, idKey, ba, g, st, ui)
ba, res, needConsensus, pErr := r.evaluateProposal(ctx, idKey, ba, g, st, ui)

// Fill out the results even if pErr != nil; we'll return the error below.
proposal := &ProposalData{
Expand Down
3 changes: 2 additions & 1 deletion pkg/kv/kvserver/replica_raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ func (r *Replica) evalAndPropose(
defer tok.DoneIfNotMoved(ctx)
idKey := raftlog.MakeCmdIDKey()
proposal, pErr := r.requestToProposal(ctx, idKey, ba, g, st, ui)
ba = proposal.Request // may have been updated
log.Event(proposal.ctx, "evaluated request")

// If the request hit a server-side concurrency retry error, immediately
Expand Down Expand Up @@ -304,7 +305,7 @@ func (r *Replica) evalAndPropose(
Cmd: proposal.command,
QuotaAlloc: proposal.quotaAlloc,
CmdID: idKey,
Req: ba,
Req: proposal.Request,
// SeedID not set, since this is not a reproposal.
}
if pErr = filter(filterArgs); pErr != nil {
Expand Down
14 changes: 9 additions & 5 deletions pkg/kv/kvserver/replica_read.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ func (r *Replica) executeReadOnlyBatch(
}

var result result.Result
br, result, pErr = r.executeReadOnlyBatchWithServersideRefreshes(ctx, rw, rec, ba, g, &st, ui, evalPath)
ba, br, result, pErr = r.executeReadOnlyBatchWithServersideRefreshes(ctx, rw, rec, ba, g, &st, ui, evalPath)

// If the request hit a server-side concurrency retry error, immediately
// propagate the error. Don't assume ownership of the concurrency guard.
Expand Down Expand Up @@ -422,7 +422,7 @@ func (r *Replica) executeReadOnlyBatchWithServersideRefreshes(
st *kvserverpb.LeaseStatus,
ui uncertainty.Interval,
evalPath batchEvalPath,
) (br *kvpb.BatchResponse, res result.Result, pErr *kvpb.Error) {
) (_ *kvpb.BatchRequest, br *kvpb.BatchResponse, res result.Result, pErr *kvpb.Error) {
log.Event(ctx, "executing read-only batch")

var rootMonitor *mon.BytesMonitor
Expand Down Expand Up @@ -489,7 +489,11 @@ func (r *Replica) executeReadOnlyBatchWithServersideRefreshes(
// retry at a higher timestamp because it is not isolated at higher
// timestamps.
latchesHeld := g != nil
if !latchesHeld || !canDoServersideRetry(ctx, pErr, ba, g, hlc.Timestamp{}) {
var ok bool
if latchesHeld {
ba, ok = canDoServersideRetry(ctx, pErr, ba, g, hlc.Timestamp{})
}
if !ok {
// TODO(aayush,arul): These metrics are incorrect at the moment since
// hitting this branch does not mean that we won't serverside retry, it
// just means that we will have to reacquire latches.
Expand All @@ -507,9 +511,9 @@ func (r *Replica) executeReadOnlyBatchWithServersideRefreshes(
EncounteredIntents: res.Local.DetachEncounteredIntents(),
Metrics: res.Local.Metrics,
}
return nil, res, pErr
return ba, nil, res, pErr
}
return br, res, nil
return ba, br, res, nil
}

func (r *Replica) handleReadOnlyLocalEvalResult(
Expand Down
4 changes: 3 additions & 1 deletion pkg/kv/kvserver/replica_send.go
Original file line number Diff line number Diff line change
Expand Up @@ -809,7 +809,9 @@ func (r *Replica) handleReadWithinUncertaintyIntervalError(
// Attempt a server-side retry of the request. Note that we pass nil for
// latchSpans, because we have already released our latches and plan to
// re-acquire them if the retry is allowed.
if !canDoServersideRetry(ctx, pErr, ba, nil /* g */, hlc.Timestamp{} /* deadline */) {
var ok bool
ba, ok = canDoServersideRetry(ctx, pErr, ba, nil /* g */, hlc.Timestamp{} /* deadline */)
if !ok {
r.store.Metrics().ReadWithinUncertaintyIntervalErrorServerSideRetryFailure.Inc(1)
return nil, pErr
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/kv/kvserver/replica_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -467,7 +467,7 @@ func TestIsOnePhaseCommit(t *testing.T) {
// Emulate what a server actually does and bump the write timestamp when
// possible. This makes some batches with diverged read and write
// timestamps pass isOnePhaseCommit().
maybeBumpReadTimestampToWriteTimestamp(ctx, ba, allSpansGuard())
ba, _ = maybeBumpReadTimestampToWriteTimestamp(ctx, ba, allSpansGuard())

if is1PC := isOnePhaseCommit(ba); is1PC != c.exp1PC {
t.Errorf("expected 1pc=%t; got %t", c.exp1PC, is1PC)
Expand Down Expand Up @@ -8890,7 +8890,7 @@ func TestReplicaEvaluationNotTxnMutation(t *testing.T) {
assignSeqNumsForReqs(txn, &txnPut, &txnPut2)
origTxn := txn.Clone()

batch, _, _, _, pErr := tc.repl.evaluateWriteBatch(ctx, raftlog.MakeCmdIDKey(), ba, allSpansGuard(), nil, uncertainty.Interval{})
_, batch, _, _, _, pErr := tc.repl.evaluateWriteBatch(ctx, raftlog.MakeCmdIDKey(), ba, allSpansGuard(), nil, uncertainty.Interval{})
defer batch.Close()
if pErr != nil {
t.Fatal(pErr)
Expand Down
Loading

0 comments on commit 3b5b022

Please sign in to comment.