Skip to content

Commit

Permalink
Handle concurrent WebSockets messages (#6210)
Browse files Browse the repository at this point in the history
* Test for #6169

* Use async on Subscription message processing

* Fix `ScheduleAction` usages

- Use async everywhere

* Parametrize test

* Reduce number of messages

* Add test for multiple concurrent subscriptions

* Add locks to avoid concurrency issues

- Locks :(

* Dispose semaphore

* Targeted new

* Reduce delays
  • Loading branch information
emlautarom1 authored Oct 25, 2023
1 parent a94ba7d commit 646ab7a
Show file tree
Hide file tree
Showing 10 changed files with 168 additions and 75 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -60,18 +60,13 @@ public NewPendingUserOpsSubscription(

private void OnNewPending(object? sender, UserOperationEventArgs e)
{
ScheduleAction(() =>
ScheduleAction(async () =>
{
JsonRpcResult result;
if (_includeUserOperations)
{
result = CreateSubscriptionMessage(new { UserOperation = new UserOperationRpc(e.UserOperation), e.EntryPoint });
}
else
{
result = CreateSubscriptionMessage(new { UserOperation = e.UserOperation.RequestId, e.EntryPoint });
}
JsonRpcDuplexClient.SendJsonRpcResult(result);
result = _includeUserOperations
? CreateSubscriptionMessage(new { UserOperation = new UserOperationRpc(e.UserOperation), e.EntryPoint })
: CreateSubscriptionMessage(new { UserOperation = e.UserOperation.RequestId, e.EntryPoint });
await JsonRpcDuplexClient.SendJsonRpcResult(result);
if (_logger.IsTrace) _logger.Trace($"newPendingUserOperations subscription {Id} printed hash of newPendingUserOperations.");
});
}
Expand All @@ -89,5 +84,3 @@ public override void Dispose()
}
}
}


Original file line number Diff line number Diff line change
Expand Up @@ -60,18 +60,13 @@ public NewReceivedUserOpsSubscription(

private void OnNewReceived(object? sender, UserOperationEventArgs e)
{
ScheduleAction(() =>
ScheduleAction(async () =>
{
JsonRpcResult result;
if (_includeUserOperations)
{
result = CreateSubscriptionMessage(new { UserOperation = new UserOperationRpc(e.UserOperation), e.EntryPoint });
}
else
{
result = CreateSubscriptionMessage(new { UserOperation = e.UserOperation.RequestId, e.EntryPoint });
}
JsonRpcDuplexClient.SendJsonRpcResult(result);
result = _includeUserOperations
? CreateSubscriptionMessage(new { UserOperation = new UserOperationRpc(e.UserOperation), e.EntryPoint })
: CreateSubscriptionMessage(new { UserOperation = e.UserOperation.RequestId, e.EntryPoint });
await JsonRpcDuplexClient.SendJsonRpcResult(result);
if (_logger.IsTrace) _logger.Trace($"newReceivedUserOperations subscription {Id} printed hash of newReceivedUserOperations.");
});
}
Expand All @@ -89,5 +84,3 @@ public override void Dispose()
}
}
}


Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,11 @@
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Net.WebSockets;
using System.Threading;
using System.Threading.Tasks;
using FluentAssertions;
using Nethermind.Blockchain;
using Nethermind.Blockchain.Blocks;
using Nethermind.Blockchain.Filters;
using Nethermind.Blockchain.Find;
using Nethermind.Blockchain.Receipts;
Expand All @@ -18,17 +18,16 @@
using Nethermind.Core.Collections;
using Nethermind.Core.Specs;
using Nethermind.Core.Test.Builders;
using Nethermind.Db;
using Nethermind.Db.Blooms;
using Nethermind.Facade.Eth;
using Nethermind.Int256;
using Nethermind.JsonRpc.Modules;
using Nethermind.JsonRpc.Modules.Eth;
using Nethermind.JsonRpc.Modules.Subscribe;
using Nethermind.JsonRpc.WebSockets;
using Nethermind.Logging;
using Nethermind.Serialization.Json;
using Nethermind.Sockets;
using Nethermind.Specs;
using Nethermind.State.Repositories;
using Nethermind.Synchronization.ParallelSync;
using Nethermind.TxPool;
using Newtonsoft.Json;
Expand Down Expand Up @@ -805,6 +804,101 @@ public void NewPendingTransactionsSubscription_on_NewPending_with_includeTransac
expectedResult.Should().Be(serialized);
}

