Skip to content

Commit

Permalink
feat: add exclude_txn_from_change_streams variable (#236)
Browse files Browse the repository at this point in the history
Adds a connection variable named `exclude_txn_from_change_streams` that can
be used to exclude the next transaction from creating mutations for all
change streams that have been created with the DDL option
`allow_txn_exclusion=true`.
  • Loading branch information
olavloite committed May 20, 2024
1 parent 926f621 commit ad95d85
Show file tree
Hide file tree
Showing 6 changed files with 313 additions and 22 deletions.
19 changes: 19 additions & 0 deletions client_side_statement.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,14 @@ func (s *statementExecutor) ShowReadOnlyStaleness(_ context.Context, c *conn, _
return &rows{it: it}, nil
}

func (s *statementExecutor) ShowExcludeTxnFromChangeStreams(_ context.Context, c *conn, _ string, _ []driver.NamedValue) (driver.Rows, error) {
it, err := createBooleanIterator("ExcludeTxnFromChangeStreams", c.ExcludeTxnFromChangeStreams())
if err != nil {
return nil, err
}
return &rows{it: it}, nil
}

func (s *statementExecutor) StartBatchDdl(_ context.Context, c *conn, _ string, _ []driver.NamedValue) (driver.Result, error) {
return c.startBatchDDL()
}
Expand Down Expand Up @@ -128,6 +136,17 @@ func (s *statementExecutor) SetAutocommitDmlMode(_ context.Context, c *conn, par
return c.setAutocommitDMLMode(mode)
}

func (s *statementExecutor) SetExcludeTxnFromChangeStreams(_ context.Context, c *conn, params string, _ []driver.NamedValue) (driver.Result, error) {
if params == "" {
return nil, spanner.ToSpannerError(status.Error(codes.InvalidArgument, "no value given for ExcludeTxnFromChangeStreams"))
}
exclude, err := strconv.ParseBool(params)
if err != nil {
return nil, spanner.ToSpannerError(status.Errorf(codes.InvalidArgument, "invalid boolean value: %s", params))
}
return c.setExcludeTxnFromChangeStreams(exclude)
}

var strongRegexp = regexp.MustCompile("(?i)'STRONG'")
var exactStalenessRegexp = regexp.MustCompile(`(?i)'(?P<type>EXACT_STALENESS)[\t ]+(?P<duration>(\d{1,19})(s|ms|us|ns))'`)
var maxStalenessRegexp = regexp.MustCompile(`(?i)'(?P<type>MAX_STALENESS)[\t ]+(?P<duration>(\d{1,19})(s|ms|us|ns))'`)
Expand Down
56 changes: 56 additions & 0 deletions client_side_statement_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -318,3 +318,59 @@ func TestShowCommitTimestamp(t *testing.T) {
}
}
}

