Skip to content

Commit

Permalink
Add a listener thread pool
Browse files Browse the repository at this point in the history
Today, when executing an action (mainly when using the Java API), a listener threaded flag can be set to true in order to execute the listener on a different thread pool. Today, this thread pool is the generic thread pool, which is cached. This can create problems for Java clients (mainly) around potential thread explosion.
Introduce a new thread pool called listener, that is fixed sized and defaults to the half the cores maxed at 10, and use it where listeners are executed.
relates to #5152
closes #7837
  • Loading branch information
kimchy committed Sep 23, 2014
1 parent ca86e1c commit b761612
Show file tree
Hide file tree
Showing 6 changed files with 29 additions and 38 deletions.
4 changes: 4 additions & 0 deletions docs/reference/modules/threadpool.asciidoc
Expand Up @@ -49,6 +49,10 @@ pools, but the important ones include:
For refresh operations, defaults to `scaling`
with a `5m` keep-alive.

`listener`::
Mainly for java client executing of action when listener threaded is set to true
size `(# of available processors)/2` max at 10.

Changing a specific thread pool can be done by setting its type and
specific type parameters, for example, changing the `index` thread pool
to have more threads:
Expand Down
Expand Up @@ -30,9 +30,4 @@ public interface ListenableActionFuture<T> extends ActionFuture<T> {
* Add an action listener to be invoked when a response has received.
*/
void addListener(final ActionListener<T> listener);

/**
* Add an action listener (runnable) to be invoked when a response has received.
*/
void addListener(final Runnable listener);
}
Expand Up @@ -63,7 +63,7 @@ public Response newInstance() {
@Override
public String executor() {
if (request.listenerThreaded()) {
return ThreadPool.Names.GENERIC;
return ThreadPool.Names.LISTENER;
}
return ThreadPool.Names.SAME;
}
Expand Down
Expand Up @@ -23,6 +23,7 @@
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ListenableActionFuture;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.threadpool.ThreadPool;

import java.util.List;
Expand All @@ -33,11 +34,8 @@
public abstract class AbstractListenableActionFuture<T, L> extends AdapterActionFuture<T, L> implements ListenableActionFuture<T> {

final boolean listenerThreaded;

final ThreadPool threadPool;

volatile Object listeners;

boolean executedListeners = false;

protected AbstractListenableActionFuture(boolean listenerThreaded, ThreadPool threadPool) {
Expand All @@ -57,11 +55,7 @@ public void addListener(final ActionListener<T> listener) {
internalAddListener(listener);
}

public void addListener(final Runnable listener) {
internalAddListener(listener);
}

public void internalAddListener(Object listener) {
public void internalAddListener(ActionListener<T> listener) {
boolean executeImmediate = false;
synchronized (this) {
if (executedListeners) {
Expand Down Expand Up @@ -97,42 +91,36 @@ protected void done() {
if (listeners instanceof List) {
List list = (List) listeners;
for (Object listener : list) {
executeListener(listener);
executeListener((ActionListener<T>) listener);
}
} else {
executeListener(listeners);
executeListener((ActionListener<T>) listeners);
}
}
}

private void executeListener(final Object listener) {
private void executeListener(final ActionListener<T> listener) {
if (listenerThreaded) {
if (listener instanceof Runnable) {
threadPool.generic().execute((Runnable) listener);
} else {
threadPool.generic().execute(new Runnable() {
try {
threadPool.executor(ThreadPool.Names.LISTENER).execute(new Runnable() {
@Override
public void run() {
ActionListener<T> lst = (ActionListener<T>) listener;
try {
lst.onResponse(actionGet());
listener.onResponse(actionGet());
} catch (ElasticsearchException e) {
lst.onFailure(e);
listener.onFailure(e);
}
}
});
} catch (EsRejectedExecutionException e) {
listener.onFailure(e);
}
} else {
if (listener instanceof Runnable) {
((Runnable) listener).run();
} else {
ActionListener<T> lst = (ActionListener<T>) listener;
try {
lst.onResponse(actionGet());
} catch (ElasticsearchException e) {
lst.onFailure(e);
}
try {
listener.onResponse(actionGet());
} catch (Throwable e) {
listener.onFailure(e);
}
}
}
}
}
Expand Up @@ -106,7 +106,7 @@ static final class ThreadedActionListener<Response> implements ActionListener<Re
@Override
public void onResponse(final Response response) {
try {
threadPool.generic().execute(new Runnable() {
threadPool.executor(ThreadPool.Names.LISTENER).execute(new Runnable() {
@Override
public void run() {
try {
Expand All @@ -131,15 +131,15 @@ public void run() {
@Override
public void onFailure(final Throwable e) {
try {
threadPool.generic().execute(new Runnable() {
threadPool.executor(ThreadPool.Names.LISTENER).execute(new Runnable() {
@Override
public void run() {
listener.onFailure(e);
}
});
} catch (EsRejectedExecutionException ex) {
logger.debug("Can not run threaded action, exectuion rejected for listener [{}] running on current thread", listener);
/* we don't care if that takes long since we are shutting down. But if we not respond somebody could wait
logger.debug("Can not run threaded action, execution rejected for listener [{}] running on current thread", listener);
/* we don't care if that takes long since we are shutting down (or queue capacity). But if we not respond somebody could wait
* for the response on the listener side which could be a remote machine so make sure we push it out there.*/
listener.onFailure(e);
}
Expand Down
4 changes: 4 additions & 0 deletions src/main/java/org/elasticsearch/threadpool/ThreadPool.java
Expand Up @@ -64,6 +64,7 @@ public class ThreadPool extends AbstractComponent {
public static class Names {
public static final String SAME = "same";
public static final String GENERIC = "generic";
public static final String LISTENER = "listener";
public static final String GET = "get";
public static final String INDEX = "index";
public static final String BULK = "bulk";
Expand Down Expand Up @@ -117,6 +118,9 @@ public ThreadPool(Settings settings, @Nullable NodeSettingsService nodeSettingsS
.put(Names.SUGGEST, settingsBuilder().put("type", "fixed").put("size", availableProcessors).put("queue_size", 1000).build())
.put(Names.PERCOLATE, settingsBuilder().put("type", "fixed").put("size", availableProcessors).put("queue_size", 1000).build())
.put(Names.MANAGEMENT, settingsBuilder().put("type", "fixed").put("size", halfProcMaxAt5).put("queue_size", 100).build())
// no queue as this means clients will need to handle rejections on listener queue even if the operation succeeded
// the assumption here is that the listeners should be very lightweight on the listeners side
.put(Names.LISTENER, settingsBuilder().put("type", "fixed").put("size", halfProcMaxAt10).build())
.put(Names.FLUSH, settingsBuilder().put("type", "scaling").put("keep_alive", "5m").put("size", halfProcMaxAt5).build())
.put(Names.MERGE, settingsBuilder().put("type", "scaling").put("keep_alive", "5m").put("size", halfProcMaxAt5).build())
.put(Names.REFRESH, settingsBuilder().put("type", "scaling").put("keep_alive", "5m").put("size", halfProcMaxAt10).build())
Expand Down

0 comments on commit b761612

Please sign in to comment.