Skip to content

Commit

Permalink
Remove abstraction in the percentiles aggregation.
Browse files Browse the repository at this point in the history
We initially added abstraction in the percentiles aggregation in order to be
able to plug in different percentiles estimators. However, only one of the 3
options that we looked into proved useful and I don't see us adding new
estimators in the future.

Moreover, because of this, we let the parser put unknown parameters into a hash
table in case these parameters would have meaning for a specific percentiles
estimator implementation. But this makes parsing error-prone: for example a user reported
that his percentiles aggregation returned extremely high values (in the order of
several millions while the maximum field value was `5`), and the reason was that
he had a typo and had written `fields` instead of `field`. As a consequence,
the percentiles aggregation used the parent value source which was a timestamp,
hence the large values. Parsing would now barf in case of an unknown parameter.

Close #5859
  • Loading branch information
jpountz committed Apr 24, 2014
1 parent b3e0e58 commit cb8139a
Show file tree
Hide file tree
Showing 8 changed files with 134 additions and 445 deletions.
Expand Up @@ -19,12 +19,15 @@
package org.elasticsearch.search.aggregations.metrics.percentiles;

import com.google.common.collect.UnmodifiableIterator;
import org.elasticsearch.ElasticsearchIllegalArgumentException;
import org.elasticsearch.Version;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.search.aggregations.AggregationStreams;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.metrics.MetricsAggregation;
import org.elasticsearch.search.aggregations.metrics.percentiles.tdigest.TDigestState;
import org.elasticsearch.search.aggregations.support.format.ValueFormatterStreams;

import java.io.IOException;
Expand All @@ -51,20 +54,22 @@ public static void registerStreams() {
AggregationStreams.registerStream(STREAM, TYPE.stream());
}

private PercentilesEstimator.Result result;
private double[] percents;
private TDigestState state;
private boolean keyed;

InternalPercentiles() {} // for serialization

