Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
141 changes: 141 additions & 0 deletions pkg/kv/kvclient/kvcoord/dist_sender_server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/metric"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/cockroachdb/errors"
"github.com/stretchr/testify/require"
)
Expand Down Expand Up @@ -4616,3 +4617,143 @@ func TestMaxSpanRequestKeysWithMixedReadWriteBatches(t *testing.T) {
})
require.NoError(t, err)
}

// TestRollbackAfterRefreshAndFailedCommit is a regression test for #156698.
//
// This test forces the self-recovery of a transaction after a failed parallel
// commit. It does this via the following.
//
// Setup:
//
//	Create a split between key=1 and key=8 to force batch splitting.
//	Write to key=8 to be able to observe if the subsequent delete was effective.
//
// Txn 1:
//
//	Read key=1 to set timestamp and prevent server side refresh.
//
// Txn 2:
//
//	Read key=8 to bump timestamp cache on key=8.
//
// Txn 1:
//
//	Batch{
//	  Del key=1
//	  Del key=8
//	  EndTxn
//	}
//
// This batch fails its parallel commit because Del(key=8) has its timestamp
// pushed so its write doesn't satisfy the STAGING record.
//
// After a successful refresh, our testing filters have arranged for the
// second EndTxn to fail. This then results in a Rollback().
//
// In the presence of the bug, the Rollback() would also fail. Without the bug,
// it succeeds. In either case, Txn1 fails so we observe the rollback failure
// via the trace.
func TestRollbackAfterRefreshAndFailedCommit(t *testing.T) {
	defer leaktest.AfterTest(t)()
	defer log.Scope(t).Close(t)

	var (
		// targetTxnIDString is the ID of txn1; the proposal filter ignores
		// requests from any other transaction.
		targetTxnIDString atomic.Value
		// firstEndTxnSeen flips to true once the first EndTxn proposal from
		// txn1 has been observed.
		firstEndTxnSeen atomic.Bool
		// cmdID is the raft command ID of the proposal whose application the
		// apply filter should fail. The empty ID means "none chosen yet".
		cmdID atomic.Value
	)
	cmdID.Store(kvserverbase.CmdIDKey(""))
	targetTxnIDString.Store("")
	ctx := context.Background()
	st := cluster.MakeClusterSettings()
	// NOTE(review): presumably disabled so the txn anchor key is predictable
	// for this scenario; confirm against the setting's documentation.
	kvcoord.RandomizedTxnAnchorKeyEnabled.Override(ctx, &st.SV, false)

	s, _, db := serverutils.StartServer(t, base.TestServerArgs{
		Settings: st,
		Knobs: base.TestingKnobs{
			Store: &kvserver.StoreTestingKnobs{
				// The proposal filter lets the first EndTxn proposal of txn1
				// through and records the command ID of the second one so
				// that the apply filter below can fail it.
				TestingProposalFilter: func(fArgs kvserverbase.ProposalFilterArgs) *kvpb.Error {
					if fArgs.Req.Header.Txn == nil ||
						fArgs.Req.Header.Txn.ID.String() != targetTxnIDString.Load().(string) {
						return nil // not our txn
					}
					if cmdID.Load().(kvserverbase.CmdIDKey) != "" {
						// We already have a command to fail.
						return nil
					}

					t.Logf("proposal from our txn: %s", fArgs.Req)
					endTxnReq := fArgs.Req.Requests[len(fArgs.Req.Requests)-1].GetEndTxn()
					if endTxnReq == nil {
						return nil
					}
					// Swap performs the check-and-set in a single atomic step,
					// so two concurrent proposals can never both be treated
					// as the "first" EndTxn.
					if alreadySeen := firstEndTxnSeen.Swap(true); !alreadySeen {
						return nil
					}
					epoch := fArgs.Req.Header.Txn.Epoch
					t.Logf("will fail application for txn %s@epoch=%d; req: %+v; raft cmdID: %s",
						fArgs.Req.Header.Txn.ID.String(), epoch, endTxnReq, fArgs.CmdID)
					cmdID.Store(fArgs.CmdID)
					return nil
				},
				// The apply filter injects an error for the command selected
				// by the proposal filter and leaves everything else alone.
				TestingApplyCalledTwiceFilter: func(fArgs kvserverbase.ApplyFilterArgs) (int, *kvpb.Error) {
					if fArgs.CmdID == cmdID.Load().(kvserverbase.CmdIDKey) {
						t.Logf("failing application for raft cmdID: %s", fArgs.CmdID)
						return 0, kvpb.NewErrorf("test injected error")
					}
					return 0, nil
				},
			},
		},
	})
	defer s.Stopper().Stop(ctx)

	scratchStart, err := s.ScratchRange()
	require.NoError(t, err)

	// scratchKey returns the idx-th test key inside the scratch range.
	scratchKey := func(idx int) roachpb.Key {
		return append(scratchStart.Clone(), fmt.Sprintf("key-%03d", idx)...)
	}

	// Split between key 1 and key 8 so that txn1's final batch spans two
	// ranges and has to be split.
	_, _, err = s.SplitRange(scratchKey(6))
	require.NoError(t, err)
	require.NoError(t, db.Put(ctx, scratchKey(8), "hello"))

	tracer := s.TracerI().(*tracing.Tracer)
	txn1Ctx, collectAndFinish := tracing.ContextWithRecordingSpan(context.Background(), tracer, "txn1")

	txn1Err := db.Txn(txn1Ctx, func(txn1Ctx context.Context, txn1 *kv.Txn) error {
		txn1.SetDebugName("txn1")
		targetTxnIDString.Store(txn1.ID().String())
		// The return value of ConfigureStepping is not needed here.
		_ = txn1.ConfigureStepping(txn1Ctx, kv.SteppingEnabled)
		// Errors are kept local to the closure so that attempts of this
		// retryable function never write to the enclosing test's err.
		if _, err := txn1.Get(txn1Ctx, scratchKey(1)); err != nil {
			return err
		}

		// Txn2 now executes, bumping the timestamp on key8. It only reads and
		// is never committed or rolled back here.
		if txn1.Epoch() == 0 {
			txn2Ctx := context.Background()
			txn2 := db.NewTxn(txn2Ctx, "txn2")
			_, err := txn2.Get(txn2Ctx, scratchKey(8))
			require.NoError(t, err)
		}

		b := txn1.NewBatch()
		b.Del(scratchKey(1))
		b.Del(scratchKey(8)) // Won't satisfy implicit commit.
		return txn1.CommitInBatch(txn1Ctx, b)
	})

	require.Error(t, txn1Err)
	// The delete of key 8 must not have taken effect since txn1 failed.
	val8, err := db.Get(ctx, scratchKey(8))
	require.NoError(t, err)
	require.True(t, val8.Exists())
	// The actual error returned to the user is the injected error. However, we
	// want to verify that the transaction's self-recovery did not encounter a
	// programming error. So, we check the trace for it.
	recording := collectAndFinish()
	require.NotContains(t, recording.String(), "failed indeterminate commit recovery: programming error")
}
16 changes: 14 additions & 2 deletions pkg/kv/kvserver/batcheval/cmd_end_transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -305,7 +305,7 @@ func EndTxn(
// and various timestamps). We must be careful to update it with the
// supplied ba.Txn if we return it with an error which might be
// retried, as for example to avoid client-side serializable restart.
reply.Txn = &existingTxn
reply.Txn = existingTxn.Clone()

// Verify that we can either commit it or abort it (according
// to args.Commit), and also that the Timestamp and Epoch have
Expand Down Expand Up @@ -498,7 +498,19 @@ func EndTxn(
// committed. Doing so is only possible if we can guarantee that under no
// circumstances can an implicitly committed transaction be rolled back.
if reply.Txn.Status == roachpb.STAGING {
err := kvpb.NewIndeterminateCommitError(*reply.Txn)
// Note that reply.Txn has been updated with the Txn from the request
// header. But, the transaction might have been pushed since it was
// written. In fact, the transaction from the request header might
// actually be in a state that _would have_ been implicitly committed IF
// it had been able to write a transaction record with this new state. We
// use the transaction record from disk to avoid erroneously attempting to
// commit this transaction during recovery. Attempting to commit the
// transaction based on the pushed timestamp would result in an assertion
// failure.
if !recordAlreadyExisted {
return result.Result{}, errors.AssertionFailedf("programming error: transaction in STAGING without transaction record")
}
err := kvpb.NewIndeterminateCommitError(existingTxn)
log.VEventf(ctx, 1, "%v", err)
return result.Result{}, err
}
Expand Down
35 changes: 30 additions & 5 deletions pkg/kv/kvserver/batcheval/cmd_end_transaction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/cockroachdb/errors"
"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -185,8 +186,9 @@ func TestEndTxnUpdatesTransactionRecord(t *testing.T) {
inFlightWrites []roachpb.SequencedWrite
deadline hlc.Timestamp
// Expected result.
expError string
expTxn *roachpb.TransactionRecord
expError string
expTxn *roachpb.TransactionRecord
validateError func(t *testing.T, err error)
}{
{
// Standard case where a transaction is rolled back when
Expand Down Expand Up @@ -1049,6 +1051,28 @@ func TestEndTxnUpdatesTransactionRecord(t *testing.T) {
return &record
}(),
},
{
// Non-standard case where the transaction is being rolled back after a
// successful refresh. In this case we want to be sure that the staging
// record is returned so that we don't attempt transaction recovery using
// the refreshed transaction.
name: "record staging, rollback after refresh.",
// Replica state.
existingTxn: stagingRecord,
// Request state.
headerTxn: refreshedHeaderTxn,
commit: false,
// Expected result.
expError: "found txn in indeterminate STAGING state",
expTxn: stagingRecord,
validateError: func(t *testing.T, err error) {
var icErr *kvpb.IndeterminateCommitError
errors.As(err, &icErr)
require.NotNil(t, icErr)
require.Equal(t, stagingRecord.WriteTimestamp, icErr.StagingTxn.WriteTimestamp)
},
},

{
// Non-standard case where a transaction record is re-staged during
// a parallel commit. The record already exists because of a failed
Expand Down Expand Up @@ -1597,10 +1621,11 @@ func TestEndTxnUpdatesTransactionRecord(t *testing.T) {
if !testutils.IsError(err, regexp.QuoteMeta(c.expError)) {
t.Fatalf("expected error %q; found %v", c.expError, err)
}
} else {
if err != nil {
t.Fatal(err)
if c.validateError != nil {
c.validateError(t, err)
}
} else {
require.NoError(t, err)

// Assert that the txn record is written as expected.
var resTxnRecord roachpb.TransactionRecord
Expand Down