Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,11 @@ The following settings are supported:
* `server_side_encryption`: When set to `true` files are encrypted on server side using AES256 algorithm. Defaults to `false`.
* `buffer_size`: Minimum threshold below which the chunk is uploaded using a single request. Beyond this threshold, the S3 repository will use the [AWS Multipart Upload API](http://docs.aws.amazon.com/AmazonS3/latest/dev/uploadobjusingmpu.html) to split the chunk into several parts, each of `buffer_size` length, and to upload each part in its own request. Note that positionning a buffer size lower than `5mb` is not allowed since it will prevents the use of the Multipart API and may result in upload errors. Defaults to `5mb`.
* `max_retries`: Number of retries in case of S3 errors. Defaults to `3`.
* `use_throttle_retries`: Set to `true` if you want to throttle retries. Defaults to AWS SDK default value (`false`).

Note that you can define S3 repository settings for all S3 repositories in `elasticsearch.yml` configuration file.
They are all prefixed with `repositories.s3.`. For example, you can define compression for all S3 repositories
by setting `repositories.s3.compress: true` in `elasticsearch.yml`.

The S3 repositories are using the same credentials as the rest of the AWS services provided by this plugin (`discovery`).
See [Generic Configuration](#generic-configuration) for details.
Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
</parent>

<properties>
<amazonaws.version>1.10.12</amazonaws.version>
<amazonaws.version>1.10.69</amazonaws.version>
</properties>

<dependencies>
Expand Down
7 changes: 2 additions & 5 deletions src/main/java/org/elasticsearch/cloud/aws/AwsS3Service.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,6 @@
*
*/
public interface AwsS3Service extends LifecycleComponent<AwsS3Service> {
AmazonS3 client();

AmazonS3 client(String endpoint, String protocol, String region, String account, String key);

AmazonS3 client(String endpoint, String protocol, String region, String account, String key, Integer maxRetries);
AmazonS3 client(String endpoint, String protocol, String region, String account, String key, Integer maxRetries,
boolean useThrottleRetries);
}
Original file line number Diff line number Diff line change
Expand Up @@ -55,21 +55,8 @@ public InternalAwsS3Service(Settings settings, SettingsFilter settingsFilter) {
}

@Override
public synchronized AmazonS3 client() {
String endpoint = getDefaultEndpoint();
String account = componentSettings.get("access_key", settings.get("cloud.account"));
String key = componentSettings.get("secret_key", settings.get("cloud.key"));

return getClient(endpoint, null, account, key, null);
}

@Override
public AmazonS3 client(String endpoint, String protocol, String region, String account, String key) {
return client(endpoint, protocol, region, account, key, null);
}

@Override
public synchronized AmazonS3 client(String endpoint, String protocol, String region, String account, String key, Integer maxRetries) {
public synchronized AmazonS3 client(String endpoint, String protocol, String region, String account, String key, Integer maxRetries,
boolean useThrottleRetries) {
if (region != null && endpoint == null) {
endpoint = getEndpoint(region);
logger.debug("using s3 region [{}], with endpoint [{}]", region, endpoint);
Expand All @@ -81,11 +68,12 @@ public synchronized AmazonS3 client(String endpoint, String protocol, String reg
key = componentSettings.get("secret_key", settings.get("cloud.key"));
}

return getClient(endpoint, protocol, account, key, maxRetries);
return getClient(endpoint, protocol, account, key, maxRetries, useThrottleRetries);
}


private synchronized AmazonS3 getClient(String endpoint, String protocol, String account, String key, Integer maxRetries) {
private synchronized AmazonS3 getClient(String endpoint, String protocol, String account, String key, Integer maxRetries,
boolean useThrottleRetries) {
Tuple<String, String> clientDescriptor = new Tuple<String, String>(endpoint, account);
AmazonS3Client client = clients.get(clientDescriptor);
if (client != null) {
Expand Down Expand Up @@ -127,6 +115,7 @@ private synchronized AmazonS3 getClient(String endpoint, String protocol, String
// If not explicitly set, default to 3 with exponential backoff policy
clientConfiguration.setMaxErrorRetry(maxRetries);
}
clientConfiguration.setUseThrottleRetries(useThrottleRetries);

// #155: we might have 3rd party users using older S3 API version
String awsSigner = settings.get("cloud.aws.s3.signer", settings.get("cloud.aws.signer"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.elasticsearch.repositories.s3;

import com.amazonaws.ClientConfiguration;
import org.elasticsearch.cloud.aws.AwsS3Service;
import org.elasticsearch.cloud.aws.blobstore.S3BlobStore;
import org.elasticsearch.common.Strings;
Expand Down Expand Up @@ -116,13 +117,16 @@ public S3Repository(RepositoryName name, RepositorySettings repositorySettings,
boolean serverSideEncryption = repositorySettings.settings().getAsBoolean("server_side_encryption", componentSettings.getAsBoolean("server_side_encryption", false));
ByteSizeValue bufferSize = repositorySettings.settings().getAsBytesSize("buffer_size", componentSettings.getAsBytesSize("buffer_size", null));
Integer maxRetries = repositorySettings.settings().getAsInt("max_retries", componentSettings.getAsInt("max_retries", 3));
boolean useThrottleRetries = repositorySettings.settings().getAsBoolean("use_throttle_retries", settings.getAsBoolean("repositories.s3.use_throttle_retries", ClientConfiguration.DEFAULT_THROTTLE_RETRIES));
this.chunkSize = repositorySettings.settings().getAsBytesSize("chunk_size", componentSettings.getAsBytesSize("chunk_size", new ByteSizeValue(100, ByteSizeUnit.MB)));
this.compress = repositorySettings.settings().getAsBoolean("compress", componentSettings.getAsBoolean("compress", false));

logger.debug("using bucket [{}], region [{}], endpoint [{}], protocol [{}], chunk_size [{}], server_side_encryption [{}], buffer_size [{}], max_retries [{}]",
bucket, region, endpoint, protocol, chunkSize, serverSideEncryption, bufferSize, maxRetries);
logger.debug("using bucket [{}], region [{}], endpoint [{}], protocol [{}], chunk_size [{}], server_side_encryption [{}], " +
"buffer_size [{}], max_retries [{}], use_throttle_retries [{}]",
bucket, region, endpoint, protocol, chunkSize, serverSideEncryption, bufferSize, maxRetries, useThrottleRetries);

blobStore = new S3BlobStore(settings, s3Service.client(endpoint, protocol, region, repositorySettings.settings().get("access_key"), repositorySettings.settings().get("secret_key"), maxRetries), bucket, region, serverSideEncryption, bufferSize, maxRetries);
blobStore = new S3BlobStore(settings, s3Service.client(endpoint, protocol, region, repositorySettings.settings().get("access_key"),
repositorySettings.settings().get("secret_key"), maxRetries, useThrottleRetries), bucket, region, serverSideEncryption, bufferSize, maxRetries);
String basePath = repositorySettings.settings().get("base_path", null);
if (Strings.hasLength(basePath)) {
BlobPath path = new BlobPath();
Expand Down
21 changes: 21 additions & 0 deletions src/test/java/org/elasticsearch/cloud/aws/AmazonS3Wrapper.java
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,11 @@ public boolean doesBucketExist(String bucketName) throws AmazonClientException,
return delegate.doesBucketExist(bucketName);
}

@Override
public HeadBucketResult headBucket(HeadBucketRequest headBucketRequest) throws AmazonClientException, AmazonServiceException {
return delegate.headBucket(headBucketRequest);
}

@Override
public List<Bucket> listBuckets() throws AmazonClientException, AmazonServiceException {
return delegate.listBuckets();
Expand Down Expand Up @@ -172,6 +177,11 @@ public AccessControlList getObjectAcl(String bucketName, String key, String vers
return delegate.getObjectAcl(bucketName, key, versionId);
}

@Override
public AccessControlList getObjectAcl(GetObjectAclRequest getObjectAclRequest) throws AmazonClientException, AmazonServiceException {
return delegate.getObjectAcl(getObjectAclRequest);
}

@Override
public void setObjectAcl(String bucketName, String key, AccessControlList acl) throws AmazonClientException, AmazonServiceException {
delegate.setObjectAcl(bucketName, key, acl);
Expand Down Expand Up @@ -277,6 +287,17 @@ public void deleteBucketReplicationConfiguration(String bucketName) throws Amazo
delegate.deleteBucketReplicationConfiguration(bucketName);
}

@Override
public void deleteBucketReplicationConfiguration(DeleteBucketReplicationConfigurationRequest request) throws AmazonServiceException,
AmazonClientException {
delegate.deleteBucketReplicationConfiguration(request);
}

@Override
public boolean doesObjectExist(String bucketName, String objectName) throws AmazonServiceException, AmazonClientException {
return delegate.doesObjectExist(bucketName, objectName);
}

@Override
public PutObjectResult putObject(PutObjectRequest putObjectRequest) throws AmazonClientException, AmazonServiceException {
return delegate.putObject(putObjectRequest);
Expand Down
16 changes: 3 additions & 13 deletions src/test/java/org/elasticsearch/cloud/aws/TestAwsS3Service.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,20 +38,10 @@ public TestAwsS3Service(Settings settings, SettingsFilter settingsFilter) {
super(settings, settingsFilter);
}


@Override
public synchronized AmazonS3 client() {
return cachedWrapper(super.client());
}

@Override
public synchronized AmazonS3 client(String endpoint, String protocol, String region, String account, String key) {
return cachedWrapper(super.client(endpoint, protocol, region, account, key));
}

@Override
public synchronized AmazonS3 client(String endpoint, String protocol, String region, String account, String key, Integer maxRetries) {
return cachedWrapper(super.client(endpoint, protocol, region, account, key, maxRetries));
public synchronized AmazonS3 client(String endpoint, String protocol, String region, String account, String key, Integer maxRetries,
boolean useThrottleRetries) {
return cachedWrapper(super.client(endpoint, protocol, region, account, key, maxRetries, useThrottleRetries));
}

private AmazonS3 cachedWrapper(AmazonS3 client) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -192,11 +192,12 @@ public void testEncryption() {
Settings settings = internalCluster().getInstance(Settings.class);
Settings bucket = settings.getByPrefix("repositories.s3.");
AmazonS3 s3Client = internalCluster().getInstance(AwsS3Service.class).client(
null,
null,
bucket.get("region", settings.get("repositories.s3.region")),
bucket.get("access_key", settings.get("cloud.aws.access_key")),
bucket.get("secret_key", settings.get("cloud.aws.secret_key")));
null,
null,
bucket.get("region", settings.get("repositories.s3.region")),
bucket.get("access_key", settings.get("cloud.aws.access_key")),
bucket.get("secret_key", settings.get("cloud.aws.secret_key")),
null, randomBoolean());

String bucketName = bucket.get("bucket");
logger.info("--> verify encryption for bucket [{}], prefix [{}]", bucketName, basePath);
Expand Down Expand Up @@ -460,7 +461,7 @@ public void cleanRepositoryFiles(String basePath) {
// We check that settings has been set in elasticsearch.yml integration test file
// as described in README
assertThat("Your settings in elasticsearch.yml are incorrects. Check README file.", bucketName, notNullValue());
AmazonS3 client = internalCluster().getInstance(AwsS3Service.class).client(endpoint, protocol, region, accessKey, secretKey);
AmazonS3 client = internalCluster().getInstance(AwsS3Service.class).client(endpoint, protocol, region, accessKey, secretKey, null, randomBoolean());
try {
ObjectListing prevListing = null;
//From http://docs.amazonwebservices.com/AmazonS3/latest/dev/DeletingMultipleObjectsUsingJava.html
Expand Down