Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve the way sub-aggregations are collected. #5975

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -83,7 +83,7 @@ public boolean acceptsDocsOutOfOrder() {
}

@Override
public void postCollection() {
public void postCollection() throws IOException {
if (collector instanceof XCollector) {
((XCollector) collector).postCollection();
}
Expand Down
Expand Up @@ -44,7 +44,7 @@ public FilteredCollector(Collector collector, Filter filter) {
}

@Override
public void postCollection() {
public void postCollection() throws IOException {
if (collector instanceof XCollector) {
((XCollector) collector).postCollection();
}
Expand Down
Expand Up @@ -20,13 +20,15 @@

import org.apache.lucene.search.Collector;

import java.io.IOException;

/**
* An extension to {@link Collector} that allows for a callback when
* collection is done.
*/
public abstract class XCollector extends Collector {

public void postCollection() {
public void postCollection() throws IOException {

}
}
Expand Up @@ -25,8 +25,6 @@
import org.apache.lucene.search.Scorer;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.lucene.search.Queries;
import org.elasticsearch.common.lucene.search.XCollector;
import org.elasticsearch.common.lucene.search.XConstantScoreQuery;
Expand Down Expand Up @@ -124,10 +122,10 @@ public void execute(SearchContext context) throws ElasticsearchException {
}
try {
context.searcher().search(query, collector);
collector.postCollection();
} catch (Exception e) {
throw new QueryPhaseExecutionException(context, "Failed to execute global aggregators", e);
}
collector.postCollection();
}

List<InternalAggregation> aggregations = new ArrayList<>(aggregators.length);
Expand Down Expand Up @@ -171,7 +169,7 @@ public boolean acceptsDocsOutOfOrder() {
}

@Override
public void postCollection() {
public void postCollection() throws IOException {
for (Aggregator collector : collectors) {
collector.postCollection();
}
Expand Down
40 changes: 16 additions & 24 deletions src/main/java/org/elasticsearch/search/aggregations/Aggregator.java
Expand Up @@ -18,21 +18,26 @@
*/
package org.elasticsearch.search.aggregations;

import com.google.common.base.Predicate;
import com.google.common.collect.Iterables;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.lucene.ReaderContextAware;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.search.aggregations.support.AggregationContext;
import org.elasticsearch.search.internal.SearchContext;
import org.elasticsearch.search.internal.SearchContext.Lifetime;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.*;

public abstract class Aggregator implements Releasable, ReaderContextAware {
public abstract class Aggregator extends BucketCollector implements Releasable {

private static final Predicate<Aggregator> COLLECTABLE_AGGREGATOR = new Predicate<Aggregator>() {
@Override
public boolean apply(Aggregator aggregator) {
return aggregator.shouldCollect();
}
};

/**
* Defines the nature of the aggregator's aggregation execution when nested in other aggregators and the buckets they create.
Expand Down Expand Up @@ -60,6 +65,7 @@ public static enum BucketAggregationMode {
protected final BucketAggregationMode bucketAggregationMode;
protected final AggregatorFactories factories;
protected final Aggregator[] subAggregators;
protected final BucketCollector collectableSugAggregators;

private Map<String, Aggregator> subAggregatorbyName;

Expand All @@ -84,6 +90,7 @@ protected Aggregator(String name, BucketAggregationMode bucketAggregationMode, A
assert factories != null : "sub-factories provided to BucketAggregator must not be null, use AggragatorFactories.EMPTY instead";
this.factories = factories;
this.subAggregators = factories.createSubAggregators(this, estimatedBucketsCount);
collectableSugAggregators = BucketCollector.wrap(Iterables.filter(Arrays.asList(subAggregators), COLLECTABLE_AGGREGATOR));
context.searchContext().addReleasable(this, Lifetime.PHASE);
}

Expand Down Expand Up @@ -150,26 +157,11 @@ public BucketAggregationMode bucketAggregationMode() {
*/
public abstract boolean shouldCollect();

/**
* Called during the query phase, to collect & aggregate the given document.
*
* @param doc The document to be collected/aggregated
* @param owningBucketOrdinal The ordinal of the bucket this aggregator belongs to, assuming this aggregator is not a top level aggregator.
* Typically, aggregators with {@code #bucketAggregationMode} set to {@link BucketAggregationMode#MULTI_BUCKETS}
* will heavily depend on this ordinal. Other aggregators may or may not use it and can see this ordinal as just
* an extra information for the aggregation context. For top level aggregators, the ordinal will always be
* equal to 0.
* @throws IOException
*/
public abstract void collect(int doc, long owningBucketOrdinal) throws IOException;

/**
* Called after collection of all documents is done.
*/
public final void postCollection() {
for (int i = 0; i < subAggregators.length; i++) {
subAggregators[i].postCollection();
}
public final void postCollection() throws IOException {
collectableSugAggregators.postCollection();
doPostCollection();
}

Expand All @@ -185,7 +177,7 @@ protected void doClose() {}
/**
* Can be overridden by aggregator implementation to be called back when the collection phase ends.
*/
protected void doPostCollection() {
protected void doPostCollection() throws IOException {
}

/**
Expand Down
Expand Up @@ -86,7 +86,7 @@ public boolean shouldCollect() {
}

@Override
protected void doPostCollection() {
protected void doPostCollection() throws IOException {
for (long i = 0; i < aggregators.size(); ++i) {
final Aggregator aggregator = aggregators.get(i);
if (aggregator != null) {
Expand Down
@@ -0,0 +1,108 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.search.aggregations;

import com.google.common.collect.Iterables;
import org.apache.lucene.index.AtomicReaderContext;
import org.elasticsearch.common.lucene.ReaderContextAware;
import org.elasticsearch.search.aggregations.Aggregator.BucketAggregationMode;

import java.io.IOException;

/**
* A Collector that can collect data in separate buckets.
*/
public abstract class BucketCollector implements ReaderContextAware {

    /**
     * Shared collector whose {@link #collect(int, long)}, {@link #setNextReader(AtomicReaderContext)}
     * and {@link #postCollection()} all do nothing. {@link #wrap(Iterable)} returns it when given
     * no collectors. Declared {@code final} so the shared instance cannot be reassigned.
     */
    public static final BucketCollector NO_OP_COLLECTOR = new BucketCollector() {

        @Override
        public void collect(int docId, long bucketOrdinal) throws IOException {
            // no-op
        }

        @Override
        public void setNextReader(AtomicReaderContext reader) {
            // no-op
        }

        @Override
        public void postCollection() throws IOException {
            // no-op
        }
    };

    /**
     * Wrap the given collectors into a single instance.
     * <p>
     * The iterable is copied into an array once, when this method is called, so later changes
     * to {@code collectorList} are not seen by the returned collector.
     *
     * @param collectorList the collectors to combine
     * @return {@link #NO_OP_COLLECTOR} when {@code collectorList} is empty, the sole element when
     *         it holds exactly one collector, and otherwise a collector that forwards every call
     *         to each wrapped collector in iteration order
     */
    public static BucketCollector wrap(Iterable<? extends BucketCollector> collectorList) {
        final BucketCollector[] collectors = Iterables.toArray(collectorList, BucketCollector.class);
        switch (collectors.length) {
            case 0:
                return NO_OP_COLLECTOR;
            case 1:
                // Hand back the collector itself rather than paying for a delegating wrapper.
                return collectors[0];
            default:
                return new BucketCollector() {

                    @Override
                    public void collect(int docId, long bucketOrdinal) throws IOException {
                        for (BucketCollector collector : collectors) {
                            collector.collect(docId, bucketOrdinal);
                        }
                    }

                    @Override
                    public void setNextReader(AtomicReaderContext reader) {
                        for (BucketCollector collector : collectors) {
                            collector.setNextReader(reader);
                        }
                    }

                    @Override
                    public void postCollection() throws IOException {
                        // An exception from one collector propagates immediately; the remaining
                        // collectors do not get their callback.
                        for (BucketCollector collector : collectors) {
                            collector.postCollection();
                        }
                    }

                };
        }
    }

    /**
     * Called during the query phase, to collect &amp; aggregate the given document.
     *
     * @param docId         The document to be collected/aggregated
     * @param bucketOrdinal The ordinal of the bucket this aggregator belongs to, assuming this aggregator is not a top level aggregator.
     *                      Typically, aggregators with {@code #bucketAggregationMode} set to {@link BucketAggregationMode#MULTI_BUCKETS}
     *                      will heavily depend on this ordinal. Other aggregators may or may not use it and can see this ordinal as just
     *                      an extra information for the aggregation context. For top level aggregators, the ordinal will always be
     *                      equal to 0.
     * @throws IOException if the implementation fails while collecting the document
     */
    public abstract void collect(int docId, long bucketOrdinal) throws IOException;

    /**
     * Post collection callback.
     *
     * @throws IOException if the implementation fails while finishing collection
     */
    public abstract void postCollection() throws IOException;

}
Expand Up @@ -27,9 +27,7 @@
import org.elasticsearch.search.aggregations.support.AggregationContext;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/**
*
Expand All @@ -38,19 +36,10 @@ public abstract class BucketsAggregator extends Aggregator {

private LongArray docCounts;

private final Aggregator[] collectableSugAggregators;

public BucketsAggregator(String name, BucketAggregationMode bucketAggregationMode, AggregatorFactories factories,
long estimatedBucketsCount, AggregationContext context, Aggregator parent) {
super(name, bucketAggregationMode, factories, estimatedBucketsCount, context, parent);
docCounts = bigArrays.newLongArray(estimatedBucketsCount, true);
List<Aggregator> collectables = new ArrayList<>(subAggregators.length);
for (int i = 0; i < subAggregators.length; i++) {
if (subAggregators[i].shouldCollect()) {
collectables.add((subAggregators[i]));
}
}
collectableSugAggregators = collectables.toArray(new Aggregator[collectables.size()]);
}

/**
Expand All @@ -73,9 +62,7 @@ protected final void collectBucket(int doc, long bucketOrd) throws IOException {
*/
protected final void collectExistingBucket(int doc, long bucketOrd) throws IOException {
docCounts.increment(bucketOrd, 1);
for (int i = 0; i < collectableSugAggregators.length; i++) {
collectableSugAggregators[i].collect(doc, bucketOrd);
}
collectBucketNoCounts(doc, bucketOrd);
}

public LongArray getDocCounts() {
Expand All @@ -86,9 +73,7 @@ public LongArray getDocCounts() {
* Utility method to collect the given doc in the given bucket but not to update the doc counts of the bucket
*/
protected final void collectBucketNoCounts(int doc, long bucketOrd) throws IOException {
for (int i = 0; i < collectableSugAggregators.length; i++) {
collectableSugAggregators[i].collect(doc, bucketOrd);
}
collectableSugAggregators.collect(doc, bucketOrd);
}

/**
Expand Down
Expand Up @@ -121,7 +121,7 @@ public boolean acceptsDocsOutOfOrder() {
}

@Override
public abstract void postCollection();
public abstract void postCollection() throws IOException;
}

/**
Expand Down
10 changes: 5 additions & 5 deletions src/main/java/org/elasticsearch/search/facet/FacetPhase.java
Expand Up @@ -184,14 +184,14 @@ public void execute(SearchContext context) throws ElasticsearchException {
}
try {
context.searcher().search(query, MultiCollector.wrap(entry.getValue().toArray(new Collector[entry.getValue().size()])));
for (Collector collector : entry.getValue()) {
if (collector instanceof XCollector) {
((XCollector) collector).postCollection();
}
}
} catch (Exception e) {
throw new QueryPhaseExecutionException(context, "Failed to execute global facets", e);
}
for (Collector collector : entry.getValue()) {
if (collector instanceof XCollector) {
((XCollector) collector).postCollection();
}
}
}
}

Expand Down
Expand Up @@ -171,7 +171,7 @@ public Collector(org.apache.lucene.search.Collector collector, Filter parentFilt
}

@Override
public void postCollection() {
public void postCollection() throws IOException {
if (collector instanceof XCollector) {
((XCollector) collector).postCollection();
}
Expand Down
Expand Up @@ -88,7 +88,7 @@ public void setNextReader(AtomicReaderContext context) throws IOException {
}

@Override
public void postCollection() {
public void postCollection() throws IOException {
if (collector instanceof XCollector) {
((XCollector) collector).postCollection();
}
Expand Down