Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

QUIC stream limits #52704

Merged
merged 24 commits into from
Jun 6, 2021
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
9b107cd
Stream limits are ints since they cannot be greater than ushort.MaxVa…
ManickaP May 6, 2021
fae4adc
Stream opening APIs made async.
ManickaP May 6, 2021
f74915e
Proper implementation of OpenStreamAsync, waiting for the msquic event.
ManickaP May 10, 2021
8e92adb
WaitForAvailable*StreamsAsync
ManickaP May 10, 2021
dbb626d
WaitForAvailable*StreamsAsync implementation
ManickaP May 11, 2021
20a5eee
WaitForAvailable*StreamsAsync usage in H3
ManickaP May 11, 2021
fdc4575
WaitForAvailable*StreamsAsync usage in H3 - fixed existing tests
ManickaP May 13, 2021
d9c087d
Added and fixed H/3 test.
ManickaP May 13, 2021
622aa8f
Merge branch 'main' into mapichov/32079_stream_limit
ManickaP May 14, 2021
30efc26
Post merge code fix
ManickaP May 14, 2021
47358ef
Added some more tests and fixed token source
ManickaP May 14, 2021
ea92437
Fixed mock implementation and fixed completion source in msquic one.
ManickaP May 20, 2021
36567a0
Merge branch 'main' into mapichov/32079_stream_limit
ManickaP May 21, 2021
04c559d
Fixed active stream counting in mock connection.
ManickaP May 21, 2021
6bd9ea7
Refactored stream limit work in mocks.
ManickaP May 21, 2021
78b526d
Addressed PR comments.
ManickaP May 24, 2021
9e64240
Merge branch 'main' into mapichov/32079_stream_limit
ManickaP May 24, 2021
754be6f
Addressed feedback about stream openning.
ManickaP May 25, 2021
c5bb854
Added quic tests for WaitForAvailableStreamAsync.
ManickaP May 26, 2021
910ab9f
Merge branch 'main' into mapichov/32079_stream_limit
ManickaP May 26, 2021
97c4e53
Fixed access to _connection only under lock.
ManickaP May 28, 2021
97b3185
Merge branch 'main' into mapichov/32079_stream_limit
ManickaP Jun 1, 2021
cbdd1df
Disabled one misbehaving test.
ManickaP Jun 3, 2021
7eb7fb1
Addressed feedback.
ManickaP Jun 3, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
using System.Threading.Tasks;
using System.Linq;
using System.Net.Http.Functional.Tests;
using System.Threading;

namespace System.Net.Test.Common
{
Expand Down Expand Up @@ -62,14 +63,14 @@ public async Task CloseAsync(long errorCode)
_closed = true;
}

public Http3LoopbackStream OpenUnidirectionalStream()
public async ValueTask<Http3LoopbackStream> OpenUnidirectionalStreamAsync(CancellationToken cancellationToken = default)
{
return new Http3LoopbackStream(_connection.OpenUnidirectionalStream());
return new Http3LoopbackStream(await _connection.OpenUnidirectionalStreamAsync(cancellationToken).ConfigureAwait(false));
}

public Http3LoopbackStream OpenBidirectionalStream()
public async ValueTask<Http3LoopbackStream> OpenBidirectionalStreamAsync(CancellationToken cancellationToken = default)
{
return new Http3LoopbackStream(_connection.OpenBidirectionalStream());
return new Http3LoopbackStream(await _connection.OpenBidirectionalStreamAsync(cancellationToken).ConfigureAwait(false));
}

public static int GetRequestId(QuicStream stream)
Expand Down
54 changes: 42 additions & 12 deletions src/libraries/Common/tests/System/Net/Http/Http3LoopbackServer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,26 +20,32 @@ public sealed class Http3LoopbackServer : GenericLoopbackServer

public override Uri Address => new Uri($"https://{_listener.ListenEndPoint}/");

