Skip to content

Commit

Permalink
fix: Improve handling of futures and threads during refresh.
Browse files Browse the repository at this point in the history
  • Loading branch information
hessjcg committed Oct 4, 2023
1 parent 4df326f commit 62a72ad
Show file tree
Hide file tree
Showing 6 changed files with 432 additions and 84 deletions.
175 changes: 139 additions & 36 deletions core/src/main/java/com/google/cloud/sql/core/CloudSqlInstance.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningScheduledExecutorService;
import com.google.common.util.concurrent.RateLimiter;
import com.google.common.util.concurrent.Uninterruptibles;
import com.google.errorprone.annotations.concurrent.GuardedBy;
import java.io.IOException;
import java.security.KeyPair;
Expand Down Expand Up @@ -68,7 +67,10 @@ class CloudSqlInstance {
private ListenableFuture<InstanceData> nextInstanceData;

@GuardedBy("instanceDataGuard")
private boolean forceRefreshRunning;
private boolean refreshRunning;

@GuardedBy("instanceDataGuard")
private Throwable currentRefreshFailure;

/**
* Initializes a new Cloud SQL instance based on the given connection name.
Expand Down Expand Up @@ -100,24 +102,71 @@ class CloudSqlInstance {
}

synchronized (instanceDataGuard) {
this.currentInstanceData = executor.submit(this::performRefresh);
this.currentInstanceData = this.startRefreshAttempt();
this.nextInstanceData = currentInstanceData;
}
}

/**
* Returns the current data related to the instance from {@link #performRefresh()}. May block if
* no valid data is currently available.
* Returns the current data related to the instance from {@link #startRefreshAttempt()}. May block
* if no valid data is currently available. This method is called by an application thread when it
* is trying to create a new connection to the database. (It is not called by a
* ListeningScheduledExecutorService task.) So it is OK to block waiting for a future to complete.
*
* <p>When no refresh attempt is in progress, this returns immediately. Otherwise, it waits up to
* 30 seconds. If a refresh attempt succeeds, returns immediately at the end of that successful
* attempt. If no attempts succeed within the 30 second timeout, throws a RuntimeException with
* the exception from the last failed refresh attempt as the cause.
*
* <p>The behavior of this method has changed from earlier releases. In version 1.14.1 and
* earlier, When no refresh attempt is in progress, returns immediately. Otherwise, blocks
* application thread until the current refresh attempt finishes. If the refresh attempt succeeds,
* this returns the InstanceData. If not, this throws a RuntimeException, while a new refresh
* attempt is submitted to the executor in the background.
*/
private InstanceData getInstanceData(long timeoutMs) {
ListenableFuture<InstanceData> instanceDataFuture;
synchronized (instanceDataGuard) {
instanceDataFuture = currentInstanceData;
// If the currentInstanceData has expired, then force refresh (which will balk if a refresh
// is already running) and make this and future requests to getInstanceData wait on the
// refresh operation to complete.
if (instanceDataFuture.isDone()) {
Instant expiration;
try {
expiration = instanceDataFuture.get().getExpiration();
if (expiration == null || expiration.isBefore(Instant.now())) {
forceRefresh();
currentInstanceData = nextInstanceData;
instanceDataFuture = currentInstanceData;
}
} catch (ExecutionException | InterruptedException e) {
e.printStackTrace();
forceRefresh();
}
}
}

try {
return Uninterruptibles.getUninterruptibly(
instanceDataFuture, timeoutMs, TimeUnit.MILLISECONDS);
} catch (ExecutionException | TimeoutException ex) {
return instanceDataFuture.get(timeoutMs, TimeUnit.MILLISECONDS);
} catch (TimeoutException e) {
synchronized (instanceDataGuard) {
if (currentRefreshFailure != null) {
throw new RuntimeException(
String.format(
"Unable to get valid instance data within %d ms."
+ " Last refresh attempt failed:",
timeoutMs)
+ currentRefreshFailure.getMessage(),
currentRefreshFailure);
}
}
throw new RuntimeException(
String.format(
"Unable to get valid instance data within %d ms. No refresh has completed.",
timeoutMs),
e);
} catch (ExecutionException | InterruptedException ex) {
Throwable cause = ex.getCause();
Throwables.throwIfUnchecked(cause);
throw new RuntimeException(cause);
Expand Down Expand Up @@ -161,45 +210,88 @@ String getPreferredIp(List<String> preferredTypes, long timeoutMs) {
* new refresh is already in progress. If successful, other methods will block until refresh has
* been completed.
*/
void forceRefresh() {
boolean forceRefresh() {
synchronized (instanceDataGuard) {
// Don't force a refresh until the current forceRefresh operation
// Don't force a refresh until the current refresh operation
// has produced a successful refresh.
if (forceRefreshRunning) {
return;
if (refreshRunning) {
return false;
}

forceRefreshRunning = true;
nextInstanceData.cancel(false);
logger.fine(
String.format(
"[%s] Force Refresh: the next refresh operation was cancelled."
+ " Scheduling new refresh operation immediately.",
instanceName));
nextInstanceData = executor.submit(this::performRefresh);
nextInstanceData = this.startRefreshAttempt();
}
return true;
}

/**
* Triggers an update of internal information obtained from the Cloud SQL Admin API. Replaces the
* value of currentInstanceData and schedules the next refresh shortly before the information
* would expire.
* Triggers an update of internal information obtained from the Cloud SQL Admin API, returning a
* future that resolves once a valid InstanceData has been acquired. This sets up a chain of
* futures that will 1. Acquire a rate limiter. 2. Attempt to fetch instance data. 3. Schedule the
* next attempt to get instance data based on the success/failure of this attempt.
*
* <p>This behavior changed. In version 1.14.1 and earlier, this method was called
* performRefresh(). It used to block until either (1) an InstanceData was retrieved or (2) an
* attempt failed, and an exception was thrown.
*
* @see com.google.cloud.sql.core.CloudSqlInstance#handleRefreshResult(
* com.google.common.util.concurrent.ListenableFuture)
*/
private InstanceData performRefresh() throws InterruptedException, ExecutionException {
logger.fine(
String.format("[%s] Refresh Operation: Acquiring rate limiter permit.", instanceName));
private ListenableFuture<InstanceData> startRefreshAttempt() {
// As soon as we begin submitting refresh attempts to the executor, mark a refresh
// as "in-progress" so that subsequent forceRefresh() calls balk until this one completes.
synchronized (instanceDataGuard) {
refreshRunning = true;
}

// To avoid unreasonable SQL Admin API usage, use a rate limit to throttle our usage.
//noinspection UnstableApiUsage
forcedRenewRateLimiter.acquire();
logger.fine(
String.format(
"[%s] Refresh Operation: Acquired rate limiter permit. Starting refresh...",
instanceName));
ListenableFuture rateLimit =
executor.submit(
() -> {
logger.fine(
String.format(
"[%s] Refresh Operation: Acquiring rate limiter permit.", instanceName));
//noinspection UnstableApiUsage
forcedRenewRateLimiter.acquire();
logger.fine(
String.format(
"[%s] Refresh Operation: Acquired rate limiter permit. Starting refresh...",
instanceName));
},
executor);

// Once rate limiter is done, attempt to getInstanceData.
ListenableFuture<InstanceData> dataFuture =
Futures.whenAllComplete(rateLimit)
.callAsync(
() -> {
return instanceDataSupplier.getInstanceData(
this.instanceName,
this.accessTokenSupplier,
this.authType,
executor,
keyPair);
},
executor);

// Finally, reschedule refresh after getInstanceData is complete.
ListenableFuture<InstanceData> rescheduleFuture =
Futures.whenAllComplete(dataFuture)
.callAsync(() -> handleRefreshResult(dataFuture), executor);

return rescheduleFuture;
}

private ListenableFuture<InstanceData> handleRefreshResult(
ListenableFuture<InstanceData> dataFuture) {
try {
InstanceData data =
instanceDataSupplier.getInstanceData(
this.instanceName, this.accessTokenSupplier, this.authType, executor, keyPair);
// This does not block, because it only gets called when dataFuture has completed.
// This will throw an exception if the refresh attempt has failed.
InstanceData data = dataFuture.get();

logger.fine(
String.format(
Expand All @@ -218,13 +310,21 @@ private InstanceData performRefresh() throws InterruptedException, ExecutionExce
.toString()));

synchronized (instanceDataGuard) {
// Refresh completed successfully, reset forceRefreshRunning.
refreshRunning = false;
currentRefreshFailure = null;
currentInstanceData = Futures.immediateFuture(data);

// Now update nextInstanceData to perform a refresh after the
// scheduled delay
nextInstanceData =
executor.schedule(this::performRefresh, secondsToRefresh, TimeUnit.SECONDS);
// Refresh completed successfully, reset forceRefreshRunning.
forceRefreshRunning = false;
Futures.scheduleAsync(
this::startRefreshAttempt, secondsToRefresh, TimeUnit.SECONDS, executor);

// Resolves to an InstanceData immediately
return currentInstanceData;
}
return data;

} catch (ExecutionException | InterruptedException e) {
logger.log(
Level.FINE,
Expand All @@ -233,9 +333,12 @@ private InstanceData performRefresh() throws InterruptedException, ExecutionExce
instanceName),
e);
synchronized (instanceDataGuard) {
nextInstanceData = executor.submit(this::performRefresh);
currentRefreshFailure = e;
nextInstanceData = this.startRefreshAttempt();

// Resolves after the next successful refresh attempt.
return nextInstanceData;
}
throw e;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ interface InstanceDataSupplier {
* @throws ExecutionException if an exception is thrown during execution.
* @throws InterruptedException if the executor is interrupted.
*/
InstanceData getInstanceData(
ListenableFuture<InstanceData> getInstanceData(
CloudSqlInstanceName instanceName,
AccessTokenSupplier accessTokenSupplier,
AuthType authType,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ private String generatePublicKeyCert(KeyPair keyPair) {
* @throws InterruptedException if the executor is interrupted.
*/
@Override
public InstanceData getInstanceData(
public ListenableFuture<InstanceData> getInstanceData(
CloudSqlInstanceName instanceName,
AccessTokenSupplier accessTokenSupplier,
AuthType authType,
Expand Down Expand Up @@ -163,9 +163,9 @@ public InstanceData getInstanceData(
},
executor);

InstanceData instanceData = done.get();
logger.fine(String.format("[%s] ALL FUTURES DONE", instanceName));
return instanceData;
done.addListener(
() -> logger.fine(String.format("[%s] ALL FUTURES DONE", instanceName)), executor);
return done;
}

String getApplicationName() {
Expand Down
Loading

0 comments on commit 62a72ad

Please sign in to comment.