diff --git a/src/anyio/_backends/_asyncio.py b/src/anyio/_backends/_asyncio.py index 9ec81af7..ce96df85 100644 --- a/src/anyio/_backends/_asyncio.py +++ b/src/anyio/_backends/_asyncio.py @@ -757,6 +757,10 @@ async def send(self, item: bytes) -> None: self._stream.write(item) await self._stream.drain() + def send_nowait(self, item: bytes) -> int: + self._stream.write(item) + return len(item) + async def aclose(self) -> None: self._stream.close() diff --git a/src/anyio/_backends/_trio.py b/src/anyio/_backends/_trio.py index 228db1d3..c422afa4 100644 --- a/src/anyio/_backends/_trio.py +++ b/src/anyio/_backends/_trio.py @@ -397,6 +397,10 @@ async def send(self, item: bytes) -> None: view = view[bytes_sent:] + def send_nowait(self, item: bytes) -> int: + with self._send_guard: + return self._raw_socket.send(item) + async def send_eof(self) -> None: self._trio_socket.shutdown(socket.SHUT_WR) diff --git a/src/anyio/abc/_streams.py b/src/anyio/abc/_streams.py index e7de1562..77f2e1f1 100644 --- a/src/anyio/abc/_streams.py +++ b/src/anyio/abc/_streams.py @@ -85,6 +85,20 @@ async def send(self, item: T_contra) -> None: due to external causes """ + def send_nowait(self, item: T_contra) -> None: + """ + Send an item immediately if it can be done without waiting. + + :param item: the item to send + :raises ~anyio.WouldBlock: if ``item`` cannot be sent without blocking + :raises ~anyio.ClosedResourceError: if the send stream has been explicitly + closed + :raises ~anyio.BrokenResourceError: if this stream has been rendered unusable + due to external causes + + """ + raise WouldBlock + class UnreliableObjectStream( UnreliableObjectReceiveStream[T_Item], UnreliableObjectSendStream[T_Item] @@ -158,6 +172,7 @@ def receive_nowait(self, max_bytes: int = 65536) -> bytes: :return: the received bytes :raises ~anyio.EndOfStream: if this stream has been closed from the other end :raises ~anyio.WouldBlock: if there is no data waiting to be received + """ raise WouldBlock @@ -178,6 +193,16 @@ async def receive(self, max_bytes: int = 65536) -> bytes: class ByteSendStream(AsyncResource, TypedAttributeProvider): """An interface for sending bytes to a single peer.""" + def send_nowait(self, item: bytes) -> int: + """ + Send as many of the given bytes as possible to the peer without blocking. + + :param item: the bytes to send + :return: the number of bytes actually sent + + """ + return 0 + @abstractmethod async def send(self, item: bytes) -> None: """ diff --git a/src/anyio/streams/memory.py b/src/anyio/streams/memory.py index 81b8567a..48eff710 100644 --- a/src/anyio/streams/memory.py +++ b/src/anyio/streams/memory.py @@ -169,17 +169,6 @@ def __post_init__(self) -> None: self._state.open_send_channels += 1 def send_nowait(self, item: T_contra) -> None: - """ - Send an item immediately if it can be done without waiting. - - :param item: the item to send - :raises ~anyio.ClosedResourceError: if this send stream has been closed - :raises ~anyio.BrokenResourceError: if the stream has been closed from the - receiving end - :raises ~anyio.WouldBlock: if the buffer is full and there are no tasks waiting - to receive - - """ if self._closed: raise ClosedResourceError if not self._state.open_receive_channels: diff --git a/src/anyio/streams/stapled.py b/src/anyio/streams/stapled.py index e49f24a2..170950ae 100644 --- a/src/anyio/streams/stapled.py +++ b/src/anyio/streams/stapled.py @@ -40,6 +40,9 @@ def receive_nowait(self, max_bytes: int = 65536) -> bytes: async def receive(self, max_bytes: int = 65536) -> bytes: return await self.receive_stream.receive(max_bytes) + def send_nowait(self, item: bytes) -> int: + return self.send_stream.send_nowait(item) + async def send(self, item: bytes) -> None: await self.send_stream.send(item) diff --git a/src/anyio/streams/tls.py b/src/anyio/streams/tls.py index 4f07779e..debb1943 100644 --- a/src/anyio/streams/tls.py +++ b/src/anyio/streams/tls.py @@ -207,7 +207,7 @@ def receive_nowait(self, max_bytes: int = 65536) -> bytes: raise WouldBlock from None except EndOfStream: self._read_bio.write_eof() - except OSError as exc: + except BrokenResourceError as exc: self._read_bio.write_eof() self._write_bio.write_eof() raise BrokenResourceError from exc @@ -234,6 +234,11 @@ def receive_nowait(self, max_bytes: int = 65536) -> bytes: return data + # def send_nowait(self, item: bytes) -> int: + # while True: + # self._ssl_object.write(item) + # self.transport_stream.send_nowait() + async def receive(self, max_bytes: int = 65536) -> bytes: data = await self._call_sslobject_method(self._ssl_object.read, max_bytes) if not data: