Skip to content

Commit

Permalink
[TSDB] Add support for downsampling aggregate_metric_double fields (#…
Browse files Browse the repository at this point in the history
…90029)

his PR adds support for downsampling metric fields of type aggregate_metric_double, enabling the rollup-of-rollups functionality.
  • Loading branch information
csoulios committed Sep 20, 2022
1 parent 56dd7c7 commit b9f20e9
Show file tree
Hide file tree
Showing 16 changed files with 653 additions and 239 deletions.
4 changes: 1 addition & 3 deletions x-pack/plugin/mapper-aggregate-metric/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,13 @@ apply plugin: 'elasticsearch.internal-es-plugin'

esplugin {
name 'x-pack-aggregate-metric'
description 'Module for the aggregate_metric field type, which allows pre-aggregated fields to be stored a single field.'
description 'Module for the aggregate_metric_double field type, which allows pre-aggregated fields to be stored as a single field'
classname 'org.elasticsearch.xpack.aggregatemetric.AggregateMetricMapperPlugin'
extendedPlugins = ['x-pack-core']
}
archivesBaseName = 'x-pack-aggregate-metric'

dependencies {
compileOnly project(":server")

compileOnly project(path: xpackModule('core'))
testImplementation(testArtifact(project(xpackModule('core'))))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.index.mapper.Mapper;
import org.elasticsearch.plugins.ActionPlugin;
import org.elasticsearch.plugins.ExtensiblePlugin;
import org.elasticsearch.plugins.MapperPlugin;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.plugins.SearchPlugin;
Expand All @@ -27,7 +28,7 @@

import static java.util.Collections.singletonMap;

public class AggregateMetricMapperPlugin extends Plugin implements MapperPlugin, ActionPlugin, SearchPlugin {
public class AggregateMetricMapperPlugin extends Plugin implements MapperPlugin, ActionPlugin, SearchPlugin, ExtensiblePlugin {

@Override
public Map<String, Mapper.TypeParser> getMappers() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -307,6 +307,10 @@ private void setMetricFields(EnumMap<Metric, NumberFieldMapper.NumberFieldType>
this.metricFields = metricFields;
}

public Map<Metric, NumberFieldMapper.NumberFieldType> getMetricFields() {
return Collections.unmodifiableMap(metricFields);
}

public void addMetricField(Metric m, NumberFieldMapper.NumberFieldType subfield) {
if (metricFields == null) {
metricFields = new EnumMap<>(AggregateDoubleMetricFieldMapper.Metric.class);
Expand Down
10 changes: 3 additions & 7 deletions x-pack/plugin/rollup/build.gradle
Original file line number Diff line number Diff line change
@@ -1,22 +1,18 @@
import org.elasticsearch.gradle.internal.info.BuildParams

apply plugin: 'elasticsearch.internal-es-plugin'
esplugin {
name 'x-pack-rollup'
description 'Elasticsearch Expanded Pack Plugin - Rollup'
classname 'org.elasticsearch.xpack.rollup.Rollup'
extendedPlugins = ['x-pack-core']
extendedPlugins = ['x-pack-aggregate-metric']
}

archivesBaseName = 'x-pack-rollup'

dependencies {
compileOnly project(":server")
compileOnly project(path: xpackModule('core'))
compileOnly project(path: xpackModule('analytics'))
compileOnly project(path: xpackModule('mapper-aggregate-metric'))
compileOnly project(path: xpackModule('ilm'))
compileOnly project(':modules:data-streams')
compileOnly project(path: xpackModule('ilm'))
compileOnly project(path: xpackModule('mapper-aggregate-metric'))
testImplementation(testArtifact(project(xpackModule('core'))))
}

Expand Down
2 changes: 1 addition & 1 deletion x-pack/plugin/rollup/qa/rest/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ dependencies {

restResources {
restApi {
include '_common', 'bulk', 'cluster', 'indices', 'search', 'downsample'
include '_common', 'bulk', 'cluster', 'indices', 'search'
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ setup:
index.blocks.write: true

---
"Rollup index":
"Downsample index":
- skip:
version: " - 8.4.99"
reason: "rollup renamed to downsample in 8.5.0"
Expand Down Expand Up @@ -141,6 +141,24 @@ setup:
- match: { rollup-test.settings.index.number_of_shards: "1" }
- match: { rollup-test.settings.index.number_of_replicas: "0" }

# Assert rollup index mapping
- do:
indices.get_mapping:
index: rollup-test

- match: { rollup-test.mappings.properties.@timestamp.type: date }
- match: { rollup-test.mappings.properties.@timestamp.meta.fixed_interval: 1h }
- match: { rollup-test.mappings.properties.@timestamp.meta.time_zone: UTC }
- match: { rollup-test.mappings.properties.k8s.properties.pod.properties.multi-gauge.type: aggregate_metric_double }
- match: { rollup-test.mappings.properties.k8s.properties.pod.properties.multi-gauge.metrics: [ "min", "max", "sum", "value_count" ] }
- match: { rollup-test.mappings.properties.k8s.properties.pod.properties.multi-gauge.default_metric: max }
- match: { rollup-test.mappings.properties.k8s.properties.pod.properties.multi-gauge.time_series_metric: gauge }
- match: { rollup-test.mappings.properties.k8s.properties.pod.properties.multi-counter.type: long }
- match: { rollup-test.mappings.properties.k8s.properties.pod.properties.multi-counter.time_series_metric: counter }
- match: { rollup-test.mappings.properties.k8s.properties.pod.properties.uid.type: keyword }
- match: { rollup-test.mappings.properties.k8s.properties.pod.properties.uid.time_series_dimension: true }


# Assert source index has not been deleted
- do:
indices.get:
Expand All @@ -156,7 +174,7 @@ setup:
- match: { indices.rollup-test.shards.0.0.num_search_segments: 1}

---
"Rollup non-existing index":
"Downsample non-existing index":
- skip:
version: " - 8.4.99"
reason: "rollup renamed to downsample in 8.5.0"
Expand All @@ -172,7 +190,7 @@ setup:
}
---
"Rollup to existing rollup index":
"Downsample to existing index":
- skip:
version: " - 8.4.99"
reason: "rollup renamed to downsample in 8.5.0"
Expand All @@ -192,7 +210,7 @@ setup:
}
---
"Rollup not time_series index":
"Downsample not time_series index":
- skip:
version: " - 8.4.99"
reason: "rollup renamed to downsample in 8.5.0"
Expand All @@ -213,7 +231,7 @@ setup:
---
"Rollup no metric index":
"Downsample no metric index":
- skip:
version: " - 8.4.99"
reason: "rollup renamed to downsample in 8.5.0"
Expand Down Expand Up @@ -254,3 +272,142 @@ setup:
{
"fixed_interval": "1h"
}
---
"Downsample a downsampled index":
- skip:
version: " - 8.4.99"
reason: "Rollup of rollups introduced in 8.5.0"

- do:
indices.downsample:
index: test
target_index: rollup-test
body: >
{
"fixed_interval": "1h"
}
- is_true: acknowledged

- do:
indices.downsample:
index: rollup-test
target_index: rollup-test-2
body: >
{
"fixed_interval": "2h"
}
- is_true: acknowledged


# Assert rollup index mapping
- do:
indices.get_mapping:
index: rollup-test-2

- match: { rollup-test-2.mappings.properties.@timestamp.type: date }
- match: { rollup-test-2.mappings.properties.@timestamp.meta.fixed_interval: 2h }
- match: { rollup-test-2.mappings.properties.@timestamp.meta.time_zone: UTC }
- match: { rollup-test-2.mappings.properties.k8s.properties.pod.properties.multi-gauge.type: aggregate_metric_double }
- match: { rollup-test-2.mappings.properties.k8s.properties.pod.properties.multi-gauge.metrics: [ "min", "max", "sum", "value_count" ] }
- match: { rollup-test-2.mappings.properties.k8s.properties.pod.properties.multi-gauge.default_metric: max }
- match: { rollup-test-2.mappings.properties.k8s.properties.pod.properties.multi-gauge.time_series_metric: gauge }
- match: { rollup-test-2.mappings.properties.k8s.properties.pod.properties.multi-counter.type: long }
- match: { rollup-test-2.mappings.properties.k8s.properties.pod.properties.multi-counter.time_series_metric: counter }
- match: { rollup-test-2.mappings.properties.k8s.properties.pod.properties.uid.type: keyword }
- match: { rollup-test-2.mappings.properties.k8s.properties.pod.properties.uid.time_series_dimension: true }
- match: { rollup-test-2.mappings.properties.k8s.properties.pod.properties.network.properties.tx.type: aggregate_metric_double }
- match: { rollup-test-2.mappings.properties.k8s.properties.pod.properties.network.properties.tx.metrics: [ "min", "max", "sum", "value_count" ] }
- match: { rollup-test-2.mappings.properties.k8s.properties.pod.properties.network.properties.tx.default_metric: max }
- match: { rollup-test-2.mappings.properties.k8s.properties.pod.properties.network.properties.tx.time_series_metric: gauge }

- do:
search:
index: rollup-test-2
body:
sort: [ "_tsid", "@timestamp" ]

- length: { hits.hits: 3 }
- match: { hits.hits.0._source._doc_count: 2 }
- match: { hits.hits.0._source.k8s\.pod\.uid: 947e4ced-1786-4e53-9e0c-5c447e959507 }
- match: { hits.hits.0._source.metricset: pod }
- match: { hits.hits.0._source.@timestamp: 2021-04-28T18:00:00.000Z }
- match: { hits.hits.0._source.k8s\.pod\.multi-counter: 21 }
- match: { hits.hits.0._source.k8s\.pod\.multi-gauge.min: 90 }
- match: { hits.hits.0._source.k8s\.pod\.multi-gauge.max: 200 }
- match: { hits.hits.0._source.k8s\.pod\.multi-gauge.sum: 726 }
- match: { hits.hits.0._source.k8s\.pod\.multi-gauge.value_count: 6 }
- match: { hits.hits.0._source.k8s\.pod\.network\.tx.min: 2001818691 }
- match: { hits.hits.0._source.k8s\.pod\.network\.tx.max: 2005177954 }
- match: { hits.hits.0._source.k8s\.pod\.network\.tx.value_count: 2 }
- match: { hits.hits.0._source.k8s\.pod\.ip: "10.10.55.26" }
- match: { hits.hits.0._source.k8s\.pod\.created_at: "2021-04-28T19:35:00.000Z" }
- match: { hits.hits.0._source.k8s\.pod\.number_of_containers: 2 }
- match: { hits.hits.0._source.k8s\.pod\.tags: [ "backend", "prod", "us-west1" ] }
- match: { hits.hits.0._source.k8s\.pod\.values: [ 1, 1, 3 ] }

- match: { hits.hits.1._source.k8s\.pod\.uid: 947e4ced-1786-4e53-9e0c-5c447e959507 }
- match: { hits.hits.1._source.metricset: pod }
- match: { hits.hits.1._source.@timestamp: 2021-04-28T20:00:00.000Z }
- match: { hits.hits.1._source._doc_count: 2 }

- match: { hits.hits.2._source.k8s\.pod\.uid: df3145b3-0563-4d3b-a0f7-897eb2876ea9 }
- match: { hits.hits.2._source.metricset: pod }
- match: { hits.hits.2._source.@timestamp: 2021-04-28T18:00:00.000Z }
- match: { hits.hits.2._source._doc_count: 4 }

- do:
indices.downsample:
index: rollup-test
target_index: rollup-test-3
body: >
{
"fixed_interval": "180m"
}
- is_true: acknowledged

---
"Downsample a downsampled index with wrong intervals":
- skip:
version: " - 8.4.99"
reason: "Rollup of rollups introduced in 8.5.0"

- do:
indices.downsample:
index: test
target_index: rollup-test
body: >
{
"fixed_interval": "1h"
}
- is_true: acknowledged

- do:
catch: /Downsampling interval \[1h\] must be greater than the the source index interval \[1h\]/
indices.downsample:
index: rollup-test
target_index: rollup-test-2
body: >
{
"fixed_interval": "1h"
}
- do:
catch: /Downsampling interval \[30m\] must be greater than the the source index interval \[1h\]/
indices.downsample:
index: rollup-test
target_index: rollup-test-2
body: >
{
"fixed_interval": "30m"
}
- do:
catch: /Downsampling interval \[90m\] must be a multiple of the source index interval \[1h\]/
indices.downsample:
index: rollup-test
target_index: rollup-test-2
body: >
{
"fixed_interval": "90m"
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@

package org.elasticsearch.xpack.downsample;

import org.elasticsearch.xcontent.XContentBuilder;

import java.io.IOException;

/**
* Base class for classes that read metric and label fields.
*/
Expand All @@ -22,9 +26,11 @@ abstract class AbstractRollupFieldProducer<T> {

/**
* Collect a value for the field applying the specific subclass collection strategy.
*
* @param field the name of the field to collect
* @param value the value to collect.
*/
public abstract void collect(T value);
public abstract void collect(String field, T value);

/**
* @return the name of the field.
Expand All @@ -34,14 +40,14 @@ public String name() {
}

/**
* @return the value of the field.
* Resets the producer to an empty value.
*/
public abstract Object value();
public abstract void reset();

/**
* Resets the collected value to the specific subclass reset value.
* Serialize the downsampled value of the field.
*/
public abstract void reset();
public abstract void write(XContentBuilder builder) throws IOException;

/**
* @return true if the field has not collected any value.
Expand Down

0 comments on commit b9f20e9

Please sign in to comment.