kv: shallow copy BatchRequest on mutate in tryBumpBatchTimestamp #124634

Merged (1 commit) on Jun 12, 2024
30 changes: 17 additions & 13 deletions pkg/kv/kvserver/replica_batch_updates.go
@@ -179,30 +179,31 @@ func maybeStripInFlightWrites(ba *kvpb.BatchRequest) (*kvpb.BatchRequest, error)
// diverged and where bumping is possible. When possible, this allows the
// transaction to commit without having to retry.
//
// Returns true if the timestamp was bumped.
// Returns true if the timestamp was bumped. Also returns the possibly updated
// batch request, which is shallow-copied on write.
//
// Note that this, like all the server-side bumping of the read timestamp, only
// works for batches that exclusively contain writes; reads cannot be bumped
// like this because they've already acquired timestamp-aware latches.
func maybeBumpReadTimestampToWriteTimestamp(
ctx context.Context, ba *kvpb.BatchRequest, g *concurrency.Guard,
) bool {
) (*kvpb.BatchRequest, bool) {
if ba.Txn == nil {
return false
return ba, false
}
if !ba.CanForwardReadTimestamp {
return false
return ba, false
}
if ba.Txn.ReadTimestamp == ba.Txn.WriteTimestamp {
return false
return ba, false
}
arg, ok := ba.GetArg(kvpb.EndTxn)
if !ok {
return false
return ba, false
}
et := arg.(*kvpb.EndTxnRequest)
if batcheval.IsEndTxnExceedingDeadline(ba.Txn.WriteTimestamp, et.Deadline) {
return false
return ba, false
}
return tryBumpBatchTimestamp(ctx, ba, g, ba.Txn.WriteTimestamp)
}
@@ -217,30 +218,33 @@ func maybeBumpReadTimestampToWriteTimestamp(
// more freely adjust its timestamp because it will re-acquire latches at
// whatever timestamp the batch is bumped to.
//
// Returns true if the timestamp was bumped. Returns false if the timestamp could
// not be bumped.
// Returns true if the timestamp was bumped. Returns false if the timestamp
// could not be bumped. Also returns the possibly updated batch request, which
// is shallow-copied on write.
func tryBumpBatchTimestamp(
ctx context.Context, ba *kvpb.BatchRequest, g *concurrency.Guard, ts hlc.Timestamp,
) bool {
) (*kvpb.BatchRequest, bool) {
if g != nil && !g.IsolatedAtLaterTimestamps() {
return false
return ba, false
}
if ts.Less(ba.Timestamp) {
log.Fatalf(ctx, "trying to bump to %s <= ba.Timestamp: %s", ts, ba.Timestamp)
}
if ba.Txn == nil {
log.VEventf(ctx, 2, "bumping batch timestamp to %s from %s", ts, ba.Timestamp)
ba = ba.ShallowCopy()
ba.Timestamp = ts
return true
return ba, true
}
if ts.Less(ba.Txn.ReadTimestamp) || ts.Less(ba.Txn.WriteTimestamp) {
log.Fatalf(ctx, "trying to bump to %s inconsistent with ba.Txn.ReadTimestamp: %s, "+
"ba.Txn.WriteTimestamp: %s", ts, ba.Txn.ReadTimestamp, ba.Txn.WriteTimestamp)
}
log.VEventf(ctx, 2, "bumping batch timestamp to: %s from read: %s, write: %s",
ts, ba.Txn.ReadTimestamp, ba.Txn.WriteTimestamp)
ba = ba.ShallowCopy()
Review comment (Member):
This won't currently have the desired effect, as we're not doing anything with the shallow copy. We'll need to return it if we don't want to mutate the provided reference.

@lyang24 (Contributor, Author), May 25, 2024:
Thanks for the guidance. I am trying to understand this pattern: the data race happens at the pointer level, and we return a new shallow-copied pointer on mutation to prevent a race on the same pointer down the line. But this approach cannot prevent a race on the values in memory, right?

ba.Txn = ba.Txn.Clone()
ba.Txn.BumpReadTimestamp(ts)
ba.Timestamp = ba.Txn.ReadTimestamp // Refresh just updated ReadTimestamp
return true
return ba, true
}
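
To make the convention concrete for the discussion above, here is a minimal, self-contained Go sketch of shallow copy-on-write. The types and names (`Request`, `Txn`, `bumpTimestamp`) are illustrative stand-ins, not the real kvpb API.

```go
package main

import "fmt"

type Txn struct {
	ReadTimestamp, WriteTimestamp int64
}

type Request struct {
	Timestamp int64
	Txn       *Txn
}

// ShallowCopy copies the Request struct itself; pointer fields such as Txn
// still alias the original until they are cloned explicitly.
func (r *Request) ShallowCopy() *Request {
	cp := *r
	return &cp
}

// bumpTimestamp mirrors the shape of tryBumpBatchTimestamp after this change:
// it never writes through the caller's pointer, but returns a possibly new
// Request that the caller must adopt.
func bumpTimestamp(r *Request, ts int64) (*Request, bool) {
	if ts < r.Timestamp {
		return r, false
	}
	r = r.ShallowCopy()
	if r.Txn != nil {
		// The shallow copy still shares Txn with the original, so the nested
		// pointer is cloned before it is mutated (the real code does this via
		// ba.Txn = ba.Txn.Clone()).
		txnCopy := *r.Txn
		txnCopy.ReadTimestamp = ts
		r.Txn = &txnCopy
	}
	r.Timestamp = ts
	return r, true
}

func main() {
	orig := &Request{Timestamp: 1, Txn: &Txn{ReadTimestamp: 1, WriteTimestamp: 5}}
	bumped, ok := bumpTimestamp(orig, 5)
	fmt.Println(ok, orig.Timestamp, bumped.Timestamp)             // true 1 5: orig is left untouched
	fmt.Println(orig.Txn.ReadTimestamp, bumped.Txn.ReadTimestamp) // 1 5
}
```

On the question raised in the thread: the shallow copy keeps callers that still hold the original pointer from observing the bumped timestamps, but nested pointer fields that remain shared (such as the transaction) still need their own clone before mutation, which is what the existing ba.Txn = ba.Txn.Clone() line provides.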
10 changes: 5 additions & 5 deletions pkg/kv/kvserver/replica_evaluate.go
@@ -544,13 +544,13 @@ func canDoServersideRetry(
ba *kvpb.BatchRequest,
g *concurrency.Guard,
deadline hlc.Timestamp,
) bool {
) (*kvpb.BatchRequest, bool) {
if pErr == nil {
log.Fatalf(ctx, "canDoServersideRetry called without error")
}
if ba.Txn != nil {
if !ba.CanForwardReadTimestamp {
return false
return ba, false
}
if !deadline.IsEmpty() {
log.Fatal(ctx, "deadline passed for transactional request")
@@ -566,7 +566,7 @@
var ok bool
ok, newTimestamp = kvpb.TransactionRefreshTimestamp(pErr)
if !ok {
return false
return ba, false
}
} else {
switch tErr := pErr.GetDetail().(type) {
@@ -577,12 +577,12 @@
newTimestamp = tErr.RetryTimestamp()

default:
return false
return ba, false
}
}

if batcheval.IsEndTxnExceedingDeadline(newTimestamp, deadline) {
return false
return ba, false
}
return tryBumpBatchTimestamp(ctx, ba, g, newTimestamp)
}
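
The earlier review comment is the key caveat here: a shallow copy made inside the callee is useless unless it is returned and adopted, which is why canDoServersideRetry now returns the request alongside the bool. A minimal sketch of the difference, again with illustrative types rather than the real kvserver call sites:

```go
package main

import "fmt"

type req struct{ ts int64 }

// buggyBump copies and mutates, but never hands the copy back, so the caller
// keeps using the old request with the old timestamp.
func buggyBump(r *req, ts int64) bool {
	cp := *r
	cp.ts = ts // lost: the caller never sees cp
	return true
}

// fixedBump returns the copy so the caller can reassign its local variable,
// matching the (ba, ok) convention this PR introduces.
func fixedBump(r *req, ts int64) (*req, bool) {
	cp := *r
	cp.ts = ts
	return &cp, true
}

func main() {
	r := &req{ts: 1}
	_ = buggyBump(r, 5)
	fmt.Println(r.ts) // 1: the bump was dropped

	r, _ = fixedBump(r, 5)
	fmt.Println(r.ts) // 5: the caller adopted the returned request
}
```

The call sites updated later in this diff (replica_read.go, replica_send.go) follow the fixedBump pattern by reassigning ba from the return value.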
14 changes: 7 additions & 7 deletions pkg/kv/kvserver/replica_proposal.go
@@ -939,9 +939,9 @@ func (r *Replica) evaluateProposal(
g *concurrency.Guard,
st *kvserverpb.LeaseStatus,
ui uncertainty.Interval,
) (*result.Result, bool, *kvpb.Error) {
) (*kvpb.BatchRequest, *result.Result, bool, *kvpb.Error) {
if ba.Timestamp.IsEmpty() {
return nil, false, kvpb.NewErrorf("can't propose Raft command with zero timestamp")
return ba, nil, false, kvpb.NewErrorf("can't propose Raft command with zero timestamp")
}

// Evaluate the commands. If this returns without an error, the batch should
@@ -954,7 +954,7 @@
//
// TODO(tschottdorf): absorb all returned values in `res` below this point
// in the call stack as well.
batch, ms, br, res, pErr := r.evaluateWriteBatch(ctx, idKey, ba, g, st, ui)
ba, batch, ms, br, res, pErr := r.evaluateWriteBatch(ctx, idKey, ba, g, st, ui)

// Note: reusing the proposer's batch when applying the command on the
// proposer was explored as an optimization but resulted in no performance
@@ -965,7 +965,7 @@

if pErr != nil {
if _, ok := pErr.GetDetail().(*kvpb.ReplicaCorruptionError); ok {
return &res, false /* needConsensus */, pErr
return ba, &res, false /* needConsensus */, pErr
}

txn := pErr.GetTxn()
@@ -981,7 +981,7 @@
Metrics: res.Local.Metrics,
}
res.Replicated.Reset()
return &res, false /* needConsensus */, pErr
return ba, &res, false /* needConsensus */, pErr
}

// Set the local reply, which is held only on the proposing replica and is
@@ -1036,7 +1036,7 @@
}
}

return &res, needConsensus, nil
return ba, &res, needConsensus, nil
}

// requestToProposal converts a BatchRequest into a ProposalData, by
@@ -1051,7 +1051,7 @@ func (r *Replica) requestToProposal(
st *kvserverpb.LeaseStatus,
ui uncertainty.Interval,
) (*ProposalData, *kvpb.Error) {
res, needConsensus, pErr := r.evaluateProposal(ctx, idKey, ba, g, st, ui)
ba, res, needConsensus, pErr := r.evaluateProposal(ctx, idKey, ba, g, st, ui)

// Fill out the results even if pErr != nil; we'll return the error below.
proposal := &ProposalData{
3 changes: 2 additions & 1 deletion pkg/kv/kvserver/replica_raft.go
@@ -124,6 +124,7 @@ func (r *Replica) evalAndPropose(
defer tok.DoneIfNotMoved(ctx)
idKey := raftlog.MakeCmdIDKey()
proposal, pErr := r.requestToProposal(ctx, idKey, ba, g, st, ui)
ba = proposal.Request // may have been updated
log.Event(proposal.ctx, "evaluated request")

// If the request hit a server-side concurrency retry error, immediately
@@ -304,7 +305,7 @@
Cmd: proposal.command,
QuotaAlloc: proposal.quotaAlloc,
CmdID: idKey,
Req: ba,
Req: proposal.Request,
// SeedID not set, since this is not a reproposal.
}
if pErr = filter(filterArgs); pErr != nil {
14 changes: 9 additions & 5 deletions pkg/kv/kvserver/replica_read.go
@@ -133,7 +133,7 @@ func (r *Replica) executeReadOnlyBatch(
}

var result result.Result
br, result, pErr = r.executeReadOnlyBatchWithServersideRefreshes(ctx, rw, rec, ba, g, &st, ui, evalPath)
ba, br, result, pErr = r.executeReadOnlyBatchWithServersideRefreshes(ctx, rw, rec, ba, g, &st, ui, evalPath)

// If the request hit a server-side concurrency retry error, immediately
// propagate the error. Don't assume ownership of the concurrency guard.
@@ -422,7 +422,7 @@ func (r *Replica) executeReadOnlyBatchWithServersideRefreshes(
st *kvserverpb.LeaseStatus,
ui uncertainty.Interval,
evalPath batchEvalPath,
) (br *kvpb.BatchResponse, res result.Result, pErr *kvpb.Error) {
) (_ *kvpb.BatchRequest, br *kvpb.BatchResponse, res result.Result, pErr *kvpb.Error) {
log.Event(ctx, "executing read-only batch")

var rootMonitor *mon.BytesMonitor
@@ -489,7 +489,11 @@ func (r *Replica) executeReadOnlyBatchWithServersideRefreshes(
// retry at a higher timestamp because it is not isolated at higher
// timestamps.
latchesHeld := g != nil
if !latchesHeld || !canDoServersideRetry(ctx, pErr, ba, g, hlc.Timestamp{}) {
var ok bool
if latchesHeld {
ba, ok = canDoServersideRetry(ctx, pErr, ba, g, hlc.Timestamp{})
}
if !ok {
// TODO(aayush,arul): These metrics are incorrect at the moment since
// hitting this branch does not mean that we won't serverside retry, it
// just means that we will have to reacquire latches.
@@ -507,9 +511,9 @@
EncounteredIntents: res.Local.DetachEncounteredIntents(),
Metrics: res.Local.Metrics,
}
return nil, res, pErr
return ba, nil, res, pErr
}
return br, res, nil
return ba, br, res, nil
}

func (r *Replica) handleReadOnlyLocalEvalResult(
4 changes: 3 additions & 1 deletion pkg/kv/kvserver/replica_send.go
@@ -809,7 +809,9 @@ func (r *Replica) handleReadWithinUncertaintyIntervalError(
// Attempt a server-side retry of the request. Note that we pass nil for
// latchSpans, because we have already released our latches and plan to
// re-acquire them if the retry is allowed.
if !canDoServersideRetry(ctx, pErr, ba, nil /* g */, hlc.Timestamp{} /* deadline */) {
var ok bool
ba, ok = canDoServersideRetry(ctx, pErr, ba, nil /* g */, hlc.Timestamp{} /* deadline */)
if !ok {
r.store.Metrics().ReadWithinUncertaintyIntervalErrorServerSideRetryFailure.Inc(1)
return nil, pErr
}
4 changes: 2 additions & 2 deletions pkg/kv/kvserver/replica_test.go
@@ -467,7 +467,7 @@ func TestIsOnePhaseCommit(t *testing.T) {
// Emulate what a server actually does and bump the write timestamp when
// possible. This makes some batches with diverged read and write
// timestamps pass isOnePhaseCommit().
maybeBumpReadTimestampToWriteTimestamp(ctx, ba, allSpansGuard())
ba, _ = maybeBumpReadTimestampToWriteTimestamp(ctx, ba, allSpansGuard())

if is1PC := isOnePhaseCommit(ba); is1PC != c.exp1PC {
t.Errorf("expected 1pc=%t; got %t", c.exp1PC, is1PC)
@@ -8890,7 +8890,7 @@ func TestReplicaEvaluationNotTxnMutation(t *testing.T) {
assignSeqNumsForReqs(txn, &txnPut, &txnPut2)
origTxn := txn.Clone()

batch, _, _, _, pErr := tc.repl.evaluateWriteBatch(ctx, raftlog.MakeCmdIDKey(), ba, allSpansGuard(), nil, uncertainty.Interval{})
_, batch, _, _, _, pErr := tc.repl.evaluateWriteBatch(ctx, raftlog.MakeCmdIDKey(), ba, allSpansGuard(), nil, uncertainty.Interval{})
defer batch.Close()
if pErr != nil {
t.Fatal(pErr)