From aad33121d8ac0162b926a66e30eec517566d5d8d Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Fri, 26 Apr 2019 15:36:09 +0200 Subject: [PATCH] Async Snapshot Repository Deletes (#40144) (#41571) Motivated by slow snapshot deletes reported in e.g. #39656 and the fact that these likely are a contributing factor to repositories accumulating stale files over time when deletes fail to finish in time and are interrupted before they can complete. * Makes snapshot deletion async and parallelizes some steps of the delete process that can be safely run concurrently via the snapshot thread poll * I did not take the biggest potential speedup step here and parallelize the shard file deletion because that's probably better handled by moving to bulk deletes where possible (and can still be parallelized via the snapshot pool where it isn't). Also, I wanted to keep the size of the PR manageable. * See https://github.com/elastic/elasticsearch/pull/39656#issuecomment-470492106 * Also, as a side effect this gives the `SnapshotResiliencyTests` a little more coverage for master failover scenarios (since parallel access to a blob store repository during deletes is now possible since a delete isn't a single task anymore). * By adding a `ThreadPool` reference to the repository this also lays the groundwork to parallelizing shard snapshot uploads to improve the situation reported in #39657 --- .../repository/url/URLRepositoryPlugin.java | 7 +- .../repositories/url/URLRepository.java | 5 +- .../repositories/url/URLRepositoryTests.java | 4 +- .../repositories/azure/AzureRepository.java | 5 +- .../azure/AzureRepositoryPlugin.java | 7 +- .../azure/AzureRepositorySettingsTests.java | 4 +- .../gcs/GoogleCloudStoragePlugin.java | 7 +- .../gcs/GoogleCloudStorageRepository.java | 8 +- .../repositories/hdfs/HdfsPlugin.java | 6 +- .../repositories/hdfs/HdfsRepository.java | 5 +- .../repositories/s3/S3Repository.java | 5 +- .../repositories/s3/S3RepositoryPlugin.java | 11 +- .../s3/RepositoryCredentialsTests.java | 21 ++- .../s3/S3BlobStoreRepositoryTests.java | 8 +- .../repositories/s3/S3RepositoryTests.java | 4 +- .../create/TransportCreateSnapshotAction.java | 4 +- .../TransportRestoreSnapshotAction.java | 4 +- .../plugins/RepositoryPlugin.java | 7 +- .../repositories/FilterRepository.java | 5 +- .../repositories/RepositoriesModule.java | 6 +- .../repositories/Repository.java | 4 +- .../blobstore/BlobStoreFormat.java | 4 +- .../blobstore/BlobStoreRepository.java | 178 ++++++++---------- .../repositories/fs/FsRepository.java | 7 +- .../snapshots/SnapshotsService.java | 17 +- .../repositories/RepositoriesModuleTests.java | 31 +-- .../RepositoriesServiceTests.java | 5 +- .../BlobStoreRepositoryRestoreTests.java | 2 +- .../blobstore/BlobStoreRepositoryTests.java | 10 +- ...etadataLoadingDuringSnapshotRestoreIT.java | 11 +- .../snapshots/SnapshotResiliencyTests.java | 2 +- .../snapshots/mockstore/MockRepository.java | 10 +- .../index/shard/RestoreOnlyRepository.java | 4 +- .../java/org/elasticsearch/xpack/ccr/Ccr.java | 7 +- .../xpack/ccr/repository/CcrRepository.java | 2 +- .../elasticsearch/xpack/core/XPackPlugin.java | 3 +- .../snapshots/SourceOnlySnapshotIT.java | 4 +- .../SourceOnlySnapshotShardTests.java | 2 +- .../core/LocalStateCompositeXPackPlugin.java | 15 +- 39 files changed, 248 insertions(+), 203 deletions(-) diff --git a/modules/repository-url/src/main/java/org/elasticsearch/plugin/repository/url/URLRepositoryPlugin.java b/modules/repository-url/src/main/java/org/elasticsearch/plugin/repository/url/URLRepositoryPlugin.java index a28413d213a97..6e88f0e0deb54 100644 --- a/modules/repository-url/src/main/java/org/elasticsearch/plugin/repository/url/URLRepositoryPlugin.java +++ b/modules/repository-url/src/main/java/org/elasticsearch/plugin/repository/url/URLRepositoryPlugin.java @@ -26,6 +26,7 @@ import org.elasticsearch.plugins.RepositoryPlugin; import org.elasticsearch.repositories.Repository; import org.elasticsearch.repositories.url.URLRepository; +import org.elasticsearch.threadpool.ThreadPool; import java.util.Arrays; import java.util.Collections; @@ -44,7 +45,9 @@ public List> getSettings() { } @Override - public Map getRepositories(Environment env, NamedXContentRegistry namedXContentRegistry) { - return Collections.singletonMap(URLRepository.TYPE, metadata -> new URLRepository(metadata, env, namedXContentRegistry)); + public Map getRepositories(Environment env, NamedXContentRegistry namedXContentRegistry, + ThreadPool threadPool) { + return Collections.singletonMap(URLRepository.TYPE, + metadata -> new URLRepository(metadata, env, namedXContentRegistry, threadPool)); } } diff --git a/modules/repository-url/src/main/java/org/elasticsearch/repositories/url/URLRepository.java b/modules/repository-url/src/main/java/org/elasticsearch/repositories/url/URLRepository.java index d314ce912ef66..29582f1f871aa 100644 --- a/modules/repository-url/src/main/java/org/elasticsearch/repositories/url/URLRepository.java +++ b/modules/repository-url/src/main/java/org/elasticsearch/repositories/url/URLRepository.java @@ -33,6 +33,7 @@ import org.elasticsearch.env.Environment; import org.elasticsearch.repositories.RepositoryException; import org.elasticsearch.repositories.blobstore.BlobStoreRepository; +import org.elasticsearch.threadpool.ThreadPool; import java.net.MalformedURLException; import java.net.URISyntaxException; @@ -82,8 +83,8 @@ public class URLRepository extends BlobStoreRepository { * Constructs a read-only URL-based repository */ public URLRepository(RepositoryMetaData metadata, Environment environment, - NamedXContentRegistry namedXContentRegistry) { - super(metadata, environment.settings(), false, namedXContentRegistry); + NamedXContentRegistry namedXContentRegistry, ThreadPool threadPool) { + super(metadata, environment.settings(), false, namedXContentRegistry, threadPool); if (URL_SETTING.exists(metadata.settings()) == false && REPOSITORIES_URL_SETTING.exists(environment.settings()) == false) { throw new RepositoryException(metadata.name(), "missing url"); diff --git a/modules/repository-url/src/test/java/org/elasticsearch/repositories/url/URLRepositoryTests.java b/modules/repository-url/src/test/java/org/elasticsearch/repositories/url/URLRepositoryTests.java index 2de4c132673db..96a82ee0b9d24 100644 --- a/modules/repository-url/src/test/java/org/elasticsearch/repositories/url/URLRepositoryTests.java +++ b/modules/repository-url/src/test/java/org/elasticsearch/repositories/url/URLRepositoryTests.java @@ -26,6 +26,7 @@ import org.elasticsearch.env.TestEnvironment; import org.elasticsearch.repositories.RepositoryException; import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.threadpool.ThreadPool; import java.io.IOException; import java.nio.file.Path; @@ -34,12 +35,13 @@ import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.CoreMatchers.not; import static org.hamcrest.CoreMatchers.nullValue; +import static org.mockito.Mockito.mock; public class URLRepositoryTests extends ESTestCase { private URLRepository createRepository(Settings baseSettings, RepositoryMetaData repositoryMetaData) { return new URLRepository(repositoryMetaData, TestEnvironment.newEnvironment(baseSettings), - new NamedXContentRegistry(Collections.emptyList())) { + new NamedXContentRegistry(Collections.emptyList()), mock(ThreadPool.class)) { @Override protected void assertSnapshotOrGenericThread() { // eliminate thread name check as we create repo manually on test/main threads diff --git a/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureRepository.java b/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureRepository.java index 078e0e698aa51..5345fb13f6d99 100644 --- a/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureRepository.java +++ b/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureRepository.java @@ -38,6 +38,7 @@ import org.elasticsearch.repositories.blobstore.BlobStoreRepository; import org.elasticsearch.snapshots.SnapshotCreationException; import org.elasticsearch.snapshots.SnapshotId; +import org.elasticsearch.threadpool.ThreadPool; import java.net.URISyntaxException; import java.util.List; @@ -86,8 +87,8 @@ public static final class Repository { private final boolean readonly; public AzureRepository(RepositoryMetaData metadata, Environment environment, NamedXContentRegistry namedXContentRegistry, - AzureStorageService storageService) { - super(metadata, environment.settings(), Repository.COMPRESS_SETTING.get(metadata.settings()), namedXContentRegistry); + AzureStorageService storageService, ThreadPool threadPool) { + super(metadata, environment.settings(), Repository.COMPRESS_SETTING.get(metadata.settings()), namedXContentRegistry, threadPool); this.chunkSize = Repository.CHUNK_SIZE_SETTING.get(metadata.settings()); this.environment = environment; this.storageService = storageService; diff --git a/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureRepositoryPlugin.java b/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureRepositoryPlugin.java index c6e8335bd5a6d..ab48cf1314ec5 100644 --- a/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureRepositoryPlugin.java +++ b/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureRepositoryPlugin.java @@ -28,6 +28,8 @@ import org.elasticsearch.plugins.ReloadablePlugin; import org.elasticsearch.plugins.RepositoryPlugin; import org.elasticsearch.repositories.Repository; +import org.elasticsearch.threadpool.ThreadPool; + import java.util.Arrays; import java.util.Collections; import java.util.List; @@ -47,9 +49,10 @@ public AzureRepositoryPlugin(Settings settings) { } @Override - public Map getRepositories(Environment env, NamedXContentRegistry namedXContentRegistry) { + public Map getRepositories(Environment env, NamedXContentRegistry namedXContentRegistry, + ThreadPool threadPool) { return Collections.singletonMap(AzureRepository.TYPE, - (metadata) -> new AzureRepository(metadata, env, namedXContentRegistry, azureStoreService)); + (metadata) -> new AzureRepository(metadata, env, namedXContentRegistry, azureStoreService, threadPool)); } @Override diff --git a/plugins/repository-azure/src/test/java/org/elasticsearch/repositories/azure/AzureRepositorySettingsTests.java b/plugins/repository-azure/src/test/java/org/elasticsearch/repositories/azure/AzureRepositorySettingsTests.java index 43891a8e9d57c..71f16b1413a01 100644 --- a/plugins/repository-azure/src/test/java/org/elasticsearch/repositories/azure/AzureRepositorySettingsTests.java +++ b/plugins/repository-azure/src/test/java/org/elasticsearch/repositories/azure/AzureRepositorySettingsTests.java @@ -28,6 +28,7 @@ import org.elasticsearch.env.Environment; import org.elasticsearch.env.TestEnvironment; import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.threadpool.ThreadPool; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.nullValue; @@ -42,7 +43,8 @@ private AzureRepository azureRepository(Settings settings) { .put(settings) .build(); final AzureRepository azureRepository = new AzureRepository(new RepositoryMetaData("foo", "azure", internalSettings), - TestEnvironment.newEnvironment(internalSettings), NamedXContentRegistry.EMPTY, mock(AzureStorageService.class)); + TestEnvironment.newEnvironment(internalSettings), NamedXContentRegistry.EMPTY, mock(AzureStorageService.class), + mock(ThreadPool.class)); assertThat(azureRepository.getBlobStore(), is(nullValue())); return azureRepository; } diff --git a/plugins/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStoragePlugin.java b/plugins/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStoragePlugin.java index 3186d2547a327..8e46b305a3350 100644 --- a/plugins/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStoragePlugin.java +++ b/plugins/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStoragePlugin.java @@ -27,6 +27,8 @@ import org.elasticsearch.plugins.ReloadablePlugin; import org.elasticsearch.plugins.RepositoryPlugin; import org.elasticsearch.repositories.Repository; +import org.elasticsearch.threadpool.ThreadPool; + import java.util.Arrays; import java.util.Collections; import java.util.List; @@ -49,9 +51,10 @@ protected GoogleCloudStorageService createStorageService() { } @Override - public Map getRepositories(Environment env, NamedXContentRegistry namedXContentRegistry) { + public Map getRepositories(Environment env, NamedXContentRegistry namedXContentRegistry, + ThreadPool threadPool) { return Collections.singletonMap(GoogleCloudStorageRepository.TYPE, - (metadata) -> new GoogleCloudStorageRepository(metadata, env, namedXContentRegistry, this.storageService)); + metadata -> new GoogleCloudStorageRepository(metadata, env, namedXContentRegistry, this.storageService, threadPool)); } @Override diff --git a/plugins/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageRepository.java b/plugins/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageRepository.java index 3192691d84389..8e39cb4b5f124 100644 --- a/plugins/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageRepository.java +++ b/plugins/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageRepository.java @@ -25,13 +25,13 @@ import org.elasticsearch.common.Strings; import org.elasticsearch.common.blobstore.BlobPath; import org.elasticsearch.common.settings.Setting; -import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.env.Environment; import org.elasticsearch.repositories.RepositoryException; import org.elasticsearch.repositories.blobstore.BlobStoreRepository; +import org.elasticsearch.threadpool.ThreadPool; import java.util.function.Function; @@ -59,7 +59,6 @@ class GoogleCloudStorageRepository extends BlobStoreRepository { byteSizeSetting("chunk_size", MAX_CHUNK_SIZE, MIN_CHUNK_SIZE, MAX_CHUNK_SIZE, Property.NodeScope, Property.Dynamic); static final Setting CLIENT_NAME = new Setting<>("client", "default", Function.identity()); - private final Settings settings; private final GoogleCloudStorageService storageService; private final BlobPath basePath; private final ByteSizeValue chunkSize; @@ -68,9 +67,8 @@ class GoogleCloudStorageRepository extends BlobStoreRepository { GoogleCloudStorageRepository(RepositoryMetaData metadata, Environment environment, NamedXContentRegistry namedXContentRegistry, - GoogleCloudStorageService storageService) { - super(metadata, environment.settings(), getSetting(COMPRESS, metadata), namedXContentRegistry); - this.settings = environment.settings(); + GoogleCloudStorageService storageService, ThreadPool threadPool) { + super(metadata, environment.settings(), getSetting(COMPRESS, metadata), namedXContentRegistry, threadPool); this.storageService = storageService; String basePath = BASE_PATH.get(metadata.settings()); diff --git a/plugins/repository-hdfs/src/main/java/org/elasticsearch/repositories/hdfs/HdfsPlugin.java b/plugins/repository-hdfs/src/main/java/org/elasticsearch/repositories/hdfs/HdfsPlugin.java index c0b3d805bcc8f..a6dc5fe7db140 100644 --- a/plugins/repository-hdfs/src/main/java/org/elasticsearch/repositories/hdfs/HdfsPlugin.java +++ b/plugins/repository-hdfs/src/main/java/org/elasticsearch/repositories/hdfs/HdfsPlugin.java @@ -36,6 +36,7 @@ import org.elasticsearch.plugins.Plugin; import org.elasticsearch.plugins.RepositoryPlugin; import org.elasticsearch.repositories.Repository; +import org.elasticsearch.threadpool.ThreadPool; public final class HdfsPlugin extends Plugin implements RepositoryPlugin { @@ -110,7 +111,8 @@ private static Void eagerInit() { } @Override - public Map getRepositories(Environment env, NamedXContentRegistry namedXContentRegistry) { - return Collections.singletonMap("hdfs", (metadata) -> new HdfsRepository(metadata, env, namedXContentRegistry)); + public Map getRepositories(Environment env, NamedXContentRegistry namedXContentRegistry, + ThreadPool threadPool) { + return Collections.singletonMap("hdfs", (metadata) -> new HdfsRepository(metadata, env, namedXContentRegistry, threadPool)); } } diff --git a/plugins/repository-hdfs/src/main/java/org/elasticsearch/repositories/hdfs/HdfsRepository.java b/plugins/repository-hdfs/src/main/java/org/elasticsearch/repositories/hdfs/HdfsRepository.java index bba1b0031c85a..d51a48cac0eff 100644 --- a/plugins/repository-hdfs/src/main/java/org/elasticsearch/repositories/hdfs/HdfsRepository.java +++ b/plugins/repository-hdfs/src/main/java/org/elasticsearch/repositories/hdfs/HdfsRepository.java @@ -40,6 +40,7 @@ import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.env.Environment; import org.elasticsearch.repositories.blobstore.BlobStoreRepository; +import org.elasticsearch.threadpool.ThreadPool; import java.io.IOException; import java.io.UncheckedIOException; @@ -67,8 +68,8 @@ public final class HdfsRepository extends BlobStoreRepository { private static final ByteSizeValue DEFAULT_BUFFER_SIZE = new ByteSizeValue(100, ByteSizeUnit.KB); public HdfsRepository(RepositoryMetaData metadata, Environment environment, - NamedXContentRegistry namedXContentRegistry) { - super(metadata, environment.settings(), metadata.settings().getAsBoolean("compress", false), namedXContentRegistry); + NamedXContentRegistry namedXContentRegistry, ThreadPool threadPool) { + super(metadata, environment.settings(), metadata.settings().getAsBoolean("compress", false), namedXContentRegistry, threadPool); this.environment = environment; this.chunkSize = metadata.settings().getAsBytesSize("chunk_size", null); diff --git a/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3Repository.java b/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3Repository.java index 189d57f83324d..e8d8c6d27ad10 100644 --- a/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3Repository.java +++ b/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3Repository.java @@ -36,6 +36,7 @@ import org.elasticsearch.monitor.jvm.JvmInfo; import org.elasticsearch.repositories.RepositoryException; import org.elasticsearch.repositories.blobstore.BlobStoreRepository; +import org.elasticsearch.threadpool.ThreadPool; import java.util.function.Function; @@ -171,8 +172,8 @@ class S3Repository extends BlobStoreRepository { S3Repository(final RepositoryMetaData metadata, final Settings settings, final NamedXContentRegistry namedXContentRegistry, - final S3Service service) { - super(metadata, settings, COMPRESS_SETTING.get(metadata.settings()), namedXContentRegistry); + final S3Service service, final ThreadPool threadPool) { + super(metadata, settings, COMPRESS_SETTING.get(metadata.settings()), namedXContentRegistry, threadPool); this.service = service; this.repositoryMetaData = metadata; diff --git a/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3RepositoryPlugin.java b/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3RepositoryPlugin.java index a2f9da5f846ef..bb044771e6085 100644 --- a/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3RepositoryPlugin.java +++ b/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3RepositoryPlugin.java @@ -30,6 +30,7 @@ import org.elasticsearch.plugins.ReloadablePlugin; import org.elasticsearch.plugins.RepositoryPlugin; import org.elasticsearch.repositories.Repository; +import org.elasticsearch.threadpool.ThreadPool; import java.io.IOException; import java.security.AccessController; @@ -77,13 +78,15 @@ public S3RepositoryPlugin(final Settings settings) { // proxy method for testing protected S3Repository createRepository(final RepositoryMetaData metadata, final Settings settings, - final NamedXContentRegistry registry) { - return new S3Repository(metadata, settings, registry, service); + final NamedXContentRegistry registry, final ThreadPool threadPool) { + return new S3Repository(metadata, settings, registry, service, threadPool); } @Override - public Map getRepositories(final Environment env, final NamedXContentRegistry registry) { - return Collections.singletonMap(S3Repository.TYPE, (metadata) -> createRepository(metadata, env.settings(), registry)); + public Map getRepositories(final Environment env, final NamedXContentRegistry registry, + final ThreadPool threadPool) { + return Collections.singletonMap(S3Repository.TYPE, + metadata -> createRepository(metadata, env.settings(), registry, threadPool)); } @Override diff --git a/plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/RepositoryCredentialsTests.java b/plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/RepositoryCredentialsTests.java index ca5893b57b2a4..89cc35ccf0cc3 100644 --- a/plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/RepositoryCredentialsTests.java +++ b/plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/RepositoryCredentialsTests.java @@ -30,12 +30,14 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.threadpool.ThreadPool; import java.io.IOException; import java.security.AccessController; import java.security.PrivilegedAction; import static org.hamcrest.Matchers.is; +import static org.mockito.Mockito.mock; @SuppressForbidden(reason = "test fixture requires System.setProperty") public class RepositoryCredentialsTests extends ESTestCase { @@ -61,9 +63,9 @@ static final class ClientAndCredentials extends AmazonS3Wrapper { } static final class ProxyS3Service extends S3Service { - + private static final Logger logger = LogManager.getLogger(ProxyS3Service.class); - + @Override AmazonS3 buildClient(final S3ClientSettings clientSettings) { final AmazonS3 client = super.buildClient(clientSettings); @@ -77,8 +79,9 @@ AmazonS3 buildClient(final S3ClientSettings clientSettings) { } @Override - protected S3Repository createRepository(RepositoryMetaData metadata, Settings settings, NamedXContentRegistry registry) { - return new S3Repository(metadata, settings, registry, service){ + protected S3Repository createRepository(RepositoryMetaData metadata, Settings settings, NamedXContentRegistry registry, + ThreadPool threadPool) { + return new S3Repository(metadata, settings, registry, service, threadPool){ @Override protected void assertSnapshotOrGenericThread() { // eliminate thread name check as we create repo manually on test/main threads @@ -106,7 +109,7 @@ public void testRepositoryCredentialsOverrideSecureCredentials() throws IOExcept .put(S3Repository.ACCESS_KEY_SETTING.getKey(), "insecure_aws_key") .put(S3Repository.SECRET_KEY_SETTING.getKey(), "insecure_aws_secret").build()); try (S3RepositoryPlugin s3Plugin = new ProxyS3RepositoryPlugin(settings); - S3Repository s3repo = createAndStartRepository(metadata, s3Plugin); + S3Repository s3repo = createAndStartRepository(metadata, s3Plugin, mock(ThreadPool.class)); AmazonS3Reference s3Ref = ((S3BlobStore) s3repo.blobStore()).clientReference()) { final AWSCredentials credentials = ((ProxyS3RepositoryPlugin.ClientAndCredentials) s3Ref.client()).credentials.getCredentials(); assertThat(credentials.getAWSAccessKeyId(), is("insecure_aws_key")); @@ -129,7 +132,7 @@ public void testRepositoryCredentialsOnly() throws IOException { .put(S3Repository.SECRET_KEY_SETTING.getKey(), "insecure_aws_secret") .build()); try (S3RepositoryPlugin s3Plugin = new ProxyS3RepositoryPlugin(Settings.EMPTY); - S3Repository s3repo = createAndStartRepository(metadata, s3Plugin); + S3Repository s3repo = createAndStartRepository(metadata, s3Plugin, mock(ThreadPool.class)); AmazonS3Reference s3Ref = ((S3BlobStore) s3repo.blobStore()).clientReference()) { final AWSCredentials credentials = ((ProxyS3RepositoryPlugin.ClientAndCredentials) s3Ref.client()).credentials.getCredentials(); assertThat(credentials.getAWSAccessKeyId(), is("insecure_aws_key")); @@ -144,8 +147,8 @@ public void testRepositoryCredentialsOnly() throws IOException { + " See the breaking changes documentation for the next major version."); } - private S3Repository createAndStartRepository(RepositoryMetaData metadata, S3RepositoryPlugin s3Plugin) { - final S3Repository repository = s3Plugin.createRepository(metadata, Settings.EMPTY, NamedXContentRegistry.EMPTY); + private S3Repository createAndStartRepository(RepositoryMetaData metadata, S3RepositoryPlugin s3Plugin, ThreadPool threadPool) { + final S3Repository repository = s3Plugin.createRepository(metadata, Settings.EMPTY, NamedXContentRegistry.EMPTY, threadPool); repository.start(); return repository; } @@ -168,7 +171,7 @@ public void testReinitSecureCredentials() throws IOException { } final RepositoryMetaData metadata = new RepositoryMetaData("dummy-repo", "mock", builder.build()); try (S3RepositoryPlugin s3Plugin = new ProxyS3RepositoryPlugin(settings); - S3Repository s3repo = createAndStartRepository(metadata, s3Plugin)) { + S3Repository s3repo = createAndStartRepository(metadata, s3Plugin, mock(ThreadPool.class))) { try (AmazonS3Reference s3Ref = ((S3BlobStore) s3repo.blobStore()).clientReference()) { final AWSCredentials credentials = ((ProxyS3RepositoryPlugin.ClientAndCredentials) s3Ref.client()).credentials .getCredentials(); diff --git a/plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobStoreRepositoryTests.java b/plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobStoreRepositoryTests.java index 739452dc178c4..61c0328e516b7 100644 --- a/plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobStoreRepositoryTests.java +++ b/plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobStoreRepositoryTests.java @@ -37,6 +37,7 @@ import org.elasticsearch.rest.RestResponse; import org.elasticsearch.rest.action.admin.cluster.RestGetRepositoriesAction; import org.elasticsearch.test.rest.FakeRestRequest; +import org.elasticsearch.threadpool.ThreadPool; import org.junit.AfterClass; import org.junit.BeforeClass; @@ -114,14 +115,15 @@ public TestS3RepositoryPlugin(final Settings settings) { } @Override - public Map getRepositories(final Environment env, final NamedXContentRegistry registry) { + public Map getRepositories(final Environment env, final NamedXContentRegistry registry, + final ThreadPool threadPool) { return Collections.singletonMap(S3Repository.TYPE, - (metadata) -> new S3Repository(metadata, env.settings(), registry, new S3Service() { + metadata -> new S3Repository(metadata, env.settings(), registry, new S3Service() { @Override AmazonS3 buildClient(S3ClientSettings clientSettings) { return new MockAmazonS3(blobs, bucket, serverSideEncryption, cannedACL, storageClass); } - })); + }, threadPool)); } } diff --git a/plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3RepositoryTests.java b/plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3RepositoryTests.java index 36fa8b684bbb9..af04c420408ad 100644 --- a/plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3RepositoryTests.java +++ b/plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3RepositoryTests.java @@ -27,6 +27,7 @@ import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.repositories.RepositoryException; import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.threadpool.ThreadPool; import org.hamcrest.Matchers; import java.util.Map; @@ -35,6 +36,7 @@ import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.not; import static org.hamcrest.Matchers.nullValue; +import static org.mockito.Mockito.mock; public class S3RepositoryTests extends ESTestCase { @@ -118,7 +120,7 @@ public void testDefaultBufferSize() { } private S3Repository createS3Repo(RepositoryMetaData metadata) { - return new S3Repository(metadata, Settings.EMPTY, NamedXContentRegistry.EMPTY, new DummyS3Service()) { + return new S3Repository(metadata, Settings.EMPTY, NamedXContentRegistry.EMPTY, new DummyS3Service(), mock(ThreadPool.class)) { @Override protected void assertSnapshotOrGenericThread() { // eliminate thread name check as we create repo manually on test/main threads diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/create/TransportCreateSnapshotAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/create/TransportCreateSnapshotAction.java index c552019e07fa9..73f9a0742a719 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/create/TransportCreateSnapshotAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/create/TransportCreateSnapshotAction.java @@ -49,7 +49,9 @@ public TransportCreateSnapshotAction(TransportService transportService, ClusterS @Override protected String executor() { - return ThreadPool.Names.SNAPSHOT; + // Using the generic instead of the snapshot threadpool here as the snapshot threadpool might be blocked on long running tasks + // which would block the request from getting an error response because of the ongoing task + return ThreadPool.Names.GENERIC; } @Override diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/restore/TransportRestoreSnapshotAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/restore/TransportRestoreSnapshotAction.java index 62b59f272c1b7..935902432d90f 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/restore/TransportRestoreSnapshotAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/restore/TransportRestoreSnapshotAction.java @@ -49,7 +49,9 @@ public TransportRestoreSnapshotAction(TransportService transportService, Cluster @Override protected String executor() { - return ThreadPool.Names.SNAPSHOT; + // Using the generic instead of the snapshot threadpool here as the snapshot threadpool might be blocked on long running tasks + // which would block the request from getting an error response because of the ongoing task + return ThreadPool.Names.GENERIC; } @Override diff --git a/server/src/main/java/org/elasticsearch/plugins/RepositoryPlugin.java b/server/src/main/java/org/elasticsearch/plugins/RepositoryPlugin.java index 5c15040609863..ede5c5e3611f9 100644 --- a/server/src/main/java/org/elasticsearch/plugins/RepositoryPlugin.java +++ b/server/src/main/java/org/elasticsearch/plugins/RepositoryPlugin.java @@ -25,6 +25,7 @@ import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.env.Environment; import org.elasticsearch.repositories.Repository; +import org.elasticsearch.threadpool.ThreadPool; /** * An extension point for {@link Plugin} implementations to add custom snapshot repositories. @@ -39,7 +40,8 @@ public interface RepositoryPlugin { * The key of the returned {@link Map} is the type name of the repository and * the value is a factory to construct the {@link Repository} interface. */ - default Map getRepositories(Environment env, NamedXContentRegistry namedXContentRegistry) { + default Map getRepositories(Environment env, NamedXContentRegistry namedXContentRegistry, + ThreadPool threadPool) { return Collections.emptyMap(); } @@ -52,7 +54,8 @@ default Map getRepositories(Environment env, NamedXC * The key of the returned {@link Map} is the type name of the repository and * the value is a factory to construct the {@link Repository} interface. */ - default Map getInternalRepositories(Environment env, NamedXContentRegistry namedXContentRegistry) { + default Map getInternalRepositories(Environment env, NamedXContentRegistry namedXContentRegistry, + ThreadPool threadPool) { return Collections.emptyMap(); } } diff --git a/server/src/main/java/org/elasticsearch/repositories/FilterRepository.java b/server/src/main/java/org/elasticsearch/repositories/FilterRepository.java index 4e8e9b6c7f569..afc38bda86c5b 100644 --- a/server/src/main/java/org/elasticsearch/repositories/FilterRepository.java +++ b/server/src/main/java/org/elasticsearch/repositories/FilterRepository.java @@ -20,6 +20,7 @@ import org.apache.lucene.index.IndexCommit; import org.elasticsearch.Version; +import org.elasticsearch.action.ActionListener; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.metadata.RepositoryMetaData; @@ -84,8 +85,8 @@ public SnapshotInfo finalizeSnapshot(SnapshotId snapshotId, List indice } @Override - public void deleteSnapshot(SnapshotId snapshotId, long repositoryStateId) { - in.deleteSnapshot(snapshotId, repositoryStateId); + public void deleteSnapshot(SnapshotId snapshotId, long repositoryStateId, ActionListener listener) { + in.deleteSnapshot(snapshotId, repositoryStateId, listener); } @Override diff --git a/server/src/main/java/org/elasticsearch/repositories/RepositoriesModule.java b/server/src/main/java/org/elasticsearch/repositories/RepositoriesModule.java index 90e3c94dfb3c5..5ea853b0b5501 100644 --- a/server/src/main/java/org/elasticsearch/repositories/RepositoriesModule.java +++ b/server/src/main/java/org/elasticsearch/repositories/RepositoriesModule.java @@ -47,10 +47,10 @@ public class RepositoriesModule extends AbstractModule { public RepositoriesModule(Environment env, List repoPlugins, TransportService transportService, ClusterService clusterService, ThreadPool threadPool, NamedXContentRegistry namedXContentRegistry) { Map factories = new HashMap<>(); - factories.put(FsRepository.TYPE, (metadata) -> new FsRepository(metadata, env, namedXContentRegistry)); + factories.put(FsRepository.TYPE, metadata -> new FsRepository(metadata, env, namedXContentRegistry, threadPool)); for (RepositoryPlugin repoPlugin : repoPlugins) { - Map newRepoTypes = repoPlugin.getRepositories(env, namedXContentRegistry); + Map newRepoTypes = repoPlugin.getRepositories(env, namedXContentRegistry, threadPool); for (Map.Entry entry : newRepoTypes.entrySet()) { if (factories.put(entry.getKey(), entry.getValue()) != null) { throw new IllegalArgumentException("Repository type [" + entry.getKey() + "] is already registered"); @@ -60,7 +60,7 @@ public RepositoriesModule(Environment env, List repoPlugins, T Map internalFactories = new HashMap<>(); for (RepositoryPlugin repoPlugin : repoPlugins) { - Map newRepoTypes = repoPlugin.getInternalRepositories(env, namedXContentRegistry); + Map newRepoTypes = repoPlugin.getInternalRepositories(env, namedXContentRegistry, threadPool); for (Map.Entry entry : newRepoTypes.entrySet()) { if (internalFactories.put(entry.getKey(), entry.getValue()) != null) { throw new IllegalArgumentException("Internal repository type [" + entry.getKey() + "] is already registered"); diff --git a/server/src/main/java/org/elasticsearch/repositories/Repository.java b/server/src/main/java/org/elasticsearch/repositories/Repository.java index 1ca6f5e148510..20f7c42cb21dd 100644 --- a/server/src/main/java/org/elasticsearch/repositories/Repository.java +++ b/server/src/main/java/org/elasticsearch/repositories/Repository.java @@ -20,6 +20,7 @@ import org.apache.lucene.index.IndexCommit; import org.elasticsearch.Version; +import org.elasticsearch.action.ActionListener; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.metadata.RepositoryMetaData; @@ -140,8 +141,9 @@ SnapshotInfo finalizeSnapshot(SnapshotId snapshotId, List indices, long * * @param snapshotId snapshot id * @param repositoryStateId the unique id identifying the state of the repository when the snapshot deletion began + * @param listener completion listener */ - void deleteSnapshot(SnapshotId snapshotId, long repositoryStateId); + void deleteSnapshot(SnapshotId snapshotId, long repositoryStateId, ActionListener listener); /** * Returns snapshot throttle time in nanoseconds diff --git a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreFormat.java b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreFormat.java index eb9dc41236d8c..dc9f8092e3fc0 100644 --- a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreFormat.java +++ b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreFormat.java @@ -100,11 +100,11 @@ public void delete(BlobContainer blobContainer, String name) throws IOException /** * Checks obj in the blob container */ - public boolean exists(BlobContainer blobContainer, String name) throws IOException { + public boolean exists(BlobContainer blobContainer, String name) { return blobContainer.blobExists(blobName(name)); } - protected String blobName(String name) { + public String blobName(String name) { return String.format(Locale.ROOT, blobNameFormat, name); } diff --git a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java index f11f4cec9291d..fdaeb17c20178 100644 --- a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java +++ b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java @@ -31,8 +31,10 @@ import org.apache.lucene.util.SetOnce; import org.elasticsearch.ElasticsearchParseException; import org.elasticsearch.ExceptionsHelper; -import org.elasticsearch.ResourceNotFoundException; import org.elasticsearch.Version; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionRunnable; +import org.elasticsearch.action.support.GroupedActionListener; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.metadata.RepositoryMetaData; @@ -103,9 +105,12 @@ import java.nio.file.FileAlreadyExistsException; import java.nio.file.NoSuchFileException; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; +import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.stream.Collectors; @@ -163,6 +168,8 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp protected final NamedXContentRegistry namedXContentRegistry; + private final ThreadPool threadPool; + private static final int BUFFER_SIZE = 4096; private static final String SNAPSHOT_PREFIX = "snap-"; @@ -225,17 +232,17 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp /** * Constructs new BlobStoreRepository - * - * @param metadata The metadata for this repository including name and settings - * @param settings Settings for the node this repository object is created on - * @param compress true if metadata and snapshot files should be compressed + * @param metadata The metadata for this repository including name and settings + * @param settings Settings for the node this repository object is created on + * @param threadPool Threadpool to run long running repository manipulations on asynchronously */ protected BlobStoreRepository(RepositoryMetaData metadata, Settings settings, boolean compress, - NamedXContentRegistry namedXContentRegistry) { + NamedXContentRegistry namedXContentRegistry, ThreadPool threadPool) { this.settings = settings; this.compress = compress; this.metadata = metadata; this.namedXContentRegistry = namedXContentRegistry; + this.threadPool = threadPool; snapshotRateLimiter = getRateLimiter(metadata.settings(), "max_snapshot_bytes_per_sec", new ByteSizeValue(40, ByteSizeUnit.MB)); restoreRateLimiter = getRateLimiter(metadata.settings(), "max_restore_bytes_per_sec", new ByteSizeValue(40, ByteSizeUnit.MB)); readOnly = metadata.settings().getAsBoolean("readonly", false); @@ -405,108 +412,98 @@ public void initializeSnapshot(SnapshotId snapshotId, List indices, Met } @Override - public void deleteSnapshot(SnapshotId snapshotId, long repositoryStateId) { + public void deleteSnapshot(SnapshotId snapshotId, long repositoryStateId, ActionListener listener) { if (isReadOnly()) { - throw new RepositoryException(metadata.name(), "cannot delete snapshot from a readonly repository"); + listener.onFailure(new RepositoryException(metadata.name(), "cannot delete snapshot from a readonly repository")); + } else { + SnapshotInfo snapshot = null; + try { + snapshot = getSnapshotInfo(snapshotId); + } catch (SnapshotMissingException ex) { + listener.onFailure(ex); + return; + } catch (IllegalStateException | SnapshotException | ElasticsearchParseException ex) { + logger.warn(() -> new ParameterizedMessage("cannot read snapshot file [{}]", snapshotId), ex); + } + // Delete snapshot from the index file, since it is the maintainer of truth of active snapshots + final RepositoryData repositoryData; + final RepositoryData updatedRepositoryData; + try { + repositoryData = getRepositoryData(); + updatedRepositoryData = repositoryData.removeSnapshot(snapshotId); + writeIndexGen(updatedRepositoryData, repositoryStateId); + } catch (Exception ex) { + listener.onFailure(new RepositoryException(metadata.name(), "failed to delete snapshot [" + snapshotId + "]", ex)); + return; + } + final SnapshotInfo finalSnapshotInfo = snapshot; + final Collection unreferencedIndices = Sets.newHashSet(repositoryData.getIndices().values()); + unreferencedIndices.removeAll(updatedRepositoryData.getIndices().values()); + try { + blobContainer().deleteBlobsIgnoringIfNotExists( + Arrays.asList(snapshotFormat.blobName(snapshotId.getUUID()), globalMetaDataFormat.blobName(snapshotId.getUUID()))); + } catch (IOException e) { + logger.warn(() -> new ParameterizedMessage("[{}] Unable to delete global metadata files", snapshotId), e); + } + deleteIndices( + Optional.ofNullable(finalSnapshotInfo) + .map(info -> info.indices().stream().map(repositoryData::resolveIndexId).collect(Collectors.toList())) + .orElse(Collections.emptyList()), + snapshotId, + ActionListener.map(listener, v -> { + try { + blobStore().blobContainer(basePath().add("indices")).deleteBlobsIgnoringIfNotExists( + unreferencedIndices.stream().map(IndexId::getId).collect(Collectors.toList())); + } catch (IOException e) { + logger.warn(() -> + new ParameterizedMessage( + "[{}] indices {} are no longer part of any snapshots in the repository, " + + "but failed to clean up their index folders.", metadata.name(), unreferencedIndices), e); + } + return null; + }) + ); } + } - final RepositoryData repositoryData = getRepositoryData(); - SnapshotInfo snapshot = null; - try { - snapshot = getSnapshotInfo(snapshotId); - } catch (SnapshotMissingException ex) { - throw ex; - } catch (IllegalStateException | SnapshotException | ElasticsearchParseException ex) { - logger.warn(() -> new ParameterizedMessage("cannot read snapshot file [{}]", snapshotId), ex); + private void deleteIndices(List indices, SnapshotId snapshotId, ActionListener listener) { + if (indices.isEmpty()) { + listener.onResponse(null); + return; } + final ActionListener groupedListener = new GroupedActionListener<>(ActionListener.map(listener, v -> null), indices.size()); + for (IndexId indexId: indices) { + threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(new ActionRunnable(groupedListener) { - try { - // Delete snapshot from the index file, since it is the maintainer of truth of active snapshots - final RepositoryData updatedRepositoryData = repositoryData.removeSnapshot(snapshotId); - writeIndexGen(updatedRepositoryData, repositoryStateId); - - // delete the snapshot file - deleteSnapshotBlobIgnoringErrors(snapshot, snapshotId.getUUID()); - // delete the global metadata file - deleteGlobalMetaDataBlobIgnoringErrors(snapshot, snapshotId.getUUID()); - - // Now delete all indices - if (snapshot != null) { - final List indices = snapshot.indices(); - for (String index : indices) { - final IndexId indexId = repositoryData.resolveIndexId(index); - + @Override + protected void doRun() { IndexMetaData indexMetaData = null; try { indexMetaData = getSnapshotIndexMetaData(snapshotId, indexId); - } catch (ElasticsearchParseException | IOException ex) { + } catch (Exception ex) { logger.warn(() -> - new ParameterizedMessage("[{}] [{}] failed to read metadata for index", snapshotId, index), ex); + new ParameterizedMessage("[{}] [{}] failed to read metadata for index", snapshotId, indexId.getName()), ex); } - - deleteIndexMetaDataBlobIgnoringErrors(snapshot, indexId); - + deleteIndexMetaDataBlobIgnoringErrors(snapshotId, indexId); if (indexMetaData != null) { for (int shardId = 0; shardId < indexMetaData.getNumberOfShards(); shardId++) { try { - delete(snapshotId, indexId, new ShardId(indexMetaData.getIndex(), shardId)); + final ShardId sid = new ShardId(indexMetaData.getIndex(), shardId); + new Context(snapshotId, indexId, sid, sid).delete(); } catch (SnapshotException ex) { final int finalShardId = shardId; logger.warn(() -> new ParameterizedMessage("[{}] failed to delete shard data for shard [{}][{}]", - snapshotId, index, finalShardId), ex); + snapshotId, indexId.getName(), finalShardId), ex); } } } + groupedListener.onResponse(null); } - } - - // cleanup indices that are no longer part of the repository - final Collection indicesToCleanUp = Sets.newHashSet(repositoryData.getIndices().values()); - indicesToCleanUp.removeAll(updatedRepositoryData.getIndices().values()); - final BlobContainer indicesBlobContainer = blobStore().blobContainer(basePath().add("indices")); - try { - indicesBlobContainer.deleteBlobsIgnoringIfNotExists( - indicesToCleanUp.stream().map(IndexId::getId).collect(Collectors.toList())); - } catch (IOException ioe) { - // a different IOException occurred while trying to delete - will just log the issue for now - logger.warn(() -> - new ParameterizedMessage( - "[{}] indices {} are no longer part of any snapshots in the repository, " + - "but failed to clean up their index folders.", metadata.name(), indicesToCleanUp), ioe); - } - } catch (IOException | ResourceNotFoundException ex) { - throw new RepositoryException(metadata.name(), "failed to delete snapshot [" + snapshotId + "]", ex); - } - } - - private void deleteSnapshotBlobIgnoringErrors(final SnapshotInfo snapshotInfo, final String blobId) { - try { - snapshotFormat.delete(blobContainer(), blobId); - } catch (IOException e) { - if (snapshotInfo != null) { - logger.warn(() -> new ParameterizedMessage("[{}] Unable to delete snapshot file [{}]", - snapshotInfo.snapshotId(), blobId), e); - } else { - logger.warn(() -> new ParameterizedMessage("Unable to delete snapshot file [{}]", blobId), e); - } + }); } } - private void deleteGlobalMetaDataBlobIgnoringErrors(final SnapshotInfo snapshotInfo, final String blobId) { - try { - globalMetaDataFormat.delete(blobContainer(), blobId); - } catch (IOException e) { - if (snapshotInfo != null) { - logger.warn(() -> new ParameterizedMessage("[{}] Unable to delete global metadata file [{}]", - snapshotInfo.snapshotId(), blobId), e); - } else { - logger.warn(() -> new ParameterizedMessage("Unable to delete global metadata file [{}]", blobId), e); - } - } - } - - private void deleteIndexMetaDataBlobIgnoringErrors(final SnapshotInfo snapshotInfo, final IndexId indexId) { - final SnapshotId snapshotId = snapshotInfo.snapshotId(); + private void deleteIndexMetaDataBlobIgnoringErrors(SnapshotId snapshotId, IndexId indexId) { BlobContainer indexMetaDataBlobContainer = blobStore().blobContainer(basePath().add("indices").add(indexId.getId())); try { indexMetaDataFormat.delete(indexMetaDataBlobContainer, snapshotId.getUUID()); @@ -904,17 +901,6 @@ public void verify(String seed, DiscoveryNode localNode) { } } - /** - * Delete shard snapshot - * - * @param snapshotId snapshot id - * @param shardId shard id - */ - private void delete(SnapshotId snapshotId, IndexId indexId, ShardId shardId) { - Context context = new Context(snapshotId, indexId, shardId, shardId); - context.delete(); - } - @Override public String toString() { return "BlobStoreRepository[" + diff --git a/server/src/main/java/org/elasticsearch/repositories/fs/FsRepository.java b/server/src/main/java/org/elasticsearch/repositories/fs/FsRepository.java index a47ced0496d9b..5d30de8d6d897 100644 --- a/server/src/main/java/org/elasticsearch/repositories/fs/FsRepository.java +++ b/server/src/main/java/org/elasticsearch/repositories/fs/FsRepository.java @@ -32,6 +32,7 @@ import org.elasticsearch.env.Environment; import org.elasticsearch.repositories.RepositoryException; import org.elasticsearch.repositories.blobstore.BlobStoreRepository; +import org.elasticsearch.threadpool.ThreadPool; import java.nio.file.Path; import java.util.function.Function; @@ -73,9 +74,9 @@ public class FsRepository extends BlobStoreRepository { /** * Constructs a shared file system repository. */ - public FsRepository(RepositoryMetaData metadata, Environment environment, - NamedXContentRegistry namedXContentRegistry) { - super(metadata, environment.settings(), calculateCompress(metadata, environment), namedXContentRegistry); + public FsRepository(RepositoryMetaData metadata, Environment environment, NamedXContentRegistry namedXContentRegistry, + ThreadPool threadPool) { + super(metadata, environment.settings(), calculateCompress(metadata, environment), namedXContentRegistry, threadPool); this.environment = environment; String location = REPOSITORIES_LOCATION_SETTING.get(metadata.settings()); if (location.isEmpty()) { diff --git a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java index 7ba53cb5d1e1c..d21f27c4f195c 100644 --- a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java +++ b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java @@ -27,6 +27,7 @@ import org.apache.lucene.util.CollectionUtil; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionRunnable; import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotRequest; import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterState; @@ -1308,15 +1309,15 @@ public static boolean isRepositoryInUse(ClusterState clusterState, String reposi * @param repositoryStateId the unique id representing the state of the repository at the time the deletion began */ private void deleteSnapshotFromRepository(Snapshot snapshot, @Nullable ActionListener listener, long repositoryStateId) { - threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(() -> { - try { + threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(new ActionRunnable(listener) { + @Override + protected void doRun() { Repository repository = repositoriesService.repository(snapshot.getRepository()); - repository.deleteSnapshot(snapshot.getSnapshotId(), repositoryStateId); - logger.info("snapshot [{}] deleted", snapshot); - - removeSnapshotDeletionFromClusterState(snapshot, null, listener); - } catch (Exception ex) { - removeSnapshotDeletionFromClusterState(snapshot, ex, listener); + repository.deleteSnapshot(snapshot.getSnapshotId(), repositoryStateId, ActionListener.wrap(v -> { + logger.info("snapshot [{}] deleted", snapshot); + removeSnapshotDeletionFromClusterState(snapshot, null, listener); + }, ex -> removeSnapshotDeletionFromClusterState(snapshot, ex, listener) + )); } }); } diff --git a/server/src/test/java/org/elasticsearch/repositories/RepositoriesModuleTests.java b/server/src/test/java/org/elasticsearch/repositories/RepositoriesModuleTests.java index 96a9670d16202..cd31ce121b245 100644 --- a/server/src/test/java/org/elasticsearch/repositories/RepositoriesModuleTests.java +++ b/server/src/test/java/org/elasticsearch/repositories/RepositoriesModuleTests.java @@ -43,12 +43,14 @@ public class RepositoriesModuleTests extends ESTestCase { private RepositoryPlugin plugin1; private RepositoryPlugin plugin2; private Repository.Factory factory; + private ThreadPool threadPool; @Override public void setUp() throws Exception { super.setUp(); environment = mock(Environment.class); contentRegistry = mock(NamedXContentRegistry.class); + threadPool = mock(ThreadPool.class); plugin1 = mock(RepositoryPlugin.class); plugin2 = mock(RepositoryPlugin.class); factory = mock(Repository.Factory.class); @@ -58,43 +60,46 @@ public void setUp() throws Exception { } public void testCanRegisterTwoRepositoriesWithDifferentTypes() { - when(plugin1.getRepositories(environment, contentRegistry)).thenReturn(Collections.singletonMap("type1", factory)); - when(plugin2.getRepositories(environment, contentRegistry)).thenReturn(Collections.singletonMap("type2", factory)); + when(plugin1.getRepositories(environment, contentRegistry, threadPool)).thenReturn(Collections.singletonMap("type1", factory)); + when(plugin2.getRepositories(environment, contentRegistry, threadPool)).thenReturn(Collections.singletonMap("type2", factory)); // Would throw - new RepositoriesModule(environment, repoPlugins, mock(TransportService.class), mock(ClusterService.class), - mock(ThreadPool.class), contentRegistry); + new RepositoriesModule( + environment, repoPlugins, mock(TransportService.class), mock(ClusterService.class), threadPool, contentRegistry); } public void testCannotRegisterTwoRepositoriesWithSameTypes() { - when(plugin1.getRepositories(environment, contentRegistry)).thenReturn(Collections.singletonMap("type1", factory)); - when(plugin2.getRepositories(environment, contentRegistry)).thenReturn(Collections.singletonMap("type1", factory)); + when(plugin1.getRepositories(environment, contentRegistry, threadPool)).thenReturn(Collections.singletonMap("type1", factory)); + when(plugin2.getRepositories(environment, contentRegistry, threadPool)).thenReturn(Collections.singletonMap("type1", factory)); IllegalArgumentException ex = expectThrows(IllegalArgumentException.class, () -> new RepositoriesModule(environment, repoPlugins, mock(TransportService.class), mock(ClusterService.class), - mock(ThreadPool.class), contentRegistry)); + threadPool, contentRegistry)); assertEquals("Repository type [type1] is already registered", ex.getMessage()); } public void testCannotRegisterTwoInternalRepositoriesWithSameTypes() { - when(plugin1.getInternalRepositories(environment, contentRegistry)).thenReturn(Collections.singletonMap("type1", factory)); - when(plugin2.getInternalRepositories(environment, contentRegistry)).thenReturn(Collections.singletonMap("type1", factory)); + when(plugin1.getInternalRepositories(environment, contentRegistry, threadPool)) + .thenReturn(Collections.singletonMap("type1", factory)); + when(plugin2.getInternalRepositories(environment, contentRegistry, threadPool)) + .thenReturn(Collections.singletonMap("type1", factory)); IllegalArgumentException ex = expectThrows(IllegalArgumentException.class, () -> new RepositoriesModule(environment, repoPlugins, mock(TransportService.class), mock(ClusterService.class), - mock(ThreadPool.class), contentRegistry)); + threadPool, contentRegistry)); assertEquals("Internal repository type [type1] is already registered", ex.getMessage()); } public void testCannotRegisterNormalAndInternalRepositoriesWithSameTypes() { - when(plugin1.getRepositories(environment, contentRegistry)).thenReturn(Collections.singletonMap("type1", factory)); - when(plugin2.getInternalRepositories(environment, contentRegistry)).thenReturn(Collections.singletonMap("type1", factory)); + when(plugin1.getRepositories(environment, contentRegistry, threadPool)).thenReturn(Collections.singletonMap("type1", factory)); + when(plugin2.getInternalRepositories(environment, contentRegistry, threadPool)) + .thenReturn(Collections.singletonMap("type1", factory)); IllegalArgumentException ex = expectThrows(IllegalArgumentException.class, () -> new RepositoriesModule(environment, repoPlugins, mock(TransportService.class), mock(ClusterService.class), - mock(ThreadPool.class), contentRegistry)); + threadPool, contentRegistry)); assertEquals("Internal repository type [type1] is already registered as a non-internal repository", ex.getMessage()); } diff --git a/server/src/test/java/org/elasticsearch/repositories/RepositoriesServiceTests.java b/server/src/test/java/org/elasticsearch/repositories/RepositoriesServiceTests.java index c02ab0d185610..981004f48efe8 100644 --- a/server/src/test/java/org/elasticsearch/repositories/RepositoriesServiceTests.java +++ b/server/src/test/java/org/elasticsearch/repositories/RepositoriesServiceTests.java @@ -21,6 +21,7 @@ import org.apache.lucene.index.IndexCommit; import org.elasticsearch.Version; +import org.elasticsearch.action.ActionListener; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.metadata.RepositoryMetaData; @@ -149,8 +150,8 @@ public SnapshotInfo finalizeSnapshot(SnapshotId snapshotId, List indice } @Override - public void deleteSnapshot(SnapshotId snapshotId, long repositoryStateId) { - + public void deleteSnapshot(SnapshotId snapshotId, long repositoryStateId, ActionListener listener) { + listener.onResponse(null); } @Override diff --git a/server/src/test/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryRestoreTests.java b/server/src/test/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryRestoreTests.java index 1b59f558db584..a904879321d58 100644 --- a/server/src/test/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryRestoreTests.java +++ b/server/src/test/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryRestoreTests.java @@ -180,7 +180,7 @@ public void testSnapshotWithConflictingName() throws IOException { private Repository createRepository() { Settings settings = Settings.builder().put("location", randomAlphaOfLength(10)).build(); RepositoryMetaData repositoryMetaData = new RepositoryMetaData(randomAlphaOfLength(10), FsRepository.TYPE, settings); - final FsRepository repository = new FsRepository(repositoryMetaData, createEnvironment(), xContentRegistry()) { + final FsRepository repository = new FsRepository(repositoryMetaData, createEnvironment(), xContentRegistry(), threadPool) { @Override protected void assertSnapshotOrGenericThread() { // eliminate thread name check as we create repo manually diff --git a/server/src/test/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryTests.java b/server/src/test/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryTests.java index a09560c54ce43..2abd623c496cb 100644 --- a/server/src/test/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryTests.java +++ b/server/src/test/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryTests.java @@ -40,6 +40,7 @@ import org.elasticsearch.snapshots.SnapshotState; import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.test.ESSingleNodeTestCase; +import org.elasticsearch.threadpool.ThreadPool; import java.io.IOException; import java.nio.file.Path; @@ -67,12 +68,13 @@ protected Collection> getPlugins() { } // the reason for this plug-in is to drop any assertSnapshotOrGenericThread as mostly all access in this test goes from test threads - public static class FsLikeRepoPlugin extends org.elasticsearch.plugins.Plugin implements RepositoryPlugin { + public static class FsLikeRepoPlugin extends Plugin implements RepositoryPlugin { @Override - public Map getRepositories(Environment env, NamedXContentRegistry namedXContentRegistry) { + public Map getRepositories(Environment env, NamedXContentRegistry namedXContentRegistry, + ThreadPool threadPool) { return Collections.singletonMap(REPO_TYPE, - (metadata) -> new FsRepository(metadata, env, namedXContentRegistry) { + (metadata) -> new FsRepository(metadata, env, namedXContentRegistry, threadPool) { @Override protected void assertSnapshotOrGenericThread() { // eliminate thread name check as we access blobStore on test/main threads @@ -260,7 +262,7 @@ public void testFsRepositoryCompressDeprecated() { Environment useCompressEnvironment = new Environment(useCompressSettings, node().getEnvironment().configFile()); - new FsRepository(metaData, useCompressEnvironment, null); + new FsRepository(metaData, useCompressEnvironment, null, null); assertWarnings("[repositories.fs.compress] setting was deprecated in Elasticsearch and will be removed in a future release!" + " See the breaking changes documentation for the next major version."); diff --git a/server/src/test/java/org/elasticsearch/snapshots/MetadataLoadingDuringSnapshotRestoreIT.java b/server/src/test/java/org/elasticsearch/snapshots/MetadataLoadingDuringSnapshotRestoreIT.java index 13b74df4e3d2b..040f12c956696 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/MetadataLoadingDuringSnapshotRestoreIT.java +++ b/server/src/test/java/org/elasticsearch/snapshots/MetadataLoadingDuringSnapshotRestoreIT.java @@ -35,6 +35,7 @@ import org.elasticsearch.repositories.Repository; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.snapshots.mockstore.MockRepository; +import org.elasticsearch.threadpool.ThreadPool; import java.io.IOException; import java.util.Collection; @@ -187,8 +188,8 @@ public static class CountingMockRepository extends MockRepository { public CountingMockRepository(final RepositoryMetaData metadata, final Environment environment, - final NamedXContentRegistry namedXContentRegistry) throws IOException { - super(metadata, environment, namedXContentRegistry); + final NamedXContentRegistry namedXContentRegistry, ThreadPool threadPool) { + super(metadata, environment, namedXContentRegistry, threadPool); } @Override @@ -207,8 +208,10 @@ public IndexMetaData getSnapshotIndexMetaData(SnapshotId snapshotId, IndexId ind /** A plugin that uses CountingMockRepository as implementation of the Repository **/ public static class CountingMockRepositoryPlugin extends MockRepository.Plugin { @Override - public Map getRepositories(Environment env, NamedXContentRegistry namedXContentRegistry) { - return Collections.singletonMap("coutingmock", (metadata) -> new CountingMockRepository(metadata, env, namedXContentRegistry)); + public Map getRepositories(Environment env, NamedXContentRegistry namedXContentRegistry, + ThreadPool threadPool) { + return Collections.singletonMap("coutingmock", + metadata -> new CountingMockRepository(metadata, env, namedXContentRegistry, threadPool)); } } } diff --git a/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java b/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java index a30ac9bda5365..29bf9e0493e03 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java +++ b/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java @@ -889,7 +889,7 @@ public void onFailure(final Exception e) { repositoriesService = new RepositoriesService( settings, clusterService, transportService, Collections.singletonMap(FsRepository.TYPE, metaData -> { - final Repository repository = new FsRepository(metaData, environment, xContentRegistry()) { + final Repository repository = new FsRepository(metaData, environment, xContentRegistry(), threadPool) { @Override protected void assertSnapshotOrGenericThread() { // eliminate thread name check as we create repo in the test thread diff --git a/server/src/test/java/org/elasticsearch/snapshots/mockstore/MockRepository.java b/server/src/test/java/org/elasticsearch/snapshots/mockstore/MockRepository.java index 8a49324757f27..9ce111e1d3011 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/mockstore/MockRepository.java +++ b/server/src/test/java/org/elasticsearch/snapshots/mockstore/MockRepository.java @@ -42,6 +42,7 @@ import org.elasticsearch.repositories.Repository; import org.elasticsearch.repositories.fs.FsRepository; import org.elasticsearch.snapshots.SnapshotId; +import org.elasticsearch.threadpool.ThreadPool; import java.io.IOException; import java.io.InputStream; @@ -69,8 +70,9 @@ public static class Plugin extends org.elasticsearch.plugins.Plugin implements R @Override - public Map getRepositories(Environment env, NamedXContentRegistry namedXContentRegistry) { - return Collections.singletonMap("mock", (metadata) -> new MockRepository(metadata, env, namedXContentRegistry)); + public Map getRepositories(Environment env, NamedXContentRegistry namedXContentRegistry, + ThreadPool threadPool) { + return Collections.singletonMap("mock", (metadata) -> new MockRepository(metadata, env, namedXContentRegistry, threadPool)); } @Override @@ -113,8 +115,8 @@ public long getFailureCount() { private volatile boolean blocked = false; public MockRepository(RepositoryMetaData metadata, Environment environment, - NamedXContentRegistry namedXContentRegistry) throws IOException { - super(overrideSettings(metadata, environment), environment, namedXContentRegistry); + NamedXContentRegistry namedXContentRegistry, ThreadPool threadPool) { + super(overrideSettings(metadata, environment), environment, namedXContentRegistry, threadPool); randomControlIOExceptionRate = metadata.settings().getAsDouble("random_control_io_exception_rate", 0.0); randomDataFileIOExceptionRate = metadata.settings().getAsDouble("random_data_file_io_exception_rate", 0.0); useLuceneCorruptionException = metadata.settings().getAsBoolean("use_lucene_corruption", false); diff --git a/test/framework/src/main/java/org/elasticsearch/index/shard/RestoreOnlyRepository.java b/test/framework/src/main/java/org/elasticsearch/index/shard/RestoreOnlyRepository.java index 11bdfb7bcc741..bc60b4c194622 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/shard/RestoreOnlyRepository.java +++ b/test/framework/src/main/java/org/elasticsearch/index/shard/RestoreOnlyRepository.java @@ -20,6 +20,7 @@ import org.apache.lucene.index.IndexCommit; import org.elasticsearch.Version; +import org.elasticsearch.action.ActionListener; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.metadata.RepositoryMetaData; @@ -103,7 +104,8 @@ public SnapshotInfo finalizeSnapshot(SnapshotId snapshotId, List indice } @Override - public void deleteSnapshot(SnapshotId snapshotId, long repositoryStateId) { + public void deleteSnapshot(SnapshotId snapshotId, long repositoryStateId, ActionListener listener) { + listener.onResponse(null); } @Override diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java index 7fa7f37f4b71d..3eda554a84bd4 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java @@ -132,7 +132,6 @@ public class Ccr extends Plugin implements ActionPlugin, PersistentTaskPlugin, E private final CcrLicenseChecker ccrLicenseChecker; private final SetOnce restoreSourceService = new SetOnce<>(); private final SetOnce ccrSettings = new SetOnce<>(); - private final SetOnce threadPool = new SetOnce<>(); private Client client; private final boolean transportClientMode; @@ -177,7 +176,6 @@ public Collection createComponents( CcrSettings ccrSettings = new CcrSettings(settings, clusterService.getClusterSettings()); this.ccrSettings.set(ccrSettings); - this.threadPool.set(threadPool); CcrRestoreSourceService restoreSourceService = new CcrRestoreSourceService(threadPool, ccrSettings); this.restoreSourceService.set(restoreSourceService); return Arrays.asList( @@ -326,9 +324,10 @@ public List> getExecutorBuilders(Settings settings) { } @Override - public Map getInternalRepositories(Environment env, NamedXContentRegistry namedXContentRegistry) { + public Map getInternalRepositories(Environment env, NamedXContentRegistry namedXContentRegistry, + ThreadPool threadPool) { Repository.Factory repositoryFactory = - (metadata) -> new CcrRepository(metadata, client, ccrLicenseChecker, settings, ccrSettings.get(), threadPool.get()); + (metadata) -> new CcrRepository(metadata, client, ccrLicenseChecker, settings, ccrSettings.get(), threadPool); return Collections.singletonMap(CcrRepository.TYPE, repositoryFactory); } diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java index 0b445a3eb01ef..5a0472339c192 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java @@ -260,7 +260,7 @@ public SnapshotInfo finalizeSnapshot(SnapshotId snapshotId, List indice } @Override - public void deleteSnapshot(SnapshotId snapshotId, long repositoryStateId) { + public void deleteSnapshot(SnapshotId snapshotId, long repositoryStateId, ActionListener listener) { throw new UnsupportedOperationException("Unsupported for repository of type: " + TYPE); } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/XPackPlugin.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/XPackPlugin.java index b57b648b76592..2038b35b4e6e0 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/XPackPlugin.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/XPackPlugin.java @@ -357,7 +357,8 @@ default Optional getRequiredFeature() { } @Override - public Map getRepositories(Environment env, NamedXContentRegistry namedXContentRegistry) { + public Map getRepositories(Environment env, NamedXContentRegistry namedXContentRegistry, + ThreadPool threadPool) { return Collections.singletonMap("source", SourceOnlySnapshotRepository.newRepositoryFactory()); } diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/snapshots/SourceOnlySnapshotIT.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/snapshots/SourceOnlySnapshotIT.java index 00b199eef4419..81be978d33103 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/snapshots/SourceOnlySnapshotIT.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/snapshots/SourceOnlySnapshotIT.java @@ -37,6 +37,7 @@ import org.elasticsearch.search.slice.SliceBuilder; import org.elasticsearch.search.sort.SortOrder; import org.elasticsearch.test.ESIntegTestCase; +import org.elasticsearch.threadpool.ThreadPool; import org.hamcrest.Matchers; import java.io.IOException; @@ -72,7 +73,8 @@ protected Collection> getMockPlugins() { public static final class MyPlugin extends Plugin implements RepositoryPlugin, EnginePlugin { @Override - public Map getRepositories(Environment env, NamedXContentRegistry namedXContentRegistry) { + public Map getRepositories(Environment env, NamedXContentRegistry namedXContentRegistry, + ThreadPool threadPool) { return Collections.singletonMap("source", SourceOnlySnapshotRepository.newRepositoryFactory()); } @Override diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/snapshots/SourceOnlySnapshotShardTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/snapshots/SourceOnlySnapshotShardTests.java index ec1f002d05ba8..6a37e8265c096 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/snapshots/SourceOnlySnapshotShardTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/snapshots/SourceOnlySnapshotShardTests.java @@ -328,7 +328,7 @@ private Environment createEnvironment() { private Repository createRepository() throws IOException { Settings settings = Settings.builder().put("location", randomAlphaOfLength(10)).build(); RepositoryMetaData repositoryMetaData = new RepositoryMetaData(randomAlphaOfLength(10), FsRepository.TYPE, settings); - return new FsRepository(repositoryMetaData, createEnvironment(), xContentRegistry()); + return new FsRepository(repositoryMetaData, createEnvironment(), xContentRegistry(), threadPool); } private static void runAsSnapshot(ThreadPool pool, Runnable runnable) { diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/LocalStateCompositeXPackPlugin.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/LocalStateCompositeXPackPlugin.java index 9b5414a6f8359..3f8b279e5016c 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/LocalStateCompositeXPackPlugin.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/LocalStateCompositeXPackPlugin.java @@ -394,17 +394,20 @@ public List> getPersistentTasksExecutor(ClusterServic } @Override - public Map getRepositories(Environment env, NamedXContentRegistry namedXContentRegistry) { - HashMap repositories = new HashMap<>(super.getRepositories(env, namedXContentRegistry)); - filterPlugins(RepositoryPlugin.class).forEach(r -> repositories.putAll(r.getRepositories(env, namedXContentRegistry))); + public Map getRepositories(Environment env, NamedXContentRegistry namedXContentRegistry, + ThreadPool threadPool) { + HashMap repositories = new HashMap<>(super.getRepositories(env, namedXContentRegistry, threadPool)); + filterPlugins(RepositoryPlugin.class).forEach(r -> repositories.putAll(r.getRepositories(env, namedXContentRegistry, threadPool))); return repositories; } @Override - public Map getInternalRepositories(Environment env, NamedXContentRegistry namedXContentRegistry) { - HashMap internalRepositories = new HashMap<>(super.getInternalRepositories(env, namedXContentRegistry)); + public Map getInternalRepositories(Environment env, NamedXContentRegistry namedXContentRegistry, + ThreadPool threadPool) { + HashMap internalRepositories = + new HashMap<>(super.getInternalRepositories(env, namedXContentRegistry, threadPool)); filterPlugins(RepositoryPlugin.class).forEach(r -> - internalRepositories.putAll(r.getInternalRepositories(env, namedXContentRegistry))); + internalRepositories.putAll(r.getInternalRepositories(env, namedXContentRegistry, threadPool))); return internalRepositories; }