Skip to content

Commit

Permalink
kv: deflake and unskip TestPushTxnUpgradeExistingTxn
Browse files Browse the repository at this point in the history
Fixes #64313.

This test would flake if the clock was incremented at all between the creation
of the two transactions. The easiest way to reproduce this was by adding a
`tc.Clock().Now()` call between the calls to `newTransaction`. This was because
a prior iteration would `Forward` the clock to the relevant WallTime, so logical
ticks could cause issues.

This commit deflakes the test by replacing the predetermination of timestamps
with predetermined time offsets. This prevents a clock increment in a previous
iteration from impacting the relative timestamps in the next iteration.

Release justification: testing only
  • Loading branch information
nvanbenschoten committed Sep 3, 2021
1 parent 1266aed commit ecffc89
Showing 1 changed file with 19 additions and 17 deletions.
36 changes: 19 additions & 17 deletions pkg/kv/kvserver/replica_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5621,45 +5621,47 @@ func TestPushTxnAlreadyCommittedOrAborted(t *testing.T) {
// test cases, the priorities are set such that the push will succeed.
func TestPushTxnUpgradeExistingTxn(t *testing.T) {
defer leaktest.AfterTest(t)()
skip.WithIssue(t, 64313, "flaky test")
defer log.Scope(t).Close(t)
ctx := context.Background()
tc := testContext{}
stopper := stop.NewStopper()
defer stopper.Stop(context.Background())
defer stopper.Stop(ctx)
tc.Start(t, stopper)

now := tc.Clock().Now()
ts1 := now.Add(1, 0)
ts2 := now.Add(2, 0)
testCases := []struct {
startTS, ts, expTS hlc.Timestamp
startOffset, pushOffset, expOffset int64
}{
// Noop.
{ts1, ts1, ts1},
{1, 1, 1},
// Move timestamp forward.
{ts1, ts2, ts2},
{1, 2, 2},
// Move timestamp backwards (has no effect).
{ts2, ts1, ts2},
{2, 1, 2},
}

for i, test := range testCases {
now := tc.Clock().Now()
startTS := now.Add(test.startOffset, 0)
pushTS := now.Add(test.pushOffset, 0)
expTS := now.Add(test.expOffset, 0)

key := roachpb.Key(fmt.Sprintf("key-%d", i))
pusher := newTransaction("test", key, 1, tc.Clock())
pushee := newTransaction("test", key, 1, tc.Clock())
pusher := newTransaction("pusher", key, 1, tc.Clock())
pushee := newTransaction("pushee", key, 1, tc.Clock())
pushee.Epoch = 12345
pusher.Priority = enginepb.MaxTxnPriority // Pusher will win

// First, establish "start" of existing pushee's txn via HeartbeatTxn.
pushee.WriteTimestamp = test.startTS
pushee.LastHeartbeat = test.startTS
pushee.ReadTimestamp = test.startTS
pushee.WriteTimestamp = startTS
pushee.LastHeartbeat = startTS
pushee.ReadTimestamp = startTS
hb, hbH := heartbeatArgs(pushee, pushee.WriteTimestamp)
if _, pErr := kv.SendWrappedWith(context.Background(), tc.Sender(), hbH, &hb); pErr != nil {
if _, pErr := kv.SendWrappedWith(ctx, tc.Sender(), hbH, &hb); pErr != nil {
t.Fatal(pErr)
}

// Now, attempt to push the transaction using updated timestamp.
pushee.WriteTimestamp = test.ts
pushee.WriteTimestamp = pushTS
args := pushTxnArgs(pusher, pushee, roachpb.PUSH_ABORT)

// Set header timestamp to the maximum of the pusher and pushee timestamps.
Expand All @@ -5674,7 +5676,7 @@ func TestPushTxnUpgradeExistingTxn(t *testing.T) {
expTxn := expTxnRecord.AsTransaction()
expTxn.Priority = enginepb.MaxTxnPriority - 1
expTxn.Epoch = pushee.Epoch // no change
expTxn.WriteTimestamp = test.expTS
expTxn.WriteTimestamp = expTS
expTxn.Status = roachpb.ABORTED

if !reflect.DeepEqual(expTxn, reply.PusheeTxn) {
Expand Down

0 comments on commit ecffc89

Please sign in to comment.