Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: randomize session pool order based on TPS #2792

Merged
merged 3 commits into from
Mar 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1751,6 +1751,13 @@ final class PoolMaintainer {
*/
@VisibleForTesting Instant lastExecutionTime;

/**
* The previous numSessionsAcquired seen by the maintainer. This is used to calculate the
* transactions per second, which again is used to determine whether to randomize the order of
* the session pool.
*/
private long prevNumSessionsAcquired;

boolean closed = false;

@GuardedBy("lock")
Expand Down Expand Up @@ -1794,6 +1801,12 @@ void maintainPool() {
return;
}
running = true;
if (loopFrequency >= 1000L) {
SessionPool.this.transactionsPerSecond =
(SessionPool.this.numSessionsAcquired - prevNumSessionsAcquired)
/ (loopFrequency / 1000L);
}
this.prevNumSessionsAcquired = SessionPool.this.numSessionsAcquired;
}
Instant currTime = clock.instant();
removeIdleSessions(currTime);
Expand Down Expand Up @@ -1995,6 +2008,7 @@ enum Position {
private final SettableFuture<Dialect> dialect = SettableFuture.create();
private final String databaseRole;
private final SessionClient sessionClient;
private final int numChannels;
private final ScheduledExecutorService executor;
private final ExecutorFactory<ScheduledExecutorService> executorFactory;

Expand Down Expand Up @@ -2054,6 +2068,9 @@ enum Position {
@GuardedBy("lock")
private long numIdleSessionsRemoved = 0;

@GuardedBy("lock")
private long transactionsPerSecond = 0L;

@GuardedBy("lock")
private long numLeakedSessionsRemoved = 0;

Expand Down Expand Up @@ -2190,6 +2207,7 @@ private SessionPool(
this.executorFactory = executorFactory;
this.executor = executor;
this.sessionClient = sessionClient;
this.numChannels = sessionClient.getSpanner().getOptions().getNumChannels();
this.clock = clock;
this.initialReleasePosition = initialReleasePosition;
this.poolMaintainer = new PoolMaintainer();
Expand Down Expand Up @@ -2493,11 +2511,13 @@ private void releaseSession(
if (closureFuture != null) {
return;
}
if (waiters.size() == 0) {
if (waiters.isEmpty()) {
// There are no pending waiters.
// Add to a random position if the head of the session pool already contains many sessions
// with the same channel as this one.
if (session.releaseToPosition == Position.FIRST && isUnbalanced(session)) {
// Add to a random position if the transactions per second is high or the head of the
// session pool already contains many sessions with the same channel as this one.
if (session.releaseToPosition != Position.RANDOM && shouldRandomize()) {
session.releaseToPosition = Position.RANDOM;
} else if (session.releaseToPosition == Position.FIRST && isUnbalanced(session)) {
session.releaseToPosition = Position.RANDOM;
} else if (session.releaseToPosition == Position.RANDOM
&& !isNewSession
Expand Down Expand Up @@ -2532,6 +2552,25 @@ private void releaseSession(
}
}

/**
* Returns true if the position where we return the session should be random if:
*
* <ol>
* <li>The current TPS is higher than the configured threshold.
* <li>AND the number of sessions checked out is larger than the number of channels.
* </ol>
*
* The second check prevents the session pool from being randomized when the application is
* running many small, quick queries using a small number of parallel threads. This can cause a
* high TPS, without actually having a high degree of parallelism.
*/
@VisibleForTesting
boolean shouldRandomize() {
return this.options.getRandomizePositionQPSThreshold() > 0
&& this.transactionsPerSecond >= this.options.getRandomizePositionQPSThreshold()
&& this.numSessionsInUse >= this.numChannels;
}

private boolean isUnbalanced(PooledSession session) {
int channel = session.getChannel();
int numChannels = sessionClient.getSpanner().getOptions().getNumChannels();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ public class SessionPoolOptions {
private final Duration waitForMinSessions;
private final Duration acquireSessionTimeout;
private final Position releaseToPosition;
private final long randomizePositionQPSThreshold;

/** Property for allowing mocking of session maintenance clock. */
private final Clock poolMaintainerClock;
Expand All @@ -89,6 +90,7 @@ private SessionPoolOptions(Builder builder) {
this.waitForMinSessions = builder.waitForMinSessions;
this.acquireSessionTimeout = builder.acquireSessionTimeout;
this.releaseToPosition = builder.releaseToPosition;
this.randomizePositionQPSThreshold = builder.randomizePositionQPSThreshold;
this.inactiveTransactionRemovalOptions = builder.inactiveTransactionRemovalOptions;
this.poolMaintainerClock = builder.poolMaintainerClock;
}
Expand Down Expand Up @@ -118,6 +120,7 @@ public boolean equals(Object o) {
&& Objects.equals(this.waitForMinSessions, other.waitForMinSessions)
&& Objects.equals(this.acquireSessionTimeout, other.acquireSessionTimeout)
&& Objects.equals(this.releaseToPosition, other.releaseToPosition)
&& Objects.equals(this.randomizePositionQPSThreshold, other.randomizePositionQPSThreshold)
&& Objects.equals(
this.inactiveTransactionRemovalOptions, other.inactiveTransactionRemovalOptions)
&& Objects.equals(this.poolMaintainerClock, other.poolMaintainerClock);
Expand All @@ -143,6 +146,7 @@ public int hashCode() {
this.waitForMinSessions,
this.acquireSessionTimeout,
this.releaseToPosition,
this.randomizePositionQPSThreshold,
this.inactiveTransactionRemovalOptions,
this.poolMaintainerClock);
}
Expand Down Expand Up @@ -263,6 +267,10 @@ Position getReleaseToPosition() {
return releaseToPosition;
}

long getRandomizePositionQPSThreshold() {
return randomizePositionQPSThreshold;
}

public static Builder newBuilder() {
return new Builder();
}
Expand Down Expand Up @@ -451,6 +459,13 @@ public static class Builder {
private Duration waitForMinSessions = Duration.ZERO;
private Duration acquireSessionTimeout = Duration.ofSeconds(60);
private Position releaseToPosition = getReleaseToPositionFromSystemProperty();
/**
* The session pool will randomize the position of a session that is being returned when this
* threshold is exceeded. That is: If the transactions per second exceeds this threshold, then
* the session pool will use a random order for the sessions instead of LIFO. The default is 0,
* which means that the option is disabled.
*/
private long randomizePositionQPSThreshold = 0L;

private Clock poolMaintainerClock;

Expand Down Expand Up @@ -487,6 +502,7 @@ private Builder(SessionPoolOptions options) {
this.autoDetectDialect = options.autoDetectDialect;
this.waitForMinSessions = options.waitForMinSessions;
this.acquireSessionTimeout = options.acquireSessionTimeout;
this.randomizePositionQPSThreshold = options.randomizePositionQPSThreshold;
this.inactiveTransactionRemovalOptions = options.inactiveTransactionRemovalOptions;
this.poolMaintainerClock = options.poolMaintainerClock;
}
Expand Down Expand Up @@ -764,6 +780,13 @@ Builder setReleaseToPosition(Position releaseToPosition) {
return this;
}

Builder setRandomizePositionQPSThreshold(long randomizePositionQPSThreshold) {
Preconditions.checkArgument(
randomizePositionQPSThreshold >= 0L, "randomizePositionQPSThreshold must be >= 0");
this.randomizePositionQPSThreshold = randomizePositionQPSThreshold;
return this;
}

/** Build a SessionPoolOption object */
public SessionPoolOptions build() {
validate();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

import static com.google.common.truth.Truth.assertThat;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
Expand All @@ -29,6 +31,7 @@
import com.google.cloud.spanner.SessionPool.PooledSessionFuture;
import com.google.cloud.spanner.SessionPool.Position;
import com.google.cloud.spanner.SessionPool.SessionConsumerImpl;
import com.google.common.base.Preconditions;
import io.opencensus.trace.Tracing;
import io.opentelemetry.api.OpenTelemetry;
import java.util.ArrayList;
Expand Down Expand Up @@ -116,6 +119,10 @@ private SessionImpl setupMockSession(final SessionImpl session, final ReadContex
}

private SessionPool createPool() throws Exception {
return createPool(this.options);
}

private SessionPool createPool(SessionPoolOptions options) throws Exception {
// Allow sessions to be added to the head of the pool in all cases in this test, as it is
// otherwise impossible to know which session exactly is getting pinged at what point in time.
SessionPool pool =
Expand Down Expand Up @@ -324,4 +331,67 @@ public void testIdleSessions() throws Exception {
}
assertThat(pool.totalSessions()).isEqualTo(options.getMinSessions());
}

@Test
public void testRandomizeThreshold() throws Exception {
SessionPool pool =
createPool(
this.options
.toBuilder()
.setMaxSessions(400)
.setLoopFrequency(1000L)
.setRandomizePositionQPSThreshold(4)
.build());
List<Session> sessions;

// Run a maintenance loop. No sessions have been checked out so far, so the TPS should be 0.
runMaintenanceLoop(clock, pool, 1);
assertFalse(pool.shouldRandomize());

// Get and return one session. This means TPS == 1.
returnSessions(1, useSessions(1, pool));
runMaintenanceLoop(clock, pool, 1);
assertFalse(pool.shouldRandomize());

// Get and return four sessions. This means TPS == 4, and that no sessions are checked out.
returnSessions(4, useSessions(4, pool));
runMaintenanceLoop(clock, pool, 1);
assertFalse(pool.shouldRandomize());

// Get four sessions without returning them.
// This means TPS == 4 and that they are all still checked out.
sessions = useSessions(4, pool);
runMaintenanceLoop(clock, pool, 1);
assertTrue(pool.shouldRandomize());
// Returning one of the sessions reduces the number of checked out sessions enough to stop the
// randomizing.
returnSessions(1, sessions);
runMaintenanceLoop(clock, pool, 1);
assertFalse(pool.shouldRandomize());

// Get three more session and run the maintenance loop.
// The TPS is then 3, as we've only gotten 3 sessions since the last maintenance run.
// That means that we should not randomize.
sessions.addAll(useSessions(3, pool));
runMaintenanceLoop(clock, pool, 1);
assertFalse(pool.shouldRandomize());

returnSessions(sessions.size(), sessions);
}

private List<Session> useSessions(int numSessions, SessionPool pool) {
List<Session> sessions = new ArrayList<>(numSessions);
for (int i = 0; i < numSessions; i++) {
sessions.add(pool.getSession());
sessions.get(sessions.size() - 1).singleUse().executeQuery(Statement.of("SELECT 1")).next();
}
return sessions;
}

private void returnSessions(int numSessions, List<Session> sessions) {
Preconditions.checkArgument(numSessions <= sessions.size());
for (int i = 0; i < numSessions; i++) {
sessions.remove(0).close();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

Expand Down Expand Up @@ -218,4 +219,31 @@ public void verifyDefaultAcquireSessionTimeout() {

assertEquals(Duration.ofSeconds(60), sessionPoolOptions.getAcquireSessionTimeout());
}

@Test
public void testRandomizePositionQPSThreshold() {
assertEquals(0L, SessionPoolOptions.newBuilder().build().getRandomizePositionQPSThreshold());
assertEquals(
4L,
SessionPoolOptions.newBuilder()
.setRandomizePositionQPSThreshold(4L)
.build()
.getRandomizePositionQPSThreshold());
assertEquals(
10L,
SessionPoolOptions.newBuilder()
.setRandomizePositionQPSThreshold(4L)
.setRandomizePositionQPSThreshold(10L)
.build()
.getRandomizePositionQPSThreshold());
assertEquals(
0L,
SessionPoolOptions.newBuilder()
.setRandomizePositionQPSThreshold(0L)
.build()
.getRandomizePositionQPSThreshold());
assertThrows(
IllegalArgumentException.class,
() -> SessionPoolOptions.newBuilder().setRandomizePositionQPSThreshold(-1L));
}
}