Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,9 @@
import org.elasticsearch.common.collect.Iterators;
import org.elasticsearch.common.regex.Regex;
import org.elasticsearch.common.util.Maps;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.util.concurrent.ThrottledTaskRunner;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.Releasable;
import org.elasticsearch.core.Tuple;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.IndicesService;
Expand Down Expand Up @@ -148,7 +146,7 @@ private void doExecuteForked(
if (ccsCheckCompatibility) {
checkCCSVersionCompatibility(request);
}
final Executor singleThreadedExecutor = buildSingleThreadedExecutor();
final Executor singleThreadedExecutor = ThrottledTaskRunner.buildSingleThreadedExecutor("field_caps", searchCoordinationExecutor);
assert task instanceof CancellableTask;
final CancellableTask fieldCapTask = (CancellableTask) task;
// retrieve the initial timestamp in case the action is a cross cluster search
Expand Down Expand Up @@ -314,29 +312,6 @@ private void doExecuteForked(
}
}

private Executor buildSingleThreadedExecutor() {
final ThrottledTaskRunner throttledTaskRunner = new ThrottledTaskRunner("field_caps", 1, searchCoordinationExecutor);
return r -> throttledTaskRunner.enqueueTask(new ActionListener<>() {
@Override
public void onResponse(Releasable releasable) {
try (releasable) {
r.run();
}
}

@Override
public void onFailure(Exception e) {
if (r instanceof AbstractRunnable abstractRunnable) {
abstractRunnable.onFailure(e);
} else {
// should be impossible, we should always submit an AbstractRunnable
logger.error("unexpected failure running " + r, e);
assert false : new AssertionError("unexpected failure running " + r, e);
}
}
});
}

public interface RemoteRequestExecutor {
void executeRemoteRequest(
RemoteClusterClient remoteClient,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -188,9 +188,7 @@ MergeResult consumePartialMergeResultDataNode() {
}

void addBatchedPartialResult(TopDocsStats topDocsStats, MergeResult mergeResult) {
synchronized (batchedResults) {
batchedResults.add(new Tuple<>(topDocsStats, mergeResult));
}
batchedResults.add(new Tuple<>(topDocsStats, mergeResult));
}

@Override
Expand All @@ -215,10 +213,7 @@ public SearchPhaseController.ReducedQueryPhase reduce() throws Exception {
buffer.sort(RESULT_COMPARATOR);
final TopDocsStats topDocsStats = this.topDocsStats;
var mergeResult = this.mergeResult;
final ArrayDeque<Tuple<TopDocsStats, MergeResult>> batchedResults;
synchronized (this.batchedResults) {
batchedResults = this.batchedResults;
}
final ArrayDeque<Tuple<TopDocsStats, MergeResult>> batchedResults = this.batchedResults;
final int resultSize = buffer.size() + (mergeResult == null ? 0 : 1) + batchedResults.size();
final List<TopDocs> topDocsList = hasTopDocs ? new ArrayList<>(resultSize) : null;
final Deque<DelayableWriteable<InternalAggregations>> aggsList = hasAggs ? new ArrayDeque<>(resultSize) : null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.elasticsearch.common.util.concurrent.CountDown;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.util.concurrent.ListenableFuture;
import org.elasticsearch.common.util.concurrent.ThrottledTaskRunner;
import org.elasticsearch.core.RefCounted;
import org.elasticsearch.core.SimpleRefCounted;
import org.elasticsearch.core.TimeValue;
Expand Down Expand Up @@ -408,7 +409,8 @@ protected void doRun(Map<SearchShardIterator, Integer> shardIndexMap) {
}
AbstractSearchAsyncAction.doCheckNoMissingShards(getName(), request, shardsIts);
final Map<CanMatchPreFilterSearchPhase.SendingTarget, NodeQueryRequest> perNodeQueries = new HashMap<>();
final String localNodeId = searchTransportService.transportService().getLocalNode().getId();
final var transportService = searchTransportService.transportService();
final String localNodeId = transportService.getLocalNode().getId();
final int numberOfShardsTotal = shardsIts.size();
for (int i = 0; i < numberOfShardsTotal; i++) {
final SearchShardIterator shardRoutings = shardsIts.get(i);
Expand Down Expand Up @@ -445,6 +447,10 @@ protected void doRun(Map<SearchShardIterator, Integer> shardIndexMap) {
}
}
}
final Executor singleThreadedExecutor = ThrottledTaskRunner.buildSingleThreadedExecutor(
"node_query_response",
transportService.getThreadPool().executor(ThreadPool.Names.SEARCH_COORDINATION)
);
perNodeQueries.forEach((routing, request) -> {
if (request.shards.size() == 1) {
executeAsSingleRequest(routing, request.shards.getFirst());
Expand All @@ -463,16 +469,20 @@ protected void doRun(Map<SearchShardIterator, Integer> shardIndexMap) {
executeWithoutBatching(routing, request);
return;
}
searchTransportService.transportService()
.sendChildRequest(connection, NODE_SEARCH_ACTION_NAME, request, task, new TransportResponseHandler<NodeQueryResponse>() {
transportService.sendChildRequest(
connection,
NODE_SEARCH_ACTION_NAME,
request,
task,
new TransportResponseHandler<NodeQueryResponse>() {
@Override
public NodeQueryResponse read(StreamInput in) throws IOException {
return new NodeQueryResponse(in);
}

@Override
public Executor executor() {
return EsExecutors.DIRECT_EXECUTOR_SERVICE;
return singleThreadedExecutor;
}

@Override
Expand Down Expand Up @@ -517,7 +527,8 @@ public void handleException(TransportException e) {
onPhaseFailure(getName(), "", cause);
}
}
});
}
);
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,46 @@

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.core.Releasable;
import org.elasticsearch.logging.LogManager;
import org.elasticsearch.logging.Logger;

import java.util.concurrent.Executor;

public class ThrottledTaskRunner extends AbstractThrottledTaskRunner<ActionListener<Releasable>> {

private static final Logger logger = LogManager.getLogger(ThrottledTaskRunner.class);

// a simple AbstractThrottledTaskRunner which fixes the task type and uses a regular FIFO blocking queue.
public ThrottledTaskRunner(String name, int maxRunningTasks, Executor executor) {
super(name, maxRunningTasks, executor, ConcurrentCollections.newBlockingQueue());
}

/**
* Builds an executor that executes one task at a time from a throttled task runner.
* @param name name for the executor
* @param executor executor to use for executing the tasks
* @return executor that forwards and runs a single task at a time on the provided {@code executor}
*/
public static Executor buildSingleThreadedExecutor(String name, Executor executor) {
final ThrottledTaskRunner throttledTaskRunner = new ThrottledTaskRunner(name, 1, executor);
return r -> throttledTaskRunner.enqueueTask(new ActionListener<>() {
@Override
public void onResponse(Releasable releasable) {
try (releasable) {
r.run();
}
}

@Override
public void onFailure(Exception e) {
if (r instanceof AbstractRunnable abstractRunnable) {
abstractRunnable.onFailure(e);
} else {
// should be impossible, we should always submit an AbstractRunnable
logger.error("unexpected failure running [" + r + "] on [" + name + "]", e);
assert false : new AssertionError("unexpected failure running " + r, e);
}
}
});
}
}