Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -270,31 +270,39 @@ public async Task StartDownloadsAsync(DownloadDiscoveryResult discoveryResult, E
return;
}

// Multipart: Start concurrent downloads for remaining parts (Part 2 onwards)
Logger.InfoFormat("MultipartDownloadManager: Starting concurrent downloads for parts 2-{0}",
discoveryResult.TotalParts);

for (int partNum = 2; partNum <= discoveryResult.TotalParts; partNum++)
{
var task = CreateDownloadTaskAsync(partNum, discoveryResult.ObjectSize, wrappedCallback, internalCts.Token);
downloadTasks.Add(task);
}

// Store count before WhenAllOrFirstException (which modifies the list internally)
var expectedTaskCount = downloadTasks.Count;

Logger.DebugFormat("MultipartDownloadManager: Starting {0} download tasks in background", expectedTaskCount);

// Check if already cancelled before creating background task
cancellationToken.ThrowIfCancellationRequested();

// Start background task to wait for all downloads to complete
// Start background task to handle capacity acquisition and task creation
// This allows the method to return immediately so the consumer can start reading
// which prevents deadlock when MaxInMemoryParts is reached before consumer begins reading
_downloadCompletionTask = Task.Run(async () =>
{
try
{
Logger.DebugFormat("MultipartDownloadManager: Background task starting capacity acquisition and downloads");

// Multipart: Start concurrent downloads for remaining parts (Part 2 onwards)
Logger.InfoFormat("MultipartDownloadManager: Starting concurrent downloads for parts 2-{0}",
discoveryResult.TotalParts);

// Pre-acquire capacity in sequential order to prevent race condition deadlock
// This ensures Part 2 gets capacity before Part 3, etc., preventing out-of-order
// parts from consuming all buffer slots and blocking the next expected part
for (int partNum = 2; partNum <= discoveryResult.TotalParts; partNum++)
{
Logger.DebugFormat("MultipartDownloadManager: [Part {0}] Waiting for buffer space", partNum);

// Acquire capacity sequentially - guarantees Part 2 before Part 3, etc.
await _dataHandler.WaitForCapacityAsync(cancellationToken).ConfigureAwait(false);

Logger.DebugFormat("MultipartDownloadManager: [Part {0}] Buffer space acquired", partNum);

var task = CreateDownloadTaskAsync(partNum, discoveryResult.ObjectSize, wrappedCallback, internalCts.Token);
downloadTasks.Add(task);
}

var expectedTaskCount = downloadTasks.Count;
Logger.DebugFormat("MultipartDownloadManager: Background task waiting for {0} download tasks", expectedTaskCount);

// Wait for all downloads to complete (fails fast on first exception)
Expand Down Expand Up @@ -330,6 +338,12 @@ public async Task StartDownloadsAsync(DownloadDiscoveryResult discoveryResult, E
throw;
}
#pragma warning restore CA1031 // Do not catch general exception types
finally
{
// Dispose the CancellationTokenSource after all background operations complete
// This ensures the token remains valid for the entire lifetime of download tasks
internalCts.Dispose();
}
}, cancellationToken);

// Return immediately to allow consumer to start reading
Expand All @@ -342,25 +356,19 @@ public async Task StartDownloadsAsync(DownloadDiscoveryResult discoveryResult, E
Logger.Error(ex, "MultipartDownloadManager: Download failed");

_dataHandler.OnDownloadComplete(ex);
throw;
}
finally
{

// Dispose the CancellationTokenSource if background task was never started
// This handles the case where an error occurs before Task.Run is called
internalCts.Dispose();

throw;
}
}



private async Task CreateDownloadTaskAsync(int partNumber, long objectSize, EventHandler<WriteObjectProgressArgs> progressCallback, CancellationToken cancellationToken)
{
Logger.DebugFormat("MultipartDownloadManager: [Part {0}] Waiting for buffer space", partNumber);

// Wait for capacity before starting download
await _dataHandler.WaitForCapacityAsync(cancellationToken).ConfigureAwait(false);

Logger.DebugFormat("MultipartDownloadManager: [Part {0}] Buffer space acquired", partNumber);

{
GetObjectResponse response = null;
var ownsResponse = false; // Track if we still own the response

Expand Down
Loading