Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve Http2Connection buffer management #79484

Merged
merged 3 commits into from Jan 31, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
108 changes: 46 additions & 62 deletions src/libraries/Common/src/System/Net/ArrayBuffer.cs
Expand Up @@ -3,6 +3,7 @@

using System.Buffers;
using System.Diagnostics;
using System.Runtime.CompilerServices;
using System.Runtime.InteropServices;

namespace System.Net
Expand Down Expand Up @@ -30,8 +31,12 @@ internal struct ArrayBuffer : IDisposable

public ArrayBuffer(int initialSize, bool usePool = false)
{
Debug.Assert(initialSize > 0 || usePool);
MihaZupan marked this conversation as resolved.
Show resolved Hide resolved

_usePool = usePool;
_bytes = usePool ? ArrayPool<byte>.Shared.Rent(initialSize) : new byte[initialSize];
_bytes = initialSize == 0
? Array.Empty<byte>()
: usePool ? ArrayPool<byte>.Shared.Rent(initialSize) : new byte[initialSize];
Comment on lines +37 to +39
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note ArrayPool will use Array.Empty for 0 byte requests, so this could also be:

_bytes =
    usePool ? ArrayPool<byte>.Shared.Rent(initialSize) :
    initialSize == 0 ? Array.Empty<byte>() :
    new byte[initialSize];

It's unlikely to really matter, but if we expect initialSize to most commonly be non-zero, you might want to reorder it.

_activeStart = 0;
_availableStart = 0;
}
Expand All @@ -54,12 +59,26 @@ public void Dispose()
byte[] array = _bytes;
_bytes = null!;

if (_usePool && array != null)
if (array is not null)
{
ArrayPool<byte>.Shared.Return(array);
ReturnBufferIfPooled(array);
}
}

// This is different from Dispose as the instance remains usable afterwards (_bytes will not be null).
public void ClearAndReturnBuffer()
{
Debug.Assert(_usePool);
Debug.Assert(_bytes is not null);

_activeStart = 0;
_availableStart = 0;

byte[] bufferToReturn = _bytes;
_bytes = Array.Empty<byte>();
ReturnBufferIfPooled(bufferToReturn);
}

public int ActiveLength => _availableStart - _activeStart;
public Span<byte> ActiveSpan => new Span<byte>(_bytes, _activeStart, _availableStart - _activeStart);
public ReadOnlySpan<byte> ActiveReadOnlySpan => new ReadOnlySpan<byte>(_bytes, _activeStart, _availableStart - _activeStart);
Expand Down Expand Up @@ -94,10 +113,23 @@ public void Commit(int byteCount)
}

// Ensure at least [byteCount] bytes to write to.
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public void EnsureAvailableSpace(int byteCount)
{
if (byteCount <= AvailableLength)
if (byteCount > AvailableLength)
{
EnsureAvailableSpaceCore(byteCount);
}
}

private void EnsureAvailableSpaceCore(int byteCount)
{
Debug.Assert(AvailableLength < byteCount);

if (_bytes.Length == 0)
{
Debug.Assert(_usePool && _activeStart == 0 && _availableStart == 0);
_bytes = ArrayPool<byte>.Shared.Rent(byteCount);
return;
}

Expand Down Expand Up @@ -134,72 +166,24 @@ public void EnsureAvailableSpace(int byteCount)
_activeStart = 0;

_bytes = newBytes;
if (_usePool)
{
ArrayPool<byte>.Shared.Return(oldBytes);
}
ReturnBufferIfPooled(oldBytes);

Debug.Assert(byteCount <= AvailableLength);
}

