diff --git a/ds3-sdk-integration/src/test/java/com/spectralogic/ds3client/integration/PutJobManagement_Test.java b/ds3-sdk-integration/src/test/java/com/spectralogic/ds3client/integration/PutJobManagement_Test.java index f4d6e720b..5a1aef89e 100644 --- a/ds3-sdk-integration/src/test/java/com/spectralogic/ds3client/integration/PutJobManagement_Test.java +++ b/ds3-sdk-integration/src/test/java/com/spectralogic/ds3client/integration/PutJobManagement_Test.java @@ -27,6 +27,7 @@ import com.spectralogic.ds3client.commands.spectrads3.notifications.*; import com.spectralogic.ds3client.exceptions.Ds3NoMoreRetriesException; import com.spectralogic.ds3client.helpers.*; +import com.spectralogic.ds3client.helpers.channelbuilders.ObjectInputStreamBuilder; import com.spectralogic.ds3client.helpers.events.FailureEvent; import com.spectralogic.ds3client.helpers.events.SameThreadEventRunner; import com.spectralogic.ds3client.helpers.options.WriteJobOptions; @@ -57,6 +58,7 @@ import java.io.BufferedInputStream; import java.io.DataOutputStream; import java.io.File; +import java.io.FileInputStream; import java.io.FileOutputStream; import java.io.IOException; import java.io.InputStream; @@ -1859,6 +1861,51 @@ public void testPutJobUsingStreamedTransferStrategy() throws IOException, URISyn } } + public class StreamObjectPutter extends ObjectInputStreamBuilder { + final InputStream _is; + + public StreamObjectPutter(final InputStream is) { + _is = is; + } + + @Override + public InputStream buildInputStream(final String key) { + return _is; + } + } + + @Test + public void testPutJobUsingStreamedTransferStrategyAboveChunk() throws IOException, URISyntaxException { + final String DIR_NAME = "books/"; + final String[] FILE_NAMES = new String[]{"Paw-3.1.9.zip"}; + + try { + final Path dirPath = ResourceUtils.loadFileResource(DIR_NAME); + + final List objectsToWrite = new ArrayList<>(); + for (final String book : FILE_NAMES) { + final Path objPath = ResourceUtils.loadFileResource(DIR_NAME + book); + final long bookSize = Files.size(objPath); + final Ds3Object obj = new Ds3Object(book, bookSize); + + objectsToWrite.add(obj); + } + + final Ds3ClientHelpers.Job writeJob = HELPERS.startWriteJobUsingStreamedBehavior(BUCKET_NAME, objectsToWrite, WriteJobOptions.create()); + writeJob.transfer(new StreamObjectPutter(new FileInputStream(dirPath.resolve("Paw-3.1.9.zip").toFile()))); + + final Ds3ClientHelpers ds3ClientHelpers = Ds3ClientHelpers.wrap(client); + final Iterable bucketContentsIterable = ds3ClientHelpers.listObjects(BUCKET_NAME); + + for (final Contents bucketContents : bucketContentsIterable) { + assertEquals(FILE_NAMES[0], bucketContents.getKey()); + } + } finally { + cancelAllJobsForBucket(client, BUCKET_NAME); + deleteAllContents(client, BUCKET_NAME); + } + } + @Test public void testPutJobUsingRandomAccessTransferStrategy() throws IOException, URISyntaxException { final String DIR_NAME = "books/"; diff --git a/ds3-sdk-integration/src/test/resources/books/Paw-3.1.9.zip b/ds3-sdk-integration/src/test/resources/books/Paw-3.1.9.zip new file mode 100644 index 000000000..acd861b3e Binary files /dev/null and b/ds3-sdk-integration/src/test/resources/books/Paw-3.1.9.zip differ diff --git a/ds3-sdk/src/main/java/com/spectralogic/ds3client/Ds3InputStreamEntity.java b/ds3-sdk/src/main/java/com/spectralogic/ds3client/Ds3InputStreamEntity.java index 9d3256a9f..700271fe9 100644 --- a/ds3-sdk/src/main/java/com/spectralogic/ds3client/Ds3InputStreamEntity.java +++ b/ds3-sdk/src/main/java/com/spectralogic/ds3client/Ds3InputStreamEntity.java @@ -39,18 +39,14 @@ public void setBufferSize(final int bufferSize) { this.bufferSize = bufferSize; } - public long getBufferSize() { - return bufferSize; - } - @Override public void writeTo(final OutputStream outStream) throws IOException { final long startTime = PerformanceUtils.getCurrentTime(); final long totalBytes = IOUtils.copy(this.getContent(), outStream, bufferSize, path, true); final long endTime = PerformanceUtils.getCurrentTime(); - - if (this.getContentLength() != -1 && totalBytes != this.getContentLength()) { - throw new ContentLengthNotMatchException(path, this.getContentLength(), totalBytes); + final long length = this.getContentLength(); + if (length != -1 && totalBytes != length) { + throw new ContentLengthNotMatchException(path, length, totalBytes); } PerformanceUtils.logMbps(startTime, endTime, totalBytes, path, true); diff --git a/ds3-sdk/src/main/java/com/spectralogic/ds3client/commands/PutMultiPartUploadPartRequest.java b/ds3-sdk/src/main/java/com/spectralogic/ds3client/commands/PutMultiPartUploadPartRequest.java index 0eb7c0c7f..101e83f4d 100644 --- a/ds3-sdk/src/main/java/com/spectralogic/ds3client/commands/PutMultiPartUploadPartRequest.java +++ b/ds3-sdk/src/main/java/com/spectralogic/ds3client/commands/PutMultiPartUploadPartRequest.java @@ -19,7 +19,6 @@ import com.spectralogic.ds3client.networking.HttpVerb; import com.spectralogic.ds3client.commands.interfaces.AbstractRequest; import java.util.UUID; -import com.google.common.net.UrlEscapers; import javax.annotation.Nonnull; import com.google.common.base.Preconditions; import com.spectralogic.ds3client.utils.SeekableByteChannelInputStream; @@ -63,7 +62,7 @@ public PutMultiPartUploadPartRequest(final String bucketName, final String objec this.stream = new SeekableByteChannelInputStream(channel); } - + public PutMultiPartUploadPartRequest(final String bucketName, final String objectName, @Nonnull final SeekableByteChannel channel, final int partNumber, final long size, final String uploadId) { Preconditions.checkNotNull(channel, "Channel may not be null."); this.bucketName = bucketName; diff --git a/ds3-sdk/src/main/java/com/spectralogic/ds3client/helpers/channelbuilders/ReadOnlySeekableByteChannel.java b/ds3-sdk/src/main/java/com/spectralogic/ds3client/helpers/channelbuilders/ReadOnlySeekableByteChannel.java index 02b7cb9cf..cc6d1fcb7 100644 --- a/ds3-sdk/src/main/java/com/spectralogic/ds3client/helpers/channelbuilders/ReadOnlySeekableByteChannel.java +++ b/ds3-sdk/src/main/java/com/spectralogic/ds3client/helpers/channelbuilders/ReadOnlySeekableByteChannel.java @@ -25,8 +25,8 @@ public class ReadOnlySeekableByteChannel implements SeekableByteChannel { private final ReadableByteChannel channel; - private int size = 0; - private int position = 0; + private long size = 0; + private long position = 0; public ReadOnlySeekableByteChannel(final ReadableByteChannel channel) { this.channel = channel; @@ -40,7 +40,7 @@ public ReadableByteChannel getChannel() { public int read(final ByteBuffer dst) throws IOException { size = channel.read(dst); position += size; - return size; + return Long.valueOf(size).intValue(); } @Override @@ -55,6 +55,9 @@ public long position() throws IOException { @Override public SeekableByteChannel position(final long newPosition) throws IOException { + if (position == 0 || newPosition == 0) { + this.position = newPosition; + } if (newPosition == this.position) { return this; } diff --git a/ds3-sdk/src/main/java/com/spectralogic/ds3client/helpers/strategy/channelstrategy/SeekableByteChannelDecorator.java b/ds3-sdk/src/main/java/com/spectralogic/ds3client/helpers/strategy/channelstrategy/SeekableByteChannelDecorator.java index 5a498caf5..ac346246c 100644 --- a/ds3-sdk/src/main/java/com/spectralogic/ds3client/helpers/strategy/channelstrategy/SeekableByteChannelDecorator.java +++ b/ds3-sdk/src/main/java/com/spectralogic/ds3client/helpers/strategy/channelstrategy/SeekableByteChannelDecorator.java @@ -48,10 +48,11 @@ class SeekableByteChannelDecorator implements SeekableByteChannel { seekableByteChannel.position(blobOffset); } - SeekableByteChannel wrappedSeekableByteChannel() { + public SeekableByteChannel wrappedSeekableByteChannel() { return seekableByteChannel; } + @Override public int read(final ByteBuffer dst) throws IOException { synchronized (lock) { diff --git a/ds3-sdk/src/main/java/com/spectralogic/ds3client/helpers/strategy/channelstrategy/SequentialChannelStrategy.java b/ds3-sdk/src/main/java/com/spectralogic/ds3client/helpers/strategy/channelstrategy/SequentialChannelStrategy.java index 4d6163dbe..07872bc18 100644 --- a/ds3-sdk/src/main/java/com/spectralogic/ds3client/helpers/strategy/channelstrategy/SequentialChannelStrategy.java +++ b/ds3-sdk/src/main/java/com/spectralogic/ds3client/helpers/strategy/channelstrategy/SequentialChannelStrategy.java @@ -18,7 +18,10 @@ import com.google.common.collect.HashMultimap; import com.google.common.collect.SetMultimap; import com.spectralogic.ds3client.helpers.Ds3ClientHelpers; +import com.spectralogic.ds3client.helpers.channelbuilders.ReadOnlySeekableByteChannel; import com.spectralogic.ds3client.models.BulkObject; +import com.spectralogic.ds3client.models.JobRequestType; +import com.spectralogic.ds3client.models.MasterObjectList; import java.io.IOException; import java.nio.channels.SeekableByteChannel; @@ -40,6 +43,7 @@ public class SequentialChannelStrategy implements ChannelStrategy { private final ChannelStrategy channelStrategyDelegate; private final Ds3ClientHelpers.ObjectChannelBuilder objectChannelBuilder; private final ChannelPreparable channelPreparer; + private final MasterObjectList masterObjectList; /** * @param channelStrategy The instance of {@link ChannelStrategy} that holds the 1 channel reference a blob needs @@ -51,12 +55,14 @@ public class SequentialChannelStrategy implements ChannelStrategy { * either {@link TruncatingChannelPreparable} or {@link NullChannelPreparable}. */ public SequentialChannelStrategy(final ChannelStrategy channelStrategy, - final Ds3ClientHelpers.ObjectChannelBuilder objectChannelBuilder, - final ChannelPreparable channelPreparer) + final Ds3ClientHelpers.ObjectChannelBuilder objectChannelBuilder, + final ChannelPreparable channelPreparer, + final MasterObjectList masterObjectList) { channelStrategyDelegate = channelStrategy; this.objectChannelBuilder = objectChannelBuilder; this.channelPreparer = channelPreparer; + this.masterObjectList = masterObjectList; } /** @@ -109,10 +115,19 @@ public void releaseChannelForBlob(final SeekableByteChannel seekableByteChannel, blobNameOffsetMap.remove(blobName, blob.getOffset()); - if (blobNameOffsetMap.get(blobName).size() == 0) { + final Long maximumOffset = masterObjectList.getObjects().stream() + .flatMap(objects -> objects.getObjects().stream()) + .filter(bulkObject -> bulkObject.getName().equals(blobName)) + .map(bulkObject -> bulkObject.getOffset()) + .max(Long::compareTo).orElseGet(() -> blob.getOffset()); + + final boolean isReadOnly = ((SeekableByteChannelDecorator) seekableByteChannel).wrappedSeekableByteChannel() instanceof ReadOnlySeekableByteChannel; + + if (blobNameOffsetMap.get(blobName).size() == 0 && (blob.getOffset() == maximumOffset || !(isReadOnly))) { blobNameChannelMap.remove(blobName); channelStrategyDelegate.releaseChannelForBlob(((SeekableByteChannelDecorator)seekableByteChannel).wrappedSeekableByteChannel(), blob); } + } } } diff --git a/ds3-sdk/src/main/java/com/spectralogic/ds3client/helpers/strategy/transferstrategy/TransferStrategyBuilder.java b/ds3-sdk/src/main/java/com/spectralogic/ds3client/helpers/strategy/transferstrategy/TransferStrategyBuilder.java index 3c40684b7..855b9896d 100644 --- a/ds3-sdk/src/main/java/com/spectralogic/ds3client/helpers/strategy/transferstrategy/TransferStrategyBuilder.java +++ b/ds3-sdk/src/main/java/com/spectralogic/ds3client/helpers/strategy/transferstrategy/TransferStrategyBuilder.java @@ -475,26 +475,13 @@ private TransferStrategy makeStreamingPutTransferStrategy() { getOrMakeTransferRetryDecorator(); return makeTransferStrategy( - new BlobStrategyMaker() { - @Override - public BlobStrategy makeBlobStrategy(final Ds3Client client, - final MasterObjectList masterObjectList, - final EventDispatcher eventDispatcher) - { - return new PutSequentialBlobStrategy(ds3Client, - masterObjectList, - eventDispatcher, - getOrMakeChunkAttemptRetryBehavior(), - getOrMakeChunkAllocationRetryDelayBehavior() - ); - } - }, - new TransferMethodMaker() { - @Override - public TransferMethod makeTransferMethod() { - return makePutTransferMethod(); - } - }); + (client, masterObjectList, eventDispatcher) -> new PutSequentialBlobStrategy(ds3Client, + masterObjectList, + eventDispatcher, + getOrMakeChunkAttemptRetryBehavior(), + getOrMakeChunkAllocationRetryDelayBehavior() + ), + this::makePutTransferMethod); } private void maybeMakeStreamedPutChannelStrategy() { @@ -502,7 +489,7 @@ private void maybeMakeStreamedPutChannelStrategy() { Preconditions.checkNotNull(channelBuilder, "channelBuilder my not be null"); channelStrategy = new SequentialChannelStrategy(new SequentialFileReaderChannelStrategy(channelBuilder), - channelBuilder, new NullChannelPreparable()); + channelBuilder, new NullChannelPreparable(), masterObjectList); } } @@ -823,22 +810,12 @@ private TransferStrategy makeStreamingGetTransferStrategy() { getOrMakeTransferRetryDecorator(); return makeTransferStrategy( - new BlobStrategyMaker() { - @Override - public BlobStrategy makeBlobStrategy(final Ds3Client client, final MasterObjectList masterObjectList, final EventDispatcher eventDispatcher) { - return new GetSequentialBlobStrategy(ds3Client, - masterObjectList, - eventDispatcher, - getOrMakeChunkAttemptRetryBehavior(), - getOrMakeChunkAllocationRetryDelayBehavior()); - } - }, - new TransferMethodMaker() { - @Override - public TransferMethod makeTransferMethod() { - return makeGetTransferMethod(); - } - }); + (client, masterObjectList, eventDispatcher) -> new GetSequentialBlobStrategy(ds3Client, + masterObjectList, + eventDispatcher, + getOrMakeChunkAttemptRetryBehavior(), + getOrMakeChunkAllocationRetryDelayBehavior()), + this::makeGetTransferMethod); } private void maybeMakeSequentialGetChannelStrategy() { @@ -846,7 +823,7 @@ private void maybeMakeSequentialGetChannelStrategy() { Preconditions.checkNotNull(channelBuilder, "channelBuilder my not be null"); channelStrategy = new SequentialChannelStrategy(new SequentialFileWriterChannelStrategy(channelBuilder), - channelBuilder, new TruncatingChannelPreparable()); + channelBuilder, new TruncatingChannelPreparable(), masterObjectList); } } diff --git a/ds3-sdk/src/main/java/com/spectralogic/ds3client/utils/ReadOnlySeekableByteChannelInputStream.kt b/ds3-sdk/src/main/java/com/spectralogic/ds3client/utils/ReadOnlySeekableByteChannelInputStream.kt new file mode 100644 index 000000000..a5f3d196e --- /dev/null +++ b/ds3-sdk/src/main/java/com/spectralogic/ds3client/utils/ReadOnlySeekableByteChannelInputStream.kt @@ -0,0 +1,9 @@ +package com.spectralogic.ds3client.utils + +import com.spectralogic.ds3client.helpers.channelbuilders.ReadOnlySeekableByteChannel + +class ReadOnlySeekableByteChannelInputStream( + readOnlySeekableByteChannel: ReadOnlySeekableByteChannel +) : SeekableByteChannelInputStream(readOnlySeekableByteChannel) { + override fun reset(): Unit = Unit +} \ No newline at end of file diff --git a/ds3-sdk/src/main/java/com/spectralogic/ds3client/utils/SeekableByteChannelInputStream.java b/ds3-sdk/src/main/java/com/spectralogic/ds3client/utils/SeekableByteChannelInputStream.java index 1214a1625..dff55f692 100644 --- a/ds3-sdk/src/main/java/com/spectralogic/ds3client/utils/SeekableByteChannelInputStream.java +++ b/ds3-sdk/src/main/java/com/spectralogic/ds3client/utils/SeekableByteChannelInputStream.java @@ -39,7 +39,7 @@ public int read() throws IOException { return -1; } } - + @Override public int read(final byte[] b, final int off, final int len) throws IOException { final ByteBuffer buffer = ByteBuffer.wrap(b); @@ -52,18 +52,18 @@ public int read(final byte[] b, final int off, final int len) throws IOException return -1; } } - + @Override public long skip(final long n) throws IOException { this.seekableByteChannel.position(this.seekableByteChannel.position() + n); return this.seekableByteChannel.position(); } - + @Override public boolean markSupported() { return true; } - + @Override public void mark(final int readlimit) { try { @@ -72,13 +72,13 @@ public void mark(final int readlimit) { throw new RuntimeException(e); } } - + @Override public void reset() throws IOException { this.seekableByteChannel.position(this.markPosition); this.markPosition = 0; } - + @Override public void close() throws IOException { this.seekableByteChannel.close();