Skip to content

Commit

Permalink
[7.17] Better logging when EsThreadPoolExecutor throws unexpected exc…
Browse files Browse the repository at this point in the history
…eptions (#85217) (#85379)

* Better logging when EsThreadPoolExecutor throws unexpected exceptions (#85217)

We recently had issues with EsThreadPoolExecutor throwing unexpected
exceptions (in our case an AccessControlException) before the Runnable
or AbstractRunnable has been submitted for execution. In the case of
AbstractRunnable the task is not rejected and it caused some resources
to leak.

This commit catches unexpected exceptions thrown when calling the
EsThreadPoolExecutor#execute() method with AbstractRunnable and
logs a message. It also assert that this situation should never happen.

* fix
  • Loading branch information
tlrx committed Mar 28, 2022
1 parent de7261d commit 216fefc
Show file tree
Hide file tree
Showing 2 changed files with 102 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@

package org.elasticsearch.common.util.concurrent;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.core.SuppressForbidden;

import java.util.concurrent.BlockingQueue;
Expand All @@ -22,6 +25,8 @@
*/
public class EsThreadPoolExecutor extends ThreadPoolExecutor {

private static final Logger logger = LogManager.getLogger(EsThreadPoolExecutor.class);

private final ThreadContext contextHolder;
private volatile ShutdownListener listener;

Expand Down Expand Up @@ -85,25 +90,35 @@ public interface ShutdownListener {

@Override
public void execute(Runnable command) {
command = wrapRunnable(command);
final Runnable wrappedRunnable = wrapRunnable(command);
try {
super.execute(command);
} catch (EsRejectedExecutionException ex) {
if (command instanceof AbstractRunnable) {
// If we are an abstract runnable we can handle the rejection
// directly and don't need to rethrow it.
super.execute(wrappedRunnable);
} catch (Exception e) {
if (wrappedRunnable instanceof AbstractRunnable) {
final AbstractRunnable abstractRunnable = (AbstractRunnable) wrappedRunnable;
try {
((AbstractRunnable) command).onRejection(ex);
// If we are an abstract runnable we can handle the exception
// directly and don't need to rethrow it, but we log and assert
// any unexpected exception first.
if (e instanceof EsRejectedExecutionException == false) {
logException(abstractRunnable, e);
}
abstractRunnable.onRejection(e);
} finally {
((AbstractRunnable) command).onAfter();

abstractRunnable.onAfter();
}
} else {
throw ex;
throw e;
}
}
}

// package-visible for testing
void logException(AbstractRunnable r, Exception e) {
logger.error(() -> new ParameterizedMessage("[{}] unexpected exception when submitting task [{}] for execution", name, r), e);
assert false : "executor throws an exception (not a rejected execution exception) before the task has been submitted " + e;
}

@Override
protected void afterExecute(Runnable r, Throwable t) {
super.afterExecute(r, t);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,18 @@
import org.elasticsearch.test.ESSingleNodeTestCase;
import org.elasticsearch.threadpool.ThreadPool;

import java.security.AccessControlException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasToString;
import static org.hamcrest.Matchers.nullValue;
import static org.hamcrest.Matchers.sameInstance;

public class EsThreadPoolExecutorTests extends ESSingleNodeTestCase {

Expand Down Expand Up @@ -73,4 +80,74 @@ protected void doRun() throws Exception {
assertTrue(rejected.get());
}

public void testExecuteThrowsException() {
final RuntimeException exception = randomFrom(
new RuntimeException("unexpected"),
new AccessControlException("unexpected"),
new EsRejectedExecutionException("unexpected")
);

final ThrowingEsThreadPoolExecutor executor = new ThrowingEsThreadPoolExecutor(getTestName(), 0, 1, exception);
try {
final AtomicBoolean doRun = new AtomicBoolean();
final AtomicBoolean onAfter = new AtomicBoolean();
final AtomicReference<Exception> onFailure = new AtomicReference<>();
final AtomicReference<Exception> onRejection = new AtomicReference<>();

executor.execute(new AbstractRunnable() {
@Override
public void onFailure(Exception e) {
onFailure.set(e);
}

@Override
public void onRejection(Exception e) {
onRejection.set(e);
}

@Override
protected void doRun() {
doRun.set(true);
}

@Override
public void onAfter() {
onAfter.set(true);
}
});

assertThat(doRun.get(), equalTo(false));
assertThat(onAfter.get(), equalTo(true));
assertThat(onFailure.get(), nullValue());
assertThat(onRejection.get(), sameInstance(exception));
assertThat(
executor.lastLoggedException.get(),
exception instanceof EsRejectedExecutionException ? nullValue() : sameInstance(exception)
);
} finally {
terminate(executor);
}
}

/**
* EsThreadPoolExecutor that throws a given exception, preventing {@link Runnable} to be added to the thread pool work queue.
*/
private class ThrowingEsThreadPoolExecutor extends EsThreadPoolExecutor {

final AtomicReference<Exception> lastLoggedException = new AtomicReference<>();

ThrowingEsThreadPoolExecutor(String name, int corePoolSize, int maximumPoolSize, RuntimeException exception) {
super(name, corePoolSize, maximumPoolSize, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>() {
@Override
public boolean offer(Runnable r) {
throw exception;
}
}, EsExecutors.daemonThreadFactory("test"), new ThreadContext(Settings.EMPTY));
}

@Override
void logException(AbstractRunnable task, Exception e) {
lastLoggedException.set(e);
}
}
}

0 comments on commit 216fefc

Please sign in to comment.