170 changes: 169 additions & 1 deletion pkg/kv/kvclient/kvcoord/dist_sender_server_test.go
@@ -4410,6 +4410,7 @@ func TestUnexpectedCommitOnTxnRecovery(t *testing.T) {
targetTxnIDString.Store("")
ctx := context.Background()
st := cluster.MakeTestingClusterSettings()

// This test relies on unreplicated locks being replicated on lease transfers.
concurrency.UnreplicatedLockReliabilityLeaseTransfer.Override(ctx, &st.SV, true)
tc := testcluster.StartTestCluster(t, 3, base.TestClusterArgs{
@@ -4625,7 +4626,7 @@ func TestMaxSpanRequestKeysWithMixedReadWriteBatches(t *testing.T) {
//
// Setup:
//
-// Create a split at key=8 to force batch splitting.
+// Create a split at key=6 to force batch splitting.
// Write to key=8 to be able to observe if the subsequent delete was effective.
//
// Txn 1:
Expand Down Expand Up @@ -4757,3 +4758,170 @@ func TestRollbackAfterRefreshAndFailedCommit(t *testing.T) {
recording := collectAndFinish()
require.NotContains(t, recording.String(), "failed indeterminate commit recovery: programming error")
}

// TestUnexpectedCommitOnTxnAbortAfterRefresh is a regression test for #151864.
// It is similar to TestRollbackAfterRefreshAndFailedCommit but tests a much
// worse outcome that can be experienced by weak-isolation transactions.
//
// This test issues requests across two transactions. In the presence of the
// bug, Txn 1 ends up committed even though an error is returned to the
// user.
//
// Setup:
//
// Create a split at key=6 to force batch splitting.
//
// Txn 1:
//
// Read key=1 to set the timestamp and prevent a server-side refresh
//
// Txn 2:
//
// Read key=1 to bump timestamp cache on key 1
// Write key=8
//
// Txn 1:
//
// Batch{
// Del key=1
// Del key=2
// Del key=8
// Del key=9
// EndTxn
// }
//
// This batch should encounter a write too old error on key=8.
//
// After a successful refresh, our testing filters have arranged for the second
// EndTxn to fail. In the absence of the bug, we expect this EndTxn failure
// to result in the transaction never committing and an error being returned to
// the client. When the bug existed, transaction recovery (either initiated by
// another transaction or initiated by Txn 1 itself during a rollback issued
// after the injected error) could result in the transaction being erroneously
// committed despite the injected error being returned to the client.
func TestUnexpectedCommitOnTxnAbortAfterRefresh(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

var (
targetTxnIDString atomic.Value
firstEndTxnSeen atomic.Bool
cmdID atomic.Value
)
cmdID.Store(kvserverbase.CmdIDKey(""))
targetTxnIDString.Store("")
ctx := context.Background()
st := cluster.MakeClusterSettings()
kvcoord.RandomizedTxnAnchorKeyEnabled.Override(ctx, &st.SV, false)

s, _, db := serverutils.StartServer(t, base.TestServerArgs{
Settings: st,
Knobs: base.TestingKnobs{
Store: &kvserver.StoreTestingKnobs{
TestingProposalFilter: func(fArgs kvserverbase.ProposalFilterArgs) *kvpb.Error {
if fArgs.Req.Header.Txn == nil ||
fArgs.Req.Header.Txn.ID.String() != targetTxnIDString.Load().(string) {
return nil // not our txn
}

t.Logf("proposal from our txn: %s", fArgs.Req)
endTxnReq := fArgs.Req.Requests[len(fArgs.Req.Requests)-1].GetEndTxn()
if endTxnReq != nil {
if !firstEndTxnSeen.Load() {
firstEndTxnSeen.Store(true)
} else {
epoch := fArgs.Req.Header.Txn.Epoch
t.Logf("will fail application for txn %s@epoch=%d; req: %+v; raft cmdID: %s",
fArgs.Req.Header.Txn.ID.String(), epoch, endTxnReq, fArgs.CmdID)
cmdID.Store(fArgs.CmdID)
}
}
return nil
},
TestingApplyCalledTwiceFilter: func(fArgs kvserverbase.ApplyFilterArgs) (int, *kvpb.Error) {
if fArgs.CmdID == cmdID.Load().(kvserverbase.CmdIDKey) {
t.Logf("failing application for raft cmdID: %s", fArgs.CmdID)
return 0, kvpb.NewErrorf("test injected error")
}
return 0, nil
},
},
},
})
defer s.Stopper().Stop(ctx)

scratchStart, err := s.ScratchRange()
require.NoError(t, err)

scratchKey := func(idx int) roachpb.Key {
key := scratchStart.Clone()
key = append(key, []byte(fmt.Sprintf("key-%03d", idx))...)
return key
}

_, _, err = s.SplitRange(scratchKey(6))
require.NoError(t, err)

tracer := s.TracerI().(*tracing.Tracer)
txn1Ctx, collectAndFinish := tracing.ContextWithRecordingSpan(context.Background(), tracer, "txn1")

txn1Err := db.Txn(txn1Ctx, func(txn1Ctx context.Context, txn1 *kv.Txn) error {
txn1.SetDebugName("txn1")
targetTxnIDString.Store(txn1.ID().String())
// Txn1 must run at either SNAPSHOT or READ COMMITTED with stepping enabled
// to hit this bug: it needs an isolation level that allows committing when
// the write timestamp differs from the read timestamp (wts != rts), and its
// read must produce a read span that requires a refresh, so that a
// server-side refresh is not possible.
_ = txn1.ConfigureStepping(txn1Ctx, kv.SteppingEnabled)
if err := txn1.SetIsoLevel(isolation.ReadCommitted); err != nil {
return err
}
if _, err = txn1.Get(txn1Ctx, scratchKey(1)); err != nil {
return err
}

// Txn2 now executes, arranging for the WriteTooOld error and the timestamp
// cache bump on the txn's anchor key.
if txn1.Epoch() == 0 {
txn2Ctx := context.Background()
txn2 := db.NewTxn(txn2Ctx, "txn2")
_, err = txn2.Get(txn2Ctx, scratchKey(1))
require.NoError(t, err)

err = txn2.Put(txn2Ctx, scratchKey(8), "hello")
require.NoError(t, err)
err = txn2.Commit(txn2Ctx)
require.NoError(t, err)
}

b := txn1.NewBatch()
b.Del(scratchKey(1))
b.Del(scratchKey(2))
b.Del(scratchKey(8))
b.Del(scratchKey(9))
return txn1.CommitInBatch(txn1Ctx, b)
})
val8, err := db.Get(ctx, scratchKey(8))
require.NoError(t, err)

defer func() {
recording := collectAndFinish()
if t.Failed() {
t.Logf("TXN 1 TRACE: %s", recording)
}
}()

if val8.Exists() {
// If val8 _exists_, then txn1's deletes did not apply and the transaction
// did not commit, so any error is correct.
require.Error(t, txn1Err)
} else {
// If val8 doesn't exist, then txn1 committed, so we either should not have
// gotten an error or we should have gotten an ambiguous result error.
if txn1Err != nil {
ambigErr := &kvpb.AmbiguousResultError{}
require.ErrorAs(t, txn1Err, &ambigErr, "transaction committed but non-ambiguous error returned")
}
}
}
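
The final assertions above lean on CockroachDB's parallel commit protocol: a transaction record in the STAGING state is treated as implicitly committed only if every one of its in-flight writes is found to have succeeded at a timestamp no greater than the staging timestamp. The following is a minimal, self-contained sketch of that condition; Timestamp, implicitlyCommitted, and the key names are hypothetical stand-ins, not the actual kvserver recovery API.

package main

import "fmt"

// Timestamp is a simplified stand-in for hlc.Timestamp.
type Timestamp struct {
	WallTime int64
	Logical  int32
}

// LessEq reports whether t orders at or below u.
func (t Timestamp) LessEq(u Timestamp) bool {
	return t.WallTime < u.WallTime ||
		(t.WallTime == u.WallTime && t.Logical <= u.Logical)
}

// implicitlyCommitted sketches the recovery-time check: a STAGING record
// counts as committed only if every in-flight write is found at a timestamp
// no greater than the staging timestamp.
func implicitlyCommitted(stagingTS Timestamp, inFlight []string, found map[string]Timestamp) bool {
	for _, key := range inFlight {
		ts, ok := found[key]
		if !ok || !ts.LessEq(stagingTS) {
			return false
		}
	}
	return true
}

func main() {
	staging := Timestamp{WallTime: 100}
	// Write found at the staging timestamp: the record is implicitly committed.
	fmt.Println(implicitlyCommitted(staging, []string{"key-008"},
		map[string]Timestamp{"key-008": {WallTime: 100}})) // true
	// Write found only above the staging timestamp: not committed.
	fmt.Println(implicitlyCommitted(staging, []string{"key-008"},
		map[string]Timestamp{"key-008": {WallTime: 100, Logical: 1}})) // false
}

Under this condition, writes from the refreshed second attempt could land at timestamps that still satisfied the first attempt's STAGING record, letting recovery conclude the transaction had committed even though the client was told it failed.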
24 changes: 19 additions & 5 deletions pkg/kv/kvclient/kvcoord/txn_interceptor_span_refresher.go
@@ -260,16 +260,31 @@ func (sr *txnSpanRefresher) maybeRefreshAndRetrySend(
if !ok {
return nil, pErr
}

// If we are in a weak isolation transaction and we had an EndTxn in our
// batch, then this retry faces another hazard: the STAGING record written in
// the first attempt may be satisfied by our future writes.
//
// TODO(ssd): If we were guaranteed to have a BatchResponse if a request was
// evaluated, then we could narrow this further and only bump the refresh
// timestamp if the StagingTimestamp == RefreshTS.
endTxnArg, hasET := ba.GetArg(kvpb.EndTxn)
bumpedRefreshTimestampRequired := hasET && txn.Status == roachpb.STAGING && txn.IsoLevel.ToleratesWriteSkew()
if bumpedRefreshTimestampRequired {
refreshTS = refreshTS.Next()
log.Eventf(ctx, "bumping refresh timestamp to avoid unexpected parallel commit: %s", refreshTS)
}

refreshFrom := txn.ReadTimestamp
refreshToTxn := txn.Clone()
refreshToTxn.BumpReadTimestamp(refreshTS)
switch refreshToTxn.Status {
case roachpb.PENDING:
case roachpb.STAGING:
// If the batch resulted in an error but the EndTxn request succeeded,
-// staging the transaction record in the process, downgrade the status
-// back to PENDING. Even though the transaction record may have a status
-// of STAGING, we know that the transaction failed to implicitly commit.
+// staging the transaction record in the process, downgrade the status back
+// to PENDING. Even though the transaction record may have a status of
+// STAGING, we know that the transaction failed to implicitly commit.
refreshToTxn.Status = roachpb.PENDING
default:
return nil, kvpb.NewError(errors.AssertionFailedf(
Expand All @@ -294,8 +309,7 @@ func (sr *txnSpanRefresher) maybeRefreshAndRetrySend(

// To prevent starvation of batches and to ensure the correctness of parallel
// commits, split off the EndTxn request into its own batch on auto-retries.
-args, hasET := ba.GetArg(kvpb.EndTxn)
-if len(ba.Requests) > 1 && hasET && !args.(*kvpb.EndTxnRequest).Require1PC {
+if len(ba.Requests) > 1 && hasET && !endTxnArg.(*kvpb.EndTxnRequest).Require1PC {
log.Eventf(ctx, "sending EndTxn separately from rest of batch on retry")
return sr.splitEndTxnAndRetrySend(ctx, ba)
}
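
The unconditional refreshTS.Next() bump added above closes the tie case called out in the TODO: if the refreshed timestamp equals the staging timestamp of the first attempt's EndTxn, retried writes would land exactly at the staging timestamp and could satisfy the stale STAGING record; one logical tick forces them strictly above it. A toy illustration follows, using the same simplified Timestamp stand-in as the sketch above (not the real hlc package).

package main

import "fmt"

// Timestamp mirrors the simplified stand-in from the previous sketch.
type Timestamp struct {
	WallTime int64
	Logical  int32
}

func (t Timestamp) LessEq(u Timestamp) bool {
	return t.WallTime < u.WallTime ||
		(t.WallTime == u.WallTime && t.Logical <= u.Logical)
}

// Next returns the smallest timestamp strictly above t, mirroring
// hlc.Timestamp.Next.
func (t Timestamp) Next() Timestamp {
	return Timestamp{WallTime: t.WallTime, Logical: t.Logical + 1}
}

func main() {
	// Hazard case: the refresh lands exactly at the staging timestamp.
	staging := Timestamp{WallTime: 100}
	refresh := staging

	// A retried write evaluated at refresh could still satisfy the stale
	// STAGING record's implicit-commit condition (write TS <= staging TS).
	fmt.Println(refresh.LessEq(staging)) // true: record could appear committed

	// After the bump, retried writes land strictly above the staging
	// timestamp and can never satisfy the first attempt's STAGING record.
	fmt.Println(refresh.Next().LessEq(staging)) // false
}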