Skip to content

Commit

Permalink
Added a more randomized passthrough indexing test. (#105344)
Browse files Browse the repository at this point in the history
That also asserts routing aspects of indexing, searching and getting by
id.

Relates to #103567
  • Loading branch information
martijnvg committed Feb 9, 2024
1 parent 2ff676c commit 47304a1
Show file tree
Hide file tree
Showing 3 changed files with 219 additions and 6 deletions.
@@ -0,0 +1,194 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
package org.elasticsearch.datastreams;

import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.admin.indices.get.GetIndexRequest;
import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
import org.elasticsearch.action.admin.indices.template.put.TransportPutComposableIndexTemplateAction;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.cluster.metadata.ComposableIndexTemplate;
import org.elasticsearch.cluster.metadata.Template;
import org.elasticsearch.common.compress.CompressedXContent;
import org.elasticsearch.common.network.InetAddresses;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.time.DateFormatter;
import org.elasticsearch.common.time.FormatNames;
import org.elasticsearch.index.query.TermQueryBuilder;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.test.ESSingleNodeTestCase;
import org.elasticsearch.test.InternalSettingsPlugin;
import org.elasticsearch.xcontent.ObjectPath;
import org.elasticsearch.xcontent.XContentType;

import java.time.Instant;
import java.util.Collection;
import java.util.List;
import java.util.Map;

import static org.elasticsearch.test.MapMatcher.assertMap;
import static org.elasticsearch.test.MapMatcher.matchesMap;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertResponse;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;

public class TSDBPassthroughIndexingIT extends ESSingleNodeTestCase {

public static final String MAPPING_TEMPLATE = """
{
"_doc":{
"properties": {
"@timestamp" : {
"type": "date"
},
"attributes": {
"type": "passthrough",
"dynamic": true,
"time_series_dimension": true
},
"metrics": {
"properties": {
"network": {
"properties": {
"tx": {
"type": "long",
"time_series_metric": "counter"
},
"rx": {
"type": "long",
"time_series_metric": "counter"
}
}
}
}
}
}
}
}""";

private static final String DOC = """
{
"@timestamp": "$time",
"attributes": {
"metricset": "pod",
"pod": {
"name": "$name",
"uid": "$uid",
"ip": "$ip"
}
},
"metrics": {
"network": {
"tx": 1434595272,
"rx": 530605511
}
}
}
""";

@Override
protected Collection<Class<? extends Plugin>> getPlugins() {
return List.of(DataStreamsPlugin.class, InternalSettingsPlugin.class);
}

@Override
protected Settings nodeSettings() {
Settings.Builder newSettings = Settings.builder();
newSettings.put(super.nodeSettings());
// This essentially disables the automatic updates to end_time settings of a data stream's latest backing index.
newSettings.put(DataStreamsPlugin.TIME_SERIES_POLL_INTERVAL.getKey(), "10m");
return newSettings.build();
}

public void testIndexingGettingAndSearching() throws Exception {
var templateSettings = Settings.builder()
.put("index.mode", "time_series")
.put("index.number_of_shards", randomIntBetween(2, 10))
.put("index.number_of_replicas", 0);

var request = new TransportPutComposableIndexTemplateAction.Request("id");
request.indexTemplate(
ComposableIndexTemplate.builder()
.indexPatterns(List.of("k8s*"))
.template(new Template(templateSettings.build(), new CompressedXContent(MAPPING_TEMPLATE), null))
.dataStreamTemplate(new ComposableIndexTemplate.DataStreamTemplate(false, false))
.build()
);
client().execute(TransportPutComposableIndexTemplateAction.TYPE, request).actionGet();

String index = null;
int indexingIters = randomIntBetween(16, 128);
Instant time = Instant.now();
for (int i = 0; i < indexingIters; i++) {
var indexRequest = new IndexRequest("k8s").opType(DocWriteRequest.OpType.CREATE);
indexRequest.source(
DOC.replace("$time", formatInstant(time))
.replace("$uid", randomUUID())
.replace("$name", randomAlphaOfLength(4))
.replace("$ip", InetAddresses.toAddrString(randomIp(randomBoolean()))),
XContentType.JSON
);
var indexResponse = client().index(indexRequest).actionGet();
index = indexResponse.getIndex();
String id = indexResponse.getId();

var getResponse = client().get(new GetRequest(index, id)).actionGet();
assertThat(getResponse.isExists(), is(true));

client().admin().indices().refresh(new RefreshRequest(index)).actionGet();
var searchRequest = new SearchRequest();
searchRequest.source(new SearchSourceBuilder().query(new TermQueryBuilder("_id", id)));
assertResponse(client().search(searchRequest), searchResponse -> {
assertHitCount(searchResponse, 1);
assertThat(searchResponse.getHits().getHits()[0].getId(), equalTo(id));
});
time = time.plusMillis(1);
}

// validate index:
var getIndexResponse = client().admin().indices().getIndex(new GetIndexRequest().indices(index)).actionGet();
assertThat(getIndexResponse.getSettings().get(index).get("index.routing_path"), equalTo("[attributes.*]"));
// validate mapping
var mapping = getIndexResponse.mappings().get(index).getSourceAsMap();
assertMap(
ObjectPath.eval("properties.attributes.properties.metricset", mapping),
matchesMap().entry("type", "keyword").entry("time_series_dimension", true)
);
@SuppressWarnings("unchecked")
var attributes = (Map<String, Map<?, ?>>) ObjectPath.eval("properties.attributes.properties", mapping);
assertMap(attributes.get("pod.ip"), matchesMap().entry("type", "keyword").entry("time_series_dimension", true));
assertMap(attributes.get("pod.uid"), matchesMap().entry("type", "keyword").entry("time_series_dimension", true));
assertMap(attributes.get("pod.name"), matchesMap().entry("type", "keyword").entry("time_series_dimension", true));
// alias field mappers:
assertMap(
ObjectPath.eval("properties.metricset", mapping),
matchesMap().entry("type", "alias").entry("path", "attributes.metricset")
);
assertMap(
ObjectPath.eval("properties.pod.properties", mapping),
matchesMap().extraOk().entry("name", matchesMap().entry("type", "alias").entry("path", "attributes.pod.name"))
);
assertMap(
ObjectPath.eval("properties.pod.properties", mapping),
matchesMap().extraOk().entry("uid", matchesMap().entry("type", "alias").entry("path", "attributes.pod.uid"))
);
assertMap(
ObjectPath.eval("properties.pod.properties", mapping),
matchesMap().extraOk().entry("ip", matchesMap().entry("type", "alias").entry("path", "attributes.pod.ip"))
);
}

static String formatInstant(Instant instant) {
return DateFormatter.forPattern(FormatNames.STRICT_DATE_OPTIONAL_TIME.getName()).format(instant);
}

}
Expand Up @@ -114,7 +114,11 @@ public RootObjectMapper build(MapperBuilderContext context) {
Map<String, Mapper> mappers = buildMappers(context);

Map<String, Mapper> aliasMappers = new HashMap<>();
getAliasMappers(mappers, aliasMappers, context, 0);
Map<String, ObjectMapper.Builder> objectIntermediates = new HashMap<>(1);
getAliasMappers(mappers, aliasMappers, objectIntermediates, context, 0);
for (var entry : objectIntermediates.entrySet()) {
aliasMappers.put(entry.getKey(), entry.getValue().build(context));
}
mappers.putAll(aliasMappers);

return new RootObjectMapper(
Expand All @@ -131,7 +135,13 @@ public RootObjectMapper build(MapperBuilderContext context) {
);
}

void getAliasMappers(Map<String, Mapper> mappers, Map<String, Mapper> aliasMappers, MapperBuilderContext context, int level) {
void getAliasMappers(
Map<String, Mapper> mappers,
Map<String, Mapper> aliasMappers,
Map<String, ObjectMapper.Builder> objectIntermediates,
MapperBuilderContext context,
int level
) {
if (level >= MAX_NESTING_LEVEL_FOR_PASS_THROUGH_OBJECTS) {
logger.warn("Exceeded maximum nesting level for searching for pass-through object fields within object fields.");
return;
Expand Down Expand Up @@ -174,21 +184,20 @@ void getAliasMappers(Map<String, Mapper> mappers, Map<String, Mapper> aliasMappe
);
for (int i = fieldNameParts.length - 2; i >= 0; --i) {
String intermediateObjectName = fieldNameParts[i];
ObjectMapper.Builder intermediate = new ObjectMapper.Builder(
ObjectMapper.Builder intermediate = objectIntermediates.computeIfAbsent(
intermediateObjectName,
ObjectMapper.Defaults.SUBOBJECTS
s -> new ObjectMapper.Builder(intermediateObjectName, ObjectMapper.Defaults.SUBOBJECTS)
);
intermediate.add(fieldBuilder);
fieldBuilder = intermediate;
}
aliasMappers.put(fieldNameParts[0], fieldBuilder.build(context));
}
}
}
}
} else if (mapper instanceof ObjectMapper objectMapper) {
// Call recursively to check child fields. The level guards against long recursive call sequences.
getAliasMappers(objectMapper.mappers, aliasMappers, context, level + 1);
getAliasMappers(objectMapper.mappers, aliasMappers, objectIntermediates, context, level + 1);
}
}
}
Expand Down
Expand Up @@ -394,6 +394,7 @@ public void testPassThroughObjectNested() throws IOException {
public void testAliasMappersCreatesAlias() throws Exception {
var context = MapperBuilderContext.root(false, false);
Map<String, Mapper> aliases = new HashMap<>();
var objectIntermediates = new HashMap<String, ObjectMapper.Builder>(1);
new RootObjectMapper.Builder("root", Explicit.EXPLICIT_FALSE).getAliasMappers(
Map.of(
"labels",
Expand All @@ -407,6 +408,7 @@ public void testAliasMappersCreatesAlias() throws Exception {
)
),
aliases,
objectIntermediates,
context,
0
);
Expand All @@ -417,6 +419,7 @@ public void testAliasMappersCreatesAlias() throws Exception {
public void testAliasMappersCreatesAliasNested() throws Exception {
var context = MapperBuilderContext.root(false, false);
Map<String, Mapper> aliases = new HashMap<>();
var objectIntermediates = new HashMap<String, ObjectMapper.Builder>(1);
new RootObjectMapper.Builder("root", Explicit.EXPLICIT_FALSE).getAliasMappers(
Map.of(
"outer",
Expand All @@ -440,6 +443,7 @@ public void testAliasMappersCreatesAliasNested() throws Exception {
)
),
aliases,
objectIntermediates,
context,
0
);
Expand All @@ -450,6 +454,7 @@ public void testAliasMappersCreatesAliasNested() throws Exception {
public void testAliasMappersExitsInDeepNesting() throws Exception {
var context = MapperBuilderContext.root(false, false);
Map<String, Mapper> aliases = new HashMap<>();
var objectIntermediates = new HashMap<String, ObjectMapper.Builder>(1);
new RootObjectMapper.Builder("root", Explicit.EXPLICIT_FALSE).getAliasMappers(
Map.of(
"labels",
Expand All @@ -463,6 +468,7 @@ public void testAliasMappersExitsInDeepNesting() throws Exception {
)
),
aliases,
objectIntermediates,
context,
1_000_000
);
Expand All @@ -472,6 +478,7 @@ public void testAliasMappersExitsInDeepNesting() throws Exception {
public void testAliasMappersCreatesNoAliasForRegularObject() throws Exception {
var context = MapperBuilderContext.root(false, false);
Map<String, Mapper> aliases = new HashMap<>();
var objectIntermediates = new HashMap<String, ObjectMapper.Builder>(1);
new RootObjectMapper.Builder("root", Explicit.EXPLICIT_FALSE).getAliasMappers(
Map.of(
"labels",
Expand All @@ -485,6 +492,7 @@ public void testAliasMappersCreatesNoAliasForRegularObject() throws Exception {
)
),
aliases,
objectIntermediates,
context,
0
);
Expand All @@ -494,6 +502,7 @@ public void testAliasMappersCreatesNoAliasForRegularObject() throws Exception {
public void testAliasMappersConflictingField() throws Exception {
var context = MapperBuilderContext.root(false, false);
Map<String, Mapper> aliases = new HashMap<>();
var objectIntermediates = new HashMap<String, ObjectMapper.Builder>(1);
new RootObjectMapper.Builder("root", Explicit.EXPLICIT_FALSE).getAliasMappers(
Map.of(
"labels",
Expand All @@ -509,6 +518,7 @@ public void testAliasMappersConflictingField() throws Exception {
new KeywordFieldMapper.Builder("host", IndexVersion.current()).build(context)
),
aliases,
objectIntermediates,
context,
0
);
Expand Down

0 comments on commit 47304a1

Please sign in to comment.