Skip to content

Commit

Permalink
Aggs: Make the nested aggregation call sub aggregators with doc IDs i…
Browse files Browse the repository at this point in the history
…n order.

Close #9547
  • Loading branch information
jpountz committed Feb 3, 2015
1 parent ebb7ecb commit 13b64cc
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 63 deletions.
Expand Up @@ -18,8 +18,6 @@
*/
package org.elasticsearch.search.aggregations.bucket.nested;

import com.carrotsearch.hppc.IntArrayList;
import com.carrotsearch.hppc.IntObjectOpenHashMap;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.search.DocIdSet;
import org.apache.lucene.search.DocIdSetIterator;
Expand Down Expand Up @@ -53,10 +51,6 @@ public class NestedAggregator extends SingleBucketAggregator implements ReaderCo
private BitSet parentDocs;
private LeafReaderContext reader;

private BitSet rootDocs;
private int currentRootDoc = -1;
private final IntObjectOpenHashMap<IntArrayList> childDocIdBuffers = new IntObjectOpenHashMap<>();

public NestedAggregator(String name, AggregatorFactories factories, ObjectMapper objectMapper, AggregationContext aggregationContext, Aggregator parentAggregator, Map<String, Object> metaData, FilterCachingPolicy filterCachingPolicy) throws IOException {
super(name, factories, aggregationContext, parentAggregator, metaData);
this.parentAggregator = parentAggregator;
Expand All @@ -76,19 +70,14 @@ public void setNextReader(LeafReaderContext reader) {
} else {
childDocs = childDocIdSet.iterator();
}
BitDocIdSetFilter rootDocsFilter = context.searchContext().bitsetFilterCache().getBitDocIdSetFilter(NonNestedDocsFilter.INSTANCE);
BitDocIdSet rootDocIdSet = rootDocsFilter.getDocIdSet(reader);
rootDocs = rootDocIdSet.bits();
// We need to reset the current root doc, otherwise we may emit incorrect child docs if the next segment happen to start with the same root doc id value
currentRootDoc = -1;
childDocIdBuffers.clear();
} catch (IOException ioe) {
throw new AggregationExecutionException("Failed to aggregate [" + name + "]", ioe);
}
}

@Override
public void collect(int parentDoc, long bucketOrd) throws IOException {
assert bucketOrd == 0;
// here we translate the parent doc to a list of its nested docs, and then call super.collect for evey one of them so they'll be collected

// if parentDoc is 0 then this means that this parent doesn't have child docs (b/c these appear always before the parent doc), so we can skip:
Expand Down Expand Up @@ -119,21 +108,19 @@ public void collect(int parentDoc, long bucketOrd) throws IOException {
}
}

final int prevParentDoc = parentDocs.prevSetBit(parentDoc - 1);
int childDocId = childDocs.docID();
if (childDocId <= prevParentDoc) {
childDocId = childDocs.advance(prevParentDoc + 1);
}

int numChildren = 0;
IntArrayList iterator = getChildren(parentDoc);
final int[] buffer = iterator.buffer;
final int size = iterator.size();
for (int i = 0; i < size; i++) {
numChildren++;
collectBucketNoCounts(buffer[i], bucketOrd);
for (; childDocId < parentDoc; childDocId = childDocs.nextDoc()) {
collectBucketNoCounts(childDocId, bucketOrd);
numChildren += 1;
}
incrementBucketDocCount(bucketOrd, numChildren);
}

@Override
protected void doClose() {
childDocIdBuffers.clear();
}

@Override
public InternalAggregation buildAggregation(long owningBucketOrdinal) throws IOException {
Expand Down Expand Up @@ -169,6 +156,9 @@ public Factory(String name, String path, FilterCachingPolicy filterCachingPolicy

@Override
public Aggregator createInternal(AggregationContext context, Aggregator parent, boolean collectsFromSingleBucket, Map<String, Object> metaData) throws IOException {
if (collectsFromSingleBucket == false) {
return asMultiBucketAggregator(this, context, parent);
}
MapperService.SmartNameObjectMapper mapper = context.searchContext().smartNameObjectMapper(path);
if (mapper == null) {
return new Unmapped(name, context, parent, metaData);
Expand Down Expand Up @@ -196,43 +186,4 @@ public InternalAggregation buildEmptyAggregation() {
}
}

// The aggs framework can collect buckets for the same parent doc id more than once and because the children docs
// can only be consumed once we need to buffer the child docs. We only need to buffer child docs in the scope
// of the current root doc.

// Examples:
// 1) nested agg wrapped is by terms agg and multiple buckets per document are emitted
// 2) Multiple nested fields are defined. A nested agg joins back to another nested agg via the reverse_nested agg.
// For each child in the first nested agg the second nested agg gets invoked with the same buckets / docids
private IntArrayList getChildren(final int parentDocId) throws IOException {
int rootDocId = rootDocs.nextSetBit(parentDocId);
if (currentRootDoc == rootDocId) {
final IntArrayList childDocIdBuffer = childDocIdBuffers.get(parentDocId);
if (childDocIdBuffer != null) {
return childDocIdBuffer;
} else {
// here we translate the parent doc to a list of its nested docs,
// and then collect buckets for every one of them so they'll be collected
final IntArrayList newChildDocIdBuffer = new IntArrayList();
childDocIdBuffers.put(parentDocId, newChildDocIdBuffer);
int prevParentDoc = parentDocs.prevSetBit(parentDocId - 1);
int childDocId;
if (childDocs.docID() > prevParentDoc) {
childDocId = childDocs.docID();
} else {
childDocId = childDocs.advance(prevParentDoc + 1);
}
for (; childDocId < parentDocId; childDocId = childDocs.nextDoc()) {
newChildDocIdBuffer.add(childDocId);
}
return newChildDocIdBuffer;
}
} else {
this.currentRootDoc = rootDocId;
childDocIdBuffers.clear();
return getChildren(parentDocId);
}
}


}
Expand Up @@ -483,7 +483,6 @@ public void nonExistingNestedField() throws Exception {
}

@Test
@AwaitsFix(bugUrl="http://github.com/elasticsearch/elasticsearch/issues/9547")
public void testSameParentDocHavingMultipleBuckets() throws Exception {
XContentBuilder mapping = jsonBuilder().startObject().startObject("product").field("dynamic", "strict").startObject("properties")
.startObject("id").field("type", "long").endObject()
Expand Down

0 comments on commit 13b64cc

Please sign in to comment.