From f2ae2ad1cbb2ae6a5901249476bf6334c7089404 Mon Sep 17 00:00:00 2001 From: shuheiktgw Date: Tue, 9 Aug 2022 08:58:55 +0900 Subject: [PATCH] feat(spanner): set client wide ReadOptions and TransactionOptions --- spanner/batch.go | 4 +-- spanner/client.go | 32 +++++++++++++++++-- spanner/transaction.go | 72 ++++++++++++++++++++++++++++++++++-------- 3 files changed, 90 insertions(+), 18 deletions(-) diff --git a/spanner/batch.go b/spanner/batch.go index 6c999da23740..74088a8dbc75 100644 --- a/spanner/batch.go +++ b/spanner/batch.go @@ -104,13 +104,13 @@ func (t *BatchReadOnlyTransaction) PartitionRead(ctx context.Context, table stri // can be configured using PartitionOptions. Pass a ReadOptions to modify the // read operation. func (t *BatchReadOnlyTransaction) PartitionReadWithOptions(ctx context.Context, table string, keys KeySet, columns []string, opt PartitionOptions, readOptions ReadOptions) ([]*Partition, error) { - return t.PartitionReadUsingIndexWithOptions(ctx, table, "", keys, columns, opt, readOptions) + return t.PartitionReadUsingIndexWithOptions(ctx, table, "", keys, columns, opt, t.ReadOnlyTransaction.txReadOnly.ro.merge(readOptions)) } // PartitionReadUsingIndex returns a list of Partitions that can be used to read // rows from the database using an index. func (t *BatchReadOnlyTransaction) PartitionReadUsingIndex(ctx context.Context, table, index string, keys KeySet, columns []string, opt PartitionOptions) ([]*Partition, error) { - return t.PartitionReadUsingIndexWithOptions(ctx, table, index, keys, columns, opt, ReadOptions{}) + return t.PartitionReadUsingIndexWithOptions(ctx, table, index, keys, columns, opt, t.ReadOnlyTransaction.txReadOnly.ro) } // PartitionReadUsingIndexWithOptions returns a list of Partitions that can be diff --git a/spanner/client.go b/spanner/client.go index fbca1421cf15..7d74a0c1edda 100644 --- a/spanner/client.go +++ b/spanner/client.go @@ -25,8 +25,6 @@ import ( "time" "cloud.google.com/go/internal/trace" - vkit "cloud.google.com/go/spanner/apiv1" - "cloud.google.com/go/spanner/internal" "google.golang.org/api/option" "google.golang.org/api/option/internaloption" gtransport "google.golang.org/api/transport/grpc" @@ -34,6 +32,9 @@ import ( "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/metadata" + + vkit "cloud.google.com/go/spanner/apiv1" + "cloud.google.com/go/spanner/internal" ) const ( @@ -81,6 +82,9 @@ type Client struct { idleSessions *sessionPool logger *log.Logger qo QueryOptions + ro ReadOptions + ao []ApplyOption + txo TransactionOptions ct *commonTags } @@ -112,6 +116,15 @@ type ClientConfig struct { // QueryOptions is the configuration for executing a sql query. QueryOptions QueryOptions + // ReadOptions is the configuration for reading rows from a database + ReadOptions ReadOptions + + // ApplyOptions is the configuration for applying + ApplyOptions []ApplyOption + + // TransactionOptions is the configuration for a transaction. + TransactionOptions TransactionOptions + // CallOptions is the configuration for providing custom retry settings that // override the default values. CallOptions *vkit.CallOptions @@ -206,6 +219,9 @@ func NewClientWithConfig(ctx context.Context, database string, config ClientConf idleSessions: sp, logger: config.logger, qo: getQueryOptions(config.QueryOptions), + ro: config.ReadOptions, + ao: config.ApplyOptions, + txo: config.TransactionOptions, ct: getCommonTags(sc), } return c, nil @@ -268,6 +284,7 @@ func (c *Client) Single() *ReadOnlyTransaction { t.txReadOnly.sp = c.idleSessions t.txReadOnly.txReadEnv = t t.txReadOnly.qo = c.qo + t.txReadOnly.ro = c.ro t.txReadOnly.replaceSessionFunc = func(ctx context.Context) error { if t.sh == nil { return spannerErrorf(codes.InvalidArgument, "missing session handle on transaction") @@ -304,6 +321,7 @@ func (c *Client) ReadOnlyTransaction() *ReadOnlyTransaction { t.txReadOnly.sp = c.idleSessions t.txReadOnly.txReadEnv = t t.txReadOnly.qo = c.qo + t.txReadOnly.ro = c.ro t.ct = c.ct return t } @@ -373,6 +391,7 @@ func (c *Client) BatchReadOnlyTransaction(ctx context.Context, tb TimestampBound t.txReadOnly.sh = sh t.txReadOnly.txReadEnv = t t.txReadOnly.qo = c.qo + t.txReadOnly.ro = c.ro t.ct = c.ct return t, nil } @@ -401,6 +420,7 @@ func (c *Client) BatchReadOnlyTransactionFromID(tid BatchReadOnlyTransactionID) t.txReadOnly.sh = sh t.txReadOnly.txReadEnv = t t.txReadOnly.qo = c.qo + t.txReadOnly.ro = c.ro t.ct = c.ct return t } @@ -486,7 +506,8 @@ func (c *Client) rwTransaction(ctx context.Context, f func(context.Context, *Rea t.txReadOnly.sh = sh t.txReadOnly.txReadEnv = t t.txReadOnly.qo = c.qo - t.txOpts = options + t.txReadOnly.ro = c.ro + t.txOpts = c.txo.merge(options) t.ct = c.ct trace.TracePrintf(ctx, map[string]interface{}{"transactionID": string(sh.getTransactionID())}, @@ -550,6 +571,11 @@ func Priority(priority sppb.RequestOptions_Priority) ApplyOption { // Apply applies a list of mutations atomically to the database. func (c *Client) Apply(ctx context.Context, ms []*Mutation, opts ...ApplyOption) (commitTimestamp time.Time, err error) { ao := &applyOption{} + + for _, opt := range c.ao { + opt(ao) + } + for _, opt := range opts { opt(ao) } diff --git a/spanner/transaction.go b/spanner/transaction.go index 4a467bba951c..2d58c4cfdd05 100644 --- a/spanner/transaction.go +++ b/spanner/transaction.go @@ -23,7 +23,6 @@ import ( "time" "cloud.google.com/go/internal/trace" - vkit "cloud.google.com/go/spanner/apiv1" "github.com/golang/protobuf/proto" "github.com/googleapis/gax-go/v2" "google.golang.org/api/iterator" @@ -32,6 +31,8 @@ import ( "google.golang.org/grpc/codes" "google.golang.org/grpc/metadata" "google.golang.org/grpc/status" + + vkit "cloud.google.com/go/spanner/apiv1" ) // transactionID stores a transaction ID which uniquely identifies a transaction @@ -76,6 +77,9 @@ type txReadOnly struct { // qo provides options for executing a sql query. qo QueryOptions + // ro provides options for reading rows from a database. + ro ReadOptions + // txOpts provides options for a transaction. txOpts TransactionOptions @@ -97,12 +101,21 @@ type TransactionOptions struct { CommitPriority sppb.RequestOptions_Priority } -func (to *TransactionOptions) requestPriority() sppb.RequestOptions_Priority { - return to.CommitPriority -} - -func (to *TransactionOptions) requestTag() string { - return "" +// merge combines two TransactionOptions that the input parameter will have higher +// order of precedence. +func (to TransactionOptions) merge(opts TransactionOptions) TransactionOptions { + merged := TransactionOptions{ + CommitOptions: to.CommitOptions.merge(opts.CommitOptions), + TransactionTag: to.TransactionTag, + CommitPriority: to.CommitPriority, + } + if opts.TransactionTag != "" { + merged.TransactionTag = opts.TransactionTag + } + if opts.CommitPriority != sppb.RequestOptions_PRIORITY_UNSPECIFIED { + merged.CommitPriority = opts.CommitPriority + } + return merged } // errSessionClosed returns error for using a recycled/destroyed session @@ -139,6 +152,30 @@ type ReadOptions struct { RequestTag string } +// merge combines two ReadOptions that the input parameter will have higher +// order of precedence. +func (ro ReadOptions) merge(opts ReadOptions) ReadOptions { + merged := ReadOptions{ + Index: ro.Index, + Limit: ro.Limit, + Priority: ro.Priority, + RequestTag: ro.RequestTag, + } + if opts.Index != "" { + merged.Index = opts.Index + } + if opts.Limit > 0 { + merged.Limit = opts.Limit + } + if opts.Priority != sppb.RequestOptions_PRIORITY_UNSPECIFIED { + merged.Priority = opts.Priority + } + if opts.RequestTag != "" { + merged.RequestTag = opts.RequestTag + } + return merged +} + // ReadWithOptions returns a RowIterator for reading multiple rows from the // database. Pass a ReadOptions to modify the read operation. func (t *txReadOnly) ReadWithOptions(ctx context.Context, table string, keys KeySet, columns []string, opts *ReadOptions) (ri *RowIterator) { @@ -162,10 +199,10 @@ func (t *txReadOnly) ReadWithOptions(ctx context.Context, table string, keys Key // Might happen if transaction is closed in the middle of a API call. return &RowIterator{err: errSessionClosed(sh)} } - index := "" - limit := 0 - prio := sppb.RequestOptions_PRIORITY_UNSPECIFIED - requestTag := "" + index := t.ro.Index + limit := t.ro.Limit + prio := t.ro.Priority + requestTag := t.ro.RequestTag if opts != nil { index = opts.Index if opts.Limit > 0 { @@ -1106,11 +1143,19 @@ type CommitResponse struct { CommitStats *sppb.CommitResponse_CommitStats } -// CommitOptions provides options for commiting a transaction in a database. +// CommitOptions provides options for committing a transaction in a database. type CommitOptions struct { ReturnCommitStats bool } +// merge combines two CommitOptions that the input parameter will have higher +// order of precedence. +func (co CommitOptions) merge(opts CommitOptions) CommitOptions { + return CommitOptions{ + ReturnCommitStats: co.ReturnCommitStats || opts.ReturnCommitStats, + } +} + // commit tries to commit a readwrite transaction to Cloud Spanner. It also // returns the commit response for the transactions. func (t *ReadWriteTransaction) commit(ctx context.Context, options CommitOptions) (CommitResponse, error) { @@ -1275,7 +1320,8 @@ func NewReadWriteStmtBasedTransactionWithOptions(ctx context.Context, c *Client, t.txReadOnly.sh = sh t.txReadOnly.txReadEnv = t t.txReadOnly.qo = c.qo - t.txOpts = options + t.txReadOnly.ro = c.ro + t.txOpts = c.txo.merge(options) t.ct = c.ct if err = t.begin(ctx); err != nil {