From b6a09657f0684f1f3c6d2860b37a6f533169d6e9 Mon Sep 17 00:00:00 2001 From: Eric Ren Date: Wed, 6 Apr 2022 10:44:15 +0100 Subject: [PATCH 1/3] Added new general option to abort MPU on any failure (default true) --- .../com/emc/object/s3/LargeFileUploader.java | 22 +++++++++++++++++-- 1 file changed, 20 insertions(+), 2 deletions(-) diff --git a/src/main/java/com/emc/object/s3/LargeFileUploader.java b/src/main/java/com/emc/object/s3/LargeFileUploader.java index ad2f8e69..18da65ae 100755 --- a/src/main/java/com/emc/object/s3/LargeFileUploader.java +++ b/src/main/java/com/emc/object/s3/LargeFileUploader.java @@ -101,6 +101,7 @@ public static String getMpuETag(List partETags) { private LargeFileUploaderResumeContext resumeContext; private Map existingMpuParts = null; + private boolean abortMpuOnFailure = true; /** * Creates a new LargeFileUpload instance using the specified s3Client to upload @@ -275,7 +276,7 @@ public void doSinglePut() { } /* - * get a map of exising MPU parts from which we can resume an MPU. we can only resume an MPU if the existing + * get a map of existing MPU parts from which we can resume an MPU. we can only resume an MPU if the existing * part sizes and count are exactly the same as configured in this LFU instance */ private Map listUploadPartsForResume(String uploadId) { @@ -376,7 +377,11 @@ public void doMultipartUpload() { // abort MP upload // TODO: are there conditions where the upload should *not* be aborted? 
try { - abortMpu(resumeContext.getUploadId()); + if (abortMpuOnFailure) { + abortMpu(resumeContext.getUploadId()); + resumeContext.setUploadId(null); + resumeContext.setUploadedParts(null); + } } catch (Throwable t) { log.warn("could not abort upload after failure", t); } @@ -650,6 +655,14 @@ public void setResumeContext(LargeFileUploaderResumeContext resumeContext) { this.resumeContext = resumeContext; } + public boolean isAbortMpuOnFailure() { + return abortMpuOnFailure; + } + + public void setAbortMpuOnFailure(boolean abortMpuOnFailure) { + this.abortMpuOnFailure = abortMpuOnFailure; + } + public LargeFileUploader withObjectMetadata(S3ObjectMetadata objectMetadata) { setObjectMetadata(objectMetadata); return this; @@ -703,6 +716,11 @@ public LargeFileUploader withResumeContext(LargeFileUploaderResumeContext resume return this; } + public LargeFileUploader AbortMpuOnFailure(boolean abortMpuOnFailure) { + setAbortMpuOnFailure(abortMpuOnFailure); + return this; + } + private class UploadPartTask implements Callable { private final String uploadId; private final int partNumber; From a2738d3153a928847e398a8d22a74dd1c133d783 Mon Sep 17 00:00:00 2001 From: Eric Ren Date: Thu, 7 Apr 2022 17:56:42 +0100 Subject: [PATCH 2/3] Added more test cases for MPU Resume. 
--- .../com/emc/object/s3/LargeFileUploader.java | 7 + .../emc/object/s3/LargeFileUploaderTest.java | 198 +++++++++++++++++- 2 files changed, 198 insertions(+), 7 deletions(-) diff --git a/src/main/java/com/emc/object/s3/LargeFileUploader.java b/src/main/java/com/emc/object/s3/LargeFileUploader.java index 18da65ae..f6c50dc8 100755 --- a/src/main/java/com/emc/object/s3/LargeFileUploader.java +++ b/src/main/java/com/emc/object/s3/LargeFileUploader.java @@ -659,6 +659,13 @@ public boolean isAbortMpuOnFailure() { return abortMpuOnFailure; } + /** + * Specifies whether the MPU is aborted on any failure. + * If a failure occurs and abortMpuOnFailure is true, then MPU is aborted and the resumeContext is cleared + * (uploadId and uploadedParts are set to null). + * If abortMpuOnFailure is false, MPU is left intact and the resumeContext could have a list of successfully + * uploaded parts. + */ public void setAbortMpuOnFailure(boolean abortMpuOnFailure) { this.abortMpuOnFailure = abortMpuOnFailure; } diff --git a/src/test/java/com/emc/object/s3/LargeFileUploaderTest.java b/src/test/java/com/emc/object/s3/LargeFileUploaderTest.java index 4384daa2..9bdbff25 100644 --- a/src/test/java/com/emc/object/s3/LargeFileUploaderTest.java +++ b/src/test/java/com/emc/object/s3/LargeFileUploaderTest.java @@ -302,8 +302,6 @@ public void testResumeMpuFromStream() { new Random().nextBytes(data); // init MPU - //Generate last modified time to be 5s ahead of upload for better tolerance of slight time drift - Date lastModifiedTime = new Date(System.currentTimeMillis() - 5000); String uploadId = client.initiateMultipartUpload(bucket, key); int partNum = 1; for (long offset = 0; offset < data.length; offset += partSize) { @@ -392,11 +390,197 @@ public void testLargeFileMultiPartSource() { Assert.assertEquals(mockMultipartSource.getMpuETag(), client.getObjectMetadata(getTestBucket(), key).getETag()); } - // TODO: add following test cases: - // + testResumeWithPartList (only the specified parts should be 
skipped - can compare bytes sent with a progress listener) - // + testResumeWithPartListAndBadPart (use a part list, but one of the ETags is wrong - should abort the upload and throw an exception) - // + testResumeWithBadPart (make one of the existing parts wrong [same size, wrong data] - should overwrite that one part and skip the rest - can use progress listener to compare bytes sent) - // + testResumeWithBadPartAndNoVerify (same as above, but set verifyPartsFoundInTarget to false - should end up with a corrupt object, but all existing parts should be skipped) + @Test + public void testResumeWithPartList() { + String bucket = getTestBucket(); + String key = "mpu-resume-with-partlist"; + MockMultipartSource mockMultipartSource = new MockMultipartSource(); + final long partSize = mockMultipartSource.getPartSize(); + int totalPartsToResume = 2; + Map uploadedParts = new HashMap<>(); + + final AtomicLong completed = new AtomicLong(); + final AtomicLong total = new AtomicLong(); + final AtomicLong transferred = new AtomicLong(); + ProgressListener pl = new ProgressListener() { + @Override + public void progress(long c, long t) { + completed.set(c); + total.set(t); + } + + @Override + public void transferred(long size) { + transferred.addAndGet(size); + } + }; + + // init MPU + String uploadId = client.initiateMultipartUpload(bucket, key); + + // upload first 2 parts + for (int partNum = 1; partNum <= totalPartsToResume; partNum++) { + UploadPartRequest request = new UploadPartRequest(bucket, key, uploadId, partNum, + mockMultipartSource.getPartDataStream((partNum - 1) * partSize, partSize)); + MultipartPartETag multipartPartETag = client.uploadPart(request); + uploadedParts.put(partNum, multipartPartETag); + } + + LargeFileUploaderResumeContext resumeContext = new LargeFileUploaderResumeContext().withUploadId(uploadId) + .withUploadedParts(uploadedParts); + LargeFileUploader lfu = new LargeFileUploader(client, bucket, key, mockMultipartSource) + 
.withPartSize(partSize).withMpuThreshold(mockMultipartSource.getTotalSize()).withResumeContext(resumeContext) + .withProgressListener(pl); + lfu.doMultipartUpload(); + + S3ObjectMetadata om = client.getObjectMetadata(bucket, key); + Assert.assertEquals(mockMultipartSource.getTotalSize(), (long)om.getContentLength()); + Assert.assertEquals(mockMultipartSource.getMpuETag(), om.getETag()); + + Assert.assertEquals(mockMultipartSource.getTotalSize() - partSize * totalPartsToResume, lfu.getBytesTransferred()); + Assert.assertEquals(mockMultipartSource.getTotalSize() - partSize * totalPartsToResume, completed.get()); + Assert.assertEquals(mockMultipartSource.getTotalSize(), total.get()); + } + + + @Test + public void testResumeWithPartListAndBadPart() { + String bucket = getTestBucket(); + String key = "mpu-resume-with-bad-part-etag"; + MockMultipartSource mockMultipartSource = new MockMultipartSource(); + final long partSize = mockMultipartSource.getPartSize(); + int totalPartsToResume = 2; + Map uploadedParts = new HashMap<>(); + + // init MPU + String uploadId = client.initiateMultipartUpload(bucket, key); + + // upload first 2 parts + for (int partNum = 1; partNum <= totalPartsToResume; partNum++) { + UploadPartRequest request = new UploadPartRequest(bucket, key, uploadId, partNum, + mockMultipartSource.getPartDataStream((partNum - 1) * partSize, partSize)); + MultipartPartETag multipartPartETag = client.uploadPart(request); + uploadedParts.put(partNum, multipartPartETag); + } + + // create a wrong MultipartPartETag by reversing the ETag + MultipartPartETag badMultipartPartETag = new MultipartPartETag(1, new StringBuilder(uploadedParts.get(1).getETag()).reverse().toString()); + uploadedParts.replace(1, badMultipartPartETag); + + LargeFileUploaderResumeContext resumeContext = new LargeFileUploaderResumeContext().withUploadId(uploadId) + .withUploadedParts(uploadedParts); + LargeFileUploader lfu = new LargeFileUploader(client, bucket, key, mockMultipartSource) + 
.withPartSize(partSize).withMpuThreshold(mockMultipartSource.getTotalSize()).withResumeContext(resumeContext); + try { + lfu.doMultipartUpload(); + Assert.fail("one of the ETags in uploadedParts is wrong - should abort the upload and throw an exception"); + } catch (S3Exception e) { + Assert.assertEquals(400, e.getHttpCode()); + Assert.assertEquals("InvalidPart", e.getErrorCode()); + } + } + + @Test + public void testResumeWithBadPart() { + String bucket = getTestBucket(); + String key = "mpu-resume-with-bad-part-data"; + MockMultipartSource mockMultipartSource = new MockMultipartSource(); + final long partSize = mockMultipartSource.getPartSize(); + int totalPartsToResume = 2; + + // init MPU + String uploadId = client.initiateMultipartUpload(bucket, key); + + // upload first 2 parts + for (int partNum = 1; partNum <= totalPartsToResume; partNum++) { + UploadPartRequest request = new UploadPartRequest(bucket, key, uploadId, partNum, + mockMultipartSource.getPartDataStream((partNum - 1) * partSize, partSize)); + client.uploadPart(request); + if (partNum == totalPartsToResume) { + // simulate an uploaded part got wrong data + byte[] data = new byte[(int)partSize]; + new Random().nextBytes(data); + request = new UploadPartRequest(bucket, key, uploadId, partNum, new ByteArrayInputStream(data)); + client.uploadPart(request); + } + } + + LargeFileUploaderResumeContext resumeContext = new LargeFileUploaderResumeContext().withUploadId(uploadId); + LargeFileUploader lfu = new LargeFileUploader(client, bucket, key, mockMultipartSource).withPartSize(partSize) + .withMpuThreshold(mockMultipartSource.getTotalSize()).withResumeContext(resumeContext); + try { + lfu.doMultipartUpload(); + Assert.fail("one of the data in uploadedParts is wrong - should abort the upload and throw an exception"); + } catch (S3Exception e) { + Assert.assertEquals(400, e.getHttpCode()); + Assert.assertEquals("InvalidPart", e.getErrorCode()); + } + try { + client.listParts(bucket, key, uploadId); + 
Assert.fail("UploadId should not exist because MPU is aborted"); + } catch (S3Exception e) { + Assert.assertEquals(404, e.getHttpCode()); + Assert.assertEquals("NoSuchUpload", e.getErrorCode()); + } + } + + @Test + public void testResumeWithBadPartAndNoVerify() { + String bucket = getTestBucket(); + String key = "mpu-resume-with-bad-part-data-no-verify"; + MockMultipartSource mockMultipartSource = new MockMultipartSource(); + final long partSize = mockMultipartSource.getPartSize(); + int totalPartsToResume = 2; + Map uploadedParts = new HashMap<>(); + + final AtomicLong completed = new AtomicLong(); + final AtomicLong total = new AtomicLong(); + final AtomicLong transferred = new AtomicLong(); + ProgressListener pl = new ProgressListener() { + @Override + public void progress(long c, long t) { + completed.set(c); + total.set(t); + } + + @Override + public void transferred(long size) { + transferred.addAndGet(size); + } + }; + + // init MPU + String uploadId = client.initiateMultipartUpload(bucket, key); + + // upload first 2 parts + for (int partNum = 1; partNum <= totalPartsToResume; partNum++) { + UploadPartRequest request = new UploadPartRequest(bucket, key, uploadId, partNum, + mockMultipartSource.getPartDataStream((partNum - 1) * partSize, partSize)); + client.uploadPart(request); + if (partNum == totalPartsToResume) { + // simulate an uploaded part got wrong data + byte[] data = new byte[(int)partSize]; + new Random().nextBytes(data); + request = new UploadPartRequest(bucket, key, uploadId, partNum, new ByteArrayInputStream(data)); + client.uploadPart(request); + } + } + + LargeFileUploaderResumeContext resumeContext = new LargeFileUploaderResumeContext().withUploadId(uploadId) + .withVerifyPartsFoundInTarget(false); + LargeFileUploader lfu = new LargeFileUploader(client, bucket, key, mockMultipartSource) + .withPartSize(partSize).withMpuThreshold(mockMultipartSource.getTotalSize()) + .withResumeContext(resumeContext).withProgressListener(pl); + 
lfu.doMultipartUpload(); + + S3ObjectMetadata om = client.getObjectMetadata(bucket, key); + Assert.assertEquals(mockMultipartSource.getTotalSize(), (long)om.getContentLength()); + Assert.assertNotEquals(mockMultipartSource.getMpuETag(), om.getETag()); + + Assert.assertEquals(mockMultipartSource.getTotalSize() - partSize * totalPartsToResume, lfu.getBytesTransferred()); + Assert.assertEquals(mockMultipartSource.getTotalSize() - partSize * totalPartsToResume, completed.get()); + Assert.assertEquals(mockMultipartSource.getTotalSize(), total.get()); + } static class NullStream extends OutputStream { @Override From 38b8ea4a7fc54d0641a9e8a51b48a678f47d4a01 Mon Sep 17 00:00:00 2001 From: Stu Arnett Date: Fri, 8 Apr 2022 10:15:35 -0500 Subject: [PATCH 3/3] LFU: changes from PR --- src/main/java/com/emc/object/s3/LargeFileUploader.java | 5 ++++- src/test/java/com/emc/object/s3/LargeFileUploaderTest.java | 7 +++++++ 2 files changed, 11 insertions(+), 1 deletion(-) diff --git a/src/main/java/com/emc/object/s3/LargeFileUploader.java b/src/main/java/com/emc/object/s3/LargeFileUploader.java index f6c50dc8..f8ae5878 100755 --- a/src/main/java/com/emc/object/s3/LargeFileUploader.java +++ b/src/main/java/com/emc/object/s3/LargeFileUploader.java @@ -723,7 +723,10 @@ public LargeFileUploader withResumeContext(LargeFileUploaderResumeContext resume return this; } - public LargeFileUploader AbortMpuOnFailure(boolean abortMpuOnFailure) { + /** + * @see #setAbortMpuOnFailure(boolean) + */ + public LargeFileUploader withAbortMpuOnFailure(boolean abortMpuOnFailure) { setAbortMpuOnFailure(abortMpuOnFailure); return this; } diff --git a/src/test/java/com/emc/object/s3/LargeFileUploaderTest.java b/src/test/java/com/emc/object/s3/LargeFileUploaderTest.java index 9bdbff25..0203929e 100644 --- a/src/test/java/com/emc/object/s3/LargeFileUploaderTest.java +++ b/src/test/java/com/emc/object/s3/LargeFileUploaderTest.java @@ -478,6 +478,13 @@ public void testResumeWithPartListAndBadPart() { 
Assert.assertEquals(400, e.getHttpCode()); Assert.assertEquals("InvalidPart", e.getErrorCode()); } + try { + client.listParts(bucket, key, uploadId); + Assert.fail("UploadId should not exist because MPU is aborted"); + } catch (S3Exception e) { + Assert.assertEquals(404, e.getHttpCode()); + Assert.assertEquals("NoSuchUpload", e.getErrorCode()); + } } @Test