Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 22 additions & 16 deletions src/apify/_actor.py
Original file line number Diff line number Diff line change
Expand Up @@ -1231,22 +1231,21 @@ async def reboot(
(self.event_manager._listeners_to_wrappers[Event.MIGRATING] or {}).values() # noqa: SLF001
)

async def safe_dispatch(listener: Any, data: Any) -> None:
try:
await listener(data)
except Exception:
self.log.exception('A pre-reboot event listener failed')

timeout = event_listeners_timeout.total_seconds() if event_listeners_timeout else None
try:
results = await asyncio.wait_for(
asyncio.gather(
*[listener(EventPersistStateData(is_migrating=True)) for listener in persist_state_listeners],
*[listener(EventMigratingData()) for listener in migrating_listeners],
return_exceptions=True,
),
timeout=event_listeners_timeout.total_seconds() if event_listeners_timeout else None,
)
async with asyncio.timeout(timeout), asyncio.TaskGroup() as tg:
for listener in persist_state_listeners:
tg.create_task(safe_dispatch(listener, EventPersistStateData(is_migrating=True)))
for listener in migrating_listeners:
tg.create_task(safe_dispatch(listener, EventMigratingData()))
except TimeoutError:
self.log.warning('Pre-reboot event listeners did not finish within timeout; proceeding with reboot')
results = []

for result in results:
if isinstance(result, Exception):
self.log.exception('A pre-reboot event listener failed', exc_info=result)

if not self.configuration.actor_run_id:
raise RuntimeError('actor_run_id cannot be None when running on the Apify platform.')
Expand Down Expand Up @@ -1419,9 +1418,16 @@ async def use_state(
return await kvs.get_auto_saved_value(key or self._ACTOR_STATE_KEY, default_value)

async def _save_actor_state(self) -> None:
for kvs_name in self._use_state_stores:
store = await self.open_key_value_store(name=kvs_name)
await store.persist_autosaved_values()
async def safe_persist(kvs_name: str | None) -> None:
try:
store = await self.open_key_value_store(name=kvs_name)
await store.persist_autosaved_values()
except Exception:
self.log.exception('Failed to persist auto-saved values', extra={'kvs_name': kvs_name})

async with asyncio.TaskGroup() as tg:
for kvs_name in self._use_state_stores:
tg.create_task(safe_persist(kvs_name))

def _get_default_exit_process(self) -> bool:
"""Return False for IPython and Scrapy environments, True otherwise."""
Expand Down
9 changes: 6 additions & 3 deletions src/apify/request_loaders/_apify_request_list.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,9 +117,12 @@ async def _fetch_requests_from_url(
are extracted from the response body and turned into `Request` objects, inheriting `method`, `payload`,
`headers`, and `user_data` from the source entry.
"""
tasks = [cls._process_remote_url(request_input, http_client) for request_input in remote_url_requests_inputs]
results = await asyncio.gather(*tasks)
return list(chain.from_iterable(results))
async with asyncio.TaskGroup() as tg:
tasks = [
tg.create_task(cls._process_remote_url(request_input, http_client))
for request_input in remote_url_requests_inputs
]
return list(chain.from_iterable(task.result() for task in tasks))

@staticmethod
def _create_requests_from_input(simple_url_inputs: list[_SimpleUrlInput]) -> list[Request]:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,11 +75,12 @@ async def purge(self) -> None:
files_to_keep = {self._input_key_filename, f'{self._input_key_filename}.{METADATA_FILENAME}'}
files_to_keep.add(METADATA_FILENAME)

for file_path in self.path_to_kvs.glob('*'):
if file_path.name in files_to_keep:
continue
if file_path.is_file():
await asyncio.to_thread(file_path.unlink, missing_ok=True)
async with asyncio.TaskGroup() as tg:
for file_path in self.path_to_kvs.glob('*'):
if file_path.name in files_to_keep:
continue
if file_path.is_file():
tg.create_task(asyncio.to_thread(file_path.unlink, missing_ok=True))

await self._update_metadata(
update_accessed_at=True,
Expand Down
Loading