Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Request-level circuit breaker support on coordinating nodes #62223

Merged
merged 16 commits into from
Sep 24, 2020
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.routing.GroupShardsIterator;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.AtomicArray;
import org.elasticsearch.index.shard.ShardId;
Expand Down Expand Up @@ -77,7 +79,7 @@ abstract class AbstractSearchAsyncAction<Result extends SearchPhaseResult> exten
**/
private final BiFunction<String, String, Transport.Connection> nodeIdToConnection;
private final SearchTask task;
final SearchPhaseResults<Result> results;
protected final SearchPhaseResults<Result> results;
private final ClusterState clusterState;
private final Map<String, AliasFilter> aliasFilter;
private final Map<String, Float> concreteIndexBoosts;
Expand All @@ -98,6 +100,8 @@ abstract class AbstractSearchAsyncAction<Result extends SearchPhaseResult> exten
private final Map<String, PendingExecutions> pendingExecutionsPerNode = new ConcurrentHashMap<>();
private final boolean throttleConcurrentRequests;

private final List<Releasable> releasables = new ArrayList<>();

AbstractSearchAsyncAction(String name, Logger logger, SearchTransportService searchTransportService,
BiFunction<String, String, Transport.Connection> nodeIdToConnection,
Map<String, AliasFilter> aliasFilter, Map<String, Float> concreteIndexBoosts,
Expand Down Expand Up @@ -133,7 +137,7 @@ abstract class AbstractSearchAsyncAction<Result extends SearchPhaseResult> exten
this.executor = executor;
this.request = request;
this.task = task;
this.listener = listener;
this.listener = ActionListener.runAfter(listener, this::releaseContext);
this.nodeIdToConnection = nodeIdToConnection;
this.clusterState = clusterState;
this.concreteIndexBoosts = concreteIndexBoosts;
Expand All @@ -143,6 +147,15 @@ abstract class AbstractSearchAsyncAction<Result extends SearchPhaseResult> exten
this.clusters = clusters;
}

@Override
public void addReleasable(Releasable releasable) {
releasables.add(releasable);
}

public void releaseContext() {
Releasables.close(releasables);
}

/**
* Builds how long it took to execute the search.
*/
Expand Down Expand Up @@ -529,7 +542,7 @@ public void sendSearchResponse(InternalSearchResponse internalSearchResponse, At
ShardSearchFailure[] failures = buildShardFailures();
Boolean allowPartialResults = request.allowPartialSearchResults();
assert allowPartialResults != null : "SearchRequest missing setting for allowPartialSearchResults";
if (request.pointInTimeBuilder() == null && allowPartialResults == false && failures.length > 0) {
if (allowPartialResults == false && failures.length > 0) {
raisePhaseFailure(new SearchPhaseExecutionException("", "Shard failures", null, failures));
} else {
final Version minNodeVersion = clusterState.nodes().getMinNodeVersion();
Expand Down Expand Up @@ -567,6 +580,7 @@ private void raisePhaseFailure(SearchPhaseExecutionException exception) {
}
});
}
Releasables.close(releasables);
listener.onFailure(exception);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.routing.GroupShardsIterator;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.search.SearchService.CanMatchResponse;
import org.elasticsearch.search.SearchShardTarget;
import org.elasticsearch.search.builder.SearchSourceBuilder;
Expand Down Expand Up @@ -76,6 +77,11 @@ final class CanMatchPreFilterSearchPhase extends AbstractSearchAsyncAction<CanMa
this.shardsIts = shardsIts;
}

@Override
public void addReleasable(Releasable releasable) {
throw new RuntimeException("cannot add releasable in " + getName() + " phase");
}

@Override
protected void executePhaseOnShard(SearchShardIterator shardIt, SearchShardTarget shard,
SearchActionListener<CanMatchResponse> listener) {
Expand All @@ -84,8 +90,7 @@ protected void executePhaseOnShard(SearchShardIterator shardIt, SearchShardTarge
}

@Override
protected SearchPhase getNextPhase(SearchPhaseResults<CanMatchResponse> results,
SearchPhaseContext context) {
protected SearchPhase getNextPhase(SearchPhaseResults<CanMatchResponse> results, SearchPhaseContext context) {

return phaseFactory.apply(getIterator((CanMatchSearchPhaseResults) results, shardsIts));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@

import java.io.IOException;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

/**
Expand All @@ -50,18 +49,21 @@ final class DfsQueryPhase extends SearchPhase {

DfsQueryPhase(List<DfsSearchResult> searchResults,
AggregatedDfs dfs,
SearchPhaseController searchPhaseController,
QueryPhaseResultConsumer queryResult,
Function<ArraySearchPhaseResults<SearchPhaseResult>, SearchPhase> nextPhaseFactory,
SearchPhaseContext context, Consumer<Exception> onPartialMergeFailure) {
SearchPhaseContext context) {
super("dfs_query");
this.progressListener = context.getTask().getProgressListener();
this.queryResult = searchPhaseController.newSearchPhaseResults(context, progressListener,
context.getRequest(), context.getNumShards(), onPartialMergeFailure);
this.queryResult = queryResult;
this.searchResults = searchResults;
this.dfs = dfs;
this.nextPhaseFactory = nextPhaseFactory;
this.context = context;
this.searchTransportService = context.getSearchTransport();

// register the release of the query consumer to free up the circuit breaker memory
// at the end of the search
context.addReleasable(queryResult);
}

@Override
Expand Down
Loading