Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import com.spectralogic.ds3client.commands.spectrads3.notifications.*;
import com.spectralogic.ds3client.exceptions.Ds3NoMoreRetriesException;
import com.spectralogic.ds3client.helpers.*;
import com.spectralogic.ds3client.helpers.channelbuilders.ObjectInputStreamBuilder;
import com.spectralogic.ds3client.helpers.events.FailureEvent;
import com.spectralogic.ds3client.helpers.events.SameThreadEventRunner;
import com.spectralogic.ds3client.helpers.options.WriteJobOptions;
Expand Down Expand Up @@ -57,6 +58,7 @@
import java.io.BufferedInputStream;
import java.io.DataOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
Expand Down Expand Up @@ -1859,6 +1861,51 @@ public void testPutJobUsingStreamedTransferStrategy() throws IOException, URISyn
}
}

public class StreamObjectPutter extends ObjectInputStreamBuilder {
final InputStream _is;

public StreamObjectPutter(final InputStream is) {
_is = is;
}

@Override
public InputStream buildInputStream(final String key) {
return _is;
}
}

@Test
public void testPutJobUsingStreamedTransferStrategyAboveChunk() throws IOException, URISyntaxException {
final String DIR_NAME = "books/";
final String[] FILE_NAMES = new String[]{"Paw-3.1.9.zip"};

try {
final Path dirPath = ResourceUtils.loadFileResource(DIR_NAME);

final List<Ds3Object> objectsToWrite = new ArrayList<>();
for (final String book : FILE_NAMES) {
final Path objPath = ResourceUtils.loadFileResource(DIR_NAME + book);
final long bookSize = Files.size(objPath);
final Ds3Object obj = new Ds3Object(book, bookSize);

objectsToWrite.add(obj);
}

final Ds3ClientHelpers.Job writeJob = HELPERS.startWriteJobUsingStreamedBehavior(BUCKET_NAME, objectsToWrite, WriteJobOptions.create());
writeJob.transfer(new StreamObjectPutter(new FileInputStream(dirPath.resolve("Paw-3.1.9.zip").toFile())));

final Ds3ClientHelpers ds3ClientHelpers = Ds3ClientHelpers.wrap(client);
final Iterable<Contents> bucketContentsIterable = ds3ClientHelpers.listObjects(BUCKET_NAME);

for (final Contents bucketContents : bucketContentsIterable) {
assertEquals(FILE_NAMES[0], bucketContents.getKey());
}
} finally {
cancelAllJobsForBucket(client, BUCKET_NAME);
deleteAllContents(client, BUCKET_NAME);
}
}

