Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions generator/.DevConfigs/55fe9e14-c79e-4426-9828-deae0451d4f6.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
{
"services": [
{
"serviceName": "S3",
"type": "minor",
"changeLogMessages": [
"Created new DownloadDirectoryWithResponseAsync methods on the Amazon.S3.Transfer.TransferUtility class. The new operations support downloading directories using multipart download for files and return response metadata."
]
}
]
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,16 +42,24 @@ internal partial class DownloadDirectoryCommand : BaseCommand<TransferUtilityDow
private readonly IAmazonS3 _s3Client;
private readonly TransferUtilityDownloadDirectoryRequest _request;
private readonly bool _skipEncryptionInstructionFiles;
private readonly bool _useMultipartDownload;
int _totalNumberOfFilesToDownload;
int _numberOfFilesDownloaded;
long _totalBytes;
long _transferredBytes;
string _currentFile;

internal DownloadDirectoryCommand(IAmazonS3 s3Client, TransferUtilityDownloadDirectoryRequest request)
: this(s3Client, request, useMultipartDownload: false)
{
}

internal DownloadDirectoryCommand(IAmazonS3 s3Client, TransferUtilityDownloadDirectoryRequest request, bool useMultipartDownload)
{
if (s3Client == null)
throw new ArgumentNullException("s3Client");
throw new ArgumentNullException(nameof(s3Client));
if (request == null)
throw new ArgumentNullException(nameof(request));

this._s3Client = s3Client;
this._request = request;
Expand All @@ -60,6 +68,13 @@ internal DownloadDirectoryCommand(IAmazonS3 s3Client, TransferUtilityDownloadDir
request.FailurePolicy == FailurePolicy.AbortOnFailure
? new AbortOnFailurePolicy()
: new ContinueOnFailurePolicy(_errors);
this._useMultipartDownload = useMultipartDownload;
}

internal DownloadDirectoryCommand(IAmazonS3 s3Client, TransferUtilityDownloadDirectoryRequest request, TransferUtilityConfig config, bool useMultipartDownload)
: this(s3Client, request, useMultipartDownload)
{
this._config = config;
}

private void downloadedProgressEventCallback(object sender, WriteObjectProgressArgs e)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,10 @@ private Logger Logger
get { return Logger.GetLogger(typeof(TransferUtility)); }
}

/// <summary>
/// Initializes a new instance for file downloads.
/// Writes parts directly to disk without memory buffering.
/// </summary>
public FilePartDataHandler(FileDownloadConfiguration config)
{
_config = config ?? throw new ArgumentNullException(nameof(config));
Expand Down Expand Up @@ -90,14 +94,16 @@ await WritePartToFileAsync(offset, response, cancellationToken)
/// <inheritdoc/>
public Task WaitForCapacityAsync(CancellationToken cancellationToken)
{
// No backpressure needed - OS handles concurrent file access
// No-op: FilePartDataHandler writes directly to disk without buffering parts in memory.
// Memory throttling is only needed for BufferedPartDataHandler which keeps parts in memory.
return Task.CompletedTask;
}

/// <inheritdoc/>
public void ReleaseCapacity()
{
// No-op
// No-op: FilePartDataHandler writes directly to disk without buffering parts in memory.
// Memory throttling is only needed for BufferedPartDataHandler which keeps parts in memory.
}

/// <inheritdoc/>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
*
*/
using System;
using System.Threading;
using Amazon.Runtime.Internal.Util;
using Amazon.S3.Model;
using Amazon.S3.Util;
Expand All @@ -36,6 +37,7 @@ internal partial class MultipartDownloadCommand : BaseCommand<TransferUtilityDow
private readonly IAmazonS3 _s3Client;
private readonly TransferUtilityDownloadRequest _request;
private readonly TransferUtilityConfig _config;
private readonly SemaphoreSlim _sharedHttpThrottler;

// Track last known transferred bytes from coordinator's progress events
private long _lastKnownTransferredBytes;
Expand All @@ -49,16 +51,29 @@ private static Logger Logger
}

/// <summary>
/// Initializes a new instance of the MultipartDownloadCommand class.
/// Initializes a new instance of the MultipartDownloadCommand class for single file downloads.
/// </summary>
/// <param name="s3Client">The S3 client to use for downloads.</param>
/// <param name="request">The download request containing configuration.</param>
/// <param name="config">The TransferUtility configuration.</param>
internal MultipartDownloadCommand(IAmazonS3 s3Client, TransferUtilityDownloadRequest request, TransferUtilityConfig config)
: this(s3Client, request, config, null)
{
}

/// <summary>
/// Initializes a new instance of the MultipartDownloadCommand class for directory downloads.
/// </summary>
/// <param name="s3Client">The S3 client to use for downloads.</param>
/// <param name="request">The download request containing configuration.</param>
/// <param name="config">The TransferUtility configuration.</param>
/// <param name="sharedHttpThrottler">Shared HTTP concurrency throttler for directory operations, or null for single file downloads.</param>
internal MultipartDownloadCommand(IAmazonS3 s3Client, TransferUtilityDownloadRequest request, TransferUtilityConfig config, SemaphoreSlim sharedHttpThrottler)
{
_s3Client = s3Client ?? throw new ArgumentNullException(nameof(s3Client));
_request = request ?? throw new ArgumentNullException(nameof(request));
_config = config ?? throw new ArgumentNullException(nameof(config));
_sharedHttpThrottler = sharedHttpThrottler; // Can be null for single file downloads
}

/// <summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ internal class MultipartDownloadManager : IDownloadManager
private readonly DownloadManagerConfiguration _config;
private readonly IPartDataHandler _dataHandler;
private readonly SemaphoreSlim _httpConcurrencySlots;
private readonly bool _ownsHttpThrottler;
private readonly RequestEventHandler _requestEventHandler;

private Exception _downloadException;
Expand Down Expand Up @@ -79,23 +80,82 @@ private Logger Logger
public Task DownloadCompletionTask => _downloadCompletionTask ?? Task.CompletedTask;

/// <summary>
/// Initializes a new instance of the <see cref="MultipartDownloadManager"/> class.
/// Initializes a new instance of the <see cref="MultipartDownloadManager"/> for single file downloads.
/// This constructor creates and owns its own HTTP concurrency throttler based on the configuration.
/// </summary>
/// <param name="s3Client">The <see cref="IAmazonS3"/> client for making S3 requests.</param>
/// <param name="request">The <see cref="BaseDownloadRequest"/> containing download parameters.</param>
/// <param name="config">The <see cref="DownloadManagerConfiguration"/> with download settings.</param>
/// <param name="dataHandler">The <see cref="IPartDataHandler"/> for processing downloaded parts.</param>
/// <param name="requestEventHandler">Optional <see cref="RequestEventHandler"/> for user agent tracking.</param>
/// <exception cref="ArgumentNullException">Thrown when any required parameter is null.</exception>
/// <param name="s3Client">The <see cref="IAmazonS3"/> client used to make GetObject requests to S3.</param>
/// <param name="request">The <see cref="BaseDownloadRequest"/> containing bucket, key, version, and download strategy configuration.</param>
/// <param name="config">The <see cref="DownloadManagerConfiguration"/> specifying concurrency limits and part size settings.</param>
/// <param name="dataHandler">The <see cref="IPartDataHandler"/> responsible for buffering and processing downloaded part data.</param>
/// <param name="requestEventHandler">Optional request event handler for adding custom headers or tracking requests. May be null.</param>
/// <exception cref="ArgumentNullException">
/// Thrown when <paramref name="s3Client"/>, <paramref name="request"/>, <paramref name="config"/>, or <paramref name="dataHandler"/> is null.
/// </exception>
/// <remarks>
/// This constructor is used for single file downloads where each download manages its own HTTP concurrency.
/// The created <see cref="SemaphoreSlim"/> throttler will be disposed when this instance is disposed.
/// For directory downloads with shared concurrency management, use the overload that accepts a shared throttler.
/// </remarks>
/// <seealso cref="DownloadManagerConfiguration"/>
/// <seealso cref="IPartDataHandler"/>
/// <seealso cref="MultipartDownloadType"/>
public MultipartDownloadManager(IAmazonS3 s3Client, BaseDownloadRequest request, DownloadManagerConfiguration config, IPartDataHandler dataHandler, RequestEventHandler requestEventHandler = null)
: this(s3Client, request, config, dataHandler, requestEventHandler, null)
{
}

/// <summary>
/// Initializes a new instance of the <see cref="MultipartDownloadManager"/> for directory downloads or scenarios requiring shared concurrency control.
/// This constructor allows using a shared HTTP concurrency throttler across multiple concurrent file downloads.
/// </summary>
/// <param name="s3Client">The <see cref="IAmazonS3"/> client used to make GetObject requests to S3.</param>
/// <param name="request">The <see cref="BaseDownloadRequest"/> containing bucket, key, version, and download strategy configuration.</param>
/// <param name="config">The <see cref="DownloadManagerConfiguration"/> specifying concurrency limits and part size settings.</param>
/// <param name="dataHandler">The <see cref="IPartDataHandler"/> responsible for buffering and processing downloaded part data.</param>
/// <param name="requestEventHandler">Optional request event handler for adding custom headers or tracking requests. May be null.</param>
/// <param name="sharedHttpThrottler">
/// Optional shared <see cref="SemaphoreSlim"/> for coordinating HTTP concurrency across multiple downloads.
/// If null, a new throttler will be created and owned by this instance.
/// If provided, the caller retains ownership and responsibility for disposal.
/// </param>
/// <exception cref="ArgumentNullException">
/// Thrown when <paramref name="s3Client"/>, <paramref name="request"/>, <paramref name="config"/>, or <paramref name="dataHandler"/> is null.
/// </exception>
/// <remarks>
/// <para>
/// This constructor is typically used by directory download operations where multiple files are being downloaded
/// concurrently and need to share a global HTTP concurrency limit.
/// </para>
/// <para>
/// <strong>Resource Ownership:</strong>
/// If <paramref name="sharedHttpThrottler"/> is provided, this instance does NOT take ownership and will NOT dispose it.
/// If <paramref name="sharedHttpThrottler"/> is null, this instance creates and owns the throttler and will dispose it.
/// </para>
/// </remarks>
/// <seealso cref="DownloadManagerConfiguration"/>
/// <seealso cref="IPartDataHandler"/>
/// <seealso cref="MultipartDownloadType"/>
/// <seealso cref="DiscoverDownloadStrategyAsync"/>
/// <seealso cref="StartDownloadsAsync"/>
public MultipartDownloadManager(IAmazonS3 s3Client, BaseDownloadRequest request, DownloadManagerConfiguration config, IPartDataHandler dataHandler, RequestEventHandler requestEventHandler, SemaphoreSlim sharedHttpThrottler)
{
_s3Client = s3Client ?? throw new ArgumentNullException(nameof(s3Client));
_request = request ?? throw new ArgumentNullException(nameof(request));
_config = config ?? throw new ArgumentNullException(nameof(config));
_dataHandler = dataHandler ?? throw new ArgumentNullException(nameof(dataHandler));
_requestEventHandler = requestEventHandler;

_httpConcurrencySlots = new SemaphoreSlim(_config.ConcurrentServiceRequests);
// Use shared throttler if provided, otherwise create our own
if (sharedHttpThrottler != null)
{
_httpConcurrencySlots = sharedHttpThrottler;
_ownsHttpThrottler = false; // Don't dispose - directory command owns it
}
else
{
_httpConcurrencySlots = new SemaphoreSlim(_config.ConcurrentServiceRequests);
_ownsHttpThrottler = true; // We own it, so we dispose it
}
}

/// <inheritdoc/>
Expand Down Expand Up @@ -654,7 +714,11 @@ public void Dispose()
{
try
{
_httpConcurrencySlots?.Dispose();
// Only dispose HTTP throttler if we own it
if (_ownsHttpThrottler)
{
_httpConcurrencySlots?.Dispose();
}
_dataHandler?.Dispose();
}
catch (Exception)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,12 +50,14 @@ public override async Task<TransferUtilityDownloadResponse> ExecuteAsync(Cancell
using (var dataHandler = new FilePartDataHandler(config))
{
// Create coordinator to manage the download process
// Pass shared HTTP throttler to control concurrency across files
using (var coordinator = new MultipartDownloadManager(
_s3Client,
_request,
config,
dataHandler,
RequestEventHandler))
RequestEventHandler,
_sharedHttpThrottler))
{
long totalBytes = -1;
try
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,23 +66,43 @@ public override async Task<TransferUtilityDownloadDirectoryResponse> ExecuteAsyn

this._totalNumberOfFilesToDownload = objs.Count;

SemaphoreSlim asyncThrottler = null;
// Two-level throttling architecture:
// 1. File-level throttler: Controls how many files are downloaded concurrently
// 2. HTTP-level throttler: Controls total HTTP requests across ALL file downloads
//
// Example with ConcurrentServiceRequests = 10:
// - fileOperationThrottler = 10: Up to 10 files can download simultaneously
// - sharedHttpRequestThrottler = 10: All 10 files share 10 total HTTP request slots
// - Without HTTP throttler: Would result in 10 files × 10 parts = 100 concurrent HTTP requests
// - With HTTP throttler: Enforces 10 total concurrent HTTP requests across all files
//
// This prevents resource exhaustion when downloading many large files with multipart downloads.
SemaphoreSlim fileOperationThrottler = null;
SemaphoreSlim sharedHttpRequestThrottler = null;
CancellationTokenSource internalCts = null;

try
{
asyncThrottler = DownloadFilesConcurrently ?
// File-level throttler: Controls concurrent file operations
fileOperationThrottler = DownloadFilesConcurrently ?
new SemaphoreSlim(this._config.ConcurrentServiceRequests) :
new SemaphoreSlim(1);

// HTTP-level throttler: Shared across all downloads to control total HTTP concurrency
// Only needed for multipart downloads where each file makes multiple HTTP requests
if (this._useMultipartDownload)
{
sharedHttpRequestThrottler = new SemaphoreSlim(this._config.ConcurrentServiceRequests);
}

internalCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
var pendingTasks = new List<Task>();
foreach (S3Object s3o in objs)
{
if (s3o.Key.EndsWith("/", StringComparison.Ordinal))
continue;

await asyncThrottler.WaitAsync(cancellationToken)
await fileOperationThrottler.WaitAsync(cancellationToken)
.ConfigureAwait(continueOnCapturedContext: false);

try
Expand Down Expand Up @@ -137,7 +157,15 @@ await asyncThrottler.WaitAsync(cancellationToken)

var task = _failurePolicy.ExecuteAsync(
async () => {
var command = new DownloadCommand(this._s3Client, downloadRequest);
BaseCommand<TransferUtilityDownloadResponse> command;
if (this._useMultipartDownload)
{
command = new MultipartDownloadCommand(this._s3Client, downloadRequest, this._config, sharedHttpRequestThrottler);
}
else
{
command = new DownloadCommand(this._s3Client, downloadRequest);
}
await command.ExecuteAsync(internalCts.Token)
.ConfigureAwait(false);
},
Expand All @@ -149,7 +177,7 @@ await command.ExecuteAsync(internalCts.Token)
}
finally
{
asyncThrottler.Release();
fileOperationThrottler.Release();
}
}
await TaskHelpers.WhenAllOrFirstExceptionAsync(pendingTasks, cancellationToken)
Expand All @@ -170,7 +198,8 @@ await TaskHelpers.WhenAllOrFirstExceptionAsync(pendingTasks, cancellationToken)
finally
{
internalCts.Dispose();
asyncThrottler.Dispose();
fileOperationThrottler.Dispose();
sharedHttpRequestThrottler?.Dispose();
}
}

Expand Down
Loading