Skip to content

Commit

Permalink
Shared Gateway: Allow to set the number of concurrent streams doing s…
Browse files Browse the repository at this point in the history
…napshot operations, closes elastic#621.
  • Loading branch information
kimchy committed Jan 11, 2011
1 parent 93dec72 commit a0a714e
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 7 deletions.
Expand Up @@ -19,26 +19,33 @@

package org.elasticsearch.gateway.fs;

import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.metadata.MetaDataCreateIndexService;
import org.elasticsearch.common.blobstore.fs.FsBlobStore;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.inject.Module;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.DynamicExecutors;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.env.Environment;
import org.elasticsearch.gateway.blobstore.BlobStoreGateway;
import org.elasticsearch.index.gateway.fs.FsIndexGatewayModule;
import org.elasticsearch.threadpool.ThreadPool;

import java.io.File;
import java.io.IOException;
import java.util.concurrent.ExecutorService;

/**
* @author kimchy (shay.banon)
*/
public class FsGateway extends BlobStoreGateway {

private final ExecutorService concurrentStreamPool;

@Inject public FsGateway(Settings settings, ClusterService clusterService, MetaDataCreateIndexService createIndexService,
Environment environment, ClusterName clusterName, ThreadPool threadPool) throws IOException {
super(settings, clusterService, createIndexService);
Expand All @@ -51,7 +58,11 @@ public class FsGateway extends BlobStoreGateway {
} else {
gatewayFile = new File(location);
}
initialize(new FsBlobStore(componentSettings, threadPool.cached(), gatewayFile), clusterName, null);

int concurrentStreams = componentSettings.getAsInt("concurrent_streams", 5);
this.concurrentStreamPool = DynamicExecutors.newScalingThreadPool(1, concurrentStreams, TimeValue.timeValueSeconds(5).millis(), EsExecutors.daemonThreadFactory(settings, "[s3_stream]"));

initialize(new FsBlobStore(componentSettings, concurrentStreamPool, gatewayFile), clusterName, null);
}

@Override public String type() {
Expand All @@ -61,4 +72,9 @@ public class FsGateway extends BlobStoreGateway {
@Override public Class<? extends Module> suggestIndexGateway() {
return FsIndexGatewayModule.class;
}

@Override protected void doClose() throws ElasticSearchException {
super.doClose();
concurrentStreamPool.shutdown();
}
}
Expand Up @@ -31,17 +31,23 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.DynamicExecutors;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.gateway.blobstore.BlobStoreGateway;
import org.elasticsearch.index.gateway.s3.S3IndexGatewayModule;
import org.elasticsearch.threadpool.ThreadPool;

import java.io.IOException;
import java.util.concurrent.ExecutorService;

/**
* @author kimchy (shay.banon)
*/
public class S3Gateway extends BlobStoreGateway {

private final ExecutorService concurrentStreamPool;

@Inject public S3Gateway(Settings settings, ClusterService clusterService, MetaDataCreateIndexService createIndexService,
ClusterName clusterName, ThreadPool threadPool, AwsS3Service s3Service) throws IOException {
super(settings, clusterService, createIndexService);
Expand Down Expand Up @@ -76,13 +82,17 @@ public class S3Gateway extends BlobStoreGateway {
}
ByteSizeValue chunkSize = componentSettings.getAsBytesSize("chunk_size", new ByteSizeValue(100, ByteSizeUnit.MB));

logger.debug("using bucket [{}], region [{}], chunk_size [{}]", bucket, region, chunkSize);
int concurrentStreams = componentSettings.getAsInt("concurrent_streams", 5);
this.concurrentStreamPool = DynamicExecutors.newScalingThreadPool(1, concurrentStreams, TimeValue.timeValueSeconds(5).millis(), EsExecutors.daemonThreadFactory(settings, "[s3_stream]"));

logger.debug("using bucket [{}], region [{}], chunk_size [{}], concurrent_streams [{}]", bucket, region, chunkSize, concurrentStreams);

initialize(new S3BlobStore(settings, s3Service.client(), bucket, region, threadPool.cached()), clusterName, chunkSize);
initialize(new S3BlobStore(settings, s3Service.client(), bucket, region, concurrentStreamPool), clusterName, chunkSize);
}

@Override public void close() throws ElasticSearchException {
super.close();
@Override protected void doClose() throws ElasticSearchException {
super.doClose();
concurrentStreamPool.shutdown();
}

@Override public String type() {
Expand Down
Expand Up @@ -31,12 +31,16 @@
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.inject.Module;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.DynamicExecutors;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.gateway.blobstore.BlobStoreGateway;
import org.elasticsearch.threadpool.ThreadPool;

import java.io.IOException;
import java.net.URI;
import java.util.Map;
import java.util.concurrent.ExecutorService;

/**
* @author kimchy (shay.banon)
Expand All @@ -47,6 +51,8 @@ public class HdfsGateway extends BlobStoreGateway {

private final FileSystem fileSystem;

private final ExecutorService concurrentStreamPool;

@Inject public HdfsGateway(Settings settings, ClusterService clusterService, MetaDataCreateIndexService createIndexService,
ClusterName clusterName, ThreadPool threadPool) throws IOException {
super(settings, clusterService, createIndexService);
Expand All @@ -62,7 +68,10 @@ public class HdfsGateway extends BlobStoreGateway {
}
Path hPath = new Path(new Path(path), clusterName.value());

logger.debug("Using uri [{}], path [{}]", uri, hPath);
int concurrentStreams = componentSettings.getAsInt("concurrent_streams", 5);
this.concurrentStreamPool = DynamicExecutors.newScalingThreadPool(1, concurrentStreams, TimeValue.timeValueSeconds(5).millis(), EsExecutors.daemonThreadFactory(settings, "[s3_stream]"));

logger.debug("Using uri [{}], path [{}], concurrent_streams [{}]", uri, hPath, concurrentStreams);

Configuration conf = new Configuration();
Settings hdfsSettings = settings.getByPrefix("hdfs.conf.");
Expand All @@ -72,7 +81,7 @@ public class HdfsGateway extends BlobStoreGateway {

fileSystem = FileSystem.get(URI.create(uri), conf);

initialize(new HdfsBlobStore(settings, fileSystem, threadPool.cached(), hPath), clusterName, null);
initialize(new HdfsBlobStore(settings, fileSystem, concurrentStreamPool, hPath), clusterName, null);
}

@Override public String type() {
Expand All @@ -92,5 +101,6 @@ public class HdfsGateway extends BlobStoreGateway {
// ignore
}
}
concurrentStreamPool.shutdown();
}
}

0 comments on commit a0a714e

Please sign in to comment.