public Http3LoopbackServer(QuicImplementationProvider quicImplementationProvider = null, GenericLoopbackOptions options = null)
public Http3LoopbackServer(QuicImplementationProvider quicImplementationProvider = null, Http3Options options = null)
{
options ??= new GenericLoopbackOptions();
options ??= new Http3Options();

_cert = Configuration.Certificates.GetServerCertificate();

var sslOpts = new SslServerAuthenticationOptions
var listenerOptions = new QuicListenerOptions()
{
EnabledSslProtocols = options.SslProtocols,
ApplicationProtocols = new List<SslApplicationProtocol>
ListenEndPoint = new IPEndPoint(options.Address, 0),
ServerAuthenticationOptions = new SslServerAuthenticationOptions
{
new SslApplicationProtocol("h3-31"),
new SslApplicationProtocol("h3-30"),
new SslApplicationProtocol("h3-29")
EnabledSslProtocols = options.SslProtocols,
ApplicationProtocols = new List<SslApplicationProtocol>
{
new SslApplicationProtocol("h3-31"),
new SslApplicationProtocol("h3-30"),
new SslApplicationProtocol("h3-29")
},
ServerCertificate = _cert,
ClientCertificateRequired = false
},
ServerCertificate = _cert,
ClientCertificateRequired = false
MaxUnidirectionalStreams = options.MaxUnidirectionalStreams,
MaxBidirectionalStreams = options.MaxBidirectionalStreams,
};

_listener = new QuicListener(quicImplementationProvider ?? QuicImplementationProviders.Default, new IPEndPoint(options.Address, 0), sslOpts);
_listener = new QuicListener(quicImplementationProvider ?? QuicImplementationProviders.Default, listenerOptions);
}

