Skip to content

Commit

Permalink
kv: shallow copy BatchRequest on mutate in tryBumpBatchTimestamp
Browse files Browse the repository at this point in the history
This avoids a data race on tryBumpBatchTimestamp, which was fallout from the
new logging introduced in ba13697.

Fixes: #124553

Release note: None
  • Loading branch information
lyang24 committed May 25, 2024
1 parent d146ecf commit 9dfcdaf
Show file tree
Hide file tree
Showing 5 changed files with 35 additions and 28 deletions.
29 changes: 16 additions & 13 deletions pkg/kv/kvserver/replica_batch_updates.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,23 +186,23 @@ func maybeStripInFlightWrites(ba *kvpb.BatchRequest) (*kvpb.BatchRequest, error)
// like this because they've already acquired timestamp-aware latches.
func maybeBumpReadTimestampToWriteTimestamp(
ctx context.Context, ba *kvpb.BatchRequest, g *concurrency.Guard,
) bool {
) (*kvpb.BatchRequest, bool) {
if ba.Txn == nil {
return false
return ba, false
}
if !ba.CanForwardReadTimestamp {
return false
return ba, false
}
if ba.Txn.ReadTimestamp == ba.Txn.WriteTimestamp {
return false
return ba, false
}
arg, ok := ba.GetArg(kvpb.EndTxn)
if !ok {
return false
return ba, false
}
et := arg.(*kvpb.EndTxnRequest)
if batcheval.IsEndTxnExceedingDeadline(ba.Txn.WriteTimestamp, et.Deadline) {
return false
return ba, false
}
return tryBumpBatchTimestamp(ctx, ba, g, ba.Txn.WriteTimestamp)
}
Expand All @@ -221,26 +221,29 @@ func maybeBumpReadTimestampToWriteTimestamp(
// not be bumped.
func tryBumpBatchTimestamp(
ctx context.Context, ba *kvpb.BatchRequest, g *concurrency.Guard, ts hlc.Timestamp,
) bool {
) (*kvpb.BatchRequest, bool) {
if g != nil && !g.IsolatedAtLaterTimestamps() {
return false
return ba, false
}
if ts.Less(ba.Timestamp) {
log.Fatalf(ctx, "trying to bump to %s <= ba.Timestamp: %s", ts, ba.Timestamp)
}
if ba.Txn == nil {
log.VEventf(ctx, 2, "bumping batch timestamp to %s from %s", ts, ba.Timestamp)
ba.Timestamp = ts
return true
return ba, true
}
if ts.Less(ba.Txn.ReadTimestamp) || ts.Less(ba.Txn.WriteTimestamp) {
log.Fatalf(ctx, "trying to bump to %s inconsistent with ba.Txn.ReadTimestamp: %s, "+
"ba.Txn.WriteTimestamp: %s", ts, ba.Txn.ReadTimestamp, ba.Txn.WriteTimestamp)
}
log.VEventf(ctx, 2, "bumping batch timestamp to: %s from read: %s, write: %s",
ts, ba.Txn.ReadTimestamp, ba.Txn.WriteTimestamp)
ba.Txn = ba.Txn.Clone()
ba.Txn.BumpReadTimestamp(ts)
ba.Timestamp = ba.Txn.ReadTimestamp // Refresh just updated ReadTimestamp
return true
txn := ba.Txn.Clone()
txn.BumpReadTimestamp(ts)
readTs := ba.Txn.ReadTimestamp
shallowCopy := ba.ShallowCopy()
shallowCopy.Txn = txn
shallowCopy.Timestamp = readTs // Refresh just updated ReadTimestamp
return shallowCopy, true
}
10 changes: 5 additions & 5 deletions pkg/kv/kvserver/replica_evaluate.go
Original file line number Diff line number Diff line change
Expand Up @@ -544,13 +544,13 @@ func canDoServersideRetry(
ba *kvpb.BatchRequest,
g *concurrency.Guard,
deadline hlc.Timestamp,
) bool {
) (*kvpb.BatchRequest, bool) {
if pErr == nil {
log.Fatalf(ctx, "canDoServersideRetry called without error")
}
if ba.Txn != nil {
if !ba.CanForwardReadTimestamp {
return false
return ba, false
}
if !deadline.IsEmpty() {
log.Fatal(ctx, "deadline passed for transactional request")
Expand All @@ -566,7 +566,7 @@ func canDoServersideRetry(
var ok bool
ok, newTimestamp = kvpb.TransactionRefreshTimestamp(pErr)
if !ok {
return false
return ba, false
}
} else {
switch tErr := pErr.GetDetail().(type) {
Expand All @@ -577,12 +577,12 @@ func canDoServersideRetry(
newTimestamp = tErr.RetryTimestamp()

default:
return false
return ba, false
}
}

if batcheval.IsEndTxnExceedingDeadline(newTimestamp, deadline) {
return false
return ba, false
}
return tryBumpBatchTimestamp(ctx, ba, g, newTimestamp)
}
Expand Down
4 changes: 3 additions & 1 deletion pkg/kv/kvserver/replica_read.go
Original file line number Diff line number Diff line change
Expand Up @@ -489,7 +489,9 @@ func (r *Replica) executeReadOnlyBatchWithServersideRefreshes(
// retry at a higher timestamp because it is not isolated at higher
// timestamps.
latchesHeld := g != nil
if !latchesHeld || !canDoServersideRetry(ctx, pErr, ba, g, hlc.Timestamp{}) {
var canRetry bool
ba, canRetry = canDoServersideRetry(ctx, pErr, ba, g, hlc.Timestamp{})
if !latchesHeld || canRetry {
// TODO(aayush,arul): These metrics are incorrect at the moment since
// hitting this branch does not mean that we won't serverside retry, it
// just means that we will have to reacquire latches.
Expand Down
3 changes: 2 additions & 1 deletion pkg/kv/kvserver/replica_send.go
Original file line number Diff line number Diff line change
Expand Up @@ -809,7 +809,8 @@ func (r *Replica) handleReadWithinUncertaintyIntervalError(
// Attempt a server-side retry of the request. Note that we pass nil for
// latchSpans, because we have already released our latches and plan to
// re-acquire them if the retry is allowed.
if !canDoServersideRetry(ctx, pErr, ba, nil /* g */, hlc.Timestamp{} /* deadline */) {
var ok bool
if ba, ok = canDoServersideRetry(ctx, pErr, ba, nil /* g */, hlc.Timestamp{} /* deadline */); !ok {
r.store.Metrics().ReadWithinUncertaintyIntervalErrorServerSideRetryFailure.Inc(1)
return nil, pErr
}
Expand Down
17 changes: 9 additions & 8 deletions pkg/kv/kvserver/replica_write.go
Original file line number Diff line number Diff line change
Expand Up @@ -369,9 +369,9 @@ func (r *Replica) executeWriteBatch(
// executed as 1PC.
func (r *Replica) canAttempt1PCEvaluation(
ctx context.Context, ba *kvpb.BatchRequest, g *concurrency.Guard,
) bool {
) (*kvpb.BatchRequest, bool) {
if !isOnePhaseCommit(ba) {
return false
return ba, false
}

// isOnePhaseCommit ensured that the transaction has a non-skewed read/write
Expand All @@ -393,7 +393,7 @@ func (r *Replica) canAttempt1PCEvaluation(
// to check for an existing record.
ok, _ := r.CanCreateTxnRecord(ctx, ba.Txn.ID, ba.Txn.Key, ba.Txn.MinTimestamp)
if !ok {
return false
return ba, false
}
minCommitTS := r.MinTxnCommitTS(ctx, ba.Txn.ID, ba.Txn.Key)
if ba.Timestamp.Less(minCommitTS) {
Expand All @@ -402,7 +402,7 @@ func (r *Replica) canAttempt1PCEvaluation(
// timestamp.
return maybeBumpReadTimestampToWriteTimestamp(ctx, ba, g)
}
return true
return ba, true
}

// evaluateWriteBatch evaluates the supplied batch.
Expand All @@ -428,11 +428,11 @@ func (r *Replica) evaluateWriteBatch(
// timestamp, let's evaluate the batch at the bumped timestamp. This will
// allow serializable transactions to commit. It will also allow transactions
// with any isolation level to attempt the 1PC code path.
maybeBumpReadTimestampToWriteTimestamp(ctx, ba, g)

ba, _ = maybeBumpReadTimestampToWriteTimestamp(ctx, ba, g)
var onePCOk bool
// Attempt 1PC execution, if applicable. If not transactional or there are
// indications that the batch's txn will require retry, execute as normal.
if r.canAttempt1PCEvaluation(ctx, ba, g) {
if ba, onePCOk = r.canAttempt1PCEvaluation(ctx, ba, g); onePCOk {
res := r.evaluate1PC(ctx, idKey, ba, g, st)
switch res.success {
case onePCSucceeded:
Expand Down Expand Up @@ -707,8 +707,9 @@ func (r *Replica) evaluateWriteBatchWithServersideRefreshes(
if pErr == nil || retries > 0 {
break
}
var serverSideRetryOk bool
// If we can retry, set a higher batch timestamp and continue.
if !canDoServersideRetry(ctx, pErr, ba, g, deadline) {
if ba, serverSideRetryOk = canDoServersideRetry(ctx, pErr, ba, g, deadline); !serverSideRetryOk {
r.store.Metrics().WriteEvaluationServerSideRetryFailure.Inc(1)
break
} else {
Expand Down

0 comments on commit 9dfcdaf

Please sign in to comment.