From 806711f2419ea4f36e31e095f84579bf93190d5d Mon Sep 17 00:00:00 2001 From: David Turner Date: Mon, 18 Nov 2024 17:34:15 +0000 Subject: [PATCH 1/2] Split searchable snapshot into multiple repo operations Each operation on a snapshot repository uses the same `Repository`, `BlobStore`, etc. instances throughout, in order to avoid the complexity arising from handling metadata updates that occur while an operation is running. Today we model the entire lifetime of a searchable snapshot shard as a single repository operation since there should be no metadata updates that matter in this context (other than those that are handled dynamically via other mechanisms) and some metadata updates might be positively harmful to a searchable snapshot shard. It turns out that there are some undocumented legacy settings which _do_ matter to searchable snapshots, and which are still in use, so with this commit we move to a finer-grained model of repository operations within a searchable snapshot. Backport of #116918 to 8.16 --- docs/changelog/116918.yaml | 5 + .../cluster/metadata/RepositoryMetadata.java | 6 +- .../repositories/RepositoriesService.java | 14 +- .../blobstore/BlobStoreRepository.java | 6 +- ...earchableSnapshotsCredentialsReloadIT.java | 281 ++++++++++++++++++ .../SearchableSnapshots.java | 4 +- .../store/BlobContainerSupplier.java | 95 ++++++ .../store/RepositorySupplier.java | 83 ++++++ .../store/SearchableSnapshotDirectory.java | 90 ++---- 9 files changed, 505 insertions(+), 79 deletions(-) create mode 100644 docs/changelog/116918.yaml create mode 100644 x-pack/plugin/searchable-snapshots/qa/s3/src/javaRestTest/java/org/elasticsearch/xpack/searchablesnapshots/s3/S3SearchableSnapshotsCredentialsReloadIT.java create mode 100644 x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/store/BlobContainerSupplier.java create mode 100644 x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/store/RepositorySupplier.java diff --git a/docs/changelog/116918.yaml b/docs/changelog/116918.yaml new file mode 100644 index 0000000000000..3b04b4ae4a69a --- /dev/null +++ b/docs/changelog/116918.yaml @@ -0,0 +1,5 @@ +pr: 116918 +summary: Split searchable snapshot into multiple repo operations +area: Snapshot/Restore +type: enhancement +issues: [] diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/RepositoryMetadata.java b/server/src/main/java/org/elasticsearch/cluster/metadata/RepositoryMetadata.java index 9b3abf38c519b..0b9c359006b23 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/RepositoryMetadata.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/RepositoryMetadata.java @@ -46,7 +46,11 @@ public class RepositoryMetadata implements Writeable { * @param settings repository settings */ public RepositoryMetadata(String name, String type, Settings settings) { - this(name, RepositoryData.MISSING_UUID, type, settings, RepositoryData.UNKNOWN_REPO_GEN, RepositoryData.EMPTY_REPO_GEN); + this(name, RepositoryData.MISSING_UUID, type, settings); + } + + public RepositoryMetadata(String name, String uuid, String type, Settings settings) { + this(name, uuid, type, settings, RepositoryData.UNKNOWN_REPO_GEN, RepositoryData.EMPTY_REPO_GEN); } public RepositoryMetadata(RepositoryMetadata metadata, long generation, long pendingGeneration) { diff --git a/server/src/main/java/org/elasticsearch/repositories/RepositoriesService.java b/server/src/main/java/org/elasticsearch/repositories/RepositoriesService.java index fc39d2d2d80a4..623401220ec68 100644 --- a/server/src/main/java/org/elasticsearch/repositories/RepositoriesService.java +++ b/server/src/main/java/org/elasticsearch/repositories/RepositoriesService.java @@ -283,12 +283,22 @@ public RegisterRepositoryTask(final RepositoriesService repositoriesService, fin @Override public ClusterState execute(ClusterState currentState) { - RepositoryMetadata newRepositoryMetadata = new RepositoryMetadata(request.name(), request.type(), request.settings()); Metadata.Builder mdBuilder = Metadata.builder(currentState.metadata()); RepositoriesMetadata repositories = RepositoriesMetadata.get(currentState); List repositoriesMetadata = new ArrayList<>(repositories.repositories().size() + 1); for (RepositoryMetadata repositoryMetadata : repositories.repositories()) { - if (repositoryMetadata.name().equals(newRepositoryMetadata.name())) { + if (repositoryMetadata.name().equals(request.name())) { + final RepositoryMetadata newRepositoryMetadata = new RepositoryMetadata( + request.name(), + // Copy the UUID from the existing instance rather than resetting it back to MISSING_UUID which would force us to + // re-read the RepositoryData to get it again. In principle the new RepositoryMetadata might point to a different + // underlying repository at this point, but if so that'll cause things to fail in clear ways and eventually (before + // writing anything) we'll read the RepositoryData again and update the UUID in the RepositoryMetadata to match. See + // also #109936. + repositoryMetadata.uuid(), + request.type(), + request.settings() + ); Repository existing = repositoriesService.repositories.get(request.name()); if (existing == null) { existing = repositoriesService.internalRepositories.get(request.name()); diff --git a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java index 5df7a8ea20f54..240d41ea589a7 100644 --- a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java +++ b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java @@ -200,6 +200,8 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp public static final String STATELESS_SHARD_WRITE_THREAD_NAME = "stateless_shard_write"; public static final String STATELESS_CLUSTER_STATE_READ_WRITE_THREAD_NAME = "stateless_cluster_state"; public static final String STATELESS_SHARD_PREWARMING_THREAD_NAME = "stateless_prewarm"; + public static final String SEARCHABLE_SNAPSHOTS_CACHE_FETCH_ASYNC_THREAD_NAME = "searchable_snapshots_cache_fetch_async"; + public static final String SEARCHABLE_SNAPSHOTS_CACHE_PREWARMING_THREAD_NAME = "searchable_snapshots_cache_prewarming"; /** * Prefix for the name of the root {@link RepositoryData} blob. @@ -2183,7 +2185,9 @@ private void assertSnapshotOrStatelessPermittedThreadPool() { STATELESS_TRANSLOG_THREAD_NAME, STATELESS_SHARD_WRITE_THREAD_NAME, STATELESS_CLUSTER_STATE_READ_WRITE_THREAD_NAME, - STATELESS_SHARD_PREWARMING_THREAD_NAME + STATELESS_SHARD_PREWARMING_THREAD_NAME, + SEARCHABLE_SNAPSHOTS_CACHE_FETCH_ASYNC_THREAD_NAME, + SEARCHABLE_SNAPSHOTS_CACHE_PREWARMING_THREAD_NAME ); } diff --git a/x-pack/plugin/searchable-snapshots/qa/s3/src/javaRestTest/java/org/elasticsearch/xpack/searchablesnapshots/s3/S3SearchableSnapshotsCredentialsReloadIT.java b/x-pack/plugin/searchable-snapshots/qa/s3/src/javaRestTest/java/org/elasticsearch/xpack/searchablesnapshots/s3/S3SearchableSnapshotsCredentialsReloadIT.java new file mode 100644 index 0000000000000..3049fe830e728 --- /dev/null +++ b/x-pack/plugin/searchable-snapshots/qa/s3/src/javaRestTest/java/org/elasticsearch/xpack/searchablesnapshots/s3/S3SearchableSnapshotsCredentialsReloadIT.java @@ -0,0 +1,281 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.searchablesnapshots.s3; + +import fixture.s3.S3HttpFixture; +import io.netty.handler.codec.http.HttpMethod; + +import org.apache.http.client.methods.HttpPut; +import org.apache.http.entity.ByteArrayEntity; +import org.apache.http.entity.ContentType; +import org.elasticsearch.client.Request; +import org.elasticsearch.client.RequestOptions; +import org.elasticsearch.client.ResponseException; +import org.elasticsearch.client.WarningsHandler; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.core.Nullable; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.test.cluster.ElasticsearchCluster; +import org.elasticsearch.test.cluster.MutableSettingsProvider; +import org.elasticsearch.test.cluster.local.distribution.DistributionType; +import org.elasticsearch.test.rest.ESRestTestCase; +import org.elasticsearch.test.rest.ObjectPath; +import org.elasticsearch.xcontent.XContentBuilder; +import org.elasticsearch.xcontent.XContentType; +import org.junit.Before; +import org.junit.ClassRule; +import org.junit.rules.RuleChain; +import org.junit.rules.TestRule; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.util.function.UnaryOperator; + +import static org.hamcrest.CoreMatchers.containsString; +import static org.hamcrest.Matchers.allOf; + +public class S3SearchableSnapshotsCredentialsReloadIT extends ESRestTestCase { + + private static final String BUCKET = "S3SearchableSnapshotsCredentialsReloadIT-bucket"; + private static final String BASE_PATH = "S3SearchableSnapshotsCredentialsReloadIT-base-path"; + + public static final S3HttpFixture s3Fixture = new S3HttpFixture(true, BUCKET, BASE_PATH, "ignored"); + + private static final MutableSettingsProvider keystoreSettings = new MutableSettingsProvider(); + + public static ElasticsearchCluster cluster = ElasticsearchCluster.local() + .distribution(DistributionType.DEFAULT) + .setting("xpack.license.self_generated.type", "trial") + .keystore(keystoreSettings) + .setting("xpack.searchable.snapshot.shared_cache.size", "4kB") + .setting("xpack.searchable.snapshot.shared_cache.region_size", "4kB") + .setting("xpack.searchable_snapshots.cache_fetch_async_thread_pool.keep_alive", "0ms") + .setting("xpack.security.enabled", "false") + .systemProperty("es.allow_insecure_settings", "true") + .build(); + + @ClassRule + public static TestRule ruleChain = RuleChain.outerRule(s3Fixture).around(cluster); + + @Override + protected String getTestRestCluster() { + return cluster.getHttpAddresses(); + } + + @Before + public void skipFips() { + assumeFalse("getting these tests to run in a FIPS JVM is kinda fiddly and we don't really need the extra coverage", inFipsJvm()); + } + + public void testReloadCredentialsFromKeystore() throws IOException { + final TestHarness testHarness = new TestHarness(); + testHarness.putRepository(); + + // Set up initial credentials + final String accessKey1 = randomIdentifier(); + s3Fixture.setAccessKey(accessKey1); + keystoreSettings.put("s3.client.default.access_key", accessKey1); + keystoreSettings.put("s3.client.default.secret_key", randomIdentifier()); + cluster.updateStoredSecureSettings(); + assertOK(client().performRequest(new Request("POST", "/_nodes/reload_secure_settings"))); + + testHarness.createFrozenSearchableSnapshotIndex(); + + // Verify searchable snapshot functionality + testHarness.ensureSearchSuccess(); + + // Rotate credentials in blob store + logger.info("--> rotate credentials"); + final String accessKey2 = randomValueOtherThan(accessKey1, ESTestCase::randomIdentifier); + s3Fixture.setAccessKey(accessKey2); + + // Ensure searchable snapshot now does not work due to invalid credentials + logger.info("--> expect failure"); + testHarness.ensureSearchFailure(); + + // Set up refreshed credentials + logger.info("--> update keystore contents"); + keystoreSettings.put("s3.client.default.access_key", accessKey2); + cluster.updateStoredSecureSettings(); + assertOK(client().performRequest(new Request("POST", "/_nodes/reload_secure_settings"))); + + // Check access using refreshed credentials + logger.info("--> expect success"); + testHarness.ensureSearchSuccess(); + } + + public void testReloadCredentialsFromAlternativeClient() throws IOException { + final TestHarness testHarness = new TestHarness(); + testHarness.putRepository(); + + // Set up credentials + final String accessKey1 = randomIdentifier(); + final String accessKey2 = randomValueOtherThan(accessKey1, ESTestCase::randomIdentifier); + final String alternativeClient = randomValueOtherThan("default", ESTestCase::randomIdentifier); + + s3Fixture.setAccessKey(accessKey1); + keystoreSettings.put("s3.client.default.access_key", accessKey1); + keystoreSettings.put("s3.client.default.secret_key", randomIdentifier()); + keystoreSettings.put("s3.client." + alternativeClient + ".access_key", accessKey2); + keystoreSettings.put("s3.client." + alternativeClient + ".secret_key", randomIdentifier()); + cluster.updateStoredSecureSettings(); + assertOK(client().performRequest(new Request("POST", "/_nodes/reload_secure_settings"))); + + testHarness.createFrozenSearchableSnapshotIndex(); + + // Verify searchable snapshot functionality + testHarness.ensureSearchSuccess(); + + // Rotate credentials in blob store + logger.info("--> rotate credentials"); + s3Fixture.setAccessKey(accessKey2); + + // Ensure searchable snapshot now does not work due to invalid credentials + logger.info("--> expect failure"); + testHarness.ensureSearchFailure(); + + // Adjust repository to use new client + logger.info("--> update repository metadata"); + testHarness.putRepository(b -> b.put("client", alternativeClient)); + + // Check access using refreshed credentials + logger.info("--> expect success"); + testHarness.ensureSearchSuccess(); + } + + public void testReloadCredentialsFromMetadata() throws IOException { + final TestHarness testHarness = new TestHarness(); + testHarness.warningsHandler = WarningsHandler.PERMISSIVE; + + // Set up credentials + final String accessKey1 = randomIdentifier(); + final String accessKey2 = randomValueOtherThan(accessKey1, ESTestCase::randomIdentifier); + + testHarness.putRepository(b -> b.put("access_key", accessKey1).put("secret_key", randomIdentifier())); + s3Fixture.setAccessKey(accessKey1); + + testHarness.createFrozenSearchableSnapshotIndex(); + + // Verify searchable snapshot functionality + testHarness.ensureSearchSuccess(); + + // Rotate credentials in blob store + logger.info("--> rotate credentials"); + s3Fixture.setAccessKey(accessKey2); + + // Ensure searchable snapshot now does not work due to invalid credentials + logger.info("--> expect failure"); + testHarness.ensureSearchFailure(); + + // Adjust repository to use new client + logger.info("--> update repository metadata"); + testHarness.putRepository(b -> b.put("access_key", accessKey2).put("secret_key", randomIdentifier())); + + // Check access using refreshed credentials + logger.info("--> expect success"); + testHarness.ensureSearchSuccess(); + } + + private class TestHarness { + private final String mountedIndexName = randomIdentifier(); + private final String repositoryName = randomIdentifier(); + + @Nullable // to use the default + WarningsHandler warningsHandler; + + void putRepository() throws IOException { + putRepository(UnaryOperator.identity()); + } + + void putRepository(UnaryOperator settingsOperator) throws IOException { + // Register repository + final Request request = newXContentRequest( + HttpMethod.PUT, + "/_snapshot/" + repositoryName, + (b, p) -> b.field("type", "s3") + .startObject("settings") + .value( + settingsOperator.apply( + Settings.builder().put("bucket", BUCKET).put("base_path", BASE_PATH).put("endpoint", s3Fixture.getAddress()) + ).build() + ) + .endObject() + ); + request.addParameter("verify", "false"); // because we don't have access to the blob store yet + request.setOptions(RequestOptions.DEFAULT.toBuilder().setWarningsHandler(warningsHandler)); + assertOK(client().performRequest(request)); + } + + void createFrozenSearchableSnapshotIndex() throws IOException { + // Create an index, large enough that its data is not all captured in the file headers + final String indexName = randomValueOtherThan(mountedIndexName, ESTestCase::randomIdentifier); + createIndex(indexName, indexSettings(1, 0).build()); + try (var bodyStream = new ByteArrayOutputStream()) { + for (int i = 0; i < 1024; i++) { + try (XContentBuilder bodyLineBuilder = new XContentBuilder(XContentType.JSON.xContent(), bodyStream)) { + bodyLineBuilder.startObject().startObject("index").endObject().endObject(); + } + bodyStream.write(0x0a); + try (XContentBuilder bodyLineBuilder = new XContentBuilder(XContentType.JSON.xContent(), bodyStream)) { + bodyLineBuilder.startObject().field("foo", "bar").endObject(); + } + bodyStream.write(0x0a); + } + bodyStream.flush(); + final Request request = new Request("PUT", indexName + "/_bulk"); + request.setEntity(new ByteArrayEntity(bodyStream.toByteArray(), ContentType.APPLICATION_JSON)); + client().performRequest(request); + } + + // Take a snapshot and delete the original index + final String snapshotName = randomIdentifier(); + final Request createSnapshotRequest = new Request(HttpPut.METHOD_NAME, "_snapshot/" + repositoryName + '/' + snapshotName); + createSnapshotRequest.addParameter("wait_for_completion", "true"); + createSnapshotRequest.setOptions(RequestOptions.DEFAULT.toBuilder().setWarningsHandler(warningsHandler)); + assertOK(client().performRequest(createSnapshotRequest)); + + deleteIndex(indexName); + + // Mount the snapshotted index as a searchable snapshot + final Request mountRequest = newXContentRequest( + HttpMethod.POST, + "/_snapshot/" + repositoryName + "/" + snapshotName + "/_mount", + (b, p) -> b.field("index", indexName).field("renamed_index", mountedIndexName) + ); + mountRequest.addParameter("wait_for_completion", "true"); + mountRequest.addParameter("storage", "shared_cache"); + assertOK(client().performRequest(mountRequest)); + ensureGreen(mountedIndexName); + } + + void ensureSearchSuccess() throws IOException { + final Request searchRequest = new Request("GET", mountedIndexName + "/_search"); + searchRequest.addParameter("size", "10000"); + assertEquals( + "bar", + ObjectPath.createFromResponse(assertOK(client().performRequest(searchRequest))).evaluate("hits.hits.0._source.foo") + ); + } + + void ensureSearchFailure() throws IOException { + assertOK(client().performRequest(new Request("POST", "/_searchable_snapshots/cache/clear"))); + final Request searchRequest = new Request("GET", mountedIndexName + "/_search"); + searchRequest.addParameter("size", "10000"); + assertThat( + expectThrows(ResponseException.class, () -> client().performRequest(searchRequest)).getMessage(), + allOf( + containsString("Bad access key"), + containsString("Status Code: 403"), + containsString("Error Code: AccessDenied"), + containsString("failed to read data from cache") + ) + ); + } + } + +} diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshots.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshots.java index 4eea006b4c2f2..67768c9aadded 100644 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshots.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshots.java @@ -547,9 +547,9 @@ public Map getRecoveryStateFactories() { return Map.of(SNAPSHOT_RECOVERY_STATE_FACTORY_KEY, SearchableSnapshotRecoveryState::new); } - public static final String CACHE_FETCH_ASYNC_THREAD_POOL_NAME = "searchable_snapshots_cache_fetch_async"; + public static final String CACHE_FETCH_ASYNC_THREAD_POOL_NAME = BlobStoreRepository.SEARCHABLE_SNAPSHOTS_CACHE_FETCH_ASYNC_THREAD_NAME; public static final String CACHE_FETCH_ASYNC_THREAD_POOL_SETTING = "xpack.searchable_snapshots.cache_fetch_async_thread_pool"; - public static final String CACHE_PREWARMING_THREAD_POOL_NAME = "searchable_snapshots_cache_prewarming"; + public static final String CACHE_PREWARMING_THREAD_POOL_NAME = BlobStoreRepository.SEARCHABLE_SNAPSHOTS_CACHE_PREWARMING_THREAD_NAME; public static final String CACHE_PREWARMING_THREAD_POOL_SETTING = "xpack.searchable_snapshots.cache_prewarming_thread_pool"; public static ScalingExecutorBuilder[] executorBuilders(Settings settings) { diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/store/BlobContainerSupplier.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/store/BlobContainerSupplier.java new file mode 100644 index 0000000000000..335c8e311ace6 --- /dev/null +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/store/BlobContainerSupplier.java @@ -0,0 +1,95 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.searchablesnapshots.store; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.elasticsearch.common.blobstore.BlobContainer; +import org.elasticsearch.common.blobstore.OperationPurpose; +import org.elasticsearch.common.blobstore.support.FilterBlobContainer; +import org.elasticsearch.repositories.IndexId; +import org.elasticsearch.repositories.blobstore.BlobStoreRepository; + +import java.io.IOException; +import java.io.InputStream; +import java.util.function.Supplier; + +public class BlobContainerSupplier implements Supplier { + + private static final Logger logger = LogManager.getLogger(BlobContainerSupplier.class); + + private final Supplier repositorySupplier; + private final IndexId indexId; + private final int shardId; + + private volatile LastKnownState lastKnownState = new LastKnownState(null, null); + + public BlobContainerSupplier(Supplier repositorySupplier, IndexId indexId, int shardId) { + this.repositorySupplier = repositorySupplier; + this.indexId = indexId; + this.shardId = shardId; + } + + @Override + public BlobContainer get() { + final LastKnownState lastKnownState = this.lastKnownState; + final BlobStoreRepository currentRepository = repositorySupplier.get(); + + if (lastKnownState.blobStoreRepository() == currentRepository) { + return lastKnownState.blobContainer(); + } else { + return refreshAndGet(); + } + } + + private synchronized BlobContainer refreshAndGet() { + final BlobStoreRepository currentRepository = repositorySupplier.get(); + if (lastKnownState.blobStoreRepository() == currentRepository) { + return lastKnownState.blobContainer(); + } else { + logger.debug("creating new blob container [{}][{}][{}]", currentRepository.getMetadata().name(), indexId, shardId); + final BlobContainer newContainer = new RateLimitingBlobContainer( + currentRepository, + currentRepository.shardContainer(indexId, shardId) + ); + lastKnownState = new LastKnownState(currentRepository, newContainer); + return newContainer; + } + } + + private record LastKnownState(BlobStoreRepository blobStoreRepository, BlobContainer blobContainer) {} + + /** + * A {@link FilterBlobContainer} that uses {@link BlobStoreRepository#maybeRateLimitRestores(InputStream)} to limit the rate at which + * blobs are read from the repository. + */ + private static class RateLimitingBlobContainer extends FilterBlobContainer { + + private final BlobStoreRepository blobStoreRepository; + + RateLimitingBlobContainer(BlobStoreRepository blobStoreRepository, BlobContainer blobContainer) { + super(blobContainer); + this.blobStoreRepository = blobStoreRepository; + } + + @Override + protected BlobContainer wrapChild(BlobContainer child) { + return new RateLimitingBlobContainer(blobStoreRepository, child); + } + + @Override + public InputStream readBlob(OperationPurpose purpose, String blobName) throws IOException { + return blobStoreRepository.maybeRateLimitRestores(super.readBlob(purpose, blobName)); + } + + @Override + public InputStream readBlob(OperationPurpose purpose, String blobName, long position, long length) throws IOException { + return blobStoreRepository.maybeRateLimitRestores(super.readBlob(purpose, blobName, position, length)); + } + } +} diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/store/RepositorySupplier.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/store/RepositorySupplier.java new file mode 100644 index 0000000000000..63522ce2309a1 --- /dev/null +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/store/RepositorySupplier.java @@ -0,0 +1,83 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.searchablesnapshots.store; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.elasticsearch.core.Nullable; +import org.elasticsearch.repositories.RepositoriesService; +import org.elasticsearch.repositories.Repository; +import org.elasticsearch.repositories.RepositoryMissingException; +import org.elasticsearch.repositories.blobstore.BlobStoreRepository; +import org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots; + +import java.util.Map; +import java.util.Objects; +import java.util.function.Supplier; + +public class RepositorySupplier implements Supplier { + + private static final Logger logger = LogManager.getLogger(BlobContainerSupplier.class); + + private final RepositoriesService repositoriesService; + + private final String repositoryName; + + @Nullable // if repository specified only by name + private final String repositoryUuid; + + private volatile String repositoryNameHint; + + public RepositorySupplier(RepositoriesService repositoriesService, String repositoryName, String repositoryUuid) { + this.repositoriesService = Objects.requireNonNull(repositoriesService); + this.repositoryName = Objects.requireNonNull(repositoryName); + this.repositoryUuid = repositoryUuid; + this.repositoryNameHint = repositoryName; + } + + @Override + public BlobStoreRepository get() { + return SearchableSnapshots.getSearchableRepository(getRepository()); + } + + private Repository getRepository() { + if (repositoryUuid == null) { + // repository containing pre-7.12 snapshots has no UUID so we assume it matches by name + final Repository repository = repositoriesService.repository(repositoryName); + assert repository.getMetadata().name().equals(repositoryName) : repository.getMetadata().name() + " vs " + repositoryName; + return repository; + } + + final Map repositoriesByName = repositoriesService.getRepositories(); + + final String currentRepositoryNameHint = repositoryNameHint; + final Repository repositoryByLastKnownName = repositoriesByName.get(currentRepositoryNameHint); + if (repositoryByLastKnownName != null) { + final var foundRepositoryUuid = repositoryByLastKnownName.getMetadata().uuid(); + if (Objects.equals(repositoryUuid, foundRepositoryUuid)) { + return repositoryByLastKnownName; + } + } + + for (final Repository repository : repositoriesByName.values()) { + if (repository.getMetadata().uuid().equals(repositoryUuid)) { + final var newRepositoryName = repository.getMetadata().name(); + logger.debug( + "getRepository: repository [{}] with uuid [{}] replacing repository [{}]", + newRepositoryName, + repositoryUuid, + currentRepositoryNameHint + ); + repositoryNameHint = repository.getMetadata().name(); + return repository; + } + } + + throw new RepositoryMissingException("uuid [" + repositoryUuid + "], original name [" + repositoryName + "]"); + } +} diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/store/SearchableSnapshotDirectory.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/store/SearchableSnapshotDirectory.java index b56cd28e9dc6c..bbdf371e1ed7b 100644 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/store/SearchableSnapshotDirectory.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/store/SearchableSnapshotDirectory.java @@ -24,8 +24,6 @@ import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.routing.RecoverySource; import org.elasticsearch.common.blobstore.BlobContainer; -import org.elasticsearch.common.blobstore.OperationPurpose; -import org.elasticsearch.common.blobstore.support.FilterBlobContainer; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.lucene.store.ByteArrayIndexInput; import org.elasticsearch.common.settings.Settings; @@ -43,8 +41,6 @@ import org.elasticsearch.indices.recovery.RecoveryState; import org.elasticsearch.repositories.IndexId; import org.elasticsearch.repositories.RepositoriesService; -import org.elasticsearch.repositories.Repository; -import org.elasticsearch.repositories.RepositoryMissingException; import org.elasticsearch.repositories.blobstore.BlobStoreRepository; import org.elasticsearch.snapshots.SnapshotId; import org.elasticsearch.threadpool.ThreadPool; @@ -62,7 +58,6 @@ import java.io.FileNotFoundException; import java.io.IOException; -import java.io.InputStream; import java.io.UncheckedIOException; import java.nio.file.Files; import java.nio.file.Path; @@ -134,7 +129,6 @@ public class SearchableSnapshotDirectory extends BaseDirectory { // volatile fields are updated once under `this` lock, all together, iff loaded is not true. private volatile BlobStoreIndexShardSnapshot snapshot; - private volatile BlobContainer blobContainer; private volatile boolean loaded; private volatile SearchableSnapshotRecoveryState recoveryState; @@ -182,7 +176,6 @@ public SearchableSnapshotDirectory( private synchronized boolean invariant() { assert loaded != (snapshot == null); - assert loaded != (blobContainer == null); assert loaded != (recoveryState == null); return true; } @@ -212,7 +205,6 @@ public boolean loadSnapshot( synchronized (this) { alreadyLoaded = this.loaded; if (alreadyLoaded == false) { - this.blobContainer = blobContainerSupplier.get(); this.snapshot = snapshotSupplier.get(); this.loaded = true; cleanExistingRegularShardFiles(); @@ -226,14 +218,12 @@ public boolean loadSnapshot( return alreadyLoaded == false; } - @Nullable public BlobContainer blobContainer() { - final BlobContainer blobContainer = this.blobContainer; + final BlobContainer blobContainer = blobContainerSupplier.get(); assert blobContainer != null; return blobContainer; } - @Nullable public BlobStoreIndexShardSnapshot snapshot() { final BlobStoreIndexShardSnapshot snapshot = this.snapshot; assert snapshot != null; @@ -590,23 +580,15 @@ public static Directory create( ); } - Repository repository; - final String repositoryName; - if (SNAPSHOT_REPOSITORY_UUID_SETTING.exists(indexSettings.getSettings())) { - repository = repositoryByUuid( - repositories.getRepositories(), - SNAPSHOT_REPOSITORY_UUID_SETTING.get(indexSettings.getSettings()), - SNAPSHOT_REPOSITORY_NAME_SETTING.get(indexSettings.getSettings()) - ); - repositoryName = repository.getMetadata().name(); - } else { - // repository containing pre-7.12 snapshots has no UUID so we assume it matches by name - repositoryName = SNAPSHOT_REPOSITORY_NAME_SETTING.get(indexSettings.getSettings()); - repository = repositories.repository(repositoryName); - assert repository.getMetadata().name().equals(repositoryName) : repository.getMetadata().name() + " vs " + repositoryName; - } + final Supplier repositorySupplier = new RepositorySupplier( + repositories, + SNAPSHOT_REPOSITORY_NAME_SETTING.get(indexSettings.getSettings()), + SNAPSHOT_REPOSITORY_UUID_SETTING.exists(indexSettings.getSettings()) + ? SNAPSHOT_REPOSITORY_UUID_SETTING.get(indexSettings.getSettings()) + : null + ); - final BlobStoreRepository blobStoreRepository = SearchableSnapshots.getSearchableRepository(repository); + final BlobStoreRepository initialRepository = repositorySupplier.get(); final IndexId indexId = new IndexId( SNAPSHOT_INDEX_NAME_SETTING.get(indexSettings.getSettings()), @@ -617,14 +599,14 @@ public static Directory create( SNAPSHOT_SNAPSHOT_ID_SETTING.get(indexSettings.getSettings()) ); - final LazyInitializable lazyBlobContainer = new LazyInitializable<>( - () -> new RateLimitingBlobContainer( - blobStoreRepository, - blobStoreRepository.shardContainer(indexId, shardPath.getShardId().id()) - ) + final Supplier blobContainerSupplier = new BlobContainerSupplier( + repositorySupplier, + indexId, + shardPath.getShardId().id() ); + final LazyInitializable lazySnapshot = new LazyInitializable<>( - () -> blobStoreRepository.loadShardSnapshot(lazyBlobContainer.getOrCompute(), snapshotId) + () -> repositorySupplier.get().loadShardSnapshot(blobContainerSupplier.get(), snapshotId) ); final Path cacheDir = CacheService.getShardCachePath(shardPath).resolve(snapshotId.getUUID()); @@ -632,10 +614,10 @@ public static Directory create( return new InMemoryNoOpCommitDirectory( new SearchableSnapshotDirectory( - lazyBlobContainer::getOrCompute, + blobContainerSupplier, lazySnapshot::getOrCompute, blobStoreCacheService, - repositoryName, + initialRepository.getMetadata().name(), snapshotId, indexId, shardPath.getShardId(), @@ -690,42 +672,4 @@ public void putCachedBlob(String name, ByteRange range, BytesReference content, public SharedBlobCacheService.CacheFile getFrozenCacheFile(String fileName, long length) { return sharedBlobCacheService.getCacheFile(createCacheKey(fileName), length); } - - private static Repository repositoryByUuid(Map repositories, String repositoryUuid, String originalName) { - for (Repository repository : repositories.values()) { - if (repository.getMetadata().uuid().equals(repositoryUuid)) { - return repository; - } - } - throw new RepositoryMissingException("uuid [" + repositoryUuid + "], original name [" + originalName + "]"); - } - - /** - * A {@link FilterBlobContainer} that uses {@link BlobStoreRepository#maybeRateLimitRestores(InputStream)} to limit the rate at which - * blobs are read from the repository. - */ - private static class RateLimitingBlobContainer extends FilterBlobContainer { - - private final BlobStoreRepository blobStoreRepository; - - RateLimitingBlobContainer(BlobStoreRepository blobStoreRepository, BlobContainer blobContainer) { - super(blobContainer); - this.blobStoreRepository = blobStoreRepository; - } - - @Override - protected BlobContainer wrapChild(BlobContainer child) { - return new RateLimitingBlobContainer(blobStoreRepository, child); - } - - @Override - public InputStream readBlob(OperationPurpose purpose, String blobName) throws IOException { - return blobStoreRepository.maybeRateLimitRestores(super.readBlob(purpose, blobName)); - } - - @Override - public InputStream readBlob(OperationPurpose purpose, String blobName, long position, long length) throws IOException { - return blobStoreRepository.maybeRateLimitRestores(super.readBlob(purpose, blobName, position, length)); - } - } } From c7ca1310c24d558974aca58490d01f21c2a70645 Mon Sep 17 00:00:00 2001 From: David Turner Date: Thu, 14 Nov 2024 10:01:36 +0000 Subject: [PATCH 2/2] Add end-to-end test for reloading S3 credentials We don't seem to have a test that completely verifies that a S3 repository can reload credentials from an updated keystore. This commit adds such a test. Backport of #116762 to 8.16. --- modules/repository-s3/build.gradle | 5 + .../repositories/s3/RepositoryS3RestIT.java | 97 +++++++++++++++++++ .../main/java/fixture/s3/S3HttpFixture.java | 8 +- 3 files changed, 108 insertions(+), 2 deletions(-) create mode 100644 modules/repository-s3/src/javaRestTest/java/org/elasticsearch/repositories/s3/RepositoryS3RestIT.java diff --git a/modules/repository-s3/build.gradle b/modules/repository-s3/build.gradle index 346a458a65f85..59dfa6b9aace2 100644 --- a/modules/repository-s3/build.gradle +++ b/modules/repository-s3/build.gradle @@ -12,6 +12,7 @@ import org.elasticsearch.gradle.internal.test.InternalClusterTestPlugin */ apply plugin: 'elasticsearch.internal-yaml-rest-test' apply plugin: 'elasticsearch.internal-cluster-test' +apply plugin: 'elasticsearch.internal-java-rest-test' esplugin { description 'The S3 repository plugin adds S3 repositories' @@ -48,6 +49,10 @@ dependencies { yamlRestTestImplementation project(':test:fixtures:minio-fixture') internalClusterTestImplementation project(':test:fixtures:minio-fixture') + javaRestTestImplementation project(":test:framework") + javaRestTestImplementation project(':test:fixtures:s3-fixture') + javaRestTestImplementation project(':modules:repository-s3') + yamlRestTestRuntimeOnly "org.slf4j:slf4j-simple:${versions.slf4j}" internalClusterTestRuntimeOnly "org.slf4j:slf4j-simple:${versions.slf4j}" } diff --git a/modules/repository-s3/src/javaRestTest/java/org/elasticsearch/repositories/s3/RepositoryS3RestIT.java b/modules/repository-s3/src/javaRestTest/java/org/elasticsearch/repositories/s3/RepositoryS3RestIT.java new file mode 100644 index 0000000000000..2de85f657664a --- /dev/null +++ b/modules/repository-s3/src/javaRestTest/java/org/elasticsearch/repositories/s3/RepositoryS3RestIT.java @@ -0,0 +1,97 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.repositories.s3; + +import fixture.s3.S3HttpFixture; + +import org.elasticsearch.client.Request; +import org.elasticsearch.client.ResponseException; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.test.cluster.ElasticsearchCluster; +import org.elasticsearch.test.cluster.MutableSettingsProvider; +import org.elasticsearch.test.rest.ESRestTestCase; +import org.junit.ClassRule; +import org.junit.rules.RuleChain; +import org.junit.rules.TestRule; + +import java.io.IOException; + +import static org.hamcrest.CoreMatchers.containsString; +import static org.hamcrest.Matchers.allOf; +import static org.hamcrest.Matchers.equalTo; + +public class RepositoryS3RestIT extends ESRestTestCase { + + private static final String BUCKET = "RepositoryS3JavaRestTest-bucket"; + private static final String BASE_PATH = "RepositoryS3JavaRestTest-base-path"; + + public static final S3HttpFixture s3Fixture = new S3HttpFixture(true, BUCKET, BASE_PATH, "ignored"); + + private static final MutableSettingsProvider keystoreSettings = new MutableSettingsProvider(); + + public static ElasticsearchCluster cluster = ElasticsearchCluster.local() + .module("repository-s3") + .keystore(keystoreSettings) + .setting("s3.client.default.endpoint", s3Fixture::getAddress) + .build(); + + @ClassRule + public static TestRule ruleChain = RuleChain.outerRule(s3Fixture).around(cluster); + + @Override + protected String getTestRestCluster() { + return cluster.getHttpAddresses(); + } + + public void testReloadCredentialsFromKeystore() throws IOException { + // Register repository (?verify=false because we don't have access to the blob store yet) + final var repositoryName = randomIdentifier(); + registerRepository( + repositoryName, + S3Repository.TYPE, + false, + Settings.builder().put("bucket", BUCKET).put("base_path", BASE_PATH).build() + ); + final var verifyRequest = new Request("POST", "/_snapshot/" + repositoryName + "/_verify"); + + // Set up initial credentials + final var accessKey1 = randomIdentifier(); + s3Fixture.setAccessKey(accessKey1); + keystoreSettings.put("s3.client.default.access_key", accessKey1); + keystoreSettings.put("s3.client.default.secret_key", randomIdentifier()); + cluster.updateStoredSecureSettings(); + assertOK(client().performRequest(new Request("POST", "/_nodes/reload_secure_settings"))); + + // Check access using initial credentials + assertOK(client().performRequest(verifyRequest)); + + // Rotate credentials in blob store + final var accessKey2 = randomValueOtherThan(accessKey1, ESTestCase::randomIdentifier); + s3Fixture.setAccessKey(accessKey2); + + // Ensure that initial credentials now invalid + final var accessDeniedException2 = expectThrows(ResponseException.class, () -> client().performRequest(verifyRequest)); + assertThat(accessDeniedException2.getResponse().getStatusLine().getStatusCode(), equalTo(500)); + assertThat( + accessDeniedException2.getMessage(), + allOf(containsString("Bad access key"), containsString("Status Code: 403"), containsString("Error Code: AccessDenied")) + ); + + // Set up refreshed credentials + keystoreSettings.put("s3.client.default.access_key", accessKey2); + cluster.updateStoredSecureSettings(); + assertOK(client().performRequest(new Request("POST", "/_nodes/reload_secure_settings"))); + + // Check access using refreshed credentials + assertOK(client().performRequest(verifyRequest)); + } + +} diff --git a/test/fixtures/s3-fixture/src/main/java/fixture/s3/S3HttpFixture.java b/test/fixtures/s3-fixture/src/main/java/fixture/s3/S3HttpFixture.java index 29123d6f2e7eb..421478a53e6bc 100644 --- a/test/fixtures/s3-fixture/src/main/java/fixture/s3/S3HttpFixture.java +++ b/test/fixtures/s3-fixture/src/main/java/fixture/s3/S3HttpFixture.java @@ -26,10 +26,10 @@ public class S3HttpFixture extends ExternalResource { private HttpServer server; - private boolean enabled; + private final boolean enabled; private final String bucket; private final String basePath; - protected final String accessKey; + protected volatile String accessKey; public S3HttpFixture() { this(true); @@ -98,4 +98,8 @@ private static InetSocketAddress resolveAddress(String address, int port) { throw new RuntimeException(e); } } + + public void setAccessKey(String accessKey) { + this.accessKey = accessKey; + } }