Fix aggregators early termination with breadth-first mode (#44963)
This commit fixes a bug that occurs when a deferred aggregator tries to terminate the collection early. In that case the CollectionTerminatedException is not caught and
the search fails on the shard. This change makes sure that we catch the exception so that the deferred collection can continue on the next leaf.

Fixes #44909
jimczi committed Jul 30, 2019
1 parent 11b3365 commit e82818a
Showing 4 changed files with 112 additions and 40 deletions.
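
Both deferring collectors below apply the same pattern. As a rough standalone sketch of that pattern (illustrative names, not the actual Elasticsearch classes; only Lucene's CollectionTerminatedException is real): a per-segment replay loop must treat the exception as a normal "stop collecting this segment" signal and move on to the next leaf, instead of letting it propagate and fail the search on the shard.

    import java.io.IOException;

    import org.apache.lucene.search.CollectionTerminatedException;

    final class DeferredReplaySketch {

        interface LeafReplay {
            void collect(int doc) throws IOException;
        }

        // Replays buffered doc ids segment by segment; a collector may throw
        // CollectionTerminatedException once it has seen enough documents.
        static void replay(int[][] docsPerLeaf, LeafReplay[] leafCollectors) throws IOException {
            for (int leaf = 0; leaf < docsPerLeaf.length; leaf++) {
                try {
                    for (int doc : docsPerLeaf[leaf]) {
                        leafCollectors[leaf].collect(doc);
                    }
                } catch (CollectionTerminatedException e) {
                    // Early termination is a signal, not an error: skip the rest
                    // of this segment and continue with the next leaf.
                }
            }
        }
    }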
@@ -20,6 +20,7 @@
 package org.elasticsearch.search.aggregations.bucket;
 
 import org.apache.lucene.index.LeafReaderContext;
+import org.apache.lucene.search.CollectionTerminatedException;
 import org.apache.lucene.search.DocIdSetIterator;
 import org.apache.lucene.search.MatchAllDocsQuery;
 import org.apache.lucene.search.Query;
@@ -168,32 +169,37 @@ public void prepareSelectedBuckets(long... selectedBuckets) throws IOException {
 
         for (Entry entry : entries) {
             assert entry.docDeltas.size() > 0 : "segment should have at least one document to replay, got 0";
-            final LeafBucketCollector leafCollector = collector.getLeafCollector(entry.context);
-            DocIdSetIterator scoreIt = null;
-            if (needsScores) {
-                Scorer scorer = weight.scorer(entry.context);
-                // We don't need to check if the scorer is null
-                // since we are sure that there are documents to replay (entry.docDeltas is not empty).
-                scoreIt = scorer.iterator();
-                leafCollector.setScorer(scorer);
-            }
-            final PackedLongValues.Iterator docDeltaIterator = entry.docDeltas.iterator();
-            final PackedLongValues.Iterator buckets = entry.buckets.iterator();
-            int doc = 0;
-            for (long i = 0, end = entry.docDeltas.size(); i < end; ++i) {
-                doc += docDeltaIterator.next();
-                final long bucket = buckets.next();
-                final long rebasedBucket = hash.find(bucket);
-                if (rebasedBucket != -1) {
-                    if (needsScores) {
-                        if (scoreIt.docID() < doc) {
-                            scoreIt.advance(doc);
-                        }
-                        // aggregations should only be replayed on matching documents
-                        assert scoreIt.docID() == doc;
-                    }
-                    leafCollector.collect(doc, rebasedBucket);
-                }
-            }
+            try {
+                final LeafBucketCollector leafCollector = collector.getLeafCollector(entry.context);
+                DocIdSetIterator scoreIt = null;
+                if (needsScores) {
+                    Scorer scorer = weight.scorer(entry.context);
+                    // We don't need to check if the scorer is null
+                    // since we are sure that there are documents to replay (entry.docDeltas is not empty).
+                    scoreIt = scorer.iterator();
+                    leafCollector.setScorer(scorer);
+                }
+                final PackedLongValues.Iterator docDeltaIterator = entry.docDeltas.iterator();
+                final PackedLongValues.Iterator buckets = entry.buckets.iterator();
+                int doc = 0;
+                for (long i = 0, end = entry.docDeltas.size(); i < end; ++i) {
+                    doc += docDeltaIterator.next();
+                    final long bucket = buckets.next();
+                    final long rebasedBucket = hash.find(bucket);
+                    if (rebasedBucket != -1) {
+                        if (needsScores) {
+                            if (scoreIt.docID() < doc) {
+                                scoreIt.advance(doc);
+                            }
+                            // aggregations should only be replayed on matching documents
+                            assert scoreIt.docID() == doc;
+                        }
+                        leafCollector.collect(doc, rebasedBucket);
+                    }
+                }
+            } catch (CollectionTerminatedException e) {
+                // collection was terminated prematurely
+                // continue with the following leaf
+            }
         }
         collector.postCollection();
@@ -19,6 +19,7 @@
 package org.elasticsearch.search.aggregations.bucket.sampler;
 
 import org.apache.lucene.index.LeafReaderContext;
+import org.apache.lucene.search.CollectionTerminatedException;
 import org.apache.lucene.search.LeafCollector;
 import org.apache.lucene.search.Scorable;
 import org.apache.lucene.search.ScoreDoc;
@@ -254,25 +255,30 @@ public void setScorer(Scorable scorer) throws IOException {
         }
 
         public void replayRelatedMatches(List<ScoreDoc> sd) throws IOException {
-            final LeafBucketCollector leafCollector = deferred.getLeafCollector(readerContext);
-            leafCollector.setScorer(this);
+            try {
+                final LeafBucketCollector leafCollector = deferred.getLeafCollector(readerContext);
+                leafCollector.setScorer(this);
 
-            currentScore = 0;
-            currentDocId = -1;
-            if (maxDocId < 0) {
-                return;
-            }
-            for (ScoreDoc scoreDoc : sd) {
-                // Doc ids from TopDocCollector are root-level Reader so
-                // need rebasing
-                int rebased = scoreDoc.doc - readerContext.docBase;
-                if ((rebased >= 0) && (rebased <= maxDocId)) {
-                    currentScore = scoreDoc.score;
-                    currentDocId = rebased;
-                    // We stored the bucket ID in Lucene's shardIndex property
-                    // for convenience.
-                    leafCollector.collect(rebased, scoreDoc.shardIndex);
-                }
-            }
+                currentScore = 0;
+                currentDocId = -1;
+                if (maxDocId < 0) {
+                    return;
+                }
+                for (ScoreDoc scoreDoc : sd) {
+                    // Doc ids from TopDocCollector are root-level Reader so
+                    // need rebasing
+                    int rebased = scoreDoc.doc - readerContext.docBase;
+                    if ((rebased >= 0) && (rebased <= maxDocId)) {
+                        currentScore = scoreDoc.score;
+                        currentDocId = rebased;
+                        // We stored the bucket ID in Lucene's shardIndex property
+                        // for convenience.
+                        leafCollector.collect(rebased, scoreDoc.shardIndex);
+                    }
+                }
+            } catch (CollectionTerminatedException e) {
+                // collection was terminated prematurely
+                // continue with the following leaf
+            }
         }

@@ -24,6 +24,7 @@
 import org.elasticsearch.script.Script;
 import org.elasticsearch.script.ScriptType;
 import org.elasticsearch.search.aggregations.AggregationTestScriptsPlugin;
+import org.elasticsearch.search.aggregations.Aggregator;
 import org.elasticsearch.search.aggregations.InternalAggregation;
 import org.elasticsearch.search.aggregations.bucket.filter.Filter;
 import org.elasticsearch.search.aggregations.bucket.global.Global;
@@ -411,4 +412,34 @@ public void testEarlyTermination() throws Exception {
         assertThat(count.getName(), equalTo("count"));
         assertThat(count.getValue(), equalTo(20L));
     }
+
+    public void testNestedEarlyTermination() throws Exception {
+        for (Aggregator.SubAggCollectionMode collectionMode : Aggregator.SubAggCollectionMode.values()) {
+            SearchResponse searchResponse = client().prepareSearch("idx")
+                .setTrackTotalHits(false)
+                .setQuery(matchAllQuery())
+                .addAggregation(max("max").field("values"))
+                .addAggregation(count("count").field("values"))
+                .addAggregation(terms("terms").field("value")
+                    .collectMode(collectionMode)
+                    .subAggregation(max("sub_max").field("invalid")))
+                .get();
+
+            Max max = searchResponse.getAggregations().get("max");
+            assertThat(max, notNullValue());
+            assertThat(max.getName(), equalTo("max"));
+            assertThat(max.getValue(), equalTo(12.0));
+
+            ValueCount count = searchResponse.getAggregations().get("count");
+            assertThat(count.getName(), equalTo("count"));
+            assertThat(count.getValue(), equalTo(20L));
+
+            Terms terms = searchResponse.getAggregations().get("terms");
+            assertThat(terms.getBuckets().size(), equalTo(10));
+            for (Terms.Bucket b : terms.getBuckets()) {
+                InternalMax subMax = b.getAggregations().get("sub_max");
+                assertThat(subMax.getValue(), equalTo(Double.NEGATIVE_INFINITY));
+            }
+        }
+    }
 }
@@ -24,6 +24,7 @@
 import org.elasticsearch.script.Script;
 import org.elasticsearch.script.ScriptType;
 import org.elasticsearch.search.aggregations.AggregationTestScriptsPlugin;
+import org.elasticsearch.search.aggregations.Aggregator;
 import org.elasticsearch.search.aggregations.InternalAggregation;
 import org.elasticsearch.search.aggregations.bucket.filter.Filter;
 import org.elasticsearch.search.aggregations.bucket.global.Global;
@@ -423,4 +424,32 @@ public void testEarlyTermination() throws Exception {
         assertThat(count.getName(), equalTo("count"));
         assertThat(count.getValue(), equalTo(20L));
     }
+
+    public void testNestedEarlyTermination() throws Exception {
+        SearchResponse searchResponse = client().prepareSearch("idx")
+            .setTrackTotalHits(false)
+            .setQuery(matchAllQuery())
+            .addAggregation(min("min").field("values"))
+            .addAggregation(count("count").field("values"))
+            .addAggregation(terms("terms").field("value")
+                .collectMode(Aggregator.SubAggCollectionMode.BREADTH_FIRST)
+                .subAggregation(min("sub_min").field("invalid")))
+            .get();
+
+        Min min = searchResponse.getAggregations().get("min");
+        assertThat(min, notNullValue());
+        assertThat(min.getName(), equalTo("min"));
+        assertThat(min.getValue(), equalTo(2.0));
+
+        ValueCount count = searchResponse.getAggregations().get("count");
+        assertThat(count.getName(), equalTo("count"));
+        assertThat(count.getValue(), equalTo(20L));
+
+        Terms terms = searchResponse.getAggregations().get("terms");
+        assertThat(terms.getBuckets().size(), equalTo(10));
+        for (Terms.Bucket b : terms.getBuckets()) {
+            InternalMin subMin = b.getAggregations().get("sub_min");
+            assertThat(subMin.getValue(), equalTo(Double.POSITIVE_INFINITY));
+        }
+    }
 }
