Skip to content

Commit

Permalink
Merge remote-tracking branch 'refs/remotes/origin/main'
Browse files Browse the repository at this point in the history
  • Loading branch information
michelelacorte-quinck committed Apr 22, 2024
2 parents 2cfe9e2 + 7cb2deb commit e0cc5b0
Show file tree
Hide file tree
Showing 9 changed files with 313 additions and 20 deletions.
4 changes: 4 additions & 0 deletions .github/.OwlBot.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -355,6 +355,7 @@ deep-remove-regex:
- /internal/generated/snippets/serviceusage/apiv1/
- /internal/generated/snippets/shell/apiv1/
- /internal/generated/snippets/shopping/css/apiv1/
- /internal/generated/snippets/shopping/merchant/conversions/v1beta/
- /internal/generated/snippets/shopping/merchant/notifications/v1beta/
- /internal/generated/snippets/shopping/merchant/quota/apiv1beta/
- /internal/generated/snippets/shopping/merchant/inventories/apiv1beta/
Expand Down Expand Up @@ -492,6 +493,7 @@ deep-remove-regex:
- /serviceusage/apiv1/
- /shell/apiv1/
- /shopping/css/apiv1/
- /shopping/merchant/conversions/apiv1beta/
- /shopping/merchant/notifications/apiv1beta/
- /shopping/merchant/quota/apiv1beta/
- /shopping/merchant/inventories/apiv1beta/
Expand Down Expand Up @@ -1035,6 +1037,8 @@ deep-copy-regex:
dest: /
- source: /google/shopping/css/v1/cloud.google.com/go
dest: /
- source: /google/shopping/merchant/conversions/v1beta/cloud.google.com/go
dest: /
- source: /google/shopping/merchant/notifications/v1beta/cloud.google.com/go
dest: /
- source: /google/shopping/merchant/quota/v1beta/cloud.google.com/go
Expand Down
5 changes: 4 additions & 1 deletion auth/credentials/detect.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,10 @@ func DetectDefault(opts *DetectOptions) (*auth.Credentials, error) {
if opts.CredentialsJSON != nil {
return readCredentialsFileJSON(opts.CredentialsJSON, opts)
}
if filename := credsfile.GetFileNameFromEnv(opts.CredentialsFile); filename != "" {
if opts.CredentialsFile != "" {
return readCredentialsFile(opts.CredentialsFile, opts)
}
if filename := os.Getenv(credsfile.GoogleAppCredsEnvVar); filename != "" {
if creds, err := readCredentialsFile(filename, opts); err == nil {
return creds, err
}
Expand Down
9 changes: 9 additions & 0 deletions auth/credentials/detect_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -688,6 +688,15 @@ func TestDefaultCredentials_BadFiletype(t *testing.T) {
}
}

func TestDefaultCredentials_BadFileName(t *testing.T) {
if _, err := DetectDefault(&DetectOptions{
CredentialsFile: "a/bad/filepath",
Scopes: []string{"https://www.googleapis.com/auth/cloud-platform"},
}); err == nil {
t.Fatal("got nil, want non-nil err")
}
}

func TestDefaultCredentials_Validate(t *testing.T) {
tests := []struct {
name string
Expand Down
3 changes: 3 additions & 0 deletions internal/postprocessor/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -854,6 +854,9 @@ service-configs:
- input-directory: google/shopping/css/v1
service-config: css_v1.yaml
import-path: cloud.google.com/go/shopping/css/apiv1
- input-directory: google/shopping/merchant/conversions/v1beta
service-config: merchantapi_v1beta.yaml
import-path: cloud.google.com/go/shopping/merchant/conversions/apiv1beta
- input-directory: google/shopping/merchant/notifications/v1beta
service-config: merchantapi_v1beta.yaml
import-path: cloud.google.com/go/shopping/merchant/notifications/apiv1beta
Expand Down
32 changes: 25 additions & 7 deletions spanner/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -683,6 +683,10 @@ type applyOption struct {
transactionTag string
// priority is the RPC priority that is used for the commit operation.
priority sppb.RequestOptions_Priority
// If excludeTxnFromChangeStreams == true, mutations from this Client.Apply
// will not be recorded in allowed tracking change streams with DDL option
// allow_txn_exclusion=true.
excludeTxnFromChangeStreams bool
}

// An ApplyOption is an optional argument to Apply.
Expand Down Expand Up @@ -721,6 +725,13 @@ func Priority(priority sppb.RequestOptions_Priority) ApplyOption {
}
}

// ExcludeTxnFromChangeStreams returns an ApplyOptions that sets whether to exclude recording this commit operation from allowed tracking change streams.
func ExcludeTxnFromChangeStreams() ApplyOption {
return func(ao *applyOption) {
ao.excludeTxnFromChangeStreams = true
}
}

// Apply applies a list of mutations atomically to the database.
func (c *Client) Apply(ctx context.Context, ms []*Mutation, opts ...ApplyOption) (commitTimestamp time.Time, err error) {
ao := &applyOption{}
Expand All @@ -739,10 +750,10 @@ func (c *Client) Apply(ctx context.Context, ms []*Mutation, opts ...ApplyOption)
if !ao.atLeastOnce {
resp, err := c.ReadWriteTransactionWithOptions(ctx, func(ctx context.Context, t *ReadWriteTransaction) error {
return t.BufferWrite(ms)
}, TransactionOptions{CommitPriority: ao.priority, TransactionTag: ao.transactionTag})
}, TransactionOptions{CommitPriority: ao.priority, TransactionTag: ao.transactionTag, ExcludeTxnFromChangeStreams: ao.excludeTxnFromChangeStreams})
return resp.CommitTs, err
}
t := &writeOnlyTransaction{sp: c.idleSessions, commitPriority: ao.priority, transactionTag: ao.transactionTag, disableRouteToLeader: c.disableRouteToLeader}
t := &writeOnlyTransaction{sp: c.idleSessions, commitPriority: ao.priority, transactionTag: ao.transactionTag, disableRouteToLeader: c.disableRouteToLeader, excludeTxnFromChangeStreams: ao.excludeTxnFromChangeStreams}
return t.applyAtLeastOnce(ctx, ms...)
}

Expand All @@ -753,14 +764,20 @@ type BatchWriteOptions struct {

// The transaction tag to use for this request.
TransactionTag string

// If excludeTxnFromChangeStreams == true, modifications from all transactions
// in this batch write request will not be recorded in allowed tracking
// change treams with DDL option allow_txn_exclusion=true.
ExcludeTxnFromChangeStreams bool
}

// merge combines two BatchWriteOptions such that the input parameter will have higher
// order of precedence.
func (bwo BatchWriteOptions) merge(opts BatchWriteOptions) BatchWriteOptions {
merged := BatchWriteOptions{
TransactionTag: bwo.TransactionTag,
Priority: bwo.Priority,
TransactionTag: bwo.TransactionTag,
Priority: bwo.Priority,
ExcludeTxnFromChangeStreams: bwo.ExcludeTxnFromChangeStreams || opts.ExcludeTxnFromChangeStreams,
}
if opts.TransactionTag != "" {
merged.TransactionTag = opts.TransactionTag
Expand Down Expand Up @@ -915,9 +932,10 @@ func (c *Client) BatchWriteWithOptions(ctx context.Context, mgs []*MutationGroup
var md metadata.MD
sh.updateLastUseTime()
stream, rpcErr := sh.getClient().BatchWrite(contextWithOutgoingMetadata(ct, sh.getMetadata(), c.disableRouteToLeader), &sppb.BatchWriteRequest{
Session: sh.getID(),
MutationGroups: mgsPb,
RequestOptions: createRequestOptions(opts.Priority, "", opts.TransactionTag),
Session: sh.getID(),
MutationGroups: mgsPb,
RequestOptions: createRequestOptions(opts.Priority, "", opts.TransactionTag),
ExcludeTxnFromChangeStreams: opts.ExcludeTxnFromChangeStreams,
}, gax.WithGRPCOptions(grpc.Header(&md)))

if getGFELatencyMetricsFlag() && md != nil && c.ct != nil {
Expand Down
200 changes: 200 additions & 0 deletions spanner/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5338,3 +5338,203 @@ func TestClient_NestedReadWriteTransactionWithTag_InnerBlindWrite(t *testing.T)
t.Fatalf("transaction tag mismatch\nGot: %s\nWant: %s", g, w)
}
}

func TestClient_ReadWriteTransactionWithExcludeTxnFromChangeStreams_ExecuteSqlRequest(t *testing.T) {
server, client, teardown := setupMockedTestServer(t)
defer teardown()
ctx := context.Background()

_, err := client.ReadWriteTransactionWithOptions(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error {
_, err := tx.Update(ctx, Statement{SQL: UpdateBarSetFoo})
if err != nil {
return err
}
return nil
}, TransactionOptions{ExcludeTxnFromChangeStreams: true})
if err != nil {
t.Fatalf("Failed to execute the transaction: %s", err)
}
requests := drainRequestsFromServer(server.TestSpanner)
if err := compareRequests([]interface{}{
&sppb.BatchCreateSessionsRequest{},
&sppb.ExecuteSqlRequest{},
&sppb.CommitRequest{}}, requests); err != nil {
t.Fatal(err)
}
if !requests[1].(*sppb.ExecuteSqlRequest).Transaction.GetBegin().ExcludeTxnFromChangeStreams {
t.Fatal("Transaction is not set to be excluded from change streams")
}
}

func TestClient_ReadWriteTransactionWithExcludeTxnFromChangeStreams_BufferWrite(t *testing.T) {
server, client, teardown := setupMockedTestServer(t)
defer teardown()
ctx := context.Background()

_, err := client.ReadWriteTransactionWithOptions(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error {
if err := tx.BufferWrite([]*Mutation{
Insert("foo", []string{"col1"}, []interface{}{"key1"}),
}); err != nil {
return err
}
return nil
}, TransactionOptions{ExcludeTxnFromChangeStreams: true})
if err != nil {
t.Fatalf("Failed to execute the transaction: %s", err)
}
requests := drainRequestsFromServer(server.TestSpanner)
if err := compareRequests([]interface{}{
&sppb.BatchCreateSessionsRequest{},
&sppb.BeginTransactionRequest{},
&sppb.CommitRequest{}}, requests); err != nil {
t.Fatal(err)
}
if !requests[1].(*sppb.BeginTransactionRequest).Options.ExcludeTxnFromChangeStreams {
t.Fatal("Transaction is not set to be excluded from change streams")
}
}

func TestClient_ReadWriteTransactionWithExcludeTxnFromChangeStreams_BatchUpdate(t *testing.T) {
server, client, teardown := setupMockedTestServer(t)
defer teardown()
ctx := context.Background()

_, err := client.ReadWriteTransactionWithOptions(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error {
_, err := tx.BatchUpdate(ctx, []Statement{NewStatement(UpdateBarSetFoo)})
if err != nil {
return err
}
return nil
}, TransactionOptions{ExcludeTxnFromChangeStreams: true})
if err != nil {
t.Fatalf("Failed to execute the transaction: %s", err)
}
requests := drainRequestsFromServer(server.TestSpanner)
if err := compareRequests([]interface{}{
&sppb.BatchCreateSessionsRequest{},
&sppb.ExecuteBatchDmlRequest{},
&sppb.CommitRequest{}}, requests); err != nil {
t.Fatal(err)
}
if !requests[1].(*sppb.ExecuteBatchDmlRequest).Transaction.GetBegin().ExcludeTxnFromChangeStreams {
t.Fatal("Transaction is not set to be excluded from change streams")
}
}

func TestClient_RequestLevelDMLWithExcludeTxnFromChangeStreams_Failed(t *testing.T) {
_, client, teardown := setupMockedTestServer(t)
defer teardown()
ctx := context.Background()

// Test normal DML
_, err := client.ReadWriteTransactionWithOptions(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error {
_, err := tx.UpdateWithOptions(ctx, Statement{SQL: UpdateBarSetFoo}, QueryOptions{ExcludeTxnFromChangeStreams: true})
if err != nil {
return err
}
return nil
}, TransactionOptions{ExcludeTxnFromChangeStreams: true})
if err == nil {
t.Fatalf("Missing expected exception")
}
msg := "cannot set exclude transaction from change streams for a request-level DML statement."
if status.Code(err) != codes.InvalidArgument || !strings.Contains(err.Error(), msg) {
t.Fatalf("error mismatch\nGot: %v\nWant: %v", err, msg)
}

// Test batch DML
_, err = client.ReadWriteTransactionWithOptions(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error {
_, err := tx.UpdateWithOptions(ctx, Statement{SQL: UpdateBarSetFoo}, QueryOptions{ExcludeTxnFromChangeStreams: true})
if err != nil {
return err
}
return nil
}, TransactionOptions{ExcludeTxnFromChangeStreams: true})
if err == nil {
t.Fatalf("Missing expected exception")
}
if status.Code(err) != codes.InvalidArgument || !strings.Contains(err.Error(), msg) {
t.Fatalf("error mismatch\nGot: %v\nWant: %v", err, msg)
}
}

func TestClient_ApplyExcludeTxnFromChangeStreams(t *testing.T) {
server, client, teardown := setupMockedTestServer(t)
defer teardown()

ms := []*Mutation{
Insert("Accounts", []string{"AccountId", "Nickname", "Balance"}, []interface{}{int64(1), "Foo", int64(50)}),
Insert("Accounts", []string{"AccountId", "Nickname", "Balance"}, []interface{}{int64(2), "Bar", int64(1)}),
}

_, err := client.Apply(context.Background(), ms, ExcludeTxnFromChangeStreams())
if err != nil {
t.Fatal(err)
}
requests := drainRequestsFromServer(server.TestSpanner)
if err := compareRequests([]interface{}{
&sppb.BatchCreateSessionsRequest{},
&sppb.BeginTransactionRequest{},
&sppb.CommitRequest{}}, requests); err != nil {
t.Fatal(err)
}
if !requests[1].(*sppb.BeginTransactionRequest).Options.ExcludeTxnFromChangeStreams {
t.Fatal("Transaction is not set to be excluded from change streams")
}
}

func TestClient_ApplyAtLeastOnceExcludeTxnFromChangeStreams(t *testing.T) {
server, client, teardown := setupMockedTestServer(t)
defer teardown()

ms := []*Mutation{
Insert("Accounts", []string{"AccountId", "Nickname", "Balance"}, []interface{}{int64(1), "Foo", int64(50)}),
Insert("Accounts", []string{"AccountId", "Nickname", "Balance"}, []interface{}{int64(2), "Bar", int64(1)}),
}

_, err := client.Apply(context.Background(), ms, []ApplyOption{ExcludeTxnFromChangeStreams(), ApplyAtLeastOnce()}...)
if err != nil {
t.Fatal(err)
}
requests := drainRequestsFromServer(server.TestSpanner)
if err := compareRequests([]interface{}{
&sppb.BatchCreateSessionsRequest{},
&sppb.CommitRequest{}}, requests); err != nil {
t.Fatal(err)
}
if !requests[1].(*sppb.CommitRequest).Transaction.(*sppb.CommitRequest_SingleUseTransaction).SingleUseTransaction.ExcludeTxnFromChangeStreams {
t.Fatal("Transaction is not set to be excluded from change streams")
}
}

func TestClient_BatchWriteExcludeTxnFromChangeStreams(t *testing.T) {
server, client, teardown := setupMockedTestServer(t)
defer teardown()

mutationGroups := []*MutationGroup{
{[]*Mutation{
{opInsertOrUpdate, "t_test", nil, []string{"key", "val"}, []interface{}{"foo1", 1}},
}},
}
iter := client.BatchWriteWithOptions(context.Background(), mutationGroups, BatchWriteOptions{ExcludeTxnFromChangeStreams: true})
responseCount := 0
doFunc := func(r *sppb.BatchWriteResponse) error {
responseCount++
return nil
}
if err := iter.Do(doFunc); err != nil {
t.Fatal(err)
}
if responseCount != len(mutationGroups) {
t.Fatalf("Response count mismatch.\nGot: %v\nWant:%v", responseCount, len(mutationGroups))
}
requests := drainRequestsFromServer(server.TestSpanner)
if err := compareRequests([]interface{}{
&sppb.BatchCreateSessionsRequest{},
&sppb.BatchWriteRequest{}}, requests); err != nil {
t.Fatal(err)
}
if !requests[1].(*sppb.BatchWriteRequest).ExcludeTxnFromChangeStreams {
t.Fatal("Transaction is not set to be excluded from change streams")
}
}
7 changes: 4 additions & 3 deletions spanner/pdml.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func (c *Client) partitionedUpdate(ctx context.Context, statement Statement, opt
// Execute the PDML and retry if the transaction is aborted.
executePdmlWithRetry := func(ctx context.Context) (int64, error) {
for {
count, err := executePdml(contextWithOutgoingMetadata(ctx, sh.getMetadata(), c.disableRouteToLeader), sh, req)
count, err := executePdml(contextWithOutgoingMetadata(ctx, sh.getMetadata(), c.disableRouteToLeader), sh, req, options)
if err == nil {
return count, nil
}
Expand All @@ -106,14 +106,15 @@ func (c *Client) partitionedUpdate(ctx context.Context, statement Statement, opt
// 3. Execute the update statement on the PDML transaction
//
// Note that PDML transactions cannot be committed or rolled back.
func executePdml(ctx context.Context, sh *sessionHandle, req *sppb.ExecuteSqlRequest) (count int64, err error) {
func executePdml(ctx context.Context, sh *sessionHandle, req *sppb.ExecuteSqlRequest, options QueryOptions) (count int64, err error) {
var md metadata.MD
sh.updateLastUseTime()
// Begin transaction.
res, err := sh.getClient().BeginTransaction(ctx, &sppb.BeginTransactionRequest{
Session: sh.getID(),
Options: &sppb.TransactionOptions{
Mode: &sppb.TransactionOptions_PartitionedDml_{PartitionedDml: &sppb.TransactionOptions_PartitionedDml{}},
Mode: &sppb.TransactionOptions_PartitionedDml_{PartitionedDml: &sppb.TransactionOptions_PartitionedDml{}},
ExcludeTxnFromChangeStreams: options.ExcludeTxnFromChangeStreams,
},
})
if err != nil {
Expand Down
22 changes: 22 additions & 0 deletions spanner/pdml_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,3 +179,25 @@ func TestPartitionedUpdate_Tagging(t *testing.T) {
}
checkRequestsForExpectedRequestOptions(t, server.TestSpanner, 1, sppb.RequestOptions{RequestTag: "pdml-tag"})
}

func TestPartitionedUpdate_ExcludeTxnFromChangeStreams(t *testing.T) {
ctx := context.Background()
server, client, teardown := setupMockedTestServer(t)
defer teardown()

_, err := client.PartitionedUpdateWithOptions(ctx, NewStatement(UpdateBarSetFoo), QueryOptions{ExcludeTxnFromChangeStreams: true})
if err != nil {
t.Fatalf("expect no errors, but got %v", err)
}
requests := drainRequestsFromServer(server.TestSpanner)
if err := compareRequests([]interface{}{
&sppb.BatchCreateSessionsRequest{},
&sppb.BeginTransactionRequest{},
&sppb.ExecuteSqlRequest{}}, requests); err != nil {
t.Fatal(err)
}

if !requests[1].(*sppb.BeginTransactionRequest).GetOptions().GetExcludeTxnFromChangeStreams() {
t.Fatal("Transaction is not set to be excluded from change streams")
}
}

0 comments on commit e0cc5b0

Please sign in to comment.