Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix handling of multiple buckets being emitted for the same parent doc id in nested aggregation #9346

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -18,6 +18,8 @@
*/
package org.elasticsearch.search.aggregations.bucket.nested;

import com.carrotsearch.hppc.IntArrayList;
import com.carrotsearch.hppc.IntObjectOpenHashMap;
import org.apache.lucene.index.AtomicReaderContext;
import org.apache.lucene.search.DocIdSet;
import org.apache.lucene.search.DocIdSetIterator;
Expand Down Expand Up @@ -46,9 +48,12 @@ public class NestedAggregator extends SingleBucketAggregator implements ReaderCo

// Iterator over the nested (child) docs of the current segment; assigned in setNextReader.
private DocIdSetIterator childDocs;
// Bit set of the parent docs for the current segment, resolved from parentFilter during collect.
private FixedBitSet parentDocs;

// Segment context passed to parentFilter when resolving parentDocs
// (presumably the one handed to setNextReader - confirm against the full file).
private AtomicReaderContext reader;

// Bit set of the root (non-nested) docs of the current segment, resolved via NonNestedDocsFilter in setNextReader.
private FixedBitSet rootDocs;
// Root doc that the entries in childDocIdBuffers belong to; -1 until getChildren has been called.
private int currentRootDoc = -1;
// Buffered child doc ids keyed by parent doc id; cleared by getChildren when the root doc changes and by doClose.
private final IntObjectOpenHashMap<IntArrayList> childDocIdBuffers = new IntObjectOpenHashMap<>();

public NestedAggregator(String name, AggregatorFactories factories, ObjectMapper objectMapper, AggregationContext aggregationContext, Aggregator parentAggregator) {
super(name, factories, aggregationContext, parentAggregator);
this.parentAggregator = parentAggregator;
Expand Down Expand Up @@ -79,6 +84,7 @@ public void setNextReader(AtomicReaderContext reader) {
} else {
childDocs = childDocIdSet.iterator();
}
rootDocs = context.searchContext().fixedBitSetFilterCache().getFixedBitSetFilter(NonNestedDocsFilter.INSTANCE).getDocIdSet(reader, null);
} catch (IOException ioe) {
throw new AggregationExecutionException("Failed to aggregate [" + name + "]", ioe);
}
Expand Down Expand Up @@ -109,22 +115,22 @@ public void collect(int parentDoc, long bucketOrd) throws IOException {
parentDocs = parentFilter.getDocIdSet(reader, null);
}

int prevParentDoc = parentDocs.prevSetBit(parentDoc - 1);
int childDocId;
if (childDocs.docID() > prevParentDoc) {
childDocId = childDocs.docID();
} else {
childDocId = childDocs.advance(prevParentDoc + 1);
}

int numChildren = 0;
for (; childDocId < parentDoc; childDocId = childDocs.nextDoc()) {
IntArrayList iterator = getChildren(parentDoc);
final int[] buffer = iterator.buffer;
final int size = iterator.size();
for (int i = 0; i < size; i++) {
numChildren++;
collectBucketNoCounts(childDocId, bucketOrd);
collectBucketNoCounts(buffer[i], bucketOrd);
}
incrementBucketDocCount(bucketOrd, numChildren);
}

/**
 * Drops every buffered list of child doc ids held in {@code childDocIdBuffers}.
 */
@Override
protected void doClose() {
childDocIdBuffers.clear();
}

@Override
public InternalAggregation buildAggregation(long owningBucketOrdinal) {
return new InternalNested(name, bucketDocCount(owningBucketOrdinal), bucketAggregations(owningBucketOrdinal));
Expand Down Expand Up @@ -183,4 +189,42 @@ public InternalAggregation buildEmptyAggregation() {
}
}
}

// The aggs framework can collect buckets for the same parent doc id more than once and because the children docs
// can only be consumed once we need to buffer the child docs. We only need to buffer child docs in the scope
// of the current root doc.

// Examples:
// 1) nested agg is wrapped by a terms agg and multiple buckets per document are emitted
// 2) Multiple nested fields are defined. A nested agg joins back to another nested agg via the reverse_nested agg.
// For each child in the first nested agg the second nested agg gets invoked with the same buckets / docids
/**
 * Returns the child doc ids located between the previous parent doc and {@code parentDocId}.
 *
 * The ids are read from {@code childDocs} the first time a parent doc is requested and are
 * kept in {@code childDocIdBuffers} so that later requests for the same parent doc return
 * the same list without touching the iterator again. The buffers are only kept for one
 * root doc at a time: as soon as a parent doc resolves to a different root doc, all buffers
 * are dropped.
 *
 * @param parentDocId the parent doc id whose child doc ids are requested
 * @return the (possibly empty) buffered list of child doc ids for {@code parentDocId}
 * @throws IOException if advancing the child doc iterator fails
 */
