Skip to content

Commit

Permalink
storage: rationalize server-side refreshes and fix bugs
Browse files Browse the repository at this point in the history
Before this patch, we had several issues due to the server erroneously
considering that it's OK to commit a transaction at a bumped timestamp.

One of the issues was a lost update: a CPut could erroneously succeed
even though there's been a more recent write. This was caused by faulty
code in evaluateBatch() that was thinking that, just because an EndTxn
claimed to have been able to commit a transaction, that means that any
WriteTooOldError encountered previously by the batch was safe to
discard. An EndTxn might consider that it can commit even if there had
been previous write too old conditions if the NoRefreshSpans flag is
set. The problems is that a CPut that had returned a WriteTooOldError
also evaluated at the wrong read timestamp, and so its evaluation can't
be relied on.

Another issue is that, when the EndTxn code mentioned above considers
that it's safe to commit at a bumped timestamp, it doesn't take into
considerations that the EndTxn's batch might have performed reads (other
than CPuts)  that have been evaluated at a lower timestamp. This can
happen, for example in the following scenario: - a txn sends a Put which
gets bumped by the ts cache - the txn then sends a Scan + EndTxn. The
scan gets evaluated at the original timestamp, but then we commit at a
bumped one because the NoRefreshSpans flag is set.

The patch fixes the bugs by reworking how evaluation takes advantage of
the fact that some requests have flexible timestamps. EndTxn no longer
is in the business of committing at bumped timestamps, and its code is
thus simplified. Instead, the replica's "local retries" loop takes over.
The replica already had code handling non-transactional batches that
evaluated them repeatedly in case of WriteTooOldErrors. This patch
rationalizes and expands this code to deal with transactional batches
too, and with pushes besides WriteTooOldErrors. This reevaluation loop
now handles the cases in which the EndTxn used to bump the commit
timestamp.

The patch also fixes a third bug: the logic evaluateBatch() for
resetting the WriteTooOld state after a successful EndTransaction was
ignoring the STAGING state, meaning that the server would return a
WriteTooOldError even though the transaction was committed. I'm not sure
if this had dramatic consequences or was benign...

Fixes #42849

Release note (bug fix): A bug causing lost update transaction anomalies
was fixed.
  • Loading branch information
