From 27d7edc170df240a2b5597f5a8914175a861eef6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Knut=20Olav=20L=C3=B8ite?= Date: Sat, 4 Oct 2025 14:35:22 +0200 Subject: [PATCH] chore: store temp TransactionOptions in connection state Store temporary TransactionOptions in the connection state as local options. Local options only apply to the current transaction. This simplifies the internal state handling of the driver, as all transaction state should only be read from the connection state, and not also from a temporary variable. This also enables the use of a combination of temporary transaction options and using SQL statements to set further options. The shared library always includes temporary transaction options, as the BeginTransaction function accepts TransactionOptions as an input argument. This meant that using SQL statements to set further transaction options was not supported through the shared library. --- conn.go | 143 ++++++++++++++++--------- connection_properties.go | 21 ++++ driver.go | 24 +++-- driver_with_mockserver_test.go | 2 +- spannerlib/api/transaction_test.go | 164 +++++++++++++++++++++++++++++ transaction.go | 13 +++ 6 files changed, 311 insertions(+), 56 deletions(-) diff --git a/conn.go b/conn.go index 31dc2c41..7b7e5102 100644 --- a/conn.go +++ b/conn.go @@ -275,14 +275,9 @@ type conn struct { // tempExecOptions can be set by passing it in as an argument to ExecContext or QueryContext // and are applied only to that statement. tempExecOptions *ExecOptions - // tempTransactionOptions are temporarily set right before a read/write transaction is started. - tempTransactionOptions *ReadWriteTransactionOptions - // tempReadOnlyTransactionOptions are temporarily set right before a read-only - // transaction is started on a Spanner connection. - tempReadOnlyTransactionOptions *ReadOnlyTransactionOptions - // tempBatchReadOnlyTransactionOptions are temporarily set right before a - // batch read-only transaction is started on a Spanner connection. - tempBatchReadOnlyTransactionOptions *BatchReadOnlyTransactionOptions + // tempTransactionCloseFunc is set right before a transaction is started, and is set as the + // close function for that transaction. + tempTransactionCloseFunc func() } func (c *conn) UnderlyingClient() (*spanner.Client, error) { @@ -1011,8 +1006,10 @@ func (c *conn) options(reset bool) *ExecOptions { TransactionTag: c.TransactionTag(), IsolationLevel: toProtoIsolationLevelOrDefault(c.IsolationLevel()), ReadLockMode: c.ReadLockMode(), + CommitPriority: propertyCommitPriority.GetValueOrDefault(c.state), CommitOptions: spanner.CommitOptions{ - MaxCommitDelay: c.maxCommitDelayPointer(), + MaxCommitDelay: c.maxCommitDelayPointer(), + ReturnCommitStats: propertyReturnCommitStats.GetValueOrDefault(c.state), }, }, PartitionedQueryOptions: PartitionedQueryOptions{}, @@ -1045,16 +1042,43 @@ func (c *conn) resetTransactionForRetry(ctx context.Context, errDuringCommit boo } func (c *conn) withTempTransactionOptions(options *ReadWriteTransactionOptions) { - c.tempTransactionOptions = options + if options == nil { + return + } + c.tempTransactionCloseFunc = options.close + // Start a transaction for the connection state, so we can set the transaction options + // as local options in the current transaction. + _ = c.state.Begin() + if options.DisableInternalRetries { + _ = propertyRetryAbortsInternally.SetLocalValue(c.state, !options.DisableInternalRetries) + } + if options.TransactionOptions.BeginTransactionOption != spanner.DefaultBeginTransaction { + _ = propertyBeginTransactionOption.SetLocalValue(c.state, options.TransactionOptions.BeginTransactionOption) + } + if options.TransactionOptions.CommitOptions.MaxCommitDelay != nil { + _ = propertyMaxCommitDelay.SetLocalValue(c.state, *options.TransactionOptions.CommitOptions.MaxCommitDelay) + } + if options.TransactionOptions.CommitOptions.ReturnCommitStats { + _ = propertyReturnCommitStats.SetLocalValue(c.state, options.TransactionOptions.CommitOptions.ReturnCommitStats) + } + if options.TransactionOptions.TransactionTag != "" { + _ = propertyTransactionTag.SetLocalValue(c.state, options.TransactionOptions.TransactionTag) + } + if options.TransactionOptions.ReadLockMode != spannerpb.TransactionOptions_ReadWrite_READ_LOCK_MODE_UNSPECIFIED { + _ = propertyReadLockMode.SetLocalValue(c.state, options.TransactionOptions.ReadLockMode) + } + if options.TransactionOptions.IsolationLevel != spannerpb.TransactionOptions_ISOLATION_LEVEL_UNSPECIFIED { + _ = propertyIsolationLevel.SetLocalValue(c.state, toSqlIsolationLevelOrDefault(options.TransactionOptions.IsolationLevel)) + } + if options.TransactionOptions.ExcludeTxnFromChangeStreams { + _ = propertyExcludeTxnFromChangeStreams.SetLocalValue(c.state, options.TransactionOptions.ExcludeTxnFromChangeStreams) + } + if options.TransactionOptions.CommitPriority != spannerpb.RequestOptions_PRIORITY_UNSPECIFIED { + _ = propertyCommitPriority.SetLocalValue(c.state, options.TransactionOptions.CommitPriority) + } } func (c *conn) getTransactionOptions(execOptions *ExecOptions) ReadWriteTransactionOptions { - if c.tempTransactionOptions != nil { - defer func() { c.tempTransactionOptions = nil }() - opts := *c.tempTransactionOptions - opts.TransactionOptions.BeginTransactionOption = c.convertDefaultBeginTransactionOption(opts.TransactionOptions.BeginTransactionOption) - return opts - } txOpts := ReadWriteTransactionOptions{ TransactionOptions: execOptions.TransactionOptions, DisableInternalRetries: !c.RetryAbortsInternally(), @@ -1075,28 +1099,39 @@ func (c *conn) getTransactionOptions(execOptions *ExecOptions) ReadWriteTransact } func (c *conn) withTempReadOnlyTransactionOptions(options *ReadOnlyTransactionOptions) { - c.tempReadOnlyTransactionOptions = options + if options == nil { + return + } + c.tempTransactionCloseFunc = options.close + // Start a transaction for the connection state, so we can set the transaction options + // as local options in the current transaction. + _ = c.state.Begin() + if options.BeginTransactionOption != spanner.DefaultBeginTransaction { + _ = propertyBeginTransactionOption.SetLocalValue(c.state, options.BeginTransactionOption) + } + if options.TimestampBound.String() != "(strong)" { + _ = propertyReadOnlyStaleness.SetLocalValue(c.state, options.TimestampBound) + } } func (c *conn) getReadOnlyTransactionOptions() ReadOnlyTransactionOptions { - if c.tempReadOnlyTransactionOptions != nil { - defer func() { c.tempReadOnlyTransactionOptions = nil }() - opts := *c.tempReadOnlyTransactionOptions - opts.BeginTransactionOption = c.convertDefaultBeginTransactionOption(opts.BeginTransactionOption) - return opts - } return ReadOnlyTransactionOptions{TimestampBound: c.ReadOnlyStaleness(), BeginTransactionOption: c.convertDefaultBeginTransactionOption(propertyBeginTransactionOption.GetValueOrDefault(c.state))} } func (c *conn) withTempBatchReadOnlyTransactionOptions(options *BatchReadOnlyTransactionOptions) { - c.tempBatchReadOnlyTransactionOptions = options + if options == nil { + return + } + c.tempTransactionCloseFunc = options.close + // Start a transaction for the connection state, so we can set the transaction options + // as local options in the current transaction. + _ = c.state.Begin() + if options.TimestampBound.String() != "(strong)" { + _ = propertyReadOnlyStaleness.SetLocalValue(c.state, options.TimestampBound) + } } func (c *conn) getBatchReadOnlyTransactionOptions() BatchReadOnlyTransactionOptions { - if c.tempBatchReadOnlyTransactionOptions != nil { - defer func() { c.tempBatchReadOnlyTransactionOptions = nil }() - return *c.tempBatchReadOnlyTransactionOptions - } return BatchReadOnlyTransactionOptions{TimestampBound: c.ReadOnlyStaleness()} } @@ -1108,7 +1143,6 @@ func (c *conn) BeginReadOnlyTransaction(ctx context.Context, options *ReadOnlyTr c.withTempReadOnlyTransactionOptions(options) tx, err := c.BeginTx(ctx, driver.TxOptions{ReadOnly: true}) if err != nil { - c.withTempReadOnlyTransactionOptions(nil) return nil, err } return tx, nil @@ -1122,7 +1156,6 @@ func (c *conn) BeginReadWriteTransaction(ctx context.Context, options *ReadWrite c.withTempTransactionOptions(options) tx, err := c.BeginTx(ctx, driver.TxOptions{}) if err != nil { - c.withTempTransactionOptions(nil) return nil, err } return tx, nil @@ -1133,6 +1166,13 @@ func (c *conn) Begin() (driver.Tx, error) { } func (c *conn) BeginTx(ctx context.Context, driverOpts driver.TxOptions) (driver.Tx, error) { + defer func() { + c.tempTransactionCloseFunc = nil + }() + return c.beginTx(ctx, driverOpts, c.tempTransactionCloseFunc) +} + +func (c *conn) beginTx(ctx context.Context, driverOpts driver.TxOptions, closeFunc func()) (driver.Tx, error) { if c.resetForRetry { c.resetForRetry = false return c.tx, nil @@ -1141,6 +1181,10 @@ func (c *conn) BeginTx(ctx context.Context, driverOpts driver.TxOptions) (driver defer func() { if c.tx != nil { _ = c.state.Begin() + } else { + // Rollback in case the connection state transaction was started before this function + // was called, for example if the caller set temporary transaction options. + _ = c.state.Rollback() } }() @@ -1180,6 +1224,9 @@ func (c *conn) BeginTx(ctx context.Context, driverOpts driver.TxOptions) (driver if batchReadOnly && !driverOpts.ReadOnly { return nil, status.Error(codes.InvalidArgument, "levelBatchReadOnly can only be used for read-only transactions") } + if closeFunc == nil { + closeFunc = func() {} + } if driverOpts.ReadOnly { var logger *slog.Logger @@ -1188,6 +1235,8 @@ func (c *conn) BeginTx(ctx context.Context, driverOpts driver.TxOptions) (driver if batchReadOnly { logger = c.logger.With("tx", "batchro") var err error + // BatchReadOnly transactions (currently) do not support inline-begin. + // This means that the transaction options must be supplied here, and not through a callback. bo, err = c.client.BatchReadOnlyTransaction(ctx, batchReadOnlyTxOpts.TimestampBound) if err != nil { return nil, err @@ -1195,19 +1244,14 @@ func (c *conn) BeginTx(ctx context.Context, driverOpts driver.TxOptions) (driver ro = &bo.ReadOnlyTransaction } else { logger = c.logger.With("tx", "ro") - ro = c.client.ReadOnlyTransaction().WithBeginTransactionOption(readOnlyTxOpts.BeginTransactionOption).WithTimestampBound(readOnlyTxOpts.TimestampBound) + ro = c.client.ReadOnlyTransaction().WithBeginTransactionOption(readOnlyTxOpts.BeginTransactionOption) } c.tx = &readOnlyTransaction{ roTx: ro, boTx: bo, logger: logger, close: func(result txResult) { - if batchReadOnlyTxOpts.close != nil { - batchReadOnlyTxOpts.close() - } - if readOnlyTxOpts.close != nil { - readOnlyTxOpts.close() - } + closeFunc() if result == txResultCommit { _ = c.state.Commit() } else { @@ -1215,22 +1259,23 @@ func (c *conn) BeginTx(ctx context.Context, driverOpts driver.TxOptions) (driver } c.tx = nil }, + timestampBoundCallback: func() spanner.TimestampBound { + return propertyReadOnlyStaleness.GetValueOrDefault(c.state) + }, } return c.tx, nil } + // These options are only used to determine how to start the transaction. + // All other options are fetched in a callback that is called when the transaction is actually started. + // That callback reads all transaction options from the connection state at that moment. This allows + // applications to execute a series of statement like this: + // BEGIN TRANSACTION; + // SET LOCAL transaction_tag='my_tag'; + // SET LOCAL commit_priority=LOW; + // INSERT INTO my_table ... -- This starts the transaction with the options above included. opts := spanner.TransactionOptions{} - if c.tempTransactionOptions != nil { - opts = c.tempTransactionOptions.TransactionOptions - } - opts.BeginTransactionOption = c.convertDefaultBeginTransactionOption(opts.BeginTransactionOption) - tempCloseFunc := func() {} - if c.tempTransactionOptions != nil && c.tempTransactionOptions.close != nil { - tempCloseFunc = c.tempTransactionOptions.close - } - if !disableRetryAborts && c.tempTransactionOptions != nil { - disableRetryAborts = c.tempTransactionOptions.DisableInternalRetries - } + opts.BeginTransactionOption = c.convertDefaultBeginTransactionOption(propertyBeginTransactionOption.GetValueOrDefault(c.state)) tx, err := spanner.NewReadWriteStmtBasedTransactionWithCallbackForOptions(ctx, c.client, opts, func() spanner.TransactionOptions { defer func() { @@ -1249,7 +1294,7 @@ func (c *conn) BeginTx(ctx context.Context, driverOpts driver.TxOptions) (driver logger: logger, rwTx: tx, close: func(result txResult, commitResponse *spanner.CommitResponse, commitErr error) { - tempCloseFunc() + closeFunc() c.prevTx = c.tx c.tx = nil if commitErr == nil { diff --git a/connection_properties.go b/connection_properties.go index be6716b7..84e1cbcd 100644 --- a/connection_properties.go +++ b/connection_properties.go @@ -257,6 +257,27 @@ var propertyMaxCommitDelay = createConnectionProperty( connectionstate.ContextUser, connectionstate.ConvertDuration, ) +var propertyCommitPriority = createConnectionProperty( + "commit_priority", + "Sets the priority for commit RPC invocations from this connection (HIGH/MEDIUM/LOW/UNSPECIFIED). "+ + "The default is UNSPECIFIED.", + spannerpb.RequestOptions_PRIORITY_UNSPECIFIED, + false, + nil, + connectionstate.ContextUser, + func(value string) (spannerpb.RequestOptions_Priority, error) { + return parseRpcPriority(value) + }, +) +var propertyReturnCommitStats = createConnectionProperty( + "return_commit_stats", + "return_commit_stats determines whether transactions should request Spanner to return commit statistics.", + false, + false, + nil, + connectionstate.ContextUser, + connectionstate.ConvertBool, +) // ------------------------------------------------------------------------------------------------ // Statement connection properties. diff --git a/driver.go b/driver.go index 15cec71a..6a381bcc 100644 --- a/driver.go +++ b/driver.go @@ -1148,7 +1148,6 @@ func BeginReadWriteTransaction(ctx context.Context, db *sql.DB, options ReadWrit } tx, err := conn.BeginTx(ctx, &sql.TxOptions{}) if err != nil { - clearTempReadWriteTransactionOptions(conn) return nil, err } return tx, nil @@ -1166,11 +1165,6 @@ func withTempReadWriteTransactionOptions(conn *sql.Conn, options *ReadWriteTrans }) } -func clearTempReadWriteTransactionOptions(conn *sql.Conn) { - _ = withTempReadWriteTransactionOptions(conn, nil) - _ = conn.Close() -} - // ReadOnlyTransactionOptions can be used to create a read-only transaction // on a Spanner connection. type ReadOnlyTransactionOptions struct { @@ -1529,6 +1523,24 @@ func toProtoIsolationLevelOrDefault(level sql.IsolationLevel) spannerpb.Transact return res } +func toSqlIsolationLevel(level spannerpb.TransactionOptions_IsolationLevel) (sql.IsolationLevel, error) { + switch level { + case spannerpb.TransactionOptions_ISOLATION_LEVEL_UNSPECIFIED: + return sql.LevelDefault, nil + case spannerpb.TransactionOptions_SERIALIZABLE: + return sql.LevelSerializable, nil + case spannerpb.TransactionOptions_REPEATABLE_READ: + return sql.LevelRepeatableRead, nil + default: + } + return sql.LevelDefault, spanner.ToSpannerError(status.Errorf(codes.InvalidArgument, "invalid or unsupported isolation level: %v", level)) +} + +func toSqlIsolationLevelOrDefault(level spannerpb.TransactionOptions_IsolationLevel) sql.IsolationLevel { + res, _ := toSqlIsolationLevel(level) + return res +} + type spannerIsolationLevel sql.IsolationLevel const ( diff --git a/driver_with_mockserver_test.go b/driver_with_mockserver_test.go index ac9cbf79..209c1fa1 100644 --- a/driver_with_mockserver_test.go +++ b/driver_with_mockserver_test.go @@ -5076,7 +5076,7 @@ func TestBeginReadWriteTransaction(t *testing.T) { t.Fatalf("missing transaction for ExecuteSqlRequest") } if req.Transaction.GetId() == nil { - t.Fatalf("missing begin selector for ExecuteSqlRequest") + t.Fatalf("missing ID selector for ExecuteSqlRequest") } if g, w := req.RequestOptions.TransactionTag, tag; g != w { t.Fatalf("transaction tag mismatch\n Got: %v\nWant: %v", g, w) diff --git a/spannerlib/api/transaction_test.go b/spannerlib/api/transaction_test.go index 1f0ec81d..efb6c993 100644 --- a/spannerlib/api/transaction_test.go +++ b/spannerlib/api/transaction_test.go @@ -24,6 +24,7 @@ import ( "cloud.google.com/go/spanner/apiv1/spannerpb" "github.com/googleapis/go-sql-spanner/testutil" "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" ) func TestBeginAndCommit(t *testing.T) { @@ -409,3 +410,166 @@ func TestDdlInTransaction(t *testing.T) { t.Fatalf("ClosePool returned unexpected error: %v", err) } } + +func TestTransactionOptionsAsSqlStatements(t *testing.T) { + t.Parallel() + + ctx := context.Background() + server, teardown := setupMockServer(t) + defer teardown() + dsn := fmt.Sprintf("%s/projects/p/instances/i/databases/d?useplaintext=true", server.Address) + + poolId, err := CreatePool(ctx, dsn) + if err != nil { + t.Fatalf("CreatePool returned unexpected error: %v", err) + } + connId, err := CreateConnection(ctx, poolId) + if err != nil { + t.Fatalf("CreateConnection returned unexpected error: %v", err) + } + if err := BeginTransaction(ctx, poolId, connId, &spannerpb.TransactionOptions{}); err != nil { + t.Fatalf("BeginTransaction returned unexpected error: %v", err) + } + + // Set some local transaction options. + if rowsId, err := Execute(ctx, poolId, connId, &spannerpb.ExecuteSqlRequest{Sql: "set local transaction_tag = 'my_tag'"}); err != nil { + t.Fatalf("setting transaction_tag returned unexpected error: %v", err) + } else { + _ = CloseRows(ctx, poolId, connId, rowsId) + } + if rowsId, err := Execute(ctx, poolId, connId, &spannerpb.ExecuteSqlRequest{Sql: "set local retry_aborts_internally = false"}); err != nil { + t.Fatalf("setting retry_aborts_internally returned unexpected error: %v", err) + } else { + _ = CloseRows(ctx, poolId, connId, rowsId) + } + + // Execute a statement in the transaction. + if rowsId, err := Execute(ctx, poolId, connId, &spannerpb.ExecuteSqlRequest{Sql: testutil.UpdateBarSetFoo}); err != nil { + t.Fatalf("Execute returned unexpected error: %v", err) + } else { + _ = CloseRows(ctx, poolId, connId, rowsId) + } + + // Abort the transaction to verify that the retry_aborts_internally setting was respected. + server.TestSpanner.PutExecutionTime(testutil.MethodCommitTransaction, testutil.SimulatedExecutionTime{ + Errors: []error{status.Error(codes.Aborted, "Aborted")}, + }) + + // Commit the transaction. This should fail with an Aborted error. + if _, err := Commit(ctx, poolId, connId); err == nil { + t.Fatal("missing expected error") + } else { + if g, w := spanner.ErrCode(err), codes.Aborted; g != w { + t.Fatalf("error code mismatch\n Got: %v\nWant: %v", g, w) + } + } + + // Verify that the transaction_tag setting was respected. + requests := server.TestSpanner.DrainRequestsFromServer() + executeRequests := testutil.RequestsOfType(requests, reflect.TypeOf(&spannerpb.ExecuteSqlRequest{})) + if g, w := len(executeRequests), 1; g != w { + t.Fatalf("Execute request count mismatch\n Got: %v\nWant: %v", g, w) + } + executeRequest := executeRequests[0].(*spannerpb.ExecuteSqlRequest) + if executeRequest.RequestOptions == nil { + t.Fatalf("Execute request options not set") + } + if g, w := executeRequest.RequestOptions.TransactionTag, "my_tag"; g != w { + t.Fatalf("TransactionTag mismatch\n Got: %v\nWant: %v", g, w) + } + commitRequests := testutil.RequestsOfType(requests, reflect.TypeOf(&spannerpb.CommitRequest{})) + if g, w := len(commitRequests), 1; g != w { + t.Fatalf("Commit request count mismatch\n Got: %v\nWant: %v", g, w) + } + commitRequest := commitRequests[0].(*spannerpb.CommitRequest) + if commitRequest.RequestOptions == nil { + t.Fatalf("Commit request options not set") + } + if g, w := commitRequest.RequestOptions.TransactionTag, "my_tag"; g != w { + t.Fatalf("TransactionTag mismatch\n Got: %v\nWant: %v", g, w) + } + + if err := CloseConnection(ctx, poolId, connId); err != nil { + t.Fatalf("CloseConnection returned unexpected error: %v", err) + } + if err := ClosePool(ctx, poolId); err != nil { + t.Fatalf("ClosePool returned unexpected error: %v", err) + } +} + +func TestReadOnlyTransactionOptionsAsSqlStatements(t *testing.T) { + t.Parallel() + + ctx := context.Background() + server, teardown := setupMockServer(t) + defer teardown() + dsn := fmt.Sprintf("%s/projects/p/instances/i/databases/d?useplaintext=true", server.Address) + + poolId, err := CreatePool(ctx, dsn) + if err != nil { + t.Fatalf("CreatePool returned unexpected error: %v", err) + } + connId, err := CreateConnection(ctx, poolId) + if err != nil { + t.Fatalf("CreateConnection returned unexpected error: %v", err) + } + // Start a read-only transaction without any further options. + if err := BeginTransaction(ctx, poolId, connId, &spannerpb.TransactionOptions{ + Mode: &spannerpb.TransactionOptions_ReadOnly_{ + ReadOnly: &spannerpb.TransactionOptions_ReadOnly{}, + }, + }); err != nil { + t.Fatalf("BeginTransaction returned unexpected error: %v", err) + } + + // Set a local read-only transaction options. + if rowsId, err := Execute(ctx, poolId, connId, &spannerpb.ExecuteSqlRequest{Sql: "set local read_only_staleness = 'exact_staleness 10s'"}); err != nil { + t.Fatalf("setting read_only_staleness returned unexpected error: %v", err) + } else { + _ = CloseRows(ctx, poolId, connId, rowsId) + } + + // Execute a statement in the transaction. + if rowsId, err := Execute(ctx, poolId, connId, &spannerpb.ExecuteSqlRequest{Sql: testutil.SelectFooFromBar}); err != nil { + t.Fatalf("Execute returned unexpected error: %v", err) + } else { + _ = CloseRows(ctx, poolId, connId, rowsId) + } + + // Commit the transaction to end it. + if _, err := Commit(ctx, poolId, connId); err != nil { + t.Fatalf("commit returned unexpected error: %v", err) + } + + // Verify that the read-only staleness setting was used. + requests := server.TestSpanner.DrainRequestsFromServer() + executeRequests := testutil.RequestsOfType(requests, reflect.TypeOf(&spannerpb.ExecuteSqlRequest{})) + if g, w := len(executeRequests), 1; g != w { + t.Fatalf("Execute request count mismatch\n Got: %v\nWant: %v", g, w) + } + executeRequest := executeRequests[0].(*spannerpb.ExecuteSqlRequest) + if executeRequest.GetTransaction() == nil || executeRequest.GetTransaction().GetBegin() == nil || executeRequest.GetTransaction().GetBegin().GetReadOnly() == nil { + t.Fatal("ExecuteRequest does not contain a BeginTransaction option") + } + + readOnly := executeRequest.GetTransaction().GetBegin().GetReadOnly() + if readOnly.GetExactStaleness() == nil { + t.Fatal("BeginTransaction does not contain a ExactStaleness option") + } + if g, w := readOnly.GetExactStaleness().GetSeconds(), int64(10); g != w { + t.Fatalf("read staleness mismatch\n Got: %v\nWant: %v", g, w) + } + + // There should be no commit requests, as committing a read-only transaction is a no-op on Spanner. + commitRequests := testutil.RequestsOfType(requests, reflect.TypeOf(&spannerpb.CommitRequest{})) + if g, w := len(commitRequests), 0; g != w { + t.Fatalf("Commit request count mismatch\n Got: %v\nWant: %v", g, w) + } + + if err := CloseConnection(ctx, poolId, connId); err != nil { + t.Fatalf("CloseConnection returned unexpected error: %v", err) + } + if err := ClosePool(ctx, poolId); err != nil { + t.Fatalf("ClosePool returned unexpected error: %v", err) + } +} diff --git a/transaction.go b/transaction.go index a2db9634..297c6b60 100644 --- a/transaction.go +++ b/transaction.go @@ -22,6 +22,7 @@ import ( "fmt" "log/slog" "math/rand" + "sync" "time" "cloud.google.com/go/spanner" @@ -112,6 +113,10 @@ type readOnlyTransaction struct { boTx *spanner.BatchReadOnlyTransaction logger *slog.Logger close func(result txResult) + + timestampBoundMu sync.Mutex + timestampBoundSet bool + timestampBoundCallback func() spanner.TimestampBound } func (tx *readOnlyTransaction) Commit() error { @@ -160,6 +165,14 @@ func (tx *readOnlyTransaction) Query(ctx context.Context, stmt spanner.Statement } return mi, nil } + if tx.timestampBoundCallback != nil { + tx.timestampBoundMu.Lock() + if !tx.timestampBoundSet { + tx.roTx.WithTimestampBound(tx.timestampBoundCallback()) + tx.timestampBoundSet = true + } + tx.timestampBoundMu.Unlock() + } return &readOnlyRowIterator{tx.roTx.QueryWithOptions(ctx, stmt, execOptions.QueryOptions), stmtType}, nil }