Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Delegation of nextReader calls #6477

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ abstract class QueryCollector extends Collector {
List<Aggregator> aggregatorCollectors = new ArrayList<>();
Aggregator[] aggregators = context.aggregations().factories().createTopLevelAggregators(aggregationContext);
for (int i = 0; i < aggregators.length; i++) {
aggregators[i].setNextReader(context.searcher().getIndexReader().getContext());
if (!(aggregators[i] instanceof GlobalAggregator)) {
Aggregator aggregator = aggregators[i];
if (aggregator.shouldCollect()) {
Expand All @@ -107,7 +108,6 @@ abstract class QueryCollector extends Collector {
if (!aggregatorCollectors.isEmpty()) {
facetAggCollectorBuilder.add(new AggregationPhase.AggregationsCollector(aggregatorCollectors, aggregationContext));
}
aggregationContext.setNextReader(context.searcher().getIndexReader().getContext());
}
facetAndAggregatorCollector = facetAggCollectorBuilder.build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ public void preProcess(SearchContext context) {
List<Aggregator> collectors = new ArrayList<>();
Aggregator[] aggregators = context.aggregations().factories().createTopLevelAggregators(aggregationContext);
for (int i = 0; i < aggregators.length; i++) {
aggregators[i].setNextReader(context.searcher().getIndexReader().getContext());
if (!(aggregators[i] instanceof GlobalAggregator)) {
Aggregator aggregator = aggregators[i];
if (aggregator.shouldCollect()) {
Expand All @@ -89,7 +90,6 @@ public void preProcess(SearchContext context) {
if (!collectors.isEmpty()) {
context.searcher().addMainQueryCollector(new AggregationsCollector(collectors, aggregationContext));
}
aggregationContext.setNextReader(context.searcher().getIndexReader().getContext());
}
}

Expand Down Expand Up @@ -148,7 +148,9 @@ public AggregationsCollector(Collection<Aggregator> collectors, AggregationConte

@Override
public void setScorer(Scorer scorer) throws IOException {
aggregationContext.setScorer(scorer);
for (Aggregator collector : collectors) {
collector.setScorer(scorer);
}
}

@Override
Expand All @@ -160,7 +162,9 @@ public void collect(int doc) throws IOException {

@Override
public void setNextReader(AtomicReaderContext context) throws IOException {
aggregationContext.setNextReader(context);
for (Aggregator collector : collectors) {
collector.setNextReader(context);
}
}

@Override
Expand Down
158 changes: 115 additions & 43 deletions src/main/java/org/elasticsearch/search/aggregations/Aggregator.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,14 @@
import com.google.common.base.Predicate;
import com.google.common.collect.Iterables;
import org.apache.lucene.index.AtomicReaderContext;
import org.apache.lucene.index.IndexReaderContext;
import org.apache.lucene.search.Scorer;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.lucene.ReaderContextAware;
import org.elasticsearch.common.lucene.ScorerAware;
import org.elasticsearch.common.lucene.TopReaderContextAware;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.search.aggregations.bucket.DeferringBucketCollector;
Expand Down Expand Up @@ -115,43 +119,6 @@ public static SubAggCollectionMode parse(String value, EnumSet<ParseField.Flag>
throw new ElasticsearchParseException("No " + COLLECT_MODE.getPreferredName() + " found for value [" + value + "]");
}
}

// A scorer used for the deferred collection mode to handle any child aggs asking for scores that are not
// recorded.
static final Scorer unavailableScorer=new Scorer(null){
private final String MSG = "A limitation of the " + SubAggCollectionMode.BREADTH_FIRST.parseField.getPreferredName()
+ " collection mode is that scores cannot be buffered along with document IDs";

@Override
public float score() throws IOException {
throw new ElasticsearchParseException(MSG);
}

@Override
public int freq() throws IOException {
throw new ElasticsearchParseException(MSG);
}

@Override
public int advance(int arg0) throws IOException {
throw new ElasticsearchParseException(MSG);
}

@Override
public long cost() {
throw new ElasticsearchParseException(MSG);
}

@Override
public int docID() {
throw new ElasticsearchParseException(MSG);
}

@Override
public int nextDoc() throws IOException {
throw new ElasticsearchParseException(MSG);
}};


protected final String name;
protected final Aggregator parent;
Expand All @@ -167,6 +134,13 @@ public int nextDoc() throws IOException {

private Map<String, Aggregator> subAggregatorbyName;
private DeferringBucketCollector recordingWrapper;
private List<ReaderContextAware> readerAwares = new ArrayList<>();
private List<TopReaderContextAware> topReaderAwares = new ArrayList<TopReaderContextAware>();
private List<ScorerAware> scorerAwares = new ArrayList<>();

private IndexReaderContext topReader;
private AtomicReaderContext reader;
private Scorer scorer;

/**
* Constructs a new Aggregator.
Expand Down Expand Up @@ -200,6 +174,10 @@ void badState(){
public void setNextReader(AtomicReaderContext reader) {
badState();
}
@Override
public void setNextReader(IndexReaderContext reader) {
badState();
}

@Override
public void postCollection() throws IOException {
Expand All @@ -215,6 +193,11 @@ public void collect(int docId, long bucketOrdinal) throws IOException {
public void gatherAnalysis(BucketAnalysisCollector results, long bucketOrdinal) {
badState();
}

@Override
public void setScorer(Scorer scorer) {
badState();
}
};
}
protected void preCollection() {
Expand All @@ -231,14 +214,104 @@ protected void preCollection() {
if (nextPassCollectors.size() > 0) {
BucketCollector deferreds = BucketCollector.wrap(nextPassCollectors);
recordingWrapper = new DeferringBucketCollector(deferreds, context);
// TODO. Without line below we are dependent on subclass aggs
// delegating setNextReader calls on to child aggs
// which they don't seem to do as a matter of course. Need to move
// to a delegation model rather than broadcast
context.registerReaderContextAware(recordingWrapper);
thisPassCollectors.add(recordingWrapper);
}
collectableSubAggregators = BucketCollector.wrap(thisPassCollectors);
registerReaderAware(collectableSubAggregators);
registerTopReaderAware(collectableSubAggregators);
registerScorerAware(collectableSubAggregators);
}

/**
 * Registers a component to be notified whenever a new segment reader is
 * set on this aggregator via {@link #setNextReader(AtomicReaderContext)}.
 * If a reader is already active, the newly registered component is
 * immediately brought up to date with it.
 *
 * @param readerAware the component to register; {@code null} is ignored
 */
public final void registerReaderAware(ReaderContextAware readerAware) {
    if (readerAware != null) {
        this.readerAwares.add(readerAware);
        // Replay the current reader inside the null-guard: the original code
        // called readerAware.setNextReader outside it, NPE-ing on null input.
        if (reader != null) {
            readerAware.setNextReader(reader);
        }
    }
}

/**
 * Records the new segment reader, broadcasts it to every registered
 * {@link ReaderContextAware}, and finally gives subclasses their hook
 * via {@link #doSetNextReader(AtomicReaderContext)}.
 */
@Override
public final void setNextReader(AtomicReaderContext reader) {
    this.reader = reader;
    for (int i = 0; i < readerAwares.size(); i++) {
        readerAwares.get(i).setNextReader(reader);
    }
    doSetNextReader(reader);
}

/**
 * @return the segment reader most recently passed to
 *         {@link #setNextReader(AtomicReaderContext)}, or {@code null} if none yet
 */
public final AtomicReaderContext currentReader() {
    return reader;
}

/**
 * Provides the opportunity to update state when a new reader is opened.
 * Invoked by {@link #setNextReader(AtomicReaderContext)} after all
 * registered {@link ReaderContextAware}s have been notified.
 *
 * @param reader the new reader context
 */
protected abstract void doSetNextReader(AtomicReaderContext reader);

/**
 * Registers a component to be notified whenever the top-level reader
 * context is set via {@link #setNextReader(IndexReaderContext)}. If a
 * top reader is already active, the newly registered component is
 * immediately brought up to date with it.
 *
 * @param topReaderAware the component to register; {@code null} is ignored
 */
public final void registerTopReaderAware(TopReaderContextAware topReaderAware) {
    if (topReaderAware != null) {
        this.topReaderAwares.add(topReaderAware);
        // Replay inside the null-guard: the original code dereferenced
        // topReaderAware outside it and would NPE on null input.
        if (topReader != null) {
            topReaderAware.setNextReader(topReader);
        }
    }
}

/**
 * Records the new top-level reader context, broadcasts it to every
 * registered {@link TopReaderContextAware}, and finally gives subclasses
 * their hook via {@link #doSetTopReader(IndexReaderContext)}.
 */
@Override
public final void setNextReader(IndexReaderContext topReader) {
    this.topReader = topReader;
    for (int i = 0; i < topReaderAwares.size(); i++) {
        topReaderAwares.get(i).setNextReader(topReader);
    }
    doSetTopReader(topReader);
}

/**
 * @return the top-level reader context most recently passed to
 *         {@link #setNextReader(IndexReaderContext)}, or {@code null} if none yet
 */
public final IndexReaderContext topReader() {
    return topReader;
}

/**
 * Provides the opportunity to update state when the top reader is set.
 * Invoked by {@link #setNextReader(IndexReaderContext)} after all
 * registered {@link TopReaderContextAware}s have been notified.
 * Default implementation is a no-op.
 *
 * @param reader the new top-level reader context
 */
protected void doSetTopReader(IndexReaderContext reader) {

}

/**
 * Registers a component to be notified whenever a new {@link Scorer} is
 * set via {@link #setScorer(Scorer)}. If a scorer is already active, the
 * newly registered component is immediately brought up to date with it.
 *
 * @param scorerAware the component to register; {@code null} is ignored
 */
public final void registerScorerAware(ScorerAware scorerAware) {
    if (scorerAware != null) {
        this.scorerAwares.add(scorerAware);
        // Fix: guard on the scorer, not the reader. The original checked
        // `reader != null`, which could replay a null scorer (if a reader was
        // set first) or skip replay entirely (scorer set before any reader).
        if (scorer != null) {
            scorerAware.setScorer(scorer);
        }
    }
}

/**
 * Records the new scorer, broadcasts it to every registered
 * {@link ScorerAware}, and finally gives subclasses their hook via
 * {@link #doSetScorer(Scorer)}.
 */
@Override
public final void setScorer(Scorer scorer) {
    this.scorer = scorer;
    for (int i = 0; i < scorerAwares.size(); i++) {
        scorerAwares.get(i).setScorer(scorer);
    }
    doSetScorer(scorer);
}

/**
 * @return the scorer most recently passed to {@link #setScorer(Scorer)},
 *         or {@code null} if none yet
 */
public final Scorer currentScorer() {
    return scorer;
}

/**
 * Provides the opportunity to update state when a new scorer is opened.
 * Invoked by {@link #setScorer(Scorer)} after all registered
 * {@link ScorerAware}s have been notified. Default implementation is a no-op.
 *
 * @param scorer the new scorer
 */
protected void doSetScorer(Scorer scorer) {

}

/**
Expand All @@ -259,7 +332,6 @@ protected boolean shouldDefer(Aggregator aggregator) {
protected void runDeferredCollections(long... bucketOrds){
// Being lenient here - ignore calls where there are no deferred collections to playback
if (recordingWrapper != null) {
context.setScorer(unavailableScorer);
recordingWrapper.prepareSelectedBuckets(bucketOrds);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.elasticsearch.search.aggregations;

import org.apache.lucene.index.AtomicReaderContext;
import org.apache.lucene.index.IndexReaderContext;
import org.elasticsearch.ElasticsearchIllegalArgumentException;
import org.elasticsearch.ElasticsearchIllegalStateException;
import org.elasticsearch.common.lease.Releasables;
Expand Down Expand Up @@ -49,11 +50,8 @@ private AggregatorFactories(AggregatorFactory[] factories) {
this.factories = factories;
}

private static Aggregator createAndRegisterContextAware(AggregationContext context, AggregatorFactory factory, Aggregator parent, long estimatedBucketsCount) {
private static Aggregator create(AggregationContext context, AggregatorFactory factory, Aggregator parent, long estimatedBucketsCount) {
final Aggregator aggregator = factory.create(context, parent, estimatedBucketsCount);
if (aggregator.shouldCollect()) {
context.registerReaderContextAware(aggregator);
}
// Once the aggregator is fully constructed perform any initialisation -
// can't do everything in constructors if Aggregator base class needs
// to delegate to subclasses as part of construction.
Expand All @@ -68,7 +66,7 @@ public Aggregator[] createSubAggregators(Aggregator parent, final long estimated
Aggregator[] aggregators = new Aggregator[count()];
for (int i = 0; i < factories.length; ++i) {
final AggregatorFactory factory = factories[i];
final Aggregator first = createAndRegisterContextAware(parent.context(), factory, parent, estimatedBucketsCount);
final Aggregator first = create(parent.context(), factory, parent, estimatedBucketsCount);
if (first.bucketAggregationMode() == BucketAggregationMode.MULTI_BUCKETS) {
// This aggregator already supports multiple bucket ordinals, can be used directly
aggregators[i] = first;
Expand Down Expand Up @@ -108,14 +106,31 @@ public void collect(int doc, long owningBucketOrdinal) throws IOException {
aggregators = bigArrays.grow(aggregators, owningBucketOrdinal + 1);
Aggregator aggregator = aggregators.get(owningBucketOrdinal);
if (aggregator == null) {
aggregator = createAndRegisterContextAware(parent.context(), factory, parent, estimatedBucketsCount);
aggregator = create(parent.context(), factory, parent, estimatedBucketsCount);
aggregator.setNextReader(currentReader());
aggregators.set(owningBucketOrdinal, aggregator);
}
aggregator.collect(doc, 0);
}

@Override
public void setNextReader(AtomicReaderContext reader) {
public void doSetNextReader(AtomicReaderContext reader) {
for (int i = 0 ; i < aggregators.size(); i++) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe use a foreach loop here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

aggregators is an ObjectArray which does not implement Iterable, although maybe it should?

@jpountz any reason why it couldn't?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's a common theme across the codebase: we don't create iterators (explicitly or implicitly) unless we need to.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oh I thought that is an array from line 66 hmmm

Aggregator aggregator = aggregators.get(i);
if (aggregator != null) {
aggregator.setNextReader(reader);
}
}
}

@Override
public void doSetTopReader(IndexReaderContext reader) {
for (int i = 0 ; i < aggregators.size(); i++) {
Aggregator aggregator = aggregators.get(i);
if (aggregator != null) {
aggregator.setNextReader(reader);
}
}
}

@Override
Expand Down Expand Up @@ -154,7 +169,7 @@ public Aggregator[] createTopLevelAggregators(AggregationContext ctx) {
// These aggregators are going to be used with a single bucket ordinal, no need to wrap the PER_BUCKET ones
Aggregator[] aggregators = new Aggregator[factories.length];
for (int i = 0; i < factories.length; i++) {
aggregators[i] = createAndRegisterContextAware(ctx, factories[i], null, 0);
aggregators[i] = create(ctx, factories[i], null, 0);
}
return aggregators;
}
Expand Down
Loading