Skip to content

Commit

Permalink
batcheval: add PushTxnResponse.AmbiguousAbort
Browse files Browse the repository at this point in the history
This indicates to the caller that the `ABORTED` status of the pushed
transaction is ambiguous, and the transaction may in fact have been
committed and GCed already. This information is also plumbed through
the `IntentResolver` txn push APIs.

Epic: none
Release note: None
  • Loading branch information
erikgrinaker committed Jan 19, 2024
1 parent c24aedd commit 2378db7
Show file tree
Hide file tree
Showing 8 changed files with 171 additions and 43 deletions.
9 changes: 9 additions & 0 deletions pkg/kv/kvpb/api.proto
Original file line number Diff line number Diff line change
Expand Up @@ -1317,6 +1317,15 @@ message PushTxnResponse {
// TODO(tschottdorf): Maybe this can be a TxnMeta instead; probably requires
// factoring out the new Priority.
Transaction pushee_txn = 2 [(gogoproto.nullable) = false];
// ambiguous_abort is true if pushee_txn has status ABORTED, but the
// transaction may in fact have been committed and GCed already. Concretely,
// this means that the transaction record does not exist, but it may have
// existed in the past (according to the timestamp cache), and we can't know
// whether it committed or aborted so we pessimistically assume it aborted.
//
// NB: this field was added in a patch release, and is not guaranteed to be
// populated prior to 24.1.
bool ambiguous_abort = 3;
}

