Skip to content

Commit

Permalink
Deprecate the index thread pool (#29540)
Browse files Browse the repository at this point in the history
The index thread pool is no longer needed as its primary use-case for
single-document indexing requests has been relieved now that
single-document indexing requests are converted to bulk indexing
requests (with a single document payload).
  • Loading branch information
jasontedor committed Apr 17, 2018
1 parent 392a5f2 commit 451454d
Show file tree
Hide file tree
Showing 6 changed files with 69 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,16 +41,28 @@ public final class FixedExecutorBuilder extends ExecutorBuilder<FixedExecutorBui
private final Setting<Integer> queueSizeSetting;

/**
* Construct a fixed executor builder; the settings will have the
* key prefix "thread_pool." followed by the executor name.
* Construct a fixed executor builder; the settings will have the key prefix "thread_pool." followed by the executor name.
*
* @param settings the node-level settings
* @param name the name of the executor
* @param size the fixed number of threads
* @param queueSize the size of the backing queue, -1 for unbounded
*/
FixedExecutorBuilder(final Settings settings, final String name, final int size, final int queueSize) {
this(settings, name, size, queueSize, "thread_pool." + name);
this(settings, name, size, queueSize, false);
}

/**
* Construct a fixed executor builder; the settings will have the key prefix "thread_pool." followed by the executor name.
*
* @param settings the node-level settings
* @param name the name of the executor
* @param size the fixed number of threads
* @param queueSize the size of the backing queue, -1 for unbounded
* @param deprecated whether or not the thread pool is deprecated
*/
FixedExecutorBuilder(final Settings settings, final String name, final int size, final int queueSize, final boolean deprecated) {
this(settings, name, size, queueSize, "thread_pool." + name, deprecated);
}

/**
Expand All @@ -63,17 +75,41 @@ public final class FixedExecutorBuilder extends ExecutorBuilder<FixedExecutorBui
* @param prefix the prefix for the settings keys
*/
public FixedExecutorBuilder(final Settings settings, final String name, final int size, final int queueSize, final String prefix) {
this(settings, name, size, queueSize, prefix, false);
}

/**
* Construct a fixed executor builder.
*
* @param settings the node-level settings
* @param name the name of the executor
* @param size the fixed number of threads
* @param queueSize the size of the backing queue, -1 for unbounded
* @param prefix the prefix for the settings keys
*/
private FixedExecutorBuilder(
final Settings settings,
final String name,
final int size,
final int queueSize,
final String prefix,
final boolean deprecated) {
super(name);
final String sizeKey = settingsKey(prefix, "size");
final Setting.Property[] properties;
if (deprecated) {
properties = new Setting.Property[]{Setting.Property.NodeScope, Setting.Property.Deprecated};
} else {
properties = new Setting.Property[]{Setting.Property.NodeScope};
}
this.sizeSetting =
new Setting<>(
sizeKey,
s -> Integer.toString(size),
s -> Setting.parseInt(s, 1, applyHardSizeLimit(settings, name), sizeKey),
Setting.Property.NodeScope);
new Setting<>(
sizeKey,
s -> Integer.toString(size),
s -> Setting.parseInt(s, 1, applyHardSizeLimit(settings, name), sizeKey),
properties);
final String queueSizeKey = settingsKey(prefix, "queue_size");
this.queueSizeSetting =
Setting.intSetting(queueSizeKey, queueSize, Setting.Property.NodeScope);
this.queueSizeSetting = Setting.intSetting(queueSizeKey, queueSize, properties);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ public ThreadPool(final Settings settings, final ExecutorBuilder<?>... customBui
final int halfProcMaxAt10 = halfNumberOfProcessorsMaxTen(availableProcessors);
final int genericThreadPoolMax = boundedBy(4 * availableProcessors, 128, 512);
builders.put(Names.GENERIC, new ScalingExecutorBuilder(Names.GENERIC, 4, genericThreadPoolMax, TimeValue.timeValueSeconds(30)));
builders.put(Names.INDEX, new FixedExecutorBuilder(settings, Names.INDEX, availableProcessors, 200));
builders.put(Names.INDEX, new FixedExecutorBuilder(settings, Names.INDEX, availableProcessors, 200, true));
builders.put(Names.BULK, new FixedExecutorBuilder(settings, Names.BULK, availableProcessors, 200)); // now that we reuse bulk for index/delete ops
builders.put(Names.GET, new FixedExecutorBuilder(settings, Names.GET, availableProcessors, 1000));
builders.put(Names.ANALYZE, new FixedExecutorBuilder(settings, Names.ANALYZE, 1, 16));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,10 @@ public void testRejectedExecutionCounter() throws InterruptedException {

assertThat(counter, equalTo(rejections));
assertThat(stats(threadPool, threadPoolName).getRejected(), equalTo(rejections));

if (threadPoolName.equals(ThreadPool.Names.INDEX)) {
assertSettingDeprecationsAndWarnings(new String[]{"thread_pool.index.queue_size", "thread_pool.index.size"});
}
} finally {
terminateThreadPoolIfNeeded(threadPool);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,9 +87,9 @@ public void testThatToXContentWritesOutUnboundedCorrectly() throws Exception {
}

public void testThatNegativeSettingAllowsToStart() throws InterruptedException {
Settings settings = Settings.builder().put("node.name", "index").put("thread_pool.index.queue_size", "-1").build();
Settings settings = Settings.builder().put("node.name", "bulk").put("thread_pool.bulk.queue_size", "-1").build();
ThreadPool threadPool = new ThreadPool(settings);
assertThat(threadPool.info("index").getQueueSize(), is(nullValue()));
assertThat(threadPool.info("bulk").getQueueSize(), is(nullValue()));
terminate(threadPool);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,10 @@ public void testIndexingThreadPoolsMaxSize() throws InterruptedException {
initial,
hasToString(containsString(
"Failed to parse value [" + tooBig + "] for setting [thread_pool." + name + ".size] must be ")));

if (name.equals(Names.INDEX)) {
assertSettingDeprecationsAndWarnings(new String[] { "thread_pool.index.size" });
}
}

private static int getExpectedThreadPoolSize(Settings settings, String name, int size) {
Expand Down Expand Up @@ -116,6 +120,10 @@ public void testFixedExecutorType() throws InterruptedException {
assertThat(info(threadPool, threadPoolName).getMax(), equalTo(expectedSize));
// keep alive does not apply to fixed thread pools
assertThat(((EsThreadPoolExecutor) threadPool.executor(threadPoolName)).getKeepAliveTime(TimeUnit.MINUTES), equalTo(0L));

if (threadPoolName.equals(Names.INDEX)) {
assertSettingDeprecationsAndWarnings(new String[] { "thread_pool.index.size" });
}
} finally {
terminateThreadPoolIfNeeded(threadPool);
}
Expand Down Expand Up @@ -171,6 +179,10 @@ public void testShutdownNowInterrupts() throws Exception {
latch.await(3, TimeUnit.SECONDS); // if this throws then ThreadPool#shutdownNow did not interrupt
assertThat(oldExecutor.isShutdown(), equalTo(true));
assertThat(oldExecutor.isTerminating() || oldExecutor.isTerminated(), equalTo(true));

if (threadPoolName.equals(Names.INDEX)) {
assertSettingDeprecationsAndWarnings(new String[] { "thread_pool.index.queue_size" });
}
} finally {
terminateThreadPoolIfNeeded(threadPool);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -335,11 +335,14 @@ private void ensureNoWarnings() throws IOException {
* @param warnings other expected general deprecation warnings
*/
protected final void assertSettingDeprecationsAndWarnings(final Setting<?>[] settings, final String... warnings) {
assertSettingDeprecationsAndWarnings(Arrays.stream(settings).map(Setting::getKey).toArray(String[]::new), warnings);
}

protected final void assertSettingDeprecationsAndWarnings(final String[] settings, final String... warnings) {
assertWarnings(
Stream.concat(
Arrays
.stream(settings)
.map(Setting::getKey)
.map(k -> "[" + k + "] setting was deprecated in Elasticsearch and will be removed in a future release! " +
"See the breaking changes documentation for the next major version."),
Arrays.stream(warnings))
Expand Down

0 comments on commit 451454d

Please sign in to comment.