Skip to content
This repository was archived by the owner on Jan 23, 2023. It is now read-only.

Commit 53be85c

Browse files
authored
Enable SocketsHttpHandler cancellation support (#27029)
* Enable SocketsHttpHandler cancellation support This change significantly improves the cancellation support in SocketsHttpHandler. Previously we were passing the CancellationToken around to every method, eventually bottoming out in calls to the underlying Stream which then ends up passing them down to the underlying Socket. But today Socket's support for cancellation is minimal, only doing up-front checks; if cancellation is requested during the socket operation rather than before, the request will be ignored. Since HttpClient implements features like timeouts on top of cancellation support, it's important to do better than this. The change implements cancellation by registering with the CancellationToken to dispose of the connection. This will cause any reads/writes to wake up. We then translate resulting exceptions into cancellation exceptions. When in the main SendAsync method, we register once for the whole body of the operation until the point that we're returning the response message. For individual operations on the response content stream, we register per operation; however, when feasible we try to avoid the registration costs by only registering if operations don't complete synchronously. We also account for the case that on Unix, closing the connection may result in read operations waking up not with an exception but rather with EOF, which we also need to translate into cancellation when appropriate. Along the way I cleaned up a few minor issues as well. I also added a bunch of cancellation-related tests: - Test cancellation occurring while sending request content - Test cancellation occurring while receiving response headers - Test cancellation occurring while receiving response body and using a buffered operation - Test that all of the above are triggerable with CancellationTokenSource.Cancel, HttpClient.CancelPendingRequests, and HttpClient.Dispose - Test cancellation occurring while receiving response body and using an unbuffered operation, either a ReadAsync or CopyToAsync on the response stream - Test that a CancelPendingRequests doesn't affect unbuffered operations on the response stream There are deficiencies here in the existing handlers, and tests have been selectively disabled accordingly (I also fixed a couple cases that naturally fell out of the changes I was making for SocketsHttpHandler). SocketsHttpHandler passes now for all of them. * Add test that Dispose doesn't cancel response stream
1 parent c705032 commit 53be85c

27 files changed

+1092
-451
lines changed

src/Common/src/System/Net/Http/NoWriteNoSeekStreamContent.cs

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -14,21 +14,26 @@ namespace System.Net.Http
1414
internal sealed class NoWriteNoSeekStreamContent : HttpContent
1515
{
1616
private readonly Stream _content;
17-
private readonly CancellationToken _cancellationToken;
1817
private bool _contentConsumed;
1918

20-
internal NoWriteNoSeekStreamContent(Stream content, CancellationToken cancellationToken)
19+
internal NoWriteNoSeekStreamContent(Stream content)
2120
{
2221
Debug.Assert(content != null);
2322
Debug.Assert(content.CanRead);
2423
Debug.Assert(!content.CanWrite);
2524
Debug.Assert(!content.CanSeek);
2625

2726
_content = content;
28-
_cancellationToken = cancellationToken;
2927
}
3028

31-
protected override Task SerializeToStreamAsync(Stream stream, TransportContext context)
29+
protected override Task SerializeToStreamAsync(Stream stream, TransportContext context) =>
30+
SerializeToStreamAsync(stream, context, CancellationToken.None);
31+
32+
internal
33+
#if HTTP_DLL
34+
override
35+
#endif
36+
Task SerializeToStreamAsync(Stream stream, TransportContext context, CancellationToken cancellationToken)
3237
{
3338
Debug.Assert(stream != null);
3439

@@ -39,7 +44,7 @@ protected override Task SerializeToStreamAsync(Stream stream, TransportContext c
3944
_contentConsumed = true;
4045

4146
const int BufferSize = 8192;
42-
Task copyTask = _content.CopyToAsync(stream, BufferSize, _cancellationToken);
47+
Task copyTask = _content.CopyToAsync(stream, BufferSize, cancellationToken);
4348
if (copyTask.IsCompleted)
4449
{
4550
try { _content.Dispose(); } catch { } // same as StreamToStreamCopy behavior
@@ -75,6 +80,10 @@ protected override void Dispose(bool disposing)
7580
base.Dispose(disposing);
7681
}
7782

78-
protected override Task<Stream> CreateContentReadStreamAsync() => Task.FromResult<Stream>(_content);
83+
protected override Task<Stream> CreateContentReadStreamAsync() => Task.FromResult(_content);
84+
85+
#if HTTP_DLL
86+
internal override Stream TryCreateContentReadStream() => _content;
87+
#endif
7988
}
8089
}

src/Common/src/System/Net/Logging/NetEventSource.Common.cs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -395,7 +395,9 @@ private static void DebugValidateArg(FormattableString arg)
395395
Debug.Assert(IsEnabled || arg == null, $"Should not be formatting FormattableString \"{arg}\" if tracing isn't enabled");
396396
}
397397

398-
public static new bool IsEnabled => Log.IsEnabled();
398+
public static new bool IsEnabled =>
399+
Log.IsEnabled();
400+
//true; // uncomment for debugging only
399401

400402
[NonEvent]
401403
public static string IdOf(object value) => value != null ? value.GetType().Name + "#" + GetHashCode(value) : NullInstance;

src/System.Net.Http.WinHttpHandler/src/System/Net/Http/WinHttpResponseParser.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,7 @@ public static HttpResponseMessage CreateResponseMessage(
9292
}
9393
}
9494

