diff --git a/aws_advanced_python_wrapper/utils/sliding_expiration_cache.py b/aws_advanced_python_wrapper/utils/sliding_expiration_cache.py index 4e5e97f0..e4bbe8da 100644 --- a/aws_advanced_python_wrapper/utils/sliding_expiration_cache.py +++ b/aws_advanced_python_wrapper/utils/sliding_expiration_cache.py @@ -14,7 +14,7 @@ from __future__ import annotations -from concurrent.futures import Executor, ThreadPoolExecutor +from threading import Thread from time import perf_counter_ns, sleep from typing import Callable, Generic, ItemsView, KeysView, Optional, TypeVar @@ -119,28 +119,26 @@ def __init__( should_dispose_func: Optional[Callable] = None, item_disposal_func: Optional[Callable] = None): super().__init__(cleanup_interval_ns, should_dispose_func, item_disposal_func) - self._executor: Executor = ThreadPoolExecutor(thread_name_prefix="SlidingExpirationCacheWithCleanupThreadExecutor") - self.init_cleanup_thread() - - def init_cleanup_thread(self) -> None: - self._executor.submit(self._cleanup_thread_internal) + self._cleanup_thread = Thread(target=self._cleanup_thread_internal, daemon=True) + self._cleanup_thread.start() def _cleanup_thread_internal(self): - logger.debug("SlidingExpirationCache.CleaningUp") - current_time = perf_counter_ns() - sleep(self._cleanup_interval_ns / 1_000_000_000) - self._cleanup_time_ns.set(current_time + self._cleanup_interval_ns) - keys = [key for key, _ in self._cdict.items()] - for key in keys: + while True: try: - self._remove_if_expired(key) + sleep(self._cleanup_interval_ns / 1_000_000_000) + logger.debug("SlidingExpirationCache.CleaningUp") + self._cleanup_time_ns.set(perf_counter_ns() + self._cleanup_interval_ns) + keys = [key for key, _ in self._cdict.items()] + for key in keys: + try: + self._remove_if_expired(key) + except Exception: + pass # ignore except Exception: - pass # ignore - - self._executor.shutdown() + break def _cleanup(self): - pass # do nothing, cleanup thread does the job + pass # cleanup thread handles this class CacheItem(Generic[V]): diff --git a/tests/unit/test_sliding_expiration_cache.py b/tests/unit/test_sliding_expiration_cache.py index 2a8ab102..4029faf8 100644 --- a/tests/unit/test_sliding_expiration_cache.py +++ b/tests/unit/test_sliding_expiration_cache.py @@ -14,8 +14,8 @@ import time -from aws_advanced_python_wrapper.utils.sliding_expiration_cache import \ - SlidingExpirationCache +from aws_advanced_python_wrapper.utils.sliding_expiration_cache import ( + SlidingExpirationCache, SlidingExpirationCacheWithCleanupThread) def test_compute_if_absent(): @@ -89,6 +89,34 @@ def test_clear(): assert item2.disposed is True +def test_cleanup_thread_continuous_removal(): + # Use very short cleanup interval for testing (100ms) + cache = SlidingExpirationCacheWithCleanupThread( + cleanup_interval_ns=100_000_000, # 100ms + item_disposal_func=lambda item: item.dispose() + ) + + # First cycle: insert item that expires quickly + item1 = DisposableItem(True) + cache.compute_if_absent("key1", lambda _: item1, 50_000_000) # 50ms expiration + assert cache.get("key1") == item1 + + # Wait for cleanup thread to remove expired item + time.sleep(0.2) # Wait 200ms for cleanup + assert cache.get("key1") is None + assert item1.disposed is True + + # Second cycle: insert another item that expires quickly + item2 = DisposableItem(True) + cache.compute_if_absent("key2", lambda _: item2, 50_000_000) # 50ms expiration + assert cache.get("key2") == item2 + + # Wait for cleanup thread to remove second expired item + time.sleep(0.2) # Wait 200ms for cleanup + assert cache.get("key2") is None + assert item2.disposed is True + + class DisposableItem: def __init__(self, should_dispose): self.should_dispose = should_dispose