Skip to content

Commit

Permalink
Rewind ILM if create-snapshot fails to take snapshot (#75961) (#76657)
Browse files Browse the repository at this point in the history
This fixes the case when the create-snapshot step fails because it treats
a partial snapshot as a failure, hence the step will be retried and every retry
will fail as the snapshot already exists in the repository.

This makes the step report "incomplete" and have ILM rewind to `cleanup-snapshot`
step in order to first delete the existing (partial) snapshot and create a fresh one.

The change is quite big due to changing the signature of AsyncActionStep#performAction
to not use ActionListener<Boolean> (as all steps should've return true) but to use
ActionListener<Void>. This also deletes AsyncActionBranchingStep because it was
unfit for purpose given the new *explicit* binary state (success or failure) for async
steps - which was the source of the bug this commit is fixing.

(cherry picked from commit 050acc9)
Signed-off-by: Andrei Dan <andrei.dan@elastic.co>

# Conflicts:
#	x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/AsyncActionBranchingStep.java
#	x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/AsyncActionBranchingStepTests.java
#	x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/CreateSnapshotStepTests.java
#	x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/SetSingleNodeAllocateStepTests.java
#	x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/ShrinkStepTests.java
  • Loading branch information
andreidan committed Aug 18, 2021
1 parent 17feae8 commit bc8e784
Show file tree
Hide file tree
Showing 52 changed files with 410 additions and 570 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,16 +24,16 @@ abstract class AbstractUnfollowIndexStep extends AsyncActionStep {

@Override
public final void performAction(IndexMetadata indexMetadata, ClusterState currentClusterState,
ClusterStateObserver observer, ActionListener<Boolean> listener) {
ClusterStateObserver observer, ActionListener<Void> listener) {
String followerIndex = indexMetadata.getIndex().getName();
Map<String, String> customIndexMetadata = indexMetadata.getCustomData(CCR_METADATA_KEY);
if (customIndexMetadata == null) {
listener.onResponse(true);
listener.onResponse(null);
return;
}

innerPerformAction(followerIndex, currentClusterState, listener);
}

abstract void innerPerformAction(String followerIndex, ClusterState currentClusterState, ActionListener<Boolean> listener);
abstract void innerPerformAction(String followerIndex, ClusterState currentClusterState, ActionListener<Void> listener);
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -33,5 +33,5 @@ public boolean indexSurvives() {
}

public abstract void performAction(IndexMetadata indexMetadata, ClusterState currentClusterState,
ClusterStateObserver observer, ActionListener<Boolean> listener);
ClusterStateObserver observer, ActionListener<Void> listener);
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ public AsyncRetryDuringSnapshotActionStep(StepKey key, StepKey nextStepKey, Clie

@Override
public final void performAction(IndexMetadata indexMetadata, ClusterState currentClusterState,
ClusterStateObserver observer, ActionListener<Boolean> listener) {
ClusterStateObserver observer, ActionListener<Void> listener) {
// Wrap the original listener to handle exceptions caused by ongoing snapshots
SnapshotExceptionListener snapshotExceptionListener = new SnapshotExceptionListener(indexMetadata.getIndex(), listener, observer,
currentClusterState.nodes().getLocalNode());
Expand All @@ -46,7 +46,7 @@ public final void performAction(IndexMetadata indexMetadata, ClusterState curren
/**
* Method to be performed during which no snapshots for the index are already underway.
*/
abstract void performDuringNoSnapshot(IndexMetadata indexMetadata, ClusterState currentClusterState, ActionListener<Boolean> listener);
abstract void performDuringNoSnapshot(IndexMetadata indexMetadata, ClusterState currentClusterState, ActionListener<Void> listener);

/**
* SnapshotExceptionListener is an injected listener wrapper that checks to see if a particular
Expand All @@ -55,13 +55,13 @@ public final void performAction(IndexMetadata indexMetadata, ClusterState curren
* re-running the step's {@link #performAction(IndexMetadata, ClusterState, ClusterStateObserver, ActionListener)}
* method when the snapshot is no longer running.
*/
class SnapshotExceptionListener implements ActionListener<Boolean> {
class SnapshotExceptionListener implements ActionListener<Void> {
private final Index index;
private final ActionListener<Boolean> originalListener;
private final ActionListener<Void> originalListener;
private final ClusterStateObserver observer;
private final DiscoveryNode localNode;

SnapshotExceptionListener(Index index, ActionListener<Boolean> originalListener, ClusterStateObserver observer,
SnapshotExceptionListener(Index index, ActionListener<Void> originalListener, ClusterStateObserver observer,
DiscoveryNode localNode) {
this.index = index;
this.originalListener = originalListener;
Expand All @@ -70,8 +70,8 @@ class SnapshotExceptionListener implements ActionListener<Boolean> {
}

@Override
public void onResponse(Boolean complete) {
originalListener.onResponse(complete);
public void onResponse(Void unused) {
originalListener.onResponse(null);
}

@Override
Expand All @@ -94,7 +94,7 @@ public void onNewClusterState(ClusterState state) {
IndexMetadata idxMeta = state.metadata().index(index);
if (idxMeta == null) {
// The index has since been deleted, mission accomplished!
originalListener.onResponse(true);
originalListener.onResponse(null);
} else {
// Re-invoke the performAction method with the new state
performAction(idxMeta, state, observer, originalListener);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public boolean isRetryable() {
}

@Override
void performDuringNoSnapshot(IndexMetadata indexMetadata, ClusterState currentClusterState, ActionListener<Boolean> listener) {
void performDuringNoSnapshot(IndexMetadata indexMetadata, ClusterState currentClusterState, ActionListener<Void> listener) {
final String shrunkenIndexSource = IndexMetadata.INDEX_RESIZE_SOURCE_NAME.get(indexMetadata.getSettings());
if (Strings.isNullOrEmpty(shrunkenIndexSource) == false) {
// the current managed index is a shrunk index
Expand All @@ -47,7 +47,7 @@ void performDuringNoSnapshot(IndexMetadata indexMetadata, ClusterState currentCl
String policyName = LifecycleSettings.LIFECYCLE_NAME_SETTING.get(indexMetadata.getSettings());
logger.warn("managed index [{}] as part of policy [{}] is a shrunk index and the source index [{}] does not exist " +
"anymore. will skip the [{}] step", indexMetadata.getIndex().getName(), policyName, shrunkenIndexSource, NAME);
listener.onResponse(true);
listener.onResponse(null);
return;
}
}
Expand All @@ -56,7 +56,7 @@ void performDuringNoSnapshot(IndexMetadata indexMetadata, ClusterState currentCl
final String shrinkIndexName = lifecycleState.getShrinkIndexName();
// if the shrink index was not generated there is nothing to delete so we move on
if (Strings.hasText(shrinkIndexName) == false) {
listener.onResponse(true);
listener.onResponse(null);
return;
}
getClient().admin().indices()
Expand All @@ -66,14 +66,14 @@ void performDuringNoSnapshot(IndexMetadata indexMetadata, ClusterState currentCl
public void onResponse(AcknowledgedResponse acknowledgedResponse) {
// even if not all nodes acked the delete request yet we can consider this operation as successful as
// we'll generate a new index name and attempt to shrink into the newly generated name
listener.onResponse(true);
listener.onResponse(null);
}

@Override
public void onFailure(Exception e) {
if (e instanceof IndexNotFoundException) {
// we can move on if the index was deleted in the meantime
listener.onResponse(true);
listener.onResponse(null);
} else {
listener.onFailure(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,19 +35,19 @@ public boolean isRetryable() {
}

@Override
void performDuringNoSnapshot(IndexMetadata indexMetadata, ClusterState currentClusterState, ActionListener<Boolean> listener) {
void performDuringNoSnapshot(IndexMetadata indexMetadata, ClusterState currentClusterState, ActionListener<Void> listener) {
final String indexName = indexMetadata.getIndex().getName();

LifecycleExecutionState lifecycleState = fromIndexMetadata(indexMetadata);
final String repositoryName = lifecycleState.getSnapshotRepository();
// if the snapshot information is missing from the ILM execution state there is nothing to delete so we move on
if (Strings.hasText(repositoryName) == false) {
listener.onResponse(true);
listener.onResponse(null);
return;
}
final String snapshotName = lifecycleState.getSnapshotName();
if (Strings.hasText(snapshotName) == false) {
listener.onResponse(true);
listener.onResponse(null);
return;
}
getClient().admin().cluster().prepareDeleteSnapshot(repositoryName, snapshotName).setMasterNodeTimeout(TimeValue.MAX_VALUE)
Expand All @@ -60,14 +60,14 @@ public void onResponse(AcknowledgedResponse acknowledgedResponse) {
throw new ElasticsearchException("cleanup snapshot step request for repository [" + repositoryName + "] and snapshot " +
"[" + snapshotName + "] policy [" + policyName + "] and index [" + indexName + "] failed to be acknowledged");
}
listener.onResponse(true);
listener.onResponse(null);
}

@Override
public void onFailure(Exception e) {
if (e instanceof SnapshotMissingException) {
// during the happy flow we generate a snapshot name and that snapshot doesn't exist in the repository
listener.onResponse(true);
listener.onResponse(null);
} else {
if (e instanceof RepositoryMissingException) {
String policyName = indexMetadata.getSettings().get(LifecycleSettings.LIFECYCLE_NAME);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,11 @@ public boolean isRetryable() {
}

@Override
void performDuringNoSnapshot(IndexMetadata indexMetadata, ClusterState currentClusterState, ActionListener<Boolean> listener) {
void performDuringNoSnapshot(IndexMetadata indexMetadata, ClusterState currentClusterState, ActionListener<Void> listener) {
String followerIndex = indexMetadata.getIndex().getName();
Map<String, String> customIndexMetadata = indexMetadata.getCustomData(CCR_METADATA_KEY);
if (customIndexMetadata == null) {
listener.onResponse(true);
listener.onResponse(null);
return;
}

Expand All @@ -48,12 +48,12 @@ void performDuringNoSnapshot(IndexMetadata indexMetadata, ClusterState currentCl
if (r.isAcknowledged() == false) {
throw new ElasticsearchException("close index request failed to be acknowledged");
}
listener.onResponse(true);
listener.onResponse(null);
},
listener::onFailure)
);
} else {
listener.onResponse(true);
listener.onResponse(null);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,19 +29,19 @@ public class CloseIndexStep extends AsyncActionStep {

@Override
public void performAction(IndexMetadata indexMetadata, ClusterState currentClusterState,
ClusterStateObserver observer, ActionListener<Boolean> listener) {
ClusterStateObserver observer, ActionListener<Void> listener) {
if (indexMetadata.getState() == IndexMetadata.State.OPEN) {
CloseIndexRequest request = new CloseIndexRequest(indexMetadata.getIndex().getName()).masterNodeTimeout(TimeValue.MAX_VALUE);
getClient().admin().indices()
.close(request, ActionListener.wrap(closeIndexResponse -> {
if (closeIndexResponse.isAcknowledged() == false) {
throw new ElasticsearchException("close index request failed to be acknowledged");
}
listener.onResponse(true);
listener.onResponse(null);
}, listener::onFailure));
}
else {
listener.onResponse(true);
listener.onResponse(null);
}
}

Expand Down

0 comments on commit bc8e784

Please sign in to comment.