From 87c4c7886e1f2d910910dc80852944bf63977551 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alex=20Gr=C3=B6nholm?= Date: Tue, 25 Oct 2022 22:32:44 +0300 Subject: [PATCH 1/5] Added the receive_nowait() method to all streams Closes #482. --- docs/versionhistory.rst | 1 + src/anyio/_backends/_asyncio.py | 10 ++++++++++ src/anyio/_backends/_trio.py | 12 ++++++++++++ src/anyio/abc/_streams.py | 31 ++++++++++++++++++++++++++++++- src/anyio/streams/buffered.py | 21 +++++++++++++++++++++ src/anyio/streams/memory.py | 11 ----------- src/anyio/streams/stapled.py | 3 +++ src/anyio/streams/text.py | 7 +++++++ src/anyio/streams/tls.py | 12 ++++++++++++ 9 files changed, 96 insertions(+), 12 deletions(-) diff --git a/docs/versionhistory.rst b/docs/versionhistory.rst index dccd1b5b..3d65c5a6 100644 --- a/docs/versionhistory.rst +++ b/docs/versionhistory.rst @@ -10,6 +10,7 @@ This library adheres to `Semantic Versioning 2.0 `_. - Bumped minimum version of trio to v0.22 - Added ``create_unix_datagram_socket`` and ``create_connected_unix_datagram_socket`` to create UNIX datagram sockets (PR by Jean Hominal) +- Added the ``receive_nowait()`` method to the entire stream class hierarchy - Improved type annotations: - Several functions and methods that previously only accepted coroutines as the return diff --git a/src/anyio/_backends/_asyncio.py b/src/anyio/_backends/_asyncio.py index 1c085277..292dd523 100644 --- a/src/anyio/_backends/_asyncio.py +++ b/src/anyio/_backends/_asyncio.py @@ -755,6 +755,16 @@ def _spawn_task_from_thread( class StreamReaderWrapper(abc.ByteReceiveStream): _stream: asyncio.StreamReader + def receive_nowait(self, max_bytes: int = 65536) -> bytes: + if self._stream.exception(): + raise self._stream.exception() + elif not self._stream._buffer: # type: ignore[attr-defined] + raise WouldBlock + + data = self._stream._buffer[:max_bytes] # type: ignore[attr-defined] + del self._stream._buffer[:max_bytes] # type: ignore[attr-defined] + return data + async def receive(self, max_bytes: int = 65536) -> bytes: data = await self._stream.read(max_bytes) if data: diff --git a/src/anyio/_backends/_trio.py b/src/anyio/_backends/_trio.py index f7880b36..8343f5ac 100644 --- a/src/anyio/_backends/_trio.py +++ b/src/anyio/_backends/_trio.py @@ -363,6 +363,18 @@ def __init__(self, trio_socket: TrioSocketType) -> None: self._receive_guard = ResourceGuard("reading from") self._send_guard = ResourceGuard("writing to") + def receive_nowait(self, max_bytes: int = 65536) -> bytes: + with self._receive_guard: + try: + data = self._raw_socket.recv(max_bytes) + except BaseException as exc: + self._convert_socket_error(exc) + + if data: + return data + else: + raise EndOfStream + async def receive(self, max_bytes: int = 65536) -> bytes: with self._receive_guard: try: diff --git a/src/anyio/abc/_streams.py b/src/anyio/abc/_streams.py index 8c638683..e7de1562 100644 --- a/src/anyio/abc/_streams.py +++ b/src/anyio/abc/_streams.py @@ -4,7 +4,7 @@ from collections.abc import Callable from typing import Any, Generic, TypeVar, Union -from .._core._exceptions import EndOfStream +from .._core._exceptions import EndOfStream, WouldBlock from .._core._typedattr import TypedAttributeProvider from ._resources import AsyncResource from ._tasks import TaskGroup @@ -36,6 +36,20 @@ async def __anext__(self) -> T_co: except EndOfStream: raise StopAsyncIteration + def receive_nowait(self) -> T_co: + """ + Receive the next item if it can be done without waiting. + + :raises ~anyio.ClosedResourceError: if the receive stream has been explicitly + closed + :raises ~anyio.EndOfStream: if this stream has been closed from the other end + :raises ~anyio.BrokenResourceError: if this stream has been rendered unusable + due to external causes + :raises ~anyio.WouldBlock: if there is no item immeditately available + + """ + raise WouldBlock + @abstractmethod async def receive(self) -> T_co: """ @@ -132,6 +146,21 @@ async def __anext__(self) -> bytes: except EndOfStream: raise StopAsyncIteration + def receive_nowait(self, max_bytes: int = 65536) -> bytes: + """ + Receive at most ``max_bytes`` bytes from the peer, if it can be done without + blocking. + + .. note:: Implementors of this interface should not return an empty + :class:`bytes` object, and users should ignore them. + + :param max_bytes: maximum number of bytes to receive + :return: the received bytes + :raises ~anyio.EndOfStream: if this stream has been closed from the other end + :raises ~anyio.WouldBlock: if there is no data waiting to be received + """ + raise WouldBlock + @abstractmethod async def receive(self, max_bytes: int = 65536) -> bytes: """ diff --git a/src/anyio/streams/buffered.py b/src/anyio/streams/buffered.py index f5d5e836..1ff61875 100644 --- a/src/anyio/streams/buffered.py +++ b/src/anyio/streams/buffered.py @@ -32,6 +32,27 @@ def buffer(self) -> bytes: def extra_attributes(self) -> Mapping[Any, Callable[[], Any]]: return self.receive_stream.extra_attributes + def receive_nowait(self, max_bytes: int = 65536) -> bytes: + if self._closed: + raise ClosedResourceError + + if self._buffer: + chunk = bytes(self._buffer[:max_bytes]) + del self._buffer[:max_bytes] + return chunk + elif isinstance(self.receive_stream, ByteReceiveStream): + return self.receive_stream.receive_nowait(max_bytes) + else: + # With a bytes-oriented object stream, we need to handle any surplus bytes + # we get from the receive_nowait() call + chunk = self.receive_stream.receive_nowait() + if len(chunk) > max_bytes: + # Save the surplus bytes in the buffer + self._buffer.extend(chunk[max_bytes:]) + return chunk[:max_bytes] + else: + return chunk + async def receive(self, max_bytes: int = 65536) -> bytes: if self._closed: raise ClosedResourceError diff --git a/src/anyio/streams/memory.py b/src/anyio/streams/memory.py index 28a58a01..81b8567a 100644 --- a/src/anyio/streams/memory.py +++ b/src/anyio/streams/memory.py @@ -65,17 +65,6 @@ def __post_init__(self) -> None: self._state.open_receive_channels += 1 def receive_nowait(self) -> T_co: - """ - Receive the next item if it can be done without waiting. - - :return: the received item - :raises ~anyio.ClosedResourceError: if this send stream has been closed - :raises ~anyio.EndOfStream: if the buffer is empty and this stream has been - closed from the sending end - :raises ~anyio.WouldBlock: if there are no items in the buffer and no tasks - waiting to send - - """ if self._closed: raise ClosedResourceError diff --git a/src/anyio/streams/stapled.py b/src/anyio/streams/stapled.py index 80f64a2e..e49f24a2 100644 --- a/src/anyio/streams/stapled.py +++ b/src/anyio/streams/stapled.py @@ -34,6 +34,9 @@ class StapledByteStream(ByteStream): send_stream: ByteSendStream receive_stream: ByteReceiveStream + def receive_nowait(self, max_bytes: int = 65536) -> bytes: + return self.receive_stream.receive_nowait(max_bytes) + async def receive(self, max_bytes: int = 65536) -> bytes: return await self.receive_stream.receive(max_bytes) diff --git a/src/anyio/streams/text.py b/src/anyio/streams/text.py index f1a11278..8e4a38e0 100644 --- a/src/anyio/streams/text.py +++ b/src/anyio/streams/text.py @@ -42,6 +42,13 @@ def __post_init__(self, encoding: str, errors: str) -> None: decoder_class = codecs.getincrementaldecoder(encoding) self._decoder = decoder_class(errors=errors) + def receive_nowait(self) -> str: + while True: + chunk = self.transport_stream.receive_nowait() + decoded = self._decoder.decode(chunk) + if decoded: + return decoded + async def receive(self) -> str: while True: chunk = await self.transport_stream.receive() diff --git a/src/anyio/streams/tls.py b/src/anyio/streams/tls.py index e35999e5..8d2b32bb 100644 --- a/src/anyio/streams/tls.py +++ b/src/anyio/streams/tls.py @@ -11,6 +11,7 @@ from .. import ( BrokenResourceError, EndOfStream, + WouldBlock, aclose_forcefully, get_cancelled_exc_class, ) @@ -195,6 +196,17 @@ async def aclose(self) -> None: await self.transport_stream.aclose() + def receive_nowait(self, max_bytes: int = 65536) -> bytes: + try: + data = self._ssl_object.read(max_bytes) + except ssl.SSLError: + raise WouldBlock + + if not data: + raise EndOfStream + + return data + async def receive(self, max_bytes: int = 65536) -> bytes: data = await self._call_sslobject_method(self._ssl_object.read, max_bytes) if not data: From 8b90809cef9a86bdf42c0b172888d86cea34405b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alex=20Gr=C3=B6nholm?= Date: Tue, 11 Jul 2023 14:05:16 +0300 Subject: [PATCH 2/5] Hide SSLWantReadError/SSLWantWriteError in receive_nowait() --- src/anyio/streams/tls.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/anyio/streams/tls.py b/src/anyio/streams/tls.py index 455f150c..a5311a7d 100644 --- a/src/anyio/streams/tls.py +++ b/src/anyio/streams/tls.py @@ -198,8 +198,8 @@ async def aclose(self) -> None: def receive_nowait(self, max_bytes: int = 65536) -> bytes: try: data = self._ssl_object.read(max_bytes) - except ssl.SSLError: - raise WouldBlock + except (ssl.SSLWantReadError, ssl.SSLWantWriteError): + raise WouldBlock from None if not data: raise EndOfStream From c052c38f9e448134ec5187b50edb10446ec83342 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alex=20Gr=C3=B6nholm?= Date: Tue, 11 Jul 2023 14:19:55 +0300 Subject: [PATCH 3/5] Read from wrapped stream if the SSLObject doesn't have data in its buffer --- src/anyio/streams/tls.py | 36 ++++++++++++++++++++++++++++++++---- 1 file changed, 32 insertions(+), 4 deletions(-) diff --git a/src/anyio/streams/tls.py b/src/anyio/streams/tls.py index a5311a7d..4f07779e 100644 --- a/src/anyio/streams/tls.py +++ b/src/anyio/streams/tls.py @@ -196,10 +196,38 @@ async def aclose(self) -> None: await self.transport_stream.aclose() def receive_nowait(self, max_bytes: int = 65536) -> bytes: - try: - data = self._ssl_object.read(max_bytes) - except (ssl.SSLWantReadError, ssl.SSLWantWriteError): - raise WouldBlock from None + while True: + try: + data = self._ssl_object.read(max_bytes) + break + except ssl.SSLWantReadError: + try: + data = self.transport_stream.receive_nowait() + except WouldBlock: + raise WouldBlock from None + except EndOfStream: + self._read_bio.write_eof() + except OSError as exc: + self._read_bio.write_eof() + self._write_bio.write_eof() + raise BrokenResourceError from exc + else: + self._read_bio.write(data) + except ssl.SSLWantWriteError: + raise WouldBlock from None + except ssl.SSLError as exc: + self._read_bio.write_eof() + self._write_bio.write_eof() + if ( + isinstance(exc, ssl.SSLEOFError) + or "UNEXPECTED_EOF_WHILE_READING" in exc.strerror + ): + if self.standard_compatible: + raise BrokenResourceError from exc + else: + raise EndOfStream from None + + raise if not data: raise EndOfStream From c79bbb36d4a28e399f8b3887c6a2b028a406a70f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alex=20Gr=C3=B6nholm?= Date: Tue, 11 Jul 2023 17:22:45 +0300 Subject: [PATCH 4/5] WIP implementation of send_nowait() --- src/anyio/_backends/_asyncio.py | 4 ++++ src/anyio/_backends/_trio.py | 4 ++++ src/anyio/abc/_streams.py | 25 +++++++++++++++++++++++++ src/anyio/streams/memory.py | 11 ----------- src/anyio/streams/stapled.py | 3 +++ src/anyio/streams/tls.py | 7 ++++++- 6 files changed, 42 insertions(+), 12 deletions(-) diff --git a/src/anyio/_backends/_asyncio.py b/src/anyio/_backends/_asyncio.py index 9ec81af7..ce96df85 100644 --- a/src/anyio/_backends/_asyncio.py +++ b/src/anyio/_backends/_asyncio.py @@ -757,6 +757,10 @@ async def send(self, item: bytes) -> None: self._stream.write(item) await self._stream.drain() + def send_nowait(self, item: bytes) -> int: + self._stream.write(item) + return len(item) + async def aclose(self) -> None: self._stream.close() diff --git a/src/anyio/_backends/_trio.py b/src/anyio/_backends/_trio.py index 228db1d3..c422afa4 100644 --- a/src/anyio/_backends/_trio.py +++ b/src/anyio/_backends/_trio.py @@ -397,6 +397,10 @@ async def send(self, item: bytes) -> None: view = view[bytes_sent:] + def send_nowait(self, item: bytes) -> int: + with self._send_guard: + return self._raw_socket.send(item) + async def send_eof(self) -> None: self._trio_socket.shutdown(socket.SHUT_WR) diff --git a/src/anyio/abc/_streams.py b/src/anyio/abc/_streams.py index e7de1562..77f2e1f1 100644 --- a/src/anyio/abc/_streams.py +++ b/src/anyio/abc/_streams.py @@ -85,6 +85,20 @@ async def send(self, item: T_contra) -> None: due to external causes """ + def send_nowait(self, item: T_contra) -> None: + """ + Send an item immediately if it can be done without waiting. + + :param item: the item to send + :raises ~anyio.WouldBlock: if ``item`` cannot be sent without blocking + :raises ~anyio.ClosedResourceError: if the send stream has been explicitly + closed + :raises ~anyio.BrokenResourceError: if this stream has been rendered unusable + due to external causes + + """ + raise WouldBlock + class UnreliableObjectStream( UnreliableObjectReceiveStream[T_Item], UnreliableObjectSendStream[T_Item] @@ -158,6 +172,7 @@ def receive_nowait(self, max_bytes: int = 65536) -> bytes: :return: the received bytes :raises ~anyio.EndOfStream: if this stream has been closed from the other end :raises ~anyio.WouldBlock: if there is no data waiting to be received + """ raise WouldBlock @@ -178,6 +193,16 @@ async def receive(self, max_bytes: int = 65536) -> bytes: class ByteSendStream(AsyncResource, TypedAttributeProvider): """An interface for sending bytes to a single peer.""" + def send_nowait(self, item: bytes) -> int: + """ + Send as many of the given bytes as possible to the peer without blocking. + + :param item: the bytes to send + :return: the number of bytes actually sent + + """ + return 0 + @abstractmethod async def send(self, item: bytes) -> None: """ diff --git a/src/anyio/streams/memory.py b/src/anyio/streams/memory.py index 81b8567a..48eff710 100644 --- a/src/anyio/streams/memory.py +++ b/src/anyio/streams/memory.py @@ -169,17 +169,6 @@ def __post_init__(self) -> None: self._state.open_send_channels += 1 def send_nowait(self, item: T_contra) -> None: - """ - Send an item immediately if it can be done without waiting. - - :param item: the item to send - :raises ~anyio.ClosedResourceError: if this send stream has been closed - :raises ~anyio.BrokenResourceError: if the stream has been closed from the - receiving end - :raises ~anyio.WouldBlock: if the buffer is full and there are no tasks waiting - to receive - - """ if self._closed: raise ClosedResourceError if not self._state.open_receive_channels: diff --git a/src/anyio/streams/stapled.py b/src/anyio/streams/stapled.py index e49f24a2..170950ae 100644 --- a/src/anyio/streams/stapled.py +++ b/src/anyio/streams/stapled.py @@ -40,6 +40,9 @@ def receive_nowait(self, max_bytes: int = 65536) -> bytes: async def receive(self, max_bytes: int = 65536) -> bytes: return await self.receive_stream.receive(max_bytes) + def send_nowait(self, item: bytes) -> int: + return self.send_stream.send_nowait(item) + async def send(self, item: bytes) -> None: await self.send_stream.send(item) diff --git a/src/anyio/streams/tls.py b/src/anyio/streams/tls.py index 4f07779e..debb1943 100644 --- a/src/anyio/streams/tls.py +++ b/src/anyio/streams/tls.py @@ -207,7 +207,7 @@ def receive_nowait(self, max_bytes: int = 65536) -> bytes: raise WouldBlock from None except EndOfStream: self._read_bio.write_eof() - except OSError as exc: + except BrokenResourceError as exc: self._read_bio.write_eof() self._write_bio.write_eof() raise BrokenResourceError from exc @@ -234,6 +234,11 @@ def receive_nowait(self, max_bytes: int = 65536) -> bytes: return data + # def send_nowait(self, item: bytes) -> int: + # while True: + # self._ssl_object.write(item) + # self.transport_stream.send_nowait() + async def receive(self, max_bytes: int = 65536) -> bytes: data = await self._call_sslobject_method(self._ssl_object.read, max_bytes) if not data: From 7e3b126cba102e06627f33753ebbe99cffb8624c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alex=20Gr=C3=B6nholm?= Date: Wed, 13 Dec 2023 01:17:54 +0200 Subject: [PATCH 5/5] Revert "WIP implementation of send_nowait()" This reverts commit c79bbb36d4a28e399f8b3887c6a2b028a406a70f. --- src/anyio/_backends/_asyncio.py | 4 ---- src/anyio/_backends/_trio.py | 4 ---- src/anyio/abc/_streams.py | 25 ------------------------- src/anyio/streams/memory.py | 11 +++++++++++ src/anyio/streams/stapled.py | 3 --- src/anyio/streams/tls.py | 7 +------ 6 files changed, 12 insertions(+), 42 deletions(-) diff --git a/src/anyio/_backends/_asyncio.py b/src/anyio/_backends/_asyncio.py index 3434f813..7d83e87a 100644 --- a/src/anyio/_backends/_asyncio.py +++ b/src/anyio/_backends/_asyncio.py @@ -909,10 +909,6 @@ async def send(self, item: bytes) -> None: self._stream.write(item) await self._stream.drain() - def send_nowait(self, item: bytes) -> int: - self._stream.write(item) - return len(item) - async def aclose(self) -> None: self._stream.close() diff --git a/src/anyio/_backends/_trio.py b/src/anyio/_backends/_trio.py index 039a7f61..7df56875 100644 --- a/src/anyio/_backends/_trio.py +++ b/src/anyio/_backends/_trio.py @@ -410,10 +410,6 @@ async def send(self, item: bytes) -> None: view = view[bytes_sent:] - def send_nowait(self, item: bytes) -> int: - with self._send_guard: - return self._raw_socket.send(item) - async def send_eof(self) -> None: self._trio_socket.shutdown(socket.SHUT_WR) diff --git a/src/anyio/abc/_streams.py b/src/anyio/abc/_streams.py index 77f2e1f1..e7de1562 100644 --- a/src/anyio/abc/_streams.py +++ b/src/anyio/abc/_streams.py @@ -85,20 +85,6 @@ async def send(self, item: T_contra) -> None: due to external causes """ - def send_nowait(self, item: T_contra) -> None: - """ - Send an item immediately if it can be done without waiting. - - :param item: the item to send - :raises ~anyio.WouldBlock: if ``item`` cannot be sent without blocking - :raises ~anyio.ClosedResourceError: if the send stream has been explicitly - closed - :raises ~anyio.BrokenResourceError: if this stream has been rendered unusable - due to external causes - - """ - raise WouldBlock - class UnreliableObjectStream( UnreliableObjectReceiveStream[T_Item], UnreliableObjectSendStream[T_Item] @@ -172,7 +158,6 @@ def receive_nowait(self, max_bytes: int = 65536) -> bytes: :return: the received bytes :raises ~anyio.EndOfStream: if this stream has been closed from the other end :raises ~anyio.WouldBlock: if there is no data waiting to be received - """ raise WouldBlock @@ -193,16 +178,6 @@ async def receive(self, max_bytes: int = 65536) -> bytes: class ByteSendStream(AsyncResource, TypedAttributeProvider): """An interface for sending bytes to a single peer.""" - def send_nowait(self, item: bytes) -> int: - """ - Send as many of the given bytes as possible to the peer without blocking. - - :param item: the bytes to send - :return: the number of bytes actually sent - - """ - return 0 - @abstractmethod async def send(self, item: bytes) -> None: """ diff --git a/src/anyio/streams/memory.py b/src/anyio/streams/memory.py index 93c2cba2..cfb3fa60 100644 --- a/src/anyio/streams/memory.py +++ b/src/anyio/streams/memory.py @@ -163,6 +163,17 @@ def __post_init__(self) -> None: self._state.open_send_channels += 1 def send_nowait(self, item: T_contra) -> None: + """ + Send an item immediately if it can be done without waiting. + + :param item: the item to send + :raises ~anyio.ClosedResourceError: if this send stream has been closed + :raises ~anyio.BrokenResourceError: if the stream has been closed from the + receiving end + :raises ~anyio.WouldBlock: if the buffer is full and there are no tasks waiting + to receive + + """ if self._closed: raise ClosedResourceError if not self._state.open_receive_channels: diff --git a/src/anyio/streams/stapled.py b/src/anyio/streams/stapled.py index 170950ae..e49f24a2 100644 --- a/src/anyio/streams/stapled.py +++ b/src/anyio/streams/stapled.py @@ -40,9 +40,6 @@ def receive_nowait(self, max_bytes: int = 65536) -> bytes: async def receive(self, max_bytes: int = 65536) -> bytes: return await self.receive_stream.receive(max_bytes) - def send_nowait(self, item: bytes) -> int: - return self.send_stream.send_nowait(item) - async def send(self, item: bytes) -> None: await self.send_stream.send(item) diff --git a/src/anyio/streams/tls.py b/src/anyio/streams/tls.py index 472d291d..7a20564f 100644 --- a/src/anyio/streams/tls.py +++ b/src/anyio/streams/tls.py @@ -207,7 +207,7 @@ def receive_nowait(self, max_bytes: int = 65536) -> bytes: raise WouldBlock from None except EndOfStream: self._read_bio.write_eof() - except BrokenResourceError as exc: + except OSError as exc: self._read_bio.write_eof() self._write_bio.write_eof() raise BrokenResourceError from exc @@ -234,11 +234,6 @@ def receive_nowait(self, max_bytes: int = 65536) -> bytes: return data - # def send_nowait(self, item: bytes) -> int: - # while True: - # self._ssl_object.write(item) - # self.transport_stream.send_nowait() - async def receive(self, max_bytes: int = 65536) -> bytes: data = await self._call_sslobject_method(self._ssl_object.read, max_bytes) if not data: