Skip to content

Commit 12b92d1

Browse files
committed
Optimize part streaming
1 parent a522b17 commit 12b92d1

File tree

10 files changed

+2188
-350
lines changed

10 files changed

+2188
-350
lines changed
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
{
2+
"services": [
3+
{
4+
"serviceName": "S3",
5+
"type": "patch",
6+
"changeLogMessages": [
7+
"Optimized multipart download manager to stream responses directly where applicable."
8+
]
9+
}
10+
]
11+
}

sdk/src/Services/S3/Custom/Transfer/Internal/BufferedPartDataHandler.cs

Lines changed: 176 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -30,9 +30,26 @@
3030
namespace Amazon.S3.Transfer.Internal
3131
{
3232
/// <summary>
33-
/// Buffers downloaded parts in memory using <see cref="ArrayPool{T}"/> and <see cref="IPartBufferManager"/>.
34-
/// Implements current streaming behavior for multipart downloads.
33+
/// Handles multipart download data with intelligent stream-vs-buffer decision making.
34+
/// Optimizes for sequential part arrival by streaming directly to consumer when possible,
35+
/// while buffering out-of-order parts into memory using <see cref="ArrayPool{T}"/>.
3536
/// </summary>
37+
/// <remarks>
38+
/// <para><strong>Optimization Strategy:</strong></para>
39+
/// <list type="bullet">
40+
/// <item>Parts arriving in expected order (matching NextExpectedPartNumber) stream directly to consumer</item>
41+
/// <item>Out-of-order parts buffer into ArrayPool memory for later sequential consumption</item>
42+
/// <item>Best case: All parts in order → zero buffering → pure streaming</item>
43+
/// <item>Worst case: All parts out of order → full buffering (original behavior)</item>
44+
/// </list>
45+
///
46+
///
47+
/// <para><strong>Response Ownership:</strong></para>
48+
/// <list type="bullet">
49+
/// <item>Streaming: StreamingDataSource takes ownership of the response and disposes it after reading</item>
50+
/// <item>Buffering: Handler disposes response immediately after buffering completes</item>
51+
/// </list>
52+
/// </remarks>
3653
internal class BufferedPartDataHandler : IPartDataHandler
3754
{
3855
private readonly IPartBufferManager _partBufferManager;
@@ -64,28 +81,163 @@ public Task PrepareAsync(DownloadDiscoveryResult discoveryResult, CancellationTo
6481
}
6582

6683
/// <inheritdoc/>
84+
/// <remarks>
85+
/// <para>
86+
/// Intelligently chooses between streaming and buffering based on part arrival order:
87+
/// </para>
88+
/// <list type="bullet">
89+
/// <item>If partNumber matches NextExpectedPartNumber: Stream directly (no buffering)</item>
90+
/// <item>Otherwise: Buffer into memory for later sequential consumption</item>
91+
/// </list>
92+
/// <para><strong>Response Ownership:</strong></para>
93+
/// <para>
94+
/// This method takes ownership of the response and is responsible for disposing it in ALL cases,
95+
/// including error scenarios. Callers must NOT dispose the response after calling this method.
96+
/// </para>
97+
/// </remarks>
6798
public async Task ProcessPartAsync(
6899
int partNumber,
69100
GetObjectResponse response,
70101
CancellationToken cancellationToken)
71102
{
72-
Logger.DebugFormat("BufferedPartDataHandler: [Part {0}] Starting to buffer part from response stream - ContentLength={1}",
73-
partNumber, response.ContentLength);
103+
if (partNumber == _partBufferManager.NextExpectedPartNumber)
104+
{
105+
await ProcessStreamingPartAsync(partNumber, response, cancellationToken).ConfigureAwait(false);
106+
}
107+
else
108+
{
109+
await ProcessBufferedPartAsync(partNumber, response, cancellationToken).ConfigureAwait(false);
110+
}
111+
}
112+
113+
/// <summary>
114+
/// Processes a part that arrives in expected order by streaming it directly without buffering.
115+
/// Takes ownership of the response and transfers it to the StreamingDataSource.
116+
/// </summary>
117+
/// <param name="partNumber">The part number being processed.</param>
118+
/// <param name="response">The GetObjectResponse containing the part data. Ownership is transferred to StreamingDataSource.</param>
119+
/// <param name="cancellationToken">Cancellation token for the operation.</param>
120+
/// <remarks>
121+
/// This method is called when the part arrives in the expected sequential order, allowing
122+
/// for optimal zero-copy streaming directly to the consumer without buffering into memory.
123+
///
124+
/// OWNERSHIP TRANSFER:
125+
/// 1. Response is passed to StreamingDataSource constructor (StreamingDataSource takes ownership)
126+
/// 2. StreamingDataSource is added to buffer manager (buffer manager takes ownership)
127+
/// 3. After successful AddBufferAsync, we null out our reference to mark ownership transfer
128+
/// 4. Buffer manager will dispose StreamingDataSource (which disposes response) during cleanup
129+
///
130+
/// ERROR HANDLING:
131+
/// - If StreamingDataSource constructor fails: We dispose the response (still our responsibility)
132+
/// - If constructor succeeds but AddBufferAsync fails: StreamingDataSource.Dispose() handles the response
133+
/// - If AddBufferAsync succeeds: Buffer manager owns everything and will clean up
134+
/// </remarks>
135+
private async Task ProcessStreamingPartAsync(
136+
int partNumber,
137+
GetObjectResponse response,
138+
CancellationToken cancellationToken)
139+
{
140+
Logger.DebugFormat("BufferedPartDataHandler: [Part {0}] Matches NextExpectedPartNumber - streaming directly without buffering",
141+
partNumber);
74142

75-
// Buffer the part from the response stream into memory
76-
var buffer = await BufferPartFromResponseAsync(
77-
partNumber,
78-
response,
79-
cancellationToken).ConfigureAwait(false);
143+
StreamingDataSource streamingDataSource = null;
144+
var ownsResponse = true; // Track if we still own the response
145+
146+
try
147+
{
148+
// Create a StreamingDataSource that will stream directly from the response
149+
// If successful, StreamingDataSource takes ownership of the response and will dispose it
150+
streamingDataSource = new StreamingDataSource(partNumber, response);
151+
ownsResponse = false; // Ownership transferred to StreamingDataSource
80152

81-
Logger.DebugFormat("BufferedPartDataHandler: [Part {0}] Buffered {1} bytes into memory",
82-
partNumber, buffer.Length);
153+
// Add the streaming data source to the buffer manager
154+
// After this succeeds, the buffer manager owns the data source
155+
await _partBufferManager.AddBufferAsync(streamingDataSource, cancellationToken).ConfigureAwait(false);
83156

84-
// Add the buffered part to the buffer manager
85-
await _partBufferManager.AddBufferAsync(buffer, cancellationToken).ConfigureAwait(false);
157+
// Mark ownership transfer by nulling our reference
158+
// If ReleaseBufferSpace() throws, we no longer own the data source, so we won't dispose it
159+
streamingDataSource = null;
86160

87-
Logger.DebugFormat("BufferedPartDataHandler: [Part {0}] Added to buffer manager",
88-
partNumber);
161+
// Release capacity immediately since we're not holding anything in memory
162+
_partBufferManager.ReleaseBufferSpace();
163+
164+
Logger.DebugFormat("BufferedPartDataHandler: [Part {0}] StreamingDataSource added and capacity released",
165+
partNumber);
166+
}
167+
catch (Exception ex)
168+
{
169+
Logger.Error(ex, "BufferedPartDataHandler: [Part {0}] Failed to process streaming part", partNumber);
170+
171+
// Dispose response if we still own it (constructor failed before taking ownership)
172+
if (ownsResponse)
173+
response?.Dispose();
174+
175+
// Dispose StreamingDataSource if we created it but buffer manager doesn't own it yet
176+
// If null, the buffer manager owns it and will handle cleanup
177+
streamingDataSource?.Dispose();
178+
179+
throw;
180+
}
181+
}
182+
183+
/// <summary>
184+
/// Processes a part that arrives out of order by buffering it into memory.
185+
/// Takes ownership of the response and disposes it after buffering completes.
186+
/// </summary>
187+
/// <param name="partNumber">The part number being processed.</param>
188+
/// <param name="response">The GetObjectResponse containing the part data. This method owns and disposes it.</param>
189+
/// <param name="cancellationToken">Cancellation token for the operation.</param>
190+
/// <remarks>
191+
/// This method is called when the part arrives out of the expected sequential order.
192+
/// The part data is buffered into ArrayPool memory for later sequential consumption.
193+
///
194+
/// OWNERSHIP:
195+
/// - Response is read and buffered into StreamPartBuffer
196+
/// - Response is disposed immediately after buffering (no longer needed)
197+
/// - StreamPartBuffer is added to buffer manager (buffer manager takes ownership)
198+
/// - Buffer manager will dispose StreamPartBuffer during cleanup
199+
///
200+
/// ERROR HANDLING:
201+
/// - Always dispose response in catch block since we own it throughout this method
202+
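/// - BufferPartFromResponseAsync handles its own cleanup of StreamPartBuffer on error; if AddBufferAsync throws after buffering succeeded, this method does not dispose the StreamPartBuffer itself (NOTE(review): confirm the buffer manager releases it on that path)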
203+
/// </remarks>
204+
private async Task ProcessBufferedPartAsync(
205+
int partNumber,
206+
GetObjectResponse response,
207+
CancellationToken cancellationToken)
208+
{
209+
Logger.DebugFormat("BufferedPartDataHandler: [Part {0}] Out of order (NextExpected={1}) - buffering to memory",
210+
partNumber, _partBufferManager.NextExpectedPartNumber);
211+
212+
try
213+
{
214+
// Buffer the part from the response stream into memory
215+
var buffer = await BufferPartFromResponseAsync(
216+
partNumber,
217+
response,
218+
cancellationToken).ConfigureAwait(false);
219+
220+
// Response has been fully read and buffered - dispose it now
221+
response?.Dispose();
222+
223+
Logger.DebugFormat("BufferedPartDataHandler: [Part {0}] Buffered {1} bytes into memory",
224+
partNumber, buffer.Length);
225+
226+
// Add the buffered part to the buffer manager
227+
await _partBufferManager.AddBufferAsync(buffer, cancellationToken).ConfigureAwait(false);
228+
229+
Logger.DebugFormat("BufferedPartDataHandler: [Part {0}] Added to buffer manager (capacity will be released after consumption)",
230+
partNumber);
231+
}
232+
catch (Exception ex)
233+
{
234+
Logger.Error(ex, "BufferedPartDataHandler: [Part {0}] Failed to process buffered part", partNumber);
235+
236+
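// We own the response throughout this method, so dispose it on error. If AddBufferAsync was the call that failed, the response was already disposed above, so this is a second Dispose (assumed to be a harmless no-op - TODO confirm)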
237+
response?.Dispose();
238+
239+
throw;
240+
}
89241
}
90242

91243
/// <inheritdoc/>
@@ -112,6 +264,15 @@ public void Dispose()
112264
// _partBufferManager is owned by caller, don't dispose
113265
}
114266

267+
/// <summary>
268+
/// Buffers a part from the GetObjectResponse stream into ArrayPool memory.
269+
/// Used when a part arrives out of order and cannot be streamed directly.
270+
/// </summary>
271+
/// <param name="partNumber">The part number being buffered.</param>
272+
/// <param name="response">The GetObjectResponse containing the part data stream.</param>
273+
/// <param name="cancellationToken">Cancellation token for the operation.</param>
274+
/// <returns>A <see cref="StreamPartBuffer"/> containing the buffered part data.</returns>
275+
/// <exception cref="Exception">Thrown when buffering fails. The StreamPartBuffer will be disposed automatically.</exception>
115276
private async Task<StreamPartBuffer> BufferPartFromResponseAsync(
116277
int partNumber,
117278
GetObjectResponse response,

sdk/src/Services/S3/Custom/Transfer/Internal/FilePartDataHandler.cs

Lines changed: 27 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -69,26 +69,42 @@ public Task PrepareAsync(DownloadDiscoveryResult discoveryResult, CancellationTo
6969
}
7070

7171
/// <inheritdoc/>
72+
/// <remarks>
73+
/// <para><strong>Response Ownership:</strong></para>
74+
/// <para>
75+
/// This method takes ownership of the response and is responsible for disposing it in ALL cases,
76+
/// including error scenarios. Callers must NOT dispose the response after calling this method.
77+
/// </para>
78+
/// </remarks>
7279
public async Task ProcessPartAsync(
7380
int partNumber,
7481
GetObjectResponse response,
7582
CancellationToken cancellationToken)
7683
{
77-
Logger.DebugFormat("FilePartDataHandler: [Part {0}] Starting to process part - ContentLength={1}",
78-
partNumber, response.ContentLength);
84+
try
85+
{
86+
Logger.DebugFormat("FilePartDataHandler: [Part {0}] Starting to process part - ContentLength={1}",
87+
partNumber, response.ContentLength);
7988

80-
// Calculate offset for this part based on ContentRange or part number
81-
long offset = GetPartOffset(response, partNumber);
89+
// Calculate offset for this part based on ContentRange or part number
90+
long offset = GetPartOffset(response, partNumber);
8291

83-
Logger.DebugFormat("FilePartDataHandler: [Part {0}] Calculated file offset={1}",
84-
partNumber, offset);
92+
Logger.DebugFormat("FilePartDataHandler: [Part {0}] Calculated file offset={1}",
93+
partNumber, offset);
8594

86-
// Write part data to file at the calculated offset
87-
await WritePartToFileAsync(offset, response, cancellationToken)
88-
.ConfigureAwait(false);
95+
// Write part data to file at the calculated offset
96+
await WritePartToFileAsync(offset, response, cancellationToken)
97+
.ConfigureAwait(false);
8998

90-
Logger.DebugFormat("FilePartDataHandler: [Part {0}] File write completed successfully",
91-
partNumber);
99+
Logger.DebugFormat("FilePartDataHandler: [Part {0}] File write completed successfully",
100+
partNumber);
101+
}
102+
finally
103+
{
104+
// Always dispose response after writing to disk (success or failure)
105+
// This releases the HTTP connection back to the pool
106+
response?.Dispose();
107+
}
92108
}
93109

94110
/// <inheritdoc/>

sdk/src/Services/S3/Custom/Transfer/Internal/IPartBufferManager.cs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,14 @@ internal interface IPartBufferManager : IDisposable
4646
/// <returns>A task that completes when the buffer has been added and signaling is complete.</returns>
4747
Task AddBufferAsync(StreamPartBuffer buffer, CancellationToken cancellationToken);
4848

49+
/// <summary>
50+
/// Adds a part data source (streaming or buffered) and signals readers when the next expected part arrives.
51+
/// </summary>
52+
/// <param name="dataSource">The part data source to add (can be StreamingDataSource or BufferedDataSource).</param>
53+
/// <param name="cancellationToken">A token to cancel the operation.</param>
54+
/// <returns>A task that completes when the data source has been added and signaling is complete.</returns>
55+
Task AddBufferAsync(IPartDataSource dataSource, CancellationToken cancellationToken);
56+
4957
/// <summary>
5058
/// Reads data from the buffer manager. Automatically handles sequential part consumption
5159
/// and reads across part boundaries to fill the buffer when possible, matching standard Stream.Read() behavior.

sdk/src/Services/S3/Custom/Transfer/Internal/MultipartDownloadManager.cs

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -387,25 +387,26 @@ private async Task CreateDownloadTaskAsync(int partNumber, long objectSize, Even
387387
partNumber, _httpConcurrencySlots.CurrentCount, _config.ConcurrentServiceRequests);
388388
}
389389

390-
Logger.DebugFormat("MultipartDownloadManager: [Part {0}] Starting buffering", partNumber);
390+
Logger.DebugFormat("MultipartDownloadManager: [Part {0}] Processing part (handler will decide: stream or buffer)", partNumber);
391391

392392
// Delegate data handling to the handler
393+
// IMPORTANT: Handler takes ownership of response and is responsible for disposing it in ALL cases:
394+
// - If streaming: StreamingDataSource takes ownership and disposes when consumer finishes reading
395+
// - If buffering: Handler disposes immediately after copying data to buffer
396+
// - On error: Handler disposes the response (in its catch or finally block) before the exception propagates
393397
await _dataHandler.ProcessPartAsync(partNumber, response, cancellationToken).ConfigureAwait(false);
394398

395-
Logger.DebugFormat("MultipartDownloadManager: [Part {0}] Buffering completed successfully", partNumber);
399+
Logger.DebugFormat("MultipartDownloadManager: [Part {0}] Processing completed successfully", partNumber);
396400
}
397401
catch (Exception ex)
398402
{
399403
Logger.Error(ex, "MultipartDownloadManager: [Part {0}] Download failed", partNumber);
404+
400405
// Release capacity on failure
406+
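// Note: Handler is responsible for disposing response in all cases, including errors. NOTE(review): nothing in this method disposes the response any more - confirm no exception can be thrown between obtaining the response and calling ProcessPartAsync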
401407
_dataHandler.ReleaseCapacity();
402408
throw;
403409
}
404-
finally
405-
{
406-
// Always dispose the response since we never transfer ownership
407-
response?.Dispose();
408-
}
409410
}
410411

411412

0 commit comments

Comments
 (0)