From d52c0f49062a3e6641517aeaf5f969077d39d46f Mon Sep 17 00:00:00 2001 From: Jonathan Hess Date: Thu, 5 Oct 2023 11:19:22 -0600 Subject: [PATCH] feat: Add public API to configure connectors. Fixes #1226 --- .../google/cloud/sql/ConnectorRegistry.java | 61 +++++++++ .../cloud/sql/core/ConnectionConfig.java | 12 ++ .../com/google/cloud/sql/core/Connector.java | 13 ++ .../sql/core/DefaultConnectionInfoCache.java | 4 + .../sql/core/InternalConnectorRegistry.java | 86 +++++++++++-- .../com/google/cloud/sql/core/Refresher.java | 44 ++++++- .../cloud/sql/core/ConnectionConfigTest.java | 25 ++++ .../google/cloud/sql/core/ConnectorTest.java | 1 + .../core/DefaultConnectionInfoCacheTest.java | 62 +++++++++ .../core/InternalConnectorRegistryTest.java | 93 ++++++++++++++ .../google/cloud/sql/core/RefresherTest.java | 51 ++++++++ ...ostgresNamedConnectorIntegrationTests.java | 121 ++++++++++++++++++ .../sql/core/CloudSqlConnectionFactory.java | 33 ++--- .../core/GcpConnectionFactoryProvider.java | 63 ++++----- 14 files changed, 594 insertions(+), 75 deletions(-) create mode 100644 core/src/main/java/com/google/cloud/sql/ConnectorRegistry.java create mode 100644 jdbc/postgres/src/test/java/com/google/cloud/sql/postgres/JdbcPostgresNamedConnectorIntegrationTests.java diff --git a/core/src/main/java/com/google/cloud/sql/ConnectorRegistry.java b/core/src/main/java/com/google/cloud/sql/ConnectorRegistry.java new file mode 100644 index 000000000..a26140f1e --- /dev/null +++ b/core/src/main/java/com/google/cloud/sql/ConnectorRegistry.java @@ -0,0 +1,61 @@ +/* + * Copyright 2023 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.sql; + +import com.google.cloud.sql.core.InternalConnectorRegistry; + +/** Configure the CloudSQL JDBC Connector. */ +public final class ConnectorRegistry { + + /** + * Register a named connection so that it can later be referenced by name in a JDBC or R2DBC URL. + * + * @param name the named connection name. + * @param config the full configuration of the connection. + */ + public static void register(String name, ConnectorConfig config) { + InternalConnectorRegistry.getInstance().register(name, config); + } + + /** + * Close a named connector. This will stop all background credential refresh processes. All future + * attempts to connect via this named connection will fail. + * + * @param name the name of the connector to close. + */ + public static void close(String name) { + InternalConnectorRegistry.getInstance().close(name); + } + + /** + * Shutdown the entire CloudSQL JDBC Connector. This will stop all background threads. All future + * attempts to connect to a CloudSQL database will fail. + */ + public static void shutdown() { + InternalConnectorRegistry.shutdownInstance(); + } + + /** + * Adds an external application name to the user agent string for tracking. This is known to be + * used by the spring-cloud-gcp project. + * + * @throws IllegalStateException if the SQLAdmin client has already been initialized + */ + public static void addArtifactId(String artifactId) { + InternalConnectorRegistry.getInstance().addArtifactId(artifactId); + } +} diff --git a/core/src/main/java/com/google/cloud/sql/core/ConnectionConfig.java b/core/src/main/java/com/google/cloud/sql/core/ConnectionConfig.java index e841dff84..680163179 100644 --- a/core/src/main/java/com/google/cloud/sql/core/ConnectionConfig.java +++ b/core/src/main/java/com/google/cloud/sql/core/ConnectionConfig.java @@ -167,6 +167,18 @@ private ConnectionConfig( this.authType = authType; } + /** Creates a new instance of the ConnectionConfig with an updated connectorConfig. */ + public ConnectionConfig withConnectorConfig(ConnectorConfig config) { + return new ConnectionConfig( + cloudSqlInstance, + namedConnector, + unixSocketPath, + ipTypes, + authType, + unixSocketPathSuffix, + config); + } + public String getNamedConnector() { return namedConnector; } diff --git a/core/src/main/java/com/google/cloud/sql/core/Connector.java b/core/src/main/java/com/google/cloud/sql/core/Connector.java index 1ff9fc2e2..ca934b7fb 100644 --- a/core/src/main/java/com/google/cloud/sql/core/Connector.java +++ b/core/src/main/java/com/google/cloud/sql/core/Connector.java @@ -16,6 +16,7 @@ package com.google.cloud.sql.core; +import com.google.cloud.sql.ConnectorConfig; import com.google.cloud.sql.CredentialFactory; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListeningScheduledExecutorService; @@ -43,8 +44,10 @@ class Connector { new ConcurrentHashMap<>(); private final long refreshTimeoutMs; private final int serverProxyPort; + private final ConnectorConfig config; Connector( + ConnectorConfig config, DefaultConnectionInfoRepository adminApi, CredentialFactory instanceCredentialFactory, ListeningScheduledExecutorService executor, @@ -52,6 +55,7 @@ class Connector { long minRefreshDelayMs, long refreshTimeoutMs, int serverProxyPort) { + this.config = config; this.adminApi = adminApi; this.instanceCredentialFactory = instanceCredentialFactory; this.executor = executor; @@ -61,6 +65,10 @@ class Connector { this.serverProxyPort = serverProxyPort; } + public ConnectorConfig getConfig() { + return config; + } + /** Extracts the Unix socket argument from specified properties object. If unset, returns null. */ private String getUnixSocketArg(ConnectionConfig config) { String unixSocketPath = config.getUnixSocketPath(); @@ -126,4 +134,9 @@ private DefaultConnectionInfoCache createConnectionInfo(ConnectionConfig config) return new DefaultConnectionInfoCache( config, adminApi, instanceCredentialFactory, executor, localKeyPair, minRefreshDelayMs); } + + public void close() { + this.instances.forEach((key, c) -> c.close()); + this.instances.clear(); + } } diff --git a/core/src/main/java/com/google/cloud/sql/core/DefaultConnectionInfoCache.java b/core/src/main/java/com/google/cloud/sql/core/DefaultConnectionInfoCache.java index 3fa30166e..388d3e976 100644 --- a/core/src/main/java/com/google/cloud/sql/core/DefaultConnectionInfoCache.java +++ b/core/src/main/java/com/google/cloud/sql/core/DefaultConnectionInfoCache.java @@ -147,4 +147,8 @@ ListenableFuture getCurrent() { public CloudSqlInstanceName getInstanceName() { return instanceName; } + + void close() { + refresher.close(); + } } diff --git a/core/src/main/java/com/google/cloud/sql/core/InternalConnectorRegistry.java b/core/src/main/java/com/google/cloud/sql/core/InternalConnectorRegistry.java index 4ee54731a..a97b6b48b 100644 --- a/core/src/main/java/com/google/cloud/sql/core/InternalConnectorRegistry.java +++ b/core/src/main/java/com/google/cloud/sql/core/InternalConnectorRegistry.java @@ -56,8 +56,9 @@ public final class InternalConnectorRegistry { private static final long MIN_REFRESH_DELAY_MS = 30000; // Minimum 30 seconds between refresh. private static InternalConnectorRegistry internalConnectorRegistry; private final ListenableFuture localKeyPair; - private final ConcurrentHashMap connectors = + private final ConcurrentHashMap unnamedConnectors = new ConcurrentHashMap<>(); + private final ConcurrentHashMap namedConnectors = new ConcurrentHashMap<>(); private final ListeningScheduledExecutorService executor; private final CredentialFactory credentialFactory; private final int serverProxyPort; @@ -109,6 +110,18 @@ public static synchronized InternalConnectorRegistry getInstance() { return internalConnectorRegistry; } + /** + * Calls shutdown on the singleton and removes the singleton. After calling shutdownInstance(), + * the next call to getInstance() will start a new singleton instance. + */ + public static synchronized void shutdownInstance() { + if (internalConnectorRegistry != null) { + InternalConnectorRegistry old = internalConnectorRegistry; + internalConnectorRegistry = null; + old.shutdown(); + } + } + // TODO(kvg): Figure out better executor to use for testing @VisibleForTesting // Returns a listenable, scheduled executor that exits upon shutdown. @@ -128,26 +141,39 @@ static ListeningScheduledExecutorService getDefaultExecutor() { } /** - * Creates a socket representing a connection to a Cloud SQL instance. + * Internal use only: Creates a socket representing a connection to a Cloud SQL instance. * *

