From e677b1367c1820f0c2a5b6c7ab234f8752be54cd Mon Sep 17 00:00:00 2001 From: Vlada Dusek Date: Tue, 26 May 2026 15:55:32 +0200 Subject: [PATCH 1/3] refactor: adopt asyncio.TaskGroup for structured concurrency Switch from asyncio.gather to asyncio.TaskGroup in two places enabled by the Python 3.11+ baseline: - ApifyRequestList._fetch_requests_from_url: structured task scope so any fetch failure cancels siblings cleanly and aggregates errors in an ExceptionGroup instead of swallowing all but the first. - Actor.reboot pre-reboot dispatch: combine asyncio.timeout + TaskGroup, with each listener wrapped to preserve the existing best-effort semantics (one failing listener does not abort the others). Closes #765. --- src/apify/_actor.py | 25 +++++++++---------- .../request_loaders/_apify_request_list.py | 9 ++++--- 2 files changed, 18 insertions(+), 16 deletions(-) diff --git a/src/apify/_actor.py b/src/apify/_actor.py index 69882386..b8256d7b 100644 --- a/src/apify/_actor.py +++ b/src/apify/_actor.py @@ -1231,22 +1231,21 @@ async def reboot( (self.event_manager._listeners_to_wrappers[Event.MIGRATING] or {}).values() # noqa: SLF001 ) + async def safe_dispatch(listener: Any, data: Any) -> None: + try: + await listener(data) + except Exception: + self.log.exception('A pre-reboot event listener failed') + + timeout = event_listeners_timeout.total_seconds() if event_listeners_timeout else None try: - results = await asyncio.wait_for( - asyncio.gather( - *[listener(EventPersistStateData(is_migrating=True)) for listener in persist_state_listeners], - *[listener(EventMigratingData()) for listener in migrating_listeners], - return_exceptions=True, - ), - timeout=event_listeners_timeout.total_seconds() if event_listeners_timeout else None, - ) + async with asyncio.timeout(timeout), asyncio.TaskGroup() as tg: + for listener in persist_state_listeners: + tg.create_task(safe_dispatch(listener, EventPersistStateData(is_migrating=True))) + for listener in migrating_listeners: + 
tg.create_task(safe_dispatch(listener, EventMigratingData())) except TimeoutError: self.log.warning('Pre-reboot event listeners did not finish within timeout; proceeding with reboot') - results = [] - - for result in results: - if isinstance(result, Exception): - self.log.exception('A pre-reboot event listener failed', exc_info=result) if not self.configuration.actor_run_id: raise RuntimeError('actor_run_id cannot be None when running on the Apify platform.') diff --git a/src/apify/request_loaders/_apify_request_list.py b/src/apify/request_loaders/_apify_request_list.py index a7d13bd2..bbff7c7d 100644 --- a/src/apify/request_loaders/_apify_request_list.py +++ b/src/apify/request_loaders/_apify_request_list.py @@ -117,9 +117,12 @@ async def _fetch_requests_from_url( are extracted from the response body and turned into `Request` objects, inheriting `method`, `payload`, `headers`, and `user_data` from the source entry. """ - tasks = [cls._process_remote_url(request_input, http_client) for request_input in remote_url_requests_inputs] - results = await asyncio.gather(*tasks) - return list(chain.from_iterable(results)) + async with asyncio.TaskGroup() as tg: + tasks = [ + tg.create_task(cls._process_remote_url(request_input, http_client)) + for request_input in remote_url_requests_inputs + ] + return list(chain.from_iterable(task.result() for task in tasks)) @staticmethod def _create_requests_from_input(simple_url_inputs: list[_SimpleUrlInput]) -> list[Request]: From 201603ef6affc3fd3ab35905440446e670b16d71 Mon Sep 17 00:00:00 2001 From: Vlada Dusek Date: Tue, 26 May 2026 16:08:00 +0200 Subject: [PATCH 2/3] refactor: persist Actor state across key-value stores in parallel MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit `_save_actor_state` previously iterated `_use_state_stores` and awaited each `persist_autosaved_values` call sequentially. 
The persists are independent — fan them out with `asyncio.TaskGroup` so cleanup latency is bounded by the slowest store instead of summed across stores. A per-task try/except makes the cleanup best-effort: a failing store is logged and no longer prevents its siblings from persisting. --- src/apify/_actor.py | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/src/apify/_actor.py b/src/apify/_actor.py index b8256d7b..e52483e2 100644 --- a/src/apify/_actor.py +++ b/src/apify/_actor.py @@ -1418,9 +1418,16 @@ async def use_state( return await kvs.get_auto_saved_value(key or self._ACTOR_STATE_KEY, default_value) async def _save_actor_state(self) -> None: - for kvs_name in self._use_state_stores: - store = await self.open_key_value_store(name=kvs_name) - await store.persist_autosaved_values() + async def safe_persist(kvs_name: str | None) -> None: + try: + store = await self.open_key_value_store(name=kvs_name) + await store.persist_autosaved_values() + except Exception: + self.log.exception('Failed to persist auto-saved values', extra={'kvs_name': kvs_name}) + + async with asyncio.TaskGroup() as tg: + for kvs_name in self._use_state_stores: + tg.create_task(safe_persist(kvs_name)) def _get_default_exit_process(self) -> bool: """Return False for IPython and Scrapy environments, True otherwise.""" From 649272ee32c961e724fdf5d51e850a0ac52a58e1 Mon Sep 17 00:00:00 2001 From: Vlada Dusek Date: Tue, 26 May 2026 16:09:04 +0200 Subject: [PATCH 3/3] refactor: purge file-system key-value store entries in parallel MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit `ApifyFileSystemKeyValueStoreClient.purge` previously awaited each `unlink` sequentially. The `asyncio.to_thread` calls are independent — fan them out with `asyncio.TaskGroup` so deletions run concurrently (bounded by the default thread pool's worker count) instead of strictly one after another.
--- .../_file_system/_key_value_store_client.py | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/src/apify/storage_clients/_file_system/_key_value_store_client.py b/src/apify/storage_clients/_file_system/_key_value_store_client.py index 8460dfcc..70073e69 100644 --- a/src/apify/storage_clients/_file_system/_key_value_store_client.py +++ b/src/apify/storage_clients/_file_system/_key_value_store_client.py @@ -75,11 +75,12 @@ async def purge(self) -> None: files_to_keep = {self._input_key_filename, f'{self._input_key_filename}.{METADATA_FILENAME}'} files_to_keep.add(METADATA_FILENAME) - for file_path in self.path_to_kvs.glob('*'): - if file_path.name in files_to_keep: - continue - if file_path.is_file(): - await asyncio.to_thread(file_path.unlink, missing_ok=True) + async with asyncio.TaskGroup() as tg: + for file_path in self.path_to_kvs.glob('*'): + if file_path.name in files_to_keep: + continue + if file_path.is_file(): + tg.create_task(asyncio.to_thread(file_path.unlink, missing_ok=True)) await self._update_metadata( update_accessed_at=True,