Skip to content

Commit

Permalink
feat(spanner): support max_commit_delay in Spanner transactions (#9299)
Browse files Browse the repository at this point in the history
* Expose MaxCommitDelay.

* Fix build errors

* Add unit tests.

* Formatting

* Update integration test

* Change MaxCommitDelay to a pointer, update merge logic.

---------

Co-authored-by: rahul2393 <irahul@google.com>
  • Loading branch information
nginsberg-google and rahul2393 committed Feb 8, 2024
1 parent 46a5050 commit a8078f0
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 2 deletions.
26 changes: 26 additions & 0 deletions spanner/client_test.go
Expand Up @@ -4120,24 +4120,50 @@ type TransactionOptionsTestCase struct {
}

func transactionOptionsTestCases() []TransactionOptionsTestCase {
duration, _ := time.ParseDuration("100ms")
otherDuration, _ := time.ParseDuration("50ms")

return []TransactionOptionsTestCase{
{
name: "Client level",
client: &TransactionOptions{CommitOptions: CommitOptions{ReturnCommitStats: true}, TransactionTag: "testTransactionTag", CommitPriority: sppb.RequestOptions_PRIORITY_LOW},
want: &TransactionOptions{CommitOptions: CommitOptions{ReturnCommitStats: true}, TransactionTag: "testTransactionTag", CommitPriority: sppb.RequestOptions_PRIORITY_LOW},
},
{
name: "Client level with MaxCommitDelay",
client: &TransactionOptions{CommitOptions: CommitOptions{ReturnCommitStats: true, MaxCommitDelay: &duration}, TransactionTag: "testTransactionTag", CommitPriority: sppb.RequestOptions_PRIORITY_LOW},
want: &TransactionOptions{CommitOptions: CommitOptions{ReturnCommitStats: true, MaxCommitDelay: &duration}, TransactionTag: "testTransactionTag", CommitPriority: sppb.RequestOptions_PRIORITY_LOW},
},
{
name: "Write level",
client: &TransactionOptions{},
write: &TransactionOptions{CommitOptions: CommitOptions{ReturnCommitStats: true}, TransactionTag: "testTransactionTag", CommitPriority: sppb.RequestOptions_PRIORITY_LOW},
want: &TransactionOptions{CommitOptions: CommitOptions{ReturnCommitStats: true}, TransactionTag: "testTransactionTag", CommitPriority: sppb.RequestOptions_PRIORITY_LOW},
},
{
name: "Write level with MaxCommitDelay",
client: &TransactionOptions{},
write: &TransactionOptions{CommitOptions: CommitOptions{ReturnCommitStats: true, MaxCommitDelay: &duration}, TransactionTag: "testTransactionTag", CommitPriority: sppb.RequestOptions_PRIORITY_LOW},
want: &TransactionOptions{CommitOptions: CommitOptions{ReturnCommitStats: true, MaxCommitDelay: &duration}, TransactionTag: "testTransactionTag", CommitPriority: sppb.RequestOptions_PRIORITY_LOW},
},
{
name: "Write level has precedence than client level",
client: &TransactionOptions{CommitOptions: CommitOptions{ReturnCommitStats: false}, TransactionTag: "clientTransactionTag", CommitPriority: sppb.RequestOptions_PRIORITY_LOW},
write: &TransactionOptions{CommitOptions: CommitOptions{ReturnCommitStats: true}, TransactionTag: "writeTransactionTag", CommitPriority: sppb.RequestOptions_PRIORITY_MEDIUM},
want: &TransactionOptions{CommitOptions: CommitOptions{ReturnCommitStats: true}, TransactionTag: "writeTransactionTag", CommitPriority: sppb.RequestOptions_PRIORITY_MEDIUM},
},
{
name: "Write level nil MaxCommitDelay does not unset client level MaxCommitDelay",
client: &TransactionOptions{CommitOptions: CommitOptions{ReturnCommitStats: false, MaxCommitDelay: &duration}, TransactionTag: "clientTransactionTag", CommitPriority: sppb.RequestOptions_PRIORITY_LOW},
write: &TransactionOptions{CommitOptions: CommitOptions{ReturnCommitStats: true}, TransactionTag: "writeTransactionTag", CommitPriority: sppb.RequestOptions_PRIORITY_MEDIUM},
want: &TransactionOptions{CommitOptions: CommitOptions{ReturnCommitStats: true, MaxCommitDelay: &duration}, TransactionTag: "writeTransactionTag", CommitPriority: sppb.RequestOptions_PRIORITY_MEDIUM},
},
{
name: "Write level has precedence than client level MaxCommitDelay",
client: &TransactionOptions{CommitOptions: CommitOptions{ReturnCommitStats: false, MaxCommitDelay: &duration}, TransactionTag: "clientTransactionTag", CommitPriority: sppb.RequestOptions_PRIORITY_LOW},
write: &TransactionOptions{CommitOptions: CommitOptions{ReturnCommitStats: true, MaxCommitDelay: &otherDuration}, TransactionTag: "writeTransactionTag", CommitPriority: sppb.RequestOptions_PRIORITY_MEDIUM},
want: &TransactionOptions{CommitOptions: CommitOptions{ReturnCommitStats: true, MaxCommitDelay: &otherDuration}, TransactionTag: "writeTransactionTag", CommitPriority: sppb.RequestOptions_PRIORITY_MEDIUM},
},
{
name: "Read lock mode is optimistic",
client: &TransactionOptions{ReadLockMode: sppb.TransactionOptions_ReadWrite_OPTIMISTIC},
Expand Down
3 changes: 2 additions & 1 deletion spanner/integration_test.go
Expand Up @@ -1430,7 +1430,8 @@ func TestIntegration_ReadWriteTransactionWithOptions(t *testing.T) {
}
}

txOpts := TransactionOptions{CommitOptions: CommitOptions{ReturnCommitStats: true}}
duration, _ := time.ParseDuration("100ms")
txOpts := TransactionOptions{CommitOptions: CommitOptions{ReturnCommitStats: true, MaxCommitDelay: &duration}}
resp, err := client.ReadWriteTransactionWithOptions(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error {
// Query Foo's balance and Bar's balance.
queryAccountByID := "SELECT Balance FROM Accounts WHERE AccountId = @p1"
Expand Down
16 changes: 15 additions & 1 deletion spanner/transaction.go
Expand Up @@ -33,6 +33,7 @@ import (
"google.golang.org/grpc/status"

vkit "cloud.google.com/go/spanner/apiv1"
durationpb "google.golang.org/protobuf/types/known/durationpb"
)

// transactionID stores a transaction ID which uniquely identifies a transaction
Expand Down Expand Up @@ -1486,14 +1487,22 @@ type CommitResponse struct {
// CommitOptions provides options for committing a transaction in a database.
type CommitOptions struct {
ReturnCommitStats bool
MaxCommitDelay *time.Duration
}

// merge combines two CommitOptions that the input parameter will have higher
// order of precedence.
func (co CommitOptions) merge(opts CommitOptions) CommitOptions {
return CommitOptions{
var newOpts CommitOptions
newOpts = CommitOptions{
ReturnCommitStats: co.ReturnCommitStats || opts.ReturnCommitStats,
MaxCommitDelay: opts.MaxCommitDelay,
}

if newOpts.MaxCommitDelay == nil {
newOpts.MaxCommitDelay = co.MaxCommitDelay
}
return newOpts
}

// commit tries to commit a readwrite transaction to Cloud Spanner. It also
Expand Down Expand Up @@ -1532,6 +1541,10 @@ func (t *ReadWriteTransaction) commit(ctx context.Context, options CommitOptions
t.sh.updateLastUseTime()

var md metadata.MD
var maxCommitDelay *durationpb.Duration
if options.MaxCommitDelay != nil {
maxCommitDelay = durationpb.New(*(options.MaxCommitDelay))
}
res, e := client.Commit(contextWithOutgoingMetadata(ctx, t.sh.getMetadata(), t.disableRouteToLeader), &sppb.CommitRequest{
Session: sid,
Transaction: &sppb.CommitRequest_TransactionId{
Expand All @@ -1540,6 +1553,7 @@ func (t *ReadWriteTransaction) commit(ctx context.Context, options CommitOptions
RequestOptions: createRequestOptions(t.txOpts.CommitPriority, "", t.txOpts.TransactionTag),
Mutations: mPb,
ReturnCommitStats: options.ReturnCommitStats,
MaxCommitDelay: maxCommitDelay,
}, gax.WithGRPCOptions(grpc.Header(&md)))
if getGFELatencyMetricsFlag() && md != nil && t.ct != nil {
if err := createContextAndCaptureGFELatencyMetrics(ctx, t.ct, md, "commit"); err != nil {
Expand Down

0 comments on commit a8078f0

Please sign in to comment.