Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ The SDK can also be included directly into a Maven or Gradle build. There is als
<dependency>
<groupId>com.spectralogic.ds3</groupId>
<artifactId>ds3-sdk</artifactId>
<version>3.5.2</version>
<version>3.5.3</version>
<!-- <classifier>all</classifier> -->
</dependency>
...
Expand All @@ -63,8 +63,8 @@ repositories {

dependencies {
...
compile 'com.spectralogic.ds3:ds3-sdk:3.5.2'
// compile 'com.spectralogic.ds3:ds3-sdk:3.5.2:all'
compile 'com.spectralogic.ds3:ds3-sdk:3.5.3'
// compile 'com.spectralogic.ds3:ds3-sdk:3.5.3:all'
...
}

Expand Down
2 changes: 1 addition & 1 deletion SETUP.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ If using Eclipse:
* In the results that are returned install `Gradle Integration for Eclipse (4.4) 3.7.1.RELEASE` or current version. Clicking `Install` will take you to the 'Confirm Selected Features' dialog. (De-select the two optional Spring checkboxes on older versions). `Gradle IDE` and `org.gradle.toolingapi.feature` should both be selected. Accept any dialogs that popup.
* After Gradle has been installed into eclipse and git has been installed, you should be able to clone the repo and import that project into eclipse

If using Intelllij:
If using Intellij:
* Open Intellij and select `Import Project`
* Find the `build.gradle` file contained at the root of the project and select it
* Accept the defaults
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ buildscript {

allprojects {
group = 'com.spectralogic.ds3'
version = '3.5.2'
version = '3.5.3'
}

subprojects {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
package com.spectralogic.ds3client.helpers;

import com.spectralogic.ds3client.Ds3Client;
import com.spectralogic.ds3client.commands.PutObjectResponse;
import com.spectralogic.ds3client.helpers.events.SameThreadEventRunner;
import com.spectralogic.ds3client.integration.Util;
import com.spectralogic.ds3client.integration.test.helpers.TempStorageIds;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import com.spectralogic.ds3client.helpers.events.FailureEvent;
import com.spectralogic.ds3client.helpers.events.SameThreadEventRunner;
import com.spectralogic.ds3client.helpers.options.ReadJobOptions;
import com.spectralogic.ds3client.helpers.options.WriteJobOptions;
import com.spectralogic.ds3client.helpers.strategy.blobstrategy.BlobStrategy;
import com.spectralogic.ds3client.helpers.strategy.blobstrategy.ChunkAttemptRetryBehavior;
import com.spectralogic.ds3client.helpers.strategy.blobstrategy.ChunkAttemptRetryDelayBehavior;
Expand Down Expand Up @@ -83,7 +84,9 @@
import org.slf4j.LoggerFactory;

import java.io.BufferedReader;
import java.io.DataOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
Expand All @@ -101,11 +104,15 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

import static com.spectralogic.ds3client.commands.spectrads3.PutBulkJobSpectraS3Request.MIN_UPLOAD_SIZE_IN_BYTES;
import static com.spectralogic.ds3client.integration.Util.RESOURCE_BASE_NAME;
import static com.spectralogic.ds3client.integration.Util.deleteAllContents;
import static com.spectralogic.ds3client.integration.Util.deleteBucketContents;
Expand Down Expand Up @@ -1449,4 +1456,68 @@ public void testThatFifoIsNotProcessed() throws IOException, InterruptedExceptio

assertTrue(caughtException.get());
}

/**
 * Integration test: uploads a file larger than a single chunk (so it is split into
 * multiple blobs), then downloads it with the streamed-read behavior and verifies
 * the content survives the round trip byte-for-byte.
 */
@Test
public void testStreamedGetJobWithBlobbedFile() throws Exception {
    // Cap uploads at the minimum allowed size so the object is forced into
    // more than one blob.
    final int chunkSize = MIN_UPLOAD_SIZE_IN_BYTES;
    final long biggerThanAChunkSize = chunkSize * 2L + 1024;

    // Number of 4-byte ints needed to fill the file to the desired size.
    final int numIntsInBiggerThanAChunkSize = (int)biggerThanAChunkSize / 4;

    final String originalFileName = "Gracie.bin";
    final String movedFileName = "Gracie.bak";

    try {
        final DataOutputStream originalFileStream = new DataOutputStream(new FileOutputStream(originalFileName));

        byte[] bytes = new byte[4];

        // Fill the file with a deterministic little-endian counter pattern so a
        // corrupted round trip cannot accidentally compare equal.
        for (int i = 0; i < numIntsInBiggerThanAChunkSize; ++i) {
            bytes[0] = (byte)i;
            bytes[1] = (byte)(i >> 8);
            bytes[2] = (byte)(i >> 16);
            bytes[3] = (byte)(i >> 24);
            originalFileStream.write(bytes);
        }

        originalFileStream.close();

        final Ds3Object ds3Object = new Ds3Object();
        ds3Object.setName(originalFileName);
        ds3Object.setSize(biggerThanAChunkSize);

        final AtomicLong numBytesTransferred = new AtomicLong(0);

        // Restrict the max upload size so the file is blobbed into chunkSize pieces.
        final WriteJobOptions writeJobOptions = WriteJobOptions.create();
        writeJobOptions.withMaxUploadSize(chunkSize);

        final Ds3ClientHelpers.Job writeJob = HELPERS.startWriteJob(BUCKET_NAME, Collections.singletonList(ds3Object), writeJobOptions);
        writeJob.attachDataTransferredListener(numBytesTransferred::addAndGet);

        final CountDownLatch writeCountDownLatch = new CountDownLatch(1);

        writeJob.attachObjectCompletedListener(name -> writeCountDownLatch.countDown());

        writeJob.transfer(new FileObjectPutter(Paths.get(".")));

        // Wait for the object-completed event before asserting on transfer totals.
        writeCountDownLatch.await();

        assertEquals(biggerThanAChunkSize, numBytesTransferred.get());

        // Move the source out of the way so the streamed get re-creates the file,
        // leaving the original available for the content comparison below.
        Files.move(Paths.get(originalFileName), Paths.get(movedFileName));

        final CountDownLatch readCountdownLatch = new CountDownLatch(1);

        final Ds3ClientHelpers.Job readJob = HELPERS.startReadJobUsingStreamedBehavior(BUCKET_NAME, Collections.singletonList(ds3Object));
        readJob.attachObjectCompletedListener(name -> readCountdownLatch.countDown());
        readJob.transfer(new FileObjectGetter(Paths.get(".")));

        readCountdownLatch.await();

        assertTrue(FileUtils.contentEquals(new File(movedFileName), new File(originalFileName)));
    } finally {
        // NOTE(review): unlike the streamed-put counterpart, this test does not call
        // deleteAllContents(client, BUCKET_NAME) here — confirm the bucket is cleaned
        // up elsewhere (e.g. an @After hook).
        Files.deleteIfExists(Paths.get(originalFileName));
        Files.deleteIfExists(Paths.get(movedFileName));
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,9 @@
import org.slf4j.LoggerFactory;

import java.io.BufferedInputStream;
import java.io.DataOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.lang.reflect.InvocationTargetException;
Expand All @@ -66,9 +68,12 @@
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.*;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

import static com.spectralogic.ds3client.commands.spectrads3.PutBulkJobSpectraS3Request.MIN_UPLOAD_SIZE_IN_BYTES;
import static com.spectralogic.ds3client.integration.Util.RESOURCE_BASE_NAME;
import static com.spectralogic.ds3client.integration.Util.deleteAllContents;
import static org.hamcrest.Matchers.*;
Expand Down Expand Up @@ -2059,4 +2064,70 @@ public void testThatNonExistentFileDoesNotStopPutJob() throws IOException {
assertTrue(caughtNoSuchFileException.get());
assertTrue(getJobRan.get());
}

/**
 * Integration test: uploads a file larger than a single chunk using the
 * streamed-write behavior (so the object is split into multiple blobs), then
 * downloads it with a regular read job and verifies the content survives the
 * round trip byte-for-byte.
 */
@Test
public void testStreamedPutJobWithBlobbedFile() throws Exception {
    // Cap uploads at the minimum allowed size so the object is forced into
    // more than one blob.
    final int chunkSize = MIN_UPLOAD_SIZE_IN_BYTES;
    final long biggerThanAChunkSize = chunkSize * 2L + 1024;

    // Number of 4-byte ints needed to fill the file to the desired size.
    final int numIntsInBiggerThanAChunkSize = (int)biggerThanAChunkSize / 4;

    final String originalFileName = "Gracie.bin";
    final String movedFileName = "Gracie.bak";

    try {
        final DataOutputStream originalFileStream = new DataOutputStream(new FileOutputStream(originalFileName));

        byte[] bytes = new byte[4];

        // Fill the file with a deterministic little-endian counter pattern so a
        // corrupted round trip cannot accidentally compare equal.
        for (int i = 0; i < numIntsInBiggerThanAChunkSize; ++i) {
            bytes[0] = (byte)i;
            bytes[1] = (byte)(i >> 8);
            bytes[2] = (byte)(i >> 16);
            bytes[3] = (byte)(i >> 24);
            originalFileStream.write(bytes);
        }

        originalFileStream.close();

        final Ds3Object ds3Object = new Ds3Object();
        ds3Object.setName(originalFileName);
        ds3Object.setSize(biggerThanAChunkSize);

        final AtomicLong numBytesTransferred = new AtomicLong(0);

        // Restrict the max upload size so the file is blobbed into chunkSize pieces.
        final WriteJobOptions writeJobOptions = WriteJobOptions.create();
        writeJobOptions.withMaxUploadSize(chunkSize);

        final Ds3ClientHelpers.Job writeJob = HELPERS.startWriteJobUsingStreamedBehavior(BUCKET_NAME, Collections.singletonList(ds3Object), writeJobOptions);
        writeJob.attachDataTransferredListener(numBytesTransferred::addAndGet);

        final CountDownLatch writeCountDownLatch = new CountDownLatch(1);

        writeJob.attachObjectCompletedListener(name -> writeCountDownLatch.countDown());

        writeJob.transfer(new FileObjectPutter(Paths.get(".")));

        // Wait for the object-completed event before asserting on transfer totals.
        writeCountDownLatch.await();

        assertEquals(biggerThanAChunkSize, numBytesTransferred.get());

        // Move the source out of the way so the read job re-creates the file,
        // leaving the original available for the content comparison below.
        Files.move(Paths.get(originalFileName), Paths.get(movedFileName));

        final CountDownLatch readCountdownLatch = new CountDownLatch(1);

        final Ds3ClientHelpers.Job readJob = HELPERS.startReadJob(BUCKET_NAME, Collections.singletonList(ds3Object));
        readJob.attachObjectCompletedListener(name -> readCountdownLatch.countDown());
        readJob.transfer(new FileObjectGetter(Paths.get(".")));

        readCountdownLatch.await();

        assertTrue(FileUtils.contentEquals(new File(movedFileName), new File(originalFileName)));
    } finally {
        deleteAllContents(client, BUCKET_NAME);

        Files.deleteIfExists(Paths.get(originalFileName));
        Files.deleteIfExists(Paths.get(movedFileName));
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ public class ContentLengthNotMatchException extends IOException {
private final long contentLength;
private final long totalBytes;
public ContentLengthNotMatchException(final String fileName, final long contentLength, final long totalBytes) {
super(String.format("The Content length for %s (%d) not match the number of byte read (%d)", fileName, contentLength, totalBytes));
super(String.format("The Content length for %s (%d) does not match the number of bytes read (%d)", fileName, contentLength, totalBytes));

this.fileName = fileName;
this.contentLength = contentLength;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
import com.google.common.base.Function;
import com.google.common.collect.FluentIterable;
import com.spectralogic.ds3client.Ds3Client;
import com.spectralogic.ds3client.commands.PutObjectResponse;
import com.spectralogic.ds3client.helpers.options.ReadJobOptions;
import com.spectralogic.ds3client.helpers.options.WriteJobOptions;
import com.spectralogic.ds3client.helpers.pagination.FileSystemKey;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,52 +25,124 @@
* An instance of {@link SeekableByteChannel} used to decorate another SeekableByteChannel in the
* situation where we re-use the same channel for more than 1 blob. This subclass prevents closing
* a channel when there are other blobs still referencing the shared channel.
*
* This class positions the content of a blob within the bounds of a channel that may be capable
* of containing more than one blob.
*/
class SeekableByteChannelDecorator implements SeekableByteChannel {
private final SeekableByteChannel seekableByteChannel;
private final Object lock = new Object();

SeekableByteChannelDecorator(final SeekableByteChannel seekableByteChannel) {
Preconditions.checkNotNull(seekableByteChannel, "seekableByteChannel may not be null");
private final SeekableByteChannel seekableByteChannel;
private final long blobOffset;
private final long blobLength;
private long nextAvailableByteOffset = 0;

SeekableByteChannelDecorator(final SeekableByteChannel seekableByteChannel, final long blobOffset, final long blobLength) throws IOException {
Preconditions.checkNotNull(seekableByteChannel, "seekableByteChannel may not be null.");
Preconditions.checkArgument(blobOffset >= 0, "blobOffset must be >= 0.");
Preconditions.checkArgument(blobLength >= 0, "blobLength must be >= 0.");
this.seekableByteChannel = seekableByteChannel;
this.blobOffset = blobOffset;
this.blobLength = blobLength;

seekableByteChannel.position(blobOffset);
}

protected SeekableByteChannel wrappedSeekableByteChannel() {
SeekableByteChannel wrappedSeekableByteChannel() {
return seekableByteChannel;
}

/**
 * Reads from the wrapped channel without consuming bytes past the end of this
 * blob's window, so the next blob sharing the channel is left intact.
 *
 * @param dst buffer to fill.
 * @return the number of bytes read, or 0 when the window is exhausted.
 * @throws IOException if the underlying channel read fails.
 */
@Override
public int read(final ByteBuffer dst) throws IOException {
    synchronized (lock) {
        final long remainingInWindow = blobLength - nextAvailableByteOffset;
        final long numBytesWeCanRead = Math.min(dst.remaining(), remainingInWindow);

        if (numBytesWeCanRead <= 0) {
            // NOTE(review): returning 0 (rather than -1) at end-of-window deviates
            // from the ReadableByteChannel EOF convention — confirm callers
            // terminate on 0 and cannot spin.
            return 0;
        }

        final int numBytesRead;

        if (numBytesWeCanRead != dst.remaining()) {
            // Clamp the read through a bounded scratch buffer so we stop exactly
            // at the window boundary even though dst has more room.
            final ByteBuffer byteBuffer = ByteBuffer.wrap(new byte[(int) numBytesWeCanRead]);
            numBytesRead = seekableByteChannel.read(byteBuffer);
            byteBuffer.flip();
            dst.put(byteBuffer);
        } else {
            numBytesRead = seekableByteChannel.read(dst);
        }

        // NOTE(review): if the wrapped channel returns -1 (EOF) this moves the
        // window offset backwards — confirm EOF cannot occur mid-blob.
        nextAvailableByteOffset += numBytesRead;

        return numBytesRead;
    }
}

/**
 * Writes to the wrapped channel without spilling bytes past the end of this
 * blob's window, so the next blob sharing the channel is not overwritten.
 *
 * @param src buffer to drain.
 * @return the number of bytes written, or 0 when the window is exhausted.
 * @throws IOException if the underlying channel write fails.
 */
@Override
public int write(final ByteBuffer src) throws IOException {
    synchronized (lock) {
        final long remainingInWindow = blobLength - nextAvailableByteOffset;
        final long numBytesWeCanWrite = Math.min(src.remaining(), remainingInWindow);

        if (numBytesWeCanWrite <= 0) {
            return 0;
        }

        final int numBytesWritten;

        if (numBytesWeCanWrite != src.remaining()) {
            // Clamp the write through a bounded scratch buffer so we stop exactly
            // at the window boundary even though src holds more data.
            // NOTE(review): this consumes all of src even though only part is
            // written — confirm callers do not rely on src.remaining() afterwards.
            final ByteBuffer byteBuffer = ByteBuffer.wrap(new byte[(int) numBytesWeCanWrite]);
            byteBuffer.put(src);
            byteBuffer.flip();
            numBytesWritten = seekableByteChannel.write(byteBuffer);
        } else {
            numBytesWritten = seekableByteChannel.write(src);
        }

        nextAvailableByteOffset += numBytesWritten;

        return numBytesWritten;
    }
}

/**
 * Returns the wrapped channel's absolute position (not the blob-relative offset).
 *
 * @throws IOException if the underlying channel query fails.
 */
@Override
public long position() throws IOException {
    synchronized (lock) {
        return seekableByteChannel.position();
    }
}

/**
 * Sets the blob-relative position, translating it to an absolute position on
 * the wrapped channel by adding this blob's offset.
 *
 * @param newPosition position relative to the start of this blob's window.
 * @return this channel.
 * @throws IOException if the underlying channel positioning fails.
 */
@Override
public SeekableByteChannel position(final long newPosition) throws IOException {
    synchronized (lock) {
        // NOTE(review): capping at blobLength - 1 forbids positioning at the end
        // of the window (== blobLength), which SeekableByteChannel normally
        // permits — confirm this clamp is intentional.
        final long greatestPossiblePosition = blobLength - 1;
        nextAvailableByteOffset = Math.min(newPosition, greatestPossiblePosition);
        seekableByteChannel.position(blobOffset + nextAvailableByteOffset);

        return this;
    }
}

/**
 * Returns the wrapped channel's total size (not this blob's length).
 *
 * @throws IOException if the underlying channel query fails.
 */
@Override
public long size() throws IOException {
    synchronized (lock) {
        return seekableByteChannel.size();
    }
}

/**
 * Truncates the wrapped channel to the given size (delegates directly; the
 * size is not translated into this blob's window).
 *
 * @param size the new size of the underlying channel.
 * @return the wrapped channel, as returned by the delegate.
 * @throws IOException if the underlying truncate fails.
 */
@Override
public SeekableByteChannel truncate(final long size) throws IOException {
    synchronized (lock) {
        return seekableByteChannel.truncate(size);
    }
}

/**
 * Reports whether the wrapped channel is open.
 */
@Override
public boolean isOpen() {
    synchronized (lock) {
        return seekableByteChannel.isOpen();
    }
}

@Override
Expand Down
Loading