// A RecoverTxnRequest is arguments to the RecoverTxn() method. It is sent
Expand Down
4 changes: 4 additions & 0 deletions pkg/kv/kvserver/batcheval/cmd_push_txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,10 @@ func PushTxn(
// then we know we're in either the second or the third case.
reply.PusheeTxn = SynthesizeTxnFromMeta(ctx, cArgs.EvalCtx, args.PusheeTxn)
if reply.PusheeTxn.Status == roachpb.ABORTED {
// The transaction may actually have committed and already removed its
// intents and txn record, or it may have aborted and done the same. We
// can't know, so mark the abort as ambiguous.
reply.AmbiguousAbort = true
// If the transaction is uncommittable, we don't even need to
// persist an ABORTED transaction record, we can just consider it
// aborted. This is good because it allows us to obey the invariant
Expand Down
104 changes: 104 additions & 0 deletions pkg/kv/kvserver/batcheval/cmd_push_txn_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
// Copyright 2024 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package batcheval_test

import (
"context"
"testing"

"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval/result"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/storage/enginepb"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/stretchr/testify/require"
)

// TestPushTxnAmbiguousAbort tests PushTxn behavior when the transaction record
// is missing. In this case, the timestamp cache can tell us whether the
// transaction record may have existed in the past -- if we know it hasn't, then
// the transaction is still pending (e.g. before the record is written), but
// otherwise the transaction record is pessimistically assumed to have aborted.
// However, this state is ambiguous, as the transaction may in fact have
// committed already and GCed its transaction record. Make sure this is
// reflected in the AmbiguousAbort field.
//
// TODO(erikgrinaker): generalize this to test PushTxn more broadly.
func TestPushTxnAmbiguousAbort(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

ctx := context.Background()

clock := hlc.NewClockForTesting(timeutil.NewManualTime(timeutil.Now()))
now := clock.Now()
engine := storage.NewDefaultInMemForTesting()
defer engine.Close()

testutils.RunTrueAndFalse(t, "CanCreateTxnRecord", func(t *testing.T, canCreateTxnRecord bool) {
evalCtx := (&batcheval.MockEvalCtx{
Clock: clock,
CanCreateTxnRecordFn: func() (bool, kvpb.TransactionAbortedReason) {
return canCreateTxnRecord, 0 // PushTxn doesn't care about the reason
},
}).EvalContext()

key := roachpb.Key("foo")
pusheeTxnMeta := enginepb.TxnMeta{
ID: uuid.MakeV4(),
Key: key,
MinTimestamp: now,
}

resp := kvpb.PushTxnResponse{}
res, err := batcheval.PushTxn(ctx, engine, batcheval.CommandArgs{
EvalCtx: evalCtx,
Header: kvpb.Header{
Timestamp: clock.Now(),
},
Args: &kvpb.PushTxnRequest{
RequestHeader: kvpb.RequestHeader{Key: key},
PusheeTxn: pusheeTxnMeta,
},
}, &resp)
require.NoError(t, err)

// There is no txn record (the engine is empty). If we can't create a txn
// record, it's because the timestamp cache can't confirm that it didn't
// exist in the past. This will return an ambiguous abort.
var expectUpdatedTxns []*roachpb.Transaction
expectTxn := roachpb.Transaction{
TxnMeta: pusheeTxnMeta,
LastHeartbeat: pusheeTxnMeta.MinTimestamp,
}
if !canCreateTxnRecord {
expectTxn.Status = roachpb.ABORTED
expectUpdatedTxns = append(expectUpdatedTxns, &expectTxn)
}

require.Equal(t, result.Result{
Local: result.LocalResult{
UpdatedTxns: expectUpdatedTxns,
},
}, res)
require.Equal(t, kvpb.PushTxnResponse{
PusheeTxn: expectTxn,
AmbiguousAbort: !canCreateTxnRecord,
}, resp)
})
}
22 changes: 11 additions & 11 deletions pkg/kv/kvserver/concurrency/concurrency_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -742,22 +742,22 @@ func (c *cluster) makeConfig() concurrency.Config {
// PushTransaction implements the concurrency.IntentResolver interface.
func (c *cluster) PushTransaction(
ctx context.Context, pushee *enginepb.TxnMeta, h kvpb.Header, pushType kvpb.PushTxnType,
) (*roachpb.Transaction, *kvpb.Error) {
) (*roachpb.Transaction, bool, *kvpb.Error) {
pusheeRecord, err := c.getTxnRecord(pushee.ID)
if err != nil {
return nil, kvpb.NewError(err)
return nil, false, kvpb.NewError(err)
}
var pusherRecord *txnRecord
if h.Txn != nil {
pusherID := h.Txn.ID
pusherRecord, err = c.getTxnRecord(pusherID)
if err != nil {
return nil, kvpb.NewError(err)
return nil, false, kvpb.NewError(err)
}

push, err := c.registerPush(ctx, pusherID, pushee.ID)
if err != nil {
return nil, kvpb.NewError(err)
return nil, false, kvpb.NewError(err)
}
defer c.unregisterPush(push)
}
Expand All @@ -782,10 +782,10 @@ func (c *cluster) PushTransaction(
switch {
case pusheeStatus.IsFinalized():
// Already finalized.
return pusheeTxn, nil
return pusheeTxn, false, nil
case pushType == kvpb.PUSH_TIMESTAMP && pushTo.LessEq(pusheeTxn.WriteTimestamp):
// Already pushed.
return pusheeTxn, nil
return pusheeTxn, false, nil
case pushType == kvpb.PUSH_TOUCH:
pusherWins = false
case txnwait.CanPushWithPriority(pushType, pusherIso, pusheeIso, pusherPri, pusheePri, pusheeStatus):
Expand All @@ -805,16 +805,16 @@ func (c *cluster) PushTransaction(
err = errors.Errorf("unexpected push type: %s", pushType)
}
if err != nil {
return nil, kvpb.NewError(err)
return nil, false, kvpb.NewError(err)
}
pusheeTxn, _ = pusheeRecord.asTxn()
return pusheeTxn, nil
return pusheeTxn, false, nil
}
// If PUSH_TOUCH or WaitPolicy_Error, return error instead of waiting.
if pushType == kvpb.PUSH_TOUCH || h.WaitPolicy == lock.WaitPolicy_Error {
log.Eventf(ctx, "pushee not abandoned")
err := kvpb.NewTransactionPushError(*pusheeTxn)
return nil, kvpb.NewError(err)
return nil, false, kvpb.NewError(err)
}
// Or the pusher aborted?
var pusherRecordSig chan struct{}
Expand All @@ -824,15 +824,15 @@ func (c *cluster) PushTransaction(
if pusherTxn.Status == roachpb.ABORTED {
log.Eventf(ctx, "detected pusher aborted")
err := kvpb.NewTransactionAbortedError(kvpb.ABORT_REASON_PUSHER_ABORTED)
return nil, kvpb.NewError(err)
return nil, false, kvpb.NewError(err)
}
}
// Wait until either record is updated.
select {
case <-pusheeRecordSig:
case <-pusherRecordSig:
case <-ctx.Done():
return nil, kvpb.NewError(ctx.Err())
return nil, false, kvpb.NewError(ctx.Err())
}
}
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/kv/kvserver/concurrency/lock_table_waiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ type IntentResolver interface {
// pushed successfully.
PushTransaction(
context.Context, *enginepb.TxnMeta, kvpb.Header, kvpb.PushTxnType,
) (*roachpb.Transaction, *Error)
) (*roachpb.Transaction, bool, *Error)

// ResolveIntent synchronously resolves the provided intent.
ResolveIntent(context.Context, roachpb.LockUpdate, intentresolver.ResolveOptions) *Error
Expand Down Expand Up @@ -523,7 +523,7 @@ func (w *lockTableWaiterImpl) pushLockTxn(
log.VEventf(ctx, 2, "pushing txn %s to abort", ws.txn.Short())
}

pusheeTxn, err := w.ir.PushTransaction(ctx, ws.txn, h, pushType)
pusheeTxn, _, err := w.ir.PushTransaction(ctx, ws.txn, h, pushType)
if err != nil {
// If pushing with an Error WaitPolicy and the push fails, then the lock
// holder is still active. Transform the error into a WriteIntentError.
Expand Down Expand Up @@ -705,7 +705,7 @@ func (w *lockTableWaiterImpl) pushRequestTxn(
pushType := kvpb.PUSH_ABORT
log.VEventf(ctx, 3, "pushing txn %s to detect request deadlock", ws.txn.Short())

_, err := w.ir.PushTransaction(ctx, ws.txn, h, pushType)
_, _, err := w.ir.PushTransaction(ctx, ws.txn, h, pushType)
if err != nil {
return err
}
Expand Down
30 changes: 15 additions & 15 deletions pkg/kv/kvserver/concurrency/lock_table_waiter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,15 +38,15 @@ import (
)

type mockIntentResolver struct {
pushTxn func(context.Context, *enginepb.TxnMeta, kvpb.Header, kvpb.PushTxnType) (*roachpb.Transaction, *Error)
pushTxn func(context.Context, *enginepb.TxnMeta, kvpb.Header, kvpb.PushTxnType) (*roachpb.Transaction, bool, *Error)
resolveIntent func(context.Context, roachpb.LockUpdate, intentresolver.ResolveOptions) *Error
resolveIntents func(context.Context, []roachpb.LockUpdate, intentresolver.ResolveOptions) *Error
}

// mockIntentResolver implements the IntentResolver interface.
func (m *mockIntentResolver) PushTransaction(
ctx context.Context, txn *enginepb.TxnMeta, h kvpb.Header, pushType kvpb.PushTxnType,
) (*roachpb.Transaction, *Error) {
) (*roachpb.Transaction, bool, *Error) {
return m.pushTxn(ctx, txn, h, pushType)
}

Expand Down Expand Up @@ -349,7 +349,7 @@ func testWaitPush(t *testing.T, k waitKind, makeReq func() Request, expPushTS hl
pusheeArg *enginepb.TxnMeta,
h kvpb.Header,
pushType kvpb.PushTxnType,
) (*roachpb.Transaction, *Error) {
) (*roachpb.Transaction, bool, *Error) {
require.Equal(t, &pusheeTxn.TxnMeta, pusheeArg)
require.Equal(t, req.Txn, h.Txn)
require.Equal(t, expPushTS, h.Timestamp)
Expand Down Expand Up @@ -384,7 +384,7 @@ func testWaitPush(t *testing.T, k waitKind, makeReq func() Request, expPushTS hl
g.state = waitingState{kind: doneWaiting}
g.notify()
}
return resp, nil
return resp, false, nil
}

err := w.WaitOn(ctx, req, g)
Expand Down Expand Up @@ -552,15 +552,15 @@ func testErrorWaitPush(
pusheeArg *enginepb.TxnMeta,
h kvpb.Header,
pushType kvpb.PushTxnType,
) (*roachpb.Transaction, *Error) {
) (*roachpb.Transaction, bool, *Error) {
require.Equal(t, &pusheeTxn.TxnMeta, pusheeArg)
require.Equal(t, req.Txn, h.Txn)
require.Equal(t, expPushTS, h.Timestamp)
require.Equal(t, kvpb.PUSH_TIMESTAMP, pushType)

resp := &roachpb.Transaction{TxnMeta: *pusheeArg, Status: roachpb.PENDING}
if pusheeActive {
return nil, kvpb.NewError(&kvpb.TransactionPushError{
return nil, false, kvpb.NewError(&kvpb.TransactionPushError{
PusheeTxn: *resp,
})
}
Expand All @@ -583,7 +583,7 @@ func testErrorWaitPush(
return nil
}
resp.Status = roachpb.ABORTED
return resp, nil
return resp, false, nil
}

err := w.WaitOn(ctx, req, g)
Expand Down Expand Up @@ -724,7 +724,7 @@ func testWaitPushWithTimeout(t *testing.T, k waitKind, makeReq func() Request) {
pusheeArg *enginepb.TxnMeta,
h kvpb.Header,
pushType kvpb.PushTxnType,
) (*roachpb.Transaction, *Error) {
) (*roachpb.Transaction, bool, *Error) {
require.Equal(t, &pusheeTxn.TxnMeta, pusheeArg)
require.Equal(t, req.Txn, h.Txn)

Expand All @@ -737,7 +737,7 @@ func testWaitPushWithTimeout(t *testing.T, k waitKind, makeReq func() Request) {

// Wait for the context to hit its timeout.
<-ctx.Done()
return nil, kvpb.NewError(ctx.Err())
return nil, false, kvpb.NewError(ctx.Err())
}

require.Equal(t, kvpb.PUSH_ABORT, pushType)
Expand All @@ -748,7 +748,7 @@ func testWaitPushWithTimeout(t *testing.T, k waitKind, makeReq func() Request) {

resp := &roachpb.Transaction{TxnMeta: *pusheeArg, Status: roachpb.PENDING}
if pusheeActive {
return nil, kvpb.NewError(&kvpb.TransactionPushError{
return nil, false, kvpb.NewError(&kvpb.TransactionPushError{
PusheeTxn: *resp,
})
}
Expand All @@ -771,7 +771,7 @@ func testWaitPushWithTimeout(t *testing.T, k waitKind, makeReq func() Request) {
return nil
}
resp.Status = roachpb.ABORTED
return resp, nil
return resp, false, nil
}

err := w.WaitOn(ctx, req, g)
Expand Down Expand Up @@ -831,8 +831,8 @@ func TestLockTableWaiterIntentResolverError(t *testing.T) {
g.notify()
ir.pushTxn = func(
_ context.Context, _ *enginepb.TxnMeta, _ kvpb.Header, _ kvpb.PushTxnType,
) (*roachpb.Transaction, *Error) {
return nil, err1
) (*roachpb.Transaction, bool, *Error) {
return nil, false, err1
}
err := w.WaitOn(ctx, req, g)
require.Equal(t, err1, err)
Expand All @@ -842,8 +842,8 @@ func TestLockTableWaiterIntentResolverError(t *testing.T) {
g.notify()
ir.pushTxn = func(
_ context.Context, _ *enginepb.TxnMeta, _ kvpb.Header, _ kvpb.PushTxnType,
) (*roachpb.Transaction, *Error) {
return &pusheeTxn, nil
) (*roachpb.Transaction, bool, *Error) {
return &pusheeTxn, false, nil
}
ir.resolveIntent = func(_ context.Context, intent roachpb.LockUpdate, opts intentresolver.ResolveOptions) *Error {
return err2
Expand Down

0 comments on commit 2378db7

Please sign in to comment.