1 change: 1 addition & 0 deletions pyproject.toml
@@ -53,6 +53,7 @@ dev = [
     "flake8-noqa ~= 1.3.1",
     "flake8-pytest-style ~= 1.7.2",
     "flake8-quotes ~= 3.3.2",
+    "flake8-simplify ~= 0.20.0",
     "flake8-unused-arguments ~= 0.0.13",
     "isort ~= 5.12.0",
     "mypy ~= 1.3.0",
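The new dev dependency, flake8-simplify, registers SIM* checks with flake8 once it is installed; the rest of this PR addresses the rules it reports. The ones exercised here, as I read them from the plugin's documentation, are SIM105 (use contextlib.suppress instead of try/except/pass), SIM102 (collapse nested if statements), SIM115 (open files via a context manager) and SIM118 (use `key in dict` rather than `key in dict.keys()`). A small illustrative file that should trip each of these checks; the code and comments below are my own sketch, not part of the PR:

    def example(data: dict, flag: bool) -> None:
        # SIM105: prefer `with contextlib.suppress(KeyError): del data['missing']`
        try:
            del data['missing']
        except KeyError:
            pass

        # SIM102: prefer `if flag and 'id' in data:`
        if flag:
            if 'id' in data:
                print(data['id'])

        # SIM118: prefer `if 'name' in data:`
        if 'name' in data.keys():
            print(data['name'])

        # SIM115: prefer `with open('example.txt', 'w', encoding='utf-8') as handle:`
        handle = open('example.txt', 'w', encoding='utf-8')
        handle.write('example')
        handle.close()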
6 changes: 2 additions & 4 deletions src/apify/_memory_storage/memory_storage_client.py
@@ -1,4 +1,5 @@
 import asyncio
+import contextlib
 import os
 from pathlib import Path
 from typing import List, Optional
@@ -167,11 +168,8 @@ async def _handle_default_key_value_store(self, folder: str) -> None:
             for entity in possible_input_keys:
                 original_file_path = os.path.join(folder, entity)
                 temp_file_path = os.path.join(temporary_path, entity)
-                try:
+                with contextlib.suppress(Exception):
                     await rename(original_file_path, temp_file_path)
-                except Exception:
-                    # Ignore
-                    pass

             # Remove the original folder and all its content
             counter = 0
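The hunk above is the SIM105 pattern that recurs throughout this PR: a try/except whose handler does nothing but ignore the error is replaced by contextlib.suppress, which states the same intent in one line. A minimal sketch of the equivalence, using a throwaway os.rename call instead of the SDK's async rename helper:

    import contextlib
    import os

    # Before: three lines of boilerplate to say "ignore failures".
    try:
        os.rename('does-not-exist.txt', 'renamed.txt')
    except Exception:
        pass

    # After: behaviourally identical, with the suppressed exception type spelled out.
    with contextlib.suppress(Exception):
        os.rename('does-not-exist.txt', 'renamed.txt')

Note that suppress(Exception) is exactly as broad as the `except Exception: pass` it replaces; narrowing the exception type would be a separate change that this PR does not make.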
35 changes: 17 additions & 18 deletions src/apify/_memory_storage/resource_clients/base_resource_client.py
@@ -93,24 +93,23 @@ def _find_or_create_client_by_id_or_name(
                 storage_path = possible_storage_path

         # If it's not found, try going through the storages dir and finding it by metadata
-        if not storage_path:
-            if os.access(storages_dir, os.F_OK):
-                for entry in os.scandir(storages_dir):
-                    if not entry.is_dir():
-                        continue
-                    metadata_path = os.path.join(entry.path, '__metadata__.json')
-                    if not os.access(metadata_path, os.F_OK):
-                        continue
-                    with open(metadata_path, encoding='utf-8') as metadata_file:
-                        metadata = json.load(metadata_file)
-                    if id and id == metadata.get('id'):
-                        storage_path = entry.path
-                        name = metadata.get(name)
-                        break
-                    if name and name == metadata.get('name'):
-                        storage_path = entry.path
-                        id = metadata.get(id)
-                        break
+        if not storage_path and os.access(storages_dir, os.F_OK):
+            for entry in os.scandir(storages_dir):
+                if not entry.is_dir():
+                    continue
+                metadata_path = os.path.join(entry.path, '__metadata__.json')
+                if not os.access(metadata_path, os.F_OK):
+                    continue
+                with open(metadata_path, encoding='utf-8') as metadata_file:
+                    metadata = json.load(metadata_file)
+                if id and id == metadata.get('id'):
+                    storage_path = entry.path
+                    name = metadata.get(name)
+                    break
+                if name and name == metadata.get('name'):
+                    storage_path = entry.path
+                    id = metadata.get(id)
+                    break

         # As a last resort, try to check if the accessed storage is the default one,
         # and the folder has no metadata
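This hunk applies SIM102: when an outer if contains nothing but another if, the two conditions can be joined with `and`. Because `and` short-circuits, the second condition still runs only when the first one holds, so the behaviour (including the os.access call) is unchanged; only one level of indentation disappears. A reduced sketch with placeholder values rather than the client's real lookup logic:

    import os

    storage_path = None
    storages_dir = '.'  # placeholder directory for illustration

    # Before (SIM102): nested ifs, with everything inside indented twice.
    if not storage_path:
        if os.access(storages_dir, os.F_OK):
            storage_path = storages_dir

    # After: a single condition; os.access() is still evaluated only when
    # storage_path is falsy, because `and` short-circuits.
    if not storage_path and os.access(storages_dir, os.F_OK):
        storage_path = storages_dir

The same rewrite appears below in key_value_store.py and base_storage.py.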
5 changes: 2 additions & 3 deletions src/apify/_memory_storage/resource_clients/key_value_store.py
@@ -313,9 +313,8 @@ async def set_record(self, key: str, value: Any, content_type: Optional[str] = N
         existing_store_by_id._records[key] = record

         if self._memory_storage_client._persist_storage:
-            if old_record is not None:
-                if _filename_from_record(old_record) != _filename_from_record(record):
-                    await existing_store_by_id._delete_persisted_record(old_record)
+            if old_record is not None and _filename_from_record(old_record) != _filename_from_record(record):
+                await existing_store_by_id._delete_persisted_record(old_record)

             await existing_store_by_id._persist_record(record)

4 changes: 1 addition & 3 deletions src/apify/_utils.py
@@ -180,10 +180,8 @@ def _get_memory_usage_bytes() -> int:
     current_process = psutil.Process(os.getpid())
     mem = int(current_process.memory_info().rss or 0)
     for child in current_process.children(recursive=True):
-        try:
+        with contextlib.suppress(psutil.NoSuchProcess):
             mem += int(child.memory_info().rss or 0)
-        except psutil.NoSuchProcess:
-            pass
     return mem


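Here the SIM105 rewrite keeps a specific exception: a child process can exit between the children() call and its memory_info() call, and only that race should be ignored. A self-contained sketch of the resulting helper, assuming psutil is installed; it mirrors the function shown in the diff rather than adding anything new:

    import contextlib
    import os

    import psutil


    def get_memory_usage_bytes() -> int:
        # Sum the resident set size of this process and all of its children.
        current_process = psutil.Process(os.getpid())
        mem = int(current_process.memory_info().rss or 0)
        for child in current_process.children(recursive=True):
            # The child may already be gone; ignore only that specific failure.
            with contextlib.suppress(psutil.NoSuchProcess):
                mem += int(child.memory_info().rss or 0)
        return mem


    print(get_memory_usage_bytes())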
17 changes: 6 additions & 11 deletions src/apify/actor.py
@@ -1,4 +1,5 @@
 import asyncio
+import contextlib
 import inspect
 import logging
 import os
@@ -271,28 +272,22 @@ async def _respond_to_migrating_event(self, _event_data: Any) -> None:
         # Don't emit any more regular persist state events
         if self._send_persist_state_interval_task and not self._send_persist_state_interval_task.cancelled():
             self._send_persist_state_interval_task.cancel()
-            try:
+            with contextlib.suppress(asyncio.CancelledError):
                 await self._send_persist_state_interval_task
-            except asyncio.CancelledError:
-                pass

         self._event_manager.emit(ActorEventTypes.PERSIST_STATE, {'isMigrating': True})
         self._was_final_persist_state_emitted = True

     async def _cancel_event_emitting_intervals(self) -> None:
         if self._send_persist_state_interval_task and not self._send_persist_state_interval_task.cancelled():
             self._send_persist_state_interval_task.cancel()
-            try:
+            with contextlib.suppress(asyncio.CancelledError):
                 await self._send_persist_state_interval_task
-            except asyncio.CancelledError:
-                pass

         if self._send_system_info_interval_task and not self._send_system_info_interval_task.cancelled():
             self._send_system_info_interval_task.cancel()
-            try:
+            with contextlib.suppress(asyncio.CancelledError):
                 await self._send_system_info_interval_task
-            except asyncio.CancelledError:
-                pass

     @classmethod
     async def exit(
@@ -1299,8 +1294,8 @@ async def _create_proxy_configuration_internal(

         if actor_proxy_input is not None:
             if actor_proxy_input.get('useApifyProxy', False):
-                country_code = country_code or actor_proxy_input.get('apifyProxyCountry', None)
-                groups = groups or actor_proxy_input.get('apifyProxyGroups', None)
+                country_code = country_code or actor_proxy_input.get('apifyProxyCountry')
+                groups = groups or actor_proxy_input.get('apifyProxyGroups')
             else:
                 proxy_urls = actor_proxy_input.get('proxyUrls', [])
                 if not proxy_urls:
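The first two hunks in actor.py follow one asyncio idiom: after task.cancel(), awaiting the task raises asyncio.CancelledError in the awaiting coroutine, and since that is the expected outcome it is suppressed. A runnable sketch of the pattern with a stand-in periodic task, not the SDK's persist-state machinery:

    import asyncio
    import contextlib


    async def periodic() -> None:
        while True:
            await asyncio.sleep(0.1)  # stand-in for emitting a periodic event


    async def main() -> None:
        task = asyncio.create_task(periodic())
        await asyncio.sleep(0.3)

        if task and not task.cancelled():
            task.cancel()
            # Awaiting a freshly cancelled task re-raises CancelledError here;
            # suppressing it replaces the old try/except/pass around the await.
            with contextlib.suppress(asyncio.CancelledError):
                await task


    asyncio.run(main())

The last hunk is a different simplification: dict.get() already returns None when the key is missing, so the explicit `, None` default is redundant and is dropped.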
5 changes: 2 additions & 3 deletions src/apify/event_manager.py
@@ -1,4 +1,5 @@
 import asyncio
+import contextlib
 import inspect
 import json
 from collections import defaultdict
@@ -200,10 +201,8 @@ async def _wait_for_listeners() -> None:
                 logger.warning('Timed out waiting for event listeners to complete, unfinished event listeners will be canceled')
                 for pending_task in pending:
                     pending_task.cancel()
-                    try:
+                    with contextlib.suppress(asyncio.CancelledError):
                         await pending_task
-                    except asyncio.CancelledError:
-                        pass
         else:
             await _wait_for_listeners()

5 changes: 2 additions & 3 deletions src/apify/storages/base_storage.py
@@ -131,9 +131,8 @@ async def open(
             return cast(Self, cached_storage)

         # Purge default storages if configured
-        if used_config.purge_on_start:
-            if isinstance(used_client, MemoryStorageClient):
-                await used_client._purge_on_start()
+        if used_config.purge_on_start and isinstance(used_client, MemoryStorageClient):
+            await used_client._purge_on_start()

         assert cls._storage_creating_lock is not None
         async with cls._storage_creating_lock:
5 changes: 2 additions & 3 deletions tests/unit/actor/test_actor_lifecycle.py
@@ -1,4 +1,5 @@
 import asyncio
+import contextlib
 from datetime import datetime
 from typing import Any, Callable
 from unittest.mock import AsyncMock
@@ -86,12 +87,10 @@ async def test_with_actor_fail(self) -> None:
         assert my_actor._is_initialized is False

     async def test_with_actor_failed(self) -> None:
-        try:
+        with contextlib.suppress(Exception):
             async with Actor() as my_actor:
                 assert my_actor._is_initialized
                 raise Exception('Failed')
-        except Exception:
-            pass
         assert my_actor._is_initialized is False

     async def test_raise_on_fail_without_init(self) -> None:
5 changes: 2 additions & 3 deletions tests/unit/actor/test_actor_log.py
@@ -1,3 +1,4 @@
+import contextlib
 import logging
 import sys

@@ -11,7 +12,7 @@
 class TestActorLog:
     async def test_actor_log(self, caplog: pytest.LogCaptureFixture) -> None:
         caplog.set_level(logging.DEBUG, logger='apify')
-        try:
+        with contextlib.suppress(RuntimeError):
             async with Actor:
                 # Test Actor.log
                 Actor.log.debug('Debug message')
@@ -32,8 +33,6 @@ async def test_actor_log(self, caplog: pytest.LogCaptureFixture) -> None:

                 # Test that exception in Actor.main is logged with the traceback
                 raise RuntimeError('Dummy RuntimeError')
-        except RuntimeError:
-            pass

         assert len(caplog.records) == 12

2 changes: 1 addition & 1 deletion tests/unit/memory_storage/resource_clients/test_dataset.py
@@ -127,7 +127,7 @@ async def test_iterate_items(dataset_client: DatasetClient) -> None:
     await dataset_client.push_items([{'id': i} for i in range(item_count)])
     actual_items = []
     async for item in dataset_client.iterate_items():
-        assert 'id' in item.keys()
+        assert 'id' in item
         actual_items.append(item)
     assert len(actual_items) == item_count
     assert actual_items[0]['id'] == 0
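The test tweaks here and in the request queue tests below are SIM118: a membership test on a dict already consults its keys, so .keys() is redundant and only adds the construction of a keys view. A trivial sketch:

    item = {'id': 0, 'url': 'https://example.com'}

    # Before (SIM118): builds a keys view just to test membership.
    assert 'id' in item.keys()

    # After: equivalent and shorter; `in` on a dict already tests its keys.
    assert 'id' in item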
4 changes: 2 additions & 2 deletions tests/unit/memory_storage/resource_clients/test_request_queue.py
@@ -90,7 +90,7 @@ async def test_list_head(request_queue_client: RequestQueueClient) -> None:
     list_head = await request_queue_client.list_head()
     assert len(list_head['items']) == 2
     for item in list_head['items']:
-        assert 'id' in item.keys()
+        assert 'id' in item


async def test_add_record(request_queue_client: RequestQueueClient) -> None:
@@ -124,7 +124,7 @@ async def test_get_record(request_queue_client: RequestQueueClient) -> None:
     })
     request = await request_queue_client.get_request(request_info['requestId'])
     assert request is not None
-    assert 'id' in request.keys()
+    assert 'id' in request
     assert request['url'] == request['uniqueKey'] == request_url

     # Non-existent id
18 changes: 9 additions & 9 deletions tests/unit/test_utils.py
@@ -1,4 +1,5 @@
 import asyncio
+import contextlib
 import os
 import time
 from collections import OrderedDict
@@ -118,10 +119,8 @@ def sync_increment() -> None:
         assert test_var == increments
     finally:
         sync_increment_task.cancel()
-        try:
+        with contextlib.suppress(asyncio.CancelledError):
             await sync_increment_task
-        except asyncio.CancelledError:
-            pass

     await asyncio.sleep(1.5)
     assert test_var == increments
@@ -157,10 +156,8 @@ async def async_increment() -> None:
         assert test_var == increments
     finally:
         async_increment_task.cancel()
-        try:
+        with contextlib.suppress(asyncio.CancelledError):
             await async_increment_task
-        except asyncio.CancelledError:
-            pass

     await asyncio.sleep(1.5)
     assert test_var == increments
@@ -174,7 +171,8 @@ async def test__force_remove(tmp_path: Path) -> None:
     assert os.path.exists(test_file_path) is False

     # Removes the file if it exists
-    open(test_file_path, 'a', encoding='utf-8').close()
+    with open(test_file_path, 'a', encoding='utf-8'):
+        pass
     assert os.path.exists(test_file_path) is True
     await _force_remove(test_file_path)
     assert os.path.exists(test_file_path) is False
@@ -224,10 +222,12 @@ async def test__force_rename(tmp_path: Path) -> None:
     # Will remove dst_dir if it exists (also covers normal case)
     # Create the src_dir with a file in it
     await mkdir(src_dir)
-    open(src_file, 'a', encoding='utf-8').close()
+    with open(src_file, 'a', encoding='utf-8'):
+        pass
     # Create the dst_dir with a file in it
     await mkdir(dst_dir)
-    open(dst_file, 'a', encoding='utf-8').close()
+    with open(dst_file, 'a', encoding='utf-8'):
+        pass
     assert os.path.exists(src_file) is True
     assert os.path.exists(dst_file) is True
     await _force_rename(src_dir, dst_dir)
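The last two hunks replace the open(...).close() idiom for touching a file with a with block (SIM115): the context manager closes the handle even if something goes wrong between open() and close(), and the empty body makes the intent of merely creating the file explicit. A reduced sketch using a temporary directory; the paths in the real tests come from pytest's tmp_path fixture:

    import os
    import tempfile

    with tempfile.TemporaryDirectory() as tmp_dir:
        test_file_path = os.path.join(tmp_dir, 'touched.txt')

        # Before (SIM115): the handle is closed only if close() is actually reached.
        open(test_file_path, 'a', encoding='utf-8').close()

        # After: the handle is closed when the with block exits, no matter what.
        with open(test_file_path, 'a', encoding='utf-8'):
            pass

        assert os.path.exists(test_file_path) is True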