Skip to content

Commit

Permalink
ILM action to wait for SLM policy execution
Browse files Browse the repository at this point in the history
This change add new ILM action to wait for SLM policy execution to ensure that index has snapshot before deletion.

Closes elastic#45067.
  • Loading branch information
probakowski committed Dec 20, 2019
1 parent 7f4cc0e commit b282be6
Show file tree
Hide file tree
Showing 6 changed files with 358 additions and 178 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ public SnapshotAction(StreamInput in) throws IOException {
@Override
public List<Step> toSteps(Client client, String phase, StepKey nextStepKey) {
StepKey waitForSnapshotKey = new StepKey(phase, NAME, WaitForSnapshotStep.NAME);
return Collections.singletonList(new WaitForSnapshotStep(waitForSnapshotKey, nextStepKey, client, policy));
return Collections.singletonList(new WaitForSnapshotStep(waitForSnapshotKey, nextStepKey, policy));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public class TimeseriesLifecycleType implements LifecycleType {
AllocateAction.NAME, ShrinkAction.NAME, ForceMergeAction.NAME);
static final List<String> ORDERED_VALID_COLD_ACTIONS = Arrays.asList(SetPriorityAction.NAME, UnfollowAction.NAME, AllocateAction.NAME,
FreezeAction.NAME);
static final List<String> ORDERED_VALID_DELETE_ACTIONS = Arrays.asList(DeleteAction.NAME);
static final List<String> ORDERED_VALID_DELETE_ACTIONS = Arrays.asList(SnapshotAction.NAME, DeleteAction.NAME);
static final Set<String> VALID_HOT_ACTIONS = Sets.newHashSet(ORDERED_VALID_HOT_ACTIONS);
static final Set<String> VALID_WARM_ACTIONS = Sets.newHashSet(ORDERED_VALID_WARM_ACTIONS);
static final Set<String> VALID_COLD_ACTIONS = Sets.newHashSet(ORDERED_VALID_COLD_ACTIONS);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,95 +5,69 @@
*/
package org.elasticsearch.xpack.core.ilm;

import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.xpack.core.ilm.action.ExplainLifecycleAction;
import org.elasticsearch.xpack.core.slm.action.GetSnapshotLifecycleAction;
import org.elasticsearch.index.Index;
import org.elasticsearch.xpack.core.slm.SnapshotLifecycleMetadata;
import org.elasticsearch.xpack.core.slm.SnapshotLifecyclePolicyMetadata;

import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.Objects;

public class WaitForSnapshotStep extends AsyncWaitStep {
public class WaitForSnapshotStep extends ClusterStateWaitStep {

static final String NAME = "wait-for-snapshot";

private static final String MESSAGE_FIELD = "message";
private static final String POLICY_NOT_EXECUTED_MESSAGE = "waiting for policy '%s' to be executed";
private static final String POLICY_NOT_FOUND_MESSAGE = "policy '%s' not found, waiting for it to be created and executed";

private final String policy;

WaitForSnapshotStep(StepKey key, StepKey nextStepKey, Client client, String policy) {
super(key, nextStepKey, client);
WaitForSnapshotStep(StepKey key, StepKey nextStepKey, String policy) {
super(key, nextStepKey);
this.policy = policy;
}

@Override
public void evaluateCondition(IndexMetaData indexMetaData, Listener listener) {
Client client = getClient();
ResponseHandler handler = new ResponseHandler(listener);
public Result isConditionMet(Index index, ClusterState clusterState) {
long phaseTime = LifecycleExecutionState.fromIndexMetadata(clusterState.metaData().index(index)).getPhaseTime();

ExplainLifecycleRequest explainLifecycleReq = new ExplainLifecycleRequest();
String index = indexMetaData.getIndex().getName();
explainLifecycleReq.indices(index);
client.execute(ExplainLifecycleAction.INSTANCE, explainLifecycleReq,
ActionListener.wrap(r -> handler.handleResponse(-r.getIndexResponses().get(index).getPhaseTime()), listener::onFailure));

GetSnapshotLifecycleAction.Request getSnapshotReq = new GetSnapshotLifecycleAction.Request(policy);
client.execute(GetSnapshotLifecycleAction.INSTANCE, getSnapshotReq,
ActionListener.wrap(r -> handler.handleResponse(r.getPolicies().get(0).getLastSuccess().getTimestamp()),
e -> handleSnapshotRequestFailure(listener, e)));
}

private void handleSnapshotRequestFailure(Listener listener, Exception e) {
if (e instanceof ResourceNotFoundException) {
listener.onResponse(false, new Info(POLICY_NOT_FOUND_MESSAGE));
} else {
listener.onFailure(e);
SnapshotLifecycleMetadata snapMeta = clusterState.metaData().custom(SnapshotLifecycleMetadata.TYPE);
if (snapMeta == null || snapMeta.getSnapshotConfigurations().containsKey(policy) == false) {
return new Result(false, info(POLICY_NOT_FOUND_MESSAGE));
}
}

private final class ResponseHandler {
private final AtomicLong timeDifference = new AtomicLong();
private final AtomicBoolean requestsFinished = new AtomicBoolean();
private final Listener listener;


public ResponseHandler(Listener listener) {
this.listener = listener;
SnapshotLifecyclePolicyMetadata snapPolicyMeta = snapMeta.getSnapshotConfigurations().get(policy);
if (snapPolicyMeta.getLastSuccess() == null || snapPolicyMeta.getLastSuccess().getTimestamp() < phaseTime) {
return new Result(false, info(POLICY_NOT_EXECUTED_MESSAGE));
}

private void handleResponse(long time) {
timeDifference.addAndGet(time);
if (requestsFinished.getAndSet(true)) {
if (timeDifference.get() > 0) {
listener.onResponse(true, null);
} else {
listener.onResponse(false, new Info(POLICY_NOT_EXECUTED_MESSAGE));
}
}
}
return new Result(true, null);
}

private final class Info implements ToXContentObject {

private static final String MESSAGE_FIELD = "message";
private final String message;

private Info(String message) {
this.message = message;
}
public String getPolicy() {
return policy;
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
private ToXContentObject info(String message) {
return (builder, params) -> {
builder.startObject();
builder.field(MESSAGE_FIELD, String.format(message, policy));
builder.endObject();
return builder;
}
};
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
if (!super.equals(o)) return false;
WaitForSnapshotStep that = (WaitForSnapshotStep) o;
return policy.equals(that.policy);
}

@Override
public int hashCode() {
return Objects.hash(super.hashCode(), policy);
}
}
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.core.ilm;

import org.elasticsearch.common.io.stream.Writeable;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.core.ilm;

import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.xpack.core.slm.SnapshotInvocationRecord;
import org.elasticsearch.xpack.core.slm.SnapshotLifecycleMetadata;
import org.elasticsearch.xpack.core.slm.SnapshotLifecyclePolicy;
import org.elasticsearch.xpack.core.slm.SnapshotLifecyclePolicyMetadata;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Map;

public class WaitForSnapshotStepTests extends AbstractStepTestCase<WaitForSnapshotStep> {

@Override
protected WaitForSnapshotStep createRandomInstance() {
return new WaitForSnapshotStep(randomStepKey(), randomStepKey(), randomAlphaOfLengthBetween(1, 10));
}

@Override
protected WaitForSnapshotStep mutateInstance(WaitForSnapshotStep instance) {
Step.StepKey key = instance.getKey();
Step.StepKey nextKey = instance.getNextStepKey();
String policy = instance.getPolicy();

switch (between(0, 2)) {
case 0:
key = new Step.StepKey(key.getPhase(), key.getAction(), key.getName() + randomAlphaOfLength(5));
break;
case 1:
nextKey = new Step.StepKey(key.getPhase(), key.getAction(), key.getName() + randomAlphaOfLength(5));
break;
case 2:
policy = randomAlphaOfLengthBetween(1, 10);
break;
default:
throw new AssertionError("Illegal randomisation branch");
}

return new WaitForSnapshotStep(key, nextKey, policy);
}

@Override
protected WaitForSnapshotStep copyInstance(WaitForSnapshotStep instance) {
return new WaitForSnapshotStep(instance.getKey(), instance.getNextStepKey(), instance.getPolicy());
}

public void testNoSlmPolicies() throws IOException {
IndexMetaData indexMetaData = IndexMetaData.builder(randomAlphaOfLength(10))
.putCustom(LifecycleExecutionState.ILM_CUSTOM_METADATA_KEY, Map.of("phase_time", Long.toString(randomLong())))
.settings(settings(Version.CURRENT))
.numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5)).build();
ImmutableOpenMap.Builder<String, IndexMetaData> indices =
ImmutableOpenMap.<String, IndexMetaData>builder().fPut(indexMetaData.getIndex().getName(), indexMetaData);
MetaData.Builder meta = MetaData.builder().indices(indices.build());
ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT).metaData(meta).build();
WaitForSnapshotStep instance = createRandomInstance();
ClusterStateWaitStep.Result result = instance.isConditionMet(indexMetaData.getIndex(), clusterState);
assertFalse(result.isComplete());
assertTrue(getMessage(result).contains("not found"));
}

public void testSlmPolicyNotExecuted() throws IOException {
WaitForSnapshotStep instance = createRandomInstance();
SnapshotLifecyclePolicyMetadata slmPolicy = SnapshotLifecyclePolicyMetadata.builder()
.setModifiedDate(randomLong())
.setPolicy(new SnapshotLifecyclePolicy("", "", "", "", null, null))
.build();
SnapshotLifecycleMetadata smlMetaData = new SnapshotLifecycleMetadata(Map.of(instance.getPolicy(), slmPolicy),
OperationMode.RUNNING, null);


IndexMetaData indexMetaData = IndexMetaData.builder(randomAlphaOfLength(10))
.putCustom(LifecycleExecutionState.ILM_CUSTOM_METADATA_KEY, Map.of("phase_time", Long.toString(randomLong())))
.settings(settings(Version.CURRENT))
.numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5)).build();
ImmutableOpenMap.Builder<String, IndexMetaData> indices =
ImmutableOpenMap.<String, IndexMetaData>builder().fPut(indexMetaData.getIndex().getName(), indexMetaData);
MetaData.Builder meta = MetaData.builder().indices(indices.build()).putCustom(SnapshotLifecycleMetadata.TYPE, smlMetaData);
ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT).metaData(meta).build();
ClusterStateWaitStep.Result result = instance.isConditionMet(indexMetaData.getIndex(), clusterState);
assertFalse(result.isComplete());
assertTrue(getMessage(result).contains("to be executed"));
}

public void testSlmPolicyExecutedBeforeStep() throws IOException {
long phaseTime = randomLong();

WaitForSnapshotStep instance = createRandomInstance();
SnapshotLifecyclePolicyMetadata slmPolicy = SnapshotLifecyclePolicyMetadata.builder()
.setModifiedDate(randomLong())
.setPolicy(new SnapshotLifecyclePolicy("", "", "", "", null, null))
.setLastSuccess(new SnapshotInvocationRecord("", phaseTime - 10, ""))
.build();
SnapshotLifecycleMetadata smlMetaData = new SnapshotLifecycleMetadata(Map.of(instance.getPolicy(), slmPolicy),
OperationMode.RUNNING, null);

IndexMetaData indexMetaData = IndexMetaData.builder(randomAlphaOfLength(10))
.putCustom(LifecycleExecutionState.ILM_CUSTOM_METADATA_KEY, Map.of("phase_time", Long.toString(phaseTime)))
.settings(settings(Version.CURRENT))
.numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5)).build();
ImmutableOpenMap.Builder<String, IndexMetaData> indices =
ImmutableOpenMap.<String, IndexMetaData>builder().fPut(indexMetaData.getIndex().getName(), indexMetaData);
MetaData.Builder meta = MetaData.builder().indices(indices.build()).putCustom(SnapshotLifecycleMetadata.TYPE, smlMetaData);
ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT).metaData(meta).build();
ClusterStateWaitStep.Result result = instance.isConditionMet(indexMetaData.getIndex(), clusterState);
assertFalse(result.isComplete());
assertTrue(getMessage(result).contains("to be executed"));
}

