Skip to content

Commit

Permalink
Aggs: Refactor aggregations to use lucene5-style collectors.
Browse files Browse the repository at this point in the history
Aggregators now return a new collector instance per segment, like Lucene 5 does
with its oal.search.Collector API. This is important for us because things like
knowing whether the field is single or multi-valued is only known at a segment
level.

In order to do that I had to change aggregators to notify their sub aggregators
of new incoming segments (pretty much in the spirit of #6477) while everything
used to be centralized in the AggregationContext class. While this might slow
down deeply nested aggregation trees a bit, it also makes the `children`
aggregation and the `breadth_first` collection mode much better options, since
they can now replay only what they need instead of having to replay the whole
aggregation tree.

I also took advantage of this big refactoring to remove some abstractions that
were not really required, such as ValuesSource.MetaData and BucketAnalysisCollector.
I also split Aggregator into Aggregator and AggregatorBase in order to
separate the Aggregator API from implementation helpers.

Close #9544
  • Loading branch information
jpountz committed Feb 12, 2015
1 parent d1deb6b commit de41981
Show file tree
Hide file tree
Showing 78 changed files with 2,248 additions and 2,655 deletions.
11 changes: 7 additions & 4 deletions src/main/java/org/elasticsearch/common/geo/GeoDistance.java
Expand Up @@ -23,9 +23,13 @@
import org.apache.lucene.util.SloppyMath;
import org.elasticsearch.ElasticsearchIllegalArgumentException;
import org.elasticsearch.common.unit.DistanceUnit;
import org.elasticsearch.index.fielddata.*;
import org.elasticsearch.index.fielddata.FieldData;
import org.elasticsearch.index.fielddata.GeoPointValues;
import org.elasticsearch.index.fielddata.MultiGeoPointValues;
import org.elasticsearch.index.fielddata.NumericDoubleValues;
import org.elasticsearch.index.fielddata.SortedNumericDoubleValues;
import org.elasticsearch.index.fielddata.SortingNumericDoubleValues;

import java.util.List;
import java.util.Locale;

/**
Expand Down Expand Up @@ -398,8 +402,7 @@ public double get(int docID) {
@Override
public void setDocument(int doc) {
geoPointValues.setDocument(doc);
count = geoPointValues.count() * distances.length;
grow();
resize(geoPointValues.count() * distances.length);
int valueCounter = 0;
for (FixedSourceDistance distance : distances) {
for (int i = 0; i < geoPointValues.count(); ++i) {
Expand Down
61 changes: 59 additions & 2 deletions src/main/java/org/elasticsearch/common/lucene/Lucene.java
Expand Up @@ -24,8 +24,31 @@
import org.apache.lucene.codecs.CodecUtil;
import org.apache.lucene.codecs.DocValuesFormat;
import org.apache.lucene.codecs.PostingsFormat;
import org.apache.lucene.index.*;
import org.apache.lucene.search.*;
import org.apache.lucene.index.CorruptIndexException;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexCommit;
import org.apache.lucene.index.IndexFormatTooNewException;
import org.apache.lucene.index.IndexFormatTooOldException;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.SegmentInfos;
import org.apache.lucene.search.Collector;
import org.apache.lucene.search.ComplexExplanation;
import org.apache.lucene.search.ConstantScoreQuery;
import org.apache.lucene.search.Explanation;
import org.apache.lucene.search.FieldDoc;
import org.apache.lucene.search.Filter;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.LeafCollector;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.ScoreDoc;
import org.apache.lucene.search.Scorer;
import org.apache.lucene.search.SimpleCollector;
import org.apache.lucene.search.SortField;
import org.apache.lucene.search.TimeLimitingCollector;
import org.apache.lucene.search.TopDocs;
import org.apache.lucene.search.TopFieldDocs;
import org.apache.lucene.search.TotalHitCountCollector;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.IndexInput;
Expand All @@ -34,6 +57,7 @@
import org.apache.lucene.util.Version;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchIllegalArgumentException;
import org.elasticsearch.ElasticsearchIllegalStateException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Strings;
Expand Down Expand Up @@ -579,4 +603,37 @@ public static Version parse(String toParse, Version defaultValue) {
return defaultValue;
}
}

/**
 * Returns a {@link Scorer} that throws an {@link ElasticsearchIllegalStateException}
 * with the given {@code message} on every operation ({@code score}, {@code freq},
 * {@code advance}, {@code cost}, {@code docID}, {@code nextDoc}).
 *
 * NOTE(review): presumably used as a placeholder where the API requires a Scorer
 * that must never actually be invoked — confirm at call sites.
 *
 * @param message the exception message used by every thrown exception
 * @return a scorer whose every method throws
 */
public static Scorer illegalScorer(final String message) {
    // null Weight: this scorer is never legitimately used, so no weight is needed.
    return new Scorer(null) {
        @Override
        public float score() throws IOException {
            throw new ElasticsearchIllegalStateException(message);
        }
        @Override
        public int freq() throws IOException {
            throw new ElasticsearchIllegalStateException(message);
        }
        @Override
        public int advance(int arg0) throws IOException {
            throw new ElasticsearchIllegalStateException(message);
        }
        @Override
        public long cost() {
            throw new ElasticsearchIllegalStateException(message);
        }
        @Override
        public int docID() {
            throw new ElasticsearchIllegalStateException(message);
        }
        @Override
        public int nextDoc() throws IOException {
            throw new ElasticsearchIllegalStateException(message);
        }
    };
}
}