// Ensure at least [byteCount] bytes to write to, up to the specified limit
public void TryEnsureAvailableSpaceUpToLimit(int byteCount, int limit)
public void Grow()
{
if (byteCount <= AvailableLength)
{
return;
}

int totalFree = _activeStart + AvailableLength;
if (byteCount <= totalFree)
{
// We can free up enough space by just shifting the bytes down, so do so.
Buffer.BlockCopy(_bytes, _activeStart, _bytes, 0, ActiveLength);
_availableStart = ActiveLength;
_activeStart = 0;
Debug.Assert(byteCount <= AvailableLength);
return;
}

if (_bytes.Length >= limit)
{
// Already at limit, can't grow further.
return;
}

// Double the size of the buffer until we have enough space, or we hit the limit
int desiredSize = Math.Min(ActiveLength + byteCount, limit);
int newSize = _bytes.Length;
do
{
newSize = Math.Min(newSize * 2, limit);
} while (newSize < desiredSize);

byte[] newBytes = _usePool ?
ArrayPool<byte>.Shared.Rent(newSize) :
new byte[newSize];
byte[] oldBytes = _bytes;

if (ActiveLength != 0)
{
Buffer.BlockCopy(oldBytes, _activeStart, newBytes, 0, ActiveLength);
}

_availableStart = ActiveLength;
_activeStart = 0;

_bytes = newBytes;
if (_usePool)
{
ArrayPool<byte>.Shared.Return(oldBytes);
}

Debug.Assert(byteCount <= AvailableLength || desiredSize == limit);
EnsureAvailableSpaceCore(AvailableLength + 1);
}

public void Grow()
[MethodImpl(MethodImplOptions.AggressiveInlining)]
private void ReturnBufferIfPooled(byte[] buffer)
{
EnsureAvailableSpace(AvailableLength + 1);
// The buffer may be Array.Empty<byte>()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ArrayPool is fine with Array.Empty being returned; it just gets ignored.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll remove the extra check, given that we won't be calling ClearAndReturnBuffer on already-empty buffers often.

if (_usePool && buffer.Length > 0)
{
ArrayPool<byte>.Shared.Return(buffer);
}
}
}
}
Expand Up @@ -29,6 +29,7 @@ internal sealed partial class Http2Connection : HttpConnectionBase
private readonly Stream _stream;

// NOTE: These are mutable structs; do not make these readonly.
// ProcessIncomingFramesAsync and ProcessOutgoingFramesAsync are responsible for disposing/returning their respective buffers.
private ArrayBuffer _incomingBuffer;
private ArrayBuffer _outgoingBuffer;

Expand Down Expand Up @@ -89,10 +90,12 @@ internal sealed partial class Http2Connection : HttpConnectionBase

#if DEBUG
// In debug builds, start with a very small buffer to induce buffer growing logic.
private const int InitialConnectionBufferSize = 4;
private const int InitialConnectionBufferSize = FrameHeader.Size;
#else
private const int InitialConnectionBufferSize = 4096;
// Rent enough space to receive a full data frame in one read call.
private const int InitialConnectionBufferSize = FrameHeader.Size + FrameHeader.MaxPayloadLength;
#endif

// The default initial window size for streams and connections according to the RFC:
// https://datatracker.ietf.org/doc/html/rfc7540#section-5.2.1
// Unlike HttpHandlerDefaults.DefaultInitialHttp2StreamWindowSize, this value should never be changed.
Expand Down Expand Up @@ -139,8 +142,8 @@ public Http2Connection(HttpConnectionPool pool, Stream stream)
_pool = pool;
_stream = stream;

_incomingBuffer = new ArrayBuffer(InitialConnectionBufferSize);
_outgoingBuffer = new ArrayBuffer(InitialConnectionBufferSize);
_incomingBuffer = new ArrayBuffer(initialSize: 0, usePool: true);
_outgoingBuffer = new ArrayBuffer(initialSize: 0, usePool: true);

_hpackDecoder = new HPackDecoder(maxHeadersLength: pool.Settings.MaxResponseHeadersByteLength);

Expand Down Expand Up @@ -239,11 +242,15 @@ public async ValueTask SetupAsync(CancellationToken cancellationToken)
_ = ProcessIncomingFramesAsync();
await _stream.WriteAsync(_outgoingBuffer.ActiveMemory, cancellationToken).ConfigureAwait(false);
_rttEstimator.OnInitialSettingsSent();
_outgoingBuffer.Discard(_outgoingBuffer.ActiveLength);

