Skip to content

Commit

Permalink
Merge pull request #157 from pipermerriam/piper/ensure-lahja-closes-s…
Browse files Browse the repository at this point in the history
…treams

Ensure lahja closes asyncio streams
  • Loading branch information
pipermerriam committed Sep 10, 2019
2 parents 60dec3a + c3dc16d commit 2483638
Show file tree
Hide file tree
Showing 7 changed files with 28 additions and 19 deletions.
9 changes: 8 additions & 1 deletion lahja/asyncio/endpoint.py
Expand Up @@ -88,6 +88,11 @@ async def connect_to(cls, path: Path) -> ConnectionAPI:
reader, writer = await asyncio.open_unix_connection(str(path))
return cls(reader, writer)

async def close(self) -> None:
if not self.reader.at_eof():
self.reader.feed_eof()
self.writer.close()

async def send_message(self, message: Msg) -> None:
pickled = pickle.dumps(message)
size = len(pickled)
Expand All @@ -106,7 +111,7 @@ async def send_message(self, message: Msg) -> None:
# We don't do a pre-check for a closed writer since this is a performance critical
# path. We also don't want to swallow runtime errors unrelated to closed handlers.
if "handler is closed" in str(err):
self.logger.warning("Failed to send %s. Handler closed.", message)
self.logger.debug("Failed to send %s. Handler closed.", message)
raise RemoteDisconnected from err
raise

Expand Down Expand Up @@ -175,6 +180,8 @@ async def stop(self) -> None:
await self._task
except asyncio.CancelledError:
pass
finally:
await self.conn.close()


TFunc = TypeVar("TFunc", bound=Callable[..., Any])
Expand Down
10 changes: 7 additions & 3 deletions lahja/base.py
Expand Up @@ -52,13 +52,17 @@ class ConnectionAPI(ABC):
async def connect_to(cls, path: Path) -> "ConnectionAPI":
...

@abstractmethod
async def close(self) -> None:
...

@abstractmethod
async def send_message(self, message: Msg) -> None:
pass
...

@abstractmethod
async def read_message(self) -> Message:
pass
...


class RemoteEndpointAPI(ABC):
Expand Down Expand Up @@ -456,7 +460,7 @@ def broadcast_nowait(
implementation should be expected to cause problems.
"""
pass
...

@abstractmethod
async def request(
Expand Down
22 changes: 9 additions & 13 deletions lahja/tools/drivers/actions.py
Expand Up @@ -87,26 +87,22 @@ def wait_for(
GetResponseFn = Callable[[EndpointAPI, BaseRequestResponseEvent[BaseEvent]], BaseEvent]


async def _serve_response(
endpoint: EndpointAPI,
request: BaseRequestResponseEvent[BaseEvent],
get_response: GetResponseFn,
) -> None:
response = get_response(endpoint, request)
await endpoint.broadcast(response, config=request.broadcast_config())
logger.debug("[%s] sent response: %s", endpoint, response)


def serve_request(
request_type: Type[BaseRequestResponseEvent[BaseEvent]], get_response: GetResponseFn
) -> AsyncAction:
"""
Wait for an event of the provided ``request_type`` and respond using the
response event returned by the provide ``get_response`` function.
"""
return wait_for(
request_type, functools.partial(_serve_response, get_response=get_response)
)

async def _serve_response(
endpoint: EndpointAPI, request: BaseRequestResponseEvent[BaseEvent]
) -> None:
response = get_response(endpoint, request)
await endpoint.broadcast(response, config=request.broadcast_config())
logger.debug("[%s] sent response: %s", endpoint, response)

return AsyncAction(_wait_for, event_type=request_type, on_event=_serve_response)


#
Expand Down
2 changes: 1 addition & 1 deletion lahja/tools/drivers/driver.py
Expand Up @@ -38,7 +38,7 @@ async def drive(
engine: EngineAPI,
initializer: Initializer,
actions: Sequence[Action],
action_timeout: int = 1,
action_timeout: int = 5,
) -> None:
"""
Use the provide *Engine* to initialize and drive an endpoint
Expand Down
3 changes: 2 additions & 1 deletion lahja/trio/endpoint.py
Expand Up @@ -77,7 +77,7 @@ def __str__(self) -> str:
def __repr__(self) -> str:
return f"<{self}>"

async def aclose(self) -> None:
async def close(self) -> None:
await self._socket.aclose()

#
Expand Down Expand Up @@ -181,6 +181,7 @@ async def stop(self) -> None:
if self.is_stopped:
return
self._stopped.set()
await self.conn.close()


async def _wait_for_path(path: trio.Path) -> None:
Expand Down
File renamed without changes.
1 change: 1 addition & 0 deletions newsfragments/157.bugfix.rst
@@ -0,0 +1 @@
Fix that ensures ``asyncio`` streams are closed when an endpoint shuts down to prevent ``ResourceWarning`` warnings.

0 comments on commit 2483638

Please sign in to comment.