Skip to content

Commit

Permalink
feat(spanner): inline begin transaction for ReadWriteTransactions (#7149
Browse files Browse the repository at this point in the history
)

* feat(spanner): inline begin transaction

* fix apidiff install issue

* fix github workflow install issue

* add test to for a read/write transaction executing two queries in parallel as the first statement, transactionID used should be same and the other should block until the first has returned a transactionID.

* incorporate requested changes

* fix check

* incorporate requested changes

* minor fixes

* fix benchmark tests

* feat(spanner): testcase fixes

* feat(spanner): nit add key to struct

* incoporate suggested changes

* incorporate requested changes

* re-trigger checks

Co-authored-by: harshachinta <sriharshach@google.com>
  • Loading branch information
rahul2393 and harshachinta committed Jan 10, 2023
1 parent 9a6db6d commit 2ce3606
Show file tree
Hide file tree
Showing 14 changed files with 751 additions and 956 deletions.
27 changes: 16 additions & 11 deletions spanner/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -491,43 +491,48 @@ func (c *Client) rwTransaction(ctx context.Context, f func(context.Context, *Rea
return resp, err
}
var (
sh *sessionHandle
sh *sessionHandle
t *ReadWriteTransaction
attempt = 0
)
defer func() {
if sh != nil {
sh.recycle()
}
}()
err = runWithRetryOnAbortedOrSessionNotFound(ctx, func(ctx context.Context) error {
err = runWithRetryOnAbortedOrFailedInlineBeginOrSessionNotFound(ctx, func(ctx context.Context) error {
var (
err error
t *ReadWriteTransaction
)
if sh == nil || sh.getID() == "" || sh.getClient() == nil {
// Session handle hasn't been allocated or has been destroyed.
sh, err = c.idleSessions.takeWriteSession(ctx)
sh, err = c.idleSessions.take(ctx)
if err != nil {
// If session retrieval fails, just fail the transaction.
return err
}
t = &ReadWriteTransaction{
tx: sh.getTransactionID(),
}
if t.shouldExplicitBegin(attempt) {
if err = t.begin(ctx); err != nil {
return spannerErrorf(codes.Internal, "error while BeginTransaction during retrying a ReadWrite transaction: %v", err)
}
} else {
t = &ReadWriteTransaction{}
t = &ReadWriteTransaction{
txReadyOrClosed: make(chan struct{}),
}
}
attempt++
t.txReadOnly.sh = sh
t.txReadOnly.sp = c.idleSessions
t.txReadOnly.txReadEnv = t
t.txReadOnly.qo = c.qo
t.txReadOnly.ro = c.ro
t.txOpts = c.txo.merge(options)
t.ct = c.ct

trace.TracePrintf(ctx, map[string]interface{}{"transactionID": string(sh.getTransactionID())},
trace.TracePrintf(ctx, map[string]interface{}{"transactionSelector": t.getTransactionSelector().String()},
"Starting transaction attempt")
if err = t.begin(ctx); err != nil {
return err
}

resp, err = t.runInTransaction(ctx, f)
return err
})
Expand Down
21 changes: 10 additions & 11 deletions spanner/client_benchmarks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func createBenchmarkServer(incStep uint64) (server *MockedSpannerInMemTestServer
})
// Wait until the session pool has been initialized.
waitFor(t, func() error {
if uint64(client.idleSessions.idleList.Len()+client.idleSessions.idleWriteList.Len()) == client.idleSessions.MinOpened {
if uint64(client.idleSessions.idleList.Len()) == client.idleSessions.MinOpened {
return nil
}
return fmt.Errorf("not yet initialized")
Expand Down Expand Up @@ -177,8 +177,8 @@ func benchmarkClientBurstRead(b *testing.B, incStep uint64) {
for n := 0; n < b.N; n++ {
server, client, teardown := createBenchmarkServer(incStep)
sp := client.idleSessions
if uint64(sp.idleList.Len()+sp.idleWriteList.Len()) != sp.MinOpened {
b.Fatalf("session count mismatch\nGot: %d\nWant: %d", sp.idleList.Len()+sp.idleWriteList.Len(), sp.MinOpened)
if uint64(sp.idleList.Len()) != sp.MinOpened {
b.Fatalf("session count mismatch\nGot: %d\nWant: %d", sp.idleList.Len(), sp.MinOpened)
}

totalQueries := int(sp.MaxOpened * 8)
Expand Down Expand Up @@ -238,8 +238,8 @@ func benchmarkClientBurstWrite(b *testing.B, incStep uint64) {
for n := 0; n < b.N; n++ {
server, client, teardown := createBenchmarkServer(incStep)
sp := client.idleSessions
if uint64(sp.idleList.Len()+sp.idleWriteList.Len()) != sp.MinOpened {
b.Fatalf("session count mismatch\nGot: %d\nWant: %d", sp.idleList.Len()+sp.idleWriteList.Len(), sp.MinOpened)
if uint64(sp.idleList.Len()) != sp.MinOpened {
b.Fatalf("session count mismatch\nGot: %d\nWant: %d", sp.idleList.Len(), sp.MinOpened)
}

totalUpdates := int(sp.MaxOpened * 8)
Expand Down Expand Up @@ -299,8 +299,8 @@ func benchmarkClientBurstReadAndWrite(b *testing.B, incStep uint64) {
for n := 0; n < b.N; n++ {
server, client, teardown := createBenchmarkServer(incStep)
sp := client.idleSessions
if uint64(sp.idleList.Len()+sp.idleWriteList.Len()) != sp.MinOpened {
b.Fatalf("session count mismatch\nGot: %d\nWant: %d", sp.idleList.Len()+sp.idleWriteList.Len(), sp.MinOpened)
if uint64(sp.idleList.Len()) != sp.MinOpened {
b.Fatalf("session count mismatch\nGot: %d\nWant: %d", sp.idleList.Len(), sp.MinOpened)
}

totalUpdates := int(sp.MaxOpened * 4)
Expand Down Expand Up @@ -378,8 +378,8 @@ func benchmarkClientSteadyIncrease(b *testing.B, incStep uint64) {
for n := 0; n < b.N; n++ {
server, client, teardown := createBenchmarkServer(incStep)
sp := client.idleSessions
if uint64(sp.idleList.Len()+sp.idleWriteList.Len()) != sp.MinOpened {
b.Fatalf("session count mismatch\nGot: %d\nWant: %d", sp.idleList.Len()+sp.idleWriteList.Len(), sp.MinOpened)
if uint64(sp.idleList.Len()) != sp.MinOpened {
b.Fatalf("session count mismatch\nGot: %d\nWant: %d", sp.idleList.Len(), sp.MinOpened)
}

transactions := make([]*ReadOnlyTransaction, sp.MaxOpened)
Expand All @@ -404,8 +404,7 @@ func reportBenchmark(b *testing.B, sp *sessionPool, server *MockedSpannerInMemTe
b.Logf("CreateSession: %d\t", countRequests(requests, reflect.TypeOf(&sppb.CreateSessionRequest{})))
b.Logf("BeginTransaction: %d\t", countRequests(requests, reflect.TypeOf(&sppb.BeginTransactionRequest{})))
b.Logf("Commit: %d\t", countRequests(requests, reflect.TypeOf(&sppb.CommitRequest{})))
b.Logf("ReadSessions: %d\t", sp.idleList.Len())
b.Logf("WriteSessions: %d\n", sp.idleWriteList.Len())
b.Logf("NumSessions: %d\t", sp.idleList.Len())
}

func countRequests(requests []interface{}, tp reflect.Type) (count int) {
Expand Down
Loading

0 comments on commit 2ce3606

Please sign in to comment.