Skip to content

Commit

Permalink
QUIC stream limits (#52704)
Browse files Browse the repository at this point in the history
Implements the 3rd option Allowing the caller to perform their own wait from #32079 (comment)
Adds WaitForAvailable(Bidi|Uni)rectionalStreamsAsync:
- triggered by peer announcement about new streams (QUIC_CONNECTION_EVENT_TYPE.STREAMS_AVAILABLE)
- if the connection is closed/disposed, the method throws QuicConnectionAbortedException which fitted our H3 better than boolean (can be changed)
Changes stream limit type to int
  • Loading branch information
ManickaP committed Jun 6, 2021
1 parent a9d2f03 commit a822d39
Show file tree
Hide file tree
Showing 17 changed files with 611 additions and 163 deletions.
54 changes: 42 additions & 12 deletions src/libraries/Common/tests/System/Net/Http/Http3LoopbackServer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,26 +20,32 @@ public sealed class Http3LoopbackServer : GenericLoopbackServer

public override Uri Address => new Uri($"https://{_listener.ListenEndPoint}/");

public Http3LoopbackServer(QuicImplementationProvider quicImplementationProvider = null, GenericLoopbackOptions options = null)
public Http3LoopbackServer(QuicImplementationProvider quicImplementationProvider = null, Http3Options options = null)
{
options ??= new GenericLoopbackOptions();
options ??= new Http3Options();

_cert = Configuration.Certificates.GetServerCertificate();

var sslOpts = new SslServerAuthenticationOptions
var listenerOptions = new QuicListenerOptions()
{
EnabledSslProtocols = options.SslProtocols,
ApplicationProtocols = new List<SslApplicationProtocol>
ListenEndPoint = new IPEndPoint(options.Address, 0),
ServerAuthenticationOptions = new SslServerAuthenticationOptions
{
new SslApplicationProtocol("h3-31"),
new SslApplicationProtocol("h3-30"),
new SslApplicationProtocol("h3-29")
EnabledSslProtocols = options.SslProtocols,
ApplicationProtocols = new List<SslApplicationProtocol>
{
new SslApplicationProtocol("h3-31"),
new SslApplicationProtocol("h3-30"),
new SslApplicationProtocol("h3-29")
},
ServerCertificate = _cert,
ClientCertificateRequired = false
},
ServerCertificate = _cert,
ClientCertificateRequired = false
MaxUnidirectionalStreams = options.MaxUnidirectionalStreams,
MaxBidirectionalStreams = options.MaxBidirectionalStreams,
};

_listener = new QuicListener(quicImplementationProvider ?? QuicImplementationProviders.Default, new IPEndPoint(options.Address, 0), sslOpts);
_listener = new QuicListener(quicImplementationProvider ?? QuicImplementationProviders.Default, listenerOptions);
}

public override void Dispose()
Expand Down Expand Up @@ -82,7 +88,7 @@ public Http3LoopbackServerFactory(QuicImplementationProvider quicImplementationP

public override GenericLoopbackServer CreateServer(GenericLoopbackOptions options = null)
{
return new Http3LoopbackServer(_quicImplementationProvider, options);
return new Http3LoopbackServer(_quicImplementationProvider, CreateOptions(options));
}

public override async Task CreateServerAsync(Func<GenericLoopbackServer, Uri, Task> funcAsync, int millisecondsTimeout = 60000, GenericLoopbackOptions options = null)
Expand All @@ -97,5 +103,29 @@ public override Task<GenericLoopbackConnection> CreateConnectionAsync(Socket soc
// This method is always unacceptable to call for HTTP/3.
throw new NotImplementedException("HTTP/3 does not operate over a Socket.");
}

private static Http3Options CreateOptions(GenericLoopbackOptions options)
{
Http3Options http3Options = new Http3Options();
if (options != null)
{
http3Options.Address = options.Address;
http3Options.UseSsl = options.UseSsl;
http3Options.SslProtocols = options.SslProtocols;
http3Options.ListenBacklog = options.ListenBacklog;
}
return http3Options;
}
}
public class Http3Options : GenericLoopbackOptions
{
public int MaxUnidirectionalStreams {get; set; }

public int MaxBidirectionalStreams {get; set; }
public Http3Options()
{
MaxUnidirectionalStreams = 100;
MaxBidirectionalStreams = 100;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,6 @@ internal sealed class Http3Connection : HttpConnectionBase, IDisposable
private int _haveServerQpackDecodeStream;
private int _haveServerQpackEncodeStream;

// Manages MAX_STREAM count from server.
private long _maximumRequestStreams;
private long _requestStreamsRemaining;
private readonly Queue<TaskCompletionSourceWithCancellation<bool>> _waitingRequests = new Queue<TaskCompletionSourceWithCancellation<bool>>();

// A connection-level error will abort any future operations.
private Exception? _abortException;

Expand Down Expand Up @@ -87,8 +82,6 @@ public Http3Connection(HttpConnectionPool pool, HttpAuthority? origin, HttpAutho
string altUsedValue = altUsedDefaultPort ? authority.IdnHost : authority.IdnHost + ":" + authority.Port.ToString(Globalization.CultureInfo.InvariantCulture);
_altUsedEncodedHeader = QPack.QPackEncoder.EncodeLiteralHeaderFieldWithoutNameReferenceToArray(KnownHeaders.AltUsed.Name, altUsedValue);

_maximumRequestStreams = _requestStreamsRemaining = connection.GetRemoteAvailableBidirectionalStreamCount();

// Errors are observed via Abort().
_ = SendSettingsAsync();

Expand Down Expand Up @@ -166,54 +159,41 @@ public override async Task<HttpResponseMessage> SendAsync(HttpRequestMessage req
{
Debug.Assert(async);

// Wait for an available stream (based on QUIC MAX_STREAMS) if there isn't one available yet.

TaskCompletionSourceWithCancellation<bool>? waitForAvailableStreamTcs = null;

lock (SyncObj)
{
long remaining = _requestStreamsRemaining;

if (remaining > 0)
{
_requestStreamsRemaining = remaining - 1;
}
else
{
waitForAvailableStreamTcs = new TaskCompletionSourceWithCancellation<bool>();
_waitingRequests.Enqueue(waitForAvailableStreamTcs);
}
}

if (waitForAvailableStreamTcs != null)
{
await waitForAvailableStreamTcs.WaitWithCancellationAsync(cancellationToken).ConfigureAwait(false);
}

// Allocate an active request

QuicStream? quicStream = null;
Http3RequestStream? requestStream = null;
ValueTask waitTask = default;

try
{
lock (SyncObj)
while (true)
{
if (_connection != null)
lock (SyncObj)
{
quicStream = _connection.OpenBidirectionalStream();
requestStream = new Http3RequestStream(request, this, quicStream);
_activeRequests.Add(quicStream, requestStream);
if (_connection == null)
{
break;
}

if (_connection.GetRemoteAvailableBidirectionalStreamCount() > 0)
{
quicStream = _connection.OpenBidirectionalStream();
requestStream = new Http3RequestStream(request, this, quicStream);
_activeRequests.Add(quicStream, requestStream);
break;
}
waitTask = _connection.WaitForAvailableBidirectionalStreamsAsync(cancellationToken);
}

// Wait for an available stream (based on QUIC MAX_STREAMS) if there isn't one available yet.
await waitTask.ConfigureAwait(false);
}

if (quicStream == null)
{
throw new HttpRequestException(SR.net_http_request_aborted, null, RequestRetryType.RetryOnConnectionFailure);
}

// 0-byte write to force QUIC to allocate a stream ID.
await quicStream.WriteAsync(Array.Empty<byte>(), cancellationToken).ConfigureAwait(false);
requestStream!.StreamId = quicStream.StreamId;

bool goAway;
Expand Down Expand Up @@ -246,76 +226,6 @@ public override async Task<HttpResponseMessage> SendAsync(HttpRequestMessage req
}
}

/// <summary>
/// Waits for MAX_STREAMS to be raised by the server.
/// </summary>
private Task WaitForAvailableRequestStreamAsync(CancellationToken cancellationToken)
{
TaskCompletionSourceWithCancellation<bool> tcs;

lock (SyncObj)
{
long remaining = _requestStreamsRemaining;

if (remaining > 0)
{
_requestStreamsRemaining = remaining - 1;
return Task.CompletedTask;
}

tcs = new TaskCompletionSourceWithCancellation<bool>();
_waitingRequests.Enqueue(tcs);
}

// Note: cancellation on connection shutdown is handled in CancelWaiters.
return tcs.WaitWithCancellationAsync(cancellationToken).AsTask();
}

/// <summary>
/// Cancels any waiting SendAsync calls.
/// </summary>
/// <remarks>Requires <see cref="SyncObj"/> to be held.</remarks>
private void CancelWaiters()
{
Debug.Assert(Monitor.IsEntered(SyncObj));

while (_waitingRequests.TryDequeue(out TaskCompletionSourceWithCancellation<bool>? tcs))
{
tcs.TrySetException(new HttpRequestException(SR.net_http_request_aborted, null, RequestRetryType.RetryOnConnectionFailure));
}
}

// TODO: how do we get this event? -> HandleEventStreamsAvailable reports currently available Uni/Bi streams
private void OnMaximumStreamCountIncrease(long newMaximumStreamCount)
{
lock (SyncObj)
{
if (newMaximumStreamCount <= _maximumRequestStreams)
{
return;
}

IncreaseRemainingStreamCount(newMaximumStreamCount - _maximumRequestStreams);
_maximumRequestStreams = newMaximumStreamCount;
}
}

private void IncreaseRemainingStreamCount(long delta)
{
Debug.Assert(Monitor.IsEntered(SyncObj));
Debug.Assert(delta > 0);

_requestStreamsRemaining += delta;

while (_requestStreamsRemaining != 0 && _waitingRequests.TryDequeue(out TaskCompletionSourceWithCancellation<bool>? tcs))
{
if (tcs.TrySetResult(true))
{
--_requestStreamsRemaining;
}
}
}

/// <summary>
/// Aborts the connection with an error.
/// </summary>
Expand Down Expand Up @@ -358,7 +268,6 @@ internal Exception Abort(Exception abortException)
_connectionClosedTask = _connection.CloseAsync((long)connectionResetErrorCode).AsTask();
}

CancelWaiters();
CheckForShutdown();
}

Expand Down Expand Up @@ -396,7 +305,6 @@ private void OnServerGoAway(long lastProcessedStreamId)
}
}

CancelWaiters();
CheckForShutdown();
}

Expand All @@ -414,8 +322,6 @@ public void RemoveStream(QuicStream stream)
bool removed = _activeRequests.Remove(stream);
Debug.Assert(removed == true);

IncreaseRemainingStreamCount(1);

if (ShuttingDown)
{
CheckForShutdown();
Expand Down
Loading

0 comments on commit a822d39

Please sign in to comment.