diff --git a/README.md b/README.md index 90eaad11..b8448551 100644 --- a/README.md +++ b/README.md @@ -171,6 +171,11 @@ The following settings are supported: * `server_side_encryption`: When set to `true` files are encrypted on server side using AES256 algorithm. Defaults to `false`. * `buffer_size`: Minimum threshold below which the chunk is uploaded using a single request. Beyond this threshold, the S3 repository will use the [AWS Multipart Upload API](http://docs.aws.amazon.com/AmazonS3/latest/dev/uploadobjusingmpu.html) to split the chunk into several parts, each of `buffer_size` length, and to upload each part in its own request. Note that positionning a buffer size lower than `5mb` is not allowed since it will prevents the use of the Multipart API and may result in upload errors. Defaults to `5mb`. * `max_retries`: Number of retries in case of S3 errors. Defaults to `3`. +* `use_throttle_retries`: Set to `true` if you want to throttle retries. Defaults to AWS SDK default value (`false`). + +Note that you can define S3 repository settings for all S3 repositories in `elasticsearch.yml` configuration file. +They are all prefixed with `repositories.s3.`. For example, you can define compression for all S3 repositories +by setting `repositories.s3.compress: true` in `elasticsearch.yml`. The S3 repositories are using the same credentials as the rest of the AWS services provided by this plugin (`discovery`). See [Generic Configuration](#generic-configuration) for details. diff --git a/pom.xml b/pom.xml index f0422c8e..3775685c 100644 --- a/pom.xml +++ b/pom.xml @@ -32,7 +32,7 @@ - 1.10.12 + 1.10.69 diff --git a/src/main/java/org/elasticsearch/cloud/aws/AwsS3Service.java b/src/main/java/org/elasticsearch/cloud/aws/AwsS3Service.java index e5db2ed7..15fcef48 100644 --- a/src/main/java/org/elasticsearch/cloud/aws/AwsS3Service.java +++ b/src/main/java/org/elasticsearch/cloud/aws/AwsS3Service.java @@ -26,9 +26,6 @@ * */ public interface AwsS3Service extends LifecycleComponent { - AmazonS3 client(); - - AmazonS3 client(String endpoint, String protocol, String region, String account, String key); - - AmazonS3 client(String endpoint, String protocol, String region, String account, String key, Integer maxRetries); + AmazonS3 client(String endpoint, String protocol, String region, String account, String key, Integer maxRetries, + boolean useThrottleRetries); } diff --git a/src/main/java/org/elasticsearch/cloud/aws/InternalAwsS3Service.java b/src/main/java/org/elasticsearch/cloud/aws/InternalAwsS3Service.java index e2f10b23..f43bcf13 100644 --- a/src/main/java/org/elasticsearch/cloud/aws/InternalAwsS3Service.java +++ b/src/main/java/org/elasticsearch/cloud/aws/InternalAwsS3Service.java @@ -55,21 +55,8 @@ public InternalAwsS3Service(Settings settings, SettingsFilter settingsFilter) { } @Override - public synchronized AmazonS3 client() { - String endpoint = getDefaultEndpoint(); - String account = componentSettings.get("access_key", settings.get("cloud.account")); - String key = componentSettings.get("secret_key", settings.get("cloud.key")); - - return getClient(endpoint, null, account, key, null); - } - - @Override - public AmazonS3 client(String endpoint, String protocol, String region, String account, String key) { - return client(endpoint, protocol, region, account, key, null); - } - - @Override - public synchronized AmazonS3 client(String endpoint, String protocol, String region, String account, String key, Integer maxRetries) { + public synchronized AmazonS3 client(String endpoint, String protocol, String region, String account, String key, Integer maxRetries, + boolean useThrottleRetries) { if (region != null && endpoint == null) { endpoint = getEndpoint(region); logger.debug("using s3 region [{}], with endpoint [{}]", region, endpoint); @@ -81,11 +68,12 @@ public synchronized AmazonS3 client(String endpoint, String protocol, String reg key = componentSettings.get("secret_key", settings.get("cloud.key")); } - return getClient(endpoint, protocol, account, key, maxRetries); + return getClient(endpoint, protocol, account, key, maxRetries, useThrottleRetries); } - private synchronized AmazonS3 getClient(String endpoint, String protocol, String account, String key, Integer maxRetries) { + private synchronized AmazonS3 getClient(String endpoint, String protocol, String account, String key, Integer maxRetries, + boolean useThrottleRetries) { Tuple clientDescriptor = new Tuple(endpoint, account); AmazonS3Client client = clients.get(clientDescriptor); if (client != null) { @@ -127,6 +115,7 @@ private synchronized AmazonS3 getClient(String endpoint, String protocol, String // If not explicitly set, default to 3 with exponential backoff policy clientConfiguration.setMaxErrorRetry(maxRetries); } + clientConfiguration.setUseThrottleRetries(useThrottleRetries); // #155: we might have 3rd party users using older S3 API version String awsSigner = settings.get("cloud.aws.s3.signer", settings.get("cloud.aws.signer")); diff --git a/src/main/java/org/elasticsearch/repositories/s3/S3Repository.java b/src/main/java/org/elasticsearch/repositories/s3/S3Repository.java index ac097683..92b34dcb 100644 --- a/src/main/java/org/elasticsearch/repositories/s3/S3Repository.java +++ b/src/main/java/org/elasticsearch/repositories/s3/S3Repository.java @@ -19,6 +19,7 @@ package org.elasticsearch.repositories.s3; +import com.amazonaws.ClientConfiguration; import org.elasticsearch.cloud.aws.AwsS3Service; import org.elasticsearch.cloud.aws.blobstore.S3BlobStore; import org.elasticsearch.common.Strings; @@ -116,13 +117,16 @@ public S3Repository(RepositoryName name, RepositorySettings repositorySettings, boolean serverSideEncryption = repositorySettings.settings().getAsBoolean("server_side_encryption", componentSettings.getAsBoolean("server_side_encryption", false)); ByteSizeValue bufferSize = repositorySettings.settings().getAsBytesSize("buffer_size", componentSettings.getAsBytesSize("buffer_size", null)); Integer maxRetries = repositorySettings.settings().getAsInt("max_retries", componentSettings.getAsInt("max_retries", 3)); + boolean useThrottleRetries = repositorySettings.settings().getAsBoolean("use_throttle_retries", settings.getAsBoolean("repositories.s3.use_throttle_retries", ClientConfiguration.DEFAULT_THROTTLE_RETRIES)); this.chunkSize = repositorySettings.settings().getAsBytesSize("chunk_size", componentSettings.getAsBytesSize("chunk_size", new ByteSizeValue(100, ByteSizeUnit.MB))); this.compress = repositorySettings.settings().getAsBoolean("compress", componentSettings.getAsBoolean("compress", false)); - logger.debug("using bucket [{}], region [{}], endpoint [{}], protocol [{}], chunk_size [{}], server_side_encryption [{}], buffer_size [{}], max_retries [{}]", - bucket, region, endpoint, protocol, chunkSize, serverSideEncryption, bufferSize, maxRetries); + logger.debug("using bucket [{}], region [{}], endpoint [{}], protocol [{}], chunk_size [{}], server_side_encryption [{}], " + + "buffer_size [{}], max_retries [{}], use_throttle_retries [{}]", + bucket, region, endpoint, protocol, chunkSize, serverSideEncryption, bufferSize, maxRetries, useThrottleRetries); - blobStore = new S3BlobStore(settings, s3Service.client(endpoint, protocol, region, repositorySettings.settings().get("access_key"), repositorySettings.settings().get("secret_key"), maxRetries), bucket, region, serverSideEncryption, bufferSize, maxRetries); + blobStore = new S3BlobStore(settings, s3Service.client(endpoint, protocol, region, repositorySettings.settings().get("access_key"), + repositorySettings.settings().get("secret_key"), maxRetries, useThrottleRetries), bucket, region, serverSideEncryption, bufferSize, maxRetries); String basePath = repositorySettings.settings().get("base_path", null); if (Strings.hasLength(basePath)) { BlobPath path = new BlobPath(); diff --git a/src/test/java/org/elasticsearch/cloud/aws/AmazonS3Wrapper.java b/src/test/java/org/elasticsearch/cloud/aws/AmazonS3Wrapper.java index 1c3a5ea7..3909f9cb 100644 --- a/src/test/java/org/elasticsearch/cloud/aws/AmazonS3Wrapper.java +++ b/src/test/java/org/elasticsearch/cloud/aws/AmazonS3Wrapper.java @@ -122,6 +122,11 @@ public boolean doesBucketExist(String bucketName) throws AmazonClientException, return delegate.doesBucketExist(bucketName); } + @Override + public HeadBucketResult headBucket(HeadBucketRequest headBucketRequest) throws AmazonClientException, AmazonServiceException { + return delegate.headBucket(headBucketRequest); + } + @Override public List listBuckets() throws AmazonClientException, AmazonServiceException { return delegate.listBuckets(); @@ -172,6 +177,11 @@ public AccessControlList getObjectAcl(String bucketName, String key, String vers return delegate.getObjectAcl(bucketName, key, versionId); } + @Override + public AccessControlList getObjectAcl(GetObjectAclRequest getObjectAclRequest) throws AmazonClientException, AmazonServiceException { + return delegate.getObjectAcl(getObjectAclRequest); + } + @Override public void setObjectAcl(String bucketName, String key, AccessControlList acl) throws AmazonClientException, AmazonServiceException { delegate.setObjectAcl(bucketName, key, acl); @@ -277,6 +287,17 @@ public void deleteBucketReplicationConfiguration(String bucketName) throws Amazo delegate.deleteBucketReplicationConfiguration(bucketName); } + @Override + public void deleteBucketReplicationConfiguration(DeleteBucketReplicationConfigurationRequest request) throws AmazonServiceException, + AmazonClientException { + delegate.deleteBucketReplicationConfiguration(request); + } + + @Override + public boolean doesObjectExist(String bucketName, String objectName) throws AmazonServiceException, AmazonClientException { + return delegate.doesObjectExist(bucketName, objectName); + } + @Override public PutObjectResult putObject(PutObjectRequest putObjectRequest) throws AmazonClientException, AmazonServiceException { return delegate.putObject(putObjectRequest); diff --git a/src/test/java/org/elasticsearch/cloud/aws/TestAwsS3Service.java b/src/test/java/org/elasticsearch/cloud/aws/TestAwsS3Service.java index b8c9b2ee..256c20a9 100644 --- a/src/test/java/org/elasticsearch/cloud/aws/TestAwsS3Service.java +++ b/src/test/java/org/elasticsearch/cloud/aws/TestAwsS3Service.java @@ -38,20 +38,10 @@ public TestAwsS3Service(Settings settings, SettingsFilter settingsFilter) { super(settings, settingsFilter); } - - @Override - public synchronized AmazonS3 client() { - return cachedWrapper(super.client()); - } - - @Override - public synchronized AmazonS3 client(String endpoint, String protocol, String region, String account, String key) { - return cachedWrapper(super.client(endpoint, protocol, region, account, key)); - } - @Override - public synchronized AmazonS3 client(String endpoint, String protocol, String region, String account, String key, Integer maxRetries) { - return cachedWrapper(super.client(endpoint, protocol, region, account, key, maxRetries)); + public synchronized AmazonS3 client(String endpoint, String protocol, String region, String account, String key, Integer maxRetries, + boolean useThrottleRetries) { + return cachedWrapper(super.client(endpoint, protocol, region, account, key, maxRetries, useThrottleRetries)); } private AmazonS3 cachedWrapper(AmazonS3 client) { diff --git a/src/test/java/org/elasticsearch/repositories/s3/AbstractS3SnapshotRestoreTest.java b/src/test/java/org/elasticsearch/repositories/s3/AbstractS3SnapshotRestoreTest.java index 4cab5316..1c6ae538 100644 --- a/src/test/java/org/elasticsearch/repositories/s3/AbstractS3SnapshotRestoreTest.java +++ b/src/test/java/org/elasticsearch/repositories/s3/AbstractS3SnapshotRestoreTest.java @@ -192,11 +192,12 @@ public void testEncryption() { Settings settings = internalCluster().getInstance(Settings.class); Settings bucket = settings.getByPrefix("repositories.s3."); AmazonS3 s3Client = internalCluster().getInstance(AwsS3Service.class).client( - null, - null, - bucket.get("region", settings.get("repositories.s3.region")), - bucket.get("access_key", settings.get("cloud.aws.access_key")), - bucket.get("secret_key", settings.get("cloud.aws.secret_key"))); + null, + null, + bucket.get("region", settings.get("repositories.s3.region")), + bucket.get("access_key", settings.get("cloud.aws.access_key")), + bucket.get("secret_key", settings.get("cloud.aws.secret_key")), + null, randomBoolean()); String bucketName = bucket.get("bucket"); logger.info("--> verify encryption for bucket [{}], prefix [{}]", bucketName, basePath); @@ -460,7 +461,7 @@ public void cleanRepositoryFiles(String basePath) { // We check that settings has been set in elasticsearch.yml integration test file // as described in README assertThat("Your settings in elasticsearch.yml are incorrects. Check README file.", bucketName, notNullValue()); - AmazonS3 client = internalCluster().getInstance(AwsS3Service.class).client(endpoint, protocol, region, accessKey, secretKey); + AmazonS3 client = internalCluster().getInstance(AwsS3Service.class).client(endpoint, protocol, region, accessKey, secretKey, null, randomBoolean()); try { ObjectListing prevListing = null; //From http://docs.amazonwebservices.com/AmazonS3/latest/dev/DeletingMultipleObjectsUsingJava.html