Skip to content

Commit

Permalink
feat: Add public API to configure connectors. Fixes #1226
Browse files Browse the repository at this point in the history
  • Loading branch information
hessjcg committed Nov 7, 2023
1 parent c7b599a commit 420e7e1
Show file tree
Hide file tree
Showing 11 changed files with 540 additions and 73 deletions.
61 changes: 61 additions & 0 deletions core/src/main/java/com/google/cloud/sql/ConnectorRegistry.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
* Copyright 2023 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.cloud.sql;

import com.google.cloud.sql.core.InternalConnectorRegistry;

/** Configure the CloudSQL JDBC Connector. */
public final class ConnectorRegistry {

/**
* Register a named connection so that it can later be referenced by name in a JDBC or R2DBC URL.
*
* @param name the named connection name.
* @param config the full configuration of the connection.
*/
public static void register(String name, ConnectorConfig config) {
InternalConnectorRegistry.getInstance().register(name, config);
}

/**
* Close a named connector. This will stop all background credential refresh processes. All future
* attempts to connect via this named connection will fail.
*
* @param name the name of the connector to close.
*/
public static void close(String name) {
InternalConnectorRegistry.getInstance().close(name);
}

/**
* Shutdown the entire CloudSQL JDBC Connector. This will stop all background threads. All future
* attempts to connect to a CloudSQL database will fail.
*/
public static void shutdown() {
InternalConnectorRegistry.shutdownInstance();
}

/**
* Adds an external application name to the user agent string for tracking. This is known to be
* used by the spring-cloud-gcp project.
*
* @throws IllegalStateException if the SQLAdmin client has already been initialized
*/
public static void addArtifactId(String artifactId) {
InternalConnectorRegistry.getInstance().addArtifactId(artifactId);
}
}
5 changes: 5 additions & 0 deletions core/src/main/java/com/google/cloud/sql/core/Connector.java
Original file line number Diff line number Diff line change
Expand Up @@ -126,4 +126,9 @@ private DefaultConnectionInfoCache createConnectionInfo(ConnectionConfig config)
return new DefaultConnectionInfoCache(
config, adminApi, instanceCredentialFactory, executor, localKeyPair, minRefreshDelayMs);
}

