Skip to content

Commit

Permalink
aggs: The nested aggregator's parent filter isn't resolved properl…
Browse files Browse the repository at this point in the history
…y in the case where the nested agg gets created on the fly for buckets that are constructed during query execution.

The fix is to move the parent filter resolution from the setNextReader(...) method to the collect(...) method, because only at that point is the parent filter of any enclosing nested aggregator properly instantiated.

Closes #9280
Closes #9335
  • Loading branch information
martijnvg committed Jan 26, 2015
1 parent 1a933c1 commit 6ad9a8a
Show file tree
Hide file tree
Showing 2 changed files with 134 additions and 21 deletions.
Expand Up @@ -32,7 +32,6 @@
import org.elasticsearch.search.aggregations.*;
import org.elasticsearch.search.aggregations.bucket.SingleBucketAggregator;
import org.elasticsearch.search.aggregations.support.AggregationContext;
import org.elasticsearch.search.internal.SearchContext;

import java.io.IOException;

Expand All @@ -49,6 +48,8 @@ public class NestedAggregator extends SingleBucketAggregator implements ReaderCo
private Bits childDocs;
private FixedBitSet parentDocs;

private AtomicReaderContext reader;

public NestedAggregator(String name, AggregatorFactories factories, String nestedPath, AggregationContext aggregationContext, Aggregator parentAggregator) {
super(name, factories, aggregationContext, parentAggregator);
this.nestedPath = nestedPath;
Expand All @@ -70,41 +71,51 @@ public NestedAggregator(String name, AggregatorFactories factories, String neste

/**
 * Prepares this aggregator for the next segment.
 *
 * Remembers the segment context, discards the parent filter that was resolved for the
 * previous segment, and computes the child doc bits for the new segment.
 *
 * @param reader the context of the segment that is about to be searched
 * @throws AggregationExecutionException if the child filter cannot be evaluated for the segment
 */
@Override
public void setNextReader(AtomicReaderContext reader) {
    this.reader = reader;
    // Clear the parent filter so that the parent docs get resolved again for each new segment being searched.
    this.parentFilter = null;
    try {
        // In ES, if a parent is deleted then its children are deleted as well, so acceptDocs can be null here.
        DocIdSet nestedDocIdSet = childFilter.getDocIdSet(reader, null);
        childDocs = DocIdSets.toSafeBits(reader.reader(), nestedDocIdSet);
    } catch (IOException ioe) {
        throw new AggregationExecutionException("Failed to aggregate [" + name + "]", ioe);
    }
}

@Override
public void collect(int parentDoc, long bucketOrd) throws IOException {
// here we translate the parent doc to a list of its nested docs, and then call super.collect for every one of them so they'll be collected

// if parentDoc is 0 then this means that this parent doesn't have child docs (b/c these appear always before the parent doc), so we can skip:
if (parentDoc == 0 || childDocs == null) {
return;
}
if (parentFilter == null) {
// The aggs are instantiated in reverse, first the most inner nested aggs and lastly the top level aggs
// So at the time a nested 'nested' aggs is parsed its closest parent nested aggs hasn't been constructed.
// So the trick to set at the last moment just before needed and we can use its child filter as the
// So the trick is to set at the last moment just before needed and we can use its child filter as the
// parent filter.

// Additional NOTE: Previously this logic was performed in the setNextReader(...) method, but the assumption
// that aggs instances are constructed in reverse doesn't hold when buckets are constructed lazily during
// aggs execution
Filter parentFilterNotCached = findClosestNestedPath(parentAggregator);
if (parentFilterNotCached == null) {
parentFilterNotCached = NonNestedDocsFilter.INSTANCE;
}
parentFilter = SearchContext.current().filterCache().cache(parentFilterNotCached);
parentFilter = context.searchContext().filterCache().cache(parentFilterNotCached);
// if the filter cache is disabled, we still need to produce bit sets
parentFilter = new FixedBitSetCachingWrapperFilter(parentFilter);
}

try {
DocIdSet docIdSet = parentFilter.getDocIdSet(reader, null);
// In ES if parent is deleted, then also the children are deleted. Therefore acceptedDocs can also null here.
childDocs = DocIdSets.toSafeBits(reader.reader(), childFilter.getDocIdSet(reader, null));
if (DocIdSets.isEmpty(docIdSet)) {
parentDocs = null;
// There are no parentDocs in the segment, so return and set childDocs to null, so we exit early for future invocations.
childDocs = null;
return;
} else {
parentDocs = (FixedBitSet) docIdSet;
}
} catch (IOException ioe) {
throw new AggregationExecutionException("Failed to aggregate [" + name + "]", ioe);
}
}

@Override
public void collect(int parentDoc, long bucketOrd) throws IOException {
// here we translate the parent doc to a list of its nested docs, and then call super.collect for evey one of them
// so they'll be collected
if (parentDoc == 0 || parentDocs == null) {
return;
}
int prevParentDoc = parentDocs.prevSetBit(parentDoc - 1);
int numChildren = 0;
for (int childDocId = prevParentDoc + 1; childDocId < parentDoc; childDocId++) {
Expand Down
Expand Up @@ -21,8 +21,10 @@
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.search.aggregations.Aggregator.SubAggCollectionMode;
import org.elasticsearch.search.aggregations.bucket.filter.Filter;
import org.elasticsearch.search.aggregations.bucket.histogram.Histogram;
import org.elasticsearch.search.aggregations.bucket.nested.Nested;
import org.elasticsearch.search.aggregations.bucket.terms.LongTerms;
Expand All @@ -39,11 +41,13 @@
import java.util.ArrayList;
import java.util.List;

import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_REPLICAS;
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_SHARDS;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.index.query.FilterBuilders.termFilter;
import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;
import static org.elasticsearch.search.aggregations.AggregationBuilders.*;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertSearchResponse;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.*;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.core.IsNull.notNullValue;
Expand Down Expand Up @@ -336,4 +340,102 @@ public void emptyAggregation() throws Exception {
assertThat(nested.getName(), equalTo("nested"));
assertThat(nested.getDocCount(), is(0l));
}

/**
 * Regression test for a nested aggregation that sits below several levels of terms buckets and
 * itself contains a second, deeper nested aggregation.
 *
 * Two documents are indexed, one per month. In each document the comment with identifier
 * "29111" carries no tags; the tags belong to the comment with identifier "29101". The
 * "comment_filter" aggregation keeps only the "29111" comments, so the "nested_tags"
 * aggregation below it must count zero docs and its "tag" terms aggregation must be empty.
 */
@Test
// Test based on: https://github.com/elasticsearch/elasticsearch/issues/9280
public void testParentFilterResolvedCorrectly() throws Exception {
    // Mapping: "comments" is nested and contains a nested "tags" field; "dates" is a plain object.
    XContentBuilder mapping = jsonBuilder().startObject().startObject("provider").startObject("properties")
            .startObject("comments")
                .field("type", "nested")
                .startObject("properties")
                    .startObject("cid").field("type", "long").endObject()
                    .startObject("identifier").field("type", "string").field("index", "not_analyzed").endObject()
                    .startObject("tags")
                        .field("type", "nested")
                        .startObject("properties")
                            .startObject("tid").field("type", "long").endObject()
                            .startObject("name").field("type", "string").field("index", "not_analyzed").endObject()
                        .endObject()
                    .endObject()
                .endObject()
            .endObject()
            .startObject("dates")
                .field("type", "object")
                .startObject("properties")
                    .startObject("day").field("type", "date").field("format", "dateOptionalTime").endObject()
                    .startObject("month")
                        .field("type", "object")
                        .startObject("properties")
                            .startObject("end").field("type", "date").field("format", "dateOptionalTime").endObject()
                            .startObject("start").field("type", "date").field("format", "dateOptionalTime").endObject()
                            .startObject("label").field("type", "string").field("index", "not_analyzed").endObject()
                        .endObject()
                    .endObject()
                .endObject()
            .endObject()
            .endObject().endObject().endObject();
    assertAcked(prepareCreate("idx2")
            .setSettings(ImmutableSettings.builder().put(SETTING_NUMBER_OF_SHARDS, 1).put(SETTING_NUMBER_OF_REPLICAS, 0))
            .addMapping("provider", mapping));

    List<IndexRequestBuilder> indexRequests = new ArrayList<>(2);
    indexRequests.add(client().prepareIndex("idx2", "provider", "1").setSource("{\"dates\": {\"month\": {\"label\": \"2014-11\", \"end\": \"2014-11-30\", \"start\": \"2014-11-01\"}, \"day\": \"2014-11-30\"}, \"comments\": [{\"cid\": 3,\"identifier\": \"29111\"}, {\"cid\": 4,\"tags\": [{\"tid\" :44,\"name\": \"Roles\"}], \"identifier\": \"29101\"}]}"));
    indexRequests.add(client().prepareIndex("idx2", "provider", "2").setSource("{\"dates\": {\"month\": {\"label\": \"2014-12\", \"end\": \"2014-12-31\", \"start\": \"2014-12-01\"}, \"day\": \"2014-12-03\"}, \"comments\": [{\"cid\": 1, \"identifier\": \"29111\"}, {\"cid\": 2,\"tags\": [{\"tid\" : 22, \"name\": \"DataChannels\"}], \"identifier\": \"29101\"}]}"));
    indexRandom(true, indexRequests);

    // terms -> terms -> terms -> nested(comments) -> filter -> nested(comments.tags) -> terms
    SearchResponse response = client().prepareSearch("idx2").setTypes("provider")
            .addAggregation(
                    terms("startDate").field("dates.month.start").subAggregation(
                            terms("endDate").field("dates.month.end").subAggregation(
                                    terms("period").field("dates.month.label").subAggregation(
                                            nested("ctxt_idfier_nested").path("comments").subAggregation(
                                                    filter("comment_filter").filter(termFilter("comments.identifier", "29111")).subAggregation(
                                                            nested("nested_tags").path("comments.tags").subAggregation(
                                                                    terms("tag").field("comments.tags.name")
                                                            )
                                                    )
                                            )
                                    )
                            )
                    )
            ).get();
    assertNoFailures(response);
    assertHitCount(response, 2);

    Terms startDate = response.getAggregations().get("startDate");
    assertThat(startDate.getBuckets().size(), equalTo(2));

    // start 2014-11-01T00:00:00.000Z, end 2014-11-30T00:00:00.000Z
    assertFilteredCommentsHaveNoTags(startDate, "1414800000000", "1417305600000", "2014-11");
    // start 2014-12-01T00:00:00.000Z, end 2014-12-31T00:00:00.000Z
    assertFilteredCommentsHaveNoTags(startDate, "1417392000000", "1419984000000", "2014-12");
}

/**
 * Walks startDate -> endDate -> period -> ctxt_idfier_nested -> comment_filter -> nested_tags -> tag
 * for a single month and asserts the expected doc count at every level.
 *
 * @param startDate the top level "startDate" terms aggregation of the response
 * @param startKey  bucket key (epoch millis as string) of the month's start date
 * @param endKey    bucket key (epoch millis as string) of the month's end date
 * @param periodKey bucket key of the month label, e.g. "2014-11"
 */
private void assertFilteredCommentsHaveNoTags(Terms startDate, String startKey, String endKey, String periodKey) {
    Terms.Bucket bucket = startDate.getBucketByKey(startKey);
    assertThat(bucket.getDocCount(), equalTo(1L));
    Terms endDate = bucket.getAggregations().get("endDate");
    bucket = endDate.getBucketByKey(endKey);
    assertThat(bucket.getDocCount(), equalTo(1L));
    Terms period = bucket.getAggregations().get("period");
    bucket = period.getBucketByKey(periodKey);
    assertThat(bucket.getDocCount(), equalTo(1L));
    Nested comments = bucket.getAggregations().get("ctxt_idfier_nested");
    assertThat(comments.getDocCount(), equalTo(2L));
    Filter filter = comments.getAggregations().get("comment_filter");
    assertThat(filter.getDocCount(), equalTo(1L));
    Nested nestedTags = filter.getAggregations().get("nested_tags");
    assertThat(nestedTags.getDocCount(), equalTo(0L)); // This must be 0
    Terms tags = nestedTags.getAggregations().get("tag");
    assertThat(tags.getBuckets().size(), equalTo(0)); // and this must be empty
}
}

0 comments on commit 6ad9a8a

Please sign in to comment.