Skip to content

Commit

Permalink
Introduce deduce_mappings transform setting (#82256)
Browse files Browse the repository at this point in the history
  • Loading branch information
przemekwitek committed Jan 18, 2022
1 parent 8cddea0 commit 7be74a8
Show file tree
Hide file tree
Showing 19 changed files with 302 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ public class SettingsConfig implements ToXContentObject {
private static final ParseField DATES_AS_EPOCH_MILLIS = new ParseField("dates_as_epoch_millis");
private static final ParseField ALIGN_CHECKPOINTS = new ParseField("align_checkpoints");
private static final ParseField USE_PIT = new ParseField("use_point_in_time");
private static final ParseField DEDUCE_MAPPINGS = new ParseField("deduce_mappings");
private static final int DEFAULT_MAX_PAGE_SEARCH_SIZE = -1;
private static final float DEFAULT_DOCS_PER_SECOND = -1F;

Expand All @@ -39,16 +40,27 @@ public class SettingsConfig implements ToXContentObject {
// use an integer as we need to code 4 states: true, false, null (unchanged), default (defined server side)
private static final int DEFAULT_USE_PIT = -1;

// use an integer as we need to code 4 states: true, false, null (unchanged), default (defined server side)
private static final int DEFAULT_DEDUCE_MAPPINGS = -1;

private final Integer maxPageSearchSize;
private final Float docsPerSecond;
private final Integer datesAsEpochMillis;
private final Integer alignCheckpoints;
private final Integer usePit;
private final Integer deduceMappings;

private static final ConstructingObjectParser<SettingsConfig, Void> PARSER = new ConstructingObjectParser<>(
"settings_config",
true,
args -> new SettingsConfig((Integer) args[0], (Float) args[1], (Integer) args[2], (Integer) args[3], (Integer) args[4])
args -> new SettingsConfig(
(Integer) args[0],
(Float) args[1],
(Integer) args[2],
(Integer) args[3],
(Integer) args[4],
(Integer) args[5]
)
);

static {
Expand All @@ -75,18 +87,33 @@ public class SettingsConfig implements ToXContentObject {
USE_PIT,
ValueType.BOOLEAN_OR_NULL
);
// this boolean requires 4 possible values: true, false, not_specified, default, therefore using a custom parser
PARSER.declareField(
optionalConstructorArg(),
p -> p.currentToken() == XContentParser.Token.VALUE_NULL ? DEFAULT_DEDUCE_MAPPINGS : p.booleanValue() ? 1 : 0,
DEDUCE_MAPPINGS,
ValueType.BOOLEAN_OR_NULL
);
}

public static SettingsConfig fromXContent(final XContentParser parser) {
return PARSER.apply(parser, null);
}

SettingsConfig(Integer maxPageSearchSize, Float docsPerSecond, Integer datesAsEpochMillis, Integer alignCheckpoints, Integer usePit) {
SettingsConfig(
Integer maxPageSearchSize,
Float docsPerSecond,
Integer datesAsEpochMillis,
Integer alignCheckpoints,
Integer usePit,
Integer deduceMappings
) {
this.maxPageSearchSize = maxPageSearchSize;
this.docsPerSecond = docsPerSecond;
this.datesAsEpochMillis = datesAsEpochMillis;
this.alignCheckpoints = alignCheckpoints;
this.usePit = usePit;
this.deduceMappings = deduceMappings;
}

@Override
Expand Down Expand Up @@ -127,6 +154,13 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
builder.field(USE_PIT.getPreferredName(), usePit > 0 ? true : false);
}
}
if (deduceMappings != null) {
if (deduceMappings.equals(DEFAULT_DEDUCE_MAPPINGS)) {
builder.field(DEDUCE_MAPPINGS.getPreferredName(), (Boolean) null);
} else {
builder.field(DEDUCE_MAPPINGS.getPreferredName(), deduceMappings > 0 ? true : false);
}
}
builder.endObject();
return builder;
}
Expand All @@ -151,6 +185,10 @@ public Boolean getUsePit() {
return usePit != null ? usePit > 0 : null;
}

public Boolean getDeduceMappings() {
return deduceMappings != null ? deduceMappings > 0 : null;
}

@Override
public boolean equals(Object other) {
if (other == this) {
Expand All @@ -165,12 +203,13 @@ public boolean equals(Object other) {
&& Objects.equals(docsPerSecond, that.docsPerSecond)
&& Objects.equals(datesAsEpochMillis, that.datesAsEpochMillis)
&& Objects.equals(alignCheckpoints, that.alignCheckpoints)
&& Objects.equals(usePit, that.usePit);
&& Objects.equals(usePit, that.usePit)
&& Objects.equals(deduceMappings, that.deduceMappings);
}

@Override
public int hashCode() {
return Objects.hash(maxPageSearchSize, docsPerSecond, datesAsEpochMillis, alignCheckpoints, usePit);
return Objects.hash(maxPageSearchSize, docsPerSecond, datesAsEpochMillis, alignCheckpoints, usePit, deduceMappings);
}

public static Builder builder() {
Expand All @@ -183,6 +222,7 @@ public static class Builder {
private Integer datesAsEpochMillis;
private Integer alignCheckpoints;
private Integer usePit;
private Integer deduceMappings;

/**
* Sets the paging maximum paging maxPageSearchSize that transform can use when
Expand Down Expand Up @@ -256,8 +296,22 @@ public Builder setUsePit(Boolean usePit) {
return this;
}

/**
* Whether the destination index mappings should be deduced from the transform config.
* It is used per default.
*
* An explicit `null` resets to default.
*
* @param deduceMappings true if the transform should try deducing mappings from the config.
* @return the {@link Builder} with deduceMappings set.
*/
public Builder setDeduceMappings(Boolean deduceMappings) {
this.deduceMappings = deduceMappings == null ? DEFAULT_DEDUCE_MAPPINGS : deduceMappings ? 1 : 0;
return this;
}

public SettingsConfig build() {
return new SettingsConfig(maxPageSearchSize, docsPerSecond, datesAsEpochMillis, alignCheckpoints, usePit);
return new SettingsConfig(maxPageSearchSize, docsPerSecond, datesAsEpochMillis, alignCheckpoints, usePit, deduceMappings);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.elasticsearch.client.transform.UpdateTransformRequest;
import org.elasticsearch.client.transform.UpdateTransformResponse;
import org.elasticsearch.client.transform.transforms.DestConfig;
import org.elasticsearch.client.transform.transforms.SettingsConfig;
import org.elasticsearch.client.transform.transforms.SourceConfig;
import org.elasticsearch.client.transform.transforms.TimeSyncConfig;
import org.elasticsearch.client.transform.transforms.TransformConfig;
Expand Down Expand Up @@ -66,6 +67,7 @@

import static org.elasticsearch.xcontent.XContentFactory.jsonBuilder;
import static org.hamcrest.CoreMatchers.nullValue;
import static org.hamcrest.Matchers.anEmptyMap;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;
Expand Down Expand Up @@ -353,6 +355,30 @@ public void testPreview() throws IOException {
assertExpectedPreview(preview);
}

@SuppressWarnings("unchecked")
public void testPreviewWithoutMappingDeduction() throws IOException {
String sourceIndex = "transform-source";
createIndex(sourceIndex);
indexData(sourceIndex);

TransformConfig transform = validTransformConfigBuilder("test-preview", sourceIndex, null).setSettings(
SettingsConfig.builder().setDeduceMappings(false).build()
).build();

TransformClient client = highLevelClient().transform();
PreviewTransformResponse preview = execute(
new PreviewTransformRequest(transform),
client::previewTransform,
client::previewTransformAsync
);

assertExpectedPreviewDocs(preview.getDocs());

assertThat(preview.getMappings(), hasKey("properties"));
Map<String, Object> fields = (Map<String, Object>) preview.getMappings().get("properties");
assertThat(fields, anEmptyMap());
}

public void testPreviewById() throws IOException {
String sourceIndex = "transform-source";
createIndex(sourceIndex);
Expand All @@ -371,9 +397,13 @@ public void testPreviewById() throws IOException {
assertExpectedPreview(preview);
}

@SuppressWarnings("unchecked")
private static void assertExpectedPreview(PreviewTransformResponse preview) {
List<Map<String, Object>> docs = preview.getDocs();
assertExpectedPreviewDocs(preview.getDocs());
assertExpectedPreviewMappings(preview.getMappings());
}

@SuppressWarnings("unchecked")
private static void assertExpectedPreviewDocs(List<Map<String, Object>> docs) {
assertThat(docs, hasSize(2));
Optional<Map<String, Object>> theresa = docs.stream().filter(doc -> "theresa".equals(doc.get("reviewer"))).findFirst();
assertTrue(theresa.isPresent());
Expand All @@ -382,8 +412,10 @@ private static void assertExpectedPreview(PreviewTransformResponse preview) {
Optional<Map<String, Object>> michel = docs.stream().filter(doc -> "michel".equals(doc.get("reviewer"))).findFirst();
assertTrue(michel.isPresent());
assertEquals(3.6d, (double) michel.get().get("avg_rating"), 0.1d);
}

Map<String, Object> mappings = preview.getMappings();
@SuppressWarnings("unchecked")
private static void assertExpectedPreviewMappings(Map<String, Object> mappings) {
assertThat(mappings, hasKey("properties"));
Map<String, Object> fields = (Map<String, Object>) mappings.get("properties");
assertThat(fields.get("reviewer"), equalTo(Map.of("type", "keyword")));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ public static SettingsConfig randomSettingsConfig() {
randomBoolean() ? null : randomFloat(),
randomBoolean() ? null : randomIntBetween(-1, 1),
randomBoolean() ? null : randomIntBetween(-1, 1),
randomBoolean() ? null : randomIntBetween(-1, 1),
randomBoolean() ? null : randomIntBetween(-1, 1)
);
}
Expand Down Expand Up @@ -76,6 +77,7 @@ public void testExplicitNullOnWriteParser() throws IOException {
assertThat(settingsAsMap.getOrDefault("dates_as_epoch_millis", "not_set"), equalTo("not_set"));
assertThat(settingsAsMap.getOrDefault("align_checkpoints", "not_set"), equalTo("not_set"));
assertThat(settingsAsMap.getOrDefault("use_point_in_time", "not_set"), equalTo("not_set"));
assertThat(settingsAsMap.getOrDefault("deduce_mappings", "not_set"), equalTo("not_set"));

config = fromString("{\"dates_as_epoch_millis\" : null}");
assertFalse(config.getDatesAsEpochMillis());
Expand All @@ -86,6 +88,7 @@ public void testExplicitNullOnWriteParser() throws IOException {
assertNull(settingsAsMap.getOrDefault("dates_as_epoch_millis", "not_set"));
assertThat(settingsAsMap.getOrDefault("align_checkpoints", "not_set"), equalTo("not_set"));
assertThat(settingsAsMap.getOrDefault("use_point_in_time", "not_set"), equalTo("not_set"));
assertThat(settingsAsMap.getOrDefault("deduce_mappings", "not_set"), equalTo("not_set"));

config = fromString("{\"align_checkpoints\" : null}");
assertFalse(config.getAlignCheckpoints());
Expand All @@ -96,6 +99,7 @@ public void testExplicitNullOnWriteParser() throws IOException {
assertThat(settingsAsMap.getOrDefault("dates_as_epoch_millis", "not_set"), equalTo("not_set"));
assertNull(settingsAsMap.getOrDefault("align_checkpoints", "not_set"));
assertThat(settingsAsMap.getOrDefault("use_point_in_time", "not_set"), equalTo("not_set"));
assertThat(settingsAsMap.getOrDefault("deduce_mappings", "not_set"), equalTo("not_set"));

config = fromString("{\"use_point_in_time\" : null}");
assertFalse(config.getUsePit());
Expand All @@ -112,6 +116,7 @@ public void testExplicitNullOnWriteBuilder() throws IOException {
assertThat(settingsAsMap.getOrDefault("dates_as_epoch_millis", "not_set"), equalTo("not_set"));
assertThat(settingsAsMap.getOrDefault("align_checkpoints", "not_set"), equalTo("not_set"));
assertThat(settingsAsMap.getOrDefault("use_point_in_time", "not_set"), equalTo("not_set"));
assertThat(settingsAsMap.getOrDefault("deduce_mappings", "not_set"), equalTo("not_set"));

SettingsConfig emptyConfig = new SettingsConfig.Builder().build();
assertNull(emptyConfig.getMaxPageSearchSize());
Expand All @@ -130,6 +135,7 @@ public void testExplicitNullOnWriteBuilder() throws IOException {
assertThat(settingsAsMap.getOrDefault("dates_as_epoch_millis", "not_set"), equalTo("not_set"));
assertThat(settingsAsMap.getOrDefault("align_checkpoints", "not_set"), equalTo("not_set"));
assertThat(settingsAsMap.getOrDefault("use_point_in_time", "not_set"), equalTo("not_set"));
assertThat(settingsAsMap.getOrDefault("deduce_mappings", "not_set"), equalTo("not_set"));

config = new SettingsConfig.Builder().setDatesAsEpochMillis(null).build();
// returns false, however it's `null` as in "use default", checked next
Expand All @@ -140,6 +146,7 @@ public void testExplicitNullOnWriteBuilder() throws IOException {
assertThat(settingsAsMap.getOrDefault("docs_per_second", "not_set"), equalTo("not_set"));
assertNull(settingsAsMap.getOrDefault("dates_as_epoch_millis", "not_set"));
assertThat(settingsAsMap.getOrDefault("align_checkpoints", "not_set"), equalTo("not_set"));
assertThat(settingsAsMap.getOrDefault("deduce_mappings", "not_set"), equalTo("not_set"));

config = new SettingsConfig.Builder().setAlignCheckpoints(null).build();
// returns false, however it's `null` as in "use default", checked next
Expand All @@ -150,6 +157,7 @@ public void testExplicitNullOnWriteBuilder() throws IOException {
assertThat(settingsAsMap.getOrDefault("docs_per_second", "not_set"), equalTo("not_set"));
assertThat(settingsAsMap.getOrDefault("dates_as_epoch_millis", "not_set"), equalTo("not_set"));
assertNull(settingsAsMap.getOrDefault("align_checkpoints", "not_set"));
assertThat(settingsAsMap.getOrDefault("deduce_mappings", "not_set"), equalTo("not_set"));
}

private Map<String, Object> xContentToMap(ToXContent xcontent) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ public static org.elasticsearch.xpack.core.transform.transforms.SettingsConfig r
randomBoolean() ? null : randomFloat(),
randomBoolean() ? null : randomIntBetween(0, 1),
randomBoolean() ? null : randomIntBetween(0, 1),
randomBoolean() ? null : randomIntBetween(0, 1),
randomBoolean() ? null : randomIntBetween(0, 1)
);
}
Expand All @@ -38,6 +39,7 @@ public static void assertHlrcEquals(
assertEquals(serverTestInstance.getDatesAsEpochMillis(), clientInstance.getDatesAsEpochMillis());
assertEquals(serverTestInstance.getAlignCheckpoints(), clientInstance.getAlignCheckpoints());
assertEquals(serverTestInstance.getUsePit(), clientInstance.getUsePit());
assertEquals(serverTestInstance.getDeduceMappings(), clientInstance.getDeduceMappings());
}

@Override
Expand Down
5 changes: 5 additions & 0 deletions docs/reference/rest-api/common-parms.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -1034,6 +1034,11 @@ destination index will be performed thus improving overall performance.
The default value is `true`, which means the checkpoint ranges will be optimized if possible.
end::transform-settings-align-checkpoints[]

tag::transform-settings-deduce-mappings[]
Specifies whether the transform should deduce the destination index mappings from the transform config.
The default value is `true`, which means the destination index mappings will be deduced if possible.
end::transform-settings-deduce-mappings[]

tag::transform-settings-max-page-search-size[]
Defines the initial page size to use for the composite aggregation for each
checkpoint. If circuit breaker exceptions occur, the page size is dynamically
Expand Down
6 changes: 6 additions & 0 deletions docs/reference/transform/apis/preview-transform.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -227,12 +227,18 @@ include::{es-repo-dir}/rest-api/common-parms.asciidoc[tag=transform-settings]
.Properties of `settings`
[%collapsible%open]
====
`dates_as_epoch_millis`:::
(Optional, boolean)
include::{es-repo-dir}/rest-api/common-parms.asciidoc[tag=transform-settings-dates-as-epoch-milli]
`docs_per_second`:::
(Optional, float)
include::{es-repo-dir}/rest-api/common-parms.asciidoc[tag=transform-settings-docs-per-second]
`align_checkpoints`:::
(Optional, boolean)
include::{es-repo-dir}/rest-api/common-parms.asciidoc[tag=transform-settings-align-checkpoints]
`deduce_mappings`:::
(Optional, boolean)
include::{es-repo-dir}/rest-api/common-parms.asciidoc[tag=transform-settings-deduce-mappings]
`max_page_search_size`:::
(Optional, integer)
include::{es-repo-dir}/rest-api/common-parms.asciidoc[tag=transform-settings-max-page-search-size]
Expand Down
3 changes: 3 additions & 0 deletions docs/reference/transform/apis/put-transform.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,9 @@ include::{es-repo-dir}/rest-api/common-parms.asciidoc[tag=transform-settings-doc
`align_checkpoints`:::
(Optional, boolean)
include::{es-repo-dir}/rest-api/common-parms.asciidoc[tag=transform-settings-align-checkpoints]
`deduce_mappings`:::
(Optional, boolean)
include::{es-repo-dir}/rest-api/common-parms.asciidoc[tag=transform-settings-deduce-mappings]
`max_page_search_size`:::
(Optional, integer)
include::{es-repo-dir}/rest-api/common-parms.asciidoc[tag=transform-settings-max-page-search-size]
Expand Down
3 changes: 3 additions & 0 deletions docs/reference/transform/apis/update-transform.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,9 @@ include::{es-repo-dir}/rest-api/common-parms.asciidoc[tag=transform-settings-doc
`align_checkpoints`:::
(Optional, boolean)
include::{es-repo-dir}/rest-api/common-parms.asciidoc[tag=transform-settings-align-checkpoints]
`deduce_mappings`:::
(Optional, boolean)
include::{es-repo-dir}/rest-api/common-parms.asciidoc[tag=transform-settings-deduce-mappings]
`max_page_search_size`:::
(Optional, integer)
include::{es-repo-dir}/rest-api/common-parms.asciidoc[tag=transform-settings-max-page-search-size]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ public final class TransformField {
public static final ParseField DATES_AS_EPOCH_MILLIS = new ParseField("dates_as_epoch_millis");
public static final ParseField ALIGN_CHECKPOINTS = new ParseField("align_checkpoints");
public static final ParseField USE_PIT = new ParseField("use_point_in_time");
public static final ParseField DEDUCE_MAPPINGS = new ParseField("deduce_mappings");
public static final ParseField FIELD = new ParseField("field");
public static final ParseField SYNC = new ParseField("sync");
public static final ParseField TIME = new ParseField("time");
Expand Down

0 comments on commit 7be74a8

Please sign in to comment.