public void close() {
this.instances.forEach((key, c) -> c.close());
this.instances.clear();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -147,4 +147,8 @@ ListenableFuture<ConnectionInfo> getCurrent() {
public CloudSqlInstanceName getInstanceName() {
return instanceName;
}

void close() {
refresher.close();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,18 @@ public static synchronized InternalConnectorRegistry getInstance() {
return internalConnectorRegistry;
}

/**
* Calls shutdown on the singleton and removes the singleton. After calling shutdownInstance(),
* the next call to getInstance() will start a new singleton instance.
*/
public static synchronized void shutdownInstance() {
if (internalConnectorRegistry != null) {
InternalConnectorRegistry old = internalConnectorRegistry;
internalConnectorRegistry = null;
old.shutdown();
}
}

// TODO(kvg): Figure out better executor to use for testing
@VisibleForTesting
// Returns a listenable, scheduled executor that exits upon shutdown.
Expand All @@ -128,26 +140,38 @@ static ListeningScheduledExecutorService getDefaultExecutor() {
}

/**
* Creates a socket representing a connection to a Cloud SQL instance.
* Internal use only: Creates a socket representing a connection to a Cloud SQL instance.
*
* <p>Depending on the given properties, it may return either a SSL Socket or a Unix Socket.
*
* @param config Configuration used to configure the connection.
* @param config used to configure the connection.
* @return the newly created Socket.
* @throws IOException if error occurs during socket creation.
*/
public Socket connect(ConnectionConfig config) throws IOException, InterruptedException {
// Validate parameters
Preconditions.checkArgument(
config.getCloudSqlInstance() != null,
"cloudSqlInstance property not set. Please specify this property in the JDBC URL or the "
+ "connection Properties with value in form \"project:region:instance\"");
if (config.getNamedConnector() != null) {
return getNamedConnector(config.getNamedConnector()).connect(config);
} else {
// Validate parameters
Preconditions.checkArgument(
config.getCloudSqlInstance() != null,
"cloudSqlInstance property not set. Please specify this property in the JDBC URL or the "
+ "connection Properties with value in form \"project:region:instance\"");

return getConnector(config).connect(config);
return getConnector(config).connect(config);
}
}

public ConnectionMetadata getConnectionMetadata(ConnectionConfig config) throws IOException {
return getConnector(config).getConnection(config).getConnectionMetadata(refreshTimeoutMs);
/** Internal use only: Returns ConnectionMetadata for a connection. */
public ConnectionMetadata getConnectionMetadata(ConnectionConfig config) {
if (config.getNamedConnector() != null) {
return getNamedConnector(config.getNamedConnector())
.getConnection(config)
.getConnectionMetadata(refreshTimeoutMs);

} else {
return getConnector(config).getConnection(config).getConnectionMetadata(refreshTimeoutMs);
}
}

private static KeyPair generateRsaKeyPair() {
Expand Down Expand Up @@ -228,7 +252,7 @@ public static void setApplicationName(String applicationName) {
}

private Connector getConnector(ConnectionConfig config) {
return connectors.computeIfAbsent(
return unnamedConnectors.computeIfAbsent(
config.getConnectorConfig(), k -> createConnector(config.getConnectorConfig()));
}

Expand Down Expand Up @@ -263,4 +287,39 @@ private Connector createConnector(ConnectorConfig config) {
refreshTimeoutMs,
serverProxyPort);
}

/** Register the configuration for a named connector. */
public void register(String name, ConnectorConfig config) {
if (this.namedConnectors.containsKey(name)) {
throw new IllegalArgumentException("Named connection " + name + " exists.");
}
this.namedConnectors.put(name, createConnector(config));
}

/** Close a named connector, stopping the refresh process and removing it from the registry. */
public void close(String name) {
Connector connector = namedConnectors.remove(name);
if (connector == null) {
throw new IllegalArgumentException("Named connection " + name + " does not exist.");
}
connector.close();
}

/** Shutdown all connectors and remove the singleton instance. */
public void shutdown() {
this.unnamedConnectors.forEach(
(key, c) -> {
c.close();
});
this.unnamedConnectors.clear();
this.executor.shutdown();
}

private Connector getNamedConnector(String name) {
Connector connector = namedConnectors.get(name);
if (connector == null) {
throw new IllegalArgumentException("Named connection " + name + " does not exist.");
}
return connector;
}
}
41 changes: 35 additions & 6 deletions core/src/main/java/com/google/cloud/sql/core/Refresher.java
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,9 @@ class Refresher {
ConnectionInfo getConnectionInfo(long timeoutMs) {
ListenableFuture<ConnectionInfo> f;
synchronized (connectionInfoGuard) {
if (closed) {
throw new IllegalStateException("Named connection closed");
}
f = current;
}

Expand Down Expand Up @@ -128,6 +131,9 @@ ConnectionInfo getConnectionInfo(long timeoutMs) {
*/
void forceRefresh() {
synchronized (connectionInfoGuard) {
if (closed) {
throw new IllegalStateException("Named connection closed");
}
// Don't force a refresh until the current refresh operation
// has produced a successful refresh.
if (refreshRunning) {
Expand Down Expand Up @@ -210,10 +216,11 @@ private ListenableFuture<ConnectionInfo> handleRefreshResult(

// Now update nextInstanceData to perform a refresh after the
// scheduled delay
next =
Futures.scheduleAsync(
this::startRefreshAttempt, secondsToRefresh, TimeUnit.SECONDS, executor);

if (!closed) {
next =
Futures.scheduleAsync(
this::startRefreshAttempt, secondsToRefresh, TimeUnit.SECONDS, executor);
}
// Resolves to an T immediately
return current;
}
Expand All @@ -226,14 +233,36 @@ private ListenableFuture<ConnectionInfo> handleRefreshResult(
e);
synchronized (connectionInfoGuard) {
currentRefreshFailure = e;
next = this.startRefreshAttempt();

if (!closed) {
next = this.startRefreshAttempt();
}
// Resolves after the next successful refresh attempt.
return next;
}
}
}

void close() {
synchronized (connectionInfoGuard) {
if (closed) {
return;
}

// Cancel any in-progress requests
if (!this.current.isDone()) {
this.current.cancel(true);
}
if (!this.next.isDone()) {
this.next.cancel(true);
}

this.current =
Futures.immediateFailedFuture(new RuntimeException("Named connection is closed."));

this.closed = true;
}
}

ListenableFuture<ConnectionInfo> getNext() {
synchronized (connectionInfoGuard) {
return this.next;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -585,6 +585,68 @@ public void testGetPreferredIpTypesThrowsException() {
() -> connectionInfoCache.getConnectionMetadata(TEST_TIMEOUT_MS));
}

@Test
public void testClosedCloudSqlInstanceDataThrowsException() throws Exception {
TestDataSupplier instanceDataSupplier = new TestDataSupplier(false);
// initialize instance after mocks are set up
DefaultConnectionInfoCache instance =
new DefaultConnectionInfoCache(
new ConnectionConfig.Builder().withCloudSqlInstance("project:region:instance").build(),
instanceDataSupplier,
stubCredentialFactory,
executorService,
keyPairFuture,
MIN_REFERSH_DELAY_MS);
instance.close();

assertThrows(
IllegalStateException.class, () -> instance.getConnectionMetadata(TEST_TIMEOUT_MS));
assertThrows(IllegalStateException.class, () -> instance.forceRefresh());
}

@Test
public void testClosedCloudSqlInstanceDataStopsRefreshTasks() throws Exception {
ConnectionInfo initialData =
new ConnectionInfo(
null, new SslData(null, null, null), Instant.now().plus(1, ChronoUnit.HOURS));

AtomicInteger refreshCount = new AtomicInteger();
final PauseCondition refresh0 = new PauseCondition();

DefaultConnectionInfoCache instance =
new DefaultConnectionInfoCache(
new ConnectionConfig.Builder().withCloudSqlInstance("project:region:instance").build(),
(instanceName, accessTokenSupplier, authType, executor, keyPair) -> {
int c = refreshCount.get();
if (c == 0) {
refresh0.pause();
}
refreshCount.incrementAndGet();
return Futures.immediateFuture(initialData);
},
stubCredentialFactory,
executorService,
keyPairFuture,
MIN_REFERSH_DELAY_MS);

// Wait for the first refresh attempt to complete.
refresh0.proceed();
refresh0.waitForPauseToEnd(TEST_TIMEOUT_MS);

// Assert that refresh gets instance data before it is closed
refresh0.waitForCondition(() -> refreshCount.get() == 1, TEST_TIMEOUT_MS);

// Assert that the next refresh task is scheduled in the future
assertThat(instance.getNext().isDone()).isFalse();

// Close the instance
instance.close();

// Assert that the next refresh task is canceled
assertThat(instance.getNext().isDone()).isTrue();
assertThat(instance.getNext().isCancelled()).isTrue();
}

private ListeningScheduledExecutorService newTestExecutor() {
ScheduledThreadPoolExecutor executor =
(ScheduledThreadPoolExecutor) Executors.newScheduledThreadPool(2);
Expand Down
Loading

0 comments on commit 420e7e1

Please sign in to comment.