Skip to content

Commit

Permalink
feat(spanner): set client wide ReadOptions and TransactionOptions
Browse files Browse the repository at this point in the history
  • Loading branch information
shuheiktgw committed Aug 8, 2022
1 parent 83d8e8d commit f2ae2ad
Show file tree
Hide file tree
Showing 3 changed files with 90 additions and 18 deletions.
4 changes: 2 additions & 2 deletions spanner/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,13 +104,13 @@ func (t *BatchReadOnlyTransaction) PartitionRead(ctx context.Context, table stri
// can be configured using PartitionOptions. Pass a ReadOptions to modify the
// read operation.
func (t *BatchReadOnlyTransaction) PartitionReadWithOptions(ctx context.Context, table string, keys KeySet, columns []string, opt PartitionOptions, readOptions ReadOptions) ([]*Partition, error) {
return t.PartitionReadUsingIndexWithOptions(ctx, table, "", keys, columns, opt, readOptions)
return t.PartitionReadUsingIndexWithOptions(ctx, table, "", keys, columns, opt, t.ReadOnlyTransaction.txReadOnly.ro.merge(readOptions))
}

// PartitionReadUsingIndex returns a list of Partitions that can be used to read
// rows from the database using an index.
func (t *BatchReadOnlyTransaction) PartitionReadUsingIndex(ctx context.Context, table, index string, keys KeySet, columns []string, opt PartitionOptions) ([]*Partition, error) {
return t.PartitionReadUsingIndexWithOptions(ctx, table, index, keys, columns, opt, ReadOptions{})
return t.PartitionReadUsingIndexWithOptions(ctx, table, index, keys, columns, opt, t.ReadOnlyTransaction.txReadOnly.ro)
}

// PartitionReadUsingIndexWithOptions returns a list of Partitions that can be
Expand Down
32 changes: 29 additions & 3 deletions spanner/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,16 @@ import (
"time"

"cloud.google.com/go/internal/trace"
vkit "cloud.google.com/go/spanner/apiv1"
"cloud.google.com/go/spanner/internal"
"google.golang.org/api/option"
"google.golang.org/api/option/internaloption"
gtransport "google.golang.org/api/transport/grpc"
sppb "google.golang.org/genproto/googleapis/spanner/v1"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/metadata"

vkit "cloud.google.com/go/spanner/apiv1"
"cloud.google.com/go/spanner/internal"
)

const (
Expand Down Expand Up @@ -81,6 +82,9 @@ type Client struct {
idleSessions *sessionPool
logger *log.Logger
qo QueryOptions
ro ReadOptions
ao []ApplyOption
txo TransactionOptions
ct *commonTags
}

Expand Down Expand Up @@ -112,6 +116,15 @@ type ClientConfig struct {
// QueryOptions is the configuration for executing a sql query.
QueryOptions QueryOptions

// ReadOptions is the configuration for reading rows from a database
ReadOptions ReadOptions

// ApplyOptions is the configuration for applying
ApplyOptions []ApplyOption

// TransactionOptions is the configuration for a transaction.
TransactionOptions TransactionOptions

// CallOptions is the configuration for providing custom retry settings that
// override the default values.
CallOptions *vkit.CallOptions
Expand Down Expand Up @@ -206,6 +219,9 @@ func NewClientWithConfig(ctx context.Context, database string, config ClientConf
idleSessions: sp,
logger: config.logger,
qo: getQueryOptions(config.QueryOptions),
ro: config.ReadOptions,
ao: config.ApplyOptions,
txo: config.TransactionOptions,
ct: getCommonTags(sc),
}
return c, nil
Expand Down Expand Up @@ -268,6 +284,7 @@ func (c *Client) Single() *ReadOnlyTransaction {
t.txReadOnly.sp = c.idleSessions
t.txReadOnly.txReadEnv = t
t.txReadOnly.qo = c.qo
t.txReadOnly.ro = c.ro
t.txReadOnly.replaceSessionFunc = func(ctx context.Context) error {
if t.sh == nil {
return spannerErrorf(codes.InvalidArgument, "missing session handle on transaction")
Expand Down Expand Up @@ -304,6 +321,7 @@ func (c *Client) ReadOnlyTransaction() *ReadOnlyTransaction {
t.txReadOnly.sp = c.idleSessions
t.txReadOnly.txReadEnv = t
t.txReadOnly.qo = c.qo
t.txReadOnly.ro = c.ro
t.ct = c.ct
return t
}
Expand Down Expand Up @@ -373,6 +391,7 @@ func (c *Client) BatchReadOnlyTransaction(ctx context.Context, tb TimestampBound
t.txReadOnly.sh = sh
t.txReadOnly.txReadEnv = t
t.txReadOnly.qo = c.qo
t.txReadOnly.ro = c.ro
t.ct = c.ct
return t, nil
}
Expand Down Expand Up @@ -401,6 +420,7 @@ func (c *Client) BatchReadOnlyTransactionFromID(tid BatchReadOnlyTransactionID)
t.txReadOnly.sh = sh
t.txReadOnly.txReadEnv = t
t.txReadOnly.qo = c.qo
t.txReadOnly.ro = c.ro
t.ct = c.ct
return t
}
Expand Down Expand Up @@ -486,7 +506,8 @@ func (c *Client) rwTransaction(ctx context.Context, f func(context.Context, *Rea
t.txReadOnly.sh = sh
t.txReadOnly.txReadEnv = t
t.txReadOnly.qo = c.qo
t.txOpts = options
t.txReadOnly.ro = c.ro
t.txOpts = c.txo.merge(options)
t.ct = c.ct

trace.TracePrintf(ctx, map[string]interface{}{"transactionID": string(sh.getTransactionID())},
Expand Down Expand Up @@ -550,6 +571,11 @@ func Priority(priority sppb.RequestOptions_Priority) ApplyOption {
// Apply applies a list of mutations atomically to the database.
func (c *Client) Apply(ctx context.Context, ms []*Mutation, opts ...ApplyOption) (commitTimestamp time.Time, err error) {
ao := &applyOption{}

for _, opt := range c.ao {
opt(ao)
}

for _, opt := range opts {
opt(ao)
}
Expand Down
72 changes: 59 additions & 13 deletions spanner/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"time"

"cloud.google.com/go/internal/trace"
vkit "cloud.google.com/go/spanner/apiv1"
"github.com/golang/protobuf/proto"
"github.com/googleapis/gax-go/v2"
"google.golang.org/api/iterator"
Expand All @@ -32,6 +31,8 @@ import (
"google.golang.org/grpc/codes"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"

vkit "cloud.google.com/go/spanner/apiv1"
)

// transactionID stores a transaction ID which uniquely identifies a transaction
Expand Down Expand Up @@ -76,6 +77,9 @@ type txReadOnly struct {
// qo provides options for executing a sql query.
qo QueryOptions

// ro provides options for reading rows from a database.
ro ReadOptions

// txOpts provides options for a transaction.
txOpts TransactionOptions

Expand All @@ -97,12 +101,21 @@ type TransactionOptions struct {
CommitPriority sppb.RequestOptions_Priority
}

func (to *TransactionOptions) requestPriority() sppb.RequestOptions_Priority {
return to.CommitPriority
}

func (to *TransactionOptions) requestTag() string {
return ""
// merge combines two TransactionOptions that the input parameter will have higher
// order of precedence.
func (to TransactionOptions) merge(opts TransactionOptions) TransactionOptions {
merged := TransactionOptions{
CommitOptions: to.CommitOptions.merge(opts.CommitOptions),
TransactionTag: to.TransactionTag,
CommitPriority: to.CommitPriority,
}
if opts.TransactionTag != "" {
merged.TransactionTag = opts.TransactionTag
}
if opts.CommitPriority != sppb.RequestOptions_PRIORITY_UNSPECIFIED {
merged.CommitPriority = opts.CommitPriority
}
return merged
}

// errSessionClosed returns error for using a recycled/destroyed session
Expand Down Expand Up @@ -139,6 +152,30 @@ type ReadOptions struct {
RequestTag string
}

// merge combines two ReadOptions that the input parameter will have higher
// order of precedence.
func (ro ReadOptions) merge(opts ReadOptions) ReadOptions {
merged := ReadOptions{
Index: ro.Index,
Limit: ro.Limit,
Priority: ro.Priority,
RequestTag: ro.RequestTag,
}
if opts.Index != "" {
merged.Index = opts.Index
}
if opts.Limit > 0 {
merged.Limit = opts.Limit
}
if opts.Priority != sppb.RequestOptions_PRIORITY_UNSPECIFIED {
merged.Priority = opts.Priority
}
if opts.RequestTag != "" {
merged.RequestTag = opts.RequestTag
}
return merged
}

// ReadWithOptions returns a RowIterator for reading multiple rows from the
// database. Pass a ReadOptions to modify the read operation.
func (t *txReadOnly) ReadWithOptions(ctx context.Context, table string, keys KeySet, columns []string, opts *ReadOptions) (ri *RowIterator) {
Expand All @@ -162,10 +199,10 @@ func (t *txReadOnly) ReadWithOptions(ctx context.Context, table string, keys Key
// Might happen if transaction is closed in the middle of a API call.
return &RowIterator{err: errSessionClosed(sh)}
}
index := ""
limit := 0
prio := sppb.RequestOptions_PRIORITY_UNSPECIFIED
requestTag := ""
index := t.ro.Index
limit := t.ro.Limit
prio := t.ro.Priority
requestTag := t.ro.RequestTag
if opts != nil {
index = opts.Index
if opts.Limit > 0 {
Expand Down Expand Up @@ -1106,11 +1143,19 @@ type CommitResponse struct {
CommitStats *sppb.CommitResponse_CommitStats
}

// CommitOptions provides options for commiting a transaction in a database.
// CommitOptions provides options for committing a transaction in a database.
type CommitOptions struct {
ReturnCommitStats bool
}

// merge combines two CommitOptions that the input parameter will have higher
// order of precedence.
func (co CommitOptions) merge(opts CommitOptions) CommitOptions {
return CommitOptions{
ReturnCommitStats: co.ReturnCommitStats || opts.ReturnCommitStats,
}
}

// commit tries to commit a readwrite transaction to Cloud Spanner. It also
// returns the commit response for the transactions.
func (t *ReadWriteTransaction) commit(ctx context.Context, options CommitOptions) (CommitResponse, error) {
Expand Down Expand Up @@ -1275,7 +1320,8 @@ func NewReadWriteStmtBasedTransactionWithOptions(ctx context.Context, c *Client,
t.txReadOnly.sh = sh
t.txReadOnly.txReadEnv = t
t.txReadOnly.qo = c.qo
t.txOpts = options
t.txReadOnly.ro = c.ro
t.txOpts = c.txo.merge(options)
t.ct = c.ct

if err = t.begin(ctx); err != nil {
Expand Down

0 comments on commit f2ae2ad

Please sign in to comment.