Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -78,20 +78,23 @@ public static Optional<MultipartDownloadResumeContext> multipartDownloadResumeCo
}

/**
* Parses the start byte from a Content-Range header.
*
* Parses start byte and end byte from a Content-Range header.
*
* @param contentRange the Content-Range header value (e.g., "bytes 0-1023/2048")
* @return the start byte position, or -1 if parsing fails
* @return array of [startByte, endByte], or null if parsing fails
*/
public static long parseStartByteFromContentRange(String contentRange) {
public static long[] parseContentRange(String contentRange) {
if (contentRange == null) {
return -1;
return null;
}
Matcher matcher = CONTENT_RANGE_PATTERN.matcher(contentRange);
if (!matcher.matches()) {
return -1;
return null;
}
return Long.parseLong(matcher.group(1));
return new long[] {
Long.parseLong(matcher.group(1)),
Long.parseLong(matcher.group(2))
};
}

/**
Expand All @@ -111,4 +114,15 @@ public static Optional<Long> parseContentRangeForTotalSize(String contentRange)
return Optional.of(Long.parseLong(matcher.group(3)));
}

/**
* Calculates the total number of parts needed to download an object of the given size.
*
* @param contentLength total object size in bytes
* @param partSize size of each part in bytes
* @return the number of parts
*/
public static int calculateTotalParts(long contentLength, long partSize) {
return (int) Math.ceil((double) contentLength / partSize);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@ public class ParallelPresignedUrlMultipartDownloaderSubscriber
implements Subscriber<AsyncResponseTransformer<GetObjectResponse, GetObjectResponse>> {

private static final Logger log = Logger.loggerFor(ParallelPresignedUrlMultipartDownloaderSubscriber.class);
private static final String BYTES_RANGE_PREFIX = "bytes=";

private final S3AsyncClient s3AsyncClient;
private final PresignedUrlDownloadRequest presignedUrlDownloadRequest;
Expand All @@ -66,7 +65,10 @@ public class ParallelPresignedUrlMultipartDownloaderSubscriber
private final AtomicInteger partNumber = new AtomicInteger(0);
private final AtomicInteger completedParts = new AtomicInteger(0);
private final Semaphore inFlightPermits;
private final AtomicBoolean isCompletedExceptionally = new AtomicBoolean(false);
/**
* CAS gate ensuring only the first part failure triggers error handling and cancellation.
*/
private final AtomicBoolean downloadFailed = new AtomicBoolean(false);
private final AtomicBoolean processingPending = new AtomicBoolean(false);
private final Map<Integer, CompletableFuture<GetObjectResponse>> inFlightRequests = new ConcurrentHashMap<>();
private final Queue<Pair<Integer, AsyncResponseTransformer<GetObjectResponse, GetObjectResponse>>> pendingTransformers =
Expand Down Expand Up @@ -138,6 +140,7 @@ private void sendFirstRequest(AsyncResponseTransformer<GetObjectResponse, GetObj
s3AsyncClient.presignedUrlExtension().getObject(partRequest, transformer);

inFlightRequests.put(0, response);
CompletableFutureUtils.forwardExceptionTo(resultFuture, response);

response.whenComplete((res, error) -> {
inFlightRequests.remove(0);
Expand All @@ -156,8 +159,7 @@ private void sendFirstRequest(AsyncResponseTransformer<GetObjectResponse, GetObj
return;
}

if (isCompletedExceptionally.get()) {
handlePartError(error, 0);
if (downloadFailed.get()) {
return;
}

Expand All @@ -179,9 +181,15 @@ private void sendFirstRequest(AsyncResponseTransformer<GetObjectResponse, GetObj
}

this.totalContentLength = parsedTotal.get();
this.totalParts = calculateTotalParts(totalContentLength, configuredPartSizeInBytes);
this.totalParts = MultipartDownloadUtils.calculateTotalParts(totalContentLength, configuredPartSizeInBytes);
log.debug(() -> String.format("Total content length: %d, Total parts: %d", totalContentLength, totalParts));

Optional<SdkClientException> validationError = validatePartResponse(res, 0);
if (validationError.isPresent()) {
handlePartError(validationError.get(), 0);
return;
}

if (totalParts <= 1) {
resultFuture.complete(firstResponse);
synchronized (subscriptionLock) {
Expand Down Expand Up @@ -217,7 +225,7 @@ private void processRequest(AsyncResponseTransformer<GetObjectResponse, GetObjec

private void sendPartRequest(AsyncResponseTransformer<GetObjectResponse, GetObjectResponse> transformer,
int partIndex) {
if (isCompletedExceptionally.get()) {
if (downloadFailed.get()) {
inFlightPermits.release();
return;
}
Expand All @@ -235,10 +243,14 @@ private void sendPartRequest(AsyncResponseTransformer<GetObjectResponse, GetObje
inFlightRequests.remove(partIndex);
inFlightPermits.release();

if (error != null || isCompletedExceptionally.get()) {
if (error != null) {
handlePartError(error, partIndex);
return;
}
if (downloadFailed.get()) {
log.debug(() -> "Ignoring late completion for part " + partIndex + ", download already failed");
return;
}

Optional<SdkClientException> validationError = validatePartResponse(res, partIndex);
if (validationError.isPresent()) {
Expand Down Expand Up @@ -288,35 +300,12 @@ private void processPendingTransformers() {
}

private Optional<SdkClientException> validatePartResponse(GetObjectResponse response, int partIndex) {
String contentRange = response.contentRange();
if (contentRange == null) {
return Optional.of(PresignedUrlDownloadHelper.missingContentRangeHeader());
}
Long contentLength = response.contentLength();
if (contentLength == null || contentLength < 0) {
return Optional.of(PresignedUrlDownloadHelper.invalidContentLength());
}
long expectedStartByte = partIndex * configuredPartSizeInBytes;
long expectedEndByte = Math.min(expectedStartByte + configuredPartSizeInBytes - 1, totalContentLength - 1);
String expectedRange = "bytes " + expectedStartByte + "-" + expectedEndByte + "/";
if (!contentRange.startsWith(expectedRange)) {
return Optional.of(SdkClientException.create(
"Content-Range mismatch. Expected range starting with: " + expectedRange +
", but got: " + contentRange));
}
long expectedPartSize = (partIndex == totalParts - 1)
? totalContentLength - (partIndex * configuredPartSizeInBytes)
: configuredPartSizeInBytes;
if (!contentLength.equals(expectedPartSize)) {
return Optional.of(SdkClientException.create(
String.format("Part content length validation failed for part %d. Expected: %d, but got: %d",
partIndex, expectedPartSize, contentLength)));
}
return Optional.empty();
return PresignedUrlDownloadHelper.validatePartResponse(
response, partIndex, configuredPartSizeInBytes, totalContentLength, totalParts);
}

private void handlePartError(Throwable error, int partIndex) {
if (isCompletedExceptionally.compareAndSet(false, true)) {
if (downloadFailed.compareAndSet(false, true)) {
log.debug(() -> "Error on part " + partIndex, error);
resultFuture.completeExceptionally(error);
inFlightRequests.values().forEach(future -> future.cancel(true));
Expand All @@ -329,24 +318,8 @@ private void handlePartError(Throwable error, int partIndex) {
}

private PresignedUrlDownloadRequest createRangedGetRequest(int partIndex) {
long startByte = partIndex * configuredPartSizeInBytes;
long endByte;
if (totalContentLength != null) {
endByte = Math.min(startByte + configuredPartSizeInBytes - 1, totalContentLength - 1);
} else {
endByte = startByte + configuredPartSizeInBytes - 1;
}
String rangeHeader = BYTES_RANGE_PREFIX + startByte + "-" + endByte;
PresignedUrlDownloadRequest.Builder builder = presignedUrlDownloadRequest.toBuilder()
.range(rangeHeader);
if (partIndex > 0 && eTag != null) {
builder.ifMatch(eTag);
}
return builder.build();
}

private int calculateTotalParts(long contentLength, long partSize) {
return (int) Math.ceil((double) contentLength / partSize);
return PresignedUrlDownloadHelper.createRangedGetRequest(
presignedUrlDownloadRequest, partIndex, configuredPartSizeInBytes, totalContentLength, eTag);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

package software.amazon.awssdk.services.s3.internal.multipart;

import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import software.amazon.awssdk.annotations.SdkInternalApi;
Expand Down Expand Up @@ -142,6 +143,93 @@ static SdkClientException invalidContentLength() {
return SdkClientException.create("Invalid or missing Content-Length in response");
}

/**
* Validates a part response for data integrity. Checks that Content-Range and Content-Length
* match the expected values based on part index, part size, and total object size.
*
* @param response the GetObject response to validate
* @param partIndex zero-based index of this part
* @param partSizeInBytes configured part size
* @param totalContentLength total object size (from Content-Range), or null if not yet known
* @param totalParts total number of parts, or null if not yet known
* @return empty if valid, or an SdkClientException describing the mismatch
*/
static Optional<SdkClientException> validatePartResponse(GetObjectResponse response,
int partIndex,
long partSizeInBytes,
Long totalContentLength,
Integer totalParts) {
String contentRange = response.contentRange();
if (contentRange == null) {
return Optional.of(missingContentRangeHeader());
}

Long contentLength = response.contentLength();
if (contentLength == null || contentLength < 0) {
return Optional.of(invalidContentLength());
}

long expectedStartByte = partIndex * partSizeInBytes;
long[] parsedRange = MultipartDownloadUtils.parseContentRange(contentRange);
if (parsedRange == null) {
return Optional.of(invalidContentRangeHeader(contentRange));
}
long actualStartByte = parsedRange[0];
long actualEndByte = parsedRange[1];
if (actualStartByte != expectedStartByte) {
return Optional.of(SdkClientException.create(
"Content-Range mismatch for part " + partIndex + ". Expected start byte: " + expectedStartByte
+ ", but got: bytes " + actualStartByte + "-" + actualEndByte));
}
if (totalContentLength != null) {
long expectedEndByte = Math.min(expectedStartByte + partSizeInBytes - 1, totalContentLength - 1);
if (actualEndByte != expectedEndByte) {
return Optional.of(SdkClientException.create(
"Content-Range mismatch for part " + partIndex + ". Expected: bytes " + expectedStartByte + "-"
+ expectedEndByte + ", but got: bytes " + actualStartByte + "-" + actualEndByte));
}
}

if (totalContentLength != null && totalParts != null) {
long expectedPartSize = (partIndex == totalParts - 1)
? totalContentLength - (partIndex * partSizeInBytes)
: partSizeInBytes;
if (!contentLength.equals(expectedPartSize)) {
return Optional.of(SdkClientException.create(
String.format("Part content length validation failed for part %d. Expected: %d, but got: %d",
partIndex, expectedPartSize, contentLength)));
}
}
return Optional.empty();
}

/**
* Creates a range-based GET request for a specific part of a presigned URL download.
*
* @param originalRequest the original presigned URL request
* @param partIndex zero-based index of this part
* @param partSizeInBytes configured part size
* @param totalContentLength total object size, or null if not yet known (first part)
* @param eTag ETag from first response, used for If-Match on parts 1+
* @return a new PresignedUrlDownloadRequest with the appropriate Range and If-Match headers
*/
static PresignedUrlDownloadRequest createRangedGetRequest(PresignedUrlDownloadRequest originalRequest,
int partIndex,
long partSizeInBytes,
Long totalContentLength,
String eTag) {
long startByte = partIndex * partSizeInBytes;
long endByte = totalContentLength != null
? Math.min(startByte + partSizeInBytes - 1, totalContentLength - 1)
: startByte + partSizeInBytes - 1;
PresignedUrlDownloadRequest.Builder builder = originalRequest.toBuilder()
.range("bytes=" + startByte + "-" + endByte);
if (partIndex > 0 && eTag != null) {
builder.ifMatch(eTag);
}
return builder.build();
}

/**
* Returns true if the error is a 416 Range Not Satisfiable response from S3.
* Used by subscribers to detect empty object responses on the first range request.
Expand Down
Loading
Loading