Skip to content
Permalink

Comparing changes

Choose two branches to see what’s changed or to start a new pull request. If you need to, you can also .

Open a pull request

Create a new pull request by comparing changes across two branches. If you need to, you can also .
base repository: agronholm/anyio
base: 3.3.0
head repository: agronholm/anyio
compare: 3.3.1
  • 15 commits
  • 15 files changed
  • 0 comments
  • 3 contributors
@@ -33,7 +33,7 @@ jobs:
fail-fast: false
matrix:
os: [ubuntu-latest, macos-latest, windows-latest]
python-version: [3.6, 3.7, 3.8, 3.9, 3.10.0-beta.4, pypy3]
python-version: [3.6, 3.7, 3.8, 3.9, 3.10-dev, pypy3]
exclude:
- os: macos-latest
python-version: 3.7
@@ -1,7 +1,7 @@
version: 2
formats: [htmlzip, pdf]
python:
version: 3.6
version: "3.6"
install:
- method: pip
path: .
@@ -23,7 +23,7 @@ Timeouts

Networked operations can often take a long time, and you usually want to set up some kind of a
timeout to ensure that your application doesn't stall forever. There are two principal ways to do
this: :func:`~move_on_after` and :func:`~fail_after`. Both are used as asynchronous
this: :func:`~move_on_after` and :func:`~fail_after`. Both are used as synchronous
context managers. The difference between these two is that the former simply exits the context
block prematurely on a timeout, while the other raises a :exc:`TimeoutError`.

@@ -74,7 +74,7 @@ Exceptions to this rule are:

If the code you wish to run does not belong in this category, it's best to use worker processes
instead in order to take advantage of multiple CPU cores.
This is done by using :func:`.run_sync_in_process`::
This is done by using :func:`.to_process.run_sync`::

import time

@@ -104,7 +104,7 @@ There are some limitations regarding the arguments and return values passed:

Other considerations:

* Even "cancellable" runs can be cancelled before the request has been sent to the worker process
* Even "cancellable=False" runs can be cancelled before the request has been sent to the worker process
* If a cancellable call is cancelled during execution on the worker process, the worker process
will be killed
* The worker process imports the parent's ``__main__`` module, so guarding for any import time side
@@ -177,6 +177,3 @@ managers as a synchronous one::

.. note:: You cannot use wrapped async context managers in synchronous callbacks inside the event
loop thread.

.. note:: The ``__aenter__()`` and ``__aexit__()`` methods will be called from different
tasks so a task group as the async context manager will not work here.
@@ -3,6 +3,16 @@ Version history

This library adheres to `Semantic Versioning 2.0 <http://semver.org/>`_.

**3.3.1**

- Added missing documentation for the ``ExceptionGroup.exceptions`` attribute
- Changed the asyncio test runner not to use uvloop by default (to match the behavior of
``anyio.run()``)
- Fixed ``RuntimeError`` on asyncio when a ``CancelledError`` is raised from a task spawned through
a ``BlockingPortal`` (`#357 <https://github.com/agronholm/anyio/issues/357>`_)
- Fixed asyncio warning about a ``Future`` with an exception that was never retrieved which
happened when a socket was already written to but the peer abruptly closed the connection

**3.3.0**

- Added asynchronous ``Path`` class
@@ -998,14 +998,14 @@ def setup_process_pool_exit_at_shutdown(workers: Set[Process]) -> None:
class StreamProtocol(asyncio.Protocol):
read_queue: Deque[bytes]
read_event: asyncio.Event
write_future: asyncio.Future
write_event: asyncio.Event
exception: Optional[Exception] = None

def connection_made(self, transport: asyncio.BaseTransport) -> None:
self.read_queue = deque()
self.read_event = asyncio.Event()
self.write_future = asyncio.Future()
self.write_future.set_result(None)
self.write_event = asyncio.Event()
self.write_event.set()
cast(asyncio.Transport, transport).set_write_buffer_limits(0)

def connection_lost(self, exc: Optional[Exception]) -> None:
@@ -1014,11 +1014,7 @@ def connection_lost(self, exc: Optional[Exception]) -> None:
self.exception.__cause__ = exc

self.read_event.set()
self.write_future = asyncio.Future()
if self.exception:
self.write_future.set_exception(self.exception)
else:
self.write_future.set_result(None)
self.write_event.set()

def data_received(self, data: bytes) -> None:
self.read_queue.append(data)
@@ -1029,10 +1025,10 @@ def eof_received(self) -> Optional[bool]:
return True

def pause_writing(self) -> None:
self.write_future = asyncio.Future()
self.write_event = asyncio.Event()

