Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handle concurrent WebSockets messages #6210

Merged
merged 10 commits into from
Oct 25, 2023
Original file line number Diff line number Diff line change
Expand Up @@ -60,18 +60,13 @@ public class NewPendingUserOpsSubscription : Subscription

private void OnNewPending(object? sender, UserOperationEventArgs e)
{
ScheduleAction(() =>
ScheduleAction(async () =>
{
JsonRpcResult result;
if (_includeUserOperations)
{
result = CreateSubscriptionMessage(new { UserOperation = new UserOperationRpc(e.UserOperation), e.EntryPoint });
}
else
{
result = CreateSubscriptionMessage(new { UserOperation = e.UserOperation.RequestId, e.EntryPoint });
}
JsonRpcDuplexClient.SendJsonRpcResult(result);
result = _includeUserOperations
? CreateSubscriptionMessage(new { UserOperation = new UserOperationRpc(e.UserOperation), e.EntryPoint })
: CreateSubscriptionMessage(new { UserOperation = e.UserOperation.RequestId, e.EntryPoint });
await JsonRpcDuplexClient.SendJsonRpcResult(result);
if (_logger.IsTrace) _logger.Trace($"newPendingUserOperations subscription {Id} printed hash of newPendingUserOperations.");
});
}
Expand All @@ -89,5 +84,3 @@ public override void Dispose()
}
}
}


Original file line number Diff line number Diff line change
Expand Up @@ -60,18 +60,13 @@ public class NewReceivedUserOpsSubscription : Subscription

private void OnNewReceived(object? sender, UserOperationEventArgs e)
{
ScheduleAction(() =>
ScheduleAction(async () =>
{
JsonRpcResult result;
if (_includeUserOperations)
{
result = CreateSubscriptionMessage(new { UserOperation = new UserOperationRpc(e.UserOperation), e.EntryPoint });
}
else
{
result = CreateSubscriptionMessage(new { UserOperation = e.UserOperation.RequestId, e.EntryPoint });
}
JsonRpcDuplexClient.SendJsonRpcResult(result);
result = _includeUserOperations
? CreateSubscriptionMessage(new { UserOperation = new UserOperationRpc(e.UserOperation), e.EntryPoint })
: CreateSubscriptionMessage(new { UserOperation = e.UserOperation.RequestId, e.EntryPoint });
await JsonRpcDuplexClient.SendJsonRpcResult(result);
if (_logger.IsTrace) _logger.Trace($"newReceivedUserOperations subscription {Id} printed hash of newReceivedUserOperations.");
});
}
Expand All @@ -89,5 +84,3 @@ public override void Dispose()
}
}
}


Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,11 @@
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Net.WebSockets;
using System.Threading;
using System.Threading.Tasks;
using FluentAssertions;
using Nethermind.Blockchain;
using Nethermind.Blockchain.Blocks;
using Nethermind.Blockchain.Filters;
using Nethermind.Blockchain.Find;
using Nethermind.Blockchain.Receipts;
Expand All @@ -18,17 +18,16 @@
using Nethermind.Core.Collections;
using Nethermind.Core.Specs;
using Nethermind.Core.Test.Builders;
using Nethermind.Db;
using Nethermind.Db.Blooms;
using Nethermind.Facade.Eth;
using Nethermind.Int256;
using Nethermind.JsonRpc.Modules;
using Nethermind.JsonRpc.Modules.Eth;
using Nethermind.JsonRpc.Modules.Subscribe;
using Nethermind.JsonRpc.WebSockets;
using Nethermind.Logging;
using Nethermind.Serialization.Json;
using Nethermind.Sockets;
using Nethermind.Specs;
using Nethermind.State.Repositories;
using Nethermind.Synchronization.ParallelSync;
using Nethermind.TxPool;
using Newtonsoft.Json;
Expand Down Expand Up @@ -805,6 +804,101 @@ public void NewPendingTransactionsSubscription_on_NewPending_with_includeTransac
expectedResult.Should().Be(serialized);
}

[TestCase(2)]
[TestCase(5)]
[TestCase(10)]
[Explicit("Requires a WS server running")]
public async Task NewPendingTransactionSubscription_multiple_fast_messages(int messages)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you make test with competing subscriptions (same/different channel)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added test and it fails: since each subscription handles it's own queue of tasks we end up with the same problem.

{
ITxPool txPool = Substitute.For<ITxPool>();

using ClientWebSocket socket = new();
await socket.ConnectAsync(new Uri("ws://localhost:1337/"), CancellationToken.None);

using ISocketHandler handler = new WebSocketHandler(socket, NullLogManager.Instance);
using JsonRpcSocketsClient client = new(
clientName: "TestClient",
handler: handler,
endpointType: RpcEndpoint.Ws,
jsonRpcProcessor: null!,
jsonRpcService: null!,
jsonRpcLocalStats: new NullJsonRpcLocalStats(),
jsonSerializer: new EthereumJsonSerializer()
);

using NewPendingTransactionsSubscription subscription = new(
jsonRpcDuplexClient: client,
txPool: txPool,
logManager: LimboLogs.Instance);

for (int i = 0; i < messages; i++)
{
Transaction tx = new();
txPool.NewPending += Raise.EventWith(new TxEventArgs(tx));
}

// Wait until all messages are sent
await Task.Delay(10_000);
}

