Skip to content

Commit

Permalink
fix: Pause connection requests after certificate expires until refres…
Browse files Browse the repository at this point in the history
…h completes.
  • Loading branch information
hessjcg committed Nov 2, 2023
1 parent 60e5344 commit 8eba0b2
Show file tree
Hide file tree
Showing 3 changed files with 105 additions and 1 deletion.
53 changes: 52 additions & 1 deletion core/src/main/java/com/google/cloud/sql/core/Refresher.java
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ ConnectionInfo getConnectionInfo(long timeoutMs) {
if (closed) {
throw new IllegalStateException("Named connection closed");
}
f = current;
f = currentConnectionInfo();
}

try {
Expand Down Expand Up @@ -127,6 +127,57 @@ ConnectionInfo getConnectionInfo(long timeoutMs) {
}
}

/**
* Returns the current connection info, checking if it has expired and forcing a refresh if
* necessary.
*/
private ListenableFuture<ConnectionInfo> currentConnectionInfo() {
synchronized (instanceDataGuard) {

// If current is done, but the data has expired, then force refresh (which will balk if a
// refresh is already running) and make this and future requests to for ConnectionInfo wait on
// the refresh operation to complete.
if (current.isDone()) {
Instant expiration;
try {

// Get the currentInstanceData expiration date. This will throw an ExecutionException if
// currentInstanceData future has failed.
expiration = current.get().getExpiration();

if (expiration == null || expiration.isBefore(Instant.now())) {
// currentInstanceData is invalid. forceRefresh (this sets nextInstanceData)
// and set currentInstanceData to nextInstanceData so that subsequent calls
// to getInstanceData() will block until the refresh succeeds.
logger.log(
Level.FINE,
String.format(
"[%s] Instance data is invalid because the certificate expired.", name));
// Current is invalid, so we replace it with the in-progress refresh
// attempt in this.next.
forceRefresh();
current = next;
}
} catch (ExecutionException | InterruptedException e) {
// currentInstanceData has a failed future, so currentInstanceData is invalid.
// forceRefresh (this sets nextInstanceData) and set currentInstanceData to
// nextInstanceData so that subsequent calls to getInstanceData() will block until the
// refresh succeeds.
logger.log(
Level.FINE,
String.format("[%s] Instance data is invalid due to a failed refresh attempt.", name),
e);
// Current is invalid, so we replace it with the now in-progress refresh
// attempt in this.next.
forceRefresh();
current = next;
}
}

return current;
}
}

/**
* Attempts to force a new refresh of the instance data. May fail if called too frequently or if a
* new refresh is already in progress. If successful, other methods will block until refresh has
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import javax.net.ssl.KeyManagerFactory;
import org.junit.After;
import org.junit.Before;
Expand Down Expand Up @@ -287,6 +288,13 @@ public void testCloudSqlRefreshesExpiredData() throws Exception {
Thread.sleep(10);
}

// Now that the InstanceData has expired, this getSslData should pause until new data
// has been retrieved. In this case, the new data has not yet been retrieved, so the operation
// should time out after 100ms and throw an exception.
assertThrows(
RuntimeException.class, () -> connectionInfoCache.getConnectionMetadata(TEST_TIMEOUT_MS));
assertThat(refreshCount.get()).isEqualTo(1);

// Allow the second refresh operation to complete
refresh1.proceed();
refresh1.waitForPauseToEnd(1000L);
Expand Down Expand Up @@ -479,13 +487,27 @@ public void testRefreshRetriesOnAfterFailedAttempts() throws Exception {
Thread.sleep(10);
}

// Start a thread to getSslData(). This will eventually return after the
// failed attempts.
AtomicReference<ConnectionMetadata> earlyFetchAttempt = new AtomicReference<>();
Thread t =
new Thread(
() ->
earlyFetchAttempt.set(connectionInfoCache.getConnectionMetadata(TEST_TIMEOUT_MS)));
t.start();

// Orchestrate the failed attempts

// Allow the second refresh operation to complete
badRequest1.proceed();
badRequest1.waitForPauseToEnd(5000);
badRequest1.waitForCondition(() -> refreshCount.get() == 2, 2000);

// Now that the InstanceData has expired, this getSslData should pause until new data
// has been retrieved. In this case, the new data has not yet been retrieved, so the operation
// should time out after 10ms.
assertThrows(RuntimeException.class, () -> connectionInfoCache.getConnectionMetadata(10));

// Allow the second bad request completes
badRequest2.proceed();
badRequest2.waitForCondition(() -> refreshCount.get() == 3, 2000);
Expand All @@ -500,6 +522,13 @@ public void testRefreshRetriesOnAfterFailedAttempts() throws Exception {
connectionInfoCache.getConnectionMetadata(TEST_TIMEOUT_MS).getKeyManagerFactory()
== info.getSslData().getKeyManagerFactory(),
2000);

// Wait for the thread to exit, meaning getSslData() finally returned
// after several failed attempts. Assert that the correct InstanceData
// eventually returned.
t.join();
assertThat(earlyFetchAttempt.get().getKeyManagerFactory())
.isSameInstanceAs(info.getSslData().getKeyManagerFactory());
}

@Test
Expand Down
24 changes: 24 additions & 0 deletions core/src/test/java/com/google/cloud/sql/core/RefresherTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
Expand Down Expand Up @@ -202,6 +203,12 @@ public void testCloudSqlRefreshesExpiredData() throws Exception {
Thread.sleep(10);
}

// Now that the InstanceData has expired, this getSslData should pause until new data
// has been retrieved. In this case, the new data has not yet been retrieved, so the operation
// should time out after 100ms and throw an exception.
assertThrows(RuntimeException.class, () -> r.getConnectionInfo(TEST_TIMEOUT_MS));
assertThat(refreshCount.get()).isEqualTo(1);

// Allow the second refresh operation to complete
refresh1.proceed();
refresh1.waitForPauseToEnd(1000L);
Expand Down Expand Up @@ -372,13 +379,24 @@ public void testRefreshRetriesOnAfterFailedAttempts() throws Exception {
Thread.sleep(10);
}

// Start a thread to getSslData(). This will eventually return after the
// failed attempts.
AtomicReference<ConnectionInfo> earlyFetchAttempt = new AtomicReference<>();
Thread t = new Thread(() -> earlyFetchAttempt.set(r.getConnectionInfo(TEST_TIMEOUT_MS)));
t.start();

// Orchestrate the failed attempts

// Allow the second refresh operation to complete
badRequest1.proceed();
badRequest1.waitForPauseToEnd(5000);
badRequest1.waitForCondition(() -> refreshCount.get() == 2, 2000);

// Now that the InstanceData has expired, this getSslData should pause until new data
// has been retrieved. In this case, the new data has not yet been retrieved, so the operation
// should time out after 10ms.
assertThrows(RuntimeException.class, () -> r.getConnectionInfo(10));

// Allow the second bad request completes
badRequest2.proceed();
badRequest2.waitForCondition(() -> refreshCount.get() == 3, 2000);
Expand All @@ -389,6 +407,12 @@ public void testRefreshRetriesOnAfterFailedAttempts() throws Exception {

// Try getSslData() again, and assert the refresh operation eventually completes.
goodRequest.waitForCondition(() -> r.getConnectionInfo(TEST_TIMEOUT_MS) == data, 2000);

// Wait for the thread to exit, meaning getSslData() finally returned
// after several failed attempts. Assert that the correct InstanceData
// eventually returned.
t.join();
assertThat(earlyFetchAttempt.get()).isSameInstanceAs(data);
}

@Test
Expand Down

0 comments on commit 8eba0b2

Please sign in to comment.