def resume_writing(self) -> None:
self.write_future.set_result(None)
self.write_event.set()


class DatagramProtocol(asyncio.DatagramProtocol):
@@ -1081,6 +1077,7 @@ def _raw_socket(self) -> socket.socket:
async def receive(self, max_bytes: int = 65536) -> bytes:
with self._receive_guard:
await checkpoint()

if not self._protocol.read_event.is_set() and not self._transport.is_closing():
self._transport.resume_reading()
await self._protocol.read_event.wait()
@@ -1094,7 +1091,7 @@ async def receive(self, max_bytes: int = 65536) -> bytes:
elif self._protocol.exception:
raise self._protocol.exception
else:
raise EndOfStream
raise EndOfStream from None

if len(chunk) > max_bytes:
# Split the oversized chunk
@@ -1111,19 +1108,21 @@ async def receive(self, max_bytes: int = 65536) -> bytes:
async def send(self, item: bytes) -> None:
with self._send_guard:
await checkpoint()

if self._closed:
raise ClosedResourceError
elif self._protocol.exception is not None:
raise self._protocol.exception

try:
self._transport.write(item)
except RuntimeError as exc:
if self._protocol.write_future.exception():
await self._protocol.write_future
elif self._closed:
raise ClosedResourceError from None
elif self._transport.is_closing():
if self._transport.is_closing():
raise BrokenResourceError from exc
else:
raise

await self._protocol.write_future
await self._protocol.write_event.wait()

async def send_eof(self) -> None:
try:
@@ -1844,7 +1843,7 @@ async def wait_all_tasks_blocked() -> None:


