Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Skip sibling pipeline aggregators reduction during non-final reduce #40101

Merged
merged 3 commits into from
Mar 18, 2019
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.InternalAggregation.ReduceContext;
import org.elasticsearch.search.aggregations.InternalAggregations;
import org.elasticsearch.search.aggregations.pipeline.SiblingPipelineAggregator;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.dfs.AggregatedDfs;
import org.elasticsearch.search.dfs.DfsSearchResult;
Expand All @@ -65,8 +64,6 @@
import java.util.Map;
import java.util.function.Function;
import java.util.function.IntFunction;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

public final class SearchPhaseController {

Expand Down Expand Up @@ -488,8 +485,8 @@ private ReducedQueryPhase reducedQueryPhase(Collection<? extends SearchPhaseResu
reducedCompletionSuggestions = reducedSuggest.filter(CompletionSuggestion.class);
}
ReduceContext reduceContext = reduceContextFunction.apply(performFinalReduce);
final InternalAggregations aggregations = aggregationsList.isEmpty() ? null : reduceAggs(aggregationsList,
firstResult.pipelineAggregators(), reduceContext);
final InternalAggregations aggregations = aggregationsList.isEmpty() ? null :
InternalAggregations.reduce(aggregationsList, firstResult.pipelineAggregators(), reduceContext);
final SearchProfileShardResults shardResults = profileResults.isEmpty() ? null : new SearchProfileShardResults(profileResults);
final SortedTopDocs sortedTopDocs = sortDocs(isScrollRequest, queryResults, bufferedTopDocs, topDocsStats, from, size,
reducedCompletionSuggestions);
Expand All @@ -499,32 +496,6 @@ private ReducedQueryPhase reducedQueryPhase(Collection<? extends SearchPhaseResu
firstResult.sortValueFormats(), numReducePhases, size, from, false);
}

/**
* Performs an intermediate reduce phase on the aggregations. For instance with this reduce phase never prune information
* that relevant for the final reduce step. For final reduce see {@link #reduceAggs(List, List, ReduceContext)}
*/
private InternalAggregations reduceAggsIncrementally(List<InternalAggregations> aggregationsList) {
ReduceContext reduceContext = reduceContextFunction.apply(false);
return aggregationsList.isEmpty() ? null : reduceAggs(aggregationsList,
null, reduceContext);
}

private static InternalAggregations reduceAggs(List<InternalAggregations> aggregationsList,
List<SiblingPipelineAggregator> pipelineAggregators, ReduceContext reduceContext) {
InternalAggregations aggregations = InternalAggregations.reduce(aggregationsList, reduceContext);
if (pipelineAggregators != null) {
List<InternalAggregation> newAggs = StreamSupport.stream(aggregations.spliterator(), false)
.map((p) -> (InternalAggregation) p)
.collect(Collectors.toList());
for (SiblingPipelineAggregator pipelineAggregator : pipelineAggregators) {
InternalAggregation newAgg = pipelineAggregator.doReduce(new InternalAggregations(newAggs), reduceContext);
newAggs.add(newAgg);
}
return new InternalAggregations(newAggs);
}
return aggregations;
}

