Skip to content

Commit

Permalink
Merge pull request #331 from pipermerriam/piper/handle-closed-channel…
Browse files Browse the repository at this point in the history
…-in-explore

Unhandled crashes
  • Loading branch information
pipermerriam committed Dec 19, 2020
2 parents 3be4d51 + b7ed77c commit 218b44b
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 3 deletions.
11 changes: 9 additions & 2 deletions ddht/v5_1/explorer.py
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ async def _bond_then_send(self, enr: ENRAPI,) -> None:
# In the event that the consumer of `recursive_find_nodes`
# exits early before the lookup has completed we can end up
# operating on a closed channel.
pass
return

async def _explore(self, node_id: NodeID, max_distance: int,) -> None:
"""
Expand Down Expand Up @@ -293,7 +293,14 @@ async def _source_initial_nodes(self) -> None:
if enr.node_id not in self.seen:
self.seen.add(enr.node_id)
self._condition.notify_all()
await self._send_channel.send(enr)

try:
await self._send_channel.send(enr)
except (trio.BrokenResourceError, trio.ClosedResourceError):
# In the event that the consumer of `recursive_find_nodes`
# exits early before the lookup has completed we can end up
# operating on a closed channel.
return

self.logger.debug2("%s: finished seeding nodes for exploration", self)
self._exploration_seeded.set()
5 changes: 4 additions & 1 deletion ddht/v5_1/network.py
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,10 @@ async def worker(
tasks = tuple(
(do_lookup, (node_id, send_channel)) for node_id in node_ids
)
await adaptive_timeout(*tasks, threshold=1, variance=2.0)
try:
await adaptive_timeout(*tasks, threshold=1, variance=2.0)
except trio.TooSlowError:
pass

async with condition:
# Remove the `node_ids` from the in_flight set.
Expand Down

0 comments on commit 218b44b

Please sign in to comment.