Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,7 @@ protected S3Repository createRepository(
) {
return new S3Repository(metadata, registry, getService(), clusterService, bigArrays, recoverySettings, s3RepositoriesMetrics) {
@Override
protected void assertSnapshotOrGenericThread() {
protected void assertSnapshotOrStatelessPermittedThreadPool() {
// eliminate thread name check as we create repo manually on test/main threads
Comment on lines +270 to 271
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are you sure these overrides are needed any more? The check in S3Repository permits things to happen on the main test thread these days.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would appear that deleting the override from BlobStoreRepositoryRestoreTests works locally.

I'm not inclined to make code changes in this patch, but I'll take a look 👍

}
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ private S3Repository createS3Repo(RepositoryMetadata metadata) {
S3RepositoriesMetrics.NOOP
) {
@Override
protected void assertSnapshotOrGenericThread() {
protected void assertSnapshotOrStatelessPermittedThreadPool() {
// eliminate thread name check as we create repo manually on test/main threads
}
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ private URLRepository createRepository(Settings baseSettings, RepositoryMetadata
mock(URLHttpClient.Factory.class)
) {
@Override
protected void assertSnapshotOrGenericThread() {
protected void assertSnapshotOrStatelessPermittedThreadPool() {
// eliminate thread name check as we create repo manually on test/main threads
}
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/**
* A collection of static methods to help create different ES Executor types.
*/
public class EsExecutors {

// although the available processors may technically change, for node sizing we use the number available at launch
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -680,7 +680,7 @@ protected BlobStore getBlobStore() {
* maintains single lazy instance of {@link BlobContainer}
*/
protected BlobContainer blobContainer() {
assertSnapshotOrGenericThread();
assertSnapshotOrStatelessPermittedThreadPool();

if (lifecycle.started() == false) {
throw notStartedException();
Expand All @@ -705,7 +705,7 @@ protected BlobContainer blobContainer() {
* Public for testing.
*/
public BlobStore blobStore() {
assertSnapshotOrGenericThread();
assertSnapshotOrStatelessPermittedThreadPool();

BlobStore store = blobStore.get();
if (store == null) {
Expand Down Expand Up @@ -1994,7 +1994,7 @@ public long getRestoreThrottleTimeInNanos() {
return restoreRateLimitingTimeInNanos.count();
}

protected void assertSnapshotOrGenericThread() {
protected void assertSnapshotOrStatelessPermittedThreadPool() {
// The Stateless plugin adds custom thread pools for object store operations
assert ThreadPool.assertCurrentThreadPool(
ThreadPool.Names.SNAPSHOT,
Expand Down Expand Up @@ -3539,7 +3539,7 @@ public IndexShardSnapshotStatus.Copy getShardSnapshotStatus(SnapshotId snapshotI

@Override
public void verify(String seed, DiscoveryNode localNode) {
assertSnapshotOrGenericThread();
assertSnapshotOrStatelessPermittedThreadPool();
if (isReadOnly()) {
try {
latestIndexBlobId();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@

/**
* A builder for fixed executors.
*
* Builds an Executor with a static number of threads, as opposed to {@link ScalingExecutorBuilder} that dynamically scales the number of
* threads in the pool up and down based on request load.
*/
public final class FixedExecutorBuilder extends ExecutorBuilder<FixedExecutorBuilder.FixedExecutorSettings> {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,10 @@

/**
* A builder for scaling executors.
*
* The {@link #build} method will instantiate a java {@link ExecutorService} thread pool that starts with the specified minimum number of
* threads and then scales up to the specified max number of threads as needed for excess work, scaling back when the burst of activity
* stops. As opposed to the {@link FixedExecutorBuilder} that keeps a fixed number of threads alive.
*/
public final class ScalingExecutorBuilder extends ExecutorBuilder<ScalingExecutorBuilder.ScalingExecutorSettings> {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,12 +60,28 @@
import static java.util.Map.entry;
import static org.elasticsearch.core.Strings.format;

/**
* Manages all the Java thread pools we create. {@link Names} contains a list of the thread pools, but plugins can dynamically add more
* thread pools to instantiate.
*/
public class ThreadPool implements ReportingService<ThreadPoolInfo>, Scheduler {

private static final Logger logger = LogManager.getLogger(ThreadPool.class);

/**
* List of names that identify Java thread pools that are created in {@link ThreadPool#ThreadPool}.
*/
public static class Names {
/**
* All the tasks that do not relate to the purpose of one of the other thread pools should use this thread pool. Try to pick one of
* the other more specific thread pools where possible.
*/
public static final String GENERIC = "generic";
/**
* Important management tasks that keep the cluster from falling apart.
* This thread pool ensures cluster coordination tasks do not get blocked by less critical tasks and can continue to make progress.
* This thread pool also defaults to a single thread, reducing contention on the Coordinator mutex.
*/
public static final String CLUSTER_COORDINATION = "cluster_coordination";
public static final String GET = "get";
public static final String ANALYZE = "analyze";
Expand All @@ -75,6 +91,10 @@ public static class Names {
public static final String SEARCH_COORDINATION = "search_coordination";
public static final String AUTO_COMPLETE = "auto_complete";
public static final String SEARCH_THROTTLED = "search_throttled";
/**
* Cluster management tasks. Tasks that manage data, and tasks that report on cluster health via statistics etc.
* Not a latency sensitive thread pool: some tasks may time be long-running; and the thread pool size is limited / relatively small.
*/
public static final String MANAGEMENT = "management";
public static final String FLUSH = "flush";
public static final String REFRESH = "refresh";
Expand Down Expand Up @@ -199,6 +219,13 @@ public Collection<ExecutorBuilder> builders() {
Setting.Property.NodeScope
);

/**
* Defines and builds the many thread pools delineated in {@link Names}.
*
* @param settings
* @param meterRegistry
* @param customBuilders a list of additional thread pool builders that were defined elsewhere (like a Plugin).
*/
@SuppressWarnings({ "rawtypes", "unchecked" })
public ThreadPool(final Settings settings, MeterRegistry meterRegistry, final ExecutorBuilder<?>... customBuilders) {
assert Node.NODE_NAME_SETTING.exists(settings);
Expand Down Expand Up @@ -327,6 +354,7 @@ public ThreadPool(final Settings settings, MeterRegistry meterRegistry, final Ex

threadContext = new ThreadContext(settings);

// Now that all the thread pools have been defined, actually build them.
final Map<String, ExecutorHolder> executors = new HashMap<>();
for (final Map.Entry<String, ExecutorBuilder> entry : builders.entrySet()) {
final ExecutorBuilder.ExecutorSettings executorSettings = entry.getValue().getSettings(settings);
Expand Down Expand Up @@ -902,6 +930,11 @@ void check(long newAbsoluteMillis, long newRelativeNanos) {
}
}

/**
* Holds a thread pool and additional ES information ({@link Info}) about that Java thread pool ({@link ExecutorService}) instance.
*
* See {@link Names} for a list of thread pools, though there can be more dynamically added via plugins.
*/
static class ExecutorHolder {
private final ExecutorService executor;
public final Info info;
Expand All @@ -917,6 +950,9 @@ ExecutorService executor() {
}
}

/**
* The settings used to create a Java ExecutorService thread pool.
*/
public static class Info implements Writeable, ToXContentFragment {

private final String name;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ private Repository createRepository() {
new RecoverySettings(Settings.EMPTY, new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS))
) {
@Override
protected void assertSnapshotOrGenericThread() {
protected void assertSnapshotOrStatelessPermittedThreadPool() {
// eliminate thread name check as we create repo manually
}
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2130,7 +2130,7 @@ public RecyclerBytesStreamOutput newNetworkBytesStream() {
recoverySettings
) {
@Override
protected void assertSnapshotOrGenericThread() {
protected void assertSnapshotOrStatelessPermittedThreadPool() {
// eliminate thread name check as we create repo in the test thread
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -449,9 +449,9 @@ public Map<String, Repository.Factory> getRepositories(
(metadata) -> new FsRepository(metadata, env, namedXContentRegistry, clusterService, bigArrays, recoverySettings) {

@Override
protected void assertSnapshotOrGenericThread() {
protected void assertSnapshotOrStatelessPermittedThreadPool() {
if (enabled.get()) {
super.assertSnapshotOrGenericThread();
super.assertSnapshotOrStatelessPermittedThreadPool();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,7 @@ public Map<String, Repository.Factory> getRepositories(
"test-fs",
(metadata) -> new FsRepository(metadata, env, namedXContentRegistry, clusterService, bigArrays, recoverySettings) {
@Override
protected void assertSnapshotOrGenericThread() {
protected void assertSnapshotOrStatelessPermittedThreadPool() {
// ignore
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -609,7 +609,7 @@ private void testDirectories(
) {

@Override
protected void assertSnapshotOrGenericThread() {
protected void assertSnapshotOrStatelessPermittedThreadPool() {
// eliminate thread name check as we create repo manually on test/main threads
}
};
Expand Down