Skip to content

Commit

Permalink
Fix JSON-RPC response streaming (#6078)
Browse files Browse the repository at this point in the history
* Add `WebSocketStream`

- Wrap a `WebSocket` supporting `Stream` operations

* Add single test

* Include possible fix notes

* Introduce 'CounterStream'

- Decorator that counts the bytes written

* Do not take ownership of the underlying WebSocket

- Do not dispose `_socket`
- When disposing, just send an "end of message"

* Add 'SendUsingStream' to 'ISocketHandler'

- Instead of using 'SendRawAsync' operate on the socket using a 'Stream'

* Add more tests

- Restructure tests (IPC, WebSockets)

* Remove unused safe null operators

* Replace implementation with 'Stream's

* Use Test annotations

* Test 'WebSockets' impl

* Add 'NullJsonRpcLocalStats'

- Do nothing on 'ReportCall'
- Return default value on 'GetMethodStats'
- Useful during testing

* Initial WebSockets collection tests

* Refactor message counting for WebSockets

* Refactor IPC tests

* Test sending collections through IPC

* Increase test case count

* Use 'NullJsonRpcLocalStats' when required

* Send collections using streams

* Invert if branches

* Add tests for 'maxBatchResponseBodySize' option

* Reorder fields, rename server

* Fix random object usages

* Throw 'InvalidOperationException' when socket is null
  • Loading branch information
emlautarom1 committed Sep 11, 2023
1 parent 8571d76 commit fad0455
Show file tree
Hide file tree
Showing 8 changed files with 680 additions and 46 deletions.
443 changes: 443 additions & 0 deletions src/Nethermind/Nethermind.JsonRpc.Test/JsonRpcSocketsClientTests.cs

Large diffs are not rendered by default.

19 changes: 19 additions & 0 deletions src/Nethermind/Nethermind.JsonRpc/NullJsonRpcLocalStats.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
// SPDX-FileCopyrightText: 2023 Demerzel Solutions Limited
// SPDX-License-Identifier: LGPL-3.0-only

using System.Threading.Tasks;

namespace Nethermind.JsonRpc;

public class NullJsonRpcLocalStats : IJsonRpcLocalStats
{

public Task ReportCall(RpcReport report, long elapsedMicroseconds = 0, long? size = null)
{
return Task.CompletedTask;
}
public MethodStats GetMethodStats(string methodName)
{
return new MethodStats();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -111,11 +111,14 @@ private void IncrementBytesSentMetric(int size)

public virtual async Task<int> SendJsonRpcResult(JsonRpcResult result)
{
await using Stream stream = _handler.SendUsingStream();
await using Stream buffered = new BufferedStream(stream);
await using CounterStream resultData = new CounterStream(buffered);

if (result.IsCollection)
{
int singleResponseSize = 1;
bool isFirst = true;
await _handler.SendRawAsync(_jsonOpeningBracket, false);
await resultData.WriteAsync(_jsonOpeningBracket);
JsonRpcBatchResultAsyncEnumerator enumerator = result.BatchedResponses!.GetAsyncEnumerator(CancellationToken.None);
try
{
Expand All @@ -126,16 +129,14 @@ public virtual async Task<int> SendJsonRpcResult(JsonRpcResult result)
{
if (!isFirst)
{
await _handler.SendRawAsync(_jsonComma, false);
singleResponseSize += 1;
await resultData.WriteAsync(_jsonComma);
}

isFirst = false;
singleResponseSize += await SendJsonRpcResultEntry(entry, false);
SendJsonRpcResultEntry(resultData, entry);
_ = _jsonRpcLocalStats.ReportCall(entry.Report);

// We reached the limit and don't want to responded to more request in the batch
if (!_jsonRpcContext.IsAuthenticated && singleResponseSize > _maxBatchResponseBodySize)
if (!_jsonRpcContext.IsAuthenticated && resultData.WrittenBytes > _maxBatchResponseBodySize)
{
enumerator.IsStopped = true;
}
Expand All @@ -147,52 +148,37 @@ public virtual async Task<int> SendJsonRpcResult(JsonRpcResult result)
await enumerator.DisposeAsync();
}

await _handler.SendRawAsync(_jsonClosingBracket, true);
singleResponseSize += 1;

return singleResponseSize;
await resultData.WriteAsync(_jsonClosingBracket);
}
else
{
return await SendJsonRpcResultEntry(result.SingleResponse!.Value);
SendJsonRpcResultEntry(resultData, result.SingleResponse!.Value);
}

// ? What if we write more than int.MaxValue.
// Result could be negative
return (int)resultData.WrittenBytes;
}

private async Task<int> SendJsonRpcResultEntry(JsonRpcResult.Entry result, bool endOfMessage = true)
private void SendJsonRpcResultEntry(Stream dest, JsonRpcResult.Entry result)
{
void SerializeTimeoutException(MemoryStream stream, JsonRpcResult.Entry result)
using JsonRpcResult.Entry entry = result;

try
{
_jsonSerializer.Serialize(stream, _jsonRpcService.GetErrorResponse(
ErrorCodes.Timeout,
"Request was canceled due to enabled timeout.",
result.Response.Id,
result.Response.MethodName));
_jsonSerializer.SerializeWaitForEnumeration(dest, result.Response);
}

using (result)
catch (Exception e) when (e is OperationCanceledException || e.InnerException is OperationCanceledException)
{
await using MemoryStream resultData = new();

try
{
_jsonSerializer.SerializeWaitForEnumeration(resultData, result.Response);
}
catch (Exception e) when (e.InnerException is OperationCanceledException)
{
SerializeTimeoutException(resultData, result);
}
catch (OperationCanceledException)
{
SerializeTimeoutException(resultData, result);
}

if (resultData.TryGetBuffer(out ArraySegment<byte> data))
{
await _handler.SendRawAsync(data, endOfMessage);
return data.Count;
}

return (int)resultData.Length;
_jsonSerializer.Serialize(
dest,
_jsonRpcService.GetErrorResponse(
ErrorCodes.Timeout,
"Request was canceled due to enabled timeout.",
result.Response.Id,
result.Response.MethodName
)
);
}
}
}
Expand Down
64 changes: 64 additions & 0 deletions src/Nethermind/Nethermind.Sockets/CounterStream.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
// SPDX-FileCopyrightText: 2023 Demerzel Solutions Limited
// SPDX-License-Identifier: LGPL-3.0-only

using System;
using System.IO;

namespace Nethermind.Sockets;

public class CounterStream : Stream
{
private readonly Stream _stream;

public CounterStream(Stream stream)
{
_stream = stream ?? throw new ArgumentNullException(nameof(stream));
}

public long WrittenBytes { get; private set; }

public override void Flush() => _stream.Flush();

public override int Read(byte[] buffer, int offset, int count) => _stream.Read(buffer, offset, count);

public override long Seek(long offset, SeekOrigin origin) => _stream.Seek(offset, origin);

public override void SetLength(long value) => _stream.SetLength(value);

public override void Write(byte[] buffer, int offset, int count)
{
_stream.Write(buffer, offset, count);
WrittenBytes += count;
}

public override bool CanRead
{
get => _stream.CanRead;
}

public override bool CanSeek
{
get => _stream.CanSeek;
}

public override bool CanWrite
{
get => _stream.CanWrite;
}

public override long Length
{
get => _stream.Length;
}

public override bool CanTimeout
{
get => _stream.CanTimeout;
}

public override long Position
{
get => _stream.Position;
set => _stream.Position = value;
}
}
2 changes: 2 additions & 0 deletions src/Nethermind/Nethermind.Sockets/ISocketHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
// SPDX-License-Identifier: LGPL-3.0-only

using System;
using System.IO;
using System.Threading.Tasks;

namespace Nethermind.Sockets
Expand All @@ -15,5 +16,6 @@ public interface ISocketHandler : IDisposable
Task SendRawAsync(ArraySegment<byte> data, bool endMessage = true);
Task<ReceiveResult?> GetReceiveResult(ArraySegment<byte> buffer);
Task CloseAsync(ReceiveResult? result);
Stream SendUsingStream();
}
}
8 changes: 7 additions & 1 deletion src/Nethermind/Nethermind.Sockets/IpcSocketsHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
// SPDX-License-Identifier: LGPL-3.0-only

using System;
using System.IO;
using System.Net.Sockets;
using System.Threading.Tasks;

Expand Down Expand Up @@ -44,9 +45,14 @@ public Task CloseAsync(ReceiveResult? result)
return Task.Factory.StartNew(_socket.Close);
}

