@@ -49,6 +49,7 @@
import com .google .common .base .Supplier ;
import com .google .common .collect .ImmutableList ;
import com .google .common .collect .ImmutableMap ;
import com .google .common .collect .ImmutableSet ;
import com .google .common .util .concurrent .ListenableFuture ;
import com .google .common .util .concurrent .MoreExecutors ;
import com .google .common .util .concurrent .SettableFuture ;
@@ -102,6 +103,17 @@ final class SessionPool {
private static final Logger logger = Logger .getLogger (SessionPool .class .getName ());
private static final Tracer tracer = Tracing .getTracer ();
static final String WAIT_FOR_SESSION = "SessionPool.WaitForSession" ;
static final ImmutableSet <ErrorCode > SHOULD_STOP_PREPARE_SESSIONS_ERROR_CODES =
ImmutableSet .of (
ErrorCode .UNKNOWN ,
ErrorCode .INVALID_ARGUMENT ,
ErrorCode .PERMISSION_DENIED ,
ErrorCode .UNAUTHENTICATED ,
ErrorCode .RESOURCE_EXHAUSTED ,
ErrorCode .FAILED_PRECONDITION ,
ErrorCode .OUT_OF_RANGE ,
ErrorCode .UNIMPLEMENTED ,
ErrorCode .INTERNAL );
/**
* Wrapper around current time so that we can fake it in tests. TODO(user): Replace with Java 8
@@ -1114,6 +1126,9 @@ private static enum Position {
@ GuardedBy ("lock" )
private ResourceNotFoundException resourceNotFoundException ;
@ GuardedBy ("lock" )
private boolean stopAutomaticPrepare ;
@ GuardedBy ("lock" )
private final LinkedList <PooledSession > readSessions = new LinkedList <>();
@@ -1348,8 +1363,9 @@ private boolean isDatabaseOrInstanceNotFound(SpannerException e) {
return e instanceof DatabaseNotFoundException || e instanceof InstanceNotFoundException ;
}
private boolean isPermissionDenied (SpannerException e ) {
return e .getErrorCode () == ErrorCode .PERMISSION_DENIED ;
private boolean shouldStopPrepareSessions (SpannerException e ) {
return isDatabaseOrInstanceNotFound (e )
|| SHOULD_STOP_PREPARE_SESSIONS_ERROR_CODES .contains (e .getErrorCode ());
}
private void invalidateSession (PooledSession session ) {
@@ -1477,7 +1493,7 @@ PooledSession getReadWriteSession() {
// session.
while (true ) {
Waiter waiter = null ;
boolean inProcessPrepare = false ;
boolean inProcessPrepare = stopAutomaticPrepare ;
synchronized (lock ) {
if (closureFuture != null ) {
span .addAnnotation ("Pool has been closed" );
@@ -1494,7 +1510,7 @@ PooledSession getReadWriteSession() {
}
sess = writePreparedSessions .poll ();
if (sess == null ) {
if (numSessionsBeingPrepared <= prepareThreadPoolSize ) {
if (! inProcessPrepare && numSessionsBeingPrepared <= prepareThreadPoolSize ) {
if (numSessionsBeingPrepared <= readWriteWaiters .size ()) {
PooledSession readSession = readSessions .poll ();
if (readSession != null ) {
@@ -1550,12 +1566,16 @@ PooledSession getReadWriteSession() {
if (inProcessPrepare ) {
try {
sess .prepareReadWriteTransaction ();
// Session prepare succeeded, restart automatic prepare if it had been stopped.
synchronized (lock ) {
stopAutomaticPrepare = false ;
}
} catch (Throwable t ) {
sess = null ;
SpannerException e = newSpannerException (t );
if (!isClosed ()) {
handlePrepareSessionFailure (e , sess , false );
}
sess = null ;
if (!isSessionNotFound (e )) {
throw e ;
}
@@ -1696,25 +1716,30 @@ private void handlePrepareSessionFailure(
synchronized (lock ) {
if (isSessionNotFound (e )) {
invalidateSession (session );
} else if (isDatabaseOrInstanceNotFound (e ) || isPermissionDenied (e )) {
// Database has been deleted or the user has no permission to write to this database. We
// should stop trying to prepare any transactions. Also propagate the error to all waiters,
// as any further waiting is pointless.
} else if (shouldStopPrepareSessions (e )) {
// Database has been deleted or the user has no permission to write to this database, or
// there is some other semi-permanent error. We should stop trying to prepare any
// transactions. Also propagate the error to all waiters if the database or instance has
// been deleted, as any further waiting is pointless.
stopAutomaticPrepare = true ;
while (readWriteWaiters .size () > 0 ) {
readWriteWaiters .poll ().put (e );
}
while (readWaiters .size () > 0 ) {
readWaiters .poll ().put (e );
}
// Remove the session from the pool.
allSessions .remove (session );
if (isClosed ()) {
decrementPendingClosures (1 );
if (isDatabaseOrInstanceNotFound (e )) {
// Remove the session from the pool.
if (isClosed ()) {
decrementPendingClosures (1 );
}
allSessions .remove (session );
this .resourceNotFoundException =
MoreObjects .firstNonNull (
this .resourceNotFoundException , (ResourceNotFoundException ) e );
} else {
releaseSession (session , Position .FIRST );
}
this .resourceNotFoundException =
MoreObjects .firstNonNull (
this .resourceNotFoundException ,
isDatabaseOrInstanceNotFound (e ) ? (ResourceNotFoundException ) e : null );
} else if (informFirstWaiter && readWriteWaiters .size () > 0 ) {
releaseSession (session , Position .FIRST );
readWriteWaiters .poll ().put (e );
@@ -1809,6 +1834,9 @@ private boolean shouldUnblockReader() {
private boolean shouldPrepareSession () {
synchronized (lock ) {
if (stopAutomaticPrepare ) {
return false ;
}
int preparedSessions = writePreparedSessions .size () + numSessionsBeingPrepared ;
return preparedSessions < Math .floor (options .getWriteSessionsFraction () * totalSessions ());
}