From 3543633005dc6e7cb5438a14268efd08fbefc357 Mon Sep 17 00:00:00 2001 From: Faolain Date: Mon, 9 Jun 2025 19:56:31 -0400 Subject: [PATCH 1/4] fix: reduce unclosed sessions in lazily loaded xarrays --- Architecture.md | 213 ++++++++++++++++++++++++++++++++++++++++++++ py_hamt/store.py | 64 +++++++++++-- tests/test_async.py | 85 ++++++++++++++++++ 3 files changed, 357 insertions(+), 5 deletions(-) create mode 100644 Architecture.md create mode 100644 tests/test_async.py diff --git a/Architecture.md b/Architecture.md new file mode 100644 index 0000000..531f219 --- /dev/null +++ b/Architecture.md @@ -0,0 +1,213 @@ +# py-hamt Library Architecture Analysis + +## Overview + +The py-hamt library is a Python implementation of a Hash Array Mapped Trie (HAMT) designed for content-addressed storage systems like IPFS. It provides efficient storage and retrieval of large key-value mappings using the IPLD data model. + +## Core Architecture + +### 1. HAMT Structure (`hamt.py`) + +The core HAMT implementation follows a hierarchical trie structure where: +- **Nodes** contain 256 buckets (indexed 0-255) +- Each bucket can either be: + - A dictionary containing key-value mappings (when bucket size ≤ max_bucket_size) + - A link to a child Node (when bucket overflows) +- Hash values are consumed 8 bits at a time to determine bucket indices + +#### Key Components: + +**Node Class** (`hamt.py:62`) +- Fixed array of 256 elements representing buckets +- Uses `list[IPLDKind]` where empty dicts `{}` represent empty buckets +- Links are stored as single-element lists `[link_id]` +- Serializes to DAG-CBOR format for content addressing + +**HAMT Class** (`hamt.py:287`) +- Main interface for trie operations +- Supports both read-only and read-write modes +- Uses asyncio locks for coroutine-level (async) safety in write mode; this does not make it thread-safe +- Implements two node storage strategies via NodeStore abstraction + +### 2. 
Storage Abstraction (`store.py`) + +**ContentAddressedStore** (`store.py:11`) +- Abstract base class for content-addressed storage backends +- Returns immutable IDs (IPLDKind) for stored content +- Two codec types: "raw" for data, "dag-cbor" for structured content + +**KuboCAS** (`store.py:74`) +- IPFS implementation using Kubo daemon +- Uses RPC API for writes (`/api/v0/add`) +- Uses HTTP Gateway for reads (`/ipfs/{cid}`) +- Supports authentication and custom headers +- Implements connection pooling and concurrency limiting + +**InMemoryCAS** (`store.py:37`) +- Testing implementation using in-memory dictionary +- Content-addressed via Blake3 hashing + +### 3. Node Storage Strategies + +**ReadCacheStore** (`hamt.py:150`) +- Used in read-only mode +- Implements LRU-style caching of loaded nodes +- Cannot perform writes (throws exception) +- Optimized for concurrent read operations + +**InMemoryTreeStore** (`hamt.py:180`) +- Used in read-write mode +- Maintains modified nodes in memory buffer +- Uses UUID4 integers as temporary node IDs +- Implements sophisticated flush algorithm during `vacate()` + +### 4. Zarr Integration (`zarr_hamt_store.py`) + +**ZarrHAMTStore** (`zarr_hamt_store.py:11`) +- Implements Zarr v3 Store interface +- Provides metadata caching for `zarr.json` files +- Supports directory listing operations with efficient prefix matching +- Key insight: Zarr keys map directly to HAMT keys without transformation + +**SimpleEncryptedZarrHAMTStore** (`encryption_hamt_store.py:12`) +- Extends ZarrHAMTStore with ChaCha20-Poly1305 encryption +- Encrypts all data including metadata +- Uses 24-byte nonces and 16-byte authentication tags + +## Value Setting Mechanism & Data Flow + +### Setting Values (`_set_pointer` in `hamt.py:505`) + +1. **Hash and Navigate**: Key is hashed, bits extracted to determine path +2. **Queue-based Insertion**: Uses FIFO queue to handle bucket overflows +3. 
**Bucket Overflow Handling**: When bucket exceeds max_bucket_size: + - All existing items moved to queue for reinsertion + - New child node created and linked + - Continues insertion process at deeper level +4. **Tree Rebalancing**: After insertion, `_reserialize_and_link` updates all affected nodes + +### Getting Values (`_get_pointer` in `hamt.py:618`) + +1. **Hash Traversal**: Follow hash-determined path through trie +2. **Bucket Search**: Check final bucket for key +3. **Pointer Resolution**: Return content-addressed pointer +4. **Value Retrieval**: Load actual value from CAS using pointer + +### Memory Management + +**Read-Write Mode**: +- Uses `InMemoryTreeStore` with UUID-based temporary IDs +- Modified nodes stay in memory until `make_read_only()` or `cache_vacate()` +- Flush algorithm uses DFS to preserve parent-child relationships + +**Read-Only Mode**: +- Switches to `ReadCacheStore` for better read performance +- Allows concurrent operations without locks +- Cache size can be monitored and manually vacated + +## Architecture Gotchas & Edge Cases + +### 1. Mode Switching Complexity +- **Problem**: HAMT can switch between read-only and read-write modes +- **Gotcha**: In read-write mode, `root_node_id` is invalid until `make_read_only()` +- **Why**: InMemoryTreeStore uses temporary UUIDs that aren't content-addressed +- **Solution**: Always call `make_read_only()` before reading `root_node_id` + +### 2. Thread Safety Limitations +- **Problem**: Only async-safe, not thread-safe in write mode +- **Gotcha**: Multiple threads writing simultaneously can corrupt state +- **Why**: Uses asyncio.Lock, not threading.Lock +- **Solution**: Use single event loop for all write operations + +### 3. 
Hash Function Constraints +- **Problem**: Hash length must be a multiple of 8 bits (whole bytes) +- **Gotcha**: Custom hash functions whose output length is not a multiple of 8 bits will fail +- **Why**: `extract_bits` assumes byte-aligned hash values +- **Solution**: Ensure hash functions return byte-aligned results + +### 4. Bucket Size Tuning +- **Problem**: max_bucket_size affects performance vs memory trade-offs +- **Gotcha**: Very small bucket sizes (1) force deep trees, large sizes create big nodes +- **Why**: Small buckets = more CAS operations, large buckets = bigger serialized nodes +- **Solution**: Test with your specific workload (default 4 is reasonable) + +### 5. Empty Node Pruning +- **Problem**: Deletions can leave empty nodes in tree +- **Gotcha**: Empty nodes are automatically pruned except root +- **Why**: Content addressing means identical empty nodes have same hash +- **Solution**: Pruning is automatic, but be aware root can become empty + +### 6. Serialization Edge Cases +- **Problem**: InMemoryTreeStore nodes contain non-serializable UUID links +- **Gotcha**: Attempting to serialize buffer nodes directly will fail +- **Why**: UUID4.int values exceed DAG-CBOR integer limits +- **Solution**: Always use `vacate()` or `make_read_only()` before accessing serialized form + +### 7. CAS ID Immutability Requirement +- **Problem**: CAS implementations must return immutable IDs +- **Gotcha**: Using mutable objects (lists, dicts) as IDs breaks assumptions +- **Why**: HAMT compares IDs for equality and uses them as dictionary keys +- **Solution**: Ensure CAS returns immutable types (bytes, CID, int, str, etc.) + +### 8. Zarr Metadata Caching +- **Problem**: ZarrHAMTStore caches `zarr.json` files +- **Gotcha**: The cache is refreshed only by writes, so it can become stale if metadata changes by any other route +- **Why**: Zarr frequently re-reads metadata, caching improves performance +- **Solution**: Cache is updated on writes, but be aware of this optimization + +### 9. 
Concurrent Operations in Read Mode +- **Problem**: Read-only mode allows concurrent access +- **Gotcha**: Switching to write mode while reads are happening is unsafe +- **Why**: Mode switch changes internal data structures +- **Solution**: Ensure all operations complete before mode switches + +### 10. Key Encoding Assumptions +- **Problem**: Keys are encoded as UTF-8 bytes for hashing +- **Gotcha**: Non-UTF-8 string keys or binary keys need special handling +- **Why**: `key.encode()` assumes UTF-8 encoding +- **Solution**: Ensure keys are valid UTF-8 strings or modify key handling + +## Performance Characteristics + +### Time Complexity +- **Get/Set/Delete**: O(log₂₅₆ n) average case, O(depth) worst case +- **Iteration**: O(n) for all keys/values +- **Tree depth**: Typically 1-4 levels for most datasets + +### Space Complexity +- **Node overhead**: ~2KB per node (256 × 8-byte pointers) +- **Memory efficiency**: Improves with higher bucket sizes +- **CAS efficiency**: Content addressing deduplicates identical subtrees + +### Concurrency +- **Read-only mode**: Full concurrency support +- **Write mode**: Single-writer, async-safe +- **Mode switching**: Blocking operation requiring exclusive access + +## Integration Patterns + +### 1. IPFS Integration +```python +kubo_cas = KuboCAS() +hamt = await HAMT.build(cas=kubo_cas) +await hamt.set("key", "value") +await hamt.make_read_only() +cid = hamt.root_node_id # IPFS CID +``` + +### 2. Zarr Storage +```python +hamt = await HAMT.build(cas=kubo_cas, values_are_bytes=True) +zarr_store = ZarrHAMTStore(hamt, read_only=False) +dataset.to_zarr(store=zarr_store, zarr_format=3) +``` + +### 3. 
Encrypted Storage +```python +encryption_key = get_random_bytes(32) +encrypted_store = SimpleEncryptedZarrHAMTStore( + hamt, read_only=False, encryption_key=encryption_key, header=b"app-name" +) +``` + +The py-hamt library provides a robust, efficient implementation of HAMTs for content-addressed storage with careful attention to memory management, concurrency, and integration patterns. Understanding these architectural details and gotchas is crucial for successful implementation in production systems. diff --git a/py_hamt/store.py b/py_hamt/store.py index a35efb5..b69c2c5 100644 --- a/py_hamt/store.py +++ b/py_hamt/store.py @@ -212,13 +212,22 @@ def __init__( # helper: get or create the session bound to the current running loop # # --------------------------------------------------------------------- # def _loop_session(self) -> aiohttp.ClientSession: + """Get or create a session for the current event loop with improved cleanup.""" loop: asyncio.AbstractEventLoop = asyncio.get_running_loop() try: return self._session_per_loop[loop] except KeyError: + # Create a session with a connector that closes more quickly + connector = aiohttp.TCPConnector( + limit=64, + limit_per_host=32, + force_close=True, # Force close connections + enable_cleanup_closed=True, # Enable cleanup of closed connections + ) + sess = aiohttp.ClientSession( timeout=aiohttp.ClientTimeout(total=60), - connector=aiohttp.TCPConnector(limit=64, limit_per_host=32), + connector=connector, headers=self._default_headers, auth=self._default_auth, ) @@ -229,11 +238,56 @@ def _loop_session(self) -> aiohttp.ClientSession: # graceful shutdown: close **all** sessions we own # # --------------------------------------------------------------------- # async def aclose(self) -> None: - if self._owns_session: - for sess in list(self._session_per_loop.values()): - if not sess.closed: - await sess.close() + """ + Close all sessions we own, properly handling multi-loop scenarios. 
+ - Sessions in the current loop are properly awaited and closed + - Sessions in other loops are removed from tracking (can't be safely closed from here) + - All references are cleaned up to allow garbage collection + """ + if not self._owns_session: + # User provided the session, they're responsible for closing it + return + + # Get the current event loop if one exists + current_loop = None + try: + current_loop = asyncio.get_running_loop() + except RuntimeError: + # No running event loop - just clear all references + # The sessions will be garbage collected eventually + self._session_per_loop.clear() + return + + # Separate sessions by whether they belong to the current loop + sessions_to_close = [] + loops_to_remove = [] + + for loop, sess in list(self._session_per_loop.items()): + if loop == current_loop: + # This session belongs to our current loop + if not sess.closed: + sessions_to_close.append(sess) + loops_to_remove.append(loop) + else: + # Session belongs to a different loop + # We can't safely close it from here, but we should remove our reference + loops_to_remove.append(loop) + + # Close all sessions that belong to the current loop + for sess in sessions_to_close: + try: + await sess.close() + except Exception: + # If closing fails, continue anyway + pass + + # Remove all references (both current loop and other loops) + for loop in loops_to_remove: + self._session_per_loop.pop(loop, None) + + # At this point, _session_per_loop should be empty or only contain + # sessions from loops we haven't seen (which shouldn't happen in practice) async def __aenter__(self) -> "KuboCAS": return self diff --git a/tests/test_async.py b/tests/test_async.py new file mode 100644 index 0000000..75dc009 --- /dev/null +++ b/tests/test_async.py @@ -0,0 +1,85 @@ +import asyncio + +import pytest + +from py_hamt import KuboCAS + + +@pytest.mark.asyncio +async def test_kubocas_cross_loop_close(): + """Test that KuboCAS handles closing when sessions exist in multiple loops.""" + 
+ # Create a KuboCAS instance + cas = KuboCAS() + + # Create a session in the current loop + _ = cas._loop_session() # This creates a session for the current loop + + # Create a new event loop and session in a different thread + other_loop_session = [] + + def create_session_in_other_loop(): + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + + async def _create(): + # Force creation of a session in this loop + sess = cas._loop_session() + other_loop_session.append((loop, sess)) + + loop.run_until_complete(_create()) + # Don't close the loop yet - leave it pending + + import threading + + thread = threading.Thread(target=create_session_in_other_loop) + thread.start() + thread.join() + + # Now we have sessions in two different loops + assert len(cas._session_per_loop) == 2 + + # This should not raise an error + try: + await cas.aclose() + except RuntimeError as e: + if "attached to a different loop" in str(e): + pytest.fail(f"Cross-loop close error: {e}") + else: + raise + + # Verify current loop's session was closed + current_loop = asyncio.get_running_loop() + assert ( + current_loop not in cas._session_per_loop + or cas._session_per_loop[current_loop].closed + ) + + # The other loop's session should be removed from tracking + assert len(cas._session_per_loop) == 0 or all( + loop == current_loop for loop in cas._session_per_loop + ) + + +@pytest.mark.asyncio +async def test_kubocas_context_manager_with_fresh_loop(): + """Test the exact scenario from the user's script.""" + # This simulates what asyncio.run() does - creates a fresh loop + + async def use_kubocas(): + async with KuboCAS() as cas: + # Simulate some work that creates a session + _ = cas._loop_session() + # The __aexit__ should handle cleanup properly + + # This should complete without errors + # Using asyncio.run() creates a fresh event loop + try: + # We're already in an event loop from pytest, so we need to be careful + # In real usage, this would be: asyncio.run(use_kubocas()) + 
await use_kubocas() + except RuntimeError as e: + if "attached to a different loop" in str(e): + pytest.fail(f"Context manager close error: {e}") + else: + raise From d596e8f0f6b77f8f01e3bc3e7be033fe2c91bb97 Mon Sep 17 00:00:00 2001 From: Faolain Date: Mon, 9 Jun 2025 20:13:05 -0400 Subject: [PATCH 2/4] tests: add missing test line coverage --- tests/test_async.py | 83 +++++++++++++++++++++++++++++++++++---------- 1 file changed, 65 insertions(+), 18 deletions(-) diff --git a/tests/test_async.py b/tests/test_async.py index 75dc009..0c4a681 100644 --- a/tests/test_async.py +++ b/tests/test_async.py @@ -1,4 +1,6 @@ +# tests/test_async.py import asyncio +import threading import pytest @@ -16,7 +18,8 @@ async def test_kubocas_cross_loop_close(): _ = cas._loop_session() # This creates a session for the current loop # Create a new event loop and session in a different thread - other_loop_session = [] + other_loop_container = [] + other_session_container = [] def create_session_in_other_loop(): loop = asyncio.new_event_loop() @@ -25,12 +28,12 @@ def create_session_in_other_loop(): async def _create(): # Force creation of a session in this loop sess = cas._loop_session() - other_loop_session.append((loop, sess)) + other_session_container.append(sess) + other_loop_container.append(loop) loop.run_until_complete(_create()) - # Don't close the loop yet - leave it pending - - import threading + # Important: Don't close the loop yet - we want to test the multi-loop scenario + # But we do need to clean it up later thread = threading.Thread(target=create_session_in_other_loop) thread.start() @@ -48,23 +51,33 @@ async def _create(): else: raise - # Verify current loop's session was closed - current_loop = asyncio.get_running_loop() - assert ( - current_loop not in cas._session_per_loop - or cas._session_per_loop[current_loop].closed - ) - - # The other loop's session should be removed from tracking + # After aclose, the dictionary should be empty or only contain closed sessions + # 
from the current loop (the implementation may choose either approach) assert len(cas._session_per_loop) == 0 or all( - loop == current_loop for loop in cas._session_per_loop + loop == asyncio.get_running_loop() for loop in cas._session_per_loop ) + # Clean up the other event loop to prevent warnings + if other_loop_container: + other_loop = other_loop_container[0] + if other_session_container and not other_session_container[0].closed: + # Schedule the session close in its own loop + def cleanup(): + async def _close(): + if not other_session_container[0].closed: + await other_session_container[0].close() + + other_loop.run_until_complete(_close()) + other_loop.close() + + cleanup_thread = threading.Thread(target=cleanup) + cleanup_thread.start() + cleanup_thread.join() + @pytest.mark.asyncio async def test_kubocas_context_manager_with_fresh_loop(): """Test the exact scenario from the user's script.""" - # This simulates what asyncio.run() does - creates a fresh loop async def use_kubocas(): async with KuboCAS() as cas: @@ -73,13 +86,47 @@ async def use_kubocas(): # The __aexit__ should handle cleanup properly # This should complete without errors - # Using asyncio.run() creates a fresh event loop try: - # We're already in an event loop from pytest, so we need to be careful - # In real usage, this would be: asyncio.run(use_kubocas()) await use_kubocas() except RuntimeError as e: if "attached to a different loop" in str(e): pytest.fail(f"Context manager close error: {e}") else: raise + + +@pytest.mark.asyncio +async def test_kubocas_no_running_loop_in_aclose(): + """Test aclose behavior when called without a running event loop.""" + cas = KuboCAS() + + # Create a session in the current loop + _ = cas._loop_session() + + # Simulate calling aclose when there's no event loop + # We'll mock this by calling the method directly + import unittest.mock + + with unittest.mock.patch( + "asyncio.get_running_loop", side_effect=RuntimeError("No running loop") + ): + await 
cas.aclose() + + # The session references should be cleared + assert len(cas._session_per_loop) == 0 + + +@pytest.mark.asyncio +async def test_kubocas_session_already_closed(): + """Test aclose behavior when session is already closed.""" + cas = KuboCAS() + + # Create and immediately close a session + sess = cas._loop_session() + await sess.close() + + # aclose should handle this gracefully + await cas.aclose() + + # Verify cleanup + assert len(cas._session_per_loop) == 0 From 0128b0fd87d9d94acba1b36dc8e97a4b80114cb9 Mon Sep 17 00:00:00 2001 From: Faolain Date: Tue, 10 Jun 2025 00:33:21 -0400 Subject: [PATCH 3/4] test: more async tests --- tests/test_async.py | 118 ++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 118 insertions(+) diff --git a/tests/test_async.py b/tests/test_async.py index 0c4a681..4aca804 100644 --- a/tests/test_async.py +++ b/tests/test_async.py @@ -130,3 +130,121 @@ async def test_kubocas_session_already_closed(): # Verify cleanup assert len(cas._session_per_loop) == 0 + + +# tests/test_kubocas_session.py (add this test to the existing file) + + +@pytest.mark.asyncio +async def test_aclose_handles_session_close_failure(): + """Test that aclose() gracefully handles exceptions when closing sessions.""" + + kubo = KuboCAS( + rpc_base_url="http://127.0.0.1:5001", gateway_base_url="http://127.0.0.1:8080" + ) + + # Create a session in the current loop + sess = kubo._loop_session() + + # Mock the session's close method to raise an exception + original_close = sess.close + + async def failing_close(): + raise RuntimeError("Simulated close failure") + + sess.close = failing_close + + # aclose should handle the exception gracefully without propagating it + try: + await kubo.aclose() + except Exception as e: + pytest.fail( + f"aclose() should not propagate session close exceptions, but got: {e}" + ) + + # Verify that the session was removed from tracking despite the close failure + assert len(kubo._session_per_loop) == 0 + + # Clean up - 
restore original close method and close manually if needed + sess.close = original_close + if not sess.closed: + await sess.close() + + +@pytest.mark.asyncio +async def test_aclose_handles_multiple_close_failures(): + """Test that aclose() handles exceptions when closing sessions multiple times.""" + + kubo = KuboCAS( + rpc_base_url="http://127.0.0.1:5001", gateway_base_url="http://127.0.0.1:8080" + ) + + # Create a session + sess1 = kubo._loop_session() + current_loop = asyncio.get_running_loop() + + # Mock session to raise on close + async def failing_close(): + raise ValueError("Simulated close failure") + + original_close = sess1.close + sess1.close = failing_close + + # Pretend there's another session (same object, different loop simulated) + kubo._session_per_loop[current_loop] = sess1 + + # Run aclose, expecting it to suppress errors + try: + await kubo.aclose() + except Exception as e: + pytest.fail( + f"aclose() should not propagate session close exceptions, but got: {e}" + ) + + # Ensure sessions are cleared + assert len(kubo._session_per_loop) == 0 + + # Restore original close + sess1.close = original_close + if not sess1.closed: + await sess1.close() + + +@pytest.mark.asyncio +async def test_aclose_handles_multiple_session_close_failures(): + """Test that aclose() handles exceptions when closing multiple sessions.""" + + kubo = KuboCAS( + rpc_base_url="http://127.0.0.1:5001", gateway_base_url="http://127.0.0.1:8080" + ) + + # Create a session + sess1 = kubo._loop_session() + current_loop = asyncio.get_running_loop() + + # Mock session to raise on close + async def failing_close(): + raise ValueError("Simulated close failure") + + original_close = sess1.close + sess1.close = failing_close + + # Pretend there's another session (same object, different loop simulated) + kubo._session_per_loop[current_loop] = sess1 + kubo._session_per_loop[object()] = sess1 # Fake a second loop + + # Run aclose, expecting it to suppress errors + try: + await kubo.aclose() + 
except Exception as e: + pytest.fail( + f"aclose() should not propagate session close exceptions, but got: {e}" + ) + + # Ensure sessions are cleared + assert len(kubo._session_per_loop) == 0 + + # Restore original close + sess1.close = original_close + if not sess1.closed: + await sess1.close() From 37bcc87199bf52895a6898110d31b2b9ad208f01 Mon Sep 17 00:00:00 2001 From: Faolain Date: Tue, 10 Jun 2025 00:54:14 -0400 Subject: [PATCH 4/4] Fix session cleanup for multiloop usage --- py_hamt/store.py | 53 +++++++---------------------------- tests/test_kubocas_session.py | 3 +- 2 files changed, 11 insertions(+), 45 deletions(-) diff --git a/py_hamt/store.py b/py_hamt/store.py index b69c2c5..e0838b9 100644 --- a/py_hamt/store.py +++ b/py_hamt/store.py @@ -238,53 +238,20 @@ def _loop_session(self) -> aiohttp.ClientSession: # graceful shutdown: close **all** sessions we own # # --------------------------------------------------------------------- # async def aclose(self) -> None: - """ - Close all sessions we own, properly handling multi-loop scenarios. - - - Sessions in the current loop are properly awaited and closed - - Sessions in other loops are removed from tracking (can't be safely closed from here) - - All references are cleaned up to allow garbage collection - """ + """Close all internally-created sessions.""" if not self._owns_session: - # User provided the session, they're responsible for closing it + # User supplied the session; they are responsible for closing it. 
return - # Get the current event loop if one exists - current_loop = None - try: - current_loop = asyncio.get_running_loop() - except RuntimeError: - # No running event loop - just clear all references - # The sessions will be garbage collected eventually - self._session_per_loop.clear() - return + for sess in list(self._session_per_loop.values()): + if not sess.closed: + try: + await sess.close() + except Exception: + # Best-effort cleanup; ignore errors during shutdown + pass - # Separate sessions by whether they belong to the current loop - sessions_to_close = [] - loops_to_remove = [] - - for loop, sess in list(self._session_per_loop.items()): - if loop == current_loop: - # This session belongs to our current loop - if not sess.closed: - sessions_to_close.append(sess) - loops_to_remove.append(loop) - else: - # Session belongs to a different loop - # We can't safely close it from here, but we should remove our reference - loops_to_remove.append(loop) - - # Close all sessions that belong to the current loop - for sess in sessions_to_close: - try: - await sess.close() - except Exception: - # If closing fails, continue anyway - pass - - # Remove all references (both current loop and other loops) - for loop in loops_to_remove: - self._session_per_loop.pop(loop, None) + self._session_per_loop.clear() # At this point, _session_per_loop should be empty or only contain # sessions from loops we haven't seen (which shouldn't happen in practice) diff --git a/tests/test_kubocas_session.py b/tests/test_kubocas_session.py index 50b7006..eeb102a 100644 --- a/tests/test_kubocas_session.py +++ b/tests/test_kubocas_session.py @@ -98,5 +98,4 @@ async def test_distinct_loops_get_distinct_sessions(): # Clean‑up await kubo.aclose() - if not secondary_session.closed: - await asyncio.to_thread(secondary_session.close) + assert secondary_session.closed