Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make sure that reactors' implementation of Receive() method are non-blocking #2685

Open
cason opened this issue Mar 28, 2024 · 1 comment
Open
Assignees
Labels
enhancement New feature or request p2p

Comments

@cason
Copy link
Contributor

cason commented Mar 28, 2024

Motivated by #2533.

The Reactor interface defines methods that a component must implement in order in order to use the communication services provided by the p2p layer. A core method that a reactor has to be implement is Receive(), invoked by the p2p layer to deliver a message to the reactor. The implementation of this method should be non-blocking (see details here).

If the implementation of Receive() in some reactor blocks when processing a message received from a given peer, the receive routine (recvRoutine()) of the p2p connection with that peer also blocks until the Receive() call returns:

// NOTE: This means the reactor.Receive runs in the same thread as the p2p recv routine
c.onReceive(channelID, msgBytes)

This scenario has some consequences:

  1. Messages received from the same peer are not handled by the p2p connection. This means that other reactors are affected, since they will not receive new messages until the blocking Receive() call returns;
  2. As described in feat: pause peer timeouts during longer block execution #2533, ping/pong messages, that play the role of protocol-level keepalive messages, are not handled either. As a result, the p2p connection pong timeout expires, then the connection produces an error and quits, which leads the switch to disconnect from that peer.

Possible solutions

This situation could be avoided if the Receive() call was not executed by the receive routine of the p2p connection. This change, however, is far from trivial, given the design of the p2p multiples connections. Moreover, even if the Receive() call is executed in a different routine, this other routine will eventually block if the Receive() method of a reactor blocks for a long period of time.

The other approach, suggested in this issue, is to review the implementation of each reactor to make sure that the Receive() method is not blocking. This approach is also not trivial, as ultimately the way to prevent this is to buffer received messages that cannot be immediately processed. Since buffers are always finite in size, this would eventually lead to dropping messages.

@evan-forbes
Copy link

evan-forbes commented Jun 3, 2024

one simple solution that we played around with was

https://github.com/celestiaorg/celestia-core/blob/146746b114624029432720cfa74e0a4a55a87b9b/p2p/base_reactor.go#L98-L105

the envelop buffers have to be quite large in order not to block for the consensus reactor, at least when blocks are large. As one might expect, most of messages that end up blocking are small messages, not larger messages such as block parts. The most common blocking channels are consensus state and votes, then block parts, at least from some recent experiments.

We're collecting some tcp packet traces soon, but when traced on a local machine we can see that the reason we're not able to utilize all of the bandwidth is simply because we are unable to empty the tcp buffers fast enough. This also explains why other optimizations (like not using a mempool) or changes to prioritization don't make any meaningful change to throughput, at least for large blocks.

One reason, but not the only reason, we block is because all incoming messages from all peers are effectively processed synchronously in the consensus reactor. This also explains the age old issue when connecting more peers reduces throughput. While not frequent, this can result in processing a vote, block part, or state message taking up to 700ms (!). Below is the graph of a particularly egregious example where we see max waits of 2.5seconds. (time taken to process a msg after receiving it in ms on y axis, channel on the x axis, max, average w/ stdev bars, and then the number of msgs for that channel that took over 100ms)

slow processing

Another is because we are not buffering the tcp connection properly. For example when we increase this constant, we see a meaningful but modest increase in throughput. I'm still working through the like 5 io.Reader/io.Writer buffered and unbuffered wrappers around the tcp connections. There are so many io.Reader/io.Writer wrapper around the tcp connections, its difficult to grok which need buffers and which actually degrade performance when we increase them.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
enhancement New feature or request p2p
Projects
Status: Todo
Development

No branches or pull requests

2 participants