Skip to content

Commit

Permalink
Aggregations: removed aggregations from ReduceContext
Browse files Browse the repository at this point in the history
ReduceContext contains the list of aggregations to reduce but these aggregations are set as null half of the time. This change makes the reduce(ReduceContext) method changed to reduce(List<InternalAggregation>, ReduceContext) and ReduceContext now only holds the BigArrays and Script services.
  • Loading branch information
colings86 committed Apr 9, 2015
1 parent 3b41299 commit fcc09f6
Show file tree
Hide file tree
Showing 25 changed files with 88 additions and 70 deletions.
27 changes: 22 additions & 5 deletions src/main/java/org/elasticsearch/percolator/PercolatorService.java
Expand Up @@ -19,11 +19,19 @@
package org.elasticsearch.percolator;

import com.carrotsearch.hppc.ByteObjectOpenHashMap;

import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.ReaderUtil;
import org.apache.lucene.index.memory.ExtendedMemoryIndex;
import org.apache.lucene.index.memory.MemoryIndex;
import org.apache.lucene.search.*;
import org.apache.lucene.search.BooleanClause;
import org.apache.lucene.search.ConstantScoreQuery;
import org.apache.lucene.search.Filter;
import org.apache.lucene.search.FilteredQuery;
import org.apache.lucene.search.MatchAllDocsQuery;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.ScoreDoc;
import org.apache.lucene.search.TopDocs;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.CloseableThreadLocal;
import org.elasticsearch.ElasticsearchException;
Expand Down Expand Up @@ -58,14 +66,21 @@
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.fielddata.IndexFieldData;
import org.elasticsearch.index.fielddata.SortedBinaryDocValues;
import org.elasticsearch.index.mapper.*;
import org.elasticsearch.index.mapper.DocumentMapper;
import org.elasticsearch.index.mapper.FieldMapper;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.mapper.ParsedDocument;
import org.elasticsearch.index.mapper.Uid;
import org.elasticsearch.index.mapper.internal.UidFieldMapper;
import org.elasticsearch.index.percolator.stats.ShardPercolateService;
import org.elasticsearch.index.query.ParsedQuery;
import org.elasticsearch.index.search.nested.NonNestedDocsFilter;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.percolator.QueryCollector.*;
import org.elasticsearch.percolator.QueryCollector.Count;
import org.elasticsearch.percolator.QueryCollector.Match;
import org.elasticsearch.percolator.QueryCollector.MatchAndScore;
import org.elasticsearch.percolator.QueryCollector.MatchAndSort;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.search.SearchParseElement;
import org.elasticsearch.search.SearchShardTarget;
Expand All @@ -83,7 +98,9 @@
import java.util.Map;

import static org.elasticsearch.index.mapper.SourceToParse.source;
import static org.elasticsearch.percolator.QueryCollector.*;
import static org.elasticsearch.percolator.QueryCollector.count;
import static org.elasticsearch.percolator.QueryCollector.match;
import static org.elasticsearch.percolator.QueryCollector.matchAndScore;

public class PercolatorService extends AbstractComponent {

Expand Down Expand Up @@ -834,7 +851,7 @@ private InternalAggregations reduceAggregations(List<PercolateShardResponse> sha
for (PercolateShardResponse shardResult : shardResults) {
aggregationsList.add(shardResult.aggregations());
}
return InternalAggregations.reduce(aggregationsList, new ReduceContext(null, bigArrays, scriptService));
return InternalAggregations.reduce(aggregationsList, new ReduceContext(bigArrays, scriptService));
}

}
Expand Up @@ -18,7 +18,6 @@
*/
package org.elasticsearch.search.aggregations;