[TestCase(2)]
[TestCase(5)]
[TestCase(10)]
[Explicit("Requires a WS server running")]
public async Task MultipleSubscriptions_concurrent_fast_messages(int messages)
{
using ClientWebSocket socket = new();
await socket.ConnectAsync(new Uri("ws://localhost:1337/"), CancellationToken.None);

using ISocketHandler handler = new WebSocketHandler(socket, NullLogManager.Instance);
using JsonRpcSocketsClient client = new(
clientName: "TestClient",
handler: handler,
endpointType: RpcEndpoint.Ws,
jsonRpcProcessor: null!,
jsonRpcService: null!,
jsonRpcLocalStats: new NullJsonRpcLocalStats(),
jsonSerializer: new EthereumJsonSerializer()
);

Task subA = Task.Run(() =>
{
ITxPool txPool = Substitute.For<ITxPool>();
using NewPendingTransactionsSubscription subscription = new(
// ReSharper disable once AccessToDisposedClosure
jsonRpcDuplexClient: client,
txPool: txPool,
logManager: LimboLogs.Instance);

for (int i = 0; i < messages; i++)
{
Transaction tx = new();
txPool.NewPending += Raise.EventWith(new TxEventArgs(tx));
}
});
Task subB = Task.Run(() =>
{
IBlockTree blockTree = Substitute.For<IBlockTree>();
using NewHeadSubscription subscription = new(
// ReSharper disable once AccessToDisposedClosure
jsonRpcDuplexClient: client,
blockTree: blockTree,
specProvider: new TestSpecProvider(new ReleaseSpec()),
logManager: LimboLogs.Instance);

for (int i = 0; i < messages; i++)
{
BlockReplacementEventArgs eventArgs = new(Build.A.Block.TestObject);
blockTree.BlockAddedToMain += Raise.EventWith(eventArgs);
}
});

await Task.WhenAll(subA, subB);

// Wait until all messages are sent
await Task.Delay(10_000);
}

[Test]
public void NewHeadSubscription_with_baseFeePerGas_test()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,10 @@ public class DroppedPendingTransactionsSubscription : Subscription

