Skip to content

Commit

Permalink
Avoid forking in AbstractSearchAsyncAction (#71580)
Browse files Browse the repository at this point in the history
We don't need to fork when handling unassigned shard failures
in AbstractSearchAsyncAction as we never call it recursively.

Relates #70792
  • Loading branch information
dnhatn committed Apr 15, 2021
1 parent 56a0641 commit d44785d
Showing 1 changed file with 28 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,32 @@ private boolean checkMinimumVersion(GroupShardsIterator<SearchShardIterator> sha
return true;
}

private boolean assertExecuteOnStartThread() {
// Ensure that the current code has the following stacktrace:
// AbstractSearchAsyncAction#start -> AbstractSearchAsyncAction#executePhase -> AbstractSearchAsyncAction#performPhaseOnShard
final StackTraceElement[] stackTraceElements = Thread.currentThread().getStackTrace();
assert stackTraceElements.length >= 6 : stackTraceElements;
int index = 0;
assert stackTraceElements[index++].getMethodName().equals("getStackTrace");
assert stackTraceElements[index++].getMethodName().equals("assertExecuteOnStartThread");
assert stackTraceElements[index++].getMethodName().equals("performPhaseOnShard");
if (stackTraceElements[index].getMethodName().equals("performPhaseOnShard")) {
assert stackTraceElements[index].getClassName().endsWith("CanMatchPreFilterSearchPhase");
index++;
}
assert stackTraceElements[index].getClassName().endsWith("AbstractSearchAsyncAction");
assert stackTraceElements[index++].getMethodName().equals("run");

assert stackTraceElements[index].getClassName().endsWith("AbstractSearchAsyncAction");
assert stackTraceElements[index++].getMethodName().equals("executePhase");

assert stackTraceElements[index].getClassName().endsWith("AbstractSearchAsyncAction");
assert stackTraceElements[index++].getMethodName().equals("start");

assert stackTraceElements[index].getClassName().endsWith("AbstractSearchAsyncAction") == false;
return true;
}

protected void performPhaseOnShard(final int shardIndex, final SearchShardIterator shardIt, final SearchShardTarget shard) {
/*
* We capture the thread that this phase is starting on. When we are called back after executing the phase, we are either on the
Expand All @@ -260,9 +286,10 @@ protected void performPhaseOnShard(final int shardIndex, final SearchShardIterat
* we can continue (cf. InitialSearchPhase#maybeFork).
*/
if (shard == null) {
assert assertExecuteOnStartThread();
SearchShardTarget unassignedShard = new SearchShardTarget(null, shardIt.shardId(),
shardIt.getClusterAlias(), shardIt.getOriginalIndices());
fork(() -> onShardFailure(shardIndex, unassignedShard, shardIt, new NoShardAvailableActionException(shardIt.shardId())));
onShardFailure(shardIndex, unassignedShard, shardIt, new NoShardAvailableActionException(shardIt.shardId()));
} else {
final PendingExecutions pendingExecutions = throttleConcurrentRequests ?
pendingExecutionsPerNode.computeIfAbsent(shard.getNodeId(), n -> new PendingExecutions(maxConcurrentRequestsPerNode))
Expand Down

0 comments on commit d44785d

Please sign in to comment.