Skip to content

Commit

Permalink
Switch to shared thread pool for all snapshot repositories
Browse files Browse the repository at this point in the history
 Closes #6181
  • Loading branch information
imotov committed May 16, 2014
1 parent c960301 commit a4d9479
Show file tree
Hide file tree
Showing 13 changed files with 88 additions and 36 deletions.
3 changes: 0 additions & 3 deletions docs/reference/modules/snapshots.asciidoc
Expand Up @@ -67,7 +67,6 @@ on all data and master nodes. The following settings are supported:
[horizontal]
`location`:: Location of the snapshots. Mandatory.
`compress`:: Turns on compression of the snapshot files. Defaults to `true`.
`concurrent_streams`:: Throttles the number of streams (per node) performing snapshot operations. Defaults to `5`.
`chunk_size`:: Big files can be broken down into chunks during snapshotting if needed. The chunk size can be specified in bytes or by
using size value notation, i.e. 1g, 10m, 5k. Defaults to `null` (unlimited chunk size).
`max_restore_bytes_per_sec`:: Throttles per node restore rate. Defaults to `20mb` per second.
Expand All @@ -83,8 +82,6 @@ point to the root of the shared filesystem repository. The following settings ar

[horizontal]
`url`:: Location of the snapshots. Mandatory.
`concurrent_streams`:: Throttles the number of streams (per node) performing snapshot operations. Defaults to `5`.


[float]
===== Repository plugins
Expand Down
14 changes: 12 additions & 2 deletions docs/reference/modules/threadpool.asciidoc
Expand Up @@ -36,13 +36,23 @@ pools, but the important ones include:
size `# of available processors`.
queue_size `1000`.

`warmer`::
`snapshot`::
For snapshot/restore operations, defaults to `scaling`
with a `5m` keep-alive,
size `(# of available processors)/2`.

`snapshot_data`::
For snapshot/restore operations on data files, defaults to `scaling`
with a `5m` keep-alive,
size `5`.

`warmer`::
For segment warm-up operations, defaults to `scaling`
with a `5m` keep-alive.

`refresh`::
For refresh operations, defaults to `scaling`
with a `5m` keep-alive.
with a `5m` keep-alive.

Changing a specific thread pool can be done by setting its type and
specific type parameters, for example, changing the `index` thread pool
Expand Down
Expand Up @@ -28,6 +28,7 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.threadpool.ThreadPool;

