Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(spanner): long running transaction clean up - disabled #8177

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
42 commits
Select commit Hold shift + click to select a range
7e7375e
feat(spanner): long running transaction clean up to prevent session l…
harshachinta Jun 27, 2023
77959b4
feat(spanner): code refactoring in session.go file
harshachinta Jun 27, 2023
643678e
fix(spanner): fix vet
harshachinta Jun 27, 2023
454e2a6
feat(spanner): refactor client.go file
harshachinta Jun 27, 2023
37997cb
feat(spanner): add lock when updating session handle
harshachinta Jun 27, 2023
a99ae01
feat(spanner): code refactoring in transaction.go file
harshachinta Jun 27, 2023
acfa972
test(spanner): remove test
harshachinta Jun 27, 2023
89b31c1
feat(spanner): code refactor
harshachinta Jun 27, 2023
c073016
Merge branch 'main' into session-leak-cleaning-long-transactions
harshachinta Jun 27, 2023
e0d1afc
feat(spanner): refactor nit comments
harshachinta Jul 5, 2023
327cd79
feat(spanner): reduce sleep timing to milli seconds for unit tests
harshachinta Jul 5, 2023
ee6cd46
feat(spanner): update idleTimeThresholdSecs field to time.Duration
harshachinta Jul 5, 2023
6094f0b
feat(spanner): make the log messages conditional based on type of act…
harshachinta Jul 5, 2023
9314761
feat(spanner): combine get and remove long running sessions in a sing…
harshachinta Jul 8, 2023
f578fac
feat(spanner): modify presubmit condition to run tests for changed mo…
harshachinta Jul 12, 2023
6aa6558
feat(spanner): revert presubmit.sh fix
harshachinta Jul 29, 2023
2f927c4
feat(spanner): update doc
harshachinta Jul 29, 2023
3897b35
feat(spanner): reword isLongRunning to eligibleForLongRunning in sess…
harshachinta Jul 29, 2023
7e52ce0
Merge branch 'main' into session-leak-cleaning-long-transactions
harshachinta Jul 29, 2023
1c15ef5
feat(spanner): update TrackSessionHandles logic to turn off stack tra…
harshachinta Aug 1, 2023
9efe433
feat(spanner): fix test
harshachinta Aug 1, 2023
4e2d597
feat(spanner): change action on inactive transactions option to enum
harshachinta Aug 1, 2023
7a7986e
feat(spanner): fix lint
harshachinta Aug 1, 2023
5f2d55a
feat(spanner): fix lint
harshachinta Aug 1, 2023
ed12707
feat(spanner): fix lint
harshachinta Aug 1, 2023
cbfe38c
feat(spanner): fix doc
harshachinta Aug 2, 2023
f0bacdb
Merge branch 'main' into session-leak-cleaning-long-transactions
harshachinta Aug 4, 2023
168438d
Merge branch 'main' into session-leak-cleaning-long-transactions
harshachinta Aug 6, 2023
fb7c2d7
feat(spanner): disable the feature by default
harshachinta Aug 6, 2023
04b01a6
Merge branch 'main' into session-leak-cleaning-long-transactions
harshachinta Aug 10, 2023
0359065
Merge branch 'main' into session-leak-cleaning-long-transactions
harshachinta Aug 18, 2023
bc48a8a
Merge branch 'main' into session-leak-cleaning-long-transactions
harshachinta Aug 21, 2023
ba90f75
feat(spanner): revert commit - disable the feature by default
harshachinta Aug 21, 2023
750b911
Merge branch 'main' into session-leak-cleaning-long-transactions
harshachinta Oct 5, 2023
2e26b54
fix: lint
harshachinta Oct 5, 2023
b9e272e
docs: add Readme
harshachinta Oct 6, 2023
057fcfb
Merge branch 'main' into session-leak-cleaning-long-transactions
harshachinta Oct 6, 2023
1efbea6
feat(spanner): make WARN as default for action on inactive transactions
harshachinta Oct 6, 2023
1ee50f8
docs: address review comments
harshachinta Oct 6, 2023
ab268ca
docs: move README.md changes in a different PR
harshachinta Oct 6, 2023
15b7af3
feat: disable feature
harshachinta Oct 6, 2023
7910e1f
Merge branch 'main' into session-leak-cleaning-long-transactions
harshachinta Oct 30, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
26 changes: 18 additions & 8 deletions spanner/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -447,10 +447,11 @@ func (c *Client) BatchReadOnlyTransaction(ctx context.Context, tb TimestampBound

t := &BatchReadOnlyTransaction{
ReadOnlyTransaction: ReadOnlyTransaction{
tx: tx,
txReadyOrClosed: make(chan struct{}),
state: txActive,
rts: rts,
tx: tx,
txReadyOrClosed: make(chan struct{}),
state: txActive,
rts: rts,
isLongRunningTransaction: true,
harshachinta marked this conversation as resolved.
Show resolved Hide resolved
},
ID: BatchReadOnlyTransactionID{
tid: tx,
Expand Down Expand Up @@ -481,10 +482,11 @@ func (c *Client) BatchReadOnlyTransactionFromID(tid BatchReadOnlyTransactionID)

t := &BatchReadOnlyTransaction{
ReadOnlyTransaction: ReadOnlyTransaction{
tx: tid.tid,
txReadyOrClosed: make(chan struct{}),
state: txActive,
rts: tid.rts,
tx: tid.tid,
txReadyOrClosed: make(chan struct{}),
state: txActive,
rts: tid.rts,
isLongRunningTransaction: true,
},
ID: tid,
}
Expand Down Expand Up @@ -566,6 +568,14 @@ func (c *Client) rwTransaction(ctx context.Context, f func(context.Context, *Rea
if sh == nil || sh.getID() == "" || sh.getClient() == nil {
// Session handle hasn't been allocated or has been destroyed.
sh, err = c.idleSessions.take(ctx)
harshachinta marked this conversation as resolved.
Show resolved Hide resolved
if t != nil {
// Some operations (for ex BatchUpdate) can be long-running. For such operations set the isLongRunningTransaction flag to be true
sh.mu.Lock()
t.mu.Lock()
sh.eligibleForLongRunning = t.isLongRunningTransaction
t.mu.Unlock()
sh.mu.Unlock()
}
if err != nil {
// If session retrieval fails, just fail the transaction.
return err
Expand Down
217 changes: 217 additions & 0 deletions spanner/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,73 @@ func TestClient_Single_Read_SessionNotFound(t *testing.T) {
}
}

// TestClient_Single_WhenInactiveTransactionsAndSessionIsNotFoundOnBackend_RemoveSessionFromPool
// runs a single-use query whose session has its checkout time backdated by an
// hour, forces the long-running-session cleanup, and lets the mocked backend
// answer the first streaming call with a session-not-found error. It then
// verifies the pool counters (one idle session, none in use, one opened), that
// the original handle is not marked eligible for long-running use, and that
// exactly one leaked session was recorded as removed.
func TestClient_Single_WhenInactiveTransactionsAndSessionIsNotFoundOnBackend_RemoveSessionFromPool(t *testing.T) {
	t.Parallel()
	server, client, teardown := setupMockedTestServerWithConfig(t, ClientConfig{
		SessionPoolConfig: SessionPoolConfig{
			MinOpened: 1,
			MaxOpened: 1,
			InactiveTransactionRemovalOptions: InactiveTransactionRemovalOptions{
				actionOnInactiveTransaction: WarnAndClose,
			},
		},
	})
	defer teardown()
	server.TestSpanner.PutExecutionTime(
		MethodExecuteStreamingSql,
		SimulatedExecutionTime{Errors: []error{newSessionNotFoundError("projects/p/instances/i/databases/d/sessions/s")}},
	)
	ctx := context.Background()
	single := client.Single()
	iter := single.Query(ctx, NewStatement(SelectSingerIDAlbumIDAlbumTitleFromAlbums))
	p := client.idleSessions
	sh := single.sh

	// Simulate the session having been checked out for more than 60 minutes.
	sh.mu.Lock()
	sh.checkoutTime = time.Now().Add(-time.Hour)
	sh.mu.Unlock()

	// Force-run the task that cleans up unexpected long-running sessions.
	p.removeLongRunningSessions()

	// Drain the iterator. The backend throws SessionNotFoundError on the
	// first call and the session gets replaced with a new session, so the
	// query is expected to finish without surfacing an error.
	for {
		_, err := iter.Next()
		if err == iterator.Done {
			break
		}
		if err != nil {
			t.Fatal(err)
		}
	}
	// The new session returns back to the pool.
	iter.Stop()

	p.mu.Lock()
	defer p.mu.Unlock()
	if g, w := p.idleList.Len(), 1; g != w {
		t.Fatalf("Idle Sessions in pool, count mismatch\nGot: %d\nWant: %d\n", g, w)
	}
	if g, w := p.numInUse, uint64(0); g != w {
		t.Fatalf("Number of sessions currently in use mismatch\nGot: %d\nWant: %d\n", g, w)
	}
	if g, w := p.numOpened, uint64(1); g != w {
		t.Fatalf("Session pool size mismatch\nGot: %d\nWant: %d\n", g, w)
	}

	sh.mu.Lock()
	defer sh.mu.Unlock()
	// The message names the field that is actually inspected on the handle.
	if g, w := sh.eligibleForLongRunning, false; g != w {
		t.Fatalf("eligibleForLongRunning mismatch\nGot: %v\nWant: %v\n", g, w)
	}
	p.InactiveTransactionRemovalOptions.mu.Lock()
	defer p.InactiveTransactionRemovalOptions.mu.Unlock()
	if g, w := p.numOfLeakedSessionsRemoved, uint64(1); g != w {
		t.Fatalf("Number of leaked sessions removed mismatch\nGot: %d\nWant: %d\n", g, w)
	}
}

func TestClient_Single_ReadRow_SessionNotFound(t *testing.T) {
t.Parallel()

Expand Down Expand Up @@ -1369,6 +1436,59 @@ func TestClient_ReadWriteTransaction_SessionNotFoundOnExecuteUpdate(t *testing.T
}
}

// TestClient_ReadWriteTransaction_WhenLongRunningSessionCleaned_TransactionShouldFail
// runs a read/write transaction, backdates its session's checkout time by an
// hour after the first update, and forces the long-running-session cleanup.
// The second update in the same transaction is then expected to fail, and the
// whole transaction must return a FailedPrecondition error whose text contains
// "session is already recycled / destroyed".
func TestClient_ReadWriteTransaction_WhenLongRunningSessionCleaned_TransactionShouldFail(t *testing.T) {
	t.Parallel()
	_, client, teardown := setupMockedTestServerWithConfig(t, ClientConfig{
		SessionPoolConfig: SessionPoolConfig{
			MinOpened: 1,
			MaxOpened: 1,
			InactiveTransactionRemovalOptions: InactiveTransactionRemovalOptions{
				actionOnInactiveTransaction: WarnAndClose,
			},
		},
	})
	defer teardown()
	ctx := context.Background()
	p := client.idleSessions
	msg := "session is already recycled / destroyed"
	_, err := client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error {
		rowCount, err := tx.Update(ctx, NewStatement(UpdateBarSetFoo))
		if err != nil {
			return err
		}
		if g, w := rowCount, int64(UpdateBarSetFooRowCount); g != w {
			return status.Errorf(codes.FailedPrecondition, "Row count mismatch\nGot: %v\nWant: %v", g, w)
		}

		// Simulate the session to be checked out for more than 60 mins.
		// The background task cleans up this long-running session.
		// The flag is read under the same lock so that it is released on a
		// single path regardless of the outcome of the check.
		tx.sh.mu.Lock()
		tx.sh.checkoutTime = time.Now().Add(-time.Hour)
		eligible := tx.sh.eligibleForLongRunning
		tx.sh.mu.Unlock()
		if g, w := eligible, false; g != w {
			return status.Errorf(codes.FailedPrecondition, "eligibleForLongRunning value mismatch\nGot: %v\nWant: %v", g, w)
		}

		// Force-run the task that cleans up unexpected long-running sessions.
		p.removeLongRunningSessions()

		// The session associated with this transaction tx has been destroyed,
		// so the call below should fail. Returning its error means the entire
		// transaction does not succeed.
		_, err = tx.Update(ctx, NewStatement("UPDATE FOO SET BAR='value' WHERE ID=1"))
		return err
	})
	if err == nil {
		t.Fatalf("Missing expected exception")
	}
	if status.Code(err) != codes.FailedPrecondition || !strings.Contains(err.Error(), msg) {
		t.Fatalf("error mismatch\nGot: %v\nWant: %v", err, msg)
	}
}

func TestClient_ReadWriteTransaction_SessionNotFoundOnExecuteBatchUpdate(t *testing.T) {
t.Parallel()

Expand Down Expand Up @@ -1402,6 +1522,65 @@ func TestClient_ReadWriteTransaction_SessionNotFoundOnExecuteBatchUpdate(t *test
}
}

// TestClient_ReadWriteTransaction_WhenLongRunningExecuteBatchUpdate_TakeNoAction
// makes the mocked backend fail the first ExecuteBatchDml call with a
// session-not-found error, so the transaction function is expected to run
// twice. On the second attempt the session's checkout time is backdated by an
// hour and the long-running-session cleanup is forced; the test expects the
// handle to be flagged as eligible for long-running use, the batch update to
// succeed, and no leaked session to be recorded as removed.
func TestClient_ReadWriteTransaction_WhenLongRunningExecuteBatchUpdate_TakeNoAction(t *testing.T) {
	t.Parallel()
	server, client, teardown := setupMockedTestServerWithConfig(t, ClientConfig{
		SessionPoolConfig: SessionPoolConfig{
			MinOpened: 1,
			MaxOpened: 1,
			InactiveTransactionRemovalOptions: InactiveTransactionRemovalOptions{
				actionOnInactiveTransaction: WarnAndClose,
			},
		},
	})
	defer teardown()
	server.TestSpanner.PutExecutionTime(
		MethodExecuteBatchDml,
		SimulatedExecutionTime{Errors: []error{newSessionNotFoundError("projects/p/instances/i/databases/d/sessions/s")}},
	)
	ctx := context.Background()
	p := client.idleSessions
	var attempts int
	_, err := client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error {
		attempts++
		if attempts == 2 {
			// Simulate the session to be long-running. The background task
			// should not clean up this long-running session. The flag is
			// read under the lock so that it is released on a single path.
			tx.sh.mu.Lock()
			tx.sh.checkoutTime = time.Now().Add(-time.Hour)
			eligible := tx.sh.eligibleForLongRunning
			tx.sh.mu.Unlock()
			if g, w := eligible, true; g != w {
				return status.Errorf(codes.FailedPrecondition, "eligibleForLongRunning value mismatch\nGot: %v\nWant: %v", g, w)
			}

			// Force-run the task that cleans up unexpected long-running sessions.
			p.removeLongRunningSessions()
		}
		rowCounts, err := tx.BatchUpdate(ctx, []Statement{NewStatement(UpdateBarSetFoo)})
		if err != nil {
			return err
		}
		if g, w := len(rowCounts), 1; g != w {
			return status.Errorf(codes.FailedPrecondition, "Row counts length mismatch\nGot: %v\nWant: %v", g, w)
		}
		if g, w := rowCounts[0], int64(UpdateBarSetFooRowCount); g != w {
			return status.Errorf(codes.FailedPrecondition, "Row count mismatch\nGot: %v\nWant: %v", g, w)
		}
		return nil
	})
	if err != nil {
		t.Fatal(err)
	}
	if g, w := attempts, 2; g != w {
		t.Fatalf("number of attempts mismatch:\nGot: %d\nWant: %d", g, w)
	}
	p.InactiveTransactionRemovalOptions.mu.Lock()
	defer p.InactiveTransactionRemovalOptions.mu.Unlock()
	if g, w := p.numOfLeakedSessionsRemoved, uint64(0); g != w {
		t.Fatalf("Number of leaked sessions removed mismatch\nGot: %d\nWant: %d\n", g, w)
	}
}

func TestClient_ReadWriteTransaction_Query_QueryOptions(t *testing.T) {
for _, tt := range queryOptionsTestCases() {
t.Run(tt.name, func(t *testing.T) {
Expand Down Expand Up @@ -3913,6 +4092,44 @@ func TestClient_PDML_Priority(t *testing.T) {
}
}

func TestClient_WhenLongRunningPartitionedUpdateRequest_TakeNoAction(t *testing.T) {
t.Parallel()
ctx := context.Background()
server, client, teardown := setupMockedTestServerWithConfig(t, ClientConfig{
SessionPoolConfig: SessionPoolConfig{
MinOpened: 1,
MaxOpened: 1,
healthCheckSampleInterval: 10 * time.Millisecond, // maintainer runs every 10ms
InactiveTransactionRemovalOptions: InactiveTransactionRemovalOptions{
actionOnInactiveTransaction: WarnAndClose,
executionFrequency: 15 * time.Millisecond, // check long-running sessions every 15ms
},
},
})
defer teardown()
// delay the rpc by 30ms. The background task runs to clean long-running sessions.
server.TestSpanner.PutExecutionTime(MethodExecuteSql,
SimulatedExecutionTime{
MinimumExecutionTime: 30 * time.Millisecond,
})

stmt := NewStatement(UpdateBarSetFoo)
// This transaction is eligible to be long-running, so the background task should not clean its session.
rowCount, err := client.PartitionedUpdate(ctx, stmt)
if err != nil {
t.Fatal(err)
}
if g, w := rowCount, int64(UpdateBarSetFooRowCount); g != w {
t.Errorf("Row count mismatch\nGot: %v\nWant: %v", g, w)
}
p := client.idleSessions
p.InactiveTransactionRemovalOptions.mu.Lock()
defer p.InactiveTransactionRemovalOptions.mu.Unlock()
if g, w := p.numOfLeakedSessionsRemoved, uint64(0); g != w {
t.Fatalf("Number of leaked sessions removed mismatch\nGot: %d\nWant: %d\n", g, w)
}
}

func TestClient_Apply_Priority(t *testing.T) {
t.Parallel()

Expand Down
4 changes: 4 additions & 0 deletions spanner/pdml.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,10 @@ func (c *Client) partitionedUpdate(ctx context.Context, statement Statement, opt
}

sh, err := c.idleSessions.take(ctx)
// Mark isLongRunningTransaction to true, as the session in case of partitioned dml can be long-running
sh.mu.Lock()
harshachinta marked this conversation as resolved.
Show resolved Hide resolved
sh.eligibleForLongRunning = true
sh.mu.Unlock()
if err != nil {
return 0, ToSpannerError(err)
}
Expand Down