Nested aggregator: Fix handling of multiple buckets being emitted for the same parent doc id.

This bug was introduced by elastic#8454, which allowed the childFilter to be consumed only once. By buffering the child doc ids, multiple buckets can now be emitted for the same parent doc id. The buffering only happens in the scope of the current root document, so the number of child doc ids buffered stays small (a sketch of the single-consumption problem follows the commit metadata below).

Closes elastic#9317
Closes elastic#9346
martijnvg committed Jan 26, 2015
1 parent 534560e commit fc64e4f
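The root cause is that a Lucene DocIdSetIterator only moves forward: once the child docs that precede a parent doc have been consumed, they cannot be read again when the aggs framework collects the same parent doc id a second time (for example, once per terms bucket). Below is a minimal, self-contained sketch of that failure mode, using a plain forward-only iterator as a stand-in for DocIdSetIterator; all names in it are illustrative, not taken from the commit.

    import java.util.ArrayList;
    import java.util.List;

    public class ForwardOnlyIteratorDemo {

        // Stand-in for Lucene's DocIdSetIterator: it can only move forward.
        static final class ForwardOnlyIterator {
            static final int NO_MORE_DOCS = Integer.MAX_VALUE;
            private final int[] docs;
            private int pos = -1;

            ForwardOnlyIterator(int... docs) {
                this.docs = docs;
            }

            int docID() {
                if (pos < 0) return -1;
                return pos < docs.length ? docs[pos] : NO_MORE_DOCS;
            }

            // Advances to the first doc id >= target; never goes backwards.
            int advance(int target) {
                while (++pos < docs.length) {
                    if (docs[pos] >= target) {
                        return docs[pos];
                    }
                }
                pos = docs.length;
                return NO_MORE_DOCS;
            }
        }

        // Mimics the pre-fix collect(): walks the child docs that precede parentDoc.
        static List<Integer> collectChildren(ForwardOnlyIterator childDocs, int parentDoc) {
            List<Integer> collected = new ArrayList<>();
            int child = childDocs.docID() < 0 ? childDocs.advance(0) : childDocs.docID();
            for (; child < parentDoc; child = childDocs.advance(child + 1)) {
                collected.add(child);
            }
            return collected;
        }

        public static void main(String[] args) {
            // In Lucene's block-join layout, nested (child) docs 0..2 precede their parent doc 3.
            ForwardOnlyIterator childDocs = new ForwardOnlyIterator(0, 1, 2);
            System.out.println(collectChildren(childDocs, 3)); // [0, 1, 2]
            // The aggs framework collects the same parent again (e.g. for another bucket):
            System.out.println(collectChildren(childDocs, 3)); // [] -- children already consumed
        }
    }

The commit fixes this by buffering the child doc ids per parent doc (see getChildren in the diff below), so repeated collects for the same parent re-read the buffer instead of the exhausted iterator.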
Showing 2 changed files with 141 additions and 11 deletions.
@@ -18,6 +18,8 @@
*/
package org.elasticsearch.search.aggregations.bucket.nested;

import com.carrotsearch.hppc.IntArrayList;
import com.carrotsearch.hppc.IntObjectOpenHashMap;
import org.apache.lucene.index.AtomicReaderContext;
import org.apache.lucene.search.DocIdSet;
import org.apache.lucene.search.DocIdSetIterator;
@@ -46,9 +48,12 @@ public class NestedAggregator extends SingleBucketAggregator implements ReaderCo

    private DocIdSetIterator childDocs;
    private FixedBitSet parentDocs;

    private AtomicReaderContext reader;

    private FixedBitSet rootDocs;
    private int currentRootDoc = -1;
    private final IntObjectOpenHashMap<IntArrayList> childDocIdBuffers = new IntObjectOpenHashMap<>();

    public NestedAggregator(String name, AggregatorFactories factories, ObjectMapper objectMapper, AggregationContext aggregationContext, Aggregator parentAggregator) {
        super(name, factories, aggregationContext, parentAggregator);
        this.parentAggregator = parentAggregator;
@@ -79,6 +84,7 @@ public void setNextReader(AtomicReaderContext reader) {
            } else {
                childDocs = childDocIdSet.iterator();
            }
            rootDocs = context.searchContext().fixedBitSetFilterCache().getFixedBitSetFilter(NonNestedDocsFilter.INSTANCE).getDocIdSet(reader, null);
        } catch (IOException ioe) {
            throw new AggregationExecutionException("Failed to aggregate [" + name + "]", ioe);
        }
@@ -109,22 +115,22 @@ public void collect(int parentDoc, long bucketOrd) throws IOException {
            parentDocs = parentFilter.getDocIdSet(reader, null);
        }

-        int prevParentDoc = parentDocs.prevSetBit(parentDoc - 1);
-        int childDocId;
-        if (childDocs.docID() > prevParentDoc) {
-            childDocId = childDocs.docID();
-        } else {
-            childDocId = childDocs.advance(prevParentDoc + 1);
-        }

        int numChildren = 0;
-        for (; childDocId < parentDoc; childDocId = childDocs.nextDoc()) {
+        IntArrayList iterator = getChildren(parentDoc);
+        final int[] buffer = iterator.buffer;
+        final int size = iterator.size();
+        for (int i = 0; i < size; i++) {
            numChildren++;
-            collectBucketNoCounts(childDocId, bucketOrd);
+            collectBucketNoCounts(buffer[i], bucketOrd);
        }
        incrementBucketDocCount(bucketOrd, numChildren);
    }

+    @Override
+    protected void doClose() {
+        childDocIdBuffers.clear();
+    }

    @Override
    public InternalAggregation buildAggregation(long owningBucketOrdinal) {
        return new InternalNested(name, bucketDocCount(owningBucketOrdinal), bucketAggregations(owningBucketOrdinal));
@@ -183,4 +189,42 @@ public InternalAggregation buildEmptyAggregation() {
            }
        }
    }

    // The aggs framework can collect buckets for the same parent doc id more than once, and because the child docs
    // can only be consumed once we need to buffer them. We only need to buffer child docs in the scope
    // of the current root doc.

    // Examples:
    // 1) The nested agg is wrapped by a terms agg and multiple buckets per document are emitted.
    // 2) Multiple nested fields are defined. A nested agg joins back to another nested agg via the reverse_nested agg.
    //    For each child in the first nested agg the second nested agg gets invoked with the same buckets / doc ids.
    private IntArrayList getChildren(final int parentDocId) throws IOException {
        int rootDocId = rootDocs.nextSetBit(parentDocId);
        if (currentRootDoc == rootDocId) {
            final IntArrayList childDocIdBuffer = childDocIdBuffers.get(parentDocId);
            if (childDocIdBuffer != null) {
                return childDocIdBuffer;
            } else {
                // Here we translate the parent doc to the list of its nested docs and buffer them,
                // so buckets can be collected for every one of them.
                final IntArrayList newChildDocIdBuffer = new IntArrayList();
                childDocIdBuffers.put(parentDocId, newChildDocIdBuffer);
                int prevParentDoc = parentDocs.prevSetBit(parentDocId - 1);
                int childDocId;
                if (childDocs.docID() > prevParentDoc) {
                    childDocId = childDocs.docID();
                } else {
                    childDocId = childDocs.advance(prevParentDoc + 1);
                }
                for (; childDocId < parentDocId; childDocId = childDocs.nextDoc()) {
                    newChildDocIdBuffer.add(childDocId);
                }
                return newChildDocIdBuffer;
            }
        } else {
            this.currentRootDoc = rootDocId;
            childDocIdBuffers.clear();
            return getChildren(parentDocId);
        }
    }
}
@@ -449,4 +449,90 @@ public void testParentFilterResolvedCorrectly() throws Exception {
        tags = nestedTags.getAggregations().get("tag");
        assertThat(tags.getBuckets().size(), equalTo(0)); // and this must be empty
    }

    @Test
    public void nestedSameDocIdProcessedMultipleTime() throws Exception {
        assertAcked(
                prepareCreate("idx4")
                        .setSettings(ImmutableSettings.builder().put(SETTING_NUMBER_OF_SHARDS, 1).put(SETTING_NUMBER_OF_REPLICAS, 0))
                        .addMapping("product", "categories", "type=string", "name", "type=string", "property", "type=nested")
        );

        client().prepareIndex("idx4", "product", "1").setSource(jsonBuilder().startObject()
                .field("name", "product1")
                .field("categories", "1", "2", "3", "4")
                .startArray("property")
                .startObject().field("id", 1).endObject()
                .startObject().field("id", 2).endObject()
                .startObject().field("id", 3).endObject()
                .endArray()
                .endObject()).get();
        client().prepareIndex("idx4", "product", "2").setSource(jsonBuilder().startObject()
                .field("name", "product2")
                .field("categories", "1", "2")
                .startArray("property")
                .startObject().field("id", 1).endObject()
                .startObject().field("id", 5).endObject()
                .startObject().field("id", 4).endObject()
                .endArray()
                .endObject()).get();
        refresh();

        SearchResponse response = client().prepareSearch("idx4").setTypes("product")
                .addAggregation(terms("category").field("categories").subAggregation(
                        nested("property").path("property").subAggregation(
                                terms("property_id").field("property.id")
                        )
                ))
                .get();
        assertNoFailures(response);
        assertHitCount(response, 2);

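        // Expected counts, derived from the two documents above: product1 (categories 1,2,3,4;
        // property ids 1,2,3) and product2 (categories 1,2; property ids 1,5,4). Both documents
        // carry categories "1" and "2", so each of those buckets sees 2 parent docs, 3 + 3 = 6
        // nested property docs and 5 distinct property ids, with id 1 occurring in both documents.
        // Before this fix, a repeated collect for the same parent doc id found the child iterator
        // already consumed and the nested counts came out wrong.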
        Terms category = response.getAggregations().get("category");
        assertThat(category.getBuckets().size(), equalTo(4));

        Terms.Bucket bucket = category.getBucketByKey("1");
        assertThat(bucket.getDocCount(), equalTo(2l));
        Nested property = bucket.getAggregations().get("property");
        assertThat(property.getDocCount(), equalTo(6l));
        Terms propertyId = property.getAggregations().get("property_id");
        assertThat(propertyId.getBuckets().size(), equalTo(5));
        assertThat(propertyId.getBucketByKey("1").getDocCount(), equalTo(2l));
        assertThat(propertyId.getBucketByKey("2").getDocCount(), equalTo(1l));
        assertThat(propertyId.getBucketByKey("3").getDocCount(), equalTo(1l));
        assertThat(propertyId.getBucketByKey("4").getDocCount(), equalTo(1l));
        assertThat(propertyId.getBucketByKey("5").getDocCount(), equalTo(1l));

        bucket = category.getBucketByKey("2");
        assertThat(bucket.getDocCount(), equalTo(2l));
        property = bucket.getAggregations().get("property");
        assertThat(property.getDocCount(), equalTo(6l));
        propertyId = property.getAggregations().get("property_id");
        assertThat(propertyId.getBuckets().size(), equalTo(5));
        assertThat(propertyId.getBucketByKey("1").getDocCount(), equalTo(2l));
        assertThat(propertyId.getBucketByKey("2").getDocCount(), equalTo(1l));
        assertThat(propertyId.getBucketByKey("3").getDocCount(), equalTo(1l));
        assertThat(propertyId.getBucketByKey("4").getDocCount(), equalTo(1l));
        assertThat(propertyId.getBucketByKey("5").getDocCount(), equalTo(1l));

        bucket = category.getBucketByKey("3");
        assertThat(bucket.getDocCount(), equalTo(1l));
        property = bucket.getAggregations().get("property");
        assertThat(property.getDocCount(), equalTo(3l));
        propertyId = property.getAggregations().get("property_id");
        assertThat(propertyId.getBuckets().size(), equalTo(3));
        assertThat(propertyId.getBucketByKey("1").getDocCount(), equalTo(1l));
        assertThat(propertyId.getBucketByKey("2").getDocCount(), equalTo(1l));
        assertThat(propertyId.getBucketByKey("3").getDocCount(), equalTo(1l));

        bucket = category.getBucketByKey("4");
        assertThat(bucket.getDocCount(), equalTo(1l));
        property = bucket.getAggregations().get("property");
        assertThat(property.getDocCount(), equalTo(3l));
        propertyId = property.getAggregations().get("property_id");
        assertThat(propertyId.getBuckets().size(), equalTo(3));
        assertThat(propertyId.getBucketByKey("1").getDocCount(), equalTo(1l));
        assertThat(propertyId.getBucketByKey("2").getDocCount(), equalTo(1l));
        assertThat(propertyId.getBucketByKey("3").getDocCount(), equalTo(1l));
    }
}
