Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
1dc10c6
Implementing retry based on a network transfer where the content leng…
GraciesPadre Dec 14, 2016
ae5c491
Fixing an integration test I broke when I hijacked a file it needed f…
GraciesPadre Dec 14, 2016
ad7e7c5
No longer closing the request channel in GetObjectResponseParser, as …
GraciesPadre Dec 14, 2016
f43ffc9
Adding a RangeHelper to generate a new set of ranges form an existing…
GraciesPadre Dec 16, 2016
6df768d
Emitting failure event when we detect http content length mismatch
GraciesPadre Dec 16, 2016
23fbf1b
Remove file we transferred to prevent breaking tests
GraciesPadre Dec 16, 2016
2dce775
Fixing failing integration tests
GraciesPadre Dec 19, 2016
baa04d0
Fixing failing unit tests
GraciesPadre Dec 21, 2016
99e320a
Fixing failing unit tests
GraciesPadre Dec 21, 2016
2996a0a
Beginning to merge from master
GraciesPadre Dec 21, 2016
26fc078
Beginning to merge from master
GraciesPadre Dec 21, 2016
8bd954c
Merge branch 'master' of https://github.com/SpectraLogic/ds3_java_sdk…
GraciesPadre Dec 21, 2016
a7390cd
Replacing a literal with a named variable.
GraciesPadre Dec 21, 2016
064242c
Make it clearer what we base the destination channel offset on during…
GraciesPadre Dec 21, 2016
3af2e17
Make it clearer that this test is using a non-0 offset into the first…
GraciesPadre Dec 21, 2016
f01f677
Merge branch 'master' of https://github.com/SpectraLogic/ds3_java_sdk…
GraciesPadre Dec 21, 2016
65ecc23
Restoring back to what the autogen code was before. Moving the chann…
GraciesPadre Dec 21, 2016
a5369c8
Merge branch 'master' of https://github.com/SpectraLogic/ds3_java_sdk…
GraciesPadre Dec 22, 2016
d2fc6be
Merge branch 'master' of https://github.com/SpectraLogic/ds3_java_sdk…
GraciesPadre Dec 22, 2016
253d2bf
Removing unhygenic sleep from tests that exec a command on Windoze to…
GraciesPadre Dec 22, 2016
6780c2a
Merge branch 'master' of https://github.com/SpectraLogic/ds3_java_sdk…
GraciesPadre Dec 23, 2016
69776e1
Merge branch 'master' of https://github.com/SpectraLogic/ds3_java_sdk…
GraciesPadre Jan 3, 2017
00e857d
Responding to code review comments.
GraciesPadre Jan 4, 2017
ce2b4c7
Merge branch 'master' of https://github.com/SpectraLogic/ds3_java_sdk…
GraciesPadre Jan 5, 2017
8ddcc5f
Merge branch 'master' into partial_read_bug
GraciesPadre Jan 6, 2017
fac5562
Removing unnecessary imports
GraciesPadre Jan 6, 2017
09c2c83
Merge branch 'master' of https://github.com/SpectraLogic/ds3_java_sdk…
GraciesPadre Jan 6, 2017
940a05e
Merge branch 'partial_read_bug' of https://github.com/GraciesPadre/ds…
GraciesPadre Jan 6, 2017
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -159,9 +159,7 @@ public void testObjectsFitBucketPathLacksAccess() throws IOException, Interrupte
final Path directory = Files.createDirectory(Paths.get("dir"));
if (org.apache.commons.lang3.SystemUtils.IS_OS_WINDOWS) {
// Deny write data access to everyone, making the directory unwritable
Runtime.getRuntime().exec("icacls dir /deny Everyone:(WD)");
// Exec can return before the command completes. Delay for long enough to let the command finish.
Thread.sleep(5000);
Runtime.getRuntime().exec("icacls dir /deny Everyone:(WD)").waitFor();
} else {
directory.toFile().setWritable(false);
}
Expand All @@ -178,8 +176,7 @@ public void testObjectsFitBucketPathLacksAccess() throws IOException, Interrupte
} finally {
if (org.apache.commons.lang3.SystemUtils.IS_OS_WINDOWS) {
// Grant write data access to everyone, making the directory writable, so we can delete it.
Runtime.getRuntime().exec("icacls dir /grant Everyone:(WD)");
Thread.sleep(5000);
Runtime.getRuntime().exec("icacls dir /grant Everyone:(WD)").waitFor();
} else {
directory.toFile().setWritable(true);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -491,7 +491,7 @@ public void testPartialRetriesWithInjectedFailures() throws NoSuchMethodExceptio
final Ds3ClientShim ds3ClientShim = new Ds3ClientShim((Ds3ClientImpl) client);

final int maxNumBlockAllocationRetries = 1;
final int maxNumObjectTransferAttempts = 5;
final int maxNumObjectTransferAttempts = 4;
final Ds3ClientHelpers ds3ClientHelpers = Ds3ClientHelpers.wrap(ds3ClientShim,
maxNumBlockAllocationRetries,
maxNumObjectTransferAttempts);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,22 +19,22 @@

public class ContentLengthNotMatchException extends IOException {
private final String fileName;
private final long contentLenght;
private final long contentLength;
private final long totalBytes;
public ContentLengthNotMatchException(final String fileName, final long contentLenght, final long totalBytes) {
super(String.format("The Content length for %s (%d) not match the number of byte read (%d)", fileName, contentLenght, totalBytes));
public ContentLengthNotMatchException(final String fileName, final long contentLength, final long totalBytes) {
super(String.format("The Content length for %s (%d) not match the number of byte read (%d)", fileName, contentLength, totalBytes));

this.fileName = fileName;
this.contentLenght = contentLenght;
this.contentLength = contentLength;
this.totalBytes = totalBytes;
}

public String getFileName() {
return this.fileName;
}

public long getContentLenght() {
return this.contentLenght;
public long getContentLength() {
return this.contentLength;
}

public long getTotalBytes() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ protected FailureEvent makeFailureEvent(final FailureEvent.FailureActivity failu
final Throwable causalException,
final Objects chunk)
{
return new FailureEvent.Builder()
return FailureEvent.builder()
.doingWhat(failureActivity)
.withCausalException(causalException)
.withObjectNamed(getLabelForChunk(chunk))
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
/*
* ****************************************************************************
* Copyright 2014-2016 Spectra Logic Corporation. All Rights Reserved.
* Licensed under the Apache License, Version 2.0 (the "License"). You may not use
* this file except in compliance with the License. A copy of the License is located at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* or in the "license" file accompanying this file.
* This file is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
* CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
* ****************************************************************************
*/

package com.spectralogic.ds3client.helpers;

import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableCollection;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.UnmodifiableIterator;
import com.spectralogic.ds3client.models.common.Range;
import com.spectralogic.ds3client.utils.Guard;

import java.util.ArrayList;
import java.util.List;

final class RangeHelper {
private RangeHelper() {}

static ImmutableCollection<Range> replaceRange(final ImmutableCollection<Range> existingRanges,
final long numBytesTransferred,
final long intendedNumBytesToTransfer)
{
Preconditions.checkState(numBytesTransferred >= 0, "numBytesTransferred must be >= 0.");
Preconditions.checkState(intendedNumBytesToTransfer > 0, "intendedNumBytesToTransfer must be > 0.");
Preconditions.checkState(intendedNumBytesToTransfer > numBytesTransferred, "intendedNumBytesToTransfer must be > numBytesTransferred");

if (Guard.isNullOrEmpty(existingRanges)) {
return ImmutableList.of(Range.byLength(numBytesTransferred, intendedNumBytesToTransfer - numBytesTransferred));
}

final List<Range> newRanges = new ArrayList<>();

final UnmodifiableIterator<Range> existingRangesIterator = existingRanges.iterator();

long previousAccumulatedBytesInRanges = 0;
long currentAccumulatedBytesInRanges = existingRanges.iterator().next().getLength();

while (existingRangesIterator.hasNext()) {
final Range existingRange = existingRangesIterator.next();

if (numBytesTransferred < currentAccumulatedBytesInRanges) {
final Range firstNewRange = Range.byPosition(existingRange.getStart() - previousAccumulatedBytesInRanges + numBytesTransferred, existingRange.getEnd());
newRanges.add(firstNewRange);

addRemainingRanges(existingRangesIterator, newRanges);
break;
}

previousAccumulatedBytesInRanges += existingRange.getLength();
currentAccumulatedBytesInRanges += existingRange.getLength();
}

return ImmutableList.copyOf(newRanges);
}

static void addRemainingRanges(final UnmodifiableIterator<Range> existingRangesIterator, final List<Range> newRanges) {
while (existingRangesIterator.hasNext()) {
newRanges.add(existingRangesIterator.next());
}
}

static long transferSizeForRanges(final ImmutableCollection<Range> existingRanges) {
long result = 0;

if (Guard.isNullOrEmpty(existingRanges)) {
return result;
}

for (final Range range : existingRanges) {
result += range.getLength();
}

return result;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import com.spectralogic.ds3client.Ds3Client;
import com.spectralogic.ds3client.commands.GetObjectRequest;
import com.spectralogic.ds3client.commands.GetObjectResponse;
import com.spectralogic.ds3client.exceptions.ContentLengthNotMatchException;
import com.spectralogic.ds3client.helpers.ChunkTransferrer.ItemTransferrer;
import com.spectralogic.ds3client.helpers.Ds3ClientHelpers.ObjectChannelBuilder;
import com.spectralogic.ds3client.helpers.events.EventRunner;
Expand Down Expand Up @@ -51,7 +52,7 @@ public ReadJobImpl(
final int retryAfter,
final int retryDelay,
final EventRunner eventRunner
) {
) {
super(client, masterObjectList, objectTransferAttempts, eventRunner);

this.blobToRanges = PartialObjectHelpers.mapRangesToBlob(masterObjectList.getObjects(), objectRanges);
Expand Down Expand Up @@ -144,10 +145,10 @@ protected JobPartTrackerDecorator makeJobPartTracker(final List<Objects> chunks,
}

private final class GetObjectTransferrerRetryDecorator implements ItemTransferrer {
private final GetObjectTransferrer getObjectTransferrer;
private final ItemTransferrer getObjectTransferrer;

private GetObjectTransferrerRetryDecorator(final JobState jobState) {
getObjectTransferrer = new GetObjectTransferrer(jobState);
getObjectTransferrer = new GetObjectTransferrerNetworkFailureDecorator(jobState);
}

@Override
Expand All @@ -156,6 +157,103 @@ public void transferItem(final Ds3Client client, final BulkObject ds3Object) thr
}
}

private final class GetObjectTransferrerNetworkFailureDecorator implements ItemTransferrer {
private final JobState jobState;
private Long numBytesToTransfer;
private ImmutableCollection<Range> ranges;
private Long destinationChannelOffset = 0L;
private ItemTransferrer itemTransferrer;

private GetObjectTransferrerNetworkFailureDecorator(final JobState jobState) {
this.jobState = jobState;
itemTransferrer = new GetObjectTransferrer(jobState);
}

@Override
public void transferItem(final Ds3Client client, final BulkObject ds3Object) throws IOException {
try {
itemTransferrer.transferItem(client, ds3Object);
} catch (final ContentLengthNotMatchException contentLengthNotMatchException) {
makeNewItemTransferrer(ds3Object, contentLengthNotMatchException.getTotalBytes());

emitContentLengthMismatchFailureEvent(ds3Object, contentLengthNotMatchException);

throw new RecoverableIOException(contentLengthNotMatchException);
}
}

private void makeNewItemTransferrer(final BulkObject ds3Object, final long numBytesTransferred) {
initializeRangesAndTransferSize(ds3Object);
updateRanges(numBytesTransferred);
destinationChannelOffset += numBytesTransferred;

itemTransferrer = new GetPartialObjectTransferrer(jobState, ranges, destinationChannelOffset);
}

private void initializeRangesAndTransferSize(final BulkObject ds3Object) {
if (ranges == null) {
ranges = getRangesForBlob(blobToRanges, ds3Object);
}

if (ranges == null) {
final long numBytesTransferred = 0;
ranges = RangeHelper.replaceRange(ranges, numBytesTransferred, ds3Object.getLength());
}

if (numBytesToTransfer == null) {
numBytesToTransfer = RangeHelper.transferSizeForRanges(ranges);
}
}

private void updateRanges(final long numBytesTransferred) {
ranges = RangeHelper.replaceRange(ranges, numBytesTransferred, numBytesToTransfer);
}

private void emitContentLengthMismatchFailureEvent(final BulkObject ds3Object, final Throwable t) {
final FailureEvent failureEvent = FailureEvent.builder()
.doingWhat(FailureEvent.FailureActivity.GettingObject)
.usingSystemWithEndpoint(client.getConnectionDetails().getEndpoint())
.withCausalException(t)
.withObjectNamed(ds3Object.getName())
.build();
emitFailureEvent(failureEvent);
}
}

private final class GetPartialObjectTransferrer implements ItemTransferrer {
private final JobState jobState;
private final ImmutableCollection<Range> ranges;
private final long destinationChannelOffset;

private GetPartialObjectTransferrer(final JobState jobState,
final ImmutableCollection<Range> ranges,
final long destinationChannelOffset)
{
this.jobState = jobState;
this.ranges = ranges;
this.destinationChannelOffset = destinationChannelOffset;
}

@Override
public void transferItem(final Ds3Client client, final BulkObject ds3Object) throws IOException {
final GetObjectRequest getObjectRequest = new GetObjectRequest(
ReadJobImpl.this.masterObjectList.getBucketName(),
ds3Object.getName(),
jobState.getChannel(ds3Object.getName(), destinationChannelOffset, RangeHelper.transferSizeForRanges(ranges)),
ReadJobImpl.this.getJobId().toString(),
ds3Object.getOffset()
);

getObjectRequest.withByteRanges(ranges);

final GetObjectResponse response = client.getObject(getObjectRequest);
final Metadata metadata = response.getMetadata();

emitChecksumEvents(ds3Object, response.getChecksumType(), response.getChecksum());
sendMetadataEvents(ds3Object.getName(), metadata);
}
}

private final class GetObjectTransferrer implements ItemTransferrer {
private final JobState jobState;

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package com.spectralogic.ds3client.helpers;

import java.io.IOException;

/**
* A RecoverableIOException is used to classify IOExceptions into one of 2 types:
* those that should result in retrying a data transfer, such as a failure in an HTTP GET or PUT;
* and those that should <b>not</b> result in retrying a data transfer, such as a failure writing to
* or reading from a file. During an HTTP PUT or GET that involves transferring data to or from
* a file, file system failures are considered non-recoverable, where network-related failures
* are considered recoverable.
*/
public class RecoverableIOException extends IOException {
public RecoverableIOException() {
super();
}

public RecoverableIOException(final String message) {
super(message);
}

public RecoverableIOException(final String message, final Throwable cause) {
super(message, cause);
}

public RecoverableIOException(final Throwable cause) {
super(cause);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,10 @@ public Throwable getCausalException() {
return causalException;
}

public static FailureEvent.Builder builder() {
return new FailureEvent.Builder();
}

@Override
public String toString() {
return "Failure " + doingWhat().getActivityText() + " with object named \"" + withObjectNamed() + "\" using system with endpoint " + usingSystemWithEndpoint();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,4 +61,13 @@ public boolean equals(final Object obj) {
public int compareTo(final Range o) {
return Long.compare(this.getStart(), o.getStart());
}

@Override
public String toString() {
return "Range{" +
"start=" + start +
", end=" + end +
", length=" + length +
'}';
}
}
Loading