95-
response.Content = new NoWriteNoSeekStreamContent(decompressedStream, state.CancellationToken);
95+
response.Content = new NoWriteNoSeekStreamContent(decompressedStream);
9696
response.RequestMessage = request;
9797

9898
// Parse raw response headers and place them into response message.

src/System.Net.Http/src/System.Net.Http.csproj

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
<?xml version="1.0" encoding="utf-8"?>
1+
<?xml version="1.0" encoding="utf-8"?>
22
<Project ToolsVersion="14.0" DefaultTargets="Build" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
33
<Import Project="$([MSBuild]::GetDirectoryNameOfFileAbove($(MSBuildThisFileDirectory), dir.props))\dir.props" />
44
<PropertyGroup>
@@ -137,11 +137,11 @@
137137
<Compile Include="System\Net\Http\SocketsHttpHandler\DecompressionHandler.cs" />
138138
<Compile Include="System\Net\Http\SocketsHttpHandler\EmptyReadStream.cs" />
139139
<Compile Include="System\Net\Http\SocketsHttpHandler\HttpConnection.cs" />
140-
<Compile Include="System\Net\Http\SocketsHttpHandler\HttpConnectionContent.cs" />
141140
<Compile Include="System\Net\Http\SocketsHttpHandler\HttpConnectionHandler.cs" />
142141
<Compile Include="System\Net\Http\SocketsHttpHandler\HttpConnectionKey.cs" />
143142
<Compile Include="System\Net\Http\SocketsHttpHandler\HttpConnectionPool.cs" />
144143
<Compile Include="System\Net\Http\SocketsHttpHandler\HttpConnectionPools.cs" />
144+
<Compile Include="System\Net\Http\SocketsHttpHandler\HttpConnectionResponseContent.cs" />
145145
<Compile Include="System\Net\Http\SocketsHttpHandler\HttpConnectionSettings.cs" />
146146
<Compile Include="System\Net\Http\SocketsHttpHandler\HttpContentDuplexStream.cs" />
147147
<Compile Include="System\Net\Http\SocketsHttpHandler\HttpContentReadStream.cs" />
@@ -464,4 +464,4 @@
464464
<Reference Include="System.Security.Cryptography.Primitives" />
465465
</ItemGroup>
466466
<Import Project="$([MSBuild]::GetDirectoryNameOfFileAbove($(MSBuildThisFileDirectory), dir.targets))\dir.targets" />
467-
</Project>
467+
</Project>

src/System.Net.Http/src/System/Net/Http/CurlHandler/CurlHandler.CurlResponseMessage.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ internal CurlResponseMessage(EasyRequest easy)
2222
Debug.Assert(easy != null, "Expected non-null EasyRequest");
2323
RequestMessage = easy._requestMessage;
2424
ResponseStream = new CurlResponseStream(easy);
25-
Content = new NoWriteNoSeekStreamContent(ResponseStream, CancellationToken.None);
25+
Content = new NoWriteNoSeekStreamContent(ResponseStream);
2626

2727
// On Windows, we pass the equivalent of the easy._cancellationToken
2828
// in to StreamContent's ctor. This in turn passes that token through

