Skip to content

Commit

Permalink
[Transform] Implement integration test for transform's deduce_mappings setting (#103052)
Browse files Browse the repository at this point in the history
  • Loading branch information
przemekwitek committed Dec 7, 2023
1 parent 468c3da commit e1e101e
Show file tree
Hide file tree
Showing 8 changed files with 174 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1744,8 +1744,8 @@ protected static Map<String, Object> getIndexMapping(String index) throws IOExce

/**
 * Fetches the mapping response for {@code index} through {@code getIndexMapping(index)} and returns
 * the value stored under the {@code "mappings"} key of that index's entry.
 *
 * @param index name of the index; also used as the key to look up the index entry in the response
 * @return the {@code "mappings"} object of the index entry, cast to a map
 * @throws IOException declared because {@code getIndexMapping} is declared to throw it
 */
// Both casts below are unchecked: the response is treated as nested String-keyed maps.
@SuppressWarnings("unchecked")
protected Map<String, Object> getIndexMappingAsMap(String index) throws IOException {
// NOTE(review): this rendering shows two statement pairs that differ only in the local's name
// (indexSettings vs. indexMapping); presumably only the second pair is in the committed file - confirm.
Map<String, Object> indexSettings = getIndexMapping(index);
return (Map<String, Object>) ((Map<String, Object>) indexSettings.get(index)).get("mappings");
Map<String, Object> indexMapping = getIndexMapping(index);
return (Map<String, Object>) ((Map<String, Object>) indexMapping.get(index)).get("mappings");
}

protected static boolean indexExists(String index) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@

import org.elasticsearch.client.Request;
import org.elasticsearch.client.Response;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.xcontent.support.XContentMapValues;
import org.elasticsearch.rest.action.admin.indices.RestPutIndexTemplateAction;
import org.elasticsearch.xpack.core.transform.TransformConfigVersion;
import org.elasticsearch.xpack.core.transform.transforms.DestAlias;
import org.elasticsearch.xpack.core.transform.transforms.SettingsConfig;
Expand All @@ -21,6 +23,9 @@
import java.util.Map;
import java.util.stream.Collectors;

import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;