public override void Dispose()
Expand Down Expand Up @@ -82,7 +88,7 @@ public Http3LoopbackServerFactory(QuicImplementationProvider quicImplementationP

public override GenericLoopbackServer CreateServer(GenericLoopbackOptions options = null)
{
return new Http3LoopbackServer(_quicImplementationProvider, options);
return new Http3LoopbackServer(_quicImplementationProvider, CreateOptions(options));
}

public override async Task CreateServerAsync(Func<GenericLoopbackServer, Uri, Task> funcAsync, int millisecondsTimeout = 60000, GenericLoopbackOptions options = null)
Expand All @@ -97,5 +103,29 @@ public override Task<GenericLoopbackConnection> CreateConnectionAsync(Socket soc
// This method is always unacceptable to call for HTTP/3.
throw new NotImplementedException("HTTP/3 does not operate over a Socket.");
}

private static Http3Options CreateOptions(GenericLoopbackOptions options)
{
Http3Options http3Options = new Http3Options();
if (options != null)
{
http3Options.Address = options.Address;
http3Options.UseSsl = options.UseSsl;
http3Options.SslProtocols = options.SslProtocols;
http3Options.ListenBacklog = options.ListenBacklog;
}
return http3Options;
}
}
public class Http3Options : GenericLoopbackOptions
{
public int MaxUnidirectionalStreams {get; set; }

public int MaxBidirectionalStreams {get; set; }
public Http3Options()
{
MaxUnidirectionalStreams = 100;
MaxBidirectionalStreams = 100;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,6 @@ internal sealed class Http3Connection : HttpConnectionBase, IDisposable
private int _haveServerQpackDecodeStream;
private int _haveServerQpackEncodeStream;

// Manages MAX_STREAM count from server.
private long _maximumRequestStreams;
private long _requestStreamsRemaining;
private readonly Queue<TaskCompletionSourceWithCancellation<bool>> _waitingRequests = new Queue<TaskCompletionSourceWithCancellation<bool>>();

// A connection-level error will abort any future operations.
private Exception? _abortException;

Expand Down Expand Up @@ -87,8 +82,6 @@ public Http3Connection(HttpConnectionPool pool, HttpAuthority? origin, HttpAutho
string altUsedValue = altUsedDefaultPort ? authority.IdnHost : authority.IdnHost + ":" + authority.Port.ToString(Globalization.CultureInfo.InvariantCulture);
_altUsedEncodedHeader = QPack.QPackEncoder.EncodeLiteralHeaderFieldWithoutNameReferenceToArray(KnownHeaders.AltUsed.Name, altUsedValue);

_maximumRequestStreams = _requestStreamsRemaining = connection.GetRemoteAvailableBidirectionalStreamCount();

// Errors are observed via Abort().
_ = SendSettingsAsync();

Expand Down Expand Up @@ -166,42 +159,37 @@ public override async Task<HttpResponseMessage> SendAsync(HttpRequestMessage req
{
Debug.Assert(async);

// Wait for an available stream (based on QUIC MAX_STREAMS) if there isn't one available yet.

TaskCompletionSourceWithCancellation<bool>? waitForAvailableStreamTcs = null;

lock (SyncObj)
{
long remaining = _requestStreamsRemaining;

if (remaining > 0)
{
_requestStreamsRemaining = remaining - 1;
}
else
{
waitForAvailableStreamTcs = new TaskCompletionSourceWithCancellation<bool>();
_waitingRequests.Enqueue(waitForAvailableStreamTcs);
}
}

if (waitForAvailableStreamTcs != null)
{
await waitForAvailableStreamTcs.WaitWithCancellationAsync(cancellationToken).ConfigureAwait(false);
}

// Allocate an active request

QuicStream? quicStream = null;
Http3RequestStream? requestStream = null;

try
{
if (_connection != null)
ManickaP marked this conversation as resolved.
Show resolved Hide resolved
{
ValueTask<QuicStream> openStreamTask = default;

while (true)
{
lock (SyncObj)
{
if (_connection.GetRemoteAvailableBidirectionalStreamCount() > 0)
{
openStreamTask = _connection.OpenBidirectionalStreamAsync(cancellationToken);
break;
}
}

// Wait for an available stream (based on QUIC MAX_STREAMS) if there isn't one available yet.
await _connection.WaitForAvailableBidirectionalStreamsAsync(cancellationToken).ConfigureAwait(false);
}

quicStream = await openStreamTask.ConfigureAwait(false);
}
lock (SyncObj)
{
if (_connection != null)
if (quicStream != null)
{
quicStream = _connection.OpenBidirectionalStream();
requestStream = new Http3RequestStream(request, this, quicStream);
_activeRequests.Add(quicStream, requestStream);
}
Expand Down Expand Up @@ -246,76 +234,6 @@ public override async Task<HttpResponseMessage> SendAsync(HttpRequestMessage req
}
}

/// <summary>
/// Waits for MAX_STREAMS to be raised by the server.
/// </summary>
private Task WaitForAvailableRequestStreamAsync(CancellationToken cancellationToken)
{
TaskCompletionSourceWithCancellation<bool> tcs;

lock (SyncObj)
{
long remaining = _requestStreamsRemaining;

if (remaining > 0)
{
_requestStreamsRemaining = remaining - 1;
return Task.CompletedTask;
}

tcs = new TaskCompletionSourceWithCancellation<bool>();
_waitingRequests.Enqueue(tcs);
}

// Note: cancellation on connection shutdown is handled in CancelWaiters.
return tcs.WaitWithCancellationAsync(cancellationToken).AsTask();
}

/// <summary>
/// Cancels any waiting SendAsync calls.
/// </summary>
/// <remarks>Requires <see cref="SyncObj"/> to be held.</remarks>
private void CancelWaiters()
{
Debug.Assert(Monitor.IsEntered(SyncObj));

while (_waitingRequests.TryDequeue(out TaskCompletionSourceWithCancellation<bool>? tcs))
{
tcs.TrySetException(new HttpRequestException(SR.net_http_request_aborted, null, RequestRetryType.RetryOnConnectionFailure));
}
}

// TODO: how do we get this event? -> HandleEventStreamsAvailable reports currently available Uni/Bi streams
private void OnMaximumStreamCountIncrease(long newMaximumStreamCount)
{
lock (SyncObj)
{
if (newMaximumStreamCount <= _maximumRequestStreams)
{
return;
}

IncreaseRemainingStreamCount(newMaximumStreamCount - _maximumRequestStreams);
_maximumRequestStreams = newMaximumStreamCount;
}
}

private void IncreaseRemainingStreamCount(long delta)
{
Debug.Assert(Monitor.IsEntered(SyncObj));
Debug.Assert(delta > 0);

_requestStreamsRemaining += delta;

while (_requestStreamsRemaining != 0 && _waitingRequests.TryDequeue(out TaskCompletionSourceWithCancellation<bool>? tcs))
{
if (tcs.TrySetResult(true))
{
--_requestStreamsRemaining;
}
}
}

/// <summary>
/// Aborts the connection with an error.
/// </summary>
Expand Down Expand Up @@ -358,7 +276,6 @@ internal Exception Abort(Exception abortException)
_connectionClosedTask = _connection.CloseAsync((long)connectionResetErrorCode).AsTask();
}

CancelWaiters();
CheckForShutdown();
}

Expand Down Expand Up @@ -396,7 +313,6 @@ private void OnServerGoAway(long lastProcessedStreamId)
}
}

CancelWaiters();
CheckForShutdown();
}

Expand All @@ -414,8 +330,6 @@ public void RemoveStream(QuicStream stream)
bool removed = _activeRequests.Remove(stream);
Debug.Assert(removed == true);

IncreaseRemainingStreamCount(1);

if (ShuttingDown)
{
CheckForShutdown();
Expand All @@ -438,7 +352,7 @@ private async Task SendSettingsAsync()
{
try
{
_clientControl = _connection!.OpenUnidirectionalStream();
_clientControl = await _connection!.OpenUnidirectionalStreamAsync().ConfigureAwait(false);
await _clientControl.WriteAsync(_pool.Settings.Http3SettingsFrame, CancellationToken.None).ConfigureAwait(false);
}
catch (Exception ex)
Expand Down
Loading