Skip to content

Commit

Permalink
Implement logic for storing fields that are neither dimensions nor me…
Browse files Browse the repository at this point in the history
…trics (aka tags) (#87929)

For label fields (fields that are not dimensions nor metrics)
we just propagate the latest value into the rollup index, the
same that we do for a counter metric.
  • Loading branch information
salvatore-campagna committed Jul 25, 2022
1 parent 71d5ad6 commit 504b30c
Show file tree
Hide file tree
Showing 13 changed files with 1,018 additions and 264 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,19 @@ public static class Request extends BroadcastRequest<Request> implements Indices
private RollupAction.Request rollupRequest;
private String[] dimensionFields;
private String[] metricFields;

public Request(RollupAction.Request rollupRequest, final String[] dimensionFields, final String[] metricFields) {
private String[] labelFields;

public Request(
RollupAction.Request rollupRequest,
final String[] dimensionFields,
final String[] metricFields,
final String[] labelFields
) {
super(rollupRequest.indices());
this.rollupRequest = rollupRequest;
this.dimensionFields = dimensionFields;
this.metricFields = metricFields;
this.labelFields = labelFields;
}

public Request() {}
Expand All @@ -58,6 +66,7 @@ public Request(StreamInput in) throws IOException {
this.rollupRequest = new RollupAction.Request(in);
this.dimensionFields = in.readStringArray();
this.metricFields = in.readStringArray();
this.labelFields = in.readStringArray();
}

@Override
Expand All @@ -82,6 +91,10 @@ public String[] getMetricFields() {
return this.metricFields;
}

public String[] getLabelFields() {
return labelFields;
}

@Override
public Task createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
return new RollupTask(id, type, action, parentTaskId, rollupRequest.getRollupIndex(), rollupRequest.getRollupConfig(), headers);
Expand All @@ -93,6 +106,7 @@ public void writeTo(StreamOutput out) throws IOException {
rollupRequest.writeTo(out);
out.writeStringArray(dimensionFields);
out.writeStringArray(metricFields);
out.writeStringArray(labelFields);
}

@Override
Expand All @@ -106,6 +120,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
builder.field("rollup_request", rollupRequest);
builder.array("dimension_fields", dimensionFields);
builder.array("metric_fields", metricFields);
builder.array("label_fields", labelFields);
builder.endObject();
return builder;
}
Expand All @@ -115,6 +130,7 @@ public int hashCode() {
int result = rollupRequest.hashCode();
result = 31 * result + Arrays.hashCode(dimensionFields);
result = 31 * result + Arrays.hashCode(metricFields);
result = 31 * result + Arrays.hashCode(labelFields);
return result;
}

Expand All @@ -125,6 +141,7 @@ public boolean equals(Object o) {
Request request = (Request) o;
if (rollupRequest.equals(request.rollupRequest) == false) return false;
if (Arrays.equals(dimensionFields, request.dimensionFields) == false) return false;
if (Arrays.equals(labelFields, request.labelFields) == false) return false;
return Arrays.equals(metricFields, request.metricFields);
}
}
Expand Down Expand Up @@ -225,6 +242,10 @@ public String[] getMetricFields() {
return request.getMetricFields();
}

