diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java index cc24dd2ba0..3e01112366 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java @@ -49,6 +49,7 @@ import com.google.api.gax.core.ExecutorProvider; import com.google.api.gax.rpc.ServerStream; import com.google.cloud.Timestamp; +import com.google.cloud.Tuple; import com.google.cloud.grpc.GrpcTransportOptions; import com.google.cloud.grpc.GrpcTransportOptions.ExecutorFactory; import com.google.cloud.spanner.Options.QueryOption; @@ -1854,7 +1855,7 @@ private void keepAliveSessions(Instant currTime) { // Keep chugging till there is no session that needs to be kept alive. while (numSessionsToKeepAlive > 0) { - PooledSession sessionToKeepAlive = null; + Tuple sessionToKeepAlive; synchronized (lock) { sessionToKeepAlive = findSessionToKeepAlive(sessions, keepAliveThreshold, 0); } @@ -1862,10 +1863,10 @@ private void keepAliveSessions(Instant currTime) { break; } try { - logger.log(Level.FINE, "Keeping alive session " + sessionToKeepAlive.getName()); + logger.log(Level.FINE, "Keeping alive session " + sessionToKeepAlive.x().getName()); numSessionsToKeepAlive--; - sessionToKeepAlive.keepAlive(); - releaseSession(sessionToKeepAlive, false); + sessionToKeepAlive.x().keepAlive(); + releaseSession(sessionToKeepAlive); } catch (SpannerException e) { handleException(e, sessionToKeepAlive); } @@ -2314,11 +2315,11 @@ private boolean isClosed() { } } - private void handleException(SpannerException e, PooledSession session) { + private void handleException(SpannerException e, Tuple session) { if (isSessionNotFound(e)) { - invalidateSession(session); + invalidateSession(session.x()); } else { - releaseSession(session, false); + releaseSession(session); } } @@ -2342,7 +2343,7 @@ private void invalidateSession(PooledSession session) { } } - private PooledSession findSessionToKeepAlive( + private Tuple findSessionToKeepAlive( Queue queue, Instant keepAliveThreshold, int numAlreadyChecked) { int numChecked = 0; Iterator iterator = queue.iterator(); @@ -2352,7 +2353,7 @@ private PooledSession findSessionToKeepAlive( PooledSession session = iterator.next(); if (session.delegate.getLastUseTime().isBefore(keepAliveThreshold)) { iterator.remove(); - return session; + return Tuple.of(session, numChecked); } numChecked++; } @@ -2476,8 +2477,17 @@ private void maybeCreateSession() { } } - /** Releases a session back to the pool. This might cause one of the waiters to be unblocked. */ + private void releaseSession(Tuple sessionWithPosition) { + releaseSession(sessionWithPosition.x(), false, sessionWithPosition.y()); + } + private void releaseSession(PooledSession session, boolean isNewSession) { + releaseSession(session, isNewSession, null); + } + + /** Releases a session back to the pool. This might cause one of the waiters to be unblocked. */ + private void releaseSession( + PooledSession session, boolean isNewSession, @Nullable Integer position) { Preconditions.checkNotNull(session); synchronized (lock) { if (closureFuture != null) { @@ -2497,7 +2507,12 @@ private void releaseSession(PooledSession session, boolean isNewSession) { // more efficient. session.releaseToPosition = options.getReleaseToPosition(); } - if (session.releaseToPosition == Position.RANDOM && !sessions.isEmpty()) { + if (position != null) { + // Make sure we use a valid position, as the number of sessions could have changed in the + // meantime. + int actualPosition = Math.min(position, sessions.size()); + sessions.add(actualPosition, session); + } else if (session.releaseToPosition == Position.RANDOM && !sessions.isEmpty()) { // A session should only be added at a random position the first time it is added to // the pool or if the pool was deemed unbalanced. All following releases into the pool // should normally happen at the default release position (unless the pool is again deemed @@ -2510,6 +2525,7 @@ private void releaseSession(PooledSession session, boolean isNewSession) { } else { sessions.addFirst(session); } + session.releaseToPosition = options.getReleaseToPosition(); } else { waiters.poll().put(session); } diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolMaintainerTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolMaintainerTest.java index 127ffd3bff..f629c0fc1d 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolMaintainerTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolMaintainerTest.java @@ -17,6 +17,7 @@ package com.google.cloud.spanner; import static com.google.common.truth.Truth.assertThat; +import static org.junit.Assert.assertEquals; import static org.mockito.Mockito.any; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; @@ -175,14 +176,20 @@ public void testKeepAlive() throws Exception { Session session3 = pool.getSession(); Session session4 = pool.getSession(); Session session5 = pool.getSession(); - // Note that session2 was now the first session in the pool as it was the last to receive a - // ping. - assertThat(session3.getName()).isEqualTo(session2.getName()); - assertThat(session4.getName()).isEqualTo(session1.getName()); + // Pinging a session will put it at the back of the pool. A session that needed a ping to be + // kept alive is not one that should be preferred for use. This means that session2 is the last + // session in the pool, and session1 the second-to-last. + assertEquals(session1.getName(), session3.getName()); + assertEquals(session2.getName(), session4.getName()); session5.close(); session4.close(); session3.close(); // Advance the clock to force pings for the sessions in the pool and do three maintenance loops. + // This should ping the sessions in the following order: + // 1. session3 (=session1) + // 2. session4 (=session2) + // The pinged sessions already contains: {session1: 1, session2: 1} + // Note that the pool only pings up to MinSessions sessions. clock.currentTimeMillis.addAndGet( TimeUnit.MINUTES.toMillis(options.getKeepAliveIntervalMinutes()) + 1); runMaintenanceLoop(clock, pool, 3); @@ -192,16 +199,18 @@ public void testKeepAlive() throws Exception { // should cause only one session to get a ping. clock.currentTimeMillis.addAndGet( TimeUnit.MINUTES.toMillis(options.getKeepAliveIntervalMinutes()) + 1); - // We are now checking out session2 because + // This will be session1, as all sessions were pinged in the previous 3 maintenance loops, and + // this will have brought session1 back to the front of the pool. Session session6 = pool.getSession(); // The session that was first in the pool now is equal to the initial first session as each full // round of pings will swap the order of the first MinSessions sessions in the pool. assertThat(session6.getName()).isEqualTo(session1.getName()); runMaintenanceLoop(clock, pool, 3); + // Running 3 cycles will only ping the 2 sessions in the pool once. assertThat(pool.totalSessions()).isEqualTo(3); assertThat(pingedSessions).containsExactly(session1.getName(), 2, session2.getName(), 3); // Update the last use date and release the session to the pool and do another maintenance - // cycle. + // cycle. This should not ping any sessions. ((PooledSessionFuture) session6).get().markUsed(); session6.close(); runMaintenanceLoop(clock, pool, 3); @@ -267,10 +276,10 @@ public void testIdleSessions() throws Exception { Session session3 = pool.getSession().get(); Session session4 = pool.getSession().get(); Session session5 = pool.getSession().get(); - // Note that session2 was now the first session in the pool as it was the last to receive a - // ping. - assertThat(session3.getName()).isEqualTo(session2.getName()); - assertThat(session4.getName()).isEqualTo(session1.getName()); + // Note that pinging sessions does not change the order of the pool. This means that session2 + // is still the last session in the pool. + assertThat(session3.getName()).isEqualTo(session1.getName()); + assertThat(session4.getName()).isEqualTo(session2.getName()); session5.close(); session4.close(); session3.close();