From 8a8306f0ed4b2b6a1afd4fda740b8f91106b9e73 Mon Sep 17 00:00:00 2001
From: Vlada Dusek
Date: Thu, 16 Oct 2025 16:12:52 +0200
Subject: [PATCH 1/5] fix: Handle truncated unique_key in list_head by fetching full request data

Closes: #627
---
 .../_apify/_request_queue_single_client.py    | 21 ++++++----
 tests/integration/test_actor_request_queue.py | 38 +++++++++++++++++++
 2 files changed, 52 insertions(+), 7 deletions(-)

diff --git a/src/apify/storage_clients/_apify/_request_queue_single_client.py b/src/apify/storage_clients/_apify/_request_queue_single_client.py
index 80b8ced7..a7bd1e31 100644
--- a/src/apify/storage_clients/_apify/_request_queue_single_client.py
+++ b/src/apify/storage_clients/_apify/_request_queue_single_client.py
@@ -236,6 +236,14 @@ async def _list_head(self) -> None:

         # Update the cached data
         for request_data in response.get('items', []):
+            # Due to https://github.com/apify/apify-core/blob/v0.1377.0/src/api/src/lib/request_queues/request_queue.ts#L53,
+            # the list_head endpoint may return truncated fields for long requests (e.g., long URLs or unique keys).
+            # If truncation is detected, fetch the full request data by its ID from the API.
+            # This is a temporary workaround - the caching will be refactored to use request IDs instead of unique keys.
+            # See https://github.com/apify/apify-sdk-python/issues/630 for details.
+            if '[truncated]' in request_data['uniqueKey'] or '[truncated]' in request_data['url']:
+                request_data = await self._api_client.get_request(request_id=request_data['id'])  # noqa: PLW2901
+
             request = Request.model_validate(request_data)

             if request.unique_key in self._requests_in_progress:
@@ -248,15 +256,14 @@ async def _list_head(self) -> None:
                 # Only fetch the request if we do not know it yet.
                 if request.unique_key not in self._requests_cache:
                     request_id = unique_key_to_request_id(request.unique_key)
-                    complete_request_data = await self._api_client.get_request(request_id)
-
-                    if complete_request_data is not None:
-                        request = Request.model_validate(complete_request_data)
-                        self._requests_cache[request.unique_key] = request
-                    else:
+                    if request_data is not None and request_id != request_data['id']:
                         logger.warning(
-                            f'Could not fetch request data for unique_key=`{request.unique_key}` (id=`{request_id}`)'
+                            f'Request ID mismatch: {request_id} != {request_data["id"]}, '
+                            'this may cause unexpected behavior.'
                         )
+                    full_request_data = await self._api_client.get_request(request_id)
+                    request = Request.model_validate(full_request_data)
+                    self._requests_cache[request.unique_key] = request

                 # Add new requests to the end of the head, unless already present in head
diff --git a/tests/integration/test_actor_request_queue.py b/tests/integration/test_actor_request_queue.py
index 3a9053c7..3c406a50 100644
--- a/tests/integration/test_actor_request_queue.py
+++ b/tests/integration/test_actor_request_queue.py
@@ -489,3 +489,41 @@ async def main() -> None:
     run_result = await run_actor(actor)

     assert run_result.status == 'SUCCEEDED'
+
+
+@pytest.mark.only
+async def test_rq_long_url(
+    make_actor: MakeActorFunction,
+    run_actor: RunActorFunction,
+) -> None:
+    async def main() -> None:
+        from apify import Actor, Request
+        from apify.storage_clients._apify._utils import unique_key_to_request_id
+
+        url = 'https://portal.isoss.gov.cz/irj/portal/anonymous/mvrest?path=/eosm-public-offer&officeLabels=%7B%7D&page=1&pageSize=100000&sortColumn=zdatzvsm&sortOrder=-1'
+
+        async with Actor:
+            request = Request.from_url(
+                url=url,
+                use_extended_unique_key=True,
+                always_enqueue=True,
+            )
+
+            rq = await Actor.open_request_queue(force_cloud=True)
+            request_id = unique_key_to_request_id(request.unique_key)
+
+            processed_request = await rq.add_request(request)
+            assert processed_request.id == request_id
+
+            request_obtained = await rq.fetch_next_request()
+            assert request_obtained is not None
+
+            await rq.mark_request_as_handled(request_obtained)
+
+            is_finished = await rq.is_finished()
+            assert is_finished
+
+    actor = await make_actor(label='long-url', main_func=main)
+    run_result = await run_actor(actor)
+
+    assert run_result.status == 'SUCCEEDED'
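
[Illustrative sketch, not part of the patch series.] Patch 1 works around list_head items whose `url` or `uniqueKey` come back with a '[truncated]' marker: such items are re-fetched by their ID before being validated into a Request. A minimal standalone version of that control flow is sketched below, assuming `api_client` is any object exposing `list_head()` and `get_request()` coroutines that return the same dict shapes as the Apify API; it is not presented as the SDK's exact client interface.

    from crawlee import Request

    async def resolve_head_items(api_client, limit: int = 25) -> list[Request]:
        """Fetch the queue head and return full Request objects, re-fetching truncated items."""
        response = await api_client.list_head(limit=limit)
        requests = []
        for request_data in response.get('items', []):
            # The head endpoint may truncate long fields; fetch the complete record by its ID.
            if '[truncated]' in request_data['uniqueKey'] or '[truncated]' in request_data['url']:
                request_data = await api_client.get_request(request_id=request_data['id'])
            requests.append(Request.model_validate(request_data))
        return requests
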
From 2ae442e4d6fd8f0295d1b449d3a88bb52f762131 Mon Sep 17 00:00:00 2001
From: Vlada Dusek
Date: Thu, 16 Oct 2025 16:28:55 +0200
Subject: [PATCH 2/5] Better integration test

---
 .../_apify/_request_queue_client.py           | 10 ++---
 .../_apify/_request_queue_shared_client.py    |  2 +-
 .../_apify/_request_queue_single_client.py    |  2 +-
 tests/integration/test_actor_request_queue.py | 38 -------------------
 tests/integration/test_request_queue.py       | 28 +++++++++++++-
 5 files changed, 33 insertions(+), 47 deletions(-)

diff --git a/src/apify/storage_clients/_apify/_request_queue_client.py b/src/apify/storage_clients/_apify/_request_queue_client.py
index 1928f0ad..88b65542 100644
--- a/src/apify/storage_clients/_apify/_request_queue_client.py
+++ b/src/apify/storage_clients/_apify/_request_queue_client.py
@@ -12,8 +12,8 @@
 from crawlee.storages import RequestQueue

 from ._models import ApifyRequestQueueMetadata, RequestQueueStats
-from ._request_queue_shared_client import _ApifyRequestQueueSharedClient
-from ._request_queue_single_client import _ApifyRequestQueueSingleClient
+from ._request_queue_shared_client import ApifyRequestQueueSharedClient
+from ._request_queue_single_client import ApifyRequestQueueSingleClient
 from ._utils import AliasResolver

 if TYPE_CHECKING:
@@ -47,14 +47,14 @@ def __init__(
         self._api_client = api_client
         """The Apify request queue client for API operations."""

-        self._implementation: _ApifyRequestQueueSingleClient | _ApifyRequestQueueSharedClient
+        self._implementation: ApifyRequestQueueSingleClient | ApifyRequestQueueSharedClient
         """Internal implementation used to communicate with the Apify platform based Request Queue."""

         if access == 'single':
-            self._implementation = _ApifyRequestQueueSingleClient(
+            self._implementation = ApifyRequestQueueSingleClient(
                 api_client=self._api_client, metadata=metadata, cache_size=self._MAX_CACHED_REQUESTS
             )
         elif access == 'shared':
-            self._implementation = _ApifyRequestQueueSharedClient(
+            self._implementation = ApifyRequestQueueSharedClient(
                 api_client=self._api_client,
                 metadata=metadata,
                 cache_size=self._MAX_CACHED_REQUESTS,
diff --git a/src/apify/storage_clients/_apify/_request_queue_shared_client.py b/src/apify/storage_clients/_apify/_request_queue_shared_client.py
index 65ad8daa..955900df 100644
--- a/src/apify/storage_clients/_apify/_request_queue_shared_client.py
+++ b/src/apify/storage_clients/_apify/_request_queue_shared_client.py
@@ -23,7 +23,7 @@
 logger = getLogger(__name__)


-class _ApifyRequestQueueSharedClient:
+class ApifyRequestQueueSharedClient:
     """An Apify platform implementation of the request queue client.

     This implementation supports multiple producers and multiple consumers scenario.
diff --git a/src/apify/storage_clients/_apify/_request_queue_single_client.py b/src/apify/storage_clients/_apify/_request_queue_single_client.py
index a7bd1e31..d9f04da7 100644
--- a/src/apify/storage_clients/_apify/_request_queue_single_client.py
+++ b/src/apify/storage_clients/_apify/_request_queue_single_client.py
@@ -21,7 +21,7 @@
 logger = getLogger(__name__)


-class _ApifyRequestQueueSingleClient:
+class ApifyRequestQueueSingleClient:
     """An Apify platform implementation of the request queue client with limited capability.

     This client is designed to use as little resources as possible, but has to be used in constrained context.
diff --git a/tests/integration/test_actor_request_queue.py b/tests/integration/test_actor_request_queue.py
index 3c406a50..3a9053c7 100644
--- a/tests/integration/test_actor_request_queue.py
+++ b/tests/integration/test_actor_request_queue.py
@@ -489,41 +489,3 @@ async def main() -> None:
     run_result = await run_actor(actor)

     assert run_result.status == 'SUCCEEDED'
-
-
-@pytest.mark.only
-async def test_rq_long_url(
-    make_actor: MakeActorFunction,
-    run_actor: RunActorFunction,
-) -> None:
-    async def main() -> None:
-        from apify import Actor, Request
-        from apify.storage_clients._apify._utils import unique_key_to_request_id
-
-        url = 'https://portal.isoss.gov.cz/irj/portal/anonymous/mvrest?path=/eosm-public-offer&officeLabels=%7B%7D&page=1&pageSize=100000&sortColumn=zdatzvsm&sortOrder=-1'
-
-        async with Actor:
-            request = Request.from_url(
-                url=url,
-                use_extended_unique_key=True,
-                always_enqueue=True,
-            )
-
-            rq = await Actor.open_request_queue(force_cloud=True)
-            request_id = unique_key_to_request_id(request.unique_key)
-
-            processed_request = await rq.add_request(request)
-            assert processed_request.id == request_id
-
-            request_obtained = await rq.fetch_next_request()
-            assert request_obtained is not None
-
-            await rq.mark_request_as_handled(request_obtained)
-
-            is_finished = await rq.is_finished()
-            assert is_finished
-
-    actor = await make_actor(label='long-url', main_func=main)
-    run_result = await run_actor(actor)
-
-    assert run_result.status == 'SUCCEEDED'
diff --git a/tests/integration/test_request_queue.py b/tests/integration/test_request_queue.py
index fbbdfb74..d53d98c6 100644
--- a/tests/integration/test_request_queue.py
+++ b/tests/integration/test_request_queue.py
@@ -7,12 +7,13 @@
 import pytest

 from apify_shared.consts import ApifyEnvVars
-from crawlee import Request, service_locator
+from crawlee import service_locator
 from crawlee.crawlers import BasicCrawler

 from ._utils import generate_unique_resource_name
-from apify import Actor
+from apify import Actor, Request
 from apify.storage_clients import ApifyStorageClient
+from apify.storage_clients._apify._utils import unique_key_to_request_id
 from apify.storages import RequestQueue

 if TYPE_CHECKING:
@@ -1189,3 +1190,26 @@ async def test_request_queue_has_stats(request_queue_apify: RequestQueue) -> Non
     assert hasattr(metadata, 'stats')
     apify_metadata = cast('ApifyRequestQueueMetadata', metadata)
     assert apify_metadata.stats.write_count == add_request_count
+
+
+async def test_rq_long_url(request_queue_apify: RequestQueue) -> None:
+    url = 'https://portal.isoss.gov.cz/irj/portal/anonymous/mvrest?path=/eosm-public-offer&officeLabels=%7B%7D&page=1&pageSize=100000&sortColumn=zdatzvsm&sortOrder=-1'
+
+    request = Request.from_url(
+        url=url,
+        use_extended_unique_key=True,
+        always_enqueue=True,
+    )
+
+    request_id = unique_key_to_request_id(request.unique_key)
+
+    processed_request = await request_queue_apify.add_request(request)
+    assert processed_request.id == request_id
+
+    request_obtained = await request_queue_apify.fetch_next_request()
+    assert request_obtained is not None
+
+    await request_queue_apify.mark_request_as_handled(request_obtained)
+
+    is_finished = await request_queue_apify.is_finished()
+    assert is_finished

From 7552da65a145a735ddf7f062672b767cea42e459 Mon Sep 17 00:00:00 2001
From: Vlada Dusek
Date: Fri, 17 Oct 2025 09:40:35 +0200
Subject: [PATCH 3/5] skip for shared

---
 .../_apify/_request_queue_shared_client.py | 20 ++++++----------
 .../_apify/_request_queue_single_client.py | 25 +++++++++----------
 tests/integration/test_request_queue.py    | 11 +++++---
 3 files changed, 28 insertions(+), 28 deletions(-)

diff --git a/src/apify/storage_clients/_apify/_request_queue_shared_client.py b/src/apify/storage_clients/_apify/_request_queue_shared_client.py
index 955900df..e4e4ed68 100644
--- a/src/apify/storage_clients/_apify/_request_queue_shared_client.py
+++ b/src/apify/storage_clients/_apify/_request_queue_shared_client.py
@@ -106,23 +106,19 @@ async def add_batch_of_requests(
                 # It could have been handled by another client in the meantime, so cached information about
                 # `request.was_already_handled` is not reliable.
                 already_present_requests.append(
-                    ProcessedRequest.model_validate(
-                        {
-                            'uniqueKey': request.unique_key,
-                            'wasAlreadyPresent': True,
-                            'wasAlreadyHandled': request.was_already_handled,
-                        }
+                    ProcessedRequest(
+                        unique_key=request.unique_key,
+                        was_already_present=True,
+                        was_already_handled=request.was_already_handled,
                     )
                 )

             else:
                 # Add new request to the cache.
-                processed_request = ProcessedRequest.model_validate(
-                    {
-                        'uniqueKey': request.unique_key,
-                        'wasAlreadyPresent': True,
-                        'wasAlreadyHandled': request.was_already_handled,
-                    }
+                processed_request = ProcessedRequest(
+                    unique_key=request.unique_key,
+                    was_already_present=True,
+                    was_already_handled=request.was_already_handled,
                 )
                 self._cache_request(
                     request.unique_key,
diff --git a/src/apify/storage_clients/_apify/_request_queue_single_client.py b/src/apify/storage_clients/_apify/_request_queue_single_client.py
index d9f04da7..a1c829d8 100644
--- a/src/apify/storage_clients/_apify/_request_queue_single_client.py
+++ b/src/apify/storage_clients/_apify/_request_queue_single_client.py
@@ -109,22 +109,20 @@ async def add_batch_of_requests(
             if request.unique_key in self._requests_already_handled:
                 already_present_requests.append(
                     ProcessedRequest.model_validate(
-                        {
-                            'uniqueKey': request.unique_key,
-                            'wasAlreadyPresent': True,
-                            'wasAlreadyHandled': True,
-                        }
+                        ProcessedRequest(
+                            unique_key=request.unique_key,
+                            was_already_present=True,
+                            was_already_handled=True,
+                        )
                     )
                 )
             # Check if request is known to be already present, but unhandled
             elif self._requests_cache.get(request.unique_key):
                 already_present_requests.append(
-                    ProcessedRequest.model_validate(
-                        {
-                            'uniqueKey': request.unique_key,
-                            'wasAlreadyPresent': True,
-                            'wasAlreadyHandled': request.was_already_handled,
-                        }
+                    ProcessedRequest(
+                        unique_key=request.unique_key,
+                        was_already_present=True,
+                        was_already_handled=request.was_already_handled,
                     )
                 )
             else:
@@ -158,8 +156,9 @@ async def add_batch_of_requests(
                     self._requests_cache.pop(unprocessed_request.unique_key, None)

         else:
-            api_response = AddRequestsResponse.model_validate(
-                {'unprocessedRequests': [], 'processedRequests': already_present_requests}
+            api_response = AddRequestsResponse(
+                unprocessed_requests=[],
+                processed_requests=already_present_requests,
             )

         # Update assumed total count for newly added requests.
diff --git a/tests/integration/test_request_queue.py b/tests/integration/test_request_queue.py
index d53d98c6..971aa1b3 100644
--- a/tests/integration/test_request_queue.py
+++ b/tests/integration/test_request_queue.py
@@ -13,6 +13,7 @@
 from ._utils import generate_unique_resource_name
 from apify import Actor, Request
 from apify.storage_clients import ApifyStorageClient
+from apify.storage_clients._apify._request_queue_shared_client import ApifyRequestQueueSharedClient
 from apify.storage_clients._apify._utils import unique_key_to_request_id
 from apify.storages import RequestQueue

@@ -1193,17 +1194,21 @@ async def test_request_queue_has_stats(request_queue_apify: RequestQueue) -> Non


 async def test_rq_long_url(request_queue_apify: RequestQueue) -> None:
+    # TODO: Remove the skip when issue #630 is resolved.
+    if isinstance(request_queue_apify._client._implementation, ApifyRequestQueueSharedClient):  # type: ignore[attr-defined]
+        pytest.skip('Skipping for the "shared" request queue - unskip after issue #630 is resolved.')
+
     url = 'https://portal.isoss.gov.cz/irj/portal/anonymous/mvrest?path=/eosm-public-offer&officeLabels=%7B%7D&page=1&pageSize=100000&sortColumn=zdatzvsm&sortOrder=-1'

-    request = Request.from_url(
+    req = Request.from_url(
         url=url,
         use_extended_unique_key=True,
         always_enqueue=True,
     )

-    request_id = unique_key_to_request_id(request.unique_key)
+    request_id = unique_key_to_request_id(req.unique_key)

-    processed_request = await request_queue_apify.add_request(request)
+    processed_request = await request_queue_apify.add_request(req)
     assert processed_request.id == request_id

     request_obtained = await request_queue_apify.fetch_next_request()
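
[Illustrative note, not part of the patch series.] Patch 3 swaps `ProcessedRequest.model_validate({...})` calls that used camelCase keys for direct keyword construction with snake_case field names. The two styles are interchangeable for a pydantic v2 model configured with a camelCase alias generator and `populate_by_name`, which is what the replaced calls imply; `HeadItem` below is a hypothetical stand-in for crawlee's `ProcessedRequest`, used only to demonstrate that equivalence.

    from pydantic import BaseModel, ConfigDict
    from pydantic.alias_generators import to_camel

    class HeadItem(BaseModel):
        """Hypothetical model mirroring the alias setup implied by the replaced calls."""

        model_config = ConfigDict(alias_generator=to_camel, populate_by_name=True)

        unique_key: str
        was_already_present: bool
        was_already_handled: bool

    # Validating an API-style camelCase payload...
    a = HeadItem.model_validate({'uniqueKey': 'abc', 'wasAlreadyPresent': True, 'wasAlreadyHandled': False})
    # ...and constructing directly with snake_case keyword arguments yield equal instances.
    b = HeadItem(unique_key='abc', was_already_present=True, was_already_handled=False)
    assert a == b
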
From 53d44ad00435a38eb76eeb20a6efb4218c777814 Mon Sep 17 00:00:00 2001
From: Vlada Dusek
Date: Fri, 17 Oct 2025 09:57:42 +0200
Subject: [PATCH 4/5] address feedback

---
 .../_apify/_request_queue_single_client.py | 12 ++++--------
 1 file changed, 4 insertions(+), 8 deletions(-)

diff --git a/src/apify/storage_clients/_apify/_request_queue_single_client.py b/src/apify/storage_clients/_apify/_request_queue_single_client.py
index a1c829d8..ef98a33d 100644
--- a/src/apify/storage_clients/_apify/_request_queue_single_client.py
+++ b/src/apify/storage_clients/_apify/_request_queue_single_client.py
@@ -108,12 +108,10 @@ async def add_batch_of_requests(
             # Check if request is known to be already handled (it has to be present as well.)
             if request.unique_key in self._requests_already_handled:
                 already_present_requests.append(
-                    ProcessedRequest.model_validate(
-                        ProcessedRequest(
-                            unique_key=request.unique_key,
-                            was_already_present=True,
-                            was_already_handled=True,
-                        )
+                    ProcessedRequest(
+                        unique_key=request.unique_key,
+                        was_already_present=True,
+                        was_already_handled=True,
                     )
                 )
             # Check if request is known to be already present, but unhandled
@@ -260,8 +258,6 @@ async def _list_head(self) -> None:
                             f'Request ID mismatch: {request_id} != {request_data["id"]}, '
                             'this may cause unexpected behavior.'
                         )
-                    full_request_data = await self._api_client.get_request(request_id)
-                    request = Request.model_validate(full_request_data)
                     self._requests_cache[request.unique_key] = request

                 # Add new requests to the end of the head, unless already present in head

From 7440dd02fb1b1f85a56669e719b7753170206681 Mon Sep 17 00:00:00 2001
From: Vlada Dusek
Date: Fri, 17 Oct 2025 10:35:40 +0200
Subject: [PATCH 5/5] address feedback

---
 .../_apify/_request_queue_single_client.py | 8 ++++++++
 1 file changed, 8 insertions(+)

diff --git a/src/apify/storage_clients/_apify/_request_queue_single_client.py b/src/apify/storage_clients/_apify/_request_queue_single_client.py
index ef98a33d..12149d9e 100644
--- a/src/apify/storage_clients/_apify/_request_queue_single_client.py
+++ b/src/apify/storage_clients/_apify/_request_queue_single_client.py
@@ -246,6 +246,7 @@ async def _list_head(self) -> None:
             if request.unique_key in self._requests_in_progress:
                 # Ignore requests that are already in progress, we will not process them again.
                 continue
+
             if request.was_already_handled:
                 # Do not cache fully handled requests, we do not need them. Just cache their unique_key.
                 self._requests_already_handled.add(request.unique_key)
@@ -253,11 +254,18 @@ async def _list_head(self) -> None:
                 # Only fetch the request if we do not know it yet.
                 if request.unique_key not in self._requests_cache:
                     request_id = unique_key_to_request_id(request.unique_key)
+
                     if request_data is not None and request_id != request_data['id']:
                         logger.warning(
                             f'Request ID mismatch: {request_id} != {request_data["id"]}, '
                             'this may cause unexpected behavior.'
                         )
+
+                    # See https://github.com/apify/apify-sdk-python/issues/630 for details.
+                    if '[truncated]' not in request.unique_key:
+                        request_data = await self._api_client.get_request(request_id=request_id)  # noqa: PLW2901
+                        request = Request.model_validate(request_data)
+
                     self._requests_cache[request.unique_key] = request

                 # Add new requests to the end of the head, unless already present in head
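
[Illustrative reconstruction, not part of the patch series.] For readability, this is how the caching loop of `_list_head` ends up after the whole series, assembled from the hunks in patches 1, 4 and 5. It is an excerpt of the touched region rather than a standalone snippet: `response`, `logger`, `Request`, `unique_key_to_request_id` and the `self._*` attributes are the single client's own names taken from the context lines above.

    for request_data in response.get('items', []):
        # The list_head endpoint may truncate long fields; fetch the full record by its ID.
        if '[truncated]' in request_data['uniqueKey'] or '[truncated]' in request_data['url']:
            request_data = await self._api_client.get_request(request_id=request_data['id'])

        request = Request.model_validate(request_data)

        if request.unique_key in self._requests_in_progress:
            # Ignore requests that are already in progress, we will not process them again.
            continue

        if request.was_already_handled:
            # Do not cache fully handled requests, we do not need them. Just cache their unique_key.
            self._requests_already_handled.add(request.unique_key)
        else:
            # Only fetch the request if we do not know it yet.
            if request.unique_key not in self._requests_cache:
                request_id = unique_key_to_request_id(request.unique_key)

                if request_data is not None and request_id != request_data['id']:
                    logger.warning(
                        f'Request ID mismatch: {request_id} != {request_data["id"]}, '
                        'this may cause unexpected behavior.'
                    )

                # Re-fetch by the derived ID unless the unique key itself is truncated (issue #630).
                if '[truncated]' not in request.unique_key:
                    request_data = await self._api_client.get_request(request_id=request_id)
                    request = Request.model_validate(request_data)

                self._requests_cache[request.unique_key] = request

            # Add new requests to the end of the head, unless already present in head
            # (the remainder of the loop body is untouched by this series).
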