Add Bulk Delete Api to BlobStore
mkleen committed May 26, 2020
1 parent 875377a commit 9009e61
Showing 6 changed files with 143 additions and 83 deletions.
@@ -23,6 +23,7 @@
import com.amazonaws.services.s3.model.AbortMultipartUploadRequest;
import com.amazonaws.services.s3.model.AmazonS3Exception;
import com.amazonaws.services.s3.model.CompleteMultipartUploadRequest;
import com.amazonaws.services.s3.model.DeleteObjectsRequest;
import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest;
import com.amazonaws.services.s3.model.ObjectListing;
import com.amazonaws.services.s3.model.ObjectMetadata;
@@ -56,6 +57,12 @@

class S3BlobContainer extends AbstractBlobContainer {

/**
* Maximum number of deletes in a {@link DeleteObjectsRequest}.
* @see <a href="https://docs.aws.amazon.com/AmazonS3/latest/API/multiobjectdeleteapi.html">S3 Documentation</a>.
*/
private static final int MAX_BULK_DELETES = 1000;

private final S3BlobStore blobStore;
private final String keyPath;

@@ -109,6 +116,49 @@ public void deleteBlob(String blobName) throws IOException {
deleteBlobIgnoringIfNotExists(blobName);
}

@Override
public void deleteBlobsIgnoringIfNotExists(List<String> blobNames) throws IOException {
if (blobNames.isEmpty()) {
return;
}
try (AmazonS3Reference clientReference = blobStore.clientReference()) {
// S3 API only allows 1k blobs per delete so we split up the given blobs into requests of max. 1k deletes
final List<DeleteObjectsRequest> deleteRequests = new ArrayList<>();
final List<String> partition = new ArrayList<>();
for (String blob : blobNames) {
partition.add(buildKey(blob));
if (partition.size() == MAX_BULK_DELETES) {
deleteRequests.add(bulkDelete(blobStore.bucket(), partition));
partition.clear();
}
}
if (partition.isEmpty() == false) {
deleteRequests.add(bulkDelete(blobStore.bucket(), partition));
}
AmazonClientException aex = null;
for (DeleteObjectsRequest deleteRequest : deleteRequests) {
try {
clientReference.client().deleteObjects(deleteRequest);
} catch (AmazonClientException e) {
if (aex == null) {
aex = e;
} else {
aex.addSuppressed(e);
}
}
}
if (aex != null) {
throw aex;
}
} catch (final AmazonClientException e) {
throw new IOException("Exception when deleting blobs [" + blobNames + "]", e);
}
}

private static DeleteObjectsRequest bulkDelete(String bucket, List<String> blobs) {
return new DeleteObjectsRequest(bucket).withKeys(blobs.toArray(Strings.EMPTY_ARRAY)).withQuiet(true);
}

@Override
public void deleteBlobIgnoringIfNotExists(String blobName) throws IOException {
try (AmazonS3Reference clientReference = blobStore.clientReference()) {
@@ -119,6 +169,7 @@ public void deleteBlobIgnoringIfNotExists(String blobName) throws IOException {
}
}


@Override
public Map<String, BlobMetaData> listBlobsByPrefix(@Nullable String blobNamePrefix) throws IOException {
final HashMap<String, BlobMetaData> blobsBuilder = new HashMap<>();
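For context, the deleteBlobsIgnoringIfNotExists override above batches keys because S3's multi-object delete API rejects requests with more than 1000 keys. Below is a minimal standalone sketch of the same pattern against a plain AmazonS3 client — the class, method, bucket, and key names are illustrative, not part of this commit:

import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
import com.amazonaws.services.s3.model.DeleteObjectsRequest;

import java.util.List;

public class BulkDeleteSketch {

    // S3 rejects multi-object deletes of more than 1000 keys per request,
    // the same limit that MAX_BULK_DELETES encodes above.
    private static final int MAX_BULK_DELETES = 1000;

    public static void deleteAll(AmazonS3 client, String bucket, List<String> keys) {
        for (int from = 0; from < keys.size(); from += MAX_BULK_DELETES) {
            int to = Math.min(from + MAX_BULK_DELETES, keys.size());
            DeleteObjectsRequest request = new DeleteObjectsRequest(bucket)
                .withKeys(keys.subList(from, to).toArray(new String[0]))
                .withQuiet(true); // quiet mode: the response lists only failed keys
            client.deleteObjects(request);
        }
    }

    public static void main(String[] args) {
        AmazonS3 client = AmazonS3ClientBuilder.defaultClient();
        deleteAll(client, "my-bucket", List.of("blobs/a", "blobs/b", "blobs/c"));
    }
}

Quiet mode is also what the commit's bulkDelete helper sets: S3 then reports only the keys that failed to delete, which keeps responses small.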
@@ -1,23 +1,20 @@
 /*
- * Licensed to Crate under one or more contributor license agreements.
- * See the NOTICE file distributed with this work for additional
- * information regarding copyright ownership. Crate licenses this file
- * to you under the Apache License, Version 2.0 (the "License"); you may
- * not use this file except in compliance with the License. You may
- * obtain a copy of the License at
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
  *
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied. See the License for the specific language governing
- * permissions and limitations under the License.
- *
- * However, if you have executed another commercial license agreement
- * with Crate these terms will supersede the license and you may use the
- * software solely pursuant to the terms of the relevant commercial
- * agreement.
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
  */

package org.elasticsearch.repositories.s3;
@@ -30,8 +27,6 @@
import com.amazonaws.services.s3.model.DeleteObjectsRequest;
import com.amazonaws.services.s3.model.DeleteObjectsResult;
import com.amazonaws.services.s3.model.GetObjectRequest;
import com.amazonaws.services.s3.model.HeadBucketRequest;
import com.amazonaws.services.s3.model.HeadBucketResult;
import com.amazonaws.services.s3.model.ListObjectsRequest;
import com.amazonaws.services.s3.model.ObjectListing;
import com.amazonaws.services.s3.model.ObjectMetadata;
@@ -146,12 +141,6 @@ public ObjectListing listObjects(final ListObjectsRequest request) throws Amazon
return listing;
}

@Override
public HeadBucketResult headBucket(HeadBucketRequest request) throws SdkClientException {
assertThat(request.getBucketName(), equalTo(bucket));
return new HeadBucketResult();
}

@Override
public void deleteObject(final DeleteObjectRequest request) throws AmazonClientException {
assertThat(request.getBucketName(), equalTo(bucket));
@@ -77,7 +77,7 @@ protected BlobStore newBlobStore() {

@Override
@Test
public void testDeleteBlob() {
public void testDeleteBlobs() {
assumeFalse("not implemented because of S3's weak consistency model", true);
}

@@ -23,6 +23,7 @@
import java.io.InputStream;
import java.nio.file.FileAlreadyExistsException;
import java.nio.file.NoSuchFileException;
import java.util.List;
import java.util.Map;

/**
@@ -103,7 +104,7 @@ default void writeBlobAtomic(final String blobName, final InputStream inputStrea
}

/**
* Deletes a blob with giving name, if the blob exists. If the blob does not exist,
* Deletes the blob with the given name, if the blob exists. If the blob does not exist,
* this method throws a NoSuchFileException.
*
* @param blobName
Expand All @@ -113,6 +114,33 @@ default void writeBlobAtomic(final String blobName, final InputStream inputStrea
*/
void deleteBlob(String blobName) throws IOException;

/**
* Deletes the blobs with the given names. Unlike {@link #deleteBlob(String)}, this method does not throw
* an exception when one or more of the given blobs do not exist; that case is simply ignored.
*
* @param blobNames The names of the blobs to delete.
* @throws IOException if a subset of the blobs exists but could not be deleted.
*/
default void deleteBlobsIgnoringIfNotExists(List<String> blobNames) throws IOException {
IOException ioe = null;
for (String blobName : blobNames) {
try {
deleteBlob(blobName);
} catch (NoSuchFileException e) {
// ignored
} catch (IOException e) {
if (ioe == null) {
ioe = e;
} else {
ioe.addSuppressed(e);
}
}
}
if (ioe != null) {
throw ioe;
}
}

/**
* Deletes the blob with the given name, ignoring if the blob does not exist.
*
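On the caller side, the new interface method replaces per-blob delete loops. A minimal usage sketch, assuming the usual org.elasticsearch.common.blobstore package and a hypothetical cleanup helper (names are illustrative):

import org.elasticsearch.common.blobstore.BlobContainer;

import java.io.IOException;
import java.util.List;

final class CleanupSketch {

    // Hypothetical helper: hands a whole batch of blob names to the container.
    // Blobs that no longer exist are skipped silently; an IOException surfaces
    // only when a blob exists but cannot be deleted (per-blob failures are
    // collected and suppressed into the first one, as in the default method above).
    static void deleteStaleBlobs(BlobContainer container, List<String> staleBlobs) throws IOException {
        container.deleteBlobsIgnoringIfNotExists(staleBlobs);
    }
}

Implementations remain free to override the default loop with a true bulk call — which is exactly what S3BlobContainer does in the first file of this commit.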
@@ -112,7 +112,6 @@
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.nio.file.DirectoryNotEmptyException;
import java.nio.file.NoSuchFileException;
import java.util.ArrayList;
import java.util.Arrays;
@@ -446,7 +445,7 @@ public void deleteSnapshot(SnapshotId snapshotId, long repositoryStateId) {
indexMetaData = getSnapshotIndexMetaData(snapshotId, indexId);
} catch (ElasticsearchParseException | IOException ex) {
LOGGER.warn(() ->
new ParameterizedMessage("[{}] [{}] failed to read metadata for index", snapshotId, index), ex);
new ParameterizedMessage("[{}] [{}] failed to read metadata for index", snapshotId, index), ex);
}

deleteIndexMetaDataBlobIgnoringErrors(snapshot, indexId);
@@ -458,7 +457,7 @@ public void deleteSnapshot(SnapshotId snapshotId, long repositoryStateId) {
} catch (SnapshotException ex) {
final int finalShardId = shardId;
LOGGER.warn(() -> new ParameterizedMessage("[{}] failed to delete shard data for shard [{}][{}]",
snapshotId, index, finalShardId), ex);
}
}
}
@@ -469,27 +468,22 @@ public void deleteSnapshot(SnapshotId snapshotId, long repositoryStateId) {
final Collection<IndexId> indicesToCleanUp = Sets.newHashSet(repositoryData.getIndices().values());
indicesToCleanUp.removeAll(updatedRepositoryData.getIndices().values());
final BlobContainer indicesBlobContainer = blobStore().blobContainer(basePath().add("indices"));
for (final IndexId indexId : indicesToCleanUp) {
try {
indicesBlobContainer.deleteBlob(indexId.getId());
} catch (DirectoryNotEmptyException dnee) {
// if the directory isn't empty for some reason, it will fail to clean up;
// we'll ignore that and accept that cleanup didn't fully succeed.
// since we are using UUIDs for path names, this won't be an issue for
// snapshotting indices of the same name
LOGGER.debug(() -> new ParameterizedMessage("[{}] index [{}] no longer part of any snapshots in the repository, but failed to clean up " +
"its index folder due to the directory not being empty.", metadata.name(), indexId), dnee);
} catch (IOException ioe) {
// a different IOException occurred while trying to delete - will just log the issue for now
LOGGER.debug(() -> new ParameterizedMessage("[{}] index [{}] no longer part of any snapshots in the repository, but failed to clean up " +
"its index folder.", metadata.name(), indexId), ioe);
}
try {
indicesBlobContainer.deleteBlobsIgnoringIfNotExists(
indicesToCleanUp.stream().map(IndexId::getId).collect(Collectors.toList()));
} catch (IOException ioe) {
// a different IOException occurred while trying to delete - will just log the issue for now
LOGGER.warn(() ->
new ParameterizedMessage(
"[{}] indices {} are no longer part of any snapshots in the repository, " +
"but failed to clean up their index folders.", metadata.name(), indicesToCleanUp), ioe);
}
} catch (IOException | ResourceNotFoundException ex) {
throw new RepositoryException(metadata.name(), "failed to delete snapshot [" + snapshotId + "]", ex);
}
}


private void deleteSnapshotBlobIgnoringErrors(final SnapshotInfo snapshotInfo, final String blobId) {
try {
snapshotFormat.delete(blobContainer(), blobId);
@@ -1049,16 +1043,14 @@ protected void finalize(final List<SnapshotFiles> snapshots,
try {
// Delete temporary index files first, as we might otherwise fail in the next step creating the new index file if an earlier
// attempt to write an index file with this generation failed mid-way after creating the temporary file.
for (final String blobName : blobs.keySet()) {
if (FsBlobContainer.isTempBlobName(blobName)) {
try {
blobContainer.deleteBlobIgnoringIfNotExists(blobName);
} catch (IOException e) {
LOGGER.warn(() -> new ParameterizedMessage("[{}][{}] failed to delete index blob [{}] during finalization",
snapshotId, shardId, blobName), e);
throw e;
}
}
final List<String> blobNames =
blobs.keySet().stream().filter(FsBlobContainer::isTempBlobName).collect(Collectors.toList());
try {
blobContainer.deleteBlobsIgnoringIfNotExists(blobNames);
} catch (IOException e) {
LOGGER.warn(() -> new ParameterizedMessage("[{}][{}] failed to delete index blobs {} during finalization",
snapshotId, shardId, blobNames), e);
throw e;
}

// If we deleted all snapshots, we don't need to create a new index file
@@ -1067,28 +1059,26 @@ protected void finalize(final List<SnapshotFiles> snapshots,
}

// Delete old index files
for (final String blobName : blobs.keySet()) {
if (blobName.startsWith(SNAPSHOT_INDEX_PREFIX)) {
try {
blobContainer.deleteBlobIgnoringIfNotExists(blobName);
} catch (IOException e) {
LOGGER.warn(() -> new ParameterizedMessage("[{}][{}] failed to delete index blob [{}] during finalization",
snapshotId, shardId, blobName), e);
throw e;
}
}
final List<String> indexBlobs =
blobs.keySet().stream().filter(blob -> blob.startsWith(SNAPSHOT_INDEX_PREFIX)).collect(Collectors.toList());
try {
blobContainer.deleteBlobsIgnoringIfNotExists(indexBlobs);
} catch (IOException e) {
LOGGER.warn(() -> new ParameterizedMessage("[{}][{}] failed to delete index blobs {} during finalization",
snapshotId, shardId, indexBlobs), e);
throw e;
}

// Delete all blobs that don't exist in a snapshot
for (final String blobName : blobs.keySet()) {
if (blobName.startsWith(DATA_BLOB_PREFIX) && (updatedSnapshots.findNameFile(canonicalName(blobName)) == null)) {
try {
blobContainer.deleteBlobIgnoringIfNotExists(blobName);
} catch (IOException e) {
LOGGER.warn(() -> new ParameterizedMessage("[{}][{}] failed to delete data blob [{}] during finalization",
snapshotId, shardId, blobName), e);
}
}
final List<String> orphanedBlobs = blobs.keySet().stream()
.filter(blobName ->
blobName.startsWith(DATA_BLOB_PREFIX) && updatedSnapshots.findNameFile(canonicalName(blobName)) == null)
.collect(Collectors.toList());
try {
blobContainer.deleteBlobsIgnoringIfNotExists(orphanedBlobs);
} catch (IOException e) {
LOGGER.warn(() -> new ParameterizedMessage("[{}][{}] failed to delete data blobs {} during finalization",
snapshotId, shardId, orphanedBlobs), e);
}
} catch (IOException e) {
String message = "Failed to finalize " + reason + " with shard index [" + currentIndexGen + "]";
@@ -33,6 +33,7 @@
import java.nio.file.NoSuchFileException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import static org.elasticsearch.repositories.ESBlobStoreTestCase.randomBytes;
@@ -120,19 +121,20 @@ public void testList() throws IOException {
}
}

public void testDeleteBlob() throws IOException {
public void testDeleteBlobs() throws IOException {
try (BlobStore store = newBlobStore()) {
final String blobName = "foobar";
final List<String> blobNames = List.of("foobar", "barfoo");
final BlobContainer container = store.blobContainer(new BlobPath());
expectThrows(NoSuchFileException.class, () -> container.deleteBlob(blobName));

container.deleteBlobsIgnoringIfNotExists(blobNames); // does not raise when blobs don't exist
byte[] data = randomBytes(randomIntBetween(10, scaledRandomIntBetween(1024, 1 << 16)));
final BytesArray bytesArray = new BytesArray(data);
writeBlob(container, blobName, bytesArray, randomBoolean());
container.deleteBlob(blobName); // should not raise

// blob deleted, so should raise again
expectThrows(NoSuchFileException.class, () -> container.deleteBlob(blobName));
for (String blobName : blobNames) {
writeBlob(container, blobName, bytesArray, randomBoolean());
}
assertEquals(container.listBlobs().size(), 2);
container.deleteBlobsIgnoringIfNotExists(blobNames);
assertTrue(container.listBlobs().isEmpty());
container.deleteBlobsIgnoringIfNotExists(blobNames); // does not raise when blobs don't exist
}
}

