Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sql: deflake TestReadCommittedStmtRetry #123957

Merged
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 16 additions & 14 deletions pkg/ccl/testccl/sqlccl/read_committed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,16 +40,18 @@ func TestReadCommittedStmtRetry(t *testing.T) {
ctx := context.Background()
params := base.TestServerArgs{}

var readCommittedWriteCount atomic.Int64
var finishedReadCommittedScans sync.WaitGroup
finishedReadCommittedScans.Add(1)
var finishedExternalTxn sync.WaitGroup
finishedExternalTxn.Add(1)
var trapReadCommittedWrites atomic.Bool
var trappedReadCommittedWritesOnce sync.Once
finishedReadCommittedScans := make(chan struct{})
finishedExternalTxn := make(chan struct{})
var sawWriteTooOldError atomic.Bool
var codec keys.SQLCodec
var kvTableId uint32

filterFunc := func(ctx context.Context, ba *kvpb.BatchRequest) *kvpb.Error {
if !trapReadCommittedWrites.Load() {
return nil
}
if ba.Txn == nil || ba.Txn.IsoLevel != isolation.ReadCommitted {
return nil
}
Expand All @@ -61,13 +63,10 @@ func TestReadCommittedStmtRetry(t *testing.T) {
if err != nil || tableID != kvTableId {
return nil
}
// Because of the queries the test executes below, we know that before
// the second read committed write begins, the read committed scans
// will have finished.
if newCount := readCommittedWriteCount.Add(1); newCount == 2 {
finishedReadCommittedScans.Done()
finishedExternalTxn.Wait()
}
trappedReadCommittedWritesOnce.Do(func() {
close(finishedReadCommittedScans)
<-finishedExternalTxn
})
}
}

Expand Down Expand Up @@ -110,6 +109,9 @@ func TestReadCommittedStmtRetry(t *testing.T) {
_, err = tx.Exec(`UPDATE kv SET v = v+10 WHERE k = 'a'`)
require.NoError(t, err)

// Start blocking writes in the read committed transaction.
trapReadCommittedWrites.Store(true)

// Perform a series of reads and writes in the second statement.
// Read from "b" and "c" to establish refresh spans.
// Write to "b" in the transaction, without issue.
Expand All @@ -121,15 +123,15 @@ func TestReadCommittedStmtRetry(t *testing.T) {
})

// Wait for the table to be scanned first.
finishedReadCommittedScans.Wait()
<-finishedReadCommittedScans

// Write to "c" outside the transaction to create a write-write conflict.
_, err = sqlDB.Exec(`UPDATE kv SET v = v+10 WHERE k = 'c'`)
require.NoError(t, err)

// Now let the READ COMMITTED write go through. It should encounter a
// WriteTooOldError and retry.
finishedExternalTxn.Done()
close(finishedExternalTxn)

err = g.Wait()
require.NoError(t, err)
Expand Down
Loading