Skip to content

Commit

Permalink
feat(spanner): Add lastUseTime property to session (#8942)
Browse files Browse the repository at this point in the history
* feat(spanner): add new field and add relevant tests

* feat(spanner): add test for read only transaction

* feat(spanner): update lastUseTime before invoking the RPC

* feat(spanner): reduce time to ms range in unit tests
  • Loading branch information
harshachinta committed Nov 3, 2023
1 parent eee4409 commit b560cfc
Show file tree
Hide file tree
Showing 7 changed files with 193 additions and 26 deletions.
3 changes: 3 additions & 0 deletions spanner/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@ func (t *BatchReadOnlyTransaction) PartitionReadUsingIndexWithOptions(ctx contex
return nil, err
}
var md metadata.MD
sh.updateLastUseTime()
resp, err = client.PartitionRead(contextWithOutgoingMetadata(ctx, sh.getMetadata(), t.disableRouteToLeader), &sppb.PartitionReadRequest{
Session: sid,
Transaction: ts,
Expand Down Expand Up @@ -203,6 +204,7 @@ func (t *BatchReadOnlyTransaction) partitionQuery(ctx context.Context, statement
Params: params,
ParamTypes: paramTypes,
}
sh.updateLastUseTime()
resp, err := client.PartitionQuery(contextWithOutgoingMetadata(ctx, sh.getMetadata(), t.disableRouteToLeader), req, gax.WithGRPCOptions(grpc.Header(&md)))

if getGFELatencyMetricsFlag() && md != nil && t.ct != nil {
Expand Down Expand Up @@ -306,6 +308,7 @@ func (t *BatchReadOnlyTransaction) Execute(ctx context.Context, p *Partition) *R
// Might happen if transaction is closed in the middle of a API call.
return &RowIterator{err: errSessionClosed(sh)}
}
sh.updateLastUseTime()
// Read or query partition.
if p.rreq != nil {
rpc = func(ctx context.Context, resumeToken []byte) (streamingReceiver, error) {
Expand Down
2 changes: 2 additions & 0 deletions spanner/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -427,6 +427,7 @@ func (c *Client) BatchReadOnlyTransaction(ctx context.Context, tb TimestampBound
return nil, err
}
sh = &sessionHandle{session: s}
sh.updateLastUseTime()

// Begin transaction.
res, err := sh.getClient().BeginTransaction(contextWithOutgoingMetadata(ctx, sh.getMetadata(), true), &sppb.BeginTransactionRequest{
Expand Down Expand Up @@ -854,6 +855,7 @@ func (c *Client) BatchWriteWithOptions(ctx context.Context, mgs []*MutationGroup

rpc := func(ct context.Context) (sppb.Spanner_BatchWriteClient, error) {
var md metadata.MD
sh.updateLastUseTime()
stream, rpcErr := sh.getClient().BatchWrite(contextWithOutgoingMetadata(ct, sh.getMetadata(), c.disableRouteToLeader), &sppb.BatchWriteRequest{
Session: sh.getID(),
MutationGroups: mgsPb,
Expand Down
143 changes: 138 additions & 5 deletions spanner/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,9 +230,9 @@ func TestClient_Single_WhenInactiveTransactionsAndSessionIsNotFoundOnBackend_Rem
iter := single.Query(ctx, NewStatement(SelectSingerIDAlbumIDAlbumTitleFromAlbums))
p := client.idleSessions
sh := single.sh
// simulate session to be checked out for more than 60mins
// simulate session to be last used before 60 mins
sh.mu.Lock()
sh.checkoutTime = time.Now().Add(-time.Hour)
sh.lastUseTime = time.Now().Add(-time.Hour)
sh.mu.Unlock()

// force run task to clean up unexpected long-running sessions
Expand Down Expand Up @@ -1038,6 +1038,72 @@ func TestClient_ReadOnlyTransaction_ReadOptions(t *testing.T) {
}
}

func TestClient_ReadOnlyTransaction_WhenMultipleOperations_SessionLastUseTimeShouldBeUpdated(t *testing.T) {
t.Parallel()

server, client, teardown := setupMockedTestServerWithConfig(t, ClientConfig{
SessionPoolConfig: SessionPoolConfig{
MinOpened: 1,
MaxOpened: 1,
InactiveTransactionRemovalOptions: InactiveTransactionRemovalOptions{
actionOnInactiveTransaction: WarnAndClose,
idleTimeThreshold: 30 * time.Millisecond,
},
},
})
defer teardown()
server.TestSpanner.PutExecutionTime(MethodExecuteStreamingSql,
SimulatedExecutionTime{
MinimumExecutionTime: 20 * time.Millisecond,
})
server.TestSpanner.PutExecutionTime(MethodStreamingRead,
SimulatedExecutionTime{
MinimumExecutionTime: 20 * time.Millisecond,
})
ctx := context.Background()
p := client.idleSessions

roTxn := client.ReadOnlyTransaction()
defer roTxn.Close()
iter := roTxn.Query(ctx, NewStatement(SelectSingerIDAlbumIDAlbumTitleFromAlbums))
iter.Next()
iter.Stop()

// Get the session last use time.
roTxn.sh.mu.Lock()
sessionPrevLastUseTime := roTxn.sh.lastUseTime
roTxn.sh.mu.Unlock()

iter = roTxn.Read(ctx, "FOO", AllKeys(), []string{"BAR"})
iter.Next()
iter.Stop()

// Get the latest session last use time
roTxn.sh.mu.Lock()
sessionLatestLastUseTime := roTxn.sh.lastUseTime
sessionCheckoutTime := roTxn.sh.checkoutTime
roTxn.sh.mu.Unlock()

// sessionLatestLastUseTime should not be equal to sessionPrevLastUseTime.
// This is because session lastUse time should be updated whenever a new operation is being executed on the transaction.
if (sessionLatestLastUseTime.Sub(sessionPrevLastUseTime)).Milliseconds() <= 0 {
t.Fatalf("Session lastUseTime times should not be equal")
}

if (time.Now().Sub(sessionPrevLastUseTime)).Milliseconds() < 40 {
t.Fatalf("Expected session to be checkedout for more than 40 milliseconds")
}
if (time.Now().Sub(sessionCheckoutTime)).Milliseconds() < 40 {
t.Fatalf("Expected session to be checkedout for more than 40 milliseconds")
}
// force run task to clean up unexpected long-running sessions whose lastUseTime >= 3sec.
// The session should not be cleaned since the latest operation on the transaction has updated the lastUseTime.
p.removeLongRunningSessions()
if p.numOfLeakedSessionsRemoved > 0 {
t.Fatalf("Expected session to not get cleaned by background maintainer")
}
}

func setQueryOptionsEnvVars(opts *sppb.ExecuteSqlRequest_QueryOptions) func() {
os.Setenv("SPANNER_OPTIMIZER_VERSION", opts.OptimizerVersion)
os.Setenv("SPANNER_OPTIMIZER_STATISTICS_PACKAGE", opts.OptimizerStatisticsPackage)
Expand Down Expand Up @@ -1460,10 +1526,10 @@ func TestClient_ReadWriteTransaction_WhenLongRunningSessionCleaned_TransactionSh
return status.Errorf(codes.FailedPrecondition, "Row count mismatch\nGot: %v\nWant: %v", g, w)
}

// Simulate the session to be checked out for more than 60 mins.
// Simulate the session to be last used before 60 mins.
// The background task cleans up this long-running session.
tx.sh.mu.Lock()
tx.sh.checkoutTime = time.Now().Add(-time.Hour)
tx.sh.lastUseTime = time.Now().Add(-time.Hour)
if g, w := tx.sh.eligibleForLongRunning, false; g != w {
tx.sh.mu.Unlock()
return status.Errorf(codes.FailedPrecondition, "isLongRunningTransaction value mismatch\nGot: %v\nWant: %v", g, w)
Expand All @@ -1489,6 +1555,73 @@ func TestClient_ReadWriteTransaction_WhenLongRunningSessionCleaned_TransactionSh
}
}

func TestClient_ReadWriteTransaction_WhenMultipleOperations_SessionLastUseTimeShouldBeUpdated(t *testing.T) {
t.Parallel()
server, client, teardown := setupMockedTestServerWithConfig(t, ClientConfig{
SessionPoolConfig: SessionPoolConfig{
MinOpened: 1,
MaxOpened: 1,
InactiveTransactionRemovalOptions: InactiveTransactionRemovalOptions{
actionOnInactiveTransaction: WarnAndClose,
idleTimeThreshold: 30 * time.Millisecond,
},
},
})
defer teardown()
server.TestSpanner.PutExecutionTime(MethodExecuteSql,
SimulatedExecutionTime{
MinimumExecutionTime: 20 * time.Millisecond,
})
ctx := context.Background()
p := client.idleSessions
_, err := client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error {
// Execute first operation on the transaction
_, err := tx.Update(ctx, NewStatement(UpdateBarSetFoo))
if err != nil {
return err
}

// Get the session last use time.
tx.sh.mu.Lock()
sessionPrevLastUseTime := tx.sh.lastUseTime
tx.sh.mu.Unlock()

// Execute second operation on the transaction
_, err = tx.Update(ctx, NewStatement(UpdateBarSetFoo))
if err != nil {
return err
}
// Get the latest session last use time
tx.sh.mu.Lock()
sessionLatestLastUseTime := tx.sh.lastUseTime
sessionCheckoutTime := tx.sh.checkoutTime
tx.sh.mu.Unlock()

// sessionLatestLastUseTime should not be equal to sessionPrevLastUseTime.
// This is because session lastUse time should be updated whenever a new operation is being executed on the transaction.
if (sessionLatestLastUseTime.Sub(sessionPrevLastUseTime)).Milliseconds() <= 0 {
t.Fatalf("Session lastUseTime times should not be equal")
}

if (time.Now().Sub(sessionPrevLastUseTime)).Milliseconds() < 40 {
t.Fatalf("Expected session to be checkedout for more than 40 milliseconds")
}
if (time.Now().Sub(sessionCheckoutTime)).Milliseconds() < 40 {
t.Fatalf("Expected session to be checkedout for more than 40 milliseconds")
}
// force run task to clean up unexpected long-running sessions whose lastUseTime >= 3sec.
// The session should not be cleaned since the latest operation on the transaction has updated the lastUseTime.
p.removeLongRunningSessions()
if p.numOfLeakedSessionsRemoved > 0 {
t.Fatalf("Expected session to not get cleaned by background maintainer")
}
return nil
})
if err != nil {
t.Fatal(err)
}
}

func TestClient_ReadWriteTransaction_SessionNotFoundOnExecuteBatchUpdate(t *testing.T) {
t.Parallel()

Expand Down Expand Up @@ -1546,7 +1679,7 @@ func TestClient_ReadWriteTransaction_WhenLongRunningExecuteBatchUpdate_TakeNoAct
if attempts == 2 {
// Simulate the session to be long-running. The background task should not clean up this long-running session.
tx.sh.mu.Lock()
tx.sh.checkoutTime = time.Now().Add(-time.Hour)
tx.sh.lastUseTime = time.Now().Add(-time.Hour)
if g, w := tx.sh.eligibleForLongRunning, true; g != w {
tx.sh.mu.Unlock()
return status.Errorf(codes.FailedPrecondition, "isLongRunningTransaction value mismatch\nGot: %v\nWant: %v", g, w)
Expand Down
3 changes: 3 additions & 0 deletions spanner/pdml.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ func (c *Client) partitionedUpdate(ctx context.Context, statement Statement, opt
// Note that PDML transactions cannot be committed or rolled back.
func executePdml(ctx context.Context, sh *sessionHandle, req *sppb.ExecuteSqlRequest) (count int64, err error) {
var md metadata.MD
sh.updateLastUseTime()
// Begin transaction.
res, err := sh.getClient().BeginTransaction(ctx, &sppb.BeginTransactionRequest{
Session: sh.getID(),
Expand All @@ -122,6 +123,8 @@ func executePdml(ctx context.Context, sh *sessionHandle, req *sppb.ExecuteSqlReq
req.Transaction = &sppb.TransactionSelector{
Selector: &sppb.TransactionSelector_Id{Id: res.Id},
}

sh.updateLastUseTime()
resultSet, err := sh.getClient().ExecuteSql(ctx, req, gax.WithGRPCOptions(grpc.Header(&md)))
if getGFELatencyMetricsFlag() && md != nil && sh.session.pool != nil {
err := captureGFELatencyStats(tag.NewContext(ctx, sh.session.pool.tagMap), md, "executePdml_ExecuteSql")
Expand Down
16 changes: 14 additions & 2 deletions spanner/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,8 @@ type sessionHandle struct {
session *session
// checkoutTime is the time the session was checked out of the pool.
checkoutTime time.Time
// lastUseTime is the time the session was last used after checked out of the pool.
lastUseTime time.Time
// trackedSessionHandle is the linked list node which links the session to
// the list of tracked session handles. trackedSessionHandle is only set if
// TrackSessionHandles has been enabled in the session pool configuration.
Expand Down Expand Up @@ -118,6 +120,7 @@ func (sh *sessionHandle) recycle() {
sh.session = nil
sh.trackedSessionHandle = nil
sh.checkoutTime = time.Time{}
sh.lastUseTime = time.Time{}
sh.stack = nil
sh.mu.Unlock()
s.recycle()
Expand Down Expand Up @@ -187,6 +190,7 @@ func (sh *sessionHandle) destroy() {
sh.session = nil
sh.trackedSessionHandle = nil
sh.checkoutTime = time.Time{}
sh.lastUseTime = time.Time{}
sh.stack = nil
sh.mu.Unlock()

Expand All @@ -199,6 +203,14 @@ func (sh *sessionHandle) destroy() {
s.destroy(false)
}

func (sh *sessionHandle) updateLastUseTime() {
sh.mu.Lock()
defer sh.mu.Unlock()
if sh.session != nil {
sh.lastUseTime = time.Now()
}
}

// session wraps a Cloud Spanner session ID through which transactions are
// created and executed.
type session struct {
Expand Down Expand Up @@ -712,7 +724,7 @@ func (p *sessionPool) getLongRunningSessionsLocked() []*sessionHandle {
for element != nil {
sh := element.Value.(*sessionHandle)
sh.mu.Lock()
diff := time.Now().Sub(sh.checkoutTime)
diff := time.Now().Sub(sh.lastUseTime)
if !sh.eligibleForLongRunning && diff.Seconds() >= p.idleTimeThreshold.Seconds() {
if (p.actionOnInactiveTransaction == Warn || p.actionOnInactiveTransaction == WarnAndClose) && !sh.isSessionLeakLogged {
if p.actionOnInactiveTransaction == Warn {
Expand Down Expand Up @@ -880,7 +892,7 @@ var errGetSessionTimeout = spannerErrorf(codes.Canceled, "timeout / context canc
// stack if the session pool has been configured to track the call stacks of
// sessions being checked out of the pool.
func (p *sessionPool) newSessionHandle(s *session) (sh *sessionHandle) {
sh = &sessionHandle{session: s, checkoutTime: time.Now()}
sh = &sessionHandle{session: s, checkoutTime: time.Now(), lastUseTime: time.Now()}
if p.TrackSessionHandles || p.actionOnInactiveTransaction == Warn || p.actionOnInactiveTransaction == WarnAndClose || p.actionOnInactiveTransaction == Close {
p.mu.Lock()
sh.trackedSessionHandle = p.trackedSessionHandles.PushBack(sh)
Expand Down
Loading

0 comments on commit b560cfc

Please sign in to comment.