Skip to content

Commit

Permalink
[Transform] Make GetCheckpointAction and GetCheckpointNodeAction time…
Browse files Browse the repository at this point in the history
… out (#101055)
  • Loading branch information
przemekwitek committed Oct 25, 2023
1 parent 77dac65 commit db80251
Show file tree
Hide file tree
Showing 21 changed files with 468 additions and 80 deletions.
5 changes: 5 additions & 0 deletions docs/changelog/101055.yaml
@@ -0,0 +1,5 @@
pr: 101055
summary: Make tasks that calculate checkpoints time out
area: Transform
type: enhancement
issues: []
2 changes: 2 additions & 0 deletions server/src/main/java/org/elasticsearch/TransportVersions.java
Expand Up @@ -145,6 +145,8 @@ static TransportVersion def(int id) {
public static final TransportVersion PLUGIN_DESCRIPTOR_STRING_VERSION = def(8_520_00_0);
public static final TransportVersion TOO_MANY_SCROLL_CONTEXTS_EXCEPTION_ADDED = def(8_521_00_0);
public static final TransportVersion UNCONTENDED_REGISTER_ANALYSIS_ADDED = def(8_522_00_0);
public static final TransportVersion TRANSFORM_GET_CHECKPOINT_TIMEOUT_ADDED = def(8_523_00_0);

/*
* STOP! READ THIS FIRST! No, really,
* ____ _____ ___ ____ _ ____ _____ _ ____ _____ _ _ ___ ____ _____ ___ ____ ____ _____ _
Expand Down
Expand Up @@ -7,6 +7,7 @@

package org.elasticsearch.xpack.core.transform.action;

import org.elasticsearch.TransportVersions;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.ActionResponse;
Expand All @@ -16,6 +17,7 @@
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.tasks.CancellableTask;
import org.elasticsearch.tasks.TaskId;

Expand Down Expand Up @@ -46,16 +48,23 @@ public static class Request extends ActionRequest implements IndicesRequest.Repl

private String[] indices;
private final IndicesOptions indicesOptions;
private final TimeValue timeout;

public Request(StreamInput in) throws IOException {
super(in);
indices = in.readStringArray();
indicesOptions = IndicesOptions.readIndicesOptions(in);
if (in.getTransportVersion().onOrAfter(TransportVersions.TRANSFORM_GET_CHECKPOINT_TIMEOUT_ADDED)) {
timeout = in.readOptionalTimeValue();
} else {
timeout = null;
}
}

public Request(String[] indices, IndicesOptions indicesOptions) {
public Request(String[] indices, IndicesOptions indicesOptions, TimeValue timeout) {
this.indices = indices != null ? indices : Strings.EMPTY_ARRAY;
this.indicesOptions = indicesOptions;
this.timeout = timeout;
}

@Override
Expand All @@ -73,6 +82,10 @@ public IndicesOptions indicesOptions() {
return indicesOptions;
}

public TimeValue getTimeout() {
return timeout;
}

@Override
public boolean equals(Object obj) {
if (obj == this) {
Expand All @@ -83,19 +96,24 @@ public boolean equals(Object obj) {
}
Request that = (Request) obj;

return Arrays.equals(indices, that.indices) && Objects.equals(indicesOptions, that.indicesOptions);
return Arrays.equals(indices, that.indices)
&& Objects.equals(indicesOptions, that.indicesOptions)
&& Objects.equals(timeout, that.timeout);
}

@Override
public int hashCode() {
return Objects.hash(Arrays.hashCode(indices), indicesOptions);
return Objects.hash(Arrays.hashCode(indices), indicesOptions, timeout);
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeStringArray(indices);
indicesOptions.writeIndicesOptions(out);
if (out.getTransportVersion().onOrAfter(TransportVersions.TRANSFORM_GET_CHECKPOINT_TIMEOUT_ADDED)) {
out.writeOptionalTimeValue(timeout);
}
}

@Override
Expand Down
Expand Up @@ -7,6 +7,7 @@

package org.elasticsearch.xpack.core.transform.action;

import org.elasticsearch.TransportVersions;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.ActionResponse;
Expand All @@ -16,6 +17,7 @@
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.tasks.CancellableTask;
import org.elasticsearch.tasks.TaskId;
Expand Down Expand Up @@ -90,16 +92,23 @@ public static class Request extends ActionRequest implements IndicesRequest {

private final Set<ShardId> shards;
private final OriginalIndices originalIndices;
private final TimeValue timeout;

public Request(Set<ShardId> shards, OriginalIndices originalIndices) {
public Request(Set<ShardId> shards, OriginalIndices originalIndices, TimeValue timeout) {
this.shards = shards;
this.originalIndices = originalIndices;
this.timeout = timeout;
}

public Request(StreamInput in) throws IOException {
super(in);
this.shards = in.readCollectionAsImmutableSet(ShardId::new);
this.originalIndices = OriginalIndices.readOriginalIndices(in);
if (in.getTransportVersion().onOrAfter(TransportVersions.TRANSFORM_GET_CHECKPOINT_TIMEOUT_ADDED)) {
this.timeout = in.readOptionalTimeValue();
} else {
this.timeout = null;
}
}

@Override
Expand All @@ -112,6 +121,9 @@ public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeCollection(shards);
OriginalIndices.writeOriginalIndices(originalIndices, out);
if (out.getTransportVersion().onOrAfter(TransportVersions.TRANSFORM_GET_CHECKPOINT_TIMEOUT_ADDED)) {
out.writeOptionalTimeValue(timeout);
}
}

public Set<ShardId> getShards() {
Expand All @@ -122,6 +134,10 @@ public OriginalIndices getOriginalIndices() {
return originalIndices;
}

public TimeValue getTimeout() {
return timeout;
}

@Override
public boolean equals(Object obj) {
if (obj == this) {
Expand All @@ -132,12 +148,14 @@ public boolean equals(Object obj) {
}
Request that = (Request) obj;

return Objects.equals(shards, that.shards) && Objects.equals(originalIndices, that.originalIndices);
return Objects.equals(shards, that.shards)
&& Objects.equals(originalIndices, that.originalIndices)
&& Objects.equals(timeout, that.timeout);
}

@Override
public int hashCode() {
return Objects.hash(shards, originalIndices);
return Objects.hash(shards, originalIndices, timeout);
}

@Override
Expand Down
Expand Up @@ -10,6 +10,7 @@
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.common.io.stream.Writeable.Reader;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.tasks.CancellableTask;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.test.AbstractWireSerializingTestCase;
Expand Down Expand Up @@ -41,8 +42,9 @@ protected Reader<Request> instanceReader() {
protected Request mutateInstance(Request instance) {
List<String> indices = instance.indices() != null ? new ArrayList<>(Arrays.asList(instance.indices())) : new ArrayList<>();
IndicesOptions indicesOptions = instance.indicesOptions();
TimeValue timeout = instance.getTimeout();

switch (between(0, 1)) {
switch (between(0, 2)) {
case 0:
indices.add(randomAlphaOfLengthBetween(1, 20));
break;
Expand All @@ -55,11 +57,14 @@ protected Request mutateInstance(Request instance) {
SearchRequest.DEFAULT_INDICES_OPTIONS
);
break;
case 2:
timeout = timeout != null ? null : TimeValue.timeValueSeconds(randomIntBetween(1, 300));
break;
default:
throw new AssertionError("Illegal randomization branch");
}

return new Request(indices.toArray(new String[0]), indicesOptions);
return new Request(indices.toArray(new String[0]), indicesOptions, timeout);
}

public void testCreateTask() {
Expand All @@ -69,7 +74,7 @@ public void testCreateTask() {
}

public void testCreateTaskWithNullIndices() {
Request request = new Request(null, null);
Request request = new Request(null, null, null);
CancellableTask task = request.createTask(123, "type", "action", new TaskId("dummy-node:456"), Map.of());
assertThat(task.getDescription(), is(equalTo("get_checkpoint[0]")));
}
Expand All @@ -83,7 +88,8 @@ private static Request randomRequest(Integer numIndices) {
Boolean.toString(randomBoolean()),
Boolean.toString(randomBoolean()),
SearchRequest.DEFAULT_INDICES_OPTIONS
)
),
randomBoolean() ? TimeValue.timeValueSeconds(randomIntBetween(1, 300)) : null
);
}
}
Expand Up @@ -10,6 +10,7 @@
import org.elasticsearch.action.OriginalIndices;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.common.io.stream.Writeable.Reader;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.tasks.CancellableTask;
import org.elasticsearch.tasks.TaskId;
Expand All @@ -32,13 +33,17 @@ protected Reader<Request> instanceReader() {

@Override
protected Request createTestInstance() {
return new Request(randomShards(randomInt(10)), randomOriginalIndices(randomIntBetween(0, 20)));
return new Request(
randomShards(randomInt(10)),
randomOriginalIndices(randomIntBetween(0, 20)),
randomBoolean() ? randomTimeout() : null
);
}

@Override
protected Request mutateInstance(Request instance) {

switch (random().nextInt(1)) {
switch (random().nextInt(2)) {
case 0 -> {
Set<ShardId> shards = new HashSet<>(instance.getShards());
if (randomBoolean() && shards.size() > 0) {
Expand All @@ -50,36 +55,43 @@ protected Request mutateInstance(Request instance) {
} else {
shards.add(new ShardId(randomAlphaOfLength(8), randomAlphaOfLength(4), randomInt(5)));
}
return new Request(shards, instance.getOriginalIndices());
return new Request(shards, instance.getOriginalIndices(), instance.getTimeout());
}
case 1 -> {
OriginalIndices originalIndices = randomOriginalIndices(instance.indices().length + 1);
return new Request(instance.getShards(), originalIndices);
return new Request(instance.getShards(), originalIndices, instance.getTimeout());
}
case 2 -> {
return new Request(
instance.getShards(),
instance.getOriginalIndices(),
instance.getTimeout() != null ? null : randomTimeout()
);
}
default -> throw new IllegalStateException("The test should only allow 1 parameters mutated");
}
}

public void testCreateTask() {
Request request = new Request(randomShards(7), randomOriginalIndices(19));
Request request = new Request(randomShards(7), randomOriginalIndices(19), null);
CancellableTask task = request.createTask(123, "type", "action", new TaskId("dummy-node:456"), Map.of());
assertThat(task.getDescription(), is(equalTo("get_checkpoint_node[19;7]")));
}

public void testCreateTaskWithNullShardsAndIndices() {
Request request = new Request(null, OriginalIndices.NONE);
Request request = new Request(null, OriginalIndices.NONE, null);
CancellableTask task = request.createTask(123, "type", "action", new TaskId("dummy-node:456"), Map.of());
assertThat(task.getDescription(), is(equalTo("get_checkpoint_node[0;0]")));
}

public void testCreateTaskWithNullShards() {
Request request = new Request(null, randomOriginalIndices(13));
Request request = new Request(null, randomOriginalIndices(13), null);
CancellableTask task = request.createTask(123, "type", "action", new TaskId("dummy-node:456"), Map.of());
assertThat(task.getDescription(), is(equalTo("get_checkpoint_node[13;0]")));
}

public void testCreateTaskWithNullIndices() {
Request request = new Request(randomShards(11), OriginalIndices.NONE);
Request request = new Request(randomShards(11), OriginalIndices.NONE, null);
CancellableTask task = request.createTask(123, "type", "action", new TaskId("dummy-node:456"), Map.of());
assertThat(task.getDescription(), is(equalTo("get_checkpoint_node[0;11]")));
}
Expand All @@ -100,4 +112,8 @@ private static OriginalIndices randomOriginalIndices(int numIndices) {
IndicesOptions indicesOptions = randomBoolean() ? IndicesOptions.strictExpand() : IndicesOptions.lenientExpandOpen();
return new OriginalIndices(randomIndices, indicesOptions);
}

private static TimeValue randomTimeout() {
return TimeValue.timeValueSeconds(randomIntBetween(1, 300));
}
}

0 comments on commit db80251

Please sign in to comment.