Skip to content

Commit

Permalink
TSDB: Add support for composite aggregation on _tsid field (#81998)
Browse files Browse the repository at this point in the history
Currently, composite aggregation supports only string or numeric values for the fields 
in the sources part of the composite aggregation. Although _tsid is encoded and stored
as a byte array, it is formatted as map {"dim1": "value1", "dim2": value2", ...} for input/output.

This PR adds support for composite aggregation on _tsid field.

Relates to #74660
  • Loading branch information
csoulios committed Jan 19, 2022
1 parent 3f8011e commit 8ae9781
Show file tree
Hide file tree
Showing 12 changed files with 411 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -199,8 +199,8 @@ public String format(BytesRef value) {
}

@Override
public BytesRef parseBytesRef(String value) {
char[] encoded = value.toCharArray();
public BytesRef parseBytesRef(Object value) {
char[] encoded = value.toString().toCharArray();
int decodedLength = IndexableBinaryStringTools.getDecodedLength(encoded, 0, encoded.length);
byte[] decoded = new byte[decodedLength];
IndexableBinaryStringTools.decode(encoded, 0, encoded.length, decoded, 0, decodedLength);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
setup:
- skip:
version: " - 8.0.99"
reason: introduced in 8.1.0

- do:
indices.create:
index: test
body:
settings:
index:
mode: time_series
routing_path: [metricset, k8s.pod.uid]
time_series:
start_time: 2021-04-28T00:00:00Z
end_time: 2021-04-29T00:00:00Z
mappings:
properties:
"@timestamp":
type: date
metricset:
type: keyword
time_series_dimension: true
k8s:
properties:
pod:
properties:
uid:
type: keyword
time_series_dimension: true
name:
type: keyword
ip:
type: ip
network:
properties:
tx:
type: long
rx:
type: long
- do:
bulk:
refresh: true
index: test
body:
- '{"index": {}}'
- '{"@timestamp": "2021-04-28T18:50:04.467Z", "metricset": "pod", "k8s": {"pod": {"name": "cat", "uid":"947e4ced-1786-4e53-9e0c-5c447e959507", "ip": "10.10.55.1", "network": {"tx": 2001818691, "rx": 802133794}}}}'
- '{"index": {}}'
- '{"@timestamp": "2021-04-28T18:50:24.467Z", "metricset": "pod", "k8s": {"pod": {"name": "cat", "uid":"947e4ced-1786-4e53-9e0c-5c447e959507", "ip": "10.10.55.1", "network": {"tx": 2005177954, "rx": 801479970}}}}'
- '{"index": {}}'
- '{"@timestamp": "2021-04-28T18:50:44.467Z", "metricset": "pod", "k8s": {"pod": {"name": "cat", "uid":"947e4ced-1786-4e53-9e0c-5c447e959507", "ip": "10.10.55.1", "network": {"tx": 2006223737, "rx": 802337279}}}}'
- '{"index": {}}'
- '{"@timestamp": "2021-04-28T18:51:04.467Z", "metricset": "pod", "k8s": {"pod": {"name": "cat", "uid":"947e4ced-1786-4e53-9e0c-5c447e959507", "ip": "10.10.55.2", "network": {"tx": 2012916202, "rx": 803685721}}}}'
- '{"index": {}}'
- '{"@timestamp": "2021-04-28T18:50:03.142Z", "metricset": "pod", "k8s": {"pod": {"name": "dog", "uid":"df3145b3-0563-4d3b-a0f7-897eb2876ea9", "ip": "10.10.55.3", "network": {"tx": 1434521831, "rx": 530575198}}}}'
- '{"index": {}}'
- '{"@timestamp": "2021-04-28T18:50:23.142Z", "metricset": "pod", "k8s": {"pod": {"name": "dog", "uid":"df3145b3-0563-4d3b-a0f7-897eb2876ea9", "ip": "10.10.55.3", "network": {"tx": 1434577921, "rx": 530600088}}}}'
- '{"index": {}}'
- '{"@timestamp": "2021-04-28T18:50:53.142Z", "metricset": "pod", "k8s": {"pod": {"name": "dog", "uid":"df3145b3-0563-4d3b-a0f7-897eb2876ea9", "ip": "10.10.55.3", "network": {"tx": 1434587694, "rx": 530604797}}}}'
- '{"index": {}}'
- '{"@timestamp": "2021-04-28T18:51:03.142Z", "metricset": "pod", "k8s": {"pod": {"name": "dog", "uid":"df3145b3-0563-4d3b-a0f7-897eb2876ea9", "ip": "10.10.55.3", "network": {"tx": 1434595272, "rx": 530605511}}}}'

---
composite aggregation on tsid:
- skip:
version: " - 8.0.99"
reason: _tsid introduced in 8.1.0

- do:
search:
index: test
body:
size: 0
aggregations:
tsids:
composite:
sources: [
"tsid": {
"terms": {
"field": "_tsid"
}
},
"date": {
"date_histogram": {
"field": "@timestamp",
"calendar_interval": "1m"
}
}
]

- match: { hits.total.value: 8 }
- length: { aggregations.tsids.buckets: 4 }
- match: { aggregations.tsids.buckets.0.key.tsid.k8s\.pod\.uid: "947e4ced-1786-4e53-9e0c-5c447e959507" }
- match: { aggregations.tsids.buckets.0.key.tsid.metricset: "pod" }
- match: { aggregations.tsids.buckets.0.key.date: 1619635800000}
- match: { aggregations.tsids.buckets.0.doc_count: 3 }
- match: { aggregations.tsids.buckets.1.key.tsid.k8s\.pod\.uid: "947e4ced-1786-4e53-9e0c-5c447e959507" }
- match: { aggregations.tsids.buckets.1.key.tsid.metricset: "pod" }
- match: { aggregations.tsids.buckets.1.key.date: 1619635860000}
- match: { aggregations.tsids.buckets.1.doc_count: 1 }
- match: { aggregations.tsids.buckets.2.key.tsid.k8s\.pod\.uid: "df3145b3-0563-4d3b-a0f7-897eb2876ea9" }
- match: { aggregations.tsids.buckets.2.key.tsid.metricset: "pod" }
- match: { aggregations.tsids.buckets.2.key.date: 1619635800000}
- match: { aggregations.tsids.buckets.2.doc_count: 3 }
- match: { aggregations.tsids.buckets.3.key.tsid.k8s\.pod\.uid: "df3145b3-0563-4d3b-a0f7-897eb2876ea9" }
- match: { aggregations.tsids.buckets.3.key.tsid.metricset: "pod" }
- match: { aggregations.tsids.buckets.3.key.date: 1619635860000}
- match: { aggregations.tsids.buckets.3.doc_count: 1 }
- match: { aggregations.tsids.after_key.tsid.k8s\.pod\.uid: "df3145b3-0563-4d3b-a0f7-897eb2876ea9" }
- match: { aggregations.tsids.after_key.tsid.metricset: "pod" }
- match: { aggregations.tsids.after_key.date: 1619635860000}

---
composite aggregation on tsid with after:
- skip:
version: " - 8.0.99"
reason: _tsid introduced in 8.1.0

- do:
search:
index: test
body:
size: 0
aggregations:
tsids:
composite:
sources: [
"tsid": {
"terms": {
"field": "_tsid"
}
},
"date": {
"date_histogram": {
"field": "@timestamp",
"calendar_interval": "1m"
}
}
]
after: {
tsid: { k8s.pod.uid: "df3145b3-0563-4d3b-a0f7-897eb2876ea9", metricset: "pod" },
date: 1619635800000
}

- match: { hits.total.value: 8 }
- length: { aggregations.tsids.buckets: 1 }
- match: { aggregations.tsids.buckets.0.key.tsid.k8s\.pod\.uid: "df3145b3-0563-4d3b-a0f7-897eb2876ea9" }
- match: { aggregations.tsids.buckets.0.key.tsid.metricset: "pod" }
- match: { aggregations.tsids.buckets.0.key.date: 1619635860000}
- match: { aggregations.tsids.buckets.0.doc_count: 1 }
- match: { aggregations.tsids.after_key.tsid.k8s\.pod\.uid: "df3145b3-0563-4d3b-a0f7-897eb2876ea9" }
- match: { aggregations.tsids.after_key.tsid.metricset: "pod" }
- match: { aggregations.tsids.after_key.date: 1619635860000}

Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,12 @@ public void postParse(DocumentParserContext context) throws IOException {

// SortedMap is expected to be sorted by key (field name)
SortedMap<String, BytesReference> dimensionFields = context.doc().getDimensionBytes();
if (dimensionFields.isEmpty()) {
BytesReference timeSeriesId = buildTsidField(dimensionFields);
context.doc().add(new SortedSetDocValuesField(fieldType().name(), timeSeriesId.toBytesRef()));
}

public static BytesReference buildTsidField(Map<String, BytesReference> dimensionFields) throws IOException {
if (dimensionFields == null || dimensionFields.isEmpty()) {
throw new IllegalArgumentException("Dimension fields are missing.");
}

Expand All @@ -164,8 +169,7 @@ public void postParse(DocumentParserContext context) throws IOException {
if (timeSeriesId.length() > LIMIT) {
throw new IllegalArgumentException(NAME + " longer than [" + LIMIT + "] bytes [" + timeSeriesId.length() + "].");
}
assert timeSeriesId != null : "In time series mode _tsid cannot be null";
context.doc().add(new SortedSetDocValuesField(fieldType().name(), timeSeriesId.toBytesRef()));
return timeSeriesId;
}
}

Expand All @@ -192,7 +196,7 @@ public static Map<String, Object> decodeTsid(StreamInput in) {
case (byte) 'l' -> // parse a long
result.put(name, in.readLong());
case (byte) 'u' -> { // parse an unsigned_long
Object ul = DocValueFormat.UnsignedLongShiftedDocValueFormat.INSTANCE.format(in.readLong());
Object ul = DocValueFormat.UNSIGNED_LONG_SHIFTED.format(in.readLong());
result.put(name, ul);
}
default -> throw new IllegalArgumentException("Cannot parse [" + name + "]: Unknown type [" + type + "]");
Expand Down
61 changes: 54 additions & 7 deletions server/src/main/java/org/elasticsearch/search/DocValueFormat.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.Version;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.NamedWriteable;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
Expand All @@ -34,7 +35,9 @@
import java.time.ZoneId;
import java.util.Arrays;
import java.util.Base64;
import java.util.LinkedHashMap;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.function.LongSupplier;

Expand Down Expand Up @@ -78,7 +81,7 @@ default double parseDouble(String value, boolean roundUp, LongSupplier now) {

/** Parse a value that was formatted with {@link #format(BytesRef)} back
* to the original BytesRef. */
default BytesRef parseBytesRef(String value) {
default BytesRef parseBytesRef(Object value) {
throw new UnsupportedOperationException();
}

Expand Down Expand Up @@ -155,8 +158,8 @@ public double parseDouble(String value, boolean roundUp, LongSupplier now) {
}

@Override
public BytesRef parseBytesRef(String value) {
return new BytesRef(value);
public BytesRef parseBytesRef(Object value) {
return new BytesRef(value.toString());
}

@Override
Expand Down Expand Up @@ -190,8 +193,8 @@ public String format(BytesRef value) {
}

@Override
public BytesRef parseBytesRef(String value) {
return new BytesRef(Base64.getDecoder().decode(value));
public BytesRef parseBytesRef(Object value) {
return new BytesRef(Base64.getDecoder().decode(value.toString()));
}
};

Expand Down Expand Up @@ -477,8 +480,8 @@ public String format(BytesRef value) {
}

@Override
public BytesRef parseBytesRef(String value) {
return new BytesRef(InetAddressPoint.encode(InetAddresses.forString(value)));
public BytesRef parseBytesRef(Object value) {
return new BytesRef(InetAddressPoint.encode(InetAddresses.forString(value.toString())));
}

@Override
Expand Down Expand Up @@ -694,5 +697,49 @@ public String toString() {
public Object format(BytesRef value) {
return TimeSeriesIdFieldMapper.decodeTsid(new BytesArray(value).streamInput());
}

@Override
public BytesRef parseBytesRef(Object value) {
if (value instanceof Map<?, ?> == false) {
throw new IllegalArgumentException("Cannot parse tsid object [" + value + "]");
}

Map<?, ?> m = (Map<?, ?>) value;
Map<String, BytesReference> dimensionFields = new LinkedHashMap<>(m.size());
for (Map.Entry<?, ?> entry : m.entrySet()) {
String k = (String) entry.getKey();
Object v = entry.getValue();
BytesReference bytes;

if (v instanceof String s) {
bytes = TimeSeriesIdFieldMapper.encodeTsidValue(s);
} else if (v instanceof Long || v instanceof Integer) {
Long l = Long.valueOf(v.toString());
// For a long encoded number, we must check if the number can be the encoded value
// of an unsigned_long.
Number ul = (Number) UNSIGNED_LONG_SHIFTED.format(l);
if (l == ul) {
bytes = TimeSeriesIdFieldMapper.encodeTsidValue(l);
} else {
long ll = UNSIGNED_LONG_SHIFTED.parseLong(String.valueOf(l), false, () -> 0L);
bytes = TimeSeriesIdFieldMapper.encodeTsidUnsignedLongValue(ll);
}
} else if (v instanceof BigInteger ul) {
long ll = UNSIGNED_LONG_SHIFTED.parseLong(ul.toString(), false, () -> 0L);
bytes = TimeSeriesIdFieldMapper.encodeTsidUnsignedLongValue(ll);
} else {
throw new IllegalArgumentException("Unexpected value in tsid object [" + v + "]");
}

assert bytes != null : "Could not parse fields in _tsid field [" + value + "].";
dimensionFields.put(k, bytes);
}

try {
return TimeSeriesIdFieldMapper.buildTsidField(dimensionFields).toBytesRef();
} catch (IOException e) {
throw new IllegalArgumentException(e);
}
}
};
}
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,10 @@ void setAfter(Comparable<?> value) {
if (missingBucket && value == null) {
afterValue = null;
} else if (value.getClass() == String.class) {
afterValue = format.parseBytesRef(value.toString());
afterValue = format.parseBytesRef(value);
} else if (value.getClass() == BytesRef.class) {
// The value may be a bytes reference (eg an encoded tsid field)
afterValue = (BytesRef) value;
} else {
throw new IllegalArgumentException("invalid value, expected string, got " + value.getClass().getSimpleName());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.index.mapper.TimeSeriesIdFieldMapper.TimeSeriesIdFieldType;
import org.elasticsearch.search.aggregations.AbstractAggregationBuilder;
import org.elasticsearch.search.aggregations.AggregationBuilder;
import org.elasticsearch.search.aggregations.AggregatorFactories;
Expand Down Expand Up @@ -235,8 +236,11 @@ protected AggregatorFactory doBuild(
Object obj = after.get(sourceName);
if (configs[i].missingBucket() && obj == null) {
values[i] = null;
} else if (obj instanceof Comparable) {
values[i] = (Comparable<?>) obj;
} else if (obj instanceof Comparable<?> c) {
values[i] = c;
} else if (obj instanceof Map<?, ?> && configs[i].fieldType().getClass() == TimeSeriesIdFieldType.class) {
// If input is a _tsid map, encode the map to the _tsid BytesRef
values[i] = configs[i].format().parseBytesRef(obj);
} else {
throw new IllegalArgumentException(
"Invalid value for [after."
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,10 @@ void setAfter(Comparable<?> value) {
} else if (value.getClass() == String.class || (missingBucket && fieldType == null)) {
// the value might be not string if this field is missing in this shard but present in other shards
// and doesn't have a string type
afterValue = format.parseBytesRef(value.toString());
afterValue = format.parseBytesRef(value);
} else if (value.getClass() == BytesRef.class) {
// The value may be a bytes reference (eg an encoded tsid field)
afterValue = (BytesRef) value;
} else {
throw new IllegalArgumentException("invalid value, expected string, got " + value.getClass().getSimpleName());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -475,7 +475,7 @@ static Object formatObject(Object obj, DocValueFormat format) {
} else {
formatted = format.format(value);
}
parsed = format.parseBytesRef(formatted.toString());
parsed = format.parseBytesRef(formatted);
if (parsed.equals(obj) == false) {
throw new IllegalArgumentException(
"Format ["
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import org.apache.lucene.search.SortField;
import org.apache.lucene.search.SortedNumericSortField;
import org.apache.lucene.search.SortedSetSortField;
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.common.ParsingException;
import org.elasticsearch.common.Strings;
Expand Down Expand Up @@ -199,7 +200,12 @@ private static Object convertValueFromSortType(String fieldName, SortField.Type

case STRING_VAL:
case STRING:
return format.parseBytesRef(value.toString());
if (value instanceof BytesRef bytesRef) {
// _tsid is stored and ordered as BytesRef. We should not format it
return bytesRef;
} else {
return format.parseBytesRef(value);
}

default:
throw new IllegalArgumentException(
Expand Down

0 comments on commit 8ae9781

Please sign in to comment.