Skip to content

Commit

Permalink
WIP implementation of send_nowait()
Browse files Browse the repository at this point in the history
  • Loading branch information
agronholm committed Jul 11, 2023
1 parent c052c38 commit c79bbb3
Show file tree
Hide file tree
Showing 6 changed files with 42 additions and 12 deletions.
4 changes: 4 additions & 0 deletions src/anyio/_backends/_asyncio.py
Original file line number Diff line number Diff line change
Expand Up @@ -757,6 +757,10 @@ async def send(self, item: bytes) -> None:
self._stream.write(item)
await self._stream.drain()

def send_nowait(self, item: bytes) -> int:
self._stream.write(item)
return len(item)

async def aclose(self) -> None:
self._stream.close()

Expand Down
4 changes: 4 additions & 0 deletions src/anyio/_backends/_trio.py
Original file line number Diff line number Diff line change
Expand Up @@ -397,6 +397,10 @@ async def send(self, item: bytes) -> None:

view = view[bytes_sent:]

def send_nowait(self, item: bytes) -> int:
with self._send_guard:
return self._raw_socket.send(item)

async def send_eof(self) -> None:
self._trio_socket.shutdown(socket.SHUT_WR)

Expand Down
25 changes: 25 additions & 0 deletions src/anyio/abc/_streams.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,20 @@ async def send(self, item: T_contra) -> None:
due to external causes
"""

def send_nowait(self, item: T_contra) -> None:
"""
Send an item immediately if it can be done without waiting.
:param item: the item to send
:raises ~anyio.WouldBlock: if ``item`` cannot be sent without blocking
:raises ~anyio.ClosedResourceError: if the send stream has been explicitly
closed
:raises ~anyio.BrokenResourceError: if this stream has been rendered unusable
due to external causes
"""
raise WouldBlock


class UnreliableObjectStream(
UnreliableObjectReceiveStream[T_Item], UnreliableObjectSendStream[T_Item]
Expand Down Expand Up @@ -158,6 +172,7 @@ def receive_nowait(self, max_bytes: int = 65536) -> bytes:
:return: the received bytes
:raises ~anyio.EndOfStream: if this stream has been closed from the other end
:raises ~anyio.WouldBlock: if there is no data waiting to be received
"""
raise WouldBlock

Expand All @@ -178,6 +193,16 @@ async def receive(self, max_bytes: int = 65536) -> bytes:
class ByteSendStream(AsyncResource, TypedAttributeProvider):
"""An interface for sending bytes to a single peer."""

def send_nowait(self, item: bytes) -> int:
"""
Send as many of the given bytes as possible to the peer without blocking.
:param item: the bytes to send
:return: the number of bytes actually sent
"""
return 0

@abstractmethod
async def send(self, item: bytes) -> None:
"""
Expand Down
11 changes: 0 additions & 11 deletions src/anyio/streams/memory.py
Original file line number Diff line number Diff line change
Expand Up @@ -169,17 +169,6 @@ def __post_init__(self) -> None:
self._state.open_send_channels += 1

def send_nowait(self, item: T_contra) -> None:
"""
Send an item immediately if it can be done without waiting.
:param item: the item to send
:raises ~anyio.ClosedResourceError: if this send stream has been closed
:raises ~anyio.BrokenResourceError: if the stream has been closed from the
receiving end
:raises ~anyio.WouldBlock: if the buffer is full and there are no tasks waiting
to receive
"""
if self._closed:
raise ClosedResourceError
if not self._state.open_receive_channels:
Expand Down
3 changes: 3 additions & 0 deletions src/anyio/streams/stapled.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@ def receive_nowait(self, max_bytes: int = 65536) -> bytes:
async def receive(self, max_bytes: int = 65536) -> bytes:
return await self.receive_stream.receive(max_bytes)

def send_nowait(self, item: bytes) -> int:
return self.send_stream.send_nowait(item)

async def send(self, item: bytes) -> None:
await self.send_stream.send(item)

Expand Down
7 changes: 6 additions & 1 deletion src/anyio/streams/tls.py
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ def receive_nowait(self, max_bytes: int = 65536) -> bytes:
raise WouldBlock from None
except EndOfStream:
self._read_bio.write_eof()
except OSError as exc:
except BrokenResourceError as exc:
self._read_bio.write_eof()
self._write_bio.write_eof()
raise BrokenResourceError from exc
Expand All @@ -234,6 +234,11 @@ def receive_nowait(self, max_bytes: int = 65536) -> bytes:

return data

# def send_nowait(self, item: bytes) -> int:
# while True:
# self._ssl_object.write(item)
# self.transport_stream.send_nowait()

async def receive(self, max_bytes: int = 65536) -> bytes:
data = await self._call_sslobject_method(self._ssl_object.read, max_bytes)
if not data:
Expand Down

0 comments on commit c79bbb3

Please sign in to comment.