15 changes: 15 additions & 0 deletions src/crawlee/_utils/recurring_task.py
Expand Up @@ -7,6 +7,9 @@
if TYPE_CHECKING:
from collections.abc import Callable
from datetime import timedelta
from types import TracebackType

from typing_extensions import Self

logger = getLogger(__name__)

@@ -26,6 +29,18 @@ def __init__(self, func: Callable, delay: timedelta) -> None:
self.delay = delay
self.task: asyncio.Task | None = None

async def __aenter__(self) -> Self:
self.start()
return self

async def __aexit__(
self,
exc_type: type[BaseException] | None,
exc_value: BaseException | None,
exc_traceback: TracebackType | None,
) -> None:
await self.stop()

async def _wrapper(self) -> None:
"""Continuously execute the provided function with the specified delay.

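For context, the added __aenter__/__aexit__ make RecurringTask usable as an async context manager. A minimal usage sketch (the callback is illustrative, and RecurringTask is an internal utility in crawlee._utils.recurring_task, so treat this as a sketch rather than public API):

import asyncio
from datetime import timedelta

from crawlee._utils.recurring_task import RecurringTask


async def tick() -> None:
    print('tick')


async def main() -> None:
    # Entering the context calls start(); exiting awaits stop(), shutting the
    # background task down even if the body raises.
    async with RecurringTask(tick, delay=timedelta(seconds=1)):
        await asyncio.sleep(3)


asyncio.run(main())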
10 changes: 6 additions & 4 deletions src/crawlee/crawlers/_basic/_basic_crawler.py
@@ -56,7 +56,7 @@
SessionError,
UserDefinedErrorHandlerError,
)
from crawlee.events._types import Event, EventCrawlerStatusData
from crawlee.events._types import Event, EventCrawlerStatusData, EventPersistStateData
from crawlee.http_clients import ImpitHttpClient
from crawlee.router import Router
from crawlee.sessions import SessionPool
@@ -440,6 +440,7 @@ def __init__(
self._statistics = statistics or cast(
'Statistics[TStatisticsState]',
Statistics.with_default_state(
persistence_enabled=True,
periodic_message_logger=self._logger,
statistics_log_format=self._statistics_log_format,
log_message='Current request statistics:',
@@ -689,7 +690,6 @@ def sigint_handler() -> None:
except CancelledError:
pass
finally:
await self._crawler_state_rec_task.stop()
if threading.current_thread() is threading.main_thread():
with suppress(NotImplementedError):
asyncio.get_running_loop().remove_signal_handler(signal.SIGINT)
@@ -721,8 +721,6 @@ def sigint_handler() -> None:
async def _run_crawler(self) -> None:
event_manager = self._service_locator.get_event_manager()

self._crawler_state_rec_task.start()

# Collect the context managers to be entered. Context managers that are already active are excluded,
# as they were likely entered by the caller, who will also be responsible for exiting them.
contexts_to_enter = [
@@ -733,6 +731,7 @@ async def _run_crawler(self) -> None:
self._statistics,
self._session_pool if self._use_session_pool else None,
self._http_client,
self._crawler_state_rec_task,
*self._additional_context_managers,
)
if cm and getattr(cm, 'active', False) is False
@@ -744,6 +743,9 @@

await self._autoscaled_pool.run()

# Emit PERSIST_STATE event when crawler is finishing to allow listeners to persist their state if needed
event_manager.emit(event=Event.PERSIST_STATE, event_data=EventPersistStateData(is_migrating=False))

async def add_requests(
self,
requests: Sequence[str | Request],
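For context, the final PERSIST_STATE emission gives listeners one last chance to persist their state when the crawl finishes. A rough sketch of such a listener (the listener name and body are illustrative; it assumes the event manager's on(event=..., listener=...) registration used elsewhere in crawlee):

from crawlee.events._types import Event, EventPersistStateData


async def on_persist_state(event_data: EventPersistStateData) -> None:
    # Invoked on periodic PERSIST_STATE events and on the final emission when the
    # crawler finishes (is_migrating=False in that case).
    ...  # persist whatever component state needs to survive restarts


event_manager.on(event=Event.PERSIST_STATE, listener=on_persist_state)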
12 changes: 7 additions & 5 deletions src/crawlee/statistics/_statistics.py
@@ -96,7 +96,7 @@ def __init__(

self._state = RecoverableState(
default_state=state_model(stats_id=self._id),
persist_state_key=persist_state_key or f'SDK_CRAWLER_STATISTICS_{self._id}',
persist_state_key=persist_state_key or f'__CRAWLER_STATISTICS_{self._id}',
persistence_enabled=persistence_enabled,
persist_state_kvs_name=persist_state_kvs_name,
persist_state_kvs_factory=persist_state_kvs_factory,
@@ -187,7 +187,10 @@ async def __aexit__(
if not self._active:
raise RuntimeError(f'The {self.__class__.__name__} is not active.')

self._state.current_value.crawler_finished_at = datetime.now(timezone.utc)
if not self.state.crawler_last_started_at:
raise RuntimeError('Statistics.state.crawler_last_started_at not set.')
self.state.crawler_finished_at = datetime.now(timezone.utc)
self.state.crawler_runtime += self.state.crawler_finished_at - self.state.crawler_last_started_at

await self._state.teardown()

@@ -255,8 +258,7 @@ def calculate(self) -> FinalStatistics:
if self._instance_start is None:
raise RuntimeError('The Statistics object is not initialized')

crawler_runtime = datetime.now(timezone.utc) - self._instance_start
total_minutes = crawler_runtime.total_seconds() / 60
total_minutes = self.state.crawler_runtime.total_seconds() / 60
state = self._state.current_value
serialized_state = state.model_dump(by_alias=False)

@@ -267,7 +269,7 @@
requests_failed_per_minute=math.floor(state.requests_failed / total_minutes) if total_minutes else 0,
request_total_duration=state.request_total_finished_duration + state.request_total_failed_duration,
requests_total=state.requests_failed + state.requests_finished,
crawler_runtime=crawler_runtime,
crawler_runtime=state.crawler_runtime,
requests_finished=state.requests_finished,
requests_failed=state.requests_failed,
retry_histogram=serialized_state['request_retry_histogram'],
58 changes: 58 additions & 0 deletions tests/unit/crawlers/_basic/test_basic_crawler.py
@@ -2,6 +2,7 @@
from __future__ import annotations

import asyncio
import concurrent.futures
import json
import logging
import os
@@ -1643,3 +1644,60 @@ async def handler(context: BasicCrawlingContext) -> None:

# Crawler should not fall back to the default storage after the purge
assert await unrelated_rq.fetch_next_request() == unrelated_request


async def _run_crawler(requests: list[str], storage_dir: str) -> StatisticsState:
"""Run crawler and return its statistics state.

Must be defined like this to be pickable for ProcessPoolExecutor."""
service_locator.set_configuration(
Configuration(
crawlee_storage_dir=storage_dir, # type: ignore[call-arg]
purge_on_start=False,
)
)

async def request_handler(context: BasicCrawlingContext) -> None:
context.log.info(f'Processing {context.request.url} ...')

crawler = BasicCrawler(
request_handler=request_handler,
concurrency_settings=ConcurrencySettings(max_concurrency=1, desired_concurrency=1),
)

await crawler.run(requests)
return crawler.statistics.state


def _process_run_crawler(requests: list[str], storage_dir: str) -> StatisticsState:
return asyncio.run(_run_crawler(requests=requests, storage_dir=storage_dir))


async def test_crawler_statistics_persistence(tmp_path: Path) -> None:
"""Test that crawler statistics persist and are loaded correctly.

This test simulates starting the crawler process twice and checks that the second run's statistics include the first run."""

with concurrent.futures.ProcessPoolExecutor() as executor:
# Crawl 2 requests in the first run and automatically persist the state.
first_run_state = executor.submit(
_process_run_crawler,
requests=['https://a.placeholder.com', 'https://b.placeholder.com'],
storage_dir=str(tmp_path),
).result()
assert first_run_state.requests_finished == 2

# Use a new executor to simulate a fresh process and avoid carrying over modified class attributes.
with concurrent.futures.ProcessPoolExecutor() as executor:
# Crawl 1 additional request in the second run, reusing the previously auto-persisted state.
second_run_state = executor.submit(
_process_run_crawler, requests=['https://c.placeholder.com'], storage_dir=str(tmp_path)
).result()
assert second_run_state.requests_finished == 3

assert first_run_state.crawler_started_at == second_run_state.crawler_started_at
assert first_run_state.crawler_finished_at
assert second_run_state.crawler_finished_at

assert first_run_state.crawler_finished_at < second_run_state.crawler_finished_at
assert first_run_state.crawler_runtime < second_run_state.crawler_runtime
8 changes: 4 additions & 4 deletions tests/unit/test_configuration.py
@@ -41,10 +41,10 @@ async def test_storage_not_persisted_when_disabled(tmp_path: Path, server_url: U
)
storage_client = MemoryStorageClient()

crawler = HttpCrawler(
configuration=configuration,
storage_client=storage_client,
)
service_locator.set_configuration(configuration)
service_locator.set_storage_client(storage_client)

crawler = HttpCrawler()
Comment on lines -44 to +47
Collaborator:
Why?

Collaborator Author:
This is because the RecoverableState of statistics persists to / recovers from the global storage_client. And since statistics are now persisted by default, it will try to persist to the default global storage_client, which is FileSystem... regardless of the crawler-specific storage_client.

Mentioned here:
#1438 (comment)

I am open to discussion about this.

Collaborator:
Couldn't we use the storage client passed to the crawler?

Collaborator Author:
We could, but do we want to? I had an inconclusive discussion about this with @janbuchar. I am still not sure about this.

Collaborator:
Honestly, I'm kinda disappointed with the amount of edge cases that arose from having a separate service locator for crawlers.

From a "common sense" perspective, the RecoverableState is owned by the crawler and it doesn't make much sense to put the serialized state in a different storage (the global one). Then again, there's a good chance that the crawler-wide storage client will be a memory storage, which is not a great fit for RecoverableState.

But, unless I'm missing something, it should be super rare that somebody will do this intentionally. In my opinion, we should pick one of these options and just show a warning if both the global and crawler-specific storage client are configured.
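
A rough sketch of that warning idea (the function and its wiring are hypothetical, not actual crawlee internals):

import logging

logger = logging.getLogger(__name__)


def warn_on_conflicting_storage_clients(global_client: object | None, crawler_client: object | None) -> None:
    # If both a global and a crawler-specific storage client are configured, only one
    # of them ends up holding the persisted statistics state, so surface that explicitly.
    if global_client is not None and crawler_client is not None and global_client is not crawler_client:
        logger.warning(
            'Both a global and a crawler-specific storage client are configured; '
            'persisted state (e.g. crawler statistics) will use only one of them.'
        )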


@crawler.router.default_handler
async def default_handler(context: HttpCrawlingContext) -> None: