Skip to content

Commit

Permalink
[release/8.0] Fix HTTP/2 WebSocket Abort
Browse files Browse the repository at this point in the history
  • Loading branch information
CarnaViire authored and carlossanlop committed Feb 14, 2024
1 parent 49e1307 commit 0c7efec
Show file tree
Hide file tree
Showing 10 changed files with 902 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1446,6 +1446,14 @@ private int WriteHeaderCollection(HttpRequestMessage request, HttpHeaders header
continue;
}

// Extended connect requests will use the response content stream for bidirectional communication.
// We will ignore any content set for such requests in Http2Stream.SendRequestBodyAsync, as it has no defined semantics.
// Drop the Content-Length header as well in the unlikely case it was set.
if (knownHeader == KnownHeaders.ContentLength && request.IsExtendedConnectRequest)
{
continue;
}

// For all other known headers, send them via their pre-encoded name and the associated value.
WriteBytes(knownHeader.Http2EncodedName, ref headerBuffer);
string? separator = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,9 @@ public Http2Stream(HttpRequestMessage request, Http2Connection connection)

_headerBudgetRemaining = connection._pool.Settings.MaxResponseHeadersByteLength;

if (_request.Content == null)
// Extended connect requests will use the response content stream for bidirectional communication.
// We will ignore any content set for such requests in SendRequestBodyAsync, as it has no defined semantics.
if (_request.Content == null || _request.IsExtendedConnectRequest)
{
_requestCompletionState = StreamCompletionState.Completed;
if (_request.IsExtendedConnectRequest)
Expand Down Expand Up @@ -173,7 +175,9 @@ public HttpResponseMessage GetAndClearResponse()

public async Task SendRequestBodyAsync(CancellationToken cancellationToken)
{
if (_request.Content == null)
// Extended connect requests will use the response content stream for bidirectional communication.
// Ignore any content set for such requests, as it has no defined semantics.
if (_request.Content == null || _request.IsExtendedConnectRequest)
{
Debug.Assert(_requestCompletionState == StreamCompletionState.Completed);
return;
Expand Down Expand Up @@ -250,6 +254,7 @@ public async Task SendRequestBodyAsync(CancellationToken cancellationToken)
// and we also don't want to propagate any error to the caller, in particular for non-duplex scenarios.
Debug.Assert(_responseCompletionState == StreamCompletionState.Completed);
_requestCompletionState = StreamCompletionState.Completed;
Debug.Assert(!ConnectProtocolEstablished);
Complete();
return;
}
Expand All @@ -261,6 +266,7 @@ public async Task SendRequestBodyAsync(CancellationToken cancellationToken)

_requestCompletionState = StreamCompletionState.Failed;
SendReset();
Debug.Assert(!ConnectProtocolEstablished);
Complete();
}

Expand Down Expand Up @@ -313,6 +319,7 @@ public async Task SendRequestBodyAsync(CancellationToken cancellationToken)

if (complete)
{
Debug.Assert(!ConnectProtocolEstablished);
Complete();
}
}
Expand Down Expand Up @@ -420,7 +427,17 @@ private void Cancel()
if (sendReset)
{
SendReset();
Complete();

// Extended CONNECT notes:
//
// To prevent from calling it *twice*, Extended CONNECT stream's Complete() is only
// called from CloseResponseBody(), as CloseResponseBody() is *always* called
// from Extended CONNECT stream's Dispose().

if (!ConnectProtocolEstablished)
{
Complete();
}
}
}

Expand Down Expand Up @@ -810,7 +827,20 @@ public void OnHeadersComplete(bool endStream)
Debug.Assert(_responseCompletionState == StreamCompletionState.InProgress, $"Response already completed with state={_responseCompletionState}");

_responseCompletionState = StreamCompletionState.Completed;
if (_requestCompletionState == StreamCompletionState.Completed)

// Extended CONNECT notes:
//
// To prevent from calling it *prematurely*, Extended CONNECT stream's Complete() is only
// called from CloseResponseBody(), as CloseResponseBody() is *only* called
// from Extended CONNECT stream's Dispose().
//
// Due to bidirectional streaming nature of the Extended CONNECT request,
// the *write side* of the stream can only be completed by calling Dispose().
//
// The streaming in both ways happens over the single "response" stream instance, which makes
// _requestCompletionState *not indicative* of the actual state of the write side of the stream.

if (_requestCompletionState == StreamCompletionState.Completed && !ConnectProtocolEstablished)
{
Complete();
}
Expand Down Expand Up @@ -871,7 +901,20 @@ public void OnResponseData(ReadOnlySpan<byte> buffer, bool endStream)
Debug.Assert(_responseCompletionState == StreamCompletionState.InProgress, $"Response already completed with state={_responseCompletionState}");

