Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(spanner): support request and transaction tags #3233

Merged
merged 9 commits into from
Apr 6, 2021
18 changes: 15 additions & 3 deletions spanner/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -472,6 +472,7 @@ func (c *Client) rwTransaction(ctx context.Context, f func(context.Context, *Rea
t.txReadOnly.sh = sh
t.txReadOnly.txReadEnv = t
t.txReadOnly.qo = c.qo
t.txReadOnly.qo.transactionTag = options.TransactionTag
trace.TracePrintf(ctx, map[string]interface{}{"transactionID": string(sh.getTransactionID())},
"Starting transaction attempt")
if err = t.begin(ctx); err != nil {
Expand All @@ -491,6 +492,8 @@ type applyOption struct {
// If atLeastOnce == true, Client.Apply will execute the mutations on Cloud
// Spanner at least once.
atLeastOnce bool
// transactionTag will be included with the CommitRequest.
transactionTag string
}

// An ApplyOption is an optional argument to Apply.
Expand All @@ -513,6 +516,14 @@ func ApplyAtLeastOnce() ApplyOption {
}
}

// TransactionTag returns an ApplyOption that will include the given tag as a
// transaction tag for a write-only transaction.
func TransactionTag(tag string) ApplyOption {
return func(ao *applyOption) {
ao.transactionTag = tag
}
}

// Apply applies a list of mutations atomically to the database.
func (c *Client) Apply(ctx context.Context, ms []*Mutation, opts ...ApplyOption) (commitTimestamp time.Time, err error) {
ao := &applyOption{}
Expand All @@ -524,11 +535,12 @@ func (c *Client) Apply(ctx context.Context, ms []*Mutation, opts ...ApplyOption)
defer func() { trace.EndSpan(ctx, err) }()

if !ao.atLeastOnce {
return c.ReadWriteTransaction(ctx, func(ctx context.Context, t *ReadWriteTransaction) error {
resp, err := c.ReadWriteTransactionWithOptions(ctx, func(ctx context.Context, t *ReadWriteTransaction) error {
return t.BufferWrite(ms)
})
}, TransactionOptions{TransactionTag: ao.transactionTag})
return resp.CommitTs, err
}
t := &writeOnlyTransaction{c.idleSessions}
t := &writeOnlyTransaction{sp: c.idleSessions, transactionTag: ao.transactionTag}
return t.applyAtLeastOnce(ctx, ms...)
}

Expand Down
208 changes: 207 additions & 1 deletion spanner/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -405,7 +405,7 @@ func TestClient_Single_QueryOptions(t *testing.T) {
defer teardown()

var iter *RowIterator
if tt.query.Options == nil {
if tt.query.Options == nil && tt.query.RequestTag == "" {
iter = client.Single().Query(ctx, NewStatement(SelectSingerIDAlbumIDAlbumTitleFromAlbums))
} else {
iter = client.Single().QueryWithOptions(ctx, NewStatement(SelectSingerIDAlbumIDAlbumTitleFromAlbums), tt.query)
Expand Down Expand Up @@ -2239,3 +2239,209 @@ func TestClient_DoForEachRow_ShouldEndSpanWithQueryError(t *testing.T) {
t.Errorf("Span status mismatch\nGot: %v\nWant: %v", s.Code, codes.InvalidArgument)
}
}

func TestClient_ReadOnlyTransaction_Tag(t *testing.T) {
t.Parallel()

server, client, teardown := setupMockedTestServer(t)
defer teardown()
for _, qo := range []QueryOptions{
{},
{RequestTag: "tag-1"},
} {
for _, tx := range []*ReadOnlyTransaction{
client.Single(),
client.ReadOnlyTransaction(),
} {
iter := tx.QueryWithOptions(context.Background(), NewStatement(SelectSingerIDAlbumIDAlbumTitleFromAlbums), qo)
iter.Next()
iter.Stop()

if tx.singleUse {
tx = client.Single()
}
iter = tx.ReadWithOptions(context.Background(), "FOO", AllKeys(), []string{"BAR"}, &ReadOptions{RequestTag: qo.RequestTag})
iter.Next()
iter.Stop()

checkRequestsForExpectedRequestOptions(t, server.TestSpanner, 2, qo)
tx.Close()
}
}
}

func TestClient_ReadWriteTransaction_Tag(t *testing.T) {
t.Parallel()

server, client, teardown := setupMockedTestServer(t)
defer teardown()
for _, to := range []TransactionOptions{
{},
{TransactionTag: "tx-tag-1"},
} {
for _, qo := range []QueryOptions{
{},
{RequestTag: "request-tag-1"},
} {
client.ReadWriteTransactionWithOptions(context.Background(), func(ctx context.Context, tx *ReadWriteTransaction) error {
iter := tx.QueryWithOptions(context.Background(), NewStatement(SelectSingerIDAlbumIDAlbumTitleFromAlbums), qo)
iter.Next()
iter.Stop()

iter = tx.ReadWithOptions(context.Background(), "FOO", AllKeys(), []string{"BAR"}, &ReadOptions{RequestTag: qo.RequestTag})
iter.Next()
iter.Stop()

tx.UpdateWithOptions(context.Background(), NewStatement(UpdateBarSetFoo), qo)
tx.BatchUpdateWithOptions(context.Background(), []Statement{
NewStatement(UpdateBarSetFoo),
}, qo)

// Check for SQL requests inside the transaction to prevent the check to
// drain the commit request from the server.
checkRequestsForExpectedRequestOptions(t, server.TestSpanner, 4, QueryOptions{
RequestTag: qo.RequestTag,
transactionTag: to.TransactionTag,
})
return nil
}, to)
checkCommitForExpectedRequestOptions(t, server.TestSpanner, QueryOptions{
RequestTag: "", // Should never be set
transactionTag: to.TransactionTag,
})
}
}
}

func TestClient_StmtBasedReadWriteTransaction_Tag(t *testing.T) {
t.Parallel()

server, client, teardown := setupMockedTestServer(t)
defer teardown()
for _, to := range []TransactionOptions{
{},
{TransactionTag: "tx-tag-1"},
} {
for _, qo := range []QueryOptions{
{},
{RequestTag: "request-tag-1"},
} {
tx, _ := NewReadWriteStmtBasedTransactionWithOptions(context.Background(), client, to)
iter := tx.QueryWithOptions(context.Background(), NewStatement(SelectSingerIDAlbumIDAlbumTitleFromAlbums), qo)
iter.Next()
iter.Stop()

iter = tx.ReadWithOptions(context.Background(), "FOO", AllKeys(), []string{"BAR"}, &ReadOptions{RequestTag: qo.RequestTag})
iter.Next()
iter.Stop()

tx.UpdateWithOptions(context.Background(), NewStatement(UpdateBarSetFoo), qo)
tx.BatchUpdateWithOptions(context.Background(), []Statement{
NewStatement(UpdateBarSetFoo),
}, qo)
checkRequestsForExpectedRequestOptions(t, server.TestSpanner, 4, QueryOptions{
RequestTag: qo.RequestTag,
transactionTag: to.TransactionTag,
})

tx.Commit(context.Background())
checkCommitForExpectedRequestOptions(t, server.TestSpanner, QueryOptions{
RequestTag: "", // Should never be set
transactionTag: to.TransactionTag,
})
}
}
}

func TestClient_PDML_Tag(t *testing.T) {
t.Parallel()

server, client, teardown := setupMockedTestServer(t)
defer teardown()

for _, qo := range []QueryOptions{
{},
{RequestTag: "request-tag-1"},
} {
client.PartitionedUpdateWithOptions(context.Background(), NewStatement(UpdateBarSetFoo), qo)
checkRequestsForExpectedRequestOptions(t, server.TestSpanner, 1, qo)
}
}

func TestClient_Apply_Priority(t *testing.T) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Apply_Tagging?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

t.Parallel()

server, client, teardown := setupMockedTestServer(t)
defer teardown()

client.Apply(context.Background(), []*Mutation{Insert("foo", []string{"col1"}, []interface{}{"val1"})})
checkCommitForExpectedRequestOptions(t, server.TestSpanner, QueryOptions{})

client.Apply(context.Background(), []*Mutation{Insert("foo", []string{"col1"}, []interface{}{"val1"})}, TransactionTag("tx-tag"))
checkCommitForExpectedRequestOptions(t, server.TestSpanner, QueryOptions{transactionTag: "tx-tag"})

client.Apply(context.Background(), []*Mutation{Insert("foo", []string{"col1"}, []interface{}{"val1"})}, ApplyAtLeastOnce())
checkCommitForExpectedRequestOptions(t, server.TestSpanner, QueryOptions{})

client.Apply(context.Background(), []*Mutation{Insert("foo", []string{"col1"}, []interface{}{"val1"})}, ApplyAtLeastOnce(), TransactionTag("tx-tag"))
checkCommitForExpectedRequestOptions(t, server.TestSpanner, QueryOptions{transactionTag: "tx-tag"})
}

func checkRequestsForExpectedRequestOptions(t *testing.T, server InMemSpannerServer, reqCount int, qo QueryOptions) {
reqs := drainRequestsFromServer(server)
reqOptions := []*sppb.RequestOptions{}

for _, req := range reqs {
if sqlReq, ok := req.(*sppb.ExecuteSqlRequest); ok {
reqOptions = append(reqOptions, sqlReq.RequestOptions)
}
if batchReq, ok := req.(*sppb.ExecuteBatchDmlRequest); ok {
reqOptions = append(reqOptions, batchReq.RequestOptions)
}
if readReq, ok := req.(*sppb.ReadRequest); ok {
reqOptions = append(reqOptions, readReq.RequestOptions)
}
}

if got, want := len(reqOptions), reqCount; got != want {
t.Fatalf("Requests length mismatch\nGot: %v\nWant: %v", got, want)
}

for _, opts := range reqOptions {
if got, want := opts.RequestTag, qo.RequestTag; got != want {
t.Fatalf("Request tag mismatch\nGot: %v\nWant: %v", got, want)
}
if got, want := opts.TransactionTag, qo.transactionTag; got != want {
t.Fatalf("Transaction tag mismatch\nGot: %v\nWant: %v", got, want)
}
}
}

func checkCommitForExpectedRequestOptions(t *testing.T, server InMemSpannerServer, qo QueryOptions) {
reqs := drainRequestsFromServer(server)
var commit *sppb.CommitRequest
var ok bool

for _, req := range reqs {
if commit, ok = req.(*sppb.CommitRequest); ok {
break
}
}

if commit == nil {
t.Fatalf("Missing commit request")
}

var requestTag string
var transactionTag string
if commit.RequestOptions != nil {
requestTag = commit.RequestOptions.RequestTag
transactionTag = commit.RequestOptions.TransactionTag
}
if got, want := requestTag, qo.RequestTag; got != want {
t.Fatalf("Commit request tag mismatch\nGot: %v\nWant: %v", got, want)
}
if got, want := transactionTag, qo.transactionTag; got != want {
t.Fatalf("Commit transaction tag mismatch\nGot: %v\nWant: %v", got, want)
}
}
3 changes: 3 additions & 0 deletions spanner/pdml.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,9 @@ func (c *Client) partitionedUpdate(ctx context.Context, statement Statement, opt
Params: params,
ParamTypes: paramTypes,
QueryOptions: options.Options,
RequestOptions: &sppb.RequestOptions{
RequestTag: options.RequestTag,
},
}

// Make a retryer for Aborted and certain Internal errors.
Expand Down