Add support for histogram fields to rate aggregation #63289

Merged
@@ -4,7 +4,8 @@
 === Rate Aggregation
 
 A `rate` metrics aggregation can be used only inside a `date_histogram` and calculates a rate of documents or a field in each
-`date_histogram` bucket.
+`date_histogram` bucket. The field values can be generated by a provided script or extracted from specific numeric or
+<<histogram,histogram fields>> in the documents.
 
 ==== Syntax
 
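For illustration, a rate aggregation over a histogram field is built exactly like one over a numeric field; a minimal builder-API sketch mirroring the tests below, where the field names "timestamp" and "latency_histo" are hypothetical:

// Monthly rate of values stored in a histogram-typed field "latency_histo";
// both field names here are illustrative, not part of this change.
RateAggregationBuilder rate = new RateAggregationBuilder("my_rate")
    .rateUnit("month")
    .field("latency_histo");

DateHistogramAggregationBuilder dateHistogram = new DateHistogramAggregationBuilder("by_month")
    .field("timestamp")
    .calendarInterval(new DateHistogramInterval("month"))
    .subAggregation(rate);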
@@ -28,7 +28,7 @@
 import java.util.Map;
 import java.util.Objects;
 
-public class RateAggregationBuilder extends ValuesSourceAggregationBuilder.LeafOnly<ValuesSource.Numeric, RateAggregationBuilder> {
+public class RateAggregationBuilder extends ValuesSourceAggregationBuilder.LeafOnly<ValuesSource, RateAggregationBuilder> {
     public static final String NAME = "rate";
     public static final ParseField UNIT_FIELD = new ParseField("unit");
     public static final ValuesSourceRegistry.RegistryKey<RateAggregatorSupplier> REGISTRY_KEY = new ValuesSourceRegistry.RegistryKey<>(
@@ -11,6 +11,8 @@
 import org.elasticsearch.common.lease.Releasables;
 import org.elasticsearch.common.util.BigArrays;
 import org.elasticsearch.common.util.DoubleArray;
+import org.elasticsearch.index.fielddata.HistogramValue;
+import org.elasticsearch.index.fielddata.HistogramValues;
 import org.elasticsearch.index.fielddata.SortedNumericDoubleValues;
 import org.elasticsearch.search.DocValueFormat;
 import org.elasticsearch.search.aggregations.Aggregator;
@@ -23,13 +25,14 @@
 import org.elasticsearch.search.aggregations.support.ValuesSource;
 import org.elasticsearch.search.aggregations.support.ValuesSourceConfig;
 import org.elasticsearch.search.internal.SearchContext;
+import org.elasticsearch.xpack.analytics.aggregations.support.HistogramValuesSource;
 
 import java.io.IOException;
 import java.util.Map;
 
 public class RateAggregator extends NumericMetricsAggregator.SingleValue {
 
-    private final ValuesSource.Numeric valuesSource;
+    private final ValuesSource valuesSource;
     private final DocValueFormat format;
     private final Rounding.DateTimeUnit rateUnit;
     private final SizedBucketAggregator sizedBucketAggregator;
@@ -46,7 +49,7 @@ public RateAggregator(
         Map<String, Object> metadata
     ) throws IOException {
         super(name, context, parent, metadata);
-        this.valuesSource = (ValuesSource.Numeric) valuesSourceConfig.getValuesSource();
+        this.valuesSource = valuesSourceConfig.getValuesSource();
        this.format = valuesSourceConfig.format();
        if (valuesSource != null) {
            sums = context.bigArrays().newDoubleArray(1, true);
@@ -78,33 +81,53 @@ public ScoreMode scoreMode() {
     @Override
     public LeafBucketCollector getLeafCollector(LeafReaderContext ctx, final LeafBucketCollector sub) throws IOException {
         final BigArrays bigArrays = context.bigArrays();
-        final SortedNumericDoubleValues values = valuesSource.doubleValues(ctx);
         final CompensatedSum kahanSummation = new CompensatedSum(0, 0);
 
-        return new LeafBucketCollectorBase(sub, values) {
-            @Override
-            public void collect(int doc, long bucket) throws IOException {
-                sums = bigArrays.grow(sums, bucket + 1);
-                compensations = bigArrays.grow(compensations, bucket + 1);
-
-                if (values.advanceExact(doc)) {
-                    final int valuesCount = values.docValueCount();
-                    // Compute the sum of double values with Kahan summation algorithm which is more
-                    // accurate than naive summation.
-                    double sum = sums.get(bucket);
-                    double compensation = compensations.get(bucket);
-                    kahanSummation.reset(sum, compensation);
-
-                    for (int i = 0; i < valuesCount; i++) {
-                        double value = values.nextValue();
-                        kahanSummation.add(value);
-                    }
-
-                    compensations.set(bucket, kahanSummation.delta());
-                    sums.set(bucket, kahanSummation.value());
-                }
-            }
-        };
+        if (valuesSource instanceof HistogramValuesSource.Histogram) {

[Review comment from a Member]
Rather than using an instanceof check and having one aggregator that supports both ValuesSourceTypes, we should create a new aggregator for histogram types, and add a second mapping in the register aggregators method on the factory.

[Reply from the Contributor Author]
Oh, didn't think about that! Great idea.

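For illustration, that suggestion would replace the instanceof branch with two registry mappings, each backed by its own aggregator. A sketch against the registration code in this PR; the NumericRateAggregator and HistogramRateAggregator names are hypothetical:

// Hypothetical sketch of the reviewer's suggestion: one registry mapping per
// values-source type instead of a single aggregator that branches on instanceof.
static void registerAggregators(ValuesSourceRegistry.Builder builder) {
    builder.register(
        RateAggregationBuilder.REGISTRY_KEY,
        List.of(CoreValuesSourceType.NUMERIC, CoreValuesSourceType.BOOLEAN),
        NumericRateAggregator::new,   // hypothetical numeric-only aggregator
        true
    );
    builder.register(
        RateAggregationBuilder.REGISTRY_KEY,
        List.of(AnalyticsValuesSourceType.HISTOGRAM),
        HistogramRateAggregator::new, // hypothetical histogram-only aggregator
        true
    );
}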
+            final HistogramValues values = ((HistogramValuesSource.Histogram) valuesSource).getHistogramValues(ctx);
+            return new LeafBucketCollectorBase(sub, values) {
+                @Override
+                public void collect(int doc, long bucket) throws IOException {
+                    sums = bigArrays.grow(sums, bucket + 1);
+                    compensations = bigArrays.grow(compensations, bucket + 1);
+
+                    if (values.advanceExact(doc)) {
+                        final HistogramValue sketch = values.histogram();
+                        // Fold each value of the pre-aggregated histogram into the
+                        // bucket's running Kahan (compensated) sum.
+                        while (sketch.next()) {
+                            double sum = sums.get(bucket);
+                            double compensation = compensations.get(bucket);
+                            kahanSummation.reset(sum, compensation);
+                            kahanSummation.add(sketch.value());
+                            compensations.set(bucket, kahanSummation.delta());
+                            sums.set(bucket, kahanSummation.value());
+                        }
+                    }
+                }
+            };
+        } else {
+            final SortedNumericDoubleValues values = ((ValuesSource.Numeric) valuesSource).doubleValues(ctx);
+            return new LeafBucketCollectorBase(sub, values) {
+                @Override
+                public void collect(int doc, long bucket) throws IOException {
+                    sums = bigArrays.grow(sums, bucket + 1);
+                    compensations = bigArrays.grow(compensations, bucket + 1);
+
+                    if (values.advanceExact(doc)) {
+                        final int valuesCount = values.docValueCount();
+                        // Compute the sum of double values with Kahan summation algorithm which is more
+                        // accurate than naive summation.
+                        double sum = sums.get(bucket);
+                        double compensation = compensations.get(bucket);
+                        kahanSummation.reset(sum, compensation);
+
+                        for (int i = 0; i < valuesCount; i++) {
+                            double value = values.nextValue();
+                            kahanSummation.add(value);
+                        }
+
+                        compensations.set(bucket, kahanSummation.delta());
+                        sums.set(bucket, kahanSummation.value());
+                    }
+                }
+            };
+        }
     }
 
     @Override
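Both collector branches rely on CompensatedSum (reset/add/delta/value) to keep per-bucket sums accurate. As background, a standalone sketch of the Kahan summation idea those calls implement, assuming nothing from Elasticsearch:

// Standalone sketch (not the Elasticsearch CompensatedSum class) of Kahan
// summation: carry each addition's rounding error forward so it is not lost.
public class KahanDemo {
    public static void main(String[] args) {
        double naive = 0.0;
        double sum = 0.0;
        double compensation = 0.0;               // running low-order error
        for (int i = 0; i < 10_000_000; i++) {
            double value = 0.1;
            naive += value;
            double y = value - compensation;     // re-apply previously lost bits
            double t = sum + y;
            compensation = (t - sum) - y;        // error rounded off in this step
            sum = t;
        }
        // The exact result is 1_000_000; the naive sum drifts noticeably
        // further from it than the compensated sum does.
        System.out.println("naive: " + naive + ", compensated: " + sum);
    }
}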
@@ -19,6 +19,7 @@
 import org.elasticsearch.search.aggregations.support.ValuesSourceConfig;
 import org.elasticsearch.search.aggregations.support.ValuesSourceRegistry;
 import org.elasticsearch.search.internal.SearchContext;
+import org.elasticsearch.xpack.analytics.aggregations.support.AnalyticsValuesSourceType;
 
 import java.io.IOException;
 import java.util.List;
@@ -44,7 +45,7 @@ class RateAggregatorFactory extends ValuesSourceAggregatorFactory {
     static void registerAggregators(ValuesSourceRegistry.Builder builder) {
         builder.register(
             RateAggregationBuilder.REGISTRY_KEY,
-            List.of(CoreValuesSourceType.NUMERIC, CoreValuesSourceType.BOOLEAN),
+            List.of(CoreValuesSourceType.NUMERIC, AnalyticsValuesSourceType.HISTOGRAM),
             RateAggregator::new,
             true
         );
@@ -6,6 +6,22 @@
 
 package org.elasticsearch.xpack.analytics.rate;
 
+import static org.elasticsearch.xpack.analytics.AnalyticsTestsUtils.histogramFieldDocValues;
+import static org.hamcrest.Matchers.closeTo;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.hasSize;
+import static org.hamcrest.Matchers.instanceOf;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Consumer;
+import java.util.function.Function;
+
 import org.apache.lucene.document.Field;
 import org.apache.lucene.document.NumericDocValuesField;
 import org.apache.lucene.document.SortedNumericDocValuesField;
@@ -40,21 +56,7 @@
 import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
 import org.elasticsearch.search.lookup.LeafDocLookup;
 import org.elasticsearch.xpack.analytics.AnalyticsPlugin;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.function.Consumer;
-import java.util.function.Function;
-
-import static org.hamcrest.Matchers.closeTo;
-import static org.hamcrest.Matchers.equalTo;
-import static org.hamcrest.Matchers.hasSize;
-import static org.hamcrest.Matchers.instanceOf;
+import org.elasticsearch.xpack.analytics.mapper.HistogramFieldMapper;
 
 public class RateAggregatorTests extends AggregatorTestCase {
 
@@ -386,6 +388,67 @@ public void testFormatter() throws IOException {
         }, dateType, numType);
     }
 
+    public void testHistogramFieldMonthToMonth() throws IOException {
+        MappedFieldType histType = new HistogramFieldMapper.HistogramFieldType("val", Collections.emptyMap());
+        MappedFieldType dateType = dateFieldType(DATE_FIELD);
+        RateAggregationBuilder rateAggregationBuilder = new RateAggregationBuilder("my_rate").rateUnit("month").field("val");
+
+        DateHistogramAggregationBuilder dateHistogramAggregationBuilder = new DateHistogramAggregationBuilder("my_date").field(DATE_FIELD)
+            .calendarInterval(new DateHistogramInterval("month"))
+            .subAggregation(rateAggregationBuilder);
+
+        testCase(dateHistogramAggregationBuilder, new MatchAllDocsQuery(), iw -> {
+            iw.addDocument(doc("2010-03-01T00:00:00", histogramFieldDocValues("val", new double[] { 1, 2 })));
+            iw.addDocument(doc("2010-04-01T00:00:00", histogramFieldDocValues("val", new double[] { 3, 4 })));
+        }, (Consumer<InternalDateHistogram>) dh -> {
+            assertThat(dh.getBuckets(), hasSize(2));
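+            // The March histogram {1, 2} sums to 3 and the April one {3, 4} to 7; with
+            // one-month buckets and a "month" rate unit the rate equals the bucket's sum.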
+            assertThat(((InternalRate) dh.getBuckets().get(0).getAggregations().asList().get(0)).getValue(), closeTo(3.0, 0.000001));
+            assertThat(((InternalRate) dh.getBuckets().get(1).getAggregations().asList().get(0)).getValue(), closeTo(7.0, 0.000001));
+        }, dateType, histType);
+    }
+
+    public void testHistogramFieldMonthToYear() throws IOException {
+        MappedFieldType histType = new HistogramFieldMapper.HistogramFieldType("val", Collections.emptyMap());
+        MappedFieldType dateType = dateFieldType(DATE_FIELD);
+        RateAggregationBuilder rateAggregationBuilder = new RateAggregationBuilder("my_rate").rateUnit("month").field("val");
+
+        DateHistogramAggregationBuilder dateHistogramAggregationBuilder = new DateHistogramAggregationBuilder("my_date").field(DATE_FIELD)
+            .calendarInterval(new DateHistogramInterval("year"))
+            .subAggregation(rateAggregationBuilder);
+
+        testCase(dateHistogramAggregationBuilder, new MatchAllDocsQuery(), iw -> {
+            iw.addDocument(doc("2010-03-01T00:00:00", histogramFieldDocValues("val", new double[] { 1, 2 })));
+            iw.addDocument(doc("2010-04-01T00:00:00", histogramFieldDocValues("val", new double[] { 3, 4 })));
+        }, (Consumer<InternalDateHistogram>) dh -> {
+            assertThat(dh.getBuckets(), hasSize(1));
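+            // All histogram values (1 + 2 + 3 + 4 = 10) land in a single year bucket;
+            // scaled to a "month" rate unit the expected rate is 10 / 12.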
+            assertThat(((InternalRate) dh.getBuckets().get(0).getAggregations().asList().get(0)).getValue(), closeTo(10.0 / 12, 0.000001));
+        }, dateType, histType);
+    }
+
+    public void testFilterWithHistogramField() throws IOException {
+        MappedFieldType histType = new HistogramFieldMapper.HistogramFieldType("val", Collections.emptyMap());
+        MappedFieldType dateType = dateFieldType(DATE_FIELD);
+        MappedFieldType keywordType = new KeywordFieldMapper.KeywordFieldType("term");
+        RateAggregationBuilder rateAggregationBuilder = new RateAggregationBuilder("my_rate").rateUnit("month").field("val");
+
+        DateHistogramAggregationBuilder dateHistogramAggregationBuilder = new DateHistogramAggregationBuilder("my_date").field(DATE_FIELD)
+            .calendarInterval(new DateHistogramInterval("month"))
+            .subAggregation(rateAggregationBuilder);
+
+        testCase(dateHistogramAggregationBuilder, new TermQuery(new Term("term", "a")), iw -> {
+            iw.addDocument(doc("2010-03-01T00:00:00", histogramFieldDocValues("val", new double[] { 1, 2 }),
+                new StringField("term", "a", Field.Store.NO)));
+            iw.addDocument(doc("2010-04-01T00:00:00", histogramFieldDocValues("val", new double[] { 3 }),
+                new StringField("term", "a", Field.Store.NO)));
+            iw.addDocument(doc("2010-04-01T00:00:00", histogramFieldDocValues("val", new double[] { 4 }),
+                new StringField("term", "b", Field.Store.NO)));
+        }, (Consumer<InternalDateHistogram>) dh -> {
+            assertThat(dh.getBuckets(), hasSize(2));
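+            // Only documents with "term":"a" match the query: March sums to 1 + 2 = 3
+            // and April to 3; the "b" document's histogram is filtered out.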
+            assertThat(((InternalRate) dh.getBuckets().get(0).getAggregations().asList().get(0)).value(), closeTo(3.0, 0.000001));
+            assertThat(((InternalRate) dh.getBuckets().get(1).getAggregations().asList().get(0)).value(), closeTo(3.0, 0.000001));
+        }, dateType, histType, keywordType);
+    }
+
     private void testCase(
         Query query,
         String interval,