public void testSlmPolicyExecutedAfterStep() throws IOException {
long phaseTime = randomLong();

WaitForSnapshotStep instance = createRandomInstance();
SnapshotLifecyclePolicyMetadata slmPolicy = SnapshotLifecyclePolicyMetadata.builder()
.setModifiedDate(randomLong())
.setPolicy(new SnapshotLifecyclePolicy("", "", "", "", null, null))
.setLastSuccess(new SnapshotInvocationRecord("", phaseTime + 10, ""))
.build();
SnapshotLifecycleMetadata smlMetaData = new SnapshotLifecycleMetadata(Map.of(instance.getPolicy(), slmPolicy),
OperationMode.RUNNING, null);

IndexMetaData indexMetaData = IndexMetaData.builder(randomAlphaOfLength(10))
.putCustom(LifecycleExecutionState.ILM_CUSTOM_METADATA_KEY, Map.of("phase_time", Long.toString(phaseTime)))
.settings(settings(Version.CURRENT))
.numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5)).build();
ImmutableOpenMap.Builder<String, IndexMetaData> indices =
ImmutableOpenMap.<String, IndexMetaData>builder().fPut(indexMetaData.getIndex().getName(), indexMetaData);
MetaData.Builder meta = MetaData.builder().indices(indices.build()).putCustom(SnapshotLifecycleMetadata.TYPE, smlMetaData);
ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT).metaData(meta).build();
ClusterStateWaitStep.Result result = instance.isConditionMet(indexMetaData.getIndex(), clusterState);
assertTrue(result.isComplete());
assertNull(result.getInfomationContext());
}

private String getMessage(ClusterStateWaitStep.Result result) throws IOException {
XContentBuilder s = result.getInfomationContext().toXContent(JsonXContent.contentBuilder(), null);
s.flush();
return new String(((ByteArrayOutputStream) s.getOutputStream()).toByteArray(), StandardCharsets.UTF_8);
}
}

0 comments on commit b282be6

Please sign in to comment.