Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Bulk Delete Api to BlobStore #40322

Merged
merged 14 commits into from
Apr 2, 2019
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.amazonaws.services.s3.model.AbortMultipartUploadRequest;
import com.amazonaws.services.s3.model.AmazonS3Exception;
import com.amazonaws.services.s3.model.CompleteMultipartUploadRequest;
import com.amazonaws.services.s3.model.DeleteObjectsRequest;
import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest;
import com.amazonaws.services.s3.model.ObjectListing;
import com.amazonaws.services.s3.model.ObjectMetadata;
Expand Down Expand Up @@ -56,6 +57,11 @@

class S3BlobContainer extends AbstractBlobContainer {

/**
* Maximum number of deletes in a {@link DeleteObjectsRequest}.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

perhaps link to AWS docs here

*/
private static final int MAX_BULK_DELETES = 1000;

private final S3BlobStore blobStore;
private final String keyPath;

Expand Down Expand Up @@ -118,6 +124,51 @@ public void deleteBlob(String blobName) throws IOException {
deleteBlobIgnoringIfNotExists(blobName);
}

@Override
public void deleteBlobs(List<String> blobNames) throws IOException {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should call this deleteBlobsIgnoringIfNotExists?

if (blobNames.isEmpty()) {
return;
}
try (AmazonS3Reference clientReference = blobStore.clientReference()) {
// S3 API only allows 1k blobs per delete so we split up the given blobs into requests of max. 1k deletes
final List<DeleteObjectsRequest> deleteRequests = new ArrayList<>();
final List<String> partition = new ArrayList<>();
for (String blob : blobNames) {
partition.add(buildKey(blob));
if (partition.size() == MAX_BULK_DELETES ) {
deleteRequests.add(bulkDelete(blobStore.bucket(), partition));
partition.clear();
}
}
if (partition.isEmpty() == false) {
deleteRequests.add(bulkDelete(blobStore.bucket(), partition));
}
SocketAccess.doPrivilegedVoid(() -> {
AmazonClientException aex = null;
for (DeleteObjectsRequest deleteRequest : deleteRequests) {
try {
clientReference.client().deleteObjects(deleteRequest);
} catch (AmazonClientException e) {
if (aex == null) {
aex = e;
} else {
aex.addSuppressed(e);
}
}
}
if (aex != null) {
throw aex;
}
});
} catch (final AmazonClientException e) {
throw new IOException("Exception when deleting blobs [" + blobNames + "]", e);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If there is an IOException we do not proceed even if we have more DeleteRequests to be sent.
Previously when performing deletes - if one delete failed, we still were proceeding with the next delete requests.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right, fixed this for S3 as well as the generic case now by catching and aggregating exceptions in the loop :)

}
}

private static DeleteObjectsRequest bulkDelete(String bucket, List<String> blobs) {
return new DeleteObjectsRequest(bucket).withKeys(blobs.toArray(Strings.EMPTY_ARRAY)).withQuiet(true);
}

