Skip to content

Commit

Permalink
feat: Add public API to configure connectors. Fixes #1226
Browse files Browse the repository at this point in the history
  • Loading branch information
hessjcg committed Nov 7, 2023
1 parent a94f358 commit d52c0f4
Show file tree
Hide file tree
Showing 14 changed files with 594 additions and 75 deletions.
61 changes: 61 additions & 0 deletions core/src/main/java/com/google/cloud/sql/ConnectorRegistry.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
* Copyright 2023 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.cloud.sql;

import com.google.cloud.sql.core.InternalConnectorRegistry;

/** Configure the CloudSQL JDBC Connector. */
public final class ConnectorRegistry {

/**
* Register a named connection so that it can later be referenced by name in a JDBC or R2DBC URL.
*
* @param name the named connection name.
* @param config the full configuration of the connection.
*/
public static void register(String name, ConnectorConfig config) {
InternalConnectorRegistry.getInstance().register(name, config);
}

/**
* Close a named connector. This will stop all background credential refresh processes. All future
* attempts to connect via this named connection will fail.
*
* @param name the name of the connector to close.
*/
public static void close(String name) {
InternalConnectorRegistry.getInstance().close(name);
}

/**
* Shutdown the entire CloudSQL JDBC Connector. This will stop all background threads. All future
* attempts to connect to a CloudSQL database will fail.
*/
public static void shutdown() {
InternalConnectorRegistry.shutdownInstance();
}

/**
* Adds an external application name to the user agent string for tracking. This is known to be
* used by the spring-cloud-gcp project.
*
* @throws IllegalStateException if the SQLAdmin client has already been initialized
*/
public static void addArtifactId(String artifactId) {
InternalConnectorRegistry.getInstance().addArtifactId(artifactId);
}
}
12 changes: 12 additions & 0 deletions core/src/main/java/com/google/cloud/sql/core/ConnectionConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,18 @@ private ConnectionConfig(
this.authType = authType;
}

/** Creates a new instance of the ConnectionConfig with an updated connectorConfig. */
public ConnectionConfig withConnectorConfig(ConnectorConfig config) {
return new ConnectionConfig(
cloudSqlInstance,
namedConnector,
unixSocketPath,
ipTypes,
authType,
unixSocketPathSuffix,
config);
}

public String getNamedConnector() {
return namedConnector;
}
Expand Down
13 changes: 13 additions & 0 deletions core/src/main/java/com/google/cloud/sql/core/Connector.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package com.google.cloud.sql.core;