_responseCompletionState = StreamCompletionState.Completed;
if (_requestCompletionState == StreamCompletionState.Completed)

// Extended CONNECT notes:
//
// To prevent from calling it *prematurely*, Extended CONNECT stream's Complete() is only
// called from CloseResponseBody(), as CloseResponseBody() is *only* called
// from Extended CONNECT stream's Dispose().
//
// Due to bidirectional streaming nature of the Extended CONNECT request,
// the *write side* of the stream can only be completed by calling Dispose().
//
// The streaming in both ways happens over the single "response" stream instance, which makes
// _requestCompletionState *not indicative* of the actual state of the write side of the stream.

if (_requestCompletionState == StreamCompletionState.Completed && !ConnectProtocolEstablished)
{
Complete();
}
Expand Down Expand Up @@ -1036,17 +1079,17 @@ public async Task ReadResponseHeadersAsync(CancellationToken cancellationToken)
Debug.Assert(_response != null && _response.Content != null);
// Start to process the response body.
var responseContent = (HttpConnectionResponseContent)_response.Content;
if (emptyResponse)
if (ConnectProtocolEstablished)
{
responseContent.SetStream(new Http2ReadWriteStream(this, closeResponseBodyOnDispose: true));
}
else if (emptyResponse)
{
// If there are any trailers, copy them over to the response. Normally this would be handled by
// the response stream hitting EOF, but if there is no response body, we do it here.
MoveTrailersToResponseMessage(_response);
responseContent.SetStream(EmptyReadStream.Instance);
}
else if (ConnectProtocolEstablished)
{
responseContent.SetStream(new Http2ReadWriteStream(this));
}
else
{
responseContent.SetStream(new Http2ReadStream(this));
Expand Down Expand Up @@ -1309,8 +1352,25 @@ private async ValueTask SendDataAsync(ReadOnlyMemory<byte> buffer, CancellationT
}
}

// This method should only be called from Http2ReadWriteStream.Dispose()
private void CloseResponseBody()
{
// Extended CONNECT notes:
//
// Due to bidirectional streaming nature of the Extended CONNECT request,
// the *write side* of the stream can only be completed by calling Dispose()
// (which, for Extended CONNECT case, will in turn call CloseResponseBody())
//
// Similarly to QuicStream, disposal *gracefully* closes the write side of the stream
// (unless we've received RST_STREAM before) and *abortively* closes the read side
// of the stream (unless we've received EOS before).

if (ConnectProtocolEstablished && _resetException is null)
{
// Gracefully close the write side of the Extended CONNECT stream
_connection.LogExceptions(_connection.SendEndStreamAsync(StreamId));
}

// Check if the response body has been fully consumed.
bool fullyConsumed = false;
Debug.Assert(!Monitor.IsEntered(SyncObject));
Expand All @@ -1323,6 +1383,7 @@ private void CloseResponseBody()
}

// If the response body isn't completed, cancel it now.
// This includes aborting the read side of the Extended CONNECT stream.
if (!fullyConsumed)
{
Cancel();
Expand All @@ -1337,6 +1398,12 @@ private void CloseResponseBody()

lock (SyncObject)
{
if (ConnectProtocolEstablished)
{
// This should be the only place where Extended Connect stream is completed
Complete();
}

_responseBuffer.Dispose();
}
}
Expand Down Expand Up @@ -1430,10 +1497,7 @@ private enum StreamCompletionState : byte

private sealed class Http2ReadStream : Http2ReadWriteStream
{
public Http2ReadStream(Http2Stream http2Stream) : base(http2Stream)
{
base.CloseResponseBodyOnDispose = true;
}
public Http2ReadStream(Http2Stream http2Stream) : base(http2Stream, closeResponseBodyOnDispose: true) { }

public override bool CanWrite => false;

Expand Down Expand Up @@ -1482,12 +1546,13 @@ public class Http2ReadWriteStream : HttpBaseStream
private Http2Stream? _http2Stream;
private readonly HttpResponseMessage _responseMessage;

public Http2ReadWriteStream(Http2Stream http2Stream)
public Http2ReadWriteStream(Http2Stream http2Stream, bool closeResponseBodyOnDispose = false)
{
Debug.Assert(http2Stream != null);
Debug.Assert(http2Stream._response != null);
_http2Stream = http2Stream;
_responseMessage = _http2Stream._response;
CloseResponseBodyOnDispose = closeResponseBodyOnDispose;
}

~Http2ReadWriteStream()
Expand All @@ -1503,7 +1568,7 @@ public Http2ReadWriteStream(Http2Stream http2Stream)
}
}

protected bool CloseResponseBodyOnDispose { get; set; }
protected bool CloseResponseBodyOnDispose { get; private init; }

protected override void Dispose(bool disposing)
{
Expand Down

0 comments on commit 0c7efec

Please sign in to comment.