Skip to content

Commit

Permalink
Fix Potential Repository Corruption during Master Failover (#82912) (#…
Browse files Browse the repository at this point in the history
…82943)

Solves the problem described in #82911.
What corrupts the repository is the sequence of:

1. Start delete that removes all snapshots for an index "A"
2. Queue snapshot for index "A" after that delete. This snapshot will have an `IndexId` for "A" that is the same as used by the snapshot(s) in the above delete. Using this is broken because we assume we never re-use the uuid for the index like that and a stale master may run listing and deleting on a uuid if it ever goes out of scope.
3. Deleting master goes stale and later does stuff to the folder after the snapshot from 2. is done with that shard.

Ideally, we'd never bind the snapshot in the second step to that `IndexId` in the first place since we already technically know that the delete will bring us into a situation where we need a "fresh" `IndexId`. Unfortunately, that delete can fail on IO issues with the repo, in which case we need to use the existing `IndexId` for the snapshot to not corrupt the repo by having two `IndexId` for the same index name. So what I did here is to check the `IndexId`s against the current `RepositoryData` again when starting snapshots because deletes were removed from the cluster state and re-initialize to fresh ones those that are not in the repository data to make sure we never conflict with a stale master doing deletes.
We can technically make this nicer by using some placeholder for index uuids in queued up snapshots or so, but that will require a change to the state machine and BwC around that and it doesn't really buy us much in terms of computation since it's such a rare thing to run into IMO. We need this change to make sure the next rolling upgrade from an older master to a newer master with this fix is safe regardless => I figured this is good enough for now and fixes the test that reliably reproduces this just fine.

closes #82911
  • Loading branch information
original-brownbear committed Jan 24, 2022
1 parent 10e8d19 commit 677ac9f
Show file tree
Hide file tree
Showing 4 changed files with 134 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1061,6 +1061,57 @@ public void testMasterFailoverOnFinalizationLoop() throws Exception {
awaitNoMoreRunningOperations();
}

public void testMasterFailoverDuringStaleIndicesCleanup() throws Exception {
internalCluster().startMasterOnlyNodes(3);
final String dataNode = internalCluster().startDataOnlyNode();
final String repoName = "test-repo";
createRepository(repoName, "mock");
createFullSnapshot(repoName, "empty-snapshot");
// use a few more shards to make master take a little longer to clean up the stale index and simulate more concurrency between
// snapshot create and delete below
createIndexWithContent("index-test", indexSettingsNoReplicas(randomIntBetween(6, 10)).build());
final NetworkDisruption networkDisruption = isolateMasterDisruption(NetworkDisruption.DISCONNECT);
internalCluster().setDisruptionScheme(networkDisruption);

final List<String> fullSnapshotsToDelete = createNSnapshots(repoName, randomIntBetween(1, 5));
final String masterName = internalCluster().getMasterName();
blockMasterOnAnyDataFile(repoName);
final ActionFuture<AcknowledgedResponse> deleteAllSnapshotsWithIndex = startDeleteSnapshots(
repoName,
fullSnapshotsToDelete,
masterName
);

final ActionFuture<CreateSnapshotResponse> snapshotFuture = startFullSnapshotFromDataNode(repoName, "new-full-snapshot");
waitForBlock(masterName, repoName);
awaitNDeletionsInProgress(1);
awaitNumberOfSnapshotsInProgress(1);
networkDisruption.startDisrupting();
ensureStableCluster(3, dataNode);
// wait for the snapshot to finish while the isolated master is stuck on deleting a data blob
try {
snapshotFuture.get();
} catch (Exception e) {
// ignore exceptions here, the snapshot will work out fine in all cases but the API might throw because of the master
// fail-over during the snapshot
// TODO: remove this leniency once we fix the API to handle master failover cleaner
}
awaitNoMoreRunningOperations(dataNode);

// now unblock the stale master and have it continue deleting blobs from the repository
unblockNode(repoName, masterName);

networkDisruption.stopDisrupting();
ensureStableCluster(4);
try {
deleteAllSnapshotsWithIndex.get();
} catch (Exception ignored) {
// ignored as we had a failover in here and will get all kinds of errors as a result, just making sure the future completes in
// all cases for now
// TODO: remove this leniency once we fix the API to handle master failover cleaner
}
}

