Skip to content

Commit

Permalink
feat: add support for stale reads (#44)
Browse files Browse the repository at this point in the history
* feat: add client side statement parser

* feat: statement parser

* feat: support PDML

* add tests for DML batches

* test: add more tests and documentation

* feat: stale reads

* chore: rename Dml and Ddl to DML and DDL

* docs: add sample for stale reads

* fix: add json file as string variable

* fix: add statement to json var + fix printing in sample

* chore: remove unused file

* test: add integration test

* fix: remove timezone name from test

* fix: statement should also use staleness

* fix: document why a read-only transaction needs to be committed
  • Loading branch information
olavloite committed Oct 11, 2021
1 parent 56243a5 commit 2e3a264
Show file tree
Hide file tree
Showing 9 changed files with 538 additions and 8 deletions.
91 changes: 91 additions & 0 deletions client_side_statement.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@ import (
"database/sql/driver"
"fmt"
"io"
"regexp"
"strconv"
"strings"
"time"

"cloud.google.com/go/spanner"
sppb "google.golang.org/genproto/googleapis/spanner/v1"
Expand Down Expand Up @@ -62,6 +64,14 @@ func (s *statementExecutor) ShowAutocommitDmlMode(_ context.Context, c *conn, _
return &rows{it: it}, nil
}

func (s *statementExecutor) ShowReadOnlyStaleness(_ context.Context, c *conn, _ string, _ []driver.NamedValue) (driver.Rows, error) {
it, err := createStringIterator("ReadOnlyStaleness", c.ReadOnlyStaleness().String())
if err != nil {
return nil, err
}
return &rows{it: it}, nil
}

func (s *statementExecutor) StartBatchDdl(_ context.Context, c *conn, _ string, _ []driver.NamedValue) (driver.Result, error) {
return c.startBatchDDL()
}
Expand Down Expand Up @@ -105,6 +115,87 @@ func (s *statementExecutor) SetAutocommitDmlMode(_ context.Context, c *conn, par
return c.setAutocommitDMLMode(mode)
}

var strongRegexp = regexp.MustCompile("(?i)'STRONG'")
var exactStalenessRegexp = regexp.MustCompile("(?i)'(?P<type>EXACT_STALENESS)[\\t ]+(?P<duration>(\\d{1,19})(s|ms|us|ns))'")
var maxStalenessRegexp = regexp.MustCompile("(?i)'(?P<type>MAX_STALENESS)[\\t ]+(?P<duration>(\\d{1,19})(s|ms|us|ns))'")
var readTimestampRegexp = regexp.MustCompile("(?i)'(?P<type>READ_TIMESTAMP)[\\t ]+(?P<timestamp>(\\d{4})-(\\d{2})-(\\d{2})([Tt](\\d{2}):(\\d{2}):(\\d{2})(\\.\\d{1,9})?)([Zz]|([+-])(\\d{2}):(\\d{2})))'")
var minReadTimestampRegexp = regexp.MustCompile("(?i)'(?P<type>MIN_READ_TIMESTAMP)[\\t ]+(?P<timestamp>(\\d{4})-(\\d{2})-(\\d{2})([Tt](\\d{2}):(\\d{2}):(\\d{2})(\\.\\d{1,9})?)([Zz]|([+-])(\\d{2}):(\\d{2})))'")

func (s *statementExecutor) SetReadOnlyStaleness(_ context.Context, c *conn, params string, _ []driver.NamedValue) (driver.Result, error) {
if params == "" {
return nil, spanner.ToSpannerError(status.Error(codes.InvalidArgument, "no value given for ReadOnlyStaleness"))
}
invalidErr := spanner.ToSpannerError(status.Errorf(codes.InvalidArgument, "invalid ReadOnlyStaleness value: %s", params))

var staleness spanner.TimestampBound

if strongRegexp.MatchString(params) {
staleness = spanner.StrongRead()
} else if exactStalenessRegexp.MatchString(params) {
d, err := parseDuration(exactStalenessRegexp, params)
if err != nil {
return nil, err
}
staleness = spanner.ExactStaleness(d)
} else if maxStalenessRegexp.MatchString(params) {
d, err := parseDuration(maxStalenessRegexp, params)
if err != nil {
return nil, err
}
staleness = spanner.MaxStaleness(d)
} else if readTimestampRegexp.MatchString(params) {
t, err := parseTimestamp(readTimestampRegexp, params)
if err != nil {
return nil, err
}
staleness = spanner.ReadTimestamp(t)
} else if minReadTimestampRegexp.MatchString(params) {
t, err := parseTimestamp(minReadTimestampRegexp, params)
if err != nil {
return nil, err
}
staleness = spanner.MinReadTimestamp(t)
} else {
return nil, invalidErr
}
return c.setReadOnlyStaleness(staleness)
}

func parseDuration(re *regexp.Regexp, params string) (time.Duration, error) {
matches := matchesToMap(re, params)
if matches["duration"] == "" {
return 0, spanner.ToSpannerError(status.Error(codes.InvalidArgument, "No duration found in staleness string"))
}
d, err := time.ParseDuration(matches["duration"])
if err != nil {
return 0, spanner.ToSpannerError(status.Errorf(codes.InvalidArgument, "Invalid duration: %s", matches["duration"]))
}
return d, nil
}

func parseTimestamp(re *regexp.Regexp, params string) (time.Time, error) {
matches := matchesToMap(re, params)
if matches["timestamp"] == "" {
return time.Time{}, spanner.ToSpannerError(status.Error(codes.InvalidArgument, "No timestamp found in staleness string"))
}
t, err := time.Parse(time.RFC3339Nano, matches["timestamp"])
if err != nil {
return time.Time{}, spanner.ToSpannerError(status.Errorf(codes.InvalidArgument, "Invalid timestamp: %s", matches["timestamp"]))
}
return t, nil
}

func matchesToMap(re *regexp.Regexp, s string) map[string]string {
match := re.FindStringSubmatch(s)
matches := make(map[string]string)
for i, name := range re.SubexpNames() {
if i != 0 && name != "" {
matches[name] = match[i]
}
}
return matches
}

// createBooleanIterator creates a row iterator with a single BOOL column with
// one row. This is used for client side statements that return a result set
// containing a BOOL value.
Expand Down
70 changes: 70 additions & 0 deletions client_side_statement_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"database/sql/driver"
"io"
"testing"
"time"

"cloud.google.com/go/spanner"
"github.com/google/go-cmp/cmp"
Expand Down Expand Up @@ -204,3 +205,72 @@ func TestStatementExecutor_AutocommitDmlMode(t *testing.T) {
}
}
}

