Skip to content

Commit

Permalink
Fix concurrent issue in SearchPhaseController (elastic#49829)
Browse files Browse the repository at this point in the history
The list used by the search progress listener can be nullified
by another thread that reports a query result. This change replaces
the usage of this list with a new array that is synchronously modified.

Closes elastic#49778
  • Loading branch information
jimczi authored and SivagurunathanV committed Jan 21, 2020
1 parent 8a65b6d commit 8f8e0e6
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.SearchPhaseResult;
import org.elasticsearch.search.SearchShardTarget;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.InternalAggregation.ReduceContext;
import org.elasticsearch.search.aggregations.InternalAggregations;
Expand Down Expand Up @@ -564,6 +565,7 @@ public InternalSearchResponse buildResponse(SearchHits hits) {
* iff the buffer is exhausted.
*/
static final class QueryPhaseResultConsumer extends ArraySearchPhaseResults<SearchPhaseResult> {
private final SearchShardTarget[] processedShards;
private final InternalAggregations[] aggsBuffer;
private final TopDocs[] topDocsBuffer;
private final boolean hasAggs;
Expand Down Expand Up @@ -600,6 +602,7 @@ private QueryPhaseResultConsumer(SearchProgressListener progressListener, Search
}
this.controller = controller;
this.progressListener = progressListener;
this.processedShards = new SearchShardTarget[expectedResultSize];
// no need to buffer anything if we have less expected results. in this case we don't consume any results ahead of time.
this.aggsBuffer = new InternalAggregations[hasAggs ? bufferSize : 0];
this.topDocsBuffer = new TopDocs[hasTopDocs ? bufferSize : 0];
Expand Down Expand Up @@ -636,7 +639,7 @@ private synchronized void consumeInternal(QuerySearchResult querySearchResult) {
numReducePhases++;
index = 1;
if (hasAggs) {
progressListener.notifyPartialReduce(progressListener.searchShards(results.asList()),
progressListener.notifyPartialReduce(progressListener.searchShards(processedShards),
topDocsStats.getTotalHits(), aggsBuffer[0], numReducePhases);
}
}
Expand All @@ -650,6 +653,7 @@ private synchronized void consumeInternal(QuerySearchResult querySearchResult) {
setShardIndex(topDocs.topDocs, querySearchResult.getShardIndex());
topDocsBuffer[i] = topDocs.topDocs;
}
processedShards[querySearchResult.getShardIndex()] = querySearchResult.getSearchShardTarget();
}

private synchronized List<InternalAggregations> getRemainingAggs() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,10 @@
import org.apache.lucene.search.TotalHits;
import org.elasticsearch.cluster.routing.GroupShardsIterator;
import org.elasticsearch.search.SearchPhaseResult;
import org.elasticsearch.search.SearchShardTarget;
import org.elasticsearch.search.aggregations.InternalAggregations;

import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -169,6 +171,13 @@ final List<SearchShard> searchShards(List<? extends SearchPhaseResult> results)
.collect(Collectors.toUnmodifiableList());
}

final List<SearchShard> searchShards(SearchShardTarget[] results) {
return Arrays.stream(results)
.filter(Objects::nonNull)
.map(e -> new SearchShard(e.getClusterAlias(), e.getShardId()))
.collect(Collectors.toUnmodifiableList());
}

final List<SearchShard> searchShards(GroupShardsIterator<SearchShardIterator> its) {
return StreamSupport.stream(its.spliterator(), false)
.map(e -> new SearchShard(e.getClusterAlias(), e.shardId()))
Expand Down

0 comments on commit 8f8e0e6

Please sign in to comment.