Skip to content

Commit

Permalink
Make ILM Steps use Infinite Master Timeout (#74143) (#74622)
Browse files Browse the repository at this point in the history
Same as #72085 but for ILM. Having a timeout on these internal "requests"
only adds more noise if master is slow already when timed out steps trigger
moves to the error step.
It seems like it is safe to remove the setting for the timeout outright as well
as it was not used anywhere and never documented as far as I can tell.
  • Loading branch information
original-brownbear committed Jun 28, 2021
1 parent a4908f8 commit 2b8b7c1
Show file tree
Hide file tree
Showing 45 changed files with 63 additions and 161 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,6 @@
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateObserver;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.core.TimeValue;

import java.util.Objects;

/**
* Performs an action which must be performed asynchronously because it may take time to complete.
Expand All @@ -31,11 +28,6 @@ protected Client getClient() {
return client;
}

public static TimeValue getMasterTimeout(ClusterState clusterState){
Objects.requireNonNull(clusterState, "cannot determine master timeout when cluster state is null");
return LifecycleSettings.LIFECYCLE_STEP_MASTER_TIMEOUT_SETTING.get(clusterState.metadata().settings());
}

public boolean indexSurvives() {
return true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.common.Strings;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.IndexNotFoundException;

import static org.elasticsearch.xpack.core.ilm.LifecycleExecutionState.fromIndexMetadata;
Expand Down Expand Up @@ -59,7 +60,7 @@ void performDuringNoSnapshot(IndexMetadata indexMetadata, ClusterState currentCl
return;
}
getClient().admin().indices()
.delete(new DeleteIndexRequest(shrinkIndexName).masterNodeTimeout(getMasterTimeout(currentClusterState)),
.delete(new DeleteIndexRequest(shrinkIndexName).masterNodeTimeout(TimeValue.MAX_VALUE),
new ActionListener<AcknowledgedResponse>() {
@Override
public void onResponse(AcknowledgedResponse acknowledgedResponse) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,12 @@

import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.cluster.snapshots.delete.DeleteSnapshotRequest;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.common.Strings;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.repositories.RepositoryMissingException;
import org.elasticsearch.snapshots.SnapshotMissingException;

Expand Down Expand Up @@ -50,8 +50,8 @@ void performDuringNoSnapshot(IndexMetadata indexMetadata, ClusterState currentCl
listener.onResponse(true);
return;
}
DeleteSnapshotRequest deleteSnapshotRequest = new DeleteSnapshotRequest(repositoryName, snapshotName);
getClient().admin().cluster().deleteSnapshot(deleteSnapshotRequest, new ActionListener<AcknowledgedResponse>() {
getClient().admin().cluster().prepareDeleteSnapshot(repositoryName, snapshotName).setMasterNodeTimeout(TimeValue.MAX_VALUE)
.execute(new ActionListener<AcknowledgedResponse>() {

@Override
public void onResponse(AcknowledgedResponse acknowledgedResponse) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.core.TimeValue;

import java.util.Map;

Expand Down Expand Up @@ -41,7 +42,7 @@ void performDuringNoSnapshot(IndexMetadata indexMetadata, ClusterState currentCl

if (indexMetadata.getState() == IndexMetadata.State.OPEN) {
CloseIndexRequest closeIndexRequest = new CloseIndexRequest(followerIndex)
.masterNodeTimeout(getMasterTimeout(currentClusterState));
.masterNodeTimeout(TimeValue.MAX_VALUE);
getClient().admin().indices().close(closeIndexRequest, ActionListener.wrap(
r -> {
if (r.isAcknowledged() == false) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateObserver;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.core.TimeValue;

/**
* Invokes a close step on a single index.
Expand All @@ -30,7 +31,7 @@ public class CloseIndexStep extends AsyncActionStep {
public void performAction(IndexMetadata indexMetadata, ClusterState currentClusterState,
ClusterStateObserver observer, ActionListener<Boolean> listener) {
if (indexMetadata.getState() == IndexMetadata.State.OPEN) {
CloseIndexRequest request = new CloseIndexRequest(indexMetadata.getIndex().getName());
CloseIndexRequest request = new CloseIndexRequest(indexMetadata.getIndex().getName()).masterNodeTimeout(TimeValue.MAX_VALUE);
getClient().admin().indices()
.close(request, ActionListener.wrap(closeIndexResponse -> {
if (closeIndexResponse.isAcknowledged() == false) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.common.Strings;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.snapshots.SnapshotInfo;

import java.util.Locale;
Expand Down Expand Up @@ -65,7 +66,7 @@ void performDuringNoSnapshot(IndexMetadata indexMetadata, ClusterState currentCl
// complete
request.waitForCompletion(true);
request.includeGlobalState(false);
request.masterNodeTimeout(getMasterTimeout(currentClusterState));
request.masterNodeTimeout(TimeValue.MAX_VALUE);
getClient().admin().cluster().createSnapshot(request,
ActionListener.wrap(response -> {
logger.debug("create snapshot response for policy [{}] and index [{}] is: {}", policyName, indexName,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexAbstraction;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.xpack.core.action.DeleteDataStreamAction;

import java.util.Locale;
Expand Down Expand Up @@ -57,7 +58,7 @@ public void performDuringNoSnapshot(IndexMetadata indexMetadata, ClusterState cu
}

getClient().admin().indices()
.delete(new DeleteIndexRequest(indexName).masterNodeTimeout(getMasterTimeout(currentState)),
.delete(new DeleteIndexRequest(indexName).masterNodeTimeout(TimeValue.MAX_VALUE),
ActionListener.wrap(response -> listener.onResponse(true), listener::onFailure));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.protocol.xpack.frozen.FreezeRequest;
import org.elasticsearch.xpack.core.frozen.action.FreezeIndexAction;

Expand All @@ -27,7 +28,7 @@ public FreezeStep(StepKey key, StepKey nextStepKey, Client client) {
@Override
public void performDuringNoSnapshot(IndexMetadata indexMetadata, ClusterState currentState, ActionListener<Boolean> listener) {
getClient().admin().indices().execute(FreezeIndexAction.INSTANCE,
new FreezeRequest(indexMetadata.getIndex().getName()).masterNodeTimeout(getMasterTimeout(currentState)),
new FreezeRequest(indexMetadata.getIndex().getName()).masterNodeTimeout(TimeValue.MAX_VALUE),
ActionListener.wrap(response -> {
if (response.isAcknowledged() == false) {
throw new ElasticsearchException("freeze index request failed to be acknowledged");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ public class LifecycleSettings {
true, Setting.Property.NodeScope);
public static final Setting<TimeValue> LIFECYCLE_STEP_MASTER_TIMEOUT_SETTING =
Setting.positiveTimeSetting(LIFECYCLE_STEP_MASTER_TIMEOUT, TimeValue.timeValueSeconds(30), Setting.Property.Dynamic,
Setting.Property.NodeScope);
Setting.Property.NodeScope, Setting.Property.Deprecated);
// This setting configures how much time since step_time should ILM wait for a condition to be met. After the threshold wait time has
// elapsed ILM will likely stop waiting and go to the next step.
// Also see {@link org.elasticsearch.xpack.core.ilm.ClusterStateWaitUntilThresholdStep}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.xpack.cluster.routing.allocation.DataTierAllocationDecider;
import org.elasticsearch.xpack.core.DataTier;
Expand Down Expand Up @@ -120,6 +121,7 @@ void performDuringNoSnapshot(IndexMetadata indexMetadata, ClusterState currentCl
// perform expensive operations (ie. clusterStateProcessed)
false,
storageType);
mountSearchableSnapshotRequest.masterNodeTimeout(TimeValue.MAX_VALUE);
getClient().execute(MountSearchableSnapshotAction.INSTANCE, mountSearchableSnapshotRequest,
ActionListener.wrap(response -> {
if (response.status() != RestStatus.OK && response.status() != RestStatus.ACCEPTED) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateObserver;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.core.TimeValue;

/**
* Invokes a open step on a single index.
Expand All @@ -31,7 +32,7 @@ final class OpenIndexStep extends AsyncActionStep {
public void performAction(IndexMetadata indexMetadata, ClusterState currentClusterState,
ClusterStateObserver observer, ActionListener<Boolean> listener) {
if (indexMetadata.getState() == IndexMetadata.State.CLOSE) {
OpenIndexRequest request = new OpenIndexRequest(indexMetadata.getIndex().getName());
OpenIndexRequest request = new OpenIndexRequest(indexMetadata.getIndex().getName()).masterNodeTimeout(TimeValue.MAX_VALUE);
getClient().admin().indices()
.open(request,
ActionListener.wrap(openIndexResponse -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.persistent.PersistentTasksCustomMetadata;
import org.elasticsearch.xpack.core.ccr.action.PauseFollowAction;
import org.elasticsearch.xpack.core.ccr.action.ShardFollowTask;
Expand Down Expand Up @@ -52,6 +53,7 @@ void innerPerformAction(String followerIndex, ClusterState currentClusterState,
}

PauseFollowAction.Request request = new PauseFollowAction.Request(followerIndex);
request.masterNodeTimeout(TimeValue.MAX_VALUE);
getClient().execute(PauseFollowAction.INSTANCE, request, ActionListener.wrap(
r -> {
if (r.isAcknowledged() == false) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateObserver;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.core.TimeValue;

import static org.elasticsearch.cluster.metadata.IndexMetadata.APIBlock.WRITE;

Expand All @@ -31,7 +32,7 @@ public ReadOnlyStep(StepKey key, StepKey nextStepKey, Client client) {
public void performAction(IndexMetadata indexMetadata, ClusterState currentState,
ClusterStateObserver observer, ActionListener<Boolean> listener) {
getClient().admin().indices().execute(AddIndexBlockAction.INSTANCE,
new AddIndexBlockRequest(WRITE, indexMetadata.getIndex().getName()).masterNodeTimeout(getMasterTimeout(currentState)),
new AddIndexBlockRequest(WRITE, indexMetadata.getIndex().getName()).masterNodeTimeout(TimeValue.MAX_VALUE),
ActionListener.wrap(response -> {
if (response.isAcknowledged() == false) {
throw new ElasticsearchException("read only add block index request failed to be acknowledged");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import org.elasticsearch.cluster.metadata.IndexAbstraction;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.common.Strings;
import org.elasticsearch.core.TimeValue;

import java.util.Locale;
import java.util.Objects;
Expand Down Expand Up @@ -89,8 +90,7 @@ public void performAction(IndexMetadata indexMetadata, ClusterState currentClust
}

// Calling rollover with no conditions will always roll over the index
RolloverRequest rolloverRequest = new RolloverRequest(rolloverTarget, null)
.masterNodeTimeout(getMasterTimeout(currentClusterState));
RolloverRequest rolloverRequest = new RolloverRequest(rolloverTarget, null).masterNodeTimeout(TimeValue.MAX_VALUE);
// We don't wait for active shards when we perform the rollover because the
// {@link org.elasticsearch.xpack.core.ilm.WaitForActiveShardsStep} step will do so
rolloverRequest.setWaitForActiveShards(ActiveShardCount.NONE);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import org.elasticsearch.cluster.ClusterStateObserver;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.common.Strings;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.xpack.core.rollup.RollupActionConfig;
import org.elasticsearch.xpack.core.rollup.action.RollupAction;

Expand Down Expand Up @@ -50,7 +51,7 @@ public void performAction(IndexMetadata indexMetadata, ClusterState currentState
"] and index [" + indexName + "]"));
return;
}
RollupAction.Request request = new RollupAction.Request(indexName, rollupIndexName, config);
RollupAction.Request request = new RollupAction.Request(indexName, rollupIndexName, config).masterNodeTimeout(TimeValue.MAX_VALUE);
// currently RollupAction always acknowledges action was complete when no exceptions are thrown.
getClient().execute(RollupAction.INSTANCE, request,
ActionListener.wrap(response -> listener.onResponse(true), listener::onFailure));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.xpack.cluster.routing.allocation.DataTierAllocationDecider;
Expand Down Expand Up @@ -110,7 +111,7 @@ public void performAction(IndexMetadata indexMetadata, ClusterState clusterState
Settings settings = Settings.builder()
.put(IndexMetadata.INDEX_ROUTING_REQUIRE_GROUP_SETTING.getKey() + "_id", nodeId.get()).build();
UpdateSettingsRequest updateSettingsRequest = new UpdateSettingsRequest(indexName)
.masterNodeTimeout(getMasterTimeout(clusterState))
.masterNodeTimeout(TimeValue.MAX_VALUE)
.settings(settings);
getClient().admin().indices().updateSettings(updateSettingsRequest,
ActionListener.wrap(response -> listener.onResponse(true), listener::onFailure));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public void performDuringNoSnapshot(IndexMetadata indexMetadata, ClusterState cu
// get target shrink index
LifecycleExecutionState lifecycleState = fromIndexMetadata(indexMetadata);
String targetIndexName = getShrinkIndexName(indexName, lifecycleState);
deleteSourceIndexAndTransferAliases(getClient(), indexMetadata, getMasterTimeout(currentState), targetIndexName, listener);
deleteSourceIndexAndTransferAliases(getClient(), indexMetadata, targetIndexName, listener);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.core.TimeValue;

import java.util.Objects;

Expand Down Expand Up @@ -83,7 +84,7 @@ public void performAction(IndexMetadata indexMetadata, ClusterState currentState
Settings relevantTargetSettings = builder.build();

ResizeRequest resizeRequest = new ResizeRequest(shrunkenIndexName, indexMetadata.getIndex().getName())
.masterNodeTimeout(getMasterTimeout(currentState));
.masterNodeTimeout(TimeValue.MAX_VALUE);
resizeRequest.setMaxPrimaryShardSize(maxPrimaryShardSize);
resizeRequest.getTargetIndexRequest().settings(relevantTargetSettings);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ public void performAction(IndexMetadata indexMetadata, ClusterState currentClust
return;
}

deleteSourceIndexAndTransferAliases(getClient(), indexMetadata, getMasterTimeout(currentClusterState), targetIndexName, listener);
deleteSourceIndexAndTransferAliases(getClient(), indexMetadata, targetIndexName, listener);
}

/**
Expand All @@ -69,11 +69,11 @@ public void performAction(IndexMetadata indexMetadata, ClusterState currentClust
* <p>
* The is_write_index will *not* be set on the target index as this operation is currently executed on read-only indices.
*/
static void deleteSourceIndexAndTransferAliases(Client client, IndexMetadata sourceIndex, TimeValue masterTimeoutValue,
String targetIndex, ActionListener<Boolean> listener) {
static void deleteSourceIndexAndTransferAliases(Client client, IndexMetadata sourceIndex, String targetIndex,
ActionListener<Boolean> listener) {
String sourceIndexName = sourceIndex.getIndex().getName();
IndicesAliasesRequest aliasesRequest = new IndicesAliasesRequest()
.masterNodeTimeout(masterTimeoutValue)
.masterNodeTimeout(TimeValue.MAX_VALUE)
.addAliasAction(IndicesAliasesRequest.AliasActions.removeIndex().index(sourceIndexName))
.addAliasAction(IndicesAliasesRequest.AliasActions.add().index(targetIndex).alias(sourceIndexName));
// copy over other aliases from source index
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.xpack.core.ccr.action.UnfollowAction;

import java.util.List;
Expand All @@ -32,7 +33,7 @@ public boolean isRetryable() {

@Override
void innerPerformAction(String followerIndex, ClusterState currentClusterState, ActionListener<Boolean> listener) {
UnfollowAction.Request request = new UnfollowAction.Request(followerIndex);
UnfollowAction.Request request = new UnfollowAction.Request(followerIndex).masterNodeTimeout(TimeValue.MAX_VALUE);
getClient().execute(UnfollowAction.INSTANCE, request, ActionListener.wrap(
r -> {
if (r.isAcknowledged() == false) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.TimeValue;

import java.util.Objects;

Expand Down Expand Up @@ -56,7 +57,7 @@ public void performAction(IndexMetadata indexMetadata, ClusterState currentState
}
Settings settings = Settings.builder().put(LifecycleSettings.LIFECYCLE_NAME, rollupPolicy).build();
UpdateSettingsRequest updateSettingsRequest = new UpdateSettingsRequest(rollupIndexName)
.masterNodeTimeout(getMasterTimeout(currentState))
.masterNodeTimeout(TimeValue.MAX_VALUE)
.settings(settings);
getClient().admin().indices().updateSettings(updateSettingsRequest, ActionListener.wrap(response -> {
if (response.isAcknowledged()) {
Expand Down

0 comments on commit 2b8b7c1

Please sign in to comment.