Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

batcheval: add PushTxnResponse.AmbiguousAbort #117969

Merged
merged 1 commit into from
Jan 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
9 changes: 9 additions & 0 deletions pkg/kv/kvpb/api.proto
Original file line number Diff line number Diff line change
Expand Up @@ -1317,6 +1317,15 @@ message PushTxnResponse {
// TODO(tschottdorf): Maybe this can be a TxnMeta instead; probably requires
// factoring out the new Priority.
Transaction pushee_txn = 2 [(gogoproto.nullable) = false];
// ambiguous_abort is true if pushee_txn has status ABORTED, but the
// transaction may in fact have been committed and GCed already. Concretely,
// this means that the transaction record does not exist, but it may have
// existed in the past (according to the timestamp cache), and we can't know
// whether it committed or aborted so we pessimistically assume it aborted.
//
// NB: this field was added in a patch release, and is not guaranteed to be
// populated prior to 24.1.
bool ambiguous_abort = 3;
}

// A RecoverTxnRequest is arguments to the RecoverTxn() method. It is sent
Expand Down
1 change: 1 addition & 0 deletions pkg/kv/kvserver/batcheval/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ go_test(
"cmd_get_test.go",
"cmd_is_span_empty_test.go",
"cmd_lease_test.go",
"cmd_push_txn_test.go",
"cmd_query_intent_test.go",
"cmd_query_resolved_timestamp_test.go",
"cmd_recover_txn_test.go",
Expand Down
4 changes: 4 additions & 0 deletions pkg/kv/kvserver/batcheval/cmd_push_txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,10 @@ func PushTxn(
// then we know we're in either the second or the third case.
reply.PusheeTxn = SynthesizeTxnFromMeta(ctx, cArgs.EvalCtx, args.PusheeTxn)
if reply.PusheeTxn.Status == roachpb.ABORTED {
// The transaction may actually have committed and already removed its
// intents and txn record, or it may have aborted and done the same. We
// can't know, so mark the abort as ambiguous.
reply.AmbiguousAbort = true
// If the transaction is uncommittable, we don't even need to
// persist an ABORTED transaction record, we can just consider it
// aborted. This is good because it allows us to obey the invariant
Expand Down
104 changes: 104 additions & 0 deletions pkg/kv/kvserver/batcheval/cmd_push_txn_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
// Copyright 2024 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package batcheval_test

import (
"context"
"testing"

"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval/result"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/storage/enginepb"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/stretchr/testify/require"
)

// TestPushTxnAmbiguousAbort tests PushTxn behavior when the transaction record
// is missing. In this case, the timestamp cache can tell us whether the
// transaction record may have existed in the past -- if we know it hasn't, then
// the transaction is still pending (e.g. before the record is written), but
// otherwise the transaction record is pessimistically assumed to have aborted.
// However, this state is ambiguous, as the transaction may in fact have
// committed already and GCed its transaction record. Make sure this is
// reflected in the AmbiguousAbort field.
//
// TODO(erikgrinaker): generalize this to test PushTxn more broadly.
func TestPushTxnAmbiguousAbort(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

ctx := context.Background()

clock := hlc.NewClockForTesting(timeutil.NewManualTime(timeutil.Now()))
now := clock.Now()
engine := storage.NewDefaultInMemForTesting()
defer engine.Close()

testutils.RunTrueAndFalse(t, "CanCreateTxnRecord", func(t *testing.T, canCreateTxnRecord bool) {
evalCtx := (&batcheval.MockEvalCtx{
Clock: clock,
CanCreateTxnRecordFn: func() (bool, kvpb.TransactionAbortedReason) {
return canCreateTxnRecord, 0 // PushTxn doesn't care about the reason
},
}).EvalContext()

key := roachpb.Key("foo")
pusheeTxnMeta := enginepb.TxnMeta{
ID: uuid.MakeV4(),
Key: key,
MinTimestamp: now,
}

resp := kvpb.PushTxnResponse{}
res, err := batcheval.PushTxn(ctx, engine, batcheval.CommandArgs{
EvalCtx: evalCtx,
Header: kvpb.Header{
Timestamp: clock.Now(),
},
Args: &kvpb.PushTxnRequest{
RequestHeader: kvpb.RequestHeader{Key: key},
PusheeTxn: pusheeTxnMeta,
},
}, &resp)
require.NoError(t, err)

// There is no txn record (the engine is empty). If we can't create a txn
// record, it's because the timestamp cache can't confirm that it didn't
// exist in the past. This will return an ambiguous abort.
var expectUpdatedTxns []*roachpb.Transaction
expectTxn := roachpb.Transaction{
TxnMeta: pusheeTxnMeta,
LastHeartbeat: pusheeTxnMeta.MinTimestamp,
}
if !canCreateTxnRecord {
expectTxn.Status = roachpb.ABORTED
expectUpdatedTxns = append(expectUpdatedTxns, &expectTxn)
}

require.Equal(t, result.Result{
Local: result.LocalResult{
UpdatedTxns: expectUpdatedTxns,
},
}, res)
require.Equal(t, kvpb.PushTxnResponse{
PusheeTxn: expectTxn,
AmbiguousAbort: !canCreateTxnRecord,
}, resp)
})
}
22 changes: 11 additions & 11 deletions pkg/kv/kvserver/concurrency/concurrency_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -742,22 +742,22 @@ func (c *cluster) makeConfig() concurrency.Config {
// PushTransaction implements the concurrency.IntentResolver interface.
func (c *cluster) PushTransaction(
ctx context.Context, pushee *enginepb.TxnMeta, h kvpb.Header, pushType kvpb.PushTxnType,
) (*roachpb.Transaction, *kvpb.Error) {
) (*roachpb.Transaction, bool, *kvpb.Error) {
pusheeRecord, err := c.getTxnRecord(pushee.ID)
if err != nil {
return nil, kvpb.NewError(err)
return nil, false, kvpb.NewError(err)
}
var pusherRecord *txnRecord
if h.Txn != nil {
pusherID := h.Txn.ID
pusherRecord, err = c.getTxnRecord(pusherID)
if err != nil {
return nil, kvpb.NewError(err)
return nil, false, kvpb.NewError(err)
}

push, err := c.registerPush(ctx, pusherID, pushee.ID)
if err != nil {
return nil, kvpb.NewError(err)
return nil, false, kvpb.NewError(err)
}
defer c.unregisterPush(push)
}
Expand All @@ -782,10 +782,10 @@ func (c *cluster) PushTransaction(
switch {
case pusheeStatus.IsFinalized():
// Already finalized.
return pusheeTxn, nil
return pusheeTxn, false, nil
case pushType == kvpb.PUSH_TIMESTAMP && pushTo.LessEq(pusheeTxn.WriteTimestamp):
// Already pushed.
return pusheeTxn, nil
return pusheeTxn, false, nil
case pushType == kvpb.PUSH_TOUCH:
pusherWins = false
case txnwait.CanPushWithPriority(pushType, pusherIso, pusheeIso, pusherPri, pusheePri, pusheeStatus):
Expand All @@ -805,16 +805,16 @@ func (c *cluster) PushTransaction(
err = errors.Errorf("unexpected push type: %s", pushType)
}
if err != nil {
return nil, kvpb.NewError(err)
return nil, false, kvpb.NewError(err)
}
pusheeTxn, _ = pusheeRecord.asTxn()
return pusheeTxn, nil
return pusheeTxn, false, nil
}
// If PUSH_TOUCH or WaitPolicy_Error, return error instead of waiting.
if pushType == kvpb.PUSH_TOUCH || h.WaitPolicy == lock.WaitPolicy_Error {
log.Eventf(ctx, "pushee not abandoned")
err := kvpb.NewTransactionPushError(*pusheeTxn)
return nil, kvpb.NewError(err)
return nil, false, kvpb.NewError(err)
}
// Or the pusher aborted?
var pusherRecordSig chan struct{}
Expand All @@ -824,15 +824,15 @@ func (c *cluster) PushTransaction(
if pusherTxn.Status == roachpb.ABORTED {
log.Eventf(ctx, "detected pusher aborted")
err := kvpb.NewTransactionAbortedError(kvpb.ABORT_REASON_PUSHER_ABORTED)
return nil, kvpb.NewError(err)
return nil, false, kvpb.NewError(err)
}
}
// Wait until either record is updated.
select {
case <-pusheeRecordSig:
case <-pusherRecordSig:
case <-ctx.Done():
return nil, kvpb.NewError(ctx.Err())
return nil, false, kvpb.NewError(ctx.Err())
}
}
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/kv/kvserver/concurrency/lock_table_waiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ type IntentResolver interface {
// pushed successfully.
PushTransaction(
context.Context, *enginepb.TxnMeta, kvpb.Header, kvpb.PushTxnType,
) (*roachpb.Transaction, *Error)
) (*roachpb.Transaction, bool, *Error)

// ResolveIntent synchronously resolves the provided intent.
ResolveIntent(context.Context, roachpb.LockUpdate, intentresolver.ResolveOptions) *Error
Expand Down Expand Up @@ -523,7 +523,7 @@ func (w *lockTableWaiterImpl) pushLockTxn(
log.VEventf(ctx, 2, "pushing txn %s to abort", ws.txn.Short())
}

pusheeTxn, err := w.ir.PushTransaction(ctx, ws.txn, h, pushType)
pusheeTxn, _, err := w.ir.PushTransaction(ctx, ws.txn, h, pushType)
if err != nil {
// If pushing with an Error WaitPolicy and the push fails, then the lock
// holder is still active. Transform the error into a WriteIntentError.
Expand Down Expand Up @@ -705,7 +705,7 @@ func (w *lockTableWaiterImpl) pushRequestTxn(
pushType := kvpb.PUSH_ABORT
log.VEventf(ctx, 3, "pushing txn %s to detect request deadlock", ws.txn.Short())

_, err := w.ir.PushTransaction(ctx, ws.txn, h, pushType)
_, _, err := w.ir.PushTransaction(ctx, ws.txn, h, pushType)
if err != nil {
return err
}
Expand Down
30 changes: 15 additions & 15 deletions pkg/kv/kvserver/concurrency/lock_table_waiter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,15 +38,15 @@ import (
)

type mockIntentResolver struct {
pushTxn func(context.Context, *enginepb.TxnMeta, kvpb.Header, kvpb.PushTxnType) (*roachpb.Transaction, *Error)
pushTxn func(context.Context, *enginepb.TxnMeta, kvpb.Header, kvpb.PushTxnType) (*roachpb.Transaction, bool, *Error)
resolveIntent func(context.Context, roachpb.LockUpdate, intentresolver.ResolveOptions) *Error
resolveIntents func(context.Context, []roachpb.LockUpdate, intentresolver.ResolveOptions) *Error
}

// mockIntentResolver implements the IntentResolver interface.
func (m *mockIntentResolver) PushTransaction(
ctx context.Context, txn *enginepb.TxnMeta, h kvpb.Header, pushType kvpb.PushTxnType,
) (*roachpb.Transaction, *Error) {
) (*roachpb.Transaction, bool, *Error) {
return m.pushTxn(ctx, txn, h, pushType)
}

Expand Down Expand Up @@ -349,7 +349,7 @@ func testWaitPush(t *testing.T, k waitKind, makeReq func() Request, expPushTS hl
pusheeArg *enginepb.TxnMeta,
h kvpb.Header,
pushType kvpb.PushTxnType,
) (*roachpb.Transaction, *Error) {
) (*roachpb.Transaction, bool, *Error) {
require.Equal(t, &pusheeTxn.TxnMeta, pusheeArg)
require.Equal(t, req.Txn, h.Txn)
require.Equal(t, expPushTS, h.Timestamp)
Expand Down Expand Up @@ -384,7 +384,7 @@ func testWaitPush(t *testing.T, k waitKind, makeReq func() Request, expPushTS hl
g.state = waitingState{kind: doneWaiting}
g.notify()
}
return resp, nil
return resp, false, nil
}

err := w.WaitOn(ctx, req, g)
Expand Down Expand Up @@ -552,15 +552,15 @@ func testErrorWaitPush(
pusheeArg *enginepb.TxnMeta,
h kvpb.Header,
pushType kvpb.PushTxnType,
) (*roachpb.Transaction, *Error) {
) (*roachpb.Transaction, bool, *Error) {
require.Equal(t, &pusheeTxn.TxnMeta, pusheeArg)
require.Equal(t, req.Txn, h.Txn)
require.Equal(t, expPushTS, h.Timestamp)
require.Equal(t, kvpb.PUSH_TIMESTAMP, pushType)

resp := &roachpb.Transaction{TxnMeta: *pusheeArg, Status: roachpb.PENDING}
if pusheeActive {
return nil, kvpb.NewError(&kvpb.TransactionPushError{
return nil, false, kvpb.NewError(&kvpb.TransactionPushError{
PusheeTxn: *resp,
})
}
Expand All @@ -583,7 +583,7 @@ func testErrorWaitPush(
return nil
}
resp.Status = roachpb.ABORTED
return resp, nil
return resp, false, nil
}

err := w.WaitOn(ctx, req, g)
Expand Down Expand Up @@ -724,7 +724,7 @@ func testWaitPushWithTimeout(t *testing.T, k waitKind, makeReq func() Request) {
pusheeArg *enginepb.TxnMeta,
h kvpb.Header,
pushType kvpb.PushTxnType,
) (*roachpb.Transaction, *Error) {
) (*roachpb.Transaction, bool, *Error) {
require.Equal(t, &pusheeTxn.TxnMeta, pusheeArg)
require.Equal(t, req.Txn, h.Txn)

Expand All @@ -737,7 +737,7 @@ func testWaitPushWithTimeout(t *testing.T, k waitKind, makeReq func() Request) {

// Wait for the context to hit its timeout.
<-ctx.Done()
return nil, kvpb.NewError(ctx.Err())
return nil, false, kvpb.NewError(ctx.Err())
}

require.Equal(t, kvpb.PUSH_ABORT, pushType)
Expand All @@ -748,7 +748,7 @@ func testWaitPushWithTimeout(t *testing.T, k waitKind, makeReq func() Request) {

resp := &roachpb.Transaction{TxnMeta: *pusheeArg, Status: roachpb.PENDING}
if pusheeActive {
return nil, kvpb.NewError(&kvpb.TransactionPushError{
return nil, false, kvpb.NewError(&kvpb.TransactionPushError{
PusheeTxn: *resp,
})
}
Expand All @@ -771,7 +771,7 @@ func testWaitPushWithTimeout(t *testing.T, k waitKind, makeReq func() Request) {
return nil
}
resp.Status = roachpb.ABORTED
return resp, nil
return resp, false, nil
}

err := w.WaitOn(ctx, req, g)
Expand Down Expand Up @@ -831,8 +831,8 @@ func TestLockTableWaiterIntentResolverError(t *testing.T) {
g.notify()
ir.pushTxn = func(
_ context.Context, _ *enginepb.TxnMeta, _ kvpb.Header, _ kvpb.PushTxnType,
) (*roachpb.Transaction, *Error) {
return nil, err1
) (*roachpb.Transaction, bool, *Error) {
return nil, false, err1
}
err := w.WaitOn(ctx, req, g)
require.Equal(t, err1, err)
Expand All @@ -842,8 +842,8 @@ func TestLockTableWaiterIntentResolverError(t *testing.T) {
g.notify()
ir.pushTxn = func(
_ context.Context, _ *enginepb.TxnMeta, _ kvpb.Header, _ kvpb.PushTxnType,
) (*roachpb.Transaction, *Error) {
return &pusheeTxn, nil
) (*roachpb.Transaction, bool, *Error) {
return &pusheeTxn, false, nil
}
ir.resolveIntent = func(_ context.Context, intent roachpb.LockUpdate, opts intentresolver.ResolveOptions) *Error {
return err2
Expand Down