diff --git a/src/apify/storage_clients/_apify/_request_queue_client.py b/src/apify/storage_clients/_apify/_request_queue_client.py index 1928f0ad..88b65542 100644 --- a/src/apify/storage_clients/_apify/_request_queue_client.py +++ b/src/apify/storage_clients/_apify/_request_queue_client.py @@ -12,8 +12,8 @@ from crawlee.storages import RequestQueue from ._models import ApifyRequestQueueMetadata, RequestQueueStats -from ._request_queue_shared_client import _ApifyRequestQueueSharedClient -from ._request_queue_single_client import _ApifyRequestQueueSingleClient +from ._request_queue_shared_client import ApifyRequestQueueSharedClient +from ._request_queue_single_client import ApifyRequestQueueSingleClient from ._utils import AliasResolver if TYPE_CHECKING: @@ -47,14 +47,14 @@ def __init__( self._api_client = api_client """The Apify request queue client for API operations.""" - self._implementation: _ApifyRequestQueueSingleClient | _ApifyRequestQueueSharedClient + self._implementation: ApifyRequestQueueSingleClient | ApifyRequestQueueSharedClient """Internal implementation used to communicate with the Apify platform based Request Queue.""" if access == 'single': - self._implementation = _ApifyRequestQueueSingleClient( + self._implementation = ApifyRequestQueueSingleClient( api_client=self._api_client, metadata=metadata, cache_size=self._MAX_CACHED_REQUESTS ) elif access == 'shared': - self._implementation = _ApifyRequestQueueSharedClient( + self._implementation = ApifyRequestQueueSharedClient( api_client=self._api_client, metadata=metadata, cache_size=self._MAX_CACHED_REQUESTS, diff --git a/src/apify/storage_clients/_apify/_request_queue_shared_client.py b/src/apify/storage_clients/_apify/_request_queue_shared_client.py index 65ad8daa..e4e4ed68 100644 --- a/src/apify/storage_clients/_apify/_request_queue_shared_client.py +++ b/src/apify/storage_clients/_apify/_request_queue_shared_client.py @@ -23,7 +23,7 @@ logger = getLogger(__name__) -class _ApifyRequestQueueSharedClient: +class ApifyRequestQueueSharedClient: """An Apify platform implementation of the request queue client. This implementation supports multiple producers and multiple consumers scenario. @@ -106,23 +106,19 @@ async def add_batch_of_requests( # It could have been handled by another client in the meantime, so cached information about # `request.was_already_handled` is not reliable. already_present_requests.append( - ProcessedRequest.model_validate( - { - 'uniqueKey': request.unique_key, - 'wasAlreadyPresent': True, - 'wasAlreadyHandled': request.was_already_handled, - } + ProcessedRequest( + unique_key=request.unique_key, + was_already_present=True, + was_already_handled=request.was_already_handled, ) ) else: # Add new request to the cache. - processed_request = ProcessedRequest.model_validate( - { - 'uniqueKey': request.unique_key, - 'wasAlreadyPresent': True, - 'wasAlreadyHandled': request.was_already_handled, - } + processed_request = ProcessedRequest( + unique_key=request.unique_key, + was_already_present=True, + was_already_handled=request.was_already_handled, ) self._cache_request( request.unique_key, diff --git a/src/apify/storage_clients/_apify/_request_queue_single_client.py b/src/apify/storage_clients/_apify/_request_queue_single_client.py index 80b8ced7..12149d9e 100644 --- a/src/apify/storage_clients/_apify/_request_queue_single_client.py +++ b/src/apify/storage_clients/_apify/_request_queue_single_client.py @@ -21,7 +21,7 @@ logger = getLogger(__name__) -class _ApifyRequestQueueSingleClient: +class ApifyRequestQueueSingleClient: """An Apify platform implementation of the request queue client with limited capability. This client is designed to use as little resources as possible, but has to be used in constrained context. @@ -108,23 +108,19 @@ async def add_batch_of_requests( # Check if request is known to be already handled (it has to be present as well.) if request.unique_key in self._requests_already_handled: already_present_requests.append( - ProcessedRequest.model_validate( - { - 'uniqueKey': request.unique_key, - 'wasAlreadyPresent': True, - 'wasAlreadyHandled': True, - } + ProcessedRequest( + unique_key=request.unique_key, + was_already_present=True, + was_already_handled=True, ) ) # Check if request is known to be already present, but unhandled elif self._requests_cache.get(request.unique_key): already_present_requests.append( - ProcessedRequest.model_validate( - { - 'uniqueKey': request.unique_key, - 'wasAlreadyPresent': True, - 'wasAlreadyHandled': request.was_already_handled, - } + ProcessedRequest( + unique_key=request.unique_key, + was_already_present=True, + was_already_handled=request.was_already_handled, ) ) else: @@ -158,8 +154,9 @@ async def add_batch_of_requests( self._requests_cache.pop(unprocessed_request.unique_key, None) else: - api_response = AddRequestsResponse.model_validate( - {'unprocessedRequests': [], 'processedRequests': already_present_requests} + api_response = AddRequestsResponse( + unprocessed_requests=[], + processed_requests=already_present_requests, ) # Update assumed total count for newly added requests. @@ -236,11 +233,20 @@ async def _list_head(self) -> None: # Update the cached data for request_data in response.get('items', []): + # Due to https://github.com/apify/apify-core/blob/v0.1377.0/src/api/src/lib/request_queues/request_queue.ts#L53, + # the list_head endpoint may return truncated fields for long requests (e.g., long URLs or unique keys). + # If truncation is detected, fetch the full request data by its ID from the API. + # This is a temporary workaround - the caching will be refactored to use request IDs instead of unique keys. + # See https://github.com/apify/apify-sdk-python/issues/630 for details. + if '[truncated]' in request_data['uniqueKey'] or '[truncated]' in request_data['url']: + request_data = await self._api_client.get_request(request_id=request_data['id']) # noqa: PLW2901 + request = Request.model_validate(request_data) if request.unique_key in self._requests_in_progress: # Ignore requests that are already in progress, we will not process them again. continue + if request.was_already_handled: # Do not cache fully handled requests, we do not need them. Just cache their unique_key. self._requests_already_handled.add(request.unique_key) @@ -248,16 +254,20 @@ async def _list_head(self) -> None: # Only fetch the request if we do not know it yet. if request.unique_key not in self._requests_cache: request_id = unique_key_to_request_id(request.unique_key) - complete_request_data = await self._api_client.get_request(request_id) - if complete_request_data is not None: - request = Request.model_validate(complete_request_data) - self._requests_cache[request.unique_key] = request - else: + if request_data is not None and request_id != request_data['id']: logger.warning( - f'Could not fetch request data for unique_key=`{request.unique_key}` (id=`{request_id}`)' + f'Request ID mismatch: {request_id} != {request_data["id"]}, ' + 'this may cause unexpected behavior.' ) + # See https://github.com/apify/apify-sdk-python/issues/630 for details. + if '[truncated]' not in request.unique_key: + request_data = await self._api_client.get_request(request_id=request_id) # noqa: PLW2901 + request = Request.model_validate(request_data) + + self._requests_cache[request.unique_key] = request + # Add new requests to the end of the head, unless already present in head if request.unique_key not in self._head_requests: self._head_requests.appendleft(request.unique_key) diff --git a/tests/integration/test_request_queue.py b/tests/integration/test_request_queue.py index fbbdfb74..971aa1b3 100644 --- a/tests/integration/test_request_queue.py +++ b/tests/integration/test_request_queue.py @@ -7,12 +7,14 @@ import pytest from apify_shared.consts import ApifyEnvVars -from crawlee import Request, service_locator +from crawlee import service_locator from crawlee.crawlers import BasicCrawler from ._utils import generate_unique_resource_name -from apify import Actor +from apify import Actor, Request from apify.storage_clients import ApifyStorageClient +from apify.storage_clients._apify._request_queue_shared_client import ApifyRequestQueueSharedClient +from apify.storage_clients._apify._utils import unique_key_to_request_id from apify.storages import RequestQueue if TYPE_CHECKING: @@ -1189,3 +1191,30 @@ async def test_request_queue_has_stats(request_queue_apify: RequestQueue) -> Non assert hasattr(metadata, 'stats') apify_metadata = cast('ApifyRequestQueueMetadata', metadata) assert apify_metadata.stats.write_count == add_request_count + + +async def test_rq_long_url(request_queue_apify: RequestQueue) -> None: + # TODO: Remove the skip when issue #630 is resolved. + if isinstance(request_queue_apify._client._implementation, ApifyRequestQueueSharedClient): # type: ignore[attr-defined] + pytest.skip('Skipping for the "shared" request queue - unskip after issue #630 is resolved.') + + url = 'https://portal.isoss.gov.cz/irj/portal/anonymous/mvrest?path=/eosm-public-offer&officeLabels=%7B%7D&page=1&pageSize=100000&sortColumn=zdatzvsm&sortOrder=-1' + + req = Request.from_url( + url=url, + use_extended_unique_key=True, + always_enqueue=True, + ) + + request_id = unique_key_to_request_id(req.unique_key) + + processed_request = await request_queue_apify.add_request(req) + assert processed_request.id == request_id + + request_obtained = await request_queue_apify.fetch_next_request() + assert request_obtained is not None + + await request_queue_apify.mark_request_as_handled(request_obtained) + + is_finished = await request_queue_apify.is_finished() + assert is_finished