Skip to content

Commit

Permalink
Adds aggregation profiling (not including reduce phase)
Browse files Browse the repository at this point in the history
Adds aggregation profiling, initially only for the shard phases (i.e. the reduce phase will not be profiled in this change)

This change refactors the query profiling class to extract abstract classes where it is useful for other profiler types to share code.
  • Loading branch information
colings86 committed Jun 10, 2016
1 parent 439b2a9 commit 1d76177
Show file tree
Hide file tree
Showing 27 changed files with 1,430 additions and 472 deletions.
Expand Up @@ -36,7 +36,6 @@
import org.elasticsearch.search.suggest.Suggest;

import java.io.IOException;
import java.util.List;
import java.util.Map;

import static org.elasticsearch.action.search.ShardSearchFailure.readShardSearchFailure;
Expand Down Expand Up @@ -169,7 +168,7 @@ public void scrollId(String scrollId) {
*
* @return The profile results or an empty map
*/
public @Nullable Map<String, List<ProfileShardResult>> getProfileResults() {
public @Nullable Map<String, ProfileShardResult> getProfileResults() {
return internalResponse.profile();
}

Expand Down
Expand Up @@ -27,6 +27,8 @@
import org.elasticsearch.search.aggregations.support.AggregationContext;
import org.elasticsearch.search.aggregations.support.AggregationPath;
import org.elasticsearch.search.aggregations.support.AggregationPath.PathElement;
import org.elasticsearch.search.profile.Profilers;
import org.elasticsearch.search.profile.aggregation.ProfilingAggregator;

import java.io.IOException;
import java.util.ArrayList;
Expand Down Expand Up @@ -81,7 +83,12 @@ public Aggregator[] createSubAggregators(Aggregator parent) throws IOException {
// propagate the fact that only bucket 0 will be collected with single-bucket
// aggs
final boolean collectsFromSingleBucket = false;
aggregators[i] = factories[i].create(parent, collectsFromSingleBucket);
Aggregator factory = factories[i].create(parent, collectsFromSingleBucket);
Profilers profilers = factory.context().searchContext().getProfilers();
if (profilers != null) {
factory = new ProfilingAggregator(factory, profilers.getAggregationProfiler());
}
aggregators[i] = factory;
}
return aggregators;
}
Expand All @@ -92,7 +99,12 @@ public Aggregator[] createTopLevelAggregators() throws IOException {
for (int i = 0; i < factories.length; i++) {
// top-level aggs only get called with bucket 0
final boolean collectsFromSingleBucket = true;
aggregators[i] = factories[i].create(null, collectsFromSingleBucket);
Aggregator factory = factories[i].create(null, collectsFromSingleBucket);
Profilers profilers = factory.context().searchContext().getProfilers();
if (profilers != null) {
factory = new ProfilingAggregator(factory, profilers.getAggregationProfiler());
}
aggregators[i] = factory;
}
return aggregators;
}
Expand Down
Expand Up @@ -28,13 +28,139 @@
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
import org.elasticsearch.search.aggregations.support.AggregationContext;
import org.elasticsearch.search.internal.SearchContext.Lifetime;

import java.io.IOException;
import java.util.List;
import java.util.Map;

public abstract class AggregatorFactory<AF extends AggregatorFactory<AF>> {

/**
 * An {@link Aggregator} that adapts aggregators which only support collecting
 * owning-bucket ordinal 0 so they can be used in a multi-bucket context: for
 * each owning bucket ordinal a dedicated delegate aggregator is lazily created
 * via {@code factory.create(parent, true)}, and every delegate is always
 * collected with bucket ordinal 0.
 */
public static final class MultiBucketAggregatorWrapper extends Aggregator {
    private final BigArrays bigArrays;
    private final Aggregator parent;
    private final AggregatorFactory<?> factory;
    // Delegate for bucket 0; also used to answer name()/context()/parent()/needsScores().
    private final Aggregator first;
    // Lazily grown, indexed by owning bucket ordinal; unused slots are null.
    ObjectArray<Aggregator> aggregators;
    // Per-bucket leaf collectors for the current segment; reset in getLeafCollector().
    ObjectArray<LeafBucketCollector> collectors;

    MultiBucketAggregatorWrapper(BigArrays bigArrays, AggregationContext context, Aggregator parent, AggregatorFactory<?> factory,
            Aggregator first) {
        this.bigArrays = bigArrays;
        this.parent = parent;
        this.factory = factory;
        this.first = first;
        // Register so the big arrays (and the delegates they hold) are released
        // when this lifetime scope ends (Lifetime.PHASE).
        context.searchContext().addReleasable(this, Lifetime.PHASE);
        aggregators = bigArrays.newObjectArray(1);
        aggregators.set(0, first);
        collectors = bigArrays.newObjectArray(1);
    }

    /** Returns the concrete class of the wrapped (bucket-0) delegate. */
    public Class<?> getWrappedClass() {
        return first.getClass();
    }

    @Override
    public String name() {
        return first.name();
    }

    @Override
    public AggregationContext context() {
        return first.context();
    }

    @Override
    public Aggregator parent() {
        return first.parent();
    }

    @Override
    public boolean needsScores() {
        return first.needsScores();
    }

    @Override
    public Aggregator subAggregator(String name) {
        // Sub-aggregators of the delegates are not addressable through the wrapper.
        throw new UnsupportedOperationException();
    }

    @Override
    public void preCollection() throws IOException {
        // Forward to every delegate created so far (slots may be null).
        for (long i = 0; i < aggregators.size(); ++i) {
            final Aggregator aggregator = aggregators.get(i);
            if (aggregator != null) {
                aggregator.preCollection();
            }
        }
    }

    @Override
    public void postCollection() throws IOException {
        // Forward to every delegate created so far (slots may be null).
        for (long i = 0; i < aggregators.size(); ++i) {
            final Aggregator aggregator = aggregators.get(i);
            if (aggregator != null) {
                aggregator.postCollection();
            }
        }
    }

    @Override
    public LeafBucketCollector getLeafCollector(final LeafReaderContext ctx) {
        // Leaf collectors are segment-specific: drop the ones cached for the
        // previous segment before handing out a collector for this one.
        for (long i = 0; i < collectors.size(); ++i) {
            collectors.set(i, null);
        }
        return new LeafBucketCollector() {
            Scorer scorer;

            @Override
            public void setScorer(Scorer scorer) throws IOException {
                // Remember the scorer so it can be propagated to delegate
                // collectors created lazily in collect().
                this.scorer = scorer;
            }

            @Override
            public void collect(int doc, long bucket) throws IOException {
                collectors = bigArrays.grow(collectors, bucket + 1);

                LeafBucketCollector collector = collectors.get(bucket);
                if (collector == null) {
                    aggregators = bigArrays.grow(aggregators, bucket + 1);
                    Aggregator aggregator = aggregators.get(bucket);
                    if (aggregator == null) {
                        // Lazily create a single-bucket delegate for this owning bucket.
                        aggregator = factory.create(parent, true);
                        aggregator.preCollection();
                        aggregators.set(bucket, aggregator);
                    }
                    collector = aggregator.getLeafCollector(ctx);
                    collector.setScorer(scorer);
                    collectors.set(bucket, collector);
                }
                // Delegates only ever see bucket ordinal 0.
                collector.collect(doc, 0);
            }

        };
    }

    @Override
    public InternalAggregation buildAggregation(long bucket) throws IOException {
        // Each delegate holds its data under ordinal 0, so translate here.
        if (bucket < aggregators.size()) {
            Aggregator aggregator = aggregators.get(bucket);
            if (aggregator != null) {
                return aggregator.buildAggregation(0);
            }
        }
        return buildEmptyAggregation();
    }

    @Override
    public InternalAggregation buildEmptyAggregation() {
        return first.buildEmptyAggregation();
    }

    @Override
    public void close() {
        Releasables.close(aggregators, collectors);
    }
}

protected final String name;
protected final Type type;
protected final AggregatorFactory<?> parent;
Expand Down Expand Up @@ -112,120 +238,7 @@ protected static Aggregator asMultiBucketAggregator(final AggregatorFactory<?> f
final Aggregator parent) throws IOException {
final Aggregator first = factory.create(parent, true);
final BigArrays bigArrays = context.bigArrays();
return new Aggregator() {

ObjectArray<Aggregator> aggregators;
ObjectArray<LeafBucketCollector> collectors;

{
context.searchContext().addReleasable(this, Lifetime.PHASE);
aggregators = bigArrays.newObjectArray(1);
aggregators.set(0, first);
collectors = bigArrays.newObjectArray(1);
}

@Override
public String name() {
return first.name();
}

@Override
public AggregationContext context() {
return first.context();
}

@Override
public Aggregator parent() {
return first.parent();
}

@Override
public boolean needsScores() {
return first.needsScores();
}

@Override
public Aggregator subAggregator(String name) {
throw new UnsupportedOperationException();
}

@Override
public void preCollection() throws IOException {
for (long i = 0; i < aggregators.size(); ++i) {
final Aggregator aggregator = aggregators.get(i);
if (aggregator != null) {
aggregator.preCollection();
}
}
}

@Override
public void postCollection() throws IOException {
for (long i = 0; i < aggregators.size(); ++i) {
final Aggregator aggregator = aggregators.get(i);
if (aggregator != null) {
aggregator.postCollection();
}
}
}

@Override
public LeafBucketCollector getLeafCollector(final LeafReaderContext ctx) {
for (long i = 0; i < collectors.size(); ++i) {
collectors.set(i, null);
}
return new LeafBucketCollector() {
Scorer scorer;

@Override
public void setScorer(Scorer scorer) throws IOException {
this.scorer = scorer;
}

@Override
public void collect(int doc, long bucket) throws IOException {
aggregators = bigArrays.grow(aggregators, bucket + 1);
collectors = bigArrays.grow(collectors, bucket + 1);

LeafBucketCollector collector = collectors.get(bucket);
if (collector == null) {
Aggregator aggregator = aggregators.get(bucket);
if (aggregator == null) {
aggregator = factory.create(parent, true);
aggregator.preCollection();
aggregators.set(bucket, aggregator);
}
collector = aggregator.getLeafCollector(ctx);
collector.setScorer(scorer);
collectors.set(bucket, collector);
}
collector.collect(doc, 0);
}

};
}

@Override
public InternalAggregation buildAggregation(long bucket) throws IOException {
if (bucket < aggregators.size()) {
Aggregator aggregator = aggregators.get(bucket);
if (aggregator != null) {
return aggregator.buildAggregation(0);
}
}
return buildEmptyAggregation();
}

@Override
public InternalAggregation buildEmptyAggregation() {
return first.buildEmptyAggregation();
}

@Override
public void close() {
Releasables.close(aggregators, collectors);
}
};
return new MultiBucketAggregatorWrapper(bigArrays, context, parent, factory, first);
}

}
Expand Up @@ -35,7 +35,6 @@
import org.elasticsearch.common.util.ObjectArray;
import org.elasticsearch.index.fielddata.SortedBinaryDocValues;
import org.elasticsearch.index.fielddata.SortedNumericDoubleValues;
import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.aggregations.Aggregator;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.LeafBucketCollector;
Expand Down
Expand Up @@ -51,8 +51,9 @@
import org.elasticsearch.search.internal.InternalSearchHit;
import org.elasticsearch.search.internal.InternalSearchHits;
import org.elasticsearch.search.internal.InternalSearchResponse;
import org.elasticsearch.search.profile.SearchProfileShardResults;
import org.elasticsearch.search.profile.ProfileShardResult;
import org.elasticsearch.search.profile.SearchProfileShardResults;
import org.elasticsearch.search.profile.query.QueryProfileShardResult;
import org.elasticsearch.search.query.QuerySearchResult;
import org.elasticsearch.search.query.QuerySearchResultProvider;
import org.elasticsearch.search.suggest.Suggest;
Expand Down Expand Up @@ -407,7 +408,7 @@ public InternalSearchResponse merge(ScoreDoc[] sortedDocs, AtomicArray<? extends
//Collect profile results
SearchProfileShardResults shardResults = null;
if (!queryResults.isEmpty() && firstResult.profileResults() != null) {
Map<String, List<ProfileShardResult>> profileResults = new HashMap<>(queryResults.size());
Map<String, ProfileShardResult> profileResults = new HashMap<>(queryResults.size());
for (AtomicArray.Entry<? extends QuerySearchResultProvider> entry : queryResults) {
String key = entry.value.queryResult().shardTarget().toString();
profileResults.put(key, entry.value.queryResult().profileResults());
Expand Down
Expand Up @@ -122,7 +122,7 @@ public Weight createWeight(Query query, boolean needsScores) throws IOException
weight = super.createWeight(query, needsScores);
} finally {
profile.stopAndRecordTime();
profiler.pollLastQuery();
profiler.pollLastElement();
}
return new ProfileWeight(query, weight, profile);
} else {
Expand Down
Expand Up @@ -28,13 +28,12 @@
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.aggregations.Aggregations;
import org.elasticsearch.search.aggregations.InternalAggregations;
import org.elasticsearch.search.profile.SearchProfileShardResults;
import org.elasticsearch.search.profile.ProfileShardResult;
import org.elasticsearch.search.profile.SearchProfileShardResults;
import org.elasticsearch.search.suggest.Suggest;

import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Map;

import static org.elasticsearch.search.internal.InternalSearchHits.readSearchHits;
Expand Down Expand Up @@ -99,7 +98,7 @@ public Suggest suggest() {
*
* @return Profile results
*/
public Map<String, List<ProfileShardResult>> profile() {
public Map<String, ProfileShardResult> profile() {
if (profileResults == null) {
return Collections.emptyMap();
}
Expand Down

0 comments on commit 1d76177

Please sign in to comment.