Skip to content

Commit

Permalink
Fix trappy timeouts in ILM (elastic#109112)
Browse files Browse the repository at this point in the history
  • Loading branch information
DaveCTurner committed May 28, 2024
1 parent d880c76 commit f73fedc
Show file tree
Hide file tree
Showing 33 changed files with 183 additions and 126 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ public void testZeroToOne() throws Exception {
singletonMap(SearchableSnapshotAction.NAME, new SearchableSnapshotAction(fsRepoName, randomBoolean()))
);
LifecyclePolicy lifecyclePolicy = new LifecyclePolicy("policy", Map.of("hot", hotPhase, "frozen", frozenPhase));
PutLifecycleRequest putLifecycleRequest = new PutLifecycleRequest(lifecyclePolicy);
PutLifecycleRequest putLifecycleRequest = new PutLifecycleRequest(TEST_REQUEST_TIMEOUT, TEST_REQUEST_TIMEOUT, lifecyclePolicy);
assertAcked(client().execute(ILMActions.PUT, putLifecycleRequest).get());

Settings settings = Settings.builder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,18 +9,18 @@

import org.elasticsearch.action.support.master.AcknowledgedRequest;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.core.TimeValue;

import java.io.IOException;

public class StartILMRequest extends AcknowledgedRequest<StartILMRequest> {

public StartILMRequest(StreamInput in) throws IOException {
super(in);

}

public StartILMRequest() {
super(TRAPPY_IMPLICIT_DEFAULT_MASTER_NODE_TIMEOUT, DEFAULT_ACK_TIMEOUT);
public StartILMRequest(TimeValue masterNodeTimeout, TimeValue ackTimeout) {
super(masterNodeTimeout, ackTimeout);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,18 +9,18 @@

import org.elasticsearch.action.support.master.AcknowledgedRequest;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.core.TimeValue;

import java.io.IOException;

public class StopILMRequest extends AcknowledgedRequest<StopILMRequest> {

public StopILMRequest(StreamInput in) throws IOException {
super(in);

}

public StopILMRequest() {
super(TRAPPY_IMPLICIT_DEFAULT_MASTER_NODE_TIMEOUT, DEFAULT_ACK_TIMEOUT);
public StopILMRequest(TimeValue masterNodeTimeout, TimeValue ackTimeout) {
super(masterNodeTimeout, ackTimeout);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.xcontent.ParseField;
import org.elasticsearch.core.TimeValue;

import java.io.IOException;
import java.util.Objects;
Expand All @@ -29,12 +29,10 @@ protected DeleteLifecycleAction() {

public static class Request extends AcknowledgedRequest<Request> {

public static final ParseField POLICY_FIELD = new ParseField("policy");
private final String policyName;

private String policyName;

public Request(String policyName) {
super(TRAPPY_IMPLICIT_DEFAULT_MASTER_NODE_TIMEOUT, DEFAULT_ACK_TIMEOUT);
public Request(TimeValue masterNodeTimeout, TimeValue ackTimeout, String policyName) {
super(masterNodeTimeout, ackTimeout);
this.policyName = policyName;
}

Expand All @@ -43,10 +41,6 @@ public Request(StreamInput in) throws IOException {
policyName = in.readString();
}

public Request() {
super(TRAPPY_IMPLICIT_DEFAULT_MASTER_NODE_TIMEOUT, DEFAULT_ACK_TIMEOUT);
}

public String getPolicyName() {
return policyName;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.ChunkedToXContentObject;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.tasks.CancellableTask;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskId;
Expand Down Expand Up @@ -103,8 +104,8 @@ public Iterator<ToXContent> toXContentChunked(ToXContent.Params outerParams) {
public static class Request extends AcknowledgedRequest<Request> {
private final String[] policyNames;

public Request(String... policyNames) {
super(TRAPPY_IMPLICIT_DEFAULT_MASTER_NODE_TIMEOUT, DEFAULT_ACK_TIMEOUT);
public Request(TimeValue masterNodeTimeout, TimeValue ackTimeout, String... policyNames) {
super(masterNodeTimeout, ackTimeout);
if (policyNames == null) {
throw new IllegalArgumentException("ids cannot be null");
}
Expand All @@ -116,11 +117,6 @@ public Request(StreamInput in) throws IOException {
policyNames = in.readStringArray();
}

public Request() {
super(TRAPPY_IMPLICIT_DEFAULT_MASTER_NODE_TIMEOUT, DEFAULT_ACK_TIMEOUT);
policyNames = Strings.EMPTY_ARRAY;
}

@Override
public Task createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
return new CancellableTask(id, type, action, "get-lifecycle-task", parentTaskId, headers);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.xcontent.ConstructingObjectParser;
import org.elasticsearch.xcontent.ParseField;
import org.elasticsearch.xcontent.ToXContentObject;
Expand All @@ -25,20 +26,31 @@

public class PutLifecycleRequest extends AcknowledgedRequest<PutLifecycleRequest> implements ToXContentObject {

public interface Factory {
PutLifecycleRequest create(LifecyclePolicy lifecyclePolicy);

String getPolicyName();
}

public static final ParseField POLICY_FIELD = new ParseField("policy");
private static final ConstructingObjectParser<PutLifecycleRequest, String> PARSER = new ConstructingObjectParser<>(
private static final ConstructingObjectParser<PutLifecycleRequest, Factory> PARSER = new ConstructingObjectParser<>(
"put_lifecycle_request",
a -> new PutLifecycleRequest((LifecyclePolicy) a[0])
false,
(a, factory) -> factory.create((LifecyclePolicy) a[0])
);

static {
PARSER.declareObject(ConstructingObjectParser.constructorArg(), LifecyclePolicy::parse, POLICY_FIELD);
PARSER.declareObject(
ConstructingObjectParser.constructorArg(),
(parser, factory) -> LifecyclePolicy.parse(parser, factory.getPolicyName()),
POLICY_FIELD
);
}

private LifecyclePolicy policy;
private final LifecyclePolicy policy;

public PutLifecycleRequest(LifecyclePolicy policy) {
super(TRAPPY_IMPLICIT_DEFAULT_MASTER_NODE_TIMEOUT, DEFAULT_ACK_TIMEOUT);
public PutLifecycleRequest(TimeValue masterNodeTimeout, TimeValue ackTimeout, LifecyclePolicy policy) {
super(masterNodeTimeout, ackTimeout);
this.policy = policy;
}

Expand All @@ -47,10 +59,6 @@ public PutLifecycleRequest(StreamInput in) throws IOException {
policy = new LifecyclePolicy(in);
}

public PutLifecycleRequest() {
super(TRAPPY_IMPLICIT_DEFAULT_MASTER_NODE_TIMEOUT, DEFAULT_ACK_TIMEOUT);
}

public LifecyclePolicy getPolicy() {
return policy;
}
Expand All @@ -71,8 +79,8 @@ public ActionRequestValidationException validate() {
return err;
}

public static PutLifecycleRequest parseRequest(String name, XContentParser parser) {
return PARSER.apply(parser, name);
public static PutLifecycleRequest parseRequest(Factory factory, XContentParser parser) {
return PARSER.apply(parser, factory);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import org.elasticsearch.action.support.master.AcknowledgedRequest;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.xcontent.ConstructingObjectParser;
import org.elasticsearch.xcontent.ParseField;
import org.elasticsearch.xcontent.ToXContentObject;
Expand Down Expand Up @@ -113,12 +114,8 @@ public Request(StreamInput in) throws IOException {
indicesOptions = IndicesOptions.readIndicesOptions(in);
}

public Request() {
super(TRAPPY_IMPLICIT_DEFAULT_MASTER_NODE_TIMEOUT, DEFAULT_ACK_TIMEOUT);
}

public Request(String... indices) {
super(TRAPPY_IMPLICIT_DEFAULT_MASTER_NODE_TIMEOUT, DEFAULT_ACK_TIMEOUT);
public Request(TimeValue masterNodeTimeout, TimeValue ackTimeout, String... indices) {
super(masterNodeTimeout, ackTimeout);
if (indices == null) {
throw new IllegalArgumentException("indices cannot be null");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,8 @@ public abstract class IndexTemplateRegistry implements ClusterStateListener {

private static final Logger logger = LogManager.getLogger(IndexTemplateRegistry.class);

private static final TimeValue REGISTRY_ACTION_TIMEOUT = TimeValue.THIRTY_SECONDS; // TODO should this be longer?

protected final Settings settings;
protected final Client client;
protected final ThreadPool threadPool;
Expand Down Expand Up @@ -614,7 +616,7 @@ protected boolean isUpgradeRequired(LifecyclePolicy currentPolicy, LifecyclePoli
private void putPolicy(final LifecyclePolicy policy, final AtomicBoolean creationCheck) {
final Executor executor = threadPool.generic();
executor.execute(() -> {
PutLifecycleRequest request = new PutLifecycleRequest(policy);
PutLifecycleRequest request = new PutLifecycleRequest(REGISTRY_ACTION_TIMEOUT, REGISTRY_ACTION_TIMEOUT, policy);
request.masterNodeTimeout(TimeValue.MAX_VALUE);
executeAsyncWithOrigin(
client.threadPool().getThreadContext(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ public class StartILMRequestTests extends AbstractWireSerializingTestCase<StartI

@Override
protected StartILMRequest createTestInstance() {
return new StartILMRequest();
return new StartILMRequest(TEST_REQUEST_TIMEOUT, TEST_REQUEST_TIMEOUT);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ public class StopILMRequestTests extends AbstractWireSerializingTestCase<StopILM

@Override
protected StopILMRequest createTestInstance() {
return new StopILMRequest();
return new StopILMRequest(TEST_REQUEST_TIMEOUT, TEST_REQUEST_TIMEOUT);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ public class DeleteLifecycleRequestTests extends AbstractWireSerializingTestCase

@Override
protected Request createTestInstance() {
return new Request(randomAlphaOfLengthBetween(1, 20));
return new Request(TEST_REQUEST_TIMEOUT, TEST_REQUEST_TIMEOUT, randomAlphaOfLengthBetween(1, 20));
}

@Override
Expand All @@ -24,7 +24,7 @@ protected Writeable.Reader<Request> instanceReader() {

@Override
protected Request mutateInstance(Request request) {
return new Request(request.getPolicyName() + randomAlphaOfLengthBetween(1, 10));
return new Request(TEST_REQUEST_TIMEOUT, TEST_REQUEST_TIMEOUT, request.getPolicyName() + randomAlphaOfLengthBetween(1, 10));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ public class GetLifecycleRequestTests extends AbstractWireSerializingTestCase<Re

@Override
protected Request createTestInstance() {
return new Request(randomAlphaOfLengthBetween(1, 20));
return new Request(TEST_REQUEST_TIMEOUT, TEST_REQUEST_TIMEOUT, randomAlphaOfLengthBetween(1, 20));
}

@Override
Expand All @@ -29,6 +29,6 @@ protected Request mutateInstance(Request request) {
String[] originalPolicies = request.getPolicyNames();
String[] newPolicies = Arrays.copyOf(originalPolicies, originalPolicies.length + 1);
newPolicies[originalPolicies.length] = randomAlphaOfLength(5);
return new Request(newPolicies);
return new Request(TEST_REQUEST_TIMEOUT, TEST_REQUEST_TIMEOUT, newPolicies);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,11 @@ public void setup() {

@Override
protected PutLifecycleRequest createTestInstance() {
return new PutLifecycleRequest(LifecyclePolicyTests.randomTimeseriesLifecyclePolicy(lifecycleName));
return new PutLifecycleRequest(
TEST_REQUEST_TIMEOUT,
TEST_REQUEST_TIMEOUT,
LifecyclePolicyTests.randomTimeseriesLifecyclePolicy(lifecycleName)
);
}

@Override
Expand All @@ -58,7 +62,17 @@ protected Writeable.Reader<PutLifecycleRequest> instanceReader() {

@Override
protected PutLifecycleRequest doParseInstance(XContentParser parser) {
return PutLifecycleRequest.parseRequest(lifecycleName, parser);
return PutLifecycleRequest.parseRequest(new PutLifecycleRequest.Factory() {
@Override
public PutLifecycleRequest create(LifecyclePolicy lifecyclePolicy) {
return new PutLifecycleRequest(TEST_REQUEST_TIMEOUT, TEST_REQUEST_TIMEOUT, lifecyclePolicy);
}

@Override
public String getPolicyName() {
return lifecycleName;
}
}, parser);
}

@Override
Expand Down Expand Up @@ -130,7 +144,7 @@ protected PutLifecycleRequest mutateInstance(PutLifecycleRequest request) {
request.getPolicy(),
() -> LifecyclePolicyTests.randomTimeseriesLifecyclePolicy(name)
);
return new PutLifecycleRequest(policy);
return new PutLifecycleRequest(TEST_REQUEST_TIMEOUT, TEST_REQUEST_TIMEOUT, policy);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ public class RemoveIndexLifecyclePolicyRequestTests extends AbstractWireSerializ

@Override
protected Request createTestInstance() {
Request request = new Request(generateRandomStringArray(20, 20, false));
Request request = new Request(TEST_REQUEST_TIMEOUT, TEST_REQUEST_TIMEOUT, generateRandomStringArray(20, 20, false));
if (randomBoolean()) {
IndicesOptions indicesOptions = IndicesOptions.fromOptions(
randomBoolean(),
Expand Down Expand Up @@ -67,13 +67,16 @@ protected Request mutateInstance(Request instance) {
);
default -> throw new AssertionError("Illegal randomisation branch");
}
Request newRequest = new Request(indices);
Request newRequest = new Request(TEST_REQUEST_TIMEOUT, TEST_REQUEST_TIMEOUT, indices);
newRequest.indicesOptions(indicesOptions);
return newRequest;
}

public void testNullIndices() {
IllegalArgumentException exception = expectThrows(IllegalArgumentException.class, () -> new Request((String[]) null));
IllegalArgumentException exception = expectThrows(
IllegalArgumentException.class,
() -> new Request(TEST_REQUEST_TIMEOUT, TEST_REQUEST_TIMEOUT, (String[]) null)
);
assertEquals("indices cannot be null", exception.getMessage());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ public void setup(final String sourceIndex, int numOfShards, int numOfReplicas,
)
);
LifecyclePolicy policy = new LifecyclePolicy(POLICY_NAME, phases);
PutLifecycleRequest putLifecycleRequest = new PutLifecycleRequest(policy);
PutLifecycleRequest putLifecycleRequest = new PutLifecycleRequest(TEST_REQUEST_TIMEOUT, TEST_REQUEST_TIMEOUT, policy);
assertAcked(client().execute(ILMActions.PUT, putLifecycleRequest).actionGet());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ public void testWaitInShrunkShardsAllocatedExceedsThreshold() throws Exception {
Map.of(MigrateAction.NAME, MigrateAction.DISABLED, ShrinkAction.NAME, new ShrinkAction(1, null, false))
);
LifecyclePolicy lifecyclePolicy = new LifecyclePolicy(policy, Map.of("warm", warmPhase));
PutLifecycleRequest putLifecycleRequest = new PutLifecycleRequest(lifecyclePolicy);
PutLifecycleRequest putLifecycleRequest = new PutLifecycleRequest(TEST_REQUEST_TIMEOUT, TEST_REQUEST_TIMEOUT, lifecyclePolicy);
assertAcked(client().execute(ILMActions.PUT, putLifecycleRequest).get());

// we're configuring a very high number of replicas. this will make ths shrunk index unable to allocate successfully, so ILM will
Expand Down
Loading

0 comments on commit f73fedc

Please sign in to comment.