diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java index 371ff652b6..f5fa0ebdc4 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java @@ -51,6 +51,7 @@ import com.google.cloud.spanner.Options.TransactionOption; import com.google.cloud.spanner.Options.UpdateOption; import com.google.cloud.spanner.SessionClient.SessionConsumer; +import com.google.cloud.spanner.SessionPoolOptions.InactiveTransactionRemovalOptions; import com.google.cloud.spanner.SpannerException.ResourceNotFoundException; import com.google.cloud.spanner.SpannerImpl.ClosedException; import com.google.common.annotations.VisibleForTesting; @@ -1279,7 +1280,7 @@ public AsyncTransactionManager transactionManagerAsync(TransactionOption... opti @Override public long executePartitionedUpdate(Statement stmt, UpdateOption... options) { try { - return get().executePartitionedUpdate(stmt, options); + return get(true).executePartitionedUpdate(stmt, options); } finally { close(); } @@ -1332,6 +1333,10 @@ private PooledSession getOrNull() { @Override public PooledSession get() { + return get(false); + } + + PooledSession get(final boolean eligibleForLongRunning) { if (inUse.compareAndSet(false, true)) { PooledSession res = null; try { @@ -1346,6 +1351,7 @@ public PooledSession get() { incrementNumSessionsInUse(); checkedOutSessions.add(this); } + res.eligibleForLongRunning = eligibleForLongRunning; } initialized.countDown(); } @@ -1366,6 +1372,28 @@ final class PooledSession implements Session { private volatile SpannerException lastException; private volatile boolean allowReplacing = true; + /** + * Property to mark if the session is eligible to be long-running. This can only be true if the + * session is executing certain types of transactions (for ex - Partitioned DML) which can be + * long-running. By default, most transaction types are not expected to be long-running and + * hence this value is false. + */ + private volatile boolean eligibleForLongRunning = false; + + /** + * Property to mark if the session is no longer part of the session pool. For ex - A session + * which is long-running gets cleaned up and removed from the pool. + */ + private volatile boolean isRemovedFromPool = false; + + /** + * Property to mark if a leaked session exception is already logged. Given a session maintainer + * thread runs repeatedly at a defined interval, this property allows us to ensure that an + * exception is logged only once per leaked session. This is to avoid noisy repeated logs around + * session leaks for long-running sessions. + */ + private volatile boolean isLeakedExceptionLogged = false; + @GuardedBy("lock") private SessionState state; @@ -1385,6 +1413,11 @@ void setAllowReplacing(boolean allowReplacing) { this.allowReplacing = allowReplacing; } + @VisibleForTesting + void setEligibleForLongRunning(boolean eligibleForLongRunning) { + this.eligibleForLongRunning = eligibleForLongRunning; + } + @Override public Timestamp write(Iterable mutations) throws SpannerException { return writeWithOptions(mutations).getCommitTimestamp(); @@ -1485,7 +1518,7 @@ public void close() { numSessionsInUse--; numSessionsReleased++; } - if (lastException != null && isSessionNotFound(lastException)) { + if ((lastException != null && isSessionNotFound(lastException)) || isRemovedFromPool) { invalidateSession(this); } else { if (lastException != null && isDatabaseOrInstanceNotFound(lastException)) { @@ -1499,6 +1532,7 @@ public void close() { } } lastException = null; + isRemovedFromPool = false; if (state != SessionState.CLOSING) { state = SessionState.AVAILABLE; } @@ -1651,6 +1685,10 @@ private PooledSession pollUninterruptiblyWithTimeout(long timeoutMillis) { *
  • Keeps alive sessions that have not been used for a user configured time in order to keep * MinSessions sessions alive in the pool at any time. The keep-alive traffic is smeared out * over a window of 10 minutes to avoid bursty traffic. + *
  • Removes unexpected long running transactions from the pool. Only certain transaction + * types (for ex - Partitioned DML / Batch Reads) can be long running. This tasks checks the + * sessions which have been inactive for a longer than usual duration (for ex - 60 minutes) + * and removes such sessions from the pool. * */ final class PoolMaintainer { @@ -1659,16 +1697,24 @@ final class PoolMaintainer { private final Duration windowLength = Duration.ofMillis(TimeUnit.MINUTES.toMillis(10)); // Frequency of the timer loop. @VisibleForTesting final long loopFrequency = options.getLoopFrequency(); - // Number of loop iterations in which we need to to close all the sessions waiting for closure. + // Number of loop iterations in which we need to close all the sessions waiting for closure. @VisibleForTesting final long numClosureCycles = windowLength.toMillis() / loopFrequency; private final Duration keepAliveMillis = Duration.ofMillis(TimeUnit.MINUTES.toMillis(options.getKeepAliveIntervalMinutes())); // Number of loop iterations in which we need to keep alive all the sessions @VisibleForTesting final long numKeepAliveCycles = keepAliveMillis.toMillis() / loopFrequency; - Instant lastResetTime = Instant.ofEpochMilli(0); - int numSessionsToClose = 0; - int sessionsToClosePerLoop = 0; + /** + * Variable maintaining the last execution time of the long-running transaction cleanup task. + * + *

    The long-running transaction cleanup needs to be performed every X minutes. The X minutes + * recurs multiple times within the invocation of the pool maintainer thread. For ex - If the + * main thread runs every 10s and the long-running transaction clean-up needs to be performed + * every 2 minutes, then we need to keep a track of when was the last time that this task + * executed and makes sure we only execute it every 2 minutes and not every 10 seconds. + */ + @VisibleForTesting Instant lastExecutionTime; + boolean closed = false; @GuardedBy("lock") @@ -1678,6 +1724,7 @@ final class PoolMaintainer { boolean running; void init() { + lastExecutionTime = clock.instant(); // Scheduled pool maintenance worker. synchronized (lock) { scheduledFuture = @@ -1723,6 +1770,7 @@ void maintainPool() { decrementPendingClosures(1); } } + removeLongRunningSessions(currTime); } private void removeIdleSessions(Instant currTime) { @@ -1736,7 +1784,13 @@ private void removeIdleSessions(Instant currTime) { PooledSession session = iterator.next(); if (session.lastUseTime.isBefore(minLastUseTime)) { if (session.state != SessionState.CLOSING) { - removeFromPool(session); + boolean isRemoved = removeFromPool(session); + if (isRemoved) { + numIdleSessionsRemoved++; + if (idleSessionRemovedListener != null) { + idleSessionRemovedListener.apply(session); + } + } iterator.remove(); } } @@ -1792,6 +1846,87 @@ private void replenishPool() { } } } + + // cleans up sessions which are unexpectedly long-running. + void removeLongRunningSessions(Instant currentTime) { + try { + if (SessionPool.this.isClosed()) { + return; + } + final InactiveTransactionRemovalOptions inactiveTransactionRemovalOptions = + options.getInactiveTransactionRemovalOptions(); + final Instant minExecutionTime = + lastExecutionTime.plus(inactiveTransactionRemovalOptions.getExecutionFrequency()); + if (currentTime.isBefore(minExecutionTime)) { + return; + } + lastExecutionTime = currentTime; // update this only after we have decided to execute task + if (options.closeInactiveTransactions() + || options.warnInactiveTransactions() + || options.warnAndCloseInactiveTransactions()) { + removeLongRunningSessions(currentTime, inactiveTransactionRemovalOptions); + } + } catch (final Throwable t) { + logger.log(Level.WARNING, "Failed removing long running transactions", t); + } + } + + private void removeLongRunningSessions( + final Instant currentTime, + final InactiveTransactionRemovalOptions inactiveTransactionRemovalOptions) { + synchronized (lock) { + final double usedSessionsRatio = getRatioOfSessionsInUse(); + if (usedSessionsRatio > inactiveTransactionRemovalOptions.getUsedSessionsRatioThreshold()) { + Iterator iterator = checkedOutSessions.iterator(); + while (iterator.hasNext()) { + final PooledSessionFuture sessionFuture = iterator.next(); + // the below get() call on future object is non-blocking since checkedOutSessions + // collection is populated only when the get() method in {@code PooledSessionFuture} is + // called. + final PooledSession session = sessionFuture.get(); + final Duration durationFromLastUse = Duration.between(session.lastUseTime, currentTime); + if (!session.eligibleForLongRunning + && durationFromLastUse.compareTo( + inactiveTransactionRemovalOptions.getIdleTimeThreshold()) + > 0) { + if ((options.warnInactiveTransactions() || options.warnAndCloseInactiveTransactions()) + && !session.isLeakedExceptionLogged) { + if (options.warnAndCloseInactiveTransactions()) { + logger.log( + Level.WARNING, + String.format("Removing long-running session => %s", session.getName()), + sessionFuture.leakedException); + session.isLeakedExceptionLogged = true; + } else if (options.warnInactiveTransactions()) { + logger.log( + Level.WARNING, + String.format( + "Detected long-running session => %s. To automatically remove " + + "long-running sessions, set SessionOption ActionOnInactiveTransaction " + + "to WARN_AND_CLOSE by invoking setWarnAndCloseIfInactiveTransactions() method.", + session.getName()), + sessionFuture.leakedException); + session.isLeakedExceptionLogged = true; + } + } + if ((options.closeInactiveTransactions() + || options.warnAndCloseInactiveTransactions()) + && session.state != SessionState.CLOSING) { + final boolean isRemoved = removeFromPool(session); + if (isRemoved) { + session.isRemovedFromPool = true; + numLeakedSessionsRemoved++; + if (longRunningSessionRemovedListener != null) { + longRunningSessionRemovedListener.apply(session); + } + } + iterator.remove(); + } + } + } + } + } + } } private enum Position { @@ -1872,6 +2007,9 @@ private enum Position { @GuardedBy("lock") private long numIdleSessionsRemoved = 0; + @GuardedBy("lock") + private long numLeakedSessionsRemoved = 0; + private AtomicLong numWaiterTimeouts = new AtomicLong(); @GuardedBy("lock") @@ -1885,6 +2023,8 @@ private enum Position { @VisibleForTesting Function idleSessionRemovedListener; + @VisibleForTesting Function longRunningSessionRemovedListener; + private final CountDownLatch waitOnMinSessionsLatch; /** @@ -1895,12 +2035,16 @@ private enum Position { */ static SessionPool createPool( SpannerOptions spannerOptions, SessionClient sessionClient, List labelValues) { + final SessionPoolOptions sessionPoolOptions = spannerOptions.getSessionPoolOptions(); + + // A clock instance is passed in {@code SessionPoolOptions} in order to allow mocking via tests. + final Clock poolMaintainerClock = sessionPoolOptions.getPoolMaintainerClock(); return createPool( - spannerOptions.getSessionPoolOptions(), + sessionPoolOptions, spannerOptions.getDatabaseRole(), ((GrpcTransportOptions) spannerOptions.getTransportOptions()).getExecutorFactory(), sessionClient, - new Clock(), + poolMaintainerClock == null ? new Clock() : poolMaintainerClock, Metrics.getMetricRegistry(), labelValues); } @@ -2015,18 +2159,26 @@ int getNumberOfSessionsInUse() { } } - void removeFromPool(PooledSession session) { + @VisibleForTesting + double getRatioOfSessionsInUse() { + synchronized (lock) { + final int maxSessions = options.getMaxSessions(); + if (maxSessions == 0) { + return 0; + } + return (double) numSessionsInUse / maxSessions; + } + } + + boolean removeFromPool(PooledSession session) { synchronized (lock) { if (isClosed()) { decrementPendingClosures(1); - return; + return false; } session.markClosing(); allSessions.remove(session); - numIdleSessionsRemoved++; - } - if (idleSessionRemovedListener != null) { - idleSessionRemovedListener.apply(session); + return true; } } @@ -2036,6 +2188,13 @@ long numIdleSessionsRemoved() { } } + @VisibleForTesting + long numLeakedSessionsRemoved() { + synchronized (lock) { + return numLeakedSessionsRemoved; + } + } + @VisibleForTesting int getNumberOfSessionsInPool() { synchronized (lock) { diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPoolOptions.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPoolOptions.java index 8856081b36..6fa9ad8156 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPoolOptions.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPoolOptions.java @@ -16,6 +16,7 @@ package com.google.cloud.spanner; +import com.google.cloud.spanner.SessionPool.Clock; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import java.util.Objects; @@ -50,10 +51,14 @@ public class SessionPoolOptions { private final ActionOnSessionNotFound actionOnSessionNotFound; private final ActionOnSessionLeak actionOnSessionLeak; private final boolean trackStackTraceOfSessionCheckout; + private final InactiveTransactionRemovalOptions inactiveTransactionRemovalOptions; private final long initialWaitForSessionTimeoutMillis; private final boolean autoDetectDialect; private final Duration waitForMinSessions; + /** Property for allowing mocking of session maintenance clock. */ + private final Clock poolMaintainerClock; + private SessionPoolOptions(Builder builder) { // minSessions > maxSessions is only possible if the user has only set a value for maxSessions. // We allow that to prevent code that only sets a value for maxSessions to break if the @@ -73,6 +78,8 @@ private SessionPoolOptions(Builder builder) { this.removeInactiveSessionAfter = builder.removeInactiveSessionAfter; this.autoDetectDialect = builder.autoDetectDialect; this.waitForMinSessions = builder.waitForMinSessions; + this.inactiveTransactionRemovalOptions = builder.inactiveTransactionRemovalOptions; + this.poolMaintainerClock = builder.poolMaintainerClock; } @Override @@ -97,7 +104,10 @@ public boolean equals(Object o) { && Objects.equals(this.keepAliveIntervalMinutes, other.keepAliveIntervalMinutes) && Objects.equals(this.removeInactiveSessionAfter, other.removeInactiveSessionAfter) && Objects.equals(this.autoDetectDialect, other.autoDetectDialect) - && Objects.equals(this.waitForMinSessions, other.waitForMinSessions); + && Objects.equals(this.waitForMinSessions, other.waitForMinSessions) + && Objects.equals( + this.inactiveTransactionRemovalOptions, other.inactiveTransactionRemovalOptions) + && Objects.equals(this.poolMaintainerClock, other.poolMaintainerClock); } @Override @@ -117,7 +127,9 @@ public int hashCode() { this.keepAliveIntervalMinutes, this.removeInactiveSessionAfter, this.autoDetectDialect, - this.waitForMinSessions); + this.waitForMinSessions, + this.inactiveTransactionRemovalOptions, + this.poolMaintainerClock); } public Builder toBuilder() { @@ -180,6 +192,25 @@ public boolean isAutoDetectDialect() { return autoDetectDialect; } + InactiveTransactionRemovalOptions getInactiveTransactionRemovalOptions() { + return inactiveTransactionRemovalOptions; + } + + boolean closeInactiveTransactions() { + return inactiveTransactionRemovalOptions.actionOnInactiveTransaction + == ActionOnInactiveTransaction.CLOSE; + } + + boolean warnAndCloseInactiveTransactions() { + return inactiveTransactionRemovalOptions.actionOnInactiveTransaction + == ActionOnInactiveTransaction.WARN_AND_CLOSE; + } + + boolean warnInactiveTransactions() { + return inactiveTransactionRemovalOptions.actionOnInactiveTransaction + == ActionOnInactiveTransaction.WARN; + } + @VisibleForTesting long getInitialWaitForSessionTimeoutMillis() { return initialWaitForSessionTimeoutMillis; @@ -195,6 +226,11 @@ boolean isFailOnSessionLeak() { return actionOnSessionLeak == ActionOnSessionLeak.FAIL; } + @VisibleForTesting + Clock getPoolMaintainerClock() { + return poolMaintainerClock; + } + public boolean isTrackStackTraceOfSessionCheckout() { return trackStackTraceOfSessionCheckout; } @@ -223,6 +259,134 @@ private enum ActionOnSessionLeak { FAIL } + @VisibleForTesting + enum ActionOnInactiveTransaction { + WARN, + WARN_AND_CLOSE, + CLOSE + } + + /** Configuration options for task to clean up inactive transactions. */ + static class InactiveTransactionRemovalOptions { + + /** Option to set the behaviour when there are inactive transactions. */ + private ActionOnInactiveTransaction actionOnInactiveTransaction; + + /** + * Frequency for closing inactive transactions. Between two consecutive task executions, it's + * ensured that the duration is greater or equal to this duration. + */ + private Duration executionFrequency; + + /** + * Long-running transactions will be cleaned up if utilisation is greater than the below value. + */ + private double usedSessionsRatioThreshold; + + /** + * A transaction is considered to be idle if it has not been used for a duration greater than + * the below value. + */ + private Duration idleTimeThreshold; + + InactiveTransactionRemovalOptions(final Builder builder) { + this.actionOnInactiveTransaction = builder.actionOnInactiveTransaction; + this.idleTimeThreshold = builder.idleTimeThreshold; + this.executionFrequency = builder.executionFrequency; + this.usedSessionsRatioThreshold = builder.usedSessionsRatioThreshold; + } + + @Override + public boolean equals(Object o) { + if (!(o instanceof InactiveTransactionRemovalOptions)) { + return false; + } + InactiveTransactionRemovalOptions other = (InactiveTransactionRemovalOptions) o; + return Objects.equals(this.actionOnInactiveTransaction, other.actionOnInactiveTransaction) + && Objects.equals(this.idleTimeThreshold, other.idleTimeThreshold) + && Objects.equals(this.executionFrequency, other.executionFrequency) + && Objects.equals(this.usedSessionsRatioThreshold, other.usedSessionsRatioThreshold); + } + + @Override + public int hashCode() { + return Objects.hash( + this.actionOnInactiveTransaction, + this.idleTimeThreshold, + this.executionFrequency, + this.usedSessionsRatioThreshold); + } + + Duration getExecutionFrequency() { + return executionFrequency; + } + + double getUsedSessionsRatioThreshold() { + return usedSessionsRatioThreshold; + } + + Duration getIdleTimeThreshold() { + return idleTimeThreshold; + } + + static InactiveTransactionRemovalOptions.Builder newBuilder() { + return new Builder(); + } + + static class Builder { + private ActionOnInactiveTransaction actionOnInactiveTransaction; + private Duration executionFrequency = Duration.ofMinutes(2); + private double usedSessionsRatioThreshold = 0.95; + private Duration idleTimeThreshold = Duration.ofMinutes(60L); + + public Builder() {} + + InactiveTransactionRemovalOptions build() { + validate(); + return new InactiveTransactionRemovalOptions(this); + } + + private void validate() { + Preconditions.checkArgument( + executionFrequency.toMillis() > 0, + "Execution frequency %s should be positive", + executionFrequency.toMillis()); + Preconditions.checkArgument( + idleTimeThreshold.toMillis() > 0, + "Idle Time Threshold duration %s should be positive", + idleTimeThreshold.toMillis()); + } + + @VisibleForTesting + InactiveTransactionRemovalOptions.Builder setActionOnInactiveTransaction( + final ActionOnInactiveTransaction actionOnInactiveTransaction) { + this.actionOnInactiveTransaction = actionOnInactiveTransaction; + return this; + } + + @VisibleForTesting + InactiveTransactionRemovalOptions.Builder setExecutionFrequency( + final Duration executionFrequency) { + this.executionFrequency = executionFrequency; + return this; + } + + @VisibleForTesting + InactiveTransactionRemovalOptions.Builder setUsedSessionsRatioThreshold( + final double usedSessionsRatioThreshold) { + this.usedSessionsRatioThreshold = usedSessionsRatioThreshold; + return this; + } + + @VisibleForTesting + InactiveTransactionRemovalOptions.Builder setIdleTimeThreshold( + final Duration idleTimeThreshold) { + this.idleTimeThreshold = idleTimeThreshold; + return this; + } + } + } + /** Builder for creating SessionPoolOptions. */ public static class Builder { private boolean minSessionsSet = false; @@ -254,12 +418,16 @@ public static class Builder { */ private boolean trackStackTraceOfSessionCheckout = true; + private InactiveTransactionRemovalOptions inactiveTransactionRemovalOptions = + InactiveTransactionRemovalOptions.newBuilder().build(); private long loopFrequency = 10 * 1000L; private int keepAliveIntervalMinutes = 30; private Duration removeInactiveSessionAfter = Duration.ofMinutes(55L); private boolean autoDetectDialect = false; private Duration waitForMinSessions = Duration.ZERO; + private Clock poolMaintainerClock; + public Builder() {} private Builder(SessionPoolOptions options) { @@ -279,6 +447,8 @@ private Builder(SessionPoolOptions options) { this.removeInactiveSessionAfter = options.removeInactiveSessionAfter; this.autoDetectDialect = options.autoDetectDialect; this.waitForMinSessions = options.waitForMinSessions; + this.inactiveTransactionRemovalOptions = options.inactiveTransactionRemovalOptions; + this.poolMaintainerClock = options.poolMaintainerClock; } /** @@ -335,6 +505,12 @@ Builder setLoopFrequency(long loopFrequency) { return this; } + Builder setInactiveTransactionRemovalOptions( + InactiveTransactionRemovalOptions inactiveTransactionRemovalOptions) { + this.inactiveTransactionRemovalOptions = inactiveTransactionRemovalOptions; + return this; + } + public Builder setRemoveInactiveSessionAfter(Duration duration) { this.removeInactiveSessionAfter = duration; return this; @@ -369,6 +545,70 @@ public Builder setBlockIfPoolExhausted() { return this; } + /** + * If there are inactive transactions, log warning messages with the origin of such transactions + * to aid debugging. A transaction is classified as inactive if it executes for more than a + * system defined duration. + * + *

    This option won't change the state of the transactions. It only generates warning logs + * that can be used for debugging. + * + * @return this builder for chaining + */ + Builder setWarnIfInactiveTransactions() { + this.inactiveTransactionRemovalOptions = + InactiveTransactionRemovalOptions.newBuilder() + .setActionOnInactiveTransaction(ActionOnInactiveTransaction.WARN) + .build(); + return this; + } + + /** + * If there are inactive transactions, release the resources consumed by such transactions. A + * transaction is classified as inactive if it executes for more than a system defined duration. + * The option would also produce necessary warning logs through which it can be debugged as to + * what resources were released due to this option. + * + *

    Use the option {@link Builder#setWarnIfInactiveTransactions()} if you only want to log + * warnings about long-running transactions. + * + * @return this builder for chaining + */ + Builder setWarnAndCloseIfInactiveTransactions() { + this.inactiveTransactionRemovalOptions = + InactiveTransactionRemovalOptions.newBuilder() + .setActionOnInactiveTransaction(ActionOnInactiveTransaction.WARN_AND_CLOSE) + .build(); + return this; + } + + /** + * If there are inactive transactions, release the resources consumed by such transactions. A + * transaction is classified as inactive if it executes for more than a system defined duration. + * + *

    Use the option {@link Builder#setWarnIfInactiveTransactions()} if you only want to log + * warnings about long-running sessions. + * + *

    Use the option {@link Builder#setWarnAndCloseIfInactiveTransactions()} if you want to log + * warnings along with closing the long-running transactions. + * + * @return this builder for chaining + */ + @VisibleForTesting + Builder setCloseIfInactiveTransactions() { + this.inactiveTransactionRemovalOptions = + InactiveTransactionRemovalOptions.newBuilder() + .setActionOnInactiveTransaction(ActionOnInactiveTransaction.CLOSE) + .build(); + return this; + } + + @VisibleForTesting + Builder setPoolMaintainerClock(Clock poolMaintainerClock) { + this.poolMaintainerClock = poolMaintainerClock; + return this; + } + /** * Sets whether the client should automatically execute a background query to detect the dialect * that is used by the database or not. Set this option to true if you do not know what the diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/DatabaseClientImplTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/DatabaseClientImplTest.java index 53bb30dba7..99e40daa53 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/DatabaseClientImplTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/DatabaseClientImplTest.java @@ -26,6 +26,7 @@ import static com.google.common.truth.Truth.assertThat; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertThrows; @@ -45,12 +46,16 @@ import com.google.cloud.spanner.AbstractResultSet.GrpcStreamIterator; import com.google.cloud.spanner.AsyncResultSet.CallbackResponse; import com.google.cloud.spanner.AsyncTransactionManager.TransactionContextFuture; +import com.google.cloud.spanner.BaseSessionPoolTest.FakeClock; import com.google.cloud.spanner.MockSpannerServiceImpl.SimulatedExecutionTime; import com.google.cloud.spanner.MockSpannerServiceImpl.StatementResult; import com.google.cloud.spanner.Options.RpcPriority; import com.google.cloud.spanner.Options.TransactionOption; import com.google.cloud.spanner.ReadContext.QueryAnalyzeMode; +import com.google.cloud.spanner.SessionPool.Clock; import com.google.cloud.spanner.SessionPool.PooledSessionFuture; +import com.google.cloud.spanner.SessionPoolOptions.ActionOnInactiveTransaction; +import com.google.cloud.spanner.SessionPoolOptions.InactiveTransactionRemovalOptions; import com.google.cloud.spanner.SpannerException.ResourceNotFoundException; import com.google.cloud.spanner.SpannerOptions.CallContextConfigurator; import com.google.cloud.spanner.SpannerOptions.SpannerCallContextTimeoutConfigurator; @@ -111,6 +116,7 @@ import org.junit.runner.RunWith; import org.junit.runners.JUnit4; import org.threeten.bp.Duration; +import org.threeten.bp.Instant; @RunWith(JUnit4.class) public class DatabaseClientImplTest { @@ -198,6 +204,305 @@ public void tearDown() { mockSpanner.removeAllExecutionTimes(); } + @Test + public void + testPoolMaintainer_whenInactiveTransactionAndSessionIsNotFoundOnBackend_removeSessionsFromPool() { + FakeClock poolMaintainerClock = new FakeClock(); + InactiveTransactionRemovalOptions inactiveTransactionRemovalOptions = + InactiveTransactionRemovalOptions.newBuilder() + .setIdleTimeThreshold( + Duration.ofSeconds( + 2L)) // any session not used for more than 2s will be long-running + .setActionOnInactiveTransaction(ActionOnInactiveTransaction.CLOSE) + .setExecutionFrequency(Duration.ofSeconds(1)) // execute thread every 1s + .build(); + SessionPoolOptions sessionPoolOptions = + SessionPoolOptions.newBuilder() + .setMinSessions(1) + .setMaxSessions(1) // to ensure there is 1 session and pool is 100% utilized + .setInactiveTransactionRemovalOptions(inactiveTransactionRemovalOptions) + .setLoopFrequency(1000L) // main thread runs every 1s + .setPoolMaintainerClock(poolMaintainerClock) + .build(); + spanner = + SpannerOptions.newBuilder() + .setProjectId(TEST_PROJECT) + .setDatabaseRole(TEST_DATABASE_ROLE) + .setChannelProvider(channelProvider) + .setCredentials(NoCredentials.getInstance()) + .setSessionPoolOption(sessionPoolOptions) + .build() + .getService(); + DatabaseClientImpl client = + (DatabaseClientImpl) + spanner.getDatabaseClient(DatabaseId.of(TEST_PROJECT, TEST_INSTANCE, TEST_DATABASE)); + Instant initialExecutionTime = client.pool.poolMaintainer.lastExecutionTime; + + poolMaintainerClock.currentTimeMillis += Duration.ofMinutes(3).toMillis(); + + try (TransactionManager manager = client.transactionManager()) { + TransactionContext transaction = manager.begin(); + mockSpanner.setCommitExecutionTime( + SimulatedExecutionTime.ofException( + mockSpanner.createSessionNotFoundException("TEST_SESSION_NAME"))); + while (true) { + try { + transaction.executeUpdate(UPDATE_STATEMENT); + + // Simulate a delay of 3 minutes to ensure that the below transaction is a long-running + // one. + // As per this test, anything which takes more than 2s is long-running + poolMaintainerClock.currentTimeMillis += Duration.ofMinutes(3).toMillis(); + // force trigger pool maintainer to check for long-running sessions + client.pool.poolMaintainer.maintainPool(); + + manager.commit(); + assertNotNull(manager.getCommitTimestamp()); + break; + } catch (AbortedException e) { + transaction = manager.resetForRetry(); + } + mockSpanner.setCommitExecutionTime(SimulatedExecutionTime.ofMinimumAndRandomTime(0, 0)); + } + } + Instant endExecutionTime = client.pool.poolMaintainer.lastExecutionTime; + + // first session executed update, session found to be long-running and cleaned up. + // During commit, SessionNotFound exception from backend caused replacement of session and + // transaction needs to be retried. + // On retry, session again found to be long-running and cleaned up. + // During commit, there was no exception from backend. + + assertNotEquals( + endExecutionTime, + initialExecutionTime); // if session clean up task runs then these timings won't match + assertEquals(2, client.pool.numLeakedSessionsRemoved()); + assertTrue(client.pool.getNumberOfSessionsInPool() <= client.pool.totalSessions()); + } + + @Test + public void + testPoolMaintainer_whenInactiveTransactionAndSessionExistsOnBackend_removeSessionsFromPool() { + FakeClock poolMaintainerClock = new FakeClock(); + InactiveTransactionRemovalOptions inactiveTransactionRemovalOptions = + InactiveTransactionRemovalOptions.newBuilder() + .setIdleTimeThreshold( + Duration.ofSeconds( + 2L)) // any session not used for more than 2s will be long-running + .setActionOnInactiveTransaction(ActionOnInactiveTransaction.CLOSE) + .setExecutionFrequency(Duration.ofSeconds(1)) // execute thread every 1s + .build(); + SessionPoolOptions sessionPoolOptions = + SessionPoolOptions.newBuilder() + .setMinSessions(1) + .setMaxSessions(1) // to ensure there is 1 session and pool is 100% utilized + .setInactiveTransactionRemovalOptions(inactiveTransactionRemovalOptions) + .setLoopFrequency(1000L) // main thread runs every 1s + .setPoolMaintainerClock(poolMaintainerClock) + .build(); + spanner = + SpannerOptions.newBuilder() + .setProjectId(TEST_PROJECT) + .setDatabaseRole(TEST_DATABASE_ROLE) + .setChannelProvider(channelProvider) + .setCredentials(NoCredentials.getInstance()) + .setSessionPoolOption(sessionPoolOptions) + .build() + .getService(); + DatabaseClientImpl client = + (DatabaseClientImpl) + spanner.getDatabaseClient(DatabaseId.of(TEST_PROJECT, TEST_INSTANCE, TEST_DATABASE)); + Instant initialExecutionTime = client.pool.poolMaintainer.lastExecutionTime; + + poolMaintainerClock.currentTimeMillis += Duration.ofMinutes(3).toMillis(); + + try (TransactionManager manager = client.transactionManager()) { + TransactionContext transaction = manager.begin(); + while (true) { + try { + transaction.executeUpdate(UPDATE_STATEMENT); + + // Simulate a delay of 3 minutes to ensure that the below transaction is a long-running + // one. + // As per this test, anything which takes more than 2s is long-running + poolMaintainerClock.currentTimeMillis += Duration.ofMinutes(3).toMillis(); + // force trigger pool maintainer to check for long-running sessions + client.pool.poolMaintainer.maintainPool(); + + manager.commit(); + assertNotNull(manager.getCommitTimestamp()); + break; + } catch (AbortedException e) { + transaction = manager.resetForRetry(); + } + } + } + Instant endExecutionTime = client.pool.poolMaintainer.lastExecutionTime; + + // first session executed update, session found to be long-running and cleaned up. + // During commit, SessionNotFound exception from backend caused replacement of session and + // transaction needs to be retried. + // On retry, session again found to be long-running and cleaned up. + // During commit, there was no exception from backend. + assertNotEquals( + endExecutionTime, + initialExecutionTime); // if session clean up task runs then these timings won't match + assertEquals(1, client.pool.numLeakedSessionsRemoved()); + assertTrue(client.pool.getNumberOfSessionsInPool() <= client.pool.totalSessions()); + } + + @Test + public void testPoolMaintainer_whenLongRunningPartitionedUpdateRequest_takeNoAction() { + FakeClock poolMaintainerClock = new FakeClock(); + InactiveTransactionRemovalOptions inactiveTransactionRemovalOptions = + InactiveTransactionRemovalOptions.newBuilder() + .setIdleTimeThreshold( + Duration.ofSeconds( + 2L)) // any session not used for more than 2s will be long-running + .setActionOnInactiveTransaction(ActionOnInactiveTransaction.CLOSE) + .setExecutionFrequency(Duration.ofSeconds(1)) // execute thread every 1s + .build(); + SessionPoolOptions sessionPoolOptions = + SessionPoolOptions.newBuilder() + .setMinSessions(1) + .setMaxSessions(1) // to ensure there is 1 session and pool is 100% utilized + .setInactiveTransactionRemovalOptions(inactiveTransactionRemovalOptions) + .setLoopFrequency(1000L) // main thread runs every 1s + .setPoolMaintainerClock(poolMaintainerClock) + .build(); + + spanner = + SpannerOptions.newBuilder() + .setProjectId(TEST_PROJECT) + .setDatabaseRole(TEST_DATABASE_ROLE) + .setChannelProvider(channelProvider) + .setCredentials(NoCredentials.getInstance()) + .setSessionPoolOption(sessionPoolOptions) + .build() + .getService(); + DatabaseClientImpl client = + (DatabaseClientImpl) + spanner.getDatabaseClient(DatabaseId.of(TEST_PROJECT, TEST_INSTANCE, TEST_DATABASE)); + Instant initialExecutionTime = client.pool.poolMaintainer.lastExecutionTime; + poolMaintainerClock.currentTimeMillis += Duration.ofMinutes(3).toMillis(); + + client.executePartitionedUpdate(UPDATE_STATEMENT); + + // Simulate a delay of 3 minutes to ensure that the below transaction is a long-running one. + // As per this test, anything which takes more than 2s is long-running + poolMaintainerClock.currentTimeMillis += Duration.ofMinutes(3).toMillis(); + + // force trigger pool maintainer to check for long-running sessions + client.pool.poolMaintainer.maintainPool(); + + Instant endExecutionTime = client.pool.poolMaintainer.lastExecutionTime; + + assertNotEquals( + endExecutionTime, + initialExecutionTime); // if session clean up task runs then these timings won't match + assertEquals(0, client.pool.numLeakedSessionsRemoved()); + assertTrue(client.pool.getNumberOfSessionsInPool() <= client.pool.totalSessions()); + } + + /** + * PDML transaction is expected to be long-running. This is indicated through session flag + * eligibleForLongRunning = true . For all other transactions which are not expected to be + * long-running eligibleForLongRunning = false. + * + *

    Below tests uses a session for PDML transaction. Post that, the same session is used for + * executeUpdate(). Both transactions are long-running. The test verifies that + * eligibleForLongRunning = false for the second transaction, and it's identified as a + * long-running transaction. + */ + @Test + public void testPoolMaintainer_whenPDMLFollowedByInactiveTransaction_removeSessionsFromPool() { + FakeClock poolMaintainerClock = new FakeClock(); + InactiveTransactionRemovalOptions inactiveTransactionRemovalOptions = + InactiveTransactionRemovalOptions.newBuilder() + .setIdleTimeThreshold( + Duration.ofSeconds( + 2L)) // any session not used for more than 2s will be long-running + .setActionOnInactiveTransaction(ActionOnInactiveTransaction.CLOSE) + .setExecutionFrequency(Duration.ofSeconds(1)) // execute thread every 1s + .build(); + SessionPoolOptions sessionPoolOptions = + SessionPoolOptions.newBuilder() + .setMinSessions(1) + .setMaxSessions(1) // to ensure there is 1 session and pool is 100% utilized + .setInactiveTransactionRemovalOptions(inactiveTransactionRemovalOptions) + .setLoopFrequency(1000L) // main thread runs every 1s + .setPoolMaintainerClock(poolMaintainerClock) + .build(); + spanner = + SpannerOptions.newBuilder() + .setProjectId(TEST_PROJECT) + .setDatabaseRole(TEST_DATABASE_ROLE) + .setChannelProvider(channelProvider) + .setCredentials(NoCredentials.getInstance()) + .setSessionPoolOption(sessionPoolOptions) + .build() + .getService(); + DatabaseClientImpl client = + (DatabaseClientImpl) + spanner.getDatabaseClient(DatabaseId.of(TEST_PROJECT, TEST_INSTANCE, TEST_DATABASE)); + Instant initialExecutionTime = client.pool.poolMaintainer.lastExecutionTime; + + poolMaintainerClock.currentTimeMillis += Duration.ofMinutes(3).toMillis(); + + client.executePartitionedUpdate(UPDATE_STATEMENT); + + // Simulate a delay of 3 minutes to ensure that the below transaction is a long-running one. + // As per this test, anything which takes more than 2s is long-running + poolMaintainerClock.currentTimeMillis += Duration.ofMinutes(3).toMillis(); + + // force trigger pool maintainer to check for long-running sessions + client.pool.poolMaintainer.maintainPool(); + + Instant endExecutionTime = client.pool.poolMaintainer.lastExecutionTime; + + assertNotEquals( + endExecutionTime, + initialExecutionTime); // if session clean up task runs then these timings won't match + assertEquals(0, client.pool.numLeakedSessionsRemoved()); + assertTrue(client.pool.getNumberOfSessionsInPool() <= client.pool.totalSessions()); + + poolMaintainerClock.currentTimeMillis += Duration.ofMinutes(3).toMillis(); + + try (TransactionManager manager = client.transactionManager()) { + TransactionContext transaction = manager.begin(); + while (true) { + try { + transaction.executeUpdate(UPDATE_STATEMENT); + + // Simulate a delay of 3 minutes to ensure that the below transaction is a long-running + // one. + // As per this test, anything which takes more than 2s is long-running + poolMaintainerClock.currentTimeMillis += Duration.ofMinutes(3).toMillis(); + // force trigger pool maintainer to check for long-running sessions + client.pool.poolMaintainer.maintainPool(); + + manager.commit(); + assertNotNull(manager.getCommitTimestamp()); + break; + } catch (AbortedException e) { + transaction = manager.resetForRetry(); + } + } + } + endExecutionTime = client.pool.poolMaintainer.lastExecutionTime; + + // first session executed update, session found to be long-running and cleaned up. + // During commit, SessionNotFound exception from backend caused replacement of session and + // transaction needs to be retried. + // On retry, session again found to be long-running and cleaned up. + // During commit, there was no exception from backend. + assertNotEquals( + endExecutionTime, + initialExecutionTime); // if session clean up task runs then these timings won't match + assertEquals(1, client.pool.numLeakedSessionsRemoved()); + assertTrue(client.pool.getNumberOfSessionsInPool() <= client.pool.totalSessions()); + } + @Test public void testWrite() { DatabaseClient client = @@ -3034,4 +3339,13 @@ static void assertAsString(ImmutableList expected, ResultSet resultSet, expected.stream().collect(Collectors.joining(",", "[", "]")), resultSet.getValue(col).getAsString()); } + + static class FakeClock extends Clock { + volatile long currentTimeMillis; + + @Override + public Instant instant() { + return Instant.ofEpochMilli(currentTimeMillis); + } + } } diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MockSpannerServiceImpl.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MockSpannerServiceImpl.java index 6669567611..892993cae1 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MockSpannerServiceImpl.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MockSpannerServiceImpl.java @@ -596,7 +596,6 @@ private static void checkStreamException( private int maxNumSessionsInOneBatch = 100; private int maxTotalSessions = Integer.MAX_VALUE; private AtomicInteger numSessionsCreated = new AtomicInteger(); - private SimulatedExecutionTime beginTransactionExecutionTime = NO_EXECUTION_TIME; private SimulatedExecutionTime commitExecutionTime = NO_EXECUTION_TIME; private SimulatedExecutionTime batchCreateSessionsExecutionTime = NO_EXECUTION_TIME; @@ -903,7 +902,7 @@ public void getSession(GetSessionRequest request, StreamObserver respon } } - private void setSessionNotFound(String name, StreamObserver responseObserver) { + public StatusRuntimeException createSessionNotFoundException(String name) { ResourceInfo resourceInfo = ResourceInfo.newBuilder() .setResourceType(SpannerExceptionFactory.SESSION_RESOURCE_TYPE) @@ -915,10 +914,14 @@ private void setSessionNotFound(String name, StreamObserver responseObser ProtoLiteUtils.metadataMarshaller(resourceInfo)); Metadata trailers = new Metadata(); trailers.put(key, resourceInfo); - responseObserver.onError( - Status.NOT_FOUND - .withDescription(String.format("Session not found: Session with id %s not found", name)) - .asRuntimeException(trailers)); + return Status.NOT_FOUND + .withDescription(String.format("Session not found: Session with id %s not found", name)) + .asRuntimeException(trailers); + } + + private void setSessionNotFound(String name, StreamObserver responseObserver) { + final StatusRuntimeException statusRuntimeException = createSessionNotFoundException(name); + responseObserver.onError(statusRuntimeException); } @Override @@ -2207,10 +2210,6 @@ public void removeAllExecutionTimes() { streamingReadExecutionTime = NO_EXECUTION_TIME; } - public SimulatedExecutionTime getBeginTransactionExecutionTime() { - return beginTransactionExecutionTime; - } - public void setBeginTransactionExecutionTime( SimulatedExecutionTime beginTransactionExecutionTime) { this.beginTransactionExecutionTime = Preconditions.checkNotNull(beginTransactionExecutionTime); diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolOptionsTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolOptionsTest.java index 9cdabfac68..a979c6fed9 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolOptionsTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolOptionsTest.java @@ -16,9 +16,13 @@ package com.google.cloud.spanner; import static com.google.common.truth.Truth.assertThat; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +import com.google.cloud.spanner.SessionPoolOptions.InactiveTransactionRemovalOptions; import java.util.ArrayList; import java.util.Collection; import java.util.List; @@ -27,6 +31,7 @@ import org.junit.runners.Parameterized; import org.junit.runners.Parameterized.Parameter; import org.junit.runners.Parameterized.Parameters; +import org.threeten.bp.Duration; /** Unit tests for {@link com.google.cloud.spanner.SessionPoolOptions} */ @RunWith(Parameterized.class) @@ -117,4 +122,68 @@ public void setZeroMaxSessions() { public void setNegativeMaxSessions() { SessionPoolOptions.newBuilder().setMaxSessions(-1); } + + @Test + public void verifyDefaultInactiveTransactionRemovalOptions() { + SessionPoolOptions sessionPoolOptions = SessionPoolOptions.newBuilder().build(); + InactiveTransactionRemovalOptions inactiveTransactionRemovalOptions = + sessionPoolOptions.getInactiveTransactionRemovalOptions(); + + assertFalse(sessionPoolOptions.warnInactiveTransactions()); + assertFalse(sessionPoolOptions.warnAndCloseInactiveTransactions()); + assertFalse(sessionPoolOptions.closeInactiveTransactions()); + assertEquals(0.95, inactiveTransactionRemovalOptions.getUsedSessionsRatioThreshold(), 0.0); + assertEquals(Duration.ofMinutes(2), inactiveTransactionRemovalOptions.getExecutionFrequency()); + assertEquals(Duration.ofMinutes(60), inactiveTransactionRemovalOptions.getIdleTimeThreshold()); + } + + @Test + public void setWarnIfInactiveTransactions() { + SessionPoolOptions sessionPoolOptions = + SessionPoolOptions.newBuilder().setWarnIfInactiveTransactions().build(); + + assertTrue(sessionPoolOptions.warnInactiveTransactions()); + assertFalse(sessionPoolOptions.warnAndCloseInactiveTransactions()); + assertFalse(sessionPoolOptions.closeInactiveTransactions()); + } + + @Test + public void setWarnAndCloseIfInactiveTransactions() { + SessionPoolOptions sessionPoolOptions = + SessionPoolOptions.newBuilder().setWarnAndCloseIfInactiveTransactions().build(); + + assertFalse(sessionPoolOptions.warnInactiveTransactions()); + assertTrue(sessionPoolOptions.warnAndCloseInactiveTransactions()); + assertFalse(sessionPoolOptions.closeInactiveTransactions()); + } + + @Test + public void setCloseIfInactiveTransactions() { + SessionPoolOptions sessionPoolOptions = + SessionPoolOptions.newBuilder().setCloseIfInactiveTransactions().build(); + + assertFalse(sessionPoolOptions.warnInactiveTransactions()); + assertFalse(sessionPoolOptions.warnAndCloseInactiveTransactions()); + assertTrue(sessionPoolOptions.closeInactiveTransactions()); + } + + @Test(expected = IllegalArgumentException.class) + public void setNegativeExecutionFrequency() { + InactiveTransactionRemovalOptions inactiveTransactionRemovalOptions = + InactiveTransactionRemovalOptions.newBuilder() + .setExecutionFrequency(Duration.ofMillis(-1)) + .build(); + SessionPoolOptions.newBuilder() + .setInactiveTransactionRemovalOptions(inactiveTransactionRemovalOptions); + } + + @Test(expected = IllegalArgumentException.class) + public void setNegativeIdleTimeThreshold() { + InactiveTransactionRemovalOptions inactiveTransactionRemovalOptions = + InactiveTransactionRemovalOptions.newBuilder() + .setIdleTimeThreshold(Duration.ofMillis(-1)) + .build(); + SessionPoolOptions.newBuilder() + .setInactiveTransactionRemovalOptions(inactiveTransactionRemovalOptions); + } } diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolStressTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolStressTest.java index 9a1df1c964..d1aab02d32 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolStressTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolStressTest.java @@ -26,6 +26,8 @@ import com.google.cloud.spanner.SessionClient.SessionConsumer; import com.google.cloud.spanner.SessionPool.PooledSessionFuture; import com.google.cloud.spanner.SessionPool.SessionConsumerImpl; +import com.google.cloud.spanner.SessionPoolOptions.ActionOnInactiveTransaction; +import com.google.cloud.spanner.SessionPoolOptions.InactiveTransactionRemovalOptions; import com.google.common.util.concurrent.Uninterruptibles; import com.google.protobuf.ByteString; import com.google.protobuf.Empty; @@ -66,7 +68,6 @@ public class SessionPoolStressTest extends BaseSessionPoolTest { Object lock = new Object(); Random random = new Random(); FakeClock clock = new FakeClock(); - Map sessions = new HashMap<>(); // Exception keeps track of where the session was closed at. Map closedSessions = new HashMap<>(); @@ -211,7 +212,13 @@ public void stressTest() throws Exception { int minSessions = 2; int maxSessions = concurrentThreads / 2; SessionPoolOptions.Builder builder = - SessionPoolOptions.newBuilder().setMinSessions(minSessions).setMaxSessions(maxSessions); + SessionPoolOptions.newBuilder() + .setMinSessions(minSessions) + .setMaxSessions(maxSessions) + .setInactiveTransactionRemovalOptions( + InactiveTransactionRemovalOptions.newBuilder() + .setActionOnInactiveTransaction(ActionOnInactiveTransaction.CLOSE) + .build()); if (shouldBlock) { builder.setBlockIfPoolExhausted(); } else { @@ -228,6 +235,14 @@ public void stressTest() throws Exception { return null; } }; + pool.longRunningSessionRemovedListener = + pooled -> { + String name = pooled.getName(); + synchronized (lock) { + sessions.remove(name); + return null; + } + }; for (int i = 0; i < concurrentThreads; i++) { new Thread( () -> { @@ -263,7 +278,7 @@ public void stressTest() throws Exception { releaseThreads.countDown(); threadsDone.await(); synchronized (lock) { - assertThat(maxAliveSessions).isAtMost(maxSessions); + assertThat(pool.totalSessions()).isAtMost(maxSessions); } stopMaintenance.set(true); pool.closeAsync(new SpannerImpl.ClosedException()).get(); diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolTest.java index 1f7391a60f..bc4757f11d 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolTest.java @@ -95,6 +95,8 @@ import org.mockito.Mock; import org.mockito.Mockito; import org.threeten.bp.Duration; +import org.threeten.bp.Instant; +import org.threeten.bp.temporal.ChronoUnit; /** Tests for SessionPool that mock out the underlying stub. */ @RunWith(Parameterized.class) @@ -552,6 +554,359 @@ public void idleSessionCleanup() throws Exception { pool.closeAsync(new SpannerImpl.ClosedException()).get(5L, TimeUnit.SECONDS); } + @Test + public void longRunningTransactionsCleanup_whenActionSetToClose_verifyInactiveSessionsClosed() + throws Exception { + setupForLongRunningTransactionsCleanup(); + options = + SessionPoolOptions.newBuilder() + .setMinSessions(1) + .setMaxSessions(3) + .setIncStep(1) + .setMaxIdleSessions(0) + .setCloseIfInactiveTransactions() // set option to close inactive transactions + .build(); + + Clock clock = mock(Clock.class); + when(clock.instant()).thenReturn(Instant.now()); + + pool = createPool(clock); + // Make sure pool has been initialized + pool.getSession().close(); + + // All 3 sessions used. 100% of pool utilised. + PooledSessionFuture readSession1 = pool.getSession(); + PooledSessionFuture readSession2 = pool.getSession(); + PooledSessionFuture readSession3 = pool.getSession(); + + // complete the async tasks + readSession1.get().setEligibleForLongRunning(false); + readSession2.get().setEligibleForLongRunning(false); + readSession3.get().setEligibleForLongRunning(true); + + assertEquals(3, pool.totalSessions()); + assertEquals(3, pool.checkedOutSessions.size()); + + // ensure that the sessions are in use for > 60 minutes + pool.poolMaintainer.lastExecutionTime = Instant.now(); + when(clock.instant()).thenReturn(Instant.now().plus(61, ChronoUnit.MINUTES)); + + pool.poolMaintainer.maintainPool(); + + // the two session that were un-expectedly long-running were removed from the pool. + // verify that only 1 session that is unexpected to be long-running remains in the pool. + assertEquals(1, pool.totalSessions()); + assertEquals(2, pool.numLeakedSessionsRemoved()); + pool.closeAsync(new SpannerImpl.ClosedException()).get(5L, TimeUnit.SECONDS); + } + + @Test + public void longRunningTransactionsCleanup_whenActionSetToWarn_verifyInactiveSessionsOpen() + throws Exception { + setupForLongRunningTransactionsCleanup(); + options = + SessionPoolOptions.newBuilder() + .setMinSessions(1) + .setMaxSessions(3) + .setIncStep(1) + .setMaxIdleSessions(0) + .setWarnIfInactiveTransactions() // set option to warn (via logs) inactive transactions + .build(); + Clock clock = mock(Clock.class); + when(clock.instant()).thenReturn(Instant.now()); + + pool = createPool(clock); + // Make sure pool has been initialized + pool.getSession().close(); + + // All 3 sessions used. 100% of pool utilised. + PooledSessionFuture readSession1 = pool.getSession(); + PooledSessionFuture readSession2 = pool.getSession(); + PooledSessionFuture readSession3 = pool.getSession(); + + // complete the async tasks + readSession1.get().setEligibleForLongRunning(false); + readSession2.get().setEligibleForLongRunning(false); + readSession3.get().setEligibleForLongRunning(true); + + assertEquals(3, pool.totalSessions()); + assertEquals(3, pool.checkedOutSessions.size()); + + // ensure that the sessions are in use for > 60 minutes + pool.poolMaintainer.lastExecutionTime = Instant.now(); + when(clock.instant()).thenReturn(Instant.now().plus(61, ChronoUnit.MINUTES)); + + pool.poolMaintainer.maintainPool(); + + assertEquals(3, pool.totalSessions()); + assertEquals(3, pool.checkedOutSessions.size()); + assertEquals(0, pool.numLeakedSessionsRemoved()); + pool.closeAsync(new SpannerImpl.ClosedException()).get(5L, TimeUnit.SECONDS); + } + + @Test + public void + longRunningTransactionsCleanup_whenUtilisationBelowThreshold_verifyInactiveSessionsOpen() + throws Exception { + setupForLongRunningTransactionsCleanup(); + options = + SessionPoolOptions.newBuilder() + .setMinSessions(1) + .setMaxSessions(3) + .setIncStep(1) + .setMaxIdleSessions(0) + .setCloseIfInactiveTransactions() // set option to close inactive transactions + .build(); + Clock clock = mock(Clock.class); + when(clock.instant()).thenReturn(Instant.now()); + + pool = createPool(clock); + pool.getSession().close(); + + // 2/3 sessions are used. Hence utilisation < 95% + PooledSessionFuture readSession1 = pool.getSession(); + PooledSessionFuture readSession2 = pool.getSession(); + + // complete the async tasks and mark sessions as checked out + readSession1.get().setEligibleForLongRunning(false); + readSession2.get().setEligibleForLongRunning(false); + + assertEquals(2, pool.totalSessions()); + assertEquals(2, pool.checkedOutSessions.size()); + + // ensure that the sessions are in use for > 60 minutes + pool.poolMaintainer.lastExecutionTime = Instant.now(); + when(clock.instant()).thenReturn(Instant.now().plus(61, ChronoUnit.MINUTES)); + + pool.poolMaintainer.maintainPool(); + + assertEquals(2, pool.totalSessions()); + assertEquals(2, pool.checkedOutSessions.size()); + assertEquals(0, pool.numLeakedSessionsRemoved()); + pool.closeAsync(new SpannerImpl.ClosedException()).get(5L, TimeUnit.SECONDS); + } + + @Test + public void + longRunningTransactionsCleanup_whenAllAreExpectedlyLongRunning_verifyInactiveSessionsOpen() + throws Exception { + SessionImpl session1 = mockSession(); + SessionImpl session2 = mockSession(); + SessionImpl session3 = mockSession(); + + final LinkedList sessions = + new LinkedList<>(Arrays.asList(session1, session2, session3)); + doAnswer( + invocation -> { + executor.submit( + () -> { + SessionConsumerImpl consumer = + invocation.getArgument(2, SessionConsumerImpl.class); + consumer.onSessionReady(sessions.pop()); + }); + return null; + }) + .when(sessionClient) + .asyncBatchCreateSessions(Mockito.eq(1), Mockito.anyBoolean(), any(SessionConsumer.class)); + + for (SessionImpl session : sessions) { + mockKeepAlive(session); + } + options = + SessionPoolOptions.newBuilder() + .setMinSessions(1) + .setMaxSessions(3) + .setIncStep(1) + .setMaxIdleSessions(0) + .setCloseIfInactiveTransactions() // set option to close inactive transactions + .build(); + Clock clock = mock(Clock.class); + when(clock.instant()).thenReturn(Instant.now()); + + pool = createPool(clock); + // Make sure pool has been initialized + pool.getSession().close(); + + // All 3 sessions used. 100% of pool utilised. + PooledSessionFuture readSession1 = pool.getSession(); + PooledSessionFuture readSession2 = pool.getSession(); + PooledSessionFuture readSession3 = pool.getSession(); + + // complete the async tasks + readSession1.get().setEligibleForLongRunning(true); + readSession2.get().setEligibleForLongRunning(true); + readSession3.get().setEligibleForLongRunning(true); + + assertEquals(3, pool.totalSessions()); + assertEquals(3, pool.checkedOutSessions.size()); + + // ensure that the sessions are in use for > 60 minutes + pool.poolMaintainer.lastExecutionTime = Instant.now(); + when(clock.instant()).thenReturn(Instant.now().plus(61, ChronoUnit.MINUTES)); + + pool.poolMaintainer.maintainPool(); + + assertEquals(3, pool.totalSessions()); + assertEquals(3, pool.checkedOutSessions.size()); + assertEquals(0, pool.numLeakedSessionsRemoved()); + pool.closeAsync(new SpannerImpl.ClosedException()).get(5L, TimeUnit.SECONDS); + } + + @Test + public void longRunningTransactionsCleanup_whenBelowDurationThreshold_verifyInactiveSessionsOpen() + throws Exception { + setupForLongRunningTransactionsCleanup(); + options = + SessionPoolOptions.newBuilder() + .setMinSessions(1) + .setMaxSessions(3) + .setIncStep(1) + .setMaxIdleSessions(0) + .setCloseIfInactiveTransactions() // set option to close inactive transactions + .build(); + Clock clock = mock(Clock.class); + when(clock.instant()).thenReturn(Instant.now()); + + pool = createPool(clock); + // Make sure pool has been initialized + pool.getSession().close(); + + // All 3 sessions used. 100% of pool utilised. + PooledSessionFuture readSession1 = pool.getSession(); + PooledSessionFuture readSession2 = pool.getSession(); + PooledSessionFuture readSession3 = pool.getSession(); + + // complete the async tasks + readSession1.get().setEligibleForLongRunning(false); + readSession2.get().setEligibleForLongRunning(false); + readSession3.get().setEligibleForLongRunning(true); + + assertEquals(3, pool.totalSessions()); + assertEquals(3, pool.checkedOutSessions.size()); + + // ensure that the sessions are in use for < 60 minutes + pool.poolMaintainer.lastExecutionTime = Instant.now(); + when(clock.instant()).thenReturn(Instant.now().plus(50, ChronoUnit.MINUTES)); + + pool.poolMaintainer.maintainPool(); + + assertEquals(3, pool.totalSessions()); + assertEquals(3, pool.checkedOutSessions.size()); + assertEquals(0, pool.numLeakedSessionsRemoved()); + pool.closeAsync(new SpannerImpl.ClosedException()).get(5L, TimeUnit.SECONDS); + } + + @Test + public void longRunningTransactionsCleanup_whenException_doNothing() throws Exception { + setupForLongRunningTransactionsCleanup(); + options = + SessionPoolOptions.newBuilder() + .setMinSessions(1) + .setMaxSessions(3) + .setIncStep(1) + .setMaxIdleSessions(0) + .setCloseIfInactiveTransactions() // set option to close inactive transactions + .build(); + Clock clock = mock(Clock.class); + when(clock.instant()).thenReturn(Instant.now()); + + pool = createPool(clock); + // Make sure pool has been initialized + pool.getSession().close(); + + // All 3 sessions used. 100% of pool utilised. + PooledSessionFuture readSession1 = pool.getSession(); + PooledSessionFuture readSession2 = pool.getSession(); + PooledSessionFuture readSession3 = pool.getSession(); + + // complete the async tasks + readSession1.get().setEligibleForLongRunning(false); + readSession2.get().setEligibleForLongRunning(false); + readSession3.get().setEligibleForLongRunning(true); + + assertEquals(3, pool.totalSessions()); + assertEquals(3, pool.checkedOutSessions.size()); + + when(clock.instant()).thenReturn(Instant.now().plus(50, ChronoUnit.MINUTES)); + + pool.poolMaintainer.lastExecutionTime = null; // setting null to throw exception + pool.poolMaintainer.maintainPool(); + + assertEquals(3, pool.totalSessions()); + assertEquals(3, pool.checkedOutSessions.size()); + assertEquals(0, pool.numLeakedSessionsRemoved()); + pool.closeAsync(new SpannerImpl.ClosedException()).get(5L, TimeUnit.SECONDS); + } + + @Test + public void + longRunningTransactionsCleanup_whenTaskRecurrenceBelowThreshold_verifyInactiveSessionsOpen() + throws Exception { + setupForLongRunningTransactionsCleanup(); + options = + SessionPoolOptions.newBuilder() + .setMinSessions(1) + .setMaxSessions(3) + .setIncStep(1) + .setMaxIdleSessions(0) + .setCloseIfInactiveTransactions() // set option to close inactive transactions + .build(); + Clock clock = mock(Clock.class); + when(clock.instant()).thenReturn(Instant.now()); + + pool = createPool(clock); + // Make sure pool has been initialized + pool.getSession().close(); + + // All 3 sessions used. 100% of pool utilised. + PooledSessionFuture readSession1 = pool.getSession(); + PooledSessionFuture readSession2 = pool.getSession(); + PooledSessionFuture readSession3 = pool.getSession(); + + // complete the async tasks + readSession1.get(); + readSession2.get(); + readSession3.get(); + + assertEquals(3, pool.totalSessions()); + assertEquals(3, pool.checkedOutSessions.size()); + + pool.poolMaintainer.lastExecutionTime = Instant.now(); + when(clock.instant()).thenReturn(Instant.now().plus(10, ChronoUnit.SECONDS)); + + pool.poolMaintainer.maintainPool(); + + assertEquals(3, pool.totalSessions()); + assertEquals(3, pool.checkedOutSessions.size()); + assertEquals(0, pool.numLeakedSessionsRemoved()); + pool.closeAsync(new SpannerImpl.ClosedException()).get(5L, TimeUnit.SECONDS); + } + + private void setupForLongRunningTransactionsCleanup() { + SessionImpl session1 = mockSession(); + SessionImpl session2 = mockSession(); + SessionImpl session3 = mockSession(); + + final LinkedList sessions = + new LinkedList<>(Arrays.asList(session1, session2, session3)); + doAnswer( + invocation -> { + executor.submit( + () -> { + SessionConsumerImpl consumer = + invocation.getArgument(2, SessionConsumerImpl.class); + consumer.onSessionReady(sessions.pop()); + }); + return null; + }) + .when(sessionClient) + .asyncBatchCreateSessions(Mockito.eq(1), Mockito.anyBoolean(), any(SessionConsumer.class)); + + for (SessionImpl session : sessions) { + mockKeepAlive(session); + } + } + @Test public void keepAlive() throws Exception { options = SessionPoolOptions.newBuilder().setMinSessions(2).setMaxSessions(3).build();