Add new block component to propagate new block messages #2058
Force-pushed from 3c734f6 to c788319.
Here's an initial pass at part 1 of this issue; any feedback on the general implementation would be great!

> Once block is fully processed - send NewBlockHashes to all peers it didn't notify earlier

In terms of part 2, to assert when a block is considered "fully processed", is this as simple as listening for a StatelessBlockImportDone event?
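For context, "listening" from the component would look roughly like this sketch (Lahja-style stream subscription; the handler name is hypothetical, and as the discussion below shows, the routing turned out to be more subtle than this):

```python
# Sketch only: naive subscription to the import-done event from the
# component's own process, using the event bus handle it already holds.
async def run(self) -> None:
    async for event in self._event_bus.stream(StatelessBlockImportDone):
        # React to the freshly imported block, e.g. announce it to peers.
        await self._handle_imported_block(event)  # hypothetical helper
```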
@property
def is_enabled(self) -> bool:
    # should there be a cli flag that disables this?
    return True
Should there be a CLI flag to disable this feature? Maybe for certain sync modes? Or should it always be enabled?
Hm, can't think of a reason to disable it right now. Maybe just leave it always on for now.
# Add peer to tracker if we've seen this block before
if header.mining_hash in self._peer_block_tracker:
    # what if the same peer broadcasts duplicate new blocks?
    if sender_peer_str not in self._peer_block_tracker[header.mining_hash]:
Is this check necessary? How likely is it that the same peer would send us a NewBlock message for the same block twice?
One lens to use is: could a malicious peer send us the same block over and over again to DoS us? It's not obvious that it would be terrible for this list to get 1000s of elements long with duplicates, so maybe it's not a problem. The presence check is also pretty cheap, though, since the total number of connected peers is also small. So I guess I would keep it.
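If the dedup check is kept, one cheap variant (purely a sketch, reusing the names from the hunk above) is to store a set of peer strings per block hash, so the membership test stays O(1) and a repeated announcement is a no-op:

```python
from typing import Dict, Set

# Sketch: same tracker as above, but with a set of peer strings per block hash.
_peer_block_tracker: Dict[bytes, Set[str]] = {}


def record_sender(block_hash: bytes, sender_peer_str: str) -> None:
    # setdefault creates the entry on first sight; set.add() deduplicates,
    # so a peer re-announcing the same block cannot grow the structure.
    _peer_block_tracker.setdefault(block_hash, set()).add(sender_peer_str)
```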
        header.difficulty
    )
except ValidationError:
    pass
It's probably worth logging a message that the sender has sent us an invalid block.
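Something along these lines, assuming the validation shown in the hunk above and a logger attribute on the service (the exact validation call is elided):

```python
try:
    ...  # PoW / difficulty validation as in the hunk above
except ValidationError:
    self.logger.debug(
        "Ignoring invalid NewBlock %s from peer %s",
        header.mining_hash.hex(),
        sender_peer_str,
    )
    return
```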
Force-pushed from 6aa7b76 to 153d96f.
trinity/sync/beam/importer.py (outdated)

@@ -491,7 +491,7 @@ def _broadcast_import_complete(
             future.result() if completed else None,
             future.exception() if completed else None,
         ),
-        broadcast_config,
+        # broadcast_config,
@cburgdorf Could you help clear up my misunderstanding of what's going wrong here? In order for the NewBlockComponent to "hear" a StatelessBlockImportDone event, I removed this broadcast_config, which afaik means that rather than just broadcasting to the networking process, it broadcasts to the entire event bus. This seems not ideal. So I tried a couple of different ways to subscribe to the StatelessBlockImportDone event, but was unable to make that connection. Could you point me in the right direction / help me understand how to implement the proper communication between the importer and the NewBlockComponent? I can also jump on a call if it's easier to describe in person.
> which afaik means that rather than just broadcasting to the networking process, it broadcasts to the entire event bus.

That is correct, but sending it only to the networking process wouldn't help you here anyway, because you want to listen for it in your new component process.

But there's a second important aspect to it if we look at the BroadcastConfig in detail:
trinity/trinity/sync/beam/importer.py, line 592 in a487b20:

    event.broadcast_config(),
It is generated from an incoming event. That in turn means that it will only be sent as a reply to the specific code that originated the event.
trinity/trinity/sync/beam/chain.py, line 766 in a487b20:

    import_done = await self._event_bus.request(DoStatelessBlockImport(block))
So even if multiple places within the networking process were listening for it, only the one that sent the DoStatelessBlockImport would receive this event as a reply.
I think removing the broadcast_config entirely is actually the right thing to do here, and it's not as bad as you may think. The event bus has a mechanism to pass subscriptions across different endpoints, which in practice means it won't actually send the event to an endpoint that does not have a single subscription to the event.
Now, there is one important thing that I'm not 100% sure of, and you should double-check it. If you do remove the broadcast_config, will a valid answer still end up as a reply at
trinity/trinity/sync/beam/chain.py, line 766 in a487b20:

    import_done = await self._event_bus.request(DoStatelessBlockImport(block))
I think it is but I don't remember exactly and it seems the functionality isn't covered by Lahja tests. If it turns out that this would break the request / response, then I think just broadcasting your own event would be a reasonable workaround.
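To make the routing concrete, here is a rough sketch of the request / response pattern being discussed (Lahja-style APIs; the do_the_import helper, module path, and constructor arguments are assumptions, not the exact Trinity code):

```python
# Assumed module path for the event classes discussed above.
from trinity.sync.common.events import DoStatelessBlockImport, StatelessBlockImportDone


async def serve_import_requests(event_bus):
    async for event in event_bus.stream(DoStatelessBlockImport):
        block, completed, result, error = do_the_import(event.block)  # hypothetical helper
        # With event.broadcast_config(), the reply is delivered only to the
        # caller that issued `await event_bus.request(DoStatelessBlockImport(block))`.
        await event_bus.broadcast(
            StatelessBlockImportDone(block, completed, result, error),
            event.broadcast_config(),
        )
        # Dropping the second argument turns this into a plain broadcast that
        # any endpoint subscribed to StatelessBlockImportDone can receive.
```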
> I think it is but I don't remember exactly and it seems the functionality isn't covered by Lahja tests. If it turns out that this would break the request / response, then I think just broadcasting your own event would be a reasonable workaround.

Thanks! This made a lot of sense. Using StatelessBlockImportDone did seem to break the request / response: when using StatelessBlockImportDone to trigger the broadcast, beam sync would pause (aka not sync the next block) until the next pivot occurred, indicating to me that import_done was never resolved. I went ahead and introduced a new event, BroadcastImportedBlock, instead.
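For reference, that workaround looks roughly like this (a sketch; the event's field and the helper name are assumptions): the importer fires a standalone broadcast after the import completes, and the component streams it, leaving the existing DoStatelessBlockImport request / response untouched.

```python
from dataclasses import dataclass

from eth.abc import BlockAPI
from lahja import BaseEvent


@dataclass
class BroadcastImportedBlock(BaseEvent):
    block: BlockAPI  # assumed payload


# Importer side, after a successful import (fire-and-forget):
#     self._event_bus.broadcast_nowait(BroadcastImportedBlock(block))

# NewBlockComponent side, independent of the request / response pair:
async def run(self) -> None:
    async for event in self._event_bus.stream(BroadcastImportedBlock):
        await self._notify_peers(event.block)  # hypothetical helper
```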
Force-pushed from dbe6e20 to 08798ba.
Seems pretty close; GTG after a couple of notes.
self._event_bus = event_bus
self._peer_pool = peer_pool
# tracks which peers have seen a block
self._peer_block_tracker: Dict[bytes, List[str]] = {}
Looks like this is going to grow forever, so it at least needs a comment & issue noting that this should be resolved. At some point, old enough headers would have to be removed from the object, to avoid unbounded memory growth.
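One possible shape for that follow-up, as a sketch only (the cap and the eviction policy are assumptions): keep insertion order and evict the oldest block hashes once a bound is exceeded.

```python
from collections import OrderedDict
from typing import List

MAX_TRACKED_BLOCKS = 512  # assumed bound

# Insertion-ordered, so the oldest tracked block hash is evicted first.
_peer_block_tracker: "OrderedDict[bytes, List[str]]" = OrderedDict()


def track(block_hash: bytes, peer_str: str) -> None:
    _peer_block_tracker.setdefault(block_hash, []).append(peer_str)
    while len(_peer_block_tracker) > MAX_TRACKED_BLOCKS:
        # FIFO eviction of the oldest entry to keep memory bounded.
        _peer_block_tracker.popitem(last=False)
```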
for peer in eligible_peers:
    target_peer = await self._peer_pool.ensure_proxy_peer(peer.session)
    target_peer.eth_api.send_new_block_hashes((new_block_hash,))
    self._peer_block_tracker[block.hash].append(str(target_peer))
I'm not sure how being in Trio changes things, but I would worry about something like this holding onto the event loop for too long. It depends on the implementation of ensure_proxy_peer. If that's not guaranteed to release the loop, then you might want to add trio's version of await asyncio.sleep(0).
> If that's not guaranteed to release the loop...

It's my (limited) understanding that this is not much of a concern in trio, since every await is a checkpoint (which checks to see if it's time to switch to another task)... but maybe @cburgdorf can help confirm/deny this?
I honestly can't tell. @gsalgado is the one who understands all the nitty-gritty details of asyncio and trio.
According to https://trio.readthedocs.io/en/stable/reference-core.html#checkpoints, trio currently always switches to a new task at every checkpoint (e.g. an await something(...) call), so right now there's no risk of us running all iterations of this for-loop without releasing the event loop, regardless of how ensure_proxy_peer() is implemented. If ensure_proxy_peer() were actually a sync method (albeit having an async signature) and trio's scheduler behavior changed, then we could end up running all iterations of this for-loop without releasing the event loop, which I believe is what @carver was worried about? I'd say we just need to make sure ensure_proxy_peer() is not a sync method (i.e. it has one or more trio checkpoints, which is something any method with an async signature should have anyway).
Yup, I see from the linked docs that trio's goal is to always release the event loop in every trio-level async function. This is different from asyncio. For example, getting an item from a queue will not release the loop if there is an item ready. So it requires less investigation to make sure ensure_proxy_peer is okay here. As long as every code path inside the method eventually calls into some base-level trio async function, then we should be good.
Thanks for the explanation @gsalgado! Just to help clear my understanding: while ensure_proxy_peer does have the async signature and contains a trio checkpoint, that checkpoint is only touched if the inclusion check fails...
trinity/trinity/protocol/common/peer_pool_event_bus.py, lines 347 to 359 in 65bf6ac:

    async def ensure_proxy_peer(self, session: SessionAPI) -> TProxyPeer:
        if session not in self.connected_peers:
            proxy_peer = self.convert_session_to_proxy_peer(
                session,
                self.event_bus,
                self.broadcast_config
            )
            self.connected_peers[session] = proxy_peer
            self.manager.run_child_service(proxy_peer)
            await proxy_peer.manager.wait_started()
        return self.connected_peers[session]
> As long as every code path inside the method eventually calls into some base-level trio async function...

So, we could have the case where we iterate many times without releasing the event loop, but it seems like in this case it's not a concern, since the function will return the ProxyPeer immediately if it exists inside connected_peers?
> So, we could have the case where we iterate many times without releasing the event loop, but it seems like in this case it's not a concern since the function will return the ProxyPeer immediately if it exists inside connected_peers?

I think @carver's concern was not with the performance of ensure_proxy_peer() itself, but with that of the other calls performed on every iteration of the loop. Currently, the fact that ensure_proxy_peer() may return immediately (without any trio checkpoints) is not a problem given how the trio scheduler works, but as their docs said, that may change. I was going to suggest adding an else: block to ensure_proxy_peer() with a trio.checkpoint() call, but that could cause the return statement to crash if the peer was removed from self.connected_peers by another coroutine, so maybe the best option is to add the checkpoint at the beginning of the method, with a comment explaining why we do so.
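That suggestion might look roughly like this (sketch only; whether the call is spelled trio.lowlevel.checkpoint() or an older spelling depends on the trio version in use):

```python
import trio

async def ensure_proxy_peer(self, session: SessionAPI) -> TProxyPeer:
    # Always pass through a trio checkpoint, even on the fast path where the
    # proxy peer already exists, so callers looping over many peers cannot
    # monopolize the scheduler if trio's switching behavior ever changes.
    await trio.lowlevel.checkpoint()
    if session not in self.connected_peers:
        ...  # create, register, and start the proxy peer as above
    return self.connected_peers[session]
```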
Or maybe add a checkpoint at the end of every iteration of the for loop where this is called.
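Applied to the loop in question, that alternative would look roughly like this (again a sketch, with the same caveat about the checkpoint spelling):

```python
import trio

for peer in eligible_peers:
    target_peer = await self._peer_pool.ensure_proxy_peer(peer.session)
    target_peer.eth_api.send_new_block_hashes((new_block_hash,))
    self._peer_block_tracker[block.hash].append(str(target_peer))
    # Yield to the scheduler once per iteration, so a long peer list cannot
    # hold the event loop even if the awaits above return on a fast path.
    await trio.lowlevel.checkpoint()
```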
trinity/sync/common/events.py (outdated)

@@ -140,6 +140,17 @@ class StatelessBlockImportDone(BaseEvent):
     exception: BaseException


+@dataclass
+class BroadcastImportedBlock(BaseEvent):
Maybe NewBlockImported would be a more idiomatic name. The current name is tailored around the consuming component, but there's nothing stopping anyone from building other components that use this event for other things.
trinity/sync/common/events.py (outdated)

class BroadcastImportedBlock(BaseEvent):
    """
    Event that is only emitted after a new block has been successfully imported.
    This event communicates to the `NewBlockComponent` that a block is ready to
I would drop the second sentence; this is just one current use case, but the use cases for components that listen for freshly imported blocks could be endless.
Force-pushed from bfef698 to 86305b0.
Force-pushed from 86305b0 to 3e733e6.
What was wrong?

Fixes #1133

Rules sourced from the devp2p spec, for when a NewBlock announcement message is received from a peer:

- ... the NewBlock message
- Import block into local chain ...
- Once block is fully processed - send NewBlockHashes to all peers it didn't notify earlier
- ... is not the immediate successor of the client's current latest block (it doesn't seem to me that this step is within the scope of this pr)
How was it fixed?

Wrote a NewBlockComponent that handles the above tasks. Also introduced a BroadcastImportedBlock event to trigger block broadcasts after they've been imported.

To-Do
Cute Animal Picture