Adds support for the rate aggregation under a composite agg (#76992)
rate aggregation should support being a sub-aggregation
of a composite agg.

The catch is that the composite aggregation
must include a date histogram source. Other sources can be present,
but there must be exactly one date histogram source;
otherwise the rate aggregation does not know which
interval to compare its unit rate to.

closes #76988
benwtrent committed Sep 1, 2021
1 parent 6a14123 commit 100f222
Showing 7 changed files with 415 additions and 49 deletions.
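
As the commit message explains, a rate is the bucket's sum (or value count) divided by the bucket's span expressed in the requested unit, so the aggregation must know which date interval its enclosing bucket represents. A minimal sketch of that idea follows (the helper below is hypothetical, not code from this commit):

[source,java]
--------------------------------------------------
// Hypothetical illustration, not part of this commit. Under a composite aggregation the
// bucket span is only well defined when there is exactly one date_histogram source.
public final class PerUnitRateSketch {
    static double perUnitRate(double sumForBucket, double bucketSpanInUnits) {
        // e.g. a calendar-month bucket measured in days divides by 28, 29, 30 or 31
        return sumForBucket / bucketSpanInUnits;
    }
}
--------------------------------------------------
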
142 changes: 140 additions & 2 deletions docs/reference/aggregations/metrics/rate-aggregation.asciidoc
@@ -6,10 +6,12 @@
<titleabbrev>Rate</titleabbrev>
++++

A `rate` metrics aggregation can be used only inside a `date_histogram` and calculates a rate of documents or a field in each
`date_histogram` bucket. The field values can be generated extracted from specific numeric or
A `rate` metrics aggregation can be used only inside a `date_histogram` or `composite` aggregation. It calculates a rate of documents
or a field in each bucket. The field values can be extracted from specific numeric or
<<histogram,histogram fields>> in the documents.

NOTE: For `composite` aggregations, there must be exactly one `date_histogram` source for the `rate` aggregation to be supported.

==== Syntax

A `rate` aggregation looks like this in isolation:
@@ -167,6 +169,142 @@ The response will contain the average daily sale prices for each month.
--------------------------------------------------
// TESTRESPONSE[s/\.\.\./"took": $body.took,"timed_out": false,"_shards": $body._shards,"hits": $body.hits,/]

You can also take advantage of `composite` aggregations to calculate the average daily sale price for each item in
your inventory:

[source,console]
--------------------------------------------------
GET sales/_search?filter_path=aggregations&size=0
{
"aggs": {
"buckets": {
"composite": { <1>
"sources": [
{
"month": {
"date_histogram": { <2>
"field": "date",
"calendar_interval": "month"
}
}
},
{
"type": { <3>
"terms": {
"field": "type"
}
}
}
]
},
"aggs": {
"avg_price": {
"rate": {
"field": "price", <4>
"unit": "day" <5>
}
}
}
}
}
}
--------------------------------------------------
// TEST[setup:sales]
<1> Composite aggregation with a date histogram source
and a source for the item type.
<2> The date histogram source, grouping the data by month.
<3> The terms source, grouping the data by sale item type.
<4> Calculate the sum of all sale prices, per month and item type.
<5> Convert the sum into the average daily sale price per item type.

The response will contain the average daily sale prices for each month per item.

[source,console-result]
--------------------------------------------------
{
"aggregations" : {
"buckets" : {
"after_key" : {
"month" : 1425168000000,
"type" : "t-shirt"
},
"buckets" : [
{
"key" : {
"month" : 1420070400000,
"type" : "bag"
},
"doc_count" : 1,
"avg_price" : {
"value" : 4.838709677419355
}
},
{
"key" : {
"month" : 1420070400000,
"type" : "hat"
},
"doc_count" : 1,
"avg_price" : {
"value" : 6.451612903225806
}
},
{
"key" : {
"month" : 1420070400000,
"type" : "t-shirt"
},
"doc_count" : 1,
"avg_price" : {
"value" : 6.451612903225806
}
},
{
"key" : {
"month" : 1422748800000,
"type" : "hat"
},
"doc_count" : 1,
"avg_price" : {
"value" : 1.7857142857142858
}
},
{
"key" : {
"month" : 1422748800000,
"type" : "t-shirt"
},
"doc_count" : 1,
"avg_price" : {
"value" : 0.35714285714285715
}
},
{
"key" : {
"month" : 1425168000000,
"type" : "hat"
},
"doc_count" : 1,
"avg_price" : {
"value" : 6.451612903225806
}
},
{
"key" : {
"month" : 1425168000000,
"type" : "t-shirt"
},
"doc_count" : 1,
"avg_price" : {
"value" : 5.645161290322581
}
}
]
}
}
}
--------------------------------------------------
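
For example, the January 2015 `bag` bucket (key `1420070400000`) reports an `avg_price` of `4.838709677419355`: the bucket's total sale price of `150.0` divided by the 31 days that the monthly bucket spans. The `unit` of `day` is what turns each monthly sum into a daily rate.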

By adding the `mode` parameter with the value `value_count`, we can change the calculation from `sum` to the number of values of the field.

@@ -33,9 +33,11 @@
import org.apache.lucene.util.Bits;
import org.apache.lucene.util.RoaringDocIdSet;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.common.Rounding;
import org.elasticsearch.core.Releasables;
import org.elasticsearch.index.IndexSortConfig;
import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.aggregations.AggregationExecutionException;
import org.elasticsearch.search.aggregations.Aggregator;
import org.elasticsearch.search.aggregations.AggregatorFactories;
import org.elasticsearch.search.aggregations.BucketCollector;
@@ -46,6 +48,7 @@
import org.elasticsearch.search.aggregations.MultiBucketCollector;
import org.elasticsearch.search.aggregations.MultiBucketConsumerService;
import org.elasticsearch.search.aggregations.bucket.BucketsAggregator;
import org.elasticsearch.search.aggregations.bucket.histogram.SizedBucketAggregator;
import org.elasticsearch.search.aggregations.support.AggregationContext;
import org.elasticsearch.search.searchafter.SearchAfterBuilder;
import org.elasticsearch.search.sort.SortAndFormats;
Expand All @@ -61,7 +64,7 @@

import static org.elasticsearch.search.aggregations.MultiBucketConsumerService.MAX_BUCKET_SETTING;

final class CompositeAggregator extends BucketsAggregator {
public final class CompositeAggregator extends BucketsAggregator implements SizedBucketAggregator {
private final int size;
private final List<String> sourceNames;
private final int[] reverseMuls;
@@ -71,6 +74,7 @@ final class CompositeAggregator extends BucketsAggregator {
private final CompositeValuesSourceConfig[] sourceConfigs;
private final SingleDimensionValuesSource<?>[] sources;
private final CompositeValuesCollectorQueue queue;
private final DateHistogramValuesSource[] innerSizedBucketAggregators;

private final List<Entry> entries = new ArrayList<>();
private LeafReaderContext currentLeaf;
@@ -111,14 +115,19 @@ final class CompositeAggregator extends BucketsAggregator {
);
}
this.sourceConfigs = sourceConfigs;
List<DateHistogramValuesSource> dateHistogramValuesSources = new ArrayList<>();
for (int i = 0; i < sourceConfigs.length; i++) {
this.sources[i] = sourceConfigs[i].createValuesSource(
context.bigArrays(),
context.searcher().getIndexReader(),
size,
this::addRequestCircuitBreakerBytes
);
if (this.sources[i] instanceof DateHistogramValuesSource) {
dateHistogramValuesSources.add((DateHistogramValuesSource) this.sources[i]);
}
}
this.innerSizedBucketAggregators = dateHistogramValuesSources.toArray(new DateHistogramValuesSource[0]);
this.queue = new CompositeValuesCollectorQueue(context.bigArrays(), sources, size);
if (rawAfterKey != null) {
try {
@@ -547,6 +556,30 @@ public void collect(int doc, long zeroBucket) throws IOException {
};
}

@Override
public double bucketSize(long bucket, Rounding.DateTimeUnit unit) {
if (innerSizedBucketAggregators.length != 1) {
throw new AggregationExecutionException(
"aggregation ["
+ name()
+ "] does not have exactly one date_histogram value source; exactly one is required when using with rate aggregation"
);
}
return innerSizedBucketAggregators[0].bucketSize(bucket, unit);
}

@Override
public double bucketSize(Rounding.DateTimeUnit unit) {
if (innerSizedBucketAggregators.length != 1) {
throw new AggregationExecutionException(
"aggregation ["
+ name()
+ "] does not have exactly one date_histogram value source; exactly one is required when using with rate aggregation"
);
}
return innerSizedBucketAggregators[0].bucketSize(unit);
}

private static class Entry {
final LeafReaderContext context;
final DocIdSet docIdSet;
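
Note that the "exactly one `date_histogram` value source" requirement is enforced inside `bucketSize(...)` rather than at construction time: a composite aggregation with zero or several date histogram sources still builds and collects normally, and the `AggregationExecutionException` above is thrown only when a sub-aggregation such as `rate` actually asks the composite for a bucket size.
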
@@ -0,0 +1,56 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.search.aggregations.bucket.composite;

import org.elasticsearch.common.Rounding;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.index.mapper.MappedFieldType;
import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.aggregations.bucket.histogram.SizedBucketAggregator;

/**
* A {@link SingleDimensionValuesSource} for date histogram values.
*/
public class DateHistogramValuesSource extends LongValuesSource implements SizedBucketAggregator {
private final RoundingValuesSource preparedRounding;

DateHistogramValuesSource(
BigArrays bigArrays,
MappedFieldType fieldType,
RoundingValuesSource roundingValuesSource,
DocValueFormat format,
boolean missingBucket,
int size,
int reverseMul
) {
super(bigArrays, fieldType, roundingValuesSource::longValues, roundingValuesSource::round, format, missingBucket, size, reverseMul);
this.preparedRounding = roundingValuesSource;
}

@Override
public double bucketSize(long bucket, Rounding.DateTimeUnit unitSize) {
if (unitSize != null) {
Long value = toComparable((int) bucket);
assert value != null : "unexpected null value in composite agg bucket [" + (int) bucket + "]";
return preparedRounding.roundingSize(value, unitSize);
} else {
return 1.0;
}
}

@Override
public double bucketSize(Rounding.DateTimeUnit unitSize) {
if (unitSize != null) {
return preparedRounding.roundingSize(unitSize);
} else {
return 1.0;
}
}

}
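
`DateHistogramValuesSource.bucketSize` measures the composite date histogram's rounding interval in the requested unit by delegating to `preparedRounding.roundingSize(...)`. A rough illustration of the expected behaviour, assuming the `Rounding` API works as the calls above suggest (this snippet is not part of the commit):

[source,java]
--------------------------------------------------
import org.elasticsearch.common.Rounding;

// Illustration only, not part of this commit: measuring a calendar-month bucket in days.
public class RoundingSizeSketch {
    public static void main(String[] args) {
        Rounding monthly = Rounding.builder(Rounding.DateTimeUnit.MONTH_OF_YEAR).build();
        long january2015 = 1420070400000L; // 2015-01-01T00:00:00Z, a bucket key from the docs example above
        // Expected: 31.0, the number of days spanned by the January bucket, i.e. the
        // divisor a day-unit rate aggregation would use for that bucket.
        double days = monthly.prepareForUnknown().roundingSize(january2015, Rounding.DateTimeUnit.DAY_OF_MONTH);
        System.out.println(days);
    }
}
--------------------------------------------------
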
@@ -255,11 +255,10 @@ public static void register(ValuesSourceRegistry.Builder builder) {
LongConsumer addRequestCircuitBreakerBytes,
CompositeValuesSourceConfig compositeValuesSourceConfig) -> {
final RoundingValuesSource roundingValuesSource = (RoundingValuesSource) compositeValuesSourceConfig.valuesSource();
return new LongValuesSource(
return new DateHistogramValuesSource(
bigArrays,
compositeValuesSourceConfig.fieldType(),
roundingValuesSource::longValues,
roundingValuesSource::round,
roundingValuesSource,
compositeValuesSourceConfig.format(),
compositeValuesSourceConfig.missingBucket(),
size,
@@ -44,6 +44,14 @@ public long round(long value) {
return rounding.round(value);
}

public double roundingSize(long milliSeconds, Rounding.DateTimeUnit unit) {
return rounding.roundingSize(milliSeconds, unit);
}

public double roundingSize(Rounding.DateTimeUnit unit) {
return rounding.roundingSize(unit);
}

@Override
public SortedNumericDocValues longValues(LeafReaderContext context) throws IOException {
SortedNumericDocValues values = vs.longValues(context);
@@ -66,7 +66,10 @@ private SizedBucketAggregator findSizedBucketAncestor() {
}
}
if (sizedBucketAggregator == null) {
throw new IllegalArgumentException("The rate aggregation can only be used inside a date histogram");
throw new IllegalArgumentException(
"The rate aggregation can only be used inside a date histogram aggregation or "
+ "composite aggregation with one date histogram value source"
);
}
return sizedBucketAggregator;
}
@@ -109,4 +112,5 @@ public InternalAggregation buildEmptyAggregation() {
public void doClose() {
Releasables.close(sums, compensations);
}

}
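
For context, the rate aggregator finds its divisor by walking up its parent aggregators until it reaches a `SizedBucketAggregator`; with this commit a `CompositeAggregator` satisfies that search in addition to a date histogram aggregator. The collapsed `findSizedBucketAncestor` body above follows roughly this pattern, shown here as a hedged sketch rather than the verbatim source:

[source,java]
--------------------------------------------------
import org.elasticsearch.search.aggregations.Aggregator;
import org.elasticsearch.search.aggregations.bucket.histogram.SizedBucketAggregator;

// Hedged sketch of the ancestor search; not the verbatim Elasticsearch source.
final class AncestorSearchSketch {
    static SizedBucketAggregator findSizedBucketAncestor(Aggregator rateAggregator) {
        for (Aggregator parent = rateAggregator.parent(); parent != null; parent = parent.parent()) {
            if (parent instanceof SizedBucketAggregator) {
                // Either a date_histogram aggregator or, after this commit, a CompositeAggregator.
                return (SizedBucketAggregator) parent;
            }
        }
        throw new IllegalArgumentException(
            "The rate aggregation can only be used inside a date histogram aggregation or "
                + "composite aggregation with one date histogram value source"
        );
    }
}
--------------------------------------------------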
