Skip to content

Commit

Permalink
[7.6] ILM use Priority.IMMEDIATE for stop ILM cluster update (#54909) (
Browse files Browse the repository at this point in the history
…#55016)

This changes the priority of the cluster state update that stops ILM
altogether to `IMMEDIATE`. We've chosen to change this as it can be useful to
temporarily stop ILM if a cluster is overwhelmed, but at `NORMAL`
priority the "stop ILM" update may not make it to the front of the task queue.

On the same note, we're keeping the `start ILM` cluster update priority
at `NORMAL` on purpose, so that we only start `ILM` if the cluster can
handle it.

(cherry picked from commit d67df3a)
Signed-off-by: Andrei Dan <andrei.dan@elastic.co>
  • Loading branch information
andreidan committed Apr 11, 2020
1 parent a477a19 commit 3c6404f
Show file tree
Hide file tree
Showing 5 changed files with 146 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import org.elasticsearch.cluster.LocalNodeMasterListener;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.component.Lifecycle.State;
import org.elasticsearch.common.settings.Settings;
Expand Down Expand Up @@ -363,7 +364,13 @@ assert isClusterServiceStoppedOrClosed() : "close is called by closing the plugi
}

/**
 * Submits a cluster state update task that moves ILM to the given operation mode.
 *
 * Requests for {@link OperationMode#STOPPING} or {@link OperationMode#STOPPED} are built with
 * {@link Priority#IMMEDIATE}; every other mode is built with {@link Priority#NORMAL}.
 * The task source string carries the name of the requested mode.
 *
 * @param mode the operation mode ILM should be switched to
 */
