Merged
11 changes: 11 additions & 0 deletions generator/.DevConfigs/9d07dc1e-d82d-4f94-8700-c7b57f872043.json
@@ -0,0 +1,11 @@
{
"services": [
{
"serviceName": "S3",
"type": "minor",
"changeLogMessages": [
"Created new DownloadWithResponseAsync method on the Amazon.S3.Transfer.TransferUtility class. The new operation supports downloading in parallel parts of the S3 object to a file for improved performance."
]
}
]
}
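
For context, a minimal, hypothetical sketch of how the new method described above might be invoked. The exact signature of DownloadWithResponseAsync is not shown in this diff; this sketch assumes a request shape analogous to the existing TransferUtility.DownloadAsync, and the bucket, key, and file path are placeholders.

// Hypothetical usage; DownloadWithResponseAsync's real signature is not in this diff.
var transferUtility = new TransferUtility(s3Client);
var downloadRequest = new TransferUtilityDownloadRequest
{
    BucketName = "example-bucket",              // placeholder
    Key = "large-object.bin",                   // placeholder
    FilePath = @"C:\downloads\large-object.bin" // placeholder
};
var response = await transferUtility.DownloadWithResponseAsync(downloadRequest, cancellationToken);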
86 changes: 55 additions & 31 deletions sdk/src/Services/S3/Custom/Model/GetObjectResponse.cs
@@ -903,6 +903,59 @@ private void ValidateWrittenStreamSize(long bytesWritten)
}

#if BCL || NETSTANDARD
/// <summary>
/// Copies data from ResponseStream to destination stream with progress tracking and validation.
/// Internal method to enable reuse across different download scenarios.
/// </summary>
/// <param name="destinationStream">Stream to write data to</param>
/// <param name="filePath">File path for progress event reporting (can be null)</param>
/// <param name="bufferSize">Buffer size for reading/writing operations</param>
/// <param name="cancellationToken">Cancellation token</param>
/// <param name="validateSize">Whether to validate copied bytes match ContentLength</param>
internal async System.Threading.Tasks.Task WriteResponseStreamAsync(
@GarrettBeatty (Contributor Author), Nov 21, 2025:

I've just added this as a helper function for writing to a response stream instead of a file, so I can also use the same logic in BufferedPartDataHandler.

Stream destinationStream,
string filePath,
int bufferSize,
System.Threading.CancellationToken cancellationToken,
bool validateSize = true)
{
long current = 0;
#if NETSTANDARD
Stream stream = this.ResponseStream;
#else
Stream stream = new BufferedStream(this.ResponseStream);
Reviewer (Member):

Why does .NET Framework need to wrap the ResponseStream in a BufferedStream but not .NET Core? If there is a good reason, add a comment explaining why it is being done.

@GarrettBeatty (Contributor Author):

I don't have the answer for this, but I kept the logic because it was there before. That's also why I didn't add a comment: I don't know 100%.

#endif
byte[] buffer = new byte[bufferSize];
int bytesRead = 0;
long totalIncrementTransferred = 0;

while ((bytesRead = await stream.ReadAsync(buffer, 0, buffer.Length, cancellationToken)
.ConfigureAwait(continueOnCapturedContext: false)) > 0)
{
cancellationToken.ThrowIfCancellationRequested();

await destinationStream.WriteAsync(buffer, 0, bytesRead, cancellationToken)
.ConfigureAwait(continueOnCapturedContext: false);
current += bytesRead;
totalIncrementTransferred += bytesRead;

if (totalIncrementTransferred >= AWSSDKUtils.DefaultProgressUpdateInterval)
{
this.OnRaiseProgressEvent(filePath, totalIncrementTransferred, current, this.ContentLength, completed: false);
totalIncrementTransferred = 0;
}
}

if (validateSize)
{
ValidateWrittenStreamSize(current);
}

// Encrypted objects may have a size smaller than the total amount of data transferred due to padding.
// Instead of changing the file size or the total downloaded size, pass a flag that indicates transfer is complete.
this.OnRaiseProgressEvent(filePath, totalIncrementTransferred, current, this.ContentLength, completed: true);
}
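
As a usage note, a minimal sketch of calling this helper with an in-memory destination, the reuse case the author describes above. The response variable and surrounding setup are hypothetical; the parameters match the signature in the diff.

// Hypothetical caller: copy the response body into a memory stream.
// filePath: null means progress events carry no file path.
using (var destination = new MemoryStream())
{
    await response.WriteResponseStreamAsync(
        destination,
        filePath: null,
        bufferSize: 8192,
        cancellationToken,
        validateSize: true) // throws if bytes copied != ContentLength
        .ConfigureAwait(false);
}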

/// <summary>
/// Writes the content of the ResponseStream to a file indicated by the filePath argument.
/// </summary>
@@ -923,37 +976,8 @@ public async System.Threading.Tasks.Task WriteResponseStreamToFileAsync(string f

try
{
long current = 0;
#if NETSTANDARD
Stream stream = this.ResponseStream;
#else
Stream stream = new BufferedStream(this.ResponseStream);
#endif
byte[] buffer = new byte[S3Constants.DefaultBufferSize];
int bytesRead = 0;
long totalIncrementTransferred = 0;
while ((bytesRead = await stream.ReadAsync(buffer, 0, buffer.Length, cancellationToken)
.ConfigureAwait(continueOnCapturedContext: false)) > 0)
{
cancellationToken.ThrowIfCancellationRequested();

await downloadStream.WriteAsync(buffer, 0, bytesRead, cancellationToken)
.ConfigureAwait(continueOnCapturedContext: false);
current += bytesRead;
totalIncrementTransferred += bytesRead;

if (totalIncrementTransferred >= AWSSDKUtils.DefaultProgressUpdateInterval)
{
this.OnRaiseProgressEvent(filePath, totalIncrementTransferred, current, this.ContentLength, completed:false);
totalIncrementTransferred = 0;
}
}

ValidateWrittenStreamSize(current);

// Encrypted objects may have a size smaller than the total amount of data transferred due to padding.
// Instead of changing the file size or the total downloaded size, pass a flag that indicates that the transfer is complete.
this.OnRaiseProgressEvent(filePath, totalIncrementTransferred, current, this.ContentLength, completed:true);
await WriteResponseStreamAsync(downloadStream, filePath, S3Constants.DefaultBufferSize, cancellationToken, validateSize: true)
.ConfigureAwait(continueOnCapturedContext: false);
}
finally
{
183 changes: 183 additions & 0 deletions sdk/src/Services/S3/Custom/Transfer/Internal/AtomicFileHandler.cs
@@ -0,0 +1,183 @@
/*******************************************************************************
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
* Licensed under the Apache License, Version 2.0 (the "License"). You may not use
* this file except in compliance with the License. A copy of the License is located at
*
* http://aws.amazon.com/apache2.0
*
* or in the "license" file accompanying this file.
* This file is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
* CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
* *****************************************************************************
* __ _ _ ___
* ( )( \/\/ )/ __)
* /__\ \ / \__ \
* (_)(_) \/\/ (___/
*
* AWS SDK for .NET
* API Version: 2006-03-01
*
*/
using System;
using System.IO;
using System.Security.Cryptography;

namespace Amazon.S3.Transfer.Internal
{
/// <summary>
/// Handles atomic file operations for multipart downloads using a SEP-compliant temporary file pattern.
/// Creates .s3tmp.{uniqueId} files and ensures atomic commits to prevent partial file corruption.
/// </summary>
internal class AtomicFileHandler : IDisposable
{
private string _tempFilePath;
private bool _disposed = false;

/// <summary>
/// Creates a temporary file with unique identifier for atomic operations.
/// Pattern: {destinationPath}.s3tmp.{8-char-unique-id}
/// Uses FileMode.CreateNew for atomic file creation (no race condition).
/// </summary>
public string CreateTemporaryFile(string destinationPath)
{
if (string.IsNullOrEmpty(destinationPath))
throw new ArgumentException("Destination path cannot be null or empty", nameof(destinationPath));

// Create directory if it doesn't exist (Directory.CreateDirectory is idempotent)
var directory = Path.GetDirectoryName(destinationPath);
if (!string.IsNullOrEmpty(directory))
{
Directory.CreateDirectory(directory);
}

// Try up to 100 times to create unique file atomically
for (int attempt = 0; attempt < 100; attempt++)
@GarrettBeatty (Contributor Author):

Not sure the for loop is really needed, since in practice the random ID should be unique. We can remove it if we think it's overkill.

{
var uniqueId = GenerateRandomId(8);
var tempPath = $"{destinationPath}.s3tmp.{uniqueId}";

try
{
// FileMode.CreateNew fails atomically if file exists - no race condition
using (var stream = new FileStream(tempPath, FileMode.CreateNew, FileAccess.Write))
{
// File created successfully - immediately close it
}

_tempFilePath = tempPath;
return tempPath;
}
catch (IOException) when (attempt < 99)
{
// File exists, try again with new ID
continue;
}
}

throw new InvalidOperationException("Unable to generate unique temporary file name after 100 attempts");
}

/// <summary>
/// Atomically commits the temporary file to the final destination.
/// Uses File.Replace for atomic replacement when destination exists, or File.Move for new files.
/// This prevents data loss if the process crashes during commit.
/// </summary>
public void CommitFile(string tempPath, string destinationPath)
{
if (string.IsNullOrEmpty(tempPath))
throw new ArgumentException("Temp path cannot be null or empty", nameof(tempPath));
if (string.IsNullOrEmpty(destinationPath))
throw new ArgumentException("Destination path cannot be null or empty", nameof(destinationPath));

if (!File.Exists(tempPath))
throw new FileNotFoundException($"Temporary file not found: {tempPath}");

try
{
// Use File.Replace for atomic replacement when overwriting existing file
// This prevents data loss if process crashes between delete and move operations
// File.Replace is atomic on Windows (ReplaceFile API) and Unix (rename syscall)
if (File.Exists(destinationPath))
{
File.Replace(tempPath, destinationPath, null);
}
else
{
// For new files, File.Move is sufficient and atomic on same volume
File.Move(tempPath, destinationPath);
}

if (_tempFilePath == tempPath)
_tempFilePath = null; // Successfully committed
}
catch (Exception ex)
{
throw new InvalidOperationException($"Failed to commit temporary file {tempPath} to {destinationPath}", ex);
}
}

/// <summary>
/// Cleans up temporary file in case of failure or cancellation.
/// Safe to call multiple times - File.Delete() is idempotent (no-op if file doesn't exist).
/// </summary>
public void CleanupOnFailure(string tempPath = null)
{
var pathToClean = string.IsNullOrEmpty(tempPath) ? _tempFilePath : tempPath;

if (string.IsNullOrEmpty(pathToClean))
return;

try
{
// File.Delete() is idempotent - doesn't throw if file doesn't exist
File.Delete(pathToClean);

if (_tempFilePath == pathToClean)
_tempFilePath = null;
}
catch (IOException)
{
// Log warning but don't throw - cleanup is best effort
// In production, this would use proper logging infrastructure
}
catch (UnauthorizedAccessException)
{
// Log warning but don't throw - cleanup is best effort
}
}

/// <summary>
/// Generates a cryptographically secure random identifier of specified length.
/// Uses base32 encoding to avoid filesystem-problematic characters.
/// </summary>
private string GenerateRandomId(int length)
{
const string base32Chars = "ABCDEFGHIJKLMNOPQRSTUVWXYZ234567"; // RFC 4648 base32

using (var rng = RandomNumberGenerator.Create())
{
var bytes = new byte[length];
rng.GetBytes(bytes);

var result = new char[length];
for (int i = 0; i < length; i++)
{
result[i] = base32Chars[bytes[i] % base32Chars.Length];
}

return new string(result);
}
}

public void Dispose()
{
if (!_disposed)
{
// Cleanup any remaining temp file
CleanupOnFailure();
_disposed = true;
}
}
}
}
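
Putting the pieces together, the intended flow is: create the temp file, write into it, commit atomically, and clean up on failure. A minimal sketch using only the methods defined above; the destination path and the write step are placeholders.

// Hypothetical caller showing the atomic temp-file pattern.
var destinationPath = @"C:\downloads\object.bin"; // placeholder
using (var handler = new AtomicFileHandler())
{
    var tempPath = handler.CreateTemporaryFile(destinationPath);
    try
    {
        // Placeholder for the real work: stream downloaded parts into tempPath.
        File.WriteAllBytes(tempPath, new byte[] { 1, 2, 3 });

        // Atomic promotion via File.Replace or File.Move.
        handler.CommitFile(tempPath, destinationPath);
    }
    catch
    {
        // Best effort; Dispose would also remove a leftover temp file.
        handler.CleanupOnFailure(tempPath);
        throw;
    }
}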
sdk/src/Services/S3/Custom/Transfer/Internal/BufferedPartDataHandler.cs
@@ -57,6 +57,12 @@ public BufferedPartDataHandler(
_config = config ?? throw new ArgumentNullException(nameof(config));
}

public Task PrepareAsync(DownloadDiscoveryResult discoveryResult, CancellationToken cancellationToken)
{
// No preparation needed for buffered handler - buffers are created on demand
return Task.CompletedTask;
}

/// <inheritdoc/>
public async Task ProcessPartAsync(
int partNumber,
@@ -127,55 +133,39 @@ private async Task<StreamPartBuffer> BufferPartFromResponseAsync(
// Get reference to the buffer for writing
var partBuffer = downloadedPart.ArrayPoolBuffer;

int totalRead = 0;
int chunkCount = 0;

// Read response stream into buffer in chunks based on ContentLength.
// Example: For a 10MB part with 8KB BufferSize:
// - Loop 1: remainingBytes=10MB, readSize=8KB → reads 8KB at offset 0
// - Loop 2: remainingBytes=9.992MB, readSize=8KB → reads 8KB at offset 8KB
// - ...continues until totalRead reaches 10MB (1,280 iterations)
while (totalRead < expectedBytes)
// Create a MemoryStream wrapper around the pooled buffer
// writable: true allows WriteResponseStreamAsync to write to it
// The MemoryStream starts at position 0 and has a fixed capacity of initialBufferSize
using (var memoryStream = new MemoryStream(partBuffer, 0, initialBufferSize, writable: true))
{
// Calculate how many bytes we still need to read
int remainingBytes = (int)(expectedBytes - totalRead);

// Read in chunks up to BufferSize, but never exceed remaining bytes
int readSize = Math.Min(remainingBytes, _config.BufferSize);

// Read directly into buffer at current position
int bytesRead = await response.ResponseStream.ReadAsync(
partBuffer,
totalRead,
readSize,
cancellationToken).ConfigureAwait(false);
Logger.DebugFormat("BufferedPartDataHandler: [Part {0}] Reading response stream into buffer",
partNumber);

// Use GetObjectResponse's stream copy logic which includes:
// - Progress tracking with events
// - Size validation (ContentLength vs bytes read)
// - Buffered reading with proper chunk sizes
await response.WriteResponseStreamAsync(
@GarrettBeatty (Contributor Author), Nov 21, 2025:

Updated this to use the same helper function I made for reading the response stream. I retested and I see the same performance results:

Total bytes per run: 5,368,709,120

Run:1 Secs:2.494888 Gb/s:17.215071
Run:2 Secs:1.263313 Gb/s:33.997642
Run:3 Secs:1.120677 Gb/s:38.324749
Run:4 Secs:1.062679 Gb/s:40.416394
Run:5 Secs:1.032120 Gb/s:41.613045
Run:6 Secs:1.117588 Gb/s:38.430699
Run:7 Secs:1.087574 Gb/s:39.491276
Run:8 Secs:1.088066 Gb/s:39.473393
Run:9 Secs:1.120819 Gb/s:38.319901
Run:10 Secs:1.113135 Gb/s:38.584438

memoryStream,
null, // destination identifier (not needed for memory stream)
_config.BufferSize,
cancellationToken,
validateSize: true)
.ConfigureAwait(false);

if (bytesRead == 0)
{
var errorMessage = $"Unexpected end of stream while downloading part {partNumber}. " +
$"Expected {expectedBytes} bytes but only received {totalRead} bytes. " +
$"This indicates a network error or S3 service issue.";

Logger.Error(null, "BufferedPartDataHandler: [Part {0}] {1}",
partNumber, errorMessage);

throw new IOException(errorMessage);
}
int totalRead = (int)memoryStream.Position;

totalRead += bytesRead;
chunkCount++;
}

Logger.DebugFormat("BufferedPartDataHandler: [Part {0}] Read {1} bytes in {2} chunks from response stream",
partNumber, totalRead, chunkCount);
Logger.DebugFormat("BufferedPartDataHandler: [Part {0}] Read {1} bytes from response stream",
partNumber, totalRead);

// Set the length to reflect actual bytes read
downloadedPart.SetLength(totalRead);
// Set the length to reflect actual bytes read
downloadedPart.SetLength(totalRead);

if (totalRead != expectedBytes)
{
Logger.Error(null, "BufferedPartDataHandler: [Part {0}] Size mismatch - Expected {1} bytes, read {2} bytes",
partNumber, expectedBytes, totalRead);
if (totalRead != expectedBytes)
{
Logger.Error(null, "BufferedPartDataHandler: [Part {0}] Size mismatch - Expected {1} bytes, read {2} bytes",
partNumber, expectedBytes, totalRead);
}
}

return downloadedPart;
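
One detail worth calling out from the refactor above: a MemoryStream constructed over an existing byte[] segment has a fixed capacity, and its Position after the copy tells the handler how many bytes landed in the pooled buffer. Below is a standalone sketch of that pattern; all names are illustrative.

using System;
using System.Buffers;
using System.IO;

class PooledBufferSketch
{
    static void Main()
    {
        int capacity = 16;
        byte[] buffer = ArrayPool<byte>.Shared.Rent(capacity);
        try
        {
            // writable: true, but the stream cannot grow past 'capacity'.
            using (var ms = new MemoryStream(buffer, 0, capacity, writable: true))
            {
                var payload = new byte[] { 1, 2, 3, 4, 5 };
                ms.Write(payload, 0, payload.Length);

                // Position reports bytes written, mirroring the handler's
                // totalRead = (int)memoryStream.Position followed by SetLength(totalRead).
                Console.WriteLine(ms.Position); // prints 5
            }
        }
        finally
        {
            ArrayPool<byte>.Shared.Return(buffer);
        }
    }
}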