Skip to content

Commit

Permalink
[ML] add new bucket_correlation aggregation with initial count_correl…
Browse files Browse the repository at this point in the history
…ation function (#72133)

This commit adds a new pipeline aggregation that allows correlation within the aggregation frame work in bucketed values. 

The initial function is a `count_correlation` function. The purpose of which is to correlate the count in a consistent number of buckets with a pre calculated indicator. The indicator and the aggregated buckets should related to the same metrics with in documents. 

Example for correlating terms within a `service.version.keyword` with latency percentiles. The percentiles and provided correlation indicator both refer to the same source data where the indicator was previously calculated.:
```
GET apm-7.12.0-transaction-generated/_search
{
  "size": 0,
  "aggs": {
    "field_terms": {
      "terms": {
        "field": "service.version.keyword",
        "size": 20
      },
      "aggs": {
        "latency_range": {
          "range": {
            "field": "transaction.duration.us",
            "ranges": [<snip>],
            "keyed": true
          }
        },
        "correlation": {
          "bucket_correlation": {
            "buckets_path": "latency_range>_count",
            "count_correlation": {
              "indicator": {
                 "expectations": [<snip>],
                 "doc_count": 20000
               }
            }
          }
        }
      }
    }
  }
}
```
  • Loading branch information
benwtrent committed May 10, 2021
1 parent 05a0c83 commit 8069e9b
Show file tree
Hide file tree
Showing 29 changed files with 1,733 additions and 111 deletions.
41 changes: 41 additions & 0 deletions docs/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -1522,6 +1522,47 @@ setups['setup-repository'] = '''
body: |
#atomic_red_data#
'''
// fake data used by the correlation bucket agg
buildRestTests.setups['correlate_latency'] = '''
- do:
indices.create:
index: correlate_latency
body:
settings:
number_of_shards: 1
number_of_replicas: 0
mappings:
properties:
latency:
type: double
version:
type: keyword
- do:
bulk:
index: correlate_latency
refresh: true
body: |'''


for (int i = 100; i < 200; i++) {
def value = i
if (i % 10) {
value = i * 10
}
buildRestTests.setups['correlate_latency'] += """
{"index":{}}
{"latency": "$value", "version": "1.0"}"""
}
for (int i = 0; i < 100; i++) {
def value = i
if (i % 10) {
value = i * 10
}
buildRestTests.setups['correlate_latency'] += """
{"index":{}}
{"latency": "$value", "version": "2.0"}"""
}

/* Load the actual events only if we're going to use them. */
File atomicRedRegsvr32File = new File("$projectDir/src/test/resources/normalized-T1117-AtomicRed-regsvr32.json")
inputs.file(atomicRedRegsvr32File)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,319 @@
[role="xpack"]
[testenv="basic"]
[[search-aggregations-bucket-correlation-aggregation]]
=== Bucket correlation aggregation
++++
<titleabbrev>Bucket correlation aggregation</titleabbrev>
++++

experimental::[]

A sibling pipeline aggregation which executes a correlation function on the
configured sibling multi-bucket aggregation.


[[bucket-correlation-agg-syntax]]
==== Parameters

`buckets_path`::
(Required, string)
Path to the buckets that contain one set of values to correlate.
For syntax, see <<buckets-path-syntax>>.

`function`::
(Required, object)
The correlation function to execute.
+
.Properties of `function`
[%collapsible%open]
====
`count_correlation`:::
(Required^*^, object)
The configuration to calculate a count correlation. This function is designed for
determining the correlation of a term value and a given metric. Consequently, it
needs to meet the following requirements.
* The `buckets_path` must point to a `_count` metric.
* The total count of all the `bucket_path` count values must be less than or equal to `indicator.doc_count`.
* When utilizing this function, an initial calculation to gather the required `indicator` values is required.
.Properties of `count_correlation`
[%collapsible%open]
=====
`indicator`:::
(Required, object)
The indicator with which to correlate the configured `bucket_path` values.

.Properties of `indicator`
[%collapsible%open]
=====
`expectations`:::
(Required, array)
An array of numbers with which to correlate the configured `bucket_path` values. The length of this value must always equal
the number of buckets returned by the `bucket_path`.
`fractions`:::
(Optional, array)
An array of fractions to use when averaging and calculating variance. This should be used if the pre-calculated data and the
`buckets_path` have known gaps. The length of `fractions`, if provided, must equal `expectations`.
`doc_count`:::
(Required, integer)
The total number of documents that initially created the `expectations`. It's required to be greater than or equal to the sum
of all values in the `buckets_path` as this is the originating superset of data to which the term values are correlated.
=====
=====
====

==== Syntax

A `bucket_correlation` aggregation looks like this in isolation:

[source,js]
--------------------------------------------------
{
"bucket_correlation": {
"buckets_path": "range_values>_count", <1>
"function": {
"count_correlation": { <2>
"expectations": [...],
"doc_count": 10000
}
}
}
}
--------------------------------------------------
// NOTCONSOLE
<1> The buckets containing the values to correlate against.
<2> The correlation function definition.


[[bucket-correlation-agg-example]]
==== Example

The following snippet correlates the individual terms in the field `version` with the `latency` metric. Not shown
is the pre-calculation of the `latency` indicator values, which was done utilizing the
<<search-aggregations-metrics-percentile-aggregation,percentiles>> aggregation.

This example is only using the 10s percentiles.

[source,console]
-------------------------------------------------
POST correlate_latency/_search?size=0&filter_path=aggregations
{
"aggs": {
"buckets": {
"terms": {
"field": "version",
"size": 2
},
"aggs": {
"latency_ranges": {
"range": {
"field": "latency",
"ranges": [
{ "to": 0.0 },
{ "from": 0, "to": 105 },
{ "from": 105, "to": 225 },
{ "from": 225, "to": 445 },
{ "from": 445, "to": 665 },
{ "from": 665, "to": 885 },
{ "from": 885, "to": 1115 },
{ "from": 1115, "to": 1335 },
{ "from": 1335, "to": 1555 },
{ "from": 1555, "to": 1775 },
{ "from": 1775 }
]
}
},
"bucket_correlation": {
"bucket_correlation": {
"buckets_path": "latency_ranges>_count",
"function": {
"count_correlation": {
"indicator": {
"expectations": [0, 52.5, 165, 335, 555, 775, 1000, 1225, 1445, 1665, 1775],
"doc_count": 200
}
}
}
}
}
}
}
}
}
-------------------------------------------------
// TEST[setup:correlate_latency]

<1> The term buckets containing a range aggregation and the bucket correlation aggregation. Both are utilized to calculate
the correlation of the term values with the latency.
<2> The range aggregation on the latency field. The ranges were created referencing the percentiles of the latency field.
<3> The bucket correlation aggregation that calculates the correlation of the number of term values within each range
and the previously calculated indicator values.

And the following may be the response:

[source,console-result]
----
{
"aggregations" : {
"buckets" : {
"doc_count_error_upper_bound" : 0,
"sum_other_doc_count" : 0,
"buckets" : [
{
"key" : "1.0",
"doc_count" : 100,
"latency_ranges" : {
"buckets" : [
{
"key" : "*-0.0",
"to" : 0.0,
"doc_count" : 0
},
{
"key" : "0.0-105.0",
"from" : 0.0,
"to" : 105.0,
"doc_count" : 1
},
{
"key" : "105.0-225.0",
"from" : 105.0,
"to" : 225.0,
"doc_count" : 9
},
{
"key" : "225.0-445.0",
"from" : 225.0,
"to" : 445.0,
"doc_count" : 0
},
{
"key" : "445.0-665.0",
"from" : 445.0,
"to" : 665.0,
"doc_count" : 0
},
{
"key" : "665.0-885.0",
"from" : 665.0,
"to" : 885.0,
"doc_count" : 0
},
{
"key" : "885.0-1115.0",
"from" : 885.0,
"to" : 1115.0,
"doc_count" : 10
},
{
"key" : "1115.0-1335.0",
"from" : 1115.0,
"to" : 1335.0,
"doc_count" : 20
},
{
"key" : "1335.0-1555.0",
"from" : 1335.0,
"to" : 1555.0,
"doc_count" : 20
},
{
"key" : "1555.0-1775.0",
"from" : 1555.0,
"to" : 1775.0,
"doc_count" : 20
},
{
"key" : "1775.0-*",
"from" : 1775.0,
"doc_count" : 20
}
]
},
"bucket_correlation" : {
"value" : 0.8402398981360937
}
},
{
"key" : "2.0",
"doc_count" : 100,
"latency_ranges" : {
"buckets" : [
{
"key" : "*-0.0",
"to" : 0.0,
"doc_count" : 0
},
{
"key" : "0.0-105.0",
"from" : 0.0,
"to" : 105.0,
"doc_count" : 19
},
{
"key" : "105.0-225.0",
"from" : 105.0,
"to" : 225.0,
"doc_count" : 11
},
{
"key" : "225.0-445.0",
"from" : 225.0,
"to" : 445.0,
"doc_count" : 20
},
{
"key" : "445.0-665.0",
"from" : 445.0,
"to" : 665.0,
"doc_count" : 20
},
{
"key" : "665.0-885.0",
"from" : 665.0,
"to" : 885.0,
"doc_count" : 20
},
{
"key" : "885.0-1115.0",
"from" : 885.0,
"to" : 1115.0,
"doc_count" : 10
},
{
"key" : "1115.0-1335.0",
"from" : 1115.0,
"to" : 1335.0,
"doc_count" : 0
},
{
"key" : "1335.0-1555.0",
"from" : 1335.0,
"to" : 1555.0,
"doc_count" : 0
},
{
"key" : "1555.0-1775.0",
"from" : 1555.0,
"to" : 1775.0,
"doc_count" : 0
},
{
"key" : "1775.0-*",
"from" : 1775.0,
"doc_count" : 0
}
]
},
"bucket_correlation" : {
"value" : -0.5759855613334943
}
}
]
}
}
}
----
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@
import org.elasticsearch.xpack.core.ml.inference.trainedmodel.tree.Tree;
import org.elasticsearch.xpack.core.ml.inference.trainedmodel.tree.TreeNode;
import org.elasticsearch.xpack.core.ml.job.config.JobState;
import org.elasticsearch.xpack.ml.inference.aggs.InferencePipelineAggregationBuilder;
import org.elasticsearch.xpack.ml.aggs.inference.InferencePipelineAggregationBuilder;
import org.elasticsearch.xpack.ml.inference.loadingservice.ModelLoadingService;
import org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase;
import org.junit.Before;
Expand Down

0 comments on commit 8069e9b

Please sign in to comment.