_outgoingBuffer.ClearAndReturnBuffer();
}
catch (Exception e)
{
// ProcessIncomingFramesAsync and ProcessOutgoingFramesAsync are responsible for disposing/returning their respective buffers.
// SetupAsync is the exception as it's responsible for starting the ProcessOutgoingFramesAsync loop.
// As we're about to throw and ProcessOutgoingFramesAsync will never be called, we must return the buffer here.
_outgoingBuffer.Dispose();

Dispose();

if (e is OperationCanceledException oce && oce.CancellationToken == cancellationToken)
Expand Down Expand Up @@ -428,9 +435,13 @@ private async ValueTask<FrameHeader> ReadFrameAsync(bool initialFrame = false)
// Ensure we've read enough data for the frame header.
if (_incomingBuffer.ActiveLength < FrameHeader.Size)
{
_incomingBuffer.EnsureAvailableSpace(FrameHeader.Size - _incomingBuffer.ActiveLength);
do
{
// Issue a zero-byte read to avoid potentially pinning the buffer while waiting for more data.
await _stream.ReadAsync(Memory<byte>.Empty).ConfigureAwait(false);

_incomingBuffer.EnsureAvailableSpace(FrameHeader.Size);

int bytesRead = await _stream.ReadAsync(_incomingBuffer.AvailableMemory).ConfigureAwait(false);
_incomingBuffer.Commit(bytesRead);
if (bytesRead == 0)
Expand Down Expand Up @@ -469,6 +480,9 @@ private async ValueTask<FrameHeader> ReadFrameAsync(bool initialFrame = false)
_incomingBuffer.EnsureAvailableSpace(frameHeader.PayloadLength - _incomingBuffer.ActiveLength);
do
{
// Issue a zero-byte read to avoid potentially pinning the buffer while waiting for more data.
await _stream.ReadAsync(Memory<byte>.Empty).ConfigureAwait(false);

int bytesRead = await _stream.ReadAsync(_incomingBuffer.AvailableMemory).ConfigureAwait(false);
_incomingBuffer.Commit(bytesRead);
if (bytesRead == 0) ThrowPrematureEOF(frameHeader.PayloadLength);
Expand Down Expand Up @@ -531,9 +545,21 @@ private async Task ProcessIncomingFramesAsync()
// the entire frame's needs (not just the header).
if (_incomingBuffer.ActiveLength < FrameHeader.Size)
{
_incomingBuffer.EnsureAvailableSpace(FrameHeader.Size - _incomingBuffer.ActiveLength);
do
{
// Issue a zero-byte read to avoid potentially pinning the buffer while waiting for more data.
ValueTask<int> zeroByteReadTask = _stream.ReadAsync(Memory<byte>.Empty);
if (!zeroByteReadTask.IsCompletedSuccessfully && _incomingBuffer.ActiveLength == 0)
{
// No data is available yet. Return the receive buffer back to the pool while we wait.
_incomingBuffer.ClearAndReturnBuffer();
}
await zeroByteReadTask.ConfigureAwait(false);

// While we only need FrameHeader.Size bytes to complete this read, it's better if we rent more
// to avoid multiple ReadAsync calls and resizes once we start copying the content.
_incomingBuffer.EnsureAvailableSpace(InitialConnectionBufferSize);

int bytesRead = await _stream.ReadAsync(_incomingBuffer.AvailableMemory).ConfigureAwait(false);
Debug.Assert(bytesRead >= 0);
_incomingBuffer.Commit(bytesRead);
Expand Down Expand Up @@ -605,6 +631,10 @@ private async Task ProcessIncomingFramesAsync()

Abort(e);
}
finally
{
_incomingBuffer.Dispose();
}
}

// Note, this will return null for a streamId that's no longer in use.
Expand Down Expand Up @@ -1252,6 +1282,11 @@ private async Task ProcessOutgoingFramesAsync()
{
await FlushOutgoingBytesAsync().ConfigureAwait(false);
}

if (_outgoingBuffer.ActiveLength == 0)
{
_outgoingBuffer.ClearAndReturnBuffer();
}
}
}
catch (Exception e)
Expand All @@ -1260,6 +1295,10 @@ private async Task ProcessOutgoingFramesAsync()

Debug.Fail($"Unexpected exception in {nameof(ProcessOutgoingFramesAsync)}: {e}");
}
finally
{
_outgoingBuffer.Dispose();
MihaZupan marked this conversation as resolved.
Show resolved Hide resolved
}
}