public void testStatusMultipleSnapshotsMultipleRepos() throws Exception {
internalCluster().startMasterOnlyNode();
// We're blocking a some of the snapshot threads when we block the first repo below so we have to make sure we have enough threads
Expand Down Expand Up @@ -1974,6 +2025,16 @@ private ActionFuture<CreateSnapshotResponse> startFullSnapshotFromNonMasterClien
.execute();
}

private ActionFuture<CreateSnapshotResponse> startFullSnapshotFromDataNode(String repoName, String snapshotName) {
logger.info("--> creating full snapshot [{}] to repo [{}] from data node client", snapshotName, repoName);
return internalCluster().dataNodeClient()
.admin()
.cluster()
.prepareCreateSnapshot(repoName, snapshotName)
.setWaitForCompletion(true)
.execute();
}

private ActionFuture<CreateSnapshotResponse> startFullSnapshotFromMasterClient(String repoName, String snapshotName) {
logger.info("--> creating full snapshot [{}] to repo [{}] from master client", snapshotName, repoName);
return internalCluster().masterClient()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1008,6 +1008,52 @@ public Entry withRepoGen(long newRepoGen) {
);
}

/**
* Reassigns all {@link IndexId} in a snapshot that can be found as keys in the given {@code updates} to the {@link IndexId} value
* that they map to.
* This method is used in an edge case of removing a {@link SnapshotDeletionsInProgress.Entry} from the cluster state at the
* end of a delete. If the delete removed the last use of a certain {@link IndexId} from the repository then we do not want to
* reuse that {@link IndexId} because the implementation of {@link org.elasticsearch.repositories.blobstore.BlobStoreRepository}
* assumes that a given {@link IndexId} will never be reused if it went from referenced to unreferenced in the
* {@link org.elasticsearch.repositories.RepositoryData} in a delete.
*
* @param updates map of existing {@link IndexId} to updated {@link IndexId}
* @return a new instance with updated index ids or this instance if unchanged
*/
public Entry withUpdatedIndexIds(Map<IndexId, IndexId> updates) {
assert isClone() == false : "only snapshots can be reassigned to updated IndexId values";
Map<String, IndexId> updatedIndices = null;
for (IndexId existingIndexId : indices.values()) {
final IndexId updatedIndexId = updates.get(existingIndexId);
if (updatedIndexId != null) {
if (updatedIndices == null) {
updatedIndices = new HashMap<>(indices);
}
updatedIndices.put(updatedIndexId.getName(), updatedIndexId);
}
}
if (updatedIndices != null) {
return new Entry(
snapshot,
includeGlobalState,
partial,
state,
updatedIndices,
dataStreams,
featureStates,
startTime,
repositoryStateId,
shards,
failure,
userMetadata,
version,
source,
ImmutableOpenMap.of()
);
}
return this;
}