func TestStatementExecutor_ExcludeTxnFromChangeStreams(t *testing.T) {
c := &conn{retryAborts: true}
s := &statementExecutor{}
ctx := context.Background()
for i, test := range []struct {
wantValue bool
setValue string
wantSetErr bool
}{
{false, "false", false},
{false, "true", false},
{true, "FALSE", false},
{false, "TRUE", false},
{true, "False", false},
{false, "True", false},
{true, "fasle", true},
{true, "truye", true},
} {
it, err := s.ShowExcludeTxnFromChangeStreams(ctx, c, "", nil)
if err != nil {
t.Fatalf("%d: could not get current exclude value from connection: %v", i, err)
}
cols := it.Columns()
wantCols := []string{"ExcludeTxnFromChangeStreams"}
if !cmp.Equal(cols, wantCols) {
t.Fatalf("%d: column names mismatch\nGot: %v\nWant: %v", i, cols, wantCols)
}
values := make([]driver.Value, len(cols))
if err := it.Next(values); err != nil {
t.Fatalf("%d: failed to get first row: %v", i, err)
}
wantValues := []driver.Value{test.wantValue}
if !cmp.Equal(values, wantValues) {
t.Fatalf("%d: exclude values mismatch\nGot: %v\nWant: %v", i, values, wantValues)
}
if err := it.Next(values); err != io.EOF {
t.Fatalf("%d: error mismatch\nGot: %v\nWant: %v", i, err, io.EOF)
}

// Set the next value.
res, err := s.SetExcludeTxnFromChangeStreams(ctx, c, test.setValue, nil)
if test.wantSetErr {
if err == nil {
t.Fatalf("%d: missing expected error for value %q", i, test.setValue)
}
} else {
if err != nil {
t.Fatalf("%d: could not set new value %q for exclude: %v", i, test.setValue, err)
}
if res != driver.ResultNoRows {
t.Fatalf("%d: result mismatch\nGot: %v\nWant: %v", i, res, driver.ResultNoRows)
}
}
}
}
28 changes: 26 additions & 2 deletions client_side_statements_json.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,16 @@ var jsonFile = `{
"method": "statementShowReadOnlyStaleness",
"exampleStatements": ["show variable read_only_staleness"]
},
{
{
"name": "SHOW VARIABLE EXCLUDE_TXN_FROM_CHANGE_STREAMS",
"executorName": "ClientSideStatementNoParamExecutor",
"resultType": "RESULT_SET",
"statementType": "SHOW_EXCLUDE_TXN_FROM_CHANGE_STREAMS",
"regex": "(?is)\\A\\s*show\\s+variable\\s+exclude_txn_from_change_streams\\s*\\z",
"method": "statementShowExcludeTxnFromChangeStreams",
"exampleStatements": ["show variable exclude_txn_from_change_streams"]
},
{
"name": "START BATCH DDL",
"executorName": "ClientSideStatementNoParamExecutor",
"resultType": "NO_RESULT",
Expand Down Expand Up @@ -141,7 +150,22 @@ var jsonFile = `{
"allowedValues": "'((STRONG)|(MIN_READ_TIMESTAMP)[\\t ]+((\\d{4})-(\\d{2})-(\\d{2})([Tt](\\d{2}):(\\d{2}):(\\d{2})(\\.\\d{1,9})?)([Zz]|([+-])(\\d{2}):(\\d{2})))|(READ_TIMESTAMP)[\\t ]+((\\d{4})-(\\d{2})-(\\d{2})([Tt](\\d{2}):(\\d{2}):(\\d{2})(\\.\\d{1,9})?)([Zz]|([+-])(\\d{2}):(\\d{2})))|(MAX_STALENESS)[\\t ]+((\\d{1,19})(s|ms|us|ns))|(EXACT_STALENESS)[\\t ]+((\\d{1,19})(s|ms|us|ns)))'",
"converterName": "ClientSideStatementValueConverters$ReadOnlyStalenessConverter"
}
}
},
{
"name": "SET EXCLUDE_TXN_FROM_CHANGE_STREAMS = TRUE|FALSE",
"executorName": "ClientSideStatementSetExecutor",
"resultType": "NO_RESULT",
"statementType": "SET_EXCLUDE_TXN_FROM_CHANGE_STREAMS",
"regex": "(?is)\\A\\s*set\\s+exclude_txn_from_change_streams\\s*(?:=)\\s*(.*)\\z",
"method": "statementSetExcludeTxnFromChangeStreams",
"exampleStatements": ["set exclude_txn_from_change_streams = true", "set exclude_txn_from_change_streams = false"],
"setStatement": {
"propertyName": "EXCLUDE_TXN_FROM_CHANGE_STREAMS",
"separator": "=",
"allowedValues": "(TRUE|FALSE)",
"converterName": "ClientSideStatementValueConverters$BooleanConverter"
}
}
]
}
`
62 changes: 50 additions & 12 deletions driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -372,6 +372,13 @@ type SpannerConn interface {
// mode and for read-only transaction.
SetReadOnlyStaleness(staleness spanner.TimestampBound) error

// ExcludeTxnFromChangeStreams returns true if the next transaction should be excluded from change streams with the
// DDL option `allow_txn_exclusion=true`.
ExcludeTxnFromChangeStreams() bool
// SetExcludeTxnFromChangeStreams sets whether the next transaction should be excluded from change streams with the
// DDL option `allow_txn_exclusion=true`.
SetExcludeTxnFromChangeStreams(excludeTxnFromChangeStreams bool) error

// Apply writes an array of mutations to the database. This method may only be called while the connection
// is outside a transaction. Use BufferWrite to write mutations in a transaction.
// See also spanner.Client#Apply
Expand Down Expand Up @@ -399,8 +406,8 @@ type conn struct {
retryAborts bool

execSingleQuery func(ctx context.Context, c *spanner.Client, statement spanner.Statement, bound spanner.TimestampBound) *spanner.RowIterator
execSingleDMLTransactional func(ctx context.Context, c *spanner.Client, statement spanner.Statement) (int64, time.Time, error)
execSingleDMLPartitioned func(ctx context.Context, c *spanner.Client, statement spanner.Statement) (int64, error)
execSingleDMLTransactional func(ctx context.Context, c *spanner.Client, statement spanner.Statement, transactionOptions spanner.TransactionOptions) (int64, time.Time, error)
execSingleDMLPartitioned func(ctx context.Context, c *spanner.Client, statement spanner.Statement, options spanner.QueryOptions) (int64, error)

// batch is the currently active DDL or DML batch on this connection.
batch *batch
Expand All @@ -412,6 +419,9 @@ type conn struct {
autocommitDMLMode AutocommitDMLMode
// readOnlyStaleness is used for queries in autocommit mode and for read-only transactions.
readOnlyStaleness spanner.TimestampBound
// excludeTxnFromChangeStreams is used to exlude the next transaction from change streams with the DDL option
// `allow_txn_exclusion=true`
excludeTxnFromChangeStreams bool
}

type batchType int
Expand Down Expand Up @@ -498,6 +508,23 @@ func (c *conn) setReadOnlyStaleness(staleness spanner.TimestampBound) (driver.Re
return driver.ResultNoRows, nil
}

func (c *conn) ExcludeTxnFromChangeStreams() bool {
return c.excludeTxnFromChangeStreams
}

func (c *conn) SetExcludeTxnFromChangeStreams(excludeTxnFromChangeStreams bool) error {
_, err := c.setExcludeTxnFromChangeStreams(excludeTxnFromChangeStreams)
return err
}

func (c *conn) setExcludeTxnFromChangeStreams(excludeTxnFromChangeStreams bool) (driver.Result, error) {
if c.inTransaction() {
return nil, spanner.ToSpannerError(status.Error(codes.FailedPrecondition, "cannot set ExcludeTxnFromChangeStreams while a transaction is active"))
}
c.excludeTxnFromChangeStreams = excludeTxnFromChangeStreams
return driver.ResultNoRows, nil
}

func (c *conn) StartBatchDDL() error {
_, err := c.startBatchDDL()
return err
Expand Down Expand Up @@ -637,10 +664,10 @@ func (c *conn) execBatchDML(ctx context.Context, statements []spanner.Statement)
}
affected, err = tx.rwTx.BatchUpdate(ctx, statements)
} else {
_, err = c.client.ReadWriteTransaction(ctx, func(ctx context.Context, transaction *spanner.ReadWriteTransaction) error {
_, err = c.client.ReadWriteTransactionWithOptions(ctx, func(ctx context.Context, transaction *spanner.ReadWriteTransaction) error {
affected, err = transaction.BatchUpdate(ctx, statements)
return err
})
}, c.createTransactionOptions())
}
return &result{rowsAffected: sum(affected)}, err
}
Expand Down Expand Up @@ -888,12 +915,12 @@ func (c *conn) ExecContext(ctx context.Context, query string, args []driver.Name
c.batch.statements = append(c.batch.statements, ss)
} else {
if c.autocommitDMLMode == Transactional {
rowsAffected, commitTs, err = c.execSingleDMLTransactional(ctx, c.client, ss)
rowsAffected, commitTs, err = c.execSingleDMLTransactional(ctx, c.client, ss, c.createTransactionOptions())
if err == nil {
c.commitTs = &commitTs
}
} else if c.autocommitDMLMode == PartitionedNonAtomic {
rowsAffected, err = c.execSingleDMLPartitioned(ctx, c.client, ss)
rowsAffected, err = c.execSingleDMLPartitioned(ctx, c.client, ss, c.createPartitionedDmlQueryOptions())
} else {
return nil, status.Errorf(codes.FailedPrecondition, "connection in invalid state for DML statements: %s", c.autocommitDMLMode.String())
}
Expand Down Expand Up @@ -945,7 +972,8 @@ func (c *conn) BeginTx(ctx context.Context, opts driver.TxOptions) (driver.Tx, e
return c.tx, nil
}

tx, err := spanner.NewReadWriteStmtBasedTransaction(ctx, c.client)
options := c.createTransactionOptions()
tx, err := spanner.NewReadWriteStmtBasedTransactionWithOptions(ctx, c.client, options)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -989,20 +1017,30 @@ func queryInSingleUse(ctx context.Context, c *spanner.Client, statement spanner.
return c.Single().WithTimestampBound(tb).Query(ctx, statement)
}

func execInNewRWTransaction(ctx context.Context, c *spanner.Client, statement spanner.Statement) (int64, time.Time, error) {
func execInNewRWTransaction(ctx context.Context, c *spanner.Client, statement spanner.Statement, options spanner.TransactionOptions) (int64, time.Time, error) {
var rowsAffected int64
fn := func(ctx context.Context, tx *spanner.ReadWriteTransaction) error {
count, err := tx.Update(ctx, statement)
rowsAffected = count
return err
}
ts, err := c.ReadWriteTransaction(ctx, fn)
resp, err := c.ReadWriteTransactionWithOptions(ctx, fn, options)
if err != nil {
return 0, time.Time{}, err
}
return rowsAffected, ts, nil
return rowsAffected, resp.CommitTs, nil
}

func execAsPartitionedDML(ctx context.Context, c *spanner.Client, statement spanner.Statement, options spanner.QueryOptions) (int64, error) {
return c.PartitionedUpdateWithOptions(ctx, statement, options)
}

func (c *conn) createTransactionOptions() spanner.TransactionOptions {
defer func() { c.excludeTxnFromChangeStreams = false }()
return spanner.TransactionOptions{ExcludeTxnFromChangeStreams: c.excludeTxnFromChangeStreams}
}

func execAsPartitionedDML(ctx context.Context, c *spanner.Client, statement spanner.Statement) (int64, error) {
return c.PartitionedUpdate(ctx, statement)
func (c *conn) createPartitionedDmlQueryOptions() spanner.QueryOptions {
defer func() { c.excludeTxnFromChangeStreams = false }()
return spanner.QueryOptions{ExcludeTxnFromChangeStreams: c.excludeTxnFromChangeStreams}
}
16 changes: 8 additions & 8 deletions driver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -359,10 +359,10 @@ func TestConn_NonDdlStatementsInDdlBatch(t *testing.T) {
execSingleQuery: func(ctx context.Context, c *spanner.Client, statement spanner.Statement, tb spanner.TimestampBound) *spanner.RowIterator {
return &spanner.RowIterator{}
},
execSingleDMLTransactional: func(ctx context.Context, c *spanner.Client, statement spanner.Statement) (int64, time.Time, error) {
execSingleDMLTransactional: func(ctx context.Context, c *spanner.Client, statement spanner.Statement, options spanner.TransactionOptions) (int64, time.Time, error) {
return 0, time.Time{}, nil
},
execSingleDMLPartitioned: func(ctx context.Context, c *spanner.Client, statement spanner.Statement) (int64, error) {
execSingleDMLPartitioned: func(ctx context.Context, c *spanner.Client, statement spanner.Statement, options spanner.QueryOptions) (int64, error) {
return 0, nil
},
}
Expand Down Expand Up @@ -392,10 +392,10 @@ func TestConn_NonDmlStatementsInDmlBatch(t *testing.T) {
execSingleQuery: func(ctx context.Context, c *spanner.Client, statement spanner.Statement, tb spanner.TimestampBound) *spanner.RowIterator {
return &spanner.RowIterator{}
},
execSingleDMLTransactional: func(ctx context.Context, c *spanner.Client, statement spanner.Statement) (int64, time.Time, error) {
execSingleDMLTransactional: func(ctx context.Context, c *spanner.Client, statement spanner.Statement, options spanner.TransactionOptions) (int64, time.Time, error) {
return 0, time.Time{}, nil
},
execSingleDMLPartitioned: func(ctx context.Context, c *spanner.Client, statement spanner.Statement) (int64, error) {
execSingleDMLPartitioned: func(ctx context.Context, c *spanner.Client, statement spanner.Statement, options spanner.QueryOptions) (int64, error) {
return 0, nil
},
}
Expand Down Expand Up @@ -426,10 +426,10 @@ func TestConn_GetCommitTimestampAfterAutocommitDml(t *testing.T) {
execSingleQuery: func(ctx context.Context, c *spanner.Client, statement spanner.Statement, tb spanner.TimestampBound) *spanner.RowIterator {
return &spanner.RowIterator{}
},
execSingleDMLTransactional: func(ctx context.Context, c *spanner.Client, statement spanner.Statement) (int64, time.Time, error) {
execSingleDMLTransactional: func(ctx context.Context, c *spanner.Client, statement spanner.Statement, options spanner.TransactionOptions) (int64, time.Time, error) {
return 0, want, nil
},
execSingleDMLPartitioned: func(ctx context.Context, c *spanner.Client, statement spanner.Statement) (int64, error) {
execSingleDMLPartitioned: func(ctx context.Context, c *spanner.Client, statement spanner.Statement, options spanner.QueryOptions) (int64, error) {
return 0, nil
},
}
Expand All @@ -451,10 +451,10 @@ func TestConn_GetCommitTimestampAfterAutocommitQuery(t *testing.T) {
execSingleQuery: func(ctx context.Context, c *spanner.Client, statement spanner.Statement, tb spanner.TimestampBound) *spanner.RowIterator {
return &spanner.RowIterator{}
},
execSingleDMLTransactional: func(ctx context.Context, c *spanner.Client, statement spanner.Statement) (int64, time.Time, error) {
execSingleDMLTransactional: func(ctx context.Context, c *spanner.Client, statement spanner.Statement, options spanner.TransactionOptions) (int64, time.Time, error) {
return 0, time.Time{}, nil
},
execSingleDMLPartitioned: func(ctx context.Context, c *spanner.Client, statement spanner.Statement) (int64, error) {
execSingleDMLPartitioned: func(ctx context.Context, c *spanner.Client, statement spanner.Statement, options spanner.QueryOptions) (int64, error) {
return 0, nil
},
}
Expand Down
Loading

0 comments on commit ad95d85

Please sign in to comment.