Skip to content

Commit

Permalink
HTTP/3: Avoid per-request cancellation token allocations
Browse files Browse the repository at this point in the history
  • Loading branch information
JamesNK committed Jul 21, 2022
1 parent c10f4c4 commit 024e72f
Show file tree
Hide file tree
Showing 11 changed files with 248 additions and 141 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.

using System.Threading.Tasks;
using System;

namespace Microsoft.AspNetCore.Connections.Features;

/// <summary>
/// Represents the close action for a stream.
/// </summary>
public interface IStreamClosedFeature
{
/// <summary>
/// Registers a callback to be invoked when a stream is closed.
/// If the stream is already in a closed state, the callback will be run immediately.
/// </summary>
/// <param name="callback">The callback to invoke after the stream is closed.</param>
/// <param name="state">The state to pass into the callback.</param>
void OnClosed(Action<object?> callback, object? state);
}
Original file line number Diff line number Diff line change
@@ -1 +1,3 @@
#nullable enable
Microsoft.AspNetCore.Connections.Features.IStreamClosedFeature
Microsoft.AspNetCore.Connections.Features.IStreamClosedFeature.OnClosed(System.Action<object?>! callback, object? state) -> void
12 changes: 10 additions & 2 deletions src/Servers/Kestrel/Core/src/Internal/Http3/Http3ControlStream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,13 @@ internal abstract class Http3ControlStream : IHttp3Stream, IThreadPoolWorkItem
private readonly Http3StreamContext _context;
private readonly Http3PeerSettings _serverPeerSettings;
private readonly IStreamIdFeature _streamIdFeature;
private readonly IStreamClosedFeature _streamClosedFeature;
private readonly IProtocolErrorCodeFeature _errorCodeFeature;
private readonly Http3RawFrame _incomingFrame = new Http3RawFrame();
private volatile int _isClosed;
private long _headerType;
private int _gracefulCloseInitiator;
private bool _connectionClosed;

private bool _haveReceivedSettingsFrame;

Expand All @@ -39,6 +41,7 @@ public Http3ControlStream(Http3StreamContext context, long? headerType)
_context = context;
_serverPeerSettings = context.ServerPeerSettings;
_streamIdFeature = context.ConnectionFeatures.GetRequiredFeature<IStreamIdFeature>();
_streamClosedFeature = context.ConnectionFeatures.GetRequiredFeature<IStreamClosedFeature>();
_errorCodeFeature = context.ConnectionFeatures.GetRequiredFeature<IProtocolErrorCodeFeature>();
_headerType = headerType ?? -1;

Expand All @@ -57,6 +60,7 @@ public Http3ControlStream(Http3StreamContext context, long? headerType)
private void OnStreamClosed()
{
Abort(new ConnectionAbortedException("HTTP_CLOSED_CRITICAL_STREAM"), Http3ErrorCode.InternalError);
_connectionClosed = true;
}

public PipeReader Input => _context.Transport.Input;
Expand Down Expand Up @@ -237,7 +241,7 @@ private async Task HandleControlStream()

