Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

In reverse nested aggregation, fix handling of the same child doc id being processed multiple times. #9345

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -20,12 +20,11 @@

import com.carrotsearch.hppc.LongIntOpenHashMap;
import org.apache.lucene.index.AtomicReaderContext;
import org.apache.lucene.search.DocIdSet;
import org.apache.lucene.search.DocIdSetIterator;
import org.apache.lucene.search.Filter;
import org.apache.lucene.util.FixedBitSet;
import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.lucene.ReaderContextAware;
import org.elasticsearch.common.lucene.docset.DocIdSets;
import org.elasticsearch.common.recycler.Recycler;
import org.elasticsearch.index.cache.fixedbitset.FixedBitSetFilter;
import org.elasticsearch.index.mapper.MapperService;
Expand All @@ -35,7 +34,6 @@
import org.elasticsearch.search.aggregations.*;
import org.elasticsearch.search.aggregations.bucket.SingleBucketAggregator;
import org.elasticsearch.search.aggregations.support.AggregationContext;
import org.elasticsearch.search.internal.SearchContext;

import java.io.IOException;

Expand All @@ -45,7 +43,8 @@
public class ReverseNestedAggregator extends SingleBucketAggregator implements ReaderContextAware {

private final FixedBitSetFilter parentFilter;
private DocIdSetIterator parentDocs;
// It is ok to use bitset from bitset cache, because in this agg the path always to a nested parent path.
private FixedBitSet parentDocs;

// TODO: Add LongIntPagedHashMap?
private final Recycler.V<LongIntOpenHashMap> bucketOrdToLastCollectedParentDocRecycler;
Expand All @@ -54,9 +53,9 @@ public class ReverseNestedAggregator extends SingleBucketAggregator implements R
public ReverseNestedAggregator(String name, AggregatorFactories factories, ObjectMapper objectMapper, AggregationContext aggregationContext, Aggregator parent) {
super(name, factories, aggregationContext, parent);
if (objectMapper == null) {
parentFilter = SearchContext.current().fixedBitSetFilterCache().getFixedBitSetFilter(NonNestedDocsFilter.INSTANCE);
parentFilter = context.searchContext().fixedBitSetFilterCache().getFixedBitSetFilter(NonNestedDocsFilter.INSTANCE);
} else {
parentFilter = SearchContext.current().fixedBitSetFilterCache().getFixedBitSetFilter(objectMapper.nestedTypeFilter());
parentFilter = context.searchContext().fixedBitSetFilterCache().getFixedBitSetFilter(objectMapper.nestedTypeFilter());
}
bucketOrdToLastCollectedParentDocRecycler = aggregationContext.searchContext().cacheRecycler().longIntMap(32);
bucketOrdToLastCollectedParentDoc = bucketOrdToLastCollectedParentDocRecycler.v();
Expand All @@ -69,12 +68,7 @@ public void setNextReader(AtomicReaderContext reader) {
try {
// In ES if parent is deleted, then also the children are deleted, so the child docs this agg receives
// must belong to parent docs that is alive. For this reason acceptedDocs can be null here.
DocIdSet docIdSet = parentFilter.getDocIdSet(reader, null);
if (DocIdSets.isEmpty(docIdSet)) {
parentDocs = null;
} else {
parentDocs = docIdSet.iterator();
}
parentDocs = parentFilter.getDocIdSet(reader, null);
} catch (IOException ioe) {
throw new AggregationExecutionException("Failed to aggregate [" + name + "]", ioe);
}
Expand All @@ -87,12 +81,7 @@ public void collect(int childDoc, long bucketOrd) throws IOException {
}

// fast forward to retrieve the parentDoc this childDoc belongs to
final int parentDoc;
if (parentDocs.docID() < childDoc) {
parentDoc = parentDocs.advance(childDoc);
} else {
parentDoc = parentDocs.docID();
}
final int parentDoc = parentDocs.nextSetBit(childDoc);
assert childDoc <= parentDoc && parentDoc != DocIdSetIterator.NO_MORE_DOCS;
if (bucketOrdToLastCollectedParentDoc.containsKey(bucketOrd)) {
int lastCollectedParentDoc = bucketOrdToLastCollectedParentDoc.lget();
Expand Down Expand Up @@ -157,7 +146,7 @@ public Aggregator create(AggregationContext context, Aggregator parent, long exp

final ObjectMapper objectMapper;
if (path != null) {
MapperService.SmartNameObjectMapper mapper = SearchContext.current().smartNameObjectMapper(path);
MapperService.SmartNameObjectMapper mapper = context.searchContext().smartNameObjectMapper(path);
if (mapper == null) {
return new Unmapped(name, context, parent);
}
Expand Down
Expand Up @@ -20,11 +20,14 @@

import org.elasticsearch.action.search.SearchPhaseExecutionException;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.search.aggregations.Aggregator.SubAggCollectionMode;
import org.elasticsearch.search.aggregations.bucket.filter.Filter;
import org.elasticsearch.search.aggregations.bucket.nested.Nested;
import org.elasticsearch.search.aggregations.bucket.nested.ReverseNested;
import org.elasticsearch.search.aggregations.bucket.terms.Terms;
import org.elasticsearch.search.aggregations.metrics.valuecount.ValueCount;
import org.elasticsearch.test.ElasticsearchIntegrationTest;
import org.hamcrest.Matchers;
import org.junit.Test;
Expand All @@ -33,11 +36,15 @@
import java.util.Arrays;
import java.util.List;

import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_REPLICAS;
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_SHARDS;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.index.query.FilterBuilders.termFilter;
import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;
import static org.elasticsearch.search.aggregations.AggregationBuilders.*;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertSearchResponse;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.*;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures;
import static org.hamcrest.Matchers.*;
import static org.hamcrest.core.IsNull.notNullValue;

Expand Down Expand Up @@ -464,4 +471,163 @@ public void nonExistingNestedField() throws Exception {
ReverseNested reverseNested = nested.getAggregations().get("incorrect");
assertThat(reverseNested.getDocCount(), is(0l));
}

@Test
public void testSameParentDocHavingMultipleBuckets() throws Exception {
XContentBuilder mapping = jsonBuilder().startObject().startObject("product").field("dynamic", "strict").startObject("properties")
.startObject("id").field("type", "long").endObject()
.startObject("category")
.field("type", "nested")
.startObject("properties")
.startObject("name").field("type", "string").endObject()
.endObject()
.endObject()
.startObject("sku")
.field("type", "nested")
.startObject("properties")
.startObject("sku_type").field("type", "string").endObject()
.startObject("colors")
.field("type", "nested")
.startObject("properties")
.startObject("name").field("type", "string").endObject()
.endObject()
.endObject()
.endObject()
.endObject()
.endObject().endObject().endObject();
assertAcked(
prepareCreate("idx3")
.setSettings(ImmutableSettings.builder().put(SETTING_NUMBER_OF_SHARDS, 1).put(SETTING_NUMBER_OF_REPLICAS, 0))
.addMapping("product", mapping)
);

client().prepareIndex("idx3", "product", "1").setRefresh(true).setSource(
jsonBuilder().startObject()
.startArray("sku")
.startObject()
.field("sku_type", "bar1")
.startArray("colors")
.startObject().field("name", "red").endObject()
.startObject().field("name", "green").endObject()
.startObject().field("name", "yellow").endObject()
.endArray()
.endObject()
.startObject()
.field("sku_type", "bar1")
.startArray("colors")
.startObject().field("name", "red").endObject()
.startObject().field("name", "blue").endObject()
.startObject().field("name", "white").endObject()
.endArray()
.endObject()
.startObject()
.field("sku_type", "bar1")
.startArray("colors")
.startObject().field("name", "black").endObject()
.startObject().field("name", "blue").endObject()
.endArray()
.endObject()
.startObject()
.field("sku_type", "bar2")
.startArray("colors")
.startObject().field("name", "orange").endObject()
.endArray()
.endObject()
.startObject()
.field("sku_type", "bar2")
.startArray("colors")
.startObject().field("name", "pink").endObject()
.endArray()
.endObject()
.endArray()
.startArray("category")
.startObject().field("name", "abc").endObject()
.startObject().field("name", "klm").endObject()
.startObject().field("name", "xyz").endObject()
.endArray()
.endObject()
).get();

SearchResponse response = client().prepareSearch("idx3")
.addAggregation(
nested("nested_0").path("category").subAggregation(
terms("group_by_category").field("category.name").subAggregation(
reverseNested("to_root").subAggregation(
nested("nested_1").path("sku").subAggregation(
filter("filter_by_sku").filter(termFilter("sku.sku_type", "bar1")).subAggregation(
count("sku_count").field("sku_type")
)
)
)
)
)
).get();
assertNoFailures(response);
assertHitCount(response, 1);

Nested nested0 = response.getAggregations().get("nested_0");
assertThat(nested0.getDocCount(), equalTo(3l));
Terms terms = nested0.getAggregations().get("group_by_category");
assertThat(terms.getBuckets().size(), equalTo(3));
for (String bucketName : new String[]{"abc", "klm", "xyz"}) {
logger.info("Checking results for bucket {}", bucketName);
Terms.Bucket bucket = terms.getBucketByKey(bucketName);
assertThat(bucket.getDocCount(), equalTo(1l));
ReverseNested toRoot = bucket.getAggregations().get("to_root");
assertThat(toRoot.getDocCount(), equalTo(1l));
Nested nested1 = toRoot.getAggregations().get("nested_1");
assertThat(nested1.getDocCount(), equalTo(5l));
Filter filterByBar = nested1.getAggregations().get("filter_by_sku");
assertThat(filterByBar.getDocCount(), equalTo(3l));
ValueCount barCount = filterByBar.getAggregations().get("sku_count");
assertThat(barCount.getValue(), equalTo(3l));
}

response = client().prepareSearch("idx3")
.addAggregation(
nested("nested_0").path("category").subAggregation(
terms("group_by_category").field("category.name").subAggregation(
reverseNested("to_root").subAggregation(
nested("nested_1").path("sku").subAggregation(
filter("filter_by_sku").filter(termFilter("sku.sku_type", "bar1")).subAggregation(
nested("nested_2").path("sku.colors").subAggregation(
filter("filter_sku_color").filter(termFilter("sku.colors.name", "red")).subAggregation(
reverseNested("reverse_to_sku").path("sku").subAggregation(
count("sku_count").field("sku_type")
)
)
)
)
)
)
)
)
).get();
assertNoFailures(response);
assertHitCount(response, 1);

nested0 = response.getAggregations().get("nested_0");
assertThat(nested0.getDocCount(), equalTo(3l));
terms = nested0.getAggregations().get("group_by_category");
assertThat(terms.getBuckets().size(), equalTo(3));
for (String bucketName : new String[]{"abc", "klm", "xyz"}) {
logger.info("Checking results for bucket {}", bucketName);
Terms.Bucket bucket = terms.getBucketByKey(bucketName);
assertThat(bucket.getDocCount(), equalTo(1l));
ReverseNested toRoot = bucket.getAggregations().get("to_root");
assertThat(toRoot.getDocCount(), equalTo(1l));
Nested nested1 = toRoot.getAggregations().get("nested_1");
assertThat(nested1.getDocCount(), equalTo(5l));
Filter filterByBar = nested1.getAggregations().get("filter_by_sku");
assertThat(filterByBar.getDocCount(), equalTo(3l));
Nested nested2 = filterByBar.getAggregations().get("nested_2");
assertThat(nested2.getDocCount(), equalTo(8l));
Filter filterBarColor = nested2.getAggregations().get("filter_sku_color");
assertThat(filterBarColor.getDocCount(), equalTo(2l));
ReverseNested reverseToBar = filterBarColor.getAggregations().get("reverse_to_sku");
assertThat(reverseToBar.getDocCount(), equalTo(2l));
ValueCount barCount = reverseToBar.getAggregations().get("sku_count");
assertThat(barCount.getValue(), equalTo(2l));
}
}
}