From 9cb393ed56bcdd78596d29a8618c63cdcb4cd876 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Knut=20Olav=20L=C3=B8ite?= Date: Wed, 2 Jul 2025 18:55:35 +0200 Subject: [PATCH] chore: allow changing retry mode for inactive tx Allow enabling/disabling automatic retries of transactions after a transaction has been started, but before any statements have been executed. This allows an application to do the following: ``` tx, _ := db.BeginTx(ctx, nil) _, _ = tx.ExecContext(ctx, "set retry_aborts_internally=true") _l _ = tx.ExecContext(ctx, "insert into my_table (id, values) values (1, 'one')") ``` --- conn.go | 2 +- conn_with_mockserver_test.go | 38 ++++++++++++++++++++++++++++++++++ driver_with_mockserver_test.go | 16 ++++++++++++-- transaction.go | 21 +++++++++++++++++++ 4 files changed, 74 insertions(+), 3 deletions(-) diff --git a/conn.go b/conn.go index 57fae247..2ee53cb1 100644 --- a/conn.go +++ b/conn.go @@ -287,7 +287,7 @@ func (c *conn) SetRetryAbortsInternally(retry bool) error { func (c *conn) setRetryAbortsInternally(retry bool) (driver.Result, error) { if c.inTransaction() { - return nil, spanner.ToSpannerError(status.Error(codes.FailedPrecondition, "cannot change retry mode while a transaction is active")) + return c.tx.setRetryAbortsInternally(retry) } c.retryAborts = retry return driver.ResultNoRows, nil diff --git a/conn_with_mockserver_test.go b/conn_with_mockserver_test.go index cbdfa256..7da41095 100644 --- a/conn_with_mockserver_test.go +++ b/conn_with_mockserver_test.go @@ -369,3 +369,41 @@ func TestDDLUsingQueryContextInReadWriteTransaction(t *testing.T) { t.Fatalf("error mismatch\n Got: %v\nWant: %v", g, w) } } + +func TestSetRetryAbortsInternallyInInactiveTransaction(t *testing.T) { + t.Parallel() + + db, _, teardown := setupTestDBConnection(t) + defer teardown() + ctx := context.Background() + + tx, err := db.BeginTx(ctx, &sql.TxOptions{}) + if err != nil { + t.Fatal(err) + } + if _, err := tx.ExecContext(ctx, "set retry_aborts_internally = false"); err != nil { + t.Fatal(err) + } + _ = tx.Rollback() +} + +func TestSetRetryAbortsInternallyInActiveTransaction(t *testing.T) { + t.Parallel() + + db, _, teardown := setupTestDBConnection(t) + defer teardown() + ctx := context.Background() + + tx, err := db.BeginTx(ctx, &sql.TxOptions{}) + if err != nil { + t.Fatal(err) + } + if _, err := tx.ExecContext(ctx, testutil.UpdateBarSetFoo); err != nil { + t.Fatal(err) + } + _, err = tx.ExecContext(ctx, "set retry_aborts_internally = false") + if g, w := err.Error(), "spanner: code = \"FailedPrecondition\", desc = \"cannot change retry mode while a transaction is active\""; g != w { + t.Fatalf("error mismatch\n Got: %v\nWant: %v", g, w) + } + _ = tx.Rollback() +} diff --git a/driver_with_mockserver_test.go b/driver_with_mockserver_test.go index ac00c122..818174d1 100644 --- a/driver_with_mockserver_test.go +++ b/driver_with_mockserver_test.go @@ -2673,13 +2673,25 @@ func TestShowAndSetVariableRetryAbortsInternally(t *testing.T) { } } - // Verify that the value cannot be set during a transaction. + // Verify that the value cannot be set during an active transaction. tx, _ := c.BeginTx(ctx, nil) - defer func() { _ = tx.Rollback() }() + // Execute a statement to activate the transaction. + if _, err := c.ExecContext(ctx, testutil.UpdateBarSetFoo); err != nil { + t.Fatal(err) + } _, err = c.ExecContext(ctx, "SET RETRY_ABORTS_INTERNALLY = TRUE") if g, w := spanner.ErrCode(err), codes.FailedPrecondition; g != w { t.Fatalf("error code mismatch for setting retry_aborts_internally during a transaction\nGot: %v\nWant: %v", g, w) } + _ = tx.Rollback() + + // Verify that the value can be set at the start of a transaction + // before any statements have been executed. + tx, _ = c.BeginTx(ctx, nil) + if _, err = c.ExecContext(ctx, "SET RETRY_ABORTS_INTERNALLY = TRUE"); err != nil { + t.Fatal(err) + } + _ = tx.Rollback() } func TestPartitionedDml(t *testing.T) { diff --git a/transaction.go b/transaction.go index 26283ef7..b7ec0ca2 100644 --- a/transaction.go +++ b/transaction.go @@ -53,6 +53,8 @@ type contextTransaction interface { AbortBatch() (driver.Result, error) BufferWrite(ms []*spanner.Mutation) error + + setRetryAbortsInternally(retry bool) (driver.Result, error) } type rowIterator interface { @@ -195,6 +197,11 @@ func (tx *readOnlyTransaction) BufferWrite([]*spanner.Mutation) error { return spanner.ToSpannerError(status.Errorf(codes.FailedPrecondition, "read-only transactions cannot write")) } +func (tx *readOnlyTransaction) setRetryAbortsInternally(_ bool) (driver.Result, error) { + // no-op, ignore + return driver.ResultNoRows, nil +} + // ErrAbortedDueToConcurrentModification is returned by a read/write transaction // that was aborted by Cloud Spanner, and where the internal retry attempt // failed because it detected that the results during the retry were different @@ -222,6 +229,8 @@ type readWriteTransaction struct { // rwTx is the underlying Spanner read/write transaction. This transaction // will be replaced with a new one if the initial transaction is aborted. rwTx *spanner.ReadWriteStmtBasedTransaction + // active indicates whether at least one statement has been executed on this transaction. + active bool // batch is any DML batch that is active for this transaction. batch *batch close func(commitTs *time.Time, commitErr error) @@ -391,6 +400,7 @@ func (tx *readWriteTransaction) retry(ctx context.Context) (err error) { // unless internal retries have been disabled. func (tx *readWriteTransaction) Commit() (err error) { tx.logger.Debug("committing transaction") + tx.active = true if err := tx.maybeRunAutoDmlBatch(tx.ctx); err != nil { _ = tx.rollback(tx.ctx) return err @@ -447,6 +457,7 @@ func (tx *readWriteTransaction) resetForRetry(ctx context.Context) error { // transaction is aborted during the query or while iterating the returned rows. func (tx *readWriteTransaction) Query(ctx context.Context, stmt spanner.Statement, execOptions ExecOptions) (rowIterator, error) { tx.logger.Debug("Query", "stmt", stmt.SQL) + tx.active = true if err := tx.maybeRunAutoDmlBatch(ctx); err != nil { return nil, err } @@ -478,6 +489,7 @@ func (tx *readWriteTransaction) partitionQuery(ctx context.Context, stmt spanner func (tx *readWriteTransaction) ExecContext(ctx context.Context, stmt spanner.Statement, statementInfo *statementInfo, options spanner.QueryOptions) (res *result, err error) { tx.logger.Debug("ExecContext", "stmt", stmt.SQL) + tx.active = true if tx.batch != nil { tx.logger.Debug("adding statement to batch") tx.batch.statements = append(tx.batch.statements, stmt) @@ -515,6 +527,7 @@ func (tx *readWriteTransaction) StartBatchDML(options spanner.QueryOptions, auto return nil, spanner.ToSpannerError(status.Errorf(codes.FailedPrecondition, "This transaction already has an active batch.")) } tx.logger.Debug("starting dml batch in transaction", "automatic", automatic) + tx.active = true tx.batch = &batch{tp: dml, options: ExecOptions{QueryOptions: options}, automatic: automatic} return driver.ResultNoRows, nil } @@ -629,3 +642,11 @@ func errorsEqualForRetry(err1, err2 error) bool { } return false } + +func (tx *readWriteTransaction) setRetryAbortsInternally(retry bool) (driver.Result, error) { + if tx.active { + return nil, spanner.ToSpannerError(status.Error(codes.FailedPrecondition, "cannot change retry mode while a transaction is active")) + } + tx.retryAborts = retry + return driver.ResultNoRows, nil +}