class TestRunner(abc.TestRunner):
def __init__(self, debug: bool = False, use_uvloop: bool = True,
def __init__(self, debug: bool = False, use_uvloop: bool = False,
policy: Optional[asyncio.AbstractEventLoopPolicy] = None):
_maybe_set_event_loop_policy(policy, use_uvloop)
self._loop = asyncio.new_event_loop()
@@ -42,11 +42,14 @@ class EndOfStream(Exception):


class ExceptionGroup(BaseException):
"""Raised when multiple exceptions have been raised in a task group."""
"""
Raised when multiple exceptions have been raised in a task group.
:var ~typing.Sequence[BaseException] exceptions: the sequence of exceptions raised together
"""

SEPARATOR = '----------------------------\n'

#: the sequence of exceptions raised together
exceptions: Sequence[BaseException]

def __str__(self) -> str:
@@ -124,7 +124,7 @@ async def receive(self, max_bytes: int = 65536) -> bytes:
"""
Receive at most ``max_bytes`` bytes from the peer.
.. note:: Implementors of this interface should not return an empty :cls:`bytes` object,
.. note:: Implementors of this interface should not return an empty :class:`bytes` object,
and users should ignore them.
:param max_bytes: maximum number of bytes to receive
@@ -162,7 +162,7 @@ async def stop(self, cancel_remaining: bool = False) -> None:
async def _call_func(self, func: Callable, args: tuple, kwargs: Dict[str, Any],
future: Future) -> None:
def callback(f: Future) -> None:
if f.cancelled():
if f.cancelled() and self._event_loop_thread_id not in (None, threading.get_ident()):
self.call(scope.cancel)

try:
@@ -13,11 +13,13 @@


class MemoryObjectStreamStatistics(NamedTuple):
current_buffer_used: int
current_buffer_used: int #: number of items stored in the buffer
#: maximum number of items that can be stored on this stream (or :data:`math.inf`)
max_buffer_size: float
open_send_streams: int
open_receive_streams: int
tasks_waiting_send: int
open_send_streams: int #: number of unclosed clones of the send stream
open_receive_streams: int #: number of unclosed clones of the receive stream
tasks_waiting_send: int #: number of tasks blocked on :meth:`MemoryObjectSendStream.send`
#: number of tasks blocked on :meth:`MemoryObjectReceiveStream.receive`
tasks_waiting_receive: int


@@ -65,21 +65,22 @@ def serve_sync() -> None:
async with await connect_tcp(*server_sock.getsockname()) as stream:
wrapper = await TLSStream.wrap(stream, hostname='localhost',
ssl_context=client_context, standard_compatible=False)
for name, attribute in SocketAttribute.__dict__.items():
if not name.startswith('_'):
assert wrapper.extra(attribute) == stream.extra(attribute)

assert wrapper.extra(TLSAttribute.alpn_protocol) == 'h2'
assert isinstance(wrapper.extra(TLSAttribute.channel_binding_tls_unique), bytes)
assert isinstance(wrapper.extra(TLSAttribute.cipher), tuple)
assert isinstance(wrapper.extra(TLSAttribute.peer_certificate), dict)
assert isinstance(wrapper.extra(TLSAttribute.peer_certificate_binary), bytes)
assert wrapper.extra(TLSAttribute.server_side) is False
assert isinstance(wrapper.extra(TLSAttribute.shared_ciphers), list)
assert isinstance(wrapper.extra(TLSAttribute.ssl_object), ssl.SSLObject)
assert wrapper.extra(TLSAttribute.standard_compatible) is False
assert wrapper.extra(TLSAttribute.tls_version).startswith('TLSv')
await wrapper.send(b'\x00')
async with wrapper:
for name, attribute in SocketAttribute.__dict__.items():
if not name.startswith('_'):
assert wrapper.extra(attribute) == stream.extra(attribute)

assert wrapper.extra(TLSAttribute.alpn_protocol) == 'h2'
assert isinstance(wrapper.extra(TLSAttribute.channel_binding_tls_unique), bytes)
assert isinstance(wrapper.extra(TLSAttribute.cipher), tuple)
assert isinstance(wrapper.extra(TLSAttribute.peer_certificate), dict)
assert isinstance(wrapper.extra(TLSAttribute.peer_certificate_binary), bytes)
assert wrapper.extra(TLSAttribute.server_side) is False
assert isinstance(wrapper.extra(TLSAttribute.shared_ciphers), list)
assert isinstance(wrapper.extra(TLSAttribute.ssl_object), ssl.SSLObject)
assert wrapper.extra(TLSAttribute.standard_compatible) is False
assert wrapper.extra(TLSAttribute.tls_version).startswith('TLSv')
await wrapper.send(b'\x00')

server_thread.join()
server_sock.close()
@@ -202,7 +202,7 @@ async def test_is_block_device(self) -> None:
assert await Path(entry.path).is_block_device()
break
else:
pytest.fail('Could not find a suitable block device')
pytest.skip('Could not find a suitable block device')

@pytest.mark.skipif(platform.system() == 'Windows',
reason='Character devices are not available on Windows')
@@ -6,6 +6,7 @@
from typing import Any, Dict, List, NoReturn, Optional

import pytest
from _pytest.logging import LogCaptureFixture

from anyio import (
Event, from_thread, get_cancelled_exc_class, get_current_task, run, sleep, to_thread,
@@ -376,3 +377,14 @@ def taskfunc(*, task_status: TaskStatus) -> None:
future, start_value = portal.start_task(
taskfunc, name='testname') # type: ignore[arg-type]
assert start_value == 'testname'

@pytest.mark.parametrize('anyio_backend', ['asyncio'])
async def test_asyncio_run_sync_called(self, caplog: LogCaptureFixture) -> None:
"""Regression test for #357."""
async def in_loop() -> None:
raise CancelledError

async with BlockingPortal() as portal:
await to_thread.run_sync(portal.start_task_soon, in_loop)

assert not caplog.text
@@ -1,4 +1,5 @@
import array
import gc
import io
import os
import platform
@@ -15,6 +16,7 @@

import pytest
from _pytest.fixtures import SubRequest
from _pytest.logging import LogCaptureFixture
from _pytest.monkeypatch import MonkeyPatch
from _pytest.tmpdir import TempPathFactory

@@ -387,6 +389,39 @@ def serve() -> None:
thread.join()
assert thread_exception is None

@pytest.mark.parametrize('anyio_backend', ['asyncio'])
async def test_unretrieved_future_exception_server_crash(
self, family: AnyIPAddressFamily, caplog: LogCaptureFixture) -> None:
"""
Tests that there won't be any leftover Futures that don't get their exceptions retrieved.
See https://github.com/encode/httpcore/issues/382 for details.
"""
def serve() -> None:
sock, addr = server_sock.accept()
event.wait(3)
del sock
gc.collect()

server_sock = socket.socket(family, socket.SOCK_STREAM)
server_sock.settimeout(1)
server_sock.bind(('localhost', 0))
server_sock.listen()
server_addr = server_sock.getsockname()[:2]
event = threading.Event()
thread = Thread(target=serve)
thread.start()
async with await connect_tcp(*server_addr) as stream:
await stream.send(b'GET')
event.set()
with pytest.raises(BrokenResourceError):
await stream.receive()

thread.join()
gc.collect()
assert not caplog.text


class TestTCPListener:
async def test_extra_attributes(self, family: AnyIPAddressFamily) -> None:

No commit comments for this range