Skip to content

Commit

Permalink
[Transform] respect timeout parameters in all API's (#79468)
Browse files Browse the repository at this point in the history
make timeout parameter explicit in all transform actions and pass timeout values in sub-calls.

fixes #79268
  • Loading branch information
Hendrik Muhs committed Oct 25, 2021
1 parent 33f5c9a commit 4202262
Show file tree
Hide file tree
Showing 30 changed files with 455 additions and 138 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,14 @@ public abstract class AcknowledgedRequest<Request extends MasterNodeRequest<Requ

public static final TimeValue DEFAULT_ACK_TIMEOUT = timeValueSeconds(30);

protected TimeValue timeout = DEFAULT_ACK_TIMEOUT;
protected TimeValue timeout;

protected AcknowledgedRequest() {
this(DEFAULT_ACK_TIMEOUT);
}

protected AcknowledgedRequest(TimeValue timeout) {
this.timeout = timeout;
}

protected AcknowledgedRequest(StreamInput in) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,11 @@
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.support.master.AcknowledgedRequest;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.action.support.master.MasterNodeRequest;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.xpack.core.transform.TransformField;
import org.elasticsearch.xpack.core.transform.utils.ExceptionsHelper;

Expand All @@ -28,11 +29,12 @@ private DeleteTransformAction() {
super(NAME, AcknowledgedResponse::readFrom);
}