@Override
public void deleteBlobIgnoringIfNotExists(String blobName) throws IOException {
try (AmazonS3Reference clientReference = blobStore.clientReference()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -324,7 +324,7 @@ private PathTrie<RequestHandler> defaultHandlers(final Map<String, Bucket> bucke
// Delete Multiple Objects
//
// https://docs.aws.amazon.com/AmazonS3/latest/API/multiobjectdeleteapi.html
handlers.insert(nonAuthPath(HttpPost.METHOD_NAME, "/"), (request) -> {
final RequestHandler bulkDeleteHandler = request -> {
final List<String> deletes = new ArrayList<>();
final List<String> errors = new ArrayList<>();

Expand All @@ -344,7 +344,6 @@ private PathTrie<RequestHandler> defaultHandlers(final Map<String, Bucket> bucke
if (closingOffset != -1) {
offset = offset + startMarker.length();
final String objectName = requestBody.substring(offset, closingOffset);

boolean found = false;
for (Bucket bucket : buckets.values()) {
if (bucket.objects.containsKey(objectName)) {
Expand All @@ -369,7 +368,9 @@ private PathTrie<RequestHandler> defaultHandlers(final Map<String, Bucket> bucke
}
}
return newInternalError(request.getId(), "Something is wrong with this POST multiple deletes request");
});
};
handlers.insert(nonAuthPath(HttpPost.METHOD_NAME, "/"), bulkDeleteHandler);
handlers.insert(nonAuthPath(HttpPost.METHOD_NAME, "/{bucket}"), bulkDeleteHandler);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will this implementation work for both cases? It seems that {bucket} is even not used in the implementation

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yea, the unfortunate reality here is that the current fixtures logic for bulk delete doesn't even care about the bucket and simply tries to find the given blobs in any bucket.
I wouldn't really put any effort into this tbh. I think we should probably rather look into just removing the fixture now that we have the Docker-based Minio tests and third-party tests.
The fixture seems completely redundant now ...


// non-authorized requests

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,11 +158,7 @@ public DeleteObjectsResult deleteObjects(DeleteObjectsRequest request) throws Sd

final List<DeleteObjectsResult.DeletedObject> deletions = new ArrayList<>();
for (DeleteObjectsRequest.KeyVersion key : request.getKeys()) {
if (blobs.remove(key.getKey()) == null) {
AmazonS3Exception exception = new AmazonS3Exception("[" + key + "] does not exist.");
exception.setStatusCode(404);
throw exception;
} else {
if (blobs.remove(key.getKey()) != null) {
DeleteObjectsResult.DeletedObject deletion = new DeleteObjectsResult.DeletedObject();
deletion.setKey(key.getKey());
deletions.add(deletion);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.io.InputStream;
import java.nio.file.FileAlreadyExistsException;
import java.nio.file.NoSuchFileException;
import java.util.List;
import java.util.Map;

/**
Expand Down Expand Up @@ -96,8 +97,9 @@ public interface BlobContainer {
* @throws IOException if the input stream could not be read, or the target blob could not be written to.
*/
void writeBlobAtomic(String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists) throws IOException;

/**
* Deletes a blob with giving name, if the blob exists. If the blob does not exist,
* Deletes the blob with the given name, if the blob exists. If the blob does not exist,
* this method throws a NoSuchFileException.
*
* @param blobName
Expand All @@ -107,6 +109,33 @@ public interface BlobContainer {
*/
void deleteBlob(String blobName) throws IOException;

/**
* Deletes the blobs with given names. Unlike {@link #deleteBlob(String)} this method will not throw an exception
* when one or multiple of the given blobs don't exist and simply ignore this case.
*
* @param blobNames The names of the blob to delete.
* @throws IOException if a subset of blob exists but could not be deleted.
*/
default void deleteBlobs(List<String> blobNames) throws IOException {
IOException ioe = null;
for (String blobName : blobNames) {
try {
deleteBlob(blobName);
} catch (NoSuchFileException e) {
// ignored
} catch (IOException e) {
if (ioe == null) {
ioe = e;
} else {
ioe.addSuppressed(e);
}
}
}
if (ioe != null) {
throw ioe;
}
}

/**
* Deletes a blob with giving name, ignoring if the blob does not exist.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,6 @@
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.DirectoryNotEmptyException;
import java.nio.file.FileAlreadyExistsException;
import java.nio.file.NoSuchFileException;
import java.util.ArrayList;
Expand Down Expand Up @@ -466,22 +465,15 @@ public void deleteSnapshot(SnapshotId snapshotId, long repositoryStateId) {
final Collection<IndexId> indicesToCleanUp = Sets.newHashSet(repositoryData.getIndices().values());
indicesToCleanUp.removeAll(updatedRepositoryData.getIndices().values());
final BlobContainer indicesBlobContainer = blobStore().blobContainer(basePath().add("indices"));
for (final IndexId indexId : indicesToCleanUp) {
try {
indicesBlobContainer.deleteBlobIgnoringIfNotExists(indexId.getId());
} catch (DirectoryNotEmptyException dnee) {
// if the directory isn't empty for some reason, it will fail to clean up;
// we'll ignore that and accept that cleanup didn't fully succeed.
// since we are using UUIDs for path names, this won't be an issue for
// snapshotting indices of the same name
logger.warn(() -> new ParameterizedMessage("[{}] index [{}] no longer part of any snapshots in the repository, " +
"but failed to clean up its index folder due to the directory not being empty.", metadata.name(), indexId), dnee);
indicesBlobContainer.deleteBlobs(indicesToCleanUp.stream().map(IndexId::getId).collect(Collectors.toList()));
} catch (IOException ioe) {
andrershov marked this conversation as resolved.
Show resolved Hide resolved
// a different IOException occurred while trying to delete - will just log the issue for now
logger.warn(() -> new ParameterizedMessage("[{}] index [{}] no longer part of any snapshots in the repository, " +
"but failed to clean up its index folder.", metadata.name(), indexId), ioe);
logger.warn(() ->
new ParameterizedMessage(
"[{}] indices [{}] are no longer part of any snapshots in the repository, " +
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Second placeholder can be just {} instead of [{}] as it's a list that will render anyway with the brackets. Same issue in other places in this PR

"but failed to clean up their index folders.", metadata.name(), indicesToCleanUp), ioe);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We no longer know what particular indices are not removed. We just log all indices, including those that are successful.
The same thing applies to deleteBlobs usage below.
Probably we can add this kind of information to IOException thrown by deleteBlobs?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done in 9a13dadd0c1 :) We now aggregate all the exceptions. The IndexId that are printed here contain the name and the index snapshot id so with the information in the aggregate exception we can work out what index failed to clean up.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you know how AWS S3 works when you send a bulk request of 1000 and entry number 500 fails? Will it stop at entry 500 or will it try to delete all entries from the bulk?
Will S3 include information about each failed entry to the exception? (or just the first one?)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@andrershov it will try to delete all of the entries and given errors for all the ones that failed :)

}
}
} catch (IOException | ResourceNotFoundException ex) {
throw new RepositoryException(metadata.name(), "failed to delete snapshot [" + snapshotId + "]", ex);
}
Expand Down Expand Up @@ -1018,16 +1010,14 @@ protected void finalize(final List<SnapshotFiles> snapshots,
try {
// Delete temporary index files first, as we might otherwise fail in the next step creating the new index file if an earlier
// attempt to write an index file with this generation failed mid-way after creating the temporary file.
for (final String blobName : blobs.keySet()) {
if (FsBlobContainer.isTempBlobName(blobName)) {
try {
blobContainer.deleteBlobIgnoringIfNotExists(blobName);
} catch (IOException e) {
logger.warn(() -> new ParameterizedMessage("[{}][{}] failed to delete index blob [{}] during finalization",
snapshotId, shardId, blobName), e);
throw e;
}
}
final List<String> blobNames =
blobs.keySet().stream().filter(FsBlobContainer::isTempBlobName).collect(Collectors.toList());
try {
blobContainer.deleteBlobs(blobNames);
} catch (IOException e) {
logger.warn(() -> new ParameterizedMessage("[{}][{}] failed to delete index blobs [{}] during finalization",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

delete index blobs {} during finalization

snapshotId, shardId, blobNames), e);
throw e;
}

// If we deleted all snapshots, we don't need to create a new index file
Expand All @@ -1036,28 +1026,26 @@ protected void finalize(final List<SnapshotFiles> snapshots,
}

// Delete old index files
for (final String blobName : blobs.keySet()) {
if (blobName.startsWith(SNAPSHOT_INDEX_PREFIX)) {
try {
blobContainer.deleteBlobIgnoringIfNotExists(blobName);
} catch (IOException e) {
logger.warn(() -> new ParameterizedMessage("[{}][{}] failed to delete index blob [{}] during finalization",
snapshotId, shardId, blobName), e);
throw e;
}
}
final List<String> indexBlobs =
blobs.keySet().stream().filter(blob -> blob.startsWith(SNAPSHOT_INDEX_PREFIX)).collect(Collectors.toList());
try {
blobContainer.deleteBlobs(indexBlobs);
} catch (IOException e) {
logger.warn(() -> new ParameterizedMessage("[{}][{}] failed to delete index blobs [{}] during finalization",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

delete index blobs {} during finalization

snapshotId, shardId, indexBlobs), e);
throw e;
}

// Delete all blobs that don't exist in a snapshot
for (final String blobName : blobs.keySet()) {
if (blobName.startsWith(DATA_BLOB_PREFIX) && (updatedSnapshots.findNameFile(canonicalName(blobName)) == null)) {
try {
blobContainer.deleteBlobIgnoringIfNotExists(blobName);
} catch (IOException e) {
logger.warn(() -> new ParameterizedMessage("[{}][{}] failed to delete data blob [{}] during finalization",
snapshotId, shardId, blobName), e);
}
}
final List<String> staleBlobs = blobs.keySet().stream()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

perhaps call this orphanedBlobs?

.filter(blobName ->
blobName.startsWith(DATA_BLOB_PREFIX) && updatedSnapshots.findNameFile(canonicalName(blobName)) == null)
.collect(Collectors.toList());
try {
blobContainer.deleteBlobs(staleBlobs);
} catch (IOException e) {
logger.warn(() -> new ParameterizedMessage("[{}][{}] failed to delete data blobs [{}] during finalization",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

delete data blobs {} during finalization

snapshotId, shardId, staleBlobs), e);
}
} catch (IOException e) {
String message = "Failed to finalize " + reason + " with shard index [" + currentIndexGen + "]";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import java.nio.file.NoSuchFileException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import static org.elasticsearch.repositories.ESBlobStoreTestCase.randomBytes;
Expand Down Expand Up @@ -136,6 +137,23 @@ public void testDeleteBlob() throws IOException {
}
}

public void testDeleteBlobs() throws IOException {
try (BlobStore store = newBlobStore()) {
final List<String> blobNames = Arrays.asList("foobar", "barfoo");
final BlobContainer container = store.blobContainer(new BlobPath());
container.deleteBlobs(blobNames); // does not raise when blobs don't exist
byte[] data = randomBytes(randomIntBetween(10, scaledRandomIntBetween(1024, 1 << 16)));
final BytesArray bytesArray = new BytesArray(data);
for (String blobName : blobNames) {
writeBlob(container, blobName, bytesArray, randomBoolean());
}
assertEquals(container.listBlobs().size(), 2);
container.deleteBlobs(blobNames);
assertTrue(container.listBlobs().isEmpty());
container.deleteBlobs(blobNames); // does not raise when blobs don't exist
}
}

public void testDeleteBlobIgnoringIfNotExists() throws IOException {
try (BlobStore store = newBlobStore()) {
BlobPath blobPath = new BlobPath();
Expand Down