private Task SendSettingsAckAsync() =>
Expand Down Expand Up @@ -1330,7 +1369,7 @@ private void WriteIndexedHeader(int index, ref ArrayBuffer headerBuffer)
int bytesWritten;
while (!HPackEncoder.EncodeIndexedHeaderField(index, headerBuffer.AvailableSpan, out bytesWritten))
{
headerBuffer.EnsureAvailableSpace(headerBuffer.AvailableLength + 1);
headerBuffer.Grow();
}

headerBuffer.Commit(bytesWritten);
Expand All @@ -1343,7 +1382,7 @@ private void WriteIndexedHeader(int index, string value, ref ArrayBuffer headerB
int bytesWritten;
while (!HPackEncoder.EncodeLiteralHeaderFieldWithoutIndexing(index, value, valueEncoding: null, headerBuffer.AvailableSpan, out bytesWritten))
{
headerBuffer.EnsureAvailableSpace(headerBuffer.AvailableLength + 1);
headerBuffer.Grow();
}

headerBuffer.Commit(bytesWritten);
Expand All @@ -1356,7 +1395,7 @@ private void WriteLiteralHeader(string name, ReadOnlySpan<string> values, Encodi
int bytesWritten;
while (!HPackEncoder.EncodeLiteralHeaderFieldWithoutIndexingNewName(name, values, HttpHeaderParser.DefaultSeparator, valueEncoding, headerBuffer.AvailableSpan, out bytesWritten))
{
headerBuffer.EnsureAvailableSpace(headerBuffer.AvailableLength + 1);
headerBuffer.Grow();
}

headerBuffer.Commit(bytesWritten);
Expand All @@ -1369,7 +1408,7 @@ private void WriteLiteralHeaderValues(ReadOnlySpan<string> values, string? separ
int bytesWritten;
while (!HPackEncoder.EncodeStringLiterals(values, separator, valueEncoding, headerBuffer.AvailableSpan, out bytesWritten))
{
headerBuffer.EnsureAvailableSpace(headerBuffer.AvailableLength + 1);
headerBuffer.Grow();
}

headerBuffer.Commit(bytesWritten);
Expand All @@ -1382,7 +1421,7 @@ private void WriteLiteralHeaderValue(string value, Encoding? valueEncoding, ref
int bytesWritten;
while (!HPackEncoder.EncodeStringLiteral(value, valueEncoding, headerBuffer.AvailableSpan, out bytesWritten))
{
headerBuffer.EnsureAvailableSpace(headerBuffer.AvailableLength + 1);
headerBuffer.Grow();
}

headerBuffer.Commit(bytesWritten);
Expand All @@ -1392,11 +1431,7 @@ private void WriteBytes(ReadOnlySpan<byte> bytes, ref ArrayBuffer headerBuffer)
{
if (NetEventSource.Log.IsEnabled()) Trace($"{nameof(bytes.Length)}={bytes.Length}");

if (bytes.Length > headerBuffer.AvailableLength)
{
headerBuffer.EnsureAvailableSpace(bytes.Length);
}

headerBuffer.EnsureAvailableSpace(bytes.Length);
bytes.CopyTo(headerBuffer.AvailableSpan);
headerBuffer.Commit(bytes.Length);
}
Expand Down Expand Up @@ -1855,6 +1890,10 @@ private void FinalTeardown()
_connectionWindow.Dispose();
_writeChannel.Writer.Complete();

// We're not disposing the _incomingBuffer and _outgoingBuffer here as they may still be in use by
// ProcessIncomingFramesAsync and ProcessOutgoingFramesAsync respectively, and those methods are
// responsible for returning the buffers.

if (HttpTelemetry.Log.IsEnabled())
{
if (Interlocked.Exchange(ref _markedByTelemetryStatus, TelemetryStatus_Closed) == TelemetryStatus_Opened)
Expand Down