Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(spanner): inline begin transaction for ReadWriteTransactions #7149

Merged
merged 23 commits into from
Jan 10, 2023
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
a7410ac
feat(spanner): inline begin transaction
rahul2393 Dec 8, 2022
493e538
Merge branch 'main' into inline-begin-transaction
rahul2393 Dec 9, 2022
44af2f6
fix apidiff install issue
rahul2393 Dec 9, 2022
c94939b
fix github workflow install issue
rahul2393 Dec 9, 2022
aa5d55a
add test to for a read/write transaction executing two queries in par…
rahul2393 Dec 9, 2022
03f03c2
incorporate requested changes
rahul2393 Dec 9, 2022
1405187
fix check
rahul2393 Dec 9, 2022
0d8e9c7
Merge branch 'main' into inline-begin-transaction
rahul2393 Dec 9, 2022
883021f
incorporate requested changes
rahul2393 Dec 13, 2022
8796c70
Merge branch 'main' into inline-begin-transaction
rahul2393 Dec 13, 2022
ba363c8
minor fixes
rahul2393 Dec 13, 2022
7d357c4
fix benchmark tests
rahul2393 Dec 13, 2022
88d075b
Merge branch 'main' into inline-begin-transaction
rahul2393 Dec 13, 2022
136af30
feat(spanner): testcase fixes
harshachinta Dec 14, 2022
7c17277
feat(spanner): nit add key to struct
harshachinta Dec 15, 2022
50281ee
incoporate suggested changes
rahul2393 Dec 16, 2022
b304bb0
Merge branch 'main' into inline-begin-transaction
rahul2393 Dec 16, 2022
17f7b10
Merge branch 'main' into inline-begin-transaction
rahul2393 Dec 19, 2022
464bce1
incorporate requested changes
rahul2393 Dec 19, 2022
b9b184e
Merge branch 'main' into inline-begin-transaction
rahul2393 Dec 20, 2022
20a6da8
Merge branch 'main' into inline-begin-transaction
rahul2393 Jan 9, 2023
f15c6ad
re-trigger checks
rahul2393 Jan 9, 2023
5a44cd5
Merge branch 'main' into inline-begin-transaction
rahul2393 Jan 10, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/apidiff.yml
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ jobs:
with:
go-version: '1.18.3'
- name: Install latest apidiff
run: go install golang.org/x/exp/cmd/apidiff@latest
run: go install golang.org/x/exp/cmd/apidiff@master
- uses: actions/checkout@v3
with:
ref: main
Expand Down
4 changes: 2 additions & 2 deletions .github/workflows/vet.yml
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ jobs:
go-version: '1.19'
- name: Install tools
run: |
go install golang.org/x/lint/golint@latest && \
go install golang.org/x/tools/cmd/goimports@latest && \
go install golang.org/x/lint/golint@master && \
go install golang.org/x/tools/cmd/goimports@master && \
go install honnef.co/go/tools/cmd/staticcheck@latest
- name: Execute vet.sh
run: ./.github/workflows/vet.sh
13 changes: 4 additions & 9 deletions spanner/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -501,16 +501,14 @@ func (c *Client) rwTransaction(ctx context.Context, f func(context.Context, *Rea
)
if sh == nil || sh.getID() == "" || sh.getClient() == nil {
// Session handle hasn't been allocated or has been destroyed.
sh, err = c.idleSessions.takeWriteSession(ctx)
sh, err = c.idleSessions.take(ctx)
if err != nil {
// If session retrieval fails, just fail the transaction.
return err
}
t = &ReadWriteTransaction{
tx: sh.getTransactionID(),
}
} else {
t = &ReadWriteTransaction{}
}
t = &ReadWriteTransaction{
txReadyOrClosed: make(chan struct{}),
}
t.txReadOnly.sh = sh
t.txReadOnly.txReadEnv = t
Expand All @@ -521,9 +519,6 @@ func (c *Client) rwTransaction(ctx context.Context, f func(context.Context, *Rea

trace.TracePrintf(ctx, map[string]interface{}{"transactionID": string(sh.getTransactionID())},
"Starting transaction attempt")
if err = t.begin(ctx); err != nil {
return err
}
resp, err = t.runInTransaction(ctx, f)
return err
})
Expand Down
21 changes: 10 additions & 11 deletions spanner/client_benchmarks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func createBenchmarkServer(incStep uint64) (server *MockedSpannerInMemTestServer
})
// Wait until the session pool has been initialized.
waitFor(t, func() error {
if uint64(client.idleSessions.idleList.Len()+client.idleSessions.idleWriteList.Len()) == client.idleSessions.MinOpened {
if uint64(client.idleSessions.idleList.Len()) == client.idleSessions.MinOpened {
return nil
}
return fmt.Errorf("not yet initialized")
Expand Down Expand Up @@ -177,8 +177,8 @@ func benchmarkClientBurstRead(b *testing.B, incStep uint64) {
for n := 0; n < b.N; n++ {
server, client, teardown := createBenchmarkServer(incStep)
sp := client.idleSessions
if uint64(sp.idleList.Len()+sp.idleWriteList.Len()) != sp.MinOpened {
b.Fatalf("session count mismatch\nGot: %d\nWant: %d", sp.idleList.Len()+sp.idleWriteList.Len(), sp.MinOpened)
if uint64(sp.idleList.Len()) != sp.MinOpened {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: below in this test a goroutine is launched with the arg b. Calling fatal in a launched goroutine from a test can cause the whole suite to fail. Should be converted to an error or errors collected and considered fatal back in the main goroutine. I see this in several tests.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @codyoss for suggestion, will be doing benchmark changes in another PR, since we are not changing benchmark tests itself as part of this feature wanted to have improvements separate.

b.Fatalf("session count mismatch\nGot: %d\nWant: %d", sp.idleList.Len(), sp.MinOpened)
}

totalQueries := int(sp.MaxOpened * 8)
Expand Down Expand Up @@ -238,8 +238,8 @@ func benchmarkClientBurstWrite(b *testing.B, incStep uint64) {
for n := 0; n < b.N; n++ {
server, client, teardown := createBenchmarkServer(incStep)
sp := client.idleSessions
if uint64(sp.idleList.Len()+sp.idleWriteList.Len()) != sp.MinOpened {
b.Fatalf("session count mismatch\nGot: %d\nWant: %d", sp.idleList.Len()+sp.idleWriteList.Len(), sp.MinOpened)
if uint64(sp.idleList.Len()) != sp.MinOpened {
b.Fatalf("session count mismatch\nGot: %d\nWant: %d", sp.idleList.Len(), sp.MinOpened)
}

totalUpdates := int(sp.MaxOpened * 8)
Expand Down Expand Up @@ -299,8 +299,8 @@ func benchmarkClientBurstReadAndWrite(b *testing.B, incStep uint64) {
for n := 0; n < b.N; n++ {
server, client, teardown := createBenchmarkServer(incStep)
sp := client.idleSessions
if uint64(sp.idleList.Len()+sp.idleWriteList.Len()) != sp.MinOpened {
b.Fatalf("session count mismatch\nGot: %d\nWant: %d", sp.idleList.Len()+sp.idleWriteList.Len(), sp.MinOpened)
if uint64(sp.idleList.Len()) != sp.MinOpened {
b.Fatalf("session count mismatch\nGot: %d\nWant: %d", sp.idleList.Len(), sp.MinOpened)
}

totalUpdates := int(sp.MaxOpened * 4)
Expand Down Expand Up @@ -378,8 +378,8 @@ func benchmarkClientSteadyIncrease(b *testing.B, incStep uint64) {
for n := 0; n < b.N; n++ {
server, client, teardown := createBenchmarkServer(incStep)
sp := client.idleSessions
if uint64(sp.idleList.Len()+sp.idleWriteList.Len()) != sp.MinOpened {
b.Fatalf("session count mismatch\nGot: %d\nWant: %d", sp.idleList.Len()+sp.idleWriteList.Len(), sp.MinOpened)
if uint64(sp.idleList.Len()) != sp.MinOpened {
b.Fatalf("session count mismatch\nGot: %d\nWant: %d", sp.idleList.Len(), sp.MinOpened)
}

transactions := make([]*ReadOnlyTransaction, sp.MaxOpened)
Expand All @@ -404,8 +404,7 @@ func reportBenchmark(b *testing.B, sp *sessionPool, server *MockedSpannerInMemTe
b.Logf("CreateSession: %d\t", countRequests(requests, reflect.TypeOf(&sppb.CreateSessionRequest{})))
b.Logf("BeginTransaction: %d\t", countRequests(requests, reflect.TypeOf(&sppb.BeginTransactionRequest{})))
b.Logf("Commit: %d\t", countRequests(requests, reflect.TypeOf(&sppb.CommitRequest{})))
b.Logf("ReadSessions: %d\t", sp.idleList.Len())
b.Logf("WriteSessions: %d\n", sp.idleWriteList.Len())
b.Logf("NumSessions: %d\t", sp.idleList.Len())
}

func countRequests(requests []interface{}, tp reflect.Type) (count int) {
Expand Down
81 changes: 61 additions & 20 deletions spanner/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"math/big"
"os"
"strings"
"sync"
"testing"
"time"

Expand Down Expand Up @@ -815,21 +816,6 @@ func TestClient_ReadWriteTransaction_SessionNotFoundOnBeginTransaction(t *testin
}
}

func TestClient_ReadWriteTransaction_SessionNotFoundOnBeginTransactionWithEmptySessionPool(t *testing.T) {
t.Parallel()
// There will be no prepared sessions in the pool, so the error will occur
// when the transaction tries to get a session from the pool. This will
// also be handled by the session pool, so the transaction itself does not
// need to retry, hence the expectedAttempts == 1.
if err := testReadWriteTransactionWithConfig(t, ClientConfig{
SessionPoolConfig: SessionPoolConfig{WriteSessions: 0.0},
}, map[string]SimulatedExecutionTime{
MethodBeginTransaction: {Errors: []error{newSessionNotFoundError("projects/p/instances/i/databases/d/sessions/s")}},
}, 1); err != nil {
t.Fatal(err)
}
}

func TestClient_ReadWriteTransaction_SessionNotFoundOnExecuteStreamingSql(t *testing.T) {
t.Parallel()
if err := testReadWriteTransaction(t, map[string]SimulatedExecutionTime{
Expand Down Expand Up @@ -1435,6 +1421,66 @@ func TestClient_ReadWriteTransactionCommitAlreadyExists(t *testing.T) {
}
}

func TestClient_ReadWriteTransactionConcurrentQueries(t *testing.T) {
t.Parallel()
server, client, teardown := setupMockedTestServer(t)
defer teardown()
var (
ctx = context.Background()
wg = sync.WaitGroup{}
firstTransactionID transactionID
secondTransactionID transactionID
)
_, err := client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error {
query := func(id *transactionID) {
defer func() {
if tx.tx != nil {
*id = tx.tx
}
wg.Done()
}()
iter := tx.Query(ctx, NewStatement(SelectSingerIDAlbumIDAlbumTitleFromAlbums))
defer iter.Stop()
rowCount := int64(0)
for {
row, err := iter.Next()
if err == iterator.Done {
break
}
if err != nil {
return
}
var singerID, albumID int64
var albumTitle string
if err := row.Columns(&singerID, &albumID, &albumTitle); err != nil {
return
}
rowCount++
}
return
}
wg.Add(2)
go query(&firstTransactionID)
go query(&secondTransactionID)
wg.Wait()
return nil
})
if err != nil {
t.Fatal(err)
}
if firstTransactionID == nil || secondTransactionID == nil || string(firstTransactionID) != string(secondTransactionID) {
rahul2393 marked this conversation as resolved.
Show resolved Hide resolved
t.Fatalf("transactionID mismatch:\nfirst: %v\nsecong: %v", firstTransactionID, secondTransactionID)
}
requests := drainRequestsFromServer(server.TestSpanner)
if err := compareRequests([]interface{}{
&sppb.BatchCreateSessionsRequest{},
&sppb.ExecuteSqlRequest{},
&sppb.ExecuteSqlRequest{},
&sppb.CommitRequest{}}, requests); err != nil {
t.Fatal(err)
}
}

func testReadWriteTransaction(t *testing.T, executionTimes map[string]SimulatedExecutionTime, expectedAttempts int) error {
return testReadWriteTransactionWithConfig(t, ClientConfig{SessionPoolConfig: DefaultSessionPoolConfig}, executionTimes, expectedAttempts)
}
Expand Down Expand Up @@ -1772,10 +1818,6 @@ func TestReadWriteTransaction_WrapSessionNotFoundError(t *testing.T) {
t.Parallel()
server, client, teardown := setupMockedTestServer(t)
defer teardown()
server.TestSpanner.PutExecutionTime(MethodBeginTransaction,
SimulatedExecutionTime{
Errors: []error{newSessionNotFoundError("projects/p/instances/i/databases/d/sessions/s")},
})
server.TestSpanner.PutExecutionTime(MethodExecuteStreamingSql,
SimulatedExecutionTime{
Errors: []error{newSessionNotFoundError("projects/p/instances/i/databases/d/sessions/s")},
Expand Down Expand Up @@ -2146,7 +2188,6 @@ func TestFailedUpdate_ShouldRollback(t *testing.T) {
// The failed update should trigger a rollback.
if _, err := shouldHaveReceived(server.TestSpanner, []interface{}{
&sppb.BatchCreateSessionsRequest{},
&sppb.BeginTransactionRequest{},
&sppb.ExecuteSqlRequest{},
&sppb.RollbackRequest{},
}); err != nil {
Expand Down
2 changes: 1 addition & 1 deletion spanner/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -448,7 +448,7 @@ loop:
t.Fatalf("timed out, got %d session(s), want %d", numOpened, want)
default:
sp.mu.Lock()
numOpened = sp.idleList.Len() + sp.idleWriteList.Len()
numOpened = sp.idleList.Len()
sp.mu.Unlock()
if uint64(numOpened) == want {
break loop
Expand Down
45 changes: 38 additions & 7 deletions spanner/internal/testutil/inmem_spanner_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,22 +174,26 @@ func (s *StatementResult) updateCountToPartialResultSet(exact bool) *spannerpb.P
// Converts an update count to a ResultSet, as DML statements also return the
// update count as the statistics of a ResultSet.
func (s *StatementResult) convertUpdateCountToResultSet(exact bool) *spannerpb.ResultSet {
rs := &spannerpb.ResultSet{
Stats: &spannerpb.ResultSetStats{
RowCount: &spannerpb.ResultSetStats_RowCountLowerBound{
RowCountLowerBound: s.UpdateCount,
},
},
}
if exact {
return &spannerpb.ResultSet{
rs = &spannerpb.ResultSet{
Stats: &spannerpb.ResultSetStats{
RowCount: &spannerpb.ResultSetStats_RowCountExact{
RowCountExact: s.UpdateCount,
},
},
}
}
return &spannerpb.ResultSet{
Stats: &spannerpb.ResultSetStats{
RowCount: &spannerpb.ResultSetStats_RowCountLowerBound{
RowCountLowerBound: s.UpdateCount,
},
},
if s.ResultSet != nil {
rs.Metadata = s.ResultSet.Metadata
}
return rs
}

// SimulatedExecutionTime represents the time the execution of a method
Expand Down Expand Up @@ -787,6 +791,15 @@ func (s *inMemSpannerServer) ExecuteSql(ctx context.Context, req *spannerpb.Exec
if err != nil {
return nil, err
}
if _, ok := req.GetTransaction().GetSelector().(*spannerpb.TransactionSelector_Begin); ok {
rahul2393 marked this conversation as resolved.
Show resolved Hide resolved
if statementResult.ResultSet == nil {
statementResult.ResultSet = &spannerpb.ResultSet{}
}
if statementResult.ResultSet.Metadata == nil {
statementResult.ResultSet.Metadata = &spannerpb.ResultSetMetadata{}
}
statementResult.ResultSet.Metadata.Transaction = &spannerpb.Transaction{Id: id}
}
s.mu.Lock()
isPartitionedDml := s.partitionedDmlTransactions[string(id)]
s.mu.Unlock()
Expand Down Expand Up @@ -833,6 +846,15 @@ func (s *inMemSpannerServer) executeStreamingSQL(req *spannerpb.ExecuteSqlReques
if err != nil {
return err
}
if _, ok := req.GetTransaction().GetSelector().(*spannerpb.TransactionSelector_Begin); ok {
if statementResult.ResultSet == nil {
statementResult.ResultSet = &spannerpb.ResultSet{}
}
if statementResult.ResultSet.Metadata == nil {
statementResult.ResultSet.Metadata = &spannerpb.ResultSetMetadata{}
}
statementResult.ResultSet.Metadata.Transaction = &spannerpb.Transaction{Id: id}
}
s.mu.Lock()
isPartitionedDml := s.partitionedDmlTransactions[string(id)]
s.mu.Unlock()
Expand Down Expand Up @@ -906,6 +928,15 @@ func (s *inMemSpannerServer) ExecuteBatchDml(ctx context.Context, req *spannerpb
if err != nil {
return nil, err
}
if _, ok := req.GetTransaction().GetSelector().(*spannerpb.TransactionSelector_Begin); ok {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't this also use the helper method getResultSetWithTransactionSet?

if statementResult.ResultSet == nil {
statementResult.ResultSet = &spannerpb.ResultSet{}
}
if statementResult.ResultSet.Metadata == nil {
statementResult.ResultSet.Metadata = &spannerpb.ResultSetMetadata{}
}
statementResult.ResultSet.Metadata.Transaction = &spannerpb.Transaction{Id: id}
}
switch statementResult.Type {
case StatementResultError:
resp.Status = &status.Status{Code: int32(gstatus.Code(statementResult.Err)), Message: statementResult.Err.Error()}
Expand Down
17 changes: 6 additions & 11 deletions spanner/oc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ package spanner
import (
"context"
"fmt"
"math"
"strings"
"testing"
"time"
Expand Down Expand Up @@ -156,12 +155,12 @@ func TestOCStats_SessionPool_SessionsCount(t *testing.T) {
_, client, teardown := setupMockedTestServerWithConfig(t, ClientConfig{SessionPoolConfig: DefaultSessionPoolConfig})
defer teardown()
// Wait for the session pool initialization to finish.
expectedWrites := uint64(math.Floor(float64(DefaultSessionPoolConfig.MinOpened) * DefaultSessionPoolConfig.WriteSessions))
expectedWrites := uint64(0)
expectedReads := DefaultSessionPoolConfig.MinOpened - expectedWrites
waitFor(t, func() error {
client.idleSessions.mu.Lock()
defer client.idleSessions.mu.Unlock()
if client.idleSessions.numReads == expectedReads && client.idleSessions.numWrites == expectedWrites {
if client.idleSessions.numSessions == expectedReads {
return nil
}
return waitErr
Expand All @@ -172,7 +171,7 @@ func TestOCStats_SessionPool_SessionsCount(t *testing.T) {
waitFor(t, func() error {
select {
case stat := <-te.Stats:
if len(stat.Rows) >= 4 {
if len(stat.Rows) >= 2 {
return nil
}
}
Expand All @@ -184,7 +183,7 @@ func TestOCStats_SessionPool_SessionsCount(t *testing.T) {
case stat := <-te.Stats:
// There are 4 types for this metric, so we should see at least four
// rows.
if len(stat.Rows) < 4 {
if len(stat.Rows) < 2 {
t.Fatal("No enough metrics are exported")
}
if got, want := stat.View.Measure.Name(), statsPrefix+"num_sessions_in_pool"; got != want {
Expand All @@ -200,12 +199,8 @@ func TestOCStats_SessionPool_SessionsCount(t *testing.T) {
got := fmt.Sprintf("%v", data.Value)
var want string
switch m[tagKeyType] {
case "num_write_prepared_sessions":
want = "20"
case "num_read_sessions":
want = "80"
case "num_sessions_being_prepared":
want = "0"
case "num_sessions":
want = "100"
case "num_in_use_sessions":
want = "0"
default:
Expand Down