[ML] adds new change_point pipeline aggregation (#83428)
Adds a new `change_point` sibling pipeline aggregation.

This aggregation detects a change point in the values produced by a multi-bucket aggregation.

Example:
```
POST kibana_sample_data_flights/_search
{
  "size": 0,
  "aggs": {
    "histo": {
      "date_histogram": {
        "field": "timestamp",
        "fixed_interval": "3h"
      },
      "aggs": {
        "ticket_price": {
          "max": {
            "field": "AvgTicketPrice"
          }
        }
      }
    },
    "changes": {
      "change_point": {
        "buckets_path": "histo>ticket_price"
      }
    }
  }
}
```

Response
```
{
  /*<snip>*/ 
  "aggregations" : {
    "histo" : {
      "buckets" : [ /*<snip>*/ ]
    },
    "changes" : {
      "bucket" : {
        "key" : "2022-01-28T23:00:00.000Z",
        "doc_count" : 48,
        "ticket_price" : {
          "value" : 1187.61083984375
        }
      },
      "type" : {
        "distribution_change" : {
          "p_value" : 0.023753965139433175,
          "change_point" : 40
        }
      }
    }
  }
}
```
benwtrent committed Mar 4, 2022
1 parent b104bcd commit cf151b5
Showing 19 changed files with 2,077 additions and 8 deletions.
5 changes: 5 additions & 0 deletions docs/changelog/83428.yaml
@@ -0,0 +1,5 @@
pr: 83428
summary: Adds new `change_point` pipeline aggregation
area: Machine Learning
type: feature
issues: []
2 changes: 2 additions & 0 deletions docs/reference/aggregations/pipeline.asciidoc
@@ -283,6 +283,8 @@ include::pipeline/bucket-selector-aggregation.asciidoc[]

include::pipeline/bucket-sort-aggregation.asciidoc[]

include::pipeline/change-point-aggregation.asciidoc[]

include::pipeline/cumulative-cardinality-aggregation.asciidoc[]

include::pipeline/cumulative-sum-aggregation.asciidoc[]
@@ -0,0 +1,99 @@
[role="xpack"]
[[search-aggregations-change-point-aggregation]]
=== Change point aggregation
++++
<titleabbrev>Change point</titleabbrev>
++++

experimental::[]

A sibling pipeline aggregation that detects spikes, dips, and change points in a metric. Given a distribution of values
provided by the sibling multi-bucket aggregation, this aggregation indicates the bucket of any spike or dip
and/or the bucket at which the largest change in the distribution of values occurs, if the change is statistically significant.



[[change-point-agg-syntax]]
==== Parameters

`buckets_path`::
(Required, string)
Path to the buckets that contain one set of values in which to detect a change point. There must be at least 21 bucketed
values; using fewer than 1,000 values is preferred.
For syntax, see <<buckets-path-syntax>>.

==== Syntax

A `change_point` aggregation looks like this in isolation:

[source,js]
--------------------------------------------------
{
"change_point": {
"buckets_path": "date_histogram>_count" <1>
}
}
--------------------------------------------------
// NOTCONSOLE
<1> The buckets containing the values to test against.
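The builder registered in this commit also parses an optional `gap_policy` setting (see the `GAP_POLICY` declaration in `ChangePointAggregationBuilder` further down). A sketch of how that option might be supplied — the `skip` value is an assumption based on the standard pipeline aggregation gap policies, not something this commit documents:

[source,js]
--------------------------------------------------
{
  "change_point": {
    "buckets_path": "date_histogram>_count",
    "gap_policy": "skip"
  }
}
--------------------------------------------------
// NOTCONSOLE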

[[change-point-agg-response]]
==== Response body

`bucket`::
(Optional, object)
The bucket that contains the discovered change point. Not returned if no change point was found.
All the sub-aggregation values of the bucket are returned as well.
+
.Properties of bucket
[%collapsible%open]
====
`key`:::
(value)
The key of the matched bucket. Can be a string or numeric value.
`doc_count`:::
(number)
The document count of the bucket.
====

`type`::
(object)
The type of change point found and its related values. Possible types:
+
--
* `dip`: a significant dip occurs at this change point
* `distribution_change`: the overall distribution of the values has changed significantly
* `non_stationary`: there is no change point, but the values are not from a stationary distribution
* `spike`: a significant spike occurs at this point
* `stationary`: no change point found
* `step_change`: the change indicates a statistically significant step up or down in value distribution
* `trend_change`: there is an overall trend change occurring at this point
--

==== Response example
[source,js]
--------------------------------------------------
"changes" : {
"bucket" : {
"key" : "2022-01-28T23:00:00.000Z", <1>
"doc_count" : 48, <2>
"ticket_price" : { <3>
"value" : 1187.61083984375
}
},
"type" : { <4>
"distribution_change" : {
"p_value" : 0.023753965139433175, <5>
"change_point" : 40 <6>
}
}
}
--------------------------------------------------
// NOTCONSOLE
<1> The bucket key that is the change point.
<2> The number of documents in that bucket.
<3> Aggregated values in the bucket.
<4> Type of change found.
<5> The `p_value` indicates how extreme the change is; lower values indicate greater change.
<6> The specific bucket where the change occurs (indexing starts at `0`).
4 changes: 4 additions & 0 deletions x-pack/plugin/ml/qa/ml-with-security/build.gradle
@@ -39,6 +39,10 @@ tasks.named("yamlRestTest").configure {
'ml/categorization_agg/Test categorization agg simple',
'ml/categorization_agg/Test categorization aggregation against unsupported field',
'ml/categorization_agg/Test categorization aggregation with poor settings',
'ml/change_point_agg/Test change_point agg simple',
'ml/change_point_agg/Test change_point with missing buckets_path',
'ml/change_point_agg/Test change_point with bad buckets_path',
'ml/change_point_agg/Test change_point with too few buckets',
'ml/custom_all_field/Test querying custom all field',
'ml/datafeeds_crud/Test delete datafeed with missing id',
'ml/datafeeds_crud/Test put datafeed referring to missing job_id',
@@ -273,6 +273,8 @@
import org.elasticsearch.xpack.ml.action.TransportValidateJobConfigAction;
import org.elasticsearch.xpack.ml.aggs.categorization.CategorizeTextAggregationBuilder;
import org.elasticsearch.xpack.ml.aggs.categorization.InternalCategorizationAggregation;
import org.elasticsearch.xpack.ml.aggs.changepoint.ChangePointAggregationBuilder;
import org.elasticsearch.xpack.ml.aggs.changepoint.ChangePointNamedContentProvider;
import org.elasticsearch.xpack.ml.aggs.correlation.BucketCorrelationAggregationBuilder;
import org.elasticsearch.xpack.ml.aggs.correlation.CorrelationNamedContentProvider;
import org.elasticsearch.xpack.ml.aggs.heuristic.PValueScore;
@@ -1389,7 +1391,8 @@ public List<PipelineAggregationSpec> getPipelineAggregations() {
return Arrays.asList(
InferencePipelineAggregationBuilder.buildSpec(modelLoadingService, getLicenseState(), settings),
BucketCorrelationAggregationBuilder.buildSpec(),
BucketCountKSTestAggregationBuilder.buildSpec()
BucketCountKSTestAggregationBuilder.buildSpec(),
ChangePointAggregationBuilder.buildSpec()
);
}

@@ -1514,6 +1517,7 @@ public List<NamedWriteableRegistry.Entry> getNamedWriteables() {
namedWriteables.addAll(new MlInferenceNamedXContentProvider().getNamedWriteables());
namedWriteables.addAll(MlAutoscalingNamedWritableProvider.getNamedWriteables());
namedWriteables.addAll(new CorrelationNamedContentProvider().getNamedWriteables());
namedWriteables.addAll(new ChangePointNamedContentProvider().getNamedWriteables());
return namedWriteables;
}

@@ -36,6 +36,25 @@ public static InvalidAggregationPathException invalidPathException(List<String>
* @return The double values and doc_counts extracted from the path if the bucket path exists and the value is a valid number
*/
public static Optional<DoubleBucketValues> extractDoubleBucketedValues(String bucketPath, Aggregations aggregations) {
return extractDoubleBucketedValues(bucketPath, aggregations, BucketHelpers.GapPolicy.INSERT_ZEROS, false);
}

/**
* This extracts the bucket values as doubles.
*
* If the gap policy is skippable, the true bucket index (as stored in the aggregation) is returned as well.
* @param bucketPath The bucket path from which to extract values
* @param aggregations The aggregations
* @param gapPolicy the desired gap policy
* @param excludeLastBucket should the last bucket be excluded? This is useful when excluding potentially partial buckets
* @return The double values, doc_counts, and bucket index positions extracted from the path if the bucket path exists
*/
public static Optional<DoubleBucketValues> extractDoubleBucketedValues(
String bucketPath,
Aggregations aggregations,
BucketHelpers.GapPolicy gapPolicy,
boolean excludeLastBucket
) {
List<String> parsedPath = AggregationPath.parse(bucketPath).getPathElementsAsStringList();
for (Aggregation aggregation : aggregations) {
if (aggregation.getName().equals(parsedPath.get(0))) {
@@ -44,14 +63,19 @@ public static Optional<DoubleBucketValues> extractDoubleBucketedValues(String bu
List<? extends InternalMultiBucketAggregation.InternalBucket> buckets = multiBucketsAgg.getBuckets();
List<Double> values = new ArrayList<>(buckets.size());
List<Long> docCounts = new ArrayList<>(buckets.size());
List<Integer> bucketIndexes = new ArrayList<>(buckets.size());
int bucketCount = 0;
int totalBuckets = buckets.size();
for (InternalMultiBucketAggregation.InternalBucket bucket : buckets) {
Double bucketValue = BucketHelpers.resolveBucketValue(
multiBucketsAgg,
bucket,
sublistedPath,
BucketHelpers.GapPolicy.INSERT_ZEROS
);
Double bucketValue = BucketHelpers.resolveBucketValue(multiBucketsAgg, bucket, sublistedPath, gapPolicy);
if (excludeLastBucket && bucketCount >= totalBuckets - 1) {
continue;
}
if (bucketValue == null || Double.isNaN(bucketValue)) {
if (gapPolicy.isSkippable) {
bucketCount++;
continue;
}
throw new AggregationExecutionException(
"missing or invalid bucket value found for path ["
+ bucketPath
@@ -60,30 +84,56 @@ public static Optional<DoubleBucketValues> extractDoubleBucketedValues(String bu
+ "]"
);
}
bucketIndexes.add(bucketCount++);
values.add(bucketValue);
docCounts.add(bucket.getDocCount());
}
return Optional.of(
new DoubleBucketValues(
docCounts.stream().mapToLong(Long::longValue).toArray(),
values.stream().mapToDouble(Double::doubleValue).toArray()
values.stream().mapToDouble(Double::doubleValue).toArray(),
bucketCount == bucketIndexes.size() ? new int[0] : bucketIndexes.stream().mapToInt(Integer::intValue).toArray()
)
);
}
}
return Optional.empty();
}

public static Optional<InternalMultiBucketAggregation.InternalBucket> extractBucket(
String bucketPath,
Aggregations aggregations,
int bucket
) {
List<String> parsedPath = AggregationPath.parse(bucketPath).getPathElementsAsStringList();
for (Aggregation aggregation : aggregations) {
if (aggregation.getName().equals(parsedPath.get(0))) {
InternalMultiBucketAggregation<?, ?> multiBucketsAgg = (InternalMultiBucketAggregation<?, ?>) aggregation;
List<? extends InternalMultiBucketAggregation.InternalBucket> buckets = multiBucketsAgg.getBuckets();
if (bucket < buckets.size() && bucket >= 0) {
return Optional.of(buckets.get(bucket));
}
}
}
return Optional.empty();
}

/**
* Utility class for holding an unboxed double value and the document count for a bucket
*/
public static class DoubleBucketValues {
private final long[] docCounts;
private final double[] values;
private final int[] buckets;

public DoubleBucketValues(long[] docCounts, double[] values) {
this(docCounts, values, new int[0]);
}

public DoubleBucketValues(long[] docCounts, double[] values, int[] buckets) {
this.docCounts = docCounts;
this.values = values;
this.buckets = buckets;
}

public long[] getDocCounts() {
@@ -93,6 +143,13 @@ public long[] getDocCounts() {
public double[] getValues() {
return values;
}

public int getBucketIndex(int bucketPos) {
if (buckets.length == 0) {
return bucketPos;
}
return buckets[bucketPos];
}
}

}
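The `getBucketIndex` mapping added to `DoubleBucketValues` above can be exercised in isolation. Below is a minimal standalone sketch (the `BucketIndexDemo` class is illustrative, not part of this commit) showing the two cases: an empty `buckets` array means no buckets were skipped, so positions map to themselves; otherwise the array translates a value's position back to its original bucket index.

```java
// Illustrative mirror of DoubleBucketValues' index mapping (not the
// committed class itself). When gap-policy skipping removed buckets,
// `buckets` records the original positions of the surviving values.
public class BucketIndexDemo {
    private final int[] buckets;

    public BucketIndexDemo(int[] buckets) {
        this.buckets = buckets;
    }

    public int getBucketIndex(int bucketPos) {
        if (buckets.length == 0) {
            return bucketPos; // no buckets skipped: identity mapping
        }
        return buckets[bucketPos]; // translate back to the original position
    }

    public static void main(String[] args) {
        // No skipped buckets: positions are returned unchanged.
        BucketIndexDemo dense = new BucketIndexDemo(new int[0]);
        System.out.println(dense.getBucketIndex(3)); // 3

        // Buckets 1 and 2 were skipped, so value position 1 maps
        // back to original bucket 3.
        BucketIndexDemo sparse = new BucketIndexDemo(new int[] {0, 3, 4});
        System.out.println(sparse.getBucketIndex(1)); // 3
    }
}
```

This is why the constructor passes `new int[0]` when no buckets were skipped: the identity mapping avoids allocating an index array in the common dense case.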
@@ -0,0 +1,90 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.xpack.ml.aggs.changepoint;

import org.elasticsearch.Version;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.plugins.SearchPlugin;
import org.elasticsearch.search.aggregations.pipeline.BucketHelpers;
import org.elasticsearch.search.aggregations.pipeline.BucketMetricsPipelineAggregationBuilder;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
import org.elasticsearch.xcontent.ConstructingObjectParser;
import org.elasticsearch.xcontent.ObjectParser;
import org.elasticsearch.xcontent.ParseField;
import org.elasticsearch.xcontent.XContentBuilder;

import java.io.IOException;
import java.util.Locale;
import java.util.Map;

import static org.elasticsearch.search.aggregations.pipeline.PipelineAggregator.Parser.GAP_POLICY;

public class ChangePointAggregationBuilder extends BucketMetricsPipelineAggregationBuilder<ChangePointAggregationBuilder> {

public static final ParseField NAME = new ParseField("change_point");
@SuppressWarnings("unchecked")
public static final ConstructingObjectParser<ChangePointAggregationBuilder, String> PARSER = new ConstructingObjectParser<>(
NAME.getPreferredName(),
false,
(args, context) -> new ChangePointAggregationBuilder(context, (String) args[0])
);

static {
PARSER.declareString(ConstructingObjectParser.constructorArg(), BUCKETS_PATH_FIELD);
PARSER.declareField(
ConstructingObjectParser.optionalConstructorArg(),
p -> BucketHelpers.GapPolicy.parse(p.text().toLowerCase(Locale.ROOT), p.getTokenLocation()),
GAP_POLICY,
ObjectParser.ValueType.STRING
);
}

public ChangePointAggregationBuilder(String name, String bucketsPath) {
super(name, NAME.getPreferredName(), new String[] { bucketsPath });
}

public ChangePointAggregationBuilder(StreamInput in) throws IOException {
super(in, NAME.getPreferredName());
}

public static SearchPlugin.PipelineAggregationSpec buildSpec() {
return new SearchPlugin.PipelineAggregationSpec(NAME, ChangePointAggregationBuilder::new, ChangePointAggregationBuilder.PARSER)
.addResultReader(InternalChangePointAggregation::new);
}

@Override
public String getWriteableName() {
return NAME.getPreferredName();
}

@Override
public Version getMinimalSupportedVersion() {
return Version.V_8_2_0;
}

@Override
protected void innerWriteTo(StreamOutput out) throws IOException {}

@Override
protected PipelineAggregator createInternal(Map<String, Object> metadata) {
return new ChangePointAggregator(name, bucketsPaths[0], metadata);
}

@Override
protected boolean overrideBucketsPath() {
return true;
}

@Override
protected XContentBuilder doXContentBody(XContentBuilder builder, Params params) throws IOException {
builder.field(BUCKETS_PATH_FIELD.getPreferredName(), bucketsPaths[0]);
return builder;
}

}
