Skip to content

Commit

Permalink
Fix potential listener leak in TransportBulkAction (#82002)
Browse files Browse the repository at this point in the history
Currently we manually call doInternalExecute when returning from the
ingest service if we are still on the write threadpool. Unforunately, if
this throws an exception, it will be lost and the listener not
completed. This commit resolves this by using an action runnable same as
if we were dispatching back to the WRITE thread.
  • Loading branch information
Tim-Brooks committed Dec 21, 2021
1 parent 95b35ce commit 9b6dfbc
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -834,27 +834,27 @@ private void processBulkIndexIngestRequest(
// (this will happen if pre-processing all items in the bulk failed)
actionListener.onResponse(new BulkResponse(new BulkItemResponse[0], 0));
} else {
ActionRunnable<BulkResponse> runnable = new ActionRunnable<>(actionListener) {
@Override
protected void doRun() {
doInternalExecute(task, bulkRequest, executorName, actionListener);
}

@Override
public boolean isForceExecution() {
// If we fork back to a write thread we **not** should fail, because tp queue is full.
// (Otherwise the work done during ingest will be lost)
// It is okay to force execution here. Throttling of write requests happens prior to
// ingest when a node receives a bulk request.
return true;
}
};
// If a processor went async and returned a response on a different thread then
// before we continue the bulk request we should fork back on a write thread:
if (originalThread == Thread.currentThread()) {
assert Thread.currentThread().getName().contains(executorName);
doInternalExecute(task, bulkRequest, executorName, actionListener);
threadPool.executor(Names.SAME).execute(runnable);
} else {
threadPool.executor(executorName).execute(new ActionRunnable<>(actionListener) {
@Override
protected void doRun() {
doInternalExecute(task, bulkRequest, executorName, actionListener);
}

@Override
public boolean isForceExecution() {
// If we fork back to a write thread we **not** should fail, because tp queue is full.
// (Otherwise the work done during ingest will be lost)
// It is okay to force execution here. Throttling of write requests happens prior to
// ingest when a node receives a bulk request.
return true;
}
});
threadPool.executor(executorName).execute(runnable);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -741,6 +741,43 @@ public void testFindDefaultPipelineFromV2TemplateMatch() {
);
}

public void testIngestCallbackExceptionHandled() throws Exception {
BulkRequest bulkRequest = new BulkRequest();
IndexRequest indexRequest1 = new IndexRequest("index");
indexRequest1.source(Collections.emptyMap());
indexRequest1.setPipeline("testpipeline");
bulkRequest.add(indexRequest1);

AtomicBoolean responseCalled = new AtomicBoolean(false);
AtomicBoolean failureCalled = new AtomicBoolean(false);
ActionTestUtils.execute(
action,
null,
bulkRequest,
ActionListener.wrap(response -> { responseCalled.set(true); }, e -> { failureCalled.set(true); })
);

// check failure works, and passes through to the listener
assertFalse(action.isExecuted); // haven't executed yet
assertFalse(responseCalled.get());
assertFalse(failureCalled.get());
verify(ingestService).executeBulkRequest(
eq(bulkRequest.numberOfActions()),
bulkDocsItr.capture(),
failureHandler.capture(),
completionHandler.capture(),
any(),
eq(Names.WRITE)
);
indexRequest1.process(Version.CURRENT, null, "index");
completionHandler.getValue().accept(Thread.currentThread(), null);

// check failure passed through to the listener
assertFalse(action.isExecuted);
assertFalse(responseCalled.get());
assertTrue(failureCalled.get());
}

private void validateDefaultPipeline(IndexRequest indexRequest) {
Exception exception = new Exception("fake exception");
indexRequest.source(Collections.emptyMap());
Expand Down

0 comments on commit 9b6dfbc

Please sign in to comment.