From a8ef7153d6a68580d5686ab40800eee8f4f57eaf Mon Sep 17 00:00:00 2001 From: Vlada Dusek Date: Mon, 1 Dec 2025 15:50:46 +0100 Subject: [PATCH] chore: Polish RQ Apify storage client --- .../_apify/_request_queue_client.py | 209 ++++++-------- .../_apify/_request_queue_shared_client.py | 139 ++++------ .../_apify/_request_queue_single_client.py | 260 ++++++++---------- 3 files changed, 242 insertions(+), 366 deletions(-) diff --git a/src/apify/storage_clients/_apify/_request_queue_client.py b/src/apify/storage_clients/_apify/_request_queue_client.py index be6bc5c9..9a589ec1 100644 --- a/src/apify/storage_clients/_apify/_request_queue_client.py +++ b/src/apify/storage_clients/_apify/_request_queue_client.py @@ -25,10 +25,15 @@ class ApifyRequestQueueClient(RequestQueueClient): - """Base class for Apify platform implementations of the request queue client.""" + """Request queue client for the Apify platform. + + This client provides access to request queues stored on the Apify platform, supporting both single-consumer + and multi-consumer scenarios. It manages local caching, request fetching, and state synchronization with the + platform's API. + """ _MAX_CACHED_REQUESTS: Final[int] = 1_000_000 - """Maximum number of requests that can be cached.""" + """Maximum number of requests that can be cached locally.""" def __init__( self, @@ -45,7 +50,8 @@ def __init__( """The Apify request queue client for API operations.""" self._implementation: ApifyRequestQueueSingleClient | ApifyRequestQueueSharedClient - """Internal implementation used to communicate with the Apify platform based Request Queue.""" + """Internal implementation used to communicate with the Apify platform based request queue.""" + if access == 'single': self._implementation = ApifyRequestQueueSingleClient( api_client=self._api_client, metadata=metadata, cache_size=self._MAX_CACHED_REQUESTS @@ -60,119 +66,36 @@ def __init__( else: raise RuntimeError(f"Unsupported access type: {access}. Allowed values are 'single' or 'shared'.") - @property - def _metadata(self) -> RequestQueueMetadata: - return self._implementation.metadata - - @override - async def add_batch_of_requests( - self, - requests: Sequence[Request], - *, - forefront: bool = False, - ) -> AddRequestsResponse: - """Add a batch of requests to the queue. - - Args: - requests: The requests to add. - forefront: Whether to add the requests to the beginning of the queue. - - Returns: - Response containing information about the added requests. - """ - return await self._implementation.add_batch_of_requests(requests, forefront=forefront) - - @override - async def fetch_next_request(self) -> Request | None: - """Return the next request in the queue to be processed. - - Once you successfully finish processing of the request, you need to call `mark_request_as_handled` - to mark the request as handled in the queue. If there was some error in processing the request, call - `reclaim_request` instead, so that the queue will give the request to some other consumer - in another call to the `fetch_next_request` method. - - Returns: - The request or `None` if there are no more pending requests. - """ - return await self._implementation.fetch_next_request() - - @override - async def mark_request_as_handled(self, request: Request) -> ProcessedRequest | None: - """Mark a request as handled after successful processing. - - Handled requests will never again be returned by the `fetch_next_request` method. - - Args: - request: The request to mark as handled. 
- - Returns: - Information about the queue operation. `None` if the given request was not in progress. - """ - return await self._implementation.mark_request_as_handled(request) - - @override - async def get_request(self, unique_key: str) -> Request | None: - """Get a request by unique key. - - Args: - unique_key: Unique key of the request to get. - - Returns: - The request or None if not found. - """ - return await self._implementation.get_request(unique_key) - - @override - async def reclaim_request( - self, - request: Request, - *, - forefront: bool = False, - ) -> ProcessedRequest | None: - """Reclaim a failed request back to the queue. - - The request will be returned for processing later again by another call to `fetch_next_request`. - - Args: - request: The request to return to the queue. - forefront: Whether to add the request to the head or the end of the queue. - - Returns: - Information about the queue operation. `None` if the given request was not in progress. - """ - return await self._implementation.reclaim_request(request, forefront=forefront) - - @override - async def is_empty(self) -> bool: - """Check if the queue is empty. - - Returns: - True if the queue is empty, False otherwise. - """ - return await self._implementation.is_empty() - @override async def get_metadata(self) -> ApifyRequestQueueMetadata: - """Get metadata about the request queue. + """Retrieve current metadata about the request queue. + + This method fetches metadata from the Apify API and merges it with local estimations to provide + the most up-to-date statistics. Local estimations are used to compensate for potential delays + in API data propagation (typically a few seconds). Returns: - Metadata from the API, merged with local estimation, because in some cases, the data from the API can - be delayed. + Request queue metadata with accurate counts and timestamps, combining API data with local estimates. """ response = await self._api_client.get() + if response is None: raise ValueError('Failed to fetch request queue metadata from the API.') - # Enhance API response by local estimations (API can be delayed few seconds, while local estimation not.) + + # Enhance API response with local estimations to account for propagation delays (API data can be delayed + # by a few seconds, while local estimates are immediately accurate). 
return ApifyRequestQueueMetadata( id=response['id'], name=response['name'], - total_request_count=max(response['totalRequestCount'], self._metadata.total_request_count), - handled_request_count=max(response['handledRequestCount'], self._metadata.handled_request_count), + total_request_count=max(response['totalRequestCount'], self._implementation.metadata.total_request_count), + handled_request_count=max( + response['handledRequestCount'], self._implementation.metadata.handled_request_count + ), pending_request_count=response['pendingRequestCount'], - created_at=min(response['createdAt'], self._metadata.created_at), - modified_at=max(response['modifiedAt'], self._metadata.modified_at), - accessed_at=max(response['accessedAt'], self._metadata.accessed_at), - had_multiple_clients=response['hadMultipleClients'] or self._metadata.had_multiple_clients, + created_at=min(response['createdAt'], self._implementation.metadata.created_at), + modified_at=max(response['modifiedAt'], self._implementation.metadata.modified_at), + accessed_at=max(response['accessedAt'], self._implementation.metadata.accessed_at), + had_multiple_clients=response['hadMultipleClients'] or self._implementation.metadata.had_multiple_clients, stats=RequestQueueStats.model_validate(response['stats'], by_alias=True), ) @@ -188,39 +111,27 @@ async def open( ) -> ApifyRequestQueueClient: """Open an Apify request queue client. - This method creates and initializes a new instance of the Apify request queue client. It handles - authentication, storage lookup/creation, and metadata retrieval, and sets up internal caching and queue - management structures. + This method creates and initializes a new request queue client instance, handling authentication, + storage lookup or creation, metadata retrieval, and initialization of internal caching structures. Args: - id: The ID of the RQ to open. If provided, searches for existing RQ by ID. - Mutually exclusive with name and alias. - name: The name of the RQ to open (global scope, persists across runs). - Mutually exclusive with id and alias. - alias: The alias of the RQ to open (run scope, creates unnamed storage). - Mutually exclusive with id and name. - configuration: The configuration object containing API credentials and settings. Must include a valid - `token` and `api_base_url`. May also contain a `default_request_queue_id` for fallback when neither - `id`, `name`, nor `alias` is provided. - access: Controls the implementation of the request queue client based on expected scenario: - - 'single' is suitable for single consumer scenarios. It makes less API calls, is cheaper and faster. - - 'shared' is suitable for multiple consumers scenarios at the cost of higher API usage. - Detailed constraints for the 'single' access type: - - Only one client is consuming the request queue at the time. - - Multiple producers can put requests to the queue, but their forefront requests are not guaranteed to - be handled so quickly as this client does not aggressively fetch the forefront and relies on local - head estimation. - - Requests are only added to the queue, never deleted by other clients. (Marking as handled is ok.) - - Other producers can add new requests, but not modify existing ones. - (Modifications would not be included in local cache) + id: ID of an existing request queue to open. Mutually exclusive with `name` and `alias`. + name: Name of the request queue to open or create (persists across Actor runs). + Mutually exclusive with `id` and `alias`. 
+ alias: Alias for the request queue (scoped to current Actor run, creates unnamed storage). + Mutually exclusive with `id` and `name`. + configuration: Configuration object containing API credentials (`token`, `api_base_url`) and + optionally a `default_request_queue_id` for fallback when no identifier is provided. + access: Access mode controlling the client's behavior: + - `single`: Optimized for single-consumer scenarios (lower API usage, better performance). + - `shared`: Optimized for multi-consumer scenarios (more API calls, guaranteed consistency). Returns: An instance for the opened or created storage client. Raises: - ValueError: If the configuration is missing required fields (token, api_base_url), if more than one of - `id`, `name`, or `alias` is provided, or if none are provided and no default storage ID is available - in the configuration. + ValueError: If the configuration is missing required fields, if multiple identifiers (`id`, `name`, + `alias`) are provided simultaneously, or if no identifier is provided and no default is configured. """ api_client = await create_storage_api_client( storage_type='RequestQueue', @@ -230,10 +141,10 @@ async def open( id=id, ) - # Fetch metadata separately + # Fetch initial metadata from the API. raw_metadata = await api_client.get() if raw_metadata is None: - raise ValueError('Failed to retrieve request queue metadata') + raise ValueError('Failed to retrieve request queue metadata from the API.') metadata = ApifyRequestQueueMetadata.model_validate(raw_metadata) return cls( @@ -252,3 +163,37 @@ async def purge(self) -> None: @override async def drop(self) -> None: await self._api_client.delete() + + @override + async def add_batch_of_requests( + self, + requests: Sequence[Request], + *, + forefront: bool = False, + ) -> AddRequestsResponse: + return await self._implementation.add_batch_of_requests(requests, forefront=forefront) + + @override + async def fetch_next_request(self) -> Request | None: + return await self._implementation.fetch_next_request() + + @override + async def mark_request_as_handled(self, request: Request) -> ProcessedRequest | None: + return await self._implementation.mark_request_as_handled(request) + + @override + async def get_request(self, unique_key: str) -> Request | None: + return await self._implementation.get_request(unique_key) + + @override + async def reclaim_request( + self, + request: Request, + *, + forefront: bool = False, + ) -> ProcessedRequest | None: + return await self._implementation.reclaim_request(request, forefront=forefront) + + @override + async def is_empty(self) -> bool: + return await self._implementation.is_empty() diff --git a/src/apify/storage_clients/_apify/_request_queue_shared_client.py b/src/apify/storage_clients/_apify/_request_queue_shared_client.py index 845237fe..3210ab93 100644 --- a/src/apify/storage_clients/_apify/_request_queue_shared_client.py +++ b/src/apify/storage_clients/_apify/_request_queue_shared_client.py @@ -19,14 +19,19 @@ from apify_client.clients import RequestQueueClientAsync - logger = getLogger(__name__) class ApifyRequestQueueSharedClient: - """An Apify platform implementation of the request queue client. + """Internal request queue client implementation for multi-consumer scenarios on the Apify platform. + + This implementation is optimized for scenarios where multiple clients concurrently fetch and process requests + from the same queue. 
It makes more frequent API calls to ensure consistency across all consumers and uses + request locking to prevent duplicate processing. + + This class is used internally by `ApifyRequestQueueClient` when `access='shared'` is specified. - This implementation supports multiple producers and multiple consumers scenario. + Public methods are not individually documented as they implement the interface defined in `RequestQueueClient`. """ _DEFAULT_LOCK_TIME: Final[timedelta] = timedelta(minutes=3) @@ -40,45 +45,39 @@ def __init__( cache_size: int, metadata_getter: Callable[[], Coroutine[Any, Any, ApifyRequestQueueMetadata]], ) -> None: - """Initialize a new instance. + """Initialize a new shared request queue client instance. - Preferably use the `ApifyRequestQueueClient.open` class method to create a new instance. + Use `ApifyRequestQueueClient.open(access='shared')` instead of calling this directly. + + Args: + api_client: The Apify API client for request queue operations. + metadata: Initial metadata for the request queue. + cache_size: Maximum number of requests to cache locally. + metadata_getter: Async function to fetch current metadata from the API. """ self.metadata = metadata - """Additional data related to the RequestQueue.""" + """Current metadata for the request queue.""" self._metadata_getter = metadata_getter - """Async function to get metadata from API.""" + """Async function to fetch the latest metadata from the API.""" self._api_client = api_client - """The Apify request queue client for API operations.""" + """The Apify API client for communication with Apify platform.""" self._queue_head = deque[str]() - """A deque to store request ids in the queue head.""" + """Local cache of request IDs from the queue head for efficient fetching.""" self._requests_cache: LRUCache[str, CachedRequest] = LRUCache(maxsize=cache_size) - """A cache to store request objects. Request id is used as the cache key.""" + """LRU cache storing request objects, keyed by request ID.""" self._queue_has_locked_requests: bool | None = None - """Whether the queue has requests locked by another client.""" + """Whether the queue contains requests currently locked by other clients.""" self._should_check_for_forefront_requests = False - """Whether to check for forefront requests in the next list_head call.""" + """Flag indicating whether to refresh the queue head to check for newly added forefront requests.""" self._fetch_lock = asyncio.Lock() - """Fetch lock to minimize race conditions when communicating with API.""" - - async def _get_metadata_estimate(self) -> RequestQueueMetadata: - """Try to get cached metadata first. If multiple clients, fuse with global metadata. - - This method is used internally to avoid unnecessary API call unless needed (multiple clients). - Local estimation of metadata is without delay, unlike metadata from API. In situation where there is only one - client, it is the better choice. - """ - if self.metadata.had_multiple_clients: - return await self._metadata_getter() - # Get local estimation (will not include changes done bo another client) - return self.metadata + """Lock to prevent race conditions during concurrent fetch operations.""" async def add_batch_of_requests( self, @@ -86,17 +85,8 @@ async def add_batch_of_requests( *, forefront: bool = False, ) -> AddRequestsResponse: - """Add a batch of requests to the queue. - - Args: - requests: The requests to add. - forefront: Whether to add the requests to the beginning of the queue. 
- - Returns: - Response containing information about the added requests. - """ + """Specific implementation of this method for the RQ shared access mode.""" # Do not try to add previously added requests to avoid pointless expensive calls to API - new_requests: list[Request] = [] already_present_requests: list[ProcessedRequest] = [] @@ -174,35 +164,11 @@ async def add_batch_of_requests( return api_response async def get_request(self, unique_key: str) -> Request | None: - """Get a request by unique key. - - Args: - unique_key: Unique key of the request to get. - - Returns: - The request or None if not found. - """ + """Specific implementation of this method for the RQ shared access mode.""" return await self._get_request_by_id(unique_key_to_request_id(unique_key)) - async def _get_request_by_id(self, request_id: str) -> Request | None: - response = await self._api_client.get_request(request_id) - - if response is None: - return None - - return Request.model_validate(response) - async def fetch_next_request(self) -> Request | None: - """Return the next request in the queue to be processed. - - Once you successfully finish processing of the request, you need to call `mark_request_as_handled` - to mark the request as handled in the queue. If there was some error in processing the request, call - `reclaim_request` instead, so that the queue will give the request to some other consumer - in another call to the `fetch_next_request` method. - - Returns: - The request or `None` if there are no more pending requests. - """ + """Specific implementation of this method for the RQ shared access mode.""" # Ensure the queue head has requests if available. Fetching the head with lock to prevent race conditions. async with self._fetch_lock: await self._ensure_head_is_non_empty() @@ -244,16 +210,7 @@ async def fetch_next_request(self) -> Request | None: return request async def mark_request_as_handled(self, request: Request) -> ProcessedRequest | None: - """Mark a request as handled after successful processing. - - Handled requests will never again be returned by the `fetch_next_request` method. - - Args: - request: The request to mark as handled. - - Returns: - Information about the queue operation. `None` if the given request was not in progress. - """ + """Specific implementation of this method for the RQ shared access mode.""" request_id = unique_key_to_request_id(request.unique_key) # Set the handled_at timestamp if not already set if request.handled_at is None: @@ -290,17 +247,7 @@ async def reclaim_request( *, forefront: bool = False, ) -> ProcessedRequest | None: - """Reclaim a failed request back to the queue. - - The request will be returned for processing later again by another call to `fetch_next_request`. - - Args: - request: The request to return to the queue. - forefront: Whether to add the request to the head or the end of the queue. - - Returns: - Information about the queue operation. `None` if the given request was not in progress. - """ + """Specific implementation of this method for the RQ shared access mode.""" # Check if the request was marked as handled and clear it. When reclaiming, # we want to put the request back for processing. if request.was_already_handled: @@ -339,17 +286,34 @@ async def reclaim_request( return processed_request async def is_empty(self) -> bool: - """Check if the queue is empty. - - Returns: - True if the queue is empty, False otherwise. - """ + """Specific implementation of this method for the RQ shared access mode.""" # Check _list_head. 
# Without the lock the `is_empty` is prone to falsely report True with some low probability race condition. async with self._fetch_lock: head = await self._list_head(limit=1) return len(head.items) == 0 and not self._queue_has_locked_requests + async def _get_metadata_estimate(self) -> RequestQueueMetadata: + """Try to get cached metadata first. If multiple clients are involved, fuse with the global metadata. + + This method is used internally to avoid unnecessary API calls unless they are needed (multiple clients). + Local estimation of metadata is available without delay, unlike metadata from the API. In situations where there is only one + client, it is the better choice. + """ + if self.metadata.had_multiple_clients: + return await self._metadata_getter() + + # Get local estimation (will not include changes done by another client) + return self.metadata + + async def _get_request_by_id(self, request_id: str) -> Request | None: + response = await self._api_client.get_request(request_id) + + if response is None: + return None + + return Request.model_validate(response) + async def _ensure_head_is_non_empty(self) -> None: """Ensure that the queue head has requests if they are available in the queue.""" # If queue head has adequate requests, skip fetching more @@ -508,6 +472,7 @@ async def _list_head( for leftover_id in leftover_buffer: # After adding new requests to the forefront, any existing leftover locked request is kept in the end. self._queue_head.append(leftover_id) + return RequestQueueHead.model_validate(response) def _cache_request( diff --git a/src/apify/storage_clients/_apify/_request_queue_single_client.py b/src/apify/storage_clients/_apify/_request_queue_single_client.py index 04acac1b..7cc202bb 100644 --- a/src/apify/storage_clients/_apify/_request_queue_single_client.py +++ b/src/apify/storage_clients/_apify/_request_queue_single_client.py @@ -17,22 +17,33 @@ from apify_client.clients import RequestQueueClientAsync - logger = getLogger(__name__) class ApifyRequestQueueSingleClient: - """An Apify platform implementation of the request queue client with limited capability. + """Internal request queue client implementation for single-consumer scenarios on the Apify platform. + + This implementation minimizes API calls and resource usage by leveraging local caching and head estimation. + It is designed for scenarios where only one client consumes requests from the queue at a time, though multiple + producers may add requests concurrently. + + ### Usage constraints + + This client must operate within the following constraints to function correctly: + + - **Single consumer**: Only one client should be consuming (fetching) requests from the queue at any given time. + - **Multiple producers allowed**: Multiple clients can add requests concurrently, but their forefront requests + may not be prioritized immediately since this client relies on local head estimation. + - **Append-only queue**: Requests should only be added to the queue, never deleted by other clients. + Marking requests as handled is permitted. + - **No external modifications**: Other producers can add new requests but should not modify existing ones, + as modifications won't be reflected in the local cache. + + If these constraints are not met, the client may exhibit unpredictable behavior. - This client is designed to use as little resources as possible, but has to be used in constrained context. - Constraints: - - Only one client is consuming the request queue at the time.
- - Multiple producers can put requests to the queue, but their forefront requests are not guaranteed to be handled - so quickly as this client does not aggressively fetch the forefront and relies on local head estimation. - - Requests are only added to the queue, never deleted. (Marking as handled is ok.) - - Other producers can add new requests, but not modify existing ones (otherwise caching can miss the updates) + This class is used internally by `ApifyRequestQueueClient` when `access='single'` is specified. - If the constraints are not met, the client might work in an unpredictable way. + Public methods are not individually documented as they implement the interface defined in `RequestQueueClient`. """ _MAX_HEAD_ITEMS: Final[int] = 1000 @@ -45,40 +56,43 @@ def __init__( metadata: RequestQueueMetadata, cache_size: int, ) -> None: - """Initialize a new instance. + """Initialize a new single-consumer request queue client instance. - Preferably use the `ApifyRequestQueueClient.open` class method to create a new instance. + Use `ApifyRequestQueueClient.open(access='single')` instead of calling this directly. + + Args: + api_client: The Apify API client for request queue operations. + metadata: Initial metadata for the request queue. + cache_size: Maximum number of requests to cache locally. """ self.metadata = metadata - """Additional data related to the RequestQueue.""" + """Current metadata for the request queue.""" self._api_client = api_client - """The Apify request queue client for API operations.""" + """The Apify API client for communication with Apify platform.""" self._requests_cache: LRUCache[str, Request] = LRUCache(maxsize=cache_size) - """A cache to store request objects. Request id is used as the cache key.""" + """LRU cache storing unhandled request objects, keyed by request ID.""" self._head_requests: deque[str] = deque() - """Ordered ids of requests that represent queue head.""" + """Ordered queue of request IDs representing the local estimate of the queue head.""" self._requests_already_handled: set[str] = set() - """Local estimation of requests unique keys that are already present and handled on the platform. + """Set of request IDs known to be already processed on the platform. - - To enhance local deduplication. - - To reduce the _requests_cache size. Already handled requests are most likely not going to be needed again, - so no need to cache more than their id. + Used for efficient local deduplication without needing to cache full request objects. """ self._requests_in_progress: set[str] = set() - """Set of requests ids that are being processed locally. + """Set of request IDs currently being processed by this client. - - To help decide if the RQ is finished or not. This is the only consumer, so it can be tracked locally. + Tracked locally to accurately determine when the queue is empty for this single consumer. """ self._initialized_caches = False - """This flag indicates whether the local caches were already initialized. + """Flag indicating whether local caches have been populated from existing queue contents. - Initialization is done lazily only if deduplication is needed (When calling add_batch_of_requests). + Initialization is performed lazily when deduplication is first needed (during add_batch_of_requests). """ async def add_batch_of_requests( @@ -87,15 +101,7 @@ async def add_batch_of_requests( *, forefront: bool = False, ) -> AddRequestsResponse: - """Add a batch of requests to the queue. - - Args: - requests: The requests to add. 
- forefront: Whether to add the requests to the beginning of the queue. - - Returns: - Response containing information about the added requests. - """ + """Specific implementation of this method for the RQ single access mode.""" if not self._initialized_caches: # One time process to initialize local caches for existing request queues. await self._init_caches() @@ -175,117 +181,24 @@ async def add_batch_of_requests( return api_response async def get_request(self, unique_key: str) -> Request | None: - """Get a request by unique key. - - Args: - unique_key: Unique key of the request to get. - - Returns: - The request or None if not found. - """ - return await self._get_request(id=unique_key_to_request_id(unique_key)) - - async def _get_request(self, id: str) -> Request | None: - """Get a request by id. - - Args: - id: Id of request to get. - - Returns: - The request or None if not found. - """ - if id in self._requests_cache: - return self._requests_cache[id] - - # Requests that were not added by this client are not in local cache. Fetch them from platform. - response = await self._api_client.get_request(id) - - if response is None: - return None - - request = Request.model_validate(response) - - # Updated local caches - if id in self._requests_in_progress: - # No caching of requests that are already in progress, client is already aware of them. - pass - elif request.was_already_handled: - # Cache only id for already handled requests - self._requests_already_handled.add(id) - else: - # Cache full request for unhandled requests that are not yet in progress and are not yet handled. - self._requests_cache[id] = request - return request + """Specific implementation of this method for the RQ single access mode.""" + return await self._get_request_by_id(id=unique_key_to_request_id(unique_key)) async def fetch_next_request(self) -> Request | None: - """Return the next request in the queue to be processed. - - Once you successfully finish processing of the request, you need to call `mark_request_as_handled` - to mark the request as handled in the queue. If there was some error in processing the request, call - `reclaim_request` instead, so that the queue will give the request to some other consumer - in another call to the `fetch_next_request` method. - - Returns: - The request or `None` if there are no more pending requests. - """ + """Specific implementation of this method for the RQ single access mode.""" await self._ensure_head_is_non_empty() while self._head_requests: request_id = self._head_requests.pop() if request_id not in self._requests_in_progress and request_id not in self._requests_already_handled: self._requests_in_progress.add(request_id) - return await self._get_request(request_id) + return await self._get_request_by_id(request_id) # No request locally and the ones returned from the platform are already in progress. return None - async def _ensure_head_is_non_empty(self) -> None: - """Ensure that the queue head has requests if they are available in the queue.""" - if len(self._head_requests) <= 1: - await self._list_head() - - async def _list_head(self) -> None: - desired_new_head_items = 200 - # The head will contain in progress requests as well, so we need to fetch more, to get some new ones. 
- requested_head_items = max(self._MAX_HEAD_ITEMS, desired_new_head_items + len(self._requests_in_progress)) - response = await self._api_client.list_head(limit=requested_head_items) - - # Update metadata - # Check if there is another client working with the RequestQueue - self.metadata.had_multiple_clients = response.get('hadMultipleClients', False) - # Should warn once? This might be outside expected context if the other consumers consumes at the same time - - if modified_at := response.get('queueModifiedAt'): - self.metadata.modified_at = max(self.metadata.modified_at, modified_at) - - # Update the cached data - for request_data in response.get('items', []): - request = Request.model_validate(request_data) - request_id = request_data['id'] - - if request_id in self._requests_in_progress: - # Ignore requests that are already in progress, we will not process them again. - continue - - if request.was_already_handled: - # Do not cache fully handled requests, we do not need them. Just cache their id. - self._requests_already_handled.add(request_id) - # Add new requests to the end of the head, unless already present in head - elif request_id not in self._head_requests: - self._head_requests.appendleft(request_id) - async def mark_request_as_handled(self, request: Request) -> ProcessedRequest | None: - """Mark a request as handled after successful processing. - - Handled requests will never again be returned by the `fetch_next_request` method. - - Args: - request: The request to mark as handled. - - Returns: - Information about the queue operation. `None` if the given request was not in progress. - """ + """Specific implementation of this method for the RQ single access mode.""" # Set the handled_at timestamp if not already set - request_id = unique_key_to_request_id(request.unique_key) if request.handled_at is None: @@ -319,17 +232,7 @@ async def reclaim_request( *, forefront: bool = False, ) -> ProcessedRequest | None: - """Reclaim a failed request back to the queue. - - The request will be returned for processing later again by another call to `fetch_next_request`. - - Args: - request: The request to return to the queue. - forefront: Whether to add the request to the head or the end of the queue. - - Returns: - Information about the queue operation. `None` if the given request was not in progress. - """ + """Specific implementation of this method for the RQ single access mode.""" # Check if the request was marked as handled and clear it. When reclaiming, # we want to put the request back for processing. @@ -367,15 +270,78 @@ async def reclaim_request( return processed_request async def is_empty(self) -> bool: - """Check if the queue is empty. - - Returns: - True if the queue is empty, False otherwise. - """ + """Specific implementation of this method for the RQ single access mode.""" # Without the lock the `is_empty` is prone to falsely report True with some low probability race condition. await self._ensure_head_is_non_empty() return not self._head_requests and not self._requests_in_progress + async def _ensure_head_is_non_empty(self) -> None: + """Ensure that the queue head has requests if they are available in the queue.""" + if len(self._head_requests) <= 1: + await self._list_head() + + async def _list_head(self) -> None: + desired_new_head_items = 200 + # The head will contain in progress requests as well, so we need to fetch more, to get some new ones. 
+ requested_head_items = max(self._MAX_HEAD_ITEMS, desired_new_head_items + len(self._requests_in_progress)) + response = await self._api_client.list_head(limit=requested_head_items) + + # Update metadata + # Check if there is another client working with the RequestQueue + self.metadata.had_multiple_clients = response.get('hadMultipleClients', False) + # Should warn once? This might be outside the expected context if other consumers consume at the same time + + if modified_at := response.get('queueModifiedAt'): + self.metadata.modified_at = max(self.metadata.modified_at, modified_at) + + # Update the cached data + for request_data in response.get('items', []): + request = Request.model_validate(request_data) + request_id = request_data['id'] + + if request_id in self._requests_in_progress: + # Ignore requests that are already in progress; we will not process them again. + continue + + if request.was_already_handled: + # Do not cache fully handled requests, as we do not need them. Just cache their ID. + self._requests_already_handled.add(request_id) + # Add new requests to the end of the head, unless they are already present in the head + elif request_id not in self._head_requests: + self._head_requests.appendleft(request_id) + + async def _get_request_by_id(self, id: str) -> Request | None: + """Get a request by ID. + + Args: + id: ID of the request to get. + + Returns: + The request, or None if not found. + """ + if id in self._requests_cache: + return self._requests_cache[id] + + # Requests that were not added by this client are not in the local cache. Fetch them from the platform. + response = await self._api_client.get_request(id) + + if response is None: + return None + + request = Request.model_validate(response) + + # Update local caches + if id in self._requests_in_progress: + # No caching of requests that are already in progress; the client is already aware of them. + pass + elif request.was_already_handled: + # Cache only the ID for already handled requests + self._requests_already_handled.add(id) + else: + # Cache the full request for requests that are not yet in progress and not yet handled. + self._requests_cache[id] = request + return request + async def _update_request( self, request: Request,
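For orientation, a minimal consumer-side sketch of how the polished client could be used follows. It relies only on the methods shown in this patch (open, add_batch_of_requests, fetch_next_request, mark_request_as_handled, reclaim_request, is_empty); the import paths, the Configuration helper, and Request.from_url come from the surrounding Apify and Crawlee packages and are assumptions here, not something this patch defines.

# Hypothetical usage sketch (not part of the patch); import paths and Configuration are assumptions.
import asyncio

from crawlee import Request  # assumed source of the Request model

from apify import Configuration  # assumed holder of the API token and base URL
from apify.storage_clients._apify._request_queue_client import ApifyRequestQueueClient  # module path taken from the diff


async def main() -> None:
    # 'single' keeps API usage low for one consumer; 'shared' suits multiple concurrent consumers.
    rq = await ApifyRequestQueueClient.open(
        name='my-queue',
        configuration=Configuration.get_global_configuration(),
        access='single',
    )

    # Producers (this client or others) enqueue requests in batches.
    await rq.add_batch_of_requests([Request.from_url('https://example.com')])

    # Consumer loop: fetch a request, then either mark it handled or reclaim it for a retry.
    while not await rq.is_empty():
        request = await rq.fetch_next_request()
        if request is None:
            break
        try:
            ...  # process the request here
        except Exception:
            await rq.reclaim_request(request)  # hand the request back for another attempt
        else:
            await rq.mark_request_as_handled(request)


asyncio.run(main())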