Skip to content
Permalink
Browse files
feat: mark when a Spanner client is closed (#198)
Closing a Spanner client means that all resources that have been returned
by the client are no longer valid, including all DatabaseClients and
corresponding session pools. This will cause errors for any other process
that might still want to use these resources. This change marks when and by
which call stack a Spanner client is closed, and includes that in any
subsequent IllegalStateException that is returned to any process that tries
to use the resources that have been returned by the Spanner client. This
makes it easier to track down where and when a Spanner client is closed by
accident.
  • Loading branch information
olavloite committed May 14, 2020
1 parent a608460 commit 50cb1744e7ede611758d3ff63b3df77a1d3682eb
@@ -18,6 +18,7 @@

import com.google.cloud.Timestamp;
import com.google.cloud.spanner.SessionPool.PooledSession;
import com.google.cloud.spanner.SpannerImpl.ClosedException;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
import com.google.common.util.concurrent.ListenableFuture;
@@ -225,7 +226,7 @@ private <T> T runWithSessionRetry(SessionMode mode, Function<Session, T> callabl
}
}

ListenableFuture<Void> closeAsync() {
return pool.closeAsync();
ListenableFuture<Void> closeAsync(ClosedException closedException) {
return pool.closeAsync(closedException);
}
}
@@ -42,6 +42,7 @@
import com.google.cloud.spanner.Options.ReadOption;
import com.google.cloud.spanner.SessionClient.SessionConsumer;
import com.google.cloud.spanner.SpannerException.ResourceNotFoundException;
import com.google.cloud.spanner.SpannerImpl.ClosedException;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
import com.google.common.base.MoreObjects;
@@ -1123,6 +1124,9 @@ private static enum Position {
@GuardedBy("lock")
private SettableFuture<Void> closureFuture;

@GuardedBy("lock")
private ClosedException closedException;

@GuardedBy("lock")
private ResourceNotFoundException resourceNotFoundException;

@@ -1428,7 +1432,7 @@ PooledSession getReadSession() throws SpannerException {
synchronized (lock) {
if (closureFuture != null) {
span.addAnnotation("Pool has been closed");
throw new IllegalStateException("Pool has been closed");
throw new IllegalStateException("Pool has been closed", closedException);
}
if (resourceNotFoundException != null) {
span.addAnnotation("Database has been deleted");
@@ -1497,7 +1501,7 @@ PooledSession getReadWriteSession() {
synchronized (lock) {
if (closureFuture != null) {
span.addAnnotation("Pool has been closed");
throw new IllegalStateException("Pool has been closed");
throw new IllegalStateException("Pool has been closed", closedException);
}
if (resourceNotFoundException != null) {
span.addAnnotation("Database has been deleted");
@@ -1761,12 +1765,13 @@ private void decrementPendingClosures(int count) {
* #getReadWriteSession()} will start throwing {@code IllegalStateException}. The returned future
* blocks till all the sessions created in this pool have been closed.
*/
ListenableFuture<Void> closeAsync() {
ListenableFuture<Void> closeAsync(ClosedException closedException) {
ListenableFuture<Void> retFuture = null;
synchronized (lock) {
if (closureFuture != null) {
throw new IllegalStateException("Close has already been invoked");
throw new IllegalStateException("Close has already been invoked", this.closedException);
}
this.closedException = closedException;
// Fail all pending waiters.
Waiter waiter = readWaiters.poll();
while (waiter != null) {
@@ -46,6 +46,7 @@
import java.util.logging.Logger;
import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;
import org.threeten.bp.Instant;

/** Default implementation of the Cloud Spanner interface. */
class SpannerImpl extends BaseService<SpannerOptions> implements Spanner {
@@ -94,8 +95,22 @@ private static String nextDatabaseClientId(DatabaseId databaseId) {
private final DatabaseAdminClient dbAdminClient;
private final InstanceAdminClient instanceClient;

/**
* Exception class used to track the stack trace at the point when a Spanner instance is closed.
* This exception will be thrown if a user tries to use any resources that were returned by this
* Spanner instance after the instance has been closed. This makes it easier to track down the
* code that (accidently) closed the Spanner instance.
*/
static final class ClosedException extends RuntimeException {
private static final long serialVersionUID = 1451131180314064914L;

ClosedException() {
super("Spanner client was closed at " + Instant.now());
}
}

@GuardedBy("this")
private boolean spannerIsClosed = false;
private ClosedException closedException;

@VisibleForTesting
SpannerImpl(SpannerRpc gapicRpc, SpannerOptions options) {
@@ -131,9 +146,17 @@ SessionImpl sessionWithId(String name) {
return getSessionClient(id.getDatabaseId()).sessionWithId(name);
}

void checkClosed() {
synchronized (this) {
if (closedException != null) {
throw new IllegalStateException("Cloud Spanner client has been closed", closedException);
}
}
}

SessionClient getSessionClient(DatabaseId db) {
synchronized (this) {
Preconditions.checkState(!spannerIsClosed, "Cloud Spanner client has been closed");
checkClosed();
if (sessionClients.containsKey(db)) {
return sessionClients.get(db);
} else {
@@ -161,7 +184,7 @@ public InstanceAdminClient getInstanceAdminClient() {
@Override
public DatabaseClient getDatabaseClient(DatabaseId db) {
synchronized (this) {
Preconditions.checkState(!spannerIsClosed, "Cloud Spanner client has been closed");
checkClosed();
if (dbClients.containsKey(db) && !dbClients.get(db).pool.isValid()) {
// Move the invalidated client to a separate list, so we can close it together with the
// other database clients when the Spanner instance is closed.
@@ -206,12 +229,12 @@ public void close() {
void close(long timeout, TimeUnit unit) {
List<ListenableFuture<Void>> closureFutures = null;
synchronized (this) {
Preconditions.checkState(!spannerIsClosed, "Cloud Spanner client has been closed");
spannerIsClosed = true;
checkClosed();
closedException = new ClosedException();
closureFutures = new ArrayList<>();
invalidatedDbClients.addAll(dbClients.values());
for (DatabaseClientImpl dbClient : invalidatedDbClients) {
closureFutures.add(dbClient.closeAsync());
closureFutures.add(dbClient.closeAsync(closedException));
}
dbClients.clear();
}
@@ -234,7 +257,9 @@ void close(long timeout, TimeUnit unit) {

@Override
public boolean isClosed() {
return spannerIsClosed;
synchronized (this) {
return closedException != null;
}
}

/** Helper class for gRPC calls that can return paginated results. */
@@ -159,18 +159,18 @@ public void run() {

@Test
public void closeQuicklyDoesNotBlockIndefinitely() throws Exception {
pool.closeAsync().get();
pool.closeAsync(new SpannerImpl.ClosedException()).get();
}

@Test
public void closeAfterInitialCreateDoesNotBlockIndefinitely() throws Exception {
pool.getReadSession().close();
pool.closeAsync().get();
pool.closeAsync(new SpannerImpl.ClosedException()).get();
}

@Test
public void closeWhenSessionsActiveFinishes() throws Exception {
Session session = pool.getReadSession();
pool.closeAsync().get();
pool.closeAsync(new SpannerImpl.ClosedException()).get();
}
}
@@ -322,7 +322,7 @@ public void run() {
assertThat(maxAliveSessions).isAtMost(maxSessions);
}
stopMaintenance.set(true);
pool.closeAsync().get();
pool.closeAsync(new SpannerImpl.ClosedException()).get();
Exception e = getFailedError();
if (e != null) {
throw e;
@@ -42,6 +42,7 @@
import com.google.cloud.spanner.SessionPool.Clock;
import com.google.cloud.spanner.SessionPool.PooledSession;
import com.google.cloud.spanner.SessionPool.SessionConsumerImpl;
import com.google.cloud.spanner.SpannerImpl.ClosedException;
import com.google.cloud.spanner.TransactionRunner.TransactionCallable;
import com.google.cloud.spanner.TransactionRunnerImpl.TransactionContextImpl;
import com.google.cloud.spanner.spi.v1.SpannerRpc;
@@ -58,6 +59,8 @@
import com.google.spanner.v1.RollbackRequest;
import io.opencensus.metrics.LabelValue;
import io.opencensus.metrics.MetricRegistry;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@@ -165,6 +168,26 @@ public void run() {
Mockito.anyInt(), Mockito.anyBoolean(), any(SessionConsumer.class));
}

@Test
public void testClosedPoolIncludesClosedException() {
pool = createPool();
assertThat(pool.isValid()).isTrue();
closePoolWithStacktrace();
try {
pool.getReadSession();
fail("missing expected exception");
} catch (IllegalStateException e) {
assertThat(e.getCause()).isInstanceOf(ClosedException.class);
StringWriter sw = new StringWriter();
e.getCause().printStackTrace(new PrintWriter(sw));
assertThat(sw.toString()).contains("closePoolWithStacktrace");
}
}

private void closePoolWithStacktrace() {
pool.closeAsync(new SpannerImpl.ClosedException());
}

@Test
public void sessionCreation() {
setupMockSessionCreation();
@@ -203,7 +226,7 @@ public void poolLifo() {
public void poolClosure() throws Exception {
setupMockSessionCreation();
pool = createPool();
pool.closeAsync().get(5L, TimeUnit.SECONDS);
pool.closeAsync(new SpannerImpl.ClosedException()).get(5L, TimeUnit.SECONDS);
}

@Test
@@ -237,7 +260,7 @@ public void run() {
// Clear the leaked exception to suppress logging of expected exceptions.
leakedSession.clearLeakedException();
session1.close();
pool.closeAsync().get(5L, TimeUnit.SECONDS);
pool.closeAsync(new SpannerImpl.ClosedException()).get(5L, TimeUnit.SECONDS);
verify(mockSession1).asyncClose();
verify(mockSession2).asyncClose();
}
@@ -260,7 +283,7 @@ public void run() {
}
})
.start();
pool.closeAsync().get(5L, TimeUnit.SECONDS);
pool.closeAsync(new SpannerImpl.ClosedException()).get(5L, TimeUnit.SECONDS);
stop.set(true);
}

@@ -316,7 +339,7 @@ public Void call() throws Exception {
CountDownLatch latch = new CountDownLatch(1);
getSessionAsync(latch, failed);
insideCreation.await();
pool.closeAsync();
pool.closeAsync(new SpannerImpl.ClosedException());
releaseCreation.countDown();
latch.await();
assertThat(failed.get()).isTrue();
@@ -374,7 +397,7 @@ public Void call() throws Exception {
CountDownLatch latch = new CountDownLatch(1);
getReadWriteSessionAsync(latch, failed);
insideCreation.await();
pool.closeAsync();
pool.closeAsync(new SpannerImpl.ClosedException());
releaseCreation.countDown();
latch.await();
assertThat(failed.get()).isTrue();
@@ -411,7 +434,7 @@ public Void call() throws Exception {
CountDownLatch latch = new CountDownLatch(1);
getSessionAsync(latch, failed);
insideCreation.await();
ListenableFuture<Void> f = pool.closeAsync();
ListenableFuture<Void> f = pool.closeAsync(new SpannerImpl.ClosedException());
releaseCreation.countDown();
f.get();
assertThat(f.isDone()).isTrue();
@@ -456,7 +479,7 @@ public Session answer(InvocationOnMock invocation) throws Throwable {
CountDownLatch latch = new CountDownLatch(1);
getReadWriteSessionAsync(latch, failed);
insidePrepare.await();
ListenableFuture<Void> f = pool.closeAsync();
ListenableFuture<Void> f = pool.closeAsync(new SpannerImpl.ClosedException());
releasePrepare.countDown();
f.get();
assertThat(f.isDone()).isTrue();
@@ -487,7 +510,7 @@ public void run() {
PooledSession leakedSession = pool.getReadSession();
// Suppress expected leakedSession warning.
leakedSession.clearLeakedException();
pool.closeAsync();
pool.closeAsync(new SpannerImpl.ClosedException());
expectedException.expect(IllegalStateException.class);
pool.getReadSession();
}
@@ -925,7 +948,7 @@ public void run() {
runMaintainanceLoop(clock, pool, cycles);
// We will still close 2 sessions since at any point in time only 1 session was in use.
assertThat(pool.numIdleSessionsRemoved()).isEqualTo(2L);
pool.closeAsync().get(5L, TimeUnit.SECONDS);
pool.closeAsync(new SpannerImpl.ClosedException()).get(5L, TimeUnit.SECONDS);
}

@Test
@@ -976,7 +999,7 @@ public void run() {
// The session pool only keeps MinSessions + MaxIdleSessions alive.
verify(session, times(options.getMinSessions() + options.getMaxIdleSessions()))
.singleUse(any(TimestampBound.class));
pool.closeAsync().get(5L, TimeUnit.SECONDS);
pool.closeAsync(new SpannerImpl.ClosedException()).get(5L, TimeUnit.SECONDS);
}

@Test
@@ -1061,7 +1084,7 @@ public void run() {
assertThat(pool.getNumberOfAvailableWritePreparedSessions())
.isEqualTo((int) Math.ceil(options.getMinSessions() * options.getWriteSessionsFraction()));

pool.closeAsync().get(5L, TimeUnit.SECONDS);
pool.closeAsync(new SpannerImpl.ClosedException()).get(5L, TimeUnit.SECONDS);
}

private void waitForExpectedSessionPool(int expectedSessions, float writeFraction)
@@ -1447,7 +1470,7 @@ public Integer run(TransactionContext transaction) throws Exception {
.isTrue();
}
}
pool.closeAsync();
pool.closeAsync(new SpannerImpl.ClosedException());
}
}
}
@@ -27,8 +27,11 @@
import com.google.cloud.NoCredentials;
import com.google.cloud.ServiceRpc;
import com.google.cloud.grpc.GrpcTransportOptions;
import com.google.cloud.spanner.SpannerImpl.ClosedException;
import com.google.cloud.spanner.spi.v1.SpannerRpc;
import com.google.spanner.v1.ExecuteSqlRequest.QueryOptions;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
@@ -222,13 +225,36 @@ public void testClientId() {

// Get a database client for the same database as the first database. As this goes through a
// different Spanner instance with potentially different options, it will get a different
// client
// id.
// client id.
DatabaseClientImpl databaseClient3 = (DatabaseClientImpl) spanner.getDatabaseClient(db);
assertThat(databaseClient3.clientId).isEqualTo("client-2");
}
}

@Test
public void testClosedException() {
Spanner spanner = new SpannerImpl(rpc, spannerOptions);
assertThat(spanner.isClosed()).isFalse();
// Close the Spanner instance in a different method so we can actually verify that the entire
// stacktrace of the method that closed the instance is included in the exception that will be
// thrown by the instance after it has been closed.
closeSpannerAndIncludeStacktrace(spanner);
assertThat(spanner.isClosed()).isTrue();
try {
spanner.getDatabaseClient(DatabaseId.of("p", "i", "d"));
fail("missing expected exception");
} catch (IllegalStateException e) {
assertThat(e.getCause()).isInstanceOf(ClosedException.class);
StringWriter sw = new StringWriter();
e.getCause().printStackTrace(new PrintWriter(sw));
assertThat(sw.toString()).contains("closeSpannerAndIncludeStacktrace");
}
}

private void closeSpannerAndIncludeStacktrace(Spanner spanner) {
spanner.close();
}

private SpannerOptions createSpannerOptions() {
return SpannerOptions.newBuilder()
.setProjectId("[PROJECT]")

0 comments on commit 50cb174

Please sign in to comment.