public Stream SendUsingStream()
{
return new NetworkStream(_socket, FileAccess.Write, ownsSocket: false);
}

public void Dispose()
{
_socket?.Dispose();
_socket.Dispose();
}
}
}
10 changes: 8 additions & 2 deletions src/Nethermind/Nethermind.Sockets/WebSocketHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
// SPDX-License-Identifier: LGPL-3.0-only

using System;
using System.IO;
using System.Net.Sockets;
using System.Net.WebSockets;
using System.Threading;
Expand All @@ -18,7 +19,7 @@ public class WebSocketHandler : ISocketHandler
public WebSocketHandler(WebSocket webSocket, ILogManager logManager)
{
_webSocket = webSocket;
_logger = logManager?.GetClassLogger() ?? throw new ArgumentNullException(nameof(logManager));
_logger = logManager.GetClassLogger() ?? throw new ArgumentNullException(nameof(logManager));
}

public Task SendRawAsync(ArraySegment<byte> data, bool endOfMessage = true) =>
Expand Down Expand Up @@ -108,9 +109,14 @@ public Task CloseAsync(ReceiveResult? result)
return Task.CompletedTask;
}

public Stream SendUsingStream()
{
return new WebSocketStream(_webSocket, WebSocketMessageType.Text);
}

public void Dispose()
{
_webSocket?.Dispose();
_webSocket.Dispose();
}
}
}
108 changes: 108 additions & 0 deletions src/Nethermind/Nethermind.Sockets/WebSocketStream.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
// SPDX-FileCopyrightText: 2023 Demerzel Solutions Limited
// SPDX-License-Identifier: LGPL-3.0-only

