Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(spanner): add databoost property for batch transactions #8152

Merged
merged 4 commits into from Jun 20, 2023
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
68 changes: 36 additions & 32 deletions spanner/batch.go
Expand Up @@ -150,13 +150,14 @@ func (t *BatchReadOnlyTransaction) PartitionReadUsingIndexWithOptions(ctx contex
}
// Prepare ReadRequest.
req := &sppb.ReadRequest{
Session: sid,
Transaction: ts,
Table: table,
Index: index,
Columns: columns,
KeySet: kset,
RequestOptions: createRequestOptions(readOptions.Priority, readOptions.RequestTag, ""),
Session: sid,
Transaction: ts,
Table: table,
Index: index,
Columns: columns,
KeySet: kset,
RequestOptions: createRequestOptions(readOptions.Priority, readOptions.RequestTag, ""),
DataBoostEnabled: readOptions.DataBoostEnabled,
harshachinta marked this conversation as resolved.
Show resolved Hide resolved
}
// Generate partitions.
for _, p := range resp.GetPartitions() {
Expand Down Expand Up @@ -212,13 +213,14 @@ func (t *BatchReadOnlyTransaction) partitionQuery(ctx context.Context, statement

// prepare ExecuteSqlRequest
r := &sppb.ExecuteSqlRequest{
Session: sid,
Transaction: ts,
Sql: statement.SQL,
Params: params,
ParamTypes: paramTypes,
QueryOptions: qOpts.Options,
RequestOptions: createRequestOptions(qOpts.Priority, qOpts.RequestTag, ""),
Session: sid,
Transaction: ts,
Sql: statement.SQL,
Params: params,
ParamTypes: paramTypes,
QueryOptions: qOpts.Options,
RequestOptions: createRequestOptions(qOpts.Priority, qOpts.RequestTag, ""),
DataBoostEnabled: qOpts.DataBoostEnabled,
harshachinta marked this conversation as resolved.
Show resolved Hide resolved
}

// generate Partitions
Expand Down Expand Up @@ -308,15 +310,16 @@ func (t *BatchReadOnlyTransaction) Execute(ctx context.Context, p *Partition) *R
if p.rreq != nil {
rpc = func(ctx context.Context, resumeToken []byte) (streamingReceiver, error) {
client, err := client.StreamingRead(ctx, &sppb.ReadRequest{
Session: p.rreq.Session,
Transaction: p.rreq.Transaction,
Table: p.rreq.Table,
Index: p.rreq.Index,
Columns: p.rreq.Columns,
KeySet: p.rreq.KeySet,
PartitionToken: p.pt,
RequestOptions: p.rreq.RequestOptions,
ResumeToken: resumeToken,
Session: p.rreq.Session,
Transaction: p.rreq.Transaction,
Table: p.rreq.Table,
Index: p.rreq.Index,
Columns: p.rreq.Columns,
KeySet: p.rreq.KeySet,
PartitionToken: p.pt,
RequestOptions: p.rreq.RequestOptions,
ResumeToken: resumeToken,
DataBoostEnabled: p.rreq.DataBoostEnabled,
})
if err != nil {
return client, err
Expand All @@ -332,15 +335,16 @@ func (t *BatchReadOnlyTransaction) Execute(ctx context.Context, p *Partition) *R
} else {
rpc = func(ctx context.Context, resumeToken []byte) (streamingReceiver, error) {
client, err := client.ExecuteStreamingSql(ctx, &sppb.ExecuteSqlRequest{
Session: p.qreq.Session,
Transaction: p.qreq.Transaction,
Sql: p.qreq.Sql,
Params: p.qreq.Params,
ParamTypes: p.qreq.ParamTypes,
QueryOptions: p.qreq.QueryOptions,
PartitionToken: p.pt,
RequestOptions: p.qreq.RequestOptions,
ResumeToken: resumeToken,
Session: p.qreq.Session,
Transaction: p.qreq.Transaction,
Sql: p.qreq.Sql,
Params: p.qreq.Params,
ParamTypes: p.qreq.ParamTypes,
QueryOptions: p.qreq.QueryOptions,
PartitionToken: p.pt,
RequestOptions: p.qreq.RequestOptions,
ResumeToken: resumeToken,
DataBoostEnabled: p.qreq.DataBoostEnabled,
})
if err != nil {
return client, err
Expand Down
4 changes: 2 additions & 2 deletions spanner/integration_test.go
Expand Up @@ -3141,7 +3141,7 @@ func TestIntegration_BatchQuery(t *testing.T) {
t.Fatal(err)
}
defer txn.Cleanup(ctx)
if partitions, err = txn.PartitionQuery(ctx, stmt, PartitionOptions{0, 3}); err != nil {
if partitions, err = txn.PartitionQueryWithOptions(ctx, stmt, PartitionOptions{0, 3}, QueryOptions{DataBoostEnabled: false}); err != nil {
t.Fatal(err)
}

Expand Down Expand Up @@ -3224,7 +3224,7 @@ func TestIntegration_BatchRead(t *testing.T) {
t.Fatal(err)
}
defer txn.Cleanup(ctx)
if partitions, err = txn.PartitionRead(ctx, "test", AllKeys(), simpleDBTableColumns, PartitionOptions{0, 3}); err != nil {
if partitions, err = txn.PartitionReadWithOptions(ctx, "test", AllKeys(), simpleDBTableColumns, PartitionOptions{0, 3}, ReadOptions{DataBoostEnabled: false}); err != nil {
t.Fatal(err)
}

Expand Down
72 changes: 46 additions & 26 deletions spanner/transaction.go
Expand Up @@ -165,16 +165,20 @@ type ReadOptions struct {

// The request tag to use for this request.
RequestTag string

// If this is for a partitioned read and DataBoostEnabled field is set to true, the request will be executed via Spanner independent compute resources.
harshachinta marked this conversation as resolved.
Show resolved Hide resolved
DataBoostEnabled bool
}

// merge combines two ReadOptions that the input parameter will have higher
// order of precedence.
func (ro ReadOptions) merge(opts ReadOptions) ReadOptions {
merged := ReadOptions{
Index: ro.Index,
Limit: ro.Limit,
Priority: ro.Priority,
RequestTag: ro.RequestTag,
Index: ro.Index,
Limit: ro.Limit,
Priority: ro.Priority,
RequestTag: ro.RequestTag,
DataBoostEnabled: ro.DataBoostEnabled,
}
if opts.Index != "" {
merged.Index = opts.Index
Expand All @@ -188,6 +192,9 @@ func (ro ReadOptions) merge(opts ReadOptions) ReadOptions {
if opts.RequestTag != "" {
merged.RequestTag = opts.RequestTag
}
if opts.DataBoostEnabled {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmmm.... This means that you cannot override a false value in ro. I guess that is OK, as we don't allow 'unsetting' any of the other values through this method either. (No action needed, just thinking out loud....)

merged.DataBoostEnabled = opts.DataBoostEnabled
}
return merged
}

Expand Down Expand Up @@ -218,13 +225,17 @@ func (t *txReadOnly) ReadWithOptions(ctx context.Context, table string, keys Key
limit := t.ro.Limit
prio := t.ro.Priority
requestTag := t.ro.RequestTag
dataBoostEnabled := t.ro.DataBoostEnabled
if opts != nil {
index = opts.Index
if opts.Limit > 0 {
limit = opts.Limit
}
prio = opts.Priority
requestTag = opts.RequestTag
if opts.DataBoostEnabled {
dataBoostEnabled = opts.DataBoostEnabled
}
}
var setTransactionID func(transactionID)
if _, ok := ts.Selector.(*sppb.TransactionSelector_Begin); ok {
Expand All @@ -238,15 +249,16 @@ func (t *txReadOnly) ReadWithOptions(ctx context.Context, table string, keys Key
func(ctx context.Context, resumeToken []byte) (streamingReceiver, error) {
client, err := client.StreamingRead(ctx,
&sppb.ReadRequest{
Session: t.sh.getID(),
Transaction: t.getTransactionSelector(),
Table: table,
Index: index,
Columns: columns,
KeySet: kset,
ResumeToken: resumeToken,
Limit: int64(limit),
RequestOptions: createRequestOptions(prio, requestTag, t.txOpts.TransactionTag),
Session: t.sh.getID(),
Transaction: t.getTransactionSelector(),
Table: table,
Index: index,
Columns: columns,
KeySet: kset,
ResumeToken: resumeToken,
Limit: int64(limit),
RequestOptions: createRequestOptions(prio, requestTag, t.txOpts.TransactionTag),
DataBoostEnabled: dataBoostEnabled,
})
if err != nil {
if _, ok := t.getTransactionSelector().GetSelector().(*sppb.TransactionSelector_Begin); ok {
Expand Down Expand Up @@ -357,16 +369,20 @@ type QueryOptions struct {

// The request tag to use for this request.
RequestTag string

// If this is for a partitioned query and DataBoostEnabled field is set to true, the request will be executed via Spanner independent compute resources.
harshachinta marked this conversation as resolved.
Show resolved Hide resolved
DataBoostEnabled bool
}

// merge combines two QueryOptions that the input parameter will have higher
// order of precedence.
func (qo QueryOptions) merge(opts QueryOptions) QueryOptions {
merged := QueryOptions{
Mode: qo.Mode,
Options: &sppb.ExecuteSqlRequest_QueryOptions{},
RequestTag: qo.RequestTag,
Priority: qo.Priority,
Mode: qo.Mode,
Options: &sppb.ExecuteSqlRequest_QueryOptions{},
RequestTag: qo.RequestTag,
Priority: qo.Priority,
DataBoostEnabled: qo.DataBoostEnabled,
}
if opts.Mode != nil {
merged.Mode = opts.Mode
Expand All @@ -377,6 +393,9 @@ func (qo QueryOptions) merge(opts QueryOptions) QueryOptions {
if opts.Priority != sppb.RequestOptions_PRIORITY_UNSPECIFIED {
merged.Priority = opts.Priority
}
if opts.DataBoostEnabled {
merged.DataBoostEnabled = opts.DataBoostEnabled
}
proto.Merge(merged.Options, qo.Options)
proto.Merge(merged.Options, opts.Options)
return merged
Expand Down Expand Up @@ -517,15 +536,16 @@ func (t *txReadOnly) prepareExecuteSQL(ctx context.Context, stmt Statement, opti
mode = *options.Mode
}
req := &sppb.ExecuteSqlRequest{
Session: sid,
Transaction: ts,
Sql: stmt.SQL,
QueryMode: mode,
Seqno: atomic.AddInt64(&t.sequenceNumber, 1),
Params: params,
ParamTypes: paramTypes,
QueryOptions: options.Options,
RequestOptions: createRequestOptions(options.Priority, options.RequestTag, t.txOpts.TransactionTag),
Session: sid,
Transaction: ts,
Sql: stmt.SQL,
QueryMode: mode,
Seqno: atomic.AddInt64(&t.sequenceNumber, 1),
Params: params,
ParamTypes: paramTypes,
QueryOptions: options.Options,
RequestOptions: createRequestOptions(options.Priority, options.RequestTag, t.txOpts.TransactionTag),
DataBoostEnabled: options.DataBoostEnabled,
}
return req, sh, nil
}
Expand Down