Skip to content

Commit

Permalink
Add per repository credentials
Browse files Browse the repository at this point in the history
Changed AwsS3Service to use one client per region and credentials combination.
Made S3Repository specify credentials if such exists in the repository settings.

Updated readme with repository specific credentials settings.

Closes #54.
Closes #55.
Closes #56.
(cherry picked from commit d4ea2dd)
  • Loading branch information
beiske authored and dadoonet committed Mar 26, 2014
1 parent 254fb81 commit 7f271fd
Show file tree
Hide file tree
Showing 5 changed files with 269 additions and 87 deletions.
15 changes: 12 additions & 3 deletions README.md
Expand Up @@ -119,6 +119,8 @@ The following settings are supported:
* `bucket`: The name of the bucket to be used for snapshots. (Mandatory)
* `region`: The region where bucket is located. Defaults to US Standard
* `base_path`: Specifies the path within bucket to repository data. Defaults to root directory.
* `access_key`: The access key to use for authentication. Defaults to value of `cloud.aws.access_key`.
* `secret_key`: The secret key to use for authentication. Defaults to value of `cloud.aws.secret_key`.
* `concurrent_streams`: Throttles the number of streams (per node) performing snapshot operations. Defaults to `5`.
* `chunk_size`: Big files can be broken down into chunks during snapshotting if needed. The chunk size can be specified in bytes or by using size value notation, i.e. `1g`, `10m`, `5k`. Defaults to `100m`.
* `compress`: When set to `true` metadata files are stored in compressed format. This setting doesn't affect index files that are already compressed by default. Defaults to `false`.
Expand All @@ -131,11 +133,11 @@ The S3 repositories are using the same credentials as the rest of the S3 service
secret_key: vExyMThREXeRMm/b/LRzEB8jWwvzQeXgjqMX+6br


Multiple S3 repositories can be created as long as they share the same credential.
Multiple S3 repositories can be created. If the buckets require different credentials, then define them as part of the repository settings.

## Testing

Integrations tests in this plugin require working AWS configuration and therefore disabled by default. To enable tests prepare a config file elasticsearch.yml with the following content:
Integration tests in this plugin require a working AWS configuration and are therefore disabled by default. Three buckets and two IAM users have to be created. The first IAM user needs access to two buckets in different regions, and the final bucket is exclusive to the other IAM user. To enable tests, prepare a config file elasticsearch.yml with the following content:

```
cloud:
Expand All @@ -147,10 +149,17 @@ repositories:
s3:
bucket: "bucket_name"
region: "us-west-2"
private-bucket:
bucket: <bucket not accessible by default key>
access_key: <access key>
secret_key: <secret key>
remote-bucket:
bucket: <bucket in other region>
region: <region>
```

Replaces `access_key`, `secret_key`, `bucket` and `region` with your settings. Please, note that the test will delete all snapshot/restore related files in the specified bucket.
Replace all occurrences of `access_key`, `secret_key`, `bucket` and `region` with your settings. Please note that the test will delete all snapshot/restore related files in the specified buckets.

To run test:

Expand Down
127 changes: 83 additions & 44 deletions src/main/java/org/elasticsearch/cloud/aws/AwsS3Service.java
Expand Up @@ -19,6 +19,9 @@

package org.elasticsearch.cloud.aws;

import java.util.HashMap;
import java.util.Map;