public static final class ReducedQueryPhase {
// the sum of all hits across all reduces shards
final TotalHits totalHits;
Expand Down Expand Up @@ -644,7 +615,8 @@ public void consumeResult(SearchPhaseResult result) {
private synchronized void consumeInternal(QuerySearchResult querySearchResult) {
if (index == bufferSize) {
if (hasAggs) {
InternalAggregations reducedAggs = controller.reduceAggsIncrementally(Arrays.asList(aggsBuffer));
ReduceContext reduceContext = controller.reduceContextFunction.apply(false);
InternalAggregations reducedAggs = InternalAggregations.reduce(Arrays.asList(aggsBuffer), reduceContext);
Arrays.fill(aggsBuffer, null);
aggsBuffer[0] = reducedAggs;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -315,8 +315,6 @@ public void onFailure(Exception e) {
if (localIndices != null) {
ActionListener<SearchResponse> ccsListener = createCCSListener(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY,
false, countDown, skippedClusters, exceptions, searchResponseMerger, totalClusters, listener);
//here we provide the empty string a cluster alias, which means no prefix in index name,
//but the coord node will perform non final reduce as it's not null.
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is an outdated comment, unrelated to this change, I just stumbled upon it while working on it.

SearchRequest ccsLocalSearchRequest = SearchRequest.crossClusterSearch(searchRequest, localIndices.indices(),
RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY, timeProvider.getAbsoluteStartMillis(), false);
localSearchConsumer.accept(ccsLocalSearchRequest, ccsListener);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,8 +161,7 @@ public AggParseContext(String name) {
}
}

public static final AggregatorFactories EMPTY = new AggregatorFactories(new AggregatorFactory<?>[0],
new ArrayList<PipelineAggregationBuilder>());
public static final AggregatorFactories EMPTY = new AggregatorFactories(new AggregatorFactory<?>[0], new ArrayList<>());

private AggregatorFactory<?>[] factories;
private List<PipelineAggregationBuilder> pipelineAggregatorFactories;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ public ReduceContext(BigArrays bigArrays, ScriptService scriptService, IntConsum
/**
* Returns <code>true</code> iff the current reduce phase is the final reduce phase. This indicates if operations like
* pipeline aggregations should be applied or if specific features like {@code minDocCount} should be taken into account.
* Operations that are potentially loosing information can only be applied during the final reduce phase.
* Operations that are potentially losing information can only be applied during the final reduce phase.
*/
public boolean isFinalReduce() {
return isFinalReduce;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.search.aggregations.InternalAggregation.ReduceContext;
import org.elasticsearch.search.aggregations.pipeline.SiblingPipelineAggregator;

import java.io.IOException;
import java.util.ArrayList;
Expand Down Expand Up @@ -52,19 +53,26 @@ private InternalAggregations() {
}

/**
* Constructs a new addAggregation.
* Constructs a new aggregation.
*/
public InternalAggregations(List<InternalAggregation> aggregations) {
super(aggregations);
}

/**
* Reduces the given lists of addAggregation.
*
* @param aggregationsList A list of aggregation to reduce
* @return The reduced addAggregation
* Reduces the given list of aggregations
*/
public static InternalAggregations reduce(List<InternalAggregations> aggregationsList, ReduceContext context) {
return reduce(aggregationsList, null, context);
}

/**
* Reduces the given list of aggregations as well as the provided sibling pipeline aggregators.
* Note that sibling pipeline aggregators are ignored when non final reduction is performed.
*/
public static InternalAggregations reduce(List<InternalAggregations> aggregationsList,
List<SiblingPipelineAggregator> siblingPipelineAggregators,
ReduceContext context) {
if (aggregationsList.isEmpty()) {
return null;
}
Expand All @@ -89,6 +97,15 @@ public static InternalAggregations reduce(List<InternalAggregations> aggregation
InternalAggregation first = aggregations.get(0); // the list can't be empty as it's created on demand
reducedAggregations.add(first.reduce(aggregations, context));
}

if (siblingPipelineAggregators != null) {
if (context.isFinalReduce()) {
for (SiblingPipelineAggregator pipelineAggregator : siblingPipelineAggregators) {
InternalAggregation newAgg = pipelineAggregator.doReduce(new InternalAggregations(reducedAggregations), context);
reducedAggregations.add(newAgg);
}
}
}
Copy link
Member Author

@javanna javanna Mar 15, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note that this is moved from SearchPhaseController (and fixed to honour the finalReduce flag), as I want to be able to later call this from SearchResponseMerger in 7.0 and later branches)

return new InternalAggregations(reducedAggregations);
}

Expand Down