diff --git a/CHANGELOG.md b/CHANGELOG.md
index 22c7770c..d14a0cca 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -2,7 +2,9 @@
 
 ## [1.4.1](../../releases/tag/v1.4.1) - Unreleased
 
-...
+### Internal changes
+
+- Fix type hint problems for resource clients
 
 ## [1.4.0](../../releases/tag/v1.4.0) - 2023-12-05
 
diff --git a/src/apify/_memory_storage/resource_clients/base_resource_client.py b/src/apify/_memory_storage/resource_clients/base_resource_client.py
index d68b0584..0c02c2dd 100644
--- a/src/apify/_memory_storage/resource_clients/base_resource_client.py
+++ b/src/apify/_memory_storage/resource_clients/base_resource_client.py
@@ -8,6 +8,8 @@
 from apify_shared.utils import ignore_docs
 
 if TYPE_CHECKING:
+    from typing_extensions import Self
+
     from ..memory_storage_client import MemoryStorageClient
 
 
@@ -48,9 +50,9 @@ def _get_storages_dir(cls: type[BaseResourceClient], memory_storage_client: Memo
     @classmethod
     @abstractmethod
     def _get_storage_client_cache(
-        cls: type[BaseResourceClient],
+        cls,  # noqa: ANN102 # type annotated cls does not work with Self as a return type
         memory_storage_client: MemoryStorageClient,
-    ) -> list[BaseResourceClient]:
+    ) -> list[Self]:
         raise NotImplementedError('You must override this method in the subclass!')
 
     @abstractmethod
@@ -60,21 +62,21 @@ def _to_resource_info(self: BaseResourceClient) -> dict:
         raise NotImplementedError('You must override this method in the subclass!')
 
     @classmethod
     @abstractmethod
     def _create_from_directory(
-        cls: type[BaseResourceClient],
+        cls,  # noqa: ANN102 # type annotated cls does not work with Self as a return type
         storage_directory: str,
         memory_storage_client: MemoryStorageClient,
         id: str | None = None,  # noqa: A002
         name: str | None = None,
-    ) -> BaseResourceClient:
+    ) -> Self:
         raise NotImplementedError('You must override this method in the subclass!')
 
     @classmethod
     def _find_or_create_client_by_id_or_name(
-        cls: type[BaseResourceClient],
+        cls,  # noqa: ANN102 # type annotated cls does not work with Self as a return type
         memory_storage_client: MemoryStorageClient,
         id: str | None = None,  # noqa: A002
         name: str | None = None,
-    ) -> BaseResourceClient | None:
+    ) -> Self | None:
         assert id is not None or name is not None  # noqa: S101
 
         storage_client_cache = cls._get_storage_client_cache(memory_storage_client)
diff --git a/src/apify/_memory_storage/resource_clients/dataset.py b/src/apify/_memory_storage/resource_clients/dataset.py
index f0c9d119..a30e7723 100644
--- a/src/apify/_memory_storage/resource_clients/dataset.py
+++ b/src/apify/_memory_storage/resource_clients/dataset.py
@@ -74,8 +74,8 @@ async def get(self: DatasetClient) -> dict | None:
         found = self._find_or_create_client_by_id_or_name(memory_storage_client=self._memory_storage_client, id=self._id, name=self._name)
 
         if found:
-            async with found._file_operation_lock:  # type: ignore
-                await found._update_timestamps(has_been_modified=False)  # type: ignore
+            async with found._file_operation_lock:
+                await found._update_timestamps(has_been_modified=False)
                 return found._to_resource_info()
 
         return None
@@ -103,7 +103,7 @@ async def update(self: DatasetClient, *, name: str | None = None) -> dict:
         if name is None:
             return existing_dataset_by_id._to_resource_info()
 
-        async with existing_dataset_by_id._file_operation_lock:  # type: ignore
+        async with existing_dataset_by_id._file_operation_lock:
             # Check that name is not in use already
             existing_dataset_by_name = next(
                 (dataset for dataset in self._memory_storage_client._datasets_handled if dataset._name and dataset._name.lower() == name.lower()),
@@ -122,7 +122,7 @@ async def update(self: DatasetClient, *, name: str | None = None) -> dict:
             await force_rename(previous_dir, existing_dataset_by_id._resource_directory)
 
             # Update timestamps
-            await existing_dataset_by_id._update_timestamps(has_been_modified=True)  # type: ignore
+            await existing_dataset_by_id._update_timestamps(has_been_modified=True)
 
         return existing_dataset_by_id._to_resource_info()
 
@@ -193,9 +193,9 @@ async def list_items(
         if existing_dataset_by_id is None:
            raise_on_non_existing_storage(StorageTypes.DATASET, self._id)
 
-        async with existing_dataset_by_id._file_operation_lock:  # type: ignore
-            start, end = existing_dataset_by_id._get_start_and_end_indexes(  # type: ignore
-                max(existing_dataset_by_id._item_count - (offset or 0) - (limit or LIST_ITEMS_LIMIT), 0) if desc else offset or 0,  # type: ignore
+        async with existing_dataset_by_id._file_operation_lock:
+            start, end = existing_dataset_by_id._get_start_and_end_indexes(
+                max(existing_dataset_by_id._item_count - (offset or 0) - (limit or LIST_ITEMS_LIMIT), 0) if desc else offset or 0,
                 limit,
             )
@@ -203,9 +203,9 @@ async def list_items(
             for idx in range(start, end):
                 entry_number = self._generate_local_entry_name(idx)
-                items.append(existing_dataset_by_id._dataset_entries[entry_number])  # type: ignore
+                items.append(existing_dataset_by_id._dataset_entries[entry_number])
 
-            await existing_dataset_by_id._update_timestamps(has_been_modified=False)  # type: ignore
+            await existing_dataset_by_id._update_timestamps(has_been_modified=False)
 
             if desc:
                 items.reverse()
@@ -217,7 +217,7 @@ async def list_items(
                 'items': items,
                 'limit': limit or LIST_ITEMS_LIMIT,
                 'offset': offset or 0,
-                'total': existing_dataset_by_id._item_count,  # type: ignore
+                'total': existing_dataset_by_id._item_count,
             }
         )
 
@@ -308,16 +308,16 @@ async def push_items(self: DatasetClient, items: JSONSerializable) -> None:
         added_ids: list[str] = []
 
         for entry in normalized:
-            existing_dataset_by_id._item_count += 1  # type: ignore
-            idx = self._generate_local_entry_name(existing_dataset_by_id._item_count)  # type: ignore
+            existing_dataset_by_id._item_count += 1
+            idx = self._generate_local_entry_name(existing_dataset_by_id._item_count)
 
-            existing_dataset_by_id._dataset_entries[idx] = entry  # type: ignore
+            existing_dataset_by_id._dataset_entries[idx] = entry
             added_ids.append(idx)
 
-        data_entries = [(id, existing_dataset_by_id._dataset_entries[id]) for id in added_ids]  # type: ignore # noqa: A001
+        data_entries = [(id, existing_dataset_by_id._dataset_entries[id]) for id in added_ids]  # noqa: A001
 
-        async with existing_dataset_by_id._file_operation_lock:  # type: ignore
-            await existing_dataset_by_id._update_timestamps(has_been_modified=True)  # type: ignore
+        async with existing_dataset_by_id._file_operation_lock:
+            await existing_dataset_by_id._update_timestamps(has_been_modified=True)
 
             await _update_dataset_items(
                 data=data_entries,
@@ -385,7 +385,7 @@ def _get_storages_dir(cls: type[DatasetClient], memory_storage_client: MemorySto
         return memory_storage_client._datasets_directory
 
     @classmethod
-    def _get_storage_client_cache(  # type: ignore
+    def _get_storage_client_cache(
         cls: type[DatasetClient],
         memory_storage_client: MemoryStorageClient,
     ) -> list[DatasetClient]:
diff --git a/src/apify/_memory_storage/resource_clients/key_value_store.py b/src/apify/_memory_storage/resource_clients/key_value_store.py
index a77949a4..9f5e34bd 100644
--- a/src/apify/_memory_storage/resource_clients/key_value_store.py
+++ b/src/apify/_memory_storage/resource_clients/key_value_store.py
@@ -100,8 +100,8 @@ async def get(self: KeyValueStoreClient) -> dict | None:
         found = self._find_or_create_client_by_id_or_name(memory_storage_client=self._memory_storage_client, id=self._id, name=self._name)
 
         if found:
-            async with found._file_operation_lock:  # type: ignore
-                await found._update_timestamps(has_been_modified=False)  # type: ignore
+            async with found._file_operation_lock:
+                await found._update_timestamps(has_been_modified=False)
                 return found._to_resource_info()
 
         return None
@@ -127,7 +127,7 @@ async def update(self: KeyValueStoreClient, *, name: str | None = None) -> dict:
         if name is None:
             return existing_store_by_id._to_resource_info()
 
-        async with existing_store_by_id._file_operation_lock:  # type: ignore
+        async with existing_store_by_id._file_operation_lock:
             # Check that name is not in use already
             existing_store_by_name = next(
                 (store for store in self._memory_storage_client._key_value_stores_handled if store._name and store._name.lower() == name.lower()),
@@ -146,7 +146,7 @@ async def update(self: KeyValueStoreClient, *, name: str | None = None) -> dict:
             await force_rename(previous_dir, existing_store_by_id._resource_directory)
 
             # Update timestamps
-            await existing_store_by_id._update_timestamps(has_been_modified=True)  # type: ignore
+            await existing_store_by_id._update_timestamps(has_been_modified=True)
 
         return existing_store_by_id._to_resource_info()
 
@@ -187,7 +187,7 @@ async def list_keys(
 
         items = []
 
-        for record in existing_store_by_id._records.values():  # type: ignore
+        for record in existing_store_by_id._records.values():
             size = len(record['value'])
             items.append(
                 {
@@ -222,8 +222,8 @@ async def list_keys(
         is_last_selected_item_absolutely_last = last_item_in_store == last_selected_item
         next_exclusive_start_key = None if is_last_selected_item_absolutely_last else last_selected_item['key']
 
-        async with existing_store_by_id._file_operation_lock:  # type: ignore
-            await existing_store_by_id._update_timestamps(has_been_modified=False)  # type: ignore
+        async with existing_store_by_id._file_operation_lock:
+            await existing_store_by_id._update_timestamps(has_been_modified=False)
 
         return {
             'count': len(items),
@@ -247,7 +247,7 @@ async def _get_record_internal(
         if existing_store_by_id is None:
             raise_on_non_existing_storage(StorageTypes.KEY_VALUE_STORE, self._id)
 
-        stored_record = existing_store_by_id._records.get(key)  # type: ignore
+        stored_record = existing_store_by_id._records.get(key)
 
         if stored_record is None:
             return None
@@ -264,8 +264,8 @@ async def _get_record_internal(
             except ValueError:
                 logger.exception('Error parsing key-value store record')
 
-        async with existing_store_by_id._file_operation_lock:  # type: ignore
-            await existing_store_by_id._update_timestamps(has_been_modified=False)  # type: ignore
+        async with existing_store_by_id._file_operation_lock:
+            await existing_store_by_id._update_timestamps(has_been_modified=False)
 
         return record
 
@@ -324,22 +324,22 @@ async def set_record(self: KeyValueStoreClient, key: str, value: Any, content_ty
         if 'application/json' in content_type and not is_file_or_bytes(value) and not isinstance(value, str):
             value = json_dumps(value).encode('utf-8')
 
-        async with existing_store_by_id._file_operation_lock:  # type: ignore
-            await existing_store_by_id._update_timestamps(has_been_modified=True)  # type: ignore
+        async with existing_store_by_id._file_operation_lock:
+            await existing_store_by_id._update_timestamps(has_been_modified=True)
             record: KeyValueStoreRecord = {
                 'key': key,
                 'value': value,
                 'contentType': content_type,
             }
 
-            old_record = existing_store_by_id._records.get(key)  # type: ignore
-            existing_store_by_id._records[key] = record  # type: ignore
+            old_record = existing_store_by_id._records.get(key)
+            existing_store_by_id._records[key] = record
 
             if self._memory_storage_client._persist_storage:
                 if old_record is not None and _filename_from_record(old_record) != _filename_from_record(record):
-                    await existing_store_by_id._delete_persisted_record(old_record)  # type: ignore
+                    await existing_store_by_id._delete_persisted_record(old_record)
 
-                await existing_store_by_id._persist_record(record)  # type: ignore
+                await existing_store_by_id._persist_record(record)
 
     async def _persist_record(self: KeyValueStoreClient, record: KeyValueStoreRecord) -> None:
         store_directory = self._resource_directory
@@ -385,14 +385,14 @@ async def delete_record(self: KeyValueStoreClient, key: str) -> None:
         if existing_store_by_id is None:
             raise_on_non_existing_storage(StorageTypes.KEY_VALUE_STORE, self._id)
 
-        record = existing_store_by_id._records.get(key)  # type: ignore
+        record = existing_store_by_id._records.get(key)
 
         if record is not None:
-            async with existing_store_by_id._file_operation_lock:  # type: ignore
-                del existing_store_by_id._records[key]  # type: ignore
-                await existing_store_by_id._update_timestamps(has_been_modified=True)  # type: ignore
+            async with existing_store_by_id._file_operation_lock:
+                del existing_store_by_id._records[key]
+                await existing_store_by_id._update_timestamps(has_been_modified=True)
                 if self._memory_storage_client._persist_storage:
-                    await existing_store_by_id._delete_persisted_record(record)  # type: ignore
+                    await existing_store_by_id._delete_persisted_record(record)
 
     async def _delete_persisted_record(self: KeyValueStoreClient, record: KeyValueStoreRecord) -> None:
         store_directory = self._resource_directory
@@ -437,7 +437,7 @@ def _get_storages_dir(cls: type[KeyValueStoreClient], memory_storage_client: Mem
         return memory_storage_client._key_value_stores_directory
 
     @classmethod
-    def _get_storage_client_cache(  # type: ignore
+    def _get_storage_client_cache(
         cls: type[KeyValueStoreClient],
         memory_storage_client: MemoryStorageClient,
     ) -> list[KeyValueStoreClient]:
diff --git a/src/apify/_memory_storage/resource_clients/request_queue.py b/src/apify/_memory_storage/resource_clients/request_queue.py
index caabf57f..d9217e89 100644
--- a/src/apify/_memory_storage/resource_clients/request_queue.py
+++ b/src/apify/_memory_storage/resource_clients/request_queue.py
@@ -67,8 +67,8 @@ async def get(self: RequestQueueClient) -> dict | None:
         found = self._find_or_create_client_by_id_or_name(memory_storage_client=self._memory_storage_client, id=self._id, name=self._name)
 
         if found:
-            async with found._file_operation_lock:  # type: ignore
-                await found._update_timestamps(has_been_modified=False)  # type: ignore
+            async with found._file_operation_lock:
+                await found._update_timestamps(has_been_modified=False)
                 return found._to_resource_info()
 
         return None
@@ -94,7 +94,7 @@ async def update(self: RequestQueueClient, *, name: str | None = None) -> dict:
         if name is None:
             return existing_queue_by_id._to_resource_info()
 
-        async with existing_queue_by_id._file_operation_lock:  # type: ignore
+        async with existing_queue_by_id._file_operation_lock:
             # Check that name is not in use already
             existing_queue_by_name = next(
                 (queue for queue in self._memory_storage_client._request_queues_handled if queue._name and queue._name.lower() == name.lower()), None
@@ -112,7 +112,7 @@ async def update(self: RequestQueueClient, *, name: str | None = None) -> dict:
             await force_rename(previous_dir, existing_queue_by_id._resource_directory)
 
             # Update timestamps
-            await existing_queue_by_id._update_timestamps(has_been_modified=True)  # type: ignore
+            await existing_queue_by_id._update_timestamps(has_been_modified=True)
 
         return existing_queue_by_id._to_resource_info()
 
@@ -146,18 +146,18 @@ async def list_head(self: RequestQueueClient, *, limit: int | None = None) -> di
         if existing_queue_by_id is None:
             raise_on_non_existing_storage(StorageTypes.REQUEST_QUEUE, self._id)
 
-        async with existing_queue_by_id._file_operation_lock:  # type: ignore
-            await existing_queue_by_id._update_timestamps(has_been_modified=False)  # type: ignore
+        async with existing_queue_by_id._file_operation_lock:
+            await existing_queue_by_id._update_timestamps(has_been_modified=False)
 
             items: list[dict] = []
 
             # Iterate all requests in the queue which have sorted key larger than infinity, which means `orderNo` is not `None`
             # This will iterate them in order of `orderNo`
-            for request_key in existing_queue_by_id._requests.irange_key(min_key=-float('inf'), inclusive=(False, True)):  # type: ignore
+            for request_key in existing_queue_by_id._requests.irange_key(min_key=-float('inf'), inclusive=(False, True)):
                 if len(items) == limit:
                     break
 
-                request = existing_queue_by_id._requests.get(request_key)  # type: ignore
+                request = existing_queue_by_id._requests.get(request_key)
 
                 # Check that the request still exists and was not handled,
                 # in case something deleted it or marked it as handled concurrenctly
@@ -167,7 +167,7 @@ async def list_head(self: RequestQueueClient, *, limit: int | None = None) -> di
             return {
                 'limit': limit,
                 'hadMultipleClients': False,
-                'queueModifiedAt': existing_queue_by_id._modified_at,  # type: ignore
+                'queueModifiedAt': existing_queue_by_id._modified_at,
                 'items': [self._json_to_request(item['json']) for item in items],
             }
 
@@ -190,12 +190,12 @@ async def add_request(self: RequestQueueClient, request: dict, *, forefront: boo
 
         request_model = self._create_internal_request(request, forefront)
 
-        async with existing_queue_by_id._file_operation_lock:  # type: ignore
-            existing_request_with_id = existing_queue_by_id._requests.get(request_model['id'])  # type: ignore
+        async with existing_queue_by_id._file_operation_lock:
+            existing_request_with_id = existing_queue_by_id._requests.get(request_model['id'])
 
             # We already have the request present, so we return information about it
             if existing_request_with_id is not None:
-                await existing_queue_by_id._update_timestamps(has_been_modified=False)  # type: ignore
+                await existing_queue_by_id._update_timestamps(has_been_modified=False)
 
                 return {
                     'requestId': existing_request_with_id['id'],
@@ -203,12 +203,12 @@ async def add_request(self: RequestQueueClient, request: dict, *, forefront: boo
                     'wasAlreadyPresent': True,
                 }
 
-            existing_queue_by_id._requests[request_model['id']] = request_model  # type: ignore
+            existing_queue_by_id._requests[request_model['id']] = request_model
             if request_model['orderNo'] is None:
-                existing_queue_by_id._handled_request_count += 1  # type: ignore
+                existing_queue_by_id._handled_request_count += 1
             else:
-                existing_queue_by_id._pending_request_count += 1  # type: ignore
-            await existing_queue_by_id._update_timestamps(has_been_modified=True)  # type: ignore
+                existing_queue_by_id._pending_request_count += 1
+            await existing_queue_by_id._update_timestamps(has_been_modified=True)
             await update_request_queue_item(
                 request=request_model,
                 request_id=request_model['id'],
@@ -240,10 +240,10 @@ async def get_request(self: RequestQueueClient, request_id: str) -> dict | None:
         if existing_queue_by_id is None:
             raise_on_non_existing_storage(StorageTypes.REQUEST_QUEUE, self._id)
 
-        async with existing_queue_by_id._file_operation_lock:  # type: ignore
-            await existing_queue_by_id._update_timestamps(has_been_modified=False)  # type: ignore
+        async with existing_queue_by_id._file_operation_lock:
+            await existing_queue_by_id._update_timestamps(has_been_modified=False)
 
-            request = existing_queue_by_id._requests.get(request_id)  # type: ignore
+            request = existing_queue_by_id._requests.get(request_id)
             return self._json_to_request(request['json'] if request is not None else None)
 
     async def update_request(self: RequestQueueClient, request: dict, *, forefront: bool | None = None) -> dict:
@@ -268,17 +268,17 @@ async def update_request(self: RequestQueueClient, request: dict, *, forefront:
         # First we need to check the existing request to be
         # able to return information about its handled state.
-        existing_request = existing_queue_by_id._requests.get(request_model['id'])  # type: ignore
+        existing_request = existing_queue_by_id._requests.get(request_model['id'])
 
         # Undefined means that the request is not present in the queue.
         # We need to insert it, to behave the same as API.
         if existing_request is None:
             return await self.add_request(request, forefront=forefront)
 
-        async with existing_queue_by_id._file_operation_lock:  # type: ignore
+        async with existing_queue_by_id._file_operation_lock:
             # When updating the request, we need to make sure that
             # the handled counts are updated correctly in all cases.
-            existing_queue_by_id._requests[request_model['id']] = request_model  # type: ignore
+            existing_queue_by_id._requests[request_model['id']] = request_model
 
             pending_count_adjustment = 0
             is_request_handled_state_changing = not isinstance(existing_request['orderNo'], type(request_model['orderNo']))
 
@@ -288,9 +288,9 @@ async def update_request(self: RequestQueueClient, request: dict, *, forefront:
             if is_request_handled_state_changing:
                 pending_count_adjustment = 1 if request_was_handled_before_update else -1
 
-            existing_queue_by_id._pending_request_count += pending_count_adjustment  # type: ignore
-            existing_queue_by_id._handled_request_count -= pending_count_adjustment  # type: ignore
-            await existing_queue_by_id._update_timestamps(has_been_modified=True)  # type: ignore
+            existing_queue_by_id._pending_request_count += pending_count_adjustment
+            existing_queue_by_id._handled_request_count -= pending_count_adjustment
+            await existing_queue_by_id._update_timestamps(has_been_modified=True)
             await update_request_queue_item(
                 request=request_model,
                 request_id=request_model['id'],
@@ -317,16 +317,16 @@ async def delete_request(self: RequestQueueClient, request_id: str) -> None:
         if existing_queue_by_id is None:
             raise_on_non_existing_storage(StorageTypes.REQUEST_QUEUE, self._id)
 
-        async with existing_queue_by_id._file_operation_lock:  # type: ignore
-            request = existing_queue_by_id._requests.get(request_id)  # type: ignore
+        async with existing_queue_by_id._file_operation_lock:
+            request = existing_queue_by_id._requests.get(request_id)
 
             if request:
-                del existing_queue_by_id._requests[request_id]  # type: ignore
+                del existing_queue_by_id._requests[request_id]
                 if request['orderNo'] is None:
-                    existing_queue_by_id._handled_request_count -= 1  # type: ignore
+                    existing_queue_by_id._handled_request_count -= 1
                 else:
-                    existing_queue_by_id._pending_request_count -= 1  # type: ignore
-                await existing_queue_by_id._update_timestamps(has_been_modified=True)  # type: ignore
+                    existing_queue_by_id._pending_request_count -= 1
+                await existing_queue_by_id._update_timestamps(has_been_modified=True)
                 await delete_request(entity_directory=existing_queue_by_id._resource_directory, request_id=request_id)
 
     def _to_resource_info(self: RequestQueueClient) -> dict:
@@ -403,7 +403,7 @@ def _get_storages_dir(cls: type[RequestQueueClient], memory_storage_client: Memo
         return memory_storage_client._request_queues_directory
 
     @classmethod
-    def _get_storage_client_cache(  # type: ignore
+    def _get_storage_client_cache(
        cls: type[RequestQueueClient],
         memory_storage_client: MemoryStorageClient,
     ) -> list[RequestQueueClient]:
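
Reviewer's note on the typing pattern this changeset adopts: the old signatures annotated `cls` explicitly as `type[BaseResourceClient]`, which pinned every return value to the base class and forced `# type: ignore` at subclass call sites; leaving `cls` unannotated and returning `typing_extensions.Self` lets each subclass keep its concrete type. A minimal sketch of the idea, with hypothetical names that are not taken from the library:

```python
from __future__ import annotations

from typing_extensions import Self  # `typing.Self` needs Python 3.11+


class BaseResourceClient:
    _name: str = ''

    @classmethod
    def _create(cls, name: str) -> Self:
        # `cls` must stay unannotated: an explicit `cls: type[...]` annotation
        # cannot be combined with `Self` as the return type, which is why the
        # diff silences ANN102 instead of annotating `cls`.
        instance = cls()
        instance._name = name
        return instance


class DatasetClient(BaseResourceClient):
    def push(self) -> None:
        print(f'pushing to {self._name}')


client = DatasetClient._create('my-dataset')  # inferred as DatasetClient, not BaseResourceClient
client.push()  # so subclass-specific calls no longer need `# type: ignore`
```

Note that the actual change imports `Self` only under `if TYPE_CHECKING:`, so `typing_extensions` is never touched at runtime; the sketch imports it directly only to stay self-contained.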