public void submitOperationModeUpdate(OperationMode mode) {
// NOTE(review): this listing is a rendered diff without +/- markers; the call on the next line
// looks like the pre-change submission that the lines below replace - confirm against the real file.
clusterService.submitStateUpdateTask("ilm_operation_mode_update", OperationModeUpdateTask.ilmMode(mode));
OperationModeUpdateTask ilmOperationModeUpdateTask;
// Stop requests use IMMEDIATE priority, all other modes use NORMAL.
if (mode == OperationMode.STOPPING || mode == OperationMode.STOPPED) {
ilmOperationModeUpdateTask = OperationModeUpdateTask.ilmMode(Priority.IMMEDIATE, mode);
} else {
ilmOperationModeUpdateTask = OperationModeUpdateTask.ilmMode(Priority.NORMAL, mode);
}
clusterService.submitStateUpdateTask("ilm_operation_mode_update {OperationMode " + mode.name() + "}", ilmOperationModeUpdateTask);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,28 +11,40 @@
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.xpack.core.ilm.OperationMode;
import org.elasticsearch.common.Priority;
import org.elasticsearch.xpack.core.ilm.IndexLifecycleMetadata;
import org.elasticsearch.xpack.core.ilm.OperationMode;
import org.elasticsearch.xpack.core.slm.SnapshotLifecycleMetadata;

/**
* This task updates the operation mode state for ILM.
*
* As stopping ILM proved to be an action we sometimes want to take in order to allow clusters to stabilise when under heavy load, this
* task might run at {@link Priority#IMMEDIATE} priority, so please make sure to keep this task as lightweight as possible.
*/
public class OperationModeUpdateTask extends ClusterStateUpdateTask {
private static final Logger logger = LogManager.getLogger(OperationModeUpdateTask.class);
@Nullable
private final OperationMode ilmMode;
@Nullable
private final OperationMode slmMode;

private OperationModeUpdateTask(OperationMode ilmMode, OperationMode slmMode) {
private OperationModeUpdateTask(Priority priority, OperationMode ilmMode, OperationMode slmMode) {
super(priority);
this.ilmMode = ilmMode;
this.slmMode = slmMode;
}

public static OperationModeUpdateTask ilmMode(OperationMode mode) {
return new OperationModeUpdateTask(mode, null);
return ilmMode(Priority.NORMAL, mode);
}

public static OperationModeUpdateTask ilmMode(Priority priority, OperationMode mode) {
return new OperationModeUpdateTask(priority, mode, null);
}

public static OperationModeUpdateTask slmMode(OperationMode mode) {
return new OperationModeUpdateTask(null, mode);
return new OperationModeUpdateTask(Priority.NORMAL, null, mode);
}

OperationMode getILMOperationMode() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.threadpool.ThreadPool;
Expand Down Expand Up @@ -49,7 +50,8 @@ protected AcknowledgedResponse read(StreamInput in) throws IOException {
@Override
protected void masterOperation(StopILMRequest request, ClusterState state, ActionListener<AcknowledgedResponse> listener) {
clusterService.submitStateUpdateTask("ilm_operation_mode_update",
new AckedClusterStateUpdateTask<AcknowledgedResponse>(request, listener) {
new AckedClusterStateUpdateTask<AcknowledgedResponse>(Priority.IMMEDIATE, request, listener) {

@Override
public ClusterState execute(ClusterState currentState) {
return (OperationModeUpdateTask.ilmMode(OperationMode.STOPPING)).execute(currentState);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.component.Lifecycle.State;
import org.elasticsearch.common.settings.ClusterSettings;
Expand All @@ -40,8 +41,10 @@
import org.elasticsearch.xpack.core.ilm.ShrinkStep;
import org.elasticsearch.xpack.core.ilm.Step;
import org.elasticsearch.xpack.core.scheduler.SchedulerEngine;
import org.hamcrest.Description;
import org.junit.After;
import org.junit.Before;
import org.mockito.ArgumentMatcher;
import org.mockito.Mockito;

import java.time.Clock;
Expand All @@ -62,9 +65,11 @@
import static org.hamcrest.Matchers.equalTo;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyString;
import static org.mockito.Matchers.argThat;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

public class IndexLifecycleServiceTests extends ESTestCase {
Expand Down Expand Up @@ -242,7 +247,8 @@ private void verifyCanStopWithStep(String stoppableStep) {
doAnswer(invocationOnMock -> {
changedOperationMode.set(true);
return null;
}).when(clusterService).submitStateUpdateTask(eq("ilm_operation_mode_update"), any(OperationModeUpdateTask.class));
}).when(clusterService).submitStateUpdateTask(eq("ilm_operation_mode_update {OperationMode STOPPED}"),
any(OperationModeUpdateTask.class));
indexLifecycleService.applyClusterState(event);
indexLifecycleService.triggerPolicies(currentState, true);
assertTrue(changedOperationMode.get());
Expand Down Expand Up @@ -294,7 +300,8 @@ public void testRequestedStopOnSafeAction() {
assertThat(task.getILMOperationMode(), equalTo(OperationMode.STOPPED));
moveToMaintenance.set(true);
return null;
}).when(clusterService).submitStateUpdateTask(eq("ilm_operation_mode_update"), any(OperationModeUpdateTask.class));
}).when(clusterService).submitStateUpdateTask(eq("ilm_operation_mode_update {OperationMode STOPPED}"),
any(OperationModeUpdateTask.class));

indexLifecycleService.applyClusterState(event);
indexLifecycleService.triggerPolicies(currentState, randomBoolean());
Expand All @@ -310,6 +317,40 @@ public void testExceptionStillProcessesOtherIndicesOnMaster() {
doTestExceptionStillProcessesOtherIndices(true);
}

/**
 * Submits an operation mode update for STOPPING, STOPPED and RUNNING in turn and checks the priority of
 * the task handed to the cluster service: the two stop modes expect IMMEDIATE, RUNNING expects NORMAL.
 */
public void testOperationModeUpdateTaskPriority() {
    OperationMode[] modesUnderTest = { OperationMode.STOPPING, OperationMode.STOPPED, OperationMode.RUNNING };
    for (OperationMode requestedMode : modesUnderTest) {
        Priority wantedPriority = requestedMode == OperationMode.RUNNING ? Priority.NORMAL : Priority.IMMEDIATE;
        indexLifecycleService.submitOperationModeUpdate(requestedMode);
        verifyOperationModeUpdateTaskPriority(requestedMode, wantedPriority);
    }
}

/**
 * Verifies that the mocked cluster service received an ILM operation mode update for {@code mode}
 * and that the submitted {@link OperationModeUpdateTask} reports the expected priority.
 *
 * @param mode             the operation mode that was submitted; used to build the expected task source string
 * @param expectedPriority the priority the submitted task must have
 */
private void verifyOperationModeUpdateTaskPriority(OperationMode mode, Priority expectedPriority) {
    verify(clusterService).submitStateUpdateTask(
        eq("ilm_operation_mode_update {OperationMode " + mode.name() + "}"),
        argThat(new ArgumentMatcher<OperationModeUpdateTask>() {

            // Priority of the last OperationModeUpdateTask seen by matches(); remains null when the
            // matcher was never offered such a task.
            private Priority actualPriority = null;

            @Override
            public boolean matches(Object argument) {
                if (argument instanceof OperationModeUpdateTask == false) {
                    return false;
                }
                actualPriority = ((OperationModeUpdateTask) argument).priority();
                return actualPriority == expectedPriority;
            }

            @Override
            public void describeTo(Description description) {
                // describeTo may run without matches() having recorded a priority (no invocation, or an
                // argument of another type), so guard against null instead of failing with an NPE.
                description.appendText("the cluster state update task priority must be " + expectedPriority + " but got: ")
                    .appendText(actualPriority == null ? "<none recorded>" : actualPriority.name());
            }
        })
    );
}

@SuppressWarnings("unchecked")
public void doTestExceptionStillProcessesOtherIndices(boolean useOnMaster) {
String policy1 = randomAlphaOfLengthBetween(1, 20);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

package org.elasticsearch.xpack.ilm.action;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.cluster.AckedClusterStateUpdateTask;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Priority;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.core.ilm.StopILMRequest;
import org.hamcrest.Description;
import org.mockito.ArgumentMatcher;

import static org.mockito.Matchers.argThat;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;

/**
 * Unit test for {@link TransportStopILMAction} checking that the stop-ILM cluster state update is
 * submitted with {@link Priority#IMMEDIATE}.
 */
public class TransportStopILMActionTests extends ESTestCase {

    /** Listener that ignores both outcomes; the test only inspects the task handed to the cluster service. */
    private static final ActionListener<AcknowledgedResponse> EMPTY_LISTENER = new ActionListener<AcknowledgedResponse>() {
        @Override
        public void onResponse(AcknowledgedResponse response) {
            // intentionally empty
        }

        @Override
        public void onFailure(Exception e) {
            // intentionally empty
        }
    };

    /**
     * Runs the master operation against a mocked {@link ClusterService} and verifies that the task
     * submitted under the "ilm_operation_mode_update" source has IMMEDIATE priority.
     */
    @SuppressWarnings("unchecked")
    public void testStopILMClusterStatePriorityIsImmediate() {
        ClusterService clusterService = mock(ClusterService.class);

        TransportStopILMAction transportStopILMAction = new TransportStopILMAction(mock(TransportService.class),
            clusterService, mock(ThreadPool.class), mock(ActionFilters.class), mock(IndexNameExpressionResolver.class));
        StopILMRequest request = new StopILMRequest();
        transportStopILMAction.masterOperation(request, ClusterState.EMPTY_STATE, EMPTY_LISTENER);

        verify(clusterService).submitStateUpdateTask(
            eq("ilm_operation_mode_update"),
            argThat(new ArgumentMatcher<AckedClusterStateUpdateTask<AcknowledgedResponse>>() {

                // Priority of the last acked task seen by matches(); remains null when the matcher
                // was never offered an AckedClusterStateUpdateTask.
                private Priority actualPriority = null;

                @Override
                public boolean matches(Object argument) {
                    if (argument instanceof AckedClusterStateUpdateTask == false) {
                        return false;
                    }
                    actualPriority = ((AckedClusterStateUpdateTask<AcknowledgedResponse>) argument).priority();
                    return actualPriority == Priority.IMMEDIATE;
                }

                @Override
                public void describeTo(Description description) {
                    // The expectation is IMMEDIATE (see matches()); also guard against a null priority so a
                    // failed verification reports the mismatch instead of throwing an NPE.
                    description.appendText("the cluster state update task priority must be IMMEDIATE but got: ")
                        .appendText(actualPriority == null ? "<none recorded>" : actualPriority.name());
                }
            })
        );
    }

}

0 comments on commit 3c6404f

Please sign in to comment.