diff --git a/pkg/kv/kvclient/kvcoord/dist_sender_server_test.go b/pkg/kv/kvclient/kvcoord/dist_sender_server_test.go
index a0393bf06a6e..df3983262d22 100644
--- a/pkg/kv/kvclient/kvcoord/dist_sender_server_test.go
+++ b/pkg/kv/kvclient/kvcoord/dist_sender_server_test.go
@@ -46,6 +46,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/util/log"
 	"github.com/cockroachdb/cockroach/pkg/util/metric"
 	"github.com/cockroachdb/cockroach/pkg/util/timeutil"
+	"github.com/cockroachdb/cockroach/pkg/util/tracing"
 	"github.com/cockroachdb/errors"
 	"github.com/stretchr/testify/require"
 )
@@ -4616,3 +4617,143 @@ func TestMaxSpanRequestKeysWithMixedReadWriteBatches(t *testing.T) {
 	})
 	require.NoError(t, err)
 }
+
+// TestRollbackAfterRefreshAndFailedCommit is a regression test for #156698.
+//
+// This test forces the self-recovery of a transaction after a failed parallel
+// commit. It does this as follows.
+//
+// Setup:
+//
+//	Create a split at key=6 so that key=1 and key=8 live on different ranges,
+//	forcing batch splitting.
+//	Write to key=8 to be able to observe whether the subsequent delete was
+//	effective.
+//
+// Txn1:
+//
+//	Read key=1 to set the timestamp and prevent a server-side refresh.
+//
+// Txn2:
+//
+//	Read key=8 to bump the timestamp cache on key=8.
+//
+// Txn1:
+//
+//	Batch{
+//	  Del key=1
+//	  Del key=8
+//	  EndTxn
+//	}
+//
+// This batch fails its parallel commit because Del(key=8) has its timestamp
+// pushed, so its write doesn't satisfy the STAGING record.
+//
+// After a successful refresh, our testing filters have arranged for the
+// second EndTxn to fail. This then results in a Rollback().
+//
+// In the presence of the bug, the Rollback() would also fail. Without the
+// bug, it succeeds. In either case, Txn1 fails, so we observe any rollback
+// failure via the trace.
+func TestRollbackAfterRefreshAndFailedCommit(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	defer log.Scope(t).Close(t)
+
+	var (
+		targetTxnIDString atomic.Value
+		firstEndTxnSeen   atomic.Bool
+		cmdID             atomic.Value
+	)
+	cmdID.Store(kvserverbase.CmdIDKey(""))
+	targetTxnIDString.Store("")
+	ctx := context.Background()
+	st := cluster.MakeClusterSettings()
+	kvcoord.RandomizedTxnAnchorKeyEnabled.Override(ctx, &st.SV, false)
+
+	s, _, db := serverutils.StartServer(t, base.TestServerArgs{
+		Settings: st,
+		Knobs: base.TestingKnobs{
+			Store: &kvserver.StoreTestingKnobs{
+				TestingProposalFilter: func(fArgs kvserverbase.ProposalFilterArgs) *kvpb.Error {
+					if fArgs.Req.Header.Txn == nil ||
+						fArgs.Req.Header.Txn.ID.String() != targetTxnIDString.Load().(string) {
+						return nil // not our txn
+					}
+					if cmdID.Load().(kvserverbase.CmdIDKey) != "" {
+						// We already have a command to fail.
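+						// Once we have latched onto a command, let every
+						// later proposal from this txn through untouched.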
+						return nil
+					}
+
+					t.Logf("proposal from our txn: %s", fArgs.Req)
+					endTxnReq := fArgs.Req.Requests[len(fArgs.Req.Requests)-1].GetEndTxn()
+					if endTxnReq != nil {
+						if !firstEndTxnSeen.Load() {
+							firstEndTxnSeen.Store(true)
+						} else {
+							epoch := fArgs.Req.Header.Txn.Epoch
+							t.Logf("will fail application for txn %s@epoch=%d; req: %+v; raft cmdID: %s",
+								fArgs.Req.Header.Txn.ID.String(), epoch, endTxnReq, fArgs.CmdID)
+							cmdID.Store(fArgs.CmdID)
+						}
+					}
+					return nil
+				},
+				TestingApplyCalledTwiceFilter: func(fArgs kvserverbase.ApplyFilterArgs) (int, *kvpb.Error) {
+					if fArgs.CmdID == cmdID.Load().(kvserverbase.CmdIDKey) {
+						t.Logf("failing application for raft cmdID: %s", fArgs.CmdID)
+						return 0, kvpb.NewErrorf("test injected error")
+					}
+					return 0, nil
+				},
+			},
+		},
+	})
+	defer s.Stopper().Stop(ctx)
+
+	scratchStart, err := s.ScratchRange()
+	require.NoError(t, err)
+
+	scratchKey := func(idx int) roachpb.Key {
+		key := scratchStart.Clone()
+		key = append(key, []byte(fmt.Sprintf("key-%03d", idx))...)
+		return key
+	}
+
+	_, _, err = s.SplitRange(scratchKey(6))
+	require.NoError(t, err)
+	require.NoError(t, db.Put(ctx, scratchKey(8), "hello"))
+
+	tracer := s.TracerI().(*tracing.Tracer)
+	txn1Ctx, collectAndFinish := tracing.ContextWithRecordingSpan(context.Background(), tracer, "txn1")
+
+	txn1Err := db.Txn(txn1Ctx, func(txn1Ctx context.Context, txn1 *kv.Txn) error {
+		txn1.SetDebugName("txn1")
+		targetTxnIDString.Store(txn1.ID().String())
+		_ = txn1.ConfigureStepping(txn1Ctx, kv.SteppingEnabled)
+		if _, err = txn1.Get(txn1Ctx, scratchKey(1)); err != nil {
+			return err
+		}
+
+		// Txn2 now executes, bumping the timestamp cache on key=8.
+		if txn1.Epoch() == 0 {
+			txn2Ctx := context.Background()
+			txn2 := db.NewTxn(txn2Ctx, "txn2")
+			_, err = txn2.Get(txn2Ctx, scratchKey(8))
+			require.NoError(t, err)
+		}
+
+		b := txn1.NewBatch()
+		b.Del(scratchKey(1))
+		b.Del(scratchKey(8)) // Won't satisfy implicit commit.
+		return txn1.CommitInBatch(txn1Ctx, b)
+	})
+
+	require.Error(t, txn1Err)
+	val8, err := db.Get(ctx, scratchKey(8))
+	require.NoError(t, err)
+	require.True(t, val8.Exists())
+	// The actual error returned to the user is the injected error. However, we
+	// want to verify that the transaction's self-recovery did not encounter a
+	// programming error, so we check the trace for it.
+	recording := collectAndFinish()
+	require.NotContains(t, recording.String(), "failed indeterminate commit recovery: programming error")
+}
diff --git a/pkg/kv/kvserver/batcheval/cmd_end_transaction.go b/pkg/kv/kvserver/batcheval/cmd_end_transaction.go
index c89e189ee6ca..22a14b6a7f76 100644
--- a/pkg/kv/kvserver/batcheval/cmd_end_transaction.go
+++ b/pkg/kv/kvserver/batcheval/cmd_end_transaction.go
@@ -305,7 +305,7 @@ func EndTxn(
 		// and various timestamps). We must be careful to update it with the
 		// supplied ba.Txn if we return it with an error which might be
 		// retried, as for example to avoid client-side serializable restart.
-		reply.Txn = &existingTxn
+		reply.Txn = existingTxn.Clone()
 
 		// Verify that we can either commit it or abort it (according
 		// to args.Commit), and also that the Timestamp and Epoch have
@@ -498,7 +498,19 @@ func EndTxn(
 	// committed. Doing so is only possible if we can guarantee that under no
 	// circumstances can an implicitly committed transaction be rolled back.
 	if reply.Txn.Status == roachpb.STAGING {
-		err := kvpb.NewIndeterminateCommitError(*reply.Txn)
+		// Note that reply.Txn has been updated with the Txn from the request
+		// header.
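+		// (reply.Txn is a clone, so existingTxn below still reflects the
+		// record as it was written to disk.)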
+		// However, the transaction might have been pushed since the record was
+		// written. In fact, the transaction from the request header might
+		// actually be in a state that _would have_ been implicitly committed IF
+		// it had been able to write a transaction record with this new state.
+		// We use the transaction record from disk to avoid erroneously
+		// attempting to commit this transaction during recovery; attempting to
+		// commit based on the pushed timestamp would result in an assertion
+		// failure.
+		if !recordAlreadyExisted {
+			return result.Result{}, errors.AssertionFailedf("programming error: transaction in STAGING without transaction record")
+		}
+		err := kvpb.NewIndeterminateCommitError(existingTxn)
 		log.VEventf(ctx, 1, "%v", err)
 		return result.Result{}, err
 	}
diff --git a/pkg/kv/kvserver/batcheval/cmd_end_transaction_test.go b/pkg/kv/kvserver/batcheval/cmd_end_transaction_test.go
index 29749208d84e..51aee2b6b7b8 100644
--- a/pkg/kv/kvserver/batcheval/cmd_end_transaction_test.go
+++ b/pkg/kv/kvserver/batcheval/cmd_end_transaction_test.go
@@ -30,6 +30,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/util/log"
 	"github.com/cockroachdb/cockroach/pkg/util/timeutil"
 	"github.com/cockroachdb/cockroach/pkg/util/uuid"
+	"github.com/cockroachdb/errors"
 	"github.com/stretchr/testify/require"
 )
@@ -185,8 +186,9 @@ func TestEndTxnUpdatesTransactionRecord(t *testing.T) {
 		inFlightWrites []roachpb.SequencedWrite
 		deadline       hlc.Timestamp
 		// Expected result.
-		expError string
-		expTxn   *roachpb.TransactionRecord
+		expError      string
+		expTxn        *roachpb.TransactionRecord
+		validateError func(t *testing.T, err error)
 	}{
 		{
 			// Standard case where a transaction is rolled back when
@@ -1049,6 +1051,28 @@ func TestEndTxnUpdatesTransactionRecord(t *testing.T) {
 				return &record
 			}(),
 		},
+		{
+			// Non-standard case where the transaction is being rolled back after
+			// a successful refresh. In this case we want to be sure that the
+			// staging record is returned so that we don't attempt transaction
+			// recovery using the refreshed transaction.
+			name: "record staging, rollback after refresh",
+			// Replica state.
+			existingTxn: stagingRecord,
+			// Request state.
+			headerTxn: refreshedHeaderTxn,
+			commit:    false,
+			// Expected result.
+			expError: "found txn in indeterminate STAGING state",
+			expTxn:   stagingRecord,
+			validateError: func(t *testing.T, err error) {
+				var icErr *kvpb.IndeterminateCommitError
+				errors.As(err, &icErr)
+				require.NotNil(t, icErr)
+				require.Equal(t, stagingRecord.WriteTimestamp, icErr.StagingTxn.WriteTimestamp)
+			},
+		},
+
 		{
 			// Non-standard case where a transaction record is re-staged during
 			// a parallel commit. The record already exists because of a failed
@@ -1597,10 +1621,11 @@ func TestEndTxnUpdatesTransactionRecord(t *testing.T) {
 			if !testutils.IsError(err, regexp.QuoteMeta(c.expError)) {
 				t.Fatalf("expected error %q; found %v", c.expError, err)
 			}
-		} else {
-			if err != nil {
-				t.Fatal(err)
+			if c.validateError != nil {
+				c.validateError(t, err)
 			}
+		} else {
+			require.NoError(t, err)
 
 			// Assert that the txn record is written as expected.
 			var resTxnRecord roachpb.TransactionRecord
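
For illustration of why `reply.Txn = &existingTxn` was problematic: the reply
aliases the local copy of the on-disk record, so a later update of the reply
with the (possibly pushed) header transaction also rewrites what the STAGING
branch reads as the on-disk state. Below is a minimal, self-contained Go
sketch of that aliasing hazard; the `txnRecord` type and its fields are
invented for the example and are not CockroachDB APIs.

package main

import "fmt"

// txnRecord is a stand-in for the transaction proto; illustrative only.
type txnRecord struct {
	Status         string
	WriteTimestamp int64
}

// clone returns an independent copy, analogous to Transaction.Clone().
func (t *txnRecord) clone() *txnRecord {
	c := *t
	return &c
}

func main() {
	existing := txnRecord{Status: "STAGING", WriteTimestamp: 10}

	// Old behavior: the reply aliases the record read from disk.
	reply := &existing
	reply.WriteTimestamp = 20            // update with the pushed header timestamp
	fmt.Println(existing.WriteTimestamp) // 20: the in-memory record was clobbered

	// New behavior: the reply is a clone, so the record stays intact.
	existing = txnRecord{Status: "STAGING", WriteTimestamp: 10}
	reply = existing.clone()
	reply.WriteTimestamp = 20
	fmt.Println(existing.WriteTimestamp) // 10: recovery can still use the disk state
}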