using System;
using System.IO;
using System.Net.WebSockets;
using System.Threading;
using System.Threading.Tasks;

namespace Nethermind.Sockets;

public class WebSocketStream : Stream
{
private WebSocket? _socket;
private readonly WebSocketMessageType _messageType;

public WebSocketStream(WebSocket socket, WebSocketMessageType messageType)
{
_socket = socket;
_messageType = messageType;
}

public override bool CanRead => true;
public override bool CanSeek => false;
public override bool CanWrite => true;
public override long Length => throw new NotSupportedException();

public override long Position
{
get => throw new NotSupportedException();
set => throw new NotSupportedException();
}

public override async Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
{
ThrowIfDisposed();
_ = _socket ?? throw new InvalidOperationException($"The underlying {nameof(WebSocket)} is null");

if (_socket.State is WebSocketState.Closed or WebSocketState.CloseReceived or WebSocketState.CloseSent)
{
return 0;
}

ArraySegment<byte> segment = new(buffer, offset, count);
WebSocketReceiveResult result = await _socket.ReceiveAsync(segment, cancellationToken);

if (result.MessageType == WebSocketMessageType.Close)
{
await _socket.CloseAsync(WebSocketCloseStatus.NormalClosure, "Remote close", cancellationToken);
return 0;
}

return result.Count;
}

public override async Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
{
ThrowIfDisposed();
_ = _socket ?? throw new ArgumentNullException(nameof(_socket));
if (_socket.State != WebSocketState.Open) { throw new IOException($"WebSocket not open ({_socket.State})"); }

ArraySegment<byte> segment = new(buffer, offset, count);
await _socket.SendAsync(segment, _messageType, false, cancellationToken);
}

public override void Flush() { }

public override int Read(byte[] buffer, int offset, int count)
{
return ReadAsync(buffer, offset, count).GetAwaiter().GetResult();
}

public override long Seek(long offset, SeekOrigin origin)
{
throw new NotSupportedException();
}

public override void SetLength(long value)
{
throw new NotSupportedException();
}

public override void Write(byte[] buffer, int offset, int count)
{
WriteAsync(buffer, offset, count).GetAwaiter().GetResult();
}

protected override void Dispose(bool disposing)
{
try
{
if (disposing)
{
_socket?.SendAsync(ReadOnlyMemory<byte>.Empty, WebSocketMessageType.Text, true, CancellationToken.None);
}
}
finally
{
_socket = null;
base.Dispose(disposing);
}
}

private void ThrowIfDisposed()
{
if (_socket == null) throw new ObjectDisposedException(nameof(_socket));
}
}

0 comments on commit fad0455

Please sign in to comment.