Skip to content

Commit

Permalink
Core: Allow to configure custom thread pools
Browse files Browse the repository at this point in the history
  • Loading branch information
martijnvg committed Oct 31, 2014
1 parent af55479 commit 1a33a8d
Show file tree
Hide file tree
Showing 2 changed files with 101 additions and 2 deletions.
31 changes: 30 additions & 1 deletion src/main/java/org/elasticsearch/threadpool/ThreadPool.java
Expand Up @@ -132,7 +132,16 @@ public ThreadPool(Settings settings, @Nullable NodeSettingsService nodeSettingsS
for (Map.Entry<String, Settings> executor : defaultExecutorTypeSettings.entrySet()) {
executors.put(executor.getKey(), build(executor.getKey(), groupSettings.get(executor.getKey()), executor.getValue()));
}
executors.put(Names.SAME, new ExecutorHolder(MoreExecutors.sameThreadExecutor(), new Info(Names.SAME, "same")));

// Building custom thread pools
for (Map.Entry<String, Settings> entry : groupSettings.entrySet()) {
if (executors.containsKey(entry.getKey())) {
continue;
}
executors.put(entry.getKey(), build(entry.getKey(), entry.getValue(), ImmutableSettings.EMPTY));
}

executors.put(Names.SAME, new ExecutorHolder(MoreExecutors.directExecutor(), new Info(Names.SAME, "same")));
if (!executors.get(Names.GENERIC).info.getType().equals("cached")) {
throw new ElasticsearchIllegalArgumentException("generic thread pool must be of type cached");
}
Expand Down Expand Up @@ -420,6 +429,26 @@ public void updateSettings(Settings settings) {
}
}
}

// Building custom thread pools
for (Map.Entry<String, Settings> entry : groupSettings.entrySet()) {
if (defaultExecutorTypeSettings.containsKey(entry.getKey())) {
continue;
}

ExecutorHolder oldExecutorHolder = executors.get(entry.getKey());
ExecutorHolder newExecutorHolder = rebuild(entry.getKey(), oldExecutorHolder, entry.getValue(), ImmutableSettings.EMPTY);
// Can't introduce new thread pools at runtime, because The oldExecutorHolder variable will be null in the
// case the settings contains a thread pool not defined in the initial settings in the constructor. The if
// statement will then fail and so this prevents the addition of new thread groups at runtime, which is desired.
if (!newExecutorHolder.equals(oldExecutorHolder)) {
executors = newMapBuilder(executors).put(entry.getKey(), newExecutorHolder).immutableMap();
if (!oldExecutorHolder.executor.equals(newExecutorHolder.executor) && oldExecutorHolder.executor instanceof EsThreadPoolExecutor) {
retiredExecutors.add(oldExecutorHolder);
((EsThreadPoolExecutor) oldExecutorHolder.executor).shutdown(new ExecutorShutdownListener(oldExecutorHolder));
}
}
}
}

/**
Expand Down
Expand Up @@ -19,14 +19,15 @@

package org.elasticsearch.threadpool;

import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor;
import org.elasticsearch.test.ElasticsearchTestCase;
import org.elasticsearch.threadpool.ThreadPool.Names;
import org.junit.Test;

import java.lang.reflect.Field;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;
Expand Down Expand Up @@ -228,4 +229,73 @@ public void run() {
latch.await();
}

@Test
public void testCustomThreadPool() throws Exception {
ThreadPool threadPool = new ThreadPool(ImmutableSettings.settingsBuilder()
.put("threadpool.my_pool1.type", "cached")
.put("threadpool.my_pool2.type", "fixed")
.put("threadpool.my_pool2.size", "1")
.put("threadpool.my_pool2.queue_size", "1")
.put("name", "testCustomThreadPool").build(), null);

ThreadPoolInfo groups = threadPool.info();
boolean foundPool1 = false;
boolean foundPool2 = false;
outer: for (ThreadPool.Info info : groups) {
if ("my_pool1".equals(info.getName())) {
foundPool1 = true;
assertThat(info.getType(), equalTo("cached"));
} else if ("my_pool2".equals(info.getName())) {
foundPool2 = true;
assertThat(info.getType(), equalTo("fixed"));
assertThat(info.getMin(), equalTo(1));
assertThat(info.getMax(), equalTo(1));
assertThat(info.getQueueSize().singles(), equalTo(1l));
} else {
for (Field field : Names.class.getFields()) {
if (info.getName().equalsIgnoreCase(field.getName())) {
// This is ok it is a default thread pool
continue outer;
}
}
fail("Unexpected pool name: " + info.getName());
}
}
assertThat(foundPool1, is(true));
assertThat(foundPool2, is(true));

// Updating my_pool2
Settings settings = ImmutableSettings.builder()
.put("threadpool.my_pool2.size", "10")
.build();
threadPool.updateSettings(settings);

groups = threadPool.info();
foundPool1 = false;
foundPool2 = false;
outer: for (ThreadPool.Info info : groups) {
if ("my_pool1".equals(info.getName())) {
foundPool1 = true;
assertThat(info.getType(), equalTo("cached"));
} else if ("my_pool2".equals(info.getName())) {
foundPool2 = true;
assertThat(info.getMax(), equalTo(10));
assertThat(info.getMin(), equalTo(10));
assertThat(info.getQueueSize().singles(), equalTo(1l));
assertThat(info.getType(), equalTo("fixed"));
} else {
for (Field field : Names.class.getFields()) {
if (info.getName().equalsIgnoreCase(field.getName())) {
// This is ok it is a default thread pool
continue outer;
}
}
fail("Unexpected pool name: " + info.getName());
}
}
assertThat(foundPool1, is(true));
assertThat(foundPool2, is(true));
terminate(threadPool);
}

}

0 comments on commit 1a33a8d

Please sign in to comment.