Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ThreadPool: make sure no leaking threads are left behind in case of initialization failure #11061

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -43,6 +43,7 @@
import org.elasticsearch.indices.breaker.CircuitBreakerModule;
import org.elasticsearch.monitor.MonitorService;
import org.elasticsearch.node.internal.InternalSettingsPreparer;
import org.elasticsearch.node.settings.NodeSettingsService;
import org.elasticsearch.plugins.PluginsModule;
import org.elasticsearch.plugins.PluginsService;
import org.elasticsearch.search.TransportSearchModule;
Expand Down Expand Up @@ -125,24 +126,34 @@ public TransportClient build() {

CompressorFactory.configure(this.settings);

ModulesBuilder modules = new ModulesBuilder();
modules.add(new Version.Module(version));
modules.add(new PluginsModule(this.settings, pluginsService));
modules.add(new EnvironmentModule(environment));
modules.add(new SettingsModule(this.settings));
modules.add(new NetworkModule());
modules.add(new ClusterNameModule(this.settings));
modules.add(new ThreadPoolModule(this.settings));
modules.add(new TransportSearchModule());
modules.add(new TransportModule(this.settings));
modules.add(new ActionModule(true));
modules.add(new ClientTransportModule());
modules.add(new CircuitBreakerModule(this.settings));

Injector injector = modules.createInjector();
injector.getInstance(TransportService.class).start();

return new TransportClient(injector);
final ThreadPool threadPool = new ThreadPool(settings);

boolean success = false;
try {
ModulesBuilder modules = new ModulesBuilder();
modules.add(new Version.Module(version));
modules.add(new PluginsModule(this.settings, pluginsService));
modules.add(new EnvironmentModule(environment));
modules.add(new SettingsModule(this.settings));
modules.add(new NetworkModule());
modules.add(new ClusterNameModule(this.settings));
modules.add(new ThreadPoolModule(threadPool));
modules.add(new TransportSearchModule());
modules.add(new TransportModule(this.settings));
modules.add(new ActionModule(true));
modules.add(new ClientTransportModule());
modules.add(new CircuitBreakerModule(this.settings));

Injector injector = modules.createInjector();
injector.getInstance(TransportService.class).start();
TransportClient transportClient = new TransportClient(injector);
success = true;
return transportClient;
} finally {
if (!success) {
ThreadPool.terminate(threadPool, 10, TimeUnit.SECONDS);
}
}
}
}

Expand Down
7 changes: 6 additions & 1 deletion src/main/java/org/elasticsearch/node/Node.java
Expand Up @@ -75,6 +75,7 @@
import org.elasticsearch.monitor.jvm.JvmInfo;
import org.elasticsearch.node.internal.InternalSettingsPreparer;
import org.elasticsearch.node.internal.NodeModule;
import org.elasticsearch.node.settings.NodeSettingsService;
import org.elasticsearch.percolator.PercolatorModule;
import org.elasticsearch.percolator.PercolatorService;
import org.elasticsearch.plugins.PluginsModule;
Expand Down Expand Up @@ -159,6 +160,8 @@ public Node(Settings preparedSettings, boolean loadConfigSettings) {
throw new IllegalStateException("Failed to created node environment", ex);
}

final ThreadPool threadPool = new ThreadPool(settings);