andreimatei committed Jan 15, 2020
1 parent f37f977 commit 1edb0d5
Show file tree
Hide file tree
Showing 19 changed files with 572 additions and 409 deletions.
57 changes: 52 additions & 5 deletions pkg/kv/dist_sender_server_test.go
Expand Up @@ -2072,7 +2072,7 @@ func TestTxnCoordSenderRetries(t *testing.T) {
// No retries, 1pc commit.
},
{
name: "require1PC commit with injected possible replay error",
name: "require1PC commit with injected unknown serializable error",
retryable: func(ctx context.Context, txn *client.Txn) error {
b := txn.NewBatch()
b.Put("a", "put")
Expand Down Expand Up @@ -2427,6 +2427,9 @@ func TestTxnCoordSenderRetries(t *testing.T) {
expFailure: "unexpected value", // condition failed error when failing on tombstones
},
{
// This test sends a 1PC batch with Put+EndTxn.
// The Put gets a write too old error but, since there's no refresh spans,
// the commit succeeds.
name: "write too old with put in batch commit",
afterTxnStart: func(ctx context.Context, db *client.DB) error {
return db.Put(ctx, "a", "put")
Expand All @@ -2438,6 +2441,28 @@ func TestTxnCoordSenderRetries(t *testing.T) {
},
// No retries, 1pc commit.
},
{
// This test is like the previous one in that the commit batch succeeds at
// an updated timestamp, but this time the EndTxn puts the
// transaction in the STAGING state instead of COMMITTED because there had
// been previous write in a different batch. Like above, the commit is
// successful since there are no refresh spans.
name: "write too old in staging commit",
beforeTxnStart: func(ctx context.Context, db *client.DB) error {
return db.Put(ctx, "a", "orig")
},
afterTxnStart: func(ctx context.Context, db *client.DB) error {
return db.Put(ctx, "a", "put")
},
retryable: func(ctx context.Context, txn *client.Txn) error {
if err := txn.Put(ctx, "aother", "another put"); err != nil {
return err
}
b := txn.NewBatch()
b.Put("a", "final value")
return txn.CommitInBatch(ctx, b)
},
},
{
name: "write too old with cput in batch commit",
beforeTxnStart: func(ctx context.Context, db *client.DB) error {
Expand All @@ -2451,7 +2476,28 @@ func TestTxnCoordSenderRetries(t *testing.T) {
b.CPut("a", "cput", strToValue("put"))
return txn.CommitInBatch(ctx, b) // will be a 1PC, won't get auto retry
},
// No retries, 1pc commit.
// No client-side retries, 1PC commit. On the server-side, the batch is
// evaluated twice: once at the original timestamp, where it gets a
// WriteTooOldError, and then once at the pushed timestamp. The
// server-side retry is enabled by the fact that there have not been any
// previous reads and so the transaction can commit at a pushed timestamp.
},
{
// This test is like the previous one, except the 1PC batch cannot commit
// at the updated timestamp.
name: "write too old with failed cput in batch commit",
beforeTxnStart: func(ctx context.Context, db *client.DB) error {
return db.Put(ctx, "a", "orig")
},
afterTxnStart: func(ctx context.Context, db *client.DB) error {
return db.Put(ctx, "a", "put")
},
retryable: func(ctx context.Context, txn *client.Txn) error {
b := txn.NewBatch()
b.CPut("a", "cput", strToValue("orig"))
return txn.CommitInBatch(ctx, b) // will be a 1PC, won't get auto retry
},
expFailure: "unexpected value", // The CPut cannot succeed.
},
{
name: "multi-range batch with forwarded timestamp",
Expand Down Expand Up @@ -2581,7 +2627,8 @@ func TestTxnCoordSenderRetries(t *testing.T) {
b.Put("c", "put")
return txn.CommitInBatch(ctx, b)
},
txnCoordRetry: true,
// We expect the request to succeed after a server-side retry.
txnCoordRetry: false,
},
{
name: "multi-range batch with deferred write too old and failed cput",
Expand Down Expand Up @@ -2616,8 +2663,8 @@ func TestTxnCoordSenderRetries(t *testing.T) {
b.Put("c", "put")
return txn.CommitInBatch(ctx, b)
},
// Expect a transaction coord retry, which should succeed.
txnCoordRetry: true,
// We expect the request to succeed after a server-side retry.
txnCoordRetry: false,
},
{
name: "cput within uncertainty interval",
Expand Down
1 change: 1 addition & 0 deletions pkg/kv/txn_coord_sender_test.go
Expand Up @@ -553,6 +553,7 @@ func TestTxnCoordSenderAddIntentOnError(t *testing.T) {
}

func assertTransactionRetryError(t *testing.T, e error) {
t.Helper()
if retErr, ok := e.(*roachpb.TransactionRetryWithProtoRefreshError); ok {
if !testutils.IsError(retErr, "TransactionRetryError") {
t.Fatalf("expected the cause to be TransactionRetryError, but got %s",
Expand Down
18 changes: 15 additions & 3 deletions pkg/kv/txn_interceptor_span_refresher.go
Expand Up @@ -146,9 +146,8 @@ func (sr *txnSpanRefresher) SendLocked(
// Sanity check: we're supposed to control the read timestamp. What we're
// tracking in sr.refreshedTimestamp is not supposed to get out of sync
// with what batches use (which comes from tc.mu.txn).
return nil, roachpb.NewError(errors.AssertionFailedf(
"unexpected batch read timestamp: %s. Expected refreshed timestamp: %s. ba: %s",
batchReadTimestamp, sr.refreshedTimestamp, ba))
log.Fatalf(ctx, "unexpected batch read timestamp: %s. Expected refreshed timestamp: %s. ba: %s",
batchReadTimestamp, sr.refreshedTimestamp, ba)
}

if rArgs, hasET := ba.GetArg(roachpb.EndTxn); hasET {
Expand All @@ -170,6 +169,19 @@ func (sr *txnSpanRefresher) SendLocked(
// Send through wrapped lockedSender. Unlocks while sending then re-locks.
br, pErr, largestRefreshTS := sr.sendLockedWithRefreshAttempts(ctx, ba, maxAttempts)
if pErr != nil {
// The server sometimes "performs refreshes" - it updates the transaction's
// ReadTimestamp when the client said that there's nothing to refresh. In
// these cases, we need to update our refreshedTimestamp too. This is pretty
// inconsequential: the server only does this on an EndTxn. If the
// respective batch succeeds, then there won't be any more requests and so
// sr.refreshedTimestamp doesn't matter (that's why we don't handle the
// success case). However, the server can "refresh" and then return an
// error. In this case, the client will rollback and, if we don't update
// sr.refreshedTimestamp, then an assertion will fire about the rollback's
// timestamp being inconsistent.
if pErr.GetTxn() != nil {
sr.refreshedTimestamp.Forward(pErr.GetTxn().ReadTimestamp)
}
return nil, pErr
}

Expand Down
7 changes: 0 additions & 7 deletions pkg/roachpb/api.go
Expand Up @@ -107,13 +107,6 @@ func IsReadOnly(args Request) bool {
return (flags&isRead) != 0 && (flags&isWrite) == 0
}

// IsReadAndWrite returns true if the request both reads and writes
// (such as conditional puts).
func IsReadAndWrite(args Request) bool {
flags := args.flags()
return (flags&isRead) != 0 && (flags&isWrite) != 0
}

// IsTransactional returns true if the request may be part of a
// transaction.
func IsTransactional(args Request) bool {
Expand Down

0 comments on commit 1edb0d5

Please sign in to comment.