[TestCase(2)]
[TestCase(5)]
[TestCase(10)]
[Explicit("Requires a WS server running")]
public async Task NewPendingTransactionSubscription_multiple_fast_messages(int messages)
{
ITxPool txPool = Substitute.For<ITxPool>();

using ClientWebSocket socket = new();
await socket.ConnectAsync(new Uri("ws://localhost:1337/"), CancellationToken.None);

using ISocketHandler handler = new WebSocketHandler(socket, NullLogManager.Instance);
using JsonRpcSocketsClient client = new(
clientName: "TestClient",
handler: handler,
endpointType: RpcEndpoint.Ws,
jsonRpcProcessor: null!,
jsonRpcService: null!,
jsonRpcLocalStats: new NullJsonRpcLocalStats(),
jsonSerializer: new EthereumJsonSerializer()
);

using NewPendingTransactionsSubscription subscription = new(
jsonRpcDuplexClient: client,
txPool: txPool,
logManager: LimboLogs.Instance);

for (int i = 0; i < messages; i++)
{
Transaction tx = new();
txPool.NewPending += Raise.EventWith(new TxEventArgs(tx));
}

// Wait until all messages are sent
await Task.Delay(1_000);
}

[TestCase(2)]
[TestCase(5)]
[TestCase(10)]
[Explicit("Requires a WS server running")]
public async Task MultipleSubscriptions_concurrent_fast_messages(int messages)
{
using ClientWebSocket socket = new();
await socket.ConnectAsync(new Uri("ws://localhost:1337/"), CancellationToken.None);

using ISocketHandler handler = new WebSocketHandler(socket, NullLogManager.Instance);
using JsonRpcSocketsClient client = new(
clientName: "TestClient",
handler: handler,
endpointType: RpcEndpoint.Ws,
jsonRpcProcessor: null!,
jsonRpcService: null!,
jsonRpcLocalStats: new NullJsonRpcLocalStats(),
jsonSerializer: new EthereumJsonSerializer()
);

Task subA = Task.Run(() =>
{
ITxPool txPool = Substitute.For<ITxPool>();
using NewPendingTransactionsSubscription subscription = new(
// ReSharper disable once AccessToDisposedClosure
jsonRpcDuplexClient: client,
txPool: txPool,
logManager: LimboLogs.Instance);
for (int i = 0; i < messages; i++)
{
Transaction tx = new();
txPool.NewPending += Raise.EventWith(new TxEventArgs(tx));
}
});
Task subB = Task.Run(() =>
{
IBlockTree blockTree = Substitute.For<IBlockTree>();
using NewHeadSubscription subscription = new(
// ReSharper disable once AccessToDisposedClosure
jsonRpcDuplexClient: client,
blockTree: blockTree,
specProvider: new TestSpecProvider(new ReleaseSpec()),
logManager: LimboLogs.Instance);
for (int i = 0; i < messages; i++)
{
BlockReplacementEventArgs eventArgs = new(Build.A.Block.TestObject);
blockTree.BlockAddedToMain += Raise.EventWith(eventArgs);
}
});

await Task.WhenAll(subA, subB);

// Wait until all messages are sent
await Task.Delay(1_000);
}