private void OnEvicted(object? sender, TxEventArgs e)
{
ScheduleAction(() =>
ScheduleAction(async () =>
{
JsonRpcResult result = CreateSubscriptionMessage(e.Transaction.Hash);
JsonRpcDuplexClient.SendJsonRpcResult(result);
await JsonRpcDuplexClient.SendJsonRpcResult(result);
if (_logger.IsTrace)
_logger.Trace(
$"DroppedPendingTransactions subscription {Id} printed hash of DroppedPendingTransaction.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using Nethermind.Blockchain;
using Nethermind.Blockchain.Filters;
using Nethermind.Blockchain.Find;
Expand Down Expand Up @@ -62,10 +63,10 @@ private void OnReceiptsInserted(object? sender, ReceiptsEventArgs e)

private void TryPublishReceiptsInBackground(BlockHeader blockHeader, Func<TxReceipt[]> getReceipts, string eventName, bool removed)
{
ScheduleAction(() => TryPublishEvent(blockHeader, getReceipts(), eventName, removed));
ScheduleAction(async () => await TryPublishEvent(blockHeader, getReceipts(), eventName, removed));
}

private void TryPublishEvent(BlockHeader blockHeader, TxReceipt[] receipts, string eventName, bool removed)
private async Task TryPublishEvent(BlockHeader blockHeader, TxReceipt[] receipts, string eventName, bool removed)
{
BlockHeader fromBlock = _blockTree.FindHeader(_filter.FromBlock);
BlockHeader toBlock = _blockTree.FindHeader(_filter.ToBlock, true);
Expand All @@ -80,7 +81,7 @@ private void TryPublishEvent(BlockHeader blockHeader, TxReceipt[] receipts, stri
foreach (var filterLog in filterLogs)
{
JsonRpcResult result = CreateSubscriptionMessage(filterLog);
JsonRpcDuplexClient.SendJsonRpcResult(result);
await JsonRpcDuplexClient.SendJsonRpcResult(result);
if (_logger.IsTrace) _logger.Trace($"Logs subscription {Id} printed new log.");
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,11 @@ public class NewHeadSubscription : Subscription

private void OnBlockAddedToMain(object? sender, BlockReplacementEventArgs e)
{
ScheduleAction(() =>
ScheduleAction(async () =>
{
JsonRpcResult result = CreateSubscriptionMessage(new BlockForRpc(e.Block, _includeTransactions, _specProvider));

JsonRpcDuplexClient.SendJsonRpcResult(result);
await JsonRpcDuplexClient.SendJsonRpcResult(result);
if (_logger.IsTrace) _logger.Trace($"NewHeads subscription {Id} printed new block");
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
// SPDX-License-Identifier: LGPL-3.0-only

using System;
using System.Threading.Tasks;
using Nethermind.JsonRpc.Data;
using Nethermind.JsonRpc.Modules.Eth;
using Nethermind.Logging;
Expand Down Expand Up @@ -31,10 +32,10 @@ public class NewPendingTransactionsSubscription : Subscription

private void OnNewPending(object? sender, TxEventArgs e)
{
ScheduleAction(() =>
ScheduleAction(async () =>
{
JsonRpcResult result = CreateSubscriptionMessage(_includeTransactions ? new TransactionForRpc(e.Transaction) : e.Transaction.Hash);
JsonRpcDuplexClient.SendJsonRpcResult(result);
await JsonRpcDuplexClient.SendJsonRpcResult(result);
if (_logger.IsTrace) _logger.Trace($"NewPendingTransactions subscription {Id} printed hash of NewPendingTransaction.");
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ protected Subscription(IJsonRpcDuplexClient jsonRpcDuplexClient)
public string Id { get; }
public abstract string Type { get; }
public IJsonRpcDuplexClient JsonRpcDuplexClient { get; }
private Channel<Action> SendChannel { get; } = Channel.CreateUnbounded<Action>(new UnboundedChannelOptions() { SingleReader = true });
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would making the channel static - shared between subscriptions - fix the problem?
I think regardless we could/should make it static, there is no point in each subscription having own channel and it may cause problems?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would making the channel static - shared between subscriptions - fix the problem?

Unfortunately, this also does not work. We still have the same original concurrency issues (web socket gets in a bad state).

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why making the channel static didn't fix the problem? Then it is shared by all subscriptions and should have make all messages sequential?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe channel should be on web socket itself and not on subscription if that is the case.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Or if we want to keep the lock we don't need channels at all?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why making the channel static didn't fix the problem?

I think the reason is that each subscription instance starts its own ProcessMessage loop

protected Subscription(IJsonRpcDuplexClient jsonRpcDuplexClient)
{
Id = string.Concat("0x", Guid.NewGuid().ToString("N"));
JsonRpcDuplexClient = jsonRpcDuplexClient;
ProcessMessages();
}

This means that we ensure no overlapping processing per subscription, but there are no guarantees of synchronization across them.

A possible solution would be to put the lock in the Subscription as static, ensuring that only one subscription at a time is doing work. I don't like this approach because a subscription might not need to be synchronized (because it does not use any shared resources like WebSockets).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Or if we want to keep the lock we don't need channels at all?

Channels were introduced in #3458 to ensure ordering of messages. I believe that we could remove them and it should not affect the behavior due to locking, but I didn't want to introduce such change in this PR.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Need to double check, but I don't think that when two thread is waiting for a lock, there is a guarentee that they are released in order.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also you can have communication on websocket that isn't subscriptions but normal request-response. I think we could downscale to having 1 channel per all subscriptions though, which would make it cheaper.

private Channel<Func<Task>> SendChannel { get; } = Channel.CreateUnbounded<Func<Task>>(new UnboundedChannelOptions { SingleReader = true });

public virtual void Dispose()
{
Expand All @@ -42,7 +42,7 @@ protected JsonRpcResult CreateSubscriptionMessage(object result)
}, default);
}

protected void ScheduleAction(Action action)
protected void ScheduleAction(Func<Task> action)
{
SendChannel.Writer.TryWrite(action);
}
Expand All @@ -55,11 +55,11 @@ private void ProcessMessages()
{
while (await SendChannel.Reader.WaitToReadAsync())
{
while (SendChannel.Reader.TryRead(out Action action))
while (SendChannel.Reader.TryRead(out Func<Task> action))
{
try
{
action();
await action();
}
catch (Exception e)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ private class SubscriptionSyncingResult

private void OnConditionsChange(object? sender, BlockEventArgs e)
{
ScheduleAction(() =>
ScheduleAction(async () =>
{
SyncingResult syncingResult = _ethSyncingInfo.GetFullInfo();
bool isSyncing = syncingResult.IsSyncing;
Expand Down Expand Up @@ -78,7 +78,7 @@ private void OnConditionsChange(object? sender, BlockEventArgs e)
}


JsonRpcDuplexClient.SendJsonRpcResult(result);
await JsonRpcDuplexClient.SendJsonRpcResult(result);
_logger.Trace($"Syncing subscription {Id} printed SyncingResult object.");
});
}
Expand Down
Loading
Loading