Skip to content


Merge pull request #46848 from andreimatei/backport20.1-46596
Browse files Browse the repository at this point in the history
release-20.1: kvclient/kvcoord: inhibit parallel commit when retrying EndTxn request
  • Loading branch information
andreimatei committed Apr 1, 2020
2 parents e8ab990 + 3beb292 commit ea91416
Show file tree
Hide file tree
Showing 11 changed files with 526 additions and 97 deletions.
276 changes: 275 additions & 1 deletion pkg/kv/kvclient/kvcoord/dist_sender_server_test.go
Expand Up @@ -38,7 +38,8 @@ import (

Expand Down Expand Up @@ -2630,6 +2631,279 @@ func TestTxnCoordSenderRetries(t *testing.T) {

type pushExpectation int

const (
// expectPusheeTxnRecovery means we're expecting transaction recovery to be
// performed (after finding a STAGING txn record).
expectPusheeTxnRecovery pushExpectation = iota
// expectPusheeTxnRecordNotFound means we're expecting the push to not find the
// pushee txn record.
// dontExpectAnything means we're not going to check the state in which the
// pusher found the pushee's txn record.

type expectedTxnResolution int

const (
expectAborted expectedTxnResolution = iota

// checkPushResult pushes the specified txn and checks that the pushee's
// resolution is the expected one.
func checkPushResult(
ctx context.Context,
db *kv.DB,
txn roachpb.Transaction,
expResolution expectedTxnResolution,
pushExpectation pushExpectation,
) error {
pushReq := roachpb.PushTxnRequest{
RequestHeader: roachpb.RequestHeader{
Key: txn.Key,
PusheeTxn: txn.TxnMeta,
PushTo: hlc.Timestamp{},
PushType: roachpb.PUSH_ABORT,
// We're going to Force the push in order to not wait for the pushee to
// expire.
Force: true,
ba := roachpb.BatchRequest{}

recCtx, collectRec, cancel := tracing.ContextWithRecordingSpan(ctx, "test trace")
defer cancel()

resp, pErr := db.NonTransactionalSender().Send(recCtx, ba)
if pErr != nil {
return pErr.GoError()

var statusErr error
pusheeStatus := resp.Responses[0].GetPushTxn().PusheeTxn.Status
switch pusheeStatus {
case roachpb.ABORTED:
if expResolution != expectAborted {
statusErr = errors.Errorf("transaction unexpectedly aborted")
case roachpb.COMMITTED:
if expResolution != expectCommitted {
statusErr = errors.Errorf("transaction unexpectedly committed")
return errors.Errorf("unexpected txn status: %s", pusheeStatus)

// Verify that we're not fooling ourselves and that checking for the implicit
// commit actually caused the txn recovery procedure to run.
recording := collectRec()
var resolutionErr error
switch pushExpectation {
case expectPusheeTxnRecovery:
expMsg := fmt.Sprintf("recovered txn %s", txn.ID.Short())
if _, ok := recording.FindLogMessage(expMsg); !ok {
resolutionErr = errors.Errorf(
"recovery didn't run as expected (missing \"%s\"). recording: %s",
expMsg, recording)
case expectPusheeTxnRecordNotFound:
expMsg := "pushee txn record not found"
if _, ok := recording.FindLogMessage(expMsg); !ok {
resolutionErr = errors.Errorf(
"push didn't run as expected (missing \"%s\"). recording: %s",
expMsg, recording)
case dontExpectAnything:

return errors.CombineErrors(statusErr, resolutionErr)

// Test that, even though at the kvserver level requests are not idempotent
// across an EndTxn, a TxnCoordSender retry of the final batch after a refresh
// still works fine. We check that a transaction is not considered implicitly
// committed through a combination of writes from a previous attempt of the
// EndTxn batch and a STAGING txn record written by a newer attempt of that
// batch.
// Namely, the scenario is as follows:
// 1. client sends CPut(a) + CPut(b) + EndTxn. The CPut(a) is split by the
// DistSender from the rest. Note that the parallel commit mechanism is in
// effect here.
// 2. One of the two sides gets a WriteTooOldError, the other succeeds.
// The client needs to refresh.
// 3. The refresh succeeds.
// 4. The client resends the whole batch (note that we don't keep track of the
// previous partial success).
// 5. The batch is split again, and one of the two sides fails.
// This tests checks that, for the different combinations of failures across the
// two attempts of the request, the transaction is not erroneously considered to
// be committed. We don't want an intent laid down by the first attempt to
// satisfy a STAGING record from the 2nd attempt, or the other way around (an
// intent written in the 2nd attempt satisfying a STAGING record written on the
// first attempt). See subtests for more details.
func TestTxnCoordSenderRetriesAcrossEndTxn(t *testing.T) {
defer leaktest.AfterTest(t)()

var filterFn atomic.Value
var storeKnobs kvserver.StoreTestingKnobs
storeKnobs.EvalKnobs.TestingEvalFilter =
func(fArgs storagebase.FilterArgs) *roachpb.Error {
fnVal := filterFn.Load()
if fn, ok := fnVal.(func(storagebase.FilterArgs) *roachpb.Error); ok && fn != nil {
return fn(fArgs)
return nil

// The left side is CPut(a), the right side is CPut(b)+EndTxn(STAGING).
type side int
const (
left side = iota

testCases := []struct {
// sidePushedOnFirstAttempt controls which sub-batch will return a
// WriteTooOldError on the first attempt.
sidePushedOnFirstAttempt side
sideRejectedOnSecondAttempt side
txnRecExpectation pushExpectation
// On the first attempt, the left side succeeds in laying down an intent,
// while the right side fails. On the 2nd attempt, the right side succeeds
// while the left side fails.
// The point of this test is to check that the txn is not considered to be
// implicitly committed at this point. Handling this scenario requires
// special care. If we didn't do anything, then we'd end up with a STAGING
// txn record (from the second attempt of the request) and an intent on
// "a" from the first attempt. That intent would have a lower timestamp
// than the txn record and so the txn would be considered explicitly
// committed. If the txn were to be considered implicitly committed, and
// the intent on "a" was resolved, then write on a (when it eventually
// evaluates) might return wrong results, or be pushed, or generally get
// very confused about how its own transaction got committed already.
// We handle this scenario by disabling the parallel commit on the
// request's 2nd attempt. Thus, the EndTxn will be split from all the
// other requests, and the txn record is never written if anything fails.
sidePushedOnFirstAttempt: right,
sideRejectedOnSecondAttempt: left,
// The first attempt of right side contains a parallel commit (i.e. an
// EndTxn), but fails. The 2nd attempt of the right side will no longer
// contain an EndTxn, as explained above. So we expect the txn record to
// not exist.
txnRecExpectation: expectPusheeTxnRecordNotFound,
// On the first attempt, the right side succeed in writing a STAGING txn
// record, but the left side fails. On the second attempt, the right side
// is rejected.
// The point of this test is to check that the txn is not considered
// implicitly committed at this point. All the intents are in place for
// the txn to be considered committed, but we rely on the fact that the
// intent on "a" has a timestamp that's too high (it gets the timestamp
// from the 2nd attempt, after a refresh, but the STAGING txn record has
// an older timestamp). If the txn were to be considered implicitly
// committed, it'd be bad as we are returning an error to the client
// telling it that the EndTxn failed.
sidePushedOnFirstAttempt: left,
sideRejectedOnSecondAttempt: right,
// The first attempt of the right side writes a STAGING txn record, so we
// expect to perform txn recovery.
txnRecExpectation: expectPusheeTxnRecovery,

for _, tc := range testCases {
t.Run("", func(t *testing.T) {
s, _, db := serverutils.StartServer(t,
base.TestServerArgs{Knobs: base.TestingKnobs{Store: &storeKnobs}})
ctx := context.Background()
defer s.Stopper().Stop(ctx)

keyA, keyA1, keyB, keyB1 := roachpb.Key("a"), roachpb.Key("a1"), roachpb.Key("b"), roachpb.Key("b1")
require.NoError(t, setupMultipleRanges(ctx, db, string(keyB)))

origValA := roachpb.MakeValueFromString("initA")
require.NoError(t, db.Put(ctx, keyA, &origValA))
origValB := roachpb.MakeValueFromString("initA")
require.NoError(t, db.Put(ctx, keyB, &origValB))

txn := db.NewTxn(ctx, "test txn")

// Do a write to anchor the txn on b's range.
require.NoError(t, txn.Put(ctx, keyB1, "b1"))

// Take a snapshot of the txn early. We'll use it when verifying if the txn is
// implicitly committed. If we didn't use this early snapshot and, instead,
// used the transaction with a bumped timestamp, then the push code would
// infer that the txn is not implicitly committed without actually running the
// recovery procedure. Using this snapshot mimics a pusher that ran into an
// old intent.
origTxn := txn.TestingCloneTxn()

// Do a read to prevent the txn for performing server-side refreshes.
_, err := txn.Get(ctx, keyA1)
require.NoError(t, err)

// After the txn started, do a conflicting read. This will cause one of
// the txn's upcoming CPuts to return a WriteTooOldError on the first
// attempt, causing in turn to refresh and a retry. Note that, being
// CPuts, the pushed writes don't defer the error by returning the
// WriteTooOld flag instead of a WriteTooOldError.
var readKey roachpb.Key
if tc.sidePushedOnFirstAttempt == left {
readKey = keyA
} else {
readKey = keyB
_, err = db.Get(ctx, readKey)
require.NoError(t, err)

b := txn.NewBatch()
b.CPut(keyA, "a", &origValA)
b.CPut(keyB, "b", &origValB)

var secondAttemptRejectKey roachpb.Key
if tc.sideRejectedOnSecondAttempt == left {
secondAttemptRejectKey = keyA
} else {
secondAttemptRejectKey = keyB

// Install a filter which will reject requests touching
// secondAttemptRejectKey on the retry.
var count int32
filterFn.Store(func(args storagebase.FilterArgs) *roachpb.Error {
put, ok := args.Req.(*roachpb.ConditionalPutRequest)
if !ok {
return nil
if !put.Key.Equal(secondAttemptRejectKey) {
return nil
// Reject the right request on the 2nd attempt.
if count == 2 {
return roachpb.NewErrorf("injected error; test rejecting request")
return nil

require.Error(t, txn.CommitInBatch(ctx, b), "injected")
require.NoError(t, checkPushResult(ctx, db, *origTxn, expectAborted, tc.txnRecExpectation))

// Test that we're being smart about the timestamp ranges that need to be
// refreshed: when span are refreshed, they only need to be checked for writes
// above the previous time when they've been refreshed, not from the
Expand Down
48 changes: 46 additions & 2 deletions pkg/kv/kvclient/kvcoord/txn_interceptor_committer.go
Expand Up @@ -138,16 +138,42 @@ func (tc *txnCommitter) SendLocked(
// set. This is the only place where EndTxnRequest.Key is assigned, but we
// could be dealing with a re-issued batch after a refresh. Remember, the
// committer is below the span refresh on the interceptor stack.
var etAttempt endTxnAttempt
if et.Key == nil {
et.Key = ba.Txn.Key
etAttempt = endTxnFirstAttempt
} else {
// If this is a retry, we'll disable parallel commit. Since the previous
// attempt might have partially succeeded (i.e. the batch might have been
// split into sub-batches and some of them might have evaluated
// successfully), there might be intents laying around. If we'd perform a
// parallel commit, and the batch gets split again, and the STAGING txn
// record were written before we evaluate some of the other sub-batche. We
// could technically enter the "implicitly committed" state before all the
// sub-batches are evaluated and this is problematic: there's a race between
// evaluating those requests and other pushers coming along and
// transitioning the txn to explicitly committed (and cleaning up all the
// intents), and the evaluations of the outstanding sub-batches. If the
// randos win, then the re-evaluations will fail because we don't have
// idempotency of evaluations across a txn commit (for example, the
// re-evaluations might notice that their transaction is already committed
// and get confused).
etAttempt = endTxnRetry
if len(et.InFlightWrites) > 0 {
// Make a copy of the EndTxn, since we're going to change it below to
// disable the parallel commit.
etCpy := *et
et = &etCpy

// Determine whether the commit request can be run in parallel with the rest
// of the requests in the batch. If not, move the in-flight writes currently
// attached to the EndTxn request to the LockSpans and clear the in-flight
// write set; no writes will be in-flight concurrently with the EndTxn
// request.
if len(et.InFlightWrites) > 0 && !tc.canCommitInParallel(ctx, ba, et) {
if len(et.InFlightWrites) > 0 && !tc.canCommitInParallel(ctx, ba, et, etAttempt) {
// NB: when parallel commits is disabled, this is the best place to
// detect whether the batch has only distinct spans. We can set this
// flag based on whether any of previously declared in-flight writes
Expand Down Expand Up @@ -208,6 +234,8 @@ func (tc *txnCommitter) SendLocked(
// could be a combination of protos from responses, all merged by
// DistSender.
if pErr := needTxnRetryAfterStaging(br); pErr != nil {
log.VEventf(ctx, 2, "parallel commit failed since some writes were pushed. "+
"Synthesized err: %s", pErr)
return nil, pErr

Expand Down Expand Up @@ -274,17 +302,33 @@ func (tc *txnCommitter) sendLockedWithElidedEndTxn(
return br, nil

// endTxnAttempt specifies whether it's the first time that we're attempting to
// evaluate an EndTxn request or whether it's a retry (i.e. after a successful
// refresh). There are some precautions we need to take when sending out
// retries.
type endTxnAttempt int

const (
endTxnFirstAttempt endTxnAttempt = iota

// canCommitInParallel determines whether the batch can issue its committing
// EndTxn in parallel with the rest of its requests and with any in-flight
// writes, which all should have corresponding QueryIntent requests in the
// batch.
func (tc *txnCommitter) canCommitInParallel(
ctx context.Context, ba roachpb.BatchRequest, et *roachpb.EndTxnRequest,
ctx context.Context, ba roachpb.BatchRequest, et *roachpb.EndTxnRequest, etAttempt endTxnAttempt,
) bool {
if !parallelCommitsEnabled.Get(& {
return false

if etAttempt == endTxnRetry {
log.VEventf(ctx, 2, "retrying batch not eligible for parallel commit")
return false

// We're trying to parallel commit, not parallel abort.
if !et.Commit {
return false
Expand Down

0 comments on commit ea91416

Please sign in to comment.