Skip to content

Commit

Permalink
Fix request tags behavior
Browse files Browse the repository at this point in the history
  • Loading branch information
takabow committed Nov 10, 2022
1 parent 0ec9c18 commit 9a67221
Show file tree
Hide file tree
Showing 4 changed files with 55 additions and 19 deletions.
17 changes: 12 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,13 @@ Note that transaction-level priority takes precedence over command-level priorit

In a read-write transaction, you can add a tag following `BEGIN RW TAG <tag>`.
spanner-cli adds the tag set in `BEGIN RW TAG` as a transaction tag.
The tag will also be used as request tags within the transaction.
The tag will also be used as the prefix of request tags within the transaction.

spanner-cli generates request tags in the following format.

`<request tag>` ::= `<transaction tag>_<sequence number>_<SQL Text>`

Because the length of a request_tag is [limited to 50 characters](https://github.com/googleapis/googleapis/blob/150b4079b6441ea8b3d7f9a71d0be7bbacbb4e3a/google/spanner/v1/spanner.proto#L466-L476), request tags, especially SQL Text embedded at the end of a request tag, may be truncated.

```
# Read-write transaction
Expand All @@ -281,27 +287,28 @@ The tag will also be used as request tags within the transaction.
| BEGIN RW TAG tx1; |
| |
| SELECT val |
| FROM tab1 +-----request_tag = tx1
| FROM tab1 +-----request_tag = tx1_1_SELECT val...
| WHERE id = 1; |
| |
| UPDATE tab1 |
| SET val = 10 +-----request_tag = tx1
| SET val = 10 +-----request_tag = tx1_2_UPDATE tab1...
| WHERE id = 1; |
| |
| COMMIT; |
+--------------------+
```

In a read-only transaction, you can add a tag following `BEGIN RO TAG <tag>`.
Since read-only transaction doesn't support transaction tag, spanner-cli adds the tag set in `BEGIN RO TAG` as request tags.
Since read-only transaction doesn't support transaction tag, spanner-cli adds the tag set in `BEGIN RO TAG` as the prefix of request tags.

```
# Read-only transaction
# transaction_tag = N/A
+--------------------+
| BEGIN RO TAG tx2; |
| |
| SELECT SUM(val) |
| FROM tab1 +-----request_tag = tx2
| FROM tab1 +-----request_tag = tx2_1_SELECT SUM(val)...
| WHERE id = 1; |
| |
| CLOSE; |
Expand Down
16 changes: 16 additions & 0 deletions integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,10 @@ func TestReadWriteTransaction(t *testing.T) {
IsMutation: true,
})

if session.tc.statementCounter != 2 {
t.Errorf("Check the current statement number, Expect 2, Actual %d", session.tc.statementCounter)
}

// commit
stmt, err = BuildStatement("COMMIT")
if err != nil {
Expand Down Expand Up @@ -368,6 +372,10 @@ func TestReadWriteTransaction(t *testing.T) {
IsMutation: true,
})

if session.tc.statementCounter != 2 {
t.Errorf("Check the current statement number, Expect 2, Actual %d", session.tc.statementCounter)
}

// rollback
stmt, err = BuildStatement("ROLLBACK")
if err != nil {
Expand Down Expand Up @@ -485,6 +493,10 @@ func TestReadOnlyTransaction(t *testing.T) {
IsMutation: false,
})

if session.tc.statementCounter != 2 {
t.Errorf("Check the current statement number, Expect 2, Actual %d", session.tc.statementCounter)
}

// close
stmt, err = BuildStatement("CLOSE")
if err != nil {
Expand Down Expand Up @@ -556,6 +568,10 @@ func TestReadOnlyTransaction(t *testing.T) {
IsMutation: false,
})

if session.tc.statementCounter != 2 {
t.Errorf("Check the current statement number, Expect 2, Actual %d", session.tc.statementCounter)
}

// close
stmt, err = BuildStatement("CLOSE")
if err != nil {
Expand Down
35 changes: 21 additions & 14 deletions session.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,11 +58,12 @@ type Session struct {
}

type transactionContext struct {
tag string
priority pb.RequestOptions_Priority
sendHeartbeat bool // Becomes true only after a user-driven query is executed on the transaction.
rwTxn *spanner.ReadWriteStmtBasedTransaction
roTxn *spanner.ReadOnlyTransaction
statementCounter int64 // Count current statement number in a transaction
tag string
priority pb.RequestOptions_Priority
sendHeartbeat bool // Becomes true only after a user-driven query is executed on the transaction.
rwTxn *spanner.ReadWriteStmtBasedTransaction
roTxn *spanner.ReadOnlyTransaction
}

func NewSession(projectId string, instanceId string, databaseId string, priority pb.RequestOptions_Priority, opts ...option.ClientOption) (*Session, error) {
Expand Down Expand Up @@ -129,9 +130,10 @@ func (s *Session) BeginReadWriteTransaction(ctx context.Context, priority pb.Req
return err
}
s.tc = &transactionContext{
tag: tag,
priority: priority,
rwTxn: txn,
statementCounter: 1, // Reset counter
tag: tag,
priority: priority,
rwTxn: txn,
}
return nil
}
Expand Down Expand Up @@ -195,9 +197,10 @@ func (s *Session) BeginReadOnlyTransaction(ctx context.Context, typ timestampBou
}

s.tc = &transactionContext{
tag: tag,
priority: priority,
roTxn: txn,
statementCounter: 1, // Reset counter
tag: tag,
priority: priority,
roTxn: txn,
}

return txn.Timestamp()
Expand Down Expand Up @@ -255,13 +258,13 @@ func (s *Session) RunAnalyzeQuery(ctx context.Context, stmt spanner.Statement) (

func (s *Session) runQueryWithOptions(ctx context.Context, stmt spanner.Statement, opts spanner.QueryOptions) (*spanner.RowIterator, *spanner.ReadOnlyTransaction) {
if s.InReadWriteTransaction() {
opts.RequestTag = s.tc.tag
opts.RequestTag = generateRequestTag(s.tc, stmt)
iter := s.tc.rwTxn.QueryWithOptions(ctx, stmt, opts)
s.tc.sendHeartbeat = true
return iter, nil
}
if s.InReadOnlyTransaction() {
opts.RequestTag = s.tc.tag
opts.RequestTag = generateRequestTag(s.tc, stmt)
return s.tc.roTxn.QueryWithOptions(ctx, stmt, opts), s.tc.roTxn
}

Expand All @@ -278,7 +281,7 @@ func (s *Session) RunUpdate(ctx context.Context, stmt spanner.Statement) (int64,

opts := spanner.QueryOptions{
Priority: s.currentPriority(),
RequestTag: s.tc.tag,
RequestTag: generateRequestTag(s.tc, stmt),
}
rowCount, err := s.tc.rwTxn.UpdateWithOptions(ctx, stmt, opts)
s.tc.sendHeartbeat = true
Expand Down Expand Up @@ -374,3 +377,7 @@ func heartbeat(txn *spanner.ReadWriteStmtBasedTransaction, priority pb.RequestOp
_, err := iter.Next()
return err
}

func generateRequestTag(tc *transactionContext, stmt spanner.Statement) string {
return tc.tag + "_" + fmt.Sprintf("%d", tc.statementCounter) + "_" + stmt.SQL
}
6 changes: 6 additions & 0 deletions statement.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,11 @@ func (s *SelectStatement) Execute(ctx context.Context, session *Session) (*Resul
}
return nil, err
}

if session.InReadWriteTransaction() || session.InReadOnlyTransaction() {
session.tc.statementCounter++
}

result := &Result{
ColumnNames: columnNames,
Rows: rows,
Expand Down Expand Up @@ -749,6 +754,7 @@ func (s *DmlStatement) Execute(ctx context.Context, session *Session) (*Result,
rollback.Execute(ctx, session)
return nil, fmt.Errorf("transaction was aborted: %v", err)
}
session.tc.statementCounter++
} else {
// Start implicit transaction.
begin := BeginRwStatement{}
Expand Down

0 comments on commit 9a67221

Please sign in to comment.