Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ public async Task ProcessPartAsync(
partNumber, buffer.Length);

// Add the buffered part to the buffer manager
await _partBufferManager.AddBufferAsync(buffer, cancellationToken).ConfigureAwait(false);
_partBufferManager.AddBuffer(buffer);

Logger.DebugFormat("BufferedPartDataHandler: [Part {0}] Added to buffer manager",
partNumber);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,7 @@ internal interface IPartBufferManager : IDisposable
/// Adds a downloaded part buffer and signals readers when next expected part arrives.
/// </summary>
/// <param name="buffer">The downloaded part buffer to add.</param>
/// <param name="cancellationToken">A token to cancel the operation.</param>
/// <returns>A task that completes when the buffer has been added and signaling is complete.</returns>
Task AddBufferAsync(StreamPartBuffer buffer, CancellationToken cancellationToken);
void AddBuffer(StreamPartBuffer buffer);

/// <summary>
/// Reads data from the buffer manager. Automatically handles sequential part consumption
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ namespace Amazon.S3.Transfer.Internal
/// - Example: With MaxInMemoryParts=10, if parts 5-14 are buffered, the task downloading
/// part 15 blocks here until the reader consumes and releases part 5's buffer
/// 2. Read part data from S3 into pooled buffer
/// 3. Add buffered part: await <see cref="AddBufferAsync"/>
/// 3. Add buffered part: <see cref="AddBuffer"/>
/// - Adds buffer to _partDataSources dictionary
/// - Signals _partAvailable to wake consumer if waiting
/// 4. Consumer eventually releases the buffer slot after reading the part
Expand Down Expand Up @@ -286,7 +286,7 @@ public void AddDataSource(IPartDataSource dataSource)
}

/// <inheritdoc/>
public async Task AddBufferAsync(StreamPartBuffer buffer, CancellationToken cancellationToken)
public void AddBuffer(StreamPartBuffer buffer)
{
ThrowIfDisposed();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ public async Task ProcessPartAsync_BuffersPartData()

// Assert - should add buffer to manager
mockBufferManager.Verify(
x => x.AddBufferAsync(It.IsAny<StreamPartBuffer>(), It.IsAny<CancellationToken>()),
x => x.AddBuffer(It.IsAny<StreamPartBuffer>()),
Times.Once);
}

Expand All @@ -92,9 +92,8 @@ public async Task ProcessPartAsync_ReadsExactContentLength()

StreamPartBuffer capturedBuffer = null;
var mockBufferManager = new Mock<IPartBufferManager>();
mockBufferManager.Setup(x => x.AddBufferAsync(It.IsAny<StreamPartBuffer>(), It.IsAny<CancellationToken>()))
.Callback<StreamPartBuffer, CancellationToken>((buffer, ct) => capturedBuffer = buffer)
.Returns(Task.CompletedTask);
mockBufferManager.Setup(x => x.AddBuffer(It.IsAny<StreamPartBuffer>()))
.Callback<StreamPartBuffer>((buffer) => capturedBuffer = buffer);

var config = MultipartDownloadTestHelpers.CreateBufferedDownloadConfiguration();
var handler = new BufferedPartDataHandler(mockBufferManager.Object, config);
Expand Down Expand Up @@ -124,9 +123,8 @@ public async Task ProcessPartAsync_HandlesSmallPart()

StreamPartBuffer capturedBuffer = null;
var mockBufferManager = new Mock<IPartBufferManager>();
mockBufferManager.Setup(x => x.AddBufferAsync(It.IsAny<StreamPartBuffer>(), It.IsAny<CancellationToken>()))
.Callback<StreamPartBuffer, CancellationToken>((buffer, ct) => capturedBuffer = buffer)
.Returns(Task.CompletedTask);
mockBufferManager.Setup(x => x.AddBuffer(It.IsAny<StreamPartBuffer>()))
.Callback<StreamPartBuffer>((buffer) => capturedBuffer = buffer);

var config = MultipartDownloadTestHelpers.CreateBufferedDownloadConfiguration();
var handler = new BufferedPartDataHandler(mockBufferManager.Object, config);
Expand Down Expand Up @@ -155,9 +153,8 @@ public async Task ProcessPartAsync_HandlesLargePart()

StreamPartBuffer capturedBuffer = null;
var mockBufferManager = new Mock<IPartBufferManager>();
mockBufferManager.Setup(x => x.AddBufferAsync(It.IsAny<StreamPartBuffer>(), It.IsAny<CancellationToken>()))
.Callback<StreamPartBuffer, CancellationToken>((buffer, ct) => capturedBuffer = buffer)
.Returns(Task.CompletedTask);
mockBufferManager.Setup(x => x.AddBuffer(It.IsAny<StreamPartBuffer>()))
.Callback<StreamPartBuffer>((buffer) => capturedBuffer = buffer);

