Commit
Add background filters of significant terms aggregations to can match query. (#106564) (#106797)

* Add background filters of significant terms aggregations to can match query.

* Fix NPE

* Unit tests

* Update docs/changelog/106564.yaml

* Update 106564.yaml

* Make aggregation queries in can match phase more generic.

* Copy source to preserve other relevant fields.

* Replace copy constructor by shallowCopy
jan-elastic committed Mar 27, 2024
1 parent 79bfb88 commit ad39c3f
Showing 5 changed files with 167 additions and 8 deletions.
5 changes: 5 additions & 0 deletions docs/changelog/106564.yaml
@@ -0,0 +1,5 @@
pr: 106564
summary: Fix the background set of significant terms aggregations in case the data is in different shards than the foreground set
area: Search
type: bug
issues: []
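For context, a hedged sketch (editor's illustration, not part of the commit; index and field names are hypothetical) of the kind of request this change is about: the top-level query covers a narrow time range while the significant_terms background filter covers a much wider one, so shards that hold only background data must not be skipped by the can match phase.

import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.aggregations.bucket.terms.SignificantTermsAggregationBuilder;
import org.elasticsearch.search.builder.SearchSourceBuilder;

public class CanMatchBackgroundFilterExample {
    public static SearchRequest buildRequest() {
        // Foreground: one day of data. Background: three months, likely held on other shards.
        SearchSourceBuilder source = SearchSourceBuilder.searchSource()
            .query(QueryBuilders.rangeQuery("@timestamp").from("2024-03-01").to("2024-03-02"))
            .aggregation(
                new SignificantTermsAggregationBuilder("significant_terms")
                    .field("message.keyword")
                    .backgroundFilter(QueryBuilders.rangeQuery("@timestamp").from("2024-01-01").to("2024-03-02"))
            );
        // Before this fix, the can match phase considered only the top-level query and
        // could skip the shards that hold the January/February background data.
        return new SearchRequest("logs-*").source(source);
    }
}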
@@ -18,9 +18,12 @@
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.search.Scroll;
import org.elasticsearch.search.aggregations.AggregationBuilder;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.builder.SubSearchSourceBuilder;
import org.elasticsearch.search.internal.AliasFilter;
import org.elasticsearch.search.internal.ShardSearchContextId;
import org.elasticsearch.search.internal.ShardSearchRequest;
@@ -31,6 +34,7 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Map;

@@ -129,7 +133,7 @@ public CanMatchNodeRequest(
long nowInMillis,
@Nullable String clusterAlias
) {
this.source = searchRequest.source();
this.source = getCanMatchSource(searchRequest);
this.indicesOptions = indicesOptions;
this.shards = new ArrayList<>(shards);
this.searchType = searchRequest.searchType();
@@ -146,6 +150,36 @@ public CanMatchNodeRequest(
indices = shards.stream().map(Shard::getOriginalIndices).flatMap(Arrays::stream).distinct().toArray(String[]::new);
}

private static void collectAggregationQueries(Collection<AggregationBuilder> aggregations, List<QueryBuilder> aggregationQueries) {
for (AggregationBuilder aggregation : aggregations) {
QueryBuilder aggregationQuery = aggregation.getQuery();
if (aggregationQuery != null) {
aggregationQueries.add(aggregationQuery);
}
collectAggregationQueries(aggregation.getSubAggregations(), aggregationQueries);
}
}

private SearchSourceBuilder getCanMatchSource(SearchRequest searchRequest) {
// Aggregations may use a different query than the top-level search query. An example is
// the significant terms aggregation, which also collects data over a background set that
// is typically much larger than the search query. To accommodate this, we take the union
// of all queries to determine whether a request can match.
List<QueryBuilder> aggregationQueries = new ArrayList<>();
if (searchRequest.source() != null && searchRequest.source().aggregations() != null) {
collectAggregationQueries(searchRequest.source().aggregations().getAggregatorFactories(), aggregationQueries);
}
if (aggregationQueries.isEmpty()) {
return searchRequest.source();
} else {
List<SubSearchSourceBuilder> subSearches = new ArrayList<>(searchRequest.source().subSearches());
for (QueryBuilder aggregationQuery : aggregationQueries) {
subSearches.add(new SubSearchSourceBuilder(aggregationQuery));
}
return searchRequest.source().shallowCopy().subSearches(subSearches);
}
}
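A hedged usage sketch (editor's illustration, not code from this commit) of what getCanMatchSource effectively produces for a request like the one sketched after the changelog above: the top-level query stays, each aggregation query is appended as an extra sub-search, and the union of them decides whether a shard can match.

import java.util.ArrayList;
import java.util.List;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.builder.SubSearchSourceBuilder;

public class CanMatchSourceSketch {
    public static SearchSourceBuilder canMatchSource(SearchRequest request) {
        SearchSourceBuilder original = request.source();
        // The top-level query is already represented as a sub-search; append the
        // aggregation's background filter (hard-coded here purely for illustration).
        List<SubSearchSourceBuilder> subSearches = new ArrayList<>(original.subSearches());
        subSearches.add(new SubSearchSourceBuilder(
            QueryBuilders.rangeQuery("@timestamp").from("2024-01-01").to("2024-03-02")
        ));
        // shallowCopy() leaves the original request untouched; the copy matches a shard
        // if either the top-level query or the background filter could match it.
        return original.shallowCopy().subSearches(subSearches);
    }
}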

public CanMatchNodeRequest(StreamInput in) throws IOException {
super(in);
source = in.readOptionalWriteable(SearchSourceBuilder::new);
@@ -9,6 +9,7 @@

import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.VersionedNamedWriteable;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryRewriteContext;
import org.elasticsearch.index.query.Rewriteable;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
@@ -97,6 +98,11 @@ public Collection<AggregationBuilder> getSubAggregations() {
return factoriesBuilder.getAggregatorFactories();
}

/** Return the aggregation's query if it's different from the search query, or null otherwise. */
public QueryBuilder getQuery() {
return null;
}

/** Return the configured set of pipeline aggregations **/
public Collection<PipelineAggregationBuilder> getPipelineAggregations() {
return factoriesBuilder.getPipelineAggregatorFactories();
@@ -12,6 +12,7 @@
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.query.QueryRewriteContext;
import org.elasticsearch.search.aggregations.AggregationBuilder;
import org.elasticsearch.search.aggregations.AggregatorFactories;
@@ -257,6 +258,11 @@ public SignificantTermsAggregationBuilder backgroundFilter(QueryBuilder backgrou
return this;
}

@Override
public QueryBuilder getQuery() {
return backgroundFilter != null ? backgroundFilter : QueryBuilders.matchAllQuery();
}
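For illustration, a hedged sketch (editor's addition, not part of the commit) of what the new getQuery() hook reports for a few builders: the base AggregationBuilder returns null, while significant_terms returns its background filter, or match_all when the default background set (the whole index) applies.

import org.elasticsearch.index.query.MatchAllQueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.query.TermQueryBuilder;
import org.elasticsearch.search.aggregations.AggregationBuilder;
import org.elasticsearch.search.aggregations.bucket.terms.SignificantTermsAggregationBuilder;
import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder;

public class AggregationQuerySketch {
    public static void main(String[] args) {
        // A plain terms aggregation reads no data beyond the search query.
        AggregationBuilder plain = new TermsAggregationBuilder("tags").field("tag");
        assert plain.getQuery() == null;

        // The default background set is the whole index, so can match must keep every shard.
        AggregationBuilder defaultBackground = new SignificantTermsAggregationBuilder("sig").field("tag");
        assert defaultBackground.getQuery() instanceof MatchAllQueryBuilder;

        // An explicit background filter limits which shards need to stay.
        AggregationBuilder filtered = new SignificantTermsAggregationBuilder("sig")
            .field("tag")
            .backgroundFilter(QueryBuilders.termQuery("status", "error"));
        assert filtered.getQuery() instanceof TermQueryBuilder;
    }
}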

/**
* Set terms to include and exclude from the aggregation results
*/
@@ -30,21 +30,24 @@
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.IndexVersion;
import org.elasticsearch.index.mapper.DateFieldMapper;
import org.elasticsearch.index.query.AbstractQueryBuilder;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.CoordinatorRewriteContextProvider;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.RangeQueryBuilder;
import org.elasticsearch.index.query.TermQueryBuilder;
import org.elasticsearch.index.shard.IndexLongFieldRange;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.ShardLongFieldRange;
import org.elasticsearch.search.CanMatchShardResponse;
import org.elasticsearch.search.aggregations.AggregationBuilder;
import org.elasticsearch.search.aggregations.bucket.terms.SignificantTermsAggregationBuilder;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.internal.AliasFilter;
import org.elasticsearch.search.internal.ShardSearchRequest;
import org.elasticsearch.search.sort.MinAndMax;
import org.elasticsearch.search.sort.SortBuilders;
import org.elasticsearch.search.sort.SortOrder;
import org.elasticsearch.search.suggest.SuggestBuilder;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool;
@@ -497,14 +500,14 @@ public void testCanMatchFilteringOnCoordinatorThatCanBeSkipped() throws Exceptio
regularIndices,
contextProviderBuilder.build(),
queryBuilder,
List.of(),
null,
(updatedSearchShardIterators, requests) -> {
List<SearchShardIterator> skippedShards = updatedSearchShardIterators.stream().filter(SearchShardIterator::skip).toList();
;

List<SearchShardIterator> nonSkippedShards = updatedSearchShardIterators.stream()
.filter(searchShardIterator -> searchShardIterator.skip() == false)
.toList();
;

int regularIndexShardCount = (int) updatedSearchShardIterators.stream()
.filter(s -> regularIndices.contains(s.shardId().getIndex()))
@@ -568,6 +571,8 @@ public void testCanMatchFilteringOnCoordinatorParsingFails() throws Exception {
regularIndices,
contextProviderBuilder.build(),
queryBuilder,
List.of(),
null,
this::assertAllShardsAreQueried
);
}
@@ -624,6 +629,99 @@ public void testCanMatchFilteringOnCoordinatorThatCanNotBeSkipped() throws Excep
regularIndices,
contextProviderBuilder.build(),
queryBuilder,
List.of(),
null,
this::assertAllShardsAreQueried
);
}

public void testCanMatchFilteringOnCoordinator_withSignificantTermsAggregation_withDefaultBackgroundFilter() throws Exception {
Index index1 = new Index("index1", UUIDs.base64UUID());
Index index2 = new Index("index2", UUIDs.base64UUID());
Index index3 = new Index("index3", UUIDs.base64UUID());

StaticCoordinatorRewriteContextProviderBuilder contextProviderBuilder = new StaticCoordinatorRewriteContextProviderBuilder();
contextProviderBuilder.addIndexMinMaxTimestamps(index1, DataStream.TIMESTAMP_FIELD_NAME, 0, 999);
contextProviderBuilder.addIndexMinMaxTimestamps(index2, DataStream.TIMESTAMP_FIELD_NAME, 1000, 1999);
contextProviderBuilder.addIndexMinMaxTimestamps(index3, DataStream.TIMESTAMP_FIELD_NAME, 2000, 2999);

QueryBuilder query = new BoolQueryBuilder().filter(new RangeQueryBuilder(DataStream.TIMESTAMP_FIELD_NAME).from(2100).to(2200));
AggregationBuilder aggregation = new SignificantTermsAggregationBuilder("significant_terms");

assignShardsAndExecuteCanMatchPhase(
List.of(),
List.of(index1, index2, index3),
contextProviderBuilder.build(),
query,
List.of(aggregation),
null,
// The default background filter matches the whole index, so all shards must be queried.
this::assertAllShardsAreQueried
);
}

public void testCanMatchFilteringOnCoordinator_withSignificantTermsAggregation_withBackgroundFilter() throws Exception {
Index index1 = new Index("index1", UUIDs.base64UUID());
Index index2 = new Index("index2", UUIDs.base64UUID());
Index index3 = new Index("index3", UUIDs.base64UUID());
Index index4 = new Index("index4", UUIDs.base64UUID());

StaticCoordinatorRewriteContextProviderBuilder contextProviderBuilder = new StaticCoordinatorRewriteContextProviderBuilder();
contextProviderBuilder.addIndexMinMaxTimestamps(index1, DataStream.TIMESTAMP_FIELD_NAME, 0, 999);
contextProviderBuilder.addIndexMinMaxTimestamps(index2, DataStream.TIMESTAMP_FIELD_NAME, 1000, 1999);
contextProviderBuilder.addIndexMinMaxTimestamps(index3, DataStream.TIMESTAMP_FIELD_NAME, 2000, 2999);
contextProviderBuilder.addIndexMinMaxTimestamps(index4, DataStream.TIMESTAMP_FIELD_NAME, 3000, 3999);

QueryBuilder query = new BoolQueryBuilder().filter(new RangeQueryBuilder(DataStream.TIMESTAMP_FIELD_NAME).from(3100).to(3200));
AggregationBuilder aggregation = new SignificantTermsAggregationBuilder("significant_terms").backgroundFilter(
new RangeQueryBuilder(DataStream.TIMESTAMP_FIELD_NAME).from(0).to(1999)
);

assignShardsAndExecuteCanMatchPhase(
List.of(),
List.of(index1, index2, index3),
contextProviderBuilder.build(),
query,
List.of(aggregation),
null,
(updatedSearchShardIterators, requests) -> {
// The search query matches index4, the background query matches index1 and index2,
// so index3 is the only one that must be skipped.
for (SearchShardIterator shard : updatedSearchShardIterators) {
if (shard.shardId().getIndex().getName().equals("index3")) {
assertTrue(shard.skip());
} else {
assertFalse(shard.skip());
}
}
}
);
}

public void testCanMatchFilteringOnCoordinator_withSignificantTermsAggregation_withSuggest() throws Exception {
Index index1 = new Index("index1", UUIDs.base64UUID());
Index index2 = new Index("index2", UUIDs.base64UUID());
Index index3 = new Index("index3", UUIDs.base64UUID());

StaticCoordinatorRewriteContextProviderBuilder contextProviderBuilder = new StaticCoordinatorRewriteContextProviderBuilder();
contextProviderBuilder.addIndexMinMaxTimestamps(index1, DataStream.TIMESTAMP_FIELD_NAME, 0, 999);
contextProviderBuilder.addIndexMinMaxTimestamps(index2, DataStream.TIMESTAMP_FIELD_NAME, 1000, 1999);
contextProviderBuilder.addIndexMinMaxTimestamps(index3, DataStream.TIMESTAMP_FIELD_NAME, 2000, 2999);

QueryBuilder query = new BoolQueryBuilder().filter(new RangeQueryBuilder(DataStream.TIMESTAMP_FIELD_NAME).from(2100).to(2200));
AggregationBuilder aggregation = new SignificantTermsAggregationBuilder("significant_terms").backgroundFilter(
new RangeQueryBuilder(DataStream.TIMESTAMP_FIELD_NAME).from(2000).to(2300)
);
SuggestBuilder suggest = new SuggestBuilder().setGlobalText("test");

assignShardsAndExecuteCanMatchPhase(
List.of(),
List.of(index1, index2, index3),
contextProviderBuilder.build(),
query,
List.of(aggregation),
suggest,
// The query and aggregation match only index3, but the suggest can match any shard, so none are skipped.
this::assertAllShardsAreQueried
);
}
@@ -669,6 +767,8 @@ public void testCanMatchFilteringOnCoordinatorThatCanBeSkippedTsdb() throws Exce
List.of(),
contextProviderBuilder.build(),
queryBuilder,
List.of(),
null,
(updatedSearchShardIterators, requests) -> {
var skippedShards = updatedSearchShardIterators.stream().filter(SearchShardIterator::skip).toList();
var nonSkippedShards = updatedSearchShardIterators.stream()
@@ -713,11 +813,13 @@ private void assertAllShardsAreQueried(List<SearchShardIterator> updatedSearchSh
assertThat(requests.size(), equalTo(shardsWithPrimariesAssigned));
}

private <QB extends AbstractQueryBuilder<QB>> void assignShardsAndExecuteCanMatchPhase(
private void assignShardsAndExecuteCanMatchPhase(
List<DataStream> dataStreams,
List<Index> regularIndices,
CoordinatorRewriteContextProvider contextProvider,
AbstractQueryBuilder<QB> query,
QueryBuilder query,
List<AggregationBuilder> aggregations,
SuggestBuilder suggest,
BiConsumer<List<SearchShardIterator>, List<ShardSearchRequest>> canMatchResultsConsumer
) throws Exception {
Map<String, Transport.Connection> lookup = new ConcurrentHashMap<>();
@@ -764,14 +866,20 @@ private <QB extends AbstractQueryBuilder<QB>> void assignShardsAndExecuteCanMatc
searchRequest.allowPartialSearchResults(true);

final AliasFilter aliasFilter;
if (randomBoolean()) {
if (aggregations.isEmpty() == false || randomBoolean()) {
// Apply the query on the request body
SearchSourceBuilder searchSourceBuilder = SearchSourceBuilder.searchSource();
searchSourceBuilder.query(query);
for (AggregationBuilder aggregation : aggregations) {
searchSourceBuilder.aggregation(aggregation);
}
if (suggest != null) {
searchSourceBuilder.suggest(suggest);
}
searchRequest.source(searchSourceBuilder);

// Sometimes apply the same query in the alias filter too
aliasFilter = AliasFilter.of(randomBoolean() ? query : null, Strings.EMPTY_ARRAY);
aliasFilter = AliasFilter.of(aggregations.isEmpty() && randomBoolean() ? query : null, Strings.EMPTY_ARRAY);
} else {
// Apply the query as an alias filter
aliasFilter = AliasFilter.of(query, Strings.EMPTY_ARRAY);