-
Notifications
You must be signed in to change notification settings - Fork 24.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Recursively Delete Unreferenced Index Directories #42189
Changes from all commits
09ec6f3
20a6720
273a165
aba91e1
ac3c496
e593e0c
de61d35
22a668e
3d7da2e
826e513
96dabb7
c0e025e
e99e5c0
0e66c19
e4aa770
a2bfa0f
98dc56c
372fd26
5eb036b
3d1e26b
cb2a196
2ca8c4c
908cfeb
2d0f368
d72cbcd
b9b4c32
8a82885
ae4c9a8
b8441ac
f50a895
014f5e0
f825d3f
2e0bd14
1c33d31
bb3fb73
def2d77
d6fa0fe
e7a3c2b
bb95f7f
9cac1a4
c8394fe
b338a91
a8bf18e
be8b479
58fcfe2
1e39e9b
5b5ddb2
dc62d64
c52a394
dea1a07
43bb00a
af47bb8
47766e1
81aab9b
deb73e3
b8a7067
c613d89
d841af4
e13d799
f384e9c
79478cb
bc1497e
421553b
b3478b5
bbb9632
26df577
7617660
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -58,7 +58,6 @@ | |
import org.elasticsearch.common.settings.Settings; | ||
import org.elasticsearch.common.unit.ByteSizeUnit; | ||
import org.elasticsearch.common.unit.ByteSizeValue; | ||
import org.elasticsearch.common.util.set.Sets; | ||
import org.elasticsearch.common.xcontent.LoggingDeprecationHandler; | ||
import org.elasticsearch.common.xcontent.NamedXContentRegistry; | ||
import org.elasticsearch.common.xcontent.XContentFactory; | ||
|
@@ -393,46 +392,68 @@ public void deleteSnapshot(SnapshotId snapshotId, long repositoryStateId, Action | |
logger.warn(() -> new ParameterizedMessage("cannot read snapshot file [{}]", snapshotId), ex); | ||
} | ||
// Delete snapshot from the index file, since it is the maintainer of truth of active snapshots | ||
final RepositoryData repositoryData; | ||
final RepositoryData updatedRepositoryData; | ||
final Map<String, BlobContainer> foundIndices; | ||
try { | ||
repositoryData = getRepositoryData(); | ||
final RepositoryData repositoryData = getRepositoryData(); | ||
updatedRepositoryData = repositoryData.removeSnapshot(snapshotId); | ||
// Cache the indices that were found before writing out the new index-N blob so that a stuck master will never | ||
// delete an index that was created by another master node after writing this index-N blob. | ||
foundIndices = blobStore().blobContainer(basePath().add("indices")).children(); | ||
writeIndexGen(updatedRepositoryData, repositoryStateId); | ||
} catch (Exception ex) { | ||
listener.onFailure(new RepositoryException(metadata.name(), "failed to delete snapshot [" + snapshotId + "]", ex)); | ||
return; | ||
} | ||
final SnapshotInfo finalSnapshotInfo = snapshot; | ||
final Collection<IndexId> unreferencedIndices = Sets.newHashSet(repositoryData.getIndices().values()); | ||
unreferencedIndices.removeAll(updatedRepositoryData.getIndices().values()); | ||
try { | ||
blobContainer().deleteBlobsIgnoringIfNotExists( | ||
Arrays.asList(snapshotFormat.blobName(snapshotId.getUUID()), globalMetaDataFormat.blobName(snapshotId.getUUID()))); | ||
} catch (IOException e) { | ||
logger.warn(() -> new ParameterizedMessage("[{}] Unable to delete global metadata files", snapshotId), e); | ||
} | ||
final var survivingIndices = updatedRepositoryData.getIndices(); | ||
deleteIndices( | ||
Optional.ofNullable(finalSnapshotInfo) | ||
.map(info -> info.indices().stream().map(repositoryData::resolveIndexId).collect(Collectors.toList())) | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. No need to run partial deletes on indices that went fully out of scope now. This would be great to have for #41581 because if we were to delay writing the metadata in the current implementation, we run into the messy spot of not knowing about the shards if we fail to write the index metadata -> current implementation couldn't delete the stale blobs from a partial upload of a new index. |
||
.map(info -> info.indices().stream().filter(survivingIndices::containsKey) | ||
.map(updatedRepositoryData::resolveIndexId).collect(Collectors.toList())) | ||
.orElse(Collections.emptyList()), | ||
snapshotId, | ||
ActionListener.map(listener, v -> { | ||
try { | ||
blobStore().blobContainer(indicesPath()).deleteBlobsIgnoringIfNotExists( | ||
unreferencedIndices.stream().map(IndexId::getId).collect(Collectors.toList())); | ||
} catch (IOException e) { | ||
logger.warn(() -> | ||
new ParameterizedMessage( | ||
"[{}] indices {} are no longer part of any snapshots in the repository, " + | ||
"but failed to clean up their index folders.", metadata.name(), unreferencedIndices), e); | ||
} | ||
cleanupStaleIndices(foundIndices, survivingIndices); | ||
return null; | ||
}) | ||
); | ||
} | ||
} | ||
|
||
private void cleanupStaleIndices(Map<String, BlobContainer> foundIndices, Map<String, IndexId> survivingIndices) { | ||
try { | ||
final Set<String> survivingIndexIds = survivingIndices.values().stream() | ||
.map(IndexId::getId).collect(Collectors.toSet()); | ||
for (Map.Entry<String, BlobContainer> indexEntry : foundIndices.entrySet()) { | ||
final String indexSnId = indexEntry.getKey(); | ||
try { | ||
if (survivingIndexIds.contains(indexSnId) == false) { | ||
logger.debug("[{}] Found stale index [{}]. Cleaning it up", metadata.name(), indexSnId); | ||
indexEntry.getValue().delete(); | ||
logger.debug("[{}] Cleaned up stale index [{}]", metadata.name(), indexSnId); | ||
} | ||
} catch (IOException e) { | ||
logger.warn(() -> new ParameterizedMessage( | ||
"[{}] index {} is no longer part of any snapshots in the repository, " + | ||
"but failed to clean up their index folders", metadata.name(), indexSnId), e); | ||
} | ||
} | ||
} catch (Exception e) { | ||
// TODO: We shouldn't be blanket catching and suppressing all exceptions here and instead handle them safely upstream. | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. add an |
||
// Currently this catch exists as a stop gap solution to tackle unexpected runtime exceptions from implementations | ||
// bubbling up and breaking the snapshot functionality. | ||
assert false : e; | ||
logger.warn(new ParameterizedMessage("[{}] Exception during cleanup of stale indices", metadata.name()), e); | ||
} | ||
} | ||
|
||
private void deleteIndices(List<IndexId> indices, SnapshotId snapshotId, ActionListener<Void> listener) { | ||
if (indices.isEmpty()) { | ||
listener.onResponse(null); | ||
|
@@ -494,9 +515,9 @@ public SnapshotInfo finalizeSnapshot(final SnapshotId snapshotId, | |
startTime, failure, System.currentTimeMillis(), totalShards, shardFailures, | ||
includeGlobalState, userMetadata); | ||
try { | ||
final RepositoryData updatedRepositoryData = getRepositoryData().addSnapshot(snapshotId, blobStoreSnapshot.state(), indices); | ||
snapshotFormat.write(blobStoreSnapshot, blobContainer(), snapshotId.getUUID()); | ||
final RepositoryData repositoryData = getRepositoryData(); | ||
writeIndexGen(repositoryData.addSnapshot(snapshotId, blobStoreSnapshot.state(), indices), repositoryStateId); | ||
writeIndexGen(updatedRepositoryData, repositoryStateId); | ||
} catch (FileAlreadyExistsException ex) { | ||
// if another master was elected and took over finalizing the snapshot, it is possible | ||
// that both nodes try to finalize the snapshot and write to the same blobs, so we just | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -20,11 +20,13 @@ | |
|
||
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse; | ||
import org.elasticsearch.cluster.SnapshotsInProgress; | ||
import org.elasticsearch.cluster.metadata.RepositoryMetaData; | ||
import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider; | ||
import org.elasticsearch.common.settings.Settings; | ||
import org.elasticsearch.common.unit.TimeValue; | ||
import org.elasticsearch.plugins.Plugin; | ||
import org.elasticsearch.repositories.RepositoriesService; | ||
import org.elasticsearch.repositories.blobstore.BlobStoreTestUtil; | ||
import org.elasticsearch.snapshots.mockstore.MockRepository; | ||
import org.elasticsearch.test.ESIntegTestCase; | ||
import org.junit.After; | ||
|
@@ -65,6 +67,32 @@ public void assertConsistentHistoryInLuceneIndex() throws Exception { | |
internalCluster().assertConsistentHistoryBetweenTranslogAndLuceneIndex(); | ||
} | ||
|
||
private String skipRepoConsistencyCheckReason; | ||
|
||
@After | ||
public void assertRepoConsistency() { | ||
if (skipRepoConsistencyCheckReason == null) { | ||
client().admin().cluster().prepareGetRepositories().get().repositories() | ||
.stream() | ||
.map(RepositoryMetaData::name) | ||
.forEach(name -> { | ||
final List<SnapshotInfo> snapshots = client().admin().cluster().prepareGetSnapshots(name).get().getSnapshots(name); | ||
// Delete one random snapshot to trigger repository cleanup. | ||
if (snapshots.isEmpty() == false) { | ||
client().admin().cluster().prepareDeleteSnapshot(name, randomFrom(snapshots).snapshotId().getName()).get(); | ||
} | ||
BlobStoreTestUtil.assertRepoConsistency(internalCluster(), name); | ||
}); | ||
} else { | ||
logger.info("--> skipped repo consistency checks because [{}]", skipRepoConsistencyCheckReason); | ||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. perhaps log the reason in the else branch here |
||
} | ||
|
||
protected void disableRepoConsistencyCheck(String reason) { | ||
assertNotNull(reason); | ||
skipRepoConsistencyCheckReason = reason; | ||
} | ||
|
||
public static long getFailureCount(String repository) { | ||
long failureCount = 0; | ||
for (RepositoriesService repositoriesService : | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -141,8 +141,8 @@ public void testWhenMetadataAreLoaded() throws Exception { | |
// Deleting a snapshot does not load the global metadata state but loads each index metadata | ||
assertAcked(client().admin().cluster().prepareDeleteSnapshot("repository", "snap").get()); | ||
assertGlobalMetadataLoads("snap", 1); | ||
assertIndexMetadataLoads("snap", "docs", 5); | ||
assertIndexMetadataLoads("snap", "others", 4); | ||
assertIndexMetadataLoads("snap", "docs", 4); | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Now we don't need to read meta blobs to get an index's shard count before deleting since we just get that from the child container listing. |
||
assertIndexMetadataLoads("snap", "others", 3); | ||
} | ||
|
||
private void assertGlobalMetadataLoads(final String snapshot, final int times) { | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this approach of listing before writing the updated `index-N` makes this approach completely safe even on an eventually consistent blob store. If an index uuid (the snapshot uuid, not the CS one!) goes out of scope here for a single `index-N`, it will never be reused in a future `index-M` (M > N) => we don't have to care about stuck master nodes coming back to haunt us.

What we could however do is optimize this a little and maybe write a `SUCCESS-N` blob after we finished all operations that went into a given `index-N` blob, and only run the listing if we suspect dangling indices because an operation that wrote a given `index-N` didn't finish fully.

The downside of this is that it gets a little complicated when you have to account for master failover scenarios, and it may not be worth it performance-wise now that we have bulk deletes and could parallelize the recursive delete as well (I just didn't do it here for readability on a first pass).