Skip to content

Commit

Permalink
kvcoord: keep refresh spans after savepoint rollback
Browse files Browse the repository at this point in the history
Previously, when a read occurred between SAVEPOINT and ROLLBACK TO
SAVEPOINT, upon rollback the read was removed from the transaction's
refresh spans. If the transaction's read and write timestamps diverged,
this read would not be refreshed, which could lead to a serializability
violation. Moreover, this behavior diverged from the Postgres behavior,
which considers reads in a subtransaction to belong to the parent
transaction
(https://github.com/postgres/postgres/blob/master/src/backend/storage/lmgr/README-SSI#L461-L467).

The previous behavior was an intentional design choice, which made sense
at the time with the intention of using savepoints to recover from
serialization errors; however, this was never implemented. After some
discussions, we have decided to match the Postgres behavior instead.

This patch addresses the issue by ensuring that all refresh spans
accumulated since a savepoint was created are kept after the savepoint
is rolled back. We don't expect this new behavior to impact customers
because they should already be able to handle serialization errors; in
case any unforeseen customer issues arise, this patch also includes a
private cluster setting to revert to the old behavior.

Fixes: #111228

Release note (sql change): Reads rolled back by savepoints are now
refreshable, matching the Postgres behavior and avoiding potential
serializability violations.
  • Loading branch information
miraradeva committed Nov 28, 2023
1 parent 1248a4a commit 66c74aa
Show file tree
Hide file tree
Showing 5 changed files with 154 additions and 49 deletions.
3 changes: 3 additions & 0 deletions pkg/kv/kvclient/kvcoord/txn_coord_sender_savepoints.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@ type savepoint struct {
seqNum enginepb.TxnSeq

// txnSpanRefresher fields.
// TODO(mira): after we remove
// kv.transaction.keep_refresh_spans_on_savepoint_rollback.enabled, we won't
// need these two fields anymore.
refreshSpans []roachpb.Span
refreshInvalid bool
}
Expand Down
60 changes: 60 additions & 0 deletions pkg/kv/kvclient/kvcoord/txn_coord_sender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2997,3 +2997,63 @@ func TestTxnSetIsoLevel(t *testing.T) {
require.Equal(t, prev, txn.IsoLevel())
}
}

// TestRefreshWithSavepoint is an integration test that ensures the correct
// behavior of refreshes under savepoint rollback. The test sets up a write-skew
// example where txn1 reads keyA and writes to keyB, while concurrently txn2
// reads keyB and writes to keyA. The two txns can't be serialized so one is
// expected to get a serialization error upon commit.
//
// However, with the old behavior of discarding refresh spans upon savepoint
// rollback, the read corresponding to the discarded refresh span is not
// refreshed, so the conflict goes unnoticed and both txns commit successfully.
// See #111228 for more details.
//
// The test runs once per value of the
// kv.transaction.keep_refresh_spans_on_savepoint_rollback.enabled setting and
// asserts the outcome of txn1's commit for each.
func TestRefreshWithSavepoint(t *testing.T) {
	defer leaktest.AfterTest(t)()
	defer log.Scope(t).Close(t)

	testutils.RunTrueAndFalse(t, "keep-refresh-spans", func(t *testing.T, keepRefreshSpans bool) {
		ctx := context.Background()
		s, _, kvDB := serverutils.StartServer(t, base.TestServerArgs{})
		defer s.Stopper().Stop(ctx)

		// The subtest parameter is exactly the value the setting should take,
		// so pass it through instead of branching on it.
		kvcoord.KeepRefreshSpansOnSavepointRollback.Override(ctx, &s.ClusterSettings().SV, keepRefreshSpans)

		keyA := roachpb.Key("a")
		keyB := roachpb.Key("b")
		txn1 := kvDB.NewTxn(ctx, "txn1")
		txn2 := kvDB.NewTxn(ctx, "txn2")

		// txn1 reads keyA inside a savepoint and then rolls the savepoint back.
		spt1, err := txn1.CreateSavepoint(ctx)
		require.NoError(t, err)

		_, err = txn1.Get(ctx, keyA)
		require.NoError(t, err)

		err = txn1.RollbackToSavepoint(ctx, spt1)
		require.NoError(t, err)

		// txn2 reads keyB before txn1 writes it; txn2 then writes keyA, which
		// txn1 has read.
		_, err = txn2.Get(ctx, keyB)
		require.NoError(t, err)

		err = txn1.Put(ctx, keyB, "bb")
		require.NoError(t, err)

		err = txn2.Put(ctx, keyA, "aa")
		require.NoError(t, err)

		err = txn1.Commit(ctx)
		if keepRefreshSpans {
			// The rolled-back read of keyA is still refreshed, so the conflict
			// with txn2's write is detected.
			require.Regexp(t, ".*RETRY_SERIALIZABLE - failed preemptive refresh due to conflicting locks on \"a\"*", err)
		} else {
			// Old behavior: the read of keyA was dropped from the refresh
			// spans, so the conflict goes unnoticed.
			require.NoError(t, err)
		}

		err = txn2.Commit(ctx)
		require.NoError(t, err)
	})
}
28 changes: 25 additions & 3 deletions pkg/kv/kvclient/kvcoord/txn_interceptor_span_refresher.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,23 @@ var MaxTxnRefreshSpansBytes = settings.RegisterIntSetting(
1<<22, /* 4 MB */
settings.WithPublic)

