Commit b593d66: Cleanup WIP

albertzaharovits committed Feb 16, 2020
1 parent 63c6855
Showing 2 changed files with 102 additions and 10 deletions.
BlobStoreRepository.java
@@ -215,7 +215,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp

     private final CounterMetric restoreRateLimitingTimeInNanos = new CounterMetric();
 
-    private final ChecksumBlobStoreFormat<MetaData> globalMetaDataFormat;
+    protected final ChecksumBlobStoreFormat<MetaData> globalMetaDataFormat;
 
     private final ChecksumBlobStoreFormat<IndexMetaData> indexMetaDataFormat;

@@ -508,7 +508,7 @@ public void deleteSnapshot(SnapshotId snapshotId, long repositoryStateId, boolea
      * @param rootBlobs Blobs at the repository root
      * @return RepositoryData
      */
-    private RepositoryData safeRepositoryData(long repositoryStateId, Map<String, BlobMetaData> rootBlobs) {
+    protected RepositoryData safeRepositoryData(long repositoryStateId, Map<String, BlobMetaData> rootBlobs) {
         final long generation = latestGeneration(rootBlobs.keySet());
         final long genToLoad;
         if (bestEffortConsistency) {
@@ -756,7 +756,7 @@ public void cleanup(long repositoryStateId, boolean writeShardGens, ActionListen
         if (isReadOnly()) {
             throw new RepositoryException(metadata.name(), "cannot run cleanup on readonly repository");
         }
-        Map<String, BlobMetaData> foundRootBlobs = blobContainer().listBlobs();
+        final Map<String, BlobMetaData> foundRootBlobs = blobContainer().listBlobs();
         final RepositoryData repositoryData = safeRepositoryData(repositoryStateId, foundRootBlobs);
         final Map<String, BlobContainer> foundIndices = blobStore().blobContainer(indicesPath()).children();
         final Set<String> survivingIndexIds = getSurvivingIndexIds(repositoryData, foundIndices.keySet());
@@ -969,7 +969,7 @@ public IndexMetaData getSnapshotIndexMetaData(final SnapshotId snapshotId, final
         }
     }
 
-    private BlobPath indicesPath() {
+    protected BlobPath indicesPath() {
         return basePath().add("indices");
     }
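
The four changes in this file all widen visibility from private to protected, giving subclasses access to globalMetaDataFormat, safeRepositoryData and indicesPath; the EncryptedRepository subclass in the second file relies on exactly these members in its new cleanup() override. A minimal sketch of the pattern as it might look inside such a subclass (the method name is hypothetical, for illustration only):

    // Compiles only inside a BlobStoreRepository subclass; these calls are
    // possible solely because this commit made the members protected.
    private void exampleProtectedMemberUsage(long repositoryStateId) throws IOException {
        // safeRepositoryData: load a consistency-checked RepositoryData view
        final RepositoryData repositoryData =
                safeRepositoryData(repositoryStateId, blobContainer().listBlobs());
        // indicesPath: resolve the repository's "indices" directory
        final BlobContainer indicesContainer = blobStore().blobContainer(indicesPath());
        // globalMetaDataFormat: e.g. compute the blob name holding a snapshot's global metadata
        final String globalMetaDataBlobName =
                globalMetaDataFormat.blobName(repositoryData.getSnapshotIds().iterator().next().getUUID());
    }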

EncryptedRepository.java
@@ -9,6 +9,7 @@
 import org.apache.logging.log4j.Logger;
 import org.apache.lucene.index.IndexCommit;
 import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.StepListener;
 import org.elasticsearch.cluster.metadata.MetaData;
 import org.elasticsearch.cluster.metadata.RepositoryMetaData;
 import org.elasticsearch.cluster.service.ClusterService;
@@ -19,6 +20,7 @@
 import org.elasticsearch.common.blobstore.BlobPath;
 import org.elasticsearch.common.blobstore.BlobStore;
 import org.elasticsearch.common.blobstore.DeleteResult;
+import org.elasticsearch.common.blobstore.fs.FsBlobContainer;
 import org.elasticsearch.common.bytes.BytesReference;
 import org.elasticsearch.common.io.Streams;
 import org.elasticsearch.common.io.stream.BytesStreamOutput;
@@ -30,6 +32,7 @@
 import org.elasticsearch.license.XPackLicenseState;
 import org.elasticsearch.repositories.IndexId;
 import org.elasticsearch.repositories.RepositoryCleanupResult;
+import org.elasticsearch.repositories.RepositoryData;
 import org.elasticsearch.repositories.RepositoryException;
 import org.elasticsearch.repositories.ShardGenerations;
 import org.elasticsearch.repositories.blobstore.BlobStoreRepository;
@@ -53,13 +56,15 @@
 import java.util.Base64;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Consumer;
 import java.util.function.Supplier;
+import java.util.stream.Collectors;
 
 public final class EncryptedRepository extends BlobStoreRepository {
     static final Logger logger = LogManager.getLogger(EncryptedRepository.class);
@@ -207,11 +212,98 @@ public void snapshotShard(Store store, MapperService mapperService, SnapshotId s

     @Override
     public void cleanup(long repositoryStateId, boolean writeShardGens, ActionListener<RepositoryCleanupResult> listener) {
-        super.cleanup(repositoryStateId, writeShardGens, ActionListener.wrap(repositoryCleanupResult -> {
-            EncryptedBlobContainer encryptedBlobContainer = (EncryptedBlobContainer) blobContainer();
-            cleanUpOrphanedMetadataRecursively(encryptedBlobContainer);
-            listener.onResponse(repositoryCleanupResult);
-        }, listener::onFailure));
+        if (isReadOnly()) {
+            listener.onFailure(new RepositoryException(metadata.name(), "cannot run cleanup on readonly repository"));
+            return;
+        }
+        StepListener<RepositoryCleanupResult> cleanupDelegatedRepositoryStep = new StepListener<>();
+        super.cleanup(repositoryStateId, writeShardGens, cleanupDelegatedRepositoryStep);
+        cleanupDelegatedRepositoryStep.whenComplete(delegatedRepositoryCleanupResult -> {
+            RepositoryData repositoryData = safeRepositoryData(repositoryStateId, blobContainer().listBlobs());
+            // list all encryption metadata indices blob container
+            Map<String, BlobContainer> staleMetadataIndices =
+                ((EncryptedBlobContainer) blobStore().blobContainer(indicesPath())).encryptionMetadataBlobContainer.children();
+            repositoryData.getIndices().values().stream().map(IndexId::getId).forEach(survivingIndexId -> {
+                // stale blob containers are those which are not surviving
+                staleMetadataIndices.remove(survivingIndexId);
+            });
+            // list all encryption metadata root blobs
+            Map<String, BlobMetaData> staleRootMetadataBlobs =
+                ((EncryptedBlobContainer) blobContainer()).encryptionMetadataBlobContainer.listBlobs();
+            Iterator<Map.Entry<String, BlobMetaData>> it = staleRootMetadataBlobs.entrySet().iterator();
+            // staleRootMetadataBlobs.entrySet().removeIf(rootMetadata -> {
+            //     String metadataBlobName = rootMetadata.getKey();
+            //     // unrecognized metadata blob, do NOT remove
+            //     if (metadataBlobName.length() <= METADATA_UID_LENGTH_IN_CHARS) {
+            //         return false;
+            //     }
+            //     String blobName = metadataBlobName.substring(0, metadataBlobName.length() - METADATA_UID_LENGTH_IN_CHARS);
+            // });
+            // while (it.hasNext()) {
+            //     String metadataBlobName = it.next().getKey();
+            //     // unrecognized metadata blob, do NOT remove
+            //     if (metadataBlobName.length() < METADATA_UID_LENGTH_IN_CHARS) {
+            //         it.remove();
+            //     } else {
+            //         String blobName = metadataBlobName.substring(0, metadataBlobName.length() - METADATA_UID_LENGTH_IN_CHARS);
+            //         if (FsBlobContainer.isTempBlobName(blobName)) {
+            //             // this must be removed
+            //         } else if (blobName.endsWith(".dat")) {
+            //             final String foundUUID;
+            //             if (blobName.startsWith(SNAPSHOT_PREFIX)) {
+            //                 foundUUID = blobName.substring(SNAPSHOT_PREFIX.length(), blobName.length() - ".dat".length());
+            //                 assert snapshotFormat.blobName(foundUUID).equals(blobName);
+            //             } else if (blobName.startsWith(METADATA_PREFIX)) {
+            //                 foundUUID = blobName.substring(METADATA_PREFIX.length(), blobName.length() - ".dat".length());
+            //                 assert globalMetaDataFormat.blobName(foundUUID).equals(blobName);
+            //             } else {
+            //
+            //                 return true;
+            //             }
+            //             return survivingSnapshotIds.contains(foundUUID);
+            //
+            //         }
+            //     }
+            // }
+            // Set<String> repositoryData.getSnapshotIds().stream().map(SnapshotId::getUUID).collect(Collectors.toSet());
+            // return foundRootBlobNames.stream().filter(
+            //     blob -> {
+            //         if (FsBlobContainer.isTempBlobName(blob)) {
+            //             return false;
+            //         } else if (blob.endsWith(".dat")) {
+            //             final String foundUUID;
+            //             if (blob.startsWith(SNAPSHOT_PREFIX)) {
+            //                 foundUUID = blob.substring(SNAPSHOT_PREFIX.length(), blob.length() - ".dat".length());
+            //                 assert snapshotFormat.blobName(foundUUID).equals(blob);
+            //             } else if (blob.startsWith(METADATA_PREFIX)) {
+            //                 foundUUID = blob.substring(METADATA_PREFIX.length(), blob.length() - ".dat".length());
+            //                 assert globalMetaDataFormat.blobName(foundUUID).equals(blob);
+            //             } else {
+            //                 return true;
+            //             }
+            //             return survivingSnapshotIds.contains(foundUUID);
+            //         } else if (blob.startsWith(INDEX_FILE_PREFIX)) {
+            //             // TODO: Include the current generation here once we remove keeping index-(N-1) around from #writeIndexGen
+            //             return repositoryData.getGenId() <= Long.parseLong(blob.substring(INDEX_FILE_PREFIX.length()));
+            //         } else {
+            //             return true;
+            //         }
+            //     }
+            // ).collect(Collectors.toSet());
+            // final Set<String> survivingRootBlobNames = getSurvivingRootBlobNames(repositoryData, foundRootBlobs.keySet());
+            // if (survivingIndexIds.containsAll(foundIndices.keySet()) && survivingRootBlobNames.containsAll(foundRootBlobs.keySet())) {
+            //     // Nothing to clean up we return
+            //     listener.onResponse(new RepositoryCleanupResult(DeleteResult.ZERO));
+            // } else {
+            //     // write new index-N blob to ensure concurrent operations will fail
+            //     writeIndexGen(repositoryData, repositoryStateId, writeShardGens,
+            //         ActionListener.wrap(v -> cleanupStaleBlobs(foundIndices, survivingIndexIds, foundRootBlobs, survivingRootBlobNames,
+            //             ActionListener.map(listener, RepositoryCleanupResult::new)), listener::onFailure));
+            //
+            // }
+
+        }, listener::onFailure);
+
     }
 
     private void cleanUpOrphanedMetadataRecursively(EncryptedBlobContainer encryptedBlobContainer) throws IOException {
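
Most of the new cleanup() body is still commented out, but the drafts converge on one idea: map each root encryption-metadata blob back to the root blob it describes by stripping a fixed-length metadata UID suffix, then keep the metadata only while that root blob is still referenced. A sketch of how the draft predicate might be completed, assuming the constants it already references (METADATA_UID_LENGTH_IN_CHARS, SNAPSHOT_PREFIX, METADATA_PREFIX, INDEX_FILE_PREFIX) are in scope; the helper name is hypothetical and this is not the commit's final logic:

    // Hypothetical completion of the commented-out draft: true means the root
    // metadata blob is still needed and must NOT be deleted.
    private static boolean shouldKeepRootMetadataBlob(String metadataBlobName,
                                                      Set<String> survivingSnapshotUUIDs,
                                                      long currentGen) {
        // unrecognized metadata blob name, do NOT remove (conservative default)
        if (metadataBlobName.length() <= METADATA_UID_LENGTH_IN_CHARS) {
            return true;
        }
        // metadata blob name = root blob name + fixed-length metadata UID suffix
        final String blobName =
                metadataBlobName.substring(0, metadataBlobName.length() - METADATA_UID_LENGTH_IN_CHARS);
        if (FsBlobContainer.isTempBlobName(blobName)) {
            return false; // metadata of a leftover temporary blob
        }
        if (blobName.endsWith(".dat")) {
            final String uuid;
            if (blobName.startsWith(SNAPSHOT_PREFIX)) {
                uuid = blobName.substring(SNAPSHOT_PREFIX.length(), blobName.length() - ".dat".length());
            } else if (blobName.startsWith(METADATA_PREFIX)) {
                uuid = blobName.substring(METADATA_PREFIX.length(), blobName.length() - ".dat".length());
            } else {
                return true; // unknown .dat blob, keep conservatively
            }
            return survivingSnapshotUUIDs.contains(uuid);
        }
        if (blobName.startsWith(INDEX_FILE_PREFIX)) {
            // keep metadata for the current index-N generation and newer
            return currentGen <= Long.parseLong(blobName.substring(INDEX_FILE_PREFIX.length()));
        }
        return true; // anything else is kept, as in the draft
    }

A caller inside cleanup() could then prune the candidate map in place; whatever remains in staleRootMetadataBlobs afterwards (plus everything left in staleMetadataIndices) is what a finished cleanup would delete:

    final Set<String> survivingSnapshotUUIDs = repositoryData.getSnapshotIds().stream()
            .map(SnapshotId::getUUID).collect(Collectors.toSet());
    staleRootMetadataBlobs.entrySet().removeIf(entry ->
            shouldKeepRootMetadataBlob(entry.getKey(), survivingSnapshotUUIDs, repositoryData.getGenId()));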
@@ -298,8 +390,8 @@ public BlobContainer blobContainer(BlobPath path) {
            return new EncryptedBlobContainer(delegatedBlobStore, delegatedBasePath, path, dataEncryptionKeySupplier, metadataEncryption,
                encryptionNonceSupplier, metadataIdentifierSupplier);
        }

    }

    private static class EncryptedBlobContainer implements BlobContainer {

        private final BlobStore delegatedBlobStore;
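
The diff cuts EncryptedBlobContainer off here, but the suffix arithmetic in cleanup() pins down the naming invariant the class must maintain: each encrypted blob has a companion metadata blob whose name is the data blob's name followed by a fixed-length metadata UID (note the java.util.Base64 import above). A tiny self-contained illustration of that invariant; the UID length and encoding are assumptions for the demo, not the commit's actual scheme:

    import java.security.SecureRandom;
    import java.util.Base64;

    // Demo of the blob-name <-> metadata-blob-name relation that cleanup() inverts.
    public class MetadataNameInvariantDemo {
        // assumed for the demo: 16 random bytes, base64url without padding -> 22 chars
        static final int METADATA_UID_LENGTH_IN_CHARS = 22;

        static String metadataBlobName(String blobName, String metadataUid) {
            return blobName + metadataUid; // metadata blob name carries the UID suffix
        }

        static String blobNameOf(String metadataBlobName) {
            // the inverse used by the cleanup() draft: strip the fixed-length suffix
            return metadataBlobName.substring(0, metadataBlobName.length() - METADATA_UID_LENGTH_IN_CHARS);
        }

        public static void main(String[] args) {
            byte[] uidBytes = new byte[16];
            new SecureRandom().nextBytes(uidBytes);
            String uid = Base64.getUrlEncoder().withoutPadding().encodeToString(uidBytes);
            String metaName = metadataBlobName("snap-abc123.dat", uid);
            System.out.println(metaName + " -> " + blobNameOf(metaName)); // snap-abc123.dat
        }
    }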