@Test
public void testPutJobUsingRandomAccessTransferStrategy() throws IOException, URISyntaxException {
final String DIR_NAME = "books/";
Expand Down
Binary file not shown.
Original file line number Diff line number Diff line change
Expand Up @@ -39,18 +39,14 @@ public void setBufferSize(final int bufferSize) {
this.bufferSize = bufferSize;
}

public long getBufferSize() {
return bufferSize;
}

@Override
public void writeTo(final OutputStream outStream) throws IOException {
final long startTime = PerformanceUtils.getCurrentTime();
final long totalBytes = IOUtils.copy(this.getContent(), outStream, bufferSize, path, true);
final long endTime = PerformanceUtils.getCurrentTime();

if (this.getContentLength() != -1 && totalBytes != this.getContentLength()) {
throw new ContentLengthNotMatchException(path, this.getContentLength(), totalBytes);
final long length = this.getContentLength();
if (length != -1 && totalBytes != length) {
throw new ContentLengthNotMatchException(path, length, totalBytes);
}

PerformanceUtils.logMbps(startTime, endTime, totalBytes, path, true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import com.spectralogic.ds3client.networking.HttpVerb;
import com.spectralogic.ds3client.commands.interfaces.AbstractRequest;
import java.util.UUID;
import com.google.common.net.UrlEscapers;
import javax.annotation.Nonnull;
import com.google.common.base.Preconditions;
import com.spectralogic.ds3client.utils.SeekableByteChannelInputStream;
Expand Down Expand Up @@ -63,7 +62,7 @@ public PutMultiPartUploadPartRequest(final String bucketName, final String objec
this.stream = new SeekableByteChannelInputStream(channel);
}


public PutMultiPartUploadPartRequest(final String bucketName, final String objectName, @Nonnull final SeekableByteChannel channel, final int partNumber, final long size, final String uploadId) {
Preconditions.checkNotNull(channel, "Channel may not be null.");
this.bucketName = bucketName;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@
public class ReadOnlySeekableByteChannel implements SeekableByteChannel {

private final ReadableByteChannel channel;
private int size = 0;
private int position = 0;
private long size = 0;
private long position = 0;

public ReadOnlySeekableByteChannel(final ReadableByteChannel channel) {
this.channel = channel;
Expand All @@ -40,7 +40,7 @@ public ReadableByteChannel getChannel() {
public int read(final ByteBuffer dst) throws IOException {
size = channel.read(dst);
position += size;
return size;
return Long.valueOf(size).intValue();
Copy link
Contributor

@rpmoore rpmoore Nov 9, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did the lack of this cause problems when a blob was over 4GB?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is one of the places it failed, I can't say if this is wholly the issue though.

}

@Override
Expand All @@ -55,6 +55,9 @@ public long position() throws IOException {

@Override
public SeekableByteChannel position(final long newPosition) throws IOException {
if (position == 0 || newPosition == 0) {
this.position = newPosition;
}
if (newPosition == this.position) {
return this;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,11 @@ class SeekableByteChannelDecorator implements SeekableByteChannel {
seekableByteChannel.position(blobOffset);
}

SeekableByteChannel wrappedSeekableByteChannel() {
public SeekableByteChannel wrappedSeekableByteChannel() {
return seekableByteChannel;
}


@Override
public int read(final ByteBuffer dst) throws IOException {
synchronized (lock) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,10 @@
import com.google.common.collect.HashMultimap;
import com.google.common.collect.SetMultimap;
import com.spectralogic.ds3client.helpers.Ds3ClientHelpers;
import com.spectralogic.ds3client.helpers.channelbuilders.ReadOnlySeekableByteChannel;
import com.spectralogic.ds3client.models.BulkObject;
import com.spectralogic.ds3client.models.JobRequestType;
import com.spectralogic.ds3client.models.MasterObjectList;

import java.io.IOException;
import java.nio.channels.SeekableByteChannel;
Expand All @@ -40,6 +43,7 @@ public class SequentialChannelStrategy implements ChannelStrategy {
private final ChannelStrategy channelStrategyDelegate;
private final Ds3ClientHelpers.ObjectChannelBuilder objectChannelBuilder;
private final ChannelPreparable channelPreparer;
private final MasterObjectList masterObjectList;

/**
* @param channelStrategy The instance of {@link ChannelStrategy} that holds the 1 channel reference a blob needs
Expand All @@ -51,12 +55,14 @@ public class SequentialChannelStrategy implements ChannelStrategy {
* either {@link TruncatingChannelPreparable} or {@link NullChannelPreparable}.
*/
public SequentialChannelStrategy(final ChannelStrategy channelStrategy,
final Ds3ClientHelpers.ObjectChannelBuilder objectChannelBuilder,
final ChannelPreparable channelPreparer)
final Ds3ClientHelpers.ObjectChannelBuilder objectChannelBuilder,
final ChannelPreparable channelPreparer,
final MasterObjectList masterObjectList)
{
channelStrategyDelegate = channelStrategy;
this.objectChannelBuilder = objectChannelBuilder;
this.channelPreparer = channelPreparer;
this.masterObjectList = masterObjectList;
}

/**
Expand Down Expand Up @@ -109,10 +115,19 @@ public void releaseChannelForBlob(final SeekableByteChannel seekableByteChannel,

blobNameOffsetMap.remove(blobName, blob.getOffset());

if (blobNameOffsetMap.get(blobName).size() == 0) {
final Long maximumOffset = masterObjectList.getObjects().stream()
.flatMap(objects -> objects.getObjects().stream())
.filter(bulkObject -> bulkObject.getName().equals(blobName))
.map(bulkObject -> bulkObject.getOffset())
.max(Long::compareTo).orElseGet(() -> blob.getOffset());

final boolean isReadOnly = ((SeekableByteChannelDecorator) seekableByteChannel).wrappedSeekableByteChannel() instanceof ReadOnlySeekableByteChannel;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is the cast here needed in order to perform the instanceof check?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We know it is a SeekableByteChannelDecorator because we wrap it further up, but the wrappedSeekableByteChannel() is only on the Decorator


if (blobNameOffsetMap.get(blobName).size() == 0 && (blob.getOffset() == maximumOffset || !(isReadOnly))) {
blobNameChannelMap.remove(blobName);
channelStrategyDelegate.releaseChannelForBlob(((SeekableByteChannelDecorator)seekableByteChannel).wrappedSeekableByteChannel(), blob);
}

}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -475,34 +475,21 @@ private TransferStrategy makeStreamingPutTransferStrategy() {
getOrMakeTransferRetryDecorator();

return makeTransferStrategy(
new BlobStrategyMaker() {
@Override
public BlobStrategy makeBlobStrategy(final Ds3Client client,
final MasterObjectList masterObjectList,
final EventDispatcher eventDispatcher)
{
return new PutSequentialBlobStrategy(ds3Client,
masterObjectList,
eventDispatcher,
getOrMakeChunkAttemptRetryBehavior(),
getOrMakeChunkAllocationRetryDelayBehavior()
);
}
},
new TransferMethodMaker() {
@Override
public TransferMethod makeTransferMethod() {
return makePutTransferMethod();
}
});
(client, masterObjectList, eventDispatcher) -> new PutSequentialBlobStrategy(ds3Client,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

masterObjectList,
eventDispatcher,
getOrMakeChunkAttemptRetryBehavior(),
getOrMakeChunkAllocationRetryDelayBehavior()
),
this::makePutTransferMethod);
}

private void maybeMakeStreamedPutChannelStrategy() {
if (channelStrategy == null) {
Preconditions.checkNotNull(channelBuilder, "channelBuilder my not be null");

channelStrategy = new SequentialChannelStrategy(new SequentialFileReaderChannelStrategy(channelBuilder),
channelBuilder, new NullChannelPreparable());
channelBuilder, new NullChannelPreparable(), masterObjectList);
}
}

Expand Down Expand Up @@ -823,30 +810,20 @@ private TransferStrategy makeStreamingGetTransferStrategy() {
getOrMakeTransferRetryDecorator();

return makeTransferStrategy(
new BlobStrategyMaker() {
@Override
public BlobStrategy makeBlobStrategy(final Ds3Client client, final MasterObjectList masterObjectList, final EventDispatcher eventDispatcher) {
return new GetSequentialBlobStrategy(ds3Client,
masterObjectList,
eventDispatcher,
getOrMakeChunkAttemptRetryBehavior(),
getOrMakeChunkAllocationRetryDelayBehavior());
}
},
new TransferMethodMaker() {
@Override
public TransferMethod makeTransferMethod() {
return makeGetTransferMethod();
}
});
(client, masterObjectList, eventDispatcher) -> new GetSequentialBlobStrategy(ds3Client,
masterObjectList,
eventDispatcher,
getOrMakeChunkAttemptRetryBehavior(),
getOrMakeChunkAllocationRetryDelayBehavior()),
this::makeGetTransferMethod);
}

private void maybeMakeSequentialGetChannelStrategy() {
if (channelStrategy == null) {
Preconditions.checkNotNull(channelBuilder, "channelBuilder my not be null");

channelStrategy = new SequentialChannelStrategy(new SequentialFileWriterChannelStrategy(channelBuilder),
channelBuilder, new TruncatingChannelPreparable());
channelBuilder, new TruncatingChannelPreparable(), masterObjectList);
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package com.spectralogic.ds3client.utils

import com.spectralogic.ds3client.helpers.channelbuilders.ReadOnlySeekableByteChannel

class ReadOnlySeekableByteChannelInputStream(
readOnlySeekableByteChannel: ReadOnlySeekableByteChannel
) : SeekableByteChannelInputStream(readOnlySeekableByteChannel) {
override fun reset(): Unit = Unit
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public int read() throws IOException {
return -1;
}
}

@Override
public int read(final byte[] b, final int off, final int len) throws IOException {
final ByteBuffer buffer = ByteBuffer.wrap(b);
Expand All @@ -52,18 +52,18 @@ public int read(final byte[] b, final int off, final int len) throws IOException
return -1;
}
}

@Override
public long skip(final long n) throws IOException {
this.seekableByteChannel.position(this.seekableByteChannel.position() + n);
return this.seekableByteChannel.position();
}

@Override
public boolean markSupported() {
return true;
}

@Override
public void mark(final int readlimit) {
try {
Expand All @@ -72,13 +72,13 @@ public void mark(final int readlimit) {
throw new RuntimeException(e);
}
}

@Override
public void reset() throws IOException {
this.seekableByteChannel.position(this.markPosition);
this.markPosition = 0;
}

@Override
public void close() throws IOException {
this.seekableByteChannel.close();
Expand Down