Skip to content

Commit

Permalink
Switch to shared thread pool for snapshot operations
Browse files Browse the repository at this point in the history
Closes elastic#87
  • Loading branch information
imotov committed May 28, 2014
1 parent 91b2522 commit f97cd68
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 10 deletions.
Expand Up @@ -32,6 +32,7 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.threadpool.ThreadPool;

import java.util.ArrayList;
import java.util.concurrent.Executor;
Expand All @@ -47,16 +48,16 @@ public class S3BlobStore extends AbstractComponent implements BlobStore {

private final String region;

private final Executor executor;
private final ThreadPool threadPool;

private final int bufferSizeInBytes;

public S3BlobStore(Settings settings, AmazonS3 client, String bucket, @Nullable String region, Executor executor) {
public S3BlobStore(Settings settings, AmazonS3 client, String bucket, @Nullable String region, ThreadPool threadPool) {
super(settings);
this.client = client;
this.bucket = bucket;
this.region = region;
this.executor = executor;
this.threadPool = threadPool;

this.bufferSizeInBytes = (int) settings.getAsBytesSize("buffer_size", new ByteSizeValue(100, ByteSizeUnit.KB)).bytes();

Expand All @@ -83,7 +84,7 @@ public String bucket() {
}

public Executor executor() {
return executor;
return threadPool.executor(ThreadPool.Names.SNAPSHOT_DATA);
}

public int bufferSizeInBytes() {
Expand Down
Expand Up @@ -27,12 +27,12 @@
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.index.snapshots.IndexShardRepository;
import org.elasticsearch.repositories.RepositoryException;
import org.elasticsearch.repositories.RepositoryName;
import org.elasticsearch.repositories.RepositorySettings;
import org.elasticsearch.repositories.blobstore.BlobStoreRepository;
import org.elasticsearch.threadpool.ThreadPool;

import java.io.IOException;
import java.util.Locale;
Expand Down Expand Up @@ -74,7 +74,7 @@ public class S3Repository extends BlobStoreRepository {
* @throws IOException
*/
@Inject
public S3Repository(RepositoryName name, RepositorySettings repositorySettings, IndexShardRepository indexShardRepository, AwsS3Service s3Service) throws IOException {
public S3Repository(RepositoryName name, RepositorySettings repositorySettings, IndexShardRepository indexShardRepository, AwsS3Service s3Service, ThreadPool threadPool) throws IOException {
super(name.getName(), repositorySettings, indexShardRepository);

String bucket = repositorySettings.settings().get("bucket", componentSettings.get("bucket"));
Expand Down Expand Up @@ -120,11 +120,9 @@ public S3Repository(RepositoryName name, RepositorySettings repositorySettings,
}
}
}
int concurrentStreams = repositorySettings.settings().getAsInt("concurrent_streams", componentSettings.getAsInt("concurrent_streams", 5));
ExecutorService concurrentStreamPool = EsExecutors.newScaling(1, concurrentStreams, 5, TimeUnit.SECONDS, EsExecutors.daemonThreadFactory(settings, "[s3_stream]"));

logger.debug("using bucket [{}], region [{}], chunk_size [{}], concurrent_streams [{}]", bucket, region, chunkSize, concurrentStreams);
blobStore = new S3BlobStore(settings, s3Service.client(region, repositorySettings.settings().get("access_key"), repositorySettings.settings().get("secret_key")), bucket, region, concurrentStreamPool);
logger.debug("using bucket [{}], region [{}], chunk_size [{}]", bucket, region, chunkSize);
blobStore = new S3BlobStore(settings, s3Service.client(region, repositorySettings.settings().get("access_key"), repositorySettings.settings().get("secret_key")), bucket, region, threadPool);
this.chunkSize = repositorySettings.settings().getAsBytesSize("chunk_size", componentSettings.getAsBytesSize("chunk_size", new ByteSizeValue(100, ByteSizeUnit.MB)));
this.compress = repositorySettings.settings().getAsBoolean("compress", componentSettings.getAsBoolean("compress", false));
String basePath = repositorySettings.settings().get("base_path", null);
Expand Down

0 comments on commit f97cd68

Please sign in to comment.