Skip to content

Commit

Permalink
Remove bulk fallback for write thread pool (#29609)
Browse files Browse the repository at this point in the history
The name of the bulk thread pool was renamed to "write" with "bulk" as a
fallback name. This change was made in 6.x for BWC reasons yet in 7.0.0
we are removing this fallback. This commit removes this fallback for the
write thread pool.
  • Loading branch information
jasontedor committed Apr 19, 2018
1 parent 113d1d3 commit 5d767e4
Show file tree
Hide file tree
Showing 5 changed files with 27 additions and 95 deletions.
3 changes: 3 additions & 0 deletions docs/CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@

=== Breaking Changes

<<write-thread-pool-fallback, Removed `thread_pool.bulk.*` settings and
`es.thread_pool.write.use_bulk_as_display_name` system property>> ({pull}29609[#29609])

=== Breaking Java Changes

=== Deprecations
Expand Down
13 changes: 12 additions & 1 deletion docs/reference/migration/migrate_7_0/settings.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,15 @@
requests with a single-document payload. This means that these requests are
executed on the bulk thread pool. As such, the indexing thread pool is no
longer needed and has been removed. As such, the settings
`thread_pool.index.size` and `thread_pool.index.queue_size` have been removed.
`thread_pool.index.size` and `thread_pool.index.queue_size` have been removed.

[[write-thread-pool-fallback]]
==== Write thread pool fallback

* The bulk thread pool was replaced by the write thread pool in 6.3.0. However,
for backwards compatibility reasons the name `bulk` was still usable as fallback
settings `thread_pool.bulk.size` and `thread_pool.bulk.queue_size` for
`thread_pool.write.size` and `thread_pool.write.queue_size`, respectively, and
the system property `es.thread_pool.write.use_bulk_as_display_name` was
available to keep the display output in APIs as `bulk` instead of `write`.
These fallback settings and this system property have been removed.
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

package org.elasticsearch.threadpool;

import org.elasticsearch.common.Booleans;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.SizeValue;
Expand All @@ -39,9 +38,7 @@
public final class FixedExecutorBuilder extends ExecutorBuilder<FixedExecutorBuilder.FixedExecutorSettings> {

private final Setting<Integer> sizeSetting;
private final Setting<Integer> fallbackSizeSetting;
private final Setting<Integer> queueSizeSetting;
private final Setting<Integer> fallbackQueueSizeSetting;

/**
* Construct a fixed executor builder; the settings will have the key prefix "thread_pool." followed by the executor name.
Expand All @@ -55,19 +52,6 @@ public final class FixedExecutorBuilder extends ExecutorBuilder<FixedExecutorBui
this(settings, name, size, queueSize, "thread_pool." + name);
}

/**
* Construct a fixed executor builder; the settings will have the key prefix "thread_pool." followed by the executor name.
*
* @param settings the node-level settings
* @param name the name of the executor
* @param fallbackName the fallback name of the executor (used for transitioning the name of a setting)
* @param size the fixed number of threads
* @param queueSize the size of the backing queue, -1 for unbounded
*/
FixedExecutorBuilder(final Settings settings, final String name, final String fallbackName, final int size, final int queueSize) {
this(settings, name, fallbackName, size, queueSize, "thread_pool." + name, "thread_pool." + fallbackName);
}

/**
* Construct a fixed executor builder.
*
Expand All @@ -78,81 +62,21 @@ public final class FixedExecutorBuilder extends ExecutorBuilder<FixedExecutorBui
* @param prefix the prefix for the settings keys
*/
public FixedExecutorBuilder(final Settings settings, final String name, final int size, final int queueSize, final String prefix) {
this(settings, name, null, size, queueSize, prefix, null);
}

/**
* Construct a fixed executor builder.
*
* @param settings the node-level settings
* @param name the name of the executor
* @param size the fixed number of threads
* @param queueSize the size of the backing queue, -1 for unbounded
* @param prefix the prefix for the settings keys
*/
private FixedExecutorBuilder(
final Settings settings,
final String name,
final String fallbackName,
final int size,
final int queueSize,
final String prefix,
final String fallbackPrefix) {
super(name);
final String sizeKey = settingsKey(prefix, "size");
this.sizeSetting =
new Setting<>(
sizeKey,
s -> Integer.toString(size),
s -> Setting.parseInt(s, 1, applyHardSizeLimit(settings, name), sizeKey),
Setting.Property.NodeScope);
final String queueSizeKey = settingsKey(prefix, "queue_size");
if (fallbackName == null) {
assert fallbackPrefix == null;
final Setting.Property[] properties = {Setting.Property.NodeScope};
this.sizeSetting = sizeSetting(settings, name, size, prefix, properties);
this.fallbackSizeSetting = null;
this.queueSizeSetting = queueSizeSetting(prefix, queueSize, properties);
this.fallbackQueueSizeSetting = null;
} else {
assert fallbackPrefix != null;
final Setting.Property[] properties = { Setting.Property.NodeScope };
final Setting.Property[] fallbackProperties = { Setting.Property.NodeScope, Setting.Property.Deprecated };
final Setting<Integer> fallbackSizeSetting = sizeSetting(settings, fallbackName, size, fallbackPrefix, fallbackProperties);
this.sizeSetting =
new Setting<>(
new Setting.SimpleKey(sizeKey),
fallbackSizeSetting,
s -> Setting.parseInt(s, 1, applyHardSizeLimit(settings, name), sizeKey),
properties);
this.fallbackSizeSetting = fallbackSizeSetting;
final Setting<Integer> fallbackQueueSizeSetting = queueSizeSetting(fallbackPrefix, queueSize, fallbackProperties);
this.queueSizeSetting =
new Setting<>(
new Setting.SimpleKey(queueSizeKey),
fallbackQueueSizeSetting,
s -> Setting.parseInt(s, Integer.MIN_VALUE, queueSizeKey),
properties);
this.fallbackQueueSizeSetting = fallbackQueueSizeSetting;
}
}

private Setting<Integer> sizeSetting(
final Settings settings, final String name, final int size, final String prefix, final Setting.Property[] properties) {
final String sizeKey = settingsKey(prefix, "size");
return new Setting<>(
sizeKey,
s -> Integer.toString(size),
s -> Setting.parseInt(s, 1, applyHardSizeLimit(settings, name), sizeKey),
properties);
}

private Setting<Integer> queueSizeSetting(final String prefix, final int queueSize, final Setting.Property[] properties) {
return Setting.intSetting(settingsKey(prefix, "queue_size"), queueSize, properties);
this.queueSizeSetting = Setting.intSetting(queueSizeKey, queueSize, Setting.Property.NodeScope);
}

@Override
public List<Setting<?>> getRegisteredSettings() {
if (fallbackSizeSetting == null && fallbackQueueSizeSetting == null) {
return Arrays.asList(sizeSetting, queueSizeSetting);
} else {
assert fallbackSizeSetting != null && fallbackQueueSizeSetting != null;
return Arrays.asList(sizeSetting, fallbackSizeSetting, queueSizeSetting, fallbackQueueSizeSetting);
}
return Arrays.asList(sizeSetting, queueSizeSetting);
}

@Override
Expand All @@ -170,14 +94,8 @@ ThreadPool.ExecutorHolder build(final FixedExecutorSettings settings, final Thre
final ThreadFactory threadFactory = EsExecutors.daemonThreadFactory(EsExecutors.threadName(settings.nodeName, name()));
final ExecutorService executor =
EsExecutors.newFixed(settings.nodeName + "/" + name(), size, queueSize, threadFactory, threadContext);
final String name;
if ("write".equals(name()) && Booleans.parseBoolean(System.getProperty("es.thread_pool.write.use_bulk_as_display_name", "false"))) {
name = "bulk";
} else {
name = name();
}
final ThreadPool.Info info =
new ThreadPool.Info(name, ThreadPool.ThreadPoolType.FIXED, size, size, null, queueSize < 0 ? null : new SizeValue(queueSize));
new ThreadPool.Info(name(), ThreadPool.ThreadPoolType.FIXED, size, size, null, queueSize < 0 ? null : new SizeValue(queueSize));
return new ThreadPool.ExecutorHolder(executor, info);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ public ThreadPool(final Settings settings, final ExecutorBuilder<?>... customBui
final int halfProcMaxAt10 = halfNumberOfProcessorsMaxTen(availableProcessors);
final int genericThreadPoolMax = boundedBy(4 * availableProcessors, 128, 512);
builders.put(Names.GENERIC, new ScalingExecutorBuilder(Names.GENERIC, 4, genericThreadPoolMax, TimeValue.timeValueSeconds(30)));
builders.put(Names.WRITE, new FixedExecutorBuilder(settings, Names.WRITE, "bulk", availableProcessors, 200));
builders.put(Names.WRITE, new FixedExecutorBuilder(settings, Names.WRITE, availableProcessors, 200));
builders.put(Names.GET, new FixedExecutorBuilder(settings, Names.GET, availableProcessors, 1000));
builders.put(Names.ANALYZE, new FixedExecutorBuilder(settings, Names.ANALYZE, 1, 16));
builders.put(Names.SEARCH, new AutoQueueAdjustingExecutorBuilder(settings,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,8 @@ protected Settings nodeSettings(int nodeOrdinal) {
.put(super.nodeSettings(nodeOrdinal))
.put("thread_pool.search.size", 1)
.put("thread_pool.search.queue_size", 1)
.put("thread_pool.bulk.size", 1)
.put("thread_pool.bulk.queue_size", 1)
.put("thread_pool.write.size", 1)
.put("thread_pool.write.queue_size", 1)
.put("thread_pool.get.size", 1)
.put("thread_pool.get.queue_size", 1)
.build();
Expand Down

0 comments on commit 5d767e4

Please sign in to comment.