Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 30 additions & 2 deletions src/main/java/com/emc/object/s3/LargeFileUploader.java
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ public static String getMpuETag(List<MultipartPartETag> partETags) {

private LargeFileUploaderResumeContext resumeContext;
private Map<Integer, MultipartPartETag> existingMpuParts = null;
private boolean abortMpuOnFailure = true;

/**
* Creates a new LargeFileUpload instance using the specified <code>s3Client</code> to upload
Expand Down Expand Up @@ -275,7 +276,7 @@ public void doSinglePut() {
}

/*
* get a map of exising MPU parts from which we can resume an MPU. we can only resume an MPU if the existing
* get a map of existing MPU parts from which we can resume an MPU. we can only resume an MPU if the existing
* part sizes and count are exactly the same as configured in this LFU instance
*/
private Map<Integer, MultipartPartETag> listUploadPartsForResume(String uploadId) {
Expand Down Expand Up @@ -376,7 +377,11 @@ public void doMultipartUpload() {
// abort MP upload
// TODO: are there conditions where the upload should *not* be aborted?
try {
abortMpu(resumeContext.getUploadId());
if (abortMpuOnFailure) {
abortMpu(resumeContext.getUploadId());
resumeContext.setUploadId(null);
resumeContext.setUploadedParts(null);
}
} catch (Throwable t) {
log.warn("could not abort upload after failure", t);
}
Expand Down Expand Up @@ -650,6 +655,21 @@ public void setResumeContext(LargeFileUploaderResumeContext resumeContext) {
this.resumeContext = resumeContext;
}

/**
 * Returns whether the multipart upload will be aborted (and the resume context cleared) after a failure.
 *
 * @see #setAbortMpuOnFailure(boolean)
 */
public boolean isAbortMpuOnFailure() {
return abortMpuOnFailure;
}

/**
 * Specifies whether the MPU should be aborted when any failure occurs during upload.
 * If a failure occurs and abortMpuOnFailure is true, the MPU is aborted and the resumeContext is cleared
 * (uploadId and uploadedParts are set to null).
 * If abortMpuOnFailure is false, the MPU is left intact, and the resumeContext may hold a list of the
 * successfully uploaded parts, so the upload can be resumed later.
 */
public void setAbortMpuOnFailure(boolean abortMpuOnFailure) {
this.abortMpuOnFailure = abortMpuOnFailure;
}

public LargeFileUploader withObjectMetadata(S3ObjectMetadata objectMetadata) {
setObjectMetadata(objectMetadata);
return this;
Expand Down Expand Up @@ -703,6 +723,14 @@ public LargeFileUploader withResumeContext(LargeFileUploaderResumeContext resume
return this;
}

/**
 * Fluent (builder-style) variant of {@link #setAbortMpuOnFailure(boolean)}.
 *
 * @see #setAbortMpuOnFailure(boolean)
 */
public LargeFileUploader withAbortMpuOnFailure(boolean abortMpuOnFailure) {
setAbortMpuOnFailure(abortMpuOnFailure);
return this;
}

private class UploadPartTask implements Callable<MultipartPartETag> {
private final String uploadId;
private final int partNumber;
Expand Down
205 changes: 198 additions & 7 deletions src/test/java/com/emc/object/s3/LargeFileUploaderTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -302,8 +302,6 @@ public void testResumeMpuFromStream() {
new Random().nextBytes(data);

// init MPU
//Generate last modified time to be 5s ahead of upload for better tolerance of slight time drift
Date lastModifiedTime = new Date(System.currentTimeMillis() - 5000);
String uploadId = client.initiateMultipartUpload(bucket, key);
int partNum = 1;
for (long offset = 0; offset < data.length; offset += partSize) {
Expand Down Expand Up @@ -392,11 +390,204 @@ public void testLargeFileMultiPartSource() {
Assert.assertEquals(mockMultipartSource.getMpuETag(), client.getObjectMetadata(getTestBucket(), key).getETag());
}

// TODO: add following test cases:
// + testResumeWithPartList (only the specified parts should be skipped - can compare bytes sent with a progress listener)
// + testResumeWithPartListAndBadPart (use a part list, but one of the ETags is wrong - should abort the upload and throw an exception)
// + testResumeWithBadPart (make one of the existing parts wrong [same size, wrong data] - should overwrite that one part and skip the rest - can use progress listener to compare bytes sent)
// + testResumeWithBadPartAndNoVerify (same as above, but set verifyPartsFoundInTarget to false - should end up with a corrupt object, but all existing parts should be skipped)
// Resuming with an explicit part list should skip exactly the listed parts;
// a progress listener is used to verify the bytes actually sent.
@Test
public void testResumeWithPartList() {
    String bucket = getTestBucket();
    String key = "mpu-resume-with-partlist";
    MockMultipartSource mockMultipartSource = new MockMultipartSource();
    final long partSize = mockMultipartSource.getPartSize();
    int preUploadedParts = 2;
    Map<Integer, MultipartPartETag> uploadedParts = new HashMap<>();

    final AtomicLong progressCompleted = new AtomicLong();
    final AtomicLong progressTotal = new AtomicLong();
    final AtomicLong bytesSent = new AtomicLong();
    ProgressListener listener = new ProgressListener() {
        @Override
        public void progress(long c, long t) {
            progressCompleted.set(c);
            progressTotal.set(t);
        }

        @Override
        public void transferred(long size) {
            bytesSent.addAndGet(size);
        }
    };

    // start a fresh MPU
    String uploadId = client.initiateMultipartUpload(bucket, key);

    // pre-upload the first parts, recording their ETags for the resume context
    for (int partNum = 1; partNum <= preUploadedParts; partNum++) {
        uploadedParts.put(partNum, client.uploadPart(new UploadPartRequest(bucket, key, uploadId, partNum,
                mockMultipartSource.getPartDataStream((partNum - 1) * partSize, partSize))));
    }

    // resume the upload with the part list - the listed parts should be skipped
    LargeFileUploaderResumeContext resumeContext = new LargeFileUploaderResumeContext().withUploadId(uploadId)
            .withUploadedParts(uploadedParts);
    LargeFileUploader lfu = new LargeFileUploader(client, bucket, key, mockMultipartSource)
            .withPartSize(partSize).withMpuThreshold(mockMultipartSource.getTotalSize()).withResumeContext(resumeContext)
            .withProgressListener(listener);
    lfu.doMultipartUpload();

    // the completed object should match the full source exactly
    S3ObjectMetadata om = client.getObjectMetadata(bucket, key);
    Assert.assertEquals(mockMultipartSource.getTotalSize(), (long) om.getContentLength());
    Assert.assertEquals(mockMultipartSource.getMpuETag(), om.getETag());

    // only the parts NOT present in the resume context should have been transferred
    long expectedBytes = mockMultipartSource.getTotalSize() - partSize * preUploadedParts;
    Assert.assertEquals(expectedBytes, lfu.getBytesTransferred());
    Assert.assertEquals(expectedBytes, progressCompleted.get());
    Assert.assertEquals(mockMultipartSource.getTotalSize(), progressTotal.get());
}


// Resuming with a part list containing a wrong ETag should fail the complete call,
// abort the MPU, and propagate the error to the caller.
@Test
public void testResumeWithPartListAndBadPart() {
String bucket = getTestBucket();
String key = "mpu-resume-with-bad-part-etag";
MockMultipartSource mockMultipartSource = new MockMultipartSource();
final long partSize = mockMultipartSource.getPartSize();
int totalPartsToResume = 2;
Map<Integer, MultipartPartETag> uploadedParts = new HashMap<>();

// init MPU
String uploadId = client.initiateMultipartUpload(bucket, key);

// upload first 2 parts, recording the real ETags
for (int partNum = 1; partNum <= totalPartsToResume; partNum++) {
UploadPartRequest request = new UploadPartRequest(bucket, key, uploadId, partNum,
mockMultipartSource.getPartDataStream((partNum - 1) * partSize, partSize));
MultipartPartETag multipartPartETag = client.uploadPart(request);
uploadedParts.put(partNum, multipartPartETag);
}

// create a wrong MultipartPartETag by reversing the ETag of part 1
MultipartPartETag badMultipartPartETag = new MultipartPartETag(1, new StringBuilder(uploadedParts.get(1).getETag()).reverse().toString());
uploadedParts.replace(1, badMultipartPartETag);

// resume with the corrupted part list - complete should be rejected by the server
LargeFileUploaderResumeContext resumeContext = new LargeFileUploaderResumeContext().withUploadId(uploadId)
.withUploadedParts(uploadedParts);
LargeFileUploader lfu = new LargeFileUploader(client, bucket, key, mockMultipartSource)
.withPartSize(partSize).withMpuThreshold(mockMultipartSource.getTotalSize()).withResumeContext(resumeContext);
try {
lfu.doMultipartUpload();
Assert.fail("one of the ETags in uploadedParts is wrong - should abort the upload and throw an exception");
} catch (S3Exception e) {
Assert.assertEquals(400, e.getHttpCode());
Assert.assertEquals("InvalidPart", e.getErrorCode());
}
// after the failure, the upload should have been aborted, so the uploadId must be gone
try {
client.listParts(bucket, key, uploadId);
Assert.fail("UploadId should not exist because MPU is aborted");
} catch (S3Exception e) {
Assert.assertEquals(404, e.getHttpCode());
Assert.assertEquals("NoSuchUpload", e.getErrorCode());
}
}
Comment thread
twincitiesguy marked this conversation as resolved.

// Resuming (with target-part verification enabled) when an existing part has wrong
// data should detect the mismatch, abort the MPU, and throw.
@Test
public void testResumeWithBadPart() {
String bucket = getTestBucket();
String key = "mpu-resume-with-bad-part-data";
MockMultipartSource mockMultipartSource = new MockMultipartSource();
final long partSize = mockMultipartSource.getPartSize();
int totalPartsToResume = 2;

// init MPU
String uploadId = client.initiateMultipartUpload(bucket, key);

// upload first 2 parts
for (int partNum = 1; partNum <= totalPartsToResume; partNum++) {
UploadPartRequest request = new UploadPartRequest(bucket, key, uploadId, partNum,
mockMultipartSource.getPartDataStream((partNum - 1) * partSize, partSize));
client.uploadPart(request);
if (partNum == totalPartsToResume) {
// simulate an uploaded part got wrong data (same size, wrong bytes) by re-uploading it with random content
byte[] data = new byte[(int)partSize];
new Random().nextBytes(data);
request = new UploadPartRequest(bucket, key, uploadId, partNum, new ByteArrayInputStream(data));
client.uploadPart(request);
}
}

// resume without an explicit part list - the uploader lists and verifies parts in the target
LargeFileUploaderResumeContext resumeContext = new LargeFileUploaderResumeContext().withUploadId(uploadId);
LargeFileUploader lfu = new LargeFileUploader(client, bucket, key, mockMultipartSource).withPartSize(partSize)
.withMpuThreshold(mockMultipartSource.getTotalSize()).withResumeContext(resumeContext);
try {
lfu.doMultipartUpload();
Assert.fail("one of the data in uploadedParts is wrong - should abort the upload and throw an exception");
} catch (S3Exception e) {
Assert.assertEquals(400, e.getHttpCode());
Assert.assertEquals("InvalidPart", e.getErrorCode());
}
// after the failure, the upload should have been aborted, so the uploadId must be gone
try {
client.listParts(bucket, key, uploadId);
Assert.fail("UploadId should not exist because MPU is aborted");
} catch (S3Exception e) {
Assert.assertEquals(404, e.getHttpCode());
Assert.assertEquals("NoSuchUpload", e.getErrorCode());
}
}

// Resuming with verifyPartsFoundInTarget=false should trust and skip all existing parts,
// even a corrupt one - producing a complete (but corrupt) object.
// Fix: removed the unused local 'uploadedParts' map (this test resumes from the target
// part listing, not from an explicit part list).
@Test
public void testResumeWithBadPartAndNoVerify() {
    String bucket = getTestBucket();
    String key = "mpu-resume-with-bad-part-data-no-verify";
    MockMultipartSource mockMultipartSource = new MockMultipartSource();
    final long partSize = mockMultipartSource.getPartSize();
    int totalPartsToResume = 2;

    final AtomicLong completed = new AtomicLong();
    final AtomicLong total = new AtomicLong();
    final AtomicLong transferred = new AtomicLong();
    ProgressListener pl = new ProgressListener() {
        @Override
        public void progress(long c, long t) {
            completed.set(c);
            total.set(t);
        }

        @Override
        public void transferred(long size) {
            transferred.addAndGet(size);
        }
    };

    // init MPU
    String uploadId = client.initiateMultipartUpload(bucket, key);

    // upload first 2 parts
    for (int partNum = 1; partNum <= totalPartsToResume; partNum++) {
        UploadPartRequest request = new UploadPartRequest(bucket, key, uploadId, partNum,
                mockMultipartSource.getPartDataStream((partNum - 1) * partSize, partSize));
        client.uploadPart(request);
        if (partNum == totalPartsToResume) {
            // simulate an uploaded part got wrong data (same size, wrong bytes) by re-uploading it with random content
            byte[] data = new byte[(int) partSize];
            new Random().nextBytes(data);
            request = new UploadPartRequest(bucket, key, uploadId, partNum, new ByteArrayInputStream(data));
            client.uploadPart(request);
        }
    }

    // resume with part verification disabled - existing (corrupt) parts are trusted and skipped
    LargeFileUploaderResumeContext resumeContext = new LargeFileUploaderResumeContext().withUploadId(uploadId)
            .withVerifyPartsFoundInTarget(false);
    LargeFileUploader lfu = new LargeFileUploader(client, bucket, key, mockMultipartSource)
            .withPartSize(partSize).withMpuThreshold(mockMultipartSource.getTotalSize())
            .withResumeContext(resumeContext).withProgressListener(pl);
    lfu.doMultipartUpload();

    // the object completes with the right size, but its ETag differs (corrupt content)
    S3ObjectMetadata om = client.getObjectMetadata(bucket, key);
    Assert.assertEquals(mockMultipartSource.getTotalSize(), (long) om.getContentLength());
    Assert.assertNotEquals(mockMultipartSource.getMpuETag(), om.getETag());

    // all existing parts (including the bad one) were skipped, so only the rest was transferred
    Assert.assertEquals(mockMultipartSource.getTotalSize() - partSize * totalPartsToResume, lfu.getBytesTransferred());
    Assert.assertEquals(mockMultipartSource.getTotalSize() - partSize * totalPartsToResume, completed.get());
    Assert.assertEquals(mockMultipartSource.getTotalSize(), total.get());
}

static class NullStream extends OutputStream {
@Override
Expand Down