Skip to content

Commit

Permalink
feat: add option to wait on session pool creation (#2329)
Browse files Browse the repository at this point in the history
* feat: add option to wait on session pool creation

Adds option to wait for min sessions to be populated in the session pool
before returning the database client back to the user. This only done
during the database client creation and it is useful for benchmarking.

* refactor: fix imports

* fix: fix comments

* fix: propagate interrupt

Co-authored-by: Knut Olav Løite <koloite@gmail.com>

---------

Co-authored-by: Knut Olav Løite <koloite@gmail.com>
  • Loading branch information
thiagotnunes and olavloite committed Mar 13, 2023
1 parent 27ef53c commit ff17244
Show file tree
Hide file tree
Showing 4 changed files with 103 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,30 @@ class SessionPool {
ErrorCode.UNIMPLEMENTED,
ErrorCode.INTERNAL);

/**
* If the {@link SessionPoolOptions#getWaitForMinSessions()} duration is greater than zero, waits
* for the creation of at least {@link SessionPoolOptions#getMinSessions()} in the pool using the
* given duration. If the waiting times out, a {@link SpannerException} with the {@link
* ErrorCode#DEADLINE_EXCEEDED} is thrown.
*/
void maybeWaitOnMinSessions() {
final long timeoutNanos = options.getWaitForMinSessions().toNanos();
if (timeoutNanos <= 0) {
return;
}

try {
if (!waitOnMinSessionsLatch.await(timeoutNanos, TimeUnit.NANOSECONDS)) {
final long timeoutMillis = options.getWaitForMinSessions().toMillis();
throw SpannerExceptionFactory.newSpannerException(
ErrorCode.DEADLINE_EXCEEDED,
"Timed out after waiting " + timeoutMillis + "ms for session pool creation");
}
} catch (InterruptedException e) {
throw SpannerExceptionFactory.propagateInterrupt(e);
}
}

/**
* Wrapper around current time so that we can fake it in tests. TODO(user): Replace with Java 8
* Clock.
Expand Down Expand Up @@ -1855,6 +1879,8 @@ private enum Position {

@VisibleForTesting Function<PooledSession, Void> idleSessionRemovedListener;

private final CountDownLatch waitOnMinSessionsLatch;

/**
* Create a session pool with the given options and for the given database. It will also start
* eagerly creating sessions if {@link SessionPoolOptions#getMinSessions()} is greater than 0.
Expand Down Expand Up @@ -1934,6 +1960,8 @@ private SessionPool(
this.clock = clock;
this.poolMaintainer = new PoolMaintainer();
this.initMetricsCollection(metricRegistry, labelValues);
this.waitOnMinSessionsLatch =
options.getMinSessions() > 0 ? new CountDownLatch(1) : new CountDownLatch(0);
}

/**
Expand Down Expand Up @@ -2399,13 +2427,17 @@ public void onSessionReady(SessionImpl session) {
PooledSession pooledSession = null;
boolean closeSession = false;
synchronized (lock) {
int minSessions = options.getMinSessions();
pooledSession = new PooledSession(session);
numSessionsBeingCreated--;
if (closureFuture != null) {
closeSession = true;
} else {
Preconditions.checkState(totalSessions() <= options.getMaxSessions() - 1);
allSessions.add(pooledSession);
if (allSessions.size() >= minSessions) {
waitOnMinSessionsLatch.countDown();
}
if (options.isAutoDetectDialect() && !detectDialectStarted) {
// Get the dialect of the underlying database if that has not yet been done. Note that
// this method will release the session into the pool once it is done.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ public class SessionPoolOptions {
private final ActionOnSessionLeak actionOnSessionLeak;
private final long initialWaitForSessionTimeoutMillis;
private final boolean autoDetectDialect;
private final Duration waitForMinSessions;

private SessionPoolOptions(Builder builder) {
// minSessions > maxSessions is only possible if the user has only set a value for maxSessions.
Expand All @@ -69,6 +70,7 @@ private SessionPoolOptions(Builder builder) {
this.keepAliveIntervalMinutes = builder.keepAliveIntervalMinutes;
this.removeInactiveSessionAfter = builder.removeInactiveSessionAfter;
this.autoDetectDialect = builder.autoDetectDialect;
this.waitForMinSessions = builder.waitForMinSessions;
}

@Override
Expand All @@ -90,7 +92,8 @@ public boolean equals(Object o) {
&& Objects.equals(this.loopFrequency, other.loopFrequency)
&& Objects.equals(this.keepAliveIntervalMinutes, other.keepAliveIntervalMinutes)
&& Objects.equals(this.removeInactiveSessionAfter, other.removeInactiveSessionAfter)
&& Objects.equals(this.autoDetectDialect, other.autoDetectDialect);
&& Objects.equals(this.autoDetectDialect, other.autoDetectDialect)
&& Objects.equals(this.waitForMinSessions, other.waitForMinSessions);
}

@Override
Expand All @@ -108,7 +111,8 @@ public int hashCode() {
this.loopFrequency,
this.keepAliveIntervalMinutes,
this.removeInactiveSessionAfter,
this.autoDetectDialect);
this.autoDetectDialect,
this.waitForMinSessions);
}

public Builder toBuilder() {
Expand Down Expand Up @@ -186,6 +190,11 @@ boolean isFailOnSessionLeak() {
return actionOnSessionLeak == ActionOnSessionLeak.FAIL;
}

@VisibleForTesting
Duration getWaitForMinSessions() {
return waitForMinSessions;
}

public static Builder newBuilder() {
return new Builder();
}
Expand Down Expand Up @@ -229,6 +238,7 @@ public static class Builder {
private int keepAliveIntervalMinutes = 30;
private Duration removeInactiveSessionAfter = Duration.ofMinutes(55L);
private boolean autoDetectDialect = false;
private Duration waitForMinSessions = Duration.ZERO;

public Builder() {}

Expand All @@ -247,6 +257,7 @@ private Builder(SessionPoolOptions options) {
this.keepAliveIntervalMinutes = options.keepAliveIntervalMinutes;
this.removeInactiveSessionAfter = options.removeInactiveSessionAfter;
this.autoDetectDialect = options.autoDetectDialect;
this.waitForMinSessions = options.waitForMinSessions;
}

/**
Expand Down Expand Up @@ -394,6 +405,21 @@ public Builder setWriteSessionsFraction(float writeSessionsFraction) {
return this;
}

/**
* If greater than zero, waits for the session pool to have at least {@link
* SessionPoolOptions#minSessions} before returning the database client to the caller. Note that
* this check is only done during the session pool creation. This is usually done asynchronously
* in order to provide the client back to the caller as soon as possible. We don't recommend
* using this option unless you are executing benchmarks and want to guarantee the session pool
* has min sessions in the pool before continuing.
*
* <p>Defaults to zero (initialization is done asynchronously).
*/
public Builder setWaitForMinSessions(Duration waitForMinSessions) {
this.waitForMinSessions = waitForMinSessions;
return this;
}

/** Build a SessionPoolOption object */
public SessionPoolOptions build() {
validate();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,7 @@ public DatabaseClient getDatabaseClient(DatabaseId db) {
SessionPool pool =
SessionPool.createPool(
getOptions(), SpannerImpl.this.getSessionClient(db), labelValues);
pool.maybeWaitOnMinSessions();
DatabaseClientImpl dbClient = createDatabaseClient(clientId, pool);
dbClients.put(db, dbClient);
return dbClient;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@
import org.junit.runners.Parameterized.Parameters;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.threeten.bp.Duration;

/** Tests for SessionPool that mock out the underlying stub. */
@RunWith(Parameterized.class)
Expand Down Expand Up @@ -1188,6 +1189,47 @@ public void testGetDatabaseRole() throws Exception {
assertEquals(TEST_DATABASE_ROLE, pool.getDatabaseRole());
}

@Test
public void testWaitOnMinSessionsWhenSessionsAreCreatedBeforeTimeout() {
doAnswer(
invocation ->
executor.submit(
() -> {
SessionConsumerImpl consumer =
invocation.getArgument(2, SessionConsumerImpl.class);
consumer.onSessionReady(mockSession());
}))
.when(sessionClient)
.asyncBatchCreateSessions(Mockito.eq(1), Mockito.anyBoolean(), any(SessionConsumer.class));

options =
SessionPoolOptions.newBuilder()
.setMinSessions(minSessions)
.setMaxSessions(minSessions + 1)
.setWaitForMinSessions(Duration.ofSeconds(5))
.build();
pool = createPool(new FakeClock(), new FakeMetricRegistry(), SPANNER_DEFAULT_LABEL_VALUES);
pool.maybeWaitOnMinSessions();
assertTrue(pool.getNumberOfSessionsInPool() >= minSessions);
}

@Test(expected = SpannerException.class)
public void testWaitOnMinSessionsThrowsExceptionWhenTimeoutIsReached() {
// Does not call onSessionReady, so session pool is never populated
doAnswer(invocation -> null)
.when(sessionClient)
.asyncBatchCreateSessions(Mockito.eq(1), Mockito.anyBoolean(), any(SessionConsumer.class));

options =
SessionPoolOptions.newBuilder()
.setMinSessions(minSessions + 1)
.setMaxSessions(minSessions + 1)
.setWaitForMinSessions(Duration.ofMillis(100))
.build();
pool = createPool(new FakeClock(), new FakeMetricRegistry(), SPANNER_DEFAULT_LABEL_VALUES);
pool.maybeWaitOnMinSessions();
}

private void mockKeepAlive(Session session) {
ReadContext context = mock(ReadContext.class);
ResultSet resultSet = mock(ResultSet.class);
Expand Down

0 comments on commit ff17244

Please sign in to comment.