import java.io.File;
import java.util.concurrent.Executor;
Expand All @@ -37,15 +38,16 @@
*/
public class FsBlobStore extends AbstractComponent implements BlobStore {

private final Executor executor;
private final ThreadPool threadPool;

private final File path;

private final int bufferSizeInBytes;

public FsBlobStore(Settings settings, Executor executor, File path) {
public FsBlobStore(Settings settings, ThreadPool threadPool, File path) {
super(settings);
this.path = path;
this.threadPool = threadPool;
if (!path.exists()) {
boolean b = FileSystemUtils.mkdirs(path);
if (!b) {
Expand All @@ -56,7 +58,6 @@ public FsBlobStore(Settings settings, Executor executor, File path) {
throw new BlobStoreException("Path is not a directory at [" + path + "]");
}
this.bufferSizeInBytes = (int) settings.getAsBytesSize("buffer_size", new ByteSizeValue(100, ByteSizeUnit.KB)).bytes();
this.executor = executor;
}

@Override
Expand All @@ -73,7 +74,7 @@ public int bufferSizeInBytes() {
}

public Executor executor() {
return executor;
return threadPool.executor(ThreadPool.Names.SNAPSHOT_DATA);
}

@Override
Expand Down
Expand Up @@ -27,6 +27,7 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.threadpool.ThreadPool;

import java.net.MalformedURLException;
import java.net.URL;
Expand All @@ -37,7 +38,7 @@
*/
public class URLBlobStore extends AbstractComponent implements BlobStore {

private final Executor executor;
private final ThreadPool threadPool;

private final URL path;

Expand All @@ -53,14 +54,14 @@ public class URLBlobStore extends AbstractComponent implements BlobStore {
* </dl>
*
* @param settings settings
* @param executor executor for read operations
* @param threadPool thread pool for read operations
* @param path base URL
*/
public URLBlobStore(Settings settings, Executor executor, URL path) {
public URLBlobStore(Settings settings, ThreadPool threadPool, URL path) {
super(settings);
this.path = path;
this.bufferSizeInBytes = (int) settings.getAsBytesSize("buffer_size", new ByteSizeValue(100, ByteSizeUnit.KB)).bytes();
this.executor = executor;
this.threadPool = threadPool;
}

/**
Expand Down Expand Up @@ -95,7 +96,7 @@ public int bufferSizeInBytes() {
* @return executor
*/
public Executor executor() {
return executor;
return threadPool.executor(ThreadPool.Names.SNAPSHOT_DATA);
}

/**
Expand Down
Expand Up @@ -31,7 +31,6 @@
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.gateway.IndexShardGateway;
import org.elasticsearch.index.gateway.IndexShardGatewayRecoveryException;
import org.elasticsearch.indices.recovery.RecoveryState;
import org.elasticsearch.index.settings.IndexSettings;
import org.elasticsearch.index.shard.AbstractIndexShardComponent;
import org.elasticsearch.index.shard.IndexShardState;
Expand All @@ -41,6 +40,7 @@
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.index.translog.TranslogStreams;
import org.elasticsearch.index.translog.fs.FsTranslog;
import org.elasticsearch.indices.recovery.RecoveryState;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.threadpool.ThreadPool;

Expand Down Expand Up @@ -274,7 +274,7 @@ public void run() {
return;
}
if (indexShard.state() == IndexShardState.STARTED && indexShard.translog().syncNeeded()) {
threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(new Runnable() {
threadPool.executor(ThreadPool.Names.FLUSH).execute(new Runnable() {
@Override
public void run() {
try {
Expand Down
Expand Up @@ -89,6 +89,7 @@
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.search.SearchModule;
import org.elasticsearch.search.SearchService;
import org.elasticsearch.snapshots.SnapshotsService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.threadpool.ThreadPoolModule;
import org.elasticsearch.transport.TransportModule;
Expand Down Expand Up @@ -223,6 +224,7 @@ public Node start() {
injector.getInstance(IndicesClusterStateService.class).start();
injector.getInstance(IndicesTTLService.class).start();
injector.getInstance(RiversManager.class).start();
injector.getInstance(SnapshotsService.class).start();
injector.getInstance(ClusterService.class).start();
injector.getInstance(RoutingService.class).start();
injector.getInstance(SearchService.class).start();
Expand Down Expand Up @@ -263,6 +265,7 @@ public Node stop() {

injector.getInstance(RiversManager.class).stop();

injector.getInstance(SnapshotsService.class).stop();
// stop any changes happening as a result of cluster state changes
injector.getInstance(IndicesClusterStateService.class).stop();
// we close indices first, so operations won't be allowed on it
Expand Down Expand Up @@ -317,6 +320,8 @@ public void close() {
stopWatch.stop().start("rivers");
injector.getInstance(RiversManager.class).close();

stopWatch.stop().start("snapshot_service");
injector.getInstance(SnapshotsService.class).close();
stopWatch.stop().start("client");
injector.getInstance(Client.class).close();
stopWatch.stop().start("indices_cluster");
Expand Down
Expand Up @@ -24,17 +24,15 @@
import org.elasticsearch.common.blobstore.fs.FsBlobStore;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.index.snapshots.IndexShardRepository;
import org.elasticsearch.repositories.RepositoryException;
import org.elasticsearch.repositories.RepositoryName;
import org.elasticsearch.repositories.RepositorySettings;
import org.elasticsearch.repositories.blobstore.BlobStoreRepository;
import org.elasticsearch.threadpool.ThreadPool;

import java.io.File;
import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

/**
* Shared file system implementation of the BlobStoreRepository
Expand Down Expand Up @@ -68,7 +66,7 @@ public class FsRepository extends BlobStoreRepository {
* @throws IOException
*/
@Inject
public FsRepository(RepositoryName name, RepositorySettings repositorySettings, IndexShardRepository indexShardRepository) throws IOException {
public FsRepository(RepositoryName name, RepositorySettings repositorySettings, ThreadPool threadPool, IndexShardRepository indexShardRepository) throws IOException {
super(name.getName(), repositorySettings, indexShardRepository);
File locationFile;
String location = repositorySettings.settings().get("location", componentSettings.get("location"));
Expand All @@ -78,9 +76,7 @@ public FsRepository(RepositoryName name, RepositorySettings repositorySettings,
} else {
locationFile = new File(location);
}
int concurrentStreams = repositorySettings.settings().getAsInt("concurrent_streams", componentSettings.getAsInt("concurrent_streams", 5));
ExecutorService concurrentStreamPool = EsExecutors.newScaling(1, concurrentStreams, 60, TimeUnit.SECONDS, EsExecutors.daemonThreadFactory(settings, "[fs_stream]"));
blobStore = new FsBlobStore(componentSettings, concurrentStreamPool, locationFile);
blobStore = new FsBlobStore(componentSettings, threadPool, locationFile);
this.chunkSize = repositorySettings.settings().getAsBytesSize("chunk_size", componentSettings.getAsBytesSize("chunk_size", null));
this.compress = repositorySettings.settings().getAsBoolean("compress", componentSettings.getAsBoolean("compress", false));
this.basePath = BlobPath.cleanPath();
Expand Down
Expand Up @@ -25,17 +25,15 @@
import org.elasticsearch.common.blobstore.BlobStore;
import org.elasticsearch.common.blobstore.url.URLBlobStore;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.index.snapshots.IndexShardRepository;
import org.elasticsearch.repositories.RepositoryException;
import org.elasticsearch.repositories.RepositoryName;
import org.elasticsearch.repositories.RepositorySettings;
import org.elasticsearch.repositories.blobstore.BlobStoreRepository;
import org.elasticsearch.threadpool.ThreadPool;

import java.io.IOException;
import java.net.URL;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

/**
* Read-only URL-based implementation of the BlobStoreRepository
Expand Down Expand Up @@ -65,7 +63,7 @@ public class URLRepository extends BlobStoreRepository {
* @throws IOException
*/
@Inject
public URLRepository(RepositoryName name, RepositorySettings repositorySettings, IndexShardRepository indexShardRepository) throws IOException {
public URLRepository(RepositoryName name, RepositorySettings repositorySettings, ThreadPool threadPool, IndexShardRepository indexShardRepository) throws IOException {
super(name.getName(), repositorySettings, indexShardRepository);
URL url;
String path = repositorySettings.settings().get("url", componentSettings.get("url"));
Expand All @@ -74,10 +72,8 @@ public URLRepository(RepositoryName name, RepositorySettings repositorySettings,
} else {
url = new URL(path);
}
int concurrentStreams = repositorySettings.settings().getAsInt("concurrent_streams", componentSettings.getAsInt("concurrent_streams", 5));
ExecutorService concurrentStreamPool = EsExecutors.newScaling(1, concurrentStreams, 60, TimeUnit.SECONDS, EsExecutors.daemonThreadFactory(settings, "[fs_stream]"));
listDirectories = repositorySettings.settings().getAsBoolean("list_directories", componentSettings.getAsBoolean("list_directories", true));
blobStore = new URLBlobStore(componentSettings, concurrentStreamPool, url);
blobStore = new URLBlobStore(componentSettings, threadPool, url);
basePath = BlobPath.cleanPath();
}

Expand Down
Expand Up @@ -65,6 +65,7 @@ public class RestThreadPoolAction extends AbstractCatAction {
ThreadPool.Names.REFRESH,
ThreadPool.Names.SEARCH,
ThreadPool.Names.SNAPSHOT,
ThreadPool.Names.SNAPSHOT_DATA,
ThreadPool.Names.SUGGEST,
ThreadPool.Names.WARMER
};
Expand All @@ -82,6 +83,7 @@ public class RestThreadPoolAction extends AbstractCatAction {
"r",
"s",
"sn",
"sd",
"su",
"w"
};
Expand Down
47 changes: 44 additions & 3 deletions src/main/java/org/elasticsearch/snapshots/SnapshotsService.java
Expand Up @@ -35,7 +35,7 @@
import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
Expand All @@ -56,6 +56,10 @@
import java.io.IOException;
import java.util.*;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

import static com.google.common.collect.Lists.newArrayList;
import static com.google.common.collect.Maps.newHashMap;
Expand All @@ -79,7 +83,7 @@
* notifies all {@link #snapshotCompletionListeners} that snapshot is completed, and finally calls {@link #removeSnapshotFromClusterState(SnapshotId, SnapshotInfo, Throwable)} to remove snapshot from cluster state</li>
* </ul>
*/
public class SnapshotsService extends AbstractComponent implements ClusterStateListener {
public class SnapshotsService extends AbstractLifecycleComponent<SnapshotsService> implements ClusterStateListener {

private final ClusterService clusterService;

Expand All @@ -93,6 +97,10 @@ public class SnapshotsService extends AbstractComponent implements ClusterStateL

private volatile ImmutableMap<SnapshotId, SnapshotShards> shardSnapshots = ImmutableMap.of();

private final Lock shutdownLock = new ReentrantLock();

private final Condition shutdownCondition = shutdownLock.newCondition();

private final CopyOnWriteArrayList<SnapshotCompletionListener> snapshotCompletionListeners = new CopyOnWriteArrayList<>();


Expand Down Expand Up @@ -678,7 +686,16 @@ private void processIndexShardSnapshots(SnapshotMetaData snapshotMetaData) {

// Update the list of snapshots that we saw and tried to start
// If startup of these shards fails later, we don't want to try starting these shards again
shardSnapshots = ImmutableMap.copyOf(survivors);
shutdownLock.lock();
try {
shardSnapshots = ImmutableMap.copyOf(survivors);
if (shardSnapshots.isEmpty()) {
// Notify all waiting threads that there are no more snapshots running
shutdownCondition.signalAll();
}
} finally {
shutdownLock.unlock();
}

// We have new snapshots to process -
if (newSnapshots != null) {
Expand Down Expand Up @@ -1101,6 +1118,30 @@ public void removeListener(SnapshotCompletionListener listener) {
this.snapshotCompletionListeners.remove(listener);
}

/**
 * Lifecycle start hook. Intentionally a no-op: this service performs no work
 * in this method when it is started.
 *
 * @throws ElasticsearchException declared by the signature; never thrown by this empty body
 */
@Override
protected void doStart() throws ElasticsearchException {
// nothing to start
}

/**
 * Lifecycle stop hook. Blocks the calling thread until {@code shardSnapshots} is empty,
 * i.e. no shard snapshots are tracked as running locally, or until a total of
 * 5 seconds has elapsed, whichever comes first.
 * <p>
 * The wait is performed on {@code shutdownCondition}, which is signalled under
 * {@code shutdownLock} whenever {@code shardSnapshots} is replaced with an empty map.
 * The remaining time is tracked across wakeups so that a spurious wakeup, or a wakeup
 * after which the map is non-empty again, cannot restart a fresh 5 second wait and
 * delay shutdown beyond the intended bound.
 * <p>
 * If the waiting thread is interrupted, the wait is abandoned and the thread's
 * interrupt flag is restored for the caller.
 *
 * @throws ElasticsearchException declared by the signature; not thrown by this body
 */
@Override
protected void doStop() throws ElasticsearchException {
    // Overall time budget for the whole wait, not per wakeup
    long remainingNanos = TimeUnit.SECONDS.toNanos(5);
    shutdownLock.lock();
    try {
        // Re-check the predicate after every wakeup; awaitNanos returns an estimate of
        // the time still left (<= 0 once the budget is exhausted)
        while (!shardSnapshots.isEmpty() && remainingNanos > 0) {
            remainingNanos = shutdownCondition.awaitNanos(remainingNanos);
        }
    } catch (InterruptedException ex) {
        // Preserve the interrupt status so callers further up can observe it
        Thread.currentThread().interrupt();
    } finally {
        shutdownLock.unlock();
    }
}

/**
 * Lifecycle close hook. Intentionally a no-op: this method releases nothing
 * and performs no work when the service is closed.
 *
 * @throws ElasticsearchException declared by the signature; never thrown by this empty body
 */
@Override
protected void doClose() throws ElasticsearchException {
// nothing to close
}

/**
* Listener for create snapshot operation
*/
Expand Down
2 changes: 2 additions & 0 deletions src/main/java/org/elasticsearch/threadpool/ThreadPool.java
Expand Up @@ -74,6 +74,7 @@ public static class Names {
public static final String REFRESH = "refresh";
public static final String WARMER = "warmer";
public static final String SNAPSHOT = "snapshot";
public static final String SNAPSHOT_DATA = "snapshot_data";
public static final String OPTIMIZE = "optimize";
public static final String BENCH = "bench";
}
Expand Down Expand Up @@ -117,6 +118,7 @@ public ThreadPool(Settings settings, @Nullable NodeSettingsService nodeSettingsS
.put(Names.REFRESH, settingsBuilder().put("type", "scaling").put("keep_alive", "5m").put("size", halfProcMaxAt10).build())
.put(Names.WARMER, settingsBuilder().put("type", "scaling").put("keep_alive", "5m").put("size", halfProcMaxAt5).build())
.put(Names.SNAPSHOT, settingsBuilder().put("type", "scaling").put("keep_alive", "5m").put("size", halfProcMaxAt5).build())
.put(Names.SNAPSHOT_DATA, settingsBuilder().put("type", "scaling").put("keep_alive", "5m").put("size", 5).build())
.put(Names.OPTIMIZE, settingsBuilder().put("type", "fixed").put("size", 1).build())
.put(Names.BENCH, settingsBuilder().put("type", "scaling").put("keep_alive", "5m").put("size", halfProcMaxAt5).build())
.build();
Expand Down

0 comments on commit a4d9479

Please sign in to comment.