public Entry withClones(ImmutableOpenMap<RepositoryShardId, ShardSnapshotStatus> updatedClones) {
if (updatedClones.equals(shardStatusByRepoShardId)) {
return this;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3271,6 +3271,9 @@ private SnapshotsInProgress updatedSnapshotsInProgress(ClusterState currentState
final String localNodeId = currentState.nodes().getLocalNodeId();
final String repoName = deleteEntry.repository();
InFlightShardSnapshotStates inFlightShardStates = null;
// Keep track of IndexId values that may have gone unreferenced due to the delete entry just executed.
// See org.elasticsearch.cluster.SnapshotsInProgress.Entry#withUpdatedIndexIds for details.
final Set<IndexId> newIndexIdsToRefresh = new HashSet<>();
for (SnapshotsInProgress.Entry entry : snapshotsInProgress.forRepo(repoName)) {
if (entry.state().completed() == false) {
// TODO: dry up redundant computation and code between clone and non-clone case, in particular reuse
Expand Down Expand Up @@ -3318,9 +3321,13 @@ private SnapshotsInProgress updatedSnapshotsInProgress(ClusterState currentState
// Collect waiting shards that in entry that we can assign now that we are done with the deletion
final List<RepositoryShardId> canBeUpdated = new ArrayList<>();
for (ObjectObjectCursor<RepositoryShardId, ShardSnapshotStatus> value : entry.shardsByRepoShardId()) {
final RepositoryShardId repositoryShardId = value.key;
if (value.value.equals(ShardSnapshotStatus.UNASSIGNED_QUEUED)
&& reassignedShardIds.contains(value.key) == false) {
canBeUpdated.add(value.key);
&& reassignedShardIds.contains(repositoryShardId) == false) {
canBeUpdated.add(repositoryShardId);
if (repositoryData.hasIndex(repositoryShardId.indexName()) == false) {
newIndexIdsToRefresh.add(repositoryShardId.index());
}
}
}
if (canBeUpdated.isEmpty()) {
Expand Down Expand Up @@ -3368,6 +3375,15 @@ private SnapshotsInProgress updatedSnapshotsInProgress(ClusterState currentState
snapshotEntries.add(entry);
}
}
if (changed && newIndexIdsToRefresh.isEmpty() == false) {
final Map<IndexId, IndexId> updatedIndexIds = new HashMap<>(newIndexIdsToRefresh.size());
for (IndexId indexIdToRefresh : newIndexIdsToRefresh) {
updatedIndexIds.put(indexIdToRefresh, new IndexId(indexIdToRefresh.getName(), UUIDs.randomBase64UUID()));
}
for (int i = 0; i < snapshotEntries.size(); i++) {
snapshotEntries.set(i, snapshotEntries.get(i).withUpdatedIndexIds(updatedIndexIds));
}
}
return changed ? snapshotsInProgress.withUpdatedEntriesForRepo(repoName, snapshotEntries) : null;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,10 @@ public static void blockMasterFromFinalizingSnapshotOnSnapFile(final String repo
AbstractSnapshotIntegTestCase.<MockRepository>getRepositoryOnMaster(repositoryName).setBlockAndFailOnWriteSnapFiles();
}

public static void blockMasterOnAnyDataFile(final String repositoryName) {
AbstractSnapshotIntegTestCase.<MockRepository>getRepositoryOnMaster(repositoryName).blockOnDataFiles();
}

public static void blockMasterOnShardLevelSnapshotFile(final String repositoryName, String indexId) {
AbstractSnapshotIntegTestCase.<MockRepository>getRepositoryOnMaster(repositoryName).setBlockOnShardLevelSnapFiles(indexId);
}
Expand Down Expand Up @@ -632,6 +636,11 @@ protected ActionFuture<AcknowledgedResponse> startDeleteSnapshot(String repoName
return clusterAdmin().prepareDeleteSnapshot(repoName, snapshotName).execute();
}

protected ActionFuture<AcknowledgedResponse> startDeleteSnapshots(String repoName, List<String> snapshotNames, String viaNode) {
logger.info("--> deleting snapshots {} from repo [{}]", snapshotNames, repoName);
return client(viaNode).admin().cluster().prepareDeleteSnapshot(repoName, snapshotNames.toArray(Strings.EMPTY_ARRAY)).execute();
}

protected static void updateClusterState(final Function<ClusterState, ClusterState> updater) throws Exception {
final PlainActionFuture<Void> future = PlainActionFuture.newFuture();
final ClusterService clusterService = internalCluster().getCurrentMasterNodeInstance(ClusterService.class);
Expand Down

0 comments on commit 677ac9f

Please sign in to comment.