From 922a4c8550616dcd851f4d4a8825b4116a978088 Mon Sep 17 00:00:00 2001
From: Josef Prochazka
Date: Thu, 16 Oct 2025 17:25:42 +0200
Subject: [PATCH 1/9] Draft

---
 .../_apify/_request_queue_single_client.py | 129 ++++++++++--------
 src/apify/storage_clients/_apify/_utils.py |  15 +-
 tests/integration/test_request_queue.py    |  22 +++
 3 files changed, 106 insertions(+), 60 deletions(-)

diff --git a/src/apify/storage_clients/_apify/_request_queue_single_client.py b/src/apify/storage_clients/_apify/_request_queue_single_client.py
index 80b8ced7..491f7f3a 100644
--- a/src/apify/storage_clients/_apify/_request_queue_single_client.py
+++ b/src/apify/storage_clients/_apify/_request_queue_single_client.py
@@ -10,7 +10,7 @@
 from crawlee.storage_clients.models import AddRequestsResponse, ProcessedRequest, RequestQueueMetadata

 from apify import Request
-from apify.storage_clients._apify._utils import unique_key_to_request_id
+from apify.storage_clients._apify._utils import _Request, unique_key_to_request_id

 if TYPE_CHECKING:
     from collections.abc import Sequence
@@ -56,21 +56,21 @@ def __init__(
         """The Apify request queue client for API operations."""

         self._requests_cache: LRUCache[str, Request] = LRUCache(maxsize=cache_size)
-        """A cache to store request objects. Request unique key is used as the cache key."""
+        """A cache to store request objects. Request id is used as the cache key."""

         self._head_requests: deque[str] = deque()
-        """Ordered unique keys of requests that represent queue head."""
+        """Ordered ids of requests that represent queue head."""

         self._requests_already_handled: set[str] = set()
         """Local estimation of requests unique keys that are already present and handled on the platform.

         - To enhance local deduplication.
         - To reduce the _requests_cache size. Already handled requests are most likely not going to be needed again,
-        so no need to cache more than their unique_key.
+        so no need to cache more than their id.
         """

         self._requests_in_progress: set[str] = set()
-        """Set of requests unique keys that are being processed locally.
+        """Set of request ids that are being processed locally.

         - To help decide if the RQ is finished or not. This is the only consumer, so it can be tracked locally.
         """
@@ -105,25 +105,30 @@ async def add_batch_of_requests(
         already_present_requests: list[ProcessedRequest] = []

         for request in requests:
+            # Calculate id for request
+            _request = _Request.model_validate(request.model_dump())
+
             # Check if request is known to be already handled (it has to be present as well).
-            if request.unique_key in self._requests_already_handled:
+            if _request.id in self._requests_already_handled:
                 already_present_requests.append(
                     ProcessedRequest.model_validate(
                         {
-                            'uniqueKey': request.unique_key,
+                            'id': _request.id,
+                            'uniqueKey': _request.unique_key,
                             'wasAlreadyPresent': True,
                             'wasAlreadyHandled': True,
                         }
                     )
                 )
             # Check if request is known to be already present, but unhandled
-            elif self._requests_cache.get(request.unique_key):
+            elif self._requests_cache.get(_request.id):
                 already_present_requests.append(
                     ProcessedRequest.model_validate(
                         {
-                            'uniqueKey': request.unique_key,
+                            'id': _request.id,
+                            'uniqueKey': _request.unique_key,
                             'wasAlreadyPresent': True,
-                            'wasAlreadyHandled': request.was_already_handled,
+                            'wasAlreadyHandled': _request.was_already_handled,
                         }
                     )
                 )
@@ -132,11 +137,11 @@ async def add_batch_of_requests(
             new_requests.append(request)

             # Update local caches
-            self._requests_cache[request.unique_key] = request
+            self._requests_cache[_request.id] = request
             if forefront:
-                self._head_requests.append(request.unique_key)
+                self._head_requests.append(_request.id)
             else:
-                self._head_requests.appendleft(request.unique_key)
+                self._head_requests.appendleft(_request.id)

         if new_requests:
             # Prepare requests for API by converting to dictionaries.
@@ -155,7 +160,7 @@ async def add_batch_of_requests(
             api_response.processed_requests.extend(already_present_requests)
             # Remove unprocessed requests from the cache
             for unprocessed_request in api_response.unprocessed_requests:
-                self._requests_cache.pop(unprocessed_request.unique_key, None)
+                self._requests_cache.pop(unique_key_to_request_id(unprocessed_request.unique_key), None)

         else:
             api_response = AddRequestsResponse.model_validate(
@@ -181,10 +186,21 @@ async def get_request(self, unique_key: str) -> Request | None:
         Returns:
             The request or None if not found.
         """
-        if unique_key in self._requests_cache:
-            return self._requests_cache[unique_key]
+        return await self._get_request(id=unique_key_to_request_id(unique_key))
+
+    async def _get_request(self, id: str) -> Request | None:
+        """Get a request by unique key.
+
+        Args:
+            id: Id of request to get.
+
+        Returns:
+            The request or None if not found.
+        """
+        if id in self._requests_cache:
+            return self._requests_cache[id]

-        response = await self._api_client.get_request(unique_key_to_request_id(unique_key))
+        response = await self._api_client.get_request(id)

         if response is None:
             return None
@@ -205,13 +221,10 @@ async def fetch_next_request(self) -> Request | None:
         await self._ensure_head_is_non_empty()

         while self._head_requests:
-            request_unique_key = self._head_requests.pop()
-            if (
-                request_unique_key not in self._requests_in_progress
-                and request_unique_key not in self._requests_already_handled
-            ):
-                self._requests_in_progress.add(request_unique_key)
-                return await self.get_request(request_unique_key)
+            request_id = self._head_requests.pop()
+            if request_id not in self._requests_in_progress and request_id not in self._requests_already_handled:
+                self._requests_in_progress.add(request_id)
+                return await self._get_request(request_id)
         # No request locally and the ones returned from the platform are already in progress.
         return None
@@ -236,31 +249,24 @@ async def _list_head(self) -> None:
         # Update the cached data
         for request_data in response.get('items', []):
-            request = Request.model_validate(request_data)
+            request = _Request.model_validate(request_data)

-            if request.unique_key in self._requests_in_progress:
+            if request.id in self._requests_in_progress:
                 # Ignore requests that are already in progress, we will not process them again.
                 continue
             if request.was_already_handled:
-                # Do not cache fully handled requests, we do not need them. Just cache their unique_key.
-                self._requests_already_handled.add(request.unique_key)
+                # Do not cache fully handled requests, we do not need them. Just cache their id.
+                self._requests_already_handled.add(request.id)
             else:
                 # Only fetch the request if we do not know it yet.
-                if request.unique_key not in self._requests_cache:
-                    request_id = unique_key_to_request_id(request.unique_key)
-                    complete_request_data = await self._api_client.get_request(request_id)
-
-                    if complete_request_data is not None:
-                        request = Request.model_validate(complete_request_data)
-                        self._requests_cache[request.unique_key] = request
-                    else:
-                        logger.warning(
-                            f'Could not fetch request data for unique_key=`{request.unique_key}` (id=`{request_id}`)'
-                        )
+                if request.id not in self._requests_cache:
+                    complete_request_data = await self._api_client.get_request(request_data['id'])
+                    request = _Request.model_validate(complete_request_data)
+                    self._requests_cache[request.id] = request

                 # Add new requests to the end of the head, unless already present in head
-                if request.unique_key not in self._head_requests:
-                    self._head_requests.appendleft(request.unique_key)
+                if request.id not in self._head_requests:
+                    self._head_requests.appendleft(request.id)

     async def mark_request_as_handled(self, request: Request) -> ProcessedRequest | None:
         """Mark a request as handled after successful processing.
@@ -275,12 +281,14 @@ async def mark_request_as_handled(self, request: Request) -> ProcessedRequest |
         """
         # Set the handled_at timestamp if not already set
+        _request = _Request.model_validate(request.model_dump())
+
         if request.handled_at is None:
             request.handled_at = datetime.now(tz=timezone.utc)
             self.metadata.handled_request_count += 1
             self.metadata.pending_request_count -= 1

-        if cached_request := self._requests_cache.get(request.unique_key):
+        if cached_request := self._requests_cache.get(_request.id):
             cached_request.handled_at = request.handled_at

         try:
@@ -289,13 +297,13 @@ async def mark_request_as_handled(self, request: Request) -> ProcessedRequest |
             # adding to the queue.)
             processed_request = await self._update_request(request)
             # Remember that we handled this request, to optimize local deduplication.
-            self._requests_already_handled.add(request.unique_key)
+            self._requests_already_handled.add(_request.id)
             # Remove request from cache. It will most likely not be needed.
-            self._requests_cache.pop(request.unique_key)
-            self._requests_in_progress.discard(request.unique_key)
+            self._requests_cache.pop(_request.id)
+            self._requests_in_progress.discard(_request.id)
         except Exception as exc:
-            logger.debug(f'Error marking request {request.unique_key} as handled: {exc!s}')
+            logger.debug(f'Error marking request {_request.unique_key} as handled: {exc!s}')
             return None
         else:
             return processed_request
@@ -319,24 +327,28 @@ async def reclaim_request(
         """
         # Check if the request was marked as handled and clear it. When reclaiming,
         # we want to put the request back for processing.
+
+        _request = _Request.model_validate(request.model_dump())
+
         if request.was_already_handled:
             request.handled_at = None

         try:
             # Make sure request is in the local cache. We might need it.
-            self._requests_cache[request.unique_key] = request
+            self._requests_cache[_request.id] = request

             # No longer in progress
-            self._requests_in_progress.discard(request.unique_key)
+            self._requests_in_progress.discard(_request.id)
             # No longer handled
-            self._requests_already_handled.discard(request.unique_key)
+            self._requests_already_handled.discard(_request.id)

             if forefront:
                 # Append to top of the local head estimation
-                self._head_requests.append(request.unique_key)
+                self._head_requests.append(_request.id)

-            processed_request = await self._update_request(request, forefront=forefront)
-            processed_request.unique_key = request.unique_key
+            processed_request = await self._update_request(_request, forefront=forefront)
+            processed_request.id = _request.id
+            processed_request.unique_key = _request.unique_key
             # If the request was previously handled, decrement our handled count since
             # we're putting it back for processing.
             if request.was_already_handled and not processed_request.was_already_handled:
@@ -374,10 +386,9 @@ async def _update_request(
         Returns:
             The updated request
         """
-        request_dict = request.model_dump(by_alias=True)
-        request_dict['id'] = unique_key_to_request_id(request.unique_key)
+        _request = _Request.model_validate(request.model_dump(by_alias=True))
         response = await self._api_client.update_request(
-            request=request_dict,
+            request=_request.model_dump(by_alias=True),
             forefront=forefront,
         )
@@ -396,10 +407,10 @@ async def _init_caches(self) -> None:
         """
         response = await self._api_client.list_requests(limit=10_000)
         for request_data in response.get('items', []):
-            request = Request.model_validate(request_data)
+            request = _Request.model_validate(request_data)
             if request.was_already_handled:
-                # Cache just unique_key for deduplication
-                self._requests_already_handled.add(request.unique_key)
+                # Cache just id for deduplication
+                self._requests_already_handled.add(request.id)
             else:
                 # Cache full request
-                self._requests_cache[request.unique_key] = request
+                self._requests_cache[request.id] = request
diff --git a/src/apify/storage_clients/_apify/_utils.py b/src/apify/storage_clients/_apify/_utils.py
index eee87367..fbced141 100644
--- a/src/apify/storage_clients/_apify/_utils.py
+++ b/src/apify/storage_clients/_apify/_utils.py
@@ -6,9 +6,12 @@
 from base64 import b64encode
 from hashlib import sha256
 from logging import getLogger
-from typing import TYPE_CHECKING, ClassVar
+from typing import TYPE_CHECKING, Annotated, ClassVar
+
+from pydantic import Field, model_validator

 from apify_client import ApifyClientAsync
+from crawlee import Request
 from crawlee._utils.crypto import compute_short_hash

 from apify._configuration import Configuration
@@ -192,3 +195,13 @@ def unique_key_to_request_id(unique_key: str, *, request_id_length: int = 15) ->

     # Truncate the key to the desired length
     return url_safe_key[:request_id_length]
+
+
+class _Request(Request):
+    id: Annotated[str, Field(default='')]
+
+    @model_validator(mode='after')
+    def calculate_id(self) -> _Request:
+        if self.id == '':
+            self.id = unique_key_to_request_id(self.unique_key)
+        return self
diff --git a/tests/integration/test_request_queue.py b/tests/integration/test_request_queue.py
index fbbdfb74..315099bb 100644
--- a/tests/integration/test_request_queue.py
+++ b/tests/integration/test_request_queue.py
@@ -13,6 +13,7 @@
 from ._utils import generate_unique_resource_name
 from apify import Actor
 from apify.storage_clients import ApifyStorageClient
+from apify.storage_clients._apify._utils import unique_key_to_request_id
 from apify.storages import RequestQueue

 if TYPE_CHECKING:
@@ -1189,3 +1190,24 @@ async def test_request_queue_has_stats(request_queue_apify: RequestQueue) -> Non
     assert hasattr(metadata, 'stats')
     apify_metadata = cast('ApifyRequestQueueMetadata', metadata)
     assert apify_metadata.stats.write_count == add_request_count
+
+
+async def test_long_request(request_queue_apify: RequestQueue) -> None:
+    request = Request.from_url(
+        'https://portal.isoss.gov.cz/irj/portal/anonymous/mvrest?path=/eosm-public-offer&officeLabels=%7B%7D&page=1&pageSize=100000&sortColumn=zdatzvsm&sortOrder=-1',
+        use_extended_unique_key=True,
+        always_enqueue=True,
+    )
+
+    request_id = unique_key_to_request_id(request.unique_key)
+
+    processed_request = await request_queue_apify.add_request(request)
+    assert processed_request.id == request_id
+
+    request_obtained = await request_queue_apify.fetch_next_request()
+    assert request_obtained is not None
+
+    await request_queue_apify.mark_request_as_handled(request_obtained)
+
+    is_finished = await request_queue_apify.is_finished()
+    assert is_finished

From 6925f496ec106546c12d24457953006fd24885b7 Mon Sep 17 00:00:00 2001
From: Josef Prochazka
Date: Fri, 17 Oct 2025 11:17:14 +0200
Subject: [PATCH 2/9] Remove unnecessary helper class

---
 .../_apify/_request_queue_single_client.py | 95 +++++++++++--------
 src/apify/storage_clients/_apify/_utils.py | 15 +--
 2 files changed, 55 insertions(+), 55 deletions(-)

diff --git a/src/apify/storage_clients/_apify/_request_queue_single_client.py b/src/apify/storage_clients/_apify/_request_queue_single_client.py
index 491f7f3a..85b48543 100644
--- a/src/apify/storage_clients/_apify/_request_queue_single_client.py
+++ b/src/apify/storage_clients/_apify/_request_queue_single_client.py
@@ -10,7 +10,7 @@
 from crawlee.storage_clients.models import AddRequestsResponse, ProcessedRequest, RequestQueueMetadata

 from apify import Request
-from apify.storage_clients._apify._utils import _Request, unique_key_to_request_id
+from apify.storage_clients._apify._utils import unique_key_to_request_id

 if TYPE_CHECKING:
     from collections.abc import Sequence
@@ -106,29 +106,29 @@ async def add_batch_of_requests(

         for request in requests:
             # Calculate id for request
-            _request = _Request.model_validate(request.model_dump())
+            request_id = unique_key_to_request_id(request.unique_key)

             # Check if request is known to be already handled (it has to be present as well).
-            if _request.id in self._requests_already_handled:
+            if request_id in self._requests_already_handled:
                 already_present_requests.append(
                     ProcessedRequest.model_validate(
                         {
-                            'id': _request.id,
-                            'uniqueKey': _request.unique_key,
+                            'id': request_id,
+                            'uniqueKey': request.unique_key,
                             'wasAlreadyPresent': True,
                             'wasAlreadyHandled': True,
                         }
                     )
                 )
             # Check if request is known to be already present, but unhandled
-            elif self._requests_cache.get(_request.id):
+            elif self._requests_cache.get(request_id):
                 already_present_requests.append(
                     ProcessedRequest.model_validate(
                         {
-                            'id': _request.id,
-                            'uniqueKey': _request.unique_key,
+                            'id': request_id,
+                            'uniqueKey': request.unique_key,
                             'wasAlreadyPresent': True,
-                            'wasAlreadyHandled': _request.was_already_handled,
+                            'wasAlreadyHandled': request.was_already_handled,
                         }
                     )
                 )
@@ -137,11 +137,11 @@ async def add_batch_of_requests(
             new_requests.append(request)

             # Update local caches
-            self._requests_cache[_request.id] = request
+            self._requests_cache[request_id] = request
             if forefront:
-                self._head_requests.append(_request.id)
+                self._head_requests.append(request_id)
             else:
-                self._head_requests.appendleft(_request.id)
+                self._head_requests.appendleft(request_id)

         if new_requests:
             # Prepare requests for API by converting to dictionaries.
@@ -205,7 +205,19 @@ async def _get_request(self, id: str) -> Request | None:
         if response is None:
             return None

-        return Request.model_validate(response)
+        request = Request.model_validate(response)
+
+        # Update local caches
+        if id in self._requests_in_progress:
+            # Ignore requests that are already in progress, client is already aware of them.
+            self._requests_already_handled.add(id)
+        elif request.was_already_handled:
+            # Cache only id for already handled requests
+            self._requests_already_handled.add(id)
+        else:
+            # Cache full request for unhandled requests that are not yet in progress
+            self._requests_cache[id] = request
+        return request

     async def fetch_next_request(self) -> Request | None:
         """Return the next request in the queue to be processed.
@@ -249,24 +261,25 @@ async def _list_head(self) -> None:
         # Update the cached data
         for request_data in response.get('items', []):
-            request = _Request.model_validate(request_data)
+            request = Request.model_validate(request_data)
+            request_id = unique_key_to_request_id(request_data['id'])

-            if request.id in self._requests_in_progress:
+            if request_id in self._requests_in_progress:
                 # Ignore requests that are already in progress, we will not process them again.
                 continue
             if request.was_already_handled:
                 # Do not cache fully handled requests, we do not need them. Just cache their id.
-                self._requests_already_handled.add(request.id)
+                self._requests_already_handled.add(request_id)
             else:
                 # Only fetch the request if we do not know it yet.
-                if request.id not in self._requests_cache:
-                    complete_request_data = await self._api_client.get_request(request_data['id'])
-                    request = _Request.model_validate(complete_request_data)
-                    self._requests_cache[request.id] = request
+                if request_id not in self._requests_cache:
+                    complete_request_data = await self._api_client.get_request(request_id)
+                    request = Request.model_validate(complete_request_data)
+                    self._requests_cache[request_id] = request

                 # Add new requests to the end of the head, unless already present in head
-                if request.id not in self._head_requests:
-                    self._head_requests.appendleft(request.id)
+                if request_id not in self._head_requests:
+                    self._head_requests.appendleft(request_id)

     async def mark_request_as_handled(self, request: Request) -> ProcessedRequest | None:
         """Mark a request as handled after successful processing.
@@ -281,14 +294,14 @@ async def mark_request_as_handled(self, request: Request) -> ProcessedRequest |
         """
         # Set the handled_at timestamp if not already set
-        _request = _Request.model_validate(request.model_dump())
+        request_id = unique_key_to_request_id(request.unique_key)

         if request.handled_at is None:
             request.handled_at = datetime.now(tz=timezone.utc)
             self.metadata.handled_request_count += 1
             self.metadata.pending_request_count -= 1

-        if cached_request := self._requests_cache.get(_request.id):
+        if cached_request := self._requests_cache.get(request_id):
             cached_request.handled_at = request.handled_at

         try:
@@ -297,13 +310,13 @@ async def mark_request_as_handled(self, request: Request) -> ProcessedRequest |
             # adding to the queue.)
             processed_request = await self._update_request(request)
             # Remember that we handled this request, to optimize local deduplication.
-            self._requests_already_handled.add(_request.id)
+            self._requests_already_handled.add(request_id)
             # Remove request from cache. It will most likely not be needed.
-            self._requests_cache.pop(_request.id)
-            self._requests_in_progress.discard(_request.id)
+            self._requests_cache.pop(request_id)
+            self._requests_in_progress.discard(request_id)
         except Exception as exc:
-            logger.debug(f'Error marking request {_request.unique_key} as handled: {exc!s}')
+            logger.debug(f'Error marking request {request.unique_key} as handled: {exc!s}')
             return None
         else:
             return processed_request
@@ -328,27 +341,27 @@ async def reclaim_request(
         # Check if the request was marked as handled and clear it. When reclaiming,
         # we want to put the request back for processing.

-        _request = _Request.model_validate(request.model_dump())
+        request_id = unique_key_to_request_id(request.unique_key)

         if request.was_already_handled:
             request.handled_at = None

         try:
             # Make sure request is in the local cache. We might need it.
-            self._requests_cache[_request.id] = request
+            self._requests_cache[request_id] = request

             # No longer in progress
-            self._requests_in_progress.discard(_request.id)
+            self._requests_in_progress.discard(request_id)
             # No longer handled
-            self._requests_already_handled.discard(_request.id)
+            self._requests_already_handled.discard(request_id)

             if forefront:
                 # Append to top of the local head estimation
-                self._head_requests.append(_request.id)
+                self._head_requests.append(request_id)

-            processed_request = await self._update_request(_request, forefront=forefront)
-            processed_request.id = _request.id
-            processed_request.unique_key = _request.unique_key
+            processed_request = await self._update_request(request, forefront=forefront)
+            processed_request.id = request_id
+            processed_request.unique_key = request.unique_key
             # If the request was previously handled, decrement our handled count since
             # we're putting it back for processing.
             if request.was_already_handled and not processed_request.was_already_handled:
@@ -386,9 +399,8 @@ async def _update_request(
         Returns:
             The updated request
         """
-        _request = _Request.model_validate(request.model_dump(by_alias=True))
         response = await self._api_client.update_request(
-            request=_request.model_dump(by_alias=True),
+            request=request.model_dump(by_alias=True),
             forefront=forefront,
         )
@@ -407,10 +419,11 @@ async def _init_caches(self) -> None:
         """
         response = await self._api_client.list_requests(limit=10_000)
         for request_data in response.get('items', []):
-            request = _Request.model_validate(request_data)
+            request = Request.model_validate(request_data)
+            request_id = unique_key_to_request_id(request_data['id'])
             if request.was_already_handled:
                 # Cache just id for deduplication
-                self._requests_already_handled.add(request.id)
+                self._requests_already_handled.add(request_id)
             else:
                 # Cache full request
-                self._requests_cache[request.id] = request
+                self._requests_cache[request_id] = request
diff --git a/src/apify/storage_clients/_apify/_utils.py b/src/apify/storage_clients/_apify/_utils.py
index fbced141..eee87367 100644
--- a/src/apify/storage_clients/_apify/_utils.py
+++ b/src/apify/storage_clients/_apify/_utils.py
@@ -6,12 +6,9 @@
 from base64 import b64encode
 from hashlib import sha256
 from logging import getLogger
-from typing import TYPE_CHECKING, Annotated, ClassVar
-
-from pydantic import Field, model_validator
+from typing import TYPE_CHECKING, ClassVar

 from apify_client import ApifyClientAsync
-from crawlee import Request
 from crawlee._utils.crypto import compute_short_hash

 from apify._configuration import Configuration
@@ -195,13 +192,3 @@ def unique_key_to_request_id(unique_key: str, *, request_id_length: int = 15) ->

     # Truncate the key to the desired length
     return url_safe_key[:request_id_length]
-
-
-class _Request(Request):
-    id: Annotated[str, Field(default='')]
-
-    @model_validator(mode='after')
-    def calculate_id(self) -> _Request:
-        if self.id == '':
-            self.id = unique_key_to_request_id(self.unique_key)
-        return self

From 27bc0605bf336e2912ad859a6706ed257787d7b9 Mon Sep 17 00:00:00 2001
From: Josef Prochazka
Date: Fri, 17 Oct 2025 12:56:47 +0200
Subject: [PATCH 3/9] Update shared client

---
 src/apify/storage_clients/_apify/_models.py |  4 +-
 .../_apify/_request_queue_shared_client.py  | 78 +++++++++++--------
 tests/integration/conftest.py               |  2 +-
 3 files changed, 47 insertions(+), 37 deletions(-)

diff --git a/src/apify/storage_clients/_apify/_models.py b/src/apify/storage_clients/_apify/_models.py
index b1b3a425..f6917843 100644
--- a/src/apify/storage_clients/_apify/_models.py
+++ b/src/apify/storage_clients/_apify/_models.py
@@ -94,8 +94,8 @@ class CachedRequest(BaseModel):
     Only internal structure.
     """

-    unique_key: str
-    """Unique key of the request."""
+    id: str
+    """Id of the request."""

     was_already_handled: bool
     """Whether the request was already handled."""
diff --git a/src/apify/storage_clients/_apify/_request_queue_shared_client.py b/src/apify/storage_clients/_apify/_request_queue_shared_client.py
index e4e4ed68..612b18e6 100644
--- a/src/apify/storage_clients/_apify/_request_queue_shared_client.py
+++ b/src/apify/storage_clients/_apify/_request_queue_shared_client.py
@@ -54,10 +54,10 @@ def __init__(
         """The Apify request queue client for API operations."""

         self._queue_head = deque[str]()
-        """A deque to store request unique keys in the queue head."""
+        """A deque to store request ids in the queue head."""

         self._requests_cache: LRUCache[str, CachedRequest] = LRUCache(maxsize=cache_size)
-        """A cache to store request objects. Request unique key is used as the cache key."""
+        """A cache to store request objects. Request id is used as the cache key."""

         self._queue_has_locked_requests: bool | None = None
         """Whether the queue has requests locked by another client."""
@@ -101,12 +101,14 @@ async def add_batch_of_requests(
         already_present_requests: list[ProcessedRequest] = []

         for request in requests:
-            if self._requests_cache.get(request.unique_key):
+            request_id = unique_key_to_request_id(request.unique_key)
+            if self._requests_cache.get(request_id):
                 # We are not sure if it was already handled at this point, and it is not worth calling API for it.
                 # It could have been handled by another client in the meantime, so cached information about
                 # `request.was_already_handled` is not reliable.
                 already_present_requests.append(
                     ProcessedRequest(
+                        id=request_id,
                         unique_key=request.unique_key,
                         was_already_present=True,
                         was_already_handled=request.was_already_handled,
                     )
                 )

             else:
                 # Add new request to the cache.
                 processed_request = ProcessedRequest(
+                    id=request_id,
                     unique_key=request.unique_key,
                     was_already_present=True,
                     was_already_handled=request.was_already_handled,
                 )
                 self._cache_request(
-                    request.unique_key,
+                    request_id,
                     processed_request,
                 )
                 new_requests.append(request)
@@ -131,7 +134,6 @@ async def add_batch_of_requests(
             requests_dict = [
                 request.model_dump(
                     by_alias=True,
-                    exclude={'id'},  # Exclude ID fields from requests since the API doesn't accept them.
                 )
                 for request in new_requests
             ]
@@ -146,7 +148,8 @@ async def add_batch_of_requests(

             # Remove unprocessed requests from the cache
             for unprocessed_request in api_response.unprocessed_requests:
-                self._requests_cache.pop(unprocessed_request.unique_key, None)
+                unprocessed_request_id = unique_key_to_request_id(unprocessed_request.unique_key)
+                self._requests_cache.pop(unprocessed_request_id, None)

         else:
             api_response = AddRequestsResponse.model_validate(
@@ -179,7 +182,10 @@ async def get_request(self, unique_key: str) -> Request | None:
         Returns:
             The request or None if not found.
         """
-        response = await self._api_client.get_request(unique_key_to_request_id(unique_key))
+        return await self._get_request_by_id(unique_key_to_request_id(unique_key))
+
+    async def _get_request_by_id(self, request_id: str) -> Request | None:
+        response = await self._api_client.get_request(request_id)

         if response is None:
             return None
@@ -206,15 +212,15 @@ async def fetch_next_request(self) -> Request | None:
             return None

         # Get the next request ID from the queue head
-        next_unique_key = self._queue_head.popleft()
+        next_request_id = self._queue_head.popleft()

-        request = await self._get_or_hydrate_request(next_unique_key)
+        request = await self._get_or_hydrate_request(next_request_id)

         # Handle potential inconsistency where request might not be in the main table yet
         if request is None:
             logger.debug(
                 'Cannot find a request from the beginning of queue, will be retried later',
-                extra={'nextRequestUniqueKey': next_unique_key},
+                extra={'nextRequestId': next_request_id},
             )
             return None
@@ -222,16 +228,16 @@ async def fetch_next_request(self) -> Request | None:
         if request.handled_at is not None:
             logger.debug(
                 'Request fetched from the beginning of queue was already handled',
-                extra={'nextRequestUniqueKey': next_unique_key},
+                extra={'nextRequestId': next_request_id},
             )
             return None

         # Use get request to ensure we have the full request object.
-        request = await self.get_request(request.unique_key)
+        request = await self._get_request_by_id(next_request_id)
         if request is None:
             logger.debug(
                 'Request fetched from the beginning of queue was not found in the RQ',
-                extra={'nextRequestUniqueKey': next_unique_key},
+                extra={'nextRequestId': next_request_id},
             )
             return None
@@ -248,15 +254,17 @@ async def mark_request_as_handled(self, request: Request) -> ProcessedRequest |
         Returns:
             Information about the queue operation. `None` if the given request was not in progress.
         """
+        request_id = unique_key_to_request_id(request.unique_key)
         # Set the handled_at timestamp if not already set
         if request.handled_at is None:
             request.handled_at = datetime.now(tz=timezone.utc)

-        if cached_request := self._requests_cache[request.unique_key]:
+        if cached_request := self._requests_cache[request_id]:
             cached_request.was_already_handled = request.was_already_handled
         try:
             # Update the request in the API
             processed_request = await self._update_request(request)
+            processed_request.id = request_id
             processed_request.unique_key = request.unique_key

             # Update assumed handled count if this wasn't already handled
@@ -265,10 +273,9 @@ async def mark_request_as_handled(self, request: Request) -> ProcessedRequest |
                 self.metadata.pending_request_count -= 1

             # Update the cache with the handled request
-            cache_key = request.unique_key
             self._cache_request(
-                cache_key,
-                processed_request,
+                cache_key=request_id,
+                processed_request=processed_request,
                 hydrated_request=request,
             )
         except Exception as exc:
@@ -352,17 +359,17 @@ async def _ensure_head_is_non_empty(self) -> None:
         # Fetch requests from the API and populate the queue head
         await self._list_head()

-    async def _get_or_hydrate_request(self, unique_key: str) -> Request | None:
+    async def _get_or_hydrate_request(self, request_id: str) -> Request | None:
         """Get a request by unique key, either from cache or by fetching from API.

         Args:
-            unique_key: Unique key of the request to get.
+            request_id: Id of the request to get.

         Returns:
             The request if found and valid, otherwise None.
         """
         # First check if the request is in our cache
-        cached_entry = self._requests_cache.get(unique_key)
+        cached_entry = self._requests_cache.get(request_id)

         if cached_entry and cached_entry.hydrated:
             # If we have the request hydrated in cache, return it
             return cached_entry.hydrated

         # If not in cache or not hydrated, fetch the request
         try:
             # Fetch the request data
-            request = await self.get_request(unique_key)
+            request = await self._get_request_by_id(request_id)

             # If request is not found and return None
             if not request:
                 return None

             # Update cache with hydrated request
-            cache_key = request.unique_key
             self._cache_request(
-                cache_key,
-                ProcessedRequest(
+                cache_key=request_id,
+                processed_request=ProcessedRequest(
+                    id=request_id,
                     unique_key=request.unique_key,
                     was_already_present=True,
                     was_already_handled=request.handled_at is not None,
                 ),
                 hydrated_request=request,
             )
         except Exception as exc:
-            logger.debug(f'Error fetching request {unique_key}: {exc!s}')
+            logger.debug(f'Error fetching request {request_id}: {exc!s}')
             return None
         else:
             return request
@@ -438,8 +445,8 @@ async def _list_head(
             logger.debug(f'Using cached queue head with {len(self._queue_head)} requests')
             # Create a list of requests from the cached queue head
             items = []
-            for unique_key in list(self._queue_head)[:limit]:
-                cached_request = self._requests_cache.get(unique_key)
+            for request_id in list(self._queue_head)[:limit]:
+                cached_request = self._requests_cache.get(request_id)
                 if cached_request and cached_request.hydrated:
                     items.append(cached_request.hydrated)
@@ -472,32 +479,35 @@ async def _list_head(
         for request_data in response.get('items', []):
             request = Request.model_validate(request_data)
+            request_id = request_data.get('id')

             # Skip requests without ID or unique key
-            if not request.unique_key:
+            if not request.unique_key or not request_id:
                 logger.debug(
-                    'Skipping request from queue head, missing unique key',
+                    'Skipping request from queue head, missing unique key or id',
                     extra={
                         'unique_key': request.unique_key,
+                        'id': request_id,
                     },
                 )
                 continue

             # Cache the request
             self._cache_request(
-                request.unique_key,
+                request_id,
                 ProcessedRequest(
+                    id=request_id,
                     unique_key=request.unique_key,
                     was_already_present=True,
                     was_already_handled=False,
                 ),
                 hydrated_request=request,
             )
-            self._queue_head.append(request.unique_key)
+            self._queue_head.append(request_id)

-        for leftover_unique_key in leftover_buffer:
+        for leftover_id in leftover_buffer:
             # After adding new requests to the forefront, any existing leftover locked request is kept in the end.
-            self._queue_head.append(leftover_unique_key)
+            self._queue_head.append(leftover_id)
         return RequestQueueHead.model_validate(response)

     def _cache_request(
@@ -516,7 +526,7 @@ def _cache_request(
             hydrated_request: The hydrated request object, if available.
         """
         self._requests_cache[cache_key] = CachedRequest(
-            unique_key=processed_request.unique_key,
+            id=processed_request.id,
             was_already_handled=processed_request.was_already_handled,
             hydrated=hydrated_request,
             lock_expires_at=None,
diff --git a/tests/integration/conftest.py b/tests/integration/conftest.py
index aea770db..55c0804b 100644
--- a/tests/integration/conftest.py
+++ b/tests/integration/conftest.py
@@ -107,7 +107,7 @@ def apify_client_async(apify_token: str) -> ApifyClientAsync:
     return ApifyClientAsync(apify_token, api_url=api_url)


-@pytest.fixture(params=['single', 'shared'])
+@pytest.fixture(params=['shared'])
 async def request_queue_apify(
     apify_token: str, monkeypatch: pytest.MonkeyPatch, request: pytest.FixtureRequest
 ) -> AsyncGenerator[RequestQueue]:

From e276944e155fd640244ff517db6b6f58507e59ef Mon Sep 17 00:00:00 2001
From: Josef Prochazka
Date: Fri, 17 Oct 2025 13:15:33 +0200
Subject: [PATCH 4/9] Fix some failing tests

---
 .../_apify/_request_queue_single_client.py | 14 ++++----------
 tests/integration/conftest.py              |  2 +-
 2 files changed, 5 insertions(+), 11 deletions(-)

diff --git a/src/apify/storage_clients/_apify/_request_queue_single_client.py b/src/apify/storage_clients/_apify/_request_queue_single_client.py
index 832a7a67..7286e244 100644
--- a/src/apify/storage_clients/_apify/_request_queue_single_client.py
+++ b/src/apify/storage_clients/_apify/_request_queue_single_client.py
@@ -258,16 +258,8 @@ async def _list_head(self) -> None:
         # Update the cached data
         for request_data in response.get('items', []):
-            # Due to https://github.com/apify/apify-core/blob/v0.1377.0/src/api/src/lib/request_queues/request_queue.ts#L53,
-            # the list_head endpoint may return truncated fields for long requests (e.g., long URLs or unique keys).
-            # If truncation is detected, fetch the full request data by its ID from the API.
-            # This is a temporary workaround - the caching will be refactored to use request IDs instead of unique keys.
-            # See https://github.com/apify/apify-sdk-python/issues/630 for details.
-            if '[truncated]' in request_data['uniqueKey'] or '[truncated]' in request_data['url']:
-                request_data = await self._api_client.get_request(request_id=request_data['id'])  # noqa: PLW2901
-
             request = Request.model_validate(request_data)
-            request_id = unique_key_to_request_id(request_data['id'])
+            request_id = request_data['id']

             if request_id in self._requests_in_progress:
                 # Ignore requests that are already in progress, we will not process them again.
@@ -405,8 +397,10 @@ async def _update_request(
         Returns:
             The updated request
         """
+        request_dict = request.model_dump(by_alias=True)
+        request_dict['id'] = unique_key_to_request_id(request.unique_key)
         response = await self._api_client.update_request(
-            request=request.model_dump(by_alias=True),
+            request=request_dict,
             forefront=forefront,
         )
diff --git a/tests/integration/conftest.py b/tests/integration/conftest.py
index 55c0804b..aea770db 100644
--- a/tests/integration/conftest.py
+++ b/tests/integration/conftest.py
@@ -107,7 +107,7 @@ def apify_client_async(apify_token: str) -> ApifyClientAsync:
     return ApifyClientAsync(apify_token, api_url=api_url)


-@pytest.fixture(params=['shared'])
+@pytest.fixture(params=['single', 'shared'])
 async def request_queue_apify(
     apify_token: str, monkeypatch: pytest.MonkeyPatch, request: pytest.FixtureRequest
 ) -> AsyncGenerator[RequestQueue]:

From 0ad37c0e33766672380b9951159699f928d1ee26 Mon Sep 17 00:00:00 2001
From: Josef Prochazka
Date: Fri, 17 Oct 2025 13:32:49 +0200
Subject: [PATCH 5/9] Fix init cache

---
 .../storage_clients/_apify/_request_queue_single_client.py | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/src/apify/storage_clients/_apify/_request_queue_single_client.py b/src/apify/storage_clients/_apify/_request_queue_single_client.py
index 7286e244..094fb6f2 100644
--- a/src/apify/storage_clients/_apify/_request_queue_single_client.py
+++ b/src/apify/storage_clients/_apify/_request_queue_single_client.py
@@ -420,7 +420,8 @@ async def _init_caches(self) -> None:
         """
         response = await self._api_client.list_requests(limit=10_000)
         for request_data in response.get('items', []):
             request = Request.model_validate(request_data)
-            request_id = unique_key_to_request_id(request_data['id'])
+            request_id = request_data['id']
+
             if request.was_already_handled:
                 # Cache just id for deduplication
                 self._requests_already_handled.add(request_id)

From 4209d75c1bf7cc0cccc21aaf89a8b3f452697de2 Mon Sep 17 00:00:00 2001
From: Josef Prochazka
Date: Fri, 17 Oct 2025 14:31:24 +0200
Subject: [PATCH 6/9] Add user_data test

---
 .../_apify/_request_queue_single_client.py | 20 ++++------
 tests/integration/test_request_queue.py    | 34 ++++++++++++++++---
 2 files changed, 37 insertions(+), 17 deletions(-)

diff --git a/src/apify/storage_clients/_apify/_request_queue_single_client.py b/src/apify/storage_clients/_apify/_request_queue_single_client.py
index 094fb6f2..7a6a14aa 100644
--- a/src/apify/storage_clients/_apify/_request_queue_single_client.py
+++ b/src/apify/storage_clients/_apify/_request_queue_single_client.py
@@ -197,6 +197,7 @@ async def _get_request(self, id: str) -> Request | None:
         if id in self._requests_cache:
             return self._requests_cache[id]

+        # Requests that were not added by this client are not in local cache. Fetch them from platform.
         response = await self._api_client.get_request(id)

         if response is None:
             return None
@@ -206,13 +207,13 @@ async def _get_request(self, id: str) -> Request | None:
         request = Request.model_validate(response)

         # Update local caches
         if id in self._requests_in_progress:
-            # Ignore requests that are already in progress, client is already aware of them.
-            self._requests_already_handled.add(id)
+            # No caching of requests that are already in progress, client is already aware of them.
+            pass
         elif request.was_already_handled:
             # Cache only id for already handled requests
             self._requests_already_handled.add(id)
         else:
-            # Cache full request for unhandled requests that are not yet in progress
+            # Cache full request for unhandled requests that are not yet in progress and are not yet handled.
             self._requests_cache[id] = request
         return request
@@ -268,16 +269,9 @@ async def _list_head(self) -> None:
             if request.was_already_handled:
                 # Do not cache fully handled requests, we do not need them. Just cache their id.
                 self._requests_already_handled.add(request_id)
-            else:
-                # Only fetch the request if we do not know it yet.
-                if request_id not in self._requests_cache:
-                    complete_request_data = await self._api_client.get_request(request_id)
-                    request = Request.model_validate(complete_request_data)
-                    self._requests_cache[request_id] = request
-
-                # Add new requests to the end of the head, unless already present in head
-                if request_id not in self._head_requests:
-                    self._head_requests.appendleft(request_id)
+            # Add new requests to the end of the head, unless already present in head
+            elif request_id not in self._head_requests:
+                self._head_requests.appendleft(request_id)

     async def mark_request_as_handled(self, request: Request) -> ProcessedRequest | None:
         """Mark a request as handled after successful processing.
diff --git a/tests/integration/test_request_queue.py b/tests/integration/test_request_queue.py
index 44ec77d4..854eb0a9 100644
--- a/tests/integration/test_request_queue.py
+++ b/tests/integration/test_request_queue.py
@@ -1193,6 +1193,7 @@ async def test_request_queue_has_stats(request_queue_apify: RequestQueue) -> Non


 async def test_long_request(request_queue_apify: RequestQueue) -> None:
+    rq = request_queue_apify
     request = Request.from_url(
         'https://portal.isoss.gov.cz/irj/portal/anonymous/mvrest?path=/eosm-public-offer&officeLabels=%7B%7D&page=1&pageSize=100000&sortColumn=zdatzvsm&sortOrder=-1',
         use_extended_unique_key=True,
@@ -1201,13 +1202,38 @@ async def test_long_request(request_queue_apify: RequestQueue) -> None:

     request_id = unique_key_to_request_id(request.unique_key)

-    processed_request = await request_queue_apify.add_request(request)
+    processed_request = await rq.add_request(request)
     assert processed_request.id == request_id

-    request_obtained = await request_queue_apify.fetch_next_request()
+    request_obtained = await rq.fetch_next_request()
     assert request_obtained is not None

-    await request_queue_apify.mark_request_as_handled(request_obtained)
+    await rq.mark_request_as_handled(request_obtained)

-    is_finished = await request_queue_apify.is_finished()
+    is_finished = await rq.is_finished()
     assert is_finished
+
+
+async def test_pre_existing_request_with_user_data(
+    request_queue_apify: RequestQueue, apify_client_async: ApifyClientAsync
+) -> None:
+    """Test that pre-existing requests with user data are fully fetched.
+
+    list_head does not return user data, so we need to test that fetching unknown requests does not rely on it."""
+    custom_data = {'key': 'value'}
+
+    rq = request_queue_apify
+    request = Request.from_url(
+        'https://example.com',
+        user_data=custom_data,
+    )
+
+    # Add request by a different producer
+    rq_client = apify_client_async.request_queue(request_queue_id=rq.id)
+    await rq_client.add_request(request.model_dump(by_alias=True))
+
+    # Fetch the request by the client under test
+    request_obtained = await rq.fetch_next_request()
+    assert request_obtained is not None
+    # Test that custom_data is preserved in user_data (custom_data should be a subset of obtained user_data)
+    assert custom_data.items() <= request_obtained.user_data.items()

From 9c01f295042effe47497ac0d0d3727e75dabd85d Mon Sep 17 00:00:00 2001
From: Josef Prochazka
Date: Mon, 20 Oct 2025 10:14:34 +0200
Subject: [PATCH 7/9] Review comments

---
 .../storage_clients/_apify/_request_queue_single_client.py | 2 +-
 tests/integration/test_request_queue.py                    | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/src/apify/storage_clients/_apify/_request_queue_single_client.py b/src/apify/storage_clients/_apify/_request_queue_single_client.py
index 7a6a14aa..61c47acf 100644
--- a/src/apify/storage_clients/_apify/_request_queue_single_client.py
+++ b/src/apify/storage_clients/_apify/_request_queue_single_client.py
@@ -186,7 +186,7 @@ async def get_request(self, unique_key: str) -> Request | None:
         return await self._get_request(id=unique_key_to_request_id(unique_key))

     async def _get_request(self, id: str) -> Request | None:
-        """Get a request by unique key.
+        """Get a request by id.

         Args:
             id: Id of request to get.
diff --git a/tests/integration/test_request_queue.py b/tests/integration/test_request_queue.py
index 854eb0a9..6b6ba9eb 100644
--- a/tests/integration/test_request_queue.py
+++ b/tests/integration/test_request_queue.py
@@ -1192,7 +1192,7 @@ async def test_request_queue_has_stats(request_queue_apify: RequestQueue) -> Non
     assert apify_metadata.stats.write_count == add_request_count


-async def test_long_request(request_queue_apify: RequestQueue) -> None:
+async def test_rq_long_url(request_queue_apify: RequestQueue) -> None:
     rq = request_queue_apify
     request = Request.from_url(
         'https://portal.isoss.gov.cz/irj/portal/anonymous/mvrest?path=/eosm-public-offer&officeLabels=%7B%7D&page=1&pageSize=100000&sortColumn=zdatzvsm&sortOrder=-1',

From bc7570a73d502f1e3e2048e691795a91886320c5 Mon Sep 17 00:00:00 2001
From: Josef Prochazka
Date: Mon, 20 Oct 2025 10:25:27 +0200
Subject: [PATCH 8/9] Fix wrong docstring

---
 .../storage_clients/_apify/_request_queue_shared_client.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/apify/storage_clients/_apify/_request_queue_shared_client.py b/src/apify/storage_clients/_apify/_request_queue_shared_client.py
index 612b18e6..1e3e494c 100644
--- a/src/apify/storage_clients/_apify/_request_queue_shared_client.py
+++ b/src/apify/storage_clients/_apify/_request_queue_shared_client.py
@@ -360,7 +360,7 @@ async def _ensure_head_is_non_empty(self) -> None:
         await self._list_head()

     async def _get_or_hydrate_request(self, request_id: str) -> Request | None:
-        """Get a request by unique key, either from cache or by fetching from API.
+        """Get a request by id, either from cache or by fetching from API.

         Args:
             request_id: Id of the request to get.
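Note on the id scheme this series converges on: the derivation lives in src/apify/storage_clients/_apify/_utils.py, and the hunks above only show its tail (the truncation to request_id_length). A minimal sketch of the scheme, reconstructed from the visible imports (sha256, b64encode) and the truncation step; the exact stripping of non-URL-safe base64 characters is an assumption here, not taken from the diff:

    from base64 import b64encode
    from hashlib import sha256

    def unique_key_to_request_id(unique_key: str, *, request_id_length: int = 15) -> str:
        """Derive a deterministic, fixed-length request id from a unique key."""
        # Hash the unique key and encode the digest with base64.
        encoded = b64encode(sha256(unique_key.encode('utf-8')).digest()).decode('utf-8')

        # Keep only URL-safe characters; dropping '+', '/' and '=' is an assumption.
        url_safe_key = ''.join(char for char in encoded if char.isalnum())

        # Truncate the key to the desired length (this step is verbatim in _utils.py).
        return url_safe_key[:request_id_length]

Because the id is computed from a fixed-size digest and then truncated, it always has the same short length, which is what makes it a safer cache key than the raw unique key.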
From 480c6e6c1a74db15a27892cd121fdc3 Mon Sep 17 00:00:00 2001
From: Josef Prochazka
Date: Mon, 20 Oct 2025 11:37:56 +0200
Subject: [PATCH 9/9] Change to snake case in logs

---
 .../storage_clients/_apify/_request_queue_shared_client.py | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/src/apify/storage_clients/_apify/_request_queue_shared_client.py b/src/apify/storage_clients/_apify/_request_queue_shared_client.py
index 1e3e494c..845237fe 100644
--- a/src/apify/storage_clients/_apify/_request_queue_shared_client.py
+++ b/src/apify/storage_clients/_apify/_request_queue_shared_client.py
@@ -220,7 +220,7 @@ async def fetch_next_request(self) -> Request | None:
         if request is None:
             logger.debug(
                 'Cannot find a request from the beginning of queue, will be retried later',
-                extra={'nextRequestId': next_request_id},
+                extra={'next_request_id': next_request_id},
             )
             return None
@@ -228,7 +228,7 @@ async def fetch_next_request(self) -> Request | None:
         if request.handled_at is not None:
             logger.debug(
                 'Request fetched from the beginning of queue was already handled',
-                extra={'nextRequestId': next_request_id},
+                extra={'next_request_id': next_request_id},
             )
             return None
@@ -237,7 +237,7 @@ async def fetch_next_request(self) -> Request | None:
         if request is None:
             logger.debug(
                 'Request fetched from the beginning of queue was not found in the RQ',
-                extra={'nextRequestId': next_request_id},
+                extra={'next_request_id': next_request_id},
             )
             return None
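Taken together, the series keys every local cache and head deque on this fixed-length id instead of the raw unique key. A short sketch of the property that test_rq_long_url relies on (an illustration under the assumptions above, not part of the patches):

    from apify import Request
    from apify.storage_clients._apify._utils import unique_key_to_request_id

    request = Request.from_url(
        'https://portal.isoss.gov.cz/irj/portal/anonymous/mvrest?path=/eosm-public-offer'
        '&officeLabels=%7B%7D&page=1&pageSize=100000&sortColumn=zdatzvsm&sortOrder=-1',
    )

    # However long the URL-based unique key gets, the derived id stays at 15
    # characters, so it remains a stable cache key even where the platform
    # truncates long uniqueKey/url fields in list_head responses (the
    # workaround removed in PATCH 4).
    request_id = unique_key_to_request_id(request.unique_key)
    assert len(request_id) == 15

    # The derivation is deterministic, so producer and consumer compute the same id.
    assert request_id == unique_key_to_request_id(request.unique_key)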