Skip to content
This repository was archived by the owner on Jan 23, 2023. It is now read-only.

Commit 4ca662c

Browse files
authored
Add response drain max time to SocketsHttpHandler (#27708)
* Add response drain max time to SocketsHttpHandler Adds support for a max time limit on how long we'll try to drain a connection before returning it to the pool, and adds a (currently non-public) knob for controlling that limit. This also fixes a race condition with regards to cancellation and chunked responses. If cancellation was requested after returning the connection to the pool but before we disposed of the cancellation registration, someone else could get the pooled connection and then cancellation of one request could end up canceling the other request. * Address PR feedback
1 parent 9ce9033 commit 4ca662c

File tree

9 files changed

+238
-43
lines changed

9 files changed

+238
-43
lines changed

src/Common/src/System/Net/Http/HttpHandlerDefaults.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ internal static class HttpHandlerDefaults
1414
public const int DefaultMaxAutomaticRedirections = 50;
1515
public const int DefaultMaxConnectionsPerServer = int.MaxValue;
1616
public const int DefaultMaxResponseDrainSize = 1024 * 1024;
17+
public static readonly TimeSpan DefaultMaxResponseDrainTime = TimeSpan.FromSeconds(2);
1718
public const int DefaultMaxResponseHeadersLength = 64; // Units in K (1024) bytes.
1819
public const DecompressionMethods DefaultAutomaticDecompression = DecompressionMethods.None;
1920
public const bool DefaultAutomaticRedirection = true;

src/System.Net.Http/src/ILLinkTrim.xml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,5 +2,9 @@
22
<assembly fullname="System.Net.Http">
33
<!-- Anonymous types are used with DiagnosticSource logging and subscribers reflect over those, calling their public getters. -->
44
<type fullname="*f__AnonymousType*" />
5+
<type fullname="System.Net.Http.SocketsHttpHandler"> <!-- TODO #27685: Remove once public. -->
6+
<method name="get_MaxResponseDrainTime" />
7+
<method name="set_MaxResponseDrainTime" />
8+
</type>
59
</assembly>
610
</linker>

src/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/ChunkedEncodingReadStream.cs

Lines changed: 54 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ public override ValueTask<int> ReadAsync(Memory<byte> destination, CancellationT
4545
}
4646

4747
// Try to consume from data we already have in the buffer.
48-
int bytesRead = ReadChunksFromConnectionBuffer(destination.Span);
48+
int bytesRead = ReadChunksFromConnectionBuffer(destination.Span, cancellationRegistration: default);
4949
if (bytesRead > 0)
5050
{
5151
return new ValueTask<int>(bytesRead);
@@ -108,7 +108,7 @@ private async ValueTask<int> ReadAsyncCore(Memory<byte> destination, Cancellatio
108108

109109
// Now that we have more, see if we can get any response data, and if
110110
// we can we're done.
111-
int bytesCopied = ReadChunksFromConnectionBuffer(destination.Span);
111+
int bytesCopied = ReadChunksFromConnectionBuffer(destination.Span, ctr);
112112
if (bytesCopied > 0)
113113
{
114114
return bytesCopied;
@@ -144,7 +144,7 @@ private async Task CopyToAsyncCore(Stream destination, CancellationToken cancell
144144
{
145145
while (true)
146146
{
147-
ReadOnlyMemory<byte> bytesRead = ReadChunkFromConnectionBuffer(int.MaxValue);
147+
ReadOnlyMemory<byte> bytesRead = ReadChunkFromConnectionBuffer(int.MaxValue, ctr);
148148
if (bytesRead.Length == 0)
149149
{
150150
break;
@@ -171,12 +171,12 @@ private async Task CopyToAsyncCore(Stream destination, CancellationToken cancell
171171
}
172172
}
173173

174-
private int ReadChunksFromConnectionBuffer(Span<byte> destination)
174+
private int ReadChunksFromConnectionBuffer(Span<byte> destination, CancellationTokenRegistration cancellationRegistration)
175175
{
176176
int totalBytesRead = 0;
177177
while (destination.Length > 0)
178178
{
179-
ReadOnlyMemory<byte> bytesRead = ReadChunkFromConnectionBuffer(destination.Length);
179+
ReadOnlyMemory<byte> bytesRead = ReadChunkFromConnectionBuffer(destination.Length, cancellationRegistration);
180180
Debug.Assert(bytesRead.Length <= destination.Length);
181181
if (bytesRead.Length == 0)
182182
{
@@ -190,7 +190,7 @@ private int ReadChunksFromConnectionBuffer(Span<byte> destination)
190190
return totalBytesRead;
191191
}
192192

193-
private ReadOnlyMemory<byte> ReadChunkFromConnectionBuffer(int maxBytesToRead)
193+
private ReadOnlyMemory<byte> ReadChunkFromConnectionBuffer(int maxBytesToRead, CancellationTokenRegistration cancellationRegistration)
194194
{
195195
Debug.Assert(maxBytesToRead > 0);
196196

@@ -287,6 +287,16 @@ private ReadOnlyMemory<byte> ReadChunkFromConnectionBuffer(int maxBytesToRead)
287287

288288
if (currentLine.IsEmpty)
289289
{
290+
// Dispose of the registration and then check whether cancellation has been
291+
// requested. This is necessary to make determinstic a race condition between
292+
// cancellation being requested and unregistering from the token. Otherwise,
293+
// it's possible cancellation could be requested just before we unregister and
294+
// we then return a connection to the pool that has been or will be disposed
295+
// (e.g. if a timer is used and has already queued its callback but the
296+
// callback hasn't yet run).
297+
cancellationRegistration.Dispose();
298+
cancellationRegistration.Token.ThrowIfCancellationRequested();
299+
290300
_state = ParsingState.Done;
291301
_connection.CompleteResponse();
292302
_connection = null;
@@ -344,32 +354,52 @@ public override async Task<bool> DrainAsync(int maxDrainBytes)
344354
{
345355
Debug.Assert(_connection != null);
346356

347-
int drainedBytes = 0;
348-
while (true)
357+
CancellationTokenSource cts = null;
358+
CancellationTokenRegistration ctr = default;
359+
try
349360
{
350-
drainedBytes += _connection.RemainingBuffer.Length;
361+
int drainedBytes = 0;
351362
while (true)
352363
{
353-
ReadOnlyMemory<byte> bytesRead = ReadChunkFromConnectionBuffer(int.MaxValue);
354-
if (bytesRead.Length == 0)
364+
drainedBytes += _connection.RemainingBuffer.Length;
365+
while (true)
355366
{
356-
break;
367+
ReadOnlyMemory<byte> bytesRead = ReadChunkFromConnectionBuffer(int.MaxValue, ctr);
368+
if (bytesRead.Length == 0)
369+
{
370+
break;
371+
}
357372
}
358-
}
359373

360-
// When ReadChunkFromConnectionBuffer reads the final chunk, it will clear out _connection
361-
// and return the connection to the pool.
362-
if (_connection == null)
363-
{
364-
return true;
365-
}
374+
// When ReadChunkFromConnectionBuffer reads the final chunk, it will clear out _connection
375+
// and return the connection to the pool.
376+
if (_connection == null)
377+
{
378+
return true;
379+
}
366380

367-
if (drainedBytes >= maxDrainBytes)
368-
{
369-
return false;
370-
}
381+
if (drainedBytes >= maxDrainBytes)
382+
{
383+
return false;
384+
}
385+
386+
if (cts == null) // only create the drain timer if we have to go async
387+
{
388+
TimeSpan drainTime = _connection._pool.Settings._maxResponseDrainTime;
389+
if (drainTime != Timeout.InfiniteTimeSpan)
390+
{
391+
cts = new CancellationTokenSource((int)drainTime.TotalMilliseconds);
392+
ctr = cts.Token.Register(s => ((HttpConnection)s).Dispose(), _connection);
393+
}
394+
}
371395

372-
await _connection.FillAsync().ConfigureAwait(false);
396+
await _connection.FillAsync().ConfigureAwait(false);
397+
}
398+
}
399+
finally
400+
{
401+
ctr.Dispose();
402+
cts?.Dispose();
373403
}
374404
}
375405
}

src/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/ContentLengthReadStream.cs

Lines changed: 32 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -174,16 +174,42 @@ public override async Task<bool> DrainAsync(int maxDrainBytes)
174174
return false;
175175
}
176176

177-
while (true)
177+
CancellationTokenSource cts = null;
178+
CancellationTokenRegistration ctr = default;
179+
TimeSpan drainTime = _connection._pool.Settings._maxResponseDrainTime;
180+
if (drainTime != Timeout.InfiniteTimeSpan)
178181
{
179-
await _connection.FillAsync().ConfigureAwait(false);
180-
ReadFromConnectionBuffer(int.MaxValue);
181-
if (_contentBytesRemaining == 0)
182+
cts = new CancellationTokenSource((int)drainTime.TotalMilliseconds);
183+
ctr = cts.Token.Register(s => ((HttpConnection)s).Dispose(), _connection);
184+
}
185+
try
186+
{
187+
while (true)
182188
{
183-
Finish();
184-
return true;
189+
await _connection.FillAsync().ConfigureAwait(false);
190+
ReadFromConnectionBuffer(int.MaxValue);
191+
if (_contentBytesRemaining == 0)
192+
{
193+
// Dispose of the registration and then check whether cancellation has been
194+
// requested. This is necessary to make determinstic a race condition between
195+
// cancellation being requested and unregistering from the token. Otherwise,
196+
// it's possible cancellation could be requested just before we unregister and
197+
// we then return a connection to the pool that has been or will be disposed
198+
// (e.g. if a timer is used and has already queued its callback but the
199+
// callback hasn't yet run).
200+
ctr.Dispose();
201+
ctr.Token.ThrowIfCancellationRequested();
202+
203+
Finish();
204+
return true;
205+
}
185206
}
186207
}
208+
finally
209+
{
210+
ctr.Dispose();
211+
cts?.Dispose();
212+
}
187213
}
188214
}
189215
}

src/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/HttpConnectionSettings.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ internal sealed class HttpConnectionSettings
2727

2828
internal int _maxConnectionsPerServer = HttpHandlerDefaults.DefaultMaxConnectionsPerServer;
2929
internal int _maxResponseDrainSize = HttpHandlerDefaults.DefaultMaxResponseDrainSize;
30+
internal TimeSpan _maxResponseDrainTime = HttpHandlerDefaults.DefaultMaxResponseDrainTime;
3031
internal int _maxResponseHeadersLength = HttpHandlerDefaults.DefaultMaxResponseHeadersLength;
3132

3233
internal TimeSpan _pooledConnectionLifetime = HttpHandlerDefaults.DefaultPooledConnectionLifetime;
@@ -58,6 +59,7 @@ public HttpConnectionSettings Clone()
5859
_maxAutomaticRedirections = _maxAutomaticRedirections,
5960
_maxConnectionsPerServer = _maxConnectionsPerServer,
6061
_maxResponseDrainSize = _maxResponseDrainSize,
62+
_maxResponseDrainTime = _maxResponseDrainTime,
6163
_maxResponseHeadersLength = _maxResponseHeadersLength,
6264
_pooledConnectionLifetime = _pooledConnectionLifetime,
6365
_pooledConnectionIdleTimeout = _pooledConnectionIdleTimeout,

src/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/SocketsHttpHandler.cs

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -167,6 +167,22 @@ public int MaxResponseDrainSize
167167
}
168168
}
169169

170+
internal TimeSpan MaxResponseDrainTime // TODO #27685: Expose publicly.
171+
{
172+
get => _settings._maxResponseDrainTime;
173+
set
174+
{
175+
if ((value < TimeSpan.Zero && value != Timeout.InfiniteTimeSpan) ||
176+
(value.TotalMilliseconds > int.MaxValue))
177+
{
178+
throw new ArgumentOutOfRangeException(nameof(value));
179+
}
180+
181+
CheckDisposedOrStarted();
182+
_settings._maxResponseDrainTime = value;
183+
}
184+
}
185+
170186
public int MaxResponseHeadersLength
171187
{
172188
get => _settings._maxResponseHeadersLength;

src/System.Net.Http/tests/FunctionalTests/HttpClientHandlerTest.ResponseDrain.cs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,13 +5,16 @@
55
using System.IO;
66
using System.Net.Test.Common;
77
using System.Text;
8+
using System.Threading;
89
using System.Threading.Tasks;
910
using Xunit;
1011

1112
namespace System.Net.Http.Functional.Tests
1213
{
1314
public class HttpClientHandler_ResponseDrain_Test : HttpClientTestBase
1415
{
16+
protected virtual void SetMaxResponseDrainTime(HttpClientHandler handler, TimeSpan time) { }
17+
1518
public enum ContentMode
1619
{
1720
ContentLength,
@@ -131,6 +134,7 @@ await LoopbackServer.CreateClientAndServerAsync(
131134
async url =>
132135
{
133136
HttpClientHandler handler = CreateHttpClientHandler();
137+
SetMaxResponseDrainTime(handler, Timeout.InfiniteTimeSpan);
134138

135139
// Set MaxConnectionsPerServer to 1. This will ensure we will wait for the previous request to drain (or fail to)
136140
handler.MaxConnectionsPerServer = 1;
@@ -208,6 +212,7 @@ await LoopbackServer.CreateClientAndServerAsync(
208212
async url =>
209213
{
210214
HttpClientHandler handler = CreateHttpClientHandler();
215+
SetMaxResponseDrainTime(handler, Timeout.InfiniteTimeSpan);
211216

212217
// Set MaxConnectionsPerServer to 1. This will ensure we will wait for the previous request to drain (or fail to)
213218
handler.MaxConnectionsPerServer = 1;

src/System.Net.Http/tests/FunctionalTests/HttpClientTestBase.cs

Lines changed: 5 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -42,21 +42,13 @@ protected static HttpClientHandler CreateHttpClientHandler(bool useSocketsHttpHa
4242
return handler;
4343
}
4444

45-
protected static bool IsSocketsHttpHandler(HttpClientHandler handler)
45+
protected static bool IsSocketsHttpHandler(HttpClientHandler handler) =>
46+
GetUnderlyingSocketsHttpHandler(handler) != null;
47+
48+
protected static object GetUnderlyingSocketsHttpHandler(HttpClientHandler handler)
4649
{
4750
FieldInfo field = typeof(HttpClientHandler).GetField("_socketsHttpHandler", BindingFlags.Instance | BindingFlags.NonPublic);
48-
if (field == null)
49-
{
50-
return false;
51-
}
52-
53-
object socketsHttpHandler = field.GetValue(handler);
54-
if (socketsHttpHandler == null)
55-
{
56-
return false;
57-
}
58-
59-
return true;
51+
return field?.GetValue(handler);
6052
}
6153
}
6254
}

0 commit comments

Comments
 (0)