[Transform] Implement per-transform num_failure_retries setting. (#87361)
przemekwitek committed Jun 9, 2022
1 parent e6f5cd3 commit 8656a29
Showing 18 changed files with 404 additions and 54 deletions.
6 changes: 6 additions & 0 deletions docs/changelog/87361.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
pr: 87361
summary: "Implement per-transform num_failure_retries setting"
area: Transform
type: enhancement
issues: []

7 changes: 7 additions & 0 deletions docs/reference/rest-api/common-parms.asciidoc
Original file line number Diff line number Diff line change
@@ -1056,6 +1056,13 @@ adjusted to a lower value. The minimum value is `10` and the maximum is `65,536`
The default value is `500`.
end::transform-settings-max-page-search-size[]

tag::transform-settings-num-failure-retries[]
Defines the number of retries on a recoverable failure before the {transform} task is marked as `failed`.
The minimum value is `0` and the maximum is `100`.
`-1` can be used to denote infinity. In this case, the {transform} never gives up on retrying a recoverable failure.
The default value is the cluster-level setting `num_transform_failure_retries`.
end::transform-settings-num-failure-retries[]

tag::transform-sort[]
Specifies the date field that is used to identify the latest documents.
end::transform-sort[]
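
The `num_failure_retries` description in this hunk can be read as a simple retry budget. The following is a minimal sketch of that reading, not code from this commit: the class name, the method name, and the exact comparison (`>` rather than `>=`) are assumptions made for illustration, and the real indexer may count failures differently.

```java
// Hypothetical helper illustrating the documented semantics; not part of Elasticsearch.
final class RetryBudgetSketch {
    // The docs state that -1 denotes infinity.
    static final int INFINITE_RETRIES = -1;

    /**
     * Returns true when a task that has seen failureCount recoverable failures
     * should be marked as failed under the given num_failure_retries value.
     * Assumption: a value of N allows N retries, so failure N + 1 is terminal.
     */
    static boolean retriesExhausted(int failureCount, int numFailureRetries) {
        if (numFailureRetries == INFINITE_RETRIES) {
            return false; // never give up on a recoverable failure
        }
        return failureCount > numFailureRetries;
    }
}
```

Under this reading, a value of `0` marks the task as failed on the first recoverable failure, while `-1` keeps retrying regardless of the failure count.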
10 changes: 6 additions & 4 deletions docs/reference/settings/transform-settings.asciidoc
Original file line number Diff line number Diff line change
@@ -16,14 +16,14 @@ default.

`node.roles: [ transform ]`::
(<<static-cluster-setting,Static>>) Set `node.roles` to contain `transform` to
identify the node as a _transform node_. If you want to run {transforms}, there
must be at least one {transform} node in your cluster.
+
If you set `node.roles`, you must explicitly specify all the required roles for
the node. To learn more, refer to <<modules-node>>.
+
IMPORTANT: It is strongly recommended that dedicated {transform} nodes also have
the `remote_cluster_client` role; otherwise, {ccs} fails when used in
{transforms}. See <<remote-node>>.

`xpack.transform.enabled`::
@@ -37,3 +37,5 @@ retries when it experiences a non-fatal error. Once the number of retries is
exhausted, the {transform} task is marked as `failed`. The default value is `10`
with a valid minimum of `0` and maximum of `100`. If a {transform} is already
running, it has to be restarted to use the changed setting.
The `num_failure_retries` setting can also be specified on an individual {transform} level.
Specifying this setting for each {transform} individually is recommended.
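
The relationship between the cluster-level setting and the per-transform setting described here amounts to a fallback. A minimal sketch, assuming the per-transform value is represented as a nullable `Integer` (as in `SettingsConfig`) and that the cluster-level value is always available as an `int`; the class and method names are hypothetical:

```java
// Hypothetical helper; the names are illustrative and do not appear in this commit.
final class EffectiveRetriesSketch {
    /**
     * Picks the per-transform num_failure_retries when it is set and
     * falls back to the cluster-level num_transform_failure_retries otherwise.
     */
    static int effectiveRetries(Integer perTransform, int clusterLevel) {
        return perTransform != null ? perTransform : clusterLevel;
    }
}
```

For example, `effectiveRetries(null, 10)` yields the cluster-level `10`, while `effectiveRetries(-1, 10)` yields `-1`, the per-transform "retry forever" value.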
3 changes: 3 additions & 0 deletions docs/reference/transform/apis/put-transform.asciidoc
Original file line number Diff line number Diff line change
@@ -215,6 +215,9 @@ include::{es-repo-dir}/rest-api/common-parms.asciidoc[tag=transform-settings-ded
`max_page_search_size`:::
(Optional, integer)
include::{es-repo-dir}/rest-api/common-parms.asciidoc[tag=transform-settings-max-page-search-size]
`num_failure_retries`:::
(Optional, integer)
include::{es-repo-dir}/rest-api/common-parms.asciidoc[tag=transform-settings-num-failure-retries]
====
//End settings

7 changes: 5 additions & 2 deletions docs/reference/transform/apis/update-transform.asciidoc
Original file line number Diff line number Diff line change
@@ -46,8 +46,8 @@ each checkpoint.
of update and runs with those privileges. If you provide
<<http-clients-secondary-authorization,secondary authorization headers>>, those
credentials are used instead.
* You must use {kib} or this API to update a {transform}. Directly updating any
{transform} internal, system, or hidden indices is not supported and may cause
permanent failure.
====
Expand Down Expand Up @@ -157,6 +157,9 @@ include::{es-repo-dir}/rest-api/common-parms.asciidoc[tag=transform-settings-ded
`max_page_search_size`:::
(Optional, integer)
include::{es-repo-dir}/rest-api/common-parms.asciidoc[tag=transform-settings-max-page-search-size]
`num_failure_retries`:::
(Optional, integer)
include::{es-repo-dir}/rest-api/common-parms.asciidoc[tag=transform-settings-num-failure-retries]
====
//End settings

Original file line number Diff line number Diff line change
@@ -250,7 +250,7 @@ public void declareInt(BiConsumer<Value, Integer> consumer, ParseField field) {
}

/**
* Declare a double field that parses explicit {@code null}s in the json to a default value.
* Declare an integer field that parses explicit {@code null}s in the json to a default value.
*/
public void declareIntOrNull(BiConsumer<Value, Integer> consumer, int nullValue, ParseField field) {
declareField(
Original file line number Diff line number Diff line change
@@ -41,6 +41,7 @@ public final class TransformField {
public static final ParseField ALIGN_CHECKPOINTS = new ParseField("align_checkpoints");
public static final ParseField USE_PIT = new ParseField("use_point_in_time");
public static final ParseField DEDUCE_MAPPINGS = new ParseField("deduce_mappings");
public static final ParseField NUM_FAILURE_RETRIES = new ParseField("num_failure_retries");
public static final ParseField FIELD = new ParseField("field");
public static final ParseField SYNC = new ParseField("sync");
public static final ParseField TIME = new ParseField("time");
Original file line number Diff line number Diff line change
@@ -34,12 +34,15 @@ public class SettingsConfig implements Writeable, ToXContentObject {
public static final ConstructingObjectParser<SettingsConfig, Void> STRICT_PARSER = createParser(false);
public static final ConstructingObjectParser<SettingsConfig, Void> LENIENT_PARSER = createParser(true);

public static final int MAX_NUM_FAILURE_RETRIES = 100;

private static final int DEFAULT_MAX_PAGE_SEARCH_SIZE = -1;
private static final float DEFAULT_DOCS_PER_SECOND = -1F;
private static final int DEFAULT_DATES_AS_EPOCH_MILLIS = -1;
private static final int DEFAULT_ALIGN_CHECKPOINTS = -1;
private static final int DEFAULT_USE_PIT = -1;
private static final int DEFAULT_DEDUCE_MAPPINGS = -1;
private static final int DEFAULT_NUM_FAILURE_RETRIES = -2;

private static ConstructingObjectParser<SettingsConfig, Void> createParser(boolean lenient) {
ConstructingObjectParser<SettingsConfig, Void> parser = new ConstructingObjectParser<>(
@@ -51,7 +54,8 @@ private static ConstructingObjectParser<SettingsConfig, Void> createParser(boole
(Integer) args[2],
(Integer) args[3],
(Integer) args[4],
(Integer) args[5]
(Integer) args[5],
(Integer) args[6]
)
);
parser.declareIntOrNull(optionalConstructorArg(), DEFAULT_MAX_PAGE_SEARCH_SIZE, TransformField.MAX_PAGE_SEARCH_SIZE);
@@ -84,6 +88,7 @@ private static ConstructingObjectParser<SettingsConfig, Void> createParser(boole
TransformField.DEDUCE_MAPPINGS,
ValueType.BOOLEAN_OR_NULL
);
parser.declareIntOrNull(optionalConstructorArg(), DEFAULT_NUM_FAILURE_RETRIES, TransformField.NUM_FAILURE_RETRIES);
return parser;
}

@@ -93,9 +98,10 @@ private static ConstructingObjectParser<SettingsConfig, Void> createParser(boole
private final Integer alignCheckpoints;
private final Integer usePit;
private final Integer deduceMappings;
private final Integer numFailureRetries;

public SettingsConfig() {
this(null, null, (Integer) null, (Integer) null, (Integer) null, (Integer) null);
this(null, null, (Integer) null, (Integer) null, (Integer) null, (Integer) null, (Integer) null);
}

public SettingsConfig(
@@ -104,32 +110,36 @@ public SettingsConfig(
Boolean datesAsEpochMillis,
Boolean alignCheckpoints,
Boolean usePit,
Boolean deduceMappings
Boolean deduceMappings,
Integer numFailureRetries
) {
this(
maxPageSearchSize,
docsPerSecond,
datesAsEpochMillis == null ? null : datesAsEpochMillis ? 1 : 0,
alignCheckpoints == null ? null : alignCheckpoints ? 1 : 0,
usePit == null ? null : usePit ? 1 : 0,
deduceMappings == null ? null : deduceMappings ? 1 : 0
deduceMappings == null ? null : deduceMappings ? 1 : 0,
numFailureRetries
);
}

public SettingsConfig(
SettingsConfig(
Integer maxPageSearchSize,
Float docsPerSecond,
Integer datesAsEpochMillis,
Integer alignCheckpoints,
Integer usePit,
Integer deduceMappings
Integer deduceMappings,
Integer numFailureRetries
) {
this.maxPageSearchSize = maxPageSearchSize;
this.docsPerSecond = docsPerSecond;
this.datesAsEpochMillis = datesAsEpochMillis;
this.alignCheckpoints = alignCheckpoints;
this.usePit = usePit;
this.deduceMappings = deduceMappings;
this.numFailureRetries = numFailureRetries;
}

public SettingsConfig(final StreamInput in) throws IOException {
@@ -155,6 +165,11 @@ public SettingsConfig(final StreamInput in) throws IOException {
} else {
deduceMappings = DEFAULT_DEDUCE_MAPPINGS;
}
if (in.getVersion().onOrAfter(Version.V_8_4_0)) {
numFailureRetries = in.readOptionalInt();
} else {
numFailureRetries = null;
}
}

public Integer getMaxPageSearchSize() {
@@ -197,6 +212,14 @@ public Integer getDeduceMappingsForUpdate() {
return deduceMappings;
}

public Integer getNumFailureRetries() {
return numFailureRetries != null ? (numFailureRetries == DEFAULT_NUM_FAILURE_RETRIES ? null : numFailureRetries) : null;
}

public Integer getNumFailureRetriesForUpdate() {
return numFailureRetries;
}

public ActionRequestValidationException validate(ActionRequestValidationException validationException) {
if (maxPageSearchSize != null && (maxPageSearchSize < 10 || maxPageSearchSize > MultiBucketConsumerService.DEFAULT_MAX_BUCKETS)) {
validationException = addValidationError(
@@ -207,7 +230,15 @@ public ActionRequestValidationException validate(ActionRequestValidationExceptio
validationException
);
}

if (numFailureRetries != null && (numFailureRetries < -1 || numFailureRetries > MAX_NUM_FAILURE_RETRIES)) {
validationException = addValidationError(
"settings.num_failure_retries ["
+ numFailureRetries
+ "] is out of range. The minimum value is -1 (infinity) and the maximum is "
+ MAX_NUM_FAILURE_RETRIES,
validationException
);
}
return validationException;
}
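
The range check added to `validate` can be exercised in isolation. The sketch below is a standalone approximation: it collects error messages in a plain `List<String>` instead of using `ActionRequestValidationException` and `addValidationError`, and the class name is hypothetical. The bounds and the message text follow the hunk above.

```java
import java.util.ArrayList;
import java.util.List;

// Standalone approximation of the range check; not the Elasticsearch class itself.
final class NumFailureRetriesValidationSketch {
    static final int MAX_NUM_FAILURE_RETRIES = 100;

    /** Returns an empty list when the value is unset (null) or within [-1, 100]. */
    static List<String> validate(Integer numFailureRetries) {
        List<String> errors = new ArrayList<>();
        if (numFailureRetries != null && (numFailureRetries < -1 || numFailureRetries > MAX_NUM_FAILURE_RETRIES)) {
            errors.add(
                "settings.num_failure_retries ["
                    + numFailureRetries
                    + "] is out of range. The minimum value is -1 (infinity) and the maximum is "
                    + MAX_NUM_FAILURE_RETRIES
            );
        }
        return errors;
    }
}
```

With this sketch, `validate(null)`, `validate(-1)`, and `validate(100)` return no errors, while `validate(101)` returns a single out-of-range message.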

@@ -229,6 +260,9 @@ public void writeTo(StreamOutput out) throws IOException {
if (out.getVersion().onOrAfter(Version.V_8_1_0)) {
out.writeOptionalInt(deduceMappings);
}
if (out.getVersion().onOrAfter(Version.V_8_4_0)) {
out.writeOptionalInt(numFailureRetries);
}
}
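
The `onOrAfter(Version.V_8_4_0)` guards in the `StreamInput` constructor and in `writeTo` follow a common pattern for adding a field to a wire format: write the new field only to peers that understand it, and default it to `null` when reading from older peers. The sketch below illustrates the pattern with `java.io` streams only. It does not use Elasticsearch's `StreamInput`/`StreamOutput`, the integer version numbers are hypothetical, and the boolean-then-int encoding of the optional value is an assumption of this sketch.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Illustrative only: plain java.io streams stand in for the transport layer.
final class VersionGatedFieldSketch {
    // Hypothetical wire version in which the new field first appears.
    static final int FIELD_ADDED_IN = 2;

    static byte[] write(int peerVersion, int existingField, Integer numFailureRetries) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(existingField);
            if (peerVersion >= FIELD_ADDED_IN) {
                // Optional int: presence flag first, then the value if present.
                out.writeBoolean(numFailureRetries != null);
                if (numFailureRetries != null) {
                    out.writeInt(numFailureRetries);
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    static Integer readNumFailureRetries(int peerVersion, byte[] data) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            in.readInt(); // skip the pre-existing field
            if (peerVersion < FIELD_ADDED_IN) {
                return null; // older peers never sent the field
            }
            if (in.readBoolean()) {
                return in.readInt();
            }
            return null;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Both sides must apply the same version check. If the writer emitted the field unconditionally, an older reader would misinterpret the extra bytes as the start of the next value.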

@Override
@@ -253,6 +287,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
if (deduceMappings != null && (deduceMappings.equals(DEFAULT_DEDUCE_MAPPINGS) == false)) {
builder.field(TransformField.DEDUCE_MAPPINGS.getPreferredName(), deduceMappings > 0 ? true : false);
}
if (numFailureRetries != null && (numFailureRetries.equals(DEFAULT_NUM_FAILURE_RETRIES) == false)) {
builder.field(TransformField.NUM_FAILURE_RETRIES.getPreferredName(), numFailureRetries);
}
builder.endObject();
return builder;
}
@@ -272,12 +309,21 @@ public boolean equals(Object other) {
&& Objects.equals(datesAsEpochMillis, that.datesAsEpochMillis)
&& Objects.equals(alignCheckpoints, that.alignCheckpoints)
&& Objects.equals(usePit, that.usePit)
&& Objects.equals(deduceMappings, that.deduceMappings);
&& Objects.equals(deduceMappings, that.deduceMappings)
&& Objects.equals(numFailureRetries, that.numFailureRetries);
}

@Override
public int hashCode() {
return Objects.hash(maxPageSearchSize, docsPerSecond, datesAsEpochMillis, alignCheckpoints, usePit, deduceMappings);
return Objects.hash(
maxPageSearchSize,
docsPerSecond,
datesAsEpochMillis,
alignCheckpoints,
usePit,
deduceMappings,
numFailureRetries
);
}

@Override
@@ -296,6 +342,7 @@ public static class Builder {
private Integer alignCheckpoints;
private Integer usePit;
private Integer deduceMappings;
private Integer numFailureRetries;

/**
* Default builder
@@ -314,6 +361,7 @@ public Builder(SettingsConfig base) {
this.alignCheckpoints = base.alignCheckpoints;
this.usePit = base.usePit;
this.deduceMappings = base.deduceMappings;
this.numFailureRetries = base.numFailureRetries;
}

/**
@@ -402,6 +450,11 @@ public Builder setDeduceMappings(Boolean deduceMappings) {
return this;
}

public Builder setNumFailureRetries(Integer numFailureRetries) {
this.numFailureRetries = numFailureRetries == null ? DEFAULT_NUM_FAILURE_RETRIES : numFailureRetries;
return this;
}

/**
* Update settings according to given settings config.
*
@@ -437,12 +490,25 @@ public Builder update(SettingsConfig update) {
? null
: update.getDeduceMappingsForUpdate();
}
if (update.getNumFailureRetriesForUpdate() != null) {
this.numFailureRetries = update.getNumFailureRetriesForUpdate().equals(DEFAULT_NUM_FAILURE_RETRIES)
? null
: update.getNumFailureRetriesForUpdate();
}

return this;
}

public SettingsConfig build() {
return new SettingsConfig(maxPageSearchSize, docsPerSecond, datesAsEpochMillis, alignCheckpoints, usePit, deduceMappings);
return new SettingsConfig(
maxPageSearchSize,
docsPerSecond,
datesAsEpochMillis,
alignCheckpoints,
usePit,
deduceMappings,
numFailureRetries
);
}
}
}
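
In the `SettingsConfig` changes above, `DEFAULT_NUM_FAILURE_RETRIES = -2` acts as a sentinel: `declareIntOrNull` maps an explicit JSON `null` to it, `getNumFailureRetries()` hides it from callers, and `Builder.update` treats it as "reset to unset". This gives an update request three distinguishable states for one field. The sketch below isolates that merge rule; the class and method names are hypothetical, and only the sentinel value and the branching mirror the diff.

```java
// Hypothetical reduction of Builder.update to a single field.
final class TriStateUpdateSketch {
    // Sentinel meaning "the update contained an explicit null" (the diff uses -2).
    static final int RESET_SENTINEL = -2;

    /**
     * @param current value in the stored config, or null when unset
     * @param update  null when the field is absent from the update,
     *                RESET_SENTINEL for an explicit null, otherwise the new value
     * @return the value the merged config should hold
     */
    static Integer applyUpdate(Integer current, Integer update) {
        if (update == null) {
            return current; // field absent: keep whatever was configured
        }
        if (update.equals(RESET_SENTINEL)) {
            return null; // explicit null: clear the per-transform value
        }
        return update; // ordinary value: overwrite
    }
}
```

A sentinel outside the valid range (`-1` to `100`) is needed here because `-1` already carries the meaning "retry forever" and so cannot double as the reset marker.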
Original file line number Diff line number Diff line change
@@ -623,7 +623,8 @@ private static TransformConfig applyRewriteForUpdate(Builder builder) {
builder.getSettings().getDatesAsEpochMillis(),
builder.getSettings().getAlignCheckpoints(),
builder.getSettings().getUsePit(),
builder.getSettings().getDeduceMappings()
builder.getSettings().getDeduceMappings(),
builder.getSettings().getNumFailureRetries()
)
);
}
@@ -637,7 +638,8 @@ private static TransformConfig applyRewriteForUpdate(Builder builder) {
true,
builder.getSettings().getAlignCheckpoints(),
builder.getSettings().getUsePit(),
builder.getSettings().getDeduceMappings()
builder.getSettings().getDeduceMappings(),
builder.getSettings().getNumFailureRetries()
)
);
}
@@ -651,7 +653,8 @@ private static TransformConfig applyRewriteForUpdate(Builder builder) {
builder.getSettings().getDatesAsEpochMillis(),
false,
builder.getSettings().getUsePit(),
builder.getSettings().getDeduceMappings()
builder.getSettings().getDeduceMappings(),
builder.getSettings().getNumFailureRetries()
)
);
}
