Skip to content

Commit

Permalink
sql: add max_retries_for_read_committed_transactions session variable
Browse files Browse the repository at this point in the history
This setting controls how many retries will be performed for a statement
inside of an explicit READ COMMITTED transaction.

Release note: None
  • Loading branch information
rafiss committed Jul 18, 2023
1 parent 6cffb7e commit 633add0
Show file tree
Hide file tree
Showing 5 changed files with 70 additions and 5 deletions.
8 changes: 3 additions & 5 deletions pkg/sql/conn_executor_exec.go
Expand Up @@ -977,14 +977,12 @@ func (ex *connExecutor) execStmtInOpenState(
}
}

maxExecCount := 1
maxRetries := 0
if readCommittedSavePointToken != nil {
// TODO(rafi): Make this configurable.
const maxExecCountForReadCommitted = 5
maxExecCount = maxExecCountForReadCommitted
maxRetries = int(ex.sessionData().MaxRetriesForReadCommittedTransactions)
}

for attemptNum := 0; attemptNum < maxExecCount; attemptNum++ {
for attemptNum := 0; attemptNum <= maxRetries; attemptNum++ {
var bufferPos int
if readCommittedSavePointToken != nil {
bufferPos = res.BufferedResultsLen()
Expand Down
29 changes: 29 additions & 0 deletions pkg/sql/conn_executor_test.go
Expand Up @@ -1508,6 +1508,35 @@ func TestInjectRetryErrors(t *testing.T) {
_, err = db.ExecContext(ctx, "DROP TABLE t")
require.NoError(t, err)
})

t.Run("read_committed_txn", func(t *testing.T) {
tx, err := db.BeginTx(ctx, &gosql.TxOptions{Isolation: gosql.LevelReadCommitted})
require.NoError(t, err)

var txRes int
err = tx.QueryRow("SELECT $1::int8", 3).Scan(&txRes)
require.NoError(t, err)
require.NoError(t, tx.Commit())
require.Equal(t, 3, txRes)
})

t.Run("read_committed_txn_retries_exceeded", func(t *testing.T) {
// inject_retry_errors_enabled is hardcoded to always inject an error
// 3 times, so if we lower max_retries_for_read_committed_transactions,
// the error should bubble up to the client.
_, err := db.Exec("SET max_retries_for_read_committed_transactions = 2")
require.NoError(t, err)
tx, err := db.BeginTx(ctx, &gosql.TxOptions{Isolation: gosql.LevelReadCommitted})
require.NoError(t, err)

var txRes int
err = tx.QueryRow("SELECT $1::int8", 3).Scan(&txRes)

pqErr := (*pq.Error)(nil)
require.ErrorAs(t, err, &pqErr)
require.Equal(t, "40001", string(pqErr.Code), "expected a transaction retry error code. got %v", pqErr)
require.NoError(t, tx.Rollback())
})
}

func TestInjectRetryOnCommitErrors(t *testing.T) {
Expand Down
4 changes: 4 additions & 0 deletions pkg/sql/exec_util.go
Expand Up @@ -3442,6 +3442,10 @@ func (m *sessionDataMutator) SetInjectRetryErrorsEnabled(val bool) {
m.data.InjectRetryErrorsEnabled = val
}

func (m *sessionDataMutator) SetMaxRetriesForReadCommittedTransactions(val int32) {
m.data.MaxRetriesForReadCommittedTransactions = int64(val)
}

func (m *sessionDataMutator) SetJoinReaderOrderingStrategyBatchSize(val int64) {
m.data.JoinReaderOrderingStrategyBatchSize = val
}
Expand Down
4 changes: 4 additions & 0 deletions pkg/sql/sessiondatapb/local_only_session_data.proto
Expand Up @@ -405,6 +405,10 @@ message LocalOnlySessionData {
// ReplicationMode represents the replication parameter passed in during
// connection time.
ReplicationMode replication_mode = 106;
// MaxRetriesForReadCommittedTransactions indicates the maximum number of
// automatic retries to perform for statements in explicit READ COMMITTED
// transactions that see a transaction retry error.
int64 max_retries_for_read_committed_transactions = 107;

///////////////////////////////////////////////////////////////////////////
// WARNING: consider whether a session parameter you're adding needs to //
Expand Down
30 changes: 30 additions & 0 deletions pkg/sql/vars.go
Expand Up @@ -2028,6 +2028,36 @@ var varGen = map[string]sessionVar{
GlobalDefault: globalFalse,
},

// CockroachDB extension. Configures the maximum number of automatic retries
// to perform for statements in explicit READ COMMITTED transactions that
// see a transaction retry error.
`max_retries_for_read_committed_transactions`: {
GetStringVal: makeIntGetStringValFn(`max_retries_for_read_committed_transactions`),
Set: func(_ context.Context, m sessionDataMutator, s string) error {
b, err := strconv.ParseInt(s, 10, 64)
if err != nil {
return err
}
if b < 0 {
return pgerror.Newf(pgcode.InvalidParameterValue,
"cannot set max_retries_for_read_committed_transactions to a negative value: %d", b)
}
if b > math.MaxInt32 {
return pgerror.Newf(pgcode.InvalidParameterValue,
"cannot set max_retries_for_read_committed_transactions to a value greater than %d", math.MaxInt32)
}

m.SetMaxRetriesForReadCommittedTransactions(int32(b))
return nil
},
Get: func(evalCtx *extendedEvalContext, _ *kv.Txn) (string, error) {
return strconv.FormatInt(int64(evalCtx.SessionData().OptSplitScanLimit), 10), nil
},
GlobalDefault: func(sv *settings.Values) string {
return "5"
},
},

// CockroachDB extension.
`join_reader_ordering_strategy_batch_size`: {
Set: func(_ context.Context, m sessionDataMutator, s string) error {
Expand Down

0 comments on commit 633add0

Please sign in to comment.