Skip to content

Commit

Permalink
Propogate rejected execution during bulk actions (#64886)
Browse files Browse the repository at this point in the history
Currently a rejected execution exception can be swallowed when async
actions return during transport bulk actions. This includes scenarios
where we went async to perform ingest pipelines or index creation. This
commit resolves the issue by propagating a rejected exception.
  • Loading branch information
Tim-Brooks committed Nov 10, 2020
1 parent aa0e3f8 commit f96dccd
Show file tree
Hide file tree
Showing 3 changed files with 116 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,6 @@
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.AtomicArray;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexNotFoundException;
Expand Down Expand Up @@ -271,8 +270,13 @@ protected void doInternalExecute(Task task, BulkRequest bulkRequest, String exec
@Override
public void onResponse(CreateIndexResponse result) {
if (counter.decrementAndGet() == 0) {
threadPool.executor(executorName).execute(
() -> executeBulk(task, bulkRequest, startTime, listener, responses, indicesThatCannotBeCreated));
threadPool.executor(executorName).execute(new ActionRunnable<BulkResponse>(listener) {

@Override
protected void doRun() {
executeBulk(task, bulkRequest, startTime, listener, responses, indicesThatCannotBeCreated);
}
});
}
}

Expand All @@ -288,11 +292,22 @@ public void onFailure(Exception e) {
}
}
if (counter.decrementAndGet() == 0) {
threadPool.executor(executorName).execute(() -> executeBulk(task, bulkRequest, startTime,
ActionListener.wrap(listener::onResponse, inner -> {
final ActionListener<BulkResponse> wrappedListener = ActionListener.wrap(listener::onResponse, inner -> {
inner.addSuppressed(e);
listener.onFailure(inner);
}), responses, indicesThatCannotBeCreated));
});
threadPool.executor(executorName).execute(new ActionRunnable<BulkResponse>(wrappedListener) {
@Override
protected void doRun() {
executeBulk(task, bulkRequest, startTime, wrappedListener, responses, indicesThatCannotBeCreated);
}

@Override
public void onRejection(Exception rejectedException) {
rejectedException.addSuppressed(e);
super.onRejection(rejectedException);
}
});
}
}
});
Expand Down Expand Up @@ -731,14 +746,9 @@ private void processBulkIndexIngestRequest(Task task, BulkRequest original, Stri
assert Thread.currentThread().getName().contains(executorName);
doInternalExecute(task, bulkRequest, executorName, actionListener);
} else {
threadPool.executor(executorName).execute(new AbstractRunnable() {
@Override
public void onFailure(Exception e) {
listener.onFailure(e);
}

threadPool.executor(executorName).execute(new ActionRunnable<BulkResponse>(listener) {
@Override
protected void doRun() throws Exception {
protected void doRun() {
doInternalExecute(task, bulkRequest, executorName, actionListener);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.elasticsearch.action.bulk;

import org.elasticsearch.ResourceAlreadyExistsException;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.DocWriteRequest;
Expand All @@ -42,6 +43,7 @@
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.IndexingPressure;
import org.elasticsearch.index.VersionType;
Expand Down Expand Up @@ -77,12 +79,13 @@ public class TransportBulkActionTests extends ESTestCase {
/** Services needed by bulk action */
private TransportService transportService;
private ClusterService clusterService;
private ThreadPool threadPool;
private TestThreadPool threadPool;

private TestTransportBulkAction bulkAction;

class TestTransportBulkAction extends TransportBulkAction {

volatile boolean failIndexCreation = false;
boolean indexCreated = false; // set when the "real" index is created

TestTransportBulkAction() {
Expand All @@ -100,7 +103,11 @@ protected boolean needToCheck() {
@Override
void createIndex(String index, TimeValue timeout, Version minNodeVersion, ActionListener<CreateIndexResponse> listener) {
indexCreated = true;
listener.onResponse(null);
if (failIndexCreation) {
listener.onFailure(new ResourceAlreadyExistsException("index already exists"));
} else {
listener.onResponse(null);
}
}
}

Expand Down Expand Up @@ -293,6 +300,20 @@ public void testIncludesSystem() {
assertTrue(bulkAction.includesSystem(buildBulkRequest(mixed), indicesLookup, systemIndices));
}

public void testRejectionAfterCreateIndexIsPropagated() throws Exception {
BulkRequest bulkRequest = new BulkRequest().add(new IndexRequest("index").id("id").source(Collections.emptyMap()));
bulkAction.failIndexCreation = randomBoolean();

try {
threadPool.startForcingRejections();
PlainActionFuture<BulkResponse> future = PlainActionFuture.newFuture();
ActionTestUtils.execute(bulkAction, null, bulkRequest, future);
expectThrows(EsRejectedExecutionException.class, future::actionGet);
} finally {
threadPool.stopForcingRejections();
}
}

private BulkRequest buildBulkRequest(List<String> indices) {
BulkRequest request = new BulkRequest();
for (String index : indices) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,20 @@
package org.elasticsearch.threadpool;

import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.node.Node;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;

public class TestThreadPool extends ThreadPool {

private final CountDownLatch blockingLatch = new CountDownLatch(1);
private volatile boolean returnRejectingExecutor = false;
private volatile ThreadPoolExecutor rejectingExecutor;

public TestThreadPool(String name, ExecutorBuilder<?>... customBuilders) {
this(name, Settings.EMPTY, customBuilders);
}
Expand All @@ -32,4 +42,64 @@ public TestThreadPool(String name, Settings settings, ExecutorBuilder<?>... cust
super(Settings.builder().put(Node.NODE_NAME_SETTING.getKey(), name).put(settings).build(), customBuilders);
}

@Override
public ExecutorService executor(String name) {
if (returnRejectingExecutor) {
return rejectingExecutor;
} else {
return super.executor(name);
}
}

public void startForcingRejections() {
if (rejectingExecutor == null) {
createRejectingExecutor();
}
returnRejectingExecutor = true;
}

public void stopForcingRejections() {
returnRejectingExecutor = false;
}

@Override
public void shutdown() {
blockingLatch.countDown();
if (rejectingExecutor != null) {
rejectingExecutor.shutdown();
}
super.shutdown();
}

@Override
public void shutdownNow() {
blockingLatch.countDown();
if (rejectingExecutor != null) {
rejectingExecutor.shutdownNow();
}
super.shutdownNow();
}

private synchronized void createRejectingExecutor() {
if (rejectingExecutor != null) {
return;
}
ThreadFactory factory = EsExecutors.daemonThreadFactory("reject_thread");
rejectingExecutor = EsExecutors.newFixed("rejecting", 1, 0, factory, getThreadContext());

CountDownLatch startedLatch = new CountDownLatch(1);
rejectingExecutor.execute(() -> {
try {
startedLatch.countDown();
blockingLatch.await();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
});
try {
startedLatch.await();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}

0 comments on commit f96dccd

Please sign in to comment.