docs/changelog/103817.yaml (6 additions, 0 deletions)
@@ -0,0 +1,6 @@
pr: 103817
summary: Fix deleting index during snapshot finalization
area: Snapshot/Restore
type: bug
issues:
- 101029
@@ -17,15 +17,21 @@
import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse;
import org.elasticsearch.action.admin.cluster.snapshots.status.SnapshotStatus;
import org.elasticsearch.action.admin.cluster.snapshots.status.SnapshotsStatusResponse;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexClusterStateUpdateRequest;
import org.elasticsearch.action.support.ActionTestUtils;
import org.elasticsearch.action.support.GroupedActionListener;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.cluster.SnapshotDeletionsInProgress;
import org.elasticsearch.cluster.SnapshotsInProgress;
import org.elasticsearch.cluster.metadata.MetadataDeleteIndexService;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.ListenableFuture;
import org.elasticsearch.common.util.concurrent.UncategorizedExecutionException;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.discovery.AbstractDisruptionTestCase;
import org.elasticsearch.index.Index;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.repositories.IndexId;
import org.elasticsearch.repositories.RepositoryData;
@@ -37,6 +43,7 @@
import org.elasticsearch.test.disruption.NetworkDisruption;
import org.elasticsearch.test.transport.MockTransportService;
import org.elasticsearch.transport.RemoteTransportException;
import org.elasticsearch.transport.TransportService;

import java.io.IOException;
import java.nio.file.Files;
@@ -45,9 +52,12 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import static org.elasticsearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_REPLICAS;
import static org.elasticsearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_SHARDS;
@@ -2014,6 +2024,115 @@ public void testSnapshotAndCloneQueuedAfterMissingShard() throws Exception {
assertThat(snapshot2.get().getSnapshotInfo().state(), is(SnapshotState.PARTIAL));
}

public void testDeleteIndexWithOutOfOrderFinalization() throws Exception {

final String indexToDelete = "index-to-delete";
final List<String> indexNames = Arrays.asList(indexToDelete, "index-0", "index-1", "index-2");

for (final String indexName : indexNames) {
assertAcked(prepareCreate(indexName, indexSettingsNoReplicas(1)));
}

final String repoName = "test-repo";
createRepository(repoName, "fs");

// block the update-shard-snapshot-status requests so we can execute them in a specific order
final MockTransportService masterTransportService = (MockTransportService) internalCluster().getCurrentMasterNodeInstance(
TransportService.class
);
final Map<String, ListenableFuture<Void>> otherIndexSnapshotListeners = indexNames.stream()
.collect(Collectors.toMap(k -> k, k -> new ListenableFuture<>()));
masterTransportService.<UpdateIndexShardSnapshotStatusRequest>addRequestHandlingBehavior(
SnapshotsService.UPDATE_SNAPSHOT_STATUS_ACTION_NAME,
(handler, request, channel, task) -> {
final String indexName = request.shardId().getIndexName();
if (indexName.equals(indexToDelete)) {
handler.messageReceived(request, channel, task);
} else {
final ListenableFuture<Void> listener = otherIndexSnapshotListeners.get(indexName);
assertNotNull(indexName, listener);
listener.addListener(
ActionTestUtils.assertNoFailureListener(ignored -> handler.messageReceived(request, channel, task))
);
}
}
);

// start the snapshots, each targeting index-to-delete and one other index so we can control their finalization order
final HashMap<String, Runnable> snapshotCompleters = new HashMap<>();
for (final String blockingIndex : Arrays.asList("index-0", "index-1", "index-2")) {
final String snapshotName = "snapshot-with-" + blockingIndex;
final ActionFuture<CreateSnapshotResponse> snapshotFuture = clusterAdmin().prepareCreateSnapshot(repoName, snapshotName)
.setWaitForCompletion(true)
.setPartial(true)
.setIndices(indexToDelete, blockingIndex)
.execute();

// ensure each snapshot has really started before moving on to the next one
awaitClusterState(
cs -> cs.custom(SnapshotsInProgress.TYPE, SnapshotsInProgress.EMPTY)
.forRepo(repoName)
.stream()
.anyMatch(e -> e.snapshot().getSnapshotId().getName().equals(snapshotName))
);

snapshotCompleters.put(blockingIndex, () -> {
assertFalse(snapshotFuture.isDone());
otherIndexSnapshotListeners.get(blockingIndex).onResponse(null);
assertEquals(SnapshotState.SUCCESS, snapshotFuture.actionGet(10, TimeUnit.SECONDS).getSnapshotInfo().state());
});
}

// set up to delete the index at a very specific moment during finalization
final MetadataDeleteIndexService masterDeleteIndexService = internalCluster().getCurrentMasterNodeInstance(
MetadataDeleteIndexService.class
);
final Thread indexRecreationThread = new Thread(() -> {
try {
// wait until the snapshot has entered finalization
awaitClusterState(
cs -> cs.custom(SnapshotsInProgress.TYPE, SnapshotsInProgress.EMPTY)
.forRepo(repoName)
.stream()
.anyMatch(e -> e.snapshot().getSnapshotId().getName().equals("snapshot-with-index-1") && e.state().completed())
);

// execute the index deletion _directly on the master_ so it happens before the snapshot finalization executes
assertTrue(
PlainActionFuture.<AcknowledgedResponse, Exception>get(
future -> masterDeleteIndexService.deleteIndices(
new DeleteIndexClusterStateUpdateRequest().indices(
new Index[] { internalCluster().clusterService().state().metadata().index(indexToDelete).getIndex() }
).ackTimeout(TimeValue.timeValueSeconds(10)).masterNodeTimeout(TimeValue.timeValueSeconds(10)),
future
)
).isAcknowledged()
);

// ultimately create the index again so that taking a full snapshot will pick up any missing shard gen blob, and deleting
// that full snapshot will clean up all dangling shard-level blobs
createIndex(indexToDelete, indexSettingsNoReplicas(1).build());
} catch (Exception e) {
throw new AssertionError(e);
}
});
indexRecreationThread.start();

// release the snapshots to be finalized, in this order
for (final String blockingIndex : Arrays.asList("index-1", "index-2", "index-0")) {
snapshotCompleters.get(blockingIndex).run();
}

indexRecreationThread.join();
masterTransportService.clearAllRules();

// create a full snapshot to verify that the repo is still ok
createFullSnapshot(repoName, "final-full-snapshot");

// delete the full snapshot to clean up the leftover shard-level metadata (which trips repo consistency assertions otherwise)
startDeleteSnapshot(repoName, "final-full-snapshot").actionGet(10, TimeUnit.SECONDS);
}

private static void assertSnapshotStatusCountOnRepo(String otherBlockedRepoName, int count) {
final SnapshotsStatusResponse snapshotsStatusResponse = client().admin()
.cluster()
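The test above controls finalization order by parking each intercepted shard-status update behind a ListenableFuture and completing those futures in a chosen sequence. Below is a minimal, self-contained sketch of that pattern using only JDK types: CompletableFuture stands in for ListenableFuture, a println stands in for handler.messageReceived(request, channel, task), and the class name OrderedReleaseSketch is invented for the example, not part of the PR.

import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

public class OrderedReleaseSketch {
    public static void main(String[] args) {
        // one gate per index; completing a gate "delivers" the parked shard-status update
        final Map<String, CompletableFuture<Void>> gates = new LinkedHashMap<>();
        for (String index : List.of("index-0", "index-1", "index-2")) {
            final CompletableFuture<Void> gate = new CompletableFuture<>();
            gate.thenRun(() -> System.out.println("delivering shard-status update for " + index));
            gates.put(index, gate);
        }

        // release the parked updates in a deliberately different order, mirroring the test's
        // index-1, index-2, index-0 sequence
        for (String index : List.of("index-1", "index-2", "index-0")) {
            gates.get(index).complete(null);
        }
    }
}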
@@ -14,7 +14,7 @@
*/
public class DeleteIndexClusterStateUpdateRequest extends IndicesClusterStateUpdateRequest<DeleteIndexClusterStateUpdateRequest> {

DeleteIndexClusterStateUpdateRequest() {
public DeleteIndexClusterStateUpdateRequest() {

}
}
@@ -95,7 +95,7 @@ public Map<RepositoryShardId, Set<ShardGeneration>> obsoleteShardGenerations() {
}

public ClusterState updatedClusterState(ClusterState state) {
final ClusterState updatedState = SnapshotsService.stateWithoutSnapshot(state, snapshotInfo.snapshot());
final ClusterState updatedState = SnapshotsService.stateWithoutSnapshot(state, snapshotInfo.snapshot(), updatedShardGenerations);
obsoleteGenerations.set(
updatedState.custom(SnapshotsInProgress.TYPE, SnapshotsInProgress.EMPTY)
.obsoleteGenerations(snapshotInfo.repository(), state.custom(SnapshotsInProgress.TYPE, SnapshotsInProgress.EMPTY))
@@ -141,6 +141,11 @@ public ShardGeneration getShardGen(IndexId indexId, int shardId) {
return generations.get(shardId);
}

public boolean hasShardGen(RepositoryShardId repositoryShardId) {
final List<ShardGeneration> indexShardGens = getGens(repositoryShardId.index());
return repositoryShardId.shardId() < indexShardGens.size() && indexShardGens.get(repositoryShardId.shardId()) != null;
}

public List<ShardGeneration> getGens(IndexId indexId) {
return shardGenerations.getOrDefault(indexId, Collections.emptyList());
}
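As a quick illustration of the semantics of the new hasShardGen method, here is a toy, dependency-free model using plain collections in place of ShardGenerations, IndexId, and RepositoryShardId (the class and variable names are invented for the example): a generation only counts as known when the per-index list is long enough and the slot is non-null, which is the guard that stateWithoutSnapshot (further down in this diff) applies before handing a finished shard's generation on to queued snapshots.

import java.util.Arrays;
import java.util.List;
import java.util.Map;

public class HasShardGenSketch {
    public static void main(String[] args) {
        // shard generations written during finalization, keyed by index name:
        // shard 0 of index-0 was written, shard 1 was not
        final Map<String, List<String>> written = Map.of("index-0", Arrays.asList("gen-A", null));

        System.out.println(hasShardGen(written, "index-0", 0));         // true  -> safe to reuse
        System.out.println(hasShardGen(written, "index-0", 1));         // false -> never written
        System.out.println(hasShardGen(written, "index-0", 5));         // false -> out of range
        System.out.println(hasShardGen(written, "index-to-delete", 0)); // false -> index unknown
    }

    // mirrors ShardGenerations#hasShardGen from the diff above, with plain collections
    static boolean hasShardGen(Map<String, List<String>> gens, String index, int shardId) {
        final List<String> indexShardGens = gens.getOrDefault(index, List.of());
        return shardId < indexShardGens.size() && indexShardGens.get(shardId) != null;
    }
}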
@@ -1504,6 +1504,7 @@ private void cleanupOldShardGens(
(indexId, gens) -> gens.forEach(
(shardId, oldGen) -> toDelete.add(
shardContainer(indexId, shardId).path().buildAsString().substring(prefixPathLen) + INDEX_FILE_PREFIX + oldGen
.toBlobNamePart()
)
)
);
@@ -761,7 +761,7 @@ private void startCloning(Repository repository, SnapshotsInProgress.Entry clone
endingSnapshots.add(targetSnapshot);
initializingClones.remove(targetSnapshot);
logger.info(() -> new ParameterizedMessage("Failed to start snapshot clone [{}]", cloneEntry), e);
removeFailedSnapshotFromClusterState(targetSnapshot, e, null, null);
removeFailedSnapshotFromClusterState(targetSnapshot, e, null, null, ShardGenerations.EMPTY);
};

// 1. step, load SnapshotInfo to make sure that source snapshot was successful for the indices we want to clone
Expand Down Expand Up @@ -1194,7 +1194,8 @@ public void onFailure(String source, Exception e) {
snapshot.snapshot(),
e,
null,
new CleanupAfterErrorListener(userCreateSnapshotListener, e)
new CleanupAfterErrorListener(userCreateSnapshotListener, e),
ShardGenerations.EMPTY
);
}

@@ -1238,7 +1239,8 @@ public void onFailure(Exception e) {
snapshot.snapshot(),
e,
null,
new CleanupAfterErrorListener(userCreateSnapshotListener, e)
new CleanupAfterErrorListener(userCreateSnapshotListener, e),
ShardGenerations.EMPTY
);
}
});
@@ -1876,14 +1878,21 @@ private void endSnapshot(SnapshotsInProgress.Entry entry, Metadata metadata, @Nu
entry.snapshot(),
new SnapshotException(snapshot, "Aborted on initialization"),
repositoryData,
null
null,
ShardGenerations.EMPTY
);
return;
}
if (entry.isClone() && entry.state() == State.FAILED) {
logger.debug("Removing failed snapshot clone [{}] from cluster state", entry);
if (newFinalization) {
removeFailedSnapshotFromClusterState(snapshot, new SnapshotException(snapshot, entry.failure()), null, null);
removeFailedSnapshotFromClusterState(
snapshot,
new SnapshotException(snapshot, entry.failure()),
null,
null,
ShardGenerations.EMPTY
);
}
return;
}
@@ -2055,13 +2064,30 @@ private void finalizeSnapshotEntry(Snapshot snapshot, Metadata metadata, Reposit
completeListenersIgnoringException(endAndGetListenersToResolve(writtenSnapshotInfo.snapshot()), result);
logger.info("snapshot [{}] completed with state [{}]", snapshot, writtenSnapshotInfo.state());
runNextQueuedOperation(result.v1(), repository, true);
}, e -> handleFinalizationFailure(e, snapshot, repositoryData))
},
e -> handleFinalizationFailure(
e,
snapshot,
repositoryData,
// we might have written the new root blob before failing here, so we must use the updated shardGenerations
shardGenerations
)
)
)
);
}, e -> handleFinalizationFailure(e, snapshot, repositoryData));
},
e -> handleFinalizationFailure(
e,
snapshot,
repositoryData,
// a failure here means the root blob was not updated, but the updated shard generation blobs are all in place so we can
// use the updated shardGenerations for all pending shard snapshots
shardGenerations
)
);
} catch (Exception e) {
assert false : new AssertionError(e);
handleFinalizationFailure(e, snapshot, repositoryData);
handleFinalizationFailure(e, snapshot, repositoryData, ShardGenerations.EMPTY);
}
}

@@ -2113,7 +2139,12 @@ private List<ActionListener<Tuple<RepositoryData, SnapshotInfo>>> endAndGetListe
* @param snapshot snapshot that failed to finalize
* @param repositoryData current repository data for the snapshot's repository
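* @param shardGenerations shard generations that had already been written to the repository when the failure occurred, or {@link ShardGenerations#EMPTY} if none were written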
*/
private void handleFinalizationFailure(Exception e, Snapshot snapshot, RepositoryData repositoryData) {
private void handleFinalizationFailure(
Exception e,
Snapshot snapshot,
RepositoryData repositoryData,
ShardGenerations shardGenerations
) {
if (ExceptionsHelper.unwrap(e, NotMasterException.class, FailedToCommitClusterStateException.class) != null) {
// Failure due to not being master any more, don't try to remove snapshot from cluster state the next master
// will try ending this snapshot again
@@ -2125,7 +2156,7 @@ private void handleFinalizationFailure(Exception e, Snapshot snapshot, Repositor
failAllListenersOnMasterFailOver(e);
} else {
logger.warn(() -> new ParameterizedMessage("[{}] failed to finalize snapshot", snapshot), e);
removeFailedSnapshotFromClusterState(snapshot, e, repositoryData, null);
removeFailedSnapshotFromClusterState(snapshot, e, repositoryData, null, shardGenerations);
}
}

@@ -2251,7 +2282,7 @@ private static Tuple<ClusterState, List<SnapshotDeletionsInProgress.Entry>> read
* @param snapshot snapshot for which to remove the snapshot operation
* @return updated cluster state
*/
public static ClusterState stateWithoutSnapshot(ClusterState state, Snapshot snapshot) {
public static ClusterState stateWithoutSnapshot(ClusterState state, Snapshot snapshot, ShardGenerations shardGenerations) {
final SnapshotsInProgress snapshots = state.custom(SnapshotsInProgress.TYPE, SnapshotsInProgress.EMPTY);
ClusterState result = state;
int indexOfEntry = -1;
@@ -2312,7 +2343,8 @@ public static ClusterState stateWithoutSnapshot(ClusterState state, Snapshot sna
final ShardSnapshotStatus shardState = finishedShardEntry.value;
final RepositoryShardId repositoryShardId = finishedShardEntry.key;
if (shardState.state() != ShardState.SUCCESS
|| previousEntry.shardsByRepoShardId().containsKey(repositoryShardId) == false) {
|| previousEntry.shardsByRepoShardId().containsKey(repositoryShardId) == false
|| shardGenerations.hasShardGen(finishedShardEntry.key) == false) {
continue;
}
updatedShardAssignments = maybeAddUpdatedAssignment(
@@ -2329,7 +2361,8 @@ public static ClusterState stateWithoutSnapshot(ClusterState state, Snapshot sna
.shardsByRepoShardId()) {
final ShardSnapshotStatus shardState = finishedShardEntry.value;
if (shardState.state() == ShardState.SUCCESS
&& previousEntry.shardsByRepoShardId().containsKey(finishedShardEntry.key)) {
&& previousEntry.shardsByRepoShardId().containsKey(finishedShardEntry.key)
&& shardGenerations.hasShardGen(finishedShardEntry.key)) {
updatedShardAssignments = maybeAddUpdatedAssignment(
updatedShardAssignments,
shardState,
@@ -2417,14 +2450,15 @@ private void removeFailedSnapshotFromClusterState(
Snapshot snapshot,
Exception failure,
@Nullable RepositoryData repositoryData,
@Nullable CleanupAfterErrorListener listener
@Nullable CleanupAfterErrorListener listener,
ShardGenerations shardGenerations
) {
assert failure != null : "Failure must be supplied";
clusterService.submitStateUpdateTask("remove snapshot metadata", new ClusterStateUpdateTask() {

@Override
public ClusterState execute(ClusterState currentState) {
final ClusterState updatedState = stateWithoutSnapshot(currentState, snapshot);
final ClusterState updatedState = stateWithoutSnapshot(currentState, snapshot, shardGenerations);
assert updatedState == currentState || endingSnapshots.contains(snapshot)
: "did not track [" + snapshot + "] in ending snapshots while removing it from the cluster state";
// now check if there are any delete operations that refer to the just failed snapshot and remove the snapshot from them