From 938ac290c9dda6479ca00ecb57bd3d47117211a3 Mon Sep 17 00:00:00 2001 From: Stu Arnett Date: Thu, 14 Apr 2022 11:47:44 -0500 Subject: [PATCH 1/2] * LFU added timeout option to LargeFileUpload.waitForCompletion --- .../com/emc/object/s3/LargeFileUploader.java | 11 ++++++ .../emc/object/s3/lfu/LargeFileUpload.java | 12 ++++++ .../emc/object/s3/LargeFileUploaderTest.java | 37 +++++++++++++++++++ 3 files changed, 60 insertions(+) diff --git a/src/main/java/com/emc/object/s3/LargeFileUploader.java b/src/main/java/com/emc/object/s3/LargeFileUploader.java index d1235f75..0d53f96c 100755 --- a/src/main/java/com/emc/object/s3/LargeFileUploader.java +++ b/src/main/java/com/emc/object/s3/LargeFileUploader.java @@ -239,6 +239,17 @@ public void waitForCompletion() { } } + @Override + public void waitForCompletion(long timeout, TimeUnit timeoutUnit) throws TimeoutException { + try { + future.get(timeout, timeoutUnit); + } catch (RuntimeException | TimeoutException e) { + throw e; + } catch (Exception e) { + throw new RuntimeException(e); + } + } + @Override public LargeFileUploaderResumeContext pause() { active.set(false); // all part uploads that have not started yet should effectively become no-ops diff --git a/src/main/java/com/emc/object/s3/lfu/LargeFileUpload.java b/src/main/java/com/emc/object/s3/lfu/LargeFileUpload.java index 24314a8c..c8d1a3fd 100644 --- a/src/main/java/com/emc/object/s3/lfu/LargeFileUpload.java +++ b/src/main/java/com/emc/object/s3/lfu/LargeFileUpload.java @@ -27,12 +27,24 @@ */ package com.emc.object.s3.lfu; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + public interface LargeFileUpload { /** * Blocks until the upload is complete. */ void waitForCompletion(); + /** + * Waits if necessary for at most the given time for the upload to complete. + * + * @param timeout the maximum time to wait + * @param timeoutUnit the time unit of the timeout argument + * @throws TimeoutException if the wait timed out + */ + void waitForCompletion(long timeout, TimeUnit timeoutUnit) throws TimeoutException; + /** * Pauses this upload and returns a ResumeContext that can be used to later resume it from the same source data. * This method first prevents any part uploads that have not yet started transferring from starting. diff --git a/src/test/java/com/emc/object/s3/LargeFileUploaderTest.java b/src/test/java/com/emc/object/s3/LargeFileUploaderTest.java index 8cd510dd..db654abf 100644 --- a/src/test/java/com/emc/object/s3/LargeFileUploaderTest.java +++ b/src/test/java/com/emc/object/s3/LargeFileUploaderTest.java @@ -52,6 +52,8 @@ import java.security.DigestInputStream; import java.security.MessageDigest; import java.util.*; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicLong; public class LargeFileUploaderTest extends AbstractS3ClientTest { @@ -618,6 +620,41 @@ public void testPauseResume() throws Exception { Assert.assertEquals(mockMultipartSource.getTotalSize() - (2 * partSize), pl.completed.get()); } + @Test + public void testAsyncWithTimeout() { + String key = "testLfuAsyncTimeout"; + int delayMs = 2000; + MockMultipartSource mockMultipartSource = new MockMultipartSource(); + // 1-second delay before yielding part streams + // this allows us to time the start of the first 2 part uploads accurately + mockMultipartSource.setPartDelayMs(delayMs); + LargeFileUploader lfu = new TestLargeFileUploader(client, getTestBucket(), key, mockMultipartSource) + .withPartSize(mockMultipartSource.getPartSize()).withMpuThreshold((int) mockMultipartSource.getTotalSize()) + .withThreads(2); // throttle part uploads to 2 threads + + LargeFileUpload upload = lfu.uploadAsync(); + + int timeoutCount = 0; + while (true) { + try { + upload.waitForCompletion(delayMs, TimeUnit.MILLISECONDS); + break; + } catch (TimeoutException e) { + timeoutCount++; + } + } + + // based on part count, thread count and part delay, we expect (ceiling(partCount / threadCount)) timeouts to occur + long partCount = (mockMultipartSource.getTotalSize() - 1) / mockMultipartSource.getPartSize() + 1; + Assert.assertEquals((partCount - 1) / 2 + 1, timeoutCount); + + // upload should be done + GetObjectRequest request = new GetObjectRequest<>(getTestBucket(), key); + GetObjectResult result = client.getObject(request, byte[].class); + Assert.assertArrayEquals(mockMultipartSource.getTotalBytes(), result.getObject()); + Assert.assertEquals(mockMultipartSource.getMpuETag(), result.getObjectMetadata().getETag()); + } + @Test public void testAbort() throws Exception { String bucket = getTestBucket(); From ae84bebb87fea53536ea671bd038d0577ebfaa20 Mon Sep 17 00:00:00 2001 From: Stu Arnett Date: Thu, 14 Apr 2022 12:11:52 -0500 Subject: [PATCH 2/2] * modified LFU tests to be more flexible with time-sensitivity --- .../java/com/emc/object/s3/LargeFileUploaderTest.java | 10 +++------- 1 file changed, 3 insertions(+), 7 deletions(-) diff --git a/src/test/java/com/emc/object/s3/LargeFileUploaderTest.java b/src/test/java/com/emc/object/s3/LargeFileUploaderTest.java index db654abf..2c17ae0f 100644 --- a/src/test/java/com/emc/object/s3/LargeFileUploaderTest.java +++ b/src/test/java/com/emc/object/s3/LargeFileUploaderTest.java @@ -644,9 +644,10 @@ public void testAsyncWithTimeout() { } } - // based on part count, thread count and part delay, we expect (ceiling(partCount / threadCount)) timeouts to occur + // based on part count, thread count and part delay, we expect at least (ceiling(partCount / threadCount)) timeouts to occur long partCount = (mockMultipartSource.getTotalSize() - 1) / mockMultipartSource.getPartSize() + 1; - Assert.assertEquals((partCount - 1) / 2 + 1, timeoutCount); + long expectedTimeouts = (partCount - 1) / 2 + 1; + Assert.assertTrue(timeoutCount >= expectedTimeouts); // upload should be done GetObjectRequest request = new GetObjectRequest<>(getTestBucket(), key); @@ -678,12 +679,7 @@ public void testAbort() throws Exception { Assert.assertEquals(1, client.listMultipartUploads(getTestBucket()).getUploads().size()); // abort it - long abortStart = System.currentTimeMillis(); upload.abort(); - long abortDone = System.currentTimeMillis(); - - // this should be immediate (< 500ms) - Assert.assertTrue(abortDone - abortStart < 500); // make sure resume context is cleared Assert.assertNull(lfu.getResumeContext().getUploadId());