diff --git a/conn.go b/conn.go index 3fe807b2..07145746 100644 --- a/conn.go +++ b/conn.go @@ -126,6 +126,38 @@ type SpannerConn interface { // transactions on this connection. SetIsolationLevel(level sql.IsolationLevel) error + // ReadLockMode returns the current read lock mode that is used for read/write + // transactions on this connection. + ReadLockMode() spannerpb.TransactionOptions_ReadWrite_ReadLockMode + // SetReadLockMode sets the read lock mode to use for read/write transactions + // on this connection. + // + // The read lock mode option controls the locking behavior for read operations and queries within a + // read-write transaction. It works in conjunction with the transaction's isolation level. + // + // PESSIMISTIC: Read locks are acquired immediately on read. This mode only applies to SERIALIZABLE + // isolation. This mode prevents concurrent modifications by locking data throughout the transaction. + // This reduces commit-time aborts due to conflicts but can increase how long transactions wait for + // locks and the overall contention. + // + // OPTIMISTIC: Locks for reads within the transaction are not acquired on read. Instead, the locks + // are acquired on commit to validate that read/queried data has not changed since the transaction + // started. If a conflict is detected, the transaction will fail. This mode only applies to SERIALIZABLE + // isolation. This mode defers locking until commit, which can reduce contention and improve throughput. + // However, be aware that this increases the risk of transaction aborts if there's significant write + // competition on the same data. + // + // READ_LOCK_MODE_UNSPECIFIED: This is the default if no mode is set. The locking behavior depends on + // the isolation level: + // + // REPEATABLE_READ isolation: Locking semantics default to OPTIMISTIC. However, validation checks at + // commit are only performed for queries using SELECT FOR UPDATE, statements with LOCK_SCANNED_RANGES + // hints, and DML statements. Note: It is an error to explicitly set ReadLockMode when the isolation + // level is REPEATABLE_READ. + // + // For all other isolation levels: If the read lock mode is not set, it defaults to PESSIMISTIC locking. + SetReadLockMode(mode spannerpb.TransactionOptions_ReadWrite_ReadLockMode) error + // TransactionTag returns the transaction tag that will be applied to the next // read/write transaction on this connection. The transaction tag that is set // on the connection is cleared when a read/write transaction is started. @@ -365,6 +397,14 @@ func (c *conn) SetIsolationLevel(level sql.IsolationLevel) error { return propertyIsolationLevel.SetValue(c.state, level, connectionstate.ContextUser) } +func (c *conn) ReadLockMode() spannerpb.TransactionOptions_ReadWrite_ReadLockMode { + return propertyReadLockMode.GetValueOrDefault(c.state) +} + +func (c *conn) SetReadLockMode(mode spannerpb.TransactionOptions_ReadWrite_ReadLockMode) error { + return propertyReadLockMode.SetValue(c.state, mode, connectionstate.ContextUser) +} + func (c *conn) MaxCommitDelay() time.Duration { return propertyMaxCommitDelay.GetValueOrDefault(c.state) } @@ -918,6 +958,7 @@ func (c *conn) options(reset bool) *ExecOptions { ExcludeTxnFromChangeStreams: c.ExcludeTxnFromChangeStreams(), TransactionTag: c.TransactionTag(), IsolationLevel: toProtoIsolationLevelOrDefault(c.IsolationLevel()), + ReadLockMode: c.ReadLockMode(), CommitOptions: spanner.CommitOptions{ MaxCommitDelay: c.maxCommitDelayPointer(), }, diff --git a/conn_with_mockserver_test.go b/conn_with_mockserver_test.go index c30987cb..fc34fc5e 100644 --- a/conn_with_mockserver_test.go +++ b/conn_with_mockserver_test.go @@ -314,6 +314,184 @@ func TestIsolationLevelAutoCommit(t *testing.T) { } } +func TestDefaultReadLockMode(t *testing.T) { + t.Parallel() + + for mode, name := range spannerpb.TransactionOptions_ReadWrite_ReadLockMode_name { + readLockMode := spannerpb.TransactionOptions_ReadWrite_ReadLockMode(mode) + db, server, teardown := setupTestDBConnectionWithParams(t, fmt.Sprintf("read_lock_mode=%v", name)) + defer teardown() + ctx := context.Background() + + c, err := db.Conn(ctx) + if err != nil { + t.Fatal(err) + } + if err := c.Raw(func(driverConn interface{}) error { + spannerConn, ok := driverConn.(*conn) + if !ok { + return fmt.Errorf("expected spanner conn, got %T", driverConn) + } + if spannerConn.ReadLockMode() != readLockMode { + return fmt.Errorf("expected read lock mode %v, got %v", readLockMode, spannerConn.ReadLockMode()) + } + return nil + }); err != nil { + t.Fatal(err) + } + + tx, _ := db.BeginTx(ctx, &sql.TxOptions{}) + _, _ = tx.ExecContext(ctx, testutil.UpdateBarSetFoo) + _ = tx.Rollback() + + requests := drainRequestsFromServer(server.TestSpanner) + beginRequests := requestsOfType(requests, reflect.TypeOf(&spannerpb.BeginTransactionRequest{})) + if g, w := len(beginRequests), 0; g != w { + t.Fatalf("begin requests count mismatch\n Got: %v\nWant: %v", g, w) + } + executeRequests := requestsOfType(requests, reflect.TypeOf(&spannerpb.ExecuteSqlRequest{})) + if g, w := len(executeRequests), 1; g != w { + t.Fatalf("execute requests count mismatch\n Got: %v\nWant: %v", g, w) + } + request := executeRequests[0].(*spannerpb.ExecuteSqlRequest) + if request.GetTransaction() == nil || request.GetTransaction().GetBegin() == nil { + t.Fatalf("ExecuteSqlRequest should have a Begin transaction") + } + if g, w := request.GetTransaction().GetBegin().GetReadWrite().GetReadLockMode(), readLockMode; g != w { + t.Fatalf("begin read lock mode mismatch\n Got: %v\nWant: %v", g, w) + } + } +} + +func TestSetReadLockMode(t *testing.T) { + t.Parallel() + + db, _, teardown := setupTestDBConnection(t) + defer teardown() + ctx := context.Background() + + // Repeat twice to ensure that the state is reset after closing the connection. + for i := 0; i < 2; i++ { + c, err := db.Conn(ctx) + if err != nil { + t.Fatal(err) + } + var readLockMode spannerpb.TransactionOptions_ReadWrite_ReadLockMode + _ = c.Raw(func(driverConn interface{}) error { + readLockMode = driverConn.(*conn).ReadLockMode() + return nil + }) + if g, w := readLockMode, spannerpb.TransactionOptions_ReadWrite_READ_LOCK_MODE_UNSPECIFIED; g != w { + t.Fatalf("read lock mode mismatch\n Got: %v\nWant: %v", g, w) + } + _ = c.Raw(func(driverConn interface{}) error { + return driverConn.(SpannerConn).SetReadLockMode(spannerpb.TransactionOptions_ReadWrite_OPTIMISTIC) + }) + _ = c.Raw(func(driverConn interface{}) error { + readLockMode = driverConn.(SpannerConn).ReadLockMode() + return nil + }) + if g, w := readLockMode, spannerpb.TransactionOptions_ReadWrite_OPTIMISTIC; g != w { + t.Fatalf("read lock mode mismatch\n Got: %v\nWant: %v", g, w) + } + _ = c.Close() + } +} + +func TestReadLockModeAutoCommit(t *testing.T) { + t.Parallel() + + db, server, teardown := setupTestDBConnection(t) + defer teardown() + ctx := context.Background() + + for mode := range spannerpb.TransactionOptions_ReadWrite_ReadLockMode_name { + readLockMode := spannerpb.TransactionOptions_ReadWrite_ReadLockMode(mode) + _, _ = db.ExecContext(ctx, testutil.UpdateBarSetFoo, ExecOptions{TransactionOptions: spanner.TransactionOptions{ + ReadLockMode: readLockMode, + }}) + + requests := drainRequestsFromServer(server.TestSpanner) + executeRequests := requestsOfType(requests, reflect.TypeOf(&spannerpb.ExecuteSqlRequest{})) + if g, w := len(executeRequests), 1; g != w { + t.Fatalf("execute requests count mismatch\n Got: %v\nWant: %v", g, w) + } + request := executeRequests[0].(*spannerpb.ExecuteSqlRequest) + if g, w := request.Transaction.GetBegin().GetReadWrite().GetReadLockMode(), readLockMode; g != w { + t.Fatalf("begin read lock mode mismatch\n Got: %v\nWant: %v", g, w) + } + } +} + +func TestSetLocalReadLockMode(t *testing.T) { + t.Parallel() + + db, server, teardown := setupTestDBConnection(t) + // Make sure we only have one connection in the pool. + db.SetMaxOpenConns(1) + defer teardown() + ctx := context.Background() + + tx, err := db.BeginTx(ctx, nil) + if err != nil { + t.Fatal(err) + } + if _, err := tx.ExecContext(ctx, "set local read_lock_mode='optimistic'"); err != nil { + t.Fatal(err) + } + if _, err := tx.ExecContext(ctx, testutil.UpdateBarSetFoo); err != nil { + t.Fatal(err) + } + if err := tx.Commit(); err != nil { + t.Fatal(err) + } + + requests := drainRequestsFromServer(server.TestSpanner) + beginRequests := requestsOfType(requests, reflect.TypeOf(&spannerpb.BeginTransactionRequest{})) + if g, w := len(beginRequests), 0; g != w { + t.Fatalf("begin requests count mismatch\n Got: %v\nWant: %v", g, w) + } + executeRequests := requestsOfType(requests, reflect.TypeOf(&spannerpb.ExecuteSqlRequest{})) + if g, w := len(executeRequests), 1; g != w { + t.Fatalf("execute requests count mismatch\n Got: %v\nWant: %v", g, w) + } + request := executeRequests[0].(*spannerpb.ExecuteSqlRequest) + if request.GetTransaction() == nil || request.GetTransaction().GetBegin() == nil { + t.Fatalf("ExecuteSqlRequest should have a Begin transaction") + } + if g, w := request.GetTransaction().GetBegin().GetReadWrite().GetReadLockMode(), spannerpb.TransactionOptions_ReadWrite_OPTIMISTIC; g != w { + t.Fatalf("begin read lock mode mismatch\n Got: %v\nWant: %v", g, w) + } + + // Execute another transaction without a specific read lock mode. This should then use the default. + tx, err = db.BeginTx(ctx, nil) + if err != nil { + t.Fatal(err) + } + if _, err := tx.ExecContext(ctx, testutil.UpdateBarSetFoo); err != nil { + t.Fatal(err) + } + if err := tx.Commit(); err != nil { + t.Fatal(err) + } + requests = drainRequestsFromServer(server.TestSpanner) + beginRequests = requestsOfType(requests, reflect.TypeOf(&spannerpb.BeginTransactionRequest{})) + if g, w := len(beginRequests), 0; g != w { + t.Fatalf("begin requests count mismatch\n Got: %v\nWant: %v", g, w) + } + executeRequests = requestsOfType(requests, reflect.TypeOf(&spannerpb.ExecuteSqlRequest{})) + if g, w := len(executeRequests), 1; g != w { + t.Fatalf("execute requests count mismatch\n Got: %v\nWant: %v", g, w) + } + request = executeRequests[0].(*spannerpb.ExecuteSqlRequest) + if request.GetTransaction() == nil || request.GetTransaction().GetBegin() == nil { + t.Fatalf("ExecuteSqlRequest should have a Begin transaction") + } + if g, w := request.GetTransaction().GetBegin().GetReadWrite().GetReadLockMode(), spannerpb.TransactionOptions_ReadWrite_READ_LOCK_MODE_UNSPECIFIED; g != w { + t.Fatalf("begin read lock mode mismatch\n Got: %v\nWant: %v", g, w) + } +} + func TestCreateDatabase(t *testing.T) { t.Parallel() diff --git a/connection_properties.go b/connection_properties.go index 35f848d9..df268492 100644 --- a/connection_properties.go +++ b/connection_properties.go @@ -17,6 +17,7 @@ package spannerdriver import ( "database/sql" "fmt" + "strings" "time" "cloud.google.com/go/spanner" @@ -73,6 +74,34 @@ var propertyIsolationLevel = createConnectionProperty( return parseIsolationLevel(value) }, ) +var propertyReadLockMode = createConnectionProperty( + "read_lock_mode", + "This option controls the locking behavior for read operations and queries within a read/write transaction. "+ + "It works in conjunction with the transaction's isolation level.\n\n"+ + "PESSIMISTIC: Read locks are acquired immediately on read. This mode only applies to SERIALIZABLE isolation. "+ + "This mode prevents concurrent modifications by locking data throughout the transaction. This reduces commit-time "+ + "aborts due to conflicts, but can increase how long transactions wait for locks and the overall contention.\n\n"+ + "OPTIMISTIC: Locks for reads within the transaction are not acquired on read. Instead, the locks are acquired on "+ + "commit to validate that read/queried data has not changed since the transaction started. If a conflict is "+ + "detected, the transaction will fail. This mode only applies to SERIALIZABLE isolation. This mode defers locking "+ + "until commit, which can reduce contention and improve throughput. However, be aware that this increases the "+ + "risk of transaction aborts if there's significant write competition on the same data.\n\n"+ + "READ_LOCK_MODE_UNSPECIFIED: This is the default if no mode is set. The locking behavior depends on the isolation level:\n\n"+ + "REPEATABLE_READ: Locking semantics default to OPTIMISTIC. However, validation checks at commit are only "+ + "performed for queries using SELECT FOR UPDATE, statements with {@code LOCK_SCANNED_RANGES} hints, and DML statements.\n\n"+ + "For all other isolation levels: If the read lock mode is not set, it defaults to PESSIMISTIC locking.", + spannerpb.TransactionOptions_ReadWrite_READ_LOCK_MODE_UNSPECIFIED, + false, + nil, + connectionstate.ContextUser, + func(value string) (spannerpb.TransactionOptions_ReadWrite_ReadLockMode, error) { + name := strings.ToUpper(value) + if _, ok := spannerpb.TransactionOptions_ReadWrite_ReadLockMode_value[name]; ok { + return spannerpb.TransactionOptions_ReadWrite_ReadLockMode(spannerpb.TransactionOptions_ReadWrite_ReadLockMode_value[name]), nil + } + return spannerpb.TransactionOptions_ReadWrite_READ_LOCK_MODE_UNSPECIFIED, status.Errorf(codes.InvalidArgument, "unknown read lock mode: %v", value) + }, +) var propertyBeginTransactionOption = createConnectionProperty( "begin_transaction_option", "BeginTransactionOption determines the default for how to begin transactions. "+