Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(spanner): set client wide ReadOptions, ApplyOptions, and TransactionOptions #6486

Merged
merged 6 commits into from
Aug 24, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions spanner/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,13 +104,13 @@ func (t *BatchReadOnlyTransaction) PartitionRead(ctx context.Context, table stri
// can be configured using PartitionOptions. Pass a ReadOptions to modify the
// read operation.
func (t *BatchReadOnlyTransaction) PartitionReadWithOptions(ctx context.Context, table string, keys KeySet, columns []string, opt PartitionOptions, readOptions ReadOptions) ([]*Partition, error) {
return t.PartitionReadUsingIndexWithOptions(ctx, table, "", keys, columns, opt, readOptions)
return t.PartitionReadUsingIndexWithOptions(ctx, table, "", keys, columns, opt, t.ReadOnlyTransaction.txReadOnly.ro.merge(readOptions))
}

// PartitionReadUsingIndex returns a list of Partitions that can be used to read
// rows from the database using an index.
func (t *BatchReadOnlyTransaction) PartitionReadUsingIndex(ctx context.Context, table, index string, keys KeySet, columns []string, opt PartitionOptions) ([]*Partition, error) {
return t.PartitionReadUsingIndexWithOptions(ctx, table, index, keys, columns, opt, ReadOptions{})
return t.PartitionReadUsingIndexWithOptions(ctx, table, index, keys, columns, opt, t.ReadOnlyTransaction.txReadOnly.ro)
}

// PartitionReadUsingIndexWithOptions returns a list of Partitions that can be
Expand Down
74 changes: 73 additions & 1 deletion spanner/batch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,9 @@ import (
"testing"
"time"

. "cloud.google.com/go/spanner/internal/testutil"
sppb "google.golang.org/genproto/googleapis/spanner/v1"

. "cloud.google.com/go/spanner/internal/testutil"
)

func TestPartitionRoundTrip(t *testing.T) {
Expand Down Expand Up @@ -120,6 +121,77 @@ func TestPartitionQuery_QueryOptions(t *testing.T) {
}
}

func TestPartitionQuery_ReadOptions(t *testing.T) {
testcases := []ReadOptionsTestCase{
{
name: "Client level",
client: &ReadOptions{Index: "testIndex", Limit: 100, Priority: sppb.RequestOptions_PRIORITY_LOW, RequestTag: "testRequestTag"},
// Index and Limit are always ignored
want: &ReadOptions{Index: "", Limit: 0, Priority: sppb.RequestOptions_PRIORITY_LOW, RequestTag: "testRequestTag"},
},
{
name: "Read level",
client: &ReadOptions{},
read: &ReadOptions{Index: "testIndex", Limit: 100, Priority: sppb.RequestOptions_PRIORITY_LOW, RequestTag: "testRequestTag"},
// Index and Limit are always ignored
want: &ReadOptions{Index: "", Limit: 0, Priority: sppb.RequestOptions_PRIORITY_LOW, RequestTag: "testRequestTag"},
},
{
name: "Read level has precedence than client level",
client: &ReadOptions{Index: "clientIndex", Limit: 10, Priority: sppb.RequestOptions_PRIORITY_LOW, RequestTag: "clientRequestTag"},
read: &ReadOptions{Index: "readIndex", Limit: 20, Priority: sppb.RequestOptions_PRIORITY_MEDIUM, RequestTag: "readRequestTag"},
// Index and Limit are always ignored
want: &ReadOptions{Index: "", Limit: 0, Priority: sppb.RequestOptions_PRIORITY_MEDIUM, RequestTag: "readRequestTag"},
},
}

for _, tt := range testcases {
t.Run(tt.name, func(t *testing.T) {
ctx := context.Background()
_, client, teardown := setupMockedTestServerWithConfig(t, ClientConfig{ReadOptions: *tt.client})
defer teardown()

var (
err error
txn *BatchReadOnlyTransaction
ps []*Partition
)

if txn, err = client.BatchReadOnlyTransaction(ctx, StrongRead()); err != nil {
t.Fatal(err)
}
defer txn.Cleanup(ctx)

if tt.read == nil {
ps, err = txn.PartitionRead(ctx, "Albums", KeySets(Key{"foo"}), []string{"SingerId", "AlbumId", "AlbumTitle"}, PartitionOptions{0, 3})
} else {
ps, err = txn.PartitionReadWithOptions(ctx, "Albums", KeySets(Key{"foo"}), []string{"SingerId", "AlbumId", "AlbumTitle"}, PartitionOptions{0, 3}, *tt.read)
}
if err != nil {
t.Fatal(err)
}

for _, p := range ps {
req := p.rreq
if got, want := req.Index, tt.want.Index; got != want {
t.Fatalf("Incorrect index: got %v, want %v", got, want)
}
if got, want := req.Limit, int64(tt.want.Limit); got != want {
t.Fatalf("Incorrect limit: got %v, want %v", got, want)
}

ro := req.RequestOptions
if got, want := ro.Priority, tt.want.Priority; got != want {
t.Fatalf("Incorrect priority: got %v, want %v", got, want)
}
if got, want := ro.RequestTag, tt.want.RequestTag; got != want {
t.Fatalf("Incorrect request tag: got %v, want %v", got, want)
}
}
})
}
}

func TestPartitionQuery_Parallel(t *testing.T) {
ctx := context.Background()
server, client, teardown := setupMockedTestServer(t)
Expand Down
32 changes: 29 additions & 3 deletions spanner/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,6 @@ import (
"time"

"cloud.google.com/go/internal/trace"
vkit "cloud.google.com/go/spanner/apiv1"
"cloud.google.com/go/spanner/internal"
"google.golang.org/api/option"
"google.golang.org/api/option/internaloption"
gtransport "google.golang.org/api/transport/grpc"
Expand All @@ -35,6 +33,9 @@ import (
"google.golang.org/grpc/codes"
"google.golang.org/grpc/metadata"

vkit "cloud.google.com/go/spanner/apiv1"
"cloud.google.com/go/spanner/internal"

// Install google-c2p resolver, which is required for direct path.
_ "google.golang.org/grpc/xds/googledirectpath"
// Install RLS load balancer policy, which is needed for gRPC RLS.
Expand Down Expand Up @@ -86,6 +87,9 @@ type Client struct {
idleSessions *sessionPool
logger *log.Logger
qo QueryOptions
ro ReadOptions
ao []ApplyOption
txo TransactionOptions
ct *commonTags
}

Expand Down Expand Up @@ -117,6 +121,15 @@ type ClientConfig struct {
// QueryOptions is the configuration for executing a sql query.
QueryOptions QueryOptions

// ReadOptions is the configuration for reading rows from a database
ReadOptions ReadOptions

// ApplyOptions is the configuration for applying
ApplyOptions []ApplyOption

// TransactionOptions is the configuration for a transaction.
TransactionOptions TransactionOptions

// CallOptions is the configuration for providing custom retry settings that
// override the default values.
CallOptions *vkit.CallOptions
Expand Down Expand Up @@ -211,6 +224,9 @@ func NewClientWithConfig(ctx context.Context, database string, config ClientConf
idleSessions: sp,
logger: config.logger,
qo: getQueryOptions(config.QueryOptions),
ro: config.ReadOptions,
ao: config.ApplyOptions,
txo: config.TransactionOptions,
ct: getCommonTags(sc),
}
return c, nil
Expand Down Expand Up @@ -273,6 +289,7 @@ func (c *Client) Single() *ReadOnlyTransaction {
t.txReadOnly.sp = c.idleSessions
t.txReadOnly.txReadEnv = t
t.txReadOnly.qo = c.qo
t.txReadOnly.ro = c.ro
t.txReadOnly.replaceSessionFunc = func(ctx context.Context) error {
if t.sh == nil {
return spannerErrorf(codes.InvalidArgument, "missing session handle on transaction")
Expand Down Expand Up @@ -309,6 +326,7 @@ func (c *Client) ReadOnlyTransaction() *ReadOnlyTransaction {
t.txReadOnly.sp = c.idleSessions
t.txReadOnly.txReadEnv = t
t.txReadOnly.qo = c.qo
t.txReadOnly.ro = c.ro
t.ct = c.ct
return t
}
Expand Down Expand Up @@ -378,6 +396,7 @@ func (c *Client) BatchReadOnlyTransaction(ctx context.Context, tb TimestampBound
t.txReadOnly.sh = sh
t.txReadOnly.txReadEnv = t
t.txReadOnly.qo = c.qo
t.txReadOnly.ro = c.ro
t.ct = c.ct
return t, nil
}
Expand Down Expand Up @@ -406,6 +425,7 @@ func (c *Client) BatchReadOnlyTransactionFromID(tid BatchReadOnlyTransactionID)
t.txReadOnly.sh = sh
t.txReadOnly.txReadEnv = t
t.txReadOnly.qo = c.qo
t.txReadOnly.ro = c.ro
t.ct = c.ct
return t
}
Expand Down Expand Up @@ -491,7 +511,8 @@ func (c *Client) rwTransaction(ctx context.Context, f func(context.Context, *Rea
t.txReadOnly.sh = sh
t.txReadOnly.txReadEnv = t
t.txReadOnly.qo = c.qo
t.txOpts = options
t.txReadOnly.ro = c.ro
t.txOpts = c.txo.merge(options)
t.ct = c.ct

trace.TracePrintf(ctx, map[string]interface{}{"transactionID": string(sh.getTransactionID())},
Expand Down Expand Up @@ -555,6 +576,11 @@ func Priority(priority sppb.RequestOptions_Priority) ApplyOption {
// Apply applies a list of mutations atomically to the database.
func (c *Client) Apply(ctx context.Context, ms []*Mutation, opts ...ApplyOption) (commitTimestamp time.Time, err error) {
ao := &applyOption{}

for _, opt := range c.ao {
opt(ao)
}

for _, opt := range opts {
opt(ao)
}
Expand Down
Loading