var config = MultipartDownloadTestHelpers.CreateBufferedDownloadConfiguration();
var handler = new BufferedPartDataHandler(mockBufferManager.Object, config);
Expand Down Expand Up @@ -191,9 +188,8 @@ public async Task ProcessPartAsync_PreservesDataIntegrity()

StreamPartBuffer capturedBuffer = null;
var mockBufferManager = new Mock<IPartBufferManager>();
mockBufferManager.Setup(x => x.AddBufferAsync(It.IsAny<StreamPartBuffer>(), It.IsAny<CancellationToken>()))
.Callback<StreamPartBuffer, CancellationToken>((buffer, ct) => capturedBuffer = buffer)
.Returns(Task.CompletedTask);
mockBufferManager.Setup(x => x.AddBuffer(It.IsAny<StreamPartBuffer>()))
.Callback<StreamPartBuffer>((buffer) => capturedBuffer = buffer);

var config = MultipartDownloadTestHelpers.CreateBufferedDownloadConfiguration();
var handler = new BufferedPartDataHandler(mockBufferManager.Object, config);
Expand Down Expand Up @@ -234,7 +230,7 @@ public async Task ProcessPartAsync_HandlesZeroByteResponse()

// Assert - should handle empty response gracefully
mockBufferManager.Verify(
x => x.AddBufferAsync(It.IsAny<StreamPartBuffer>(), It.IsAny<CancellationToken>()),
x => x.AddBuffer(It.IsAny<StreamPartBuffer>()),
Times.Once);
}

Expand Down Expand Up @@ -301,7 +297,7 @@ public async Task ProcessPartAsync_WithUnexpectedEOF_DoesNotBufferPartialData()

// Assert - should NOT have added any buffer to manager since download failed
mockBufferManager.Verify(
x => x.AddBufferAsync(It.IsAny<StreamPartBuffer>(), It.IsAny<CancellationToken>()),
x => x.AddBuffer(It.IsAny<StreamPartBuffer>()),
Times.Never);
}

Expand Down Expand Up @@ -335,17 +331,14 @@ public async Task ProcessPartAsync_WithCancelledToken_ThrowsTaskCanceledExceptio
}

[TestMethod]
public async Task ProcessPartAsync_PassesCancellationTokenToBufferManager()
public async Task ProcessPartAsync_CallsAddBufferOnce()
{
// Arrange
var partSize = 1024;
var partData = new byte[partSize];

CancellationToken capturedToken = default;
var mockBufferManager = new Mock<IPartBufferManager>();
mockBufferManager.Setup(x => x.AddBufferAsync(It.IsAny<StreamPartBuffer>(), It.IsAny<CancellationToken>()))
.Callback<StreamPartBuffer, CancellationToken>((buffer, ct) => capturedToken = ct)
.Returns(Task.CompletedTask);
mockBufferManager.Setup(x => x.AddBuffer(It.IsAny<StreamPartBuffer>()));

var config = MultipartDownloadTestHelpers.CreateBufferedDownloadConfiguration();
var handler = new BufferedPartDataHandler(mockBufferManager.Object, config);
Expand All @@ -361,8 +354,8 @@ public async Task ProcessPartAsync_PassesCancellationTokenToBufferManager()
// Act
await handler.ProcessPartAsync(1, response, cts.Token);

// Assert
Assert.AreEqual(cts.Token, capturedToken);
// Assert - verify AddBuffer was called exactly once
mockBufferManager.Verify(x => x.AddBuffer(It.IsAny<StreamPartBuffer>()), Times.Once);
}

#endregion
Expand Down
44 changes: 22 additions & 22 deletions sdk/test/Services/S3/UnitTests/Custom/PartBufferManagerTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ public async Task NextExpectedPartNumber_IncrementsAfterPartComplete()
// Add part 1
byte[] testBuffer = ArrayPool<byte>.Shared.Rent(512);
var partBuffer = new StreamPartBuffer(1, testBuffer, 512);
await manager.AddBufferAsync(partBuffer, CancellationToken.None);
manager.AddBuffer(partBuffer);

// Read part 1 completely
byte[] readBuffer = new byte[512];
Expand Down Expand Up @@ -134,7 +134,7 @@ public async Task WaitForBufferSpaceAsync_WhenMaxPartsReached_Blocks()
await manager.WaitForBufferSpaceAsync(CancellationToken.None);
byte[] testBuffer = ArrayPool<byte>.Shared.Rent(512);
var partBuffer = new StreamPartBuffer(i, testBuffer, 512);
await manager.AddBufferAsync(partBuffer, CancellationToken.None);
manager.AddBuffer(partBuffer);
}

