Skip to content

Commit

Permalink
Adjusted implementation to be simpler
Browse files Browse the repository at this point in the history
  • Loading branch information
agronholm committed Apr 13, 2024
1 parent 84c8696 commit 3b5b12d
Show file tree
Hide file tree
Showing 3 changed files with 8 additions and 59 deletions.
5 changes: 3 additions & 2 deletions docs/versionhistory.rst
Expand Up @@ -18,8 +18,9 @@ This library adheres to `Semantic Versioning 2.0 <http://semver.org/>`_.
variable when setting the ``debug`` flag in ``anyio.run()``
- Fixed ``SocketStream.receive()`` not detecting EOF on asyncio if there is also data in
the read buffer (`#701 <https://github.com/agronholm/anyio/issues/701>`_)
- Add check on closed ``MemoryObjectReceiveStream`` and ``MemoryObjectSendStream``
in ``__del__`` method
- Emit a ``ResourceWarning`` for ``MemoryObjectReceiveStream`` and
``MemoryObjectSendStream`` that were garbage collected without being closed (PR by
Andrey Kazantcev)

**4.3.0**

Expand Down
32 changes: 2 additions & 30 deletions src/anyio/streams/memory.py
@@ -1,7 +1,5 @@
from __future__ import annotations

import sys
import traceback
import warnings
from collections import OrderedDict, deque
from dataclasses import dataclass, field
Expand Down Expand Up @@ -62,17 +60,10 @@ def statistics(self) -> MemoryObjectStreamStatistics:
class MemoryObjectReceiveStream(Generic[T_co], ObjectReceiveStream[T_co]):
_state: MemoryObjectStreamState[T_co]
_closed: bool = field(init=False, default=False)
_source_traceback: traceback.StackSummary | None = field(init=False, default=None)

def __post_init__(self) -> None:
self._state.open_receive_channels += 1

if self._is_source_traceback_capturing_enabled():
self._source_traceback = traceback.extract_stack(sys._getframe(1))

def _is_source_traceback_capturing_enabled(self) -> bool:
return sys.flags.dev_mode

def receive_nowait(self) -> T_co:
"""
Receive the next item if it can be done without waiting.
Expand Down Expand Up @@ -176,14 +167,8 @@ def __exit__(

def __del__(self) -> None:
if not self._closed:
created_at_message = ""

if self._source_traceback is not None:
frame = self._source_traceback[-3]
created_at_message = f", created_at {frame[0]}:{frame[1]}"

warnings.warn(
f"Unclosed <{self.__class__.__name__}{created_at_message}>",
f"Unclosed <{self.__class__.__name__}>",
ResourceWarning,
source=self,
)
Expand All @@ -193,17 +178,10 @@ def __del__(self) -> None:
class MemoryObjectSendStream(Generic[T_contra], ObjectSendStream[T_contra]):
_state: MemoryObjectStreamState[T_contra]
_closed: bool = field(init=False, default=False)
_source_traceback: traceback.StackSummary | None = field(init=False, default=None)

def __post_init__(self) -> None:
self._state.open_send_channels += 1

if self._is_source_traceback_capturing_enabled():
self._source_traceback = traceback.extract_stack(sys._getframe(1))

def _is_source_traceback_capturing_enabled(self) -> bool:
return sys.flags.dev_mode

def send_nowait(self, item: T_contra) -> None:
"""
Send an item immediately if it can be done without waiting.
Expand Down Expand Up @@ -315,14 +293,8 @@ def __exit__(

def __del__(self) -> None:
if not self._closed:
created_at_message = ""

if self._source_traceback is not None:
frame = self._source_traceback[-3]
created_at_message = f", created_at {frame.filename}:{frame.lineno}"

warnings.warn(
f"Unclosed <{self.__class__.__name__}{created_at_message}>",
f"Unclosed <{self.__class__.__name__}>",
ResourceWarning,
source=self,
)
30 changes: 3 additions & 27 deletions tests/streams/test_memory.py
Expand Up @@ -5,7 +5,6 @@
from typing import NoReturn

import pytest
from pytest_mock import MockerFixture

from anyio import (
BrokenResourceError,
Expand Down Expand Up @@ -445,36 +444,13 @@ async def test_deprecated_item_type_parameter() -> None:
receive.close()


@pytest.mark.parametrize("is_source_traceback_capturing_enabled", [True, False])
async def test_not_closed_warning(
mocker: MockerFixture, is_source_traceback_capturing_enabled: bool
) -> None:
mocker.patch.object(
MemoryObjectReceiveStream,
"_is_source_traceback_capturing_enabled",
return_value=is_source_traceback_capturing_enabled,
)
mocker.patch.object(
MemoryObjectSendStream,
"_is_source_traceback_capturing_enabled",
return_value=is_source_traceback_capturing_enabled,
)

async def test_not_closed_warning() -> None:
send, receive = create_memory_object_stream[int]()

if is_source_traceback_capturing_enabled:
match_suffix = ", .*>$"
else:
match_suffix = ">$"

with pytest.warns(
ResourceWarning, match=f"Unclosed <MemoryObjectSendStream{match_suffix}"
):
with pytest.warns(ResourceWarning, match="Unclosed <MemoryObjectSendStream>"):
del send
gc.collect()

with pytest.warns(
ResourceWarning, match=f"Unclosed <MemoryObjectReceiveStream{match_suffix}"
):
with pytest.warns(ResourceWarning, match="Unclosed <MemoryObjectReceiveStream>"):
del receive
gc.collect()

0 comments on commit 3b5b12d

Please sign in to comment.