import com.google.cloud.sql.ConnectorConfig;
import com.google.cloud.sql.CredentialFactory;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningScheduledExecutorService;
Expand Down Expand Up @@ -43,15 +44,18 @@ class Connector {
new ConcurrentHashMap<>();
private final long refreshTimeoutMs;
private final int serverProxyPort;
private final ConnectorConfig config;

Connector(
ConnectorConfig config,
DefaultConnectionInfoRepository adminApi,
CredentialFactory instanceCredentialFactory,
ListeningScheduledExecutorService executor,
ListenableFuture<KeyPair> localKeyPair,
long minRefreshDelayMs,
long refreshTimeoutMs,
int serverProxyPort) {
this.config = config;
this.adminApi = adminApi;
this.instanceCredentialFactory = instanceCredentialFactory;
this.executor = executor;
Expand All @@ -61,6 +65,10 @@ class Connector {
this.serverProxyPort = serverProxyPort;
}

public ConnectorConfig getConfig() {
return config;
}

/** Extracts the Unix socket argument from specified properties object. If unset, returns null. */
private String getUnixSocketArg(ConnectionConfig config) {
String unixSocketPath = config.getUnixSocketPath();
Expand Down Expand Up @@ -126,4 +134,9 @@ private DefaultConnectionInfoCache createConnectionInfo(ConnectionConfig config)
return new DefaultConnectionInfoCache(
config, adminApi, instanceCredentialFactory, executor, localKeyPair, minRefreshDelayMs);
}

public void close() {
this.instances.forEach((key, c) -> c.close());
this.instances.clear();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -147,4 +147,8 @@ ListenableFuture<ConnectionInfo> getCurrent() {
public CloudSqlInstanceName getInstanceName() {
return instanceName;
}

void close() {
refresher.close();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,9 @@ public final class InternalConnectorRegistry {
private static final long MIN_REFRESH_DELAY_MS = 30000; // Minimum 30 seconds between refresh.
private static InternalConnectorRegistry internalConnectorRegistry;
private final ListenableFuture<KeyPair> localKeyPair;
private final ConcurrentHashMap<ConnectorConfig, Connector> connectors =
private final ConcurrentHashMap<ConnectorConfig, Connector> unnamedConnectors =
new ConcurrentHashMap<>();
private final ConcurrentHashMap<String, Connector> namedConnectors = new ConcurrentHashMap<>();
private final ListeningScheduledExecutorService executor;
private final CredentialFactory credentialFactory;
private final int serverProxyPort;
Expand Down Expand Up @@ -109,6 +110,18 @@ public static synchronized InternalConnectorRegistry getInstance() {
return internalConnectorRegistry;
}

/**
* Calls shutdown on the singleton and removes the singleton. After calling shutdownInstance(),
* the next call to getInstance() will start a new singleton instance.
*/
public static synchronized void shutdownInstance() {
if (internalConnectorRegistry != null) {
InternalConnectorRegistry old = internalConnectorRegistry;
internalConnectorRegistry = null;
old.shutdown();
}
}

// TODO(kvg): Figure out better executor to use for testing
@VisibleForTesting
// Returns a listenable, scheduled executor that exits upon shutdown.
Expand All @@ -128,26 +141,39 @@ static ListeningScheduledExecutorService getDefaultExecutor() {
}

/**
* Creates a socket representing a connection to a Cloud SQL instance.
* Internal use only: Creates a socket representing a connection to a Cloud SQL instance.
*
* <p>Depending on the given properties, it may return either a SSL Socket or a Unix Socket.
*
* @param config Configuration used to configure the connection.
* @param config used to configure the connection.
* @return the newly created Socket.
* @throws IOException if error occurs during socket creation.
*/
public Socket connect(ConnectionConfig config) throws IOException, InterruptedException {
// Validate parameters
Preconditions.checkArgument(
config.getCloudSqlInstance() != null,
"cloudSqlInstance property not set. Please specify this property in the JDBC URL or the "
+ "connection Properties with value in form \"project:region:instance\"");
if (config.getNamedConnector() != null) {
Connector connector = getNamedConnector(config.getNamedConnector());
return connector.connect(config.withConnectorConfig(connector.getConfig()));
} else {
// Validate parameters
Preconditions.checkArgument(
config.getCloudSqlInstance() != null,
"cloudSqlInstance property not set. Please specify this property in the JDBC URL or the "
+ "connection Properties with value in form \"project:region:instance\"");

return getConnector(config).connect(config);
return getConnector(config).connect(config);
}
}

public ConnectionMetadata getConnectionMetadata(ConnectionConfig config) throws IOException {
return getConnector(config).getConnection(config).getConnectionMetadata(refreshTimeoutMs);
/** Internal use only: Returns ConnectionMetadata for a connection. */
public ConnectionMetadata getConnectionMetadata(ConnectionConfig config) {
if (config.getNamedConnector() != null) {
Connector connector = getNamedConnector(config.getNamedConnector());
return connector
.getConnection(config.withConnectorConfig(connector.getConfig()))
.getConnectionMetadata(refreshTimeoutMs);
} else {
return getConnector(config).getConnection(config).getConnectionMetadata(refreshTimeoutMs);
}
}

private static KeyPair generateRsaKeyPair() {
Expand Down Expand Up @@ -228,7 +254,7 @@ public static void setApplicationName(String applicationName) {
}

private Connector getConnector(ConnectionConfig config) {
return connectors.computeIfAbsent(
return unnamedConnectors.computeIfAbsent(
config.getConnectorConfig(), k -> createConnector(config.getConnectorConfig()));
}

Expand All @@ -255,6 +281,7 @@ private Connector createConnector(ConnectorConfig config) {
connectionInfoRepositoryFactory.create(credential, config);

return new Connector(
config,
adminApi,
instanceCredentialFactory,
executor,
Expand All @@ -263,4 +290,39 @@ private Connector createConnector(ConnectorConfig config) {
refreshTimeoutMs,
serverProxyPort);
}

/** Register the configuration for a named connector. */
public void register(String name, ConnectorConfig config) {
if (this.namedConnectors.containsKey(name)) {
throw new IllegalArgumentException("Named connection " + name + " exists.");
}
this.namedConnectors.put(name, createConnector(config));
}

/** Close a named connector, stopping the refresh process and removing it from the registry. */
public void close(String name) {
Connector connector = namedConnectors.remove(name);
if (connector == null) {
throw new IllegalArgumentException("Named connection " + name + " does not exist.");
}
connector.close();
}

/** Shutdown all connectors and remove the singleton instance. */
public void shutdown() {
this.unnamedConnectors.forEach(
(key, c) -> {
c.close();
});
this.unnamedConnectors.clear();
this.executor.shutdown();
}

private Connector getNamedConnector(String name) {
Connector connector = namedConnectors.get(name);
if (connector == null) {
throw new IllegalArgumentException("Named connection " + name + " does not exist.");
}
return connector;
}
}
44 changes: 38 additions & 6 deletions core/src/main/java/com/google/cloud/sql/core/Refresher.java
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,9 @@ class Refresher {
@GuardedBy("connectionInfoGuard")
private Throwable currentRefreshFailure;

@GuardedBy("connectionInfoGuard")
private boolean closed;

/**
* Create a new refresher.
*
Expand Down Expand Up @@ -92,6 +95,9 @@ class Refresher {
ConnectionInfo getConnectionInfo(long timeoutMs) {
ListenableFuture<ConnectionInfo> f;
synchronized (connectionInfoGuard) {
if (closed) {
throw new IllegalStateException("Named connection closed");
}
f = current;
}

Expand Down Expand Up @@ -128,6 +134,9 @@ ConnectionInfo getConnectionInfo(long timeoutMs) {
*/
void forceRefresh() {
synchronized (connectionInfoGuard) {
if (closed) {
throw new IllegalStateException("Named connection closed");
}
// Don't force a refresh until the current refresh operation
// has produced a successful refresh.
if (refreshRunning) {
Expand Down Expand Up @@ -210,10 +219,11 @@ private ListenableFuture<ConnectionInfo> handleRefreshResult(

// Now update nextInstanceData to perform a refresh after the
// scheduled delay
next =
Futures.scheduleAsync(
this::startRefreshAttempt, secondsToRefresh, TimeUnit.SECONDS, executor);

if (!closed) {
next =
Futures.scheduleAsync(
this::startRefreshAttempt, secondsToRefresh, TimeUnit.SECONDS, executor);
}
// Resolves to an T immediately
return current;
}
Expand All @@ -226,14 +236,36 @@ private ListenableFuture<ConnectionInfo> handleRefreshResult(
e);
synchronized (connectionInfoGuard) {
currentRefreshFailure = e;
next = this.startRefreshAttempt();

if (!closed) {
next = this.startRefreshAttempt();
}
// Resolves after the next successful refresh attempt.
return next;
}
}
}

void close() {
synchronized (connectionInfoGuard) {
if (closed) {
return;
}

// Cancel any in-progress requests
if (!this.current.isDone()) {
this.current.cancel(true);
}
if (!this.next.isDone()) {
this.next.cancel(true);
}

this.current =
Futures.immediateFailedFuture(new RuntimeException("Named connection is closed."));

this.closed = true;
}
}

ListenableFuture<ConnectionInfo> getNext() {
synchronized (connectionInfoGuard) {
return this.next;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,4 +114,29 @@ public void testConfigFromBuilder() {
assertThat(c.getConnectorConfig().getAdminServicePath()).isEqualTo(wantAdminServicePath);
assertThat(c.getUnixSocketPathSuffix()).isEqualTo(wantUnixSuffix);
}

@Test
public void testWithConnectorConfig() {
final String wantCsqlInstance = "proj:region:inst";
final String wantNamedConnector = "my-connection";

ConnectorConfig cc = new ConnectorConfig.Builder().build();

ConnectionConfig c =
new ConnectionConfig.Builder()
.withCloudSqlInstance(wantCsqlInstance)
.withNamedConnector(wantNamedConnector)
.build();

assertThat(c.getCloudSqlInstance()).isEqualTo(wantCsqlInstance);
assertThat(c.getNamedConnector()).isEqualTo(wantNamedConnector);
assertThat(c.getConnectorConfig()).isNotSameInstanceAs(cc);

ConnectionConfig c1 = c.withConnectorConfig(cc);

assertThat(c1).isNotSameInstanceAs(c);
assertThat(c1.getCloudSqlInstance()).isEqualTo(wantCsqlInstance);
assertThat(c1.getNamedConnector()).isEqualTo(wantNamedConnector);
assertThat(c1.getConnectorConfig()).isSameInstanceAs(cc);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ private Connector newConnector(ConnectorConfig config, int port) {
new StubConnectionInfoRepositoryFactory(fakeSuccessHttpTransport(Duration.ofSeconds(0)));
Connector connector =
new Connector(
config,
factory.create(credentialFactory.create(), config),
credentialFactory,
defaultExecutor,
Expand Down
Loading

0 comments on commit d52c0f4

Please sign in to comment.