Handle concurrent WebSockets messages #6210
In #6078 we introduced streaming for socket messages to avoid OOM errors on big payloads. Unfortunately, this change introduced a new type of problem with subscriptions: Nethermind can raise multiple events in a very short span of time, and when sending those messages we need to be careful not to interleave them. This PR addresses this problem by using |
Also, this would require release note information, as it could break existing subscriptions in plug-ins.
@@ -22,7 +22,7 @@ protected Subscription(IJsonRpcDuplexClient jsonRpcDuplexClient)
public string Id { get; }
public abstract string Type { get; }
public IJsonRpcDuplexClient JsonRpcDuplexClient { get; }
private Channel<Action> SendChannel { get; } = Channel.CreateUnbounded<Action>(new UnboundedChannelOptions() { SingleReader = true });
Would making the channel static - shared between subscriptions - fix the problem?
I think we could/should make it static regardless: there is no point in each subscription having its own channel, and it may cause problems.
> Would making the channel static - shared between subscriptions - fix the problem?
Unfortunately, this also does not work. We still have the same original concurrency issues (the web socket gets into a bad state).
Why didn't making the channel static fix the problem? It is then shared by all subscriptions and should have made all messages sequential.
Maybe the channel should be on the web socket itself and not on the subscription, if that is the case.
Or, if we want to keep the lock, do we need channels at all?
> Why didn't making the channel static fix the problem?
I think the reason is that each subscription instance starts its own `ProcessMessages` loop:
nethermind/src/Nethermind/Nethermind.JsonRpc/Modules/Subscribe/Subscription.cs
Lines 15 to 20 in fe2b14e
protected Subscription(IJsonRpcDuplexClient jsonRpcDuplexClient)
{
Id = string.Concat("0x", Guid.NewGuid().ToString("N"));
JsonRpcDuplexClient = jsonRpcDuplexClient;
ProcessMessages();
}
This means that we ensure no overlapping processing per subscription, but there are no guarantees of synchronization across them.
A possible solution would be to put the lock in the `Subscription` as static, ensuring that only one subscription at a time is doing work. I don't like this approach because a subscription might not need to be synchronized (because it does not use any shared resources like WebSockets).
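To illustrate the difference between one loop per subscription and one loop per connection, here is a minimal sketch of a queue that is drained by a single consumer. It is not Nethermind code: `SendQueue`, `Enqueue`, `Complete`, and `Completion` are hypothetical names, and the sketch assumes that every producer sharing a connection writes to the same instance.

```csharp
using System;
using System.Threading.Channels;
using System.Threading.Tasks;

// Hypothetical helper (not part of Nethermind): one queue and one consumer
// loop per connection, instead of one loop per subscription.
public sealed class SendQueue
{
    private readonly Channel<Func<Task>> _channel =
        Channel.CreateUnbounded<Func<Task>>(new UnboundedChannelOptions { SingleReader = true });

    // Completes once the queue has been closed and fully drained.
    public Task Completion { get; }

    public SendQueue() => Completion = ProcessAsync();

    // Queues a send operation; returns false once the queue has been completed.
    public bool Enqueue(Func<Task> send) => _channel.Writer.TryWrite(send);

    public void Complete() => _channel.Writer.Complete();

    private async Task ProcessAsync()
    {
        // Single consumer: each send is awaited before the next one starts,
        // so two sends queued here can never overlap.
        await foreach (Func<Task> send in _channel.Reader.ReadAllAsync())
        {
            try
            {
                await send();
            }
            catch (Exception)
            {
                // Sketch only: a real implementation would log the failure.
            }
        }
    }
}
```

With this shape, ordering and mutual exclusion both come from the single reader, but only for senders that go through the same queue; anything writing to the socket directly would still need separate synchronization.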
> Or if we want to keep the lock we don't need channels at all?
Channels were introduced in #3458 to ensure ordering of messages. I believe that we could remove them and it should not affect the behavior due to locking, but I didn't want to introduce such a change in this PR.
Need to double check, but I don't think that when two threads are waiting for a lock, there is a guarantee that they are released in order.
Also, you can have communication on the WebSocket that isn't subscriptions but normal request-response. I think we could scale down to a single channel shared by all subscriptions though, which would make it cheaper.
[TestCase(5)]
[TestCase(10)]
[Explicit("Requires a WS server running")]
public async Task NewPendingTransactionSubscription_multiple_fast_messages(int messages)
Can you add a test with competing subscriptions (same/different channel)?
Added the test and it fails: since each subscription handles its own queue of tasks, we end up with the same problem.
await using CounterStream resultData = new CounterStream(buffered);

if (result.IsCollection)
await _sendSemaphore.WaitAsync();
The final solution requires the use of locks: through a semaphore, we ensure that only one message at a time is being sent using the underlying `WebSocket`.
We preserve the streaming approach, meaning that we can send large objects without failing due to OOM.
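The idea can be sketched as follows. This is a simplified illustration rather than the code in this PR: `SerializedSender` and the `sendFrame` delegate are hypothetical stand-ins for the socket wrapper and the underlying frame write, and only the `_sendSemaphore` name comes from the diff above.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical wrapper (not Nethermind's class): serializes whole messages
// while still writing each message chunk by chunk.
public sealed class SerializedSender : IDisposable
{
    // One permit: at most one message may be in flight at a time.
    private readonly SemaphoreSlim _sendSemaphore = new(1, 1);

    // Assumed stand-in for the socket write: (chunk, endOfMessage) => Task.
    private readonly Func<ReadOnlyMemory<byte>, bool, Task> _sendFrame;

    public SerializedSender(Func<ReadOnlyMemory<byte>, bool, Task> sendFrame) =>
        _sendFrame = sendFrame;

    public async Task SendAsync(IReadOnlyList<ReadOnlyMemory<byte>> chunks)
    {
        await _sendSemaphore.WaitAsync();
        try
        {
            // The message is streamed as several frames; the semaphore is
            // held until the last one, so other messages cannot interleave.
            for (int i = 0; i < chunks.Count; i++)
            {
                bool endOfMessage = i == chunks.Count - 1;
                await _sendFrame(chunks[i], endOfMessage);
            }
        }
        finally
        {
            // Always release, even if a frame write throws.
            _sendSemaphore.Release();
        }
    }

    public void Dispose() => _sendSemaphore.Dispose();
}
```

Holding the semaphore for the whole message, rather than per frame, is what prevents interleaving. Note that `SemaphoreSlim` does not guarantee the order in which waiting callers are admitted, so this protects message integrity but not ordering between independent senders.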
5ed6e09 to a9f7700
If you are up for it, refactor the whole thing by converting/adapting the underlying event into an `IAsyncEnumerable<something>`, and eventually into a JSON result. Potentially easier to understand.
* Test for #6169
* Use async on Subscription message processing
* Fix `ScheduleAction` usages
  - Use async everywhere
* Parametrize test
* Reduce number of messages
* Add test for multiple concurrent subscriptions
* Add locks to avoid concurrency issues
  - Locks :(
* Dispose semaphore
* Targeted new
* Reduce delays
Fixes #6169
Changes
`Task` instead of `Action` when dealing with subscription messages.
Types of changes
What types of changes does your code introduce?
Testing
Requires testing
If yes, did you write tests?
Notes on testing
An integration test was added to reproduce the reported issue. The test is annotated with `[Explicit]` since it requires an external WebSockets server running. We could make the test self-contained by running a local server, but this has produced issues in the past on CI (#6102).
Documentation
Requires documentation update
Requires explanation in Release Notes