diff --git a/README.md b/README.md index a8fe754a7..7a1184f3f 100644 --- a/README.md +++ b/README.md @@ -40,7 +40,7 @@ The SDK can also be included directly into a Maven or Gradle build. There is als com.spectralogic.ds3 ds3-sdk - 3.5.2 + 3.5.3 ... @@ -63,8 +63,8 @@ repositories { dependencies { ... - compile 'com.spectralogic.ds3:ds3-sdk:3.5.2' - // compile 'com.spectralogic.ds3:ds3-sdk:3.5.2:all' + compile 'com.spectralogic.ds3:ds3-sdk:3.5.3' + // compile 'com.spectralogic.ds3:ds3-sdk:3.5.3:all' ... } diff --git a/SETUP.md b/SETUP.md index 82c281663..ae0f77fde 100644 --- a/SETUP.md +++ b/SETUP.md @@ -13,7 +13,7 @@ If using Eclipse: * In the results that are returned install `Gradle Integration for Eclipse (4.4) 3.7.1.RELEASE` or current version. Clicking `Install` will take you to the 'Confirm Selected Features' dialog. (De-select the two optional Spring checkboxes on older versions). `Gradle IDE` and `org.gradle.toolingapi.feature` should both be selected. Accept any dialogs that popup. * After Gradle has been installed into eclipse and git has been installed, you should be able to clone the repo and import that project into eclipse -If using Intelllij: +If using Intellij: * Open Intellij and select `Import Project` * Find the `build.gradle` file contained at the root of the project and select it * Accept the defaults diff --git a/build.gradle b/build.gradle index 0591609d4..641fefdf3 100644 --- a/build.gradle +++ b/build.gradle @@ -31,7 +31,7 @@ buildscript { allprojects { group = 'com.spectralogic.ds3' - version = '3.5.2' + version = '3.5.3' } subprojects { diff --git a/ds3-sdk-integration/src/test/java/com/spectralogic/ds3client/helpers/FileSystemHelper_Test.java b/ds3-sdk-integration/src/test/java/com/spectralogic/ds3client/helpers/FileSystemHelper_Test.java index ebe263f06..2e904d99e 100644 --- a/ds3-sdk-integration/src/test/java/com/spectralogic/ds3client/helpers/FileSystemHelper_Test.java +++ b/ds3-sdk-integration/src/test/java/com/spectralogic/ds3client/helpers/FileSystemHelper_Test.java @@ -16,7 +16,6 @@ package com.spectralogic.ds3client.helpers; import com.spectralogic.ds3client.Ds3Client; -import com.spectralogic.ds3client.commands.PutObjectResponse; import com.spectralogic.ds3client.helpers.events.SameThreadEventRunner; import com.spectralogic.ds3client.integration.Util; import com.spectralogic.ds3client.integration.test.helpers.TempStorageIds; diff --git a/ds3-sdk-integration/src/test/java/com/spectralogic/ds3client/integration/GetJobManagement_Test.java b/ds3-sdk-integration/src/test/java/com/spectralogic/ds3client/integration/GetJobManagement_Test.java index 5af9f2c41..5790c994e 100644 --- a/ds3-sdk-integration/src/test/java/com/spectralogic/ds3client/integration/GetJobManagement_Test.java +++ b/ds3-sdk-integration/src/test/java/com/spectralogic/ds3client/integration/GetJobManagement_Test.java @@ -41,6 +41,7 @@ import com.spectralogic.ds3client.helpers.events.FailureEvent; import com.spectralogic.ds3client.helpers.events.SameThreadEventRunner; import com.spectralogic.ds3client.helpers.options.ReadJobOptions; +import com.spectralogic.ds3client.helpers.options.WriteJobOptions; import com.spectralogic.ds3client.helpers.strategy.blobstrategy.BlobStrategy; import com.spectralogic.ds3client.helpers.strategy.blobstrategy.ChunkAttemptRetryBehavior; import com.spectralogic.ds3client.helpers.strategy.blobstrategy.ChunkAttemptRetryDelayBehavior; @@ -83,7 +84,9 @@ import org.slf4j.LoggerFactory; import java.io.BufferedReader; +import java.io.DataOutputStream; import java.io.File; +import java.io.FileOutputStream; import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; @@ -101,11 +104,15 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; +import java.util.Collections; import java.util.List; import java.util.UUID; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import static com.spectralogic.ds3client.commands.spectrads3.PutBulkJobSpectraS3Request.MIN_UPLOAD_SIZE_IN_BYTES; import static com.spectralogic.ds3client.integration.Util.RESOURCE_BASE_NAME; import static com.spectralogic.ds3client.integration.Util.deleteAllContents; import static com.spectralogic.ds3client.integration.Util.deleteBucketContents; @@ -1449,4 +1456,68 @@ public void testThatFifoIsNotProcessed() throws IOException, InterruptedExceptio assertTrue(caughtException.get()); } + + @Test + public void testStreamedGetJobWithBlobbedFile() throws Exception { + final int chunkSize = MIN_UPLOAD_SIZE_IN_BYTES; + final long biggerThanAChunkSize = chunkSize * 2L + 1024; + + final int numIntsInBiggerThanAChunkSize = (int)biggerThanAChunkSize / 4; + + final String originalFileName = "Gracie.bin"; + final String movedFileName = "Gracie.bak"; + + try { + final DataOutputStream originalFileStream = new DataOutputStream(new FileOutputStream(originalFileName)); + + byte[] bytes = new byte[4]; + + for (int i = 0; i < numIntsInBiggerThanAChunkSize; ++i) { + bytes[0] = (byte)i; + bytes[1] = (byte)(i >> 8); + bytes[2] = (byte)(i >> 16); + bytes[3] = (byte)(i >> 24); + originalFileStream.write(bytes); + } + + originalFileStream.close(); + + final Ds3Object ds3Object = new Ds3Object(); + ds3Object.setName(originalFileName); + ds3Object.setSize(biggerThanAChunkSize); + + final AtomicLong numBytesTransferred = new AtomicLong(0); + + final WriteJobOptions writeJobOptions = WriteJobOptions.create(); + writeJobOptions.withMaxUploadSize(chunkSize); + + final Ds3ClientHelpers.Job writeJob = HELPERS.startWriteJob(BUCKET_NAME, Collections.singletonList(ds3Object), writeJobOptions); + writeJob.attachDataTransferredListener(numBytesTransferred::addAndGet); + + final CountDownLatch writeCountDownLatch = new CountDownLatch(1); + + writeJob.attachObjectCompletedListener(name -> writeCountDownLatch.countDown()); + + writeJob.transfer(new FileObjectPutter(Paths.get("."))); + + writeCountDownLatch.await(); + + assertEquals(biggerThanAChunkSize, numBytesTransferred.get()); + + Files.move(Paths.get(originalFileName), Paths.get(movedFileName)); + + final CountDownLatch readCountdownLatch = new CountDownLatch(1); + + final Ds3ClientHelpers.Job readJob = HELPERS.startReadJobUsingStreamedBehavior(BUCKET_NAME, Collections.singletonList(ds3Object)); + readJob.attachObjectCompletedListener(name -> readCountdownLatch.countDown()); + readJob.transfer(new FileObjectGetter(Paths.get("."))); + + readCountdownLatch.await(); + + assertTrue(FileUtils.contentEquals(new File(movedFileName), new File(originalFileName))); + } finally { + Files.deleteIfExists(Paths.get(originalFileName)); + Files.deleteIfExists(Paths.get(movedFileName)); + } + } } diff --git a/ds3-sdk-integration/src/test/java/com/spectralogic/ds3client/integration/PutJobManagement_Test.java b/ds3-sdk-integration/src/test/java/com/spectralogic/ds3client/integration/PutJobManagement_Test.java index 067bf9fb0..cfdad62ab 100644 --- a/ds3-sdk-integration/src/test/java/com/spectralogic/ds3client/integration/PutJobManagement_Test.java +++ b/ds3-sdk-integration/src/test/java/com/spectralogic/ds3client/integration/PutJobManagement_Test.java @@ -52,7 +52,9 @@ import org.slf4j.LoggerFactory; import java.io.BufferedInputStream; +import java.io.DataOutputStream; import java.io.File; +import java.io.FileOutputStream; import java.io.IOException; import java.io.InputStream; import java.lang.reflect.InvocationTargetException; @@ -66,9 +68,12 @@ import java.nio.file.Path; import java.nio.file.Paths; import java.util.*; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import static com.spectralogic.ds3client.commands.spectrads3.PutBulkJobSpectraS3Request.MIN_UPLOAD_SIZE_IN_BYTES; import static com.spectralogic.ds3client.integration.Util.RESOURCE_BASE_NAME; import static com.spectralogic.ds3client.integration.Util.deleteAllContents; import static org.hamcrest.Matchers.*; @@ -2059,4 +2064,70 @@ public void testThatNonExistentFileDoesNotStopPutJob() throws IOException { assertTrue(caughtNoSuchFileException.get()); assertTrue(getJobRan.get()); } + + @Test + public void testStreamedPutJobWithBlobbedFile() throws Exception { + final int chunkSize = MIN_UPLOAD_SIZE_IN_BYTES; + final long biggerThanAChunkSize = chunkSize * 2L + 1024; + + final int numIntsInBiggerThanAChunkSize = (int)biggerThanAChunkSize / 4; + + final String originalFileName = "Gracie.bin"; + final String movedFileName = "Gracie.bak"; + + try { + final DataOutputStream originalFileStream = new DataOutputStream(new FileOutputStream(originalFileName)); + + byte[] bytes = new byte[4]; + + for (int i = 0; i < numIntsInBiggerThanAChunkSize; ++i) { + bytes[0] = (byte)i; + bytes[1] = (byte)(i >> 8); + bytes[2] = (byte)(i >> 16); + bytes[3] = (byte)(i >> 24); + originalFileStream.write(bytes); + } + + originalFileStream.close(); + + final Ds3Object ds3Object = new Ds3Object(); + ds3Object.setName(originalFileName); + ds3Object.setSize(biggerThanAChunkSize); + + final AtomicLong numBytesTransferred = new AtomicLong(0); + + final WriteJobOptions writeJobOptions = WriteJobOptions.create(); + writeJobOptions.withMaxUploadSize(chunkSize); + + final Ds3ClientHelpers.Job writeJob = HELPERS.startWriteJobUsingStreamedBehavior(BUCKET_NAME, Collections.singletonList(ds3Object), writeJobOptions); + writeJob.attachDataTransferredListener(numBytesTransferred::addAndGet); + + final CountDownLatch writeCountDownLatch = new CountDownLatch(1); + + writeJob.attachObjectCompletedListener(name -> writeCountDownLatch.countDown()); + + writeJob.transfer(new FileObjectPutter(Paths.get("."))); + + writeCountDownLatch.await(); + + assertEquals(biggerThanAChunkSize, numBytesTransferred.get()); + + Files.move(Paths.get(originalFileName), Paths.get(movedFileName)); + + final CountDownLatch readCountdownLatch = new CountDownLatch(1); + + final Ds3ClientHelpers.Job readJob = HELPERS.startReadJob(BUCKET_NAME, Collections.singletonList(ds3Object)); + readJob.attachObjectCompletedListener(name -> readCountdownLatch.countDown()); + readJob.transfer(new FileObjectGetter(Paths.get("."))); + + readCountdownLatch.await(); + + assertTrue(FileUtils.contentEquals(new File(movedFileName), new File(originalFileName))); + } finally { + deleteAllContents(client, BUCKET_NAME); + + Files.deleteIfExists(Paths.get(originalFileName)); + Files.deleteIfExists(Paths.get(movedFileName)); + } + } } diff --git a/ds3-sdk/src/main/java/com/spectralogic/ds3client/exceptions/ContentLengthNotMatchException.java b/ds3-sdk/src/main/java/com/spectralogic/ds3client/exceptions/ContentLengthNotMatchException.java index 66a14f8a1..6d3e0e932 100644 --- a/ds3-sdk/src/main/java/com/spectralogic/ds3client/exceptions/ContentLengthNotMatchException.java +++ b/ds3-sdk/src/main/java/com/spectralogic/ds3client/exceptions/ContentLengthNotMatchException.java @@ -22,7 +22,7 @@ public class ContentLengthNotMatchException extends IOException { private final long contentLength; private final long totalBytes; public ContentLengthNotMatchException(final String fileName, final long contentLength, final long totalBytes) { - super(String.format("The Content length for %s (%d) not match the number of byte read (%d)", fileName, contentLength, totalBytes)); + super(String.format("The Content length for %s (%d) does not match the number of bytes read (%d)", fileName, contentLength, totalBytes)); this.fileName = fileName; this.contentLength = contentLength; diff --git a/ds3-sdk/src/main/java/com/spectralogic/ds3client/helpers/Ds3ClientHelpers.java b/ds3-sdk/src/main/java/com/spectralogic/ds3client/helpers/Ds3ClientHelpers.java index f01d4f4dc..4ca191438 100644 --- a/ds3-sdk/src/main/java/com/spectralogic/ds3client/helpers/Ds3ClientHelpers.java +++ b/ds3-sdk/src/main/java/com/spectralogic/ds3client/helpers/Ds3ClientHelpers.java @@ -18,7 +18,6 @@ import com.google.common.base.Function; import com.google.common.collect.FluentIterable; import com.spectralogic.ds3client.Ds3Client; -import com.spectralogic.ds3client.commands.PutObjectResponse; import com.spectralogic.ds3client.helpers.options.ReadJobOptions; import com.spectralogic.ds3client.helpers.options.WriteJobOptions; import com.spectralogic.ds3client.helpers.pagination.FileSystemKey; diff --git a/ds3-sdk/src/main/java/com/spectralogic/ds3client/helpers/strategy/channelstrategy/SeekableByteChannelDecorator.java b/ds3-sdk/src/main/java/com/spectralogic/ds3client/helpers/strategy/channelstrategy/SeekableByteChannelDecorator.java index 2acdc4dab..9196caff4 100644 --- a/ds3-sdk/src/main/java/com/spectralogic/ds3client/helpers/strategy/channelstrategy/SeekableByteChannelDecorator.java +++ b/ds3-sdk/src/main/java/com/spectralogic/ds3client/helpers/strategy/channelstrategy/SeekableByteChannelDecorator.java @@ -25,52 +25,124 @@ * An instance of {@link SeekableByteChannel} used to decorate another SeekableByteChannel in the * situation where we re-use the same channel for more than 1 blob. This subclass prevents closing * a channel when there are other blobs still referencing the shared channel. + * + * This class positions the content of a blob within the bounds of a channel that may be capable + * of containing more than one blob. */ class SeekableByteChannelDecorator implements SeekableByteChannel { - private final SeekableByteChannel seekableByteChannel; + private final Object lock = new Object(); - SeekableByteChannelDecorator(final SeekableByteChannel seekableByteChannel) { - Preconditions.checkNotNull(seekableByteChannel, "seekableByteChannel may not be null"); + private final SeekableByteChannel seekableByteChannel; + private final long blobOffset; + private final long blobLength; + private long nextAvailableByteOffset = 0; + + SeekableByteChannelDecorator(final SeekableByteChannel seekableByteChannel, final long blobOffset, final long blobLength) throws IOException { + Preconditions.checkNotNull(seekableByteChannel, "seekableByteChannel may not be null."); + Preconditions.checkArgument(blobOffset >= 0, "blobOffset must be >= 0."); + Preconditions.checkArgument(blobLength >= 0, "blobLength must be >= 0."); this.seekableByteChannel = seekableByteChannel; + this.blobOffset = blobOffset; + this.blobLength = blobLength; + + seekableByteChannel.position(blobOffset); } - protected SeekableByteChannel wrappedSeekableByteChannel() { + SeekableByteChannel wrappedSeekableByteChannel() { return seekableByteChannel; } @Override public int read(final ByteBuffer dst) throws IOException { - return seekableByteChannel.read(dst); + synchronized (lock) { + final long remainingInWindow = blobLength - nextAvailableByteOffset; + final long numBytesWeCanRead = Math.min(dst.remaining(), remainingInWindow); + + if (numBytesWeCanRead <= 0) { + return 0; + } + + final int numBytesRead; + + if (numBytesWeCanRead != dst.remaining()) { + final ByteBuffer byteBuffer = ByteBuffer.wrap(new byte[(int) numBytesWeCanRead]); + numBytesRead = seekableByteChannel.read(byteBuffer); + byteBuffer.flip(); + dst.put(byteBuffer); + } else { + numBytesRead = seekableByteChannel.read(dst); + } + + nextAvailableByteOffset += numBytesRead; + + return numBytesRead; + } } @Override public int write(final ByteBuffer src) throws IOException { - return seekableByteChannel.write(src); + synchronized (lock) { + final long remainingInWindow = blobLength - nextAvailableByteOffset; + final long numBytesWeCanWrite = Math.min(src.remaining(), remainingInWindow); + + if (numBytesWeCanWrite <= 0) { + return 0; + } + + final int numBytesWritten; + + if (numBytesWeCanWrite != src.remaining()) { + final ByteBuffer byteBuffer = ByteBuffer.wrap(new byte[(int) numBytesWeCanWrite]); + byteBuffer.put(src); + byteBuffer.flip(); + numBytesWritten = seekableByteChannel.write(byteBuffer); + } else { + numBytesWritten = seekableByteChannel.write(src); + } + + nextAvailableByteOffset += numBytesWritten; + + return numBytesWritten; + } } @Override public long position() throws IOException { - return seekableByteChannel.position(); + synchronized (lock) { + return seekableByteChannel.position(); + } } @Override public SeekableByteChannel position(final long newPosition) throws IOException { - return seekableByteChannel.position(newPosition); + synchronized (lock) { + final long greatestPossiblePosition = blobLength - 1; + nextAvailableByteOffset = Math.min(newPosition, greatestPossiblePosition); + seekableByteChannel.position(blobOffset + nextAvailableByteOffset); + + return this; + } } @Override public long size() throws IOException { - return seekableByteChannel.size(); + synchronized (lock) { + return seekableByteChannel.size(); + } } @Override public SeekableByteChannel truncate(final long size) throws IOException { - return seekableByteChannel.truncate(size); + synchronized (lock) { + return seekableByteChannel.truncate(size); + } } @Override public boolean isOpen() { - return seekableByteChannel.isOpen(); + synchronized (lock) { + return seekableByteChannel.isOpen(); + } } @Override diff --git a/ds3-sdk/src/main/java/com/spectralogic/ds3client/helpers/strategy/channelstrategy/SequentialChannelStrategy.java b/ds3-sdk/src/main/java/com/spectralogic/ds3client/helpers/strategy/channelstrategy/SequentialChannelStrategy.java index 23a672967..d3b7974f0 100644 --- a/ds3-sdk/src/main/java/com/spectralogic/ds3client/helpers/strategy/channelstrategy/SequentialChannelStrategy.java +++ b/ds3-sdk/src/main/java/com/spectralogic/ds3client/helpers/strategy/channelstrategy/SequentialChannelStrategy.java @@ -88,7 +88,7 @@ private SeekableByteChannel makeNewChannel(final BulkObject blob) throws IOExcep channelPreparer.prepareChannel(blob.getName(), objectChannelBuilder); final SeekableByteChannel seekableByteChannel = channelStrategyDelegate.acquireChannelForBlob(blob); - final SeekableByteChannelDecorator seekableByteChannelDecorator = new SeekableByteChannelDecorator(seekableByteChannel); + final SeekableByteChannelDecorator seekableByteChannelDecorator = new SeekableByteChannelDecorator(seekableByteChannel, blob.getOffset(), blob.getLength()); blobNameChannelMap.put(blob.getName(), seekableByteChannelDecorator); return seekableByteChannelDecorator; diff --git a/ds3-sdk/src/main/java/com/spectralogic/ds3client/helpers/strategy/transferstrategy/GetJobNetworkFailureRetryDecorator.java b/ds3-sdk/src/main/java/com/spectralogic/ds3client/helpers/strategy/transferstrategy/GetJobNetworkFailureRetryDecorator.java index b80ca778c..737151477 100644 --- a/ds3-sdk/src/main/java/com/spectralogic/ds3client/helpers/strategy/transferstrategy/GetJobNetworkFailureRetryDecorator.java +++ b/ds3-sdk/src/main/java/com/spectralogic/ds3client/helpers/strategy/transferstrategy/GetJobNetworkFailureRetryDecorator.java @@ -16,6 +16,7 @@ package com.spectralogic.ds3client.helpers.strategy.transferstrategy; import com.google.common.collect.ImmutableCollection; +import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableMultimap; import com.spectralogic.ds3client.exceptions.ContentLengthNotMatchException; @@ -25,6 +26,7 @@ import com.spectralogic.ds3client.helpers.strategy.channelstrategy.ChannelStrategy; import com.spectralogic.ds3client.models.BulkObject; import com.spectralogic.ds3client.models.common.Range; +import com.spectralogic.ds3client.utils.Guard; import java.io.IOException; import java.util.HashMap; @@ -125,12 +127,24 @@ private ImmutableCollection initializeRanges(final BulkObject blob, final if (ranges == null) { final long numBytesTransferred = 0; - ranges = updateRanges(ranges, numBytesTransferred, blob.getLength()); + ranges = adjustRangesForBlobOffset(updateRanges(ranges, numBytesTransferred, blob.getLength()), blob); } return ranges; } + private ImmutableCollection adjustRangesForBlobOffset(final ImmutableCollection ranges, final BulkObject blob) { + if (Guard.isNullOrEmpty(ranges) || ranges.size() > 1) { + return ranges; + } + + final Range firstRange = ranges.iterator().next(); + + final long blobOffset = blob.getOffset(); + + return ImmutableList.of(new Range(firstRange.getStart() + blobOffset, firstRange.getEnd() + blobOffset)); + } + private ImmutableCollection updateRanges(final ImmutableCollection ranges, final long numBytesTransferred, final Long numBytesToTransfer)