Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions src/main/java/com/emc/object/s3/LargeFileUploader.java
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,17 @@ public void waitForCompletion() {
}
}

@Override
public void waitForCompletion(long timeout, TimeUnit timeoutUnit) throws TimeoutException {
try {
future.get(timeout, timeoutUnit);
} catch (RuntimeException | TimeoutException e) {
throw e;
} catch (Exception e) {
throw new RuntimeException(e);
}
}

@Override
public LargeFileUploaderResumeContext pause() {
active.set(false); // all part uploads that have not started yet should effectively become no-ops
Expand Down
12 changes: 12 additions & 0 deletions src/main/java/com/emc/object/s3/lfu/LargeFileUpload.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,24 @@
*/
package com.emc.object.s3.lfu;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public interface LargeFileUpload {
/**
* Blocks until the upload is complete.
*/
void waitForCompletion();

/**
* Waits if necessary for at most the given time for the upload to complete.
*
* @param timeout the maximum time to wait
* @param timeoutUnit the time unit of the timeout argument
* @throws TimeoutException if the wait timed out
*/
void waitForCompletion(long timeout, TimeUnit timeoutUnit) throws TimeoutException;

/**
* Pauses this upload and returns a ResumeContext that can be used to later resume it from the same source data.
* This method first prevents any part uploads that have not yet started transferring from starting.
Expand Down
43 changes: 38 additions & 5 deletions src/test/java/com/emc/object/s3/LargeFileUploaderTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@
import java.security.DigestInputStream;
import java.security.MessageDigest;
import java.util.*;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;

public class LargeFileUploaderTest extends AbstractS3ClientTest {
Expand Down Expand Up @@ -618,6 +620,42 @@ public void testPauseResume() throws Exception {
Assert.assertEquals(mockMultipartSource.getTotalSize() - (2 * partSize), pl.completed.get());
}

@Test
public void testAsyncWithTimeout() {
String key = "testLfuAsyncTimeout";
int delayMs = 2000;
MockMultipartSource mockMultipartSource = new MockMultipartSource();
// 1-second delay before yielding part streams
// this allows us to time the start of the first 2 part uploads accurately
mockMultipartSource.setPartDelayMs(delayMs);
LargeFileUploader lfu = new TestLargeFileUploader(client, getTestBucket(), key, mockMultipartSource)
.withPartSize(mockMultipartSource.getPartSize()).withMpuThreshold((int) mockMultipartSource.getTotalSize())
.withThreads(2); // throttle part uploads to 2 threads

LargeFileUpload upload = lfu.uploadAsync();

int timeoutCount = 0;
while (true) {
try {
upload.waitForCompletion(delayMs, TimeUnit.MILLISECONDS);
break;
} catch (TimeoutException e) {
timeoutCount++;
}
}

// based on part count, thread count and part delay, we expect at least (ceiling(partCount / threadCount)) timeouts to occur
long partCount = (mockMultipartSource.getTotalSize() - 1) / mockMultipartSource.getPartSize() + 1;
long expectedTimeouts = (partCount - 1) / 2 + 1;
Assert.assertTrue(timeoutCount >= expectedTimeouts);

// upload should be done
GetObjectRequest<?> request = new GetObjectRequest<>(getTestBucket(), key);
GetObjectResult<byte[]> result = client.getObject(request, byte[].class);
Assert.assertArrayEquals(mockMultipartSource.getTotalBytes(), result.getObject());
Assert.assertEquals(mockMultipartSource.getMpuETag(), result.getObjectMetadata().getETag());
}

@Test
public void testAbort() throws Exception {
String bucket = getTestBucket();
Expand All @@ -641,12 +679,7 @@ public void testAbort() throws Exception {
Assert.assertEquals(1, client.listMultipartUploads(getTestBucket()).getUploads().size());

// abort it
long abortStart = System.currentTimeMillis();
upload.abort();
long abortDone = System.currentTimeMillis();

// this should be immediate (< 500ms)
Assert.assertTrue(abortDone - abortStart < 500);

// make sure resume context is cleared
Assert.assertNull(lfu.getResumeContext().getUploadId());
Expand Down