Skip to content

Commit

Permalink
Change step execution flow to be deliberate about type
Browse files Browse the repository at this point in the history
This commit changes the way that step execution flows. Rather than have any step
run when the cluster state changes or the periodic scheduler fires, this now
runs the different types of steps at different times.

`AsyncWaitStep` is run at a periodic manner, ie, every 10 minutes by default
`ClusterStateActionStep` and `ClusterStateWaitStep` are run every time the
cluster state changes.
`AsyncActionStep` is now run only after the cluster state has been transitioned
into a new step. This prevents these non-idempotent steps from running at the
same time. It addition to being run when transitioned into, this is also run
when a node is newly elected master (only if set as the current step) so that
master failover does not fail to run the step.

This also changes the `RolloverStep` from an `AsyncActionStep` to an
`AsyncWaitStep` so that it can run periodically.

Relates to elastic#29823
  • Loading branch information
dakrone committed Sep 27, 2018
1 parent a26cc1a commit 1666cce
Show file tree
Hide file tree
Showing 15 changed files with 594 additions and 278 deletions.
Expand Up @@ -6,8 +6,8 @@
package org.elasticsearch.xpack.core.indexlifecycle;

import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.index.Index;

public abstract class AsyncWaitStep extends Step {

Expand All @@ -22,7 +22,7 @@ protected Client getClient() {
return client;
}

public abstract void evaluateCondition(Index index, Listener listener);
public abstract void evaluateCondition(IndexMetaData indexMetaData, Listener listener);

public interface Listener {

Expand Down
Expand Up @@ -219,9 +219,9 @@ public List<Step> toSteps(Client client) {
steps.add(new InitializePolicyContextStep(InitializePolicyContextStep.KEY, lastStepKey));

Collections.reverse(steps);
logger.debug("STEP COUNT: " + steps.size());
logger.trace("STEP COUNT: " + steps.size());
for (Step step : steps) {
logger.debug(step.getKey() + " -> " + step.getNextStepKey());
logger.trace(step.getKey() + " -> " + step.getNextStepKey());
}

return steps;
Expand Down
Expand Up @@ -8,16 +8,18 @@
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.rollover.RolloverRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;

import java.io.IOException;
import java.util.Locale;
import java.util.Objects;

public class RolloverStep extends AsyncActionStep {
public class RolloverStep extends AsyncWaitStep {
public static final String NAME = "attempt_rollover";

private ByteSizeValue maxSize;
Expand All @@ -33,7 +35,7 @@ public RolloverStep(StepKey key, StepKey nextStepKey, Client client, ByteSizeVal
}

@Override
public void performAction(IndexMetaData indexMetaData, ClusterState currentState, Listener listener) {
public void evaluateCondition(IndexMetaData indexMetaData, Listener listener) {
String rolloverAlias = RolloverAction.LIFECYCLE_ROLLOVER_ALIAS_SETTING.get(indexMetaData.getSettings());

if (Strings.isNullOrEmpty(rolloverAlias)) {
Expand All @@ -54,7 +56,7 @@ public void performAction(IndexMetaData indexMetaData, ClusterState currentState
rolloverRequest.addMaxIndexDocsCondition(maxDocs);
}
getClient().admin().indices().rolloverIndex(rolloverRequest,
ActionListener.wrap(response -> listener.onResponse(response.isRolledOver()), listener::onFailure));
ActionListener.wrap(response -> listener.onResponse(response.isRolledOver(), new EmptyInfo()), listener::onFailure));
}

ByteSizeValue getMaxSize() {
Expand Down Expand Up @@ -89,4 +91,13 @@ public boolean equals(Object obj) {
Objects.equals(maxDocs, other.maxDocs);
}

// TODO: expand the information we provide?
private class EmptyInfo implements ToXContentObject {
private EmptyInfo() {}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
return builder;
}
}
}
Expand Up @@ -8,12 +8,12 @@
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.segments.IndicesSegmentsRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.index.Index;

import java.io.IOException;
import java.util.Arrays;
Expand All @@ -38,12 +38,14 @@ public int getMaxNumSegments() {
}

@Override
public void evaluateCondition(Index index, Listener listener) {
getClient().admin().indices().segments(new IndicesSegmentsRequest(index.getName()), ActionListener.wrap(response -> {
long numberShardsLeftToMerge = StreamSupport.stream(response.getIndices().get(index.getName()).spliterator(), false)
.filter(iss -> Arrays.stream(iss.getShards()).anyMatch(p -> p.getSegments().size() > maxNumSegments)).count();
listener.onResponse(numberShardsLeftToMerge == 0, new Info(numberShardsLeftToMerge));
}, listener::onFailure));
public void evaluateCondition(IndexMetaData indexMetaData, Listener listener) {
getClient().admin().indices().segments(new IndicesSegmentsRequest(indexMetaData.getIndex().getName()),
ActionListener.wrap(response -> {
long numberShardsLeftToMerge =
StreamSupport.stream(response.getIndices().get(indexMetaData.getIndex().getName()).spliterator(), false)
.filter(iss -> Arrays.stream(iss.getShards()).anyMatch(p -> p.getSegments().size() > maxNumSegments)).count();
listener.onResponse(numberShardsLeftToMerge == 0, new Info(numberShardsLeftToMerge));
}, listener::onFailure));
}

@Override
Expand Down
Expand Up @@ -21,7 +21,7 @@
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.xpack.core.indexlifecycle.AsyncActionStep.Listener;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.xpack.core.indexlifecycle.Step.StepKey;
import org.junit.Before;
import org.mockito.Mockito;
Expand Down Expand Up @@ -148,10 +148,10 @@ public Void answer(InvocationOnMock invocation) throws Throwable {
}).when(indicesClient).rolloverIndex(Mockito.any(), Mockito.any());

SetOnce<Boolean> actionCompleted = new SetOnce<>();
step.performAction(indexMetaData, null, new Listener() {
step.evaluateCondition(indexMetaData, new AsyncWaitStep.Listener() {

@Override
public void onResponse(boolean complete) {
public void onResponse(boolean complete, ToXContentObject obj) {
actionCompleted.set(complete);
}

Expand Down Expand Up @@ -205,10 +205,10 @@ public Void answer(InvocationOnMock invocation) throws Throwable {
}).when(indicesClient).rolloverIndex(Mockito.any(), Mockito.any());

SetOnce<Boolean> actionCompleted = new SetOnce<>();
step.performAction(indexMetaData, null, new Listener() {
step.evaluateCondition(indexMetaData, new AsyncWaitStep.Listener() {

@Override
public void onResponse(boolean complete) {
public void onResponse(boolean complete, ToXContentObject obj) {
actionCompleted.set(complete);
}

Expand Down Expand Up @@ -263,10 +263,10 @@ public Void answer(InvocationOnMock invocation) throws Throwable {
}).when(indicesClient).rolloverIndex(Mockito.any(), Mockito.any());

SetOnce<Boolean> exceptionThrown = new SetOnce<>();
step.performAction(indexMetaData, null, new Listener() {
step.evaluateCondition(indexMetaData, new AsyncWaitStep.Listener() {

@Override
public void onResponse(boolean complete) {
public void onResponse(boolean complete, ToXContentObject obj) {
throw new AssertionError("Unexpected method call");
}

Expand All @@ -292,9 +292,9 @@ public void testPerformActionInvalidNullOrEmptyAlias() {
RolloverStep step = createRandomInstance();

SetOnce<Exception> exceptionThrown = new SetOnce<>();
step.performAction(indexMetaData, null, new Listener() {
step.evaluateCondition(indexMetaData, new AsyncWaitStep.Listener() {
@Override
public void onResponse(boolean complete) {
public void onResponse(boolean complete, ToXContentObject obj) {
throw new AssertionError("Unexpected method call");
}

Expand Down
Expand Up @@ -6,6 +6,7 @@
package org.elasticsearch.xpack.core.indexlifecycle;

import org.apache.lucene.util.SetOnce;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.segments.IndexSegments;
import org.elasticsearch.action.admin.indices.segments.IndexShardSegments;
Expand All @@ -14,6 +15,8 @@
import org.elasticsearch.client.AdminClient;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.IndicesAdminClient;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.engine.Segment;
Expand Down Expand Up @@ -41,6 +44,15 @@ public SegmentCountStep createRandomInstance() {
return new SegmentCountStep(stepKey, nextStepKey, null, maxNumSegments);
}

private IndexMetaData makeMeta(Index index) {
return IndexMetaData.builder(index.getName())
.settings(Settings.builder()
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0)
.put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT))
.build();
}

@Override
public SegmentCountStep mutateInstance(SegmentCountStep instance) {
StepKey key = instance.getKey();
Expand Down Expand Up @@ -109,7 +121,7 @@ public void testIsConditionMet() {
SetOnce<ToXContentObject> conditionInfo = new SetOnce<>();

SegmentCountStep step = new SegmentCountStep(stepKey, nextStepKey, client, maxNumSegments);
step.evaluateCondition(index, new AsyncWaitStep.Listener() {
step.evaluateCondition(makeMeta(index), new AsyncWaitStep.Listener() {
@Override
public void onResponse(boolean conditionMet, ToXContentObject info) {
conditionMetResult.set(conditionMet);
Expand Down Expand Up @@ -166,7 +178,7 @@ public void testIsConditionFails() {
SetOnce<ToXContentObject> conditionInfo = new SetOnce<>();

SegmentCountStep step = new SegmentCountStep(stepKey, nextStepKey, client, maxNumSegments);
step.evaluateCondition(index, new AsyncWaitStep.Listener() {
step.evaluateCondition(makeMeta(index), new AsyncWaitStep.Listener() {
@Override
public void onResponse(boolean conditionMet, ToXContentObject info) {
conditionMetResult.set(conditionMet);
Expand Down Expand Up @@ -206,7 +218,7 @@ public void testThrowsException() {
SetOnce<Boolean> exceptionThrown = new SetOnce<>();

SegmentCountStep step = new SegmentCountStep(stepKey, nextStepKey, client, maxNumSegments);
step.evaluateCondition(index, new AsyncWaitStep.Listener() {
step.evaluateCondition(makeMeta(index), new AsyncWaitStep.Listener() {
@Override
public void onResponse(boolean conditionMet, ToXContentObject info) {
throw new AssertionError("unexpected method call");
Expand Down
Expand Up @@ -18,6 +18,7 @@
import org.elasticsearch.xpack.core.indexlifecycle.ClusterStateWaitStep;
import org.elasticsearch.xpack.core.indexlifecycle.LifecycleExecutionState;
import org.elasticsearch.xpack.core.indexlifecycle.Step;
import org.elasticsearch.xpack.core.indexlifecycle.TerminalPolicyStep;

import java.io.IOException;
import java.util.function.LongSupplier;
Expand All @@ -28,15 +29,18 @@ public class ExecuteStepsUpdateTask extends ClusterStateUpdateTask {
private final Index index;
private final Step startStep;
private final PolicyStepsRegistry policyStepsRegistry;
private final IndexLifecycleRunner lifecycleRunner;
private LongSupplier nowSupplier;
private Step.StepKey nextStepKey;

public ExecuteStepsUpdateTask(String policy, Index index, Step startStep, PolicyStepsRegistry policyStepsRegistry,
LongSupplier nowSupplier) {
IndexLifecycleRunner lifecycleRunner, LongSupplier nowSupplier) {
this.policy = policy;
this.index = index;
this.startStep = startStep;
this.policyStepsRegistry = policyStepsRegistry;
this.nowSupplier = nowSupplier;
this.lifecycleRunner = lifecycleRunner;
}

String getPolicy() {
Expand Down Expand Up @@ -88,6 +92,7 @@ public ClusterState execute(ClusterState currentState) throws IOException {
if (currentStep.getNextStepKey() == null) {
return currentState;
}
nextStepKey = currentStep.getNextStepKey();
currentState = IndexLifecycleRunner.moveClusterStateToNextStep(index, currentState, currentStep.getKey(),
currentStep.getNextStepKey(), nowSupplier);
} else {
Expand All @@ -104,6 +109,7 @@ public ClusterState execute(ClusterState currentState) throws IOException {
if (currentStep.getNextStepKey() == null) {
return currentState;
}
nextStepKey = currentStep.getNextStepKey();
currentState = IndexLifecycleRunner.moveClusterStateToNextStep(index, currentState, currentStep.getKey(),
currentStep.getNextStepKey(), nowSupplier);
} else {
Expand All @@ -130,6 +136,19 @@ public ClusterState execute(ClusterState currentState) throws IOException {
}
}

@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
if (oldState.equals(newState) == false) {
IndexMetaData indexMetaData = newState.metaData().index(index);
if (nextStepKey != null && nextStepKey != TerminalPolicyStep.KEY && indexMetaData != null) {
// After the cluster state has been processed and we have moved
// to a new step, we need to conditionally execute the step iff
// it is an `AsyncAction` so that it is executed exactly once.
lifecycleRunner.maybeRunAsyncAction(newState, indexMetaData, policy, nextStepKey);
}
}
}

@Override
public void onFailure(String source, Exception e) {
throw new ElasticsearchException(
Expand Down

0 comments on commit 1666cce

Please sign in to comment.