diff --git a/modules/repository-gcs/src/internalClusterTest/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStoreRepositoryTests.java b/modules/repository-gcs/src/internalClusterTest/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStoreRepositoryTests.java index 01466467a2bcd..8661c76fbf026 100644 --- a/modules/repository-gcs/src/internalClusterTest/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStoreRepositoryTests.java +++ b/modules/repository-gcs/src/internalClusterTest/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStoreRepositoryTests.java @@ -235,7 +235,10 @@ public TestGoogleCloudStoragePlugin(Settings settings) { } @Override - protected GoogleCloudStorageService createStorageService(boolean isServerless) { + protected GoogleCloudStorageService createStorageService( + boolean isServerless, + GcsPerProjectClientManager gcsPerProjectClientManager + ) { return new GoogleCloudStorageService() { @Override StorageOptions createStorageOptions( @@ -279,7 +282,7 @@ public Map getRepositories( metadata -> new GoogleCloudStorageRepository( metadata, registry, - this.storageService, + this.storageService.get(), clusterService, bigArrays, recoverySettings, @@ -291,7 +294,7 @@ protected GoogleCloudStorageBlobStore createBlobStore() { metadata.settings().get("bucket"), "test", metadata.name(), - storageService, + storageService.get(), bigArrays, randomIntBetween(1, 8) * 1024, BackoffPolicy.noBackoff(), diff --git a/modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GcsPerProjectClientManager.java b/modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GcsPerProjectClientManager.java new file mode 100644 index 0000000000000..32dda1088d94c --- /dev/null +++ b/modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GcsPerProjectClientManager.java @@ -0,0 +1,135 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.repositories.gcs; + +import org.elasticsearch.cluster.ClusterChangedEvent; +import org.elasticsearch.cluster.ClusterStateListener; +import org.elasticsearch.cluster.metadata.ProjectId; +import org.elasticsearch.cluster.metadata.ProjectMetadata; +import org.elasticsearch.common.settings.ProjectSecrets; +import org.elasticsearch.common.settings.Settings; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.function.BiFunction; + +public class GcsPerProjectClientManager implements ClusterStateListener { + + private final Settings settings; + private final BiFunction clientBuilder; + private final Map perProjectClientsCache; + + public GcsPerProjectClientManager( + Settings settings, + BiFunction clientBuilder + ) { + this.settings = settings; + this.clientBuilder = clientBuilder; + this.perProjectClientsCache = new ConcurrentHashMap<>(); + } + + @Override + public void clusterChanged(ClusterChangedEvent event) { + final Map currentProjects = event.state().metadata().projects(); + + final var updatedPerProjectClients = new HashMap(); + for (var project : currentProjects.values()) { + final ProjectSecrets projectSecrets = project.custom(ProjectSecrets.TYPE); + if (projectSecrets == null) { + // This can only happen when a node restarts, it will be processed again when file settings are loaded + continue; + } + final Settings currentSettings = Settings.builder() + // merge with static settings such as max retries etc, exclude secure settings + // TODO: We may need to update this if per-project settings decide to support hierarchical overrides + .put(settings, false) + .setSecureSettings(projectSecrets.getSettings()) + .build(); + final Map clientSettings = GoogleCloudStorageClientSettings.load(currentSettings); + + // TODO: clientSettings should not be empty, i.e. there should be at least one client configured + // Maybe log a warning if it is empty and continue. The project will not have usable client but that is probably ok. + + // TODO: Building and comparing the whole GoogleCloudStorageClientSettings may be insufficient, we could just compare the + // relevant secrets + if (newOrUpdated(project.id(), clientSettings)) { + updatedPerProjectClients.put(project.id(), new ClientsHolder(clientSettings)); + } + } + + // Updated projects + perProjectClientsCache.putAll(updatedPerProjectClients); + + // removed projects + for (var projectId : perProjectClientsCache.keySet()) { + if (currentProjects.containsKey(projectId) == false) { + perProjectClientsCache.remove(projectId); + } + } + } + + public MeteredStorage client( + ProjectId projectId, + String clientName, + String repositoryName, + GcsRepositoryStatsCollector statsCollector + ) { + final var clientsHolder = perProjectClientsCache.get(projectId); + if (clientsHolder == null) { + throw new IllegalArgumentException("No project found for [" + projectId + "]"); + } + return clientsHolder.client(clientName, repositoryName, statsCollector); + } + + public void closeRepositoryClients(ProjectId projectId, String repositoryName) { + final var clientsHolder = perProjectClientsCache.get(projectId); + if (clientsHolder != null) { + clientsHolder.closeRepositoryClients(repositoryName); + } + } + + private boolean newOrUpdated(ProjectId projectId, Map currentClientSettings) { + final ClientsHolder old = perProjectClientsCache.get(projectId); + if (old == null) { + return true; + } + return currentClientSettings.equals(old.clientSettings()) == false; + } + + private final class ClientsHolder { + // clientName -> client settings + private final Map clientSettings; + // repositoryName -> client + private final Map clientCache = new ConcurrentHashMap<>(); + + ClientsHolder(Map clientSettings) { + this.clientSettings = clientSettings; + } + + Map clientSettings() { + return clientSettings; + } + + MeteredStorage client(String clientName, String repositoryName, GcsRepositoryStatsCollector statsCollector) { + return clientCache.computeIfAbsent(repositoryName, ignored -> { + final var settings = clientSettings.get(clientName); + if (settings == null) { + throw new IllegalArgumentException("No client settings found for [" + clientName + "]"); + } + return clientBuilder.apply(settings, statsCollector); + }); + } + + void closeRepositoryClients(String repositoryName) { + clientCache.remove(repositoryName); + } + } +} diff --git a/modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStoragePlugin.java b/modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStoragePlugin.java index 3f7d3ae4825ff..1c67c9dcabca9 100644 --- a/modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStoragePlugin.java +++ b/modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStoragePlugin.java @@ -9,11 +9,13 @@ package org.elasticsearch.repositories.gcs; +import org.apache.lucene.util.SetOnce; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.BigArrays; +import org.elasticsearch.core.Nullable; import org.elasticsearch.env.Environment; import org.elasticsearch.indices.recovery.RecoverySettings; import org.elasticsearch.plugins.Plugin; @@ -23,7 +25,10 @@ import org.elasticsearch.repositories.Repository; import org.elasticsearch.xcontent.NamedXContentRegistry; +import java.io.IOException; +import java.io.UncheckedIOException; import java.util.Arrays; +import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.Map; @@ -31,19 +36,37 @@ public class GoogleCloudStoragePlugin extends Plugin implements RepositoryPlugin, ReloadablePlugin { // package-private for tests - final GoogleCloudStorageService storageService; + final SetOnce storageService = new SetOnce<>(); - @SuppressWarnings("this-escape") - public GoogleCloudStoragePlugin(final Settings settings) { + public GoogleCloudStoragePlugin(final Settings settings) {} + + @Override + public Collection createComponents(PluginServices services) { + final Settings settings = services.clusterService().getSettings(); + GcsPerProjectClientManager gcsPerProjectClientManager = null; + if (services.projectResolver().supportsMultipleProjects()) { + gcsPerProjectClientManager = new GcsPerProjectClientManager(settings, (gcsClientSettings, statsCollector) -> { + try { + return storageService.get().createClient(gcsClientSettings, statsCollector); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + }); + services.clusterService().addListener(gcsPerProjectClientManager); + } var isServerless = DiscoveryNode.isStateless(settings); - this.storageService = createStorageService(isServerless); + this.storageService.set(createStorageService(isServerless, gcsPerProjectClientManager)); // eagerly load client settings so that secure settings are readable (not closed) reload(settings); + return List.of(); } // overridable for tests - protected GoogleCloudStorageService createStorageService(boolean isServerless) { - return new GoogleCloudStorageService(isServerless); + protected GoogleCloudStorageService createStorageService( + boolean isServerless, + @Nullable GcsPerProjectClientManager gcsPerProjectClientManager + ) { + return new GoogleCloudStorageService(isServerless, gcsPerProjectClientManager); } @Override @@ -60,7 +83,7 @@ public Map getRepositories( metadata -> new GoogleCloudStorageRepository( metadata, namedXContentRegistry, - this.storageService, + this.storageService.get(), clusterService, bigArrays, recoverySettings, @@ -93,6 +116,6 @@ public void reload(Settings settings) { // `GoogleCloudStorageClientSettings` instance) instead of the `Settings` // instance. final Map clientsSettings = GoogleCloudStorageClientSettings.load(settings); - this.storageService.refreshAndClearCache(clientsSettings); + this.storageService.get().refreshAndClearCache(clientsSettings); } } diff --git a/modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageService.java b/modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageService.java index bb83b767abb4c..19c7a733269e3 100644 --- a/modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageService.java +++ b/modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageService.java @@ -25,6 +25,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.elasticsearch.ExceptionsHelper; +import org.elasticsearch.cluster.metadata.ProjectId; import org.elasticsearch.common.Strings; import org.elasticsearch.common.util.Maps; import org.elasticsearch.core.Nullable; @@ -56,13 +57,16 @@ public class GoogleCloudStorageService { private volatile Map clientSettings = emptyMap(); private final boolean isServerless; + @Nullable + private GcsPerProjectClientManager gcsPerProjectClientManager; public GoogleCloudStorageService() { - this.isServerless = false; + this(false, null); } - public GoogleCloudStorageService(boolean isServerless) { + public GoogleCloudStorageService(boolean isServerless, @Nullable GcsPerProjectClientManager gcsPerProjectClientManager) { this.isServerless = isServerless; + this.gcsPerProjectClientManager = gcsPerProjectClientManager; } public boolean isServerless() { @@ -132,6 +136,25 @@ public MeteredStorage client(final String clientName, final String repositoryNam } } + public MeteredStorage client( + @Nullable final ProjectId projectId, + final String clientName, + final String repositoryName, + final GcsRepositoryStatsCollector statsCollector + ) throws IOException { + if (gcsPerProjectClientManager == null) { + // single default project mode + assert ProjectId.DEFAULT.equals(projectId) : projectId; + return client(clientName, repositoryName, statsCollector); + } else if (projectId == null) { + // MP mode for cluster level client + return client(clientName, repositoryName, statsCollector); + } else { + // MP mode for per-project client + return gcsPerProjectClientManager.client(projectId, clientName, repositoryName, statsCollector); + } + } + synchronized void closeRepositoryClients(String repositoryName) { clientCache = clientCache.entrySet() .stream() @@ -139,6 +162,20 @@ synchronized void closeRepositoryClients(String repositoryName) { .collect(Collectors.toUnmodifiableMap(Map.Entry::getKey, Map.Entry::getValue)); } + void closeRepositoryClients(@Nullable ProjectId projectId, String repositoryName) { + if (gcsPerProjectClientManager == null) { + // single default project mode + assert ProjectId.DEFAULT.equals(projectId) : projectId; + closeRepositoryClients(repositoryName); + } else if (projectId == null) { + // MP mode for cluster level client + closeRepositoryClients(repositoryName); + } else { + // MP mode for per-project client + gcsPerProjectClientManager.closeRepositoryClients(projectId, repositoryName); + } + } + /** * Creates a client that can be used to manage Google Cloud Storage objects. The client is thread-safe. * @@ -146,7 +183,7 @@ synchronized void closeRepositoryClients(String repositoryName) { * @return a new client storage instance that can be used to manage objects * (blobs) */ - private MeteredStorage createClient(GoogleCloudStorageClientSettings gcsClientSettings, GcsRepositoryStatsCollector statsCollector) + MeteredStorage createClient(GoogleCloudStorageClientSettings gcsClientSettings, GcsRepositoryStatsCollector statsCollector) throws IOException { final HttpTransport httpTransport = SocketAccess.doPrivilegedIOException(() -> { final NetHttpTransport.Builder builder = new NetHttpTransport.Builder(); diff --git a/modules/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageServiceTests.java b/modules/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageServiceTests.java index b888a9e97f76f..ba9d669ab5043 100644 --- a/modules/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageServiceTests.java +++ b/modules/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageServiceTests.java @@ -113,7 +113,9 @@ public void testReinitClientSettings() throws Exception { secureSettings2.setFile("gcs.client.gcs3.credentials_file", serviceAccountFileContent("project_gcs23")); final Settings settings2 = Settings.builder().setSecureSettings(secureSettings2).build(); try (GoogleCloudStoragePlugin plugin = new GoogleCloudStoragePlugin(settings1)) { - final GoogleCloudStorageService storageService = plugin.storageService; + plugin.storageService.set(plugin.createStorageService(randomBoolean(), null)); + plugin.reload(settings1); + final GoogleCloudStorageService storageService = plugin.storageService.get(); var statsCollector = new GcsRepositoryStatsCollector(); final var client11 = storageService.client("gcs1", "repo1", statsCollector); assertThat(client11.getOptions().getProjectId(), equalTo("project_gcs11")); @@ -151,7 +153,9 @@ public void testClientsAreNotSharedAcrossRepositories() throws Exception { secureSettings1.setFile("gcs.client.gcs1.credentials_file", serviceAccountFileContent("test_project")); final Settings settings = Settings.builder().setSecureSettings(secureSettings1).build(); try (GoogleCloudStoragePlugin plugin = new GoogleCloudStoragePlugin(settings)) { - final GoogleCloudStorageService storageService = plugin.storageService; + plugin.storageService.set(plugin.createStorageService(randomBoolean(), null)); + plugin.reload(settings); + final GoogleCloudStorageService storageService = plugin.storageService.get(); final MeteredStorage repo1Client = storageService.client("gcs1", "repo1", new GcsRepositoryStatsCollector()); final MeteredStorage repo2Client = storageService.client("gcs1", "repo2", new GcsRepositoryStatsCollector()); diff --git a/modules/repository-s3/qa/insecure-credentials/src/test/java/org/elasticsearch/repositories/s3/RepositoryCredentialsTests.java b/modules/repository-s3/qa/insecure-credentials/src/test/java/org/elasticsearch/repositories/s3/RepositoryCredentialsTests.java index eb4cd955c81b1..b85d9abb6422a 100644 --- a/modules/repository-s3/qa/insecure-credentials/src/test/java/org/elasticsearch/repositories/s3/RepositoryCredentialsTests.java +++ b/modules/repository-s3/qa/insecure-credentials/src/test/java/org/elasticsearch/repositories/s3/RepositoryCredentialsTests.java @@ -252,7 +252,12 @@ public ProxyS3RepositoryPlugin(Settings settings) { } @Override - S3Service s3Service(Environment environment, Settings nodeSettings, ResourceWatcherService resourceWatcherService) { + S3Service s3Service( + Environment environment, + Settings nodeSettings, + ResourceWatcherService resourceWatcherService, + S3PerProjectClientManager s3PerProjectClientManager + ) { return new ProxyS3Service(environment, nodeSettings, resourceWatcherService); } diff --git a/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobStore.java b/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobStore.java index 129de029daf7a..63445b2f4765e 100644 --- a/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobStore.java +++ b/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobStore.java @@ -115,6 +115,7 @@ class S3BlobStore implements BlobStore { S3RepositoriesMetrics s3RepositoriesMetrics, BackoffPolicy retryThrottledDeleteBackoffPolicy ) { + // TODO: add a projectId field, maybe null for cluster level blobstore this.service = service; this.bigArrays = bigArrays; this.bucket = bucket; @@ -310,6 +311,7 @@ public String toString() { } public AmazonS3Reference clientReference() { + // TODO: change to service.client(projectId, repositoryMetadata) return service.client(repositoryMetadata); } @@ -490,6 +492,7 @@ private static DeleteObjectsRequest bulkDelete(OperationPurpose purpose, S3BlobS @Override public void close() throws IOException { + // TODO: change to use service.onBlobStoreClose(projectId) service.onBlobStoreClose(); } diff --git a/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3PerProjectClientManager.java b/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3PerProjectClientManager.java new file mode 100644 index 0000000000000..df461cffc3c88 --- /dev/null +++ b/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3PerProjectClientManager.java @@ -0,0 +1,234 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.repositories.s3; + +import com.amazonaws.http.IdleConnectionReaper; +import com.amazonaws.services.s3.AmazonS3; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.support.SubscribableListener; +import org.elasticsearch.cluster.ClusterChangedEvent; +import org.elasticsearch.cluster.ClusterStateListener; +import org.elasticsearch.cluster.metadata.ProjectId; +import org.elasticsearch.cluster.metadata.ProjectMetadata; +import org.elasticsearch.cluster.metadata.RepositoryMetadata; +import org.elasticsearch.common.settings.ProjectSecrets; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.Maps; +import org.elasticsearch.core.IOUtils; + +import java.io.Closeable; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Function; + +public class S3PerProjectClientManager implements ClusterStateListener { + + private final Settings settings; + private final Function clientBuilder; + private final Executor executor; + // A map of projectId to clients holder. Adding to and removing from the map happen only in the cluster state listener thread. + private final Map perProjectClientsCache; + // Listener for tracking ongoing async closing of obsolete clients. Updated only in the cluster state listener thread. + private volatile SubscribableListener clientsCloseListener = null; + + public S3PerProjectClientManager(Settings settings, Function clientBuilder, Executor executor) { + this.settings = settings; + this.clientBuilder = clientBuilder; + this.executor = executor; + this.perProjectClientsCache = new ConcurrentHashMap<>(); + } + + public void clusterChanged(ClusterChangedEvent event) { + final Map currentProjects = event.state().metadata().projects(); + + final var updatedPerProjectClients = new HashMap(); + for (var project : currentProjects.values()) { + final ProjectSecrets projectSecrets = project.custom(ProjectSecrets.TYPE); + if (projectSecrets == null) { + // This can only happen when a node restarts, it will be processed again when file settings are loaded + continue; + } + final Settings currentSettings = Settings.builder() + // merge with static settings such as max retries etc, exclude secure settings + // TODO: We may need to update this if per-project settings decide to support hierarchical overrides + .put(settings, false) + .setSecureSettings(projectSecrets.getSettings()) + .build(); + final Map clientSettings = S3ClientSettings.load(currentSettings); + + // TODO: clientSettings should not be empty, i.e. there should be at least one client configured + // Maybe log a warning if it is empty and continue. The project will not have usable client but that is probably ok. + + // TODO: Building and comparing the whole S3ClientSettings may be insufficient, we could just compare the relevant secrets + if (newOrUpdated(project.id(), clientSettings)) { + updatedPerProjectClients.put(project.id(), new ClientsHolder(clientSettings)); + } + } + + final List clientsHoldersToClose = new ArrayList<>(); + // Updated projects + for (var projectId : updatedPerProjectClients.keySet()) { + final var old = perProjectClientsCache.put(projectId, updatedPerProjectClients.get(projectId)); + if (old != null) { + clientsHoldersToClose.add(old); + } + } + // removed projects + for (var projectId : perProjectClientsCache.keySet()) { + if (currentProjects.containsKey(projectId) == false) { + final var removed = perProjectClientsCache.remove(projectId); + assert removed != null; + clientsHoldersToClose.add(removed); + } + } + if (clientsHoldersToClose.isEmpty() == false) { + final var currentClientsCloseListener = new SubscribableListener(); + final var previousClientsCloseListener = clientsCloseListener; + clientsCloseListener = currentClientsCloseListener; + if (previousClientsCloseListener != null && previousClientsCloseListener.isDone() == false) { + previousClientsCloseListener.addListener( + ActionListener.running(() -> closeClientsAsync(clientsHoldersToClose, currentClientsCloseListener)) + ); + } else { + closeClientsAsync(clientsHoldersToClose, currentClientsCloseListener); + } + } + } + + private void closeClientsAsync(List clientsHoldersToClose, ActionListener listener) { + executor.execute(() -> { + IOUtils.closeWhileHandlingException(clientsHoldersToClose); + listener.onResponse(null); + }); + } + + public AmazonS3Reference client(ProjectId projectId, RepositoryMetadata repositoryMetadata) { + final var clientsHolder = perProjectClientsCache.get(projectId); + if (clientsHolder == null) { + throw new IllegalArgumentException("project [" + projectId + "] does not exist"); + } + final String clientName = S3Repository.CLIENT_NAME.get(repositoryMetadata.settings()); + return clientsHolder.client(clientName); + } + + /** + * Similar to S3Service#releaseCachedClients but only clears the cache for the given project. + * All clients for the project are closed and will be recreated on next access, also similar to S3Service#releaseCachedClients + */ + public void clearCacheForProject(ProjectId projectId) { + final var old = perProjectClientsCache.get(projectId); + if (old != null) { + old.clearCache(); + // TODO: do we need this? + // shutdown IdleConnectionReaper background thread + // it will be restarted on new client usage + IdleConnectionReaper.shutdown(); + } + } + + /** + * Shutdown the manager by closing all clients holders. This is called when the node is shutting down. + */ + public void close() { + IOUtils.closeWhileHandlingException(perProjectClientsCache.values()); + final var currentClientsCloseListener = clientsCloseListener; + if (currentClientsCloseListener != null && currentClientsCloseListener.isDone() == false) { + // Wait for async clients closing to be completed + final CountDownLatch latch = new CountDownLatch(1); + currentClientsCloseListener.addListener(ActionListener.running(latch::countDown)); + try { + if (latch.await(1, TimeUnit.MINUTES) == false) { + // TODO: log warning + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + } + + private boolean newOrUpdated(ProjectId projectId, Map currentClientSettings) { + final var old = perProjectClientsCache.get(projectId); + if (old == null) { + return true; + } + return currentClientSettings.equals(old.clientSettings()) == false; + } + + private final class ClientsHolder implements Closeable { + private final AtomicBoolean closed = new AtomicBoolean(false); + private final Map clientSettings; + private volatile Map clientsCache = Collections.emptyMap(); + + ClientsHolder(Map clientSettings) { + this.clientSettings = clientSettings; + } + + Map clientSettings() { + return clientSettings; + } + + AmazonS3Reference client(String clientName) { + final var clientReference = clientsCache.get(clientName); + // It is ok to retrieve an existing client when the cache is being cleared or the holder is closing. + // As long as there are paired incRef/decRef calls, the client will be closed when the last reference is released + // by either the caller of this method or the clearCache() method. + if (clientReference != null && clientReference.tryIncRef()) { + return clientReference; + } + final var settings = clientSettings.get(clientName); + if (settings == null) { + throw new IllegalArgumentException("client [" + clientName + "] does not exist"); + } + synchronized (this) { + final var existing = clientsCache.get(clientName); + if (existing != null && existing.tryIncRef()) { + return existing; + } + if (closed.get()) { + // Not adding new client once the manager is closed since there won't be anything to close it + throw new IllegalStateException("client manager is closed"); + } + // The close() method maybe called after we checked it, it is ok since we are already inside the synchronized block. + // The clearCache() will clear the newly added client. + final var newClientReference = new AmazonS3Reference(clientBuilder.apply(settings)); + newClientReference.mustIncRef(); + clientsCache = Maps.copyMapWithAddedEntry(clientsCache, clientName, newClientReference); + return newClientReference; + } + } + + /** + * Clear the cache by closing and clear out all clients. Subsequent {@link #client(String)} calls will recreate + * the clients and populate the cache again. + */ + synchronized void clearCache() { + IOUtils.closeWhileHandlingException(clientsCache.values()); + clientsCache = Collections.emptyMap(); + } + + /** + * Similar to {@link #clearCache()} but also flag the holder to be closed so that no new client can be created. + */ + public void close() { + if (closed.compareAndSet(false, true)) { + clearCache(); + } + } + } +} diff --git a/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3Repository.java b/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3Repository.java index 0904f37e39743..62a8cde8bdf7d 100644 --- a/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3Repository.java +++ b/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3Repository.java @@ -292,6 +292,7 @@ class S3Repository extends MeteredBlobStoreRepository { buildBasePath(metadata), buildLocation(metadata) ); + // TODO: add a projectId field this.service = service; this.s3RepositoriesMetrics = s3RepositoriesMetrics; this.snapshotExecutor = threadPool().executor(ThreadPool.Names.SNAPSHOT); diff --git a/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3RepositoryPlugin.java b/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3RepositoryPlugin.java index 37b960b33eb79..7746838701063 100644 --- a/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3RepositoryPlugin.java +++ b/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3RepositoryPlugin.java @@ -19,6 +19,7 @@ import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.BigArrays; +import org.elasticsearch.core.Nullable; import org.elasticsearch.env.Environment; import org.elasticsearch.indices.recovery.RecoverySettings; import org.elasticsearch.plugins.Plugin; @@ -26,6 +27,7 @@ import org.elasticsearch.plugins.RepositoryPlugin; import org.elasticsearch.repositories.RepositoriesMetrics; import org.elasticsearch.repositories.Repository; +import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.watcher.ResourceWatcherService; import org.elasticsearch.xcontent.NamedXContentRegistry; @@ -86,15 +88,41 @@ protected S3Repository createRepository( @Override public Collection createComponents(PluginServices services) { - service.set(s3Service(services.environment(), services.clusterService().getSettings(), services.resourceWatcherService())); + S3PerProjectClientManager s3PerProjectClientManager = null; + if (services.projectResolver().supportsMultipleProjects()) { + s3PerProjectClientManager = new S3PerProjectClientManager( + settings, + s3ClientSettings -> this.service.get().buildClient(s3ClientSettings), + services.threadPool().executor(ThreadPool.Names.GENERIC) + ); + services.clusterService().addListener(s3PerProjectClientManager); + } + service.set( + s3Service( + services.environment(), + services.clusterService().getSettings(), + services.resourceWatcherService(), + s3PerProjectClientManager + ) + ); this.service.get().refreshAndClearCache(S3ClientSettings.load(settings)); return List.of(service); } + @Deprecated(forRemoval = true) S3Service s3Service(Environment environment, Settings nodeSettings, ResourceWatcherService resourceWatcherService) { return new S3Service(environment, nodeSettings, resourceWatcherService); } + S3Service s3Service( + Environment environment, + Settings nodeSettings, + ResourceWatcherService resourceWatcherService, + @Nullable S3PerProjectClientManager s3PerProjectClientManager + ) { + return new S3Service(environment, nodeSettings, resourceWatcherService, s3PerProjectClientManager); + } + @Override public Map getRepositories( final Environment env, diff --git a/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3Service.java b/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3Service.java index 0e94275e0d919..52c2a9b3b8229 100644 --- a/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3Service.java +++ b/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3Service.java @@ -34,6 +34,7 @@ import org.apache.logging.log4j.Logger; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.cluster.coordination.stateless.StoreHeartbeatService; +import org.elasticsearch.cluster.metadata.ProjectId; import org.elasticsearch.cluster.metadata.RepositoryMetadata; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.Strings; @@ -41,6 +42,7 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.Maps; import org.elasticsearch.core.IOUtils; +import org.elasticsearch.core.Nullable; import org.elasticsearch.core.TimeValue; import org.elasticsearch.env.Environment; import org.elasticsearch.rest.RestStatus; @@ -99,8 +101,19 @@ class S3Service implements Closeable { final TimeValue compareAndExchangeTimeToLive; final TimeValue compareAndExchangeAntiContentionDelay; final boolean isStateless; + private final S3PerProjectClientManager s3PerProjectClientManager; + @Deprecated(forRemoval = true) S3Service(Environment environment, Settings nodeSettings, ResourceWatcherService resourceWatcherService) { + this(environment, nodeSettings, resourceWatcherService, null); + } + + S3Service( + Environment environment, + Settings nodeSettings, + ResourceWatcherService resourceWatcherService, + @Nullable S3PerProjectClientManager s3PerProjectClientManager + ) { webIdentityTokenCredentialsProvider = new CustomWebIdentityTokenCredentialsProvider( environment, System::getenv, @@ -111,6 +124,7 @@ class S3Service implements Closeable { compareAndExchangeTimeToLive = REPOSITORY_S3_CAS_TTL_SETTING.get(nodeSettings); compareAndExchangeAntiContentionDelay = REPOSITORY_S3_CAS_ANTI_CONTENTION_DELAY_SETTING.get(nodeSettings); isStateless = DiscoveryNode.isStateless(nodeSettings); + this.s3PerProjectClientManager = s3PerProjectClientManager; } /** @@ -153,6 +167,26 @@ public AmazonS3Reference client(RepositoryMetadata repositoryMetadata) { } } + /** + * Delegates to {@link #client(RepositoryMetadata)} when + * 1. per-project client is disabled + * 2. or when the blobstore is cluster level (projectId = null) + * Otherwise, attempts to retrieve a per-project client by the project-id and repository metadata from the + * per-project client manager. Throws if project-id or the client does not exist. The client maybe initialized lazily. + */ + public AmazonS3Reference client(@Nullable ProjectId projectId, RepositoryMetadata repositoryMetadata) { + if (s3PerProjectClientManager == null) { + // Multi-Project is disabled and we have a single default project + assert ProjectId.DEFAULT.equals(projectId) : projectId; + return client(repositoryMetadata); + } else if (projectId == null) { + // Multi-Project is enabled and we are retrieving a client for the cluster level blobstore + return client(repositoryMetadata); + } else { + return s3PerProjectClientManager.client(projectId, repositoryMetadata); + } + } + /** * Either fetches {@link S3ClientSettings} for a given {@link RepositoryMetadata} from cached settings or creates them * by overriding static client settings from {@link #staticClientSettings} with settings found in the repository metadata. @@ -307,9 +341,25 @@ public void onBlobStoreClose() { releaseCachedClients(); } + public void onBlobStoreClose(@Nullable ProjectId projectId) { + if (s3PerProjectClientManager == null) { + // Multi-Project is disabled and we have a single default project + assert ProjectId.DEFAULT.equals(projectId) : projectId; + onBlobStoreClose(); + } else if (projectId == null) { + // Multi-Project is enabled and this is for the cluster level blobstore + onBlobStoreClose(); + } else { + s3PerProjectClientManager.clearCacheForProject(projectId); + } + } + @Override public void close() throws IOException { releaseCachedClients(); + if (s3PerProjectClientManager != null) { + s3PerProjectClientManager.close(); + } webIdentityTokenCredentialsProvider.shutdown(); }