diff --git a/sdk/src/Services/S3/Custom/Transfer/Internal/BufferedPartDataHandler.cs b/sdk/src/Services/S3/Custom/Transfer/Internal/BufferedPartDataHandler.cs index ae55c6c2422c..02fb974c7f72 100644 --- a/sdk/src/Services/S3/Custom/Transfer/Internal/BufferedPartDataHandler.cs +++ b/sdk/src/Services/S3/Custom/Transfer/Internal/BufferedPartDataHandler.cs @@ -82,7 +82,7 @@ public async Task ProcessPartAsync( partNumber, buffer.Length); // Add the buffered part to the buffer manager - await _partBufferManager.AddBufferAsync(buffer, cancellationToken).ConfigureAwait(false); + _partBufferManager.AddBuffer(buffer); Logger.DebugFormat("BufferedPartDataHandler: [Part {0}] Added to buffer manager", partNumber); diff --git a/sdk/src/Services/S3/Custom/Transfer/Internal/IPartBufferManager.cs b/sdk/src/Services/S3/Custom/Transfer/Internal/IPartBufferManager.cs index 004c27092eae..9675c60b321e 100644 --- a/sdk/src/Services/S3/Custom/Transfer/Internal/IPartBufferManager.cs +++ b/sdk/src/Services/S3/Custom/Transfer/Internal/IPartBufferManager.cs @@ -48,9 +48,7 @@ internal interface IPartBufferManager : IDisposable /// Adds a downloaded part buffer and signals readers when next expected part arrives. /// /// The downloaded part buffer to add. - /// A token to cancel the operation. - /// A task that completes when the buffer has been added and signaling is complete. - Task AddBufferAsync(StreamPartBuffer buffer, CancellationToken cancellationToken); + void AddBuffer(StreamPartBuffer buffer); /// /// Reads data from the buffer manager. Automatically handles sequential part consumption diff --git a/sdk/src/Services/S3/Custom/Transfer/Internal/PartBufferManager.cs b/sdk/src/Services/S3/Custom/Transfer/Internal/PartBufferManager.cs index 57d700363eb0..c679fcb91f9e 100644 --- a/sdk/src/Services/S3/Custom/Transfer/Internal/PartBufferManager.cs +++ b/sdk/src/Services/S3/Custom/Transfer/Internal/PartBufferManager.cs @@ -84,7 +84,7 @@ namespace Amazon.S3.Transfer.Internal /// - Example: With MaxInMemoryParts=10, if parts 5-14 are buffered, the task downloading /// part 15 blocks here until the reader consumes and releases part 5's buffer /// 2. Read part data from S3 into pooled buffer - /// 3. Add buffered part: await + /// 3. Add buffered part: /// - Adds buffer to _partDataSources dictionary /// - Signals _partAvailable to wake consumer if waiting /// 4. Consumer eventually releases the buffer slot after reading the part @@ -286,7 +286,7 @@ public void AddDataSource(IPartDataSource dataSource) } /// - public async Task AddBufferAsync(StreamPartBuffer buffer, CancellationToken cancellationToken) + public void AddBuffer(StreamPartBuffer buffer) { ThrowIfDisposed(); diff --git a/sdk/test/Services/S3/UnitTests/Custom/BufferedPartDataHandlerTests.cs b/sdk/test/Services/S3/UnitTests/Custom/BufferedPartDataHandlerTests.cs index 48c3e8369170..e7131cdc208d 100644 --- a/sdk/test/Services/S3/UnitTests/Custom/BufferedPartDataHandlerTests.cs +++ b/sdk/test/Services/S3/UnitTests/Custom/BufferedPartDataHandlerTests.cs @@ -78,7 +78,7 @@ public async Task ProcessPartAsync_BuffersPartData() // Assert - should add buffer to manager mockBufferManager.Verify( - x => x.AddBufferAsync(It.IsAny(), It.IsAny()), + x => x.AddBuffer(It.IsAny()), Times.Once); } @@ -92,9 +92,8 @@ public async Task ProcessPartAsync_ReadsExactContentLength() StreamPartBuffer capturedBuffer = null; var mockBufferManager = new Mock(); - mockBufferManager.Setup(x => x.AddBufferAsync(It.IsAny(), It.IsAny())) - .Callback((buffer, ct) => capturedBuffer = buffer) - .Returns(Task.CompletedTask); + mockBufferManager.Setup(x => x.AddBuffer(It.IsAny())) + .Callback((buffer) => capturedBuffer = buffer); var config = MultipartDownloadTestHelpers.CreateBufferedDownloadConfiguration(); var handler = new BufferedPartDataHandler(mockBufferManager.Object, config); @@ -124,9 +123,8 @@ public async Task ProcessPartAsync_HandlesSmallPart() StreamPartBuffer capturedBuffer = null; var mockBufferManager = new Mock(); - mockBufferManager.Setup(x => x.AddBufferAsync(It.IsAny(), It.IsAny())) - .Callback((buffer, ct) => capturedBuffer = buffer) - .Returns(Task.CompletedTask); + mockBufferManager.Setup(x => x.AddBuffer(It.IsAny())) + .Callback((buffer) => capturedBuffer = buffer); var config = MultipartDownloadTestHelpers.CreateBufferedDownloadConfiguration(); var handler = new BufferedPartDataHandler(mockBufferManager.Object, config); @@ -155,9 +153,8 @@ public async Task ProcessPartAsync_HandlesLargePart() StreamPartBuffer capturedBuffer = null; var mockBufferManager = new Mock(); - mockBufferManager.Setup(x => x.AddBufferAsync(It.IsAny(), It.IsAny())) - .Callback((buffer, ct) => capturedBuffer = buffer) - .Returns(Task.CompletedTask); + mockBufferManager.Setup(x => x.AddBuffer(It.IsAny())) + .Callback((buffer) => capturedBuffer = buffer); var config = MultipartDownloadTestHelpers.CreateBufferedDownloadConfiguration(); var handler = new BufferedPartDataHandler(mockBufferManager.Object, config); @@ -191,9 +188,8 @@ public async Task ProcessPartAsync_PreservesDataIntegrity() StreamPartBuffer capturedBuffer = null; var mockBufferManager = new Mock(); - mockBufferManager.Setup(x => x.AddBufferAsync(It.IsAny(), It.IsAny())) - .Callback((buffer, ct) => capturedBuffer = buffer) - .Returns(Task.CompletedTask); + mockBufferManager.Setup(x => x.AddBuffer(It.IsAny())) + .Callback((buffer) => capturedBuffer = buffer); var config = MultipartDownloadTestHelpers.CreateBufferedDownloadConfiguration(); var handler = new BufferedPartDataHandler(mockBufferManager.Object, config); @@ -234,7 +230,7 @@ public async Task ProcessPartAsync_HandlesZeroByteResponse() // Assert - should handle empty response gracefully mockBufferManager.Verify( - x => x.AddBufferAsync(It.IsAny(), It.IsAny()), + x => x.AddBuffer(It.IsAny()), Times.Once); } @@ -301,7 +297,7 @@ public async Task ProcessPartAsync_WithUnexpectedEOF_DoesNotBufferPartialData() // Assert - should NOT have added any buffer to manager since download failed mockBufferManager.Verify( - x => x.AddBufferAsync(It.IsAny(), It.IsAny()), + x => x.AddBuffer(It.IsAny()), Times.Never); } @@ -335,17 +331,14 @@ public async Task ProcessPartAsync_WithCancelledToken_ThrowsTaskCanceledExceptio } [TestMethod] - public async Task ProcessPartAsync_PassesCancellationTokenToBufferManager() + public async Task ProcessPartAsync_CallsAddBufferOnce() { // Arrange var partSize = 1024; var partData = new byte[partSize]; - CancellationToken capturedToken = default; var mockBufferManager = new Mock(); - mockBufferManager.Setup(x => x.AddBufferAsync(It.IsAny(), It.IsAny())) - .Callback((buffer, ct) => capturedToken = ct) - .Returns(Task.CompletedTask); + mockBufferManager.Setup(x => x.AddBuffer(It.IsAny())); var config = MultipartDownloadTestHelpers.CreateBufferedDownloadConfiguration(); var handler = new BufferedPartDataHandler(mockBufferManager.Object, config); @@ -361,8 +354,8 @@ public async Task ProcessPartAsync_PassesCancellationTokenToBufferManager() // Act await handler.ProcessPartAsync(1, response, cts.Token); - // Assert - Assert.AreEqual(cts.Token, capturedToken); + // Assert - verify AddBuffer was called exactly once + mockBufferManager.Verify(x => x.AddBuffer(It.IsAny()), Times.Once); } #endregion diff --git a/sdk/test/Services/S3/UnitTests/Custom/PartBufferManagerTests.cs b/sdk/test/Services/S3/UnitTests/Custom/PartBufferManagerTests.cs index 72e3a11158c4..b07ddce455ac 100644 --- a/sdk/test/Services/S3/UnitTests/Custom/PartBufferManagerTests.cs +++ b/sdk/test/Services/S3/UnitTests/Custom/PartBufferManagerTests.cs @@ -78,7 +78,7 @@ public async Task NextExpectedPartNumber_IncrementsAfterPartComplete() // Add part 1 byte[] testBuffer = ArrayPool.Shared.Rent(512); var partBuffer = new StreamPartBuffer(1, testBuffer, 512); - await manager.AddBufferAsync(partBuffer, CancellationToken.None); + manager.AddBuffer(partBuffer); // Read part 1 completely byte[] readBuffer = new byte[512]; @@ -134,7 +134,7 @@ public async Task WaitForBufferSpaceAsync_WhenMaxPartsReached_Blocks() await manager.WaitForBufferSpaceAsync(CancellationToken.None); byte[] testBuffer = ArrayPool.Shared.Rent(512); var partBuffer = new StreamPartBuffer(i, testBuffer, 512); - await manager.AddBufferAsync(partBuffer, CancellationToken.None); + manager.AddBuffer(partBuffer); } // Act - Try to wait for space (should block) @@ -169,7 +169,7 @@ public async Task WaitForBufferSpaceAsync_AfterRelease_AllowsAccess() await manager.WaitForBufferSpaceAsync(CancellationToken.None); byte[] testBuffer = ArrayPool.Shared.Rent(512); var partBuffer = new StreamPartBuffer(1, testBuffer, 512); - await manager.AddBufferAsync(partBuffer, CancellationToken.None); + manager.AddBuffer(partBuffer); // Release space manager.ReleaseBufferSpace(); @@ -226,10 +226,10 @@ public async Task WaitForBufferSpaceAsync_WithCancellation_ThrowsOperationCancel #endregion - #region AddBufferAsync Tests + #region AddBuffer Tests [TestMethod] - public async Task AddBufferAsync_CreatesBufferedDataSource() + public async Task AddBuffer_CreatesBufferedDataSource() { // Arrange var config = MultipartDownloadTestHelpers.CreateBufferedDownloadConfiguration(); @@ -241,7 +241,7 @@ public async Task AddBufferAsync_CreatesBufferedDataSource() var partBuffer = new StreamPartBuffer(1, testBuffer, 512); // Act - await manager.AddBufferAsync(partBuffer, CancellationToken.None); + manager.AddBuffer(partBuffer); // Assert - Should be able to read from part 1 byte[] readBuffer = new byte[512]; @@ -256,7 +256,7 @@ public async Task AddBufferAsync_CreatesBufferedDataSource() [TestMethod] [ExpectedException(typeof(ArgumentNullException))] - public async Task AddBufferAsync_WithNullBuffer_ThrowsArgumentNullException() + public void AddBuffer_WithNullBuffer_ThrowsArgumentNullException() { // Arrange var config = MultipartDownloadTestHelpers.CreateBufferedDownloadConfiguration(); @@ -265,7 +265,7 @@ public async Task AddBufferAsync_WithNullBuffer_ThrowsArgumentNullException() try { // Act - await manager.AddBufferAsync(null, CancellationToken.None); + manager.AddBuffer(null); // Assert - ExpectedException } @@ -276,7 +276,7 @@ public async Task AddBufferAsync_WithNullBuffer_ThrowsArgumentNullException() } [TestMethod] - public async Task AddBufferAsync_SignalsPartAvailable() + public async Task AddBuffer_SignalsPartAvailable() { // Arrange var config = MultipartDownloadTestHelpers.CreateBufferedDownloadConfiguration(); @@ -297,7 +297,7 @@ public async Task AddBufferAsync_SignalsPartAvailable() // Add the part byte[] testBuffer = ArrayPool.Shared.Rent(512); var partBuffer = new StreamPartBuffer(1, testBuffer, 512); - await manager.AddBufferAsync(partBuffer, CancellationToken.None); + manager.AddBuffer(partBuffer); // Assert - Read should complete int bytesRead = await readTask; @@ -411,7 +411,7 @@ public async Task ReadAsync_ReadsDataSequentially() Buffer.BlockCopy(testData, 0, testBuffer, 0, 512); var partBuffer = new StreamPartBuffer(1, testBuffer, 512); - await manager.AddBufferAsync(partBuffer, CancellationToken.None); + manager.AddBuffer(partBuffer); // Act byte[] readBuffer = new byte[512]; @@ -439,7 +439,7 @@ public async Task ReadAsync_AdvancesNextExpectedPartNumber() // Add part 1 byte[] testBuffer = ArrayPool.Shared.Rent(512); var partBuffer = new StreamPartBuffer(1, testBuffer, 512); - await manager.AddBufferAsync(partBuffer, CancellationToken.None); + manager.AddBuffer(partBuffer); // Read part 1 completely byte[] readBuffer = new byte[512]; @@ -572,7 +572,7 @@ public async Task ReadAsync_WaitsForPartAvailability() // Add the part asynchronously byte[] testBuffer = ArrayPool.Shared.Rent(512); var partBuffer = new StreamPartBuffer(1, testBuffer, 512); - await manager.AddBufferAsync(partBuffer, CancellationToken.None); + manager.AddBuffer(partBuffer); // Assert - Read should complete int bytesRead = await readTask; @@ -653,14 +653,14 @@ public async Task ReadAsync_ReadingAcrossPartBoundary_FillsBuffer() byte[] testBuffer1 = ArrayPool.Shared.Rent(100); Buffer.BlockCopy(testData1, 0, testBuffer1, 0, 100); var partBuffer1 = new StreamPartBuffer(1, testBuffer1, 100); - await manager.AddBufferAsync(partBuffer1, CancellationToken.None); + manager.AddBuffer(partBuffer1); // Add Part 2 (100 bytes) byte[] testData2 = MultipartDownloadTestHelpers.GenerateTestData(100, 100); byte[] testBuffer2 = ArrayPool.Shared.Rent(100); Buffer.BlockCopy(testData2, 0, testBuffer2, 0, 100); var partBuffer2 = new StreamPartBuffer(2, testBuffer2, 100); - await manager.AddBufferAsync(partBuffer2, CancellationToken.None); + manager.AddBuffer(partBuffer2); // Act - Request 150 bytes (spans both parts) byte[] readBuffer = new byte[150]; @@ -700,7 +700,7 @@ public async Task ReadAsync_MultiplePartsInSingleRead_AdvancesCorrectly() byte[] testBuffer = ArrayPool.Shared.Rent(50); Buffer.BlockCopy(testData, 0, testBuffer, 0, 50); var partBuffer = new StreamPartBuffer(i, testBuffer, 50); - await manager.AddBufferAsync(partBuffer, CancellationToken.None); + manager.AddBuffer(partBuffer); } // Act - Read 150 bytes (all 3 parts) @@ -729,7 +729,7 @@ public async Task ReadAsync_PartCompletes_AdvancesToNextPart() // Add part 1 byte[] testBuffer1 = ArrayPool.Shared.Rent(100); var partBuffer1 = new StreamPartBuffer(1, testBuffer1, 100); - await manager.AddBufferAsync(partBuffer1, CancellationToken.None); + manager.AddBuffer(partBuffer1); // Read part 1 completely byte[] readBuffer = new byte[100]; @@ -741,7 +741,7 @@ public async Task ReadAsync_PartCompletes_AdvancesToNextPart() // Add part 2 byte[] testBuffer2 = ArrayPool.Shared.Rent(100); var partBuffer2 = new StreamPartBuffer(2, testBuffer2, 100); - await manager.AddBufferAsync(partBuffer2, CancellationToken.None); + manager.AddBuffer(partBuffer2); // Read part 2 int bytesRead = await manager.ReadAsync(readBuffer, 0, 100, CancellationToken.None); @@ -768,14 +768,14 @@ public async Task ReadAsync_EmptyPart_ContinuesToNextPart() // Add empty part 1 byte[] testBuffer1 = ArrayPool.Shared.Rent(100); var partBuffer1 = new StreamPartBuffer(1, testBuffer1, 0); // 0 bytes - await manager.AddBufferAsync(partBuffer1, CancellationToken.None); + manager.AddBuffer(partBuffer1); // Add part 2 with data byte[] testData2 = MultipartDownloadTestHelpers.GenerateTestData(100, 0); byte[] testBuffer2 = ArrayPool.Shared.Rent(100); Buffer.BlockCopy(testData2, 0, testBuffer2, 0, 100); var partBuffer2 = new StreamPartBuffer(2, testBuffer2, 100); - await manager.AddBufferAsync(partBuffer2, CancellationToken.None); + manager.AddBuffer(partBuffer2); // Act - Try to read 100 bytes starting from part 1 byte[] readBuffer = new byte[100]; @@ -945,7 +945,7 @@ public void Dispose_DisposesAllDataSources() byte[] testBuffer = ArrayPool.Shared.Rent(512); var partBuffer = new StreamPartBuffer(1, testBuffer, 512); - manager.AddBufferAsync(partBuffer, CancellationToken.None).Wait(); + manager.AddBuffer(partBuffer); // Act manager.Dispose(); @@ -963,7 +963,7 @@ public void Dispose_ClearsCollection() byte[] testBuffer = ArrayPool.Shared.Rent(512); var partBuffer = new StreamPartBuffer(1, testBuffer, 512); - manager.AddBufferAsync(partBuffer, CancellationToken.None).Wait(); + manager.AddBuffer(partBuffer); // Act manager.Dispose();