Skip to content
Permalink
Browse files
perf: increase sessions in the pool in batches (#134)
* perf: increase sessions in the pool in batches

When more sessions are requested by the user application than are available in the session pool,
the session pool will now create new sessions in batches instead of in steps of 1. This reduces
the number of RPCs needed to serve a burst of requests.

A benchmark for the session pool has also been added to be able to compare performance and the
number of RPCs needed before and after this change. This benchmark can also be used for future
changes to verify that the change does not deteriorate performance or increase the number of
RPCs needed.

* fix: remove unused code

* fix: include num rpcs and sessions in benchmark results

* fix: remove commented code

* fix: rename parameter
  • Loading branch information
olavloite committed Apr 8, 2020
1 parent 77c1558 commit 9e5a1cdaacf71147b67681861f063c3276705f44
@@ -111,7 +111,7 @@
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-dependency-plugin</artifactId>
<configuration>
<ignoredDependencies>io.grpc:grpc-protobuf-lite,org.hamcrest:hamcrest,org.hamcrest:hamcrest-core,com.google.errorprone:error_prone_annotations,com.google.api.grpc:grpc-google-cloud-spanner-v1,com.google.api.grpc:grpc-google-cloud-spanner-admin-instance-v1,com.google.api.grpc:grpc-google-cloud-spanner-admin-database-v1</ignoredDependencies>
<ignoredDependencies>io.grpc:grpc-protobuf-lite,org.hamcrest:hamcrest,org.hamcrest:hamcrest-core,com.google.errorprone:error_prone_annotations,org.openjdk.jmh:jmh-generator-annprocess,com.google.api.grpc:grpc-google-cloud-spanner-v1,com.google.api.grpc:grpc-google-cloud-spanner-admin-instance-v1,com.google.api.grpc:grpc-google-cloud-spanner-admin-database-v1</ignoredDependencies>
</configuration>
</plugin>
</plugins>
@@ -305,6 +305,20 @@
<version>2.2</version>
<scope>test</scope>
</dependency>

<!-- Benchmarking dependencies -->
<dependency>
<groupId>org.openjdk.jmh</groupId>
<artifactId>jmh-core</artifactId>
<version>1.23</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.openjdk.jmh</groupId>
<artifactId>jmh-generator-annprocess</artifactId>
<version>1.23</version>
<scope>test</scope>
</dependency>
</dependencies>

<profiles>
@@ -320,5 +334,35 @@
</dependency>
</dependencies>
</profile>
<profile>
<id>benchmark</id>
<build>
<plugins>
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>exec-maven-plugin</artifactId>
<executions>
<execution>
<id>run-benchmarks</id>
<phase>test</phase>
<goals>
<goal>exec</goal>
</goals>
<configuration>
<classpathScope>test</classpathScope>
<executable>java</executable>
<arguments>
<argument>-classpath</argument>
<classpath />
<argument>org.openjdk.jmh.Main</argument>
<argument>.*</argument>
</arguments>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</profile>
</profiles>
</project>
@@ -231,12 +231,21 @@ SessionImpl createSession() {
* sessions that could not be created.
*
* @param sessionCount The number of sessions to create.
* @param distributeOverChannels Whether to distribute the sessions over all available channels
* (true) or create all for the next channel round robin.
* @param consumer The {@link SessionConsumer} to use for callbacks when sessions are available.
*/
void asyncBatchCreateSessions(final int sessionCount, SessionConsumer consumer) {
// We spread the session creation evenly over all available channels.
int sessionCountPerChannel = sessionCount / spanner.getOptions().getNumChannels();
int remainder = sessionCount % spanner.getOptions().getNumChannels();
void asyncBatchCreateSessions(
final int sessionCount, boolean distributeOverChannels, SessionConsumer consumer) {
int sessionCountPerChannel;
int remainder;
if (distributeOverChannels) {
sessionCountPerChannel = sessionCount / spanner.getOptions().getNumChannels();
remainder = sessionCount % spanner.getOptions().getNumChannels();
} else {
sessionCountPerChannel = sessionCount;
remainder = 0;
}
int numBeingCreated = 0;
synchronized (this) {
for (int channelIndex = 0;
@@ -252,7 +261,7 @@ void asyncBatchCreateSessions(final int sessionCount, SessionConsumer consumer)
if (channelIndex == 0) {
createCountForChannel = sessionCountPerChannel + remainder;
}
if (createCountForChannel > 0) {
if (createCountForChannel > 0 && numBeingCreated < sessionCount) {
try {
executor.submit(
new BatchCreateSessionsRunnable(
@@ -1077,7 +1077,7 @@ private void replenishPool() {
// If we have gone below min pool size, create that many sessions.
int sessionCount = options.getMinSessions() - (totalSessions() + numSessionsBeingCreated);
if (sessionCount > 0) {
createSessions(getAllowedCreateSessions(sessionCount));
createSessions(getAllowedCreateSessions(sessionCount), false);
}
}
}
@@ -1269,7 +1269,7 @@ private void initPool() {
synchronized (lock) {
poolMaintainer.init();
if (options.getMinSessions() > 0) {
createSessions(options.getMinSessions());
createSessions(options.getMinSessions(), true);
}
}
}
@@ -1308,7 +1308,7 @@ private void invalidateSession(PooledSession session) {
}
allSessions.remove(session);
// replenish the pool.
createSessions(getAllowedCreateSessions(1));
createSessions(getAllowedCreateSessions(1), false);
}
}

@@ -1507,7 +1507,7 @@ private void maybeCreateSession() {
if (numWaiters() >= numSessionsBeingCreated) {
if (canCreateSession()) {
span.addAnnotation("Creating sessions");
createSessions(getAllowedCreateSessions(numWaiters() - numSessionsBeingCreated + 1));
createSessions(getAllowedCreateSessions(options.getIncStep()), false);
} else if (options.isFailIfPoolExhausted()) {
span.addAnnotation("Pool exhausted. Failing");
// throw specific exception
@@ -1732,7 +1732,8 @@ public void run() {
}
// Create a new session if needed to unblock some waiter.
if (numWaiters() > numSessionsBeingCreated) {
createSessions(getAllowedCreateSessions(numWaiters() - numSessionsBeingCreated));
createSessions(
getAllowedCreateSessions(numWaiters() - numSessionsBeingCreated), false);
}
}
}
@@ -1794,7 +1795,7 @@ private boolean canCreateSession() {
}
}

private void createSessions(final int sessionCount) {
private void createSessions(final int sessionCount, boolean distributeOverChannels) {
logger.log(Level.FINE, String.format("Creating %d sessions", sessionCount));
synchronized (lock) {
numSessionsBeingCreated += sessionCount;
@@ -1803,8 +1804,8 @@ private void createSessions(final int sessionCount) {
// calls and the session consumer consumes the returned sessions as they become available.
// The batchCreateSessions method automatically spreads the sessions evenly over all
// available channels.
sessionClient.asyncBatchCreateSessions(sessionCount, sessionConsumer);
logger.log(Level.FINE, "Sessions created");
sessionClient.asyncBatchCreateSessions(
sessionCount, distributeOverChannels, sessionConsumer);
} catch (Throwable t) {
// Expose this to customer via a metric.
numSessionsBeingCreated -= sessionCount;
@@ -24,9 +24,11 @@ public class SessionPoolOptions {
// Default number of channels * 100.
private static final int DEFAULT_MAX_SESSIONS = 400;
private static final int DEFAULT_MIN_SESSIONS = 100;
private static final int DEFAULT_INC_STEP = 25;
private static final ActionOnExhaustion DEFAULT_ACTION = ActionOnExhaustion.BLOCK;
private final int minSessions;
private final int maxSessions;
private final int incStep;
private final int maxIdleSessions;
private final float writeSessionsFraction;
private final ActionOnExhaustion actionOnExhaustion;
@@ -40,6 +42,7 @@ private SessionPoolOptions(Builder builder) {
// maxSessions value is less than the default for minSessions.
this.minSessions = Math.min(builder.minSessions, builder.maxSessions);
this.maxSessions = builder.maxSessions;
this.incStep = builder.incStep;
this.maxIdleSessions = builder.maxIdleSessions;
this.writeSessionsFraction = builder.writeSessionsFraction;
this.actionOnExhaustion = builder.actionOnExhaustion;
@@ -56,6 +59,10 @@ public int getMaxSessions() {
return maxSessions;
}

int getIncStep() {
return incStep;
}

public int getMaxIdleSessions() {
return maxIdleSessions;
}
@@ -105,6 +112,7 @@ public static class Builder {
private boolean minSessionsSet = false;
private int minSessions = DEFAULT_MIN_SESSIONS;
private int maxSessions = DEFAULT_MAX_SESSIONS;
private int incStep = DEFAULT_INC_STEP;
private int maxIdleSessions;
private float writeSessionsFraction = 0.2f;
private ActionOnExhaustion actionOnExhaustion = DEFAULT_ACTION;
@@ -135,6 +143,16 @@ public Builder setMaxSessions(int maxSessions) {
return this;
}

/**
* Number of sessions to batch create when the pool needs at least one more session. Defaults to
* 25.
*/
Builder setIncStep(int incStep) {
Preconditions.checkArgument(incStep > 0, "incStep must be > 0");
this.incStep = incStep;
return this;
}

/**
* Maximum number of idle sessions that this pool will maintain. Pool will close any sessions
* beyond this but making sure to always have at least as many sessions as specified by {@link
@@ -104,6 +104,7 @@ public static void stopServer() throws InterruptedException {
@Before
public void setUp() throws IOException {
mockSpanner.reset();
mockSpanner.removeAllExecutionTimes();
}

private Spanner createSpanner(int minSessions, int maxSessions) {
@@ -245,7 +246,7 @@ public void testPrepareSessionFailPropagatesToUser() throws InterruptedException
int maxSessions = 1000;
DatabaseClientImpl client = null;
mockSpanner.setBeginTransactionExecutionTime(
SimulatedExecutionTime.ofException(
SimulatedExecutionTime.ofStickyException(
Status.ABORTED.withDescription("BeginTransaction failed").asRuntimeException()));
try (Spanner spanner = createSpanner(minSessions, maxSessions)) {
client =
@@ -92,6 +92,7 @@
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -469,6 +470,7 @@ private static void checkException(Queue<Exception> exceptions, boolean keepExce
private ConcurrentMap<ByteString, Instant> transactionLastUsed = new ConcurrentHashMap<>();
private int maxNumSessionsInOneBatch = 100;
private int maxTotalSessions = Integer.MAX_VALUE;
private AtomicInteger numSessionsCreated = new AtomicInteger();

private SimulatedExecutionTime beginTransactionExecutionTime = NO_EXECUTION_TIME;
private SimulatedExecutionTime commitExecutionTime = NO_EXECUTION_TIME;
@@ -642,6 +644,7 @@ public void batchCreateSessions(
if (sessions.size() <= maxTotalSessions) {
sessionLastUsed.put(name, Instant.now());
response.addSession(session);
numSessionsCreated.incrementAndGet();
} else {
sessions.remove(name);
}
@@ -687,6 +690,7 @@ public void createSession(
Session prev = sessions.putIfAbsent(name, session);
if (prev == null) {
sessionLastUsed.put(name, Instant.now());
numSessionsCreated.incrementAndGet();
responseObserver.onNext(session);
responseObserver.onCompleted();
} else {
@@ -1623,6 +1627,10 @@ private void partition(
}
}

public int numSessionsCreated() {
return numSessionsCreated.get();
}

@Override
public List<AbstractMessage> getRequests() {
return new ArrayList<>(this.requests);
@@ -1652,6 +1660,7 @@ public ServerServiceDefinition getServiceDefinition() {
public void reset() {
requests.clear();
sessions.clear();
numSessionsCreated.set(0);
sessionLastUsed.clear();
transactions.clear();
isPartitionedDmlTransaction.clear();
@@ -182,7 +182,7 @@ public void onSessionCreateFailure(Throwable t, int createFailureForSessionCount
};
final int numSessions = 10;
try (SessionClient client = new SessionClient(spanner, db, new TestExecutorFactory())) {
client.asyncBatchCreateSessions(numSessions, consumer);
client.asyncBatchCreateSessions(numSessions, true, consumer);
}
assertThat(returnedSessionCount.get()).isEqualTo(numSessions);
assertThat(usedChannels.size()).isEqualTo(spannerOptions.getNumChannels());
@@ -275,7 +275,7 @@ public void onSessionCreateFailure(Throwable t, int createFailureForSessionCount
};
final int numSessions = 10;
try (SessionClient client = new SessionClient(spanner, db, new TestExecutorFactory())) {
client.asyncBatchCreateSessions(numSessions, consumer);
client.asyncBatchCreateSessions(numSessions, true, consumer);
}
assertThat(errorCount.get()).isEqualTo(errorOnChannels.size());
assertThat(returnedSessionCount.get())
@@ -330,7 +330,7 @@ public void onSessionCreateFailure(Throwable t, int createFailureForSessionCount
// sessions.
final int numSessions = 100;
try (SessionClient client = new SessionClient(spanner, db, new TestExecutorFactory())) {
client.asyncBatchCreateSessions(numSessions, consumer);
client.asyncBatchCreateSessions(numSessions, true, consumer);
}
assertThat(returnedSessionCount.get()).isEqualTo(numSessions);
}

0 comments on commit 9e5a1cd

Please sign in to comment.