-
Notifications
You must be signed in to change notification settings - Fork 395
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fix JSON-RPC response streaming #6078
Conversation
- Wrap a `WebSocket` supporting `Stream` operations
- Decorator that counts the bytes written
- Do not dispose `_socket` - When disposing, just send an "end of message"
- Instead of using 'SendRawAsync' operate on the socket using a 'Stream'
- Restructure tests (IPC, WebSockets)
- Test still could be flaky
- Do nothing on 'ReportCall' - Return default value on 'GetMethodStats' - Useful during testing
- Use delays to synchronize =(
- We already are using delays
// ? What if we write more than int.MaxValue. | ||
// Result could be negative | ||
// return (int)resultData.WrittenBytes; | ||
return (int)resultData.WrittenBytes; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Technically, we could send infinite number of bytes, but our result is bounded by int.MaxValue
according to the result type of this operation.
The CounterStream
uses a long internally, but this still could be problematic (we are bounded by long.MaxSize
).
_ = _jsonRpcLocalStats.ReportCall(entry.Report); | ||
|
||
// We reached the limit and don't want to responded to more request in the batch | ||
if (!_jsonRpcContext.IsAuthenticated && singleResponseSize > _maxBatchResponseBodySize) | ||
if (!_jsonRpcContext.IsAuthenticated && resultData.WrittenBytes > _maxBatchResponseBodySize) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This feature is fundamentally broken. Consider the following scenarios where we are sending JsonRpcResult
s and each value in <>
represents the number of bytes:
Scenario 1: [ <100>, <100>, <999999> ] (Collection)
Scenario 2: { items: [ <999999>, <999999>, <999999>, ... ] } (Single)
On 1
, if we limit set _maxBatchResponseBodySize
to 300
, that is, we limit the response size to 300 bytes, we will fully send the last element of the collection, sending a total of 100 + 100 + 9999999
bytes, which exceeds the limit.
On 2
, since we're dealing with a single element, we do not perform any checks, despite the element containing a collection of items that could exceed any possible limit.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
A proper solution would require the underlying stream (resultData
) to stop as soon as the limit is reached, just by taking a look at the number of bytes written. A decorator like LimitedStream(Stream stream, long writeAtMostBytes)
would suffice.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There are tests (ignored for now) that show how this feature is currently broken (see: https://github.com/NethermindEth/nethermind/pull/6078/files#diff-4fc7730d1718514a2e6903a9a4470be7f5809c327011ae97ff5a75f8e04d825eR127).
"Request was canceled due to enabled timeout.", | ||
result.Response.Id, | ||
result.Response.MethodName)); | ||
_jsonSerializer.SerializeWaitForEnumeration(dest, result.Response); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Despite Stream
supporting async operations, internally we're using Newtonsoft.Json which will only use System.IO.Stream.Write(byte[] buffer, int offset, int count)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I dont think we're still using Newtonsoft.Json, after Ben's changes? right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nvm I didnt notice PR merged xD
for (int i = 0; i < messageCount; i++) | ||
{ | ||
await client.SendJsonRpcResult(result); | ||
await Task.Delay(100); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would really prefer a different approach to ensure that the "server" has finished processing all messages instead of relying on delays.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Use ManualResetEventSlim
?
@@ -15,5 +16,6 @@ public interface ISocketHandler : IDisposable | |||
Task SendRawAsync(ArraySegment<byte> data, bool endMessage = true); | |||
Task<ReceiveResult?> GetReceiveResult(ArraySegment<byte> buffer); | |||
Task CloseAsync(ReceiveResult? result); | |||
Stream SendUsingStream(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
GetUnderlyingStream
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I initially used a name like AsStream
, but I found it to be confusing:
- For
Ipc
, theNetworkStream
does not own the underlyingSocket
, meaning that disposing this stream does not close the connection. - Same for
WebSockets
where on dispose we send a "end of message" message and it does not manage lifecycle events of the connection (ex. "close" messages).
For both cases, each Stream is intended to be used to send a single message during the lifetime o the underlying Socket. What do you think @LukaszRozmej?
|
||
public class WebSocketStream : Stream | ||
{ | ||
private WebSocket? _socket; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
When it would be null?
} | ||
finally | ||
{ | ||
_socket = null; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why nulling it?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The idea is to use a WebSocketStream
for a single message, so we want to ensure that we're not using it to send a message after one has already been sent.
|
||
private void ThrowIfDisposed() | ||
{ | ||
if (_socket == null) throw new ObjectDisposedException(nameof(_socket)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
maybe better to have just Disposed flag?
src/Nethermind/Nethermind.JsonRpc.Test/Sockets/RandomStringArray.cs
Outdated
Show resolved
Hide resolved
for (int i = 0; i < messageCount; i++) | ||
{ | ||
await client.SendJsonRpcResult(result); | ||
await Task.Delay(100); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Use ManualResetEventSlim
?
- Remove 'RandomX' classes, prefer methods
- Might help with flaky tests on CI
- It actually has no effect on CI
I'm curious if these changes are related to #6169. Every now and again, for |
I'm seeing the same issue now when I test against IPC streaming with subscriptions. Perhaps this is related once again. I opened issue #6614 describing the issue. |
Fixes #5955
Changes
Ipc
andWebSockets
Types of changes
What types of changes does your code introduce?
Testing
Requires testing
If yes, did you write tests?
Notes on testing
Task.Delay
for synchronization.Documentation
Requires documentation update
Requires explanation in Release Notes
Remarks
During testing I found that a feature is not working properly (
maxBatchResponseBodySize
)