diff --git a/conn.go b/conn.go index 31dc2c41..7b7e5102 100644 --- a/conn.go +++ b/conn.go @@ -275,14 +275,9 @@ type conn struct { // tempExecOptions can be set by passing it in as an argument to ExecContext or QueryContext // and are applied only to that statement. tempExecOptions *ExecOptions - // tempTransactionOptions are temporarily set right before a read/write transaction is started. - tempTransactionOptions *ReadWriteTransactionOptions - // tempReadOnlyTransactionOptions are temporarily set right before a read-only - // transaction is started on a Spanner connection. - tempReadOnlyTransactionOptions *ReadOnlyTransactionOptions - // tempBatchReadOnlyTransactionOptions are temporarily set right before a - // batch read-only transaction is started on a Spanner connection. - tempBatchReadOnlyTransactionOptions *BatchReadOnlyTransactionOptions + // tempTransactionCloseFunc is set right before a transaction is started, and is set as the + // close function for that transaction. + tempTransactionCloseFunc func() } func (c *conn) UnderlyingClient() (*spanner.Client, error) { @@ -1011,8 +1006,10 @@ func (c *conn) options(reset bool) *ExecOptions { TransactionTag: c.TransactionTag(), IsolationLevel: toProtoIsolationLevelOrDefault(c.IsolationLevel()), ReadLockMode: c.ReadLockMode(), + CommitPriority: propertyCommitPriority.GetValueOrDefault(c.state), CommitOptions: spanner.CommitOptions{ - MaxCommitDelay: c.maxCommitDelayPointer(), + MaxCommitDelay: c.maxCommitDelayPointer(), + ReturnCommitStats: propertyReturnCommitStats.GetValueOrDefault(c.state), }, }, PartitionedQueryOptions: PartitionedQueryOptions{}, @@ -1045,16 +1042,43 @@ func (c *conn) resetTransactionForRetry(ctx context.Context, errDuringCommit boo } func (c *conn) withTempTransactionOptions(options *ReadWriteTransactionOptions) { - c.tempTransactionOptions = options + if options == nil { + return + } + c.tempTransactionCloseFunc = options.close + // Start a transaction for the connection state, so we can set the transaction options + // as local options in the current transaction. + _ = c.state.Begin() + if options.DisableInternalRetries { + _ = propertyRetryAbortsInternally.SetLocalValue(c.state, !options.DisableInternalRetries) + } + if options.TransactionOptions.BeginTransactionOption != spanner.DefaultBeginTransaction { + _ = propertyBeginTransactionOption.SetLocalValue(c.state, options.TransactionOptions.BeginTransactionOption) + } + if options.TransactionOptions.CommitOptions.MaxCommitDelay != nil { + _ = propertyMaxCommitDelay.SetLocalValue(c.state, *options.TransactionOptions.CommitOptions.MaxCommitDelay) + } + if options.TransactionOptions.CommitOptions.ReturnCommitStats { + _ = propertyReturnCommitStats.SetLocalValue(c.state, options.TransactionOptions.CommitOptions.ReturnCommitStats) + } + if options.TransactionOptions.TransactionTag != "" { + _ = propertyTransactionTag.SetLocalValue(c.state, options.TransactionOptions.TransactionTag) + } + if options.TransactionOptions.ReadLockMode != spannerpb.TransactionOptions_ReadWrite_READ_LOCK_MODE_UNSPECIFIED { + _ = propertyReadLockMode.SetLocalValue(c.state, options.TransactionOptions.ReadLockMode) + } + if options.TransactionOptions.IsolationLevel != spannerpb.TransactionOptions_ISOLATION_LEVEL_UNSPECIFIED { + _ = propertyIsolationLevel.SetLocalValue(c.state, toSqlIsolationLevelOrDefault(options.TransactionOptions.IsolationLevel)) + } + if options.TransactionOptions.ExcludeTxnFromChangeStreams { + _ = propertyExcludeTxnFromChangeStreams.SetLocalValue(c.state, options.TransactionOptions.ExcludeTxnFromChangeStreams) + } + if options.TransactionOptions.CommitPriority != spannerpb.RequestOptions_PRIORITY_UNSPECIFIED { + _ = propertyCommitPriority.SetLocalValue(c.state, options.TransactionOptions.CommitPriority) + } } func (c *conn) getTransactionOptions(execOptions *ExecOptions) ReadWriteTransactionOptions { - if c.tempTransactionOptions != nil { - defer func() { c.tempTransactionOptions = nil }() - opts := *c.tempTransactionOptions - opts.TransactionOptions.BeginTransactionOption = c.convertDefaultBeginTransactionOption(opts.TransactionOptions.BeginTransactionOption) - return opts - } txOpts := ReadWriteTransactionOptions{ TransactionOptions: execOptions.TransactionOptions, DisableInternalRetries: !c.RetryAbortsInternally(), @@ -1075,28 +1099,39 @@ func (c *conn) getTransactionOptions(execOptions *ExecOptions) ReadWriteTransact } func (c *conn) withTempReadOnlyTransactionOptions(options *ReadOnlyTransactionOptions) { - c.tempReadOnlyTransactionOptions = options + if options == nil { + return + } + c.tempTransactionCloseFunc = options.close + // Start a transaction for the connection state, so we can set the transaction options + // as local options in the current transaction. + _ = c.state.Begin() + if options.BeginTransactionOption != spanner.DefaultBeginTransaction { + _ = propertyBeginTransactionOption.SetLocalValue(c.state, options.BeginTransactionOption) + } + if options.TimestampBound.String() != "(strong)" { + _ = propertyReadOnlyStaleness.SetLocalValue(c.state, options.TimestampBound) + } } func (c *conn) getReadOnlyTransactionOptions() ReadOnlyTransactionOptions { - if c.tempReadOnlyTransactionOptions != nil { - defer func() { c.tempReadOnlyTransactionOptions = nil }() - opts := *c.tempReadOnlyTransactionOptions - opts.BeginTransactionOption = c.convertDefaultBeginTransactionOption(opts.BeginTransactionOption) - return opts - } return ReadOnlyTransactionOptions{TimestampBound: c.ReadOnlyStaleness(), BeginTransactionOption: c.convertDefaultBeginTransactionOption(propertyBeginTransactionOption.GetValueOrDefault(c.state))} } func (c *conn) withTempBatchReadOnlyTransactionOptions(options *BatchReadOnlyTransactionOptions) { - c.tempBatchReadOnlyTransactionOptions = options + if options == nil { + return + } + c.tempTransactionCloseFunc = options.close + // Start a transaction for the connection state, so we can set the transaction options + // as local options in the current transaction. + _ = c.state.Begin() + if options.TimestampBound.String() != "(strong)" { + _ = propertyReadOnlyStaleness.SetLocalValue(c.state, options.TimestampBound) + } } func (c *conn) getBatchReadOnlyTransactionOptions() BatchReadOnlyTransactionOptions { - if c.tempBatchReadOnlyTransactionOptions != nil { - defer func() { c.tempBatchReadOnlyTransactionOptions = nil }() - return *c.tempBatchReadOnlyTransactionOptions - } return BatchReadOnlyTransactionOptions{TimestampBound: c.ReadOnlyStaleness()} } @@ -1108,7 +1143,6 @@ func (c *conn) BeginReadOnlyTransaction(ctx context.Context, options *ReadOnlyTr c.withTempReadOnlyTransactionOptions(options) tx, err := c.BeginTx(ctx, driver.TxOptions{ReadOnly: true}) if err != nil { - c.withTempReadOnlyTransactionOptions(nil) return nil, err } return tx, nil @@ -1122,7 +1156,6 @@ func (c *conn) BeginReadWriteTransaction(ctx context.Context, options *ReadWrite c.withTempTransactionOptions(options) tx, err := c.BeginTx(ctx, driver.TxOptions{}) if err != nil { - c.withTempTransactionOptions(nil) return nil, err } return tx, nil @@ -1133,6 +1166,13 @@ func (c *conn) Begin() (driver.Tx, error) { } func (c *conn) BeginTx(ctx context.Context, driverOpts driver.TxOptions) (driver.Tx, error) { + defer func() { + c.tempTransactionCloseFunc = nil + }() + return c.beginTx(ctx, driverOpts, c.tempTransactionCloseFunc) +} + +func (c *conn) beginTx(ctx context.Context, driverOpts driver.TxOptions, closeFunc func()) (driver.Tx, error) { if c.resetForRetry { c.resetForRetry = false return c.tx, nil @@ -1141,6 +1181,10 @@ func (c *conn) BeginTx(ctx context.Context, driverOpts driver.TxOptions) (driver defer func() { if c.tx != nil { _ = c.state.Begin() + } else { + // Rollback in case the connection state transaction was started before this function + // was called, for example if the caller set temporary transaction options. + _ = c.state.Rollback() } }() @@ -1180,6 +1224,9 @@ func (c *conn) BeginTx(ctx context.Context, driverOpts driver.TxOptions) (driver if batchReadOnly && !driverOpts.ReadOnly { return nil, status.Error(codes.InvalidArgument, "levelBatchReadOnly can only be used for read-only transactions") } + if closeFunc == nil { + closeFunc = func() {} + } if driverOpts.ReadOnly { var logger *slog.Logger @@ -1188,6 +1235,8 @@ func (c *conn) BeginTx(ctx context.Context, driverOpts driver.TxOptions) (driver if batchReadOnly { logger = c.logger.With("tx", "batchro") var err error + // BatchReadOnly transactions (currently) do not support inline-begin. + // This means that the transaction options must be supplied here, and not through a callback. bo, err = c.client.BatchReadOnlyTransaction(ctx, batchReadOnlyTxOpts.TimestampBound) if err != nil { return nil, err @@ -1195,19 +1244,14 @@ func (c *conn) BeginTx(ctx context.Context, driverOpts driver.TxOptions) (driver ro = &bo.ReadOnlyTransaction } else { logger = c.logger.With("tx", "ro") - ro = c.client.ReadOnlyTransaction().WithBeginTransactionOption(readOnlyTxOpts.BeginTransactionOption).WithTimestampBound(readOnlyTxOpts.TimestampBound) + ro = c.client.ReadOnlyTransaction().WithBeginTransactionOption(readOnlyTxOpts.BeginTransactionOption) } c.tx = &readOnlyTransaction{ roTx: ro, boTx: bo, logger: logger, close: func(result txResult) { - if batchReadOnlyTxOpts.close != nil { - batchReadOnlyTxOpts.close() - } - if readOnlyTxOpts.close != nil { - readOnlyTxOpts.close() - } + closeFunc() if result == txResultCommit { _ = c.state.Commit() } else { @@ -1215,22 +1259,23 @@ func (c *conn) BeginTx(ctx context.Context, driverOpts driver.TxOptions) (driver } c.tx = nil }, + timestampBoundCallback: func() spanner.TimestampBound { + return propertyReadOnlyStaleness.GetValueOrDefault(c.state) + }, } return c.tx, nil } + // These options are only used to determine how to start the transaction. + // All other options are fetched in a callback that is called when the transaction is actually started. + // That callback reads all transaction options from the connection state at that moment. This allows + // applications to execute a series of statement like this: + // BEGIN TRANSACTION; + // SET LOCAL transaction_tag='my_tag'; + // SET LOCAL commit_priority=LOW; + // INSERT INTO my_table ... -- This starts the transaction with the options above included. opts := spanner.TransactionOptions{} - if c.tempTransactionOptions != nil { - opts = c.tempTransactionOptions.TransactionOptions - } - opts.BeginTransactionOption = c.convertDefaultBeginTransactionOption(opts.BeginTransactionOption) - tempCloseFunc := func() {} - if c.tempTransactionOptions != nil && c.tempTransactionOptions.close != nil { - tempCloseFunc = c.tempTransactionOptions.close - } - if !disableRetryAborts && c.tempTransactionOptions != nil { - disableRetryAborts = c.tempTransactionOptions.DisableInternalRetries - } + opts.BeginTransactionOption = c.convertDefaultBeginTransactionOption(propertyBeginTransactionOption.GetValueOrDefault(c.state)) tx, err := spanner.NewReadWriteStmtBasedTransactionWithCallbackForOptions(ctx, c.client, opts, func() spanner.TransactionOptions { defer func() { @@ -1249,7 +1294,7 @@ func (c *conn) BeginTx(ctx context.Context, driverOpts driver.TxOptions) (driver logger: logger, rwTx: tx, close: func(result txResult, commitResponse *spanner.CommitResponse, commitErr error) { - tempCloseFunc() + closeFunc() c.prevTx = c.tx c.tx = nil if commitErr == nil { diff --git a/connection_properties.go b/connection_properties.go index be6716b7..84e1cbcd 100644 --- a/connection_properties.go +++ b/connection_properties.go @@ -257,6 +257,27 @@ var propertyMaxCommitDelay = createConnectionProperty( connectionstate.ContextUser, connectionstate.ConvertDuration, ) +var propertyCommitPriority = createConnectionProperty( + "commit_priority", + "Sets the priority for commit RPC invocations from this connection (HIGH/MEDIUM/LOW/UNSPECIFIED). "+ + "The default is UNSPECIFIED.", + spannerpb.RequestOptions_PRIORITY_UNSPECIFIED, + false, + nil, + connectionstate.ContextUser, + func(value string) (spannerpb.RequestOptions_Priority, error) { + return parseRpcPriority(value) + }, +) +var propertyReturnCommitStats = createConnectionProperty( + "return_commit_stats", + "return_commit_stats determines whether transactions should request Spanner to return commit statistics.", + false, + false, + nil, + connectionstate.ContextUser, + connectionstate.ConvertBool, +) // ------------------------------------------------------------------------------------------------ // Statement connection properties. diff --git a/driver.go b/driver.go index 15cec71a..6a381bcc 100644 --- a/driver.go +++ b/driver.go @@ -1148,7 +1148,6 @@ func BeginReadWriteTransaction(ctx context.Context, db *sql.DB, options ReadWrit } tx, err := conn.BeginTx(ctx, &sql.TxOptions{}) if err != nil { - clearTempReadWriteTransactionOptions(conn) return nil, err } return tx, nil @@ -1166,11 +1165,6 @@ func withTempReadWriteTransactionOptions(conn *sql.Conn, options *ReadWriteTrans }) } -func clearTempReadWriteTransactionOptions(conn *sql.Conn) { - _ = withTempReadWriteTransactionOptions(conn, nil) - _ = conn.Close() -} - // ReadOnlyTransactionOptions can be used to create a read-only transaction // on a Spanner connection. type ReadOnlyTransactionOptions struct { @@ -1529,6 +1523,24 @@ func toProtoIsolationLevelOrDefault(level sql.IsolationLevel) spannerpb.Transact return res } +func toSqlIsolationLevel(level spannerpb.TransactionOptions_IsolationLevel) (sql.IsolationLevel, error) { + switch level { + case spannerpb.TransactionOptions_ISOLATION_LEVEL_UNSPECIFIED: + return sql.LevelDefault, nil + case spannerpb.TransactionOptions_SERIALIZABLE: + return sql.LevelSerializable, nil + case spannerpb.TransactionOptions_REPEATABLE_READ: + return sql.LevelRepeatableRead, nil + default: + } + return sql.LevelDefault, spanner.ToSpannerError(status.Errorf(codes.InvalidArgument, "invalid or unsupported isolation level: %v", level)) +} + +func toSqlIsolationLevelOrDefault(level spannerpb.TransactionOptions_IsolationLevel) sql.IsolationLevel { + res, _ := toSqlIsolationLevel(level) + return res +} + type spannerIsolationLevel sql.IsolationLevel const ( diff --git a/driver_with_mockserver_test.go b/driver_with_mockserver_test.go index ac9cbf79..209c1fa1 100644 --- a/driver_with_mockserver_test.go +++ b/driver_with_mockserver_test.go @@ -5076,7 +5076,7 @@ func TestBeginReadWriteTransaction(t *testing.T) { t.Fatalf("missing transaction for ExecuteSqlRequest") } if req.Transaction.GetId() == nil { - t.Fatalf("missing begin selector for ExecuteSqlRequest") + t.Fatalf("missing ID selector for ExecuteSqlRequest") } if g, w := req.RequestOptions.TransactionTag, tag; g != w { t.Fatalf("transaction tag mismatch\n Got: %v\nWant: %v", g, w) diff --git a/spannerlib/api/transaction_test.go b/spannerlib/api/transaction_test.go index 1f0ec81d..efb6c993 100644 --- a/spannerlib/api/transaction_test.go +++ b/spannerlib/api/transaction_test.go @@ -24,6 +24,7 @@ import ( "cloud.google.com/go/spanner/apiv1/spannerpb" "github.com/googleapis/go-sql-spanner/testutil" "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" ) func TestBeginAndCommit(t *testing.T) { @@ -409,3 +410,166 @@ func TestDdlInTransaction(t *testing.T) { t.Fatalf("ClosePool returned unexpected error: %v", err) } } + +func TestTransactionOptionsAsSqlStatements(t *testing.T) { + t.Parallel() + + ctx := context.Background() + server, teardown := setupMockServer(t) + defer teardown() + dsn := fmt.Sprintf("%s/projects/p/instances/i/databases/d?useplaintext=true", server.Address) + + poolId, err := CreatePool(ctx, dsn) + if err != nil { + t.Fatalf("CreatePool returned unexpected error: %v", err) + } + connId, err := CreateConnection(ctx, poolId) + if err != nil { + t.Fatalf("CreateConnection returned unexpected error: %v", err) + } + if err := BeginTransaction(ctx, poolId, connId, &spannerpb.TransactionOptions{}); err != nil { + t.Fatalf("BeginTransaction returned unexpected error: %v", err) + } + + // Set some local transaction options. + if rowsId, err := Execute(ctx, poolId, connId, &spannerpb.ExecuteSqlRequest{Sql: "set local transaction_tag = 'my_tag'"}); err != nil { + t.Fatalf("setting transaction_tag returned unexpected error: %v", err) + } else { + _ = CloseRows(ctx, poolId, connId, rowsId) + } + if rowsId, err := Execute(ctx, poolId, connId, &spannerpb.ExecuteSqlRequest{Sql: "set local retry_aborts_internally = false"}); err != nil { + t.Fatalf("setting retry_aborts_internally returned unexpected error: %v", err) + } else { + _ = CloseRows(ctx, poolId, connId, rowsId) + } + + // Execute a statement in the transaction. + if rowsId, err := Execute(ctx, poolId, connId, &spannerpb.ExecuteSqlRequest{Sql: testutil.UpdateBarSetFoo}); err != nil { + t.Fatalf("Execute returned unexpected error: %v", err) + } else { + _ = CloseRows(ctx, poolId, connId, rowsId) + } + + // Abort the transaction to verify that the retry_aborts_internally setting was respected. + server.TestSpanner.PutExecutionTime(testutil.MethodCommitTransaction, testutil.SimulatedExecutionTime{ + Errors: []error{status.Error(codes.Aborted, "Aborted")}, + }) + + // Commit the transaction. This should fail with an Aborted error. + if _, err := Commit(ctx, poolId, connId); err == nil { + t.Fatal("missing expected error") + } else { + if g, w := spanner.ErrCode(err), codes.Aborted; g != w { + t.Fatalf("error code mismatch\n Got: %v\nWant: %v", g, w) + } + } + + // Verify that the transaction_tag setting was respected. + requests := server.TestSpanner.DrainRequestsFromServer() + executeRequests := testutil.RequestsOfType(requests, reflect.TypeOf(&spannerpb.ExecuteSqlRequest{})) + if g, w := len(executeRequests), 1; g != w { + t.Fatalf("Execute request count mismatch\n Got: %v\nWant: %v", g, w) + } + executeRequest := executeRequests[0].(*spannerpb.ExecuteSqlRequest) + if executeRequest.RequestOptions == nil { + t.Fatalf("Execute request options not set") + } + if g, w := executeRequest.RequestOptions.TransactionTag, "my_tag"; g != w { + t.Fatalf("TransactionTag mismatch\n Got: %v\nWant: %v", g, w) + } + commitRequests := testutil.RequestsOfType(requests, reflect.TypeOf(&spannerpb.CommitRequest{})) + if g, w := len(commitRequests), 1; g != w { + t.Fatalf("Commit request count mismatch\n Got: %v\nWant: %v", g, w) + } + commitRequest := commitRequests[0].(*spannerpb.CommitRequest) + if commitRequest.RequestOptions == nil { + t.Fatalf("Commit request options not set") + } + if g, w := commitRequest.RequestOptions.TransactionTag, "my_tag"; g != w { + t.Fatalf("TransactionTag mismatch\n Got: %v\nWant: %v", g, w) + } + + if err := CloseConnection(ctx, poolId, connId); err != nil { + t.Fatalf("CloseConnection returned unexpected error: %v", err) + } + if err := ClosePool(ctx, poolId); err != nil { + t.Fatalf("ClosePool returned unexpected error: %v", err) + } +} + +func TestReadOnlyTransactionOptionsAsSqlStatements(t *testing.T) { + t.Parallel() + + ctx := context.Background() + server, teardown := setupMockServer(t) + defer teardown() + dsn := fmt.Sprintf("%s/projects/p/instances/i/databases/d?useplaintext=true", server.Address) + + poolId, err := CreatePool(ctx, dsn) + if err != nil { + t.Fatalf("CreatePool returned unexpected error: %v", err) + } + connId, err := CreateConnection(ctx, poolId) + if err != nil { + t.Fatalf("CreateConnection returned unexpected error: %v", err) + } + // Start a read-only transaction without any further options. + if err := BeginTransaction(ctx, poolId, connId, &spannerpb.TransactionOptions{ + Mode: &spannerpb.TransactionOptions_ReadOnly_{ + ReadOnly: &spannerpb.TransactionOptions_ReadOnly{}, + }, + }); err != nil { + t.Fatalf("BeginTransaction returned unexpected error: %v", err) + } + + // Set a local read-only transaction options. + if rowsId, err := Execute(ctx, poolId, connId, &spannerpb.ExecuteSqlRequest{Sql: "set local read_only_staleness = 'exact_staleness 10s'"}); err != nil { + t.Fatalf("setting read_only_staleness returned unexpected error: %v", err) + } else { + _ = CloseRows(ctx, poolId, connId, rowsId) + } + + // Execute a statement in the transaction. + if rowsId, err := Execute(ctx, poolId, connId, &spannerpb.ExecuteSqlRequest{Sql: testutil.SelectFooFromBar}); err != nil { + t.Fatalf("Execute returned unexpected error: %v", err) + } else { + _ = CloseRows(ctx, poolId, connId, rowsId) + } + + // Commit the transaction to end it. + if _, err := Commit(ctx, poolId, connId); err != nil { + t.Fatalf("commit returned unexpected error: %v", err) + } + + // Verify that the read-only staleness setting was used. + requests := server.TestSpanner.DrainRequestsFromServer() + executeRequests := testutil.RequestsOfType(requests, reflect.TypeOf(&spannerpb.ExecuteSqlRequest{})) + if g, w := len(executeRequests), 1; g != w { + t.Fatalf("Execute request count mismatch\n Got: %v\nWant: %v", g, w) + } + executeRequest := executeRequests[0].(*spannerpb.ExecuteSqlRequest) + if executeRequest.GetTransaction() == nil || executeRequest.GetTransaction().GetBegin() == nil || executeRequest.GetTransaction().GetBegin().GetReadOnly() == nil { + t.Fatal("ExecuteRequest does not contain a BeginTransaction option") + } + + readOnly := executeRequest.GetTransaction().GetBegin().GetReadOnly() + if readOnly.GetExactStaleness() == nil { + t.Fatal("BeginTransaction does not contain a ExactStaleness option") + } + if g, w := readOnly.GetExactStaleness().GetSeconds(), int64(10); g != w { + t.Fatalf("read staleness mismatch\n Got: %v\nWant: %v", g, w) + } + + // There should be no commit requests, as committing a read-only transaction is a no-op on Spanner. + commitRequests := testutil.RequestsOfType(requests, reflect.TypeOf(&spannerpb.CommitRequest{})) + if g, w := len(commitRequests), 0; g != w { + t.Fatalf("Commit request count mismatch\n Got: %v\nWant: %v", g, w) + } + + if err := CloseConnection(ctx, poolId, connId); err != nil { + t.Fatalf("CloseConnection returned unexpected error: %v", err) + } + if err := ClosePool(ctx, poolId); err != nil { + t.Fatalf("ClosePool returned unexpected error: %v", err) + } +} diff --git a/transaction.go b/transaction.go index a2db9634..297c6b60 100644 --- a/transaction.go +++ b/transaction.go @@ -22,6 +22,7 @@ import ( "fmt" "log/slog" "math/rand" + "sync" "time" "cloud.google.com/go/spanner" @@ -112,6 +113,10 @@ type readOnlyTransaction struct { boTx *spanner.BatchReadOnlyTransaction logger *slog.Logger close func(result txResult) + + timestampBoundMu sync.Mutex + timestampBoundSet bool + timestampBoundCallback func() spanner.TimestampBound } func (tx *readOnlyTransaction) Commit() error { @@ -160,6 +165,14 @@ func (tx *readOnlyTransaction) Query(ctx context.Context, stmt spanner.Statement } return mi, nil } + if tx.timestampBoundCallback != nil { + tx.timestampBoundMu.Lock() + if !tx.timestampBoundSet { + tx.roTx.WithTimestampBound(tx.timestampBoundCallback()) + tx.timestampBoundSet = true + } + tx.timestampBoundMu.Unlock() + } return &readOnlyRowIterator{tx.roTx.QueryWithOptions(ctx, stmt, execOptions.QueryOptions), stmtType}, nil }