src/System.Net.Http/src/System/Net/Http/HttpClient.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -475,7 +475,7 @@ private async Task<HttpResponseMessage> FinishSendAsyncBuffered(
475475
// Buffer the response content if we've been asked to and we have a Content to buffer.
476476
if (response.Content != null)
477477
{
478-
await response.Content.LoadIntoBufferAsync(_maxResponseContentBufferSize).ConfigureAwait(false);
478+
await response.Content.LoadIntoBufferAsync(_maxResponseContentBufferSize, cts.Token).ConfigureAwait(false);
479479
}
480480

481481
if (NetEventSource.IsEnabled) NetEventSource.ClientSendCompleted(this, response, request);

src/System.Net.Http/src/System/Net/Http/HttpContent.cs

Lines changed: 19 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -299,7 +299,18 @@ internal Stream TryReadAsStream()
299299

300300
protected abstract Task SerializeToStreamAsync(Stream stream, TransportContext context);
301301

302-
public Task CopyToAsync(Stream stream, TransportContext context)
302+
// TODO #9071: Expose this publicly. Until it's public, only sealed or internal types should override it, and then change
303+
// their SerializeToStreamAsync implementation to delegate to this one. They need to be sealed as otherwise an external
304+
// type could derive from it and override SerializeToStreamAsync(stream, context) further, at which point when
305+
// HttpClient calls SerializeToStreamAsync(stream, context, cancellationToken), their custom override will be skipped.
306+
internal virtual Task SerializeToStreamAsync(Stream stream, TransportContext context, CancellationToken cancellationToken) =>
307+
SerializeToStreamAsync(stream, context);
308+
309+
public Task CopyToAsync(Stream stream, TransportContext context) =>
310+
CopyToAsync(stream, context, CancellationToken.None);
311+
312+
// TODO #9071: Expose this publicly.
313+
internal Task CopyToAsync(Stream stream, TransportContext context, CancellationToken cancellationToken)
303314
{
304315
CheckDisposed();
305316
if (stream == null)
@@ -313,11 +324,11 @@ public Task CopyToAsync(Stream stream, TransportContext context)
313324
ArraySegment<byte> buffer;
314325
if (TryGetBuffer(out buffer))
315326
{
316-
task = stream.WriteAsync(buffer.Array, buffer.Offset, buffer.Count);
327+
task = stream.WriteAsync(buffer.Array, buffer.Offset, buffer.Count, cancellationToken);
317328
}
318329
else
319330
{
320-
task = SerializeToStreamAsync(stream, context);
331+
task = SerializeToStreamAsync(stream, context, cancellationToken);
321332
CheckTaskNotNull(task);
322333
}
323334

@@ -354,7 +365,10 @@ public Task LoadIntoBufferAsync()
354365
// No "CancellationToken" parameter needed since canceling the CTS will close the connection, resulting
355366
// in an exception being thrown while we're buffering.
356367
// If buffering is used without a connection, it is supposed to be fast, thus no cancellation required.
357-
public Task LoadIntoBufferAsync(long maxBufferSize)
368+
public Task LoadIntoBufferAsync(long maxBufferSize) =>
369+
LoadIntoBufferAsync(maxBufferSize, CancellationToken.None);
370+
371+
internal Task LoadIntoBufferAsync(long maxBufferSize, CancellationToken cancellationToken)
358372
{
359373
CheckDisposed();
360374
if (maxBufferSize > HttpContent.MaxBufferSize)
@@ -382,7 +396,7 @@ public Task LoadIntoBufferAsync(long maxBufferSize)
382396

383397
try
384398
{
385-
Task task = SerializeToStreamAsync(tempBuffer, null);
399+
Task task = SerializeToStreamAsync(tempBuffer, null, cancellationToken);
386400
CheckTaskNotNull(task);
387401
return LoadIntoBufferAsyncCore(task, tempBuffer);
388402
}

src/System.Net.Http/src/System/Net/Http/NetEventSource.Http.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@ public void HeadersInvalidValue(string name, string rawValue) =>
6363
[Event(HandlerMessageId, Keywords = Keywords.Debug, Level = EventLevel.Verbose)]
6464
public void HandlerMessage(int handlerId, int workerId, int requestId, string memberName, string message) =>
6565
WriteEvent(HandlerMessageId, handlerId, workerId, requestId, memberName, message);
66+
//Console.WriteLine($"{handlerId}/{workerId}/{requestId}: ({memberName}): {message}"); // uncomment for debugging only
6667

6768
[NonEvent]
6869
private unsafe void WriteEvent(int eventId, int arg1, int arg2, int arg3, string arg4, string arg5)

src/System.Net.Http/src/System/Net/Http/ReadOnlyMemoryContent.cs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
using System.IO;
66
using System.Runtime.InteropServices;
7+
using System.Threading;
78
using System.Threading.Tasks;
89

910
namespace System.Net.Http
@@ -26,6 +27,9 @@ public ReadOnlyMemoryContent(ReadOnlyMemory<byte> content)
2627
protected override Task SerializeToStreamAsync(Stream stream, TransportContext context) =>
2728
stream.WriteAsync(_content);
2829

30+
internal override Task SerializeToStreamAsync(Stream stream, TransportContext context, CancellationToken cancellationToken) =>
31+
stream.WriteAsync(_content, cancellationToken);
32+
2933
protected internal override bool TryComputeLength(out long length)
3034
{
3135
length = _content.Length;

src/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/ChunkedEncodingReadStream.cs

Lines changed: 72 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -30,13 +30,13 @@ public ChunkedEncodingReadStream(HttpConnection connection) : base(connection)
3030
{
3131
}
3232

33-
private async Task<bool> TryGetNextChunkAsync(CancellationToken cancellationToken)
33+
private async Task<bool> TryGetNextChunkAsync()
3434
{
3535
Debug.Assert(_chunkBytesRemaining == 0);
3636

3737
// Read the start of the chunk line.
3838
_connection._allowedReadLineBytes = MaxChunkBytesAllowed;
39-
ArraySegment<byte> line = await _connection.ReadNextLineAsync(cancellationToken).ConfigureAwait(false);
39+
ArraySegment<byte> line = await _connection.ReadNextLineAsync().ConfigureAwait(false);
4040

4141
// Parse the hex value.
4242
if (!Utf8Parser.TryParse(line.AsReadOnlySpan(), out ulong chunkSize, out int bytesConsumed, 'X'))
@@ -73,7 +73,7 @@ private async Task<bool> TryGetNextChunkAsync(CancellationToken cancellationToke
7373
while (true)
7474
{
7575
_connection._allowedReadLineBytes = MaxTrailingHeaderLength;
76-
if (LineIsEmpty(await _connection.ReadNextLineAsync(cancellationToken).ConfigureAwait(false)))
76+
if (LineIsEmpty(await _connection.ReadNextLineAsync().ConfigureAwait(false)))
7777
{
7878
break;
7979
}
@@ -84,59 +84,77 @@ private async Task<bool> TryGetNextChunkAsync(CancellationToken cancellationToke
8484
return false;
8585
}
8686

87-
private async Task ConsumeChunkBytesAsync(ulong bytesConsumed, CancellationToken cancellationToken)
87+
private Task ConsumeChunkBytesAsync(ulong bytesConsumed)
8888
{
8989
Debug.Assert(bytesConsumed <= _chunkBytesRemaining);
9090
_chunkBytesRemaining -= bytesConsumed;
91-
if (_chunkBytesRemaining == 0)
91+
return _chunkBytesRemaining != 0 ?
92+
Task.CompletedTask :
93+
ReadNextLineAndThrowIfNotEmptyAsync();
94+
}
95+
96+
private async Task ReadNextLineAndThrowIfNotEmptyAsync()
97+
{
98+
_connection._allowedReadLineBytes = 2; // \r\n
99+
if (!LineIsEmpty(await _connection.ReadNextLineAsync().ConfigureAwait(false)))
92100
{
93-
_connection._allowedReadLineBytes = 2; // \r\n
94-
if (!LineIsEmpty(await _connection.ReadNextLineAsync(cancellationToken).ConfigureAwait(false)))
95-
{
96-
ThrowInvalidHttpResponse();
97-
}
101+
ThrowInvalidHttpResponse();
98102
}
99103
}
100104

101105
public override Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
102106
{
103107
ValidateBufferArgs(buffer, offset, count);
104-
return ReadAsync(new Memory<byte>(buffer, offset, count)).AsTask();
108+
return ReadAsync(new Memory<byte>(buffer, offset, count), cancellationToken).AsTask();
105109
}
106110

107-
public override async ValueTask<int> ReadAsync(Memory<byte> destination, CancellationToken cancellationToken = default)
111+
public override async ValueTask<int> ReadAsync(Memory<byte> destination, CancellationToken cancellationToken)
108112
{
113+
cancellationToken.ThrowIfCancellationRequested();
114+
109115
if (_connection == null || destination.Length == 0)
110116
{
111117
// Response body fully consumed or the caller didn't ask for any data
112118
return 0;
113119
}
114120

115-
if (_chunkBytesRemaining == 0)
121+
CancellationTokenRegistration ctr = _connection.RegisterCancellation(cancellationToken);
122+
try
116123
{
117-
if (!await TryGetNextChunkAsync(cancellationToken).ConfigureAwait(false))
124+
if (_chunkBytesRemaining == 0)
118125
{
119-
// End of response body
120-
return 0;
126+
if (!await TryGetNextChunkAsync().ConfigureAwait(false))
127+
{
128+
// End of response body
129+
return 0;
130+
}
121131
}
122-
}
123132

124-
if (_chunkBytesRemaining < (ulong)destination.Length)
125-
{
126-
destination = destination.Slice(0, (int)_chunkBytesRemaining);
127-
}
133+
if (_chunkBytesRemaining < (ulong)destination.Length)
134+
{
135+
destination = destination.Slice(0, (int)_chunkBytesRemaining);
136+
}
128137

129-
int bytesRead = await _connection.ReadAsync(destination, cancellationToken).ConfigureAwait(false);
138+
int bytesRead = await _connection.ReadAsync(destination).ConfigureAwait(false);
130139

131-
if (bytesRead <= 0)
132-
{
133-
// Unexpected end of response stream
134-
throw new IOException(SR.net_http_invalid_response);
135-
}
140+
if (bytesRead <= 0)
141+
{
142+
// Unexpected end of response stream
143+
throw new IOException(SR.net_http_invalid_response);
144+
}
136145

137-
await ConsumeChunkBytesAsync((ulong)bytesRead, cancellationToken).ConfigureAwait(false);
146+
await ConsumeChunkBytesAsync((ulong)bytesRead).ConfigureAwait(false);
138147

139-
return bytesRead;
148+
return bytesRead;
149+
}
150+
catch (Exception exc) when (ShouldWrapInOperationCanceledException(exc, cancellationToken))
151+
{
152+
throw new OperationCanceledException(s_cancellationMessage, exc, cancellationToken);
153+
}
154+
finally
155+
{
156+
ctr.Dispose();
157+
}
140158
}
141159

142160
public override async Task CopyToAsync(Stream destination, int bufferSize, CancellationToken cancellationToken)
@@ -145,23 +163,41 @@ public override async Task CopyToAsync(Stream destination, int bufferSize, Cance
145163
{
146164
throw new ArgumentNullException(nameof(destination));
147165
}
166+
if (bufferSize <= 0)
167+
{
168+
throw new ArgumentOutOfRangeException(nameof(bufferSize));
169+
}
170+
171+
cancellationToken.ThrowIfCancellationRequested();
148172

149173
if (_connection == null)
150174
{
151175
// Response body fully consumed
152176
return;
153177
}
154178

155-
if (_chunkBytesRemaining > 0)
179+
CancellationTokenRegistration ctr = _connection.RegisterCancellation(cancellationToken);
180+
try
156181
{
157-
await _connection.CopyToAsync(destination, _chunkBytesRemaining, cancellationToken).ConfigureAwait(false);
158-
await ConsumeChunkBytesAsync(_chunkBytesRemaining, cancellationToken).ConfigureAwait(false);
159-
}
182+
if (_chunkBytesRemaining > 0)
183+
{
184+
await _connection.CopyToAsync(destination, _chunkBytesRemaining).ConfigureAwait(false);
185+
await ConsumeChunkBytesAsync(_chunkBytesRemaining).ConfigureAwait(false);
186+
}
160187

161-
while (await TryGetNextChunkAsync(cancellationToken).ConfigureAwait(false))
188+
while (await TryGetNextChunkAsync().ConfigureAwait(false))
189+
{
190+
await _connection.CopyToAsync(destination, _chunkBytesRemaining).ConfigureAwait(false);
191+
await ConsumeChunkBytesAsync(_chunkBytesRemaining).ConfigureAwait(false);
192+
}
193+
}
194+
catch (Exception exc) when (ShouldWrapInOperationCanceledException(exc, cancellationToken))
195+
{
196+
throw CreateOperationCanceledException(exc, cancellationToken);
197+
}
198+
finally
162199
{
163-
await _connection.CopyToAsync(destination, _chunkBytesRemaining, cancellationToken).ConfigureAwait(false);
164-
await ConsumeChunkBytesAsync(_chunkBytesRemaining, cancellationToken).ConfigureAwait(false);
200+
ctr.Dispose();
165201
}
166202
}
167203
}

0 commit comments

Comments
 (0)