Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions src/apify/storage_clients/_apify/_request_queue_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@
from crawlee.storages import RequestQueue

from ._models import ApifyRequestQueueMetadata, RequestQueueStats
from ._request_queue_shared_client import _ApifyRequestQueueSharedClient
from ._request_queue_single_client import _ApifyRequestQueueSingleClient
from ._request_queue_shared_client import ApifyRequestQueueSharedClient
from ._request_queue_single_client import ApifyRequestQueueSingleClient
from ._utils import AliasResolver

if TYPE_CHECKING:
Expand Down Expand Up @@ -47,14 +47,14 @@ def __init__(
self._api_client = api_client
"""The Apify request queue client for API operations."""

self._implementation: _ApifyRequestQueueSingleClient | _ApifyRequestQueueSharedClient
self._implementation: ApifyRequestQueueSingleClient | ApifyRequestQueueSharedClient
"""Internal implementation used to communicate with the Apify platform based Request Queue."""
if access == 'single':
self._implementation = _ApifyRequestQueueSingleClient(
self._implementation = ApifyRequestQueueSingleClient(
api_client=self._api_client, metadata=metadata, cache_size=self._MAX_CACHED_REQUESTS
)
elif access == 'shared':
self._implementation = _ApifyRequestQueueSharedClient(
self._implementation = ApifyRequestQueueSharedClient(
api_client=self._api_client,
metadata=metadata,
cache_size=self._MAX_CACHED_REQUESTS,
Expand Down
22 changes: 9 additions & 13 deletions src/apify/storage_clients/_apify/_request_queue_shared_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
logger = getLogger(__name__)


class _ApifyRequestQueueSharedClient:
class ApifyRequestQueueSharedClient:
"""An Apify platform implementation of the request queue client.

This implementation supports multiple producers and multiple consumers scenario.
Expand Down Expand Up @@ -106,23 +106,19 @@ async def add_batch_of_requests(
# It could have been handled by another client in the meantime, so cached information about
# `request.was_already_handled` is not reliable.
already_present_requests.append(
ProcessedRequest.model_validate(
{
'uniqueKey': request.unique_key,
'wasAlreadyPresent': True,
'wasAlreadyHandled': request.was_already_handled,
}
ProcessedRequest(
unique_key=request.unique_key,
was_already_present=True,
was_already_handled=request.was_already_handled,
)
)

else:
# Add new request to the cache.
processed_request = ProcessedRequest.model_validate(
{
'uniqueKey': request.unique_key,
'wasAlreadyPresent': True,
'wasAlreadyHandled': request.was_already_handled,
}
processed_request = ProcessedRequest(
unique_key=request.unique_key,
was_already_present=True,
was_already_handled=request.was_already_handled,
)
self._cache_request(
request.unique_key,
Expand Down
52 changes: 31 additions & 21 deletions src/apify/storage_clients/_apify/_request_queue_single_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
logger = getLogger(__name__)


class _ApifyRequestQueueSingleClient:
class ApifyRequestQueueSingleClient:
"""An Apify platform implementation of the request queue client with limited capability.

This client is designed to use as little resources as possible, but has to be used in constrained context.
Expand Down Expand Up @@ -108,23 +108,19 @@ async def add_batch_of_requests(
# Check if request is known to be already handled (it has to be present as well.)
if request.unique_key in self._requests_already_handled:
already_present_requests.append(
ProcessedRequest.model_validate(
{
'uniqueKey': request.unique_key,
'wasAlreadyPresent': True,
'wasAlreadyHandled': True,
}
ProcessedRequest(
unique_key=request.unique_key,
was_already_present=True,
was_already_handled=True,
)
)
# Check if request is known to be already present, but unhandled
elif self._requests_cache.get(request.unique_key):
already_present_requests.append(
ProcessedRequest.model_validate(
{
'uniqueKey': request.unique_key,
'wasAlreadyPresent': True,
'wasAlreadyHandled': request.was_already_handled,
}
ProcessedRequest(
unique_key=request.unique_key,
was_already_present=True,
was_already_handled=request.was_already_handled,
)
)
else:
Expand Down Expand Up @@ -158,8 +154,9 @@ async def add_batch_of_requests(
self._requests_cache.pop(unprocessed_request.unique_key, None)

else:
api_response = AddRequestsResponse.model_validate(
{'unprocessedRequests': [], 'processedRequests': already_present_requests}
api_response = AddRequestsResponse(
unprocessed_requests=[],
processed_requests=already_present_requests,
)

# Update assumed total count for newly added requests.
Expand Down Expand Up @@ -236,28 +233,41 @@ async def _list_head(self) -> None:

# Update the cached data
for request_data in response.get('items', []):
# Due to https://github.com/apify/apify-core/blob/v0.1377.0/src/api/src/lib/request_queues/request_queue.ts#L53,
# the list_head endpoint may return truncated fields for long requests (e.g., long URLs or unique keys).
# If truncation is detected, fetch the full request data by its ID from the API.
# This is a temporary workaround - the caching will be refactored to use request IDs instead of unique keys.
# See https://github.com/apify/apify-sdk-python/issues/630 for details.
if '[truncated]' in request_data['uniqueKey'] or '[truncated]' in request_data['url']:
request_data = await self._api_client.get_request(request_id=request_data['id']) # noqa: PLW2901

request = Request.model_validate(request_data)

if request.unique_key in self._requests_in_progress:
# Ignore requests that are already in progress, we will not process them again.
continue

if request.was_already_handled:
# Do not cache fully handled requests, we do not need them. Just cache their unique_key.
self._requests_already_handled.add(request.unique_key)
else:
# Only fetch the request if we do not know it yet.
if request.unique_key not in self._requests_cache:
request_id = unique_key_to_request_id(request.unique_key)
complete_request_data = await self._api_client.get_request(request_id)

if complete_request_data is not None:
request = Request.model_validate(complete_request_data)
self._requests_cache[request.unique_key] = request
else:
if request_data is not None and request_id != request_data['id']:
logger.warning(
f'Could not fetch request data for unique_key=`{request.unique_key}` (id=`{request_id}`)'
f'Request ID mismatch: {request_id} != {request_data["id"]}, '
'this may cause unexpected behavior.'
)

# See https://github.com/apify/apify-sdk-python/issues/630 for details.
if '[truncated]' not in request.unique_key:
request_data = await self._api_client.get_request(request_id=request_id) # noqa: PLW2901
request = Request.model_validate(request_data)

self._requests_cache[request.unique_key] = request

# Add new requests to the end of the head, unless already present in head
if request.unique_key not in self._head_requests:
self._head_requests.appendleft(request.unique_key)
Expand Down
33 changes: 31 additions & 2 deletions tests/integration/test_request_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,14 @@
import pytest

from apify_shared.consts import ApifyEnvVars
from crawlee import Request, service_locator
from crawlee import service_locator
from crawlee.crawlers import BasicCrawler

from ._utils import generate_unique_resource_name
from apify import Actor
from apify import Actor, Request
from apify.storage_clients import ApifyStorageClient
from apify.storage_clients._apify._request_queue_shared_client import ApifyRequestQueueSharedClient
from apify.storage_clients._apify._utils import unique_key_to_request_id
from apify.storages import RequestQueue

if TYPE_CHECKING:
Expand Down Expand Up @@ -1189,3 +1191,30 @@ async def test_request_queue_has_stats(request_queue_apify: RequestQueue) -> Non
assert hasattr(metadata, 'stats')
apify_metadata = cast('ApifyRequestQueueMetadata', metadata)
assert apify_metadata.stats.write_count == add_request_count


async def test_rq_long_url(request_queue_apify: RequestQueue) -> None:
    """Verify that a request with a very long URL/unique key round-trips through the queue.

    Long URLs can cause the platform's list-head endpoint to return truncated fields
    (see https://github.com/apify/apify-sdk-python/issues/630), so this test adds a
    request with a long URL, fetches it back, handles it, and checks the queue finishes.
    """
    # TODO: Remove the skip when issue #630 is resolved.
    if isinstance(request_queue_apify._client._implementation, ApifyRequestQueueSharedClient):  # type: ignore[attr-defined]
        pytest.skip('Skipping for the "shared" request queue - unskip after issue #630 is resolved.')

    # Long URL with query parameters — long enough to trigger field truncation on list_head.
    url = (
        'https://portal.isoss.gov.cz/irj/portal/anonymous/mvrest?path=/eosm-public-offer'
        '&officeLabels=%7B%7D&page=1&pageSize=100000&sortColumn=zdatzvsm&sortOrder=-1'
    )

    req = Request.from_url(
        url=url,
        use_extended_unique_key=True,
        always_enqueue=True,
    )

    # The platform derives the request ID deterministically from the unique key.
    request_id = unique_key_to_request_id(req.unique_key)

    processed_request = await request_queue_apify.add_request(req)
    assert processed_request.id == request_id

    request_obtained = await request_queue_apify.fetch_next_request()
    assert request_obtained is not None

    await request_queue_apify.mark_request_as_handled(request_obtained)

    # With the single request handled, the queue should report itself finished.
    is_finished = await request_queue_apify.is_finished()
    assert is_finished