// KeepRefreshSpansOnSavepointRollback is a boolean cluster setting that, when
// enabled, ensures that all refresh spans accumulated since a savepoint was
// created are kept even after the savepoint is rolled back. This ensures that
// the reads corresponding to the refresh spans are serialized correctly, even
// though they were rolled back. See #111228 for more details.
//
// When set to true, this setting corresponds to the correct new behavior,
// which also matches the Postgres behavior. We don't expect this new behavior
// to impact customers because they should already be able to handle
// serialization errors; in case any unforeseen customer issues arise, the
// setting here allows us to revert to the old behavior.
//
// The setting is registered with a default of false, so the old behavior
// (refresh spans are reset to the savepoint's snapshot on rollback) stays in
// effect until it is explicitly enabled. It is consulted by
// txnSpanRefresher.rollbackToSavepointLocked.
//
// TODO(mira): set the default to true after #113765.
var KeepRefreshSpansOnSavepointRollback = settings.RegisterBoolSetting(
settings.SystemVisible,
"kv.transaction.keep_refresh_spans_on_savepoint_rollback.enabled",
"if enabled, all refresh spans accumulated since a savepoint was created are kept after the savepoint is rolled back",
false)

// txnSpanRefresher is a txnInterceptor that collects the read spans of a
// serializable transaction in the event it gets a serializable retry error. It
// can then use the set of read spans to avoid retrying the transaction if all
Expand Down Expand Up @@ -794,16 +811,21 @@ func (sr *txnSpanRefresher) createSavepointLocked(ctx context.Context, s *savepo
// TODO(nvanbenschoten): make sure this works correctly with ReadCommitted.
// The refresh spans should either be empty when captured into a savepoint or
// should be cleared when the savepoint is rolled back to.
// TODO(mira): after we remove
// kv.transaction.keep_refresh_spans_on_savepoint_rollback.enabled, we won't
// need to keep refresh spans in the savepoint anymore.
s.refreshSpans = make([]roachpb.Span, len(sr.refreshFootprint.asSlice()))
copy(s.refreshSpans, sr.refreshFootprint.asSlice())
s.refreshInvalid = sr.refreshInvalid
}

// rollbackToSavepointLocked is part of the txnInterceptor interface.
//
// When kv.transaction.keep_refresh_spans_on_savepoint_rollback.enabled is
// set, the refresher's state is left untouched: every refresh span recorded
// since the savepoint was created, and the current refreshInvalid flag, are
// kept so that reads performed under the rolled-back savepoint are still
// refreshed. Otherwise the refresh footprint and refreshInvalid are reset to
// the values captured in s when the savepoint was created.
func (sr *txnSpanRefresher) rollbackToSavepointLocked(ctx context.Context, s savepoint) {
	if KeepRefreshSpansOnSavepointRollback.Get(&sr.st.SV) {
		// Keep everything accumulated since the savepoint; nothing to restore.
		return
	}
	// Old behavior: discard spans added after the savepoint by restoring the
	// snapshot taken in createSavepointLocked.
	sr.refreshFootprint.clear()
	sr.refreshFootprint.insert(s.refreshSpans...)
	sr.refreshInvalid = s.refreshInvalid
}

