-
Notifications
You must be signed in to change notification settings - Fork 19
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[WIP] (asyncio) Only send subscribed messages #42
Conversation
This comes with some interface changes: - connect_to_endpoints_blocking no longer exists, there's no way to start a server without using await - it's important that endpoint.stop() is called when the endpoint is no longer needed. If it is not called a lot of coros will be orphaned.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
just some things I noticed.
self.reader = reader | ||
self._drain_lock = asyncio.Lock() | ||
|
||
@staticmethod |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This looks like it should be a classmethod
and return an instance of cls
self._received_response = asyncio.Condition() | ||
|
||
async def run(self) -> None: | ||
try: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this should be inverted, with the try/except inside the while
loop such that the except
block uses break
to escape the loop.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
some more idle comments (I know this is a WIP, ignore anything that isn't appropriate at this stage of the PR)
async with self._received_response: | ||
self._received_response.notify_all() | ||
else: | ||
self.logger.error(f'received unexpected message: {message}') |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Might be good to include the type(message)
here since it may not be clear what message
is when represented as a string.
async with self._received_response: | ||
await self.conn.send_message(SubscriptionsUpdated(subscriptions, block)) | ||
if block: | ||
await self._received_response.wait() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I kind of understand this but only because I've spent some brain-space in the past trying to understand the use case for asyncio.Condition
. Can you maybe create a large-ish comment that tries to lay out exactly what is happening here and why. This interplay between synchronization primatives seems appropriate but also hard for a newcomer to understand.
self.name = name | ||
self.subscribed_messages: Set[Type[BaseEvent]] = set() | ||
|
||
self.logger = logging.getLogger('lahja.endpoint.OutboundConnection') |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This can be defined at the class level.
self._received_subscription = asyncio.Condition() | ||
|
||
async def run(self) -> None: | ||
try: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same here with reducing the scope of the try
block to only contain the part that can throw the exception.
pass | ||
|
||
def can_send_item(self, item: BaseEvent, config: Optional[BroadcastConfig]) -> bool: | ||
is_response = config is not None and config.filter_event_id is not None |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since there is an early exit condition for passes_config
, maybe optimize this a little by doing that check before you compute is_response
or passes_filter
since they are not needed in the early exit case.
@@ -109,17 +254,23 @@ class Endpoint: | |||
_internal_queue: asyncio.Queue | |||
_internal_loop_running: asyncio.Event | |||
|
|||
_loop: asyncio.AbstractEventLoop | |||
_server_running: asyncio.Event |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This seems prudent to have as a public API since it is likely a common thing to want to wait until the endpoint is active...
|
||
# Broadcast to every connected Endpoint that is allowed to receive the event | ||
compressed_item = self._compress_event(item) | ||
for name, remote in list(self._outbound_connections.items()): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nitpick. In another place you use copy
. Here you cast to a list
. Can we use a common mechanism for this (like maybe tuple()
because I love them so much more than lists 😄 )
@@ -51,12 +55,12 @@ | |||
|
|||
tracker = Tracker() | |||
|
|||
endpoint1.subscribe( | |||
await endpoint1.subscribe( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This falls into the same category as the broadcast
that became async. We use this API in a bunch of places that would need to turn into async
and I actually quite like that synchronous code could use subscribe
to consume events :)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah! I should come back and look at subscribe
, my idea was that I wanted to use an architecture which got rid of these race conditions once and for all. So subscribe
blocks until all the inbound connections have confirmed that they're only sending the requested messages. This was supposed to reduce the number of asyncio.sleep()
I added to the tests. It didn't get rid of very many asyncio.sleep()
calls though, and I later added the more useful wait_until_subscribed_to
, so I'll look into simplifying the code and making subscribe
a regular call again.
Going to review this once the other two PRs landed and this got rebased. |
Closing in favor of #55 |
Super WIP, opened to see what CircleCI's tests say
A replacement for #36 built on top of #39.
TODO:
What was wrong?
Issue #
How was it fixed?
Summary of approach.
Cute Animal Picture