Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove bulk fallback for write thread pool #29609

Merged
merged 4 commits into from
Apr 19, 2018
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 11 additions & 1 deletion docs/reference/migration/migrate_7_0/settings.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,14 @@
requests with a single-document payload. This means that these requests are
executed on the bulk thread pool. As such, the indexing thread pool is no
longer needed and has been removed. As such, the settings
`thread_pool.index.size` and `thread_pool.index.queue_size` have been removed.
`thread_pool.index.size` and `thread_pool.index.queue_size` have been removed.

==== Bulk thread pool fallback

* The bulk thread pool was replaced by the write thread pool in 6.3.0. However,
for backwards compatibility reasons the name `bulk` was still usable as fallback
settings `thread_pool.bulk.size` and `thread_pool.bulk.queue_size` for
`thread_pool.write.size` and `thread_pool.write.queue_size`, respectively, and
the system property `es.thread_pool.write.use_bulk_as_display_name` was
available to keep the display output in APIs as `bulk` instead of `write`.
These fallback settings and this system property have been removed.
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

package org.elasticsearch.threadpool;

import org.elasticsearch.common.Booleans;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.SizeValue;
Expand All @@ -39,9 +38,7 @@
public final class FixedExecutorBuilder extends ExecutorBuilder<FixedExecutorBuilder.FixedExecutorSettings> {

private final Setting<Integer> sizeSetting;
private final Setting<Integer> fallbackSizeSetting;
private final Setting<Integer> queueSizeSetting;
private final Setting<Integer> fallbackQueueSizeSetting;

/**
* Construct a fixed executor builder; the settings will have the key prefix "thread_pool." followed by the executor name.
Expand All @@ -55,19 +52,6 @@ public final class FixedExecutorBuilder extends ExecutorBuilder<FixedExecutorBui
this(settings, name, size, queueSize, "thread_pool." + name);
}

/**
* Construct a fixed executor builder; the settings will have the key prefix "thread_pool." followed by the executor name.
*
* @param settings the node-level settings
* @param name the name of the executor
* @param fallbackName the fallback name of the executor (used for transitioning the name of a setting)
* @param size the fixed number of threads
* @param queueSize the size of the backing queue, -1 for unbounded
*/
FixedExecutorBuilder(final Settings settings, final String name, final String fallbackName, final int size, final int queueSize) {
this(settings, name, fallbackName, size, queueSize, "thread_pool." + name, "thread_pool." + fallbackName);
}

/**
* Construct a fixed executor builder.
*
Expand All @@ -78,81 +62,21 @@ public final class FixedExecutorBuilder extends ExecutorBuilder<FixedExecutorBui
* @param prefix the prefix for the settings keys
*/
public FixedExecutorBuilder(final Settings settings, final String name, final int size, final int queueSize, final String prefix) {
this(settings, name, null, size, queueSize, prefix, null);
}

/**
* Construct a fixed executor builder.
*
* @param settings the node-level settings
* @param name the name of the executor
* @param size the fixed number of threads
* @param queueSize the size of the backing queue, -1 for unbounded
* @param prefix the prefix for the settings keys
*/
private FixedExecutorBuilder(
final Settings settings,
final String name,
final String fallbackName,
final int size,
final int queueSize,
final String prefix,
final String fallbackPrefix) {
super(name);
final String sizeKey = settingsKey(prefix, "size");
this.sizeSetting =
new Setting<>(
sizeKey,
s -> Integer.toString(size),
s -> Setting.parseInt(s, 1, applyHardSizeLimit(settings, name), sizeKey),
Setting.Property.NodeScope);
final String queueSizeKey = settingsKey(prefix, "queue_size");
if (fallbackName == null) {
assert fallbackPrefix == null;
final Setting.Property[] properties = {Setting.Property.NodeScope};
this.sizeSetting = sizeSetting(settings, name, size, prefix, properties);
this.fallbackSizeSetting = null;
this.queueSizeSetting = queueSizeSetting(prefix, queueSize, properties);
this.fallbackQueueSizeSetting = null;
} else {
assert fallbackPrefix != null;
final Setting.Property[] properties = { Setting.Property.NodeScope };
final Setting.Property[] fallbackProperties = { Setting.Property.NodeScope, Setting.Property.Deprecated };
final Setting<Integer> fallbackSizeSetting = sizeSetting(settings, fallbackName, size, fallbackPrefix, fallbackProperties);
this.sizeSetting =
new Setting<>(
new Setting.SimpleKey(sizeKey),
fallbackSizeSetting,
s -> Setting.parseInt(s, 1, applyHardSizeLimit(settings, name), sizeKey),
properties);
this.fallbackSizeSetting = fallbackSizeSetting;
final Setting<Integer> fallbackQueueSizeSetting = queueSizeSetting(fallbackPrefix, queueSize, fallbackProperties);
this.queueSizeSetting =
new Setting<>(
new Setting.SimpleKey(queueSizeKey),
fallbackQueueSizeSetting,
s -> Setting.parseInt(s, Integer.MIN_VALUE, queueSizeKey),
properties);
this.fallbackQueueSizeSetting = fallbackQueueSizeSetting;
}
}

private Setting<Integer> sizeSetting(
final Settings settings, final String name, final int size, final String prefix, final Setting.Property[] properties) {
final String sizeKey = settingsKey(prefix, "size");
return new Setting<>(
sizeKey,
s -> Integer.toString(size),
s -> Setting.parseInt(s, 1, applyHardSizeLimit(settings, name), sizeKey),
properties);
}

private Setting<Integer> queueSizeSetting(final String prefix, final int queueSize, final Setting.Property[] properties) {
return Setting.intSetting(settingsKey(prefix, "queue_size"), queueSize, properties);
this.queueSizeSetting = Setting.intSetting(queueSizeKey, queueSize, Setting.Property.NodeScope);
}

@Override
public List<Setting<?>> getRegisteredSettings() {
if (fallbackSizeSetting == null && fallbackQueueSizeSetting == null) {
return Arrays.asList(sizeSetting, queueSizeSetting);
} else {
assert fallbackSizeSetting != null && fallbackQueueSizeSetting != null;
return Arrays.asList(sizeSetting, fallbackSizeSetting, queueSizeSetting, fallbackQueueSizeSetting);
}
return Arrays.asList(sizeSetting, queueSizeSetting);
}

@Override
Expand All @@ -170,14 +94,8 @@ ThreadPool.ExecutorHolder build(final FixedExecutorSettings settings, final Thre
final ThreadFactory threadFactory = EsExecutors.daemonThreadFactory(EsExecutors.threadName(settings.nodeName, name()));
final ExecutorService executor =
EsExecutors.newFixed(settings.nodeName + "/" + name(), size, queueSize, threadFactory, threadContext);
final String name;
if ("write".equals(name()) && Booleans.parseBoolean(System.getProperty("es.thread_pool.write.use_bulk_as_display_name", "false"))) {
name = "bulk";
} else {
name = name();
}
final ThreadPool.Info info =
new ThreadPool.Info(name, ThreadPool.ThreadPoolType.FIXED, size, size, null, queueSize < 0 ? null : new SizeValue(queueSize));
new ThreadPool.Info(name(), ThreadPool.ThreadPoolType.FIXED, size, size, null, queueSize < 0 ? null : new SizeValue(queueSize));
return new ThreadPool.ExecutorHolder(executor, info);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ public ThreadPool(final Settings settings, final ExecutorBuilder<?>... customBui
final int halfProcMaxAt10 = halfNumberOfProcessorsMaxTen(availableProcessors);
final int genericThreadPoolMax = boundedBy(4 * availableProcessors, 128, 512);
builders.put(Names.GENERIC, new ScalingExecutorBuilder(Names.GENERIC, 4, genericThreadPoolMax, TimeValue.timeValueSeconds(30)));
builders.put(Names.WRITE, new FixedExecutorBuilder(settings, Names.WRITE, "bulk", availableProcessors, 200));
builders.put(Names.WRITE, new FixedExecutorBuilder(settings, Names.WRITE, availableProcessors, 200));
builders.put(Names.GET, new FixedExecutorBuilder(settings, Names.GET, availableProcessors, 1000));
builders.put(Names.ANALYZE, new FixedExecutorBuilder(settings, Names.ANALYZE, 1, 16));
builders.put(Names.SEARCH, new AutoQueueAdjustingExecutorBuilder(settings,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,8 @@ protected Settings nodeSettings(int nodeOrdinal) {
.put(super.nodeSettings(nodeOrdinal))
.put("thread_pool.search.size", 1)
.put("thread_pool.search.queue_size", 1)
.put("thread_pool.bulk.size", 1)
.put("thread_pool.bulk.queue_size", 1)
.put("thread_pool.write.size", 1)
.put("thread_pool.write.queue_size", 1)
.put("thread_pool.get.size", 1)
.put("thread_pool.get.queue_size", 1)
.build();
Expand Down