Skip to content

Commit

Permalink
Switch to shared thread pool for snapshot operations
Browse files Browse the repository at this point in the history
Closes #87

(cherry picked from commit 36d4da6)
  • Loading branch information
imotov authored and dadoonet committed May 28, 2014
1 parent d597565 commit d322b10
Show file tree
Hide file tree
Showing 5 changed files with 19 additions and 19 deletions.
1 change: 0 additions & 1 deletion README.md
Expand Up @@ -96,7 +96,6 @@ The following settings are supported:
* `base_path`: Specifies the path within bucket to repository data. Defaults to root directory.
* `access_key`: The access key to use for authentication. Defaults to value of `cloud.aws.access_key`.
* `secret_key`: The secret key to use for authentication. Defaults to value of `cloud.aws.secret_key`.
* `concurrent_streams`: Throttles the number of streams (per node) performing snapshot operations. Defaults to `5`.
* `chunk_size`: Big files can be broken down into chunks during snapshotting if needed. The chunk size can be specified in bytes or by using size value notation, i.e. `1g`, `10m`, `5k`. Defaults to `100m`.
* `compress`: When set to `true` metadata files are stored in compressed format. This setting doesn't affect index files that are already compressed by default. Defaults to `false`.

Expand Down
4 changes: 2 additions & 2 deletions pom.xml
Expand Up @@ -5,7 +5,7 @@
<modelVersion>4.0.0</modelVersion>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch-cloud-aws</artifactId>
<version>2.0.0-SNAPSHOT</version>
<version>3.0.0-SNAPSHOT</version>
<packaging>jar</packaging>
<name>Elasticsearch AWS cloud plugin</name>
<description>The Amazon Web Service (AWS) Cloud plugin allows to use AWS API for the unicast discovery mechanism and add S3 repositories.</description>
Expand Down Expand Up @@ -33,7 +33,7 @@

<properties>
<elasticsearch.version>2.0.0-SNAPSHOT</elasticsearch.version>
<lucene.version>4.8.0</lucene.version>
<lucene.version>4.8.1</lucene.version>
<tests.output>onerror</tests.output>
<tests.jvms>1</tests.jvms>
<tests.shuffle>true</tests.shuffle>
Expand Down
Expand Up @@ -32,6 +32,7 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.threadpool.ThreadPool;