func TestStatementExecutor_ReadOnlyStaleness(t *testing.T) {
c := &conn{}
s := &statementExecutor{}
ctx := context.Background()
for i, test := range []struct {
wantValue spanner.TimestampBound
setValue string
wantSetErr bool
}{
{spanner.ExactStaleness(time.Second), "'Exact_Staleness 1s'", false},
{spanner.ExactStaleness(10 * time.Millisecond), "'Exact_Staleness 10ms'", false},
{spanner.MaxStaleness(time.Second), "'Max_Staleness 1s'", false},
{spanner.MaxStaleness(10 * time.Millisecond), "'Max_Staleness 10ms'", false},
{spanner.ReadTimestamp(time.Date(2021, 10, 8, 9, 14, 30, 10, time.UTC)), "'Read_Timestamp 2021-10-08T09:14:30.000000010Z'", false},
{spanner.ReadTimestamp(time.Date(2021, 10, 8, 11, 14, 30, 10, time.UTC)), "'Read_Timestamp 2021-10-08T11:14:30.000000010Z'", false},
{spanner.MinReadTimestamp(time.Date(2021, 10, 8, 9, 14, 30, 10, time.UTC)), "'Min_Read_Timestamp 2021-10-08T09:14:30.000000010Z'", false},
{spanner.MinReadTimestamp(time.Date(2021, 10, 8, 11, 14, 30, 10, time.UTC)), "'Min_Read_Timestamp 2021-10-08T11:14:30.000000010Z'", false},
{spanner.StrongRead(), "'Strong'", false},
{spanner.StrongRead(), "'Non_Existing_Staleness'", true},
{spanner.StrongRead(), "'Exact_Staleness 1m'", true},
{spanner.StrongRead(), "'Exact_Staleness 1'", true},
{spanner.StrongRead(), "'Max_Staleness 1m'", true},
{spanner.StrongRead(), "'Max_Staleness 1'", true},
{spanner.StrongRead(), "'Read_Timestamp 2021-10-08T09:14:30.000000010'", true},
{spanner.StrongRead(), "'Read_Timestamp 2021-10-08T09:14:30'", true},
{spanner.StrongRead(), "'Read_Timestamp'", true},
{spanner.StrongRead(), "'Read_Timestamp 2021-10-08 09:14:30Z'", true},
{spanner.StrongRead(), "'Min_Read_Timestamp 2021-10-08T09:14:30.000000010'", true},
{spanner.StrongRead(), "'Min_Read_Timestamp 2021-10-08T09:14:30'", true},
{spanner.StrongRead(), "'Min_Read_Timestamp'", true},
{spanner.StrongRead(), "'Min_Read_Timestamp 2021-10-08 09:14:30Z'", true},
} {
res, err := s.SetReadOnlyStaleness(ctx, c, test.setValue, nil)
if test.wantSetErr {
if err == nil {
t.Fatalf("%d: missing expected error for value %q", i, test.setValue)
}
} else {
if err != nil {
t.Fatalf("%d: could not set new value %q for read-only staleness: %v", i, test.setValue, err)
}
if res != driver.ResultNoRows {
t.Fatalf("%d: result mismatch\nGot: %v\nWant: %v", i, res, driver.ResultNoRows)
}
}

it, err := s.ShowReadOnlyStaleness(ctx, c, "", nil)
if err != nil {
t.Fatalf("%d: could not get current read-only staleness value from connection: %v", i, err)
}
cols := it.Columns()
wantCols := []string{"ReadOnlyStaleness"}
if !cmp.Equal(cols, wantCols) {
t.Fatalf("%d: column names mismatch\nGot: %v\nWant: %v", i, cols, wantCols)
}
values := make([]driver.Value, len(cols))
if err := it.Next(values); err != nil {
t.Fatalf("%d: failed to get first row for read-only staleness: %v", i, err)
}
wantValues := []driver.Value{test.wantValue.String()}
if !cmp.Equal(values, wantValues) {
t.Fatalf("%d: read-only staleness values mismatch\nGot: %v\nWant: %v", i, values, wantValues)
}
if err := it.Next(values); err != io.EOF {
t.Fatalf("%d: error mismatch\nGot: %v\nWant: %v", i, err, io.EOF)
}
}
}
36 changes: 36 additions & 0 deletions client_side_statements_json.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,14 @@ var jsonFile = `{
"method": "statementShowAutocommitDmlMode",
"exampleStatements": ["show variable autocommit_dml_mode"]
},
{
"name": "SHOW VARIABLE READ_ONLY_STALENESS",
"executorName": "ClientSideStatementNoParamExecutor",
"resultType": "RESULT_SET",
"regex": "(?is)\\A\\s*show\\s+variable\\s+read_only_staleness\\s*\\z",
"method": "statementShowReadOnlyStaleness",
"exampleStatements": ["show variable read_only_staleness"]
},
{
"name": "START BATCH DDL",
"executorName": "ClientSideStatementNoParamExecutor",
Expand Down Expand Up @@ -82,6 +90,34 @@ var jsonFile = `{
"allowedValues": "'(PARTITIONED_NON_ATOMIC|TRANSACTIONAL)'",
"converterName": "ClientSideStatementValueConverters$AutocommitDmlModeConverter"
}
},
{
"name": "SET READ_ONLY_STALENESS = 'STRONG' | 'MIN_READ_TIMESTAMP <timestamp>' | 'READ_TIMESTAMP <timestamp>' | 'MAX_STALENESS <int64>s|ms|us|ns' | 'EXACT_STALENESS (<int64>s|ms|us|ns)'",
"executorName": "ClientSideStatementSetExecutor",
"resultType": "NO_RESULT",
"regex": "(?is)\\A\\s*set\\s+read_only_staleness\\s*(?:=)\\s*(.*)\\z",
"method": "statementSetReadOnlyStaleness",
"exampleStatements": ["set read_only_staleness='STRONG'",
"set read_only_staleness='MIN_READ_TIMESTAMP 2018-01-02T03:04:05.123-08:00'",
"set read_only_staleness='MIN_READ_TIMESTAMP 2018-01-02T03:04:05.123Z'",
"set read_only_staleness='MIN_READ_TIMESTAMP 2018-01-02T03:04:05.123+07:45'",
"set read_only_staleness='READ_TIMESTAMP 2018-01-02T03:04:05.54321-07:00'",
"set read_only_staleness='READ_TIMESTAMP 2018-01-02T03:04:05.54321Z'",
"set read_only_staleness='READ_TIMESTAMP 2018-01-02T03:04:05.54321+05:30'",
"set read_only_staleness='MAX_STALENESS 12s'",
"set read_only_staleness='MAX_STALENESS 100ms'",
"set read_only_staleness='MAX_STALENESS 99999us'",
"set read_only_staleness='MAX_STALENESS 10ns'",
"set read_only_staleness='EXACT_STALENESS 15s'",
"set read_only_staleness='EXACT_STALENESS 1500ms'",
"set read_only_staleness='EXACT_STALENESS 15000000us'",
"set read_only_staleness='EXACT_STALENESS 9999ns'"],
"setStatement": {
"propertyName": "READ_ONLY_STALENESS",
"separator": "=",
"allowedValues": "'((STRONG)|(MIN_READ_TIMESTAMP)[\\t ]+((\\d{4})-(\\d{2})-(\\d{2})([Tt](\\d{2}):(\\d{2}):(\\d{2})(\\.\\d{1,9})?)([Zz]|([+-])(\\d{2}):(\\d{2})))|(READ_TIMESTAMP)[\\t ]+((\\d{4})-(\\d{2})-(\\d{2})([Tt](\\d{2}):(\\d{2}):(\\d{2})(\\.\\d{1,9})?)([Zz]|([+-])(\\d{2}):(\\d{2})))|(MAX_STALENESS)[\\t ]+((\\d{1,19})(s|ms|us|ns))|(EXACT_STALENESS)[\\t ]+((\\d{1,19})(s|ms|us|ns)))'",
"converterName": "ClientSideStatementValueConverters$ReadOnlyStalenessConverter"
}
}
]
}
Expand Down
34 changes: 29 additions & 5 deletions driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,13 @@ type SpannerConn interface {
// information on Partitioned DML.
SetAutocommitDMLMode(mode AutocommitDMLMode) error

// ReadOnlyStaleness returns the current staleness that is used for
// queries in autocommit mode, and for read-only transactions.
ReadOnlyStaleness() spanner.TimestampBound
// SetReadOnlyStaleness sets the staleness to use for queries in autocommit
// mode and for read-only transaction.
SetReadOnlyStaleness(staleness spanner.TimestampBound) error

// Apply writes an array of mutations to the database. This method may only be called while the connection
// is outside a transaction. Use BufferWrite to write mutations in a transaction.
// See also spanner.Client#Apply
Expand All @@ -280,7 +287,7 @@ type conn struct {
database string
retryAborts bool

execSingleQuery func(ctx context.Context, c *spanner.Client, statement spanner.Statement) *spanner.RowIterator
execSingleQuery func(ctx context.Context, c *spanner.Client, statement spanner.Statement, bound spanner.TimestampBound) *spanner.RowIterator
execSingleDMLTransactional func(ctx context.Context, c *spanner.Client, statement spanner.Statement) (int64, error)
execSingleDMLPartitioned func(ctx context.Context, c *spanner.Client, statement spanner.Statement) (int64, error)

Expand All @@ -292,6 +299,8 @@ type conn struct {
// it can also be set to PartitionedNonAtomic to execute the statement as
// Partitioned DML.
autocommitDMLMode AutocommitDMLMode
// readOnlyStaleness is used for queries in autocommit mode and for read-only transactions.
readOnlyStaleness spanner.TimestampBound
}

type batchType int
Expand Down Expand Up @@ -357,6 +366,20 @@ func (c *conn) setAutocommitDMLMode(mode AutocommitDMLMode) (driver.Result, erro
return driver.ResultNoRows, nil
}

func (c *conn) ReadOnlyStaleness() spanner.TimestampBound {
return c.readOnlyStaleness
}

func (c *conn) SetReadOnlyStaleness(staleness spanner.TimestampBound) error {
_, err := c.setReadOnlyStaleness(staleness)
return err
}

func (c *conn) setReadOnlyStaleness(staleness spanner.TimestampBound) (driver.Result, error) {
c.readOnlyStaleness = staleness
return driver.ResultNoRows, nil
}

func (c *conn) StartBatchDDL() error {
_, err := c.startBatchDDL()
return err
Expand Down Expand Up @@ -567,6 +590,7 @@ func (c *conn) ResetSession(_ context.Context) error {
c.batch = nil
c.retryAborts = true
c.autocommitDMLMode = Transactional
c.readOnlyStaleness = spanner.TimestampBound{}
return nil
}

Expand Down Expand Up @@ -671,7 +695,7 @@ func (c *conn) QueryContext(ctx context.Context, query string, args []driver.Nam
}
var iter rowIterator
if c.tx == nil {
iter = &readOnlyRowIterator{c.execSingleQuery(ctx, c.client, stmt)}
iter = &readOnlyRowIterator{c.execSingleQuery(ctx, c.client, stmt, c.readOnlyStaleness)}
} else {
iter = c.tx.Query(ctx, stmt)
}
Expand Down Expand Up @@ -748,7 +772,7 @@ func (c *conn) BeginTx(ctx context.Context, opts driver.TxOptions) (driver.Tx, e
}

if opts.ReadOnly {
ro := c.client.ReadOnlyTransaction().WithTimestampBound(spanner.StrongRead())
ro := c.client.ReadOnlyTransaction().WithTimestampBound(c.readOnlyStaleness)
c.tx = &readOnlyTransaction{
roTx: ro,
close: func() {
Expand Down Expand Up @@ -794,8 +818,8 @@ func (c *conn) inReadWriteTransaction() bool {
return false
}

func queryInSingleUse(ctx context.Context, c *spanner.Client, statement spanner.Statement) *spanner.RowIterator {
return c.Single().Query(ctx, statement)
func queryInSingleUse(ctx context.Context, c *spanner.Client, statement spanner.Statement, tb spanner.TimestampBound) *spanner.RowIterator {
return c.Single().WithTimestampBound(tb).Query(ctx, statement)
}

func execInNewRWTransaction(ctx context.Context, c *spanner.Client, statement spanner.Statement) (int64, error) {
Expand Down
31 changes: 29 additions & 2 deletions driver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"context"
"database/sql/driver"
"testing"
"time"

"cloud.google.com/go/spanner"
"github.com/google/go-cmp/cmp"
Expand Down Expand Up @@ -129,6 +130,32 @@ func TestExtractDnsParts(t *testing.T) {
}
}

func TestConnection_Reset(t *testing.T) {
txClosed := false
c := conn{
readOnlyStaleness: spanner.ExactStaleness(time.Second),
batch: &batch{tp: dml},
tx: &readOnlyTransaction{
close: func() {
txClosed = true
},
},
}

if err := c.ResetSession(context.Background()); err != nil {
t.Fatalf("failed to reset session: %v", err)
}
if !cmp.Equal(c.readOnlyStaleness, spanner.TimestampBound{}, cmp.AllowUnexported(spanner.TimestampBound{})) {
t.Error("failed to reset read-only staleness")
}
if c.inBatch() {
t.Error("failed to clear batch")
}
if !txClosed {
t.Error("failed to close transaction")
}
}

func TestConnection_NoNestedTransactions(t *testing.T) {
c := conn{
tx: &readOnlyTransaction{},
Expand Down Expand Up @@ -229,7 +256,7 @@ func TestConn_StartBatchDml(t *testing.T) {
func TestConn_NonDdlStatementsInDdlBatch(t *testing.T) {
c := &conn{
batch: &batch{tp: ddl},
execSingleQuery: func(ctx context.Context, c *spanner.Client, statement spanner.Statement) *spanner.RowIterator {
execSingleQuery: func(ctx context.Context, c *spanner.Client, statement spanner.Statement, tb spanner.TimestampBound) *spanner.RowIterator {
return &spanner.RowIterator{}
},
execSingleDMLTransactional: func(ctx context.Context, c *spanner.Client, statement spanner.Statement) (int64, error) {
Expand Down Expand Up @@ -262,7 +289,7 @@ func TestConn_NonDdlStatementsInDdlBatch(t *testing.T) {
func TestConn_NonDmlStatementsInDmlBatch(t *testing.T) {
c := &conn{
batch: &batch{tp: dml},
execSingleQuery: func(ctx context.Context, c *spanner.Client, statement spanner.Statement) *spanner.RowIterator {
execSingleQuery: func(ctx context.Context, c *spanner.Client, statement spanner.Statement, tb spanner.TimestampBound) *spanner.RowIterator {
return &spanner.RowIterator{}
},
execSingleDMLTransactional: func(ctx context.Context, c *spanner.Client, statement spanner.Statement) (int64, error) {
Expand Down
Loading

0 comments on commit 2e3a264

Please sign in to comment.