Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 35 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
[Google Cloud Spanner](https://cloud.google.com/spanner) driver for
Go's [database/sql](https://golang.org/pkg/database/sql/) package.

``` go
```go
import _ "github.com/googleapis/go-sql-spanner"

db, err := sql.Open("spanner", "projects/PROJECT/instances/INSTANCE/databases/DATABASE")
Expand Down Expand Up @@ -65,16 +65,45 @@ the same named query parameter is used in multiple places in the statement.
db.ExecContext(ctx, "DELETE FROM tweets WHERE id = @id", 14544498215374)
```

### Query Options
Query options can be passed in as arguments to a query. Pass in a value of
type `spannerdriver.ExecOptions` to supply additional execution options for
a statement. The `spanner.QueryOptions` will be passed through to the Spanner
client as the query options to use for the query or DML statement.

```go
tx.ExecContext(ctx, "INSERT INTO Singers (SingerId, Name) VALUES (@id, @name)",
spannerdriver.ExecOptions{QueryOptions: spanner.QueryOptions{RequestTag: "insert_singer"}},
123, "Bruce Allison")
tx.QueryContext(ctx, "SELECT SingerId, Name FROM Singers WHERE SingerId = ?",
spannerdriver.ExecOptions{QueryOptions: spanner.QueryOptions{RequestTag: "select_singer"}},
123)
```

Statement tags (request tags) can also be set using the custom SQL statement
`set statement_tag='my_tag'`:

```go
tx.ExecContext(ctx, "set statement_tag = 'select_singer'")
tx.QueryContext(ctx, "SELECT SingerId, Name FROM Singers WHERE SingerId = ?", 123)
```

## Transactions

- Read-write transactions always uses the strongest isolation level and ignore the user-specified level.
- Read-only transactions do strong-reads by default. Read-only transactions must be ended by calling
either Commit or Rollback. Calling either of these methods will end the current read-only
transaction and return the session that is used to the session pool.

``` go
```go
tx, err := db.BeginTx(ctx, &sql.TxOptions{}) // Read-write transaction.

// Read-write transaction with a transaction tag.
conn, _ := db.Conn(ctx)
_, _ := conn.ExecContext(ctx, "SET TRANSACTION_TAG='my_transaction_tag'")
tx, err := conn.BeginTx(ctx, &sql.TxOptions{})


tx, err := db.BeginTx(ctx, &sql.TxOptions{
ReadOnly: true, // Read-only transaction using strong reads.
})
Expand All @@ -91,8 +120,8 @@ tx, err := conn.BeginTx(ctx, &sql.TxOptions{
Spanner can abort a read/write transaction if concurrent modifications are detected
that would violate the transaction consistency. When this happens, the driver will
return the `ErrAbortedDueToConcurrentModification` error. You can use the
`RunTransaction` function to let the driver automatically retry transactions that
are aborted by Spanner.
`RunTransaction` and `RunTransactionWithOptions` functions to let the driver
automatically retry transactions that are aborted by Spanner.

```go
package sample
Expand All @@ -106,14 +135,14 @@ import (
spannerdriver "github.com/googleapis/go-sql-spanner"
)

spannerdriver.RunTransaction(ctx, db, &sql.TxOptions{}, func(ctx context.Context, tx *sql.Tx) error {
spannerdriver.RunTransactionWithOptions(ctx, db, &sql.TxOptions{}, func(ctx context.Context, tx *sql.Tx) error {
row := tx.QueryRowContext(ctx, "select Name from Singers where SingerId=@id", 123)
var name string
if err := row.Scan(&name); err != nil {
return err
}
return nil
})
}, spanner.TransactionOptions{TransactionTag: "my_transaction_tag"})
```

See also the [transaction runner sample](./examples/run-transaction/main.go).
Expand Down
9 changes: 5 additions & 4 deletions checksum_row_iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,10 @@ type checksumRowIterator struct {
*spanner.RowIterator
metadata *sppb.ResultSetMetadata

ctx context.Context
tx *readWriteTransaction
stmt spanner.Statement
ctx context.Context
tx *readWriteTransaction
stmt spanner.Statement
options spanner.QueryOptions
// nc (nextCount) indicates the number of times that next has been called
// on the iterator. Next() will be called the same number of times during
// a retry.
Expand Down Expand Up @@ -160,7 +161,7 @@ func createMetadataChecksum(enc *gob.Encoder, buffer *bytes.Buffer, metadata *sp
func (it *checksumRowIterator) retry(ctx context.Context, tx *spanner.ReadWriteStmtBasedTransaction) error {
buffer := &bytes.Buffer{}
enc := gob.NewEncoder(buffer)
retryIt := tx.Query(ctx, it.stmt)
retryIt := tx.QueryWithOptions(ctx, it.stmt, it.options)
// If the original iterator had been stopped, we should also always stop the
// new iterator.
if it.stopped {
Expand Down
45 changes: 45 additions & 0 deletions client_side_statement.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,22 @@ func (s *statementExecutor) ShowExcludeTxnFromChangeStreams(_ context.Context, c
return &rows{it: it}, nil
}

func (s *statementExecutor) ShowTransactionTag(_ context.Context, c *conn, _ string, _ []driver.NamedValue) (driver.Rows, error) {
it, err := createStringIterator("TransactionTag", c.TransactionTag())
if err != nil {
return nil, err
}
return &rows{it: it}, nil
}

func (s *statementExecutor) ShowStatementTag(_ context.Context, c *conn, _ string, _ []driver.NamedValue) (driver.Rows, error) {
it, err := createStringIterator("StatementTag", c.StatementTag())
if err != nil {
return nil, err
}
return &rows{it: it}, nil
}

func (s *statementExecutor) StartBatchDdl(_ context.Context, c *conn, _ string, _ []driver.NamedValue) (driver.Result, error) {
return c.startBatchDDL()
}
Expand Down Expand Up @@ -147,6 +163,35 @@ func (s *statementExecutor) SetExcludeTxnFromChangeStreams(_ context.Context, c
return c.setExcludeTxnFromChangeStreams(exclude)
}

func (s *statementExecutor) SetTransactionTag(_ context.Context, c *conn, params string, _ []driver.NamedValue) (driver.Result, error) {
tag, err := parseTag(params)
if err != nil {
return nil, err
}
return c.setTransactionTag(tag)
}

func (s *statementExecutor) SetStatementTag(_ context.Context, c *conn, params string, _ []driver.NamedValue) (driver.Result, error) {
tag, err := parseTag(params)
if err != nil {
return nil, err
}
return c.setStatementTag(tag)
}

func parseTag(params string) (string, error) {
if params == "" {
return "", spanner.ToSpannerError(status.Error(codes.InvalidArgument, "no value given for tag"))
}
tag := strings.TrimSpace(params)
if !(strings.HasPrefix(tag, "'") && strings.HasSuffix(tag, "'")) {
return "", spanner.ToSpannerError(status.Error(codes.InvalidArgument, "missing single quotes around tag"))
}
tag = strings.TrimLeft(tag, "'")
tag = strings.TrimRight(tag, "'")
return tag, nil
}

var strongRegexp = regexp.MustCompile("(?i)'STRONG'")
var exactStalenessRegexp = regexp.MustCompile(`(?i)'(?P<type>EXACT_STALENESS)[\t ]+(?P<duration>(\d{1,19})(s|ms|us|ns))'`)
var maxStalenessRegexp = regexp.MustCompile(`(?i)'(?P<type>MAX_STALENESS)[\t ]+(?P<duration>(\d{1,19})(s|ms|us|ns))'`)
Expand Down
72 changes: 72 additions & 0 deletions client_side_statement_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -374,3 +374,75 @@ func TestStatementExecutor_ExcludeTxnFromChangeStreams(t *testing.T) {
}
}
}

func TestStatementExecutor_SetTransactionTag(t *testing.T) {
ctx := context.Background()
for i, test := range []struct {
wantValue string
setValue string
wantSetErr bool
}{
{"test-tag", "'test-tag'", false},
{"other-tag", " 'other-tag'\t\n", false},
{" tag with spaces ", "' tag with spaces '", false},
{"", "tag-without-quotes", true},
{"", "tag-with-missing-opening-quote'", true},
{"", "'tag-with-missing-closing-quote", true},
} {
c := &conn{retryAborts: true}
s := &statementExecutor{}

it, err := s.ShowTransactionTag(ctx, c, "", nil)
if err != nil {
t.Fatalf("%d: could not get current transaction tag value from connection: %v", i, err)
}
cols := it.Columns()
wantCols := []string{"TransactionTag"}
if !cmp.Equal(cols, wantCols) {
t.Fatalf("%d: column names mismatch\nGot: %v\nWant: %v", i, cols, wantCols)
}
values := make([]driver.Value, len(cols))
if err := it.Next(values); err != nil {
t.Fatalf("%d: failed to get first row: %v", i, err)
}
wantValues := []driver.Value{""}
if !cmp.Equal(values, wantValues) {
t.Fatalf("%d: default transaction tag mismatch\nGot: %v\nWant: %v", i, values, wantValues)
}
if err := it.Next(values); err != io.EOF {
t.Fatalf("%d: error mismatch\nGot: %v\nWant: %v", i, err, io.EOF)
}

// Set a transaction tag.
res, err := s.SetTransactionTag(ctx, c, test.setValue, nil)
if test.wantSetErr {
if err == nil {
t.Fatalf("%d: missing expected error for value %q", i, test.setValue)
}
} else {
if err != nil {
t.Fatalf("%d: could not set new value %q for exclude: %v", i, test.setValue, err)
}
if res != driver.ResultNoRows {
t.Fatalf("%d: result mismatch\nGot: %v\nWant: %v", i, res, driver.ResultNoRows)
}
}

// Get the tag that was set
it, err = s.ShowTransactionTag(ctx, c, "", nil)
if err != nil {
t.Fatalf("%d: could not get current transaction tag value from connection: %v", i, err)
}
if err := it.Next(values); err != nil {
t.Fatalf("%d: failed to get first row: %v", i, err)
}
wantValues = []driver.Value{test.wantValue}
if !cmp.Equal(values, wantValues) {
t.Fatalf("%d: transaction tag mismatch\nGot: %v\nWant: %v", i, values, wantValues)
}
if err := it.Next(values); err != io.EOF {
t.Fatalf("%d: error mismatch\nGot: %v\nWant: %v", i, err, io.EOF)
}

}
}
48 changes: 48 additions & 0 deletions client_side_statements_json.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,24 @@ var jsonFile = `{
"method": "statementShowExcludeTxnFromChangeStreams",
"exampleStatements": ["show variable exclude_txn_from_change_streams"]
},
{
"name": "SHOW VARIABLE TRANSACTION_TAG",
"executorName": "ClientSideStatementNoParamExecutor",
"resultType": "RESULT_SET",
"statementType": "SHOW_TRANSACTION_TAG",
"regex": "(?is)\\A\\s*show\\s+variable\\s+transaction_tag\\s*\\z",
"method": "statementShowTransactionTag",
"exampleStatements": ["show variable transaction_tag"]
},
{
"name": "SHOW VARIABLE STATEMENT_TAG",
"executorName": "ClientSideStatementNoParamExecutor",
"resultType": "RESULT_SET",
"statementType": "SHOW_STATEMENT_TAG",
"regex": "(?is)\\A\\s*show\\s+variable\\s+statement_tag\\s*\\z",
"method": "statementShowStatementTag",
"exampleStatements": ["show variable statement_tag"]
},
{
"name": "START BATCH DDL",
"executorName": "ClientSideStatementNoParamExecutor",
Expand Down Expand Up @@ -165,6 +183,36 @@ var jsonFile = `{
"allowedValues": "(TRUE|FALSE)",
"converterName": "ClientSideStatementValueConverters$BooleanConverter"
}
},
{
"name": "SET TRANSACTION_TAG = '<tag>'",
"executorName": "ClientSideStatementSetExecutor",
"resultType": "NO_RESULT",
"statementType": "SET_TRANSACTION_TAG",
"regex": "(?is)\\A\\s*set\\s+transaction_tag\\s*(?:=)\\s*(.*)\\z",
"method": "statementSetTransactionTag",
"exampleStatements": ["set transaction_tag='tag1'", "set transaction_tag='tag2'", "set transaction_tag=''", "set transaction_tag='test_tag'"],
"setStatement": {
"propertyName": "TRANSACTION_TAG",
"separator": "=",
"allowedValues": "'(.*)'",
"converterName": "ClientSideStatementValueConverters$StringValueConverter"
}
},
{
"name": "SET STATEMENT_TAG = '<tag>'",
"executorName": "ClientSideStatementSetExecutor",
"resultType": "NO_RESULT",
"statementType": "SET_STATEMENT_TAG",
"regex": "(?is)\\A\\s*set\\s+statement_tag\\s*(?:=)\\s*(.*)\\z",
"method": "statementSetStatementTag",
"exampleStatements": ["set statement_tag='tag1'", "set statement_tag='tag2'", "set statement_tag=''", "set statement_tag='test_tag'"],
"setStatement": {
"propertyName": "STATEMENT_TAG",
"separator": "=",
"allowedValues": "'(.*)'",
"converterName": "ClientSideStatementValueConverters$StringValueConverter"
}
}
]
}
Expand Down
Loading
Loading