boolean success = false;
try {
ModulesBuilder modules = new ModulesBuilder();
Expand All @@ -174,7 +177,7 @@ public Node(Settings preparedSettings, boolean loadConfigSettings) {
modules.add(new EnvironmentModule(environment));
modules.add(new NodeEnvironmentModule(nodeEnvironment));
modules.add(new ClusterNameModule(settings));
modules.add(new ThreadPoolModule(settings));
modules.add(new ThreadPoolModule(threadPool));
modules.add(new DiscoveryModule(settings));
modules.add(new ClusterModule(settings));
modules.add(new RestModule(settings));
Expand All @@ -198,10 +201,12 @@ public Node(Settings preparedSettings, boolean loadConfigSettings) {
injector = modules.createInjector();

client = injector.getInstance(Client.class);
threadPool.setNodeSettingsService(injector.getInstance(NodeSettingsService.class));
success = true;
} finally {
if (!success) {
nodeEnvironment.close();
ThreadPool.terminate(threadPool, 10, TimeUnit.SECONDS);
}
}

Expand Down
18 changes: 12 additions & 6 deletions src/main/java/org/elasticsearch/threadpool/ThreadPool.java
Expand Up @@ -91,13 +91,14 @@ public static class Names {

private final EstimatedTimeThread estimatedTimeThread;

private boolean settingsListenerIsSet = false;


public ThreadPool(String name) {
this(ImmutableSettings.builder().put("name", name).build(), null);
this(ImmutableSettings.builder().put("name", name).build());
}

@Inject
public ThreadPool(Settings settings, @Nullable NodeSettingsService nodeSettingsService) {
public ThreadPool(Settings settings) {
super(settings);

assert settings.get("name") != null : "ThreadPool's settings should contain a name";
Expand Down Expand Up @@ -148,15 +149,20 @@ public ThreadPool(Settings settings, @Nullable NodeSettingsService nodeSettingsS
this.scheduler.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
this.scheduler.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
this.scheduler.setRemoveOnCancelPolicy(true);
if (nodeSettingsService != null) {
nodeSettingsService.addListener(new ApplySettings());
}

TimeValue estimatedTimeInterval = settings.getAsTime("threadpool.estimated_time_interval", TimeValue.timeValueMillis(200));
this.estimatedTimeThread = new EstimatedTimeThread(EsExecutors.threadName(settings, "[timer]"), estimatedTimeInterval.millis());
this.estimatedTimeThread.start();
}

public void setNodeSettingsService(NodeSettingsService nodeSettingsService) {
if(settingsListenerIsSet) {
throw new IllegalStateException("the node settings listener was set more then once");
}
nodeSettingsService.addListener(new ApplySettings());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we add some safety that this method is called only once?

settingsListenerIsSet = true;
}

public long estimatedTimeInMillis() {
return estimatedTimeThread.estimatedTimeInMillis();
}
Expand Down
Expand Up @@ -27,14 +27,14 @@
*/
public class ThreadPoolModule extends AbstractModule {

private final Settings settings;
private final ThreadPool threadPool;

public ThreadPoolModule(Settings settings) {
this.settings = settings;
public ThreadPoolModule(ThreadPool threadPool) {
this.threadPool = threadPool;
}

@Override
protected void configure() {
bind(ThreadPool.class).asEagerSingleton();
bind(ThreadPool.class).toInstance(threadPool);
}
}
Expand Up @@ -77,7 +77,7 @@ public void setup() throws IOException {
injector = new ModulesBuilder().add(
new EnvironmentModule(new Environment(settings)),
new SettingsModule(settings),
new ThreadPoolModule(settings),
new ThreadPoolModule(new ThreadPool(settings)),
new IndicesQueriesModule(),
new ScriptModule(settings),
new IndexSettingsModule(index, settings),
Expand Down
Expand Up @@ -70,7 +70,7 @@ public void testCustomInjection() throws InterruptedException {
Injector injector = new ModulesBuilder().add(
new EnvironmentModule(new Environment(settings)),
new SettingsModule(settings),
new ThreadPoolModule(settings),
new ThreadPoolModule(new ThreadPool(settings)),
new IndicesQueriesModule(),
new ScriptModule(settings),
new IndexSettingsModule(index, settings),
Expand Down
Expand Up @@ -75,7 +75,7 @@ public void processXContentQueryParsers(XContentQueryParsersBindings bindings) {
Injector injector = new ModulesBuilder().add(
new EnvironmentModule(new Environment(settings)),
new SettingsModule(settings),
new ThreadPoolModule(settings),
new ThreadPoolModule(new ThreadPool(settings)),
new IndicesQueriesModule(),
new ScriptModule(settings),
new IndexSettingsModule(index, settings),
Expand Down
Expand Up @@ -55,7 +55,7 @@ public void testNativeScript() throws InterruptedException {
.build();
Injector injector = new ModulesBuilder().add(
new EnvironmentModule(new Environment(settings)),
new ThreadPoolModule(settings),
new ThreadPoolModule(new ThreadPool(settings)),
new SettingsModule(settings),
new ScriptModule(settings)).createInjector();

Expand Down
Expand Up @@ -32,6 +32,7 @@
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.node.NodeBuilder;
import org.elasticsearch.test.ElasticsearchIntegrationTest;
import org.elasticsearch.test.ElasticsearchIntegrationTest.ClusterScope;
import org.elasticsearch.test.ElasticsearchSingleNodeTest;
Expand Down Expand Up @@ -191,6 +192,24 @@ public void run() {
}
}

@Test
public void testThreadPoolLeakingThreadsWithTribeNode() {
Settings settings = ImmutableSettings.builder()
.put("node.name", "thread_pool_leaking_threads_tribe_node")
.put("path.home", createTempDir())
.put("tribe.t1.cluster.name", "non_existing_cluster")
//trigger initialization failure of one of the tribes (doesn't require starting the node)
.put("tribe.t1.plugin.mandatory", "non_existing").build();

try {
NodeBuilder.nodeBuilder().settings(settings).build();
fail("The node startup is supposed to fail");
} catch(Throwable t) {
//all good
assertThat(t.getMessage(), containsString("mandatory plugins [non_existing]"));
}
}

private Map<String, Object> getPoolSettingsThroughJson(ThreadPoolInfo info, String poolName) throws IOException {
XContentBuilder builder = XContentFactory.jsonBuilder();
builder.startObject();
Expand Down
Expand Up @@ -95,7 +95,7 @@ public void testThatToXContentWritesOutUnboundedCorrectly() throws Exception {
@Test
public void testThatNegativeSettingAllowsToStart() throws InterruptedException {
Settings settings = settingsBuilder().put("name", "index").put("threadpool.index.queue_size", "-1").build();
ThreadPool threadPool = new ThreadPool(settings, null);
ThreadPool threadPool = new ThreadPool(settings);
assertThat(threadPool.info("index").getQueueSize(), is(nullValue()));
terminate(threadPool);
}
Expand Down
Expand Up @@ -54,7 +54,7 @@ public void testCachedExecutorType() throws InterruptedException {
ThreadPool threadPool = new ThreadPool(
ImmutableSettings.settingsBuilder()
.put("threadpool.search.type", "cached")
.put("name","testCachedExecutorType").build(), null);
.put("name","testCachedExecutorType").build());

assertThat(info(threadPool, Names.SEARCH).getType(), equalTo("cached"));
assertThat(info(threadPool, Names.SEARCH).getKeepAlive().minutes(), equalTo(5L));
Expand Down Expand Up @@ -109,7 +109,7 @@ public void testCachedExecutorType() throws InterruptedException {
public void testFixedExecutorType() throws InterruptedException {
ThreadPool threadPool = new ThreadPool(settingsBuilder()
.put("threadpool.search.type", "fixed")
.put("name","testCachedExecutorType").build(), null);
.put("name","testCachedExecutorType").build());

assertThat(threadPool.executor(Names.SEARCH), instanceOf(EsThreadPoolExecutor.class));

Expand Down Expand Up @@ -170,7 +170,7 @@ public void testScalingExecutorType() throws InterruptedException {
ThreadPool threadPool = new ThreadPool(settingsBuilder()
.put("threadpool.search.type", "scaling")
.put("threadpool.search.size", 10)
.put("name","testCachedExecutorType").build(), null);
.put("name","testCachedExecutorType").build());

assertThat(info(threadPool, Names.SEARCH).getMin(), equalTo(1));
assertThat(info(threadPool, Names.SEARCH).getMax(), equalTo(10));
Expand Down Expand Up @@ -204,7 +204,7 @@ public void testScalingExecutorType() throws InterruptedException {
public void testShutdownDownNowDoesntBlock() throws Exception {
ThreadPool threadPool = new ThreadPool(ImmutableSettings.settingsBuilder()
.put("threadpool.search.type", "cached")
.put("name","testCachedExecutorType").build(), null);
.put("name","testCachedExecutorType").build());

final CountDownLatch latch = new CountDownLatch(1);
Executor oldExecutor = threadPool.executor(Names.SEARCH);
Expand Down Expand Up @@ -236,7 +236,7 @@ public void testCustomThreadPool() throws Exception {
.put("threadpool.my_pool2.type", "fixed")
.put("threadpool.my_pool2.size", "1")
.put("threadpool.my_pool2.queue_size", "1")
.put("name", "testCustomThreadPool").build(), null);
.put("name", "testCustomThreadPool").build());

ThreadPoolInfo groups = threadPool.info();
boolean foundPool1 = false;
Expand Down
Expand Up @@ -58,8 +58,8 @@ public class NettySizeHeaderFrameDecoderTests extends ElasticsearchTestCase {

@Before
public void startThreadPool() {
threadPool = new ThreadPool(settings, new NodeSettingsService(settings));

threadPool = new ThreadPool(settings);
threadPool.setNodeSettingsService(new NodeSettingsService(settings));
NetworkService networkService = new NetworkService(settings);
BigArrays bigArrays = new MockBigArrays(new MockPageCacheRecycler(settings, threadPool), new NoneCircuitBreakerService());
nettyTransport = new NettyTransport(settings, threadPool, networkService, bigArrays, Version.CURRENT);
Expand Down