Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion client_side_statement_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,8 @@ func TestStatementExecutor_RetryAbortsInternally(t *testing.T) {
}

func TestStatementExecutor_AutocommitDmlMode(t *testing.T) {
c := &conn{logger: noopLogger}
c := &conn{logger: noopLogger, connector: &connector{}}
_ = c.ResetSession(context.Background())
s := &statementExecutor{}
ctx := context.Background()
for i, test := range []struct {
Expand Down
13 changes: 10 additions & 3 deletions conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,9 @@ func (c *conn) AutocommitDMLMode() AutocommitDMLMode {
}

func (c *conn) SetAutocommitDMLMode(mode AutocommitDMLMode) error {
if mode == Unspecified {
return spanner.ToSpannerError(status.Error(codes.InvalidArgument, "autocommit dml mode cannot be unspecified"))
}
_, err := c.setAutocommitDMLMode(mode)
return err
}
Expand Down Expand Up @@ -815,17 +818,21 @@ func (c *conn) execContext(ctx context.Context, query string, execOptions ExecOp
c.batch.statements = append(c.batch.statements, ss)
res = &result{}
} else {
if c.autocommitDMLMode == Transactional {
dmlMode := c.autocommitDMLMode
if execOptions.AutocommitDMLMode != Unspecified {
dmlMode = execOptions.AutocommitDMLMode
}
if dmlMode == Transactional {
res, commitTs, err = c.execSingleDMLTransactional(ctx, c.client, ss, statementInfo, execOptions)
if err == nil {
c.commitTs = &commitTs
}
} else if c.autocommitDMLMode == PartitionedNonAtomic {
} else if dmlMode == PartitionedNonAtomic {
var rowsAffected int64
rowsAffected, err = c.execSingleDMLPartitioned(ctx, c.client, ss, execOptions)
res = &result{rowsAffected: rowsAffected}
} else {
return nil, status.Errorf(codes.FailedPrecondition, "connection in invalid state for DML statements: %s", c.autocommitDMLMode.String())
return nil, status.Errorf(codes.FailedPrecondition, "invalid dml mode: %s", dmlMode.String())
}
}
} else {
Expand Down
17 changes: 16 additions & 1 deletion driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,10 @@ type ExecOptions struct {
// PartitionedQueryOptions are used for partitioned queries, and ignored
// for all other statements.
PartitionedQueryOptions PartitionedQueryOptions

// AutoCommitDMLMode determines the type of transaction that DML statements
// that are executed outside explicit transactions use.
AutocommitDMLMode AutocommitDMLMode
}

type DecodeOption int
Expand Down Expand Up @@ -876,6 +880,8 @@ type AutocommitDMLMode int

func (mode AutocommitDMLMode) String() string {
switch mode {
case Unspecified:
return "Unspecified"
case Transactional:
return "Transactional"
case PartitionedNonAtomic:
Expand All @@ -885,7 +891,16 @@ func (mode AutocommitDMLMode) String() string {
}

const (
Transactional AutocommitDMLMode = iota
// Unspecified DML mode uses the default of the current connection.
Unspecified AutocommitDMLMode = iota

// Transactional DML mode uses a regular, atomic read/write transaction to
// execute the DML statement.
Transactional

// PartitionedNonAtomic mode uses a Partitioned DML transaction to execute
// the DML statement. Partitioned DML transactions are not guaranteed to be
// atomic, but allow the statement to exceed the transactional mutation limit.
PartitionedNonAtomic
)

Expand Down
8 changes: 5 additions & 3 deletions driver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -393,8 +393,9 @@ func TestConn_StartBatchDml(t *testing.T) {

func TestConn_NonDdlStatementsInDdlBatch(t *testing.T) {
c := &conn{
logger: noopLogger,
batch: &batch{tp: ddl},
logger: noopLogger,
autocommitDMLMode: Transactional,
batch: &batch{tp: ddl},
execSingleQuery: func(ctx context.Context, c *spanner.Client, statement spanner.Statement, tb spanner.TimestampBound, options ExecOptions) *spanner.RowIterator {
return &spanner.RowIterator{}
},
Expand Down Expand Up @@ -509,7 +510,8 @@ func TestConn_GetBatchedStatements(t *testing.T) {
func TestConn_GetCommitTimestampAfterAutocommitDml(t *testing.T) {
want := time.Now()
c := &conn{
logger: noopLogger,
logger: noopLogger,
autocommitDMLMode: Transactional,
execSingleQuery: func(ctx context.Context, c *spanner.Client, statement spanner.Statement, tb spanner.TimestampBound, options ExecOptions) *spanner.RowIterator {
return &spanner.RowIterator{}
},
Expand Down
33 changes: 25 additions & 8 deletions examples/partitioned-dml/main.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2021 Google LLC
// Copyright 2025 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand All @@ -24,7 +24,7 @@ import (
"github.com/googleapis/go-sql-spanner/examples"
)

var createTableStatement = "CREATE TABLE Singers (SingerId INT64, Name STRING(MAX)) PRIMARY KEY (SingerId)"
var createTableStatement = "CREATE TABLE Singers (SingerId INT64, Name STRING(MAX), Active BOOL NOT NULL DEFAULT (TRUE)) PRIMARY KEY (SingerId)"

// Example for executing a Partitioned DML transaction on a Google Cloud Spanner database.
// See https://cloud.google.com/spanner/docs/dml-partitioned for more information on Partitioned DML.
Expand All @@ -38,12 +38,11 @@ func partitionedDml(projectId, instanceId, databaseId string) error {
}
defer db.Close()

// First insert a couple of test records that we will delete using Partitioned DML.
// First insert a couple of test records that we will update and delete using Partitioned DML.
conn, err := db.Conn(ctx)
if err != nil {
return fmt.Errorf("failed to get a connection: %v", err)
}
defer conn.Close()
if err := conn.Raw(func(driverConn interface{}) error {
_, err := driverConn.(spannerdriver.SpannerConn).Apply(ctx, []*spanner.Mutation{
spanner.InsertOrUpdateMap("Singers", map[string]interface{}{"SingerId": 1, "Name": "Singer 1"}),
Expand All @@ -56,27 +55,45 @@ func partitionedDml(projectId, instanceId, databaseId string) error {
return fmt.Errorf("failed to insert test records: %v", err)
}

// Now delete all records in the Singers table using Partitioned DML.
// Now update all records in the Singers table using Partitioned DML.
if _, err := conn.ExecContext(ctx, "SET AUTOCOMMIT_DML_MODE='PARTITIONED_NON_ATOMIC'"); err != nil {
return fmt.Errorf("failed to change DML mode to Partitioned_Non_Atomic: %v", err)
}
res, err := conn.ExecContext(ctx, "DELETE FROM Singers WHERE TRUE")
res, err := conn.ExecContext(ctx, "UPDATE Singers SET Active = FALSE WHERE TRUE")
if err != nil {
return fmt.Errorf("failed to execute DELETE statement: %v", err)
return fmt.Errorf("failed to execute UPDATE statement: %v", err)
}
affected, err := res.RowsAffected()
if err != nil {
return fmt.Errorf("failed to get affected rows: %v", err)
}

// Partitioned DML returns the minimum number of records that were affected.
fmt.Printf("Deleted %v records using Partitioned DML\n", affected)
fmt.Printf("Updated %v records using Partitioned DML\n", affected)

// Closing the connection will return it to the connection pool. The DML mode will automatically be reset to the
// default TRANSACTIONAL mode when the connection is returned to the pool, so we do not need to change it back
// manually.
_ = conn.Close()

// The AutoCommitDMLMode can also be specified as an ExecOption for a single statement.
conn, err = db.Conn(ctx)
if err != nil {
return fmt.Errorf("failed to get a connection: %v", err)
}
res, err = conn.ExecContext(ctx, "DELETE FROM Singers WHERE NOT Active",
spannerdriver.ExecOptions{AutocommitDMLMode: spannerdriver.PartitionedNonAtomic})
if err != nil {
return fmt.Errorf("failed to execute DELETE statement: %v", err)
}
affected, err = res.RowsAffected()
if err != nil {
return fmt.Errorf("failed to get affected rows: %v", err)
}

// Partitioned DML returns the minimum number of records that were affected.
fmt.Printf("Deleted %v records using Partitioned DML\n", affected)

return nil
}

Expand Down
Loading