Skip to content

Commit

Permalink
[Transform] add an unattended mode setting to transform (#89212)
Browse files Browse the repository at this point in the history
add an unattended mode setting. This will change how transform reacts on certain error types and lets transform run without failing.
  • Loading branch information
Hendrik Muhs committed Aug 11, 2022
1 parent e4a19d4 commit 6c12fe0
Show file tree
Hide file tree
Showing 16 changed files with 663 additions and 275 deletions.
5 changes: 5 additions & 0 deletions docs/changelog/89212.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 89212
summary: Add an unattended mode setting to transform
area: Transform
type: enhancement
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ public final class TransformField {
public static final ParseField USE_PIT = new ParseField("use_point_in_time");
public static final ParseField DEDUCE_MAPPINGS = new ParseField("deduce_mappings");
public static final ParseField NUM_FAILURE_RETRIES = new ParseField("num_failure_retries");
public static final ParseField UNATTENDED = new ParseField("unattended");

public static final ParseField FIELD = new ParseField("field");
public static final ParseField SYNC = new ParseField("sync");
public static final ParseField TIME = new ParseField("time");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ public class SettingsConfig implements Writeable, ToXContentObject {
private static final int DEFAULT_USE_PIT = -1;
private static final int DEFAULT_DEDUCE_MAPPINGS = -1;
private static final int DEFAULT_NUM_FAILURE_RETRIES = -2;
private static final int DEFAULT_UNATTENDED = -1;

private static ConstructingObjectParser<SettingsConfig, Void> createParser(boolean lenient) {
ConstructingObjectParser<SettingsConfig, Void> parser = new ConstructingObjectParser<>(
Expand All @@ -55,7 +56,8 @@ private static ConstructingObjectParser<SettingsConfig, Void> createParser(boole
(Integer) args[3],
(Integer) args[4],
(Integer) args[5],
(Integer) args[6]
(Integer) args[6],
(Integer) args[7]
)
);
parser.declareIntOrNull(optionalConstructorArg(), DEFAULT_MAX_PAGE_SEARCH_SIZE, TransformField.MAX_PAGE_SEARCH_SIZE);
Expand Down Expand Up @@ -89,6 +91,13 @@ private static ConstructingObjectParser<SettingsConfig, Void> createParser(boole
ValueType.BOOLEAN_OR_NULL
);
parser.declareIntOrNull(optionalConstructorArg(), DEFAULT_NUM_FAILURE_RETRIES, TransformField.NUM_FAILURE_RETRIES);
// this boolean requires 4 possible values: true, false, not_specified, default, therefore using a custom parser
parser.declareField(
optionalConstructorArg(),
p -> p.currentToken() == XContentParser.Token.VALUE_NULL ? DEFAULT_UNATTENDED : p.booleanValue() ? 1 : 0,
TransformField.UNATTENDED,
ValueType.BOOLEAN_OR_NULL
);
return parser;
}

Expand All @@ -99,9 +108,10 @@ private static ConstructingObjectParser<SettingsConfig, Void> createParser(boole
private final Integer usePit;
private final Integer deduceMappings;
private final Integer numFailureRetries;
private final Integer unattended;

public SettingsConfig() {
this(null, null, (Integer) null, (Integer) null, (Integer) null, (Integer) null, (Integer) null);
this(null, null, (Integer) null, (Integer) null, (Integer) null, (Integer) null, (Integer) null, (Integer) null);
}

public SettingsConfig(
Expand All @@ -111,7 +121,8 @@ public SettingsConfig(
Boolean alignCheckpoints,
Boolean usePit,
Boolean deduceMappings,
Integer numFailureRetries
Integer numFailureRetries,
Boolean unattended
) {
this(
maxPageSearchSize,
Expand All @@ -120,7 +131,8 @@ public SettingsConfig(
alignCheckpoints == null ? null : alignCheckpoints ? 1 : 0,
usePit == null ? null : usePit ? 1 : 0,
deduceMappings == null ? null : deduceMappings ? 1 : 0,
numFailureRetries
numFailureRetries,
unattended == null ? null : unattended ? 1 : 0
);
}

Expand All @@ -131,7 +143,8 @@ public SettingsConfig(
Integer alignCheckpoints,
Integer usePit,
Integer deduceMappings,
Integer numFailureRetries
Integer numFailureRetries,
Integer unattended
) {
this.maxPageSearchSize = maxPageSearchSize;
this.docsPerSecond = docsPerSecond;
Expand All @@ -140,6 +153,7 @@ public SettingsConfig(
this.usePit = usePit;
this.deduceMappings = deduceMappings;
this.numFailureRetries = numFailureRetries;
this.unattended = unattended;
}

public SettingsConfig(final StreamInput in) throws IOException {
Expand Down Expand Up @@ -170,6 +184,11 @@ public SettingsConfig(final StreamInput in) throws IOException {
} else {
numFailureRetries = null;
}
if (in.getVersion().onOrAfter(Version.V_8_5_0)) {
unattended = in.readOptionalInt();
} else {
unattended = DEFAULT_UNATTENDED;
}
}

public Integer getMaxPageSearchSize() {
Expand Down Expand Up @@ -220,6 +239,14 @@ public Integer getNumFailureRetriesForUpdate() {
return numFailureRetries;
}

public Boolean getUnattended() {
return unattended != null ? (unattended == DEFAULT_UNATTENDED) ? null : (unattended > 0) : null;
}

public Integer getUnattendedForUpdate() {
return unattended;
}

public ActionRequestValidationException validate(ActionRequestValidationException validationException) {
if (maxPageSearchSize != null && (maxPageSearchSize < 10 || maxPageSearchSize > MultiBucketConsumerService.DEFAULT_MAX_BUCKETS)) {
validationException = addValidationError(
Expand All @@ -239,6 +266,17 @@ public ActionRequestValidationException validate(ActionRequestValidationExceptio
validationException
);
}

// disallow setting unattended to true with explicit num failure retries
if (unattended != null && unattended == 1 && numFailureRetries != null && numFailureRetries > 0) {
validationException = addValidationError(
"settings.num_failure_retries ["
+ numFailureRetries
+ "] can not be set in unattended mode, unattended retries indefinitely",
validationException
);
}

return validationException;
}

Expand All @@ -263,6 +301,9 @@ public void writeTo(StreamOutput out) throws IOException {
if (out.getVersion().onOrAfter(Version.V_8_4_0)) {
out.writeOptionalInt(numFailureRetries);
}
if (out.getVersion().onOrAfter(Version.V_8_5_0)) {
out.writeOptionalInt(unattended);
}
}

@Override
Expand Down Expand Up @@ -290,6 +331,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
if (numFailureRetries != null && (numFailureRetries.equals(DEFAULT_NUM_FAILURE_RETRIES) == false)) {
builder.field(TransformField.NUM_FAILURE_RETRIES.getPreferredName(), numFailureRetries);
}
if (unattended != null && (unattended.equals(DEFAULT_UNATTENDED) == false)) {
builder.field(TransformField.UNATTENDED.getPreferredName(), unattended > 0 ? true : false);
}
builder.endObject();
return builder;
}
Expand All @@ -310,7 +354,8 @@ public boolean equals(Object other) {
&& Objects.equals(alignCheckpoints, that.alignCheckpoints)
&& Objects.equals(usePit, that.usePit)
&& Objects.equals(deduceMappings, that.deduceMappings)
&& Objects.equals(numFailureRetries, that.numFailureRetries);
&& Objects.equals(numFailureRetries, that.numFailureRetries)
&& Objects.equals(unattended, that.unattended);
}

@Override
Expand All @@ -322,7 +367,8 @@ public int hashCode() {
alignCheckpoints,
usePit,
deduceMappings,
numFailureRetries
numFailureRetries,
unattended
);
}

Expand All @@ -343,6 +389,7 @@ public static class Builder {
private Integer usePit;
private Integer deduceMappings;
private Integer numFailureRetries;
private Integer unattended;

/**
* Default builder
Expand All @@ -362,6 +409,7 @@ public Builder(SettingsConfig base) {
this.usePit = base.usePit;
this.deduceMappings = base.deduceMappings;
this.numFailureRetries = base.numFailureRetries;
this.unattended = base.unattended;
}

/**
Expand Down Expand Up @@ -455,6 +503,21 @@ public Builder setNumFailureRetries(Integer numFailureRetries) {
return this;
}

/**
* Whether to run the transform in unattended mode.
* In unattended mode the transform does not immediately fail for errors that are classified
* as irrecoverable.
*
* An explicit `null` resets to default.
*
* @param unattended true if this is a unattended transform.
* @return the {@link Builder} with usePit set.
*/
public Builder setUnattended(Boolean unattended) {
this.unattended = unattended == null ? DEFAULT_UNATTENDED : unattended ? 1 : 0;
return this;
}

/**
* Update settings according to given settings config.
*
Expand Down Expand Up @@ -495,6 +558,9 @@ public Builder update(SettingsConfig update) {
? null
: update.getNumFailureRetriesForUpdate();
}
if (update.getUnattendedForUpdate() != null) {
this.unattended = update.getUnattendedForUpdate().equals(DEFAULT_UNATTENDED) ? null : update.getUnattendedForUpdate();
}

return this;
}
Expand All @@ -507,7 +573,8 @@ public SettingsConfig build() {
alignCheckpoints,
usePit,
deduceMappings,
numFailureRetries
numFailureRetries,
unattended
);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -629,7 +629,8 @@ private static TransformConfig applyRewriteForUpdate(Builder builder) {
builder.getSettings().getAlignCheckpoints(),
builder.getSettings().getUsePit(),
builder.getSettings().getDeduceMappings(),
builder.getSettings().getNumFailureRetries()
builder.getSettings().getNumFailureRetries(),
builder.getSettings().getUnattended()
)
);
}
Expand All @@ -644,7 +645,8 @@ private static TransformConfig applyRewriteForUpdate(Builder builder) {
builder.getSettings().getAlignCheckpoints(),
builder.getSettings().getUsePit(),
builder.getSettings().getDeduceMappings(),
builder.getSettings().getNumFailureRetries()
builder.getSettings().getNumFailureRetries(),
builder.getSettings().getUnattended()
)
);
}
Expand All @@ -659,7 +661,8 @@ private static TransformConfig applyRewriteForUpdate(Builder builder) {
false,
builder.getSettings().getUsePit(),
builder.getSettings().getDeduceMappings(),
builder.getSettings().getNumFailureRetries()
builder.getSettings().getNumFailureRetries(),
builder.getSettings().getUnattended()
)
);
}
Expand Down

0 comments on commit 6c12fe0

Please sign in to comment.