Depending on the given properties, it may return either a SSL Socket or a Unix Socket. * - * @param config Configuration used to configure the connection. + * @param config used to configure the connection. * @return the newly created Socket. * @throws IOException if error occurs during socket creation. */ public Socket connect(ConnectionConfig config) throws IOException, InterruptedException { - // Validate parameters - Preconditions.checkArgument( - config.getCloudSqlInstance() != null, - "cloudSqlInstance property not set. Please specify this property in the JDBC URL or the " - + "connection Properties with value in form \"project:region:instance\""); + if (config.getNamedConnector() != null) { + Connector connector = getNamedConnector(config.getNamedConnector()); + return connector.connect(config.withConnectorConfig(connector.getConfig())); + } else { + // Validate parameters + Preconditions.checkArgument( + config.getCloudSqlInstance() != null, + "cloudSqlInstance property not set. Please specify this property in the JDBC URL or the " + + "connection Properties with value in form \"project:region:instance\""); - return getConnector(config).connect(config); + return getConnector(config).connect(config); + } } - public ConnectionMetadata getConnectionMetadata(ConnectionConfig config) throws IOException { - return getConnector(config).getConnection(config).getConnectionMetadata(refreshTimeoutMs); + /** Internal use only: Returns ConnectionMetadata for a connection. */ + public ConnectionMetadata getConnectionMetadata(ConnectionConfig config) { + if (config.getNamedConnector() != null) { + Connector connector = getNamedConnector(config.getNamedConnector()); + return connector + .getConnection(config.withConnectorConfig(connector.getConfig())) + .getConnectionMetadata(refreshTimeoutMs); + } else { + return getConnector(config).getConnection(config).getConnectionMetadata(refreshTimeoutMs); + } } private static KeyPair generateRsaKeyPair() { @@ -228,7 +254,7 @@ public static void setApplicationName(String applicationName) { } private Connector getConnector(ConnectionConfig config) { - return connectors.computeIfAbsent( + return unnamedConnectors.computeIfAbsent( config.getConnectorConfig(), k -> createConnector(config.getConnectorConfig())); } @@ -255,6 +281,7 @@ private Connector createConnector(ConnectorConfig config) { connectionInfoRepositoryFactory.create(credential, config); return new Connector( + config, adminApi, instanceCredentialFactory, executor, @@ -263,4 +290,39 @@ private Connector createConnector(ConnectorConfig config) { refreshTimeoutMs, serverProxyPort); } + + /** Register the configuration for a named connector. */ + public void register(String name, ConnectorConfig config) { + if (this.namedConnectors.containsKey(name)) { + throw new IllegalArgumentException("Named connection " + name + " exists."); + } + this.namedConnectors.put(name, createConnector(config)); + } + + /** Close a named connector, stopping the refresh process and removing it from the registry. */ + public void close(String name) { + Connector connector = namedConnectors.remove(name); + if (connector == null) { + throw new IllegalArgumentException("Named connection " + name + " does not exist."); + } + connector.close(); + } + + /** Shutdown all connectors and remove the singleton instance. */ + public void shutdown() { + this.unnamedConnectors.forEach( + (key, c) -> { + c.close(); + }); + this.unnamedConnectors.clear(); + this.executor.shutdown(); + } + + private Connector getNamedConnector(String name) { + Connector connector = namedConnectors.get(name); + if (connector == null) { + throw new IllegalArgumentException("Named connection " + name + " does not exist."); + } + return connector; + } } diff --git a/core/src/main/java/com/google/cloud/sql/core/Refresher.java b/core/src/main/java/com/google/cloud/sql/core/Refresher.java index 8062fe417..357b60dd1 100644 --- a/core/src/main/java/com/google/cloud/sql/core/Refresher.java +++ b/core/src/main/java/com/google/cloud/sql/core/Refresher.java @@ -55,6 +55,9 @@ class Refresher { @GuardedBy("connectionInfoGuard") private Throwable currentRefreshFailure; + @GuardedBy("connectionInfoGuard") + private boolean closed; + /** * Create a new refresher. * @@ -92,6 +95,9 @@ class Refresher { ConnectionInfo getConnectionInfo(long timeoutMs) { ListenableFuture f; synchronized (connectionInfoGuard) { + if (closed) { + throw new IllegalStateException("Named connection closed"); + } f = current; } @@ -128,6 +134,9 @@ ConnectionInfo getConnectionInfo(long timeoutMs) { */ void forceRefresh() { synchronized (connectionInfoGuard) { + if (closed) { + throw new IllegalStateException("Named connection closed"); + } // Don't force a refresh until the current refresh operation // has produced a successful refresh. if (refreshRunning) { @@ -210,10 +219,11 @@ private ListenableFuture handleRefreshResult( // Now update nextInstanceData to perform a refresh after the // scheduled delay - next = - Futures.scheduleAsync( - this::startRefreshAttempt, secondsToRefresh, TimeUnit.SECONDS, executor); - + if (!closed) { + next = + Futures.scheduleAsync( + this::startRefreshAttempt, secondsToRefresh, TimeUnit.SECONDS, executor); + } // Resolves to an T immediately return current; } @@ -226,14 +236,36 @@ private ListenableFuture handleRefreshResult( e); synchronized (connectionInfoGuard) { currentRefreshFailure = e; - next = this.startRefreshAttempt(); - + if (!closed) { + next = this.startRefreshAttempt(); + } // Resolves after the next successful refresh attempt. return next; } } } + void close() { + synchronized (connectionInfoGuard) { + if (closed) { + return; + } + + // Cancel any in-progress requests + if (!this.current.isDone()) { + this.current.cancel(true); + } + if (!this.next.isDone()) { + this.next.cancel(true); + } + + this.current = + Futures.immediateFailedFuture(new RuntimeException("Named connection is closed.")); + + this.closed = true; + } + } + ListenableFuture getNext() { synchronized (connectionInfoGuard) { return this.next; diff --git a/core/src/test/java/com/google/cloud/sql/core/ConnectionConfigTest.java b/core/src/test/java/com/google/cloud/sql/core/ConnectionConfigTest.java index 7f7a916ab..d864a4100 100644 --- a/core/src/test/java/com/google/cloud/sql/core/ConnectionConfigTest.java +++ b/core/src/test/java/com/google/cloud/sql/core/ConnectionConfigTest.java @@ -114,4 +114,29 @@ public void testConfigFromBuilder() { assertThat(c.getConnectorConfig().getAdminServicePath()).isEqualTo(wantAdminServicePath); assertThat(c.getUnixSocketPathSuffix()).isEqualTo(wantUnixSuffix); } + + @Test + public void testWithConnectorConfig() { + final String wantCsqlInstance = "proj:region:inst"; + final String wantNamedConnector = "my-connection"; + + ConnectorConfig cc = new ConnectorConfig.Builder().build(); + + ConnectionConfig c = + new ConnectionConfig.Builder() + .withCloudSqlInstance(wantCsqlInstance) + .withNamedConnector(wantNamedConnector) + .build(); + + assertThat(c.getCloudSqlInstance()).isEqualTo(wantCsqlInstance); + assertThat(c.getNamedConnector()).isEqualTo(wantNamedConnector); + assertThat(c.getConnectorConfig()).isNotSameInstanceAs(cc); + + ConnectionConfig c1 = c.withConnectorConfig(cc); + + assertThat(c1).isNotSameInstanceAs(c); + assertThat(c1.getCloudSqlInstance()).isEqualTo(wantCsqlInstance); + assertThat(c1.getNamedConnector()).isEqualTo(wantNamedConnector); + assertThat(c1.getConnectorConfig()).isSameInstanceAs(cc); + } } diff --git a/core/src/test/java/com/google/cloud/sql/core/ConnectorTest.java b/core/src/test/java/com/google/cloud/sql/core/ConnectorTest.java index 53df081d2..1b62801eb 100644 --- a/core/src/test/java/com/google/cloud/sql/core/ConnectorTest.java +++ b/core/src/test/java/com/google/cloud/sql/core/ConnectorTest.java @@ -103,6 +103,7 @@ private Connector newConnector(ConnectorConfig config, int port) { new StubConnectionInfoRepositoryFactory(fakeSuccessHttpTransport(Duration.ofSeconds(0))); Connector connector = new Connector( + config, factory.create(credentialFactory.create(), config), credentialFactory, defaultExecutor, diff --git a/core/src/test/java/com/google/cloud/sql/core/DefaultConnectionInfoCacheTest.java b/core/src/test/java/com/google/cloud/sql/core/DefaultConnectionInfoCacheTest.java index 1234488ba..58b7a4700 100644 --- a/core/src/test/java/com/google/cloud/sql/core/DefaultConnectionInfoCacheTest.java +++ b/core/src/test/java/com/google/cloud/sql/core/DefaultConnectionInfoCacheTest.java @@ -585,6 +585,68 @@ public void testGetPreferredIpTypesThrowsException() { () -> connectionInfoCache.getConnectionMetadata(TEST_TIMEOUT_MS)); } + @Test + public void testClosedCloudSqlInstanceDataThrowsException() throws Exception { + TestDataSupplier instanceDataSupplier = new TestDataSupplier(false); + // initialize instance after mocks are set up + DefaultConnectionInfoCache instance = + new DefaultConnectionInfoCache( + new ConnectionConfig.Builder().withCloudSqlInstance("project:region:instance").build(), + instanceDataSupplier, + stubCredentialFactory, + executorService, + keyPairFuture, + MIN_REFERSH_DELAY_MS); + instance.close(); + + assertThrows( + IllegalStateException.class, () -> instance.getConnectionMetadata(TEST_TIMEOUT_MS)); + assertThrows(IllegalStateException.class, () -> instance.forceRefresh()); + } + + @Test + public void testClosedCloudSqlInstanceDataStopsRefreshTasks() throws Exception { + ConnectionInfo initialData = + new ConnectionInfo( + null, new SslData(null, null, null), Instant.now().plus(1, ChronoUnit.HOURS)); + + AtomicInteger refreshCount = new AtomicInteger(); + final PauseCondition refresh0 = new PauseCondition(); + + DefaultConnectionInfoCache instance = + new DefaultConnectionInfoCache( + new ConnectionConfig.Builder().withCloudSqlInstance("project:region:instance").build(), + (instanceName, accessTokenSupplier, authType, executor, keyPair) -> { + int c = refreshCount.get(); + if (c == 0) { + refresh0.pause(); + } + refreshCount.incrementAndGet(); + return Futures.immediateFuture(initialData); + }, + stubCredentialFactory, + executorService, + keyPairFuture, + MIN_REFERSH_DELAY_MS); + + // Wait for the first refresh attempt to complete. + refresh0.proceed(); + refresh0.waitForPauseToEnd(TEST_TIMEOUT_MS); + + // Assert that refresh gets instance data before it is closed + refresh0.waitForCondition(() -> refreshCount.get() == 1, TEST_TIMEOUT_MS); + + // Assert that the next refresh task is scheduled in the future + assertThat(instance.getNext().isDone()).isFalse(); + + // Close the instance + instance.close(); + + // Assert that the next refresh task is canceled + assertThat(instance.getNext().isDone()).isTrue(); + assertThat(instance.getNext().isCancelled()).isTrue(); + } + private ListeningScheduledExecutorService newTestExecutor() { ScheduledThreadPoolExecutor executor = (ScheduledThreadPoolExecutor) Executors.newScheduledThreadPool(2); diff --git a/core/src/test/java/com/google/cloud/sql/core/InternalConnectorRegistryTest.java b/core/src/test/java/com/google/cloud/sql/core/InternalConnectorRegistryTest.java index a8b5516f0..ace4b3f9b 100644 --- a/core/src/test/java/com/google/cloud/sql/core/InternalConnectorRegistryTest.java +++ b/core/src/test/java/com/google/cloud/sql/core/InternalConnectorRegistryTest.java @@ -34,6 +34,7 @@ import java.time.Duration; import java.time.Instant; import java.util.Collections; +import java.util.Properties; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -313,6 +314,98 @@ public void testGetApplicationNameFailsAfterInitialization() { () -> InternalConnectorRegistry.setApplicationName("sample-app")); } + @Test + public void registerConnection() throws IOException, InterruptedException { + InternalConnectorRegistry registry = createRegistry(PUBLIC_IP, credentialFactory); + + // Register a ConnectionConfig named "my-connection" + ConnectorConfig configWithDetails = new ConnectorConfig.Builder().build(); + registry.register("my-connection", configWithDetails); + + // Attempt to connect using the cloudSqlNamedConnection connection property + Properties connProps = new Properties(); + connProps.setProperty(ConnectionConfig.CLOUD_SQL_NAMED_CONNECTOR_PROPERTY, "my-connection"); + connProps.setProperty( + ConnectionConfig.CLOUD_SQL_INSTANCE_PROPERTY, "myProject:myRegion:myInstance"); + ConnectionConfig nameOnlyConfig = ConnectionConfig.fromConnectionProperties(connProps); + + // Assert that the socket opens correctly, indicating that ConnectorRegistry used the + // namedConnection property to find the connection configuration, and created + // a socket. + Socket socket = registry.connect(nameOnlyConfig); + assertThat(readLine(socket)).isEqualTo(SERVER_MESSAGE); + } + + @Test + public void registerConnectionFailsWithDuplicateName() throws InterruptedException { + InternalConnectorRegistry registry = createRegistry(PUBLIC_IP, credentialFactory); + // Register a ConnectionConfig named "my-connection" + ConnectorConfig configWithDetails = new ConnectorConfig.Builder().build(); + registry.register("my-connection", configWithDetails); + + // Assert that you can't register a connection with a duplicate name + assertThrows( + IllegalArgumentException.class, + () -> registry.register("my-connection", configWithDetails)); + } + + @Test + public void registerConnectionFailsWithDuplicateNameAndDifferentConfig() + throws InterruptedException { + InternalConnectorRegistry registry = createRegistry(PUBLIC_IP, credentialFactory); + + ConnectorConfig config = + new ConnectorConfig.Builder().withTargetPrincipal("joe@test.com").build(); + registry.register("my-connection", config); + + ConnectorConfig config2 = + new ConnectorConfig.Builder().withTargetPrincipal("jane@test.com").build(); + + // Assert that you can't register a connection with a duplicate name + assertThrows(IllegalArgumentException.class, () -> registry.register("my-connection", config2)); + } + + @Test + public void closeNamedConnectionFailsWhenNotFound() throws InterruptedException { + InternalConnectorRegistry registry = createRegistry(PUBLIC_IP, credentialFactory); + // Assert that you can't close a connection that doesn't exist + assertThrows(IllegalArgumentException.class, () -> registry.close("my-connection")); + } + + @Test + public void connectFailsOnClosedNamedConnection() throws InterruptedException { + InternalConnectorRegistry registry = createRegistry(PUBLIC_IP, credentialFactory); + // Register a ConnectionConfig named "my-connection" + ConnectorConfig configWithDetails = new ConnectorConfig.Builder().build(); + registry.register("my-connection", configWithDetails); + + // Close the named connection. + registry.close("my-connection"); + + // Attempt and fail to connect using the cloudSqlNamedConnection connection property + Properties connProps = new Properties(); + connProps.setProperty(ConnectionConfig.CLOUD_SQL_NAMED_CONNECTOR_PROPERTY, "my-connection"); + ConnectionConfig nameOnlyConfig = ConnectionConfig.fromConnectionProperties(connProps); + + // Assert that no connection is possible because the connector is closed. + IllegalArgumentException ex = + assertThrows(IllegalArgumentException.class, () -> registry.connect(nameOnlyConfig)); + assertThat(ex).hasMessageThat().contains("Named connection my-connection does not exist."); + } + + @Test + public void connectFailsOnUnknownNamedConnection() throws InterruptedException { + InternalConnectorRegistry registry = createRegistry(PUBLIC_IP, credentialFactory); + + // Attempt and fail to connect using the cloudSqlNamedConnection connection property + Properties connProps = new Properties(); + connProps.setProperty(ConnectionConfig.CLOUD_SQL_NAMED_CONNECTOR_PROPERTY, "my-connection"); + ConnectionConfig nameOnlyConfig = ConnectionConfig.fromConnectionProperties(connProps); + IllegalArgumentException ex = + assertThrows(IllegalArgumentException.class, () -> registry.connect(nameOnlyConfig)); + assertThat(ex).hasMessageThat().contains("Named connection my-connection does not exist."); + } + private InternalConnectorRegistry createRegistry( String ipType, CredentialFactory credentialFactory) throws InterruptedException { FakeSslServer sslServer = new FakeSslServer(); diff --git a/core/src/test/java/com/google/cloud/sql/core/RefresherTest.java b/core/src/test/java/com/google/cloud/sql/core/RefresherTest.java index 5e78a4dd6..f9316d5d9 100644 --- a/core/src/test/java/com/google/cloud/sql/core/RefresherTest.java +++ b/core/src/test/java/com/google/cloud/sql/core/RefresherTest.java @@ -391,6 +391,57 @@ public void testRefreshRetriesOnAfterFailedAttempts() throws Exception { goodRequest.waitForCondition(() -> r.getConnectionInfo(TEST_TIMEOUT_MS) == data, 2000); } + @Test + public void testClosedCloudSqlInstanceDataThrowsException() { + ExampleData data = new ExampleData(Instant.now().plus(1, ChronoUnit.HOURS)); + Refresher r = + new Refresher( + "testcase", executorService, () -> Futures.immediateFuture(data), rateLimiter); + r.close(); + + assertThrows(IllegalStateException.class, () -> r.getConnectionInfo(TEST_TIMEOUT_MS)); + assertThrows(IllegalStateException.class, () -> r.forceRefresh()); + } + + @Test + public void testClosedCloudSqlInstanceDataStopsRefreshTasks() throws Exception { + ExampleData data = new ExampleData(Instant.now().plus(1, ChronoUnit.HOURS)); + + AtomicInteger refreshCount = new AtomicInteger(); + final PauseCondition refresh0 = new PauseCondition(); + + Refresher r = + new Refresher( + "testcase", + executorService, + () -> { + int c = refreshCount.get(); + if (c == 0) { + refresh0.pause(); + } + refreshCount.incrementAndGet(); + return Futures.immediateFuture(data); + }, + rateLimiter); + + // Wait for the first refresh attempt to complete. + refresh0.proceed(); + refresh0.waitForPauseToEnd(TEST_TIMEOUT_MS); + + // Assert that refresh gets instance data before it is closed + refresh0.waitForCondition(() -> refreshCount.get() == 1, TEST_TIMEOUT_MS); + + // Assert that the next refresh task is scheduled in the future + assertThat(r.getNext().isDone()).isFalse(); + + // Close the instance + r.close(); + + // Assert that the next refresh task is canceled + assertThat(r.getNext().isDone()).isTrue(); + assertThat(r.getNext().isCancelled()).isTrue(); + } + private static class ExampleData extends ConnectionInfo { ExampleData(Instant expiration) { diff --git a/jdbc/postgres/src/test/java/com/google/cloud/sql/postgres/JdbcPostgresNamedConnectorIntegrationTests.java b/jdbc/postgres/src/test/java/com/google/cloud/sql/postgres/JdbcPostgresNamedConnectorIntegrationTests.java new file mode 100644 index 000000000..bb887c11f --- /dev/null +++ b/jdbc/postgres/src/test/java/com/google/cloud/sql/postgres/JdbcPostgresNamedConnectorIntegrationTests.java @@ -0,0 +1,121 @@ +/* + * Copyright 2023 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.sql.postgres; + +import static com.google.common.truth.Truth.assertThat; +import static com.google.common.truth.Truth.assertWithMessage; + +import com.google.cloud.sql.ConnectorConfig; +import com.google.cloud.sql.ConnectorRegistry; +import com.google.common.collect.ImmutableList; +import com.zaxxer.hikari.HikariConfig; +import com.zaxxer.hikari.HikariDataSource; +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Timestamp; +import java.util.ArrayList; +import java.util.List; +import java.util.Properties; +import java.util.concurrent.TimeUnit; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.Timeout; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +@RunWith(JUnit4.class) +public class JdbcPostgresNamedConnectorIntegrationTests { + + private static final String CONNECTION_NAME = System.getenv("POSTGRES_IAM_CONNECTION_NAME"); + private static final String DB_NAME = System.getenv("POSTGRES_DB"); + private static final String DB_USER = System.getenv("POSTGRES_IAM_USER"); + private static final ImmutableList requiredEnvVars = + ImmutableList.of("POSTGRES_IAM_USER", "POSTGRES_DB", "POSTGRES_IAM_CONNECTION_NAME"); + @Rule public Timeout globalTimeout = new Timeout(60, TimeUnit.SECONDS); + + private HikariDataSource connectionPool; + + @BeforeClass + public static void checkEnvVars() { + // Check that required env vars are set + requiredEnvVars.forEach( + (varName) -> + assertWithMessage( + String.format( + "Environment variable '%s' must be set to perform these tests.", varName)) + .that(System.getenv(varName)) + .isNotEmpty()); + } + + @Before + public void setUpPool() throws SQLException { + // Register a named Cloud SQL Connector with specific configuration + ConnectorConfig namedConnectorConfig = + new ConnectorConfig.Builder() + // Set connector configuration, for example, to use service + // account impersonation, set the target principal: + // .withTargetPrincipal("example@project.iam.googleapis.com") + // .withDelegates(Arrays.asList("delegate@project.iam.googleapis.com")) + .build(); + + // Register the named connector + ConnectorRegistry.register("my-connector", namedConnectorConfig); + + // Set up URL parameters + String jdbcURL = String.format("jdbc:postgresql:///%s", DB_NAME); + Properties connProps = new Properties(); + + // Configure Postgres driver properties + connProps.setProperty("user", DB_USER); + connProps.setProperty("password", "password"); + connProps.setProperty("socketFactory", "com.google.cloud.sql.postgres.SocketFactory"); + + // Configure Cloud SQL connector properties + connProps.setProperty("cloudSqlInstance", CONNECTION_NAME); + connProps.setProperty("enableIamAuth", "true"); + + // Configure the named connector registered as "my-connector" + connProps.setProperty("cloudSqlNamedConnector", "my-connector"); + + // Initialize connection pool + HikariConfig config = new HikariConfig(); + config.setJdbcUrl(jdbcURL); + config.setDataSourceProperties(connProps); + config.setConnectionTimeout(10000); // 10s + + this.connectionPool = new HikariDataSource(config); + } + + @Test + public void pooledConnectionTest() throws SQLException { + + List rows = new ArrayList<>(); + try (Connection conn = connectionPool.getConnection()) { + try (PreparedStatement selectStmt = conn.prepareStatement("SELECT NOW() as TS")) { + ResultSet rs = selectStmt.executeQuery(); + while (rs.next()) { + rows.add(rs.getTimestamp("TS")); + } + } + } + assertThat(rows.size()).isEqualTo(1); + } +} diff --git a/r2dbc/core/src/main/java/com/google/cloud/sql/core/CloudSqlConnectionFactory.java b/r2dbc/core/src/main/java/com/google/cloud/sql/core/CloudSqlConnectionFactory.java index 7a1268c2f..0c9ebe0ce 100644 --- a/r2dbc/core/src/main/java/com/google/cloud/sql/core/CloudSqlConnectionFactory.java +++ b/r2dbc/core/src/main/java/com/google/cloud/sql/core/CloudSqlConnectionFactory.java @@ -24,7 +24,6 @@ import io.r2dbc.spi.ConnectionFactoryMetadata; import io.r2dbc.spi.ConnectionFactoryOptions; import io.r2dbc.spi.ConnectionFactoryProvider; -import java.io.IOException; import java.util.function.Supplier; import org.reactivestreams.Publisher; import reactor.util.annotation.NonNull; @@ -50,30 +49,22 @@ public CloudSqlConnectionFactory( @Override @NonNull public Publisher create() { - try { - String hostIp = - InternalConnectorRegistry.getInstance() - .getConnectionMetadata(config) - .getPreferredIpAddress(); - builder.option(HOST, hostIp).option(PORT, SERVER_PROXY_PORT); - return supplier.get().create(builder.build()).create(); - } catch (IOException e) { - throw new RuntimeException(e); - } + String hostIp = + InternalConnectorRegistry.getInstance() + .getConnectionMetadata(config) + .getPreferredIpAddress(); + builder.option(HOST, hostIp).option(PORT, SERVER_PROXY_PORT); + return supplier.get().create(builder.build()).create(); } @Override @NonNull public ConnectionFactoryMetadata getMetadata() { - try { - String hostIp = - InternalConnectorRegistry.getInstance() - .getConnectionMetadata(config) - .getPreferredIpAddress(); - builder.option(HOST, hostIp).option(PORT, SERVER_PROXY_PORT); - return supplier.get().create(builder.build()).getMetadata(); - } catch (IOException e) { - throw new RuntimeException(e); - } + String hostIp = + InternalConnectorRegistry.getInstance() + .getConnectionMetadata(config) + .getPreferredIpAddress(); + builder.option(HOST, hostIp).option(PORT, SERVER_PROXY_PORT); + return supplier.get().create(builder.build()).getMetadata(); } } diff --git a/r2dbc/core/src/main/java/com/google/cloud/sql/core/GcpConnectionFactoryProvider.java b/r2dbc/core/src/main/java/com/google/cloud/sql/core/GcpConnectionFactoryProvider.java index 5b3da8042..eb6773cd4 100644 --- a/r2dbc/core/src/main/java/com/google/cloud/sql/core/GcpConnectionFactoryProvider.java +++ b/r2dbc/core/src/main/java/com/google/cloud/sql/core/GcpConnectionFactoryProvider.java @@ -28,7 +28,6 @@ import io.r2dbc.spi.ConnectionFactoryOptions; import io.r2dbc.spi.ConnectionFactoryProvider; import io.r2dbc.spi.Option; -import java.io.IOException; import java.util.Arrays; import java.util.Collections; import java.util.List; @@ -44,6 +43,7 @@ public abstract class GcpConnectionFactoryProvider implements ConnectionFactoryP public static final Option IP_TYPES = Option.valueOf("IP_TYPES"); public static final Option ENABLE_IAM_AUTH = Option.valueOf("ENABLE_IAM_AUTH"); public static final Option DELEGATES = Option.valueOf("DELEGATES"); + public static final Option NAMED_CONNECTOR = Option.valueOf("NAMED_CONNECTOR"); public static final Option TARGET_PRINCIPAL = Option.valueOf("TARGET_PRINCIPAL"); public static final Option ADMIN_ROOT_URL = Option.valueOf("ADMIN_ROOT_URL"); public static final Option ADMIN_SERVICE_PATH = Option.valueOf("ADMIN_SERVICE_PATH"); @@ -103,6 +103,7 @@ public ConnectionFactory create(ConnectionFactoryOptions connectionFactoryOption delegates = Collections.emptyList(); } final String targetPrincipal = (String) connectionFactoryOptions.getValue(TARGET_PRINCIPAL); + final String namedConnector = (String) connectionFactoryOptions.getValue(NAMED_CONNECTOR); final String adminRootUrl = (String) connectionFactoryOptions.getValue(ADMIN_ROOT_URL); final String adminServicePath = (String) connectionFactoryOptions.getValue(ADMIN_SERVICE_PATH); @@ -114,6 +115,7 @@ public ConnectionFactory create(ConnectionFactoryOptions connectionFactoryOption .withCloudSqlInstance(cloudSqlInstance) .withAuthType(enableIamAuth ? AuthType.IAM : AuthType.PASSWORD) .withIpTypes(ipTypes) + .withNamedConnector(namedConnector) .withConnectorConfig( new ConnectorConfig.Builder() .withTargetPrincipal(targetPrincipal) @@ -122,42 +124,31 @@ public ConnectionFactory create(ConnectionFactoryOptions connectionFactoryOption .withAdminServicePath(adminServicePath) .build()) .build(); - try { - // Precompute SSL Data to trigger the initial refresh to happen immediately, - // and ensure enableIAMAuth is set correctly. - InternalConnectorRegistry.getInstance().getConnectionMetadata(config); - - String socket = (String) connectionFactoryOptions.getValue(UNIX_SOCKET); - if (socket != null) { - return unixSocketConnectionFactory(optionBuilder, socket); - } - - Function sslFunction = - sslContextBuilder -> { - // Execute in a default scheduler to prevent it from blocking event loop - ConnectionMetadata connectionMetadata = - Mono.fromSupplier( - () -> { - try { - return InternalConnectorRegistry.getInstance() - .getConnectionMetadata(config); - } catch (IOException e) { - throw new RuntimeException(e); - } - }) - .subscribeOn(Schedulers.boundedElastic()) - .share() - .block(); - sslContextBuilder.keyManager(connectionMetadata.getKeyManagerFactory()); - sslContextBuilder.trustManager(connectionMetadata.getTrustManagerFactory()); - sslContextBuilder.protocols("TLSv1.2"); - - return sslContextBuilder; - }; - return tcpSocketConnectionFactory(config, optionBuilder, sslFunction); - } catch (IOException e) { - throw new RuntimeException(e); + // Precompute SSL Data to trigger the initial refresh to happen immediately, + // and ensure enableIAMAuth is set correctly. + InternalConnectorRegistry.getInstance().getConnectionMetadata(config); + + String socket = (String) connectionFactoryOptions.getValue(UNIX_SOCKET); + if (socket != null) { + return unixSocketConnectionFactory(optionBuilder, socket); } + + Function sslFunction = + sslContextBuilder -> { + // Execute in a default scheduler to prevent it from blocking event loop + ConnectionMetadata connectionMetadata = + Mono.fromSupplier( + () -> InternalConnectorRegistry.getInstance().getConnectionMetadata(config)) + .subscribeOn(Schedulers.boundedElastic()) + .share() + .block(); + sslContextBuilder.keyManager(connectionMetadata.getKeyManagerFactory()); + sslContextBuilder.trustManager(connectionMetadata.getTrustManagerFactory()); + sslContextBuilder.protocols("TLSv1.2"); + + return sslContextBuilder; + }; + return tcpSocketConnectionFactory(config, optionBuilder, sslFunction); } @Override