if (result.IsCompleted)
{
if (!_context.StreamContext.ConnectionClosed.IsCancellationRequested)
if (!_connectionClosed)
{
// https://quicwg.org/base-drafts/draft-ietf-quic-http.html#section-6.2.1-2
throw new Http3ConnectionErrorException(CoreStrings.Http3ErrorControlStreamClientClosedInbound, Http3ErrorCode.ClosedCriticalStream);
Expand Down Expand Up @@ -298,7 +302,11 @@ private ValueTask ProcessSettingsFrameAsync(ReadOnlySequence<byte> payload)
}

_haveReceivedSettingsFrame = true;
using var closedRegistration = _context.StreamContext.ConnectionClosed.Register(state => ((Http3ControlStream)state!).OnStreamClosed(), this);
_streamClosedFeature.OnClosed(static state =>
{
var stream = (Http3ControlStream)state!;
stream.OnStreamClosed();
}, this);

while (true)
{
Expand Down
67 changes: 28 additions & 39 deletions src/Servers/Kestrel/Core/src/Internal/Http3/Http3Stream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ internal abstract partial class Http3Stream : HttpProtocol, IHttp3Stream, IHttpS
private IProtocolErrorCodeFeature _errorCodeFeature = default!;
private IStreamIdFeature _streamIdFeature = default!;
private IStreamAbortFeature _streamAbortFeature = default!;
private IStreamClosedFeature _streamClosedFeature = default!;
private PseudoHeaderFields _parsedPseudoHeaderFields;
private StreamCompletionFlags _completionState;
private int _isClosed;
Expand Down Expand Up @@ -88,6 +89,7 @@ public void Initialize(Http3StreamContext context)
_errorCodeFeature = _context.ConnectionFeatures.GetRequiredFeature<IProtocolErrorCodeFeature>();
_streamIdFeature = _context.ConnectionFeatures.GetRequiredFeature<IStreamIdFeature>();
_streamAbortFeature = _context.ConnectionFeatures.GetRequiredFeature<IStreamAbortFeature>();
_streamClosedFeature = _context.ConnectionFeatures.GetRequiredFeature<IStreamClosedFeature>();

_appCompletedTaskSource.Reset();
_isClosed = 0;
Expand Down Expand Up @@ -144,7 +146,7 @@ private void AbortCore(Exception exception, Http3ErrorCode errorCode)
{
lock (_completionLock)
{
if (IsCompleted)
if (IsCompleted || IsAborted)
{
return;
}
Expand Down Expand Up @@ -573,6 +575,28 @@ private bool TryClose()
{
Exception? error = null;

// With HTTP/3 the write-side of the stream can be aborted by the client after the server
// has finished reading incoming content. That means errors can happen after the Input loop
// has finished reading.
//
// To get notification of request aborted we register to the stream closing or complete.
// It will notify this type that the client has aborted the request and Kestrel will complete
// pipes and cancel the HttpContext.RequestAborted token.
_streamClosedFeature.OnClosed(static s =>
{
var stream = (Http3Stream)s!;
if (!stream.IsCompleted)
{
// An error code value other than -1 indicates a value was set and the request didn't gracefully complete.
var errorCode = stream._errorCodeFeature.Error;
if (errorCode >= 0)
{
stream.AbortCore(new IOException(CoreStrings.HttpStreamResetByClient), (Http3ErrorCode)errorCode);
}
}
}, this);

try
{
while (_isClosed == 0)
Expand Down Expand Up @@ -643,44 +667,9 @@ private bool TryClose()
? new ValueTask(_appCompletedTaskSource, _appCompletedTaskSource.Version)
: ValueTask.CompletedTask;

if (!appCompletedTask.IsCompletedSuccessfully)
{
// At this point in the stream's read-side is complete. However, with HTTP/3
// the write-side of the stream can still be aborted by the client on request
// aborted.
//
// To get notification of request aborted we register to connection closed
// token. It will notify this type that the client has aborted the request
// and Kestrel will complete pipes and cancel the RequestAborted token.
//
// Only subscribe to this event after the stream's read-side is complete to
// avoid interactions between reading that is in-progress and an abort.
// This means while reading, read-side abort will handle getting abort notifications.
//
// We don't need to hang on to the CancellationTokenRegistration from register.
// The CTS is cleaned up in StreamContext.DisposeAsync.
//
// TODO: Consider a better way to provide this notification. For perf we want to
// make the ConnectionClosed CTS pay-for-play, and change this event to use
// something that is more lightweight than a CTS.
_context.StreamContext.ConnectionClosed.Register(static s =>
{
var stream = (Http3Stream)s!;
if (!stream.IsCompleted)
{
// An error code value other than -1 indicates a value was set and the request didn't gracefully complete.
var errorCode = stream._errorCodeFeature.Error;
if (errorCode >= 0)
{
stream.AbortCore(new IOException(CoreStrings.HttpStreamResetByClient), (Http3ErrorCode)errorCode);
}
}
}, this);

// Make sure application func is completed before completing writer.
await appCompletedTask;
}
// At this point, assuming an error wasn't thrown, the stream's read-side is complete.
// Make sure application func is completed before completing writer.
await appCompletedTask;

try
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,14 +67,15 @@ public override void Execute()
}
}

private class TestConnectionFeatures : IProtocolErrorCodeFeature, IStreamIdFeature, IStreamAbortFeature
private class TestConnectionFeatures : IProtocolErrorCodeFeature, IStreamIdFeature, IStreamAbortFeature, IStreamClosedFeature
{
public TestConnectionFeatures()
{
var featureCollection = new FeatureCollection();
featureCollection.Set<IProtocolErrorCodeFeature>(this);
featureCollection.Set<IStreamIdFeature>(this);
featureCollection.Set<IStreamAbortFeature>(this);
featureCollection.Set<IStreamClosedFeature>(this);

FeatureCollection = featureCollection;
}
Expand All @@ -92,5 +93,10 @@ void IStreamAbortFeature.AbortWrite(long errorCode, ConnectionAbortedException a
{
throw new NotImplementedException();
}

void IStreamClosedFeature.OnClosed(Action<object> callback, object state)
{
throw new NotImplementedException();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,19 @@

namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Quic.Internal;

internal sealed partial class QuicStreamContext : IPersistentStateFeature, IStreamDirectionFeature, IProtocolErrorCodeFeature, IStreamIdFeature, IStreamAbortFeature
internal sealed partial class QuicStreamContext :
IPersistentStateFeature,
IStreamDirectionFeature,
IProtocolErrorCodeFeature,
IStreamIdFeature,
IStreamAbortFeature,
IStreamClosedFeature
{
private readonly record struct CloseAction(Action<object?> Callback, object? State);

private IDictionary<object, object?>? _persistentState;
private long? _error;
private List<CloseAction>? _onClosed;

public bool CanRead { get; private set; }
public bool CanWrite { get; private set; }
Expand Down Expand Up @@ -72,12 +81,32 @@ public void AbortWrite(long errorCode, ConnectionAbortedException abortReason)
}
}

void IStreamClosedFeature.OnClosed(Action<object?> callback, object? state)
{
lock (_shutdownLock)
{
if (!_streamClosed)
{
if (_onClosed == null)
{
_onClosed = new List<CloseAction>();
}
_onClosed.Add(new CloseAction(callback, state));
return;
}
}

// Stream has already closed. Execute callback inline.
callback(state);
}

private void InitializeFeatures()
{
_currentIPersistentStateFeature = this;
_currentIStreamDirectionFeature = this;
_currentIProtocolErrorCodeFeature = this;
_currentIStreamIdFeature = this;
_currentIStreamAbortFeature = this;
_currentIStreamClosedFeature = this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ internal partial class QuicStreamContext : TransportConnection, IPooledStream, I
private readonly CompletionPipeReader _transportPipeReader;
private readonly CompletionPipeWriter _transportPipeWriter;
private readonly ILogger _log;
private CancellationTokenSource _streamClosedTokenSource = default!;
private CancellationTokenSource? _streamClosedTokenSource;
private string? _connectionId;
private const int MinAllocBufferSize = 4096;
private volatile Exception? _shutdownReadReason;
Expand All @@ -39,7 +39,6 @@ internal partial class QuicStreamContext : TransportConnection, IPooledStream, I
private bool _streamClosed;
private bool _serverAborted;
private bool _clientAbort;
private TaskCompletionSource _waitForConnectionClosedTcs = default!;
private readonly object _shutdownLock = new object();

public QuicStreamContext(QuicConnectionContext connection, QuicTransportContext context)
Expand Down Expand Up @@ -82,12 +81,8 @@ public void Initialize(QuicStream stream)

_stream = stream;

if (!(_streamClosedTokenSource?.TryReset() ?? false))
{
_streamClosedTokenSource = new CancellationTokenSource();
}

ConnectionClosed = _streamClosedTokenSource.Token;
_streamClosedTokenSource = null;
_onClosed?.Clear();

InitializeFeatures();

Expand All @@ -109,8 +104,6 @@ public void Initialize(QuicStream stream)
_streamClosed = false;
_serverAborted = false;
_clientAbort = false;
// TODO - resetable TCS
_waitForConnectionClosedTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);

// Only reset pipes if the stream has been reused.
if (CanReuse)
Expand All @@ -122,6 +115,20 @@ public void Initialize(QuicStream stream)
CanReuse = false;
}

public override CancellationToken ConnectionClosed
{
get
{
// Allocate CTS only if requested.
if (_streamClosedTokenSource == null)
{
_streamClosedTokenSource = new CancellationTokenSource();
}
return _streamClosedTokenSource.Token;
}
set => throw new NotSupportedException();
}

public override string ConnectionId
{
get => _connectionId ??= StringUtilities.ConcatAsHexSuffix(_connection.ConnectionId, ':', (uint)StreamId);
Expand Down Expand Up @@ -162,7 +169,7 @@ private async Task StartAsync()
await receiveTask;
await sendTask;

await FireStreamClosedAsync();
FireStreamClosed();
}
catch (Exception ex)
{
Expand Down Expand Up @@ -311,30 +318,39 @@ async static ValueTask<FlushResult> AwaitCompleteTaskAsync(ValueTask completeTas
return _shutdownReadReason ?? _shutdownReason ?? error;
}

private Task FireStreamClosedAsync()
private void FireStreamClosed()
{
// Guard against scheduling this multiple times
if (_streamClosed)
lock (_shutdownLock)
{
return Task.CompletedTask;
if (_streamClosed)
{
return;
}

_streamClosed = true;
}

_streamClosed = true;
var onClosed = _onClosed;

ThreadPool.UnsafeQueueUserWorkItem(state =>
if (onClosed != null)
{
state.CancelConnectionClosedToken();
state._waitForConnectionClosedTcs.TrySetResult();
},
this,
preferLocal: false);
foreach (var closeAction in onClosed)
{
closeAction.Callback(closeAction.State);
}
}

return _waitForConnectionClosedTcs.Task;
if (_streamClosedTokenSource != null)
{
CancelConnectionClosedToken();
}
}

private void CancelConnectionClosedToken()
{
Debug.Assert(_streamClosedTokenSource != null);

try
{
_streamClosedTokenSource.Cancel();
Expand Down Expand Up @@ -567,6 +583,6 @@ public void Dispose()
// Called when the stream is no longer reused.
public void DisposeCore()
{
_streamClosedTokenSource.Dispose();
_streamClosedTokenSource?.Dispose();
}
}
Loading

0 comments on commit 024e72f

Please sign in to comment.