public static class Request extends MasterNodeRequest<Request> {
public static class Request extends AcknowledgedRequest<Request> {
private final String id;
private final boolean force;

public Request(String id, boolean force) {
public Request(String id, boolean force, TimeValue timeout) {
super(timeout);
this.id = ExceptionsHelper.requireNonNull(id, TransformField.ID.getPreferredName());
this.force = force;
}
Expand Down Expand Up @@ -71,7 +73,8 @@ public ActionRequestValidationException validate() {

@Override
public int hashCode() {
return Objects.hash(id, force);
// the base class does not implement hashCode, therefore we need to hash timeout ourselves
return Objects.hash(timeout(), id, force);
}

@Override
Expand All @@ -84,7 +87,8 @@ public boolean equals(Object obj) {
return false;
}
Request other = (Request) obj;
return Objects.equals(id, other.id) && force == other.force;
// the base class does not implement equals, therefore we need to check timeout ourselves
return Objects.equals(id, other.id) && force == other.force && timeout().equals(other.timeout());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,14 @@
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.support.master.AcknowledgedRequest;
import org.elasticsearch.xcontent.ParseField;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.xcontent.ConstructingObjectParser;
import org.elasticsearch.common.xcontent.LoggingDeprecationHandler;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.xcontent.ConstructingObjectParser;
import org.elasticsearch.xcontent.ParseField;
import org.elasticsearch.xcontent.ToXContentObject;
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xcontent.XContentFactory;
Expand Down Expand Up @@ -54,7 +55,8 @@ public static class Request extends AcknowledgedRequest<Request> implements ToXC

private final TransformConfig config;

public Request(TransformConfig config) {
public Request(TransformConfig config, TimeValue timeout) {
super(timeout);
this.config = config;
}

Expand All @@ -63,7 +65,7 @@ public Request(StreamInput in) throws IOException {
this.config = new TransformConfig(in);
}

public static Request fromXContent(final XContentParser parser) throws IOException {
public static Request fromXContent(final XContentParser parser, TimeValue timeout) throws IOException {
Map<String, Object> content = parser.map();
// dest.index is not required for _preview, so we just supply our own
Map<String, String> tempDestination = new HashMap<>();
Expand All @@ -86,7 +88,7 @@ public static Request fromXContent(final XContentParser parser) throws IOExcepti
BytesReference.bytes(xContentBuilder).streamInput()
)
) {
return new Request(TransformConfig.fromXContent(newParser, null, false));
return new Request(TransformConfig.fromXContent(newParser, null, false), timeout);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,8 @@ public static class Request extends AcknowledgedRequest<Request> {
private final TransformConfig config;
private final boolean deferValidation;

public Request(TransformConfig config, boolean deferValidation) {
public Request(TransformConfig config, boolean deferValidation, TimeValue timeout) {
super(timeout);
this.config = config;
this.deferValidation = deferValidation;
}
Expand All @@ -59,8 +60,13 @@ public Request(StreamInput in) throws IOException {
}
}

public static Request fromXContent(final XContentParser parser, final String id, final boolean deferValidation) {
return new Request(TransformConfig.fromXContent(parser, id, false), deferValidation);
public static Request fromXContent(
final XContentParser parser,
final String id,
final boolean deferValidation,
final TimeValue timeout
) {
return new Request(TransformConfig.fromXContent(parser, id, false), deferValidation, timeout);
}

/**
Expand Down Expand Up @@ -123,7 +129,8 @@ public void writeTo(StreamOutput out) throws IOException {

@Override
public int hashCode() {
return Objects.hash(config, deferValidation);
// the base class does not implement hashCode, therefore we need to hash timeout ourselves
return Objects.hash(timeout(), config, deferValidation);
}

@Override
Expand All @@ -135,7 +142,11 @@ public boolean equals(Object obj) {
return false;
}
Request other = (Request) obj;
return Objects.equals(config, other.config) && this.deferValidation == other.deferValidation;

// the base class does not implement equals, therefore we need to check timeout ourselves
return Objects.equals(config, other.config)
&& this.deferValidation == other.deferValidation
&& timeout().equals(other.timeout());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import org.elasticsearch.action.support.tasks.BaseTasksResponse;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.xcontent.ToXContentObject;
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xpack.core.transform.TransformField;
Expand All @@ -36,7 +37,8 @@ public static class Request extends AcknowledgedRequest<Request> {

private final String id;

public Request(String id) {
public Request(String id, TimeValue timeout) {
super(timeout);
this.id = ExceptionsHelper.requireNonNull(id, TransformField.ID.getPreferredName());
}

Expand Down Expand Up @@ -68,7 +70,8 @@ public ActionRequestValidationException validate() {

@Override
public int hashCode() {
return Objects.hash(id);
// the base class does not implement hashCode, therefore we need to hash timeout ourselves
return Objects.hash(timeout(), id);
}

@Override
Expand All @@ -80,7 +83,8 @@ public boolean equals(Object obj) {
return false;
}
Request other = (Request) obj;
return Objects.equals(id, other.id);
// the base class does not implement equals, therefore we need to check timeout ourselves
return Objects.equals(id, other.id) && timeout().equals(other.timeout());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,11 @@ public static class Request extends BaseTasksRequest<Request> {
private final boolean deferValidation;
private TransformConfig config;

public Request(TransformConfigUpdate update, String id, boolean deferValidation) {
public Request(TransformConfigUpdate update, String id, boolean deferValidation, TimeValue timeout) {
this.update = update;
this.id = id;
this.deferValidation = deferValidation;
this.setTimeout(timeout);
}

// use fromStreamWithBWC, this can be changed back to public after BWC is not required anymore
Expand All @@ -71,11 +72,16 @@ public static Request fromStreamWithBWC(StreamInput in) throws IOException {
return new Request(in);
}
UpdateTransformActionPre78.Request r = new UpdateTransformActionPre78.Request(in);
return new Request(r.getUpdate(), r.getId(), r.isDeferValidation());
return new Request(r.getUpdate(), r.getId(), r.isDeferValidation(), r.timeout());
}

public static Request fromXContent(final XContentParser parser, final String id, final boolean deferValidation) {
return new Request(TransformConfigUpdate.fromXContent(parser), id, deferValidation);
public static Request fromXContent(
final XContentParser parser,
final String id,
final boolean deferValidation,
final TimeValue timeout
) {
return new Request(TransformConfigUpdate.fromXContent(parser), id, deferValidation, timeout);
}

/**
Expand Down Expand Up @@ -151,7 +157,8 @@ public void writeTo(StreamOutput out) throws IOException {

@Override
public int hashCode() {
return Objects.hash(update, id, deferValidation, config);
// the base class does not implement hashCode, therefore we need to hash timeout ourselves
return Objects.hash(getTimeout(), update, id, deferValidation, config);
}

@Override
Expand All @@ -163,10 +170,13 @@ public boolean equals(Object obj) {
return false;
}
Request other = (Request) obj;

// the base class does not implement equals, therefore we need to check timeout ourselves
return Objects.equals(update, other.update)
&& this.deferValidation == other.deferValidation
&& this.id.equals(other.id)
&& Objects.equals(config, other.config);
&& Objects.equals(config, other.config)
&& getTimeout().equals(other.getTimeout());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,12 @@
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.support.master.MasterNodeRequest;
import org.elasticsearch.action.support.master.AcknowledgedRequest;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.xcontent.ToXContentObject;
import org.elasticsearch.xcontent.XContentBuilder;

Expand All @@ -30,7 +31,7 @@ private UpgradeTransformsAction() {
super(NAME, UpgradeTransformsAction.Response::new);
}

public static class Request extends MasterNodeRequest<Request> {
public static class Request extends AcknowledgedRequest<Request> {

private final boolean dryRun;

Expand All @@ -39,8 +40,8 @@ public Request(StreamInput in) throws IOException {
this.dryRun = in.readBoolean();
}

public Request(boolean dryRun) {
super();
public Request(boolean dryRun, TimeValue timeout) {
super(timeout);
this.dryRun = dryRun;
}

Expand All @@ -61,7 +62,8 @@ public void writeTo(StreamOutput out) throws IOException {

@Override
public int hashCode() {
return Objects.hash(dryRun);
// the base class does not implement hashCode, therefore we need to hash timeout ourselves
return Objects.hash(timeout(), dryRun);
}

@Override
Expand All @@ -73,7 +75,9 @@ public boolean equals(Object obj) {
return false;
}
Request other = (Request) obj;
return this.dryRun == other.dryRun;

// the base class does not implement equals, therefore we need to check timeout ourselves
return this.dryRun == other.dryRun && timeout().equals(other.timeout());
}
}

Expand Down Expand Up @@ -144,9 +148,7 @@ public boolean equals(Object obj) {
return false;
}
Response other = (Response) obj;
return this.updated == other.updated
&& this.noAction == other.noAction
&& this.needsUpdate == other.needsUpdate;
return this.updated == other.updated && this.noAction == other.noAction && this.needsUpdate == other.needsUpdate;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import org.elasticsearch.action.support.master.AcknowledgedRequest;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.xpack.core.common.validation.SourceDestValidator;
import org.elasticsearch.xpack.core.transform.transforms.TransformConfig;

Expand All @@ -34,7 +35,8 @@ public static class Request extends AcknowledgedRequest<Request> {
private final TransformConfig config;
private final boolean deferValidation;

public Request(TransformConfig config, boolean deferValidation) {
public Request(TransformConfig config, boolean deferValidation, TimeValue timeout) {
super(timeout);
this.config = config;
this.deferValidation = deferValidation;
}
Expand Down Expand Up @@ -82,13 +84,15 @@ public boolean equals(Object obj) {
return false;
}
Request that = (Request) obj;
return Objects.equals(config, that.config)
&& deferValidation == that.deferValidation;

// the base class does not implement equals, therefore we need to check timeout ourselves
return Objects.equals(config, that.config) && deferValidation == that.deferValidation && timeout().equals(that.timeout());
}

@Override
public int hashCode() {
return Objects.hash(config, deferValidation);
// the base class does not implement hashCode, therefore we need to hash timeout ourselves
return Objects.hash(timeout(), config, deferValidation);
}
}

Expand All @@ -104,6 +108,7 @@ public Response(StreamInput in) throws IOException {
this.destIndexMappings = in.readMap(StreamInput::readString, StreamInput::readString);
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeMap(destIndexMappings, StreamOutput::writeString, StreamOutput::writeString);
}
Expand Down

0 comments on commit 4202262

Please sign in to comment.