diff --git a/pkg/kv/kvpb/api.proto b/pkg/kv/kvpb/api.proto index ee300854debf..12f5554e3835 100644 --- a/pkg/kv/kvpb/api.proto +++ b/pkg/kv/kvpb/api.proto @@ -1317,6 +1317,15 @@ message PushTxnResponse { // TODO(tschottdorf): Maybe this can be a TxnMeta instead; probably requires // factoring out the new Priority. Transaction pushee_txn = 2 [(gogoproto.nullable) = false]; + // ambiguous_abort is true if pushee_txn has status ABORTED, but the + // transaction may in fact have been committed and GCed already. Concretely, + // this means that the transaction record does not exist, but it may have + // existed in the past (according to the timestamp cache), and we can't know + // whether it committed or aborted so we pessimistically assume it aborted. + // + // NB: this field was added in a patch release, and is not guaranteed to be + // populated prior to 24.1. + bool ambiguous_abort = 3; } // A RecoverTxnRequest is arguments to the RecoverTxn() method. It is sent diff --git a/pkg/kv/kvserver/batcheval/cmd_push_txn.go b/pkg/kv/kvserver/batcheval/cmd_push_txn.go index 759dcb2d96e8..79d39c901a6e 100644 --- a/pkg/kv/kvserver/batcheval/cmd_push_txn.go +++ b/pkg/kv/kvserver/batcheval/cmd_push_txn.go @@ -176,6 +176,10 @@ func PushTxn( // then we know we're in either the second or the third case. reply.PusheeTxn = SynthesizeTxnFromMeta(ctx, cArgs.EvalCtx, args.PusheeTxn) if reply.PusheeTxn.Status == roachpb.ABORTED { + // The transaction may actually have committed and already removed its + // intents and txn record, or it may have aborted and done the same. We + // can't know, so mark the abort as ambiguous. + reply.AmbiguousAbort = true // If the transaction is uncommittable, we don't even need to // persist an ABORTED transaction record, we can just consider it // aborted. This is good because it allows us to obey the invariant diff --git a/pkg/kv/kvserver/batcheval/cmd_push_txn_test.go b/pkg/kv/kvserver/batcheval/cmd_push_txn_test.go new file mode 100644 index 000000000000..2ddaff4741f8 --- /dev/null +++ b/pkg/kv/kvserver/batcheval/cmd_push_txn_test.go @@ -0,0 +1,104 @@ +// Copyright 2024 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package batcheval_test + +import ( + "context" + "testing" + + "github.com/cockroachdb/cockroach/pkg/kv/kvpb" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval/result" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/storage" + "github.com/cockroachdb/cockroach/pkg/storage/enginepb" + "github.com/cockroachdb/cockroach/pkg/testutils" + "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/timeutil" + "github.com/cockroachdb/cockroach/pkg/util/uuid" + "github.com/stretchr/testify/require" +) + +// TestPushTxnAmbiguousAbort tests PushTxn behavior when the transaction record +// is missing. In this case, the timestamp cache can tell us whether the +// transaction record may have existed in the past -- if we know it hasn't, then +// the transaction is still pending (e.g. before the record is written), but +// otherwise the transaction record is pessimistically assumed to have aborted. +// However, this state is ambiguous, as the transaction may in fact have +// committed already and GCed its transaction record. Make sure this is +// reflected in the AmbiguousAbort field. +// +// TODO(erikgrinaker): generalize this to test PushTxn more broadly. +func TestPushTxnAmbiguousAbort(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + ctx := context.Background() + + clock := hlc.NewClockForTesting(timeutil.NewManualTime(timeutil.Now())) + now := clock.Now() + engine := storage.NewDefaultInMemForTesting() + defer engine.Close() + + testutils.RunTrueAndFalse(t, "CanCreateTxnRecord", func(t *testing.T, canCreateTxnRecord bool) { + evalCtx := (&batcheval.MockEvalCtx{ + Clock: clock, + CanCreateTxnRecordFn: func() (bool, kvpb.TransactionAbortedReason) { + return canCreateTxnRecord, 0 // PushTxn doesn't care about the reason + }, + }).EvalContext() + + key := roachpb.Key("foo") + pusheeTxnMeta := enginepb.TxnMeta{ + ID: uuid.MakeV4(), + Key: key, + MinTimestamp: now, + } + + resp := kvpb.PushTxnResponse{} + res, err := batcheval.PushTxn(ctx, engine, batcheval.CommandArgs{ + EvalCtx: evalCtx, + Header: kvpb.Header{ + Timestamp: clock.Now(), + }, + Args: &kvpb.PushTxnRequest{ + RequestHeader: kvpb.RequestHeader{Key: key}, + PusheeTxn: pusheeTxnMeta, + }, + }, &resp) + require.NoError(t, err) + + // There is no txn record (the engine is empty). If we can't create a txn + // record, it's because the timestamp cache can't confirm that it didn't + // exist in the past. This will return an ambiguous abort. + var expectUpdatedTxns []*roachpb.Transaction + expectTxn := roachpb.Transaction{ + TxnMeta: pusheeTxnMeta, + LastHeartbeat: pusheeTxnMeta.MinTimestamp, + } + if !canCreateTxnRecord { + expectTxn.Status = roachpb.ABORTED + expectUpdatedTxns = append(expectUpdatedTxns, &expectTxn) + } + + require.Equal(t, result.Result{ + Local: result.LocalResult{ + UpdatedTxns: expectUpdatedTxns, + }, + }, res) + require.Equal(t, kvpb.PushTxnResponse{ + PusheeTxn: expectTxn, + AmbiguousAbort: !canCreateTxnRecord, + }, resp) + }) +} diff --git a/pkg/kv/kvserver/concurrency/concurrency_manager_test.go b/pkg/kv/kvserver/concurrency/concurrency_manager_test.go index 01eb99ddc62d..b267eee8d8af 100644 --- a/pkg/kv/kvserver/concurrency/concurrency_manager_test.go +++ b/pkg/kv/kvserver/concurrency/concurrency_manager_test.go @@ -742,22 +742,22 @@ func (c *cluster) makeConfig() concurrency.Config { // PushTransaction implements the concurrency.IntentResolver interface. func (c *cluster) PushTransaction( ctx context.Context, pushee *enginepb.TxnMeta, h kvpb.Header, pushType kvpb.PushTxnType, -) (*roachpb.Transaction, *kvpb.Error) { +) (*roachpb.Transaction, bool, *kvpb.Error) { pusheeRecord, err := c.getTxnRecord(pushee.ID) if err != nil { - return nil, kvpb.NewError(err) + return nil, false, kvpb.NewError(err) } var pusherRecord *txnRecord if h.Txn != nil { pusherID := h.Txn.ID pusherRecord, err = c.getTxnRecord(pusherID) if err != nil { - return nil, kvpb.NewError(err) + return nil, false, kvpb.NewError(err) } push, err := c.registerPush(ctx, pusherID, pushee.ID) if err != nil { - return nil, kvpb.NewError(err) + return nil, false, kvpb.NewError(err) } defer c.unregisterPush(push) } @@ -782,10 +782,10 @@ func (c *cluster) PushTransaction( switch { case pusheeStatus.IsFinalized(): // Already finalized. - return pusheeTxn, nil + return pusheeTxn, false, nil case pushType == kvpb.PUSH_TIMESTAMP && pushTo.LessEq(pusheeTxn.WriteTimestamp): // Already pushed. - return pusheeTxn, nil + return pusheeTxn, false, nil case pushType == kvpb.PUSH_TOUCH: pusherWins = false case txnwait.CanPushWithPriority(pushType, pusherIso, pusheeIso, pusherPri, pusheePri, pusheeStatus): @@ -805,16 +805,16 @@ func (c *cluster) PushTransaction( err = errors.Errorf("unexpected push type: %s", pushType) } if err != nil { - return nil, kvpb.NewError(err) + return nil, false, kvpb.NewError(err) } pusheeTxn, _ = pusheeRecord.asTxn() - return pusheeTxn, nil + return pusheeTxn, false, nil } // If PUSH_TOUCH or WaitPolicy_Error, return error instead of waiting. if pushType == kvpb.PUSH_TOUCH || h.WaitPolicy == lock.WaitPolicy_Error { log.Eventf(ctx, "pushee not abandoned") err := kvpb.NewTransactionPushError(*pusheeTxn) - return nil, kvpb.NewError(err) + return nil, false, kvpb.NewError(err) } // Or the pusher aborted? var pusherRecordSig chan struct{} @@ -824,7 +824,7 @@ func (c *cluster) PushTransaction( if pusherTxn.Status == roachpb.ABORTED { log.Eventf(ctx, "detected pusher aborted") err := kvpb.NewTransactionAbortedError(kvpb.ABORT_REASON_PUSHER_ABORTED) - return nil, kvpb.NewError(err) + return nil, false, kvpb.NewError(err) } } // Wait until either record is updated. @@ -832,7 +832,7 @@ func (c *cluster) PushTransaction( case <-pusheeRecordSig: case <-pusherRecordSig: case <-ctx.Done(): - return nil, kvpb.NewError(ctx.Err()) + return nil, false, kvpb.NewError(ctx.Err()) } } } diff --git a/pkg/kv/kvserver/concurrency/lock_table_waiter.go b/pkg/kv/kvserver/concurrency/lock_table_waiter.go index af6d31e15867..9aefcf3e7c91 100644 --- a/pkg/kv/kvserver/concurrency/lock_table_waiter.go +++ b/pkg/kv/kvserver/concurrency/lock_table_waiter.go @@ -125,7 +125,7 @@ type IntentResolver interface { // pushed successfully. PushTransaction( context.Context, *enginepb.TxnMeta, kvpb.Header, kvpb.PushTxnType, - ) (*roachpb.Transaction, *Error) + ) (*roachpb.Transaction, bool, *Error) // ResolveIntent synchronously resolves the provided intent. ResolveIntent(context.Context, roachpb.LockUpdate, intentresolver.ResolveOptions) *Error @@ -523,7 +523,7 @@ func (w *lockTableWaiterImpl) pushLockTxn( log.VEventf(ctx, 2, "pushing txn %s to abort", ws.txn.Short()) } - pusheeTxn, err := w.ir.PushTransaction(ctx, ws.txn, h, pushType) + pusheeTxn, _, err := w.ir.PushTransaction(ctx, ws.txn, h, pushType) if err != nil { // If pushing with an Error WaitPolicy and the push fails, then the lock // holder is still active. Transform the error into a WriteIntentError. @@ -705,7 +705,7 @@ func (w *lockTableWaiterImpl) pushRequestTxn( pushType := kvpb.PUSH_ABORT log.VEventf(ctx, 3, "pushing txn %s to detect request deadlock", ws.txn.Short()) - _, err := w.ir.PushTransaction(ctx, ws.txn, h, pushType) + _, _, err := w.ir.PushTransaction(ctx, ws.txn, h, pushType) if err != nil { return err } diff --git a/pkg/kv/kvserver/concurrency/lock_table_waiter_test.go b/pkg/kv/kvserver/concurrency/lock_table_waiter_test.go index dbbc298868de..4009f2e4a41d 100644 --- a/pkg/kv/kvserver/concurrency/lock_table_waiter_test.go +++ b/pkg/kv/kvserver/concurrency/lock_table_waiter_test.go @@ -38,7 +38,7 @@ import ( ) type mockIntentResolver struct { - pushTxn func(context.Context, *enginepb.TxnMeta, kvpb.Header, kvpb.PushTxnType) (*roachpb.Transaction, *Error) + pushTxn func(context.Context, *enginepb.TxnMeta, kvpb.Header, kvpb.PushTxnType) (*roachpb.Transaction, bool, *Error) resolveIntent func(context.Context, roachpb.LockUpdate, intentresolver.ResolveOptions) *Error resolveIntents func(context.Context, []roachpb.LockUpdate, intentresolver.ResolveOptions) *Error } @@ -46,7 +46,7 @@ type mockIntentResolver struct { // mockIntentResolver implements the IntentResolver interface. func (m *mockIntentResolver) PushTransaction( ctx context.Context, txn *enginepb.TxnMeta, h kvpb.Header, pushType kvpb.PushTxnType, -) (*roachpb.Transaction, *Error) { +) (*roachpb.Transaction, bool, *Error) { return m.pushTxn(ctx, txn, h, pushType) } @@ -349,7 +349,7 @@ func testWaitPush(t *testing.T, k waitKind, makeReq func() Request, expPushTS hl pusheeArg *enginepb.TxnMeta, h kvpb.Header, pushType kvpb.PushTxnType, - ) (*roachpb.Transaction, *Error) { + ) (*roachpb.Transaction, bool, *Error) { require.Equal(t, &pusheeTxn.TxnMeta, pusheeArg) require.Equal(t, req.Txn, h.Txn) require.Equal(t, expPushTS, h.Timestamp) @@ -384,7 +384,7 @@ func testWaitPush(t *testing.T, k waitKind, makeReq func() Request, expPushTS hl g.state = waitingState{kind: doneWaiting} g.notify() } - return resp, nil + return resp, false, nil } err := w.WaitOn(ctx, req, g) @@ -552,7 +552,7 @@ func testErrorWaitPush( pusheeArg *enginepb.TxnMeta, h kvpb.Header, pushType kvpb.PushTxnType, - ) (*roachpb.Transaction, *Error) { + ) (*roachpb.Transaction, bool, *Error) { require.Equal(t, &pusheeTxn.TxnMeta, pusheeArg) require.Equal(t, req.Txn, h.Txn) require.Equal(t, expPushTS, h.Timestamp) @@ -560,7 +560,7 @@ func testErrorWaitPush( resp := &roachpb.Transaction{TxnMeta: *pusheeArg, Status: roachpb.PENDING} if pusheeActive { - return nil, kvpb.NewError(&kvpb.TransactionPushError{ + return nil, false, kvpb.NewError(&kvpb.TransactionPushError{ PusheeTxn: *resp, }) } @@ -583,7 +583,7 @@ func testErrorWaitPush( return nil } resp.Status = roachpb.ABORTED - return resp, nil + return resp, false, nil } err := w.WaitOn(ctx, req, g) @@ -724,7 +724,7 @@ func testWaitPushWithTimeout(t *testing.T, k waitKind, makeReq func() Request) { pusheeArg *enginepb.TxnMeta, h kvpb.Header, pushType kvpb.PushTxnType, - ) (*roachpb.Transaction, *Error) { + ) (*roachpb.Transaction, bool, *Error) { require.Equal(t, &pusheeTxn.TxnMeta, pusheeArg) require.Equal(t, req.Txn, h.Txn) @@ -737,7 +737,7 @@ func testWaitPushWithTimeout(t *testing.T, k waitKind, makeReq func() Request) { // Wait for the context to hit its timeout. <-ctx.Done() - return nil, kvpb.NewError(ctx.Err()) + return nil, false, kvpb.NewError(ctx.Err()) } require.Equal(t, kvpb.PUSH_ABORT, pushType) @@ -748,7 +748,7 @@ func testWaitPushWithTimeout(t *testing.T, k waitKind, makeReq func() Request) { resp := &roachpb.Transaction{TxnMeta: *pusheeArg, Status: roachpb.PENDING} if pusheeActive { - return nil, kvpb.NewError(&kvpb.TransactionPushError{ + return nil, false, kvpb.NewError(&kvpb.TransactionPushError{ PusheeTxn: *resp, }) } @@ -771,7 +771,7 @@ func testWaitPushWithTimeout(t *testing.T, k waitKind, makeReq func() Request) { return nil } resp.Status = roachpb.ABORTED - return resp, nil + return resp, false, nil } err := w.WaitOn(ctx, req, g) @@ -831,8 +831,8 @@ func TestLockTableWaiterIntentResolverError(t *testing.T) { g.notify() ir.pushTxn = func( _ context.Context, _ *enginepb.TxnMeta, _ kvpb.Header, _ kvpb.PushTxnType, - ) (*roachpb.Transaction, *Error) { - return nil, err1 + ) (*roachpb.Transaction, bool, *Error) { + return nil, false, err1 } err := w.WaitOn(ctx, req, g) require.Equal(t, err1, err) @@ -842,8 +842,8 @@ func TestLockTableWaiterIntentResolverError(t *testing.T) { g.notify() ir.pushTxn = func( _ context.Context, _ *enginepb.TxnMeta, _ kvpb.Header, _ kvpb.PushTxnType, - ) (*roachpb.Transaction, *Error) { - return &pusheeTxn, nil + ) (*roachpb.Transaction, bool, *Error) { + return &pusheeTxn, false, nil } ir.resolveIntent = func(_ context.Context, intent roachpb.LockUpdate, opts intentresolver.ResolveOptions) *Error { return err2 diff --git a/pkg/kv/kvserver/intentresolver/intent_resolver.go b/pkg/kv/kvserver/intentresolver/intent_resolver.go index bf590213f9f3..7afcdf10b91f 100644 --- a/pkg/kv/kvserver/intentresolver/intent_resolver.go +++ b/pkg/kv/kvserver/intentresolver/intent_resolver.go @@ -334,21 +334,26 @@ func updateIntentTxnStatus( // PushTransaction takes a transaction and pushes its record using the specified // push type and request header. It returns the transaction proto corresponding -// to the pushed transaction. +// to the pushed transaction, and in the case of an ABORTED transaction, a bool +// indicating whether the abort was ambiguous (see +// PushTxnResponse.AmbiguousAbort). +// +// NB: ambiguousAbort may be false with nodes <24.1. func (ir *IntentResolver) PushTransaction( ctx context.Context, pushTxn *enginepb.TxnMeta, h kvpb.Header, pushType kvpb.PushTxnType, -) (*roachpb.Transaction, *kvpb.Error) { +) (_ *roachpb.Transaction, ambiguousAbort bool, _ *kvpb.Error) { pushTxns := make(map[uuid.UUID]*enginepb.TxnMeta, 1) pushTxns[pushTxn.ID] = pushTxn - pushedTxns, pErr := ir.MaybePushTransactions(ctx, pushTxns, h, pushType, false /* skipIfInFlight */) + pushedTxns, ambiguousAbort, pErr := ir.MaybePushTransactions( + ctx, pushTxns, h, pushType, false /* skipIfInFlight */) if pErr != nil { - return nil, pErr + return nil, false, pErr } pushedTxn, ok := pushedTxns[pushTxn.ID] if !ok { log.Fatalf(ctx, "missing PushTxn responses for %s", pushTxn) } - return pushedTxn, nil + return pushedTxn, ambiguousAbort, nil } // MaybePushTransactions tries to push the conflicting transaction(s): @@ -356,8 +361,12 @@ func (ir *IntentResolver) PushTransaction( // it on a write/write conflict, or doing nothing if the transaction is no // longer pending. // -// Returns a set of transaction protos who correspond to the pushed -// transactions and whose intents can now be resolved, and an error. +// Returns a set of transaction protos who correspond to the pushed transactions +// and whose intents can now be resolved, along with a bool indicating whether +// any of the responses were an ambiguous abort (see +// PushTxnResponse.AmbiguousAbort), and an error. +// +// NB: anyAmbiguousAbort may be false with nodes <24.1. // // If skipIfInFlight is true, then no PushTxns will be sent and no intents // will be returned for any transaction for which there is another push in @@ -382,7 +391,7 @@ func (ir *IntentResolver) MaybePushTransactions( h kvpb.Header, pushType kvpb.PushTxnType, skipIfInFlight bool, -) (map[uuid.UUID]*roachpb.Transaction, *kvpb.Error) { +) (_ map[uuid.UUID]*roachpb.Transaction, anyAmbiguousAbort bool, _ *kvpb.Error) { // Decide which transactions to push and which to ignore because // of other in-flight requests. For those transactions that we // will be pushing, increment their ref count in the in-flight @@ -413,7 +422,7 @@ func (ir *IntentResolver) MaybePushTransactions( } ir.mu.Unlock() if len(pushTxns) == 0 { - return nil, nil + return nil, false, nil } pusherTxn := getPusherTxn(h) @@ -439,20 +448,22 @@ func (ir *IntentResolver) MaybePushTransactions( err := ir.db.Run(ctx, b) cleanupInFlightPushes() if err != nil { - return nil, b.MustPErr() + return nil, false, b.MustPErr() } br := b.RawResponse() pushedTxns := make(map[uuid.UUID]*roachpb.Transaction, len(br.Responses)) for _, resp := range br.Responses { - txn := &resp.GetInner().(*kvpb.PushTxnResponse).PusheeTxn + resp := resp.GetInner().(*kvpb.PushTxnResponse) + txn := &resp.PusheeTxn + anyAmbiguousAbort = anyAmbiguousAbort || resp.AmbiguousAbort if _, ok := pushedTxns[txn.ID]; ok { log.Fatalf(ctx, "have two PushTxn responses for %s", txn.ID) } pushedTxns[txn.ID] = txn log.Eventf(ctx, "%s is now %s", txn.ID, txn.Status) } - return pushedTxns, nil + return pushedTxns, anyAmbiguousAbort, nil } // runAsyncTask semi-synchronously runs a generic task function. If @@ -563,7 +574,7 @@ func (ir *IntentResolver) CleanupIntents( } } - pushedTxns, pErr := ir.MaybePushTransactions(ctx, pushTxns, h, pushType, skipIfInFlight) + pushedTxns, _, pErr := ir.MaybePushTransactions(ctx, pushTxns, h, pushType, skipIfInFlight) if pErr != nil { return 0, errors.Wrapf(pErr.GoError(), "failed to push during intent resolution") } diff --git a/pkg/kv/kvserver/replica_rangefeed.go b/pkg/kv/kvserver/replica_rangefeed.go index c746f26603f8..c21bdd815085 100644 --- a/pkg/kv/kvserver/replica_rangefeed.go +++ b/pkg/kv/kvserver/replica_rangefeed.go @@ -149,7 +149,7 @@ func (tp *rangefeedTxnPusher) PushTxns( }, } - pushedTxnMap, pErr := tp.ir.MaybePushTransactions( + pushedTxnMap, _, pErr := tp.ir.MaybePushTransactions( ctx, pushTxnMap, h, kvpb.PUSH_TIMESTAMP, false, /* skipIfInFlight */ ) if pErr != nil {