diff --git a/src/apify/storage_clients/_apify/_models.py b/src/apify/storage_clients/_apify/_models.py
index b1b3a425..f6917843 100644
--- a/src/apify/storage_clients/_apify/_models.py
+++ b/src/apify/storage_clients/_apify/_models.py
@@ -94,8 +94,8 @@ class CachedRequest(BaseModel):
     Only internal structure.
     """
 
-    unique_key: str
-    """Unique key of the request."""
+    id: str
+    """Id of the request."""
 
     was_already_handled: bool
    """Whether the request was already handled."""
diff --git a/src/apify/storage_clients/_apify/_request_queue_shared_client.py b/src/apify/storage_clients/_apify/_request_queue_shared_client.py
index e4e4ed68..845237fe 100644
--- a/src/apify/storage_clients/_apify/_request_queue_shared_client.py
+++ b/src/apify/storage_clients/_apify/_request_queue_shared_client.py
@@ -54,10 +54,10 @@ def __init__(
         """The Apify request queue client for API operations."""
 
         self._queue_head = deque[str]()
-        """A deque to store request unique keys in the queue head."""
+        """A deque to store request ids in the queue head."""
 
         self._requests_cache: LRUCache[str, CachedRequest] = LRUCache(maxsize=cache_size)
-        """A cache to store request objects. Request unique key is used as the cache key."""
+        """A cache to store request objects. Request id is used as the cache key."""
 
         self._queue_has_locked_requests: bool | None = None
         """Whether the queue has requests locked by another client."""
@@ -101,12 +101,14 @@ async def add_batch_of_requests(
         already_present_requests: list[ProcessedRequest] = []
 
         for request in requests:
-            if self._requests_cache.get(request.unique_key):
+            request_id = unique_key_to_request_id(request.unique_key)
+            if self._requests_cache.get(request_id):
                 # We are not sure if it was already handled at this point, and it is not worth calling API for it.
                 # It could have been handled by another client in the meantime, so cached information about
                 # `request.was_already_handled` is not reliable.
                 already_present_requests.append(
                     ProcessedRequest(
+                        id=request_id,
                         unique_key=request.unique_key,
                         was_already_present=True,
                         was_already_handled=request.was_already_handled,
@@ -116,12 +118,13 @@ async def add_batch_of_requests(
             else:
                 # Add new request to the cache.
                 processed_request = ProcessedRequest(
+                    id=request_id,
                     unique_key=request.unique_key,
                     was_already_present=True,
                     was_already_handled=request.was_already_handled,
                 )
                 self._cache_request(
-                    request.unique_key,
+                    request_id,
                     processed_request,
                 )
                 new_requests.append(request)
@@ -131,7 +134,6 @@ async def add_batch_of_requests(
             requests_dict = [
                 request.model_dump(
                     by_alias=True,
-                    exclude={'id'},  # Exclude ID fields from requests since the API doesn't accept them.
                 )
                 for request in new_requests
             ]
@@ -146,7 +148,8 @@ async def add_batch_of_requests(
 
             # Remove unprocessed requests from the cache
             for unprocessed_request in api_response.unprocessed_requests:
-                self._requests_cache.pop(unprocessed_request.unique_key, None)
+                unprocessed_request_id = unique_key_to_request_id(unprocessed_request.unique_key)
+                self._requests_cache.pop(unprocessed_request_id, None)
 
         else:
             api_response = AddRequestsResponse.model_validate(
@@ -179,7 +182,10 @@ async def get_request(self, unique_key: str) -> Request | None:
         Returns:
             The request or None if not found.
""" - response = await self._api_client.get_request(unique_key_to_request_id(unique_key)) + return await self._get_request_by_id(unique_key_to_request_id(unique_key)) + + async def _get_request_by_id(self, request_id: str) -> Request | None: + response = await self._api_client.get_request(request_id) if response is None: return None @@ -206,15 +212,15 @@ async def fetch_next_request(self) -> Request | None: return None # Get the next request ID from the queue head - next_unique_key = self._queue_head.popleft() + next_request_id = self._queue_head.popleft() - request = await self._get_or_hydrate_request(next_unique_key) + request = await self._get_or_hydrate_request(next_request_id) # Handle potential inconsistency where request might not be in the main table yet if request is None: logger.debug( 'Cannot find a request from the beginning of queue, will be retried later', - extra={'nextRequestUniqueKey': next_unique_key}, + extra={'next_request_id': next_request_id}, ) return None @@ -222,16 +228,16 @@ async def fetch_next_request(self) -> Request | None: if request.handled_at is not None: logger.debug( 'Request fetched from the beginning of queue was already handled', - extra={'nextRequestUniqueKey': next_unique_key}, + extra={'next_request_id': next_request_id}, ) return None # Use get request to ensure we have the full request object. - request = await self.get_request(request.unique_key) + request = await self._get_request_by_id(next_request_id) if request is None: logger.debug( 'Request fetched from the beginning of queue was not found in the RQ', - extra={'nextRequestUniqueKey': next_unique_key}, + extra={'next_request_id': next_request_id}, ) return None @@ -248,15 +254,17 @@ async def mark_request_as_handled(self, request: Request) -> ProcessedRequest | Returns: Information about the queue operation. `None` if the given request was not in progress. """ + request_id = unique_key_to_request_id(request.unique_key) # Set the handled_at timestamp if not already set if request.handled_at is None: request.handled_at = datetime.now(tz=timezone.utc) - if cached_request := self._requests_cache[request.unique_key]: + if cached_request := self._requests_cache[request_id]: cached_request.was_already_handled = request.was_already_handled try: # Update the request in the API processed_request = await self._update_request(request) + processed_request.id = request_id processed_request.unique_key = request.unique_key # Update assumed handled count if this wasn't already handled @@ -265,10 +273,9 @@ async def mark_request_as_handled(self, request: Request) -> ProcessedRequest | self.metadata.pending_request_count -= 1 # Update the cache with the handled request - cache_key = request.unique_key self._cache_request( - cache_key, - processed_request, + cache_key=request_id, + processed_request=processed_request, hydrated_request=request, ) except Exception as exc: @@ -352,17 +359,17 @@ async def _ensure_head_is_non_empty(self) -> None: # Fetch requests from the API and populate the queue head await self._list_head() - async def _get_or_hydrate_request(self, unique_key: str) -> Request | None: - """Get a request by unique key, either from cache or by fetching from API. + async def _get_or_hydrate_request(self, request_id: str) -> Request | None: + """Get a request by id, either from cache or by fetching from API. Args: - unique_key: Unique key of the request to get. + request_id: Id of the request to get. Returns: The request if found and valid, otherwise None. 
""" # First check if the request is in our cache - cached_entry = self._requests_cache.get(unique_key) + cached_entry = self._requests_cache.get(request_id) if cached_entry and cached_entry.hydrated: # If we have the request hydrated in cache, return it @@ -371,17 +378,17 @@ async def _get_or_hydrate_request(self, unique_key: str) -> Request | None: # If not in cache or not hydrated, fetch the request try: # Fetch the request data - request = await self.get_request(unique_key) + request = await self._get_request_by_id(request_id) # If request is not found and return None if not request: return None # Update cache with hydrated request - cache_key = request.unique_key self._cache_request( - cache_key, - ProcessedRequest( + cache_key=request_id, + processed_request=ProcessedRequest( + id=request_id, unique_key=request.unique_key, was_already_present=True, was_already_handled=request.handled_at is not None, @@ -389,7 +396,7 @@ async def _get_or_hydrate_request(self, unique_key: str) -> Request | None: hydrated_request=request, ) except Exception as exc: - logger.debug(f'Error fetching request {unique_key}: {exc!s}') + logger.debug(f'Error fetching request {request_id}: {exc!s}') return None else: return request @@ -438,8 +445,8 @@ async def _list_head( logger.debug(f'Using cached queue head with {len(self._queue_head)} requests') # Create a list of requests from the cached queue head items = [] - for unique_key in list(self._queue_head)[:limit]: - cached_request = self._requests_cache.get(unique_key) + for request_id in list(self._queue_head)[:limit]: + cached_request = self._requests_cache.get(request_id) if cached_request and cached_request.hydrated: items.append(cached_request.hydrated) @@ -472,32 +479,35 @@ async def _list_head( for request_data in response.get('items', []): request = Request.model_validate(request_data) + request_id = request_data.get('id') # Skip requests without ID or unique key - if not request.unique_key: + if not request.unique_key or not request_id: logger.debug( - 'Skipping request from queue head, missing unique key', + 'Skipping request from queue head, missing unique key or id', extra={ 'unique_key': request.unique_key, + 'id': request_id, }, ) continue # Cache the request self._cache_request( - request.unique_key, + request_id, ProcessedRequest( + id=request_id, unique_key=request.unique_key, was_already_present=True, was_already_handled=False, ), hydrated_request=request, ) - self._queue_head.append(request.unique_key) + self._queue_head.append(request_id) - for leftover_unique_key in leftover_buffer: + for leftover_id in leftover_buffer: # After adding new requests to the forefront, any existing leftover locked request is kept in the end. - self._queue_head.append(leftover_unique_key) + self._queue_head.append(leftover_id) return RequestQueueHead.model_validate(response) def _cache_request( @@ -516,7 +526,7 @@ def _cache_request( hydrated_request: The hydrated request object, if available. 
""" self._requests_cache[cache_key] = CachedRequest( - unique_key=processed_request.unique_key, + id=processed_request.id, was_already_handled=processed_request.was_already_handled, hydrated=hydrated_request, lock_expires_at=None, diff --git a/src/apify/storage_clients/_apify/_request_queue_single_client.py b/src/apify/storage_clients/_apify/_request_queue_single_client.py index 12149d9e..61c47acf 100644 --- a/src/apify/storage_clients/_apify/_request_queue_single_client.py +++ b/src/apify/storage_clients/_apify/_request_queue_single_client.py @@ -56,21 +56,21 @@ def __init__( """The Apify request queue client for API operations.""" self._requests_cache: LRUCache[str, Request] = LRUCache(maxsize=cache_size) - """A cache to store request objects. Request unique key is used as the cache key.""" + """A cache to store request objects. Request id is used as the cache key.""" self._head_requests: deque[str] = deque() - """Ordered unique keys of requests that represent queue head.""" + """Ordered ids of requests that represent queue head.""" self._requests_already_handled: set[str] = set() """Local estimation of requests unique keys that are already present and handled on the platform. - To enhance local deduplication. - To reduce the _requests_cache size. Already handled requests are most likely not going to be needed again, - so no need to cache more than their unique_key. + so no need to cache more than their id. """ self._requests_in_progress: set[str] = set() - """Set of requests unique keys that are being processed locally. + """Set of requests ids that are being processed locally. - To help decide if the RQ is finished or not. This is the only consumer, so it can be tracked locally. """ @@ -105,19 +105,24 @@ async def add_batch_of_requests( already_present_requests: list[ProcessedRequest] = [] for request in requests: + # Calculate id for request + request_id = unique_key_to_request_id(request.unique_key) + # Check if request is known to be already handled (it has to be present as well.) - if request.unique_key in self._requests_already_handled: + if request_id in self._requests_already_handled: already_present_requests.append( ProcessedRequest( + id=request_id, unique_key=request.unique_key, was_already_present=True, was_already_handled=True, ) ) # Check if request is known to be already present, but unhandled - elif self._requests_cache.get(request.unique_key): + elif self._requests_cache.get(request_id): already_present_requests.append( ProcessedRequest( + id=request_id, unique_key=request.unique_key, was_already_present=True, was_already_handled=request.was_already_handled, @@ -128,11 +133,11 @@ async def add_batch_of_requests( new_requests.append(request) # Update local caches - self._requests_cache[request.unique_key] = request + self._requests_cache[request_id] = request if forefront: - self._head_requests.append(request.unique_key) + self._head_requests.append(request_id) else: - self._head_requests.appendleft(request.unique_key) + self._head_requests.appendleft(request_id) if new_requests: # Prepare requests for API by converting to dictionaries. 
@@ -151,7 +156,7 @@ async def add_batch_of_requests(
             api_response.processed_requests.extend(already_present_requests)
             # Remove unprocessed requests from the cache
             for unprocessed_request in api_response.unprocessed_requests:
-                self._requests_cache.pop(unprocessed_request.unique_key, None)
+                self._requests_cache.pop(unique_key_to_request_id(unprocessed_request.unique_key), None)
 
         else:
             api_response = AddRequestsResponse(
@@ -178,15 +183,39 @@ async def get_request(self, unique_key: str) -> Request | None:
         Returns:
             The request or None if not found.
         """
-        if unique_key in self._requests_cache:
-            return self._requests_cache[unique_key]
+        return await self._get_request(id=unique_key_to_request_id(unique_key))
+
+    async def _get_request(self, id: str) -> Request | None:
+        """Get a request by id.
+
+        Args:
+            id: Id of request to get.
+
+        Returns:
+            The request or None if not found.
+        """
+        if id in self._requests_cache:
+            return self._requests_cache[id]
 
-        response = await self._api_client.get_request(unique_key_to_request_id(unique_key))
+        # Requests that were not added by this client are not in local cache. Fetch them from platform.
+        response = await self._api_client.get_request(id)
 
         if response is None:
             return None
 
-        return Request.model_validate(response)
+        request = Request.model_validate(response)
+
+        # Update local caches
+        if id in self._requests_in_progress:
+            # No caching of requests that are already in progress, client is already aware of them.
+            pass
+        elif request.was_already_handled:
+            # Cache only id for already handled requests
+            self._requests_already_handled.add(id)
+        else:
+            # Cache the full request for requests that are not yet in progress and not yet handled.
+            self._requests_cache[id] = request
+        return request
 
     async def fetch_next_request(self) -> Request | None:
         """Return the next request in the queue to be processed.
@@ -202,13 +231,10 @@ async def fetch_next_request(self) -> Request | None:
         await self._ensure_head_is_non_empty()
 
         while self._head_requests:
-            request_unique_key = self._head_requests.pop()
-            if (
-                request_unique_key not in self._requests_in_progress
-                and request_unique_key not in self._requests_already_handled
-            ):
-                self._requests_in_progress.add(request_unique_key)
-                return await self.get_request(request_unique_key)
+            request_id = self._head_requests.pop()
+            if request_id not in self._requests_in_progress and request_id not in self._requests_already_handled:
+                self._requests_in_progress.add(request_id)
+                return await self._get_request(request_id)
 
         # No request locally and the ones returned from the platform are already in progress.
         return None
@@ -233,44 +259,19 @@ async def _list_head(self) -> None:
 
         # Update the cached data
         for request_data in response.get('items', []):
-            # Due to https://github.com/apify/apify-core/blob/v0.1377.0/src/api/src/lib/request_queues/request_queue.ts#L53,
-            # the list_head endpoint may return truncated fields for long requests (e.g., long URLs or unique keys).
-            # If truncation is detected, fetch the full request data by its ID from the API.
-            # This is a temporary workaround - the caching will be refactored to use request IDs instead of unique keys.
-            # See https://github.com/apify/apify-sdk-python/issues/630 for details.
-            if '[truncated]' in request_data['uniqueKey'] or '[truncated]' in request_data['url']:
-                request_data = await self._api_client.get_request(request_id=request_data['id'])  # noqa: PLW2901
-
             request = Request.model_validate(request_data)
+            request_id = request_data['id']
 
-            if request.unique_key in self._requests_in_progress:
+            if request_id in self._requests_in_progress:
                 # Ignore requests that are already in progress, we will not process them again.
                 continue
 
             if request.was_already_handled:
-                # Do not cache fully handled requests, we do not need them. Just cache their unique_key.
-                self._requests_already_handled.add(request.unique_key)
-            else:
-                # Only fetch the request if we do not know it yet.
-                if request.unique_key not in self._requests_cache:
-                    request_id = unique_key_to_request_id(request.unique_key)
-
-                    if request_data is not None and request_id != request_data['id']:
-                        logger.warning(
-                            f'Request ID mismatch: {request_id} != {request_data["id"]}, '
-                            'this may cause unexpected behavior.'
-                        )
-
-                    # See https://github.com/apify/apify-sdk-python/issues/630 for details.
-                    if '[truncated]' not in request.unique_key:
-                        request_data = await self._api_client.get_request(request_id=request_id)  # noqa: PLW2901
-                        request = Request.model_validate(request_data)
-
-                    self._requests_cache[request.unique_key] = request
-
-                # Add new requests to the end of the head, unless already present in head
-                if request.unique_key not in self._head_requests:
-                    self._head_requests.appendleft(request.unique_key)
+                # Do not cache fully handled requests, we do not need them. Just cache their id.
+                self._requests_already_handled.add(request_id)
+            # Add new requests to the end of the head, unless already present in head
+            elif request_id not in self._head_requests:
+                self._head_requests.appendleft(request_id)
 
     async def mark_request_as_handled(self, request: Request) -> ProcessedRequest | None:
         """Mark a request as handled after successful processing.
@@ -285,12 +286,14 @@ async def mark_request_as_handled(self, request: Request) -> ProcessedRequest |
         """
 
         # Set the handled_at timestamp if not already set
+        request_id = unique_key_to_request_id(request.unique_key)
+
         if request.handled_at is None:
             request.handled_at = datetime.now(tz=timezone.utc)
             self.metadata.handled_request_count += 1
             self.metadata.pending_request_count -= 1
 
-        if cached_request := self._requests_cache.get(request.unique_key):
+        if cached_request := self._requests_cache.get(request_id):
             cached_request.handled_at = request.handled_at
 
         try:
@@ -299,10 +302,10 @@ async def mark_request_as_handled(self, request: Request) -> ProcessedRequest |
             # adding to the queue.)
             processed_request = await self._update_request(request)
             # Remember that we handled this request, to optimize local deduplication.
-            self._requests_already_handled.add(request.unique_key)
+            self._requests_already_handled.add(request_id)
             # Remove request from cache. It will most likely not be needed.
-            self._requests_cache.pop(request.unique_key)
-            self._requests_in_progress.discard(request.unique_key)
+            self._requests_cache.pop(request_id)
+            self._requests_in_progress.discard(request_id)
 
         except Exception as exc:
             logger.debug(f'Error marking request {request.unique_key} as handled: {exc!s}')
@@ -329,23 +332,27 @@ async def reclaim_request(
         """
         # Check if the request was marked as handled and clear it. When reclaiming,
         # we want to put the request back for processing.
+
+        request_id = unique_key_to_request_id(request.unique_key)
+
         if request.was_already_handled:
             request.handled_at = None
 
         try:
             # Make sure request is in the local cache. We might need it.
-            self._requests_cache[request.unique_key] = request
+            self._requests_cache[request_id] = request
 
             # No longer in progress
-            self._requests_in_progress.discard(request.unique_key)
+            self._requests_in_progress.discard(request_id)
             # No longer handled
-            self._requests_already_handled.discard(request.unique_key)
+            self._requests_already_handled.discard(request_id)
 
             if forefront:
                 # Append to top of the local head estimation
-                self._head_requests.append(request.unique_key)
+                self._head_requests.append(request_id)
 
             processed_request = await self._update_request(request, forefront=forefront)
+            processed_request.id = request_id
             processed_request.unique_key = request.unique_key
             # If the request was previously handled, decrement our handled count since
             # we're putting it back for processing.
@@ -407,9 +414,11 @@ async def _init_caches(self) -> None:
         response = await self._api_client.list_requests(limit=10_000)
         for request_data in response.get('items', []):
             request = Request.model_validate(request_data)
+            request_id = request_data['id']
+
             if request.was_already_handled:
-                # Cache just unique_key for deduplication
-                self._requests_already_handled.add(request.unique_key)
+                # Cache just id for deduplication
+                self._requests_already_handled.add(request_id)
             else:
                 # Cache full request
-                self._requests_cache[request.unique_key] = request
+                self._requests_cache[request_id] = request
diff --git a/tests/integration/test_request_queue.py b/tests/integration/test_request_queue.py
index 971aa1b3..6b6ba9eb 100644
--- a/tests/integration/test_request_queue.py
+++ b/tests/integration/test_request_queue.py
@@ -13,7 +13,6 @@
 from ._utils import generate_unique_resource_name
 from apify import Actor, Request
 from apify.storage_clients import ApifyStorageClient
-from apify.storage_clients._apify._request_queue_shared_client import ApifyRequestQueueSharedClient
 from apify.storage_clients._apify._utils import unique_key_to_request_id
 from apify.storages import RequestQueue
 
@@ -1194,27 +1193,47 @@ async def test_request_queue_has_stats(request_queue_apify: RequestQueue) -> Non
 
 
 async def test_rq_long_url(request_queue_apify: RequestQueue) -> None:
-    # TODO: Remove the skip when issue #630 is resolved.
-    if isinstance(request_queue_apify._client._implementation, ApifyRequestQueueSharedClient):  # type: ignore[attr-defined]
-        pytest.skip('Skipping for the "shared" request queue - unskip after issue #630 is resolved.')
-
-    url = 'https://portal.isoss.gov.cz/irj/portal/anonymous/mvrest?path=/eosm-public-offer&officeLabels=%7B%7D&page=1&pageSize=100000&sortColumn=zdatzvsm&sortOrder=-1'
-
-    req = Request.from_url(
-        url=url,
+    rq = request_queue_apify
+    request = Request.from_url(
+        'https://portal.isoss.gov.cz/irj/portal/anonymous/mvrest?path=/eosm-public-offer&officeLabels=%7B%7D&page=1&pageSize=100000&sortColumn=zdatzvsm&sortOrder=-1',
         use_extended_unique_key=True,
         always_enqueue=True,
     )
 
-    request_id = unique_key_to_request_id(req.unique_key)
+    request_id = unique_key_to_request_id(request.unique_key)
 
-    processed_request = await request_queue_apify.add_request(req)
+    processed_request = await rq.add_request(request)
     assert processed_request.id == request_id
 
-    request_obtained = await request_queue_apify.fetch_next_request()
+    request_obtained = await rq.fetch_next_request()
     assert request_obtained is not None
 
-    await request_queue_apify.mark_request_as_handled(request_obtained)
+    await rq.mark_request_as_handled(request_obtained)
 
-    is_finished = await request_queue_apify.is_finished()
+    is_finished = await rq.is_finished()
     assert is_finished
+
+
+async def test_pre_existing_request_with_user_data(
+    request_queue_apify: RequestQueue, apify_client_async: ApifyClientAsync
+) -> None:
+    """Test that pre-existing requests with user data are fully fetched.
+
+    list_head does not return user data, so we need to test that fetching unknown requests is not relying on it."""
+    custom_data = {'key': 'value'}
+
+    rq = request_queue_apify
+    request = Request.from_url(
+        'https://example.com',
+        user_data=custom_data,
+    )
+
+    # Add request by a different producer
+    rq_client = apify_client_async.request_queue(request_queue_id=rq.id)
+    await rq_client.add_request(request.model_dump(by_alias=True))
+
+    # Fetch the request by the client under test
+    request_obtained = await rq.fetch_next_request()
+    assert request_obtained is not None
+    # Test that custom_data is preserved in user_data (custom_data should be subset of obtained user_data)
+    assert custom_data.items() <= request_obtained.user_data.items()
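
Note on the convention this patch relies on: every cache key, head entry, and ProcessedRequest.id above is derived from the request's unique_key via unique_key_to_request_id (imported from apify.storage_clients._apify._utils). The sketch below is only an illustrative approximation of such a helper, not the SDK's actual implementation (the hash, alphabet, and 15-character length are assumptions); it shows the property the refactor depends on, namely that the mapping is deterministic, so any producer enqueueing the same unique key derives the same ID and the ID-keyed caches deduplicate consistently.

    from hashlib import sha256

    _ALPHABET = '0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ'


    def illustrative_unique_key_to_request_id(unique_key: str, length: int = 15) -> str:
        """Map an arbitrarily long unique key to a short, deterministic ID (illustration only)."""
        # Hash the unique key so even very long keys (e.g. long URLs) collapse to a stable value.
        digest = int.from_bytes(sha256(unique_key.encode('utf-8')).digest(), 'big')
        # Encode the integer digest with an assumed alphanumeric alphabet and truncate it.
        encoded = ''
        while digest:
            digest, remainder = divmod(digest, len(_ALPHABET))
            encoded += _ALPHABET[remainder]
        return (encoded or _ALPHABET[0])[:length]


    # Deterministic: the same unique key always yields the same ID, which is what lets
    # _requests_cache, _head_requests, and _requests_already_handled key on IDs safely.
    assert illustrative_unique_key_to_request_id('https://example.com') == illustrative_unique_key_to_request_id(
        'https://example.com'
    )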