diff --git a/client_side_statement_test.go b/client_side_statement_test.go index 3af93e6c..91d49786 100644 --- a/client_side_statement_test.go +++ b/client_side_statement_test.go @@ -151,7 +151,8 @@ func TestStatementExecutor_RetryAbortsInternally(t *testing.T) { } func TestStatementExecutor_AutocommitDmlMode(t *testing.T) { - c := &conn{logger: noopLogger} + c := &conn{logger: noopLogger, connector: &connector{}} + _ = c.ResetSession(context.Background()) s := &statementExecutor{} ctx := context.Background() for i, test := range []struct { diff --git a/conn.go b/conn.go index d3293268..59d1abd0 100644 --- a/conn.go +++ b/conn.go @@ -283,6 +283,9 @@ func (c *conn) AutocommitDMLMode() AutocommitDMLMode { } func (c *conn) SetAutocommitDMLMode(mode AutocommitDMLMode) error { + if mode == Unspecified { + return spanner.ToSpannerError(status.Error(codes.InvalidArgument, "autocommit dml mode cannot be unspecified")) + } _, err := c.setAutocommitDMLMode(mode) return err } @@ -815,17 +818,21 @@ func (c *conn) execContext(ctx context.Context, query string, execOptions ExecOp c.batch.statements = append(c.batch.statements, ss) res = &result{} } else { - if c.autocommitDMLMode == Transactional { + dmlMode := c.autocommitDMLMode + if execOptions.AutocommitDMLMode != Unspecified { + dmlMode = execOptions.AutocommitDMLMode + } + if dmlMode == Transactional { res, commitTs, err = c.execSingleDMLTransactional(ctx, c.client, ss, statementInfo, execOptions) if err == nil { c.commitTs = &commitTs } - } else if c.autocommitDMLMode == PartitionedNonAtomic { + } else if dmlMode == PartitionedNonAtomic { var rowsAffected int64 rowsAffected, err = c.execSingleDMLPartitioned(ctx, c.client, ss, execOptions) res = &result{rowsAffected: rowsAffected} } else { - return nil, status.Errorf(codes.FailedPrecondition, "connection in invalid state for DML statements: %s", c.autocommitDMLMode.String()) + return nil, status.Errorf(codes.FailedPrecondition, "invalid dml mode: %s", dmlMode.String()) } } } else { diff --git a/driver.go b/driver.go index 3f8fba51..cfa7ae5a 100644 --- a/driver.go +++ b/driver.go @@ -135,6 +135,10 @@ type ExecOptions struct { // PartitionedQueryOptions are used for partitioned queries, and ignored // for all other statements. PartitionedQueryOptions PartitionedQueryOptions + + // AutoCommitDMLMode determines the type of transaction that DML statements + // that are executed outside explicit transactions use. + AutocommitDMLMode AutocommitDMLMode } type DecodeOption int @@ -876,6 +880,8 @@ type AutocommitDMLMode int func (mode AutocommitDMLMode) String() string { switch mode { + case Unspecified: + return "Unspecified" case Transactional: return "Transactional" case PartitionedNonAtomic: @@ -885,7 +891,16 @@ func (mode AutocommitDMLMode) String() string { } const ( - Transactional AutocommitDMLMode = iota + // Unspecified DML mode uses the default of the current connection. + Unspecified AutocommitDMLMode = iota + + // Transactional DML mode uses a regular, atomic read/write transaction to + // execute the DML statement. + Transactional + + // PartitionedNonAtomic mode uses a Partitioned DML transaction to execute + // the DML statement. Partitioned DML transactions are not guaranteed to be + // atomic, but allow the statement to exceed the transactional mutation limit. PartitionedNonAtomic ) diff --git a/driver_test.go b/driver_test.go index 225144a8..50a6dd35 100644 --- a/driver_test.go +++ b/driver_test.go @@ -393,8 +393,9 @@ func TestConn_StartBatchDml(t *testing.T) { func TestConn_NonDdlStatementsInDdlBatch(t *testing.T) { c := &conn{ - logger: noopLogger, - batch: &batch{tp: ddl}, + logger: noopLogger, + autocommitDMLMode: Transactional, + batch: &batch{tp: ddl}, execSingleQuery: func(ctx context.Context, c *spanner.Client, statement spanner.Statement, tb spanner.TimestampBound, options ExecOptions) *spanner.RowIterator { return &spanner.RowIterator{} }, @@ -509,7 +510,8 @@ func TestConn_GetBatchedStatements(t *testing.T) { func TestConn_GetCommitTimestampAfterAutocommitDml(t *testing.T) { want := time.Now() c := &conn{ - logger: noopLogger, + logger: noopLogger, + autocommitDMLMode: Transactional, execSingleQuery: func(ctx context.Context, c *spanner.Client, statement spanner.Statement, tb spanner.TimestampBound, options ExecOptions) *spanner.RowIterator { return &spanner.RowIterator{} }, diff --git a/examples/partitioned-dml/main.go b/examples/partitioned-dml/main.go index ab70f7cd..fd9cac4c 100644 --- a/examples/partitioned-dml/main.go +++ b/examples/partitioned-dml/main.go @@ -1,4 +1,4 @@ -// Copyright 2021 Google LLC +// Copyright 2025 Google LLC // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -24,7 +24,7 @@ import ( "github.com/googleapis/go-sql-spanner/examples" ) -var createTableStatement = "CREATE TABLE Singers (SingerId INT64, Name STRING(MAX)) PRIMARY KEY (SingerId)" +var createTableStatement = "CREATE TABLE Singers (SingerId INT64, Name STRING(MAX), Active BOOL NOT NULL DEFAULT (TRUE)) PRIMARY KEY (SingerId)" // Example for executing a Partitioned DML transaction on a Google Cloud Spanner database. // See https://cloud.google.com/spanner/docs/dml-partitioned for more information on Partitioned DML. @@ -38,12 +38,11 @@ func partitionedDml(projectId, instanceId, databaseId string) error { } defer db.Close() - // First insert a couple of test records that we will delete using Partitioned DML. + // First insert a couple of test records that we will update and delete using Partitioned DML. conn, err := db.Conn(ctx) if err != nil { return fmt.Errorf("failed to get a connection: %v", err) } - defer conn.Close() if err := conn.Raw(func(driverConn interface{}) error { _, err := driverConn.(spannerdriver.SpannerConn).Apply(ctx, []*spanner.Mutation{ spanner.InsertOrUpdateMap("Singers", map[string]interface{}{"SingerId": 1, "Name": "Singer 1"}), @@ -56,13 +55,13 @@ func partitionedDml(projectId, instanceId, databaseId string) error { return fmt.Errorf("failed to insert test records: %v", err) } - // Now delete all records in the Singers table using Partitioned DML. + // Now update all records in the Singers table using Partitioned DML. if _, err := conn.ExecContext(ctx, "SET AUTOCOMMIT_DML_MODE='PARTITIONED_NON_ATOMIC'"); err != nil { return fmt.Errorf("failed to change DML mode to Partitioned_Non_Atomic: %v", err) } - res, err := conn.ExecContext(ctx, "DELETE FROM Singers WHERE TRUE") + res, err := conn.ExecContext(ctx, "UPDATE Singers SET Active = FALSE WHERE TRUE") if err != nil { - return fmt.Errorf("failed to execute DELETE statement: %v", err) + return fmt.Errorf("failed to execute UPDATE statement: %v", err) } affected, err := res.RowsAffected() if err != nil { @@ -70,13 +69,31 @@ func partitionedDml(projectId, instanceId, databaseId string) error { } // Partitioned DML returns the minimum number of records that were affected. - fmt.Printf("Deleted %v records using Partitioned DML\n", affected) + fmt.Printf("Updated %v records using Partitioned DML\n", affected) // Closing the connection will return it to the connection pool. The DML mode will automatically be reset to the // default TRANSACTIONAL mode when the connection is returned to the pool, so we do not need to change it back // manually. _ = conn.Close() + // The AutoCommitDMLMode can also be specified as an ExecOption for a single statement. + conn, err = db.Conn(ctx) + if err != nil { + return fmt.Errorf("failed to get a connection: %v", err) + } + res, err = conn.ExecContext(ctx, "DELETE FROM Singers WHERE NOT Active", + spannerdriver.ExecOptions{AutocommitDMLMode: spannerdriver.PartitionedNonAtomic}) + if err != nil { + return fmt.Errorf("failed to execute DELETE statement: %v", err) + } + affected, err = res.RowsAffected() + if err != nil { + return fmt.Errorf("failed to get affected rows: %v", err) + } + + // Partitioned DML returns the minimum number of records that were affected. + fmt.Printf("Deleted %v records using Partitioned DML\n", affected) + return nil }