private IntArrayList getChildren(final int parentDocId) throws IOException {
    final int owningRootDoc = rootDocs.nextSetBit(parentDocId);
    if (currentRootDoc != owningRootDoc) {
        // We moved on to another root doc: the buffers of the previous one are of no use anymore.
        this.currentRootDoc = owningRootDoc;
        childDocIdBuffers.clear();
    }

    final IntArrayList buffered = childDocIdBuffers.get(parentDocId);
    if (buffered != null) {
        return buffered;
    }

    // First request for this parent doc: translate it into the list of its nested docs.
    // The list is registered before it is filled, so it is visible in the map from here on.
    final IntArrayList collected = new IntArrayList();
    childDocIdBuffers.put(parentDocId, collected);

    final int previousParentDoc = parentDocs.prevSetBit(parentDocId - 1);
    // Reuse the iterator position if it is already past the previous parent, otherwise skip ahead.
    int doc = childDocs.docID() > previousParentDoc
            ? childDocs.docID()
            : childDocs.advance(previousParentDoc + 1);
    while (doc < parentDocId) {
        collected.add(doc);
        doc = childDocs.nextDoc();
    }
    return collected;
}
}
Expand Up @@ -449,4 +449,90 @@ public void testParentFilterResolvedCorrectly() throws Exception {
tags = nestedTags.getAggregations().get("tag");
assertThat(tags.getBuckets().size(), equalTo(0)); // and this must be empty
}

/**
 * Indexes two products that share categories and each carry three nested "property" objects,
 * then runs a terms agg on "categories" wrapping a nested agg on "property" wrapping a terms
 * agg on "property.id". Because a product falls into several category buckets, the nested agg
 * is asked to collect the same parent doc more than once; every category bucket must still
 * report the full set of nested docs.
 */
@Test
public void nestedSameDocIdProcessedMultipleTime() throws Exception {
    assertAcked(
            prepareCreate("idx4")
                    .setSettings(ImmutableSettings.builder().put(SETTING_NUMBER_OF_SHARDS, 1).put(SETTING_NUMBER_OF_REPLICAS, 0))
                    .addMapping("product", "categories", "type=string", "name", "type=string", "property", "type=nested")
    );

    // product1: categories 1-4, property ids 1, 2, 3
    client().prepareIndex("idx4", "product", "1").setSource(jsonBuilder().startObject()
            .field("name", "product1")
            .field("categories", "1", "2", "3", "4")
            .startArray("property")
            .startObject().field("id", 1).endObject()
            .startObject().field("id", 2).endObject()
            .startObject().field("id", 3).endObject()
            .endArray()
            .endObject()).get();
    // product2: categories 1-2, property ids 1, 5, 4
    client().prepareIndex("idx4", "product", "2").setSource(jsonBuilder().startObject()
            .field("name", "product2")
            .field("categories", "1", "2")
            .startArray("property")
            .startObject().field("id", 1).endObject()
            .startObject().field("id", 5).endObject()
            .startObject().field("id", 4).endObject()
            .endArray()
            .endObject()).get();
    refresh();

    SearchResponse searchResponse = client().prepareSearch("idx4").setTypes("product")
            .addAggregation(terms("category").field("categories").subAggregation(
                    nested("property").path("property").subAggregation(
                            terms("property_id").field("property.id")
                    )
            ))
            .get();
    assertNoFailures(searchResponse);
    assertHitCount(searchResponse, 2);

    Terms categories = searchResponse.getAggregations().get("category");
    assertThat(categories.getBuckets().size(), equalTo(4));

    // Expected doc count per property id, indexed by (property id - 1).
    final long[] countsForBothProducts = {2L, 1L, 1L, 1L, 1L};
    final long[] countsForFirstProductOnly = {1L, 1L, 1L};

    // Categories "1" and "2" contain both products, "3" and "4" only product1.
    assertCategoryPropertyCounts(categories, "1", 2L, 6L, countsForBothProducts);
    assertCategoryPropertyCounts(categories, "2", 2L, 6L, countsForBothProducts);
    assertCategoryPropertyCounts(categories, "3", 1L, 3L, countsForFirstProductOnly);
    assertCategoryPropertyCounts(categories, "4", 1L, 3L, countsForFirstProductOnly);
}

// Checks one category bucket: its doc count, the doc count of its nested "property" agg and the
// per-id doc counts of the "property_id" terms agg (entry i holds the count for property id i + 1).
private static void assertCategoryPropertyCounts(Terms categories, String categoryKey, long expectedDocCount,
                                                 long expectedNestedDocCount, long[] expectedCountsByPropertyId) {
    Terms.Bucket categoryBucket = categories.getBucketByKey(categoryKey);
    assertThat(categoryBucket.getDocCount(), equalTo(expectedDocCount));
    Nested nestedProperty = categoryBucket.getAggregations().get("property");
    assertThat(nestedProperty.getDocCount(), equalTo(expectedNestedDocCount));
    Terms propertyIds = nestedProperty.getAggregations().get("property_id");
    assertThat(propertyIds.getBuckets().size(), equalTo(expectedCountsByPropertyId.length));
    for (int i = 0; i < expectedCountsByPropertyId.length; i++) {
        assertThat(propertyIds.getBucketByKey(String.valueOf(i + 1)).getDocCount(), equalTo(expectedCountsByPropertyId[i]));
    }
}
}