Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Improve handling of futures and threads during refresh. #1573

Merged
merged 2 commits into from
Oct 10, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
141 changes: 105 additions & 36 deletions core/src/main/java/com/google/cloud/sql/core/CloudSqlInstance.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningScheduledExecutorService;
import com.google.common.util.concurrent.RateLimiter;
import com.google.common.util.concurrent.Uninterruptibles;
import com.google.errorprone.annotations.concurrent.GuardedBy;
import java.io.IOException;
import java.security.KeyPair;
Expand Down Expand Up @@ -68,7 +67,10 @@ class CloudSqlInstance {
private ListenableFuture<InstanceData> nextInstanceData;

@GuardedBy("instanceDataGuard")
private boolean forceRefreshRunning;
private boolean refreshRunning;

@GuardedBy("instanceDataGuard")
private Throwable currentRefreshFailure;

/**
* Initializes a new Cloud SQL instance based on the given connection name.
Expand Down Expand Up @@ -100,26 +102,48 @@ class CloudSqlInstance {
}

synchronized (instanceDataGuard) {
this.currentInstanceData = executor.submit(this::performRefresh);
this.currentInstanceData = this.startRefreshAttempt();
this.nextInstanceData = currentInstanceData;
}
}

/**
* Returns the current data related to the instance from {@link #performRefresh()}. May block if
* no valid data is currently available.
* Returns the current data related to the instance from {@link #startRefreshAttempt()}. May block
* if no valid data is currently available. This method is called by an application thread when it
* is trying to create a new connection to the database. (It is not called by a
* ListeningScheduledExecutorService task.) So it is OK to block waiting for a future to complete.
*
* <p>When no refresh attempt is in progress, this returns immediately. Otherwise, it waits up to
* timeoutMs milliseconds. If a refresh attempt succeeds, returns immediately at the end of that
* successful attempt. If no attempts succeed within the timeout, throws a RuntimeException with
* the exception from the last failed refresh attempt as the cause.
*/
private InstanceData getInstanceData(long timeoutMs) {
ListenableFuture<InstanceData> instanceDataFuture;
synchronized (instanceDataGuard) {
instanceDataFuture = currentInstanceData;
}

try {
return Uninterruptibles.getUninterruptibly(
instanceDataFuture, timeoutMs, TimeUnit.MILLISECONDS);
} catch (TimeoutException ex) {
throw new RuntimeException(ex);
} catch (ExecutionException ex) {
return instanceDataFuture.get(timeoutMs, TimeUnit.MILLISECONDS);
} catch (TimeoutException e) {
synchronized (instanceDataGuard) {
if (currentRefreshFailure != null) {
throw new RuntimeException(
String.format(
"Unable to get valid instance data within %d ms."
+ " Last refresh attempt failed:",
timeoutMs)
+ currentRefreshFailure.getMessage(),
currentRefreshFailure);
}
}
throw new RuntimeException(
String.format(
"Unable to get valid instance data within %d ms. No refresh has completed.",
timeoutMs),
e);
} catch (ExecutionException | InterruptedException ex) {
Throwable cause = ex.getCause();
Throwables.throwIfUnchecked(cause);
throw new RuntimeException(cause);
Expand Down Expand Up @@ -165,43 +189,77 @@ String getPreferredIp(List<String> preferredTypes, long timeoutMs) {
*/
void forceRefresh() {
synchronized (instanceDataGuard) {
// Don't force a refresh until the current forceRefresh operation
// Don't force a refresh until the current refresh operation
// has produced a successful refresh.
if (forceRefreshRunning) {
if (refreshRunning) {
return;
}

forceRefreshRunning = true;
nextInstanceData.cancel(false);
logger.fine(
String.format(
"[%s] Force Refresh: the next refresh operation was cancelled."
+ " Scheduling new refresh operation immediately.",
instanceName));
nextInstanceData = executor.submit(this::performRefresh);
nextInstanceData = this.startRefreshAttempt();
}
}

/**
* Triggers an update of internal information obtained from the Cloud SQL Admin API. Replaces the
* value of currentInstanceData and schedules the next refresh shortly before the information
* would expire.
* Triggers an update of internal information obtained from the Cloud SQL Admin API, returning a
* future that resolves once a valid InstanceData has been acquired. This sets up a chain of
* futures that will: 1. Acquire a rate limiter permit. 2. Attempt to fetch instance data. 3. Schedule the
* next attempt to get instance data based on the success/failure of this attempt.
*
* @see com.google.cloud.sql.core.CloudSqlInstance#handleRefreshResult(
* com.google.common.util.concurrent.ListenableFuture)
*/
private InstanceData performRefresh() throws InterruptedException, ExecutionException {
logger.fine(
String.format("[%s] Refresh Operation: Acquiring rate limiter permit.", instanceName));
private ListenableFuture<InstanceData> startRefreshAttempt() {
// As soon as we begin submitting refresh attempts to the executor, mark a refresh
// as "in-progress" so that subsequent forceRefresh() calls balk until this one completes.
synchronized (instanceDataGuard) {
refreshRunning = true;
enocom marked this conversation as resolved.
Show resolved Hide resolved
}

// To avoid unreasonable SQL Admin API usage, use a rate limit to throttle our usage.
//noinspection UnstableApiUsage
forcedRenewRateLimiter.acquire();
logger.fine(
String.format(
"[%s] Refresh Operation: Acquired rate limiter permit. Starting refresh...",
instanceName));
ListenableFuture<?> rateLimit =
executor.submit(
() -> {
logger.fine(
String.format(
"[%s] Refresh Operation: Acquiring rate limiter permit.", instanceName));
//noinspection UnstableApiUsage
forcedRenewRateLimiter.acquire();
logger.fine(
String.format(
"[%s] Refresh Operation: Acquired rate limiter permit. Starting refresh...",
instanceName));
},
executor);

// Once the rate limiter task has completed, attempt to fetch the instance data.
ListenableFuture<InstanceData> dataFuture =
Futures.whenAllComplete(rateLimit)
.callAsync(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Open question: why callAsync and not transformAsync?

() ->
instanceDataSupplier.getInstanceData(
this.instanceName,
this.accessTokenSupplier,
this.authType,
executor,
keyPair),
executor);

// Finally, reschedule refresh after getInstanceData is complete.
return Futures.whenAllComplete(dataFuture)
.callAsync(() -> handleRefreshResult(dataFuture), executor);
}

private ListenableFuture<InstanceData> handleRefreshResult(
enocom marked this conversation as resolved.
Show resolved Hide resolved
ListenableFuture<InstanceData> dataFuture) {
try {
InstanceData data =
instanceDataSupplier.getInstanceData(
this.instanceName, this.accessTokenSupplier, this.authType, executor, keyPair);
// This does not block, because it only gets called when dataFuture has completed.
// This will throw an exception if the refresh attempt has failed.
InstanceData data = dataFuture.get();

logger.fine(
String.format(
Expand All @@ -220,13 +278,21 @@ private InstanceData performRefresh() throws InterruptedException, ExecutionExce
.toString()));

synchronized (instanceDataGuard) {
// Refresh completed successfully, reset refreshRunning.
refreshRunning = false;
currentRefreshFailure = null;
currentInstanceData = Futures.immediateFuture(data);

// Now update nextInstanceData to perform a refresh after the
// scheduled delay
nextInstanceData =
executor.schedule(this::performRefresh, secondsToRefresh, TimeUnit.SECONDS);
// Refresh completed successfully, reset forceRefreshRunning.
forceRefreshRunning = false;
Futures.scheduleAsync(
this::startRefreshAttempt, secondsToRefresh, TimeUnit.SECONDS, executor);

// Resolves to an InstanceData immediately
return currentInstanceData;
}
return data;

} catch (ExecutionException | InterruptedException e) {
logger.log(
Level.FINE,
Expand All @@ -235,9 +301,12 @@ private InstanceData performRefresh() throws InterruptedException, ExecutionExce
instanceName),
e);
synchronized (instanceDataGuard) {
nextInstanceData = executor.submit(this::performRefresh);
currentRefreshFailure = e;
nextInstanceData = this.startRefreshAttempt();

// Resolves after the next successful refresh attempt.
return nextInstanceData;
}
throw e;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ interface InstanceDataSupplier {
* @throws ExecutionException if an exception is thrown during execution.
* @throws InterruptedException if the executor is interrupted.
*/
InstanceData getInstanceData(
ListenableFuture<InstanceData> getInstanceData(
CloudSqlInstanceName instanceName,
AccessTokenSupplier accessTokenSupplier,
AuthType authType,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ private String generatePublicKeyCert(KeyPair keyPair) {
* @throws InterruptedException if the executor is interrupted.
*/
@Override
public InstanceData getInstanceData(
public ListenableFuture<InstanceData> getInstanceData(
CloudSqlInstanceName instanceName,
AccessTokenSupplier accessTokenSupplier,
AuthType authType,
Expand Down Expand Up @@ -163,9 +163,9 @@ public InstanceData getInstanceData(
},
executor);

InstanceData instanceData = done.get();
logger.fine(String.format("[%s] ALL FUTURES DONE", instanceName));
return instanceData;
done.addListener(
() -> logger.fine(String.format("[%s] ALL FUTURES DONE", instanceName)), executor);
return done;
}

String getApplicationName() {
Expand Down
Loading