Skip to content
This repository was archived by the owner on Jan 23, 2023. It is now read-only.

Commit 06aa6f0

Browse files
authored
Reduce buffering in ManagedWebSocket.ReceiveAsync (#28577)
* Change SocketsHttpHandler's 101 response stream to buffer response reads The connection already has a buffer; use it. * Reduce buffering in ManagedWebSocket.ReceiveAsync Today we use a big-ish receive buffer, reading all data into the receive buffer and then copying from there into the caller-provided destination buffer. With this change, we assume that the underlying stream is providing the desired level of buffering, and thus only use a buffer small enough to handle message headers and control payloads (the latter for simplicity, as no caller-supplied buffer is available there). * Change ClientWebSocket to avoid creating a large buffer
1 parent ef52017 commit 06aa6f0

File tree

4 files changed

+109
-33
lines changed

4 files changed

+109
-33
lines changed

src/Common/src/System/Net/WebSockets/ManagedWebSocket.cs

Lines changed: 59 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -63,8 +63,6 @@ public static ManagedWebSocket CreateFromConnectedStream(
6363
private const int MaxControlPayloadLength = 125;
6464
/// <summary>Length of the mask XOR'd with the payload data.</summary>
6565
private const int MaskLength = 4;
66-
/// <summary>Default length of a receive buffer to create when an invalid scratch buffer is provided.</summary>
67-
private const int DefaultReceiveBufferSize = 0x1000;
6866

6967
/// <summary>The stream used to communicate with the remote server.</summary>
7068
private readonly Stream _stream;
@@ -184,8 +182,11 @@ private ManagedWebSocket(Stream stream, bool isServer, string subprotocol, TimeS
184182
// socket rents a similarly sized buffer from the pool for its duration, we'll end up draining
185183
// the pool, such that other web sockets will allocate anyway, as will anyone else in the process using the
186184
// pool. If someone wants to pool, they can do so by passing in the buffer they want to use, and they can
187-
// get it from whatever pool they like.
188-
_receiveBuffer = buffer.Length >= MaxMessageHeaderLength ? buffer : new byte[DefaultReceiveBufferSize];
185+
// get it from whatever pool they like. If we create our own buffer, it's small, large enough for message
186+
// headers and control payloads, but data for other message payloads is read directly into the buffers
187+
// passed into ReceiveAsync.
188+
const int ReceiveBufferMinLength = MaxControlPayloadLength;
189+
_receiveBuffer = buffer.Length >= ReceiveBufferMinLength ? buffer : new byte[ReceiveBufferMinLength];
189190

190191
// Set up the abort source so that if it's triggered, we transition the instance appropriately.
191192
_abortSource.Token.Register(s =>
@@ -697,32 +698,56 @@ private async ValueTask<TWebSocketReceiveResult> ReceiveAsyncPrivate<TWebSocketR
697698
}
698699

699700
// Otherwise, read as much of the payload as we can efficiently, and upate the header to reflect how much data
700-
// remains for future reads.
701-
int bytesToCopy = Math.Min(payloadBuffer.Length, (int)Math.Min(header.PayloadLength, _receiveBuffer.Length));
702-
Debug.Assert(bytesToCopy > 0, $"Expected {nameof(bytesToCopy)} > 0");
703-
if (_receiveBufferCount < bytesToCopy)
701+
// remains for future reads. We first need to copy any data that may be lingering in the receive buffer
702+
// into the destination; then to minimize ReceiveAsync calls, we want to read as much as we can, stopping
703+
// only when we've either read the whole message or when we've filled the payload buffer.
704+
705+
// First copy any data lingering in the receive buffer.
706+
int totalBytesReceived = 0;
707+
if (_receiveBufferCount > 0)
704708
{
705-
await EnsureBufferContainsAsync(bytesToCopy, throwOnPrematureClosure: true).ConfigureAwait(false);
709+
int receiveBufferBytesToCopy = Math.Min(payloadBuffer.Length, (int)Math.Min(header.PayloadLength, _receiveBufferCount));
710+
Debug.Assert(receiveBufferBytesToCopy > 0);
711+
_receiveBuffer.Span.Slice(_receiveBufferOffset, receiveBufferBytesToCopy).CopyTo(payloadBuffer.Span);
712+
ConsumeFromBuffer(receiveBufferBytesToCopy);
713+
totalBytesReceived += receiveBufferBytesToCopy;
714+
Debug.Assert(
715+
_receiveBufferCount == 0 ||
716+
totalBytesReceived == payloadBuffer.Length ||
717+
totalBytesReceived == header.PayloadLength);
718+
}
719+
720+
// Then read directly into the payload buffer until we've hit a limit.
721+
while (totalBytesReceived < payloadBuffer.Length &&
722+
totalBytesReceived < header.PayloadLength)
723+
{
724+
int numBytesRead = await _stream.ReadAsync(payloadBuffer.Slice(
725+
totalBytesReceived,
726+
(int)Math.Min(payloadBuffer.Length, header.PayloadLength) - totalBytesReceived)).ConfigureAwait(false);
727+
if (numBytesRead <= 0)
728+
{
729+
ThrowIfEOFUnexpected(throwOnPrematureClosure: true);
730+
break;
731+
}
732+
totalBytesReceived += numBytesRead;
706733
}
707734

708735
if (_isServer)
709736
{
710-
_receivedMaskOffsetOffset = ApplyMask(_receiveBuffer.Span.Slice(_receiveBufferOffset, bytesToCopy), header.Mask, _receivedMaskOffsetOffset);
737+
_receivedMaskOffsetOffset = ApplyMask(payloadBuffer.Span.Slice(0, totalBytesReceived), header.Mask, _receivedMaskOffsetOffset);
711738
}
712-
_receiveBuffer.Span.Slice(_receiveBufferOffset, bytesToCopy).CopyTo(payloadBuffer.Span.Slice(0, bytesToCopy));
713-
ConsumeFromBuffer(bytesToCopy);
714-
header.PayloadLength -= bytesToCopy;
739+
header.PayloadLength -= totalBytesReceived;
715740

716741
// If this a text message, validate that it contains valid UTF8.
717742
if (header.Opcode == MessageOpcode.Text &&
718-
!TryValidateUtf8(payloadBuffer.Span.Slice(0, bytesToCopy), header.Fin, _utf8TextState))
743+
!TryValidateUtf8(payloadBuffer.Span.Slice(0, totalBytesReceived), header.Fin, _utf8TextState))
719744
{
720745
await CloseWithReceiveErrorAndThrowAsync(WebSocketCloseStatus.InvalidPayloadData, WebSocketError.Faulted).ConfigureAwait(false);
721746
}
722747

723748
_lastReceiveHeader = header;
724749
return resultGetter.GetResult(
725-
bytesToCopy,
750+
totalBytesReceived,
726751
header.Opcode == MessageOpcode.Text ? WebSocketMessageType.Text : WebSocketMessageType.Binary,
727752
header.Fin && header.PayloadLength == 0,
728753
null, null);
@@ -1165,27 +1190,32 @@ private async Task EnsureBufferContainsAsync(int minimumRequiredBytes, bool thro
11651190
{
11661191
int numRead = await _stream.ReadAsync(_receiveBuffer.Slice(_receiveBufferCount, _receiveBuffer.Length - _receiveBufferCount), default).ConfigureAwait(false);
11671192
Debug.Assert(numRead >= 0, $"Expected non-negative bytes read, got {numRead}");
1168-
_receiveBufferCount += numRead;
1169-
if (numRead == 0)
1193+
if (numRead <= 0)
11701194
{
1171-
// The connection closed before we were able to read everything we needed.
1172-
// If it was due to use being disposed, fail. If it was due to the connection
1173-
// being closed and it wasn't expected, fail. If it was due to the connection
1174-
// being closed and that was expected, exit gracefully.
1175-
if (_disposed)
1176-
{
1177-
throw new ObjectDisposedException(nameof(WebSocket));
1178-
}
1179-
else if (throwOnPrematureClosure)
1180-
{
1181-
throw new WebSocketException(WebSocketError.ConnectionClosedPrematurely);
1182-
}
1195+
ThrowIfEOFUnexpected(throwOnPrematureClosure);
11831196
break;
11841197
}
1198+
_receiveBufferCount += numRead;
11851199
}
11861200
}
11871201
}
11881202

1203+
private void ThrowIfEOFUnexpected(bool throwOnPrematureClosure)
1204+
{
1205+
// The connection closed before we were able to read everything we needed.
1206+
// If it was due to us being disposed, fail. If it was due to the connection
1207+
// being closed and it wasn't expected, fail. If it was due to the connection
1208+
// being closed and that was expected, exit gracefully.
1209+
if (_disposed)
1210+
{
1211+
throw new ObjectDisposedException(nameof(WebSocket));
1212+
}
1213+
if (throwOnPrematureClosure)
1214+
{
1215+
throw new WebSocketException(WebSocketError.ConnectionClosedPrematurely);
1216+
}
1217+
}
1218+
11891219
/// <summary>Gets a send buffer from the pool.</summary>
11901220
private void AllocateSendBuffer(int minLength)
11911221
{

src/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/HttpConnection.cs

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1296,6 +1296,52 @@ private async ValueTask<int> ReadAsync(Memory<byte> destination)
12961296
return count;
12971297
}
12981298

1299+
private ValueTask<int> ReadBufferedAsync(Memory<byte> destination)
1300+
{
1301+
// If the caller provided buffer, and thus the amount of data desired to be read,
1302+
// is larger than the internal buffer, there's no point going through the internal
1303+
// buffer, so just do an unbuffered read.
1304+
return destination.Length >= _readBuffer.Length ?
1305+
ReadAsync(destination) :
1306+
ReadBufferedAsyncCore(destination);
1307+
}
1308+
1309+
private async ValueTask<int> ReadBufferedAsyncCore(Memory<byte> destination)
1310+
{
1311+
// This is called when reading the response body.
1312+
1313+
int remaining = _readLength - _readOffset;
1314+
if (remaining > 0)
1315+
{
1316+
// We have data in the read buffer. Return it to the caller.
1317+
if (destination.Length <= remaining)
1318+
{
1319+
ReadFromBuffer(destination.Span);
1320+
return destination.Length;
1321+
}
1322+
else
1323+
{
1324+
ReadFromBuffer(destination.Span.Slice(0, remaining));
1325+
return remaining;
1326+
}
1327+
}
1328+
1329+
// No data in read buffer.
1330+
_readOffset = _readLength = 0;
1331+
1332+
// Do a buffered read directly against the underlying stream.
1333+
Debug.Assert(_readAheadTask == null, "Read ahead task should have been consumed as part of the headers.");
1334+
int bytesRead = await _stream.ReadAsync(_readBuffer.AsMemory()).ConfigureAwait(false);
1335+
if (NetEventSource.IsEnabled) Trace($"Received {bytesRead} bytes.");
1336+
_readLength = bytesRead;
1337+
1338+
// Hand back as much data as we can fit.
1339+
int bytesToCopy = Math.Min(bytesRead, destination.Length);
1340+
_readBuffer.AsSpan(0, bytesToCopy).CopyTo(destination.Span);
1341+
_readOffset = bytesToCopy;
1342+
return bytesToCopy;
1343+
}
1344+
12991345
private async Task CopyFromBufferAsync(Stream destination, int count, CancellationToken cancellationToken)
13001346
{
13011347
Debug.Assert(count <= _readLength - _readOffset);

src/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/RawConnectionStream.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ public override async ValueTask<int> ReadAsync(Memory<byte> buffer, Cancellation
2626
return 0;
2727
}
2828

29-
ValueTask<int> readTask = _connection.ReadAsync(buffer);
29+
ValueTask<int> readTask = _connection.ReadBufferedAsync(buffer);
3030
int bytesRead;
3131
if (readTask.IsCompletedSuccessfully)
3232
{

src/System.Net.WebSockets.Client/src/System/Net/WebSockets/WebSocketHandle.Managed.cs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -204,12 +204,12 @@ public async Task ConnectAsyncCore(Uri uri, CancellationToken cancellationToken,
204204
}
205205

206206
// Get or create the buffer to use
207-
const int MinBufferSize = 14; // from ManagedWebSocket.MaxMessageHeaderLength
207+
const int MinBufferSize = 125; // from ManagedWebSocket.MaxControlPayloadLength
208208
ArraySegment<byte> optionsBuffer = options.Buffer.GetValueOrDefault();
209209
Memory<byte> buffer =
210210
optionsBuffer.Count >= MinBufferSize ? optionsBuffer : // use the provided buffer if it's big enough
211-
options.ReceiveBufferSize >= MinBufferSize ? new byte[options.ReceiveBufferSize] : // or use the requested size if it's big enough
212-
Memory<byte>.Empty; // or let WebSocket.CreateFromStream use its default
211+
default; // or let WebSocket.CreateFromStream use its default
212+
// options.ReceiveBufferSize is ignored, as we rely on the buffer inside the SocketsHttpHandler stream
213213

214214
// Get the response stream and wrap it in a web socket.
215215
Stream connectedStream = await response.Content.ReadAsStreamAsync().ConfigureAwait(false);

0 commit comments

Comments
 (0)