diff --git a/src/apify/_actor.py b/src/apify/_actor.py index 69882386..e52483e2 100644 --- a/src/apify/_actor.py +++ b/src/apify/_actor.py @@ -1231,22 +1231,21 @@ async def reboot( (self.event_manager._listeners_to_wrappers[Event.MIGRATING] or {}).values() # noqa: SLF001 ) + async def safe_dispatch(listener: Any, data: Any) -> None: + try: + await listener(data) + except Exception: + self.log.exception('A pre-reboot event listener failed') + + timeout = event_listeners_timeout.total_seconds() if event_listeners_timeout else None try: - results = await asyncio.wait_for( - asyncio.gather( - *[listener(EventPersistStateData(is_migrating=True)) for listener in persist_state_listeners], - *[listener(EventMigratingData()) for listener in migrating_listeners], - return_exceptions=True, - ), - timeout=event_listeners_timeout.total_seconds() if event_listeners_timeout else None, - ) + async with asyncio.timeout(timeout), asyncio.TaskGroup() as tg: + for listener in persist_state_listeners: + tg.create_task(safe_dispatch(listener, EventPersistStateData(is_migrating=True))) + for listener in migrating_listeners: + tg.create_task(safe_dispatch(listener, EventMigratingData())) except TimeoutError: self.log.warning('Pre-reboot event listeners did not finish within timeout; proceeding with reboot') - results = [] - - for result in results: - if isinstance(result, Exception): - self.log.exception('A pre-reboot event listener failed', exc_info=result) if not self.configuration.actor_run_id: raise RuntimeError('actor_run_id cannot be None when running on the Apify platform.') @@ -1419,9 +1418,16 @@ async def use_state( return await kvs.get_auto_saved_value(key or self._ACTOR_STATE_KEY, default_value) async def _save_actor_state(self) -> None: - for kvs_name in self._use_state_stores: - store = await self.open_key_value_store(name=kvs_name) - await store.persist_autosaved_values() + async def safe_persist(kvs_name: str | None) -> None: + try: + store = await self.open_key_value_store(name=kvs_name) + await store.persist_autosaved_values() + except Exception: + self.log.exception('Failed to persist auto-saved values', extra={'kvs_name': kvs_name}) + + async with asyncio.TaskGroup() as tg: + for kvs_name in self._use_state_stores: + tg.create_task(safe_persist(kvs_name)) def _get_default_exit_process(self) -> bool: """Return False for IPython and Scrapy environments, True otherwise.""" diff --git a/src/apify/request_loaders/_apify_request_list.py b/src/apify/request_loaders/_apify_request_list.py index a7d13bd2..bbff7c7d 100644 --- a/src/apify/request_loaders/_apify_request_list.py +++ b/src/apify/request_loaders/_apify_request_list.py @@ -117,9 +117,12 @@ async def _fetch_requests_from_url( are extracted from the response body and turned into `Request` objects, inheriting `method`, `payload`, `headers`, and `user_data` from the source entry. """ - tasks = [cls._process_remote_url(request_input, http_client) for request_input in remote_url_requests_inputs] - results = await asyncio.gather(*tasks) - return list(chain.from_iterable(results)) + async with asyncio.TaskGroup() as tg: + tasks = [ + tg.create_task(cls._process_remote_url(request_input, http_client)) + for request_input in remote_url_requests_inputs + ] + return list(chain.from_iterable(task.result() for task in tasks)) @staticmethod def _create_requests_from_input(simple_url_inputs: list[_SimpleUrlInput]) -> list[Request]: diff --git a/src/apify/storage_clients/_file_system/_key_value_store_client.py b/src/apify/storage_clients/_file_system/_key_value_store_client.py index 8460dfcc..70073e69 100644 --- a/src/apify/storage_clients/_file_system/_key_value_store_client.py +++ b/src/apify/storage_clients/_file_system/_key_value_store_client.py @@ -75,11 +75,12 @@ async def purge(self) -> None: files_to_keep = {self._input_key_filename, f'{self._input_key_filename}.{METADATA_FILENAME}'} files_to_keep.add(METADATA_FILENAME) - for file_path in self.path_to_kvs.glob('*'): - if file_path.name in files_to_keep: - continue - if file_path.is_file(): - await asyncio.to_thread(file_path.unlink, missing_ok=True) + async with asyncio.TaskGroup() as tg: + for file_path in self.path_to_kvs.glob('*'): + if file_path.name in files_to_keep: + continue + if file_path.is_file(): + tg.create_task(asyncio.to_thread(file_path.unlink, missing_ok=True)) await self._update_metadata( update_accessed_at=True,