Establish pattern for generic round trip request response handling #1138
Conversation
Any of @carver or @cburgdorf, can I get a high-level review on this? It's basically ready for a real review, but I'm struggling a bit with how to do the type hinting in a sane way.
Integration test, tail end of stack trace:
@carver thnx, got a fix in for that, still something wrong that I haven't sorted out, but the overall structure of this is still fair game to review.
trinity/sync/full/constants.py
Outdated
@@ -1,3 +1,4 @@
# How old (in seconds) must our local head be to cause us to start with a
# fast-sync before we switch to regular-sync.
FAST_SYNC_CUTOFF = 60 * 60 * 24
FAST_SYNC_CUTOFF = 60
Testing line slipped in?
async def _run(self) -> None:
    for attr in self._managers.keys():
        manager = getattr(self, attr)
        self.run_child_service(manager)
So we're talking about num_peers * num_commands services running, right? Not sure how many commands there will be, but let's say eventually 50. If someone wanted to experiment with 200 peers, we're talking 10,000 different services running. Not sure when the overhead of multiple tasks starts to overwhelm asyncio, but I wouldn't be surprised if it was an issue at 10k.
Formula is a little off, but it doesn't discount your concern entirely. It's num_peers * num_command_pairs. Currently I think the eth protocol has 4-5 command pairs and the LES protocol has maybe 8-10, so only request/response command pairs. That at least pushes the overhead of this down by a decent bit, and for now, I really like how clean it is for this to be self-contained. We can reduce the overhead by moving the subscription from the Manager classes down to the Handler classes and having the handler dispatch to each of the managers, which moves it from O(n) to O(1) per peer.
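The O(n)-to-O(1) dispatch idea can be sketched roughly like this. All names here are invented for illustration; the real Manager/Handler classes carry much more machinery. The point is that a single per-peer handler owns the subscription and routes each inbound command to the matching manager with one dict lookup, instead of every manager inspecting every message.

```python
class Manager:
    """Handles one request/response command pair for a peer."""

    def __init__(self, command_type):
        self.command_type = command_type
        self.received = []

    def handle(self, msg):
        self.received.append(msg)


class Handler:
    """Owns the single subscription and dispatches by command type."""

    def __init__(self, managers):
        # one dict lookup per inbound message: command type -> manager
        self._dispatch = {m.command_type: m for m in managers}

    def handle_msg(self, command_type, msg):
        manager = self._dispatch.get(command_type)
        if manager is not None:
            manager.handle(msg)
```

With this shape, adding a new command pair is one more dict entry rather than one more independently subscribed service.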
trinity/protocol/common/handlers.py
Outdated
manager = getattr(self, attr)
self.run_child_service(manager)

while not self.cancel_token.triggered:
or the other common construction: while self.is_running:
Realizing I can drop the inner loop (and thus some task-switching overhead) by replacing it with await self.cancel_token.wait()
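The replacement described can be sketched with asyncio.Event standing in for the project's CancelToken (the real token has a richer API; this only shows the polling-loop vs. single-await difference):

```python
import asyncio


async def run_until_cancelled(cancel_event: asyncio.Event) -> str:
    # Instead of:  while not cancel_event.is_set(): <do work / poll>
    # a single await parks the task until cancellation fires, with no
    # per-iteration task switching.
    await cancel_event.wait()
    return "stopped"


async def main() -> str:
    ev = asyncio.Event()
    task = asyncio.ensure_future(run_until_cancelled(ev))
    await asyncio.sleep(0)  # let the task start and block on the event
    ev.set()
    return await task
```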
trinity/protocol/common/managers.py
Outdated
# Service API
#
async def _run(self) -> None:
    self.logger.debug("Running %s for peer %s", self.__class__.__name__, self._peer)
Maybe: replace "Running" with "Launching"
trinity/protocol/common/managers.py
Outdated
def _handle_msg(self, msg: ResponseType) -> None:
    if self.pending_request is None:
        return
Maybe worth a debug message that a peer sent an unexpected msg.
p2p/peer.py
Outdated
    timeout=CHAIN_SPLIT_CHECK_TIMEOUT,
)

msgs = [msg_buffer.msg_queue.get_nowait()[1:] for _ in range(msg_buffer.queue_size)]
Took me a bit to parse/remember that this was for capturing inbound requests for later processing. MsgBuffer and this is a lot of stuff that doesn't really have to do with the DAO fork. Maybe push it into BasePeer, which might get reused elsewhere:

with peer.buffer_inbound_commands() as buffer:
    headers = await peer.handler.get_block_headers(...
msgs = buffer.get_messages()
Ooh, I like this.
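The suggested context-manager API might be sketched like this. The method name comes from the comment above; the body is a hypothetical illustration, not the implementation that landed (which is named collect_sub_proto_messages):

```python
from contextlib import contextmanager


class Peer:
    """Toy peer: inbound messages go to any active buffers."""

    def __init__(self):
        self._buffers = []

    def deliver(self, msg):
        # called by the message-handling loop for each inbound command
        for buf in self._buffers:
            buf.append(msg)

    @contextmanager
    def buffer_inbound_commands(self):
        # While the with-block is active, inbound commands accumulate in
        # the buffer; afterwards the subscription is torn down.
        buffer = []
        self._buffers.append(buffer)
        try:
            yield buffer
        finally:
            self._buffers.remove(buffer)
```

Usage mirrors the snippet in the review comment: open the buffer, make the request, then drain whatever arrived in the meantime.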
p2p/peer.py
Outdated
if len(headers) != 2:
    raise DAOForkCheckFailure(
        "Peer failed to return all requested headers for DAO fork check"
    )
Is this removed so we can connect to peers who haven't sync'd that far yet?
No, it's removed because the response will have already been validated to contain exactly 2 headers as part of the core API for request/response validation.
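The kind of validation described can be sketched as follows. The names (validate_header_count, ValidationError) are illustrative, not the project's actual API; the point is that the request/response layer checks the count before callers ever see the response:

```python
class ValidationError(Exception):
    pass


def validate_header_count(requested: int, headers: list) -> list:
    # Reject responses that don't contain exactly the number of headers the
    # request asked for, so call sites can drop their own length checks.
    if len(headers) != requested:
        raise ValidationError(
            "expected %d headers, got %d" % (requested, len(headers)))
    return headers
```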
trinity/protocol/eth/peer.py
Outdated
_handler: ETHRequestResponseHandler = None

@property
def handler(self) -> ETHRequestResponseHandler:
Handler alone is such a generic word: it could mean connection handler, service handler, etc.
Could it just be named for the sub protocol it uses, like:
peer = EthPeer(...)
peer.eth.get_block_headers(...)
and
peer = LESPeer(...)
peer.les.get_proof(...)
etc. Reading it would always be explicit about which subprotocol is being used.
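The naming idea reduces to exposing the request/response handler under a property named for the sub-protocol. A toy sketch (dummy body, invented return value):

```python
class ETHHandler:
    """Stand-in for the request/response handler of the eth sub-protocol."""

    def get_block_headers(self, block_number):
        # dummy body standing in for the real request/response round trip
        return ("header-%d" % block_number,)


class ETHPeer:
    def __init__(self):
        self._eth = ETHHandler()

    @property
    def eth(self):
        # call sites read explicitly: peer.eth.get_block_headers(...)
        return self._eth
```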
@carver This is ready for another pass. Changes should be in the newest commit.
One more change pushed which allows adding a
I was optimistic I could take this apart into smaller pieces but upon trying they are all kind of woven together.
try:
    # Although connect() may seem like a more appropriate place to perform the DAO fork
    # check, we do it here because we want to perform it for incoming peer connections as
    # well.
    msgs = await self.ensure_same_side_on_dao_fork(peer)
    with peer.collect_sub_proto_messages() as buffer:
        await self.ensure_same_side_on_dao_fork(peer)
I know it's unrelated to this PR, and I remember silently reading a discussion around this a while back, so please forgive me if my comment doesn't make sense, but it bothers me (probably same as everyone else) that we have this dao check built right into the PeerPool. Did we consider allowing something like a precondition_predicate (name tbd) to be passed to the PeerPool, which the PeerPool uses to check whether peers qualify for further communication? That way, we don't hardcode the dao check here and keep things flexible for other chains.
@cburgdorf one of the tasks that I don't think there is a detailed issue for, but is likely to be a core part of one of the subsequent milestones, is to tease out the rest of this EVM logic from the p2p module. Exactly how isn't clear, but it might look something like you mentioned.
I'm not too familiar with the current inner workings, but I tried to familiarize myself with it a little bit in order to properly review this.
It looks sane to me, and so my comments mainly focus on the type hints.
assert collector not in peer._subscribers

# yeild to let remote and peer transmit.
Typo: "yield"
trinity/protocol/common/managers.py
Outdated
from .requests import BaseRequest


PeerClass = TypeVar('PeerClass', bound=BasePeer)
The informal convention for type vars is to be written either as single-letter capitals (e.g. K, V in a key-value context), or, if more context is appropriate (as in this case here), prefixed with an upper T (for type), so that these become TPeerClass, TRequest, TResponse, TReturn.
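For illustration, the convention looks like this (Registry and TPeer are invented examples, not code from this PR):

```python
from typing import Dict, Generic, TypeVar

K = TypeVar('K')      # single capitals in a generic key/value context
V = TypeVar('V')
TPeer = TypeVar('TPeer')  # "T" prefix when more context helps


class Registry(Generic[K, V]):
    """Tiny generic container to show the K/V convention in use."""

    def __init__(self) -> None:
        self._data: Dict[K, V] = {}

    def put(self, key: K, value: V) -> None:
        self._data[key] = value

    def get(self, key: K) -> V:
        return self._data[key]
```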
trinity/protocol/common/managers.py
Outdated
pass

@abstractmethod
def __call__(self) -> ReturnType:  # type: ignore
You can drop that ignore here. Ignoring the violating places is enough. However, maybe we can get rid of the violations altogether. I'll elaborate in another comment.
trinity/protocol/eth/managers.py
Outdated
_response_msg_type: Type[Command] = BlockHeaders

async def __call__(self,  # type: ignore
The signature doesn't align with the parent. The signature here (the child) is
__call__(self, # type: ignore
block_number_or_hash: BlockIdentifier,
max_headers: int = None,
skip: int = 0,
reverse: bool = True,
timeout: int = None) -> Tuple[BlockHeader, ...]
The signature in the parent is:
def __call__(self) -> ReturnType:
While the ReturnType matches just fine, notice that the parent method is defined without any arguments, whereas the child implementation does have arguments. That violates the type safety rules.
What we could do is define __call__ in the parent as

__call__(self, req_params: TRequest) -> TReturn:

Then, in the child, it's perfectly valid to implement it as

__call__(self, req_params: HeaderRequest) -> Tuple[BlockHeader, ...]:
    ...

So, instead of passing the arguments as individual arguments that mypy doesn't know about, we use the type of the request that the subclass is specialized for (as specified via BaseRequestManager[T1, T2, T3, T4]).
Hope that helps.
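A runnable sketch of that suggestion, simplified to two type parameters (the real BaseRequestManager takes four) and with toy field types standing in for BlockHeader:

```python
from abc import ABC, abstractmethod
from typing import Generic, Tuple, TypeVar

TRequest = TypeVar('TRequest')
TReturn = TypeVar('TReturn')


class BaseRequestManager(ABC, Generic[TRequest, TReturn]):
    @abstractmethod
    def __call__(self, req_params: TRequest) -> TReturn:
        ...


class HeaderRequest:
    def __init__(self, block_number: int, max_headers: int) -> None:
        self.block_number = block_number
        self.max_headers = max_headers


class GetBlockHeadersManager(BaseRequestManager[HeaderRequest, Tuple[int, ...]]):
    # Signature matches the parent exactly, so no "type: ignore" is needed.
    def __call__(self, req_params: HeaderRequest) -> Tuple[int, ...]:
        # dummy body returning the requested block numbers
        return tuple(range(req_params.block_number,
                           req_params.block_number + req_params.max_headers))
```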
Btw, there is also the alternative of defining __call__ in parent and child as *args: Any, but that gives up type safety, whereas the previous solution doesn't.
This is intentional. I spent a decent amount of time trying to figure out a sane way to do this and this is the result. Here are the properties I was able to achieve.

- The __call__ method is required to be implemented on subclasses.
- Call sites which call this class still do correct type checks on the function signature (i.e. we don't lose type safety when this function is being called).
- All __call__ implementations on subclasses require a # type: ignore comment.
- We don't have to manually instantiate a Request object in order to use the API (this one is important).

I've added a comment to both of the Manager subclasses noting this.
I'm open to alternate solutions, but all that I've seen have trade-offs that I'm not ok with (like requiring a Request object to use the API).
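The chosen pattern, reduced to a minimal runnable sketch (simplified to one type parameter; names illustrative): the parent constrains only the return type, and each child widens the argument list under a "type: ignore".

```python
from abc import ABC, abstractmethod
from typing import Generic, Tuple, TypeVar

TReturn = TypeVar('TReturn')


class BaseRequestManager(ABC, Generic[TReturn]):
    @abstractmethod
    def __call__(self) -> TReturn:  # type: ignore
        # Subclasses widen the argument list; only the return type is
        # constrained here.
        ...


class GetBlockHeadersManager(BaseRequestManager[Tuple[int, ...]]):
    def __call__(self,  # type: ignore
                 block_number: int,
                 max_headers: int = 2) -> Tuple[int, ...]:
        # dummy body returning the requested block numbers
        return tuple(range(block_number, block_number + max_headers))
```

At runtime this works fine; the "type: ignore" comments exist only to silence mypy's (correct) complaint that the override violates substitutability.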
Ah ok, I just noticed a comment regarding "figuring out types" and jumped in here. I'm fine with the ignore if it's intentional and you put thought into it.
Just out of curiosity, why is it so important not to require manually instantiating a Request object? It doesn't feel like a big deal to me to write
foo(Request(1, 2, 'bar'))
vs
foo(1, 2, 'bar')
But there's probably something more important about it that I'm not seeing, that's why I'm asking.
It's not a big deal, but it exposes the underlying implementation, which bothers me. It means every single call site for foo must import Request, when really the Request class is an internal API that callers should not be exposed to.
I see. It doesn't have to be Request though. We could add one more generic type which could be set to something as lightweight as Tuple[int, int, str] and make call sites look like foo((1, 2, 'bar')).
I wanted to find out whether there's any other way we could constrain an abstract method on only the return type, and created a question on SO. But there doesn't seem to be anything, and someone also pointed out that it violates the Liskov Substitution Principle.
But anyway, I didn't mean to bikeshed about it. If it works for you, it works for me :)
I don't think the Tuple[...] example works because the signatures of these will vary widely in both argument type and count, so we'd have to use Tuple[Any, ...]. With this we'd end up satisfying the type checker while losing a degree of type safety on the actual function calls (at least I think we would).
Right now this works for me. I'm open to an alternative if someone wants to take the time to figure it out.
> I don't think the Tuple[...] example works because the signatures of these will vary widely in both argument type and count, so we'd have to use Tuple[Any, ...]

No, what I meant is that you add one more generic param to the BaseRequestManager, and this one can be tailored exactly to the needs of that specific request manager. So, it may be a Tuple[int, int, bool, int] or whatever is needed.
Here's a quick proof of concept:
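The linked proof of concept isn't preserved in this thread; a minimal sketch of the idea described (invented names, simplified to two type parameters) might look like this, with the params type pinned to a concrete Tuple in each subclass:

```python
from abc import ABC, abstractmethod
from typing import Generic, Tuple, TypeVar

TParams = TypeVar('TParams')
TReturn = TypeVar('TReturn')


class BaseRequestManager(ABC, Generic[TParams, TReturn]):
    @abstractmethod
    def __call__(self, params: TParams) -> TReturn:
        ...


class HeadersManager(BaseRequestManager[Tuple[int, int], Tuple[int, ...]]):
    # Call sites pass a plain tuple: manager((start, count))
    def __call__(self, params: Tuple[int, int]) -> Tuple[int, ...]:
        start, count = params
        return tuple(range(start, start + count))
```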
However, using a Tuple comes with its own ergonomic downsides, like not having default values. So one quickly reaches for a NamedTuple, but at that point one is left wondering whether simply using the existing Request type wouldn't be the more lightweight approach.
Anyway, I'm fine with it as it stands.
What was wrong?

Currently, the way that we implement the Peer.get_block_headers API is very boilerplate heavy. Doing it this way for all of the various request and response pairs is not going to be maintainable.

How was it fixed?

- RequestResponseHandler, which exists on the Peer class under the Peer.handler property.
- BaseRequestManager, which implements the generic logic for doing request/response handling.
- Updated the get_block_headers API on LESPeer and ETHPeer to instead use implementations of these APIs.

Still to be done: Figure out the types.
Cute Animal Picture