import org.elasticsearch.Version;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.StreamInput;
Expand Down Expand Up @@ -89,20 +88,14 @@ public String toString() {

public static class ReduceContext {

private final List<InternalAggregation> aggregations;
private final BigArrays bigArrays;
private ScriptService scriptService;

public ReduceContext(List<InternalAggregation> aggregations, BigArrays bigArrays, ScriptService scriptService) {
this.aggregations = aggregations;
public ReduceContext(BigArrays bigArrays, ScriptService scriptService) {
this.bigArrays = bigArrays;
this.scriptService = scriptService;
}

public List<InternalAggregation> aggregations() {
return aggregations;
}

public BigArrays bigArrays() {
return bigArrays;
}
Expand Down Expand Up @@ -146,7 +139,7 @@ public String getName() {
* try reusing an existing get instance (typically the first in the given list) to save on redundant object
* construction.
*/
public abstract InternalAggregation reduce(ReduceContext reduceContext);
public abstract InternalAggregation reduce(List<InternalAggregation> aggregations, ReduceContext reduceContext);

@Override
public Object getProperty(String path) {
Expand Down
Expand Up @@ -169,7 +169,7 @@ public static InternalAggregations reduce(List<InternalAggregations> aggregation
for (Map.Entry<String, List<InternalAggregation>> entry : aggByName.entrySet()) {
List<InternalAggregation> aggregations = entry.getValue();
InternalAggregation first = aggregations.get(0); // the list can't be empty as it's created on demand
reducedAggregations.add(first.reduce(new InternalAggregation.ReduceContext(aggregations, context.bigArrays(), context.scriptService())));
reducedAggregations.add(first.reduce(aggregations, context));
}
return new InternalAggregations(reducedAggregations);
}
Expand Down
Expand Up @@ -69,8 +69,7 @@ public InternalAggregations getAggregations() {
protected abstract InternalSingleBucketAggregation newAggregation(String name, long docCount, InternalAggregations subAggregations);

@Override
public InternalAggregation reduce(ReduceContext reduceContext) {
List<InternalAggregation> aggregations = reduceContext.aggregations();
public InternalAggregation reduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
long docCount = 0L;
List<InternalAggregations> subAggregationsList = new ArrayList<>(aggregations.size());
for (InternalAggregation aggregation : aggregations) {
Expand Down
Expand Up @@ -191,8 +191,7 @@ public Bucket getBucketByKey(String key) {
}

@Override
public InternalAggregation reduce(ReduceContext reduceContext) {
List<InternalAggregation> aggregations = reduceContext.aggregations();
public InternalAggregation reduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
List<List<Bucket>> bucketsList = null;
for (InternalAggregation aggregation : aggregations) {
InternalFilters filters = (InternalFilters) aggregation;
Expand Down
Expand Up @@ -188,8 +188,7 @@ public List<GeoHashGrid.Bucket> getBuckets() {
}

@Override
public InternalGeoHashGrid reduce(ReduceContext reduceContext) {
List<InternalAggregation> aggregations = reduceContext.aggregations();
public InternalGeoHashGrid reduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {

LongObjectPagedHashMap<List<Bucket>> buckets = null;
for (InternalAggregation aggregation : aggregations) {
Expand Down
Expand Up @@ -297,8 +297,7 @@ private static class IteratorAndCurrent<B> {

}

private List<B> reduceBuckets(ReduceContext reduceContext) {
List<InternalAggregation> aggregations = reduceContext.aggregations();
private List<B> reduceBuckets(List<InternalAggregation> aggregations, ReduceContext reduceContext) {

final PriorityQueue<IteratorAndCurrent<B>> pq = new PriorityQueue<IteratorAndCurrent<B>>(aggregations.size()) {
@Override
Expand Down Expand Up @@ -412,8 +411,8 @@ private void addEmptyBuckets(List<B> list) {
}

@Override
public InternalAggregation reduce(ReduceContext reduceContext) {
List<B> reducedBuckets = reduceBuckets(reduceContext);
public InternalAggregation reduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
List<B> reducedBuckets = reduceBuckets(aggregations, reduceContext);

// adding empty buckets if needed
if (minDocCount == 0) {
Expand Down
Expand Up @@ -260,8 +260,7 @@ public List<B> getBuckets() {
}

@Override
public InternalAggregation reduce(ReduceContext reduceContext) {
List<InternalAggregation> aggregations = reduceContext.aggregations();
public InternalAggregation reduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
@SuppressWarnings("unchecked")
List<Bucket>[] rangeList = new List[ranges.size()];
for (int i = 0; i < rangeList.length; ++i) {
Expand Down
Expand Up @@ -156,8 +156,7 @@ public SignificantTerms.Bucket getBucketByKey(String term) {
}

@Override
public InternalAggregation reduce(ReduceContext reduceContext) {
List<InternalAggregation> aggregations = reduceContext.aggregations();
public InternalAggregation reduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {

long globalSubsetSize = 0;
long globalSupersetSize = 0;
Expand Down
Expand Up @@ -26,7 +26,6 @@
import org.elasticsearch.search.aggregations.bucket.significant.heuristics.JLHScore;

import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -68,10 +67,10 @@ public Type type() {
}

@Override
public InternalAggregation reduce(ReduceContext reduceContext) {
for (InternalAggregation aggregation : reduceContext.aggregations()) {
public InternalAggregation reduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
for (InternalAggregation aggregation : aggregations) {
if (!(aggregation instanceof UnmappedSignificantTerms)) {
return aggregation.reduce(reduceContext);
return aggregation.reduce(aggregations, reduceContext);
}
}
return this;
Expand Down
Expand Up @@ -162,8 +162,7 @@ public long getSumOfOtherDocCounts() {
}

@Override
public InternalAggregation reduce(ReduceContext reduceContext) {
List<InternalAggregation> aggregations = reduceContext.aggregations();
public InternalAggregation reduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {

Multimap<Object, InternalTerms.Bucket> buckets = ArrayListMultimap.create();
long sumDocCountError = 0;
Expand Down
Expand Up @@ -81,10 +81,10 @@ protected void doWriteTo(StreamOutput out) throws IOException {
}

@Override
public InternalAggregation reduce(ReduceContext reduceContext) {
for (InternalAggregation agg : reduceContext.aggregations()) {
public InternalAggregation reduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
for (InternalAggregation agg : aggregations) {
if (!(agg instanceof UnmappedTerms)) {
return agg.reduce(reduceContext);
return agg.reduce(aggregations, reduceContext);
}
}
return this;
Expand Down
Expand Up @@ -29,6 +29,7 @@
import org.elasticsearch.search.aggregations.support.format.ValueFormatterStreams;

import java.io.IOException;
import java.util.List;
import java.util.Map;

/**
Expand Down Expand Up @@ -79,10 +80,10 @@ public Type type() {
}

@Override
public InternalAvg reduce(ReduceContext reduceContext) {
public InternalAvg reduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
long count = 0;
double sum = 0;
for (InternalAggregation aggregation : reduceContext.aggregations()) {
for (InternalAggregation aggregation : aggregations) {
count += ((InternalAvg) aggregation).count;
sum += ((InternalAvg) aggregation).sum;
}
Expand Down
Expand Up @@ -99,8 +99,7 @@ protected void doWriteTo(StreamOutput out) throws IOException {
}

@Override
public InternalAggregation reduce(ReduceContext reduceContext) {
List<InternalAggregation> aggregations = reduceContext.aggregations();
public InternalAggregation reduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
InternalCardinality reduced = null;
for (InternalAggregation aggregation : aggregations) {
final InternalCardinality cardinality = (InternalCardinality) aggregation;
Expand Down
Expand Up @@ -73,15 +73,15 @@ public Type type() {
}

@Override
public InternalAggregation reduce(ReduceContext reduceContext) {
public InternalAggregation reduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
double top = Double.NEGATIVE_INFINITY;
double bottom = Double.POSITIVE_INFINITY;
double posLeft = Double.POSITIVE_INFINITY;
double posRight = Double.NEGATIVE_INFINITY;
double negLeft = Double.POSITIVE_INFINITY;
double negRight = Double.NEGATIVE_INFINITY;

for (InternalAggregation aggregation : reduceContext.aggregations()) {
for (InternalAggregation aggregation : aggregations) {
InternalGeoBounds bounds = (InternalGeoBounds) aggregation;

if (bounds.top > top) {
Expand Down
Expand Up @@ -29,6 +29,7 @@
import org.elasticsearch.search.aggregations.support.format.ValueFormatterStreams;

import java.io.IOException;
import java.util.List;
import java.util.Map;

/**
Expand Down Expand Up @@ -77,9 +78,9 @@ public Type type() {
}

@Override
public InternalMax reduce(ReduceContext reduceContext) {
public InternalMax reduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
double max = Double.NEGATIVE_INFINITY;
for (InternalAggregation aggregation : reduceContext.aggregations()) {
for (InternalAggregation aggregation : aggregations) {
max = Math.max(max, ((InternalMax) aggregation).max);
}
return new InternalMax(name, max, valueFormatter, getMetaData());
Expand Down
Expand Up @@ -29,6 +29,7 @@
import org.elasticsearch.search.aggregations.support.format.ValueFormatterStreams;

import java.io.IOException;
import java.util.List;
import java.util.Map;

/**
Expand Down Expand Up @@ -78,9 +79,9 @@ public Type type() {
}

@Override
public InternalMin reduce(ReduceContext reduceContext) {
public InternalMin reduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
double min = Double.POSITIVE_INFINITY;
for (InternalAggregation aggregation : reduceContext.aggregations()) {
for (InternalAggregation aggregation : aggregations) {
min = Math.min(min, ((InternalMin) aggregation).min);
}
return new InternalMin(getName(), min, this.valueFormatter, getMetaData());
Expand Down
Expand Up @@ -60,8 +60,7 @@ public double value(String name) {
public abstract double value(double key);

@Override
public AbstractInternalPercentiles reduce(ReduceContext reduceContext) {
List<InternalAggregation> aggregations = reduceContext.aggregations();
public AbstractInternalPercentiles reduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
TDigestState merged = null;
for (InternalAggregation aggregation : aggregations) {
final AbstractInternalPercentiles percentiles = (AbstractInternalPercentiles) aggregation;
Expand Down
Expand Up @@ -82,13 +82,13 @@ public Object aggregation() {
}

@Override
public InternalAggregation reduce(ReduceContext reduceContext) {
public InternalAggregation reduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
List<Object> aggregationObjects = new ArrayList<>();
for (InternalAggregation aggregation : reduceContext.aggregations()) {
for (InternalAggregation aggregation : aggregations) {
InternalScriptedMetric mapReduceAggregation = (InternalScriptedMetric) aggregation;
aggregationObjects.add(mapReduceAggregation.aggregation());
}
InternalScriptedMetric firstAggregation = ((InternalScriptedMetric) reduceContext.aggregations().get(0));
InternalScriptedMetric firstAggregation = ((InternalScriptedMetric) aggregations.get(0));
Object aggregation;
if (firstAggregation.reduceScript != null) {
Map<String, Object> params;
Expand Down
Expand Up @@ -30,6 +30,7 @@
import org.elasticsearch.search.aggregations.support.format.ValueFormatterStreams;

import java.io.IOException;
import java.util.List;
import java.util.Map;

/**
Expand Down Expand Up @@ -148,12 +149,12 @@ public double value(String name) {
}

@Override
public InternalStats reduce(ReduceContext reduceContext) {
public InternalStats reduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
long count = 0;
double min = Double.POSITIVE_INFINITY;
double max = Double.NEGATIVE_INFINITY;
double sum = 0;
for (InternalAggregation aggregation : reduceContext.aggregations()) {
for (InternalAggregation aggregation : aggregations) {
InternalStats stats = (InternalStats) aggregation;
count += stats.getCount();
min = Math.min(min, stats.getMin());
Expand Down
Expand Up @@ -30,6 +30,7 @@
import org.elasticsearch.search.aggregations.support.format.ValueFormatter;

import java.io.IOException;
import java.util.List;
import java.util.Map;

/**
Expand Down Expand Up @@ -143,13 +144,13 @@ public String getStdDeviationBoundAsString(Bounds bound) {
}

@Override
public InternalExtendedStats reduce(ReduceContext reduceContext) {
public InternalExtendedStats reduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
double sumOfSqrs = 0;
for (InternalAggregation aggregation : reduceContext.aggregations()) {
for (InternalAggregation aggregation : aggregations) {
InternalExtendedStats stats = (InternalExtendedStats) aggregation;
sumOfSqrs += stats.getSumOfSquares();
}
final InternalStats stats = super.reduce(reduceContext);
final InternalStats stats = super.reduce(aggregations, reduceContext);
return new InternalExtendedStats(name, stats.getCount(), stats.getSum(), stats.getMin(), stats.getMax(), sumOfSqrs, sigma, valueFormatter, getMetaData());
}

Expand Down

0 comments on commit fcc09f6

Please sign in to comment.