import com.amazonaws.ClientConfiguration;
import com.amazonaws.Protocol;
import com.amazonaws.auth.*;
Expand All @@ -27,6 +30,7 @@
import com.amazonaws.services.s3.AmazonS3Client;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchIllegalArgumentException;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
Expand All @@ -37,7 +41,10 @@
*/
public class AwsS3Service extends AbstractLifecycleComponent<AwsS3Service> {

private AmazonS3Client client;
/**
* (acceskey, endpoint) -> client
*/
private Map<Tuple<String, String>, AmazonS3Client> clients = new HashMap<Tuple<String,String>, AmazonS3Client>();

@Inject
public AwsS3Service(Settings settings, SettingsFilter settingsFilter) {
Expand All @@ -47,6 +54,33 @@ public AwsS3Service(Settings settings, SettingsFilter settingsFilter) {
}

public synchronized AmazonS3 client() {
String endpoint = getDefaultEndpoint();
String account = componentSettings.get("access_key", settings.get("cloud.account"));
String key = componentSettings.get("secret_key", settings.get("cloud.key"));

return getClient(endpoint, account, key);
}

public synchronized AmazonS3 client(String region, String account, String key) {
String endpoint;
if (region == null) {
endpoint = getDefaultEndpoint();
} else {
endpoint = getEndpoint(region);
logger.debug("using s3 region [{}], with endpoint [{}]", region, endpoint);
}
if (account == null || key == null) {
account = componentSettings.get("access_key", settings.get("cloud.account"));
key = componentSettings.get("secret_key", settings.get("cloud.key"));
}

return getClient(endpoint, account, key);
}


private synchronized AmazonS3 getClient(String endpoint, String account, String key) {
Tuple<String, String> clientDescriptor = new Tuple<String, String>(endpoint, account);
AmazonS3Client client = clients.get(clientDescriptor);
if (client != null) {
return client;
}
Expand All @@ -60,8 +94,6 @@ public synchronized AmazonS3 client() {
} else {
throw new ElasticsearchIllegalArgumentException("No protocol supported [" + protocol + "], can either be [http] or [https]");
}
String account = componentSettings.get("access_key", settings.get("cloud.account"));
String key = componentSettings.get("secret_key", settings.get("cloud.key"));

String proxyHost = componentSettings.get("proxy_host");
if (proxyHost != null) {
Expand All @@ -88,53 +120,60 @@ public synchronized AmazonS3 client() {
new StaticCredentialsProvider(new BasicAWSCredentials(account, key))
);
}
this.client = new AmazonS3Client(credentials, clientConfiguration);
client = new AmazonS3Client(credentials, clientConfiguration);

if (endpoint != null) {
client.setEndpoint(endpoint);
}
clients.put(clientDescriptor, client);
return client;
}

private String getDefaultEndpoint() {
String endpoint = null;
if (componentSettings.get("s3.endpoint") != null) {
String endpoint = componentSettings.get("s3.endpoint");
endpoint = componentSettings.get("s3.endpoint");
logger.debug("using explicit s3 endpoint [{}]", endpoint);
client.setEndpoint(endpoint);
} else if (componentSettings.get("region") != null) {
String endpoint;
String region = componentSettings.get("region").toLowerCase();
if ("us-east".equals(region)) {
endpoint = "s3.amazonaws.com";
} else if ("us-east-1".equals(region)) {
endpoint = "s3.amazonaws.com";
} else if ("us-west".equals(region)) {
endpoint = "s3-us-west-1.amazonaws.com";
} else if ("us-west-1".equals(region)) {
endpoint = "s3-us-west-1.amazonaws.com";
} else if ("us-west-2".equals(region)) {
endpoint = "s3-us-west-2.amazonaws.com";
} else if ("ap-southeast".equals(region)) {
endpoint = "s3-ap-southeast-1.amazonaws.com";
} else if ("ap-southeast-1".equals(region)) {
endpoint = "s3-ap-southeast-1.amazonaws.com";
} else if ("ap-southeast-2".equals(region)) {
endpoint = "s3-ap-southeast-2.amazonaws.com";
} else if ("ap-northeast".equals(region)) {
endpoint = "s3-ap-northeast-1.amazonaws.com";
} else if ("ap-northeast-1".equals(region)) {
endpoint = "s3-ap-northeast-1.amazonaws.com";
} else if ("eu-west".equals(region)) {
endpoint = "s3-eu-west-1.amazonaws.com";
} else if ("eu-west-1".equals(region)) {
endpoint = "s3-eu-west-1.amazonaws.com";
} else if ("sa-east".equals(region)) {
endpoint = "s3-sa-east-1.amazonaws.com";
} else if ("sa-east-1".equals(region)) {
endpoint = "s3-sa-east-1.amazonaws.com";
} else {
throw new ElasticsearchIllegalArgumentException("No automatic endpoint could be derived from region [" + region + "]");
}
if (endpoint != null) {
logger.debug("using s3 region [{}], with endpoint [{}]", region, endpoint);
client.setEndpoint(endpoint);
}
endpoint = getEndpoint(region);
logger.debug("using s3 region [{}], with endpoint [{}]", region, endpoint);
}
return endpoint;
}

return this.client;
private static String getEndpoint(String region) {
if ("us-east".equals(region)) {
return "s3.amazonaws.com";
} else if ("us-east-1".equals(region)) {
return "s3.amazonaws.com";
} else if ("us-west".equals(region)) {
return "s3-us-west-1.amazonaws.com";
} else if ("us-west-1".equals(region)) {
return "s3-us-west-1.amazonaws.com";
} else if ("us-west-2".equals(region)) {
return "s3-us-west-2.amazonaws.com";
} else if ("ap-southeast".equals(region)) {
return "s3-ap-southeast-1.amazonaws.com";
} else if ("ap-southeast-1".equals(region)) {
return "s3-ap-southeast-1.amazonaws.com";
} else if ("ap-southeast-2".equals(region)) {
return "s3-ap-southeast-2.amazonaws.com";
} else if ("ap-northeast".equals(region)) {
return "s3-ap-northeast-1.amazonaws.com";
} else if ("ap-northeast-1".equals(region)) {
return "s3-ap-northeast-1.amazonaws.com";
} else if ("eu-west".equals(region)) {
return "s3-eu-west-1.amazonaws.com";
} else if ("eu-west-1".equals(region)) {
return "s3-eu-west-1.amazonaws.com";
} else if ("sa-east".equals(region)) {
return "s3-sa-east-1.amazonaws.com";
} else if ("sa-east-1".equals(region)) {
return "s3-sa-east-1.amazonaws.com";
} else {
throw new ElasticsearchIllegalArgumentException("No automatic endpoint could be derived from region [" + region + "]");
}
}

@Override
Expand All @@ -147,7 +186,7 @@ protected void doStop() throws ElasticsearchException {

@Override
protected void doClose() throws ElasticsearchException {
if (client != null) {
for (AmazonS3Client client : clients.values()) {
client.shutdown();
}
}
Expand Down
Expand Up @@ -124,7 +124,7 @@ public S3Repository(RepositoryName name, RepositorySettings repositorySettings,
ExecutorService concurrentStreamPool = EsExecutors.newScaling(1, concurrentStreams, 5, TimeUnit.SECONDS, EsExecutors.daemonThreadFactory(settings, "[s3_stream]"));

logger.debug("using bucket [{}], region [{}], chunk_size [{}], concurrent_streams [{}]", bucket, region, chunkSize, concurrentStreams);
blobStore = new S3BlobStore(settings, s3Service.client(), bucket, region, concurrentStreamPool);
blobStore = new S3BlobStore(settings, s3Service.client(region, repositorySettings.settings().get("access_key"), repositorySettings.settings().get("secret_key")), bucket, region, concurrentStreamPool);
this.chunkSize = repositorySettings.settings().getAsBytesSize("chunk_size", componentSettings.getAsBytesSize("chunk_size", new ByteSizeValue(100, ByteSizeUnit.MB)));
this.compress = repositorySettings.settings().getAsBoolean("compress", componentSettings.getAsBoolean("compress", false));
String basePath = repositorySettings.settings().get("base_path", null);
Expand Down

0 comments on commit 7f271fd

Please sign in to comment.