// Act - Try to wait for space (should block)
Expand Down Expand Up @@ -169,7 +169,7 @@ public async Task WaitForBufferSpaceAsync_AfterRelease_AllowsAccess()
await manager.WaitForBufferSpaceAsync(CancellationToken.None);
byte[] testBuffer = ArrayPool<byte>.Shared.Rent(512);
var partBuffer = new StreamPartBuffer(1, testBuffer, 512);
await manager.AddBufferAsync(partBuffer, CancellationToken.None);
manager.AddBuffer(partBuffer);

// Release space
manager.ReleaseBufferSpace();
Expand Down Expand Up @@ -226,10 +226,10 @@ public async Task WaitForBufferSpaceAsync_WithCancellation_ThrowsOperationCancel

#endregion

#region AddBufferAsync Tests
#region AddBuffer Tests

[TestMethod]
public async Task AddBufferAsync_CreatesBufferedDataSource()
public async Task AddBuffer_CreatesBufferedDataSource()
{
// Arrange
var config = MultipartDownloadTestHelpers.CreateBufferedDownloadConfiguration();
Expand All @@ -241,7 +241,7 @@ public async Task AddBufferAsync_CreatesBufferedDataSource()
var partBuffer = new StreamPartBuffer(1, testBuffer, 512);

// Act
await manager.AddBufferAsync(partBuffer, CancellationToken.None);
manager.AddBuffer(partBuffer);

// Assert - Should be able to read from part 1
byte[] readBuffer = new byte[512];
Expand All @@ -256,7 +256,7 @@ public async Task AddBufferAsync_CreatesBufferedDataSource()

[TestMethod]
[ExpectedException(typeof(ArgumentNullException))]
public async Task AddBufferAsync_WithNullBuffer_ThrowsArgumentNullException()
public void AddBuffer_WithNullBuffer_ThrowsArgumentNullException()
{
// Arrange
var config = MultipartDownloadTestHelpers.CreateBufferedDownloadConfiguration();
Expand All @@ -265,7 +265,7 @@ public async Task AddBufferAsync_WithNullBuffer_ThrowsArgumentNullException()
try
{
// Act
await manager.AddBufferAsync(null, CancellationToken.None);
manager.AddBuffer(null);

// Assert - ExpectedException
}
Expand All @@ -276,7 +276,7 @@ public async Task AddBufferAsync_WithNullBuffer_ThrowsArgumentNullException()
}

[TestMethod]
public async Task AddBufferAsync_SignalsPartAvailable()
public async Task AddBuffer_SignalsPartAvailable()
{
// Arrange
var config = MultipartDownloadTestHelpers.CreateBufferedDownloadConfiguration();
Expand All @@ -297,7 +297,7 @@ public async Task AddBufferAsync_SignalsPartAvailable()
// Add the part
byte[] testBuffer = ArrayPool<byte>.Shared.Rent(512);
var partBuffer = new StreamPartBuffer(1, testBuffer, 512);
await manager.AddBufferAsync(partBuffer, CancellationToken.None);
manager.AddBuffer(partBuffer);

// Assert - Read should complete
int bytesRead = await readTask;
Expand Down Expand Up @@ -411,7 +411,7 @@ public async Task ReadAsync_ReadsDataSequentially()
Buffer.BlockCopy(testData, 0, testBuffer, 0, 512);

var partBuffer = new StreamPartBuffer(1, testBuffer, 512);
await manager.AddBufferAsync(partBuffer, CancellationToken.None);
manager.AddBuffer(partBuffer);

// Act
byte[] readBuffer = new byte[512];
Expand Down Expand Up @@ -439,7 +439,7 @@ public async Task ReadAsync_AdvancesNextExpectedPartNumber()
// Add part 1
byte[] testBuffer = ArrayPool<byte>.Shared.Rent(512);
var partBuffer = new StreamPartBuffer(1, testBuffer, 512);
await manager.AddBufferAsync(partBuffer, CancellationToken.None);
manager.AddBuffer(partBuffer);

// Read part 1 completely
byte[] readBuffer = new byte[512];
Expand Down Expand Up @@ -572,7 +572,7 @@ public async Task ReadAsync_WaitsForPartAvailability()
// Add the part asynchronously
byte[] testBuffer = ArrayPool<byte>.Shared.Rent(512);
var partBuffer = new StreamPartBuffer(1, testBuffer, 512);
await manager.AddBufferAsync(partBuffer, CancellationToken.None);
manager.AddBuffer(partBuffer);

// Assert - Read should complete
int bytesRead = await readTask;
Expand Down Expand Up @@ -653,14 +653,14 @@ public async Task ReadAsync_ReadingAcrossPartBoundary_FillsBuffer()
byte[] testBuffer1 = ArrayPool<byte>.Shared.Rent(100);
Buffer.BlockCopy(testData1, 0, testBuffer1, 0, 100);
var partBuffer1 = new StreamPartBuffer(1, testBuffer1, 100);
await manager.AddBufferAsync(partBuffer1, CancellationToken.None);
manager.AddBuffer(partBuffer1);

// Add Part 2 (100 bytes)
byte[] testData2 = MultipartDownloadTestHelpers.GenerateTestData(100, 100);
byte[] testBuffer2 = ArrayPool<byte>.Shared.Rent(100);
Buffer.BlockCopy(testData2, 0, testBuffer2, 0, 100);
var partBuffer2 = new StreamPartBuffer(2, testBuffer2, 100);
await manager.AddBufferAsync(partBuffer2, CancellationToken.None);
manager.AddBuffer(partBuffer2);

// Act - Request 150 bytes (spans both parts)
byte[] readBuffer = new byte[150];
Expand Down Expand Up @@ -700,7 +700,7 @@ public async Task ReadAsync_MultiplePartsInSingleRead_AdvancesCorrectly()
byte[] testBuffer = ArrayPool<byte>.Shared.Rent(50);
Buffer.BlockCopy(testData, 0, testBuffer, 0, 50);
var partBuffer = new StreamPartBuffer(i, testBuffer, 50);
await manager.AddBufferAsync(partBuffer, CancellationToken.None);
manager.AddBuffer(partBuffer);
}

// Act - Read 150 bytes (all 3 parts)
Expand Down Expand Up @@ -729,7 +729,7 @@ public async Task ReadAsync_PartCompletes_AdvancesToNextPart()
// Add part 1
byte[] testBuffer1 = ArrayPool<byte>.Shared.Rent(100);
var partBuffer1 = new StreamPartBuffer(1, testBuffer1, 100);
await manager.AddBufferAsync(partBuffer1, CancellationToken.None);
manager.AddBuffer(partBuffer1);

// Read part 1 completely
byte[] readBuffer = new byte[100];
Expand All @@ -741,7 +741,7 @@ public async Task ReadAsync_PartCompletes_AdvancesToNextPart()
// Add part 2
byte[] testBuffer2 = ArrayPool<byte>.Shared.Rent(100);
var partBuffer2 = new StreamPartBuffer(2, testBuffer2, 100);
await manager.AddBufferAsync(partBuffer2, CancellationToken.None);
manager.AddBuffer(partBuffer2);

// Read part 2
int bytesRead = await manager.ReadAsync(readBuffer, 0, 100, CancellationToken.None);
Expand All @@ -768,14 +768,14 @@ public async Task ReadAsync_EmptyPart_ContinuesToNextPart()
// Add empty part 1
byte[] testBuffer1 = ArrayPool<byte>.Shared.Rent(100);
var partBuffer1 = new StreamPartBuffer(1, testBuffer1, 0); // 0 bytes
await manager.AddBufferAsync(partBuffer1, CancellationToken.None);
manager.AddBuffer(partBuffer1);

// Add part 2 with data
byte[] testData2 = MultipartDownloadTestHelpers.GenerateTestData(100, 0);
byte[] testBuffer2 = ArrayPool<byte>.Shared.Rent(100);
Buffer.BlockCopy(testData2, 0, testBuffer2, 0, 100);
var partBuffer2 = new StreamPartBuffer(2, testBuffer2, 100);
await manager.AddBufferAsync(partBuffer2, CancellationToken.None);
manager.AddBuffer(partBuffer2);

// Act - Try to read 100 bytes starting from part 1
byte[] readBuffer = new byte[100];
Expand Down Expand Up @@ -945,7 +945,7 @@ public void Dispose_DisposesAllDataSources()

byte[] testBuffer = ArrayPool<byte>.Shared.Rent(512);
var partBuffer = new StreamPartBuffer(1, testBuffer, 512);
manager.AddBufferAsync(partBuffer, CancellationToken.None).Wait();
manager.AddBuffer(partBuffer);

// Act
manager.Dispose();
Expand All @@ -963,7 +963,7 @@ public void Dispose_ClearsCollection()

byte[] testBuffer = ArrayPool<byte>.Shared.Rent(512);
var partBuffer = new StreamPartBuffer(1, testBuffer, 512);
manager.AddBufferAsync(partBuffer, CancellationToken.None).Wait();
manager.AddBuffer(partBuffer);

// Act
manager.Dispose();
Expand Down