feat(spanner): long running transaction clean up - disabled #8177

Merged
Changes from 38 commits
42 commits
7e7375e
feat(spanner): long running transaction clean up to prevent session l…
harshachinta Jun 27, 2023
77959b4
feat(spanner): code refactoring in session.go file
harshachinta Jun 27, 2023
643678e
fix(spanner): fix vet
harshachinta Jun 27, 2023
454e2a6
feat(spanner): refactor client.go file
harshachinta Jun 27, 2023
37997cb
feat(spanner): add lock when updating session handle
harshachinta Jun 27, 2023
a99ae01
feat(spanner): code refactoring in transaction.go file
harshachinta Jun 27, 2023
acfa972
test(spanner): remove test
harshachinta Jun 27, 2023
89b31c1
feat(spanner): code refactor
harshachinta Jun 27, 2023
c073016
Merge branch 'main' into session-leak-cleaning-long-transactions
harshachinta Jun 27, 2023
e0d1afc
feat(spanner): refactor nit comments
harshachinta Jul 5, 2023
327cd79
feat(spanner): reduce sleep timing to milli seconds for unit tests
harshachinta Jul 5, 2023
ee6cd46
feat(spanner): update idleTimeThresholdSecs field to time.Duration
harshachinta Jul 5, 2023
6094f0b
feat(spanner): make the log messages conditional based on type of act…
harshachinta Jul 5, 2023
9314761
feat(spanner): combine get and remove long running sessions in a sing…
harshachinta Jul 8, 2023
f578fac
feat(spanner): modify presubmit condition to run tests for changed mo…
harshachinta Jul 12, 2023
6aa6558
feat(spanner): revert presubmit.sh fix
harshachinta Jul 29, 2023
2f927c4
feat(spanner): update doc
harshachinta Jul 29, 2023
3897b35
feat(spanner): reword isLongRunning to eligibleForLongRunning in sess…
harshachinta Jul 29, 2023
7e52ce0
Merge branch 'main' into session-leak-cleaning-long-transactions
harshachinta Jul 29, 2023
1c15ef5
feat(spanner): update TrackSessionHandles logic to turn off stack tra…
harshachinta Aug 1, 2023
9efe433
feat(spanner): fix test
harshachinta Aug 1, 2023
4e2d597
feat(spanner): change action on inactive transactions option to enum
harshachinta Aug 1, 2023
7a7986e
feat(spanner): fix lint
harshachinta Aug 1, 2023
5f2d55a
feat(spanner): fix lint
harshachinta Aug 1, 2023
ed12707
feat(spanner): fix lint
harshachinta Aug 1, 2023
cbfe38c
feat(spanner): fix doc
harshachinta Aug 2, 2023
f0bacdb
Merge branch 'main' into session-leak-cleaning-long-transactions
harshachinta Aug 4, 2023
168438d
Merge branch 'main' into session-leak-cleaning-long-transactions
harshachinta Aug 6, 2023
fb7c2d7
feat(spanner): disable the feature by default
harshachinta Aug 6, 2023
04b01a6
Merge branch 'main' into session-leak-cleaning-long-transactions
harshachinta Aug 10, 2023
0359065
Merge branch 'main' into session-leak-cleaning-long-transactions
harshachinta Aug 18, 2023
bc48a8a
Merge branch 'main' into session-leak-cleaning-long-transactions
harshachinta Aug 21, 2023
ba90f75
feat(spanner): revert commit - disable the feature by default
harshachinta Aug 21, 2023
750b911
Merge branch 'main' into session-leak-cleaning-long-transactions
harshachinta Oct 5, 2023
2e26b54
fix: lint
harshachinta Oct 5, 2023
b9e272e
docs: add Readme
harshachinta Oct 6, 2023
057fcfb
Merge branch 'main' into session-leak-cleaning-long-transactions
harshachinta Oct 6, 2023
1efbea6
feat(spanner): make WARN as default for action on inactive transactions
harshachinta Oct 6, 2023
1ee50f8
docs: address review comments
harshachinta Oct 6, 2023
ab268ca
docs: move README.md changes in a different PR
harshachinta Oct 6, 2023
15b7af3
feat: disable feature
harshachinta Oct 6, 2023
7910e1f
Merge branch 'main' into session-leak-cleaning-long-transactions
harshachinta Oct 30, 2023
83 changes: 83 additions & 0 deletions spanner/README.md
@@ -31,4 +31,87 @@ row, err := client.Single().ReadRow(ctx, "Users",
if err != nil {
log.Fatal(err)
}
```
Contributor Author

@arpan14 @olavloite @rahul2393 @asthamohta
Can you please take a look at the golang documentation for session leaks and give your LGTM?


### Session Leak
A `Client` object has a limit on the maximum number of sessions. For example, the default value of
`MaxOpened`, the maximum number of sessions allowed by the session pool in the Go client library,
is 400. You can configure this value when creating a `Client` by passing a custom `SessionPoolConfig`
as part of `ClientConfig`. When all sessions are checked out of the session pool, every new
transaction has to wait until a session is returned to the pool. If a session is never returned to
the pool (a session leak), those transactions wait indefinitely and your application is blocked.

#### Common Root Causes
The most common causes of session leaks in the Go client library are:
1. Not stopping a `RowIterator` that is returned by `Query`, `Read`, and other methods. Always call `RowIterator.Stop()` to ensure that the `RowIterator` is closed.
2. Not closing a `ReadOnlyTransaction` when you no longer need it. Always call `ReadOnlyTransaction.Close()` after use to ensure that the `ReadOnlyTransaction` is closed.
3. Not closing a `BatchReadOnlyTransaction` when you no longer need it. Always call `BatchReadOnlyTransaction.Close()` after use to ensure that the `BatchReadOnlyTransaction` is closed.
Contributor

For a different PR: We should change the implementation of BatchReadOnlyTransaction to not use any sessions from the pool, and instead create a separate session.

Contributor Author

BatchReadOnlyTransaction already creates its own session and does not use any sessions from the pool. Sorry for the confusion.
I have removed this line.


As shown in the example below, calling `txn.Close()` releases the session back to the pool once the transaction is complete.
If you don't call `txn.Close()`, the session is never returned to the pool. The recommended approach is to use `defer`:
```go
client, err := spanner.NewClient(ctx, "projects/P/instances/I/databases/D")
if err != nil {
	log.Fatal(err)
}
txn := client.ReadOnlyTransaction()
defer txn.Close()
```

#### Debugging and Resolving Session Leaks

##### Logging inactive transactions
Enabled by default, the logging option emits warning logs when more than 95% of your session pool is exhausted.
This can mean one of two things: either you need to increase the maximum number of sessions in your session pool
(because the number of queries run using the client-side database object is greater than your session pool can serve),
or you may have a session leak. To help identify which transactions may be causing the leak, the logs also contain
stack traces of transactions that have been running longer than expected, provided `TrackSessionHandles` under
`SessionPoolConfig` is enabled.

```go
sessionPoolConfig := spanner.SessionPoolConfig{
	TrackSessionHandles: true,
	InactiveTransactionRemovalOptions: spanner.InactiveTransactionRemovalOptions{
		ActionOnInactiveTransaction: spanner.Warn,
	},
}
client, err := spanner.NewClientWithConfig(
	ctx, database, spanner.ClientConfig{SessionPoolConfig: sessionPoolConfig},
)
if err != nil {
	log.Fatal(err)
}
defer client.Close()

// Example log message warning about long-running transactions:
// session <session-info> checked out of pool at <session-checkout-time> is long running due to possible session leak for goroutine
// <Stack Trace of transaction>
```
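
To make the 95% threshold concrete, here is a minimal sketch rather than the library's actual check. `nearlyExhausted` is a hypothetical helper, and reading the threshold as a strict greater-than comparison of checked-out sessions against `MaxOpened` is an assumption.

```go
package main

import "fmt"

// nearlyExhausted reports whether more than 95% of the pool is checked out.
// Hypothetical helper: integer arithmetic is used so the comparison at the
// exact 95% boundary does not depend on floating-point rounding.
func nearlyExhausted(numInUse, maxOpened uint64) bool {
	if maxOpened == 0 {
		return false
	}
	return numInUse*100 > maxOpened*95
}

func main() {
	// 400 is the default MaxOpened mentioned in the text above.
	for _, inUse := range []uint64{100, 380, 381, 400} {
		fmt.Printf("inUse=%d of 400 -> warn=%v\n", inUse, nearlyExhausted(inUse, 400))
	}
}
```

Under this reading, a pool with the default `MaxOpened` of 400 would start warning once 381 sessions are checked out.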

##### Automatically clean inactive transactions
When the option to automatically clean inactive transactions is enabled, the client library automatically detects
problematic transactions that have been running for an extremely long time (and are therefore causing session leaks)
and closes them. The session is removed from the pool and replaced by a new session. To find out which transactions
are being closed, check the logs for the stack traces of the transactions that might be causing these leaks and
debug them further.

```go
sessionPoolConfig := spanner.SessionPoolConfig{
	TrackSessionHandles: true,
	InactiveTransactionRemovalOptions: spanner.InactiveTransactionRemovalOptions{
		ActionOnInactiveTransaction: spanner.WarnAndClose,
	},
}
client, err := spanner.NewClientWithConfig(
	ctx, database, spanner.ClientConfig{SessionPoolConfig: sessionPoolConfig},
)
if err != nil {
	log.Fatal(err)
}
defer client.Close()

// Example log message when a transaction's session is recycled:
// session <session-info> checked out of pool at <session-checkout-time> is long running and will be removed due to possible session leak for goroutine
// <Stack Trace of transaction>
```
26 changes: 18 additions & 8 deletions spanner/client.go
@@ -439,10 +439,11 @@ func (c *Client) BatchReadOnlyTransaction(ctx context.Context, tb TimestampBound

 	t := &BatchReadOnlyTransaction{
 		ReadOnlyTransaction: ReadOnlyTransaction{
-			tx:              tx,
-			txReadyOrClosed: make(chan struct{}),
-			state:           txActive,
-			rts:             rts,
+			tx:                       tx,
+			txReadyOrClosed:          make(chan struct{}),
+			state:                    txActive,
+			rts:                      rts,
+			isLongRunningTransaction: true,
 		},
 		ID: BatchReadOnlyTransactionID{
 			tid: tx,
@@ -473,10 +474,11 @@ func (c *Client) BatchReadOnlyTransactionFromID(tid BatchReadOnlyTransactionID)

 	t := &BatchReadOnlyTransaction{
 		ReadOnlyTransaction: ReadOnlyTransaction{
-			tx:              tid.tid,
-			txReadyOrClosed: make(chan struct{}),
-			state:           txActive,
-			rts:             tid.rts,
+			tx:                       tid.tid,
+			txReadyOrClosed:          make(chan struct{}),
+			state:                    txActive,
+			rts:                      tid.rts,
+			isLongRunningTransaction: true,
 		},
 		ID: tid,
 	}
@@ -558,6 +560,14 @@ func (c *Client) rwTransaction(ctx context.Context, f func(context.Context, *Rea
		if sh == nil || sh.getID() == "" || sh.getClient() == nil {
			// Session handle hasn't been allocated or has been destroyed.
			sh, err = c.idleSessions.take(ctx)
			if err != nil {
				// If session retrieval fails, just fail the transaction.
				return err
			}
			if t != nil {
				// Some operations (e.g. BatchUpdate) can be long-running. For such
				// operations, mark the session as eligible for long-running use.
				// This runs only after the error check, so sh is never nil here.
				sh.mu.Lock()
				t.mu.Lock()
				sh.eligibleForLongRunning = t.isLongRunningTransaction
				t.mu.Unlock()
				sh.mu.Unlock()
			}
217 changes: 217 additions & 0 deletions spanner/client_test.go
@@ -208,6 +208,73 @@ func TestClient_Single_Read_SessionNotFound(t *testing.T) {
}
}

func TestClient_Single_WhenInactiveTransactionsAndSessionIsNotFoundOnBackend_RemoveSessionFromPool(t *testing.T) {
t.Parallel()
server, client, teardown := setupMockedTestServerWithConfig(t, ClientConfig{
SessionPoolConfig: SessionPoolConfig{
MinOpened: 1,
MaxOpened: 1,
InactiveTransactionRemovalOptions: InactiveTransactionRemovalOptions{
ActionOnInactiveTransaction: WarnAndClose,
},
},
})
defer teardown()
server.TestSpanner.PutExecutionTime(
MethodExecuteStreamingSql,
SimulatedExecutionTime{Errors: []error{newSessionNotFoundError("projects/p/instances/i/databases/d/sessions/s")}},
)
ctx := context.Background()
single := client.Single()
iter := single.Query(ctx, NewStatement(SelectSingerIDAlbumIDAlbumTitleFromAlbums))
p := client.idleSessions
sh := single.sh
// simulate session to be checked out for more than 60mins
sh.mu.Lock()
sh.checkoutTime = time.Now().Add(-time.Hour)
sh.mu.Unlock()

// force run task to clean up unexpected long-running sessions
p.removeLongRunningSessions()
rowCount := int64(0)
for {
// Backend throws SessionNotFoundError. Session gets replaced with new session
_, err := iter.Next()
if err == iterator.Done {
break
}
if err != nil {
t.Fatal(err)
}
rowCount++
}
// New session returns back to pool
iter.Stop()

p.mu.Lock()
defer p.mu.Unlock()
if g, w := p.idleList.Len(), 1; g != w {
t.Fatalf("Idle Sessions in pool, count mismatch\nGot: %d\nWant: %d\n", g, w)
}
if g, w := p.numInUse, uint64(0); g != w {
t.Fatalf("Number of sessions currently in use mismatch\nGot: %d\nWant: %d\n", g, w)
}
if g, w := p.numOpened, uint64(1); g != w {
t.Fatalf("Session pool size mismatch\nGot: %d\nWant: %d\n", g, w)
}

sh.mu.Lock()
defer sh.mu.Unlock()
if g, w := sh.eligibleForLongRunning, false; g != w {
t.Fatalf("eligibleForLongRunning mismatch\nGot: %v\nWant: %v\n", g, w)
}
p.InactiveTransactionRemovalOptions.mu.Lock()
defer p.InactiveTransactionRemovalOptions.mu.Unlock()
if g, w := p.numOfLeakedSessionsRemoved, uint64(1); g != w {
t.Fatalf("Number of leaked sessions removed mismatch\nGot: %d\nWant: %d\n", g, w)
}
}

func TestClient_Single_ReadRow_SessionNotFound(t *testing.T) {
t.Parallel()

@@ -1368,6 +1435,59 @@ func TestClient_ReadWriteTransaction_SessionNotFoundOnExecuteUpdate(t *testing.T
}
}

func TestClient_ReadWriteTransaction_WhenLongRunningSessionCleaned_TransactionShouldFail(t *testing.T) {
t.Parallel()
_, client, teardown := setupMockedTestServerWithConfig(t, ClientConfig{
SessionPoolConfig: SessionPoolConfig{
MinOpened: 1,
MaxOpened: 1,
InactiveTransactionRemovalOptions: InactiveTransactionRemovalOptions{
ActionOnInactiveTransaction: WarnAndClose,
},
},
})
defer teardown()
ctx := context.Background()
p := client.idleSessions
msg := "session is already recycled / destroyed"
_, err := client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error {
rowCount, err := tx.Update(ctx, NewStatement(UpdateBarSetFoo))
if err != nil {
return err
}
if g, w := rowCount, int64(UpdateBarSetFooRowCount); g != w {
return status.Errorf(codes.FailedPrecondition, "Row count mismatch\nGot: %v\nWant: %v", g, w)
}

// Simulate the session to be checked out for more than 60 mins.
// The background task cleans up this long-running session.
tx.sh.mu.Lock()
tx.sh.checkoutTime = time.Now().Add(-time.Hour)
if g, w := tx.sh.eligibleForLongRunning, false; g != w {
tx.sh.mu.Unlock()
return status.Errorf(codes.FailedPrecondition, "eligibleForLongRunning value mismatch\nGot: %v\nWant: %v", g, w)
}
tx.sh.mu.Unlock()

// force run task to clean up unexpected long-running sessions
p.removeLongRunningSessions()

// The session associated with this transaction tx has been destroyed. So the below call should fail.
// Eventually this means the entire transaction should not succeed.
_, err = tx.Update(ctx, NewStatement("UPDATE FOO SET BAR='value' WHERE ID=1"))
if err != nil {
return err
}
return nil
})
if err == nil {
t.Fatalf("Missing expected error")
}
if status.Code(err) != codes.FailedPrecondition || !strings.Contains(err.Error(), msg) {
t.Fatalf("error mismatch\nGot: %v\nWant: %v", err, msg)
}
}

func TestClient_ReadWriteTransaction_SessionNotFoundOnExecuteBatchUpdate(t *testing.T) {
t.Parallel()

@@ -1401,6 +1521,65 @@ func TestClient_ReadWriteTransaction_SessionNotFoundOnExecuteBatchUpdate(t *test
}
}

func TestClient_ReadWriteTransaction_WhenLongRunningExecuteBatchUpdate_TakeNoAction(t *testing.T) {
t.Parallel()
server, client, teardown := setupMockedTestServerWithConfig(t, ClientConfig{
SessionPoolConfig: SessionPoolConfig{
MinOpened: 1,
MaxOpened: 1,
InactiveTransactionRemovalOptions: InactiveTransactionRemovalOptions{
ActionOnInactiveTransaction: WarnAndClose,
},
},
})
defer teardown()
server.TestSpanner.PutExecutionTime(
MethodExecuteBatchDml,
SimulatedExecutionTime{Errors: []error{newSessionNotFoundError("projects/p/instances/i/databases/d/sessions/s")}},
)
ctx := context.Background()
p := client.idleSessions
var attempts int
_, err := client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error {
attempts++
if attempts == 2 {
// Simulate the session to be long-running. The background task should not clean up this long-running session.
tx.sh.mu.Lock()
tx.sh.checkoutTime = time.Now().Add(-time.Hour)
if g, w := tx.sh.eligibleForLongRunning, true; g != w {
tx.sh.mu.Unlock()
return status.Errorf(codes.FailedPrecondition, "eligibleForLongRunning value mismatch\nGot: %v\nWant: %v", g, w)
}
tx.sh.mu.Unlock()

// force run task to clean up unexpected long-running sessions
p.removeLongRunningSessions()
}
rowCounts, err := tx.BatchUpdate(ctx, []Statement{NewStatement(UpdateBarSetFoo)})
if err != nil {
return err
}
if g, w := len(rowCounts), 1; g != w {
return status.Errorf(codes.FailedPrecondition, "Row counts length mismatch\nGot: %v\nWant: %v", g, w)
}
if g, w := rowCounts[0], int64(UpdateBarSetFooRowCount); g != w {
return status.Errorf(codes.FailedPrecondition, "Row count mismatch\nGot: %v\nWant: %v", g, w)
}
return nil
})
if err != nil {
t.Fatal(err)
}
if g, w := attempts, 2; g != w {
t.Fatalf("number of attempts mismatch\nGot: %d\nWant: %d", g, w)
}
p.InactiveTransactionRemovalOptions.mu.Lock()
defer p.InactiveTransactionRemovalOptions.mu.Unlock()
if g, w := p.numOfLeakedSessionsRemoved, uint64(0); g != w {
t.Fatalf("Number of leaked sessions removed mismatch\nGot: %d\nWant: %d\n", g, w)
}
}

func TestClient_ReadWriteTransaction_Query_QueryOptions(t *testing.T) {
for _, tt := range queryOptionsTestCases() {
t.Run(tt.name, func(t *testing.T) {
@@ -3912,6 +4091,44 @@ func TestClient_PDML_Priority(t *testing.T) {
}
}

func TestClient_WhenLongRunningPartitionedUpdateRequest_TakeNoAction(t *testing.T) {
t.Parallel()
ctx := context.Background()
server, client, teardown := setupMockedTestServerWithConfig(t, ClientConfig{
SessionPoolConfig: SessionPoolConfig{
MinOpened: 1,
MaxOpened: 1,
healthCheckSampleInterval: 10 * time.Millisecond, // maintainer runs every 10ms
InactiveTransactionRemovalOptions: InactiveTransactionRemovalOptions{
ActionOnInactiveTransaction: WarnAndClose,
executionFrequency: 15 * time.Millisecond, // check long-running sessions every 15ms
},
},
})
defer teardown()
// delay the rpc by 30ms. The background task runs to clean long-running sessions.
server.TestSpanner.PutExecutionTime(MethodExecuteSql,
SimulatedExecutionTime{
MinimumExecutionTime: 30 * time.Millisecond,
})

stmt := NewStatement(UpdateBarSetFoo)
// This transaction is eligible to be long-running, so the background task should not clean its session.
rowCount, err := client.PartitionedUpdate(ctx, stmt)
if err != nil {
t.Fatal(err)
}
if g, w := rowCount, int64(UpdateBarSetFooRowCount); g != w {
t.Errorf("Row count mismatch\nGot: %v\nWant: %v", g, w)
}
p := client.idleSessions
p.InactiveTransactionRemovalOptions.mu.Lock()
defer p.InactiveTransactionRemovalOptions.mu.Unlock()
if g, w := p.numOfLeakedSessionsRemoved, uint64(0); g != w {
t.Fatalf("Number of leaked sessions removed mismatch\nGot: %d\nWant: %d\n", g, w)
}
}

func TestClient_Apply_Priority(t *testing.T) {
t.Parallel()

4 changes: 4 additions & 0 deletions spanner/pdml.go
@@ -53,6 +53,10 @@ func (c *Client) partitionedUpdate(ctx context.Context, statement Statement, opt
}

	sh, err := c.idleSessions.take(ctx)
	if err != nil {
		return 0, ToSpannerError(err)
	}
	// Mark the session as eligible for long-running use, since a partitioned
	// DML transaction can be long-running. This runs only after the error
	// check, so sh is never nil here.
	sh.mu.Lock()
	sh.eligibleForLongRunning = true
	sh.mu.Unlock()