public String[] getLabelFields() {
return request.getLabelFields();
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
setup:
- skip:
version: " - 8.2.99"
reason: tsdb indexing changed in 8.3.0
version: " - 8.3.99"
reason: "rollup: labels support added in 8.4.0"

- do:
indices.create:
Expand Down Expand Up @@ -32,8 +32,18 @@ setup:
time_series_dimension: true
name:
type: keyword
created_at:
type: date_nanos
running:
type: boolean
number_of_containers:
type: integer
ip:
type: ip
tags:
type: keyword
values:
type: integer
network:
properties:
tx:
Expand All @@ -48,21 +58,21 @@ setup:
index: test
body:
- '{"index": {}}'
- '{"@timestamp": "2021-04-28T18:50:04.467Z", "metricset": "pod", "k8s": {"pod": {"name": "cat", "uid":"947e4ced-1786-4e53-9e0c-5c447e959507", "ip": "10.10.55.1", "network": {"tx": 2001818691, "rx": 802133794}}}}'
- '{"@timestamp": "2021-04-28T18:50:04.467Z", "metricset": "pod", "k8s": {"pod": {"name": "cat", "uid":"947e4ced-1786-4e53-9e0c-5c447e959507", "ip": "10.10.55.1", "network": {"tx": 2001818691, "rx": 802133794}, "created_at": "2021-04-28T19:34:00.000Z", "running": false, "number_of_containers": 2, "tags": ["backend", "prod"], "values": [2, 3, 6]}}}'
- '{"index": {}}'
- '{"@timestamp": "2021-04-28T18:50:24.467Z", "metricset": "pod", "k8s": {"pod": {"name": "cat", "uid":"947e4ced-1786-4e53-9e0c-5c447e959507", "ip": "10.10.55.1", "network": {"tx": 2005177954, "rx": 801479970}}}}'
- '{"@timestamp": "2021-04-28T18:50:24.467Z", "metricset": "pod", "k8s": {"pod": {"name": "cat", "uid":"947e4ced-1786-4e53-9e0c-5c447e959507", "ip": "10.10.55.26", "network": {"tx": 2005177954, "rx": 801479970}, "created_at": "2021-04-28T19:35:00.000Z", "running": true, "number_of_containers": 2, "tags": ["backend", "prod", "us-west1"], "values": [1, 1, 3]}}}'
- '{"index": {}}'
- '{"@timestamp": "2021-04-28T20:50:44.467Z", "metricset": "pod", "k8s": {"pod": {"name": "cat", "uid":"947e4ced-1786-4e53-9e0c-5c447e959507", "ip": "10.10.55.1", "network": {"tx": 2006223737, "rx": 802337279}}}}'
- '{"@timestamp": "2021-04-28T20:50:44.467Z", "metricset": "pod", "k8s": {"pod": {"name": "cat", "uid":"947e4ced-1786-4e53-9e0c-5c447e959507", "ip": "10.10.55.41", "network": {"tx": 2006223737, "rx": 802337279}, "created_at": "2021-04-28T19:36:00.000Z", "running": true, "number_of_containers": 2, "tags": ["backend", "prod", "us-west2"], "values": [4, 1, 2]}}}'
- '{"index": {}}'
- '{"@timestamp": "2021-04-28T20:51:04.467Z", "metricset": "pod", "k8s": {"pod": {"name": "cat", "uid":"947e4ced-1786-4e53-9e0c-5c447e959507", "ip": "10.10.55.2", "network": {"tx": 2012916202, "rx": 803685721}}}}'
- '{"@timestamp": "2021-04-28T20:51:04.467Z", "metricset": "pod", "k8s": {"pod": {"name": "cat", "uid":"947e4ced-1786-4e53-9e0c-5c447e959507", "ip": "10.10.55.22", "network": {"tx": 2012916202, "rx": 803685721}, "created_at": "2021-04-28T19:37:00.000Z", "running": true, "number_of_containers": 2, "tags": ["backend", "prod"], "values": [2, 3, 1]}}}'
- '{"index": {}}'
- '{"@timestamp": "2021-04-28T18:50:03.142Z", "metricset": "pod", "k8s": {"pod": {"name": "dog", "uid":"df3145b3-0563-4d3b-a0f7-897eb2876ea9", "ip": "10.10.55.3", "network": {"tx": 1434521831, "rx": 530575198}}}}'
- '{"@timestamp": "2021-04-28T18:50:03.142Z", "metricset": "pod", "k8s": {"pod": {"name": "dog", "uid":"df3145b3-0563-4d3b-a0f7-897eb2876ea9", "ip": "10.10.55.33", "network": {"tx": 1434521831, "rx": 530575198}, "created_at": "2021-04-28T19:42:00.000Z", "running": false, "number_of_containers": 1, "tags": ["backend", "test"], "values": [2, 3, 4]}}}'
- '{"index": {}}'
- '{"@timestamp": "2021-04-28T18:50:23.142Z", "metricset": "pod", "k8s": {"pod": {"name": "dog", "uid":"df3145b3-0563-4d3b-a0f7-897eb2876ea9", "ip": "10.10.55.3", "network": {"tx": 1434577921, "rx": 530600088}}}}'
- '{"@timestamp": "2021-04-28T18:50:23.142Z", "metricset": "pod", "k8s": {"pod": {"name": "dog", "uid":"df3145b3-0563-4d3b-a0f7-897eb2876ea9", "ip": "10.10.55.56", "network": {"tx": 1434577921, "rx": 530600088}, "created_at": "2021-04-28T19:43:00.000Z", "running": false, "number_of_containers": 1, "tags": ["backend", "test", "us-west2"], "values": [2, 1, 1]}}}'
- '{"index": {}}'
- '{"@timestamp": "2021-04-28T19:50:53.142Z", "metricset": "pod", "k8s": {"pod": {"name": "dog", "uid":"df3145b3-0563-4d3b-a0f7-897eb2876ea9", "ip": "10.10.55.3", "network": {"tx": 1434587694, "rx": 530604797}}}}'
- '{"@timestamp": "2021-04-28T19:50:53.142Z", "metricset": "pod", "k8s": {"pod": {"name": "dog", "uid":"df3145b3-0563-4d3b-a0f7-897eb2876ea9", "ip": "10.10.55.37", "network": {"tx": 1434587694, "rx": 530604797}, "created_at": "2021-04-28T19:44:00.000Z", "running": true, "number_of_containers": 1, "tags": ["backend", "test", "us-west1"], "values": [4, 5, 2]}}}'
- '{"index": {}}'
- '{"@timestamp": "2021-04-28T19:51:03.142Z", "metricset": "pod", "k8s": {"pod": {"name": "dog", "uid":"df3145b3-0563-4d3b-a0f7-897eb2876ea9", "ip": "10.10.55.3", "network": {"tx": 1434595272, "rx": 530605511}}}}'
- '{"@timestamp": "2021-04-28T19:51:03.142Z", "metricset": "pod", "k8s": {"pod": {"name": "dog", "uid":"df3145b3-0563-4d3b-a0f7-897eb2876ea9", "ip": "10.10.55.120", "network": {"tx": 1434595272, "rx": 530605511}, "created_at": "2021-04-28T19:45:00.000Z", "running": true, "number_of_containers": 1, "tags": ["backend", "test", "us-west1"], "values": [3, 2, 1]}}}'

- do:
indices.put_settings:
Expand Down Expand Up @@ -99,7 +109,12 @@ setup:
- match: { hits.hits.0._source.k8s\.pod\.network\.tx.min: 2001818691 }
- match: { hits.hits.0._source.k8s\.pod\.network\.tx.max: 2005177954 }
- match: { hits.hits.0._source.k8s\.pod\.network\.tx.value_count: 2 }
- is_false: hits.hits.0._source.k8s\.pod\.ip # k8s.pod.ip isn't a dimension and is not rolled up
- match: { hits.hits.0._source.k8s\.pod\.ip: "10.10.55.26" }
- match: { hits.hits.0._source.k8s\.pod\.created_at: "2021-04-28T19:35:00.000Z" }
- match: { hits.hits.0._source.k8s\.pod\.number_of_containers: 2 }
- match: { hits.hits.0._source.k8s\.pod\.tags: ["backend", "prod", "us-west1"] }
- match: { hits.hits.0._source.k8s\.pod\.values: [1, 1, 3] }
- is_true: hits.hits.0._source.k8s\.pod\.running

# Assert rollup index settings
- do:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.xpack.rollup.v2;

/**
* Base class for classes that read metric and label fields.
*/
abstract class AbstractRollupFieldProducer<T> {

protected final String name;
protected boolean isEmpty;

AbstractRollupFieldProducer(String name) {
this.name = name;
this.isEmpty = true;
}

/**
* Collect a value for the field applying the specific subclass collection strategy.
* @param value the value to collect.
*/
public abstract void collect(T value);

/**
* @return the name of the field.
*/
public String name() {
return name;
}

/**
* @return the value of the field.
*/
public abstract Object value();

/**
* Resets the collected value to the specific subclass reset value.
*/
public abstract void reset();

/**
* @return true if the field has not collected any value.
*/
public boolean isEmpty() {
return isEmpty;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,7 @@
import java.io.IOException;
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Function;
Expand All @@ -30,8 +28,20 @@
*/
class FieldValueFetcher {

private static final Set<Class<?>> VALID_TYPES = Collections.unmodifiableSet(
new HashSet<>(Arrays.asList(Long.class, Double.class, BigInteger.class, String.class, BytesRef.class))
private static final Set<Class<?>> VALID_METRIC_TYPES = Set.of(
Long.class,
Double.class,
BigInteger.class,
String.class,
BytesRef.class
);
private static final Set<Class<?>> VALID_LABEL_TYPES = Set.of(
Long.class,
Double.class,
BigInteger.class,
String.class,
BytesRef.class,
Boolean.class
);

private final String name;
Expand Down Expand Up @@ -66,7 +76,7 @@ public IndexFieldData<?> fieldData() {

FormattedDocValues getLeaf(LeafReaderContext context) {

final FormattedDocValues delegate = fieldData.load(context).getFormattedValues(DocValueFormat.RAW);
final FormattedDocValues delegate = fieldData.load(context).getFormattedValues(format);
return new FormattedDocValues() {
@Override
public boolean advanceExact(int docId) throws IOException {
Expand Down Expand Up @@ -102,28 +112,36 @@ Object format(Object value) {
/**
* Retrieve field fetchers for a list of fields.
*/
static List<FieldValueFetcher> build(SearchExecutionContext context, String[] fields) {
private static List<FieldValueFetcher> build(SearchExecutionContext context, String[] fields, Set<Class<?>> validTypes) {
List<FieldValueFetcher> fetchers = new ArrayList<>(fields.length);
for (String field : fields) {
MappedFieldType fieldType = context.getFieldType(field);
if (fieldType == null) {
throw new IllegalArgumentException("Unknown field: [" + field + "]");
}
IndexFieldData<?> fieldData = context.getForField(fieldType);
fetchers.add(new FieldValueFetcher(field, fieldType, fieldData, getValidator(field)));
fetchers.add(new FieldValueFetcher(field, fieldType, fieldData, getValidator(field, validTypes)));
}
return Collections.unmodifiableList(fetchers);
}

static Function<Object, Object> getValidator(String field) {
static Function<Object, Object> getValidator(String field, Set<Class<?>> validTypes) {
return value -> {
if (VALID_TYPES.contains(value.getClass()) == false) {
if (validTypes.contains(value.getClass()) == false) {
throw new IllegalArgumentException(
"Expected [" + VALID_TYPES + "] for field [" + field + "], " + "got [" + value.getClass() + "]"
"Expected [" + validTypes + "] for field [" + field + "], " + "got [" + value.getClass() + "]"
);
}
return value;
};
}

static List<FieldValueFetcher> forMetrics(SearchExecutionContext context, String[] metricFields) {
return build(context, metricFields, VALID_METRIC_TYPES);
}

static List<FieldValueFetcher> forLabels(SearchExecutionContext context, String[] labelFields) {
return build(context, labelFields, VALID_LABEL_TYPES);
}

}

0 comments on commit 504b30c

Please sign in to comment.