New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Further flesh out round trip request/response API #1089
Further flesh out round trip request/response API #1089
Conversation
@gsalgado wanted to ping you with this early POC. This is the sort of API that I want to move towards for peer interactions. Curious to hear your thoughts. |
p2p/peer.py
Outdated
@@ -694,6 +698,13 @@ def handle_sub_proto_msg(self, cmd: protocol.Command, msg: protocol._DecodedMsgT | |||
if actual_td > self.head_td: | |||
self.head_hash = actual_head | |||
self.head_td = actual_td | |||
elif isinstance(cmd, eth.BlockHeaders): | |||
# try to match with a request | |||
for request, waiter in self.pending_requests.items(): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think we need to support multiple in-flight BlockHeader
requests as we should never send multiple concurrent requests to a single peer, and it'd be better to have an API that enforces that rather than one that supports multiple concurrent requests as I'm pretty sure that will end up being misused.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I like that.
d0879c2
to
9d69f7a
Compare
2646102
to
04f3dd4
Compare
f02aaa1
to
5f6bb64
Compare
3996fcd
to
04c002a
Compare
307d1cb
to
d08a4ff
Compare
d08a4ff
to
b6afa5f
Compare
@gsalgado this is ready for a second pass. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks good! I've a few suggestions but my main concern is the exception with the same name as the one from the eth
package
@@ -313,9 +302,6 @@ class LightChainSyncer(BaseHeaderChainSyncer): | |||
msg: protocol._DecodedMsgType) -> None: | |||
if isinstance(cmd, les.Announce): | |||
self._sync_requests.put_nowait(peer) | |||
elif isinstance(cmd, les.BlockHeaders): | |||
msg = cast(Dict[str, Any], msg) | |||
self._handle_block_headers(tuple(cast(Tuple[BlockHeader, ...], msg['headers']))) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe it's worth leaving a comment here that BlockHeaders
messages are not handled here because they're handled in a Peer method? Otherwise someone reading this code might have a hard time figuring out why those messages are not handled like the others
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actually, we should probably leave the elif
block here otherwise it will be handled in the else
block, which logs a debug msg and may be confusing to someone reading the logs
@@ -538,9 +525,7 @@ def request_receipts(self, target_td: int, headers: List[BlockHeader]) -> int: | |||
async def _handle_msg(self, peer: HeaderRequestingPeer, cmd: protocol.Command, | |||
msg: protocol._DecodedMsgType) -> None: | |||
peer = cast(ETHPeer, peer) | |||
if isinstance(cmd, eth.BlockHeaders): | |||
self._handle_block_headers(tuple(cast(Tuple[BlockHeader, ...], msg))) | |||
elif isinstance(cmd, eth.BlockBodies): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same here
self.skip = skip | ||
self.reverse = reverse | ||
|
||
def validate_response(self, response: Any) -> None: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we need to use Any
here, even though you raise a ValidationError if it's not a tuple?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I believe Any
is what we want. It is allowed to pass anything into this function, but it will raise a validation error if it's not the correct type or it isn't well formed.
That said, I can see an argument for dropping both isinstance
checks here if we think that mypy
is sufficient to catch those cases (Which is the way I'm leaning)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Now that I look at it, I don't think we get much protection from mypy
here, so I'm leaning towards keeping this as Any
and keeping the type checks within the function body. The way the message handling works in the Peer.handle_sub_proto_msg
I don't think mypy
is going to be able to enforce the appropriate types for the validate_response
message.
@@ -159,3 +159,10 @@ class NoInternalAddressMatchesDevice(BaseP2PError): | |||
def __init__(self, *args: Any, device_hostname: str=None, **kwargs: Any) -> None: | |||
super().__init__(*args, **kwargs) | |||
self.device_hostname = device_hostname | |||
|
|||
|
|||
class ValidationError(BaseP2PError): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm a bit worried about this as it's the same name of the exception from the eth
package, and I believe we'll often end up writing code that catches one when in fact is the other that will be raised. Like we just saw with the OperationCancelled
exception after the move to the cancel_token lib
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
cc @carver
how would the two of you feel about a common/shared validation library that exposed a common ValidationError
exception that we could use across all of our libraries. It is one of those things that we have implemented almost everywhere.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
else: | ||
future.set_result(msg) | ||
self.pending_requests.pop(cmd_type) | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We could move the whole block above into BasePeer.handle_sub_proto_msg()
and then avoid having to duplicate it in both ETHPeer
and LESPeer
, no?
p2p/protocol.py
Outdated
|
||
@property | ||
@abstractmethod | ||
def MAX_HEADERS_FETCH(self) -> int: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I find it weird to have an all-uppercase property. Any reason for that?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it's a constant so I mirrored the casing. I'm fine lower casing it.
p2p/protocol.py
Outdated
@property | ||
@abstractmethod | ||
def MAX_HEADERS_FETCH(self) -> int: | ||
pass |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
shouldn't this raise NotImplementedError()? We definitely don't want it to return None, right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The abstractmethod
decorator should take care of this for us, preventing us from instantiating the class if this property has not been overridden.
@@ -13,6 +14,23 @@ | |||
BLOCK_HASH = b'\x01' * 32 | |||
|
|||
|
|||
class HeaderRequest(BaseHeaderRequest): | |||
MAX_HEADERS_FETCH = 192 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I thought you couldn't overwrite an @abstractmethod
by defining a class attribute in a subclass. Am I missing something?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Brief testing shows this works as expected. The parent class with the abstract property disallows instantiation. The child class with the property set, allows instantiation and proper attribute access.
p2p/peer.py
Outdated
try: | ||
request.validate_response(msg) | ||
except ValidationError: | ||
pass |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should log this, probably as a warning even, no?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, though I've been questioning the only one request of a single type in-flight per message type restriction. I'm going to keep it in place, but unless the spec says that it's disallowed, I'd like to support it eventually. Sending concurrent requests to high performing peers seems like a way to speed up sync.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fair enough, but IIRC geth doesn't do that, so it might be worth finding out why before we consider doing it ourselves.
@gsalgado PR review pushed. It includes some minor logic changes to ensure we don't end up with dangling requests in |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍
What was wrong?
Still too much manual work to be done for round trip requests for block headers.
How was it fixed?
p2p.peer.BaseRequest
ETHPeer
which houses a mapping betweenCommand -> Future
.handle_sub_process_msg
, if there is a waiting future for a given command, it will set the future result.ETHPeer.get_block_headers
which does the full request/response round trip.This has a really nice side effect of not needing the chain syncers to handle the
BlockHeaders
response, and instead can justawait peer.get_block_headers(...)
, removing a decent amount of complexity from the class. If we continue this pattern for the rest of the block body parts I think the chain syncer is going to look a lot cleaner and easier to grok.Cute Animal Picture