Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(spanner): inline begin transaction for ReadWriteTransactions #7149

Merged
merged 23 commits into from
Jan 10, 2023
Merged
Show file tree
Hide file tree
Changes from 19 commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
a7410ac
feat(spanner): inline begin transaction
rahul2393 Dec 8, 2022
493e538
Merge branch 'main' into inline-begin-transaction
rahul2393 Dec 9, 2022
44af2f6
fix apidiff install issue
rahul2393 Dec 9, 2022
c94939b
fix github workflow install issue
rahul2393 Dec 9, 2022
aa5d55a
add test to for a read/write transaction executing two queries in par…
rahul2393 Dec 9, 2022
03f03c2
incorporate requested changes
rahul2393 Dec 9, 2022
1405187
fix check
rahul2393 Dec 9, 2022
0d8e9c7
Merge branch 'main' into inline-begin-transaction
rahul2393 Dec 9, 2022
883021f
incorporate requested changes
rahul2393 Dec 13, 2022
8796c70
Merge branch 'main' into inline-begin-transaction
rahul2393 Dec 13, 2022
ba363c8
minor fixes
rahul2393 Dec 13, 2022
7d357c4
fix benchmark tests
rahul2393 Dec 13, 2022
88d075b
Merge branch 'main' into inline-begin-transaction
rahul2393 Dec 13, 2022
136af30
feat(spanner): testcase fixes
harshachinta Dec 14, 2022
7c17277
feat(spanner): nit add key to struct
harshachinta Dec 15, 2022
50281ee
incoporate suggested changes
rahul2393 Dec 16, 2022
b304bb0
Merge branch 'main' into inline-begin-transaction
rahul2393 Dec 16, 2022
17f7b10
Merge branch 'main' into inline-begin-transaction
rahul2393 Dec 19, 2022
464bce1
incorporate requested changes
rahul2393 Dec 19, 2022
b9b184e
Merge branch 'main' into inline-begin-transaction
rahul2393 Dec 20, 2022
20a6da8
Merge branch 'main' into inline-begin-transaction
rahul2393 Jan 9, 2023
f15c6ad
re-trigger checks
rahul2393 Jan 9, 2023
5a44cd5
Merge branch 'main' into inline-begin-transaction
rahul2393 Jan 10, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
27 changes: 16 additions & 11 deletions spanner/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -491,43 +491,48 @@ func (c *Client) rwTransaction(ctx context.Context, f func(context.Context, *Rea
return resp, err
}
var (
sh *sessionHandle
sh *sessionHandle
t *ReadWriteTransaction
attempt = 0
)
defer func() {
if sh != nil {
sh.recycle()
}
}()
err = runWithRetryOnAbortedOrSessionNotFound(ctx, func(ctx context.Context) error {
err = runWithRetryOnAbortedOrFailedInlineBeginOrSessionNotFound(ctx, func(ctx context.Context) error {
var (
err error
t *ReadWriteTransaction
)
if sh == nil || sh.getID() == "" || sh.getClient() == nil {
// Session handle hasn't been allocated or has been destroyed.
sh, err = c.idleSessions.takeWriteSession(ctx)
sh, err = c.idleSessions.take(ctx)
if err != nil {
// If session retrieval fails, just fail the transaction.
return err
}
t = &ReadWriteTransaction{
tx: sh.getTransactionID(),
}
if t.shouldExplicitBegin(attempt) {
if err = t.begin(ctx); err != nil {
return spannerErrorf(codes.Internal, "error while BeginTransaction during retrying a ReadWrite transaction: %v", err)
}
} else {
t = &ReadWriteTransaction{}
t = &ReadWriteTransaction{
txReadyOrClosed: make(chan struct{}),
}
}
attempt++
t.txReadOnly.sh = sh
t.txReadOnly.sp = c.idleSessions
t.txReadOnly.txReadEnv = t
t.txReadOnly.qo = c.qo
t.txReadOnly.ro = c.ro
t.txOpts = c.txo.merge(options)
t.ct = c.ct

trace.TracePrintf(ctx, map[string]interface{}{"transactionID": string(sh.getTransactionID())},
trace.TracePrintf(ctx, map[string]interface{}{"transactionSelector": t.getTransactionSelector().String()},
"Starting transaction attempt")
if err = t.begin(ctx); err != nil {
return err
}

resp, err = t.runInTransaction(ctx, f)
return err
})
Expand Down
21 changes: 10 additions & 11 deletions spanner/client_benchmarks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func createBenchmarkServer(incStep uint64) (server *MockedSpannerInMemTestServer
})
// Wait until the session pool has been initialized.
waitFor(t, func() error {
if uint64(client.idleSessions.idleList.Len()+client.idleSessions.idleWriteList.Len()) == client.idleSessions.MinOpened {
if uint64(client.idleSessions.idleList.Len()) == client.idleSessions.MinOpened {
return nil
}
return fmt.Errorf("not yet initialized")
Expand Down Expand Up @@ -177,8 +177,8 @@ func benchmarkClientBurstRead(b *testing.B, incStep uint64) {
for n := 0; n < b.N; n++ {
server, client, teardown := createBenchmarkServer(incStep)
sp := client.idleSessions
if uint64(sp.idleList.Len()+sp.idleWriteList.Len()) != sp.MinOpened {
b.Fatalf("session count mismatch\nGot: %d\nWant: %d", sp.idleList.Len()+sp.idleWriteList.Len(), sp.MinOpened)
if uint64(sp.idleList.Len()) != sp.MinOpened {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: below in this test a goroutine is launched with the arg b. Calling fatal in a launched goroutine from a test can cause the whole suite to fail. Should be converted to an error or errors collected and considered fatal back in the main goroutine. I see this in several tests.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @codyoss for suggestion, will be doing benchmark changes in another PR, since we are not changing benchmark tests itself as part of this feature wanted to have improvements separate.

b.Fatalf("session count mismatch\nGot: %d\nWant: %d", sp.idleList.Len(), sp.MinOpened)
}

totalQueries := int(sp.MaxOpened * 8)
Expand Down Expand Up @@ -238,8 +238,8 @@ func benchmarkClientBurstWrite(b *testing.B, incStep uint64) {
for n := 0; n < b.N; n++ {
server, client, teardown := createBenchmarkServer(incStep)
sp := client.idleSessions
if uint64(sp.idleList.Len()+sp.idleWriteList.Len()) != sp.MinOpened {
b.Fatalf("session count mismatch\nGot: %d\nWant: %d", sp.idleList.Len()+sp.idleWriteList.Len(), sp.MinOpened)
if uint64(sp.idleList.Len()) != sp.MinOpened {
b.Fatalf("session count mismatch\nGot: %d\nWant: %d", sp.idleList.Len(), sp.MinOpened)
}

totalUpdates := int(sp.MaxOpened * 8)
Expand Down Expand Up @@ -299,8 +299,8 @@ func benchmarkClientBurstReadAndWrite(b *testing.B, incStep uint64) {
for n := 0; n < b.N; n++ {
server, client, teardown := createBenchmarkServer(incStep)
sp := client.idleSessions
if uint64(sp.idleList.Len()+sp.idleWriteList.Len()) != sp.MinOpened {
b.Fatalf("session count mismatch\nGot: %d\nWant: %d", sp.idleList.Len()+sp.idleWriteList.Len(), sp.MinOpened)
if uint64(sp.idleList.Len()) != sp.MinOpened {
b.Fatalf("session count mismatch\nGot: %d\nWant: %d", sp.idleList.Len(), sp.MinOpened)
}

totalUpdates := int(sp.MaxOpened * 4)
Expand Down Expand Up @@ -378,8 +378,8 @@ func benchmarkClientSteadyIncrease(b *testing.B, incStep uint64) {
for n := 0; n < b.N; n++ {
server, client, teardown := createBenchmarkServer(incStep)
sp := client.idleSessions
if uint64(sp.idleList.Len()+sp.idleWriteList.Len()) != sp.MinOpened {
b.Fatalf("session count mismatch\nGot: %d\nWant: %d", sp.idleList.Len()+sp.idleWriteList.Len(), sp.MinOpened)
if uint64(sp.idleList.Len()) != sp.MinOpened {
b.Fatalf("session count mismatch\nGot: %d\nWant: %d", sp.idleList.Len(), sp.MinOpened)
}

transactions := make([]*ReadOnlyTransaction, sp.MaxOpened)
Expand All @@ -404,8 +404,7 @@ func reportBenchmark(b *testing.B, sp *sessionPool, server *MockedSpannerInMemTe
b.Logf("CreateSession: %d\t", countRequests(requests, reflect.TypeOf(&sppb.CreateSessionRequest{})))
b.Logf("BeginTransaction: %d\t", countRequests(requests, reflect.TypeOf(&sppb.BeginTransactionRequest{})))
b.Logf("Commit: %d\t", countRequests(requests, reflect.TypeOf(&sppb.CommitRequest{})))
b.Logf("ReadSessions: %d\t", sp.idleList.Len())
b.Logf("WriteSessions: %d\n", sp.idleWriteList.Len())
b.Logf("NumSessions: %d\t", sp.idleList.Len())
}

func countRequests(requests []interface{}, tp reflect.Type) (count int) {
Expand Down