diff --git a/sdk/src/Services/S3/Custom/Transfer/Internal/MultipartDownloadManager.cs b/sdk/src/Services/S3/Custom/Transfer/Internal/MultipartDownloadManager.cs index b4e395ccf019..1d8abb71dcc7 100644 --- a/sdk/src/Services/S3/Custom/Transfer/Internal/MultipartDownloadManager.cs +++ b/sdk/src/Services/S3/Custom/Transfer/Internal/MultipartDownloadManager.cs @@ -270,31 +270,39 @@ public async Task StartDownloadsAsync(DownloadDiscoveryResult discoveryResult, E return; } - // Multipart: Start concurrent downloads for remaining parts (Part 2 onwards) - Logger.InfoFormat("MultipartDownloadManager: Starting concurrent downloads for parts 2-{0}", - discoveryResult.TotalParts); - - for (int partNum = 2; partNum <= discoveryResult.TotalParts; partNum++) - { - var task = CreateDownloadTaskAsync(partNum, discoveryResult.ObjectSize, wrappedCallback, internalCts.Token); - downloadTasks.Add(task); - } - - // Store count before WhenAllOrFirstException (which modifies the list internally) - var expectedTaskCount = downloadTasks.Count; - - Logger.DebugFormat("MultipartDownloadManager: Starting {0} download tasks in background", expectedTaskCount); - // Check if already cancelled before creating background task cancellationToken.ThrowIfCancellationRequested(); - // Start background task to wait for all downloads to complete + // Start background task to handle capacity acquisition and task creation // This allows the method to return immediately so the consumer can start reading // which prevents deadlock when MaxInMemoryParts is reached before consumer begins reading _downloadCompletionTask = Task.Run(async () => { try { + Logger.DebugFormat("MultipartDownloadManager: Background task starting capacity acquisition and downloads"); + + // Multipart: Start concurrent downloads for remaining parts (Part 2 onwards) + Logger.InfoFormat("MultipartDownloadManager: Starting concurrent downloads for parts 2-{0}", + discoveryResult.TotalParts); + + // Pre-acquire capacity in sequential order to prevent race condition deadlock + // This ensures Part 2 gets capacity before Part 3, etc., preventing out-of-order + // parts from consuming all buffer slots and blocking the next expected part + for (int partNum = 2; partNum <= discoveryResult.TotalParts; partNum++) + { + Logger.DebugFormat("MultipartDownloadManager: [Part {0}] Waiting for buffer space", partNum); + + // Acquire capacity sequentially - guarantees Part 2 before Part 3, etc. + await _dataHandler.WaitForCapacityAsync(cancellationToken).ConfigureAwait(false); + + Logger.DebugFormat("MultipartDownloadManager: [Part {0}] Buffer space acquired", partNum); + + var task = CreateDownloadTaskAsync(partNum, discoveryResult.ObjectSize, wrappedCallback, internalCts.Token); + downloadTasks.Add(task); + } + + var expectedTaskCount = downloadTasks.Count; Logger.DebugFormat("MultipartDownloadManager: Background task waiting for {0} download tasks", expectedTaskCount); // Wait for all downloads to complete (fails fast on first exception) @@ -330,6 +338,12 @@ public async Task StartDownloadsAsync(DownloadDiscoveryResult discoveryResult, E throw; } #pragma warning restore CA1031 // Do not catch general exception types + finally + { + // Dispose the CancellationTokenSource after all background operations complete + // This ensures the token remains valid for the entire lifetime of download tasks + internalCts.Dispose(); + } }, cancellationToken); // Return immediately to allow consumer to start reading @@ -342,25 +356,19 @@ public async Task StartDownloadsAsync(DownloadDiscoveryResult discoveryResult, E Logger.Error(ex, "MultipartDownloadManager: Download failed"); _dataHandler.OnDownloadComplete(ex); - throw; - } - finally - { + + // Dispose the CancellationTokenSource if background task was never started + // This handles the case where an error occurs before Task.Run is called internalCts.Dispose(); + + throw; } } private async Task CreateDownloadTaskAsync(int partNumber, long objectSize, EventHandler progressCallback, CancellationToken cancellationToken) - { - Logger.DebugFormat("MultipartDownloadManager: [Part {0}] Waiting for buffer space", partNumber); - - // Wait for capacity before starting download - await _dataHandler.WaitForCapacityAsync(cancellationToken).ConfigureAwait(false); - - Logger.DebugFormat("MultipartDownloadManager: [Part {0}] Buffer space acquired", partNumber); - + { GetObjectResponse response = null; var ownsResponse = false; // Track if we still own the response diff --git a/sdk/test/Services/S3/UnitTests/Custom/MultipartDownloadManagerTests.cs b/sdk/test/Services/S3/UnitTests/Custom/MultipartDownloadManagerTests.cs index 3f068d85af40..c1b00afdfef3 100644 --- a/sdk/test/Services/S3/UnitTests/Custom/MultipartDownloadManagerTests.cs +++ b/sdk/test/Services/S3/UnitTests/Custom/MultipartDownloadManagerTests.cs @@ -729,6 +729,559 @@ public async Task Validation_ContentRange_ValidRange_Succeeds() #endregion + #region Sequential Capacity Acquisition Tests + + [TestMethod] + public async Task StartDownloadsAsync_MultipartDownload_AcquiresCapacitySequentially() + { + // Arrange - Test that capacity is acquired in sequential order (Part 2 before Part 3, etc.) + var totalParts = 4; + var partSize = 8 * 1024 * 1024; + var totalObjectSize = totalParts * partSize; + + var capacityAcquisitionOrder = new List(); + var capacityAcquisitionLock = new object(); + + var mockDataHandler = new Mock(); + + // Track capacity acquisition order + mockDataHandler + .Setup(x => x.WaitForCapacityAsync(It.IsAny())) + .Returns(() => + { + lock (capacityAcquisitionLock) + { + // This will be called for parts 2, 3, 4 in that order + capacityAcquisitionOrder.Add(capacityAcquisitionOrder.Count + 2); + } + return Task.CompletedTask; + }); + + mockDataHandler + .Setup(x => x.ProcessPartAsync(It.IsAny(), It.IsAny(), It.IsAny())) + .Returns(Task.CompletedTask); + + mockDataHandler + .Setup(x => x.OnDownloadComplete(It.IsAny())); + + var mockClient = MultipartDownloadTestHelpers.CreateMockS3ClientForMultipart( + totalParts, partSize, totalObjectSize, "test-etag", usePartStrategy: true); + + var request = MultipartDownloadTestHelpers.CreateOpenStreamRequest( + downloadType: MultipartDownloadType.PART); + var config = MultipartDownloadTestHelpers.CreateBufferedDownloadConfiguration(concurrentRequests: 2); + var coordinator = new MultipartDownloadManager(mockClient.Object, request, config, mockDataHandler.Object); + + var discoveryResult = await coordinator.DiscoverDownloadStrategyAsync(CancellationToken.None); + + // Act + await coordinator.StartDownloadsAsync(discoveryResult, null, CancellationToken.None); + + // Wait for background task completion + await coordinator.DownloadCompletionTask; + + // Assert - Capacity should be acquired in sequential order: Part 2, then Part 3, then Part 4 + lock (capacityAcquisitionLock) + { + Assert.AreEqual(3, capacityAcquisitionOrder.Count, "Should acquire capacity for parts 2, 3, 4"); + Assert.AreEqual(2, capacityAcquisitionOrder[0], "First capacity acquisition should be for Part 2"); + Assert.AreEqual(3, capacityAcquisitionOrder[1], "Second capacity acquisition should be for Part 3"); + Assert.AreEqual(4, capacityAcquisitionOrder[2], "Third capacity acquisition should be for Part 4"); + } + } + + [TestMethod] + public async Task StartDownloadsAsync_MultipartDownload_DoesNotCallWaitForCapacityInCreateDownloadTask() + { + // Arrange - Test that CreateDownloadTaskAsync no longer calls WaitForCapacityAsync + var totalParts = 3; + var partSize = 8 * 1024 * 1024; + var totalObjectSize = totalParts * partSize; + + var waitForCapacityCallCount = 0; + var processPartCallCount = 0; + + var mockDataHandler = new Mock(); + + // Track WaitForCapacityAsync calls - should only be called in background task, not in CreateDownloadTaskAsync + mockDataHandler + .Setup(x => x.WaitForCapacityAsync(It.IsAny())) + .Returns(() => + { + Interlocked.Increment(ref waitForCapacityCallCount); + return Task.CompletedTask; + }); + + mockDataHandler + .Setup(x => x.ProcessPartAsync(It.IsAny(), It.IsAny(), It.IsAny())) + .Returns(() => + { + Interlocked.Increment(ref processPartCallCount); + return Task.CompletedTask; + }); + + mockDataHandler + .Setup(x => x.OnDownloadComplete(It.IsAny())); + + var mockClient = MultipartDownloadTestHelpers.CreateMockS3ClientForMultipart( + totalParts, partSize, totalObjectSize, "test-etag", usePartStrategy: true); + + var request = MultipartDownloadTestHelpers.CreateOpenStreamRequest( + downloadType: MultipartDownloadType.PART); + var config = MultipartDownloadTestHelpers.CreateBufferedDownloadConfiguration(concurrentRequests: 1); + var coordinator = new MultipartDownloadManager(mockClient.Object, request, config, mockDataHandler.Object); + + var discoveryResult = await coordinator.DiscoverDownloadStrategyAsync(CancellationToken.None); + + // Act + await coordinator.StartDownloadsAsync(discoveryResult, null, CancellationToken.None); + await coordinator.DownloadCompletionTask; + + // Assert + // WaitForCapacityAsync should be called exactly once per background part (parts 2 and 3) + Assert.AreEqual(2, waitForCapacityCallCount, + "WaitForCapacityAsync should be called exactly once per background part (2 times for parts 2-3)"); + + // ProcessPartAsync should be called for all parts (1, 2, 3) + Assert.AreEqual(3, processPartCallCount, + "ProcessPartAsync should be called for all parts (3 times for parts 1-3)"); + } + + [TestMethod] + public async Task StartDownloadsAsync_BackgroundTask_PreAcquiresCapacityBeforeCreatingTasks() + { + // Arrange - Test that background task pre-acquires all capacity before creating download tasks + var totalParts = 3; + var partSize = 8 * 1024 * 1024; + var totalObjectSize = totalParts * partSize; + + // Track operation order with sequential counter + var operationOrder = new List<(string operation, int partNum, int sequence)>(); + var lockObject = new object(); + var operationCounter = 0; + + var mockDataHandler = new Mock(); + + mockDataHandler + .Setup(x => x.WaitForCapacityAsync(It.IsAny())) + .Returns(() => + { + lock (lockObject) + { + var partNum = operationOrder.Count(o => o.operation == "capacity") + 2; // Parts 2, 3 + operationOrder.Add(("capacity", partNum, operationCounter++)); + } + return Task.CompletedTask; + }); + + mockDataHandler + .Setup(x => x.ProcessPartAsync(It.IsAny(), It.IsAny(), It.IsAny())) + .Returns((partNum, response, ct) => + { + lock (lockObject) + { + operationOrder.Add(("task", partNum, operationCounter++)); + } + return Task.CompletedTask; + }); + + mockDataHandler + .Setup(x => x.OnDownloadComplete(It.IsAny())); + + var mockClient = MultipartDownloadTestHelpers.CreateMockS3ClientForMultipart( + totalParts, partSize, totalObjectSize, "test-etag", usePartStrategy: true); + + var request = MultipartDownloadTestHelpers.CreateOpenStreamRequest( + downloadType: MultipartDownloadType.PART); + var config = MultipartDownloadTestHelpers.CreateBufferedDownloadConfiguration(concurrentRequests: 1); + var coordinator = new MultipartDownloadManager(mockClient.Object, request, config, mockDataHandler.Object); + + var discoveryResult = await coordinator.DiscoverDownloadStrategyAsync(CancellationToken.None); + + // Act + await coordinator.StartDownloadsAsync(discoveryResult, null, CancellationToken.None); + await coordinator.DownloadCompletionTask; + + // Assert + lock (lockObject) + { + var capacityOps = operationOrder.Where(o => o.operation == "capacity").ToList(); + var taskOps = operationOrder.Where(o => o.operation == "task").ToList(); + + Assert.AreEqual(2, capacityOps.Count, "Should acquire capacity for parts 2-3"); + Assert.AreEqual(3, taskOps.Count, "Should create tasks for parts 1-3"); + + // Verify all capacity acquisitions happened before any task creation + // Find the highest sequence number among capacity operations + var lastCapacitySequence = capacityOps.Max(o => o.sequence); + + // Find the lowest sequence number among task operations + var firstTaskSequence = taskOps.Min(o => o.sequence); + + // All capacity must be acquired (have lower sequence numbers) before tasks start + Assert.IsTrue(lastCapacitySequence < firstTaskSequence, + $"All capacity acquisitions must complete before task creation. " + + $"Last capacity sequence: {lastCapacitySequence}, First task sequence: {firstTaskSequence}. " + + $"Operations: {string.Join(", ", operationOrder.Select(o => $"{o.operation}({o.partNum})={o.sequence}"))}"); + + // Additional verification: Part 1 should be first task (processed during StartDownloadsAsync) + var part1Task = taskOps.FirstOrDefault(o => o.partNum == 1); + Assert.IsNotNull(part1Task, "Part 1 should be processed"); + Assert.IsTrue(part1Task.sequence < lastCapacitySequence, + "Part 1 should be processed before capacity acquisition for background parts"); + } + } + + #endregion + + #region Race Condition Prevention Tests + + [TestMethod] + public async Task StartDownloadsAsync_PreventRaceConditionDeadlock_WithLimitedBuffer() + { + // Arrange - Test scenario that could deadlock with old approach: limited buffer + out-of-order completion + var totalParts = 5; + var partSize = 8 * 1024 * 1024; + var totalObjectSize = totalParts * partSize; + + // Simulate a scenario where buffer is limited and parts could complete out of order + var maxInMemoryParts = 2; // Very limited buffer + var capacitySlots = new SemaphoreSlim(maxInMemoryParts); + var partProcessingOrder = new List(); + var lockObject = new object(); + + var mockDataHandler = new Mock(); + + // Simulate capacity checking - old approach could deadlock here + mockDataHandler + .Setup(x => x.WaitForCapacityAsync(It.IsAny())) + .Returns(async () => + { + // Wait for capacity (this is where the old approach could deadlock) + await capacitySlots.WaitAsync(); + // Note: In real implementation, capacity would be released when part is processed + }); + + mockDataHandler + .Setup(x => x.ProcessPartAsync(It.IsAny(), It.IsAny(), It.IsAny())) + .Returns((partNum, response, ct) => + { + lock (lockObject) + { + partProcessingOrder.Add(partNum); + } + + // Release capacity after processing + capacitySlots.Release(); + return Task.CompletedTask; + }); + + mockDataHandler + .Setup(x => x.OnDownloadComplete(It.IsAny())); + + var mockClient = MultipartDownloadTestHelpers.CreateMockS3ClientForMultipart( + totalParts, partSize, totalObjectSize, "test-etag", usePartStrategy: true); + + var request = MultipartDownloadTestHelpers.CreateOpenStreamRequest( + downloadType: MultipartDownloadType.PART); + var config = MultipartDownloadTestHelpers.CreateBufferedDownloadConfiguration(concurrentRequests: 3); + var coordinator = new MultipartDownloadManager(mockClient.Object, request, config, mockDataHandler.Object); + + var discoveryResult = await coordinator.DiscoverDownloadStrategyAsync(CancellationToken.None); + + // Act - This should not deadlock with the new sequential approach + var startTime = DateTime.UtcNow; + await coordinator.StartDownloadsAsync(discoveryResult, null, CancellationToken.None); + await coordinator.DownloadCompletionTask; + var endTime = DateTime.UtcNow; + + // Assert + var executionTime = (endTime - startTime).TotalSeconds; + Assert.IsTrue(executionTime < 10, + $"Download should complete without deadlock. Took {executionTime:F2} seconds"); + + lock (lockObject) + { + Assert.AreEqual(totalParts, partProcessingOrder.Count, + "All parts should be processed successfully"); + + // Part 1 should be first (processed during StartDownloadsAsync) + Assert.AreEqual(1, partProcessingOrder[0], "Part 1 should be processed first"); + } + } + + [TestMethod] + public async Task StartDownloadsAsync_SequentialCapacityAcquisition_PreventsOutOfOrderBlocking() + { + // Arrange - Test that sequential acquisition prevents out-of-order parts from blocking expected parts + var totalParts = 4; + var partSize = 8 * 1024 * 1024; + var totalObjectSize = totalParts * partSize; + + var capacityOrder = new List(); + var processingOrder = new List(); + var lockObject = new object(); + + var mockDataHandler = new Mock(); + + var partCounter = 1; // Start with part 2 (part 1 doesn't call WaitForCapacityAsync) + mockDataHandler + .Setup(x => x.WaitForCapacityAsync(It.IsAny())) + .Returns(() => + { + lock (lockObject) + { + partCounter++; + capacityOrder.Add(partCounter); + } + return Task.CompletedTask; + }); + + mockDataHandler + .Setup(x => x.ProcessPartAsync(It.IsAny(), It.IsAny(), It.IsAny())) + .Returns((partNum, response, ct) => + { + lock (lockObject) + { + processingOrder.Add(partNum); + } + return Task.CompletedTask; + }); + + mockDataHandler + .Setup(x => x.OnDownloadComplete(It.IsAny())); + + var mockClient = MultipartDownloadTestHelpers.CreateMockS3ClientForMultipart( + totalParts, partSize, totalObjectSize, "test-etag", usePartStrategy: true); + + var request = MultipartDownloadTestHelpers.CreateOpenStreamRequest( + downloadType: MultipartDownloadType.PART); + var config = MultipartDownloadTestHelpers.CreateBufferedDownloadConfiguration(concurrentRequests: 2); + var coordinator = new MultipartDownloadManager(mockClient.Object, request, config, mockDataHandler.Object); + + var discoveryResult = await coordinator.DiscoverDownloadStrategyAsync(CancellationToken.None); + + // Act + await coordinator.StartDownloadsAsync(discoveryResult, null, CancellationToken.None); + await coordinator.DownloadCompletionTask; + + // Assert - Capacity acquisition should be in order, preventing blocking + lock (lockObject) + { + Assert.AreEqual(3, capacityOrder.Count, "Should acquire capacity for parts 2, 3, 4"); + + // Verify sequential order + for (int i = 0; i < capacityOrder.Count; i++) + { + Assert.AreEqual(i + 2, capacityOrder[i], + $"Capacity acquisition {i} should be for part {i + 2}"); + } + + Assert.AreEqual(totalParts, processingOrder.Count, "All parts should be processed"); + } + } + + #endregion + + #region Background Task Resource Management Tests + + [TestMethod] + public async Task StartDownloadsAsync_BackgroundTaskSuccess_DisposesCancellationTokenSource() + { + // Arrange - Test that CancellationTokenSource is disposed after successful background operations + var totalParts = 2; + var partSize = 8 * 1024 * 1024; + var totalObjectSize = totalParts * partSize; + + var mockClient = MultipartDownloadTestHelpers.CreateMockS3ClientForMultipart( + totalParts, partSize, totalObjectSize, "test-etag", usePartStrategy: true); + + var request = MultipartDownloadTestHelpers.CreateOpenStreamRequest( + downloadType: MultipartDownloadType.PART); + var config = MultipartDownloadTestHelpers.CreateBufferedDownloadConfiguration(concurrentRequests: 1); + var coordinator = new MultipartDownloadManager(mockClient.Object, request, config, CreateMockDataHandler().Object); + + var discoveryResult = await coordinator.DiscoverDownloadStrategyAsync(CancellationToken.None); + + // Act + await coordinator.StartDownloadsAsync(discoveryResult, null, CancellationToken.None); + + // Wait for background task to complete + await coordinator.DownloadCompletionTask; + + // Assert - Background task should complete successfully + Assert.IsTrue(coordinator.DownloadCompletionTask.IsCompleted && + !coordinator.DownloadCompletionTask.IsFaulted && + !coordinator.DownloadCompletionTask.IsCanceled, + "Background task should complete successfully"); + + Assert.IsNull(coordinator.DownloadException, + "No download exception should occur"); + } + + [TestMethod] + public async Task StartDownloadsAsync_BackgroundTaskFailure_DisposesCancellationTokenSource() + { + // Arrange - Test that CancellationTokenSource is disposed even when background task fails + var totalParts = 2; + var partSize = 8 * 1024 * 1024; + var totalObjectSize = totalParts * partSize; + + var mockDataHandler = new Mock(); + + // First call (Part 1) succeeds + var callCount = 0; + mockDataHandler + .Setup(x => x.ProcessPartAsync(It.IsAny(), It.IsAny(), It.IsAny())) + .Returns((partNum, response, ct) => + { + callCount++; + if (partNum == 1) + { + return Task.CompletedTask; // Part 1 succeeds + } + throw new InvalidOperationException("Simulated download failure"); // Background parts fail + }); + + mockDataHandler + .Setup(x => x.WaitForCapacityAsync(It.IsAny())) + .Returns(Task.CompletedTask); + + mockDataHandler + .Setup(x => x.OnDownloadComplete(It.IsAny())); + + var mockClient = MultipartDownloadTestHelpers.CreateMockS3ClientForMultipart( + totalParts, partSize, totalObjectSize, "test-etag", usePartStrategy: true); + + var request = MultipartDownloadTestHelpers.CreateOpenStreamRequest( + downloadType: MultipartDownloadType.PART); + var config = MultipartDownloadTestHelpers.CreateBufferedDownloadConfiguration(concurrentRequests: 1); + var coordinator = new MultipartDownloadManager(mockClient.Object, request, config, mockDataHandler.Object); + + var discoveryResult = await coordinator.DiscoverDownloadStrategyAsync(CancellationToken.None); + + // Act + await coordinator.StartDownloadsAsync(discoveryResult, null, CancellationToken.None); + + // Wait for background task to complete (with failure) + try + { + await coordinator.DownloadCompletionTask; + } + catch (InvalidOperationException) + { + // Expected failure + } + + // Assert - Background task should have failed but cleanup should be done + Assert.IsTrue(coordinator.DownloadCompletionTask.IsCompleted, + "Background task should be completed (even with failure)"); + Assert.IsNotNull(coordinator.DownloadException, + "Download exception should be captured"); + Assert.IsInstanceOfType(coordinator.DownloadException, typeof(InvalidOperationException), + "Should capture the simulated failure"); + } + + [TestMethod] + public async Task StartDownloadsAsync_EarlyError_DisposesCancellationTokenSource() + { + // Arrange - Test CancellationTokenSource disposal when error occurs before background task starts + var mockDataHandler = new Mock(); + + // Simulate error during PrepareAsync (before background task is created) + mockDataHandler + .Setup(x => x.PrepareAsync(It.IsAny(), It.IsAny())) + .ThrowsAsync(new InvalidOperationException("Simulated prepare failure")); + + var mockClient = MultipartDownloadTestHelpers.CreateMockS3Client(); + var request = MultipartDownloadTestHelpers.CreateOpenStreamRequest(); + var config = MultipartDownloadTestHelpers.CreateBufferedDownloadConfiguration(); + var coordinator = new MultipartDownloadManager(mockClient.Object, request, config, mockDataHandler.Object); + + var discoveryResult = new DownloadDiscoveryResult + { + TotalParts = 2, + ObjectSize = 16 * 1024 * 1024, + InitialResponse = new GetObjectResponse() + }; + + // Act & Assert + try + { + await coordinator.StartDownloadsAsync(discoveryResult, null, CancellationToken.None); + Assert.Fail("Expected InvalidOperationException to be thrown"); + } + catch (InvalidOperationException ex) + { + Assert.AreEqual("Simulated prepare failure", ex.Message); + } + + // Assert - Exception should be captured and no background task should exist + Assert.IsNotNull(coordinator.DownloadException, "Download exception should be captured"); + Assert.IsTrue(coordinator.DownloadCompletionTask.IsCompleted, + "DownloadCompletionTask should return completed task when no background work exists"); + } + + [TestMethod] + public async Task StartDownloadsAsync_BackgroundTaskCancellation_HandlesTokenDisposalProperly() + { + // Arrange - Test proper token disposal when background task is cancelled + var totalParts = 3; + var partSize = 8 * 1024 * 1024; + var totalObjectSize = totalParts * partSize; + + var cts = new CancellationTokenSource(); + var mockDataHandler = new Mock(); + + // Part 1 succeeds, then cancel before background parts + mockDataHandler + .Setup(x => x.ProcessPartAsync(1, It.IsAny(), It.IsAny())) + .Returns(Task.CompletedTask); + + // Cancel when waiting for capacity (simulating cancellation during background task) + mockDataHandler + .Setup(x => x.WaitForCapacityAsync(It.IsAny())) + .Returns(() => + { + cts.Cancel(); // Cancel during background task execution + throw new OperationCanceledException(); + }); + + mockDataHandler + .Setup(x => x.OnDownloadComplete(It.IsAny())); + + var mockClient = MultipartDownloadTestHelpers.CreateMockS3ClientForMultipart( + totalParts, partSize, totalObjectSize, "test-etag", usePartStrategy: true); + + var request = MultipartDownloadTestHelpers.CreateOpenStreamRequest( + downloadType: MultipartDownloadType.PART); + var config = MultipartDownloadTestHelpers.CreateBufferedDownloadConfiguration(concurrentRequests: 1); + var coordinator = new MultipartDownloadManager(mockClient.Object, request, config, mockDataHandler.Object); + + var discoveryResult = await coordinator.DiscoverDownloadStrategyAsync(CancellationToken.None); + + // Act + await coordinator.StartDownloadsAsync(discoveryResult, null, CancellationToken.None); + + // Wait for background task cancellation + try + { + await coordinator.DownloadCompletionTask; + } + catch (OperationCanceledException) + { + // Expected + } + + // Assert - Cancellation should be handled properly with cleanup + Assert.IsTrue(coordinator.DownloadCompletionTask.IsCompleted, + "Background task should be completed"); + Assert.IsNotNull(coordinator.DownloadException, + "Cancellation exception should be captured"); + } + + #endregion + #region Disposal Tests [TestMethod]