public InternalPercentiles(String name, PercentilesEstimator.Result result, boolean keyed) {
public InternalPercentiles(String name, double[] percents, TDigestState state, boolean keyed) {
super(name);
this.result = result;
this.percents = percents;
this.state = state;
this.keyed = keyed;
}

@Override
public double value(String name) {
return result.estimate(Double.valueOf(name));
return percentile(Double.parseDouble(name));
}

@Override
Expand All @@ -74,53 +79,69 @@ public Type type() {

@Override
public double percentile(double percent) {
return result.estimate(percent);
return state.quantile(percent / 100);
}

@Override
public Iterator<Percentiles.Percentile> iterator() {
return new Iter(result);
return new Iter(percents, state);
}

@Override
public InternalPercentiles reduce(ReduceContext reduceContext) {
List<InternalAggregation> aggregations = reduceContext.aggregations();
InternalPercentiles first = (InternalPercentiles) aggregations.get(0);
if (aggregations.size() == 1) {
return first;
}
PercentilesEstimator.Result.Merger merger = first.result.merger(aggregations.size());
InternalPercentiles merged = null;
for (InternalAggregation aggregation : aggregations) {
merger.add(((InternalPercentiles) aggregation).result);
final InternalPercentiles percentiles = (InternalPercentiles) aggregation;
if (merged == null) {
merged = percentiles;
} else {
merged.state.add(percentiles.state);
}
}
first.result = merger.merge();
return first;
return merged;
}

@Override
public void readFrom(StreamInput in) throws IOException {
name = in.readString();
valueFormatter = ValueFormatterStreams.readOptional(in);
result = PercentilesEstimator.Streams.read(in);
if (in.getVersion().before(Version.V_1_2_0)) {
final byte id = in.readByte();
if (id != 0) {
throw new ElasticsearchIllegalArgumentException("Unexpected percentiles aggregator id [" + id + "]");
}
}
percents = new double[in.readInt()];
for (int i = 0; i < percents.length; ++i) {
percents[i] = in.readDouble();
}
state = TDigestState.read(in);
keyed = in.readBoolean();
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeString(name);
ValueFormatterStreams.writeOptional(valueFormatter, out);
PercentilesEstimator.Streams.write(result, out);
if (out.getVersion().before(Version.V_1_2_0)) {
out.writeByte((byte) 0);
}
out.writeInt(percents.length);
for (int i = 0 ; i < percents.length; ++i) {
out.writeDouble(percents[i]);
}
TDigestState.write(state, out);
out.writeBoolean(keyed);
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
double[] percents = result.percents;
if (keyed) {
builder.startObject(name);
for(int i = 0; i < percents.length; ++i) {
String key = String.valueOf(percents[i]);
double value = result.estimate(i);
double value = percentile(percents[i]);
builder.field(key, value);
if (valueFormatter != null) {
builder.field(key + "_as_string", valueFormatter.format(value));
Expand All @@ -130,7 +151,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
} else {
builder.startArray(name);
for (int i = 0; i < percents.length; i++) {
double value = result.estimate(i);
double value = percentile(percents[i]);
builder.startObject();
builder.field(CommonFields.KEY, percents[i]);
builder.field(CommonFields.VALUE, value);
Expand All @@ -146,22 +167,24 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws

public static class Iter extends UnmodifiableIterator<Percentiles.Percentile> {

private final PercentilesEstimator.Result result;
private final double[] percents;
private final TDigestState state;
private int i;

public Iter(PercentilesEstimator.Result estimator) {
this.result = estimator;
public Iter(double[] percents, TDigestState state) {
this.percents = percents;
this.state = state;
i = 0;
}

@Override
public boolean hasNext() {
return i < result.percents.length;
return i < percents.length;
}

@Override
public Percentiles.Percentile next() {
final Percentiles.Percentile next = new InnerPercentile(result.percents[i], result.estimate(i));
final Percentiles.Percentile next = new InnerPercentile(percents[i], state.quantile(percents[i] / 100));
++i;
return next;
}
Expand Down
Expand Up @@ -18,57 +18,13 @@
*/
package org.elasticsearch.search.aggregations.metrics.percentiles;

import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.search.aggregations.Aggregation;

import java.io.IOException;

/**
*
*/
public interface Percentiles extends Aggregation, Iterable<Percentiles.Percentile> {

/**
 * Client-side, builder-style configuration for the percentiles estimator used
 * by the {@code percentiles} aggregation. NOTE(review): this is legacy
 * abstraction being deleted in this commit — only the t-digest implementation
 * ever existed.
 */
public static abstract class Estimator {

// Factory for the only concrete estimator: t-digest.
public static TDigest tDigest() {
return new TDigest();
}

// Estimator name as serialized into the request body (e.g. "tdigest").
private final String type;

protected Estimator(String type) {
this.type = type;
}

// t-digest estimator configuration; its only tunable is `compression`.
public static class TDigest extends Estimator {

// Negative means "unset": paramsToXContent only emits values > 0.
protected double compression = -1;

TDigest() {
super("tdigest");
}

// Sets the compression parameter and returns this builder for chaining.
// Presumably an accuracy/memory trade-off of t-digest — confirm against TDigestState.
public TDigest compression(double compression) {
this.compression = compression;
return this;
}

// Emits the "compression" field only when it was explicitly set (> 0).
@Override
void paramsToXContent(XContentBuilder builder) throws IOException {
if (compression > 0) {
builder.field("compression", compression);
}
}
}

// Returns the serialized estimator name passed to the constructor.
String type() {
return type;
}

// Hook for subclasses to write their estimator-specific parameters
// into the request body.
abstract void paramsToXContent(XContentBuilder builder) throws IOException;

}

public static interface Percentile {

double getPercent();
Expand Down
Expand Up @@ -19,10 +19,14 @@
package org.elasticsearch.search.aggregations.metrics.percentiles;

import org.apache.lucene.index.AtomicReaderContext;
import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.util.ArrayUtils;
import org.elasticsearch.common.util.ObjectArray;
import org.elasticsearch.index.fielddata.DoubleValues;
import org.elasticsearch.search.aggregations.Aggregator;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.metrics.MetricsAggregator;
import org.elasticsearch.search.aggregations.metrics.percentiles.tdigest.TDigestState;
import org.elasticsearch.search.aggregations.support.AggregationContext;
import org.elasticsearch.search.aggregations.support.ValuesSource;
import org.elasticsearch.search.aggregations.support.ValuesSourceAggregatorFactory;
Expand All @@ -35,19 +39,27 @@
*/
public class PercentilesAggregator extends MetricsAggregator.MultiValue {

private static int indexOfPercent(double[] percents, double percent) {
return ArrayUtils.binarySearch(percents, percent, 0.001);
}

private final double[] percents;
private final ValuesSource.Numeric valuesSource;
private DoubleValues values;

private final PercentilesEstimator estimator;
private ObjectArray<TDigestState> states;
private final double compression;
private final boolean keyed;


public PercentilesAggregator(String name, long estimatedBucketsCount, ValuesSource.Numeric valuesSource, AggregationContext context,
Aggregator parent, PercentilesEstimator estimator, boolean keyed) {
Aggregator parent, double[] percents, double compression, boolean keyed) {
super(name, estimatedBucketsCount, context, parent);
this.valuesSource = valuesSource;
this.keyed = keyed;
this.estimator = estimator;
this.states = bigArrays.newObjectArray(estimatedBucketsCount);
this.percents = percents;
this.compression = compression;
}

@Override
Expand All @@ -61,65 +73,86 @@ public void setNextReader(AtomicReaderContext reader) {
}

@Override
public void collect(int doc, long owningBucketOrdinal) throws IOException {
public void collect(int doc, long bucketOrd) throws IOException {
states = bigArrays.grow(states, bucketOrd + 1);

TDigestState state = states.get(bucketOrd);
if (state == null) {
state = new TDigestState(compression);
states.set(bucketOrd, state);
}

final int valueCount = values.setDocument(doc);
for (int i = 0; i < valueCount; i++) {
estimator.offer(values.nextValue(), owningBucketOrdinal);
state.add(values.nextValue());
}
}

@Override
public boolean hasMetric(String name) {
return PercentilesEstimator.indexOfPercent(estimator.percents, Double.parseDouble(name)) >= 0;
return indexOfPercent(percents, Double.parseDouble(name)) >= 0;
}

private TDigestState getState(long bucketOrd) {
if (bucketOrd >= states.size()) {
return null;
}
final TDigestState state = states.get(bucketOrd);
return state;
}

@Override
public double metric(String name, long owningBucketOrd) {
return estimator.result(owningBucketOrd).estimate(Double.parseDouble(name));
public double metric(String name, long bucketOrd) {
TDigestState state = getState(bucketOrd);
if (state == null) {
return Double.NaN;
} else {
return state.quantile(Double.parseDouble(name) / 100);
}
}

@Override
public InternalAggregation buildAggregation(long owningBucketOrdinal) {
if (valuesSource == null) {
TDigestState state = getState(owningBucketOrdinal);
if (state == null) {
return buildEmptyAggregation();
} else {
return new InternalPercentiles(name, percents, state, keyed);
}
return new InternalPercentiles(name, estimator.result(owningBucketOrdinal), keyed);
}

@Override
public InternalAggregation buildEmptyAggregation() {
return new InternalPercentiles(name, estimator.emptyResult(), keyed);
return new InternalPercentiles(name, percents, new TDigestState(compression), keyed);
}

@Override
protected void doClose() {
estimator.close();
Releasables.close(states);
}

public static class Factory extends ValuesSourceAggregatorFactory.LeafOnly<ValuesSource.Numeric> {

private final PercentilesEstimator.Factory estimatorFactory;
private final double[] percents;
private final double compression;
private final boolean keyed;

public Factory(String name, ValuesSourceConfig<ValuesSource.Numeric> valuesSourceConfig,
double[] percents, PercentilesEstimator.Factory estimatorFactory, boolean keyed) {
double[] percents, double compression, boolean keyed) {
super(name, InternalPercentiles.TYPE.name(), valuesSourceConfig);
this.estimatorFactory = estimatorFactory;
this.percents = percents;
this.compression = compression;
this.keyed = keyed;
}

@Override
protected Aggregator createUnmapped(AggregationContext aggregationContext, Aggregator parent) {
return new PercentilesAggregator(name, 0, null, aggregationContext, parent, estimatorFactory.create(percents, 0, aggregationContext), keyed);
return new PercentilesAggregator(name, 0, null, aggregationContext, parent, percents, compression, keyed);
}

@Override
protected Aggregator create(ValuesSource.Numeric valuesSource, long expectedBucketsCount, AggregationContext aggregationContext, Aggregator parent) {
PercentilesEstimator estimator = estimatorFactory.create(percents, expectedBucketsCount, aggregationContext);
return new PercentilesAggregator(name, expectedBucketsCount, valuesSource, aggregationContext, parent, estimator, keyed);
return new PercentilesAggregator(name, expectedBucketsCount, valuesSource, aggregationContext, parent, percents, compression, keyed);
}
}

}

0 comments on commit cb8139a

Please sign in to comment.