Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
209 changes: 77 additions & 132 deletions src/apify/storage_clients/_apify/_request_queue_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,15 @@


class ApifyRequestQueueClient(RequestQueueClient):
"""Base class for Apify platform implementations of the request queue client."""
"""Request queue client for the Apify platform.

This client provides access to request queues stored on the Apify platform, supporting both single-consumer
and multi-consumer scenarios. It manages local caching, request fetching, and state synchronization with the
platform's API.
"""

_MAX_CACHED_REQUESTS: Final[int] = 1_000_000
"""Maximum number of requests that can be cached."""
"""Maximum number of requests that can be cached locally."""

def __init__(
self,
Expand All @@ -45,7 +50,8 @@ def __init__(
"""The Apify request queue client for API operations."""

self._implementation: ApifyRequestQueueSingleClient | ApifyRequestQueueSharedClient
"""Internal implementation used to communicate with the Apify platform based Request Queue."""
"""Internal implementation used to communicate with the Apify platform based request queue."""

if access == 'single':
self._implementation = ApifyRequestQueueSingleClient(
api_client=self._api_client, metadata=metadata, cache_size=self._MAX_CACHED_REQUESTS
Expand All @@ -60,119 +66,36 @@ def __init__(
else:
raise RuntimeError(f"Unsupported access type: {access}. Allowed values are 'single' or 'shared'.")

@property
def _metadata(self) -> RequestQueueMetadata:
    """Local metadata snapshot maintained by the underlying implementation (not fetched from the API)."""
    return self._implementation.metadata

@override
async def add_batch_of_requests(
    self,
    requests: Sequence[Request],
    *,
    forefront: bool = False,
) -> AddRequestsResponse:
    """Insert a batch of requests into the queue.

    Args:
        requests: Requests to be enqueued.
        forefront: If True, place the requests at the head of the queue instead of the end.

    Returns:
        Response describing the outcome of the add operation for the given requests.
    """
    response = await self._implementation.add_batch_of_requests(requests, forefront=forefront)
    return response

@override
async def fetch_next_request(self) -> Request | None:
    """Retrieve the next request in the queue to be processed.

    After processing succeeds, call `mark_request_as_handled` to mark the request as handled.
    If processing fails, call `reclaim_request` instead so the queue can hand the request to
    another consumer in a later `fetch_next_request` call.

    Returns:
        The next request, or `None` when no pending requests remain.
    """
    next_request = await self._implementation.fetch_next_request()
    return next_request

@override
async def mark_request_as_handled(self, request: Request) -> ProcessedRequest | None:
    """Record a request as successfully handled.

    Once handled, a request is never returned again by `fetch_next_request`.

    Args:
        request: The request to mark as handled.

    Returns:
        Information about the queue operation, or `None` when the request was not in progress.
    """
    result = await self._implementation.mark_request_as_handled(request)
    return result

@override
async def get_request(self, unique_key: str) -> Request | None:
    """Look up a single request by its unique key.

    Args:
        unique_key: Unique key identifying the request.

    Returns:
        The matching request, or `None` when no such request exists.
    """
    found = await self._implementation.get_request(unique_key)
    return found

@override
async def reclaim_request(
    self,
    request: Request,
    *,
    forefront: bool = False,
) -> ProcessedRequest | None:
    """Return a failed request to the queue for reprocessing.

    The reclaimed request becomes eligible again for a later `fetch_next_request` call.

    Args:
        request: Request to put back into the queue.
        forefront: If True, reinsert the request at the head of the queue rather than the tail.

    Returns:
        Information about the queue operation, or `None` when the request was not in progress.
    """
    outcome = await self._implementation.reclaim_request(request, forefront=forefront)
    return outcome

@override
async def is_empty(self) -> bool:
    """Report whether the queue currently has no pending requests.

    Returns:
        True when the queue is empty, False otherwise.
    """
    empty = await self._implementation.is_empty()
    return empty

@override
async def get_metadata(self) -> ApifyRequestQueueMetadata:
    """Retrieve current metadata about the request queue.

    This method fetches metadata from the Apify API and merges it with local estimations to provide
    the most up-to-date statistics. Local estimations are used to compensate for potential delays
    in API data propagation (typically a few seconds).

    Returns:
        Request queue metadata with accurate counts and timestamps, combining API data with local estimates.

    Raises:
        ValueError: If the metadata could not be fetched from the API.
    """
    response = await self._api_client.get()

    if response is None:
        raise ValueError('Failed to fetch request queue metadata from the API.')

    # Enhance the API response with local estimations to account for propagation delays (API data can
    # be delayed by a few seconds, while local estimates are immediately accurate). The snapshot is
    # bound once here instead of repeating the attribute access for every field below.
    local = self._implementation.metadata
    return ApifyRequestQueueMetadata(
        id=response['id'],
        name=response['name'],
        total_request_count=max(response['totalRequestCount'], local.total_request_count),
        handled_request_count=max(response['handledRequestCount'], local.handled_request_count),
        pending_request_count=response['pendingRequestCount'],
        created_at=min(response['createdAt'], local.created_at),
        modified_at=max(response['modifiedAt'], local.modified_at),
        accessed_at=max(response['accessedAt'], local.accessed_at),
        had_multiple_clients=response['hadMultipleClients'] or local.had_multiple_clients,
        stats=RequestQueueStats.model_validate(response['stats'], by_alias=True),
    )

Expand All @@ -188,39 +111,27 @@ async def open(
) -> ApifyRequestQueueClient:
"""Open an Apify request queue client.

This method creates and initializes a new instance of the Apify request queue client. It handles
authentication, storage lookup/creation, and metadata retrieval, and sets up internal caching and queue
management structures.
This method creates and initializes a new request queue client instance, handling authentication,
storage lookup or creation, metadata retrieval, and initialization of internal caching structures.

Args:
id: The ID of the RQ to open. If provided, searches for existing RQ by ID.
Mutually exclusive with name and alias.
name: The name of the RQ to open (global scope, persists across runs).
Mutually exclusive with id and alias.
alias: The alias of the RQ to open (run scope, creates unnamed storage).
Mutually exclusive with id and name.
configuration: The configuration object containing API credentials and settings. Must include a valid
`token` and `api_base_url`. May also contain a `default_request_queue_id` for fallback when neither
`id`, `name`, nor `alias` is provided.
access: Controls the implementation of the request queue client based on expected scenario:
- 'single' is suitable for single consumer scenarios. It makes less API calls, is cheaper and faster.
- 'shared' is suitable for multiple consumers scenarios at the cost of higher API usage.
Detailed constraints for the 'single' access type:
- Only one client is consuming the request queue at the time.
- Multiple producers can put requests to the queue, but their forefront requests are not guaranteed to
be handled so quickly as this client does not aggressively fetch the forefront and relies on local
head estimation.
- Requests are only added to the queue, never deleted by other clients. (Marking as handled is ok.)
- Other producers can add new requests, but not modify existing ones.
(Modifications would not be included in local cache)
id: ID of an existing request queue to open. Mutually exclusive with `name` and `alias`.
name: Name of the request queue to open or create (persists across Actor runs).
Mutually exclusive with `id` and `alias`.
alias: Alias for the request queue (scoped to current Actor run, creates unnamed storage).
Mutually exclusive with `id` and `name`.
configuration: Configuration object containing API credentials (`token`, `api_base_url`) and
optionally a `default_request_queue_id` for fallback when no identifier is provided.
access: Access mode controlling the client's behavior:
- `single`: Optimized for single-consumer scenarios (lower API usage, better performance).
- `shared`: Optimized for multi-consumer scenarios (more API calls, guaranteed consistency).

Returns:
An instance for the opened or created storage client.

Raises:
ValueError: If the configuration is missing required fields (token, api_base_url), if more than one of
`id`, `name`, or `alias` is provided, or if none are provided and no default storage ID is available
in the configuration.
ValueError: If the configuration is missing required fields, if multiple identifiers (`id`, `name`,
`alias`) are provided simultaneously, or if no identifier is provided and no default is configured.
"""
api_client = await create_storage_api_client(
storage_type='RequestQueue',
Expand All @@ -230,10 +141,10 @@ async def open(
id=id,
)

# Fetch metadata separately
# Fetch initial metadata from the API.
raw_metadata = await api_client.get()
if raw_metadata is None:
raise ValueError('Failed to retrieve request queue metadata')
raise ValueError('Failed to retrieve request queue metadata from the API.')
metadata = ApifyRequestQueueMetadata.model_validate(raw_metadata)

return cls(
Expand All @@ -252,3 +163,37 @@ async def purge(self) -> None:
@override
async def drop(self) -> None:
    """Delete the request queue on the Apify platform via the API client."""
    await self._api_client.delete()

@override
async def add_batch_of_requests(
    self,
    requests: Sequence[Request],
    *,
    forefront: bool = False,
) -> AddRequestsResponse:
    """Add a batch of requests to the queue by delegating to the underlying implementation.

    Args:
        requests: The requests to add.
        forefront: Whether to add the requests to the beginning of the queue.

    Returns:
        Response containing information about the added requests.
    """
    return await self._implementation.add_batch_of_requests(requests, forefront=forefront)

@override
async def fetch_next_request(self) -> Request | None:
    """Return the next request to process, or `None` if there are no more pending requests.

    After successful processing call `mark_request_as_handled`; after a failure call
    `reclaim_request` so another consumer can receive the request again.
    """
    return await self._implementation.fetch_next_request()

@override
async def mark_request_as_handled(self, request: Request) -> ProcessedRequest | None:
    """Mark a request as handled after successful processing; handled requests are not fetched again.

    Args:
        request: The request to mark as handled.

    Returns:
        Information about the queue operation. `None` if the given request was not in progress.
    """
    return await self._implementation.mark_request_as_handled(request)

@override
async def get_request(self, unique_key: str) -> Request | None:
    """Get a request by its unique key.

    Args:
        unique_key: Unique key of the request to get.

    Returns:
        The request, or `None` if not found.
    """
    return await self._implementation.get_request(unique_key)

@override
async def reclaim_request(
    self,
    request: Request,
    *,
    forefront: bool = False,
) -> ProcessedRequest | None:
    """Reclaim a failed request back to the queue so it can be fetched and processed again.

    Args:
        request: The request to return to the queue.
        forefront: Whether to add the request to the head or the end of the queue.

    Returns:
        Information about the queue operation. `None` if the given request was not in progress.
    """
    return await self._implementation.reclaim_request(request, forefront=forefront)

@override
async def is_empty(self) -> bool:
    """Check whether the queue is empty.

    Returns:
        True if the queue is empty, False otherwise.
    """
    return await self._implementation.is_empty()
Loading