import java.util.ArrayList;
import java.util.concurrent.Executor;
Expand All @@ -47,16 +48,16 @@ public class S3BlobStore extends AbstractComponent implements BlobStore {

private final String region;

private final Executor executor;
private final ThreadPool threadPool;

private final int bufferSizeInBytes;

public S3BlobStore(Settings settings, AmazonS3 client, String bucket, @Nullable String region, Executor executor) {
public S3BlobStore(Settings settings, AmazonS3 client, String bucket, @Nullable String region, ThreadPool threadPool) {
super(settings);
this.client = client;
this.bucket = bucket;
this.region = region;
this.executor = executor;
this.threadPool = threadPool;

this.bufferSizeInBytes = (int) settings.getAsBytesSize("buffer_size", new ByteSizeValue(100, ByteSizeUnit.KB)).bytes();

Expand All @@ -83,7 +84,7 @@ public String bucket() {
}

public Executor executor() {
return executor;
return threadPool.executor(ThreadPool.Names.SNAPSHOT_DATA);
}

public int bufferSizeInBytes() {
Expand Down
Expand Up @@ -27,12 +27,12 @@
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.index.snapshots.IndexShardRepository;
import org.elasticsearch.repositories.RepositoryException;
import org.elasticsearch.repositories.RepositoryName;
import org.elasticsearch.repositories.RepositorySettings;
import org.elasticsearch.repositories.blobstore.BlobStoreRepository;
import org.elasticsearch.threadpool.ThreadPool;

import java.io.IOException;
import java.util.Locale;
Expand Down Expand Up @@ -74,7 +74,7 @@ public class S3Repository extends BlobStoreRepository {
* @throws IOException
*/
@Inject
public S3Repository(RepositoryName name, RepositorySettings repositorySettings, IndexShardRepository indexShardRepository, AwsS3Service s3Service) throws IOException {
public S3Repository(RepositoryName name, RepositorySettings repositorySettings, IndexShardRepository indexShardRepository, AwsS3Service s3Service, ThreadPool threadPool) throws IOException {
super(name.getName(), repositorySettings, indexShardRepository);

String bucket = repositorySettings.settings().get("bucket", componentSettings.get("bucket"));
Expand Down Expand Up @@ -120,11 +120,9 @@ public S3Repository(RepositoryName name, RepositorySettings repositorySettings,
}
}
}
int concurrentStreams = repositorySettings.settings().getAsInt("concurrent_streams", componentSettings.getAsInt("concurrent_streams", 5));
ExecutorService concurrentStreamPool = EsExecutors.newScaling(1, concurrentStreams, 5, TimeUnit.SECONDS, EsExecutors.daemonThreadFactory(settings, "[s3_stream]"));

logger.debug("using bucket [{}], region [{}], chunk_size [{}], concurrent_streams [{}]", bucket, region, chunkSize, concurrentStreams);
blobStore = new S3BlobStore(settings, s3Service.client(region, repositorySettings.settings().get("access_key"), repositorySettings.settings().get("secret_key")), bucket, region, concurrentStreamPool);
logger.debug("using bucket [{}], region [{}], chunk_size [{}]", bucket, region, chunkSize);
blobStore = new S3BlobStore(settings, s3Service.client(region, repositorySettings.settings().get("access_key"), repositorySettings.settings().get("secret_key")), bucket, region, threadPool);
this.chunkSize = repositorySettings.settings().getAsBytesSize("chunk_size", componentSettings.getAsBytesSize("chunk_size", new ByteSizeValue(100, ByteSizeUnit.MB)));
this.compress = repositorySettings.settings().getAsBoolean("compress", componentSettings.getAsBoolean("compress", false));
String basePath = repositorySettings.settings().get("base_path", null);
Expand Down
Expand Up @@ -180,6 +180,7 @@ public void testRepositoryWithCustomCredentials() {
PutRepositoryResponse putRepositoryResponse = client.admin().cluster().preparePutRepository("test-repo")
.setType("s3").setSettings(ImmutableSettings.settingsBuilder()
.put("base_path", basePath)
.put("region", bucketSettings.get("region"))
.put("access_key", bucketSettings.get("access_key"))
.put("secret_key", bucketSettings.get("secret_key"))
.put("bucket", bucketSettings.get("bucket"))
Expand Down Expand Up @@ -292,10 +293,11 @@ public void cleanRepositoryFiles(String basePath) {
settings.getByPrefix("repositories.s3.remote-bucket.")
};
for (Settings bucket : buckets) {
AmazonS3 client = cluster().getInstance(AwsS3Service.class).client(
bucket.get("region", settings.get("repositories.s3.region")),
bucket.get("access_key", settings.get("cloud.aws.access_key")),
bucket.get("secret_key", settings.get("cloud.aws.secret_key")));
String region = bucket.get("region", settings.get("repositories.s3.region"));
String accessKey = bucket.get("access_key", settings.get("cloud.aws.access_key"));
String secretKey = bucket.get("secret_key", settings.get("cloud.aws.secret_key"));
String bucketName = bucket.get("bucket");
AmazonS3 client = cluster().getInstance(AwsS3Service.class).client(region, accessKey, secretKey);
try {
ObjectListing prevListing = null;
//From http://docs.amazonwebservices.com/AmazonS3/latest/dev/DeletingMultipleObjectsUsingJava.html
Expand All @@ -308,7 +310,7 @@ public void cleanRepositoryFiles(String basePath) {
if (prevListing != null) {
list = client.listNextBatchOfObjects(prevListing);
} else {
list = client.listObjects(bucket.get("bucket"), basePath);
list = client.listObjects(bucketName, basePath);
multiObjectDeleteRequest = new DeleteObjectsRequest(list.getBucketName());
}
for (S3ObjectSummary summary : list.getObjectSummaries()) {
Expand All @@ -332,7 +334,7 @@ public void cleanRepositoryFiles(String basePath) {
client.deleteObjects(multiObjectDeleteRequest);
}
} catch (Throwable ex) {
logger.warn("Failed to delete S3 repository", ex);
logger.warn("Failed to delete S3 repository [{}] in [{}]", ex, bucketName, region);
}
}
}
Expand Down

0 comments on commit d322b10

Please sign in to comment.