From 245455c3f1083fa98061b0f119b7a26dc4df6089 Mon Sep 17 00:00:00 2001 From: Jan Buchar Date: Wed, 5 Feb 2025 16:24:17 +0100 Subject: [PATCH 01/24] Add new config options --- src/apify/_configuration.py | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/src/apify/_configuration.py b/src/apify/_configuration.py index af15ade7..7020e2a3 100644 --- a/src/apify/_configuration.py +++ b/src/apify/_configuration.py @@ -220,6 +220,14 @@ class Configuration(CrawleeConfiguration): BeforeValidator(lambda val: val or None), ] = None + test_pay_per_event: Annotated[ + bool, + Field( + alias='actor_test_pay_per_event', + description='Enable pay-per-event functionality for local development', + ), + ] = False + meta_origin: Annotated[ str | None, Field( From 31eb27814ffe86cdc69a712ef88c69e768dcfe38 Mon Sep 17 00:00:00 2001 From: Jan Buchar Date: Thu, 6 Feb 2025 13:22:47 +0100 Subject: [PATCH 02/24] Copy public interface --- src/apify/_actor.py | 48 +++++++++++++++++++++++++++++++++++++++--- src/apify/_charging.py | 30 ++++++++++++++++++++++++++ 2 files changed, 75 insertions(+), 3 deletions(-) create mode 100644 src/apify/_charging.py diff --git a/src/apify/_actor.py b/src/apify/_actor.py index 71e3b6e2..70acbd06 100644 --- a/src/apify/_actor.py +++ b/src/apify/_actor.py @@ -24,6 +24,7 @@ EventSystemInfoData, ) +from apify._charging import ChargeResult, ChargingManager from apify._configuration import Configuration from apify._consts import EVENT_LISTENERS_TIMEOUT from apify._crypto import decrypt_input_secrets, load_private_key @@ -97,6 +98,8 @@ def __init__( ) ) + self._charging_manager = ChargingManager(self._configuration, self._apify_client) + self._is_initialized = False @ignore_docs @@ -221,6 +224,10 @@ async def init(self) -> None: # https://github.com/apify/apify-sdk-python/issues/146 await self._event_manager.__aenter__() + self.log.debug('Event manager initialized') + + await self._charging_manager.init() + self.log.debug('Charging manager initialized') self._is_initialized = True @@ -445,19 +452,45 @@ async def open_request_queue( storage_client=storage_client, ) - async def push_data(self, data: dict | list[dict]) -> None: + @overload + async def push_data(self, data: dict | list[dict]) -> None: ... + @overload + async def push_data(self, data: dict | list[dict], event_name: str) -> ChargeResult: ... + async def push_data(self, data: dict | list[dict], event_name: str | None = None) -> ChargeResult | None: """Store an object or a list of objects to the default dataset of the current Actor run. Args: data: The data to push to the default dataset. + event_name: If provided, the method will attempt to charge for the event for each pushed item. """ self._raise_if_not_initialized() if not data: - return + return None + + data = data if isinstance(data, list) else [data] + + max_charged_count = ( + self._charging_manager.calculate_max_event_charge_within_limit(event_name) + if event_name is not None + else None + ) dataset = await self.open_dataset() - await dataset.push_data(data) + + if max_charged_count is not None and len(data) > max_charged_count: + # Push as many items as we can charge for + await dataset.push_data(data[:max_charged_count]) + else: + await dataset.push_data(data) + + if event_name: + return await self._charging_manager.charge( + event_name=event_name, + count=min(max_charged_count, len(data)) if max_charged_count is not None else len(data), + ) + + return None async def get_input(self) -> Any: """Get the Actor input value from the default key-value store associated with the current Actor run.""" @@ -506,6 +539,15 @@ async def set_value( key_value_store = await self.open_key_value_store() return await key_value_store.set_value(key, value, content_type=content_type) + def get_charging_manager(self) -> ChargingManager: + """Retrieve the charging manager to access granular pricing information.""" + self._raise_if_not_initialized() + return self._charging_manager + + async def charge(self, event_name: str, count: int = 1) -> ChargeResult: + self._raise_if_not_initialized() + return await self._charging_manager.charge(event_name, count) + @overload def on( self, event_name: Literal[Event.PERSIST_STATE], listener: EventListener[EventPersistStateData] diff --git a/src/apify/_charging.py b/src/apify/_charging.py new file mode 100644 index 00000000..c83987c7 --- /dev/null +++ b/src/apify/_charging.py @@ -0,0 +1,30 @@ +from __future__ import annotations + +from dataclasses import dataclass +from typing import TYPE_CHECKING + +if TYPE_CHECKING: + from apify_client import ApifyClientAsync + + from apify._configuration import Configuration + + +class ChargingManager: + def __init__(self, configuration: Configuration, client: ApifyClientAsync) -> None: + pass + + async def init(self) -> None: + pass + + async def charge(self, event_name: str, count: int = 1) -> ChargeResult: + pass + + def calculate_max_event_charge_within_limit(self, event_name: str) -> int: + pass + + +@dataclass(frozen=True) +class ChargeResult: + event_charge_limit_reached: bool + charged_count: int + chargeable_within_limit: int From 588db5cc8d47aef7fbc06dcb202754cdfc712959 Mon Sep 17 00:00:00 2001 From: Jan Buchar Date: Thu, 6 Feb 2025 17:25:59 +0100 Subject: [PATCH 03/24] Update apify client --- poetry.lock | 54 ++++++-------------------------------------------- pyproject.toml | 2 +- 2 files changed, 7 insertions(+), 49 deletions(-) diff --git a/poetry.lock b/poetry.lock index 0bbe5de3..7f7b4df0 100644 --- a/poetry.lock +++ b/poetry.lock @@ -1,4 +1,4 @@ -# This file is automatically @generated by Poetry 2.0.0 and should not be changed by hand. +# This file is automatically @generated by Poetry 2.0.1 and should not be changed by hand. [[package]] name = "annotated-types" @@ -50,19 +50,19 @@ trio = ["trio (>=0.26.1)"] [[package]] name = "apify-client" -version = "1.8.1" +version = "1.9.0" description = "Apify API client for Python" optional = false python-versions = "<4.0,>=3.9" groups = ["main"] files = [ - {file = "apify_client-1.8.1-py3-none-any.whl", hash = "sha256:cfa6df3816c436204e37457fba28981a0ef6a7602cde372463f0f078eee64747"}, - {file = "apify_client-1.8.1.tar.gz", hash = "sha256:2be1be7879570655bddeebf126833efe94cabb95b3755592845e92c20c70c674"}, + {file = "apify_client-1.9.0-py3-none-any.whl", hash = "sha256:dd67093c570cea068ac5ad4100f67d57f1aeb217f6f887b369420f913ef597e7"}, + {file = "apify_client-1.9.0.tar.gz", hash = "sha256:f57ec6a2d6b978daa48ffc470e31cfa586ab82145a88acfd1fa79b5857013ddb"}, ] [package.dependencies] apify-shared = ">=1.1.2" -httpx = ">=0.25.0" +httpx = ">=0.25" more_itertools = ">=10.0.0" [[package]] @@ -238,10 +238,6 @@ files = [ {file = "Brotli-1.1.0-cp310-cp310-musllinux_1_1_i686.whl", hash = "sha256:a37b8f0391212d29b3a91a799c8e4a2855e0576911cdfb2515487e30e322253d"}, {file = "Brotli-1.1.0-cp310-cp310-musllinux_1_1_ppc64le.whl", hash = "sha256:e84799f09591700a4154154cab9787452925578841a94321d5ee8fb9a9a328f0"}, {file = "Brotli-1.1.0-cp310-cp310-musllinux_1_1_x86_64.whl", hash = "sha256:f66b5337fa213f1da0d9000bc8dc0cb5b896b726eefd9c6046f699b169c41b9e"}, - {file = "Brotli-1.1.0-cp310-cp310-musllinux_1_2_aarch64.whl", hash = "sha256:5dab0844f2cf82be357a0eb11a9087f70c5430b2c241493fc122bb6f2bb0917c"}, - {file = "Brotli-1.1.0-cp310-cp310-musllinux_1_2_i686.whl", hash = "sha256:e4fe605b917c70283db7dfe5ada75e04561479075761a0b3866c081d035b01c1"}, - {file = "Brotli-1.1.0-cp310-cp310-musllinux_1_2_ppc64le.whl", hash = "sha256:1e9a65b5736232e7a7f91ff3d02277f11d339bf34099a56cdab6a8b3410a02b2"}, - {file = "Brotli-1.1.0-cp310-cp310-musllinux_1_2_x86_64.whl", hash = "sha256:58d4b711689366d4a03ac7957ab8c28890415e267f9b6589969e74b6e42225ec"}, {file = "Brotli-1.1.0-cp310-cp310-win32.whl", hash = "sha256:be36e3d172dc816333f33520154d708a2657ea63762ec16b62ece02ab5e4daf2"}, {file = "Brotli-1.1.0-cp310-cp310-win_amd64.whl", hash = "sha256:0c6244521dda65ea562d5a69b9a26120769b7a9fb3db2fe9545935ed6735b128"}, {file = "Brotli-1.1.0-cp311-cp311-macosx_10_9_universal2.whl", hash = "sha256:a3daabb76a78f829cafc365531c972016e4aa8d5b4bf60660ad8ecee19df7ccc"}, @@ -254,14 +250,8 @@ files = [ {file = "Brotli-1.1.0-cp311-cp311-musllinux_1_1_i686.whl", hash = "sha256:19c116e796420b0cee3da1ccec3b764ed2952ccfcc298b55a10e5610ad7885f9"}, {file = "Brotli-1.1.0-cp311-cp311-musllinux_1_1_ppc64le.whl", hash = "sha256:510b5b1bfbe20e1a7b3baf5fed9e9451873559a976c1a78eebaa3b86c57b4265"}, {file = "Brotli-1.1.0-cp311-cp311-musllinux_1_1_x86_64.whl", hash = "sha256:a1fd8a29719ccce974d523580987b7f8229aeace506952fa9ce1d53a033873c8"}, - {file = "Brotli-1.1.0-cp311-cp311-musllinux_1_2_aarch64.whl", hash = "sha256:c247dd99d39e0338a604f8c2b3bc7061d5c2e9e2ac7ba9cc1be5a69cb6cd832f"}, - {file = "Brotli-1.1.0-cp311-cp311-musllinux_1_2_i686.whl", hash = "sha256:1b2c248cd517c222d89e74669a4adfa5577e06ab68771a529060cf5a156e9757"}, - {file = "Brotli-1.1.0-cp311-cp311-musllinux_1_2_ppc64le.whl", hash = "sha256:2a24c50840d89ded6c9a8fdc7b6ed3692ed4e86f1c4a4a938e1e92def92933e0"}, - {file = "Brotli-1.1.0-cp311-cp311-musllinux_1_2_x86_64.whl", hash = "sha256:f31859074d57b4639318523d6ffdca586ace54271a73ad23ad021acd807eb14b"}, {file = "Brotli-1.1.0-cp311-cp311-win32.whl", hash = "sha256:39da8adedf6942d76dc3e46653e52df937a3c4d6d18fdc94a7c29d263b1f5b50"}, {file = "Brotli-1.1.0-cp311-cp311-win_amd64.whl", hash = "sha256:aac0411d20e345dc0920bdec5548e438e999ff68d77564d5e9463a7ca9d3e7b1"}, - {file = "Brotli-1.1.0-cp312-cp312-macosx_10_13_universal2.whl", hash = "sha256:32d95b80260d79926f5fab3c41701dbb818fde1c9da590e77e571eefd14abe28"}, - {file = "Brotli-1.1.0-cp312-cp312-macosx_10_13_x86_64.whl", hash = "sha256:b760c65308ff1e462f65d69c12e4ae085cff3b332d894637f6273a12a482d09f"}, {file = "Brotli-1.1.0-cp312-cp312-macosx_10_9_universal2.whl", hash = "sha256:316cc9b17edf613ac76b1f1f305d2a748f1b976b033b049a6ecdfd5612c70409"}, {file = "Brotli-1.1.0-cp312-cp312-macosx_10_9_x86_64.whl", hash = "sha256:caf9ee9a5775f3111642d33b86237b05808dafcd6268faa492250e9b78046eb2"}, {file = "Brotli-1.1.0-cp312-cp312-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:70051525001750221daa10907c77830bc889cb6d865cc0b813d9db7fefc21451"}, @@ -272,24 +262,8 @@ files = [ {file = "Brotli-1.1.0-cp312-cp312-musllinux_1_1_i686.whl", hash = "sha256:4093c631e96fdd49e0377a9c167bfd75b6d0bad2ace734c6eb20b348bc3ea180"}, {file = "Brotli-1.1.0-cp312-cp312-musllinux_1_1_ppc64le.whl", hash = "sha256:7e4c4629ddad63006efa0ef968c8e4751c5868ff0b1c5c40f76524e894c50248"}, {file = "Brotli-1.1.0-cp312-cp312-musllinux_1_1_x86_64.whl", hash = "sha256:861bf317735688269936f755fa136a99d1ed526883859f86e41a5d43c61d8966"}, - {file = "Brotli-1.1.0-cp312-cp312-musllinux_1_2_aarch64.whl", hash = "sha256:87a3044c3a35055527ac75e419dfa9f4f3667a1e887ee80360589eb8c90aabb9"}, - {file = "Brotli-1.1.0-cp312-cp312-musllinux_1_2_i686.whl", hash = "sha256:c5529b34c1c9d937168297f2c1fde7ebe9ebdd5e121297ff9c043bdb2ae3d6fb"}, - {file = "Brotli-1.1.0-cp312-cp312-musllinux_1_2_ppc64le.whl", hash = "sha256:ca63e1890ede90b2e4454f9a65135a4d387a4585ff8282bb72964fab893f2111"}, - {file = "Brotli-1.1.0-cp312-cp312-musllinux_1_2_x86_64.whl", hash = "sha256:e79e6520141d792237c70bcd7a3b122d00f2613769ae0cb61c52e89fd3443839"}, {file = "Brotli-1.1.0-cp312-cp312-win32.whl", hash = "sha256:5f4d5ea15c9382135076d2fb28dde923352fe02951e66935a9efaac8f10e81b0"}, {file = "Brotli-1.1.0-cp312-cp312-win_amd64.whl", hash = "sha256:906bc3a79de8c4ae5b86d3d75a8b77e44404b0f4261714306e3ad248d8ab0951"}, - {file = "Brotli-1.1.0-cp313-cp313-macosx_10_13_universal2.whl", hash = "sha256:8bf32b98b75c13ec7cf774164172683d6e7891088f6316e54425fde1efc276d5"}, - {file = "Brotli-1.1.0-cp313-cp313-macosx_10_13_x86_64.whl", hash = "sha256:7bc37c4d6b87fb1017ea28c9508b36bbcb0c3d18b4260fcdf08b200c74a6aee8"}, - {file = "Brotli-1.1.0-cp313-cp313-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:3c0ef38c7a7014ffac184db9e04debe495d317cc9c6fb10071f7fefd93100a4f"}, - {file = "Brotli-1.1.0-cp313-cp313-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:91d7cc2a76b5567591d12c01f019dd7afce6ba8cba6571187e21e2fc418ae648"}, - {file = "Brotli-1.1.0-cp313-cp313-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:a93dde851926f4f2678e704fadeb39e16c35d8baebd5252c9fd94ce8ce68c4a0"}, - {file = "Brotli-1.1.0-cp313-cp313-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:f0db75f47be8b8abc8d9e31bc7aad0547ca26f24a54e6fd10231d623f183d089"}, - {file = "Brotli-1.1.0-cp313-cp313-musllinux_1_2_aarch64.whl", hash = "sha256:6967ced6730aed543b8673008b5a391c3b1076d834ca438bbd70635c73775368"}, - {file = "Brotli-1.1.0-cp313-cp313-musllinux_1_2_i686.whl", hash = "sha256:7eedaa5d036d9336c95915035fb57422054014ebdeb6f3b42eac809928e40d0c"}, - {file = "Brotli-1.1.0-cp313-cp313-musllinux_1_2_ppc64le.whl", hash = "sha256:d487f5432bf35b60ed625d7e1b448e2dc855422e87469e3f450aa5552b0eb284"}, - {file = "Brotli-1.1.0-cp313-cp313-musllinux_1_2_x86_64.whl", hash = "sha256:832436e59afb93e1836081a20f324cb185836c617659b07b129141a8426973c7"}, - {file = "Brotli-1.1.0-cp313-cp313-win32.whl", hash = "sha256:43395e90523f9c23a3d5bdf004733246fba087f2948f87ab28015f12359ca6a0"}, - {file = "Brotli-1.1.0-cp313-cp313-win_amd64.whl", hash = "sha256:9011560a466d2eb3f5a6e4929cf4a09be405c64154e12df0dd72713f6500e32b"}, {file = "Brotli-1.1.0-cp36-cp36m-macosx_10_9_x86_64.whl", hash = "sha256:a090ca607cbb6a34b0391776f0cb48062081f5f60ddcce5d11838e67a01928d1"}, {file = "Brotli-1.1.0-cp36-cp36m-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:2de9d02f5bda03d27ede52e8cfe7b865b066fa49258cbab568720aa5be80a47d"}, {file = "Brotli-1.1.0-cp36-cp36m-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:2333e30a5e00fe0fe55903c8832e08ee9c3b1382aacf4db26664a16528d51b4b"}, @@ -299,10 +273,6 @@ files = [ {file = "Brotli-1.1.0-cp36-cp36m-musllinux_1_1_i686.whl", hash = "sha256:fd5f17ff8f14003595ab414e45fce13d073e0762394f957182e69035c9f3d7c2"}, {file = "Brotli-1.1.0-cp36-cp36m-musllinux_1_1_ppc64le.whl", hash = "sha256:069a121ac97412d1fe506da790b3e69f52254b9df4eb665cd42460c837193354"}, {file = "Brotli-1.1.0-cp36-cp36m-musllinux_1_1_x86_64.whl", hash = "sha256:e93dfc1a1165e385cc8239fab7c036fb2cd8093728cbd85097b284d7b99249a2"}, - {file = "Brotli-1.1.0-cp36-cp36m-musllinux_1_2_aarch64.whl", hash = "sha256:aea440a510e14e818e67bfc4027880e2fb500c2ccb20ab21c7a7c8b5b4703d75"}, - {file = "Brotli-1.1.0-cp36-cp36m-musllinux_1_2_i686.whl", hash = "sha256:6974f52a02321b36847cd19d1b8e381bf39939c21efd6ee2fc13a28b0d99348c"}, - {file = "Brotli-1.1.0-cp36-cp36m-musllinux_1_2_ppc64le.whl", hash = "sha256:a7e53012d2853a07a4a79c00643832161a910674a893d296c9f1259859a289d2"}, - {file = "Brotli-1.1.0-cp36-cp36m-musllinux_1_2_x86_64.whl", hash = "sha256:d7702622a8b40c49bffb46e1e3ba2e81268d5c04a34f460978c6b5517a34dd52"}, {file = "Brotli-1.1.0-cp36-cp36m-win32.whl", hash = "sha256:a599669fd7c47233438a56936988a2478685e74854088ef5293802123b5b2460"}, {file = "Brotli-1.1.0-cp36-cp36m-win_amd64.whl", hash = "sha256:d143fd47fad1db3d7c27a1b1d66162e855b5d50a89666af46e1679c496e8e579"}, {file = "Brotli-1.1.0-cp37-cp37m-macosx_10_9_x86_64.whl", hash = "sha256:11d00ed0a83fa22d29bc6b64ef636c4552ebafcef57154b4ddd132f5638fbd1c"}, @@ -314,10 +284,6 @@ files = [ {file = "Brotli-1.1.0-cp37-cp37m-musllinux_1_1_i686.whl", hash = "sha256:919e32f147ae93a09fe064d77d5ebf4e35502a8df75c29fb05788528e330fe74"}, {file = "Brotli-1.1.0-cp37-cp37m-musllinux_1_1_ppc64le.whl", hash = "sha256:23032ae55523cc7bccb4f6a0bf368cd25ad9bcdcc1990b64a647e7bbcce9cb5b"}, {file = "Brotli-1.1.0-cp37-cp37m-musllinux_1_1_x86_64.whl", hash = "sha256:224e57f6eac61cc449f498cc5f0e1725ba2071a3d4f48d5d9dffba42db196438"}, - {file = "Brotli-1.1.0-cp37-cp37m-musllinux_1_2_aarch64.whl", hash = "sha256:cb1dac1770878ade83f2ccdf7d25e494f05c9165f5246b46a621cc849341dc01"}, - {file = "Brotli-1.1.0-cp37-cp37m-musllinux_1_2_i686.whl", hash = "sha256:3ee8a80d67a4334482d9712b8e83ca6b1d9bc7e351931252ebef5d8f7335a547"}, - {file = "Brotli-1.1.0-cp37-cp37m-musllinux_1_2_ppc64le.whl", hash = "sha256:5e55da2c8724191e5b557f8e18943b1b4839b8efc3ef60d65985bcf6f587dd38"}, - {file = "Brotli-1.1.0-cp37-cp37m-musllinux_1_2_x86_64.whl", hash = "sha256:d342778ef319e1026af243ed0a07c97acf3bad33b9f29e7ae6a1f68fd083e90c"}, {file = "Brotli-1.1.0-cp37-cp37m-win32.whl", hash = "sha256:587ca6d3cef6e4e868102672d3bd9dc9698c309ba56d41c2b9c85bbb903cdb95"}, {file = "Brotli-1.1.0-cp37-cp37m-win_amd64.whl", hash = "sha256:2954c1c23f81c2eaf0b0717d9380bd348578a94161a65b3a2afc62c86467dd68"}, {file = "Brotli-1.1.0-cp38-cp38-macosx_10_9_universal2.whl", hash = "sha256:efa8b278894b14d6da122a72fefcebc28445f2d3f880ac59d46c90f4c13be9a3"}, @@ -330,10 +296,6 @@ files = [ {file = "Brotli-1.1.0-cp38-cp38-musllinux_1_1_i686.whl", hash = "sha256:1ab4fbee0b2d9098c74f3057b2bc055a8bd92ccf02f65944a241b4349229185a"}, {file = "Brotli-1.1.0-cp38-cp38-musllinux_1_1_ppc64le.whl", hash = "sha256:141bd4d93984070e097521ed07e2575b46f817d08f9fa42b16b9b5f27b5ac088"}, {file = "Brotli-1.1.0-cp38-cp38-musllinux_1_1_x86_64.whl", hash = "sha256:fce1473f3ccc4187f75b4690cfc922628aed4d3dd013d047f95a9b3919a86596"}, - {file = "Brotli-1.1.0-cp38-cp38-musllinux_1_2_aarch64.whl", hash = "sha256:d2b35ca2c7f81d173d2fadc2f4f31e88cc5f7a39ae5b6db5513cf3383b0e0ec7"}, - {file = "Brotli-1.1.0-cp38-cp38-musllinux_1_2_i686.whl", hash = "sha256:af6fa6817889314555aede9a919612b23739395ce767fe7fcbea9a80bf140fe5"}, - {file = "Brotli-1.1.0-cp38-cp38-musllinux_1_2_ppc64le.whl", hash = "sha256:2feb1d960f760a575dbc5ab3b1c00504b24caaf6986e2dc2b01c09c87866a943"}, - {file = "Brotli-1.1.0-cp38-cp38-musllinux_1_2_x86_64.whl", hash = "sha256:4410f84b33374409552ac9b6903507cdb31cd30d2501fc5ca13d18f73548444a"}, {file = "Brotli-1.1.0-cp38-cp38-win32.whl", hash = "sha256:db85ecf4e609a48f4b29055f1e144231b90edc90af7481aa731ba2d059226b1b"}, {file = "Brotli-1.1.0-cp38-cp38-win_amd64.whl", hash = "sha256:3d7954194c36e304e1523f55d7042c59dc53ec20dd4e9ea9d151f1b62b4415c0"}, {file = "Brotli-1.1.0-cp39-cp39-macosx_10_9_universal2.whl", hash = "sha256:5fb2ce4b8045c78ebbc7b8f3c15062e435d47e7393cc57c25115cfd49883747a"}, @@ -346,10 +308,6 @@ files = [ {file = "Brotli-1.1.0-cp39-cp39-musllinux_1_1_i686.whl", hash = "sha256:949f3b7c29912693cee0afcf09acd6ebc04c57af949d9bf77d6101ebb61e388c"}, {file = "Brotli-1.1.0-cp39-cp39-musllinux_1_1_ppc64le.whl", hash = "sha256:89f4988c7203739d48c6f806f1e87a1d96e0806d44f0fba61dba81392c9e474d"}, {file = "Brotli-1.1.0-cp39-cp39-musllinux_1_1_x86_64.whl", hash = "sha256:de6551e370ef19f8de1807d0a9aa2cdfdce2e85ce88b122fe9f6b2b076837e59"}, - {file = "Brotli-1.1.0-cp39-cp39-musllinux_1_2_aarch64.whl", hash = "sha256:0737ddb3068957cf1b054899b0883830bb1fec522ec76b1098f9b6e0f02d9419"}, - {file = "Brotli-1.1.0-cp39-cp39-musllinux_1_2_i686.whl", hash = "sha256:4f3607b129417e111e30637af1b56f24f7a49e64763253bbc275c75fa887d4b2"}, - {file = "Brotli-1.1.0-cp39-cp39-musllinux_1_2_ppc64le.whl", hash = "sha256:6c6e0c425f22c1c719c42670d561ad682f7bfeeef918edea971a79ac5252437f"}, - {file = "Brotli-1.1.0-cp39-cp39-musllinux_1_2_x86_64.whl", hash = "sha256:494994f807ba0b92092a163a0a283961369a65f6cbe01e8891132b7a320e61eb"}, {file = "Brotli-1.1.0-cp39-cp39-win32.whl", hash = "sha256:f0d8a7a6b5983c2496e364b969f0e526647a06b075d034f3297dc66f3b360c64"}, {file = "Brotli-1.1.0-cp39-cp39-win_amd64.whl", hash = "sha256:cdad5b9014d83ca68c25d2e9444e28e967ef16e80f6b436918c700c117a85467"}, {file = "Brotli-1.1.0.tar.gz", hash = "sha256:81de08ac11bcb85841e440c13611c00b67d3bf82698314928d0b676362546724"}, @@ -3834,4 +3792,4 @@ scrapy = ["scrapy"] [metadata] lock-version = "2.1" python-versions = "^3.9" -content-hash = "87308b6edfa2e2acd74bcb6f45826e4ad583c7799a1e387f3c99e3d9798e6982" +content-hash = "d407bff85b715fa17ffa0a82b5dd0fb901762d32deb7674335610b9bdc650318" diff --git a/pyproject.toml b/pyproject.toml index bb27ebb2..b86fca11 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -43,7 +43,7 @@ keywords = [ [tool.poetry.dependencies] python = "^3.9" -apify-client = ">=1.8.1" +apify-client = ">=1.9.0" apify-shared = ">=1.2.1" crawlee = "~0.5.1" cryptography = ">=42.0.0" From 3dfb40e60f1a9b9c084b042eb3d50e1132640f50 Mon Sep 17 00:00:00 2001 From: Jan Buchar Date: Thu, 6 Feb 2025 17:57:56 +0100 Subject: [PATCH 04/24] Implement ChargingManager --- src/apify/_actor.py | 2 +- src/apify/_charging.py | 224 ++++++++++++++++++++++++++++++++++-- src/apify/_configuration.py | 3 +- src/apify/_models.py | 59 +++++++++- 4 files changed, 278 insertions(+), 10 deletions(-) diff --git a/src/apify/_actor.py b/src/apify/_actor.py index 70acbd06..8bebd980 100644 --- a/src/apify/_actor.py +++ b/src/apify/_actor.py @@ -471,7 +471,7 @@ async def push_data(self, data: dict | list[dict], event_name: str | None = None data = data if isinstance(data, list) else [data] max_charged_count = ( - self._charging_manager.calculate_max_event_charge_within_limit(event_name) + self._charging_manager.calculate_max_event_charge_count_within_limit(event_name) if event_name is not None else None ) diff --git a/src/apify/_charging.py b/src/apify/_charging.py index c83987c7..f57bf7f7 100644 --- a/src/apify/_charging.py +++ b/src/apify/_charging.py @@ -1,7 +1,16 @@ from __future__ import annotations +import math from dataclasses import dataclass -from typing import TYPE_CHECKING +from datetime import datetime, timezone +from decimal import Decimal +from typing import TYPE_CHECKING, Union + +from pydantic import TypeAdapter + +from apify._models import ActorRun, PricingModel +from apify.log import logger +from apify.storages import Dataset if TYPE_CHECKING: from apify_client import ApifyClientAsync @@ -9,22 +18,223 @@ from apify._configuration import Configuration +run_validator: TypeAdapter[ActorRun | None] = TypeAdapter(Union[ActorRun, None]) + + class ChargingManager: + LOCAL_CHARGING_LOG_DATASET_NAME = 'charging_log' + def __init__(self, configuration: Configuration, client: ApifyClientAsync) -> None: - pass + self._max_total_charge_usd = configuration.max_total_charge_usd or Decimal('inf') + self._is_at_home = configuration.is_at_home + self._actor_run_id = configuration.actor_run_id + self._purge_charging_log_dataset = configuration.purge_on_start + self._pricing_model: PricingModel | None = None + + if configuration.test_pay_per_event: + if self._is_at_home: + raise ValueError( + 'Using the ACTOR_TEST_PAY_PER_EVENT environment variable is only supported ' + 'in a local development environment' + ) + + self._pricing_model = 'PAY_PER_EVENT' + + self._client = client + self._charging_log_dataset: Dataset | None = None + + self._charging_state: dict[str, ChargingStateItem] | None = None + self._pricing_info: dict[str, PricingInfoItem] = {} + + self._not_ppe_warning_printed = False async def init(self) -> None: - pass + self._charging_state = {} + + if self._is_at_home: + if self._actor_run_id is None: + raise RuntimeError('Actor run ID not found even though the Actor is running on Apify') + + run = run_validator.validate_python(await self._client.run(self._actor_run_id).get()) + if run is None: + raise RuntimeError('Actor run not found') + + if run.pricing_info is not None: + self._pricing_model = run.pricing_info.pricing_model + + if run.pricing_info.pricing_model == 'PAY_PER_EVENT': + for event_name, event_pricing in run.pricing_info.pricing_per_event.actor_charge_events.items(): + self._pricing_info[event_name] = PricingInfoItem( + price=event_pricing.event_price_usd, + title=event_pricing.event_title, + ) + + self._max_total_charge_usd = run.options.max_total_charge_usd or self._max_total_charge_usd + + for event_name, count in (run.charged_event_counts or {}).items(): + price = self._pricing_info.get(event_name, PricingInfoItem(Decimal(), title='')).price + self._charging_state[event_name] = ChargingStateItem( + charge_count=count, + total_charged_amount=count * price, + ) + + if not self._is_at_home and self._pricing_model == 'PAY_PER_EVENT': + if self._purge_charging_log_dataset: + dataset = await Dataset.open(name=self.LOCAL_CHARGING_LOG_DATASET_NAME) + await dataset.drop() + + self._charging_log_dataset = await Dataset.open(name=self.LOCAL_CHARGING_LOG_DATASET_NAME) async def charge(self, event_name: str, count: int = 1) -> ChargeResult: - pass + if self._charging_state is None: + raise RuntimeError('Charging manager is not initialized') + + def calculate_chargeable() -> dict[str, int | None]: + return { + event_name: self.calculate_max_event_charge_count_within_limit(event_name) + for event_name in self._pricing_info + } + + if self._pricing_model != 'PAY_PER_EVENT': + if not self._not_ppe_warning_printed: + logger.warning( + 'Ignored attempt to charge for an event - the Actor does not use the pay-per-event pricing' + ) + self._not_ppe_warning_printed = True + + return ChargeResult( + event_charge_limit_reached=False, + charged_count=0, + chargeable_within_limit=calculate_chargeable(), + ) + + # START OF CRITICAL SECTION - no awaits here + charged_count = min(count, self.calculate_max_event_charge_count_within_limit(event_name) or count) + + if charged_count == 0: + return ChargeResult( + event_charge_limit_reached=True, + charged_count=0, + chargeable_within_limit=calculate_chargeable(), + ) - def calculate_max_event_charge_within_limit(self, event_name: str) -> int: - pass + pricing_info = self._pricing_info.get( + event_name, + PricingInfoItem( + price=Decimal() + if self._is_at_home + else Decimal( + '1' + ), # Use a nonzero price for local development so that the maximum budget can be reached, + title=f"Unknown event '{event_name}'", + ), + ) + + self._charging_state.setdefault(event_name, ChargingStateItem(0, Decimal())) + self._charging_state[event_name].charge_count += charged_count + self._charging_state[event_name].total_charged_amount += charged_count * pricing_info.price + + # END OF CRITICAL SECTION + if self._is_at_home: + if self._actor_run_id is None: + raise RuntimeError('Actor run ID not configured') + + if event_name is self._pricing_info: + await self._client.run(self._actor_run_id).charge(event_name, charged_count) + else: + logger.warning(f"Attempting to charge for an unknown event '{event_name}'") + + if self._charging_log_dataset: + await self._charging_log_dataset.push_data( + { + 'event_name': event_name, + 'event_title': pricing_info.title, + 'event_price_usd': round(pricing_info.price, 3), + 'charged_count': charged_count, + 'timestamp': datetime.now(timezone.utc).isoformat(), + } + ) + + if charged_count < count: + subject = 'instance' if count == 1 else 'instances' + logger.info( + f"Charging {count} ${subject} of '{event_name}' event would exceed max_total_charge_usd " + '- only {charged_count} events were charged' + ) + + max_charge_count = self.calculate_max_event_charge_count_within_limit(event_name) + + return ChargeResult( + event_charge_limit_reached=max_charge_count is not None and max_charge_count <= 0, + charged_count=charged_count, + chargeable_within_limit=calculate_chargeable(), + ) + + def calculate_total_charged_amount(self) -> Decimal: + if self._charging_state is None: + raise RuntimeError('Charging manager is not initialized') + + return sum( + (item.total_charged_amount for item in self._charging_state.values()), + start=Decimal(), + ) + + def calculate_max_event_charge_count_within_limit(self, event_name: str) -> int | None: + if self._charging_state is None: + raise RuntimeError('Charging manager is not initialized') + + pricing_info = self._pricing_info.get(event_name) + + if pricing_info is not None: + price = pricing_info.price + elif not self._is_at_home: + price = Decimal('1') # Use a nonzero price for local development so that the maximum budget can be reached + else: + price = Decimal() + + if not price: + return None + + return math.floor((self._max_total_charge_usd - self.calculate_total_charged_amount()) / price) + + def get_pricing_info(self) -> ActorPricingInfo: + if self._charging_state is None: + raise RuntimeError('Charging manager is not initialized') + + return ActorPricingInfo( + pricing_model=self._pricing_model, + is_pay_per_event=self._pricing_model == 'PAY_PER_EVENT', + max_total_charge_usd=self._max_total_charge_usd + if self._max_total_charge_usd is not None + else Decimal('inf'), + per_event_prices={ + event_name: pricing_info.price for event_name, pricing_info in self._pricing_info.items() + }, + ) @dataclass(frozen=True) class ChargeResult: event_charge_limit_reached: bool charged_count: int - chargeable_within_limit: int + chargeable_within_limit: dict[str, int | None] + + +@dataclass +class ChargingStateItem: + charge_count: int + total_charged_amount: Decimal + + +@dataclass +class PricingInfoItem: + price: Decimal + title: str + + +@dataclass +class ActorPricingInfo: + pricing_model: PricingModel | None + max_total_charge_usd: Decimal + is_pay_per_event: bool + per_event_prices: dict[str, Decimal] diff --git a/src/apify/_configuration.py b/src/apify/_configuration.py index 7020e2a3..e3dec442 100644 --- a/src/apify/_configuration.py +++ b/src/apify/_configuration.py @@ -1,6 +1,7 @@ from __future__ import annotations from datetime import datetime, timedelta +from decimal import Decimal from logging import getLogger from typing import Annotated, Any @@ -212,7 +213,7 @@ class Configuration(CrawleeConfiguration): ] = None max_total_charge_usd: Annotated[ - float | None, + Decimal | None, Field( alias='actor_max_total_charge_usd', description='For pay-per-event Actors, the user-set limit on total charges. Do not exceed this limit', diff --git a/src/apify/_models.py b/src/apify/_models.py index ed66bf2f..627aa875 100644 --- a/src/apify/_models.py +++ b/src/apify/_models.py @@ -1,7 +1,8 @@ from __future__ import annotations from datetime import datetime, timedelta -from typing import Annotated +from decimal import Decimal +from typing import TYPE_CHECKING, Annotated, Literal from pydantic import BaseModel, BeforeValidator, ConfigDict, Field @@ -11,6 +12,9 @@ from apify._utils import docs_group +if TYPE_CHECKING: + from typing_extensions import TypeAlias + @docs_group('Data structures') class Webhook(BaseModel): @@ -67,6 +71,7 @@ class ActorRunOptions(BaseModel): timeout: Annotated[timedelta, Field(alias='timeoutSecs')] memory_mbytes: Annotated[int, Field(alias='memoryMbytes')] disk_mbytes: Annotated[int, Field(alias='diskMbytes')] + max_total_charge_usd: Annotated[Decimal | None, Field(alias='maxTotalChargeUsd')] = None @docs_group('Data structures') @@ -115,3 +120,55 @@ class ActorRun(BaseModel): usage: Annotated[ActorRunUsage | None, Field(alias='usage')] = None usage_total_usd: Annotated[float | None, Field(alias='usageTotalUsd')] = None usage_usd: Annotated[ActorRunUsage | None, Field(alias='usageUsd')] = None + pricing_info: Annotated[ + FreeActorPricingInfo + | FlatPricePerMonthActorPricingInfo + | PricePerDatasetItemActorPricingInfo + | PayPerEventActorPricingInfo + | None, + Field(alias='pricingInfo', discriminator='pricing_model'), + ] = None + charged_event_counts: Annotated[ + dict[str, int] | None, + Field(alias='chargedEventCounts'), + ] = None + + +class FreeActorPricingInfo(BaseModel): + pricing_model: Annotated[Literal['FREE'], Field(alias='pricingModel')] + + +class FlatPricePerMonthActorPricingInfo(BaseModel): + pricing_model: Annotated[Literal['FLAT_PRICE_PER_MONTH'], Field(alias='pricingModel')] + trial_minutes: Annotated[int | None, Field(alias='trialMinutes')] + price_per_unit_usd: Annotated[Decimal, Field(alias='pricePerUnitUsd')] + + +class PricePerDatasetItemActorPricingInfo(BaseModel): + pricing_model: Annotated[Literal['PRICE_PER_DATASET_ITEM'], Field(alias='pricingModel')] + unit_name: Annotated[str | None, Field(alias='unitName')] + price_per_unit_usd: Annotated[Decimal, Field(alias='pricePerUnitUsd')] + + +class ActorChargeEvent(BaseModel): + event_price_usd: Annotated[Decimal, Field(alias='eventPriceUsd')] + event_title: Annotated[str, Field(alias='eventTitle')] + event_description: Annotated[str | None, Field(alias='eventDescription')] = None + + +class PricingPerEvent(BaseModel): + actor_charge_events: Annotated[dict[str, ActorChargeEvent], Field(alias='actorChargeEvents')] + + +class PayPerEventActorPricingInfo(BaseModel): + pricing_model: Annotated[Literal['PAY_PER_EVENT'], Field(alias='pricingModel')] + pricing_per_event: Annotated[PricingPerEvent, Field(alias='pricingPerEvent')] + minimal_max_total_charge_usd: Annotated[Decimal | None, Field(alias='minimalMaxTotalChargeUsd')] + + +PricingModel: TypeAlias = Literal[ + 'FREE', + 'FLAT_PRICE_PER_MONTH', + 'PRICE_PER_DATASET_ITEM', + 'PAY_PER_EVENT', +] From 75b72c5259fd71571e4ec74f3e2430641d2facb4 Mon Sep 17 00:00:00 2001 From: Jan Buchar Date: Fri, 7 Feb 2025 17:01:02 +0100 Subject: [PATCH 05/24] Update to beta apify client --- poetry.lock | 8 ++++---- pyproject.toml | 2 +- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/poetry.lock b/poetry.lock index 7f7b4df0..6c25d4fc 100644 --- a/poetry.lock +++ b/poetry.lock @@ -50,14 +50,14 @@ trio = ["trio (>=0.26.1)"] [[package]] name = "apify-client" -version = "1.9.0" +version = "1.9.2b1" description = "Apify API client for Python" optional = false python-versions = "<4.0,>=3.9" groups = ["main"] files = [ - {file = "apify_client-1.9.0-py3-none-any.whl", hash = "sha256:dd67093c570cea068ac5ad4100f67d57f1aeb217f6f887b369420f913ef597e7"}, - {file = "apify_client-1.9.0.tar.gz", hash = "sha256:f57ec6a2d6b978daa48ffc470e31cfa586ab82145a88acfd1fa79b5857013ddb"}, + {file = "apify_client-1.9.2b1-py3-none-any.whl", hash = "sha256:7c7e3db1b062bd17d9634ae4ccb91ac6d79bc1099823a8a2374866e5c26b76d3"}, + {file = "apify_client-1.9.2b1.tar.gz", hash = "sha256:a7958110be56d643d2ba5ab2e79cd64e3799734a0d3866ebfbd1a785423cfe17"}, ] [package.dependencies] @@ -3792,4 +3792,4 @@ scrapy = ["scrapy"] [metadata] lock-version = "2.1" python-versions = "^3.9" -content-hash = "d407bff85b715fa17ffa0a82b5dd0fb901762d32deb7674335610b9bdc650318" +content-hash = "8348fd67787a1085c2b503b46bc58283a419780d7c956df982bf51a1b8723f02" diff --git a/pyproject.toml b/pyproject.toml index b86fca11..e18b956f 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -43,7 +43,7 @@ keywords = [ [tool.poetry.dependencies] python = "^3.9" -apify-client = ">=1.9.0" +apify-client = ">=1.9.2b1" apify-shared = ">=1.2.1" crawlee = "~0.5.1" cryptography = ">=42.0.0" From fd48836409ccc6c98a6586872e4b162cfaba6206 Mon Sep 17 00:00:00 2001 From: Jan Buchar Date: Fri, 7 Feb 2025 17:02:38 +0100 Subject: [PATCH 06/24] Add default values to optional fields in models --- src/apify/_models.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/apify/_models.py b/src/apify/_models.py index 627aa875..68dd4dac 100644 --- a/src/apify/_models.py +++ b/src/apify/_models.py @@ -140,13 +140,13 @@ class FreeActorPricingInfo(BaseModel): class FlatPricePerMonthActorPricingInfo(BaseModel): pricing_model: Annotated[Literal['FLAT_PRICE_PER_MONTH'], Field(alias='pricingModel')] - trial_minutes: Annotated[int | None, Field(alias='trialMinutes')] + trial_minutes: Annotated[int | None, Field(alias='trialMinutes')] = None price_per_unit_usd: Annotated[Decimal, Field(alias='pricePerUnitUsd')] class PricePerDatasetItemActorPricingInfo(BaseModel): pricing_model: Annotated[Literal['PRICE_PER_DATASET_ITEM'], Field(alias='pricingModel')] - unit_name: Annotated[str | None, Field(alias='unitName')] + unit_name: Annotated[str | None, Field(alias='unitName')] = None price_per_unit_usd: Annotated[Decimal, Field(alias='pricePerUnitUsd')] @@ -163,7 +163,7 @@ class PricingPerEvent(BaseModel): class PayPerEventActorPricingInfo(BaseModel): pricing_model: Annotated[Literal['PAY_PER_EVENT'], Field(alias='pricingModel')] pricing_per_event: Annotated[PricingPerEvent, Field(alias='pricingPerEvent')] - minimal_max_total_charge_usd: Annotated[Decimal | None, Field(alias='minimalMaxTotalChargeUsd')] + minimal_max_total_charge_usd: Annotated[Decimal | None, Field(alias='minimalMaxTotalChargeUsd')] = None PricingModel: TypeAlias = Literal[ From 36159b9a61e7cf99250f4c424c6c3272075d16e6 Mon Sep 17 00:00:00 2001 From: Jan Buchar Date: Fri, 7 Feb 2025 17:02:58 +0100 Subject: [PATCH 07/24] Fix minor bugs --- src/apify/_charging.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/apify/_charging.py b/src/apify/_charging.py index f57bf7f7..782431f4 100644 --- a/src/apify/_charging.py +++ b/src/apify/_charging.py @@ -139,7 +139,7 @@ def calculate_chargeable() -> dict[str, int | None]: if self._actor_run_id is None: raise RuntimeError('Actor run ID not configured') - if event_name is self._pricing_info: + if event_name in self._pricing_info: await self._client.run(self._actor_run_id).charge(event_name, charged_count) else: logger.warning(f"Attempting to charge for an unknown event '{event_name}'") @@ -195,7 +195,8 @@ def calculate_max_event_charge_count_within_limit(self, event_name: str) -> int if not price: return None - return math.floor((self._max_total_charge_usd - self.calculate_total_charged_amount()) / price) + result = (self._max_total_charge_usd - self.calculate_total_charged_amount()) / price + return math.floor(result) if result.is_finite() else None def get_pricing_info(self) -> ActorPricingInfo: if self._charging_state is None: From 9c5c92a78b3b29c0f97bc6fe073441ac9f643e80 Mon Sep 17 00:00:00 2001 From: Jan Buchar Date: Fri, 7 Feb 2025 17:04:21 +0100 Subject: [PATCH 08/24] Make make_actor fixture non-async and session-scoped, add PPE-related cleanup code --- tests/integration/conftest.py | 80 ++++++++++++++++++++++++----------- 1 file changed, 56 insertions(+), 24 deletions(-) diff --git a/tests/integration/conftest.py b/tests/integration/conftest.py index 9a74924a..58615ac4 100644 --- a/tests/integration/conftest.py +++ b/tests/integration/conftest.py @@ -12,7 +12,7 @@ import pytest from filelock import FileLock -from apify_client import ApifyClientAsync +from apify_client import ApifyClient, ApifyClientAsync from apify_shared.consts import ActorJobStatus, ActorSourceType, ApifyEnvVars from crawlee import service_locator from crawlee.storages import _creation_management @@ -22,7 +22,8 @@ from apify._models import ActorRun if TYPE_CHECKING: - from collections.abc import AsyncIterator, Awaitable, Coroutine, Mapping + from collections.abc import Awaitable, Coroutine, Iterator, Mapping + from decimal import Decimal from apify_client.clients.resource_clients import ActorClientAsync @@ -94,21 +95,27 @@ def _isolate_test_environment(prepare_test_env: Callable[[], None]) -> None: prepare_test_env() +@pytest.fixture(scope='session') +def apify_token() -> str: + api_token = os.getenv(_TOKEN_ENV_VAR) + + if not api_token: + raise RuntimeError(f'{_TOKEN_ENV_VAR} environment variable is missing, cannot run tests!') + + return api_token + + @pytest.fixture -def apify_client_async() -> ApifyClientAsync: +def apify_client_async(apify_token: str) -> ApifyClientAsync: """Create an instance of the ApifyClientAsync. This fixture can't be session-scoped, because then you start getting `RuntimeError: Event loop is closed` errors, because `httpx.AsyncClient` in `ApifyClientAsync` tries to reuse the same event loop across requests, but `pytest-asyncio` closes the event loop after each test, and uses a new one for the next test. """ - api_token = os.getenv(_TOKEN_ENV_VAR) api_url = os.getenv(_API_URL_ENV_VAR) - if not api_token: - raise RuntimeError(f'{_TOKEN_ENV_VAR} environment variable is missing, cannot run tests!') - - return ApifyClientAsync(api_token, api_url=api_url) + return ApifyClientAsync(apify_token, api_url=api_url) @pytest.fixture(scope='session') @@ -217,17 +224,17 @@ def __call__( """ -@pytest.fixture -async def make_actor( +@pytest.fixture(scope='session') +def make_actor( actor_base_source_files: dict[str, str | bytes], - apify_client_async: ApifyClientAsync, -) -> AsyncIterator[MakeActorFunction]: + apify_token: str, +) -> Iterator[MakeActorFunction]: """Fixture for creating temporary Actors for testing purposes. This returns a function that creates a temporary Actor from the given main function or source files. The Actor will be uploaded to the Apify Platform, built there, and after the test finishes, it will be automatically deleted. """ - actor_clients_for_cleanup: list[ActorClientAsync] = [] + actors_for_cleanup: list[str] = [] async def _make_actor( label: str, @@ -242,6 +249,7 @@ async def _make_actor( if (main_func and main_py) or (main_func and source_files) or (main_py and source_files): raise TypeError('Cannot specify more than one of `main_func`, `main_py` and `source_files` arguments') + client = ApifyClientAsync(token=apify_token, api_url=os.getenv(_API_URL_ENV_VAR)) actor_name = generate_unique_resource_name(label) # Get the source of main_func and convert it into a reasonable main_py file. @@ -290,7 +298,7 @@ async def _make_actor( ) print(f'Creating Actor {actor_name}...') - created_actor = await apify_client_async.actors().create( + created_actor = await client.actors().create( name=actor_name, default_run_build='latest', default_run_memory_mbytes=256, @@ -305,11 +313,11 @@ async def _make_actor( ], ) - actor_client = apify_client_async.actor(created_actor['id']) + actor_client = client.actor(created_actor['id']) print(f'Building Actor {actor_name}...') build_result = await actor_client.build(version_number='0.0') - build_client = apify_client_async.build(build_result['id']) + build_client = client.build(build_result['id']) build_client_result = await build_client.wait_for_finish(wait_secs=600) assert build_client_result is not None @@ -317,15 +325,29 @@ async def _make_actor( # We only mark the client for cleanup if the build succeeded, so that if something goes wrong here, # you have a chance to check the error. - actor_clients_for_cleanup.append(actor_client) + actors_for_cleanup.append(created_actor['id']) return actor_client yield _make_actor + client = ApifyClient(token=apify_token, api_url=os.getenv(_API_URL_ENV_VAR)) + # Delete all the generated Actors. - for actor_client in actor_clients_for_cleanup: - await actor_client.delete() + for actor_id in actors_for_cleanup: + actor_client = client.actor(actor_id) + + if (actor := actor_client.get()) is not None: + actor_client.update( + pricing_infos=[ + *actor['pricingInfos'], + { + 'pricingModel': 'FREE', + }, + ] + ) + + actor_client.delete() class RunActorFunction(Protocol): @@ -336,6 +358,7 @@ def __call__( actor: ActorClientAsync, *, run_input: Any = None, + max_total_charge_usd: Decimal | None = None, ) -> Coroutine[None, None, ActorRun]: """Initiate an Actor run and wait for its completion. @@ -348,21 +371,30 @@ def __call__( """ -@pytest.fixture -async def run_actor(apify_client_async: ApifyClientAsync) -> RunActorFunction: +@pytest.fixture(scope='session') +def run_actor(apify_token: str) -> RunActorFunction: """Fixture for calling an Actor run and waiting for its completion. This fixture returns a function that initiates an Actor run with optional run input, waits for its completion, and retrieves the final result. It uses the `wait_for_finish` method with a timeout of 10 minutes. """ - async def _run_actor(actor: ActorClientAsync, *, run_input: Any = None) -> ActorRun: - call_result = await actor.call(run_input=run_input) + async def _run_actor( + actor: ActorClientAsync, + *, + run_input: Any = None, + max_total_charge_usd: Decimal | None = None, + ) -> ActorRun: + call_result = await actor.call( + run_input=run_input, + max_total_charge_usd=max_total_charge_usd, + ) assert isinstance(call_result, dict), 'The result of ActorClientAsync.call() is not a dictionary.' assert 'id' in call_result, 'The result of ActorClientAsync.call() does not contain an ID.' - run_client = apify_client_async.run(call_result['id']) + client = ApifyClientAsync(token=apify_token, api_url=os.getenv(_API_URL_ENV_VAR)) + run_client = client.run(call_result['id']) run_result = await run_client.wait_for_finish(wait_secs=600) return ActorRun.model_validate(run_result) From 6ab9aa0caa39bf6d8b9cca66c33f2198d069083e Mon Sep 17 00:00:00 2001 From: Jan Buchar Date: Fri, 7 Feb 2025 17:05:18 +0100 Subject: [PATCH 09/24] Add basic e2e test --- tests/integration/test_actor_charge.py | 95 ++++++++++++++++++++++++++ 1 file changed, 95 insertions(+) create mode 100644 tests/integration/test_actor_charge.py diff --git a/tests/integration/test_actor_charge.py b/tests/integration/test_actor_charge.py new file mode 100644 index 00000000..bcd716c3 --- /dev/null +++ b/tests/integration/test_actor_charge.py @@ -0,0 +1,95 @@ +from __future__ import annotations + +import asyncio +from decimal import Decimal +from typing import TYPE_CHECKING + +import pytest_asyncio + +from apify_shared.consts import ActorJobStatus + +from apify import Actor +from apify._models import ActorRun + +if TYPE_CHECKING: + from apify_client import ApifyClientAsync + from apify_client.clients import ActorClientAsync + + from .conftest import MakeActorFunction, RunActorFunction + + +@pytest_asyncio.fixture(scope='module', loop_scope='module') +async def ppe_actor_build(make_actor: MakeActorFunction) -> str: + async def main() -> None: + from dataclasses import asdict + + async with Actor: + charge_result = await Actor.charge( + event_name='foobar', + count=4, + ) + Actor.log.info('Charged', extra=asdict(charge_result)) + + actor_client = await make_actor('ppe', main_func=main) + + await actor_client.update( + pricing_infos=[ + { + 'pricingModel': 'PAY_PER_EVENT', + 'pricingPerEvent': { + 'actorChargeEvents': { + 'foobar': { + 'eventTitle': 'Foo bar', + 'eventPriceUsd': 0.1, + 'eventDescription': 'Foo foo bar bar', + }, + }, + }, + }, + ] + ) + + actor = await actor_client.get() + + assert actor is not None + return str(actor['id']) + + +@pytest_asyncio.fixture(scope='function', loop_scope='module') +async def ppe_actor( + ppe_actor_build: str, + apify_client_async: ApifyClientAsync, +) -> ActorClientAsync: + return apify_client_async.actor(ppe_actor_build) + + +async def test_actor_charge_basic( + ppe_actor: ActorClientAsync, + run_actor: RunActorFunction, + apify_client_async: ApifyClientAsync, +) -> None: + run = await run_actor(ppe_actor) + + # Wait until the platform gets its act together and refetch + await asyncio.sleep(6) + updated_run = await apify_client_async.run(run.id).get() + run = ActorRun.model_validate(updated_run) + + assert run.status == ActorJobStatus.SUCCEEDED + assert run.charged_event_counts == {'foobar': 4} + + +async def test_actor_charge_limit( + ppe_actor: ActorClientAsync, + run_actor: RunActorFunction, + apify_client_async: ApifyClientAsync, +) -> None: + run = await run_actor(ppe_actor, max_total_charge_usd=Decimal('0.2')) + + # Wait until the platform gets its act together and refetch + await asyncio.sleep(6) + updated_run = await apify_client_async.run(run.id).get() + run = ActorRun.model_validate(updated_run) + + assert run.status == ActorJobStatus.SUCCEEDED + assert run.charged_event_counts == {'foobar': 2} From 5a60822b27e2bbe5311202abe4a830a69fdc1949 Mon Sep 17 00:00:00 2001 From: Jan Buchar Date: Mon, 10 Feb 2025 11:17:50 +0100 Subject: [PATCH 10/24] Update unit tests --- tests/unit/actor/test_actor_env_helpers.py | 19 +++--- tests/unit/actor/test_actor_log.py | 69 +++++++++++----------- 2 files changed, 43 insertions(+), 45 deletions(-) diff --git a/tests/unit/actor/test_actor_env_helpers.py b/tests/unit/actor/test_actor_env_helpers.py index 770fb856..e9eacdb2 100644 --- a/tests/unit/actor/test_actor_env_helpers.py +++ b/tests/unit/actor/test_actor_env_helpers.py @@ -3,6 +3,7 @@ import random import string from datetime import datetime, timedelta +from decimal import Decimal from typing import TYPE_CHECKING, Any from pydantic_core import TzInfo @@ -30,15 +31,7 @@ async def test_actor_is_not_at_home_when_local() -> None: assert is_at_home is False -async def test_actor_is_at_home_on_apify(monkeypatch: pytest.MonkeyPatch) -> None: - print('setenv') - monkeypatch.setenv(ApifyEnvVars.IS_AT_HOME, 'true') - async with Actor as actor: - is_at_home = actor.is_at_home() - assert is_at_home is True - - -async def test_get_env_with_randomized_env_vars(monkeypatch: pytest.MonkeyPatch) -> None: +async def test_get_env_with_randomized_env_vars(monkeypatch: pytest.MonkeyPatch) -> None: # noqa: PLR0912 ignored_env_vars = { ApifyEnvVars.INPUT_KEY, ApifyEnvVars.MEMORY_MBYTES, @@ -82,7 +75,10 @@ async def test_get_env_with_randomized_env_vars(monkeypatch: pytest.MonkeyPatch) continue float_get_env_var = float_env_var.name.lower() - expected_get_env[float_get_env_var] = random.random() + if float_env_var == ActorEnvVars.MAX_TOTAL_CHARGE_USD: + expected_get_env[float_get_env_var] = Decimal(random.random()) + else: + expected_get_env[float_get_env_var] = random.random() monkeypatch.setenv(float_env_var, f'{expected_get_env[float_get_env_var]}') for bool_env_var in BOOL_ENV_VARS: @@ -93,6 +89,9 @@ async def test_get_env_with_randomized_env_vars(monkeypatch: pytest.MonkeyPatch) expected_get_env[bool_get_env_var] = random.choice([True, False]) monkeypatch.setenv(bool_env_var, f'{"true" if expected_get_env[bool_get_env_var] else "false"}') + expected_get_env[ApifyEnvVars.IS_AT_HOME.name.lower()] = False + monkeypatch.setenv(ApifyEnvVars.IS_AT_HOME, 'false') + for datetime_env_var in DATETIME_ENV_VARS: if datetime_env_var in ignored_env_vars: continue diff --git a/tests/unit/actor/test_actor_log.py b/tests/unit/actor/test_actor_log.py index 41217486..0cd1aeb8 100644 --- a/tests/unit/actor/test_actor_log.py +++ b/tests/unit/actor/test_actor_log.py @@ -14,12 +14,8 @@ import pytest -async def test_actor_logs_messages_correctly( - caplog: pytest.LogCaptureFixture, - monkeypatch: pytest.MonkeyPatch, -) -> None: +async def test_actor_logs_messages_correctly(caplog: pytest.LogCaptureFixture) -> None: caplog.set_level(logging.DEBUG, logger='apify') - monkeypatch.setenv('APIFY_IS_AT_HOME', '1') with contextlib.suppress(RuntimeError): async with Actor(configure_logging=False): @@ -43,7 +39,7 @@ async def test_actor_logs_messages_correctly( # Test that exception in Actor.main is logged with the traceback raise RuntimeError('Dummy RuntimeError') - assert len(caplog.records) == 12 + assert len(caplog.records) == 13 assert caplog.records[0].levelno == logging.INFO assert caplog.records[0].message == 'Initializing Actor...' @@ -56,39 +52,42 @@ async def test_actor_logs_messages_correctly( assert getattr(caplog.records[1], 'os', None) == sys.platform assert caplog.records[2].levelno == logging.DEBUG - assert caplog.records[2].message.startswith('APIFY_ACTOR_EVENTS_WS_URL env var not set') + assert caplog.records[2].message == 'Event manager initialized' assert caplog.records[3].levelno == logging.DEBUG - assert caplog.records[3].message == 'Debug message' + assert caplog.records[3].message == 'Charging manager initialized' - assert caplog.records[4].levelno == logging.INFO - assert caplog.records[4].message == 'Info message' + assert caplog.records[4].levelno == logging.DEBUG + assert caplog.records[4].message == 'Debug message' - assert caplog.records[5].levelno == logging.WARNING - assert caplog.records[5].message == 'Warning message' + assert caplog.records[5].levelno == logging.INFO + assert caplog.records[5].message == 'Info message' - assert caplog.records[6].levelno == logging.ERROR - assert caplog.records[6].message == 'Error message' + assert caplog.records[6].levelno == logging.WARNING + assert caplog.records[6].message == 'Warning message' assert caplog.records[7].levelno == logging.ERROR - assert caplog.records[7].message == 'Exception message' - assert caplog.records[7].exc_info is not None - assert caplog.records[7].exc_info[0] is ValueError - assert isinstance(caplog.records[7].exc_info[1], ValueError) - assert str(caplog.records[7].exc_info[1]) == 'Dummy ValueError' - - assert caplog.records[8].levelno == logging.INFO - assert caplog.records[8].message == 'Multi\nline\nlog\nmessage' - - assert caplog.records[9].levelno == logging.ERROR - assert caplog.records[9].message == 'Actor failed with an exception' - assert caplog.records[9].exc_info is not None - assert caplog.records[9].exc_info[0] is RuntimeError - assert isinstance(caplog.records[9].exc_info[1], RuntimeError) - assert str(caplog.records[9].exc_info[1]) == 'Dummy RuntimeError' - - assert caplog.records[10].levelno == logging.INFO - assert caplog.records[10].message == 'Exiting Actor' - - assert caplog.records[11].levelno == logging.DEBUG - assert caplog.records[11].message == 'Not calling sys.exit(91) because Actor is running in an unit test' + assert caplog.records[7].message == 'Error message' + + assert caplog.records[8].levelno == logging.ERROR + assert caplog.records[8].message == 'Exception message' + assert caplog.records[8].exc_info is not None + assert caplog.records[8].exc_info[0] is ValueError + assert isinstance(caplog.records[8].exc_info[1], ValueError) + assert str(caplog.records[8].exc_info[1]) == 'Dummy ValueError' + + assert caplog.records[9].levelno == logging.INFO + assert caplog.records[9].message == 'Multi\nline\nlog\nmessage' + + assert caplog.records[10].levelno == logging.ERROR + assert caplog.records[10].message == 'Actor failed with an exception' + assert caplog.records[10].exc_info is not None + assert caplog.records[10].exc_info[0] is RuntimeError + assert isinstance(caplog.records[10].exc_info[1], RuntimeError) + assert str(caplog.records[10].exc_info[1]) == 'Dummy RuntimeError' + + assert caplog.records[11].levelno == logging.INFO + assert caplog.records[11].message == 'Exiting Actor' + + assert caplog.records[12].levelno == logging.DEBUG + assert caplog.records[12].message == 'Not calling sys.exit(91) because Actor is running in an unit test' From 46bef0de4258c42d2f66be3c9d23121ffeb3d85c Mon Sep 17 00:00:00 2001 From: Jan Buchar Date: Mon, 10 Feb 2025 14:44:31 +0100 Subject: [PATCH 11/24] Fix last test --- tests/unit/actor/test_actor_lifecycle.py | 36 +++++++++++++++++++++++- 1 file changed, 35 insertions(+), 1 deletion(-) diff --git a/tests/unit/actor/test_actor_lifecycle.py b/tests/unit/actor/test_actor_lifecycle.py index 33af45e6..b1bfbd78 100644 --- a/tests/unit/actor/test_actor_lifecycle.py +++ b/tests/unit/actor/test_actor_lifecycle.py @@ -4,13 +4,15 @@ import contextlib import json import sys +from datetime import datetime, timezone from typing import Any, Callable, cast +from unittest.mock import AsyncMock, Mock import pytest import websockets.server from lazy_object_proxy import Proxy -from apify_shared.consts import ApifyEnvVars +from apify_shared.consts import ActorEnvVars, ApifyEnvVars from crawlee.events._types import Event, EventPersistStateData import apify._actor @@ -129,6 +131,7 @@ async def test_actor_handles_migrating_event_correctly(monkeypatch: pytest.Monke # the Actor automatically emits the PERSIST_STATE event with data `{'isMigrating': True}` monkeypatch.setenv(ApifyEnvVars.PERSIST_STATE_INTERVAL_MILLIS, '500') monkeypatch.setenv(ApifyEnvVars.IS_AT_HOME, '1') + monkeypatch.setenv(ActorEnvVars.RUN_ID, 'asdf') persist_state_events_data = [] @@ -143,6 +146,37 @@ async def handler(websocket: websockets.server.WebSocketServerProtocol) -> None: port: int = ws_server.sockets[0].getsockname()[1] # type: ignore[index] monkeypatch.setenv(ApifyEnvVars.ACTOR_EVENTS_WS_URL, f'ws://localhost:{port}') + mock_run_client = Mock() + mock_run_client.run.return_value.get = AsyncMock( + side_effect=lambda: { + 'id': 'asdf', + 'actId': 'asdf', + 'userId': 'adsf', + 'startedAt': datetime.now(timezone.utc), + 'status': 'RUNNING', + 'meta': {'origin': 'DEVELOPMENT'}, + 'stats': { + 'inputBodyLen': 99, + 'restartCount': 0, + 'resurrectCount': 0, + 'computeUnits': 1, + }, + 'options': { + 'build': 'asdf', + 'timeoutSecs': 4, + 'memoryMbytes': 1024, + 'diskMbytes': 1024, + }, + 'buildId': 'hjkl', + 'defaultDatasetId': 'hjkl', + 'defaultKeyValueStoreId': 'hjkl', + 'defaultRequestQueueId': 'hjkl', + 'containerUrl': 'https://hjkl', + } + ) + + monkeypatch.setattr(Actor._charging_manager, '_client', mock_run_client) + async with Actor: Actor.on(Event.PERSIST_STATE, log_persist_state) await asyncio.sleep(2) From f75647994848193b4ce9c6cb308323c82a3f2140 Mon Sep 17 00:00:00 2001 From: Jan Buchar Date: Fri, 14 Feb 2025 23:50:38 +0100 Subject: [PATCH 12/24] Use non-beta apify client --- poetry.lock | 8 ++++---- pyproject.toml | 2 +- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/poetry.lock b/poetry.lock index 6c25d4fc..99ba3b24 100644 --- a/poetry.lock +++ b/poetry.lock @@ -50,14 +50,14 @@ trio = ["trio (>=0.26.1)"] [[package]] name = "apify-client" -version = "1.9.2b1" +version = "1.9.2" description = "Apify API client for Python" optional = false python-versions = "<4.0,>=3.9" groups = ["main"] files = [ - {file = "apify_client-1.9.2b1-py3-none-any.whl", hash = "sha256:7c7e3db1b062bd17d9634ae4ccb91ac6d79bc1099823a8a2374866e5c26b76d3"}, - {file = "apify_client-1.9.2b1.tar.gz", hash = "sha256:a7958110be56d643d2ba5ab2e79cd64e3799734a0d3866ebfbd1a785423cfe17"}, + {file = "apify_client-1.9.2-py3-none-any.whl", hash = "sha256:a441fb59b5ec1c42aead73284c90304029442ddc26e764c151b8dc7f15e38600"}, + {file = "apify_client-1.9.2.tar.gz", hash = "sha256:af76b78c3153263040615daec0619765e067466bbb82e569afe799ad72c53050"}, ] [package.dependencies] @@ -3792,4 +3792,4 @@ scrapy = ["scrapy"] [metadata] lock-version = "2.1" python-versions = "^3.9" -content-hash = "8348fd67787a1085c2b503b46bc58283a419780d7c956df982bf51a1b8723f02" +content-hash = "0e390f17a20d24316610ab40008b0806b46a8a4782a8afe5861ad7b4d9254ab1" diff --git a/pyproject.toml b/pyproject.toml index e18b956f..2aa3c21d 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -43,7 +43,7 @@ keywords = [ [tool.poetry.dependencies] python = "^3.9" -apify-client = ">=1.9.2b1" +apify-client = ">=1.9.2" apify-shared = ">=1.2.1" crawlee = "~0.5.1" cryptography = ">=42.0.0" From 9d125fbeb8025119dd3b5c3d018c4f034348a521 Mon Sep 17 00:00:00 2001 From: Jan Buchar Date: Sat, 15 Feb 2025 00:32:43 +0100 Subject: [PATCH 13/24] Fill in docstrings --- src/apify/_actor.py | 4 ++++ src/apify/_charging.py | 17 +++++++++++++++++ 2 files changed, 21 insertions(+) diff --git a/src/apify/_actor.py b/src/apify/_actor.py index 8bebd980..a6d943ed 100644 --- a/src/apify/_actor.py +++ b/src/apify/_actor.py @@ -545,6 +545,10 @@ def get_charging_manager(self) -> ChargingManager: return self._charging_manager async def charge(self, event_name: str, count: int = 1) -> ChargeResult: + """Charge for a specified number of events - sub-operations of the Actor. + + This is relevant only for the pay-per-event pricing model. + """ self._raise_if_not_initialized() return await self._charging_manager.charge(event_name, count) diff --git a/src/apify/_charging.py b/src/apify/_charging.py index 782431f4..4a405675 100644 --- a/src/apify/_charging.py +++ b/src/apify/_charging.py @@ -49,6 +49,7 @@ def __init__(self, configuration: Configuration, client: ApifyClientAsync) -> No self._not_ppe_warning_printed = False async def init(self) -> None: + """Initialize the charging manager - this is called by the `Actor` class and shouldn't be invoked manually.""" self._charging_state = {} if self._is_at_home: @@ -86,15 +87,21 @@ async def init(self) -> None: self._charging_log_dataset = await Dataset.open(name=self.LOCAL_CHARGING_LOG_DATASET_NAME) async def charge(self, event_name: str, count: int = 1) -> ChargeResult: + """Charge for a specified number of events - sub-operations of the Actor. + + This is relevant only for the pay-per-event pricing model. + """ if self._charging_state is None: raise RuntimeError('Charging manager is not initialized') def calculate_chargeable() -> dict[str, int | None]: + """Calculate the maximum number of events of each type that can be charged within the current budget.""" return { event_name: self.calculate_max_event_charge_count_within_limit(event_name) for event_name in self._pricing_info } + # For runs that do not use the pay-per-event pricing model, just print a warning and return if self._pricing_model != 'PAY_PER_EVENT': if not self._not_ppe_warning_printed: logger.warning( @@ -109,6 +116,8 @@ def calculate_chargeable() -> dict[str, int | None]: ) # START OF CRITICAL SECTION - no awaits here + + # Determine the maximum amount of events that can be charged within the budget charged_count = min(count, self.calculate_max_event_charge_count_within_limit(event_name) or count) if charged_count == 0: @@ -130,11 +139,14 @@ def calculate_chargeable() -> dict[str, int | None]: ), ) + # Update the charging state self._charging_state.setdefault(event_name, ChargingStateItem(0, Decimal())) self._charging_state[event_name].charge_count += charged_count self._charging_state[event_name].total_charged_amount += charged_count * pricing_info.price # END OF CRITICAL SECTION + + # If running on the platform, call the charge endpoint if self._is_at_home: if self._actor_run_id is None: raise RuntimeError('Actor run ID not configured') @@ -144,6 +156,7 @@ def calculate_chargeable() -> dict[str, int | None]: else: logger.warning(f"Attempting to charge for an unknown event '{event_name}'") + # Log the charged operation (if enabled) if self._charging_log_dataset: await self._charging_log_dataset.push_data( { @@ -155,6 +168,7 @@ def calculate_chargeable() -> dict[str, int | None]: } ) + # If it is not possible to charge the full amount, log that fact if charged_count < count: subject = 'instance' if count == 1 else 'instances' logger.info( @@ -171,6 +185,7 @@ def calculate_chargeable() -> dict[str, int | None]: ) def calculate_total_charged_amount(self) -> Decimal: + """Calculate the total amount of money charged for pay-per-event events.""" if self._charging_state is None: raise RuntimeError('Charging manager is not initialized') @@ -180,6 +195,7 @@ def calculate_total_charged_amount(self) -> Decimal: ) def calculate_max_event_charge_count_within_limit(self, event_name: str) -> int | None: + """Calculate how many instances of an event can be charged before we reach the configured limit.""" if self._charging_state is None: raise RuntimeError('Charging manager is not initialized') @@ -199,6 +215,7 @@ def calculate_max_event_charge_count_within_limit(self, event_name: str) -> int return math.floor(result) if result.is_finite() else None def get_pricing_info(self) -> ActorPricingInfo: + """Retrieve detailed infor about the effective pricing of the current Actor run.""" if self._charging_state is None: raise RuntimeError('Charging manager is not initialized') From b827bb5800cee71d4bca5b8c7dd0cc002512c01f Mon Sep 17 00:00:00 2001 From: Jan Buchar Date: Sun, 16 Feb 2025 14:03:57 +0100 Subject: [PATCH 14/24] More documentation --- src/apify/_charging.py | 15 +++++++++++++-- 1 file changed, 13 insertions(+), 2 deletions(-) diff --git a/src/apify/_charging.py b/src/apify/_charging.py index 4a405675..c1de8d6a 100644 --- a/src/apify/_charging.py +++ b/src/apify/_charging.py @@ -9,6 +9,7 @@ from pydantic import TypeAdapter from apify._models import ActorRun, PricingModel +from apify._utils import docs_group from apify.log import logger from apify.storages import Dataset @@ -21,6 +22,7 @@ run_validator: TypeAdapter[ActorRun | None] = TypeAdapter(Union[ActorRun, None]) +@docs_group('Classes') class ChargingManager: LOCAL_CHARGING_LOG_DATASET_NAME = 'charging_log' @@ -185,7 +187,7 @@ def calculate_chargeable() -> dict[str, int | None]: ) def calculate_total_charged_amount(self) -> Decimal: - """Calculate the total amount of money charged for pay-per-event events.""" + """Calculate the total amount of money charged for pay-per-event events so far.""" if self._charging_state is None: raise RuntimeError('Charging manager is not initialized') @@ -215,7 +217,10 @@ def calculate_max_event_charge_count_within_limit(self, event_name: str) -> int return math.floor(result) if result.is_finite() else None def get_pricing_info(self) -> ActorPricingInfo: - """Retrieve detailed infor about the effective pricing of the current Actor run.""" + """Retrieve detailed information about the effective pricing of the current Actor run. + + This can be used for instance when your code needs to support multiple pricing models in transition periods. + """ if self._charging_state is None: raise RuntimeError('Charging manager is not initialized') @@ -231,11 +236,17 @@ def get_pricing_info(self) -> ActorPricingInfo: ) +@docs_group('Data structures') @dataclass(frozen=True) class ChargeResult: event_charge_limit_reached: bool + """If true, no more events of this type can be charged within the limit""" + charged_count: int + """Total amount of charged events - may be lower than the requested amount""" + chargeable_within_limit: dict[str, int | None] + """How many events of each known type can still be charged within the limit""" @dataclass From a4f1cbc89067c159645c35d13554c02eed17f8a7 Mon Sep 17 00:00:00 2001 From: Jan Buchar Date: Sun, 16 Feb 2025 14:19:08 +0100 Subject: [PATCH 15/24] Little bit more documentation --- src/apify/_charging.py | 24 ++++++++++++++++-------- 1 file changed, 16 insertions(+), 8 deletions(-) diff --git a/src/apify/_charging.py b/src/apify/_charging.py index c1de8d6a..109f3f51 100644 --- a/src/apify/_charging.py +++ b/src/apify/_charging.py @@ -249,6 +249,22 @@ class ChargeResult: """How many events of each known type can still be charged within the limit""" +@docs_group('Data structures') +@dataclass +class ActorPricingInfo: + pricing_model: PricingModel | None + """The currently effective pricing model""" + + max_total_charge_usd: Decimal + """A configured limit for the total charged amount - if you exceed it, you won't receive more money than this.""" + + is_pay_per_event: bool + """A shortcut - true if the Actor runs with the pay-per-event pricing model""" + + per_event_prices: dict[str, Decimal] + """Price of every known event type""" + + @dataclass class ChargingStateItem: charge_count: int @@ -259,11 +275,3 @@ class ChargingStateItem: class PricingInfoItem: price: Decimal title: str - - -@dataclass -class ActorPricingInfo: - pricing_model: PricingModel | None - max_total_charge_usd: Decimal - is_pay_per_event: bool - per_event_prices: dict[str, Decimal] From f7c56c4268aec0871b101b60791732f09947cc6a Mon Sep 17 00:00:00 2001 From: Jan Buchar Date: Tue, 18 Feb 2025 20:16:33 +0100 Subject: [PATCH 16/24] More robust test result checking --- tests/integration/test_actor_charge.py | 47 +++++++++++++++++++------- 1 file changed, 34 insertions(+), 13 deletions(-) diff --git a/tests/integration/test_actor_charge.py b/tests/integration/test_actor_charge.py index bcd716c3..c6acd54b 100644 --- a/tests/integration/test_actor_charge.py +++ b/tests/integration/test_actor_charge.py @@ -12,6 +12,8 @@ from apify._models import ActorRun if TYPE_CHECKING: + from collections.abc import Iterable + from apify_client import ApifyClientAsync from apify_client.clients import ActorClientAsync @@ -63,6 +65,13 @@ async def ppe_actor( return apify_client_async.actor(ppe_actor_build) +def retry_counter(retry_count: int) -> Iterable[tuple[bool, int]]: + for retry in range(retry_count - 1): + yield False, retry + + yield True, retry_count - 1 + + async def test_actor_charge_basic( ppe_actor: ActorClientAsync, run_actor: RunActorFunction, @@ -70,13 +79,19 @@ async def test_actor_charge_basic( ) -> None: run = await run_actor(ppe_actor) - # Wait until the platform gets its act together and refetch - await asyncio.sleep(6) - updated_run = await apify_client_async.run(run.id).get() - run = ActorRun.model_validate(updated_run) + # Refetch until the platform gets its act together + for is_last_attempt, _ in retry_counter(30): + await asyncio.sleep(1) + updated_run = await apify_client_async.run(run.id).get() + run = ActorRun.model_validate(updated_run) - assert run.status == ActorJobStatus.SUCCEEDED - assert run.charged_event_counts == {'foobar': 4} + try: + assert run.status == ActorJobStatus.SUCCEEDED + assert run.charged_event_counts == {'foobar': 4} + break + except AssertionError: + if is_last_attempt: + raise async def test_actor_charge_limit( @@ -86,10 +101,16 @@ async def test_actor_charge_limit( ) -> None: run = await run_actor(ppe_actor, max_total_charge_usd=Decimal('0.2')) - # Wait until the platform gets its act together and refetch - await asyncio.sleep(6) - updated_run = await apify_client_async.run(run.id).get() - run = ActorRun.model_validate(updated_run) - - assert run.status == ActorJobStatus.SUCCEEDED - assert run.charged_event_counts == {'foobar': 2} + # Refetch until the platform gets its act together + for is_last_attempt, _ in retry_counter(30): + await asyncio.sleep(1) + updated_run = await apify_client_async.run(run.id).get() + run = ActorRun.model_validate(updated_run) + + try: + assert run.status == ActorJobStatus.SUCCEEDED + assert run.charged_event_counts == {'foobar': 2} + break + except AssertionError: + if is_last_attempt: + raise From ebc1687b0c834a71c268c64043e3bdbdcec5d430 Mon Sep 17 00:00:00 2001 From: Jan Buchar Date: Tue, 18 Feb 2025 20:30:35 +0100 Subject: [PATCH 17/24] Clarify docstring --- src/apify/_actor.py | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/src/apify/_actor.py b/src/apify/_actor.py index eef05b80..ceb79052 100644 --- a/src/apify/_actor.py +++ b/src/apify/_actor.py @@ -462,13 +462,14 @@ async def open_request_queue( @overload async def push_data(self, data: dict | list[dict]) -> None: ... @overload - async def push_data(self, data: dict | list[dict], event_name: str) -> ChargeResult: ... - async def push_data(self, data: dict | list[dict], event_name: str | None = None) -> ChargeResult | None: + async def push_data(self, data: dict | list[dict], charged_event_name: str) -> ChargeResult: ... + async def push_data(self, data: dict | list[dict], charged_event_name: str | None = None) -> ChargeResult | None: """Store an object or a list of objects to the default dataset of the current Actor run. Args: data: The data to push to the default dataset. - event_name: If provided, the method will attempt to charge for the event for each pushed item. + charged_event_name: If provided and if the Actor uses the pay-per-event pricing model, + the method will attempt to charge for the event for each pushed item. """ self._raise_if_not_initialized() @@ -478,8 +479,8 @@ async def push_data(self, data: dict | list[dict], event_name: str | None = None data = data if isinstance(data, list) else [data] max_charged_count = ( - self._charging_manager.calculate_max_event_charge_count_within_limit(event_name) - if event_name is not None + self._charging_manager.calculate_max_event_charge_count_within_limit(charged_event_name) + if charged_event_name is not None else None ) @@ -491,9 +492,9 @@ async def push_data(self, data: dict | list[dict], event_name: str | None = None else: await dataset.push_data(data) - if event_name: + if charged_event_name: return await self._charging_manager.charge( - event_name=event_name, + event_name=charged_event_name, count=min(max_charged_count, len(data)) if max_charged_count is not None else len(data), ) From 3b8747553528ce9caa0a3b2fd032890367edf797 Mon Sep 17 00:00:00 2001 From: Jan Buchar Date: Tue, 18 Feb 2025 20:36:40 +0100 Subject: [PATCH 18/24] Missing docstrings and periods --- src/apify/_charging.py | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) diff --git a/src/apify/_charging.py b/src/apify/_charging.py index 109f3f51..9f90b50c 100644 --- a/src/apify/_charging.py +++ b/src/apify/_charging.py @@ -239,30 +239,34 @@ def get_pricing_info(self) -> ActorPricingInfo: @docs_group('Data structures') @dataclass(frozen=True) class ChargeResult: + """Result of the `ChargingManager.charge` method.""" + event_charge_limit_reached: bool - """If true, no more events of this type can be charged within the limit""" + """If true, no more events of this type can be charged within the limit.""" charged_count: int - """Total amount of charged events - may be lower than the requested amount""" + """Total amount of charged events - may be lower than the requested amount.""" chargeable_within_limit: dict[str, int | None] - """How many events of each known type can still be charged within the limit""" + """How many events of each known type can still be charged within the limit.""" @docs_group('Data structures') @dataclass class ActorPricingInfo: + """Result of the `ChargingManager.get_pricing_info` method.""" + pricing_model: PricingModel | None - """The currently effective pricing model""" + """The currently effective pricing model.""" max_total_charge_usd: Decimal """A configured limit for the total charged amount - if you exceed it, you won't receive more money than this.""" is_pay_per_event: bool - """A shortcut - true if the Actor runs with the pay-per-event pricing model""" + """A shortcut - true if the Actor runs with the pay-per-event pricing model.""" per_event_prices: dict[str, Decimal] - """Price of every known event type""" + """Price of every known event type.""" @dataclass From b4807bcb347bb9360155ae178aa3daa806e8c39f Mon Sep 17 00:00:00 2001 From: Jan Buchar Date: Tue, 18 Feb 2025 21:15:13 +0100 Subject: [PATCH 19/24] Add comments --- src/apify/_charging.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/src/apify/_charging.py b/src/apify/_charging.py index 9f90b50c..4a714cd3 100644 --- a/src/apify/_charging.py +++ b/src/apify/_charging.py @@ -55,6 +55,8 @@ async def init(self) -> None: self._charging_state = {} if self._is_at_home: + # Running on the Apify platform - fetch pricing info for the current run. + if self._actor_run_id is None: raise RuntimeError('Actor run ID not found even though the Actor is running on Apify') @@ -82,6 +84,9 @@ async def init(self) -> None: ) if not self._is_at_home and self._pricing_model == 'PAY_PER_EVENT': + # We are not running on the Apify platform, but PPE is enabled for testing - open a dataset that + # will contain a log of all charge calls for debugging purposes. + if self._purge_charging_log_dataset: dataset = await Dataset.open(name=self.LOCAL_CHARGING_LOG_DATASET_NAME) await dataset.drop() From efb6079803fe0fb14a366aa0bfe3b70ab33f89b2 Mon Sep 17 00:00:00 2001 From: Jan Buchar Date: Tue, 18 Feb 2025 21:18:06 +0100 Subject: [PATCH 20/24] Tolerate missing pricingInfos field in tests --- tests/integration/conftest.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/integration/conftest.py b/tests/integration/conftest.py index 8b28661e..1cd800f1 100644 --- a/tests/integration/conftest.py +++ b/tests/integration/conftest.py @@ -353,7 +353,7 @@ async def _make_actor( if (actor := actor_client.get()) is not None: actor_client.update( pricing_infos=[ - *actor['pricingInfos'], + *actor.get('pricingInfos', []), { 'pricingModel': 'FREE', }, From 872f04c8058a0e0d72d8a9a36858c4eb22b408fe Mon Sep 17 00:00:00 2001 From: Jan Buchar Date: Wed, 19 Feb 2025 10:02:00 +0100 Subject: [PATCH 21/24] Rename a parameter --- tests/integration/test_actor_charge.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tests/integration/test_actor_charge.py b/tests/integration/test_actor_charge.py index c6acd54b..d72062bc 100644 --- a/tests/integration/test_actor_charge.py +++ b/tests/integration/test_actor_charge.py @@ -65,11 +65,11 @@ async def ppe_actor( return apify_client_async.actor(ppe_actor_build) -def retry_counter(retry_count: int) -> Iterable[tuple[bool, int]]: - for retry in range(retry_count - 1): +def retry_counter(total_attempts: int) -> Iterable[tuple[bool, int]]: + for retry in range(total_attempts - 1): yield False, retry - yield True, retry_count - 1 + yield True, total_attempts - 1 async def test_actor_charge_basic( From 1ea5ae8ab6a71f950bfa4a05ad106f49ab8eebe8 Mon Sep 17 00:00:00 2001 From: Jan Buchar Date: Wed, 19 Feb 2025 11:22:04 +0100 Subject: [PATCH 22/24] Separate public and private interface of CharginManager --- src/apify/_actor.py | 7 ++- src/apify/_charging.py | 129 ++++++++++++++++++++++++++--------------- src/apify/_utils.py | 2 +- 3 files changed, 87 insertions(+), 51 deletions(-) diff --git a/src/apify/_actor.py b/src/apify/_actor.py index ceb79052..396b2672 100644 --- a/src/apify/_actor.py +++ b/src/apify/_actor.py @@ -24,7 +24,7 @@ EventSystemInfoData, ) -from apify._charging import ChargeResult, ChargingManager +from apify._charging import ChargeResult, ChargingManager, ChargingManagerImplementation from apify._configuration import Configuration from apify._consts import EVENT_LISTENERS_TIMEOUT from apify._crypto import decrypt_input_secrets, load_private_key @@ -98,7 +98,7 @@ def __init__( ) ) - self._charging_manager = ChargingManager(self._configuration, self._apify_client) + self._charging_manager = ChargingManagerImplementation(self._configuration, self._apify_client) self._is_initialized = False @@ -232,7 +232,7 @@ async def init(self) -> None: await self._event_manager.__aenter__() self.log.debug('Event manager initialized') - await self._charging_manager.init() + await self._charging_manager.__aenter__() self.log.debug('Charging manager initialized') self._is_initialized = True @@ -276,6 +276,7 @@ async def finalize() -> None: await self._event_manager.wait_for_all_listeners_to_complete(timeout=event_listeners_timeout) await self._event_manager.__aexit__(None, None, None) + await self._charging_manager.__aexit__(None, None, None) await asyncio.wait_for(finalize(), cleanup_timeout.total_seconds()) self._is_initialized = False diff --git a/src/apify/_charging.py b/src/apify/_charging.py index 4a714cd3..59f54540 100644 --- a/src/apify/_charging.py +++ b/src/apify/_charging.py @@ -4,16 +4,20 @@ from dataclasses import dataclass from datetime import datetime, timezone from decimal import Decimal -from typing import TYPE_CHECKING, Union +from typing import TYPE_CHECKING, Protocol, Union from pydantic import TypeAdapter +from apify_shared.utils import ignore_docs + from apify._models import ActorRun, PricingModel from apify._utils import docs_group from apify.log import logger from apify.storages import Dataset if TYPE_CHECKING: + from types import TracebackType + from apify_client import ApifyClientAsync from apify._configuration import Configuration @@ -22,8 +26,74 @@ run_validator: TypeAdapter[ActorRun | None] = TypeAdapter(Union[ActorRun, None]) -@docs_group('Classes') -class ChargingManager: +@docs_group('Interfaces') +class ChargingManager(Protocol): + """Provides fine-grained access to pay-per-event functionality.""" + + async def charge(self, event_name: str, count: int = 1) -> ChargeResult: + """Charge for a specified number of events - sub-operations of the Actor. + + This is relevant only for the pay-per-event pricing model. + + Args: + event_name: Name of the event to be charged for. + count: Number of events to charge for. + """ + + def calculate_total_charged_amount(self) -> Decimal: + """Calculate the total amount of money charged for pay-per-event events so far.""" + + def calculate_max_event_charge_count_within_limit(self, event_name: str) -> int | None: + """Calculate how many instances of an event can be charged before we reach the configured limit. + + Args: + event_name: Name of the inspected event. + """ + + def get_pricing_info(self) -> ActorPricingInfo: + """Retrieve detailed information about the effective pricing of the current Actor run. + + This can be used for instance when your code needs to support multiple pricing models in transition periods. + """ + + +@docs_group('Data structures') +@dataclass(frozen=True) +class ChargeResult: + """Result of the `ChargingManager.charge` method.""" + + event_charge_limit_reached: bool + """If true, no more events of this type can be charged within the limit.""" + + charged_count: int + """Total amount of charged events - may be lower than the requested amount.""" + + chargeable_within_limit: dict[str, int | None] + """How many events of each known type can still be charged within the limit.""" + + +@docs_group('Data structures') +@dataclass +class ActorPricingInfo: + """Result of the `ChargingManager.get_pricing_info` method.""" + + pricing_model: PricingModel | None + """The currently effective pricing model.""" + + max_total_charge_usd: Decimal + """A configured limit for the total charged amount - if you exceed it, you won't receive more money than this.""" + + is_pay_per_event: bool + """A shortcut - true if the Actor runs with the pay-per-event pricing model.""" + + per_event_prices: dict[str, Decimal] + """Price of every known event type.""" + + +@ignore_docs +class ChargingManagerImplementation(ChargingManager): + """Implementation of the `ChargingManager` Protocol - this is only meant to be instantiated internally.""" + LOCAL_CHARGING_LOG_DATASET_NAME = 'charging_log' def __init__(self, configuration: Configuration, client: ApifyClientAsync) -> None: @@ -50,7 +120,7 @@ def __init__(self, configuration: Configuration, client: ApifyClientAsync) -> No self._not_ppe_warning_printed = False - async def init(self) -> None: + async def __aenter__(self) -> None: """Initialize the charging manager - this is called by the `Actor` class and shouldn't be invoked manually.""" self._charging_state = {} @@ -93,11 +163,15 @@ async def init(self) -> None: self._charging_log_dataset = await Dataset.open(name=self.LOCAL_CHARGING_LOG_DATASET_NAME) - async def charge(self, event_name: str, count: int = 1) -> ChargeResult: - """Charge for a specified number of events - sub-operations of the Actor. + async def __aexit__( + self, + exc_type: type[BaseException] | None, + exc_value: BaseException | None, + exc_traceback: TracebackType | None, + ) -> None: + pass - This is relevant only for the pay-per-event pricing model. - """ + async def charge(self, event_name: str, count: int = 1) -> ChargeResult: if self._charging_state is None: raise RuntimeError('Charging manager is not initialized') @@ -192,7 +266,6 @@ def calculate_chargeable() -> dict[str, int | None]: ) def calculate_total_charged_amount(self) -> Decimal: - """Calculate the total amount of money charged for pay-per-event events so far.""" if self._charging_state is None: raise RuntimeError('Charging manager is not initialized') @@ -202,7 +275,6 @@ def calculate_total_charged_amount(self) -> Decimal: ) def calculate_max_event_charge_count_within_limit(self, event_name: str) -> int | None: - """Calculate how many instances of an event can be charged before we reach the configured limit.""" if self._charging_state is None: raise RuntimeError('Charging manager is not initialized') @@ -222,10 +294,6 @@ def calculate_max_event_charge_count_within_limit(self, event_name: str) -> int return math.floor(result) if result.is_finite() else None def get_pricing_info(self) -> ActorPricingInfo: - """Retrieve detailed information about the effective pricing of the current Actor run. - - This can be used for instance when your code needs to support multiple pricing models in transition periods. - """ if self._charging_state is None: raise RuntimeError('Charging manager is not initialized') @@ -241,39 +309,6 @@ def get_pricing_info(self) -> ActorPricingInfo: ) -@docs_group('Data structures') -@dataclass(frozen=True) -class ChargeResult: - """Result of the `ChargingManager.charge` method.""" - - event_charge_limit_reached: bool - """If true, no more events of this type can be charged within the limit.""" - - charged_count: int - """Total amount of charged events - may be lower than the requested amount.""" - - chargeable_within_limit: dict[str, int | None] - """How many events of each known type can still be charged within the limit.""" - - -@docs_group('Data structures') -@dataclass -class ActorPricingInfo: - """Result of the `ChargingManager.get_pricing_info` method.""" - - pricing_model: PricingModel | None - """The currently effective pricing model.""" - - max_total_charge_usd: Decimal - """A configured limit for the total charged amount - if you exceed it, you won't receive more money than this.""" - - is_pay_per_event: bool - """A shortcut - true if the Actor runs with the pay-per-event pricing model.""" - - per_event_prices: dict[str, Decimal] - """Price of every known event type.""" - - @dataclass class ChargingStateItem: charge_count: int diff --git a/src/apify/_utils.py b/src/apify/_utils.py index 0179c414..c671e2e5 100644 --- a/src/apify/_utils.py +++ b/src/apify/_utils.py @@ -27,7 +27,7 @@ def is_running_in_ipython() -> bool: return getattr(builtins, '__IPYTHON__', False) -GroupName = Literal['Classes', 'Abstract classes', 'Data structures', 'Errors', 'Functions'] +GroupName = Literal['Classes', 'Abstract classes', 'Interfaces', 'Data structures', 'Errors', 'Functions'] def docs_group(group_name: GroupName) -> Callable: # noqa: ARG001 From d023fca1460a08c7635d2894a1a01dbec94ae087 Mon Sep 17 00:00:00 2001 From: Jan Buchar Date: Wed, 19 Feb 2025 11:22:57 +0100 Subject: [PATCH 23/24] Improve docblock --- src/apify/_actor.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/apify/_actor.py b/src/apify/_actor.py index 396b2672..c91f78f4 100644 --- a/src/apify/_actor.py +++ b/src/apify/_actor.py @@ -557,6 +557,10 @@ async def charge(self, event_name: str, count: int = 1) -> ChargeResult: """Charge for a specified number of events - sub-operations of the Actor. This is relevant only for the pay-per-event pricing model. + + Args: + event_name: Name of the event to be charged for. + count: Number of events to charge for. """ self._raise_if_not_initialized() return await self._charging_manager.charge(event_name, count) From a602d4dde21f7a225c6b79768a23c1d68f423a45 Mon Sep 17 00:00:00 2001 From: Jan Buchar Date: Wed, 19 Feb 2025 12:23:07 +0100 Subject: [PATCH 24/24] Utilize @ensure_context --- src/apify/_charging.py | 27 ++++++++++++--------------- 1 file changed, 12 insertions(+), 15 deletions(-) diff --git a/src/apify/_charging.py b/src/apify/_charging.py index 59f54540..2fb9dae5 100644 --- a/src/apify/_charging.py +++ b/src/apify/_charging.py @@ -9,6 +9,7 @@ from pydantic import TypeAdapter from apify_shared.utils import ignore_docs +from crawlee._utils.context import ensure_context from apify._models import ActorRun, PricingModel from apify._utils import docs_group @@ -115,14 +116,15 @@ def __init__(self, configuration: Configuration, client: ApifyClientAsync) -> No self._client = client self._charging_log_dataset: Dataset | None = None - self._charging_state: dict[str, ChargingStateItem] | None = None + self._charging_state: dict[str, ChargingStateItem] = {} self._pricing_info: dict[str, PricingInfoItem] = {} self._not_ppe_warning_printed = False + self.active = False async def __aenter__(self) -> None: """Initialize the charging manager - this is called by the `Actor` class and shouldn't be invoked manually.""" - self._charging_state = {} + self.active = True if self._is_at_home: # Running on the Apify platform - fetch pricing info for the current run. @@ -169,12 +171,13 @@ async def __aexit__( exc_value: BaseException | None, exc_traceback: TracebackType | None, ) -> None: - pass + if not self.active: + raise RuntimeError('Exiting an uninitialized ChargingManager') - async def charge(self, event_name: str, count: int = 1) -> ChargeResult: - if self._charging_state is None: - raise RuntimeError('Charging manager is not initialized') + self.active = False + @ensure_context + async def charge(self, event_name: str, count: int = 1) -> ChargeResult: def calculate_chargeable() -> dict[str, int | None]: """Calculate the maximum number of events of each type that can be charged within the current budget.""" return { @@ -265,19 +268,15 @@ def calculate_chargeable() -> dict[str, int | None]: chargeable_within_limit=calculate_chargeable(), ) + @ensure_context def calculate_total_charged_amount(self) -> Decimal: - if self._charging_state is None: - raise RuntimeError('Charging manager is not initialized') - return sum( (item.total_charged_amount for item in self._charging_state.values()), start=Decimal(), ) + @ensure_context def calculate_max_event_charge_count_within_limit(self, event_name: str) -> int | None: - if self._charging_state is None: - raise RuntimeError('Charging manager is not initialized') - pricing_info = self._pricing_info.get(event_name) if pricing_info is not None: @@ -293,10 +292,8 @@ def calculate_max_event_charge_count_within_limit(self, event_name: str) -> int result = (self._max_total_charge_usd - self.calculate_total_charged_amount()) / price return math.floor(result) if result.is_finite() else None + @ensure_context def get_pricing_info(self) -> ActorPricingInfo: - if self._charging_state is None: - raise RuntimeError('Charging manager is not initialized') - return ActorPricingInfo( pricing_model=self._pricing_model, is_pay_per_event=self._pricing_model == 'PAY_PER_EVENT',