This file was deleted.

2 changes: 1 addition & 1 deletion src/main/java/org/elasticsearch/common/util/LongHash.java
Expand Up @@ -52,7 +52,7 @@ public long get(long id) {
}

/**
* Get the id associated with <code>key</code>
* Get the id associated with <code>key</code> or -1 if the key is not contained in the hash.
*/
public long find(long key) {
final long slot = slot(hash(key), mask);
Expand Down
Expand Up @@ -28,7 +28,7 @@
*/
public abstract class SortingNumericDoubleValues extends SortedNumericDoubleValues {

protected int count;
private int count;
protected double[] values;
private final Sorter sorter;

Expand All @@ -51,9 +51,11 @@ protected int compare(int i, int j) {
}

/**
* Make sure the {@link #values} array can store at least {@link #count} entries.
* Set the {@link #count()} and ensure that the {@link #values} array can
* store at least that many entries.
*/
protected final void grow() {
protected final void resize(int newSize) {
count = newSize;
values = ArrayUtil.grow(values, count);
}

Expand All @@ -65,10 +67,12 @@ protected final void sort() {
sorter.sort(0, count);
}

@Override
public final int count() {
    // Number of values for the current document; set by the most recent
    // resize(int) call (see resize above).
    return count;
}

@Override
public final double valueAt(int index) {
    // No explicit bounds check: presumably callers pass index < count();
    // slots beyond count may hold stale data from a previous document.
    return values[index];
}
Expand Down
Expand Up @@ -772,11 +772,7 @@ private void queryBasedPercolating(Engine.Searcher percolatorSearcher, Percolate
percolatorTypeFilter = context.indexService().cache().filter().cache(percolatorTypeFilter, null, context.indexService().queryParserService().autoFilterCachePolicy());
FilteredQuery query = new FilteredQuery(context.percolateQuery(), percolatorTypeFilter);
percolatorSearcher.searcher().search(query, percolateCollector);
for (Collector queryCollector : percolateCollector.aggregatorCollector) {
if (queryCollector instanceof XCollector) {
((XCollector) queryCollector).postCollection();
}
}
percolateCollector.aggregatorCollector.postCollection();
if (context.aggregations() != null) {
aggregationPhase.execute(context);
}
Expand Down
52 changes: 19 additions & 33 deletions src/main/java/org/elasticsearch/percolator/QueryCollector.java
Expand Up @@ -19,20 +19,27 @@
package org.elasticsearch.percolator;

import com.carrotsearch.hppc.FloatArrayList;
import com.google.common.collect.ImmutableList;

import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.search.*;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.LeafCollector;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.Scorer;
import org.apache.lucene.search.SimpleCollector;
import org.apache.lucene.search.TopDocs;
import org.apache.lucene.search.TopScoreDocCollector;
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.lucene.search.XCollector;
import org.elasticsearch.index.fielddata.IndexFieldData;
import org.elasticsearch.index.fielddata.SortedBinaryDocValues;
import org.elasticsearch.index.mapper.FieldMapper;
import org.elasticsearch.index.mapper.internal.IdFieldMapper;
import org.elasticsearch.index.query.ParsedQuery;
import org.elasticsearch.index.search.nested.NonNestedDocsFilter;
import org.elasticsearch.search.aggregations.AggregationPhase;
import org.elasticsearch.search.aggregations.Aggregator;
import org.elasticsearch.search.aggregations.BucketCollector;
import org.elasticsearch.search.aggregations.bucket.global.GlobalAggregator;
import org.elasticsearch.search.aggregations.support.AggregationContext;
import org.elasticsearch.search.highlight.HighlightField;
Expand All @@ -59,9 +66,8 @@ abstract class QueryCollector extends SimpleCollector {

SortedBinaryDocValues values;

final List<Collector> aggregatorCollector;

List<LeafCollector> aggregatorLeafCollectors;
final XCollector aggregatorCollector;
LeafCollector aggregatorLeafCollector;

QueryCollector(ESLogger logger, PercolateContext context, boolean isNestedDoc) throws IOException {
this.logger = logger;
Expand All @@ -71,63 +77,43 @@ abstract class QueryCollector extends SimpleCollector {
this.idFieldData = context.fieldData().getForField(idMapper);
this.isNestedDoc = isNestedDoc;

ImmutableList.Builder<Collector> aggCollectorBuilder = ImmutableList.builder();
List<Aggregator> aggregatorCollectors = new ArrayList<>();

if (context.aggregations() != null) {
AggregationContext aggregationContext = new AggregationContext(context);
context.aggregations().aggregationContext(aggregationContext);

List<Aggregator> aggregatorCollectors = new ArrayList<>();
Aggregator[] aggregators = context.aggregations().factories().createTopLevelAggregators(aggregationContext);
for (int i = 0; i < aggregators.length; i++) {
if (!(aggregators[i] instanceof GlobalAggregator)) {
Aggregator aggregator = aggregators[i];
if (aggregator.shouldCollect()) {
aggregatorCollectors.add(aggregator);
}
aggregatorCollectors.add(aggregator);
}
}
context.aggregations().aggregators(aggregators);
if (!aggregatorCollectors.isEmpty()) {
aggCollectorBuilder.add(new AggregationPhase.AggregationsCollector(aggregatorCollectors, aggregationContext));
}
aggregationContext.setNextReader(context.searcher().getIndexReader().getContext());
}
aggregatorCollector = aggCollectorBuilder.build();
aggregatorLeafCollectors = new ArrayList<>(aggregatorCollector.size());
aggregatorCollector = BucketCollector.wrap(aggregatorCollectors);
}

public void postMatch(int doc) throws IOException {
for (LeafCollector collector : aggregatorLeafCollectors) {
collector.collect(doc);
}
aggregatorLeafCollector.collect(doc);
}

@Override
public void setScorer(Scorer scorer) throws IOException {
for (LeafCollector collector : aggregatorLeafCollectors) {
collector.setScorer(scorer);
}
aggregatorLeafCollector.setScorer(scorer);
}

@Override
public boolean needsScores() {
for (Collector collector : aggregatorCollector) {
if (collector.needsScores()) {
return true;
}
}
return false;
return aggregatorCollector.needsScores();
}

@Override
public void doSetNextReader(LeafReaderContext context) throws IOException {
// we use the UID because id might not be indexed
values = idFieldData.load(context).getBytesValues();
aggregatorLeafCollectors.clear();
for (Collector collector : aggregatorCollector) {
aggregatorLeafCollectors.add(collector.getLeafCollector(context));
}
aggregatorLeafCollector = aggregatorCollector.getLeafCollector(context);
}

static Match match(ESLogger logger, PercolateContext context, HighlightPhase highlightPhase, boolean isNestedDoc) throws IOException {
Expand Down

0 comments on commit de41981

Please sign in to comment.