diff --git a/README.md b/README.md index c1feb28d6c614..64653034f9297 100644 --- a/README.md +++ b/README.md @@ -119,6 +119,8 @@ The following settings are supported: * `bucket`: The name of the bucket to be used for snapshots. (Mandatory) * `region`: The region where bucket is located. Defaults to US Standard * `base_path`: Specifies the path within bucket to repository data. Defaults to root directory. +* `access_key`: The access key to use for authentication. Defaults to value of `cloud.aws.access_key`. +* `secret_key`: The secret key to use for authentication. Defaults to value of `cloud.aws.secret_key`. * `concurrent_streams`: Throttles the number of streams (per node) preforming snapshot operation. Defaults to `5`. * `chunk_size`: Big files can be broken down into chunks during snapshotting if needed. The chunk size can be specified in bytes or by using size value notation, i.e. `1g`, `10m`, `5k`. Defaults to `100m`. * `compress`: When set to `true` metadata files are stored in compressed format. This setting doesn't affect index files that are already compressed by default. Defaults to `false`. @@ -131,11 +133,11 @@ The S3 repositories are using the same credentials as the rest of the S3 service secret_key: vExyMThREXeRMm/b/LRzEB8jWwvzQeXgjqMX+6br -Multiple S3 repositories can be created as long as they share the same credential. +Multiple S3 repositories can be created. If the buckets require different credentials, then define them as part of the repository settings. ## Testing -Integrations tests in this plugin require working AWS configuration and therefore disabled by default. To enable tests prepare a config file elasticsearch.yml with the following content: +Integration tests in this plugin require a working AWS configuration and are therefore disabled by default. Three buckets and two IAM users have to be created. The first IAM user needs access to two buckets in different regions and the final bucket is exclusive to the other IAM user. 
To enable tests prepare a config file elasticsearch.yml with the following content: ``` cloud: @@ -147,10 +149,17 @@ repositories: s3: bucket: "bucket_name" region: "us-west-2" + private-bucket: + bucket: + access_key: + secret_key: + remote-bucket: + bucket: + region: ``` -Replaces `access_key`, `secret_key`, `bucket` and `region` with your settings. Please, note that the test will delete all snapshot/restore related files in the specified bucket. +Replace all occurrences of `access_key`, `secret_key`, `bucket` and `region` with your settings. Please note that the test will delete all snapshot/restore related files in the specified buckets. To run test: diff --git a/src/main/java/org/elasticsearch/cloud/aws/AwsS3Service.java b/src/main/java/org/elasticsearch/cloud/aws/AwsS3Service.java index 77c5c588b7125..80127db4a9c59 100644 --- a/src/main/java/org/elasticsearch/cloud/aws/AwsS3Service.java +++ b/src/main/java/org/elasticsearch/cloud/aws/AwsS3Service.java @@ -19,6 +19,9 @@ package org.elasticsearch.cloud.aws; +import java.util.HashMap; +import java.util.Map; + import com.amazonaws.ClientConfiguration; import com.amazonaws.Protocol; import com.amazonaws.auth.*; @@ -27,6 +30,7 @@ import com.amazonaws.services.s3.AmazonS3Client; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchIllegalArgumentException; +import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.component.AbstractLifecycleComponent; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; @@ -37,7 +41,10 @@ */ public class AwsS3Service extends AbstractLifecycleComponent<AwsS3Service> { - private AmazonS3Client client; + /** + * (endpoint, access_key) -> client + */ + private Map<Tuple<String, String>, AmazonS3Client> clients = new HashMap<Tuple<String, String>, AmazonS3Client>(); @Inject public AwsS3Service(Settings settings, SettingsFilter settingsFilter) { @@ -47,6 +54,33 @@ public AwsS3Service(Settings settings, SettingsFilter settingsFilter) { } public 
synchronized AmazonS3 client() { + String endpoint = getDefaultEndpoint(); + String account = componentSettings.get("access_key", settings.get("cloud.account")); + String key = componentSettings.get("secret_key", settings.get("cloud.key")); + + return getClient(endpoint, account, key); + } + + public synchronized AmazonS3 client(String region, String account, String key) { + String endpoint; + if (region == null) { + endpoint = getDefaultEndpoint(); + } else { + endpoint = getEndpoint(region); + logger.debug("using s3 region [{}], with endpoint [{}]", region, endpoint); + } + if (account == null || key == null) { + account = componentSettings.get("access_key", settings.get("cloud.account")); + key = componentSettings.get("secret_key", settings.get("cloud.key")); + } + + return getClient(endpoint, account, key); + } + + + private synchronized AmazonS3 getClient(String endpoint, String account, String key) { + Tuple<String, String> clientDescriptor = new Tuple<String, String>(endpoint, account); + AmazonS3Client client = clients.get(clientDescriptor); if (client != null) { return client; } @@ -60,8 +94,6 @@ public synchronized AmazonS3 client() { } else { throw new ElasticsearchIllegalArgumentException("No protocol supported [" + protocol + "], can either be [http] or [https]"); } - String account = componentSettings.get("access_key", settings.get("cloud.account")); - String key = componentSettings.get("secret_key", settings.get("cloud.key")); String proxyHost = componentSettings.get("proxy_host"); if (proxyHost != null) { @@ -88,53 +120,60 @@ public synchronized AmazonS3 client() { new StaticCredentialsProvider(new BasicAWSCredentials(account, key)) ); } - this.client = new AmazonS3Client(credentials, clientConfiguration); + client = new AmazonS3Client(credentials, clientConfiguration); + if (endpoint != null) { + client.setEndpoint(endpoint); + } + clients.put(clientDescriptor, client); + return client; + } + + private String getDefaultEndpoint() { + String endpoint = null; if 
(componentSettings.get("s3.endpoint") != null) { - String endpoint = componentSettings.get("s3.endpoint"); + endpoint = componentSettings.get("s3.endpoint"); logger.debug("using explicit s3 endpoint [{}]", endpoint); - client.setEndpoint(endpoint); } else if (componentSettings.get("region") != null) { - String endpoint; String region = componentSettings.get("region").toLowerCase(); - if ("us-east".equals(region)) { - endpoint = "s3.amazonaws.com"; - } else if ("us-east-1".equals(region)) { - endpoint = "s3.amazonaws.com"; - } else if ("us-west".equals(region)) { - endpoint = "s3-us-west-1.amazonaws.com"; - } else if ("us-west-1".equals(region)) { - endpoint = "s3-us-west-1.amazonaws.com"; - } else if ("us-west-2".equals(region)) { - endpoint = "s3-us-west-2.amazonaws.com"; - } else if ("ap-southeast".equals(region)) { - endpoint = "s3-ap-southeast-1.amazonaws.com"; - } else if ("ap-southeast-1".equals(region)) { - endpoint = "s3-ap-southeast-1.amazonaws.com"; - } else if ("ap-southeast-2".equals(region)) { - endpoint = "s3-ap-southeast-2.amazonaws.com"; - } else if ("ap-northeast".equals(region)) { - endpoint = "s3-ap-northeast-1.amazonaws.com"; - } else if ("ap-northeast-1".equals(region)) { - endpoint = "s3-ap-northeast-1.amazonaws.com"; - } else if ("eu-west".equals(region)) { - endpoint = "s3-eu-west-1.amazonaws.com"; - } else if ("eu-west-1".equals(region)) { - endpoint = "s3-eu-west-1.amazonaws.com"; - } else if ("sa-east".equals(region)) { - endpoint = "s3-sa-east-1.amazonaws.com"; - } else if ("sa-east-1".equals(region)) { - endpoint = "s3-sa-east-1.amazonaws.com"; - } else { - throw new ElasticsearchIllegalArgumentException("No automatic endpoint could be derived from region [" + region + "]"); - } - if (endpoint != null) { - logger.debug("using s3 region [{}], with endpoint [{}]", region, endpoint); - client.setEndpoint(endpoint); - } + endpoint = getEndpoint(region); + logger.debug("using s3 region [{}], with endpoint [{}]", region, endpoint); } + return 
endpoint; + } - return this.client; + private static String getEndpoint(String region) { + if ("us-east".equals(region)) { + return "s3.amazonaws.com"; + } else if ("us-east-1".equals(region)) { + return "s3.amazonaws.com"; + } else if ("us-west".equals(region)) { + return "s3-us-west-1.amazonaws.com"; + } else if ("us-west-1".equals(region)) { + return "s3-us-west-1.amazonaws.com"; + } else if ("us-west-2".equals(region)) { + return "s3-us-west-2.amazonaws.com"; + } else if ("ap-southeast".equals(region)) { + return "s3-ap-southeast-1.amazonaws.com"; + } else if ("ap-southeast-1".equals(region)) { + return "s3-ap-southeast-1.amazonaws.com"; + } else if ("ap-southeast-2".equals(region)) { + return "s3-ap-southeast-2.amazonaws.com"; + } else if ("ap-northeast".equals(region)) { + return "s3-ap-northeast-1.amazonaws.com"; + } else if ("ap-northeast-1".equals(region)) { + return "s3-ap-northeast-1.amazonaws.com"; + } else if ("eu-west".equals(region)) { + return "s3-eu-west-1.amazonaws.com"; + } else if ("eu-west-1".equals(region)) { + return "s3-eu-west-1.amazonaws.com"; + } else if ("sa-east".equals(region)) { + return "s3-sa-east-1.amazonaws.com"; + } else if ("sa-east-1".equals(region)) { + return "s3-sa-east-1.amazonaws.com"; + } else { + throw new ElasticsearchIllegalArgumentException("No automatic endpoint could be derived from region [" + region + "]"); + } } @Override @@ -147,7 +186,7 @@ protected void doStop() throws ElasticsearchException { @Override protected void doClose() throws ElasticsearchException { - if (client != null) { + for (AmazonS3Client client : clients.values()) { client.shutdown(); } } diff --git a/src/main/java/org/elasticsearch/repositories/s3/S3Repository.java b/src/main/java/org/elasticsearch/repositories/s3/S3Repository.java index 3f47f589a216f..b9595a3abfb2a 100644 --- a/src/main/java/org/elasticsearch/repositories/s3/S3Repository.java +++ b/src/main/java/org/elasticsearch/repositories/s3/S3Repository.java @@ -124,7 +124,7 @@ public 
S3Repository(RepositoryName name, RepositorySettings repositorySettings, ExecutorService concurrentStreamPool = EsExecutors.newScaling(1, concurrentStreams, 5, TimeUnit.SECONDS, EsExecutors.daemonThreadFactory(settings, "[s3_stream]")); logger.debug("using bucket [{}], region [{}], chunk_size [{}], concurrent_streams [{}]", bucket, region, chunkSize, concurrentStreams); - blobStore = new S3BlobStore(settings, s3Service.client(), bucket, region, concurrentStreamPool); + blobStore = new S3BlobStore(settings, s3Service.client(region, repositorySettings.settings().get("access_key"), repositorySettings.settings().get("secret_key")), bucket, region, concurrentStreamPool); this.chunkSize = repositorySettings.settings().getAsBytesSize("chunk_size", componentSettings.getAsBytesSize("chunk_size", new ByteSizeValue(100, ByteSizeUnit.MB))); this.compress = repositorySettings.settings().getAsBoolean("compress", componentSettings.getAsBoolean("compress", false)); String basePath = repositorySettings.settings().get("base_path", null); diff --git a/src/test/java/org/elasticsearch/repositories/s3/S3SnapshotRestoreTest.java b/src/test/java/org/elasticsearch/repositories/s3/S3SnapshotRestoreTest.java index c057149f66403..1484c88c1d51a 100644 --- a/src/test/java/org/elasticsearch/repositories/s3/S3SnapshotRestoreTest.java +++ b/src/test/java/org/elasticsearch/repositories/s3/S3SnapshotRestoreTest.java @@ -23,6 +23,7 @@ import com.amazonaws.services.s3.model.DeleteObjectsRequest; import com.amazonaws.services.s3.model.ObjectListing; import com.amazonaws.services.s3.model.S3ObjectSummary; + import org.elasticsearch.action.admin.cluster.repositories.put.PutRepositoryResponse; import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse; import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse; @@ -33,6 +34,7 @@ import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.common.settings.ImmutableSettings; import 
org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.concurrent.UncategorizedExecutionException; import org.elasticsearch.repositories.RepositoryMissingException; import org.elasticsearch.snapshots.SnapshotState; import org.elasticsearch.test.ElasticsearchIntegrationTest.ClusterScope; @@ -50,7 +52,7 @@ /** */ @AwsTest -@ClusterScope(scope = Scope.TEST, numNodes = 2) +@ClusterScope(scope = Scope.SUITE, numNodes = 2) public class S3SnapshotRestoreTest extends AbstractAwsTest { @Override @@ -151,6 +153,117 @@ public void testSimpleWorkflow() { assertThat(clusterState.getMetaData().hasIndex("test-idx-2"), equalTo(false)); } + /** + * This test verifies that the test configuration is set up in a manner that + * does not make the test {@link #testRepositoryWithCustomCredentials()} pointless. + */ + @Test(expected = UncategorizedExecutionException.class) + public void assertRepositoryWithCustomCredentialsIsNotAccessibleByDefaultCredentials() { + Client client = client(); + Settings bucketSettings = cluster().getInstance(Settings.class).getByPrefix("repositories.s3.private-bucket."); + logger.info("--> creating s3 repository with bucket[{}] and path [{}]", bucketSettings.get("bucket"), basePath); + PutRepositoryResponse putRepositoryResponse = client.admin().cluster().preparePutRepository("test-repo") + .setType("s3").setSettings(ImmutableSettings.settingsBuilder() + .put("base_path", basePath) + .put("bucket", bucketSettings.get("bucket")) + ).get(); + assertThat(putRepositoryResponse.isAcknowledged(), equalTo(true)); + + assertRepositoryIsOperational(client, "test-repo"); + } + + @Test + public void testRepositoryWithCustomCredentials() { + Client client = client(); + Settings bucketSettings = cluster().getInstance(Settings.class).getByPrefix("repositories.s3.private-bucket."); + logger.info("--> creating s3 repository with bucket[{}] and path [{}]", bucketSettings.get("bucket"), basePath); + PutRepositoryResponse putRepositoryResponse = 
client.admin().cluster().preparePutRepository("test-repo") + .setType("s3").setSettings(ImmutableSettings.settingsBuilder() + .put("base_path", basePath) + .put("access_key", bucketSettings.get("access_key")) + .put("secret_key", bucketSettings.get("secret_key")) + .put("bucket", bucketSettings.get("bucket")) + ).get(); + assertThat(putRepositoryResponse.isAcknowledged(), equalTo(true)); + + assertRepositoryIsOperational(client, "test-repo"); + } + + /** + * This test verifies that the test configuration is set up in a manner that + * does not make the test {@link #testRepositoryInRemoteRegion()} pointless. + */ + @Test(expected = UncategorizedExecutionException.class) + public void assertRepositoryInRemoteRegionIsRemote() { + Client client = client(); + Settings bucketSettings = cluster().getInstance(Settings.class).getByPrefix("repositories.s3.remote-bucket."); + logger.info("--> creating s3 repository with bucket[{}] and path [{}]", bucketSettings.get("bucket"), basePath); + PutRepositoryResponse putRepositoryResponse = client.admin().cluster().preparePutRepository("test-repo") + .setType("s3").setSettings(ImmutableSettings.settingsBuilder() + .put("base_path", basePath) + .put("bucket", bucketSettings.get("bucket")) +// Below setting intentionally omitted to assert bucket is not available in default region. 
+// .put("region", privateBucketSettings.get("region")) + ).get(); + assertThat(putRepositoryResponse.isAcknowledged(), equalTo(true)); + + assertRepositoryIsOperational(client, "test-repo"); + } + + @Test + public void testRepositoryInRemoteRegion() { + Client client = client(); + Settings settings = cluster().getInstance(Settings.class); + Settings bucketSettings = settings.getByPrefix("repositories.s3.remote-bucket."); + logger.info("--> creating s3 repository with bucket[{}] and path [{}]", bucketSettings.get("bucket"), basePath); + PutRepositoryResponse putRepositoryResponse = client.admin().cluster().preparePutRepository("test-repo") + .setType("s3").setSettings(ImmutableSettings.settingsBuilder() + .put("base_path", basePath) + .put("bucket", bucketSettings.get("bucket")) + .put("region", bucketSettings.get("region")) + ).get(); + assertThat(putRepositoryResponse.isAcknowledged(), equalTo(true)); + + assertRepositoryIsOperational(client, "test-repo"); + } + + private void assertRepositoryIsOperational(Client client, String repository) { + createIndex("test-idx-1"); + ensureGreen(); + + logger.info("--> indexing some data"); + for (int i = 0; i < 100; i++) { + index("test-idx-1", "doc", Integer.toString(i), "foo", "bar" + i); + } + refresh(); + assertThat(client.prepareCount("test-idx-1").get().getCount(), equalTo(100L)); + + logger.info("--> snapshot"); + CreateSnapshotResponse createSnapshotResponse = client.admin().cluster().prepareCreateSnapshot(repository, "test-snap").setWaitForCompletion(true).setIndices("test-idx-*").get(); + assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(), greaterThan(0)); + assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(), equalTo(createSnapshotResponse.getSnapshotInfo().totalShards())); + + assertThat(client.admin().cluster().prepareGetSnapshots(repository).setSnapshots("test-snap").get().getSnapshots().get(0).state(), equalTo(SnapshotState.SUCCESS)); + + logger.info("--> delete some 
data"); + for (int i = 0; i < 50; i++) { + client.prepareDelete("test-idx-1", "doc", Integer.toString(i)).get(); + } + refresh(); + assertThat(client.prepareCount("test-idx-1").get().getCount(), equalTo(50L)); + + logger.info("--> close indices"); + client.admin().indices().prepareClose("test-idx-1").get(); + + logger.info("--> restore all indices from the snapshot"); + RestoreSnapshotResponse restoreSnapshotResponse = client.admin().cluster().prepareRestoreSnapshot(repository, "test-snap").setWaitForCompletion(true).execute().actionGet(); + assertThat(restoreSnapshotResponse.getRestoreInfo().totalShards(), greaterThan(0)); + + ensureGreen(); + assertThat(client.prepareCount("test-idx-1").get().getCount(), equalTo(100L)); + } + + /** * Deletes repositories, supports wildcard notation. */ @@ -172,45 +285,55 @@ public static void wipeRepositories(String... repositories) { * Deletes content of the repository files in the bucket */ public void cleanRepositoryFiles(String basePath) { - String bucket = cluster().getInstance(Settings.class).get("repositories.s3.bucket"); - AmazonS3 client = cluster().getInstance(AwsS3Service.class).client(); - try { - ObjectListing prevListing = null; - //From http://docs.amazonwebservices.com/AmazonS3/latest/dev/DeletingMultipleObjectsUsingJava.html - //we can do at most 1K objects per delete - //We don't know the bucket name until first object listing - DeleteObjectsRequest multiObjectDeleteRequest = null; - ArrayList keys = new ArrayList(); - while (true) { - ObjectListing list; - if (prevListing != null) { - list = client.listNextBatchOfObjects(prevListing); - } else { - list = client.listObjects(bucket, basePath); - multiObjectDeleteRequest = new DeleteObjectsRequest(list.getBucketName()); - } - for (S3ObjectSummary summary : list.getObjectSummaries()) { - keys.add(new DeleteObjectsRequest.KeyVersion(summary.getKey())); - //Every 500 objects batch the delete request - if (keys.size() > 500) { - multiObjectDeleteRequest.setKeys(keys); 
- client.deleteObjects(multiObjectDeleteRequest); + Settings settings = cluster().getInstance(Settings.class); + Settings[] buckets = { + settings.getByPrefix("repositories.s3."), + settings.getByPrefix("repositories.s3.private-bucket."), + settings.getByPrefix("repositories.s3.remote-bucket.") + }; + for (Settings bucket : buckets) { + AmazonS3 client = cluster().getInstance(AwsS3Service.class).client( + bucket.get("region", settings.get("repositories.s3.region")), + bucket.get("access_key", settings.get("cloud.aws.access_key")), + bucket.get("secret_key", settings.get("cloud.aws.secret_key"))); + try { + ObjectListing prevListing = null; + //From http://docs.amazonwebservices.com/AmazonS3/latest/dev/DeletingMultipleObjectsUsingJava.html + //we can do at most 1K objects per delete + //We don't know the bucket name until first object listing + DeleteObjectsRequest multiObjectDeleteRequest = null; + ArrayList<DeleteObjectsRequest.KeyVersion> keys = new ArrayList<DeleteObjectsRequest.KeyVersion>(); + while (true) { + ObjectListing list; + if (prevListing != null) { + list = client.listNextBatchOfObjects(prevListing); + } else { + list = client.listObjects(bucket.get("bucket"), basePath); multiObjectDeleteRequest = new DeleteObjectsRequest(list.getBucketName()); - keys.clear(); + } + for (S3ObjectSummary summary : list.getObjectSummaries()) { + keys.add(new DeleteObjectsRequest.KeyVersion(summary.getKey())); + //Every 500 objects batch the delete request + if (keys.size() > 500) { + multiObjectDeleteRequest.setKeys(keys); + client.deleteObjects(multiObjectDeleteRequest); + multiObjectDeleteRequest = new DeleteObjectsRequest(list.getBucketName()); + keys.clear(); + } + } + if (list.isTruncated()) { + prevListing = list; + } else { + break; } } - if (list.isTruncated()) { - prevListing = list; - } else { - break; + if (!keys.isEmpty()) { + multiObjectDeleteRequest.setKeys(keys); + client.deleteObjects(multiObjectDeleteRequest); + } + } catch (Throwable ex) { + logger.warn("Failed to delete S3 repository", ex); } - if (!keys.isEmpty()) { 
- multiObjectDeleteRequest.setKeys(keys); - client.deleteObjects(multiObjectDeleteRequest); - } - } catch (Throwable ex) { - logger.warn("Failed to delete S3 repository", ex); } } } diff --git a/src/test/resources/elasticsearch.yml b/src/test/resources/elasticsearch.yml index cf2dfeed33ef6..a7200fa3ae3c4 100644 --- a/src/test/resources/elasticsearch.yml +++ b/src/test/resources/elasticsearch.yml @@ -1,10 +1,21 @@ # Replace this access_key / secret_key and bucket name with your own if you want # to run tests. -#cloud: +# cloud: # aws: -# access_key: AKVAIQBF2RECL7FJWGJQ -# secret_key: vExyMThREXeRMm/b/LRzEB8jWwvzQeXgjqMX+6br +# access_key: +# secret_key: # #discovery: # type: ec2 - +# +#repositories: +# s3: +# bucket: +# region: +# private-bucket: +# bucket: +# access_key: +# secret_key: +# remote-bucket: +# bucket: +# region: \ No newline at end of file