// closeLocked implements the txnInterceptor interface.
Expand Down
110 changes: 65 additions & 45 deletions pkg/kv/kvclient/kvcoord/txn_interceptor_span_refresher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1436,57 +1436,77 @@ func TestTxnSpanRefresherEpochIncrement(t *testing.T) {
func TestTxnSpanRefresherSavepoint(t *testing.T) {
	defer leaktest.AfterTest(t)()
	defer log.Scope(t).Close(t)

	// Run the scenario once per value of the
	// kv.transaction.keep_refresh_spans_on_savepoint_rollback.enabled setting.
	testutils.RunTrueAndFalse(t, "keep-refresh-spans", func(t *testing.T, keepRefreshSpans bool) {
		ctx := context.Background()
		tsr, mockSender := makeMockTxnSpanRefresher()

		// The subtest parameter is exactly the value the setting should take.
		KeepRefreshSpansOnSavepointRollback.Override(ctx, &tsr.st.SV, keepRefreshSpans)

		keyA, keyB := roachpb.Key("a"), roachpb.Key("b")
		txn := makeTxnProto()

		// read sends a single-Get batch for key through the span refresher;
		// the mock sender checks the batch shape and replies successfully.
		read := func(key roachpb.Key) {
			ba := &kvpb.BatchRequest{}
			ba.Header = kvpb.Header{Txn: &txn}
			getArgs := kvpb.GetRequest{RequestHeader: kvpb.RequestHeader{Key: key}}
			ba.Add(&getArgs)
			mockSender.MockSend(func(ba *kvpb.BatchRequest) (*kvpb.BatchResponse, *kvpb.Error) {
				require.Len(t, ba.Requests, 1)
				require.IsType(t, &kvpb.GetRequest{}, ba.Requests[0].GetInner())

				br := ba.CreateReply()
				br.Txn = ba.Txn
				return br, nil
			})
			br, pErr := tsr.SendLocked(ctx, ba)
			require.Nil(t, pErr)
			require.NotNil(t, br)
		}
		read(keyA)
		require.Equal(t, []roachpb.Span{{Key: keyA}}, tsr.refreshFootprint.asSlice())

		s := savepoint{}
		tsr.createSavepointLocked(ctx, &s)

		// Another read after the savepoint was created.
		read(keyB)
		require.Equal(t, []roachpb.Span{{Key: keyA}, {Key: keyB}}, tsr.refreshFootprint.asSlice())

		// The savepoint captured only the state at creation time.
		require.Equal(t, []roachpb.Span{{Key: keyA}}, s.refreshSpans)
		require.False(t, s.refreshInvalid)

		// Roll back the savepoint.
		tsr.rollbackToSavepointLocked(ctx, s)
		if keepRefreshSpans {
			// Check that refresh spans were kept as such.
			require.Equal(t, []roachpb.Span{{Key: keyA}, {Key: keyB}}, tsr.refreshFootprint.asSlice())
		} else {
			// Check that refresh spans were overwritten.
			require.Equal(t, []roachpb.Span{{Key: keyA}}, tsr.refreshFootprint.asSlice())
		}

		tsr.refreshInvalid = true
		tsr.rollbackToSavepointLocked(ctx, s)
		if keepRefreshSpans {
			// Check that rolling back to the savepoint keeps refreshInvalid as such.
			require.True(t, tsr.refreshInvalid)
		} else {
			// Check that rolling back to the savepoint resets refreshInvalid.
			require.False(t, tsr.refreshInvalid)
		}

		// Set refreshInvalid and then create a savepoint.
		tsr.refreshInvalid = true
		s = savepoint{}
		tsr.createSavepointLocked(ctx, &s)
		require.True(t, s.refreshInvalid)
		// Roll back to the savepoint and check that refreshes are still invalid.
		tsr.rollbackToSavepointLocked(ctx, s)
		require.True(t, tsr.refreshInvalid)
	})
}
2 changes: 1 addition & 1 deletion pkg/kv/kvpb/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -866,7 +866,7 @@ func (e *TransactionRetryError) SafeFormatError(p errors.Printer) (next error) {
msg = redact.Sprintf(" - %s", e.ExtraMsg)
}
if e.ConflictingTxn != nil {
msg = redact.Sprintf(" %s - conflicting txn: meta={%s}", msg, e.ConflictingTxn.String())
msg = redact.Sprintf("%s - conflicting txn: meta={%s}", msg, e.ConflictingTxn.String())
}
p.Printf("TransactionRetryError: retry txn (%s%s)", redact.SafeString(TransactionRetryReason_name[int32(e.Reason)]), msg)
return nil
Expand Down

0 comments on commit 66c74aa

Please sign in to comment.