Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 32 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,38 @@ tx, err := conn.BeginTx(ctx, &sql.TxOptions{
})
```

## Transaction Runner (Retry Transactions)

Spanner can abort a read/write transaction if concurrent modifications are detected
that would violate the transaction consistency. When this happens, the driver will
return the `ErrAbortedDueToConcurrentModification` error. You can use the
`RunTransaction` function to let the driver automatically retry transactions that
are aborted by Spanner.

```go
package sample

import (
"context"
"database/sql"
"fmt"

_ "github.com/googleapis/go-sql-spanner"
spannerdriver "github.com/googleapis/go-sql-spanner"
)

spannerdriver.RunTransaction(ctx, db, &sql.TxOptions{}, func(ctx context.Context, tx *sql.Tx) error {
row := tx.QueryRowContext(ctx, "select Name from Singers where SingerId=@id", 123)
var name string
if err := row.Scan(&name); err != nil {
return err
}
return nil
})
```

See also the [transaction runner sample](./examples/run-transaction/main.go).

## DDL Statements

[DDL statements](https://cloud.google.com/spanner/docs/data-definition-language)
Expand Down
98 changes: 98 additions & 0 deletions driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
adminapi "cloud.google.com/go/spanner/admin/database/apiv1"
adminpb "cloud.google.com/go/spanner/admin/database/apiv1/databasepb"
"cloud.google.com/go/spanner/apiv1/spannerpb"
"github.com/googleapis/gax-go/v2"
"google.golang.org/api/option"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
Expand Down Expand Up @@ -403,6 +404,103 @@ func (c *connector) closeClients() (err error) {
return err
}

// RunTransaction runs the given function in a transaction on the given database.
// If the connection is a connection to a Spanner database, the transaction will
// automatically be retried if the transaction is aborted by Spanner. Any other
// errors will be propagated to the caller and the transaction will be rolled
// back. The transaction will be committed if the supplied function did not
// return an error.
//
// If the connection is to a non-Spanner database, no retries will be attempted,
// and any error that occurs during the transaction will be propagated to the
// caller.
//
// The application should *NOT* call tx.Commit() or tx.Rollback(). This is done
// automatically by this function, depending on whether the transaction function
// returned an error or not.
//
// This function will never return ErrAbortedDueToConcurrentModification.
func RunTransaction(ctx context.Context, db *sql.DB, opts *sql.TxOptions, f func(ctx context.Context, tx *sql.Tx) error) error {
// Get a connection from the pool that we can use to run a transaction.
// Getting a connection here already makes sure that we can reserve this
// connection exclusively for the duration of this method. That again
// allows us to temporarily change the state of the connection (e.g. set
// the retryAborts flag to false).
conn, err := db.Conn(ctx)
if err != nil {
return err
}
defer conn.Close()

// We don't need to keep track of a running checksum for retries when using
// this method, so we disable internal retries.
// Retries will instead be handled by the loop below.
origRetryAborts := false
var spannerConn SpannerConn
if err := conn.Raw(func(driverConn any) error {
var ok bool
spannerConn, ok = driverConn.(SpannerConn)
if !ok {
// It is not a Spanner connection, so just ignore and continue without any special handling.
return nil
}
origRetryAborts = spannerConn.RetryAbortsInternally()
return spannerConn.SetRetryAbortsInternally(false)
}); err != nil {
return err
}
// Reset the flag for internal retries after the transaction (if applicable).
if origRetryAborts {
defer func() { _ = spannerConn.SetRetryAbortsInternally(origRetryAborts) }()
}

tx, err := conn.BeginTx(ctx, opts)
if err != nil {
return err
}
for {
err = f(ctx, tx)
if err == nil {
err = tx.Commit()
if err == nil {
return nil
}
}
// Rollback and return the error if:
// 1. The connection is not a Spanner connection.
// 2. Or the error code is not Aborted.
if spannerConn == nil || spanner.ErrCode(err) != codes.Aborted {
// We don't really need to call Rollback here if the error happened
// during the Commit. However, the SQL package treats this as a no-op
// and just returns an ErrTxDone if we do, so this is simpler than
// keeping track of where the error happened.
_ = tx.Rollback()
return err
}

// The transaction was aborted by Spanner.
// Back off and retry the entire transaction.
if delay, ok := spanner.ExtractRetryDelay(err); ok {
err = gax.Sleep(ctx, delay)
if err != nil {
// We need to 'roll back' the transaction here to tell the sql
// package that there is no active transaction on the connection
// anymore. It does not actually roll back the transaction, as it
// has already been aborted by Spanner.
_ = tx.Rollback()
return err
}
}

// TODO: Reset the existing transaction for retry instead of creating a new one.
_ = tx.Rollback()
tx, err = conn.BeginTx(ctx, opts)
if err != nil {
return err
}
}
}

// SpannerConn is the public interface for the raw Spanner connection for the
// sql driver. This interface can be used with the db.Conn().Raw() method.
type SpannerConn interface {
Expand Down
Loading
Loading