public class TransformDestIndexIT extends TransformRestTestCase {

private static boolean indicesCreated = false;
Expand Down Expand Up @@ -140,6 +145,116 @@ public void testTransformDestIndexCreatedDuringUpdate() throws Exception {
assertTrue(indexExists(destIndex));
}

/** Runs the destination-index mappings scenario with {@code deduce_mappings} set to {@code true}. */
public void testTransformDestIndexMappings_DeduceMappings() throws Exception {
    final String transformId = "test_dest_index_mappings_deduce";
    final boolean deduceMappings = true;
    testTransformDestIndexMappings(transformId, deduceMappings);
}

/** Runs the destination-index mappings scenario with {@code deduce_mappings} set to {@code false}. */
public void testTransformDestIndexMappings_NoDeduceMappings() throws Exception {
    final String transformId = "test_dest_index_mappings_no_deduce";
    final boolean deduceMappings = false;
    testTransformDestIndexMappings(transformId, deduceMappings);
}

/**
 * Checks that the destination index created by a transform receives its mappings from a matching
 * legacy index template, for the given value of the transform's {@code deduce_mappings} setting.
 *
 * <p>Steps: (1) register a template (via the {@code _template} endpoint) whose pattern is
 * {@code <destIndex>*} and which maps {@code timestamp}, {@code reviewer} and {@code avg_rating};
 * (2) create and start an unattended pivot transform with a {@code sync} section that writes to the
 * destination index; (3) once {@code waitForTransformCheckpoint(transformId, 1)} returns, assert that
 * the index exists and that its mappings equal exactly the ones declared in the template.
 *
 * @param transformId id of the transform; the destination index name and the template name are derived from it
 * @param deduceMappings value written into {@code settings.deduce_mappings} of the transform config
 * @throws Exception if any REST call or helper fails
 */
private void testTransformDestIndexMappings(String transformId, boolean deduceMappings) throws Exception {
    String destIndex = transformId + "-dest";
    // Derive the template name from the transform id so that the deduce and no-deduce variants each
    // register their own template instead of overwriting one shared, misleadingly named template.
    String templateEndpoint = "_template/" + transformId + "_template";

    {
        String destIndexTemplate = Strings.format("""
            {
            "index_patterns": [ "%s*" ],
            "mappings": {
            "properties": {
            "timestamp": {
            "type": "date"
            },
            "reviewer": {
            "type": "keyword"
            },
            "avg_rating": {
            "type": "double"
            }
            }
            }
            }""", destIndex);
        Request createIndexTemplateRequest = new Request("PUT", templateEndpoint);
        createIndexTemplateRequest.setJsonEntity(destIndexTemplate);
        // The request is expected to come back with the put-index-template deprecation warning.
        createIndexTemplateRequest.setOptions(expectWarnings(RestPutIndexTemplateAction.DEPRECATION_WARNING));
        Map<String, Object> createIndexTemplateResponse = entityAsMap(client().performRequest(createIndexTemplateRequest));
        assertThat(createIndexTemplateResponse.get("acknowledged"), equalTo(Boolean.TRUE));
    }

    // Verify that the destination index does not exist yet, even though the template already exists
    assertFalse(indexExists(destIndex));

    {
        String config = Strings.format("""
            {
            "dest": {
            "index": "%s"
            },
            "source": {
            "index": "%s"
            },
            "sync": {
            "time": {
            "field": "timestamp",
            "delay": "15m"
            }
            },
            "frequency": "1s",
            "pivot": {
            "group_by": {
            "timestamp": {
            "date_histogram": {
            "field": "timestamp",
            "fixed_interval": "10s"
            }
            },
            "reviewer": {
            "terms": {
            "field": "user_id"
            }
            }
            },
            "aggregations": {
            "avg_rating": {
            "avg": {
            "field": "stars"
            }
            }
            }
            },
            "settings": {
            "unattended": true,
            "deduce_mappings": %s
            }
            }""", destIndex, REVIEWS_INDEX_NAME, deduceMappings);
        createReviewsTransform(transformId, null, null, config);

        startTransform(transformId);
        waitForTransformCheckpoint(transformId, 1);
    }

    // Verify that the destination index now exists and has correct mappings from the template
    assertTrue(indexExists(destIndex));
    Map<String, Object> expectedMappings = Map.of(
        "properties",
        Map.of(
            "avg_rating",
            Map.of("type", "double"),
            "reviewer",
            Map.of("type", "keyword"),
            "timestamp",
            Map.of("type", "date")
        )
    );
    assertThat(getIndexMappingAsMap(destIndex), is(equalTo(expectedMappings)));
}

private static void assertAliases(String index, String... aliases) throws IOException {
Map<String, Map<?, ?>> expectedAliases = Arrays.stream(aliases).collect(Collectors.toMap(a -> a, a -> Map.of()));
Response aliasesResponse = client().performRequest(new Request("GET", index + "/_alias"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -380,7 +380,7 @@ protected void createLatestReviewsTransform(String transformId, String transform
createReviewsTransform(transformId, null, null, config);
}

private void createReviewsTransform(String transformId, String authHeader, String secondaryAuthHeader, String config)
protected void createReviewsTransform(String transformId, String authHeader, String secondaryAuthHeader, String config)
throws IOException {
final Request createTransformRequest = createRequestWithSecondaryAuth(
"PUT",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import java.util.stream.Stream;

import static org.elasticsearch.xpack.transform.transforms.pivot.SchemaUtil.dropFloatingPointComponentIfTypeRequiresIt;
import static org.elasticsearch.xpack.transform.transforms.pivot.SchemaUtil.isDateType;
import static org.elasticsearch.xpack.transform.transforms.pivot.SchemaUtil.isNumericType;

public final class AggregationResultUtils {
Expand All @@ -70,16 +71,9 @@ public final class AggregationResultUtils {
TYPE_VALUE_EXTRACTOR_MAP = Collections.unmodifiableMap(tempMap);
}

private static final Map<String, BucketKeyExtractor> BUCKET_KEY_EXTRACTOR_MAP;
private static final BucketKeyExtractor DEFAULT_BUCKET_KEY_EXTRACTOR = new DefaultBucketKeyExtractor();
private static final BucketKeyExtractor DATES_AS_EPOCH_BUCKET_KEY_EXTRACTOR = new DatesAsEpochBucketKeyExtractor();

static {
Map<String, BucketKeyExtractor> tempMap = new HashMap<>();
tempMap.put(GeoTileGroupSource.class.getName(), new GeoTileBucketKeyExtractor());

BUCKET_KEY_EXTRACTOR_MAP = Collections.unmodifiableMap(tempMap);
}
private static final BucketKeyExtractor GEO_TILE_BUCKET_KEY_EXTRACTOR = new GeoTileBucketKeyExtractor();

private static final String FIELD_TYPE = "type";
private static final String FIELD_COORDINATES = "coordinates";
Expand Down Expand Up @@ -150,10 +144,13 @@ public static Stream<Map<String, Object>> extractCompositeAggregationResults(
}

/**
 * Selects the {@link BucketKeyExtractor} to use for a group-by source.
 *
 * <p>A {@code GeoTileGroupSource} gets the geo-tile extractor; otherwise {@code datesAsEpoch}
 * chooses between the dates-as-epoch extractor and the default extractor.
 *
 * @param groupSource the group-by source whose bucket keys will be extracted
 * @param datesAsEpoch whether the dates-as-epoch extractor should be used for non-geo-tile sources
 * @return one of the shared extractor constants
 */
static BucketKeyExtractor getBucketKeyExtractor(SingleGroupSource groupSource, boolean datesAsEpoch) {
// NOTE(review): this rendering contains two implementations back to back - a lookup in
// BUCKET_KEY_EXTRACTOR_MAP keyed by class name, followed by an instanceof chain. Presumably only
// the instanceof chain is in the committed file - confirm.
return BUCKET_KEY_EXTRACTOR_MAP.getOrDefault(
groupSource.getClass().getName(),
datesAsEpoch ? DATES_AS_EPOCH_BUCKET_KEY_EXTRACTOR : DEFAULT_BUCKET_KEY_EXTRACTOR
);
// Unlike the exact class-name lookup above, instanceof also matches subclasses of GeoTileGroupSource.
if (groupSource instanceof GeoTileGroupSource) {
return GEO_TILE_BUCKET_KEY_EXTRACTOR;
} else if (datesAsEpoch) {
return DATES_AS_EPOCH_BUCKET_KEY_EXTRACTOR;
} else {
return DEFAULT_BUCKET_KEY_EXTRACTOR;
}
}

static AggValueExtractor getExtractor(Aggregation aggregation) {
Expand Down Expand Up @@ -514,7 +511,6 @@ public Object value(Object key, String type) {
);
return geoShape;
}

}

static class DefaultBucketKeyExtractor implements BucketKeyExtractor {
Expand All @@ -523,16 +519,14 @@ static class DefaultBucketKeyExtractor implements BucketKeyExtractor {
/**
 * Converts a bucket key according to the destination field type.
 *
 * <p>A {@code Double} key for a numeric type is passed through
 * {@code dropFloatingPointComponentIfTypeRequiresIt}; a {@code Long} key for a date type
 * ({@code date} or {@code date_nanos}) is formatted as a date string; any other key is returned as is.
 *
 * @param key the bucket key
 * @param type the field type name; may be {@code null}, in which case the key is returned unchanged
 * @return the converted key
 */
// NOTE(review): this rendering interleaves two variants of the date branch - explicit comparisons
// against the DateFieldMapper content types, and the isDateType(type) helper. Presumably only the
// isDateType variant is in the committed file - confirm.
public Object value(Object key, String type) {
if (isNumericType(type) && key instanceof Double) {
return dropFloatingPointComponentIfTypeRequiresIt(type, (Double) key);
} else if ((DateFieldMapper.CONTENT_TYPE.equals(type) || DateFieldMapper.DATE_NANOS_CONTENT_TYPE.equals(type))
&& key instanceof Long) {
// date_histogram returns bucket keys with milliseconds-since-epoch precision, so no nanosecond
// formatter is needed; for the parser on the indexing side, time is optional (only the date part is mandatory)
return DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.formatMillis((Long) key);
}

return key;
} else if (isDateType(type) && key instanceof Long) {
// date_histogram returns bucket keys with milliseconds-since-epoch precision, so no nanosecond
// formatter is needed; for the parser on the indexing side, time is optional (only the date part is mandatory)
return DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.formatMillis((Long) key);
} else {
return key;
}
}

}

static class DatesAsEpochBucketKeyExtractor implements BucketKeyExtractor {
Expand All @@ -541,9 +535,9 @@ static class DatesAsEpochBucketKeyExtractor implements BucketKeyExtractor {
/**
 * Converts a bucket key while leaving date keys untouched.
 *
 * <p>A {@code Double} key for a numeric type is passed through
 * {@code dropFloatingPointComponentIfTypeRequiresIt}; every other key, including {@code Long}
 * keys for date types, is returned unchanged.
 *
 * @param key the bucket key
 * @param type the field type name; may be {@code null}, in which case the key is returned unchanged
 * @return the converted key
 */
// NOTE(review): this rendering shows both an else-branch return and a trailing return of the key;
// presumably only one of them is in the committed file - confirm.
public Object value(Object key, String type) {
if (isNumericType(type) && key instanceof Double) {
return dropFloatingPointComponentIfTypeRequiresIt(type, (Double) key);
} else {
return key;
}
return key;
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -148,9 +148,9 @@ protected Stream<Map<String, Object>> extractResults(
// < 7.11 as epoch millis
// >= 7.11 as string
// note: it depends on the version when the transform has been created, not the version of the code
boolean datesAsEpoch = settings.getDatesAsEpochMillis() != null ? settings.getDatesAsEpochMillis()
: version.onOrAfter(TransformConfigVersion.V_7_11_0) ? false
: true;
boolean datesAsEpoch = settings.getDatesAsEpochMillis() != null
? settings.getDatesAsEpochMillis()
: version.before(TransformConfigVersion.V_7_11_0);

return AggregationResultUtils.extractCompositeAggregationResults(
agg,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.client.internal.Client;
import org.elasticsearch.core.Tuple;
import org.elasticsearch.index.mapper.DateFieldMapper;
import org.elasticsearch.index.mapper.KeywordFieldMapper;
import org.elasticsearch.index.mapper.NumberFieldMapper;
import org.elasticsearch.index.query.QueryBuilder;
Expand All @@ -28,6 +29,7 @@
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;

Expand All @@ -39,6 +41,11 @@ public final class SchemaUtil {

// Full collection of numeric field type strings and whether they are floating point or not
private static final Map<String, Boolean> NUMERIC_FIELD_MAPPER_TYPES;
// Full collection of date field type strings
private static final Set<String> DATE_FIELD_MAPPER_TYPES = Set.of(
DateFieldMapper.CONTENT_TYPE,
DateFieldMapper.DATE_NANOS_CONTENT_TYPE
);
static {
Map<String, Boolean> types = Stream.of(NumberFieldMapper.NumberType.values())
.collect(Collectors.toMap(t -> t.typeName(), t -> t.numericType().isFloatingPoint()));
Expand All @@ -55,6 +62,10 @@ public static boolean isNumericType(String type) {
return type != null && NUMERIC_FIELD_MAPPER_TYPES.containsKey(type);
}

/**
 * Tells whether the given field type name is one of the date field mapper types.
 *
 * @param type the field type name, may be {@code null}
 * @return {@code true} if {@code type} is non-null and contained in {@code DATE_FIELD_MAPPER_TYPES}
 */
public static boolean isDateType(String type) {
    if (type == null) {
        // Answer before consulting the set: sets built with Set.of(...) throw on a null lookup.
        return false;
    }
    return DATE_FIELD_MAPPER_TYPES.contains(type);
}

/**
* Convert a numeric value to a whole number if it's not a floating point number.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1081,6 +1081,7 @@ public void testDefaultBucketKeyExtractor() {
assertThat(extractor.value(1577836800000L, "date"), equalTo("2020-01-01T00:00:00.000Z"));
assertThat(extractor.value(1577836800000L, "date_nanos"), equalTo("2020-01-01T00:00:00.000Z"));
assertThat(extractor.value(1577836800000L, "long"), equalTo(1577836800000L));
assertThat(extractor.value(1577836800000L, null), equalTo(1577836800000L));
}

public void testDatesAsEpochBucketKeyExtractor() {
Expand All @@ -1091,6 +1092,7 @@ public void testDatesAsEpochBucketKeyExtractor() {
assertThat(extractor.value(1577836800000L, "date"), equalTo(1577836800000L));
assertThat(extractor.value(1577836800000L, "date_nanos"), equalTo(1577836800000L));
assertThat(extractor.value(1577836800000L, "long"), equalTo(1577836800000L));
assertThat(extractor.value(1577836800000L, null), equalTo(1577836800000L));
}

private void executeTest(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,28 @@ public void testGetSourceFieldMappingsWithRuntimeMappings() throws InterruptedEx
}
}

/** Exercises {@code SchemaUtil.isNumericType} with null, numeric, date, keyword and unknown type names. */
public void testIsNumericType() {
    // null must simply be reported as "not numeric"
    assertFalse(SchemaUtil.isNumericType(null));

    for (String numericType : new String[] { "double", "integer", "long" }) {
        assertTrue(SchemaUtil.isNumericType(numericType));
    }
    for (String otherType : new String[] { "non-existing", "date", "date_nanos", "keyword" }) {
        assertFalse(SchemaUtil.isNumericType(otherType));
    }
}

/** Exercises {@code SchemaUtil.isDateType} with null, date, numeric, keyword and unknown type names. */
public void testIsDateType() {
    // null must simply be reported as "not a date"
    assertFalse(SchemaUtil.isDateType(null));

    for (String dateType : new String[] { "date", "date_nanos" }) {
        assertTrue(SchemaUtil.isDateType(dateType));
    }
    for (String otherType : new String[] { "non-existing", "double", "integer", "long", "keyword" }) {
        assertFalse(SchemaUtil.isDateType(otherType));
    }
}

private static class FieldCapsMockClient extends NoOpClient {
FieldCapsMockClient(ThreadPool threadPool) {
super(threadPool);
Expand Down

0 comments on commit e1e101e

Please sign in to comment.