Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions src/apify/_actor.py
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,10 @@ async def __aenter__(self) -> Self:
# Mark initialization as complete and update global state.
self._is_initialized = True
_ActorType._is_any_instance_initialized = True

if not Actor.is_at_home():
# Make sure the input-related KVS is initialized so that the input-aware storage client is used.
await self.open_key_value_store()
return self

async def __aexit__(
Expand Down
124 changes: 83 additions & 41 deletions src/apify/storage_clients/_file_system/_key_value_store_client.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,18 @@
import asyncio
import json
import logging
from itertools import chain
from pathlib import Path

from more_itertools import flatten
from typing_extensions import Self, override

from crawlee._consts import METADATA_FILENAME
from crawlee._utils.file import atomic_write, infer_mime_type, json_dumps
from crawlee.configuration import Configuration as CrawleeConfiguration
from crawlee.storage_clients._file_system import FileSystemKeyValueStoreClient
from crawlee.storage_clients.models import KeyValueStoreRecord
from crawlee.storage_clients.models import KeyValueStoreMetadata, KeyValueStoreRecord, KeyValueStoreRecordMetadata

from apify._configuration import Configuration
from apify._configuration import Configuration as ApifyConfiguration

logger = logging.getLogger(__name__)

Expand All @@ -22,6 +24,18 @@ class ApifyFileSystemKeyValueStoreClient(FileSystemKeyValueStoreClient):
directory, except for the metadata file and the `INPUT.json` file.
"""

def __init__(
    self,
    *,
    metadata: KeyValueStoreMetadata,
    path_to_kvs: Path,
    lock: asyncio.Lock,
) -> None:
    """Initialize the client and remember which key represents the Actor input.

    Both the logical input key and the name of the file backing it start out as the
    `input_key` of the global Apify configuration.

    Args:
        metadata: Key-value store metadata, forwarded to the parent client.
        path_to_kvs: Path of the key-value store, forwarded to the parent client.
        lock: Lock forwarded to the parent client.
    """
    super().__init__(metadata=metadata, path_to_kvs=path_to_kvs, lock=lock)
    default_input_key = ApifyConfiguration.get_global_configuration().input_key
    self._input_key = default_input_key
    self._input_key_filename = default_input_key

@override
@classmethod
async def open(
Expand All @@ -34,7 +48,18 @@ async def open(
) -> Self:
client = await super().open(id=id, name=name, alias=alias, configuration=configuration)

await client._sanitize_input_json_files() # noqa: SLF001 - it's okay, this is a factory method
if isinstance(configuration, ApifyConfiguration):
client._input_key = configuration.input_key # noqa: SLF001 - it's okay, this is a factory method
input_key_filename = cls._get_input_key_file_name(
path_to_kvs=client.path_to_kvs, configuration=configuration
)
client._input_key_filename = input_key_filename # noqa: SLF001 - it's okay, this is a factory method
input_file_path = client.path_to_kvs / input_key_filename
input_file_metadata_path = client.path_to_kvs / f'{input_file_path}.{METADATA_FILENAME}'
if input_file_path.exists() and not input_file_metadata_path.exists():
await cls._create_missing_metadata_for_input_file(
key=configuration.input_key, record_path=input_file_path
)

return client

Expand All @@ -43,14 +68,10 @@ async def purge(self) -> None:
"""Purges the key-value store by deleting all its contents.

It deletes all files in the key-value store directory, except for the metadata file and
the `INPUT.json` file. It also updates the metadata to reflect that the store has been purged.
the input related file and its metadata.
"""
configuration = Configuration.get_global_configuration()

async with self._lock:
files_to_keep = set(
flatten([key, f'{key}.{METADATA_FILENAME}'] for key in configuration.input_key_candidates)
)
files_to_keep = {self._input_key_filename, f'{self._input_key_filename}.{METADATA_FILENAME}'}
files_to_keep.add(METADATA_FILENAME)

for file_path in self.path_to_kvs.glob('*'):
Expand All @@ -64,40 +85,61 @@ async def purge(self) -> None:
update_modified_at=True,
)

async def _sanitize_input_json_files(self) -> None:
"""Handle missing metadata for input files."""
configuration = Configuration.get_global_configuration()
alternative_keys = configuration.input_key_candidates - {configuration.canonical_input_key}

if (self.path_to_kvs / configuration.canonical_input_key).exists():
# Refresh metadata to prevent inconsistencies
input_data = await asyncio.to_thread(
lambda: json.loads((self.path_to_kvs / configuration.canonical_input_key).read_text())
)
await self.set_value(key=configuration.canonical_input_key, value=input_data)
@override
async def get_value(self, *, key: str) -> KeyValueStoreRecord | None:
    """Return the record stored under `key`, redirecting the input key to its backing file.

    Args:
        key: Key of the record to read.

    Returns:
        Whatever the parent client returns for the resolved key.
    """
    # The input may be stored in a file whose name differs from the input key itself.
    resolved_key = self._input_key_filename if key == self._input_key else key
    return await super().get_value(key=resolved_key)

for alternative_key in alternative_keys:
if (alternative_input_file := self.path_to_kvs / alternative_key).exists():
logger.warning(f'Redundant input file found: {alternative_input_file}')
@staticmethod
async def _create_missing_metadata_for_input_file(key: str, record_path: Path) -> None:
    """Write the record metadata file for an input file that does not have one yet.

    The input file is only read, never rewritten. Its content is decoded according to the
    file suffix solely to infer the content type that is stored in the metadata.

    Args:
        key: Key to store in the generated record metadata.
        record_path: Path to the existing input file.

    Raises:
        json.JSONDecodeError: If a `.json` input file does not contain valid JSON.
        UnicodeDecodeError: If a `.json` or `.txt` input file is not valid UTF-8.
    """
    # Read the actual value
    try:
        content = await asyncio.to_thread(record_path.read_bytes)
    except FileNotFoundError:
        # The file may have been removed between the caller's existence check and this read.
        logger.warning(f'Input file disappeared on path: "{record_path}"')
        return

    # Figure out the metadata from the file content
    size = len(content)
    if record_path.suffix == '.json':
        value = json.loads(content.decode('utf-8'))
    elif record_path.suffix == '.txt':
        value = content.decode('utf-8')
    elif record_path.suffix == '':
        # Without a suffix the format is unknown: try JSON first and otherwise keep the raw bytes.
        # Arbitrary binary data may not even be valid UTF-8, so a decoding failure must fall back too.
        try:
            value = json.loads(content.decode('utf-8'))
        except (UnicodeDecodeError, json.JSONDecodeError):
            value = content
    else:
        value = content

    content_type = infer_mime_type(value)

    record_metadata = KeyValueStoreRecordMetadata(key=key, content_type=content_type, size=size)
    # The metadata file sits next to the record: `<record file name>.<METADATA_FILENAME>`.
    record_metadata_filepath = record_path.with_name(f'{record_path.name}.{METADATA_FILENAME}')
    record_metadata_content = await json_dumps(record_metadata.model_dump())

    # Write the record metadata to the file.
    await atomic_write(record_metadata_filepath, record_metadata_content)

return await super().get_value(key=key)
@staticmethod
def _get_input_key_file_name(path_to_kvs: Path, configuration: ApifyConfiguration) -> str:
    """Find the name of the file in `path_to_kvs` that holds the input.

    A file counts as an input file when it is named exactly like the configured input key or
    like the input key followed by a dot and anything else. Paths ending with
    `METADATA_FILENAME` are ignored.

    Args:
        path_to_kvs: Directory that is searched for input files.
        configuration: Configuration providing the `input_key`.

    Returns:
        The name of the single input file found, or the input key itself when none exists.

    Raises:
        RuntimeError: If more than one input file is found.
    """
    candidates = chain(
        path_to_kvs.glob(f'{configuration.input_key}.*'),
        path_to_kvs.glob(f'{configuration.input_key}'),
    )
    # Metadata files match the first pattern too, but they are not inputs themselves.
    found_input_files = {
        candidate.name for candidate in candidates if not str(candidate).endswith(METADATA_FILENAME)
    }

    if len(found_input_files) > 1:
        raise RuntimeError(f'Only one input file is allowed. Following input files found: {found_input_files}')

    if found_input_files:
        return found_input_files.pop()

    # No custom input file found, fall back to the plain input key.
    return configuration.input_key
9 changes: 9 additions & 0 deletions src/apify/storage_clients/_file_system/_storage_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
from ._key_value_store_client import ApifyFileSystemKeyValueStoreClient

if TYPE_CHECKING:
from collections.abc import Hashable

from crawlee.storage_clients._file_system import FileSystemKeyValueStoreClient


Expand All @@ -21,6 +23,13 @@ class ApifyFileSystemStorageClient(FileSystemStorageClient):
except for the metadata file and the `INPUT.json` file.
"""

@override
def get_storage_client_cache_key(self, configuration: Configuration) -> Hashable:
    """Return the cache key that a plain `FileSystemStorageClient` produces for `configuration`.

    Sharing the cache key with `FileSystemStorageClient` prevents the same path from potentially
    being purged twice: whichever of the two clients opens a storage first is also the one used
    by later open calls made through the other client.
    """
    plain_file_system_client = FileSystemStorageClient()
    return plain_file_system_client.get_storage_client_cache_key(configuration)

@override
async def create_kvs_client(
self,
Expand Down
143 changes: 142 additions & 1 deletion tests/unit/test_apify_storages.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,27 @@
import asyncio
import json
from datetime import datetime, timezone
from pathlib import Path
from unittest import mock
from unittest.mock import AsyncMock

import pytest

from crawlee.storage_clients import FileSystemStorageClient
from crawlee.storage_clients._file_system import FileSystemKeyValueStoreClient
from crawlee.storage_clients.models import StorageMetadata
from crawlee.storages._base import Storage

from apify import Configuration
from apify import Actor, Configuration
from apify.storage_clients import ApifyStorageClient
from apify.storage_clients._apify import ApifyDatasetClient, ApifyKeyValueStoreClient, ApifyRequestQueueClient
from apify.storage_clients._file_system import ApifyFileSystemKeyValueStoreClient, ApifyFileSystemStorageClient
from apify.storages import Dataset, KeyValueStore, RequestQueue

EXAMPLE_JSON_INPUT = json.dumps({'key': 'value'})
EXAMPLE_TXT_INPUT = 'Best input ever'
EXAMPLE_BYTES_INPUT = b'High quality bytes'


@pytest.mark.parametrize(
('storage', '_storage_client'),
Expand Down Expand Up @@ -61,3 +71,134 @@ def create_metadata(id: str) -> StorageMetadata:
# Equivalent configuration results in same storage clients.
assert storage_1 is storage_4
assert storage_3 is storage_5


async def test_no_double_purge_for_filesystem_storage_client() -> None:
    """A value set through the Actor is still readable after a request queue has been opened."""
    key, value = 'some key', 'some value'

    async with Actor():
        await Actor.set_value(key, value)
        # The request queue persists its state in a KVS, so opening it tries to open the already opened
        # default KVS again, this time through a different client - FileSystemStorageClient.
        await Actor.open_request_queue()
        assert value == await Actor.get_value(key)


async def test_first_filesystem_storage_client_wins() -> None:
    """Test that the storage client which opens a storage first is reused by the other variant.

    The same storage is opened through both `ApifyFileSystemStorageClient` and `FileSystemStorageClient`;
    both calls must yield the same storage, backed by the client type that was used first.
    """
    # Default storage: the Apify variant goes first.
    default_via_apify = await KeyValueStore.open(storage_client=ApifyFileSystemStorageClient())
    default_via_plain = await KeyValueStore.open(storage_client=FileSystemStorageClient())

    # Named storage: the plain variant goes first.
    named_via_plain = await KeyValueStore.open(name='a', storage_client=FileSystemStorageClient())
    named_via_apify = await KeyValueStore.open(name='a', storage_client=ApifyFileSystemStorageClient())

    assert default_via_apify is default_via_plain
    assert type(default_via_plain._client) is ApifyFileSystemKeyValueStoreClient

    assert named_via_plain is named_via_apify
    assert type(named_via_apify._client) is FileSystemKeyValueStoreClient


@pytest.fixture(params=['INPUT', 'FOO'])
def input_test_configuration(tmp_path: Path, request: pytest.FixtureRequest) -> Configuration:
    """Provide a configuration rooted in `tmp_path`, parametrized over the input key.

    The directory of the default key-value store is created up front, so tests can place
    input files into it before any storage is opened.
    """
    # Only the (empty) default KVS directory is created here; input files are added by the tests.
    (tmp_path / 'key_value_stores' / 'default').mkdir(parents=True)

    configuration = Configuration()
    configuration.input_key = request.param
    configuration.storage_dir = str(tmp_path)
    # Explicitly demand purge. Input file should survive this.
    configuration.purge_on_start = True
    return configuration


async def test_multiple_input_file_formats_cause_error(input_test_configuration: Configuration) -> None:
    """Test that opening the store fails when several input files exist, e.g. `INPUT` and `INPUT.json`."""
    input_key = input_test_configuration.input_key
    default_kvs_dir = Path(input_test_configuration.storage_dir) / 'key_value_stores' / 'default'

    # Two competing input files: one without a suffix and one with the `.json` suffix.
    (default_kvs_dir / f'{input_key}').write_bytes(EXAMPLE_BYTES_INPUT)
    (default_kvs_dir / f'{input_key}.json').write_text(EXAMPLE_JSON_INPUT)

    with pytest.raises(RuntimeError, match=r'Only one input file is allowed. Following input files found: .*'):
        await KeyValueStore.open(
            storage_client=ApifyFileSystemStorageClient(),
            configuration=input_test_configuration,
        )


async def test_txt_input_missing_metadata(input_test_configuration: Configuration) -> None:
    """Test that a `.txt` input file without metadata is readable and is left untouched on disk."""
    input_key = input_test_configuration.input_key
    default_kvs_dir = Path(input_test_configuration.storage_dir) / 'key_value_stores' / 'default'

    # Place the input file into the KVS directory without any accompanying metadata file.
    input_file = default_kvs_dir / f'{input_key}.txt'
    input_file.write_text(EXAMPLE_TXT_INPUT)
    mtime_before_open = input_file.stat().st_mtime

    # Give the filesystem enough time so that a rewrite would show up as a different mtime.
    await asyncio.sleep(1)

    kvs = await KeyValueStore.open(
        storage_client=ApifyFileSystemStorageClient(),
        configuration=input_test_configuration,
    )
    assert await kvs.get_value(input_key) == EXAMPLE_TXT_INPUT
    assert mtime_before_open == input_file.stat().st_mtime, 'File was modified or recreated.'


@pytest.mark.parametrize('suffix', ['.json', ''])
async def test_json_input_missing_metadata(input_test_configuration: Configuration, suffix: str) -> None:
    """Test that a JSON input file without metadata is readable and is left untouched on disk."""
    input_key = input_test_configuration.input_key
    default_kvs_dir = Path(input_test_configuration.storage_dir) / 'key_value_stores' / 'default'

    # Place the input file into the KVS directory without any accompanying metadata file.
    input_file = default_kvs_dir / f'{input_key}{suffix}'
    input_file.write_text(EXAMPLE_JSON_INPUT)
    mtime_before_open = input_file.stat().st_mtime

    # Give the filesystem enough time so that a rewrite would show up as a different mtime.
    await asyncio.sleep(1)

    kvs = await KeyValueStore.open(
        storage_client=ApifyFileSystemStorageClient(),
        configuration=input_test_configuration,
    )
    assert json.loads(EXAMPLE_JSON_INPUT) == await kvs.get_value(input_key)
    assert mtime_before_open == input_file.stat().st_mtime, 'File was modified or recreated.'


@pytest.mark.parametrize('suffix', ['.bin', '', '.whatever'])
async def test_bytes_input_missing_metadata(input_test_configuration: Configuration, suffix: str) -> None:
    """Test that a binary input file without metadata is readable and is left untouched on disk."""
    input_key = input_test_configuration.input_key
    default_kvs_dir = Path(input_test_configuration.storage_dir) / 'key_value_stores' / 'default'

    # Place the input file into the KVS directory without any accompanying metadata file.
    input_file = default_kvs_dir / f'{input_key}{suffix}'
    input_file.write_bytes(EXAMPLE_BYTES_INPUT)
    mtime_before_open = input_file.stat().st_mtime

    # Give the filesystem enough time so that a rewrite would show up as a different mtime.
    await asyncio.sleep(1)

    kvs = await KeyValueStore.open(
        storage_client=ApifyFileSystemStorageClient(),
        configuration=input_test_configuration,
    )
    assert await kvs.get_value(input_key) == EXAMPLE_BYTES_INPUT
    assert mtime_before_open == input_file.stat().st_mtime, 'File was modified or recreated.'


async def test_pre_existing_input_not_deleted_in_actor_context(input_test_configuration: Configuration) -> None:
    """Test that an input file present before start survives as long as the Actor context is entered first."""
    default_kvs_dir = Path(input_test_configuration.storage_dir) / 'key_value_stores' / 'default'

    # Place a suffix-less input file into the KVS directory without any accompanying metadata file.
    input_file = default_kvs_dir / f'{input_test_configuration.input_key}'
    input_file.write_bytes(EXAMPLE_BYTES_INPUT)

    async with Actor(configuration=input_test_configuration):
        # This storage client is not aware of the input file and could delete it during purge.
        input_unaware_client = FileSystemStorageClient()
        # Unless already implicitly opened by Actor, the input file would be deleted.
        await KeyValueStore.open(storage_client=input_unaware_client, configuration=input_test_configuration)
        assert await Actor.get_input() == EXAMPLE_BYTES_INPUT