[Test]
public void NewHeadSubscription_with_baseFeePerGas_test()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,10 @@ public DroppedPendingTransactionsSubscription(

private void OnEvicted(object? sender, TxEventArgs e)
{
ScheduleAction(() =>
ScheduleAction(async () =>
{
JsonRpcResult result = CreateSubscriptionMessage(e.Transaction.Hash);
JsonRpcDuplexClient.SendJsonRpcResult(result);
await JsonRpcDuplexClient.SendJsonRpcResult(result);
if (_logger.IsTrace)
_logger.Trace(
$"DroppedPendingTransactions subscription {Id} printed hash of DroppedPendingTransaction.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using Nethermind.Blockchain;
using Nethermind.Blockchain.Filters;
using Nethermind.Blockchain.Find;
Expand Down Expand Up @@ -62,10 +63,10 @@ private void OnReceiptsInserted(object? sender, ReceiptsEventArgs e)

private void TryPublishReceiptsInBackground(BlockHeader blockHeader, Func<TxReceipt[]> getReceipts, string eventName, bool removed)
{
ScheduleAction(() => TryPublishEvent(blockHeader, getReceipts(), eventName, removed));
ScheduleAction(async () => await TryPublishEvent(blockHeader, getReceipts(), eventName, removed));
}

private void TryPublishEvent(BlockHeader blockHeader, TxReceipt[] receipts, string eventName, bool removed)
private async Task TryPublishEvent(BlockHeader blockHeader, TxReceipt[] receipts, string eventName, bool removed)
{
BlockHeader fromBlock = _blockTree.FindHeader(_filter.FromBlock);
BlockHeader toBlock = _blockTree.FindHeader(_filter.ToBlock, true);
Expand All @@ -80,7 +81,7 @@ private void TryPublishEvent(BlockHeader blockHeader, TxReceipt[] receipts, stri
foreach (var filterLog in filterLogs)
{
JsonRpcResult result = CreateSubscriptionMessage(filterLog);
JsonRpcDuplexClient.SendJsonRpcResult(result);
await JsonRpcDuplexClient.SendJsonRpcResult(result);
if (_logger.IsTrace) _logger.Trace($"Logs subscription {Id} printed new log.");
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,11 @@ public NewHeadSubscription(

private void OnBlockAddedToMain(object? sender, BlockReplacementEventArgs e)
{
ScheduleAction(() =>
ScheduleAction(async () =>
{
JsonRpcResult result = CreateSubscriptionMessage(new BlockForRpc(e.Block, _includeTransactions, _specProvider));
JsonRpcDuplexClient.SendJsonRpcResult(result);
await JsonRpcDuplexClient.SendJsonRpcResult(result);
if (_logger.IsTrace) _logger.Trace($"NewHeads subscription {Id} printed new block");
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
// SPDX-License-Identifier: LGPL-3.0-only

using System;
using System.Threading.Tasks;
using Nethermind.JsonRpc.Data;
using Nethermind.JsonRpc.Modules.Eth;
using Nethermind.Logging;
Expand Down Expand Up @@ -31,10 +32,10 @@ public NewPendingTransactionsSubscription(

private void OnNewPending(object? sender, TxEventArgs e)
{
ScheduleAction(() =>
ScheduleAction(async () =>
{
JsonRpcResult result = CreateSubscriptionMessage(_includeTransactions ? new TransactionForRpc(e.Transaction) : e.Transaction.Hash);
JsonRpcDuplexClient.SendJsonRpcResult(result);
await JsonRpcDuplexClient.SendJsonRpcResult(result);
if (_logger.IsTrace) _logger.Trace($"NewPendingTransactions subscription {Id} printed hash of NewPendingTransaction.");
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ protected Subscription(IJsonRpcDuplexClient jsonRpcDuplexClient)
public string Id { get; }
public abstract string Type { get; }
public IJsonRpcDuplexClient JsonRpcDuplexClient { get; }
private Channel<Action> SendChannel { get; } = Channel.CreateUnbounded<Action>(new UnboundedChannelOptions() { SingleReader = true });
private Channel<Func<Task>> SendChannel { get; } = Channel.CreateUnbounded<Func<Task>>(new UnboundedChannelOptions { SingleReader = true });

public virtual void Dispose()
{
Expand All @@ -42,7 +42,7 @@ protected JsonRpcResult CreateSubscriptionMessage(object result)
}, default);
}

protected void ScheduleAction(Action action)
protected void ScheduleAction(Func<Task> action)
{
SendChannel.Writer.TryWrite(action);
}
Expand All @@ -55,11 +55,11 @@ private void ProcessMessages()
{
while (await SendChannel.Reader.WaitToReadAsync())
{
while (SendChannel.Reader.TryRead(out Action action))
while (SendChannel.Reader.TryRead(out Func<Task> action))
{
try
{
action();
await action();
}
catch (Exception e)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ private class SubscriptionSyncingResult

private void OnConditionsChange(object? sender, BlockEventArgs e)
{
ScheduleAction(() =>
ScheduleAction(async () =>
{
SyncingResult syncingResult = _ethSyncingInfo.GetFullInfo();
bool isSyncing = syncingResult.IsSyncing;
Expand Down Expand Up @@ -78,7 +78,7 @@ private void OnConditionsChange(object? sender, BlockEventArgs e)
}
JsonRpcDuplexClient.SendJsonRpcResult(result);
await JsonRpcDuplexClient.SendJsonRpcResult(result);
_logger.Trace($"Syncing subscription {Id} printed SyncingResult object.");
});
}
Expand Down
Loading

0 comments on commit 646ab7a

Please sign in to comment.