diff --git a/docs/coverage.md b/docs/coverage.md index 1aa4a5d..0c1fca5 100644 --- a/docs/coverage.md +++ b/docs/coverage.md @@ -25,7 +25,10 @@ See which `algorand-python` stubs are implemented by the `algorand-python-testin | Application | Emulated | | subroutine | Emulated | | Global | Emulated | +| op.Box.\* | Emulated | | Box | Emulated | +| BoxRef | Emulated | +| BoxMap | Emulated | | Block | Emulated | | logicsig | Emulated | | log | Emulated | diff --git a/docs/usage.md b/docs/usage.md index ec3b8c6..4356615 100644 --- a/docs/usage.md +++ b/docs/usage.md @@ -232,9 +232,35 @@ To be documented... ### Boxes -To be documented... +The higher-level Boxes interface, introduced in version 2.1.0, along with all low-level Box 'op' calls, are available. -> NOTE: Higher level Boxes interface introduce in v2.1.0 is not supported yet, however all low level Box 'op' calls are available. +```py +import algopy + +# Check and mark the sender's POA claim in the Box by their address +# to prevent duplicates using low-level Box 'op' calls. +_id, has_claimed = algopy.op.Box.get(algopy.Txn.sender.bytes) +assert not has_claimed, "Already claimed POA" +algopy.op.Box.put(algopy.Txn.sender.bytes, algopy.op.itob(minted_asset.id)) + +# Utilizing the higher-level 'Box' interface for an alternative implementation. +box = algopy.Box(algopy.UInt64, key=algopy.Txn.sender.bytes) +has_claimed = bool(box) +assert not has_claimed, "Already claimed POA" +box.value = minted_asset.id + +# Utilizing the higher-level 'BoxRef' interface for an alternative implementation. +box_ref = algopy.BoxRef(key=algopy.Txn.sender.bytes) +has_claimed = bool(box_ref) +assert not has_claimed, "Already claimed POA" +box_ref.put(algopy.op.itob(minted_asset.id)) + +# Utilizing the higher-level 'BoxMap' interface for an alternative implementation. +box_map = algopy.BoxMap(algopy.Bytes, algopy.UInt64, key_prefix="box_map") +has_claimed = algopy.Txn.sender.bytes in self.box_map +assert not has_claimed, "Already claimed POA" +self.box_map[algopy.Txn.sender.bytes] = minted_asset.id +``` ## Smart Signatures diff --git a/examples/box/__init__.py b/examples/box/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/examples/box/contract.py b/examples/box/contract.py new file mode 100644 index 0000000..15f44f6 --- /dev/null +++ b/examples/box/contract.py @@ -0,0 +1,20 @@ +from algopy import ARC4Contract, Box, OnCompleteAction, TransactionType, arc4, op + + +class BoxContract(ARC4Contract): + + def __init__(self) -> None: + self.oca = Box(OnCompleteAction) + self.txn = Box(TransactionType) + + @arc4.abimethod() + def store_enums(self) -> None: + self.oca.value = OnCompleteAction.OptIn + self.txn.value = TransactionType.ApplicationCall + + @arc4.abimethod() + def read_enums(self) -> tuple[OnCompleteAction, TransactionType]: + assert op.Box.get(b"oca")[0] == op.itob(self.oca.value) + assert op.Box.get(b"txn")[0] == op.itob(self.txn.value) + + return self.oca.value, self.txn.value diff --git a/examples/box/test_contract.py b/examples/box/test_contract.py new file mode 100644 index 0000000..c476a43 --- /dev/null +++ b/examples/box/test_contract.py @@ -0,0 +1,26 @@ +from collections.abc import Generator + +import pytest +from algopy import op +from algopy_testing import AlgopyTestContext, algopy_testing_context + +from .contract import BoxContract + + +@pytest.fixture() +def context() -> Generator[AlgopyTestContext, None, None]: + with algopy_testing_context() as ctx: + yield ctx + + +def test_enums(context: AlgopyTestContext) -> None: + # Arrange + contract = BoxContract() + + # Act + contract.store_enums() + oca, txn = contract.read_enums() + + # Assert + assert context.get_box(b"oca") == op.itob(oca) + assert context.get_box(b"txn") == op.itob(txn) diff --git a/examples/proof_of_attendance/contract.py b/examples/proof_of_attendance/contract.py index 101809b..8e7e819 100644 --- a/examples/proof_of_attendance/contract.py +++ b/examples/proof_of_attendance/contract.py @@ -6,6 +6,7 @@ def __init__(self) -> None: self.max_attendees = algopy.UInt64(30) self.asset_url = algopy.String("ipfs://QmW5vERkgeJJtSY1YQdcWU6gsHCZCyLFtM1oT9uyy2WGm8") self.total_attendees = algopy.UInt64(0) + self.box_map = algopy.BoxMap(algopy.Bytes, algopy.UInt64) @algopy.arc4.abimethod(create="require") def init(self, max_attendees: algopy.UInt64) -> None: @@ -24,12 +25,70 @@ def confirm_attendance(self) -> None: algopy.op.Box.put(algopy.Txn.sender.bytes, algopy.op.itob(minted_asset.id)) + @algopy.arc4.abimethod() + def confirm_attendance_with_box(self) -> None: + assert self.total_attendees < self.max_attendees, "Max attendees reached" + + minted_asset = self._mint_poa(algopy.Txn.sender) + self.total_attendees += 1 + + box = algopy.Box(algopy.UInt64, key=algopy.Txn.sender.bytes) + has_claimed = bool(box) + assert not has_claimed, "Already claimed POA" + + box.value = minted_asset.id + + @algopy.arc4.abimethod() + def confirm_attendance_with_box_ref(self) -> None: + assert self.total_attendees < self.max_attendees, "Max attendees reached" + + minted_asset = self._mint_poa(algopy.Txn.sender) + self.total_attendees += 1 + + box_ref = algopy.BoxRef(key=algopy.Txn.sender.bytes) + has_claimed = bool(box_ref) + assert not has_claimed, "Already claimed POA" + + box_ref.put(algopy.op.itob(minted_asset.id)) + + @algopy.arc4.abimethod() + def confirm_attendance_with_box_map(self) -> None: + assert self.total_attendees < self.max_attendees, "Max attendees reached" + + minted_asset = self._mint_poa(algopy.Txn.sender) + self.total_attendees += 1 + + has_claimed = algopy.Txn.sender.bytes in self.box_map + assert not has_claimed, "Already claimed POA" + + self.box_map[algopy.Txn.sender.bytes] = minted_asset.id + @algopy.arc4.abimethod(readonly=True) def get_poa_id(self) -> algopy.UInt64: poa_id, exists = algopy.op.Box.get(algopy.Txn.sender.bytes) assert exists, "POA not found" return algopy.op.btoi(poa_id) + @algopy.arc4.abimethod(readonly=True) + def get_poa_id_with_box(self) -> algopy.UInt64: + box = algopy.Box(algopy.UInt64, key=algopy.Txn.sender.bytes) + poa_id, exists = box.maybe() + assert exists, "POA not found" + return poa_id + + @algopy.arc4.abimethod(readonly=True) + def get_poa_id_with_box_ref(self) -> algopy.UInt64: + box_ref = algopy.BoxRef(key=algopy.Txn.sender.bytes) + poa_id, exists = box_ref.maybe() + assert exists, "POA not found" + return algopy.op.btoi(poa_id) + + @algopy.arc4.abimethod(readonly=True) + def get_poa_id_with_box_map(self) -> algopy.UInt64: + poa_id, exists = self.box_map.maybe(algopy.Txn.sender.bytes) + assert exists, "POA not found" + return poa_id + @algopy.arc4.abimethod() def claim_poa(self, opt_in_txn: algopy.gtxn.AssetTransferTransaction) -> None: poa_id, exists = algopy.op.Box.get(algopy.Txn.sender.bytes) @@ -49,6 +108,65 @@ def claim_poa(self, opt_in_txn: algopy.gtxn.AssetTransferTransaction) -> None: algopy.op.btoi(poa_id), ) + @algopy.arc4.abimethod() + def claim_poa_with_box(self, opt_in_txn: algopy.gtxn.AssetTransferTransaction) -> None: + box = algopy.Box(algopy.UInt64, key=algopy.Txn.sender.bytes) + poa_id, exists = box.maybe() + assert exists, "POA not found, attendance validation failed!" + assert opt_in_txn.xfer_asset.id == poa_id, "POA ID mismatch" + assert opt_in_txn.fee == algopy.UInt64(0), "We got you covered for free!" + assert opt_in_txn.asset_amount == algopy.UInt64(0) + assert ( + opt_in_txn.sender == opt_in_txn.asset_receiver == algopy.Txn.sender + ), "Opt-in transaction sender and receiver must be the same" + assert ( + opt_in_txn.asset_close_to == opt_in_txn.rekey_to == algopy.Global.zero_address + ), "Opt-in transaction close to must be zero address" + + self._send_poa( + algopy.Txn.sender, + poa_id, + ) + + @algopy.arc4.abimethod() + def claim_poa_with_box_ref(self, opt_in_txn: algopy.gtxn.AssetTransferTransaction) -> None: + box_ref = algopy.BoxRef(key=algopy.Txn.sender.bytes) + poa_id, exists = box_ref.maybe() + assert exists, "POA not found, attendance validation failed!" + assert opt_in_txn.xfer_asset.id == algopy.op.btoi(poa_id), "POA ID mismatch" + assert opt_in_txn.fee == algopy.UInt64(0), "We got you covered for free!" + assert opt_in_txn.asset_amount == algopy.UInt64(0) + assert ( + opt_in_txn.sender == opt_in_txn.asset_receiver == algopy.Txn.sender + ), "Opt-in transaction sender and receiver must be the same" + assert ( + opt_in_txn.asset_close_to == opt_in_txn.rekey_to == algopy.Global.zero_address + ), "Opt-in transaction close to must be zero address" + + self._send_poa( + algopy.Txn.sender, + algopy.op.btoi(poa_id), + ) + + @algopy.arc4.abimethod() + def claim_poa_with_box_map(self, opt_in_txn: algopy.gtxn.AssetTransferTransaction) -> None: + poa_id, exists = self.box_map.maybe(algopy.Txn.sender.bytes) + assert exists, "POA not found, attendance validation failed!" + assert opt_in_txn.xfer_asset.id == poa_id, "POA ID mismatch" + assert opt_in_txn.fee == algopy.UInt64(0), "We got you covered for free!" + assert opt_in_txn.asset_amount == algopy.UInt64(0) + assert ( + opt_in_txn.sender == opt_in_txn.asset_receiver == algopy.Txn.sender + ), "Opt-in transaction sender and receiver must be the same" + assert ( + opt_in_txn.asset_close_to == opt_in_txn.rekey_to == algopy.Global.zero_address + ), "Opt-in transaction close to must be zero address" + + self._send_poa( + algopy.Txn.sender, + poa_id, + ) + @algopy.subroutine def _mint_poa(self, claimer: algopy.Account) -> algopy.Asset: algopy.ensure_budget(algopy.UInt64(10000), algopy.OpUpFeeSource.AppAccount) diff --git a/examples/proof_of_attendance/test_contract.py b/examples/proof_of_attendance/test_contract.py index 4b6d11a..3c4de3d 100644 --- a/examples/proof_of_attendance/test_contract.py +++ b/examples/proof_of_attendance/test_contract.py @@ -27,21 +27,46 @@ def test_init(context: AlgopyTestContext) -> None: assert contract.max_attendees == max_attendees +@pytest.mark.parametrize( + ("confirm_attendance", "key_prefix"), + [ + ("confirm_attendance", b""), + ("confirm_attendance_with_box", b""), + ("confirm_attendance_with_box_ref", b""), + ("confirm_attendance_with_box_map", b"box_map"), + ], +) def test_confirm_attendance( context: AlgopyTestContext, + confirm_attendance: str, + key_prefix: bytes, ) -> None: # Arrange contract = ProofOfAttendance() contract.max_attendees = context.any_uint64(1, 100) # Act - contract.confirm_attendance() + confirm = getattr(contract, confirm_attendance) + confirm() # Assert - assert context.get_box(context.default_creator.bytes) == algopy.op.itob(1) - - -def test_claim_poa(context: AlgopyTestContext) -> None: + assert context.get_box(key_prefix + context.default_creator.bytes) == algopy.op.itob(1) + + +@pytest.mark.parametrize( + ("claim_poa", "key_prefix"), + [ + ("claim_poa", b""), + ("claim_poa_with_box", b""), + ("claim_poa_with_box_ref", b""), + ("claim_poa_with_box_map", b"box_map"), + ], +) +def test_claim_poa( + context: AlgopyTestContext, + claim_poa: str, + key_prefix: bytes, +) -> None: # Arrange contract = ProofOfAttendance() dummy_poa = context.any_asset() @@ -54,10 +79,11 @@ def test_claim_poa(context: AlgopyTestContext) -> None: fee=algopy.UInt64(0), asset_amount=algopy.UInt64(0), ) - context.set_box(context.default_creator.bytes, algopy.op.itob(dummy_poa.id)) + context.set_box(key_prefix + context.default_creator.bytes, algopy.op.itob(dummy_poa.id)) # Act - contract.claim_poa(opt_in_txn) + claim = getattr(contract, claim_poa) + claim(opt_in_txn) # Assert axfer_itxn = context.get_submitted_itxn_group(-1).asset_transfer(0) diff --git a/src/algopy/__init__.py b/src/algopy/__init__.py index 8cbde25..f457559 100644 --- a/src/algopy/__init__.py +++ b/src/algopy/__init__.py @@ -10,11 +10,14 @@ GTxn, ITxn, LogicSig, + StateTotals, TemplateVar, Txn, logicsig, + uenumerate, urange, ) +from algopy_testing.models.box import Box, BoxMap, BoxRef from algopy_testing.primitives import BigUInt, Bytes, String, UInt64 from algopy_testing.protocols import BytesBacked from algopy_testing.state import GlobalState, LocalState @@ -55,4 +58,7 @@ "subroutine", "uenumerate", "urange", + "Box", + "BoxRef", + "BoxMap", ] diff --git a/src/algopy_testing/context.py b/src/algopy_testing/context.py index b3b1a68..f45fec7 100644 --- a/src/algopy_testing/context.py +++ b/src/algopy_testing/context.py @@ -285,7 +285,7 @@ def __init__( self._scratch_spaces: dict[str, list[algopy.Bytes | algopy.UInt64 | bytes | int]] = {} self._template_vars: dict[str, Any] = template_vars or {} self._blocks: dict[int, dict[str, int]] = {} - self._boxes: dict[bytes, algopy.Bytes] = {} + self._boxes: dict[bytes, bytes] = {} self._lsigs: dict[algopy.LogicSig, Callable[[], algopy.UInt64 | bool]] = {} self._active_lsig_args: Sequence[algopy.Bytes] = [] @@ -1047,20 +1047,23 @@ def any_transaction( # type: ignore[misc] return new_txn - def get_box(self, name: algopy.Bytes | bytes) -> algopy.Bytes: + def does_box_exist(self, name: algopy.Bytes | bytes) -> bool: + """return true if the box with the given name exists.""" + name_bytes = name if isinstance(name, bytes) else name.value + return name_bytes in self._boxes + + def get_box(self, name: algopy.Bytes | bytes) -> bytes: """Get the content of a box.""" - import algopy name_bytes = name if isinstance(name, bytes) else name.value - return self._boxes.get(name_bytes, algopy.Bytes(b"")) + return self._boxes.get(name_bytes, b"") def set_box(self, name: algopy.Bytes | bytes, content: algopy.Bytes | bytes) -> None: """Set the content of a box.""" - import algopy name_bytes = name if isinstance(name, bytes) else name.value content_bytes = content if isinstance(content, bytes) else content.value - self._boxes[name_bytes] = algopy.Bytes(content_bytes) + self._boxes[name_bytes] = content_bytes def execute_logicsig( self, lsig: algopy.LogicSig, lsig_args: Sequence[algopy.Bytes] | None = None @@ -1071,12 +1074,14 @@ def execute_logicsig( self._lsigs[lsig] = lsig.func return lsig.func() - def clear_box(self, name: algopy.Bytes | bytes) -> None: + def clear_box(self, name: algopy.Bytes | bytes) -> bool: """Clear the content of a box.""" name_bytes = name if isinstance(name, bytes) else name.value if name_bytes in self._boxes: del self._boxes[name_bytes] + return True + return False def clear_all_boxes(self) -> None: """Clear all boxes.""" @@ -1170,7 +1175,6 @@ def reset(self) -> None: self._app_id = iter(range(1, 2**64)) -# _var: ContextVar[AlgopyTestContext] = ContextVar("_var") diff --git a/src/algopy_testing/enums.py b/src/algopy_testing/enums.py index f17a3ba..4b7c5fd 100644 --- a/src/algopy_testing/enums.py +++ b/src/algopy_testing/enums.py @@ -1,22 +1,42 @@ -from enum import Enum, IntEnum, StrEnum +from __future__ import annotations +from enum import Enum, StrEnum -class OnCompleteAction(Enum): - NoOp = 0 - OptIn = 1 - CloseOut = 2 - ClearState = 3 - UpdateApplication = 4 - DeleteApplication = 5 +from algopy_testing.primitives import UInt64 -class TransactionType(IntEnum): - Payment = 0 - KeyRegistration = 1 - AssetConfig = 2 - AssetTransfer = 3 - AssetFreeze = 4 - ApplicationCall = 5 +class OnCompleteAction(UInt64): + NoOp: OnCompleteAction + OptIn: OnCompleteAction + CloseOut: OnCompleteAction + ClearState: OnCompleteAction + UpdateApplication: OnCompleteAction + DeleteApplication: OnCompleteAction + + +OnCompleteAction.NoOp = OnCompleteAction(0) +OnCompleteAction.OptIn = OnCompleteAction(1) +OnCompleteAction.CloseOut = OnCompleteAction(2) +OnCompleteAction.ClearState = OnCompleteAction(3) +OnCompleteAction.UpdateApplication = OnCompleteAction(4) +OnCompleteAction.DeleteApplication = OnCompleteAction(5) + + +class TransactionType(UInt64): + Payment: TransactionType + KeyRegistration: TransactionType + AssetConfig: TransactionType + AssetTransfer: TransactionType + AssetFreeze: TransactionType + ApplicationCall: TransactionType + + +TransactionType.Payment = TransactionType(0) +TransactionType.KeyRegistration = TransactionType(1) +TransactionType.AssetConfig = TransactionType(2) +TransactionType.AssetTransfer = TransactionType(3) +TransactionType.AssetFreeze = TransactionType(4) +TransactionType.ApplicationCall = TransactionType(5) class ECDSA(Enum): diff --git a/src/algopy_testing/models/box.py b/src/algopy_testing/models/box.py index fb05738..fc7e498 100644 --- a/src/algopy_testing/models/box.py +++ b/src/algopy_testing/models/box.py @@ -1,141 +1,258 @@ from __future__ import annotations -from typing import TYPE_CHECKING +import typing +from algopy_testing.constants import MAX_BOX_SIZE from algopy_testing.context import get_test_context +from algopy_testing.utils import as_bytes, as_string -if TYPE_CHECKING: +_TKey = typing.TypeVar("_TKey") +_TValue = typing.TypeVar("_TValue") + +if typing.TYPE_CHECKING: import algopy -class Box: - @staticmethod - def create(a: algopy.Bytes | bytes, b: algopy.UInt64 | int, /) -> bool: +class Box(typing.Generic[_TValue]): + """ + Box abstracts the reading and writing of a single value to a single box. + The box size will be reconfigured dynamically to fit the size of the value being assigned to + it. + """ + + def __init__( + self, type_: type[_TValue], /, *, key: bytes | str | algopy.Bytes | algopy.String = "" + ) -> None: import algopy + self._type = type_ + + self._key = ( + algopy.String(as_string(key)).bytes + if isinstance(key, str | algopy.String) + else algopy.Bytes(as_bytes(key)) + ) + + def __bool__(self) -> bool: + """ + Returns True if the box exists, regardless of the truthiness of the contents + of the box + """ context = get_test_context() - name_bytes = a.value if isinstance(a, algopy.Bytes) else a - size = int(b) - if not name_bytes or size > 32768: - raise ValueError("Invalid box name or size") - if context.get_box(name_bytes): - return False - context.set_box(name_bytes, b"\x00" * size) - return True + return context.does_box_exist(self.key) - @staticmethod - def delete(a: algopy.Bytes | bytes, /) -> bool: - import algopy + @property + def key(self) -> algopy.Bytes: + """Provides access to the raw storage key""" + if not self._key: + raise RuntimeError("Box key is empty") + return self._key + @property + def value(self) -> _TValue: + """Retrieve the contents of the box. Fails if the box has not been created.""" context = get_test_context() - name_bytes = a.value if isinstance(a, algopy.Bytes) else a - if context.get_box(name_bytes): - context.clear_box(name_bytes) - return True - return False + if not context.does_box_exist(self.key): + raise RuntimeError("Box has not been created") + return _cast_to_value_type(self._type, context.get_box(self.key)) - @staticmethod - def extract( - a: algopy.Bytes | bytes, b: algopy.UInt64 | int, c: algopy.UInt64 | int, / - ) -> algopy.Bytes: - import algopy + @value.setter + def value(self, value: _TValue) -> None: + """Write _value_ to the box. Creates the box if it does not exist.""" + context = get_test_context() + bytes_value = _cast_to_bytes(value) + context.set_box(self.key, bytes_value) + @value.deleter + def value(self) -> None: + """Delete the box""" context = get_test_context() - name_bytes = a.value if isinstance(a, algopy.Bytes) else a - start = int(b) - length = int(c) - box_content = context.get_box(name_bytes) - if not box_content: - raise ValueError("Box does not exist") - return box_content[start : start + length] + context.clear_box(self.key) - @staticmethod - def get(a: algopy.Bytes | bytes, /) -> tuple[algopy.Bytes, bool]: - import algopy + def get(self, *, default: _TValue) -> _TValue: + """ + Retrieve the contents of the box, or return the default value if the box has not been + created. + + :arg default: The default value to return if the box has not been created + """ + box_content, box_exists = self.maybe() + return default if not box_exists else box_content + def maybe(self) -> tuple[_TValue, bool]: + """ + Retrieve the contents of the box if it exists, and return a boolean indicating if the box + exists. + + """ context = get_test_context() - name_bytes = a.value if isinstance(a, algopy.Bytes) else a - box_content = context.get_box(name_bytes) - return box_content, bool(box_content) + box_exists = context.does_box_exist(self.key) + box_content_bytes = context.get_box(self.key) + box_content = _cast_to_value_type(self._type, box_content_bytes) + return box_content, box_exists - @staticmethod - def length(a: algopy.Bytes | bytes, /) -> tuple[algopy.UInt64, bool]: + @property + def length(self) -> algopy.UInt64: + """ + Get the length of this Box. Fails if the box does not exist + """ import algopy context = get_test_context() - name_bytes = a.value if isinstance(a, algopy.Bytes) else a - box_content = context.get_box(name_bytes) - return algopy.UInt64(len(box_content)), bool(box_content) + if not context.does_box_exist(self.key): + raise RuntimeError("Box has not been created") + return algopy.UInt64(len(context.get_box(self.key))) + - @staticmethod - def put(a: algopy.Bytes | bytes, b: algopy.Bytes | bytes, /) -> None: +class BoxRef: + """ + BoxRef abstracts the reading and writing of boxes containing raw binary data. The size is + configured manually, and can be set to values larger than what the AVM can handle in a single + value. + """ + + def __init__(self, /, *, key: bytes | str | algopy.Bytes | algopy.String = "") -> None: import algopy + self._key = ( + algopy.String(as_string(key)).bytes + if isinstance(key, str | algopy.String) + else algopy.Bytes(as_bytes(key)) + ) + + def __bool__(self) -> bool: + """Returns True if the box has a value set, regardless of the truthiness of that value""" context = get_test_context() - name_bytes = a.value if isinstance(a, algopy.Bytes) else a - content = b.value if isinstance(b, algopy.Bytes) else b - existing_content = context.get_box(name_bytes) - if existing_content and len(existing_content) != len(content): - raise ValueError("New content length does not match existing box length") - context.set_box(name_bytes, content) + return context.does_box_exist(self.key) - @staticmethod - def replace( - a: algopy.Bytes | bytes, b: algopy.UInt64 | int, c: algopy.Bytes | bytes, / - ) -> None: - import algopy + @property + def key(self) -> algopy.Bytes: + """Provides access to the raw storage key""" + if not self._key: + raise RuntimeError("Box key is empty") + + return self._key + + def create(self, *, size: algopy.UInt64 | int) -> bool: + """ + Creates a box with the specified size, setting all bits to zero. Fails if the box already + exists with a different size. Fails if the specified size is greater than the max box size + (32,768) + + Returns True if the box was created, False if the box already existed + """ + size_int = int(size) + if size_int > MAX_BOX_SIZE: + raise ValueError(f"Box size cannot exceed {MAX_BOX_SIZE}") + + box_content, box_exists = self._maybe() + if box_exists and len(box_content) != size_int: + raise ValueError("Box already exists with a different size") + if box_exists: + return False + context = get_test_context() + context.set_box(self.key, b"\x00" * size_int) + return True + def delete(self) -> bool: + """ + Deletes the box if it exists and returns a value indicating if the box existed + """ context = get_test_context() - name_bytes = a.value if isinstance(a, algopy.Bytes) else a - start = int(b) - new_content = c.value if isinstance(c, algopy.Bytes) else c - box_content = context.get_box(name_bytes) - if not box_content: - raise ValueError("Box does not exist") - if start + len(new_content) > len(box_content): - raise ValueError("Replacement content exceeds box size") - updated_content = ( - box_content[:start] + new_content + box_content[start + len(new_content) :] - ) - context.set_box(name_bytes, updated_content) + return context.clear_box(self.key) - @staticmethod - def resize(a: algopy.Bytes | bytes, b: algopy.UInt64 | int, /) -> None: + def extract( + self, start_index: algopy.UInt64 | int, length: algopy.UInt64 | int + ) -> algopy.Bytes: + """ + Extract a slice of bytes from the box. + + Fails if the box does not exist, or if `start_index + length > len(box)` + + :arg start_index: The offset to start extracting bytes from + :arg length: The number of bytes to extract + """ import algopy + box_content, box_exists = self._maybe() + start_int = int(start_index) + length_int = int(length) + if not box_exists: + raise RuntimeError("Box has not been created") + if (start_int + length_int) > len(box_content): + raise ValueError("Index out of bounds") + result = box_content[start_int : start_int + length_int] + return algopy.Bytes(result) + + def resize(self, new_size: algopy.UInt64 | int) -> None: + """ + Resizes the box the specified `new_size`. Truncating existing data if the new value is + shorter or padding with zero bytes if it is longer. + + :arg new_size: The new size of the box + """ context = get_test_context() - name_bytes = a.value if isinstance(a, algopy.Bytes) else a - new_size = int(b) - if not name_bytes or new_size > 32768: - raise ValueError("Invalid box name or size") - box_content = context.get_box(name_bytes) - if not box_content: - raise ValueError("Box does not exist") - if new_size > len(box_content): - updated_content = box_content + b"\x00" * (new_size - len(box_content)) + new_size_int = int(new_size) + + if new_size_int > MAX_BOX_SIZE: + raise ValueError(f"Box size cannot exceed {MAX_BOX_SIZE}") + box_content, box_exists = self._maybe() + if not box_exists: + raise RuntimeError("Box has not been created") + if new_size_int > len(box_content): + updated_content = box_content + b"\x00" * (new_size_int - len(box_content)) else: - updated_content = box_content[:new_size] - context.set_box(name_bytes, updated_content) + updated_content = box_content[:new_size_int] + context.set_box(self.key, updated_content) + + def replace(self, start_index: algopy.UInt64 | int, value: algopy.Bytes | bytes) -> None: + """ + Write `value` to the box starting at `start_index`. Fails if the box does not exist, + or if `start_index + len(value) > len(box)` + + :arg start_index: The offset to start writing bytes from + :arg value: The bytes to be written + """ + context = get_test_context() + box_content, box_exists = self._maybe() + if not box_exists: + raise RuntimeError("Box has not been created") + start = int(start_index) + length = len(value) + if (start + length) > len(box_content): + raise ValueError("Replacement content exceeds box size") + updated_content = box_content[:start] + value + box_content[start + length :] + context.set_box(self.key, updated_content) - @staticmethod def splice( - a: algopy.Bytes | bytes, - b: algopy.UInt64 | int, - c: algopy.UInt64 | int, - d: algopy.Bytes | bytes, - /, + self, + start_index: algopy.UInt64 | int, + length: algopy.UInt64 | int, + value: algopy.Bytes | bytes, ) -> None: + """ + set box to contain its previous bytes up to index `start_index`, followed by `bytes`, + followed by the original bytes of the box that began at index `start_index + length` + + **Important: This op does not resize the box** + If the new value is longer than the box size, it will be truncated. + If the new value is shorter than the box size, it will be padded with zero bytes + + :arg start_index: The index to start inserting `value` + :arg length: The number of bytes after `start_index` to omit from the new value + :arg value: The `value` to be inserted. + """ import algopy context = get_test_context() - name_bytes = a.value if isinstance(a, algopy.Bytes) else a - start = int(b) - delete_count = int(c) - insert_content = d.value if isinstance(d, algopy.Bytes) else d - box_content = context.get_box(name_bytes) + box_content, box_exists = self._maybe() + + start = int(start_index) + delete_count = int(length) + insert_content = value.value if isinstance(value, algopy.Bytes) else value - if not box_content: - raise ValueError("Box does not exist") + if not box_exists: + raise RuntimeError("Box has not been created") if start > len(box_content): raise ValueError("Start index exceeds box size") @@ -155,4 +272,249 @@ def splice( new_content += b"\x00" * (len(box_content) - len(new_content)) # Update the box with the new content - context.set_box(name_bytes, new_content) + context.set_box(self.key, new_content) + + def get(self, *, default: algopy.Bytes | bytes) -> algopy.Bytes: + """ + Retrieve the contents of the box, or return the default value if the box has not been + created. + + :arg default: The default value to return if the box has not been created + """ + import algopy + + box_content, box_exists = self._maybe() + default_bytes = default if isinstance(default, algopy.Bytes) else algopy.Bytes(default) + return default_bytes if not box_exists else algopy.Bytes(box_content) + + def put(self, value: algopy.Bytes | bytes) -> None: + """ + Replaces the contents of box with value. Fails if box exists and len(box) != len(value). + Creates box if it does not exist + + :arg value: The value to write to the box + """ + import algopy + + box_content, box_exists = self._maybe() + if box_exists and len(box_content) != len(value): + raise ValueError("Box already exists with a different size") + + context = get_test_context() + content = value if isinstance(value, algopy.Bytes) else algopy.Bytes(value) + context.set_box(self.key, content) + + def maybe(self) -> tuple[algopy.Bytes, bool]: + """ + Retrieve the contents of the box if it exists, and return a boolean indicating if the box + exists. + """ + import algopy + + box_content, box_exists = self._maybe() + return algopy.Bytes(box_content), box_exists + + def _maybe(self) -> tuple[bytes, bool]: + context = get_test_context() + box_exists = context.does_box_exist(self.key) + box_content = context.get_box(self.key) + return box_content, box_exists + + @property + def length(self) -> algopy.UInt64: + """ + Get the length of this Box. Fails if the box does not exist + """ + import algopy + + box_content, box_exists = self._maybe() + if not box_exists: + raise RuntimeError("Box has not been created") + return algopy.UInt64(len(box_content)) + + +class BoxMap(typing.Generic[_TKey, _TValue]): + """ + BoxMap abstracts the reading and writing of a set of boxes using a common key and content type. + Each composite key (prefix + key) still needs to be made available to the application via the + `boxes` property of the Transaction. + """ + + def __init__( + self, + key_type: type[_TKey], + value_type: type[_TValue], + /, + *, + key_prefix: bytes | str | algopy.Bytes | algopy.String = "", + ) -> None: + """Declare a box map. + + :arg key_type: The type of the keys + :arg value_type: The type of the values + :arg key_prefix: The value used as a prefix to key data, can be empty. + When the BoxMap is being assigned to a member variable, + this argument is optional and defaults to the member variable name, + and if a custom value is supplied it must be static. + """ + import algopy + + self._key_type = key_type + self._value_type = value_type + self._key_prefix = ( + algopy.String(as_string(key_prefix)).bytes + if isinstance(key_prefix, str | algopy.String) + else algopy.Bytes(as_bytes(key_prefix)) + ) + + @property + def key_prefix(self) -> algopy.Bytes: + """Provides access to the raw storage key-prefix""" + if not self._key_prefix: + raise RuntimeError("Box key prefix is empty") + return self._key_prefix + + def __getitem__(self, key: _TKey) -> _TValue: + """ + Retrieve the contents of a keyed box. Fails if the box for the key has not been created. + """ + box_content, box_exists = self.maybe(key) + if not box_exists: + raise RuntimeError("Box has not been created") + return box_content + + def __setitem__(self, key: _TKey, value: _TValue) -> None: + """Write _value_ to a keyed box. Creates the box if it does not exist""" + context = get_test_context() + key_bytes = self._full_key(key) + bytes_value = _cast_to_bytes(value) + context.set_box(key_bytes, bytes_value) + + def __delitem__(self, key: _TKey) -> None: + """Deletes a keyed box""" + context = get_test_context() + key_bytes = self._full_key(key) + context.clear_box(key_bytes) + + def __contains__(self, key: _TKey) -> bool: + """ + Returns True if a box with the specified key exists in the map, regardless of the + truthiness of the contents of the box + """ + context = get_test_context() + key_bytes = self._full_key(key) + return context.does_box_exist(key_bytes) + + def get(self, key: _TKey, *, default: _TValue) -> _TValue: + """ + Retrieve the contents of a keyed box, or return the default value if the box has not been + created. + + :arg key: The key of the box to get + :arg default: The default value to return if the box has not been created. + """ + box_content, box_exists = self.maybe(key) + return default if not box_exists else box_content + + def maybe(self, key: _TKey) -> tuple[_TValue, bool]: + """ + Retrieve the contents of a keyed box if it exists, and return a boolean indicating if the + box exists. + + :arg key: The key of the box to get + """ + context = get_test_context() + key_bytes = self._full_key(key) + box_exists = context.does_box_exist(key_bytes) + box_content_bytes = context.get_box(key_bytes) + box_content = _cast_to_value_type(self._value_type, box_content_bytes) + return box_content, box_exists + + def length(self, key: _TKey) -> algopy.UInt64: + """ + Get the length of an item in this BoxMap. Fails if the box does not exist + + :arg key: The key of the box to get + """ + import algopy + + context = get_test_context() + key_bytes = self._full_key(key) + box_exists = context.does_box_exist(key_bytes) + if not box_exists: + raise RuntimeError("Box has not been created") + box_content_bytes = context.get_box(key_bytes) + return algopy.UInt64(len(box_content_bytes)) + + def _full_key(self, key: _TKey) -> algopy.Bytes: + return self.key_prefix + _cast_to_bytes(key) + + +def _cast_to_value_type(t: type[_TValue], value: bytes) -> _TValue: # noqa: PLR0911 + """ + assuming _TValue to be one of the followings: + - bool, + - algopy.Bytes, + - algopy.UInt64 + - algopy.Asset, + - algopy.Application, + - algopy.UInt64 enums + - algopy.arc4.Struct + - algopy_testing.BytesBacked + - any type with `from_bytes` class method and `bytes` property + - .e.g algopy.String, algopy.Address, algopy.arc4.DynamicArray etc. + """ + import algopy + + context = get_test_context() + + if t is bool: + return algopy.op.btoi(value) == 1 # type: ignore[return-value] + elif t is algopy.Bytes: + return algopy.Bytes(value) # type: ignore[return-value] + elif t is algopy.UInt64: + return algopy.op.btoi(value) # type: ignore[return-value] + elif t is algopy.OnCompleteAction: + return algopy.OnCompleteAction(algopy.op.btoi(value).value) # type: ignore[return-value] + elif t is algopy.TransactionType: + return algopy.TransactionType(algopy.op.btoi(value).value) # type: ignore[return-value] + elif t is algopy.Asset: + asset_id = algopy.op.btoi(value) + return context.get_asset(asset_id) # type: ignore[return-value] + elif t is algopy.Application: + application_id = algopy.op.btoi(value) + return context.get_application(application_id) # type: ignore[return-value] + elif hasattr(t, "from_bytes"): + return t.from_bytes(value) # type: ignore[attr-defined, no-any-return] + + raise ValueError(f"Unsupported type: {t}") + + +def _cast_to_bytes(value: _TValue) -> algopy.Bytes: + """ + assuming _TValue to be one of the followings: + - bool, + - algopy.Bytes, + - algopy.UInt64 + - algopy.Asset, + - algopy.Application, + - algopy.UInt64 enums + - algopy.arc4.Struct + - algopy_testing.BytesBacked + - any type with `from_bytes` class method and `bytes` property + - .e.g algopy.String, algopy.Address, algopy.arc4.DynamicArray etc. + """ + import algopy + + if isinstance(value, bool): + return algopy.op.itob(1 if value else 0) + elif isinstance(value, algopy.Bytes): + return value + elif isinstance(value, algopy.UInt64): + return algopy.op.itob(value) + elif isinstance(value, algopy.Asset | algopy.Application): + return algopy.op.itob(value.id) + elif hasattr(value, "bytes"): + return typing.cast(algopy.Bytes, value.bytes) + + raise ValueError(f"Unsupported type: {type(value)}") diff --git a/src/algopy_testing/models/contract.py b/src/algopy_testing/models/contract.py index 96717f5..db7c7ec 100644 --- a/src/algopy_testing/models/contract.py +++ b/src/algopy_testing/models/contract.py @@ -69,6 +69,18 @@ def wrapper(*args: Any, **kwargs: dict[str, Any]) -> Any: return wrapper return attr + def __setattr__(self, name: str, value: Any) -> None: + import algopy + + name_bytes = algopy.String(name).bytes + match value: + case algopy.Box() | algopy.BoxRef() | algopy.GlobalState() | algopy.LocalState(): + value._key = name_bytes + case algopy.BoxMap(): + value._key_prefix = name_bytes + + super().__setattr__(name, value) + class ARC4Contract(Contract): @final diff --git a/src/algopy_testing/op.py b/src/algopy_testing/op.py index 2ffde05..4ccf0af 100644 --- a/src/algopy_testing/op.py +++ b/src/algopy_testing/op.py @@ -22,12 +22,13 @@ from algopy_testing.constants import ( BITS_IN_BYTE, DEFAULT_ACCOUNT_MIN_BALANCE, + MAX_BOX_SIZE, MAX_BYTES_SIZE, MAX_UINT64, ) +from algopy_testing.context import get_test_context from algopy_testing.enums import EC, ECDSA, Base64, VrfVerify from algopy_testing.models.block import Block -from algopy_testing.models.box import Box from algopy_testing.models.gitxn import GITxn from algopy_testing.models.global_values import Global from algopy_testing.models.gtxn import GTxn @@ -1023,6 +1024,160 @@ def __getattr__(self, __name: str) -> Any: EllipticCurve = _EllipticCurve() + +class Box: + @staticmethod + def create(a: algopy.Bytes | bytes, b: algopy.UInt64 | int, /) -> bool: + import algopy + + context = get_test_context() + name_bytes = a.value if isinstance(a, algopy.Bytes) else a + size = int(b) + if not name_bytes or size > MAX_BOX_SIZE: + raise ValueError("Invalid box name or size") + if context.get_box(name_bytes): + return False + context.set_box(name_bytes, b"\x00" * size) + return True + + @staticmethod + def delete(a: algopy.Bytes | bytes, /) -> bool: + import algopy + + context = get_test_context() + name_bytes = a.value if isinstance(a, algopy.Bytes) else a + if context.get_box(name_bytes): + context.clear_box(name_bytes) + return True + return False + + @staticmethod + def extract( + a: algopy.Bytes | bytes, b: algopy.UInt64 | int, c: algopy.UInt64 | int, / + ) -> algopy.Bytes: + import algopy + + context = get_test_context() + name_bytes = a.value if isinstance(a, algopy.Bytes) else a + start = int(b) + length = int(c) + box_content = context.get_box(name_bytes) + if not box_content: + raise RuntimeError("Box does not exist") + result = box_content[start : start + length] + return algopy.Bytes(result) + + @staticmethod + def get(a: algopy.Bytes | bytes, /) -> tuple[algopy.Bytes, bool]: + import algopy + + context = get_test_context() + name_bytes = a.value if isinstance(a, algopy.Bytes) else a + box_content = algopy.Bytes(context.get_box(name_bytes)) + box_exists = context.does_box_exist(name_bytes) + return box_content, box_exists + + @staticmethod + def length(a: algopy.Bytes | bytes, /) -> tuple[algopy.UInt64, bool]: + import algopy + + context = get_test_context() + name_bytes = a.value if isinstance(a, algopy.Bytes) else a + box_content = context.get_box(name_bytes) + box_exists = context.does_box_exist(name_bytes) + return algopy.UInt64(len(box_content)), box_exists + + @staticmethod + def put(a: algopy.Bytes | bytes, b: algopy.Bytes | bytes, /) -> None: + import algopy + + context = get_test_context() + name_bytes = a.value if isinstance(a, algopy.Bytes) else a + content = b.value if isinstance(b, algopy.Bytes) else b + existing_content = context.get_box(name_bytes) + if existing_content and len(existing_content) != len(content): + raise ValueError("New content length does not match existing box length") + context.set_box(name_bytes, algopy.Bytes(content)) + + @staticmethod + def replace( + a: algopy.Bytes | bytes, b: algopy.UInt64 | int, c: algopy.Bytes | bytes, / + ) -> None: + import algopy + + context = get_test_context() + name_bytes = a.value if isinstance(a, algopy.Bytes) else a + start = int(b) + new_content = c.value if isinstance(c, algopy.Bytes) else c + box_content = context.get_box(name_bytes) + if not box_content: + raise RuntimeError("Box does not exist") + if start + len(new_content) > len(box_content): + raise ValueError("Replacement content exceeds box size") + updated_content = ( + box_content[:start] + new_content + box_content[start + len(new_content) :] + ) + context.set_box(name_bytes, updated_content) + + @staticmethod + def resize(a: algopy.Bytes | bytes, b: algopy.UInt64 | int, /) -> None: + import algopy + + context = get_test_context() + name_bytes = a.value if isinstance(a, algopy.Bytes) else a + new_size = int(b) + if not name_bytes or new_size > MAX_BOX_SIZE: + raise ValueError("Invalid box name or size") + box_content = context.get_box(name_bytes) + if not box_content: + raise RuntimeError("Box does not exist") + if new_size > len(box_content): + updated_content = box_content + b"\x00" * (new_size - len(box_content)) + else: + updated_content = box_content[:new_size] + context.set_box(name_bytes, updated_content) + + @staticmethod + def splice( + a: algopy.Bytes | bytes, + b: algopy.UInt64 | int, + c: algopy.UInt64 | int, + d: algopy.Bytes | bytes, + /, + ) -> None: + import algopy + + context = get_test_context() + name_bytes = a.value if isinstance(a, algopy.Bytes) else a + start = int(b) + delete_count = int(c) + insert_content = d.value if isinstance(d, algopy.Bytes) else d + box_content = context.get_box(name_bytes) + + if not box_content: + raise RuntimeError("Box does not exist") + + if start > len(box_content): + raise ValueError("Start index exceeds box size") + + # Calculate the end index for deletion + end = min(start + delete_count, len(box_content)) + + # Construct the new content + new_content = box_content[:start] + insert_content + box_content[end:] + + # Adjust the size if necessary + if len(new_content) > len(box_content): + # Truncate if the new content is too long + new_content = new_content[: len(box_content)] + elif len(new_content) < len(box_content): + # Pad with zeros if the new content is too short + new_content += b"\x00" * (len(box_content) - len(new_content)) + + # Update the box with the new content + context.set_box(name_bytes, new_content) + + __all__ = [ "AcctParamsGet", "AppGlobal", diff --git a/src/algopy_testing/state/global_state.py b/src/algopy_testing/state/global_state.py index 060bc91..178dc28 100644 --- a/src/algopy_testing/state/global_state.py +++ b/src/algopy_testing/state/global_state.py @@ -3,6 +3,9 @@ import typing from typing import cast, overload +if typing.TYPE_CHECKING: + import algopy + _T = typing.TypeVar("_T") @@ -35,6 +38,8 @@ def __init__( key: bytes | str = "", description: str = "", ) -> None: + import algopy + if isinstance(type_or_value, type): self.type_ = type_or_value self._value: _T | None = None @@ -42,9 +47,21 @@ def __init__( self.type_ = type(type_or_value) self._value = type_or_value - self.key = key + match key: + case bytes(key): + self._key = algopy.Bytes(key) + case str(key): + self._key = algopy.String(key).bytes + case _: + raise ValueError("Key must be bytes or str") + self.description = description + @property + def key(self) -> algopy.Bytes: + """Provides access to the raw storage key""" + return self._key + @property def value(self) -> _T: if self._value is None: diff --git a/src/algopy_testing/state/local_state.py b/src/algopy_testing/state/local_state.py index a965aa2..67babfd 100644 --- a/src/algopy_testing/state/local_state.py +++ b/src/algopy_testing/state/local_state.py @@ -18,11 +18,24 @@ def __init__( key: bytes | str = "", description: str = "", ) -> None: + import algopy + self.type_ = type_ - self.key = key + match key: + case bytes(key): + self._key = algopy.Bytes(key) + case str(key): + self._key = algopy.String(key).bytes + case _: + raise ValueError("Key must be bytes or str") self.description = description self._state: dict[object, _T] = {} + @property + def key(self) -> algopy.Bytes: + """Provides access to the raw storage key""" + return self._key + def _validate_local_state_key(self, key: algopy.Account | algopy.UInt64 | int) -> None: from algopy import Account, UInt64 diff --git a/src/algopy_testing/utilities/__init__.py b/src/algopy_testing/utilities/__init__.py index 75162e7..88725a2 100644 --- a/src/algopy_testing/utilities/__init__.py +++ b/src/algopy_testing/utilities/__init__.py @@ -1,3 +1,4 @@ from algopy_testing.utilities.budget import OpUpFeeSource, ensure_budget +from algopy_testing.utilities.log import log __all__ = ["OpUpFeeSource", "ensure_budget", "log"] diff --git a/tests/models/test_box.py b/tests/models/test_box.py new file mode 100644 index 0000000..bc97eb1 --- /dev/null +++ b/tests/models/test_box.py @@ -0,0 +1,199 @@ +import typing +from collections.abc import Generator + +import algopy +import pytest +from algopy_testing import arc4 +from algopy_testing.context import AlgopyTestContext, algopy_testing_context +from algopy_testing.models.box import Box +from algopy_testing.primitives.biguint import BigUInt +from algopy_testing.primitives.bytes import Bytes +from algopy_testing.primitives.string import String +from algopy_testing.primitives.uint64 import UInt64 +from algopy_testing.utils import as_bytes, as_string + +BOX_NOT_CREATED_ERROR = "Box has not been created" + + +class ATestContract(algopy.Contract): + def __init__(self) -> None: + self.uint_64_box = algopy.Box(algopy.UInt64) + + +@pytest.fixture() +def context() -> Generator[AlgopyTestContext, None, None]: + with algopy_testing_context() as ctx: + yield ctx + ctx.reset() + + +def test_init_without_key( + context: AlgopyTestContext, # noqa: ARG001 +) -> None: + contract = ATestContract() + assert contract.uint_64_box.key == b"uint_64_box" + + +@pytest.mark.parametrize( + ("value_type", "key"), + [ + (UInt64, "key"), + (Bytes, b"key"), + (String, Bytes(b"key")), + (BigUInt, String("key")), + (arc4.String, "Key"), + (arc4.DynamicArray, b"Key"), + ], +) +def test_init_with_key( + context: AlgopyTestContext, # noqa: ARG001 + value_type: type, + key: bytes | str | Bytes | String, +) -> None: + box = Box(value_type, key=key) # type: ignore[var-annotated] + assert not box + assert len(box.key) > 0 + + key_bytes = ( + String(as_string(key)).bytes if isinstance(key, str | String) else Bytes(as_bytes(key)) + ) + assert box.key == key_bytes + + with pytest.raises(RuntimeError, match=BOX_NOT_CREATED_ERROR): + _ = box.value + + with pytest.raises(RuntimeError, match=BOX_NOT_CREATED_ERROR): + _ = box.length + + +@pytest.mark.parametrize( + ("value_type", "value"), + [ + (UInt64, UInt64(100)), + (Bytes, Bytes(b"Test")), + (String, String("Test")), + (BigUInt, BigUInt(100)), + (arc4.String, arc4.String("Test")), + (arc4.DynamicArray[arc4.UInt64], arc4.DynamicArray(*[arc4.UInt64(100), arc4.UInt64(200)])), + ], +) +def test_value_setter( + context: AlgopyTestContext, # noqa: ARG001 + value_type: type, + value: typing.Any, +) -> None: + key = b"test_key" + box = Box(value_type, key=key) # type: ignore[var-annotated] + box.value = value + + op_box_content, op_box_exists = algopy.op.Box.get(key) + op_box_length, _ = algopy.op.Box.length(key) + assert op_box_exists + assert box.length == op_box_length + + _assert_box_content_equality(value, box.value, op_box_content) + + +@pytest.mark.parametrize( + ("value_type", "value"), + [ + (UInt64, UInt64(100)), + (Bytes, Bytes(b"Test")), + (String, String("Test")), + (BigUInt, BigUInt(100)), + (arc4.String, arc4.String("Test")), + (arc4.DynamicArray[arc4.UInt64], arc4.DynamicArray(*[arc4.UInt64(100), arc4.UInt64(200)])), + ], +) +def test_value_deleter( + context: AlgopyTestContext, # noqa: ARG001 + value_type: type, + value: typing.Any, +) -> None: + key = b"test_key" + box = Box(value_type, key=key) # type: ignore[var-annotated] + box.value = value + + del box.value + assert not box + + with pytest.raises(RuntimeError, match=BOX_NOT_CREATED_ERROR): + _ = box.value + + op_box_content, op_box_exists = algopy.op.Box.get(key) + assert not op_box_exists + assert not op_box_content + + +@pytest.mark.parametrize( + ("value_type", "value"), + [ + (UInt64, UInt64(100)), + (Bytes, Bytes(b"Test")), + (String, String("Test")), + (BigUInt, BigUInt(100)), + (arc4.String, arc4.String("Test")), + (arc4.DynamicArray[arc4.UInt64], arc4.DynamicArray(*[arc4.UInt64(100), arc4.UInt64(200)])), + ], +) +def test_maybe( + context: AlgopyTestContext, # noqa: ARG001 + value_type: type, + value: typing.Any, +) -> None: + key = b"test_key" + + box = Box(value_type, key=key) # type: ignore[var-annotated] + box.value = value + box_content, box_exists = box.maybe() + + op_box_content, op_box_exists = algopy.op.Box.get(key) + + assert box_exists + assert op_box_exists + _assert_box_content_equality(value, box_content, op_box_content) + + +@pytest.mark.parametrize( + ("value_type", "value"), + [ + (UInt64, UInt64(100)), + (Bytes, Bytes(b"Test")), + (String, String("Test")), + (BigUInt, BigUInt(100)), + (arc4.String, arc4.String("Test")), + (arc4.DynamicArray[arc4.UInt64], arc4.DynamicArray(*[arc4.UInt64(100), arc4.UInt64(200)])), + ], +) +def test_maybe_when_box_does_not_exist( + context: AlgopyTestContext, # noqa: ARG001 + value_type: type, + value: typing.Any, +) -> None: + key = b"test_key" + + box = Box(value_type, key=key) # type: ignore[var-annotated] + box.value = value + del box.value + + box_content, box_exists = box.maybe() + assert not box_content + assert not box_exists + + op_box_content, op_box_exists = algopy.op.Box.get(key) + assert not op_box_content + assert not op_box_exists + + +def _assert_box_content_equality( + expected_value: typing.Any, box_content: typing.Any, op_box_content: Bytes +) -> None: + if hasattr(expected_value, "bytes"): + assert box_content.bytes == expected_value.bytes + assert box_content.bytes == op_box_content + elif isinstance(expected_value, UInt64): + assert box_content == expected_value + assert box_content == algopy.op.btoi(op_box_content) + else: + assert box_content == expected_value + assert box_content == op_box_content diff --git a/tests/models/test_box_map.py b/tests/models/test_box_map.py new file mode 100644 index 0000000..9a694fb --- /dev/null +++ b/tests/models/test_box_map.py @@ -0,0 +1,237 @@ +import typing +from collections.abc import Generator + +import algopy +import pytest +from algopy_testing import arc4 +from algopy_testing.context import AlgopyTestContext, algopy_testing_context +from algopy_testing.models.box import BoxMap +from algopy_testing.primitives.biguint import BigUInt +from algopy_testing.primitives.bytes import Bytes +from algopy_testing.primitives.string import String +from algopy_testing.primitives.uint64 import UInt64 +from algopy_testing.utils import as_bytes, as_string + +BOX_NOT_CREATED_ERROR = "Box has not been created" + + +class ATestContract(algopy.Contract): + def __init__(self) -> None: + self.uint_64_box_map = algopy.BoxMap(algopy.UInt64, algopy.Bytes) + + +@pytest.fixture() +def context() -> Generator[AlgopyTestContext, None, None]: + with algopy_testing_context() as ctx: + yield ctx + ctx.reset() + + +def test_init_without_key_prefix( + context: AlgopyTestContext, # noqa: ARG001 +) -> None: + contract = ATestContract() + assert contract.uint_64_box_map.key_prefix == b"uint_64_box_map" + + +@pytest.mark.parametrize( + ("key_type", "value_type", "key_prefix"), + [ + (Bytes, UInt64, "key_prefix"), + (String, Bytes, b"key_prefix"), + (BigUInt, String, Bytes(b"key_prefix")), + (arc4.String, BigUInt, String("key_prefix")), + (UInt64, arc4.String, "key_prefix"), + (String, arc4.DynamicArray, b"key_prefix"), + ], +) +def test_init_with_key_prefix( + context: AlgopyTestContext, # noqa: ARG001 + key_type: type, + value_type: type, + key_prefix: bytes | str | Bytes | String, +) -> None: + box = BoxMap(key_type, value_type, key_prefix=key_prefix) # type: ignore[var-annotated] + assert len(box.key_prefix) > 0 + + key_prefix_bytes = ( + String(as_string(key_prefix)).bytes + if isinstance(key_prefix, str | String) + else Bytes(as_bytes(key_prefix)) + ) + assert box.key_prefix == key_prefix_bytes + + with pytest.raises(RuntimeError, match=BOX_NOT_CREATED_ERROR): + _ = box[key_type()] + + with pytest.raises(RuntimeError, match=BOX_NOT_CREATED_ERROR): + _ = box.length(key_type()) + + +@pytest.mark.parametrize( + ("key_type", "value_type", "key", "value"), + [ + (Bytes, UInt64, Bytes(b"abc"), UInt64(100)), + (String, Bytes, String("def"), Bytes(b"Test")), + (BigUInt, String, BigUInt(123), String("Test")), + (arc4.String, BigUInt, arc4.String("ghi"), BigUInt(100)), + (UInt64, arc4.String, UInt64(456), arc4.String("Test")), + ( + String, + arc4.DynamicArray[arc4.UInt64], + String("jkl"), + arc4.DynamicArray(*[arc4.UInt64(100), arc4.UInt64(200)]), + ), + ], +) +def test_value_setter( + context: AlgopyTestContext, # noqa: ARG001 + key_type: type, + value_type: type, + key: typing.Any, + value: typing.Any, +) -> None: + key_prefix = b"test_key_pefix" + box = BoxMap(key_type, value_type, key_prefix=key_prefix) # type: ignore[var-annotated] + box[key] = value + box_content = box[key] + + full_key = box._full_key(key) + op_box_content, op_box_exists = algopy.op.Box.get(full_key) + op_box_length, _ = algopy.op.Box.length(full_key) + assert op_box_exists + assert box.length(key) == op_box_length + + _assert_box_content_equality(value, box_content, op_box_content) + + +@pytest.mark.parametrize( + ("key_type", "value_type", "key", "value"), + [ + (Bytes, UInt64, Bytes(b"abc"), UInt64(100)), + (String, Bytes, String("def"), Bytes(b"Test")), + (BigUInt, String, BigUInt(123), String("Test")), + (arc4.String, BigUInt, arc4.String("ghi"), BigUInt(100)), + (UInt64, arc4.String, UInt64(456), arc4.String("Test")), + ( + String, + arc4.DynamicArray[arc4.UInt64], + String("jkl"), + arc4.DynamicArray(*[arc4.UInt64(100), arc4.UInt64(200)]), + ), + ], +) +def test_value_deleter( + context: AlgopyTestContext, # noqa: ARG001 + key_type: type, + value_type: type, + key: typing.Any, + value: typing.Any, +) -> None: + key_prefix = b"test_key_pefix" + box = BoxMap(key_type, value_type, key_prefix=key_prefix) # type: ignore[var-annotated] + box[key] = value + + del box[key] + + with pytest.raises(RuntimeError, match=BOX_NOT_CREATED_ERROR): + _ = box[key] + + full_key = box._full_key(key) + + op_box_content, op_box_exists = algopy.op.Box.get(full_key) + assert not op_box_exists + assert not op_box_content + + +@pytest.mark.parametrize( + ("key_type", "value_type", "key", "value"), + [ + (Bytes, UInt64, Bytes(b"abc"), UInt64(100)), + (String, Bytes, String("def"), Bytes(b"Test")), + (BigUInt, String, BigUInt(123), String("Test")), + (arc4.String, BigUInt, arc4.String("ghi"), BigUInt(100)), + (UInt64, arc4.String, UInt64(456), arc4.String("Test")), + ( + String, + arc4.DynamicArray[arc4.UInt64], + String("jkl"), + arc4.DynamicArray(*[arc4.UInt64(100), arc4.UInt64(200)]), + ), + ], +) +def test_maybe( + context: AlgopyTestContext, # noqa: ARG001 + key_type: type, + value_type: type, + key: typing.Any, + value: typing.Any, +) -> None: + key_prefix = b"test_key_pefix" + box = BoxMap(key_type, value_type, key_prefix=key_prefix) # type: ignore[var-annotated] + box[key] = value + + box_content, box_exists = box.maybe(key) + + full_key = box._full_key(key) + op_box_content, op_box_exists = algopy.op.Box.get(full_key) + op_box_length, _ = algopy.op.Box.length(full_key) + assert box_exists + assert op_box_exists + assert box.length(key) == op_box_length + + _assert_box_content_equality(value, box_content, op_box_content) + + +@pytest.mark.parametrize( + ("key_type", "value_type", "key", "value"), + [ + (Bytes, UInt64, Bytes(b"abc"), UInt64(100)), + (String, Bytes, String("def"), Bytes(b"Test")), + (BigUInt, String, BigUInt(123), String("Test")), + (arc4.String, BigUInt, arc4.String("ghi"), BigUInt(100)), + (UInt64, arc4.String, UInt64(456), arc4.String("Test")), + ( + String, + arc4.DynamicArray[arc4.UInt64], + String("jkl"), + arc4.DynamicArray(*[arc4.UInt64(100), arc4.UInt64(200)]), + ), + ], +) +def test_maybe_when_box_does_not_exists( + context: AlgopyTestContext, # noqa: ARG001 + key_type: type, + value_type: type, + key: typing.Any, + value: typing.Any, +) -> None: + key_prefix = b"test_key_pefix" + box = BoxMap(key_type, value_type, key_prefix=key_prefix) # type: ignore[var-annotated] + box[key] = value + + del box[key] + + box_content, box_exists = box.maybe(key) + assert not box_content + assert not box_exists + + full_key = box._full_key(key) + + op_box_content, op_box_exists = algopy.op.Box.get(full_key) + assert not op_box_exists + assert not op_box_content + + +def _assert_box_content_equality( + expected_value: typing.Any, box_content: typing.Any, op_box_content: Bytes +) -> None: + if hasattr(expected_value, "bytes"): + assert box_content.bytes == expected_value.bytes + assert box_content.bytes == op_box_content + elif isinstance(expected_value, UInt64): + assert box_content == expected_value + assert box_content == algopy.op.btoi(op_box_content) + else: + assert box_content == expected_value + assert box_content == op_box_content diff --git a/tests/models/test_box_ref.py b/tests/models/test_box_ref.py new file mode 100644 index 0000000..2684100 --- /dev/null +++ b/tests/models/test_box_ref.py @@ -0,0 +1,412 @@ +from collections.abc import Generator + +import algopy +import pytest +from algopy_testing.constants import MAX_BOX_SIZE, MAX_BYTES_SIZE +from algopy_testing.context import AlgopyTestContext, algopy_testing_context +from algopy_testing.models.box import BoxRef +from algopy_testing.primitives.bytes import Bytes +from algopy_testing.primitives.string import String +from algopy_testing.utils import as_bytes, as_string + +TEST_BOX_KEY = b"test_key" +BOX_NOT_CREATED_ERROR = "Box has not been created" + + +class ATestContract(algopy.Contract): + def __init__(self) -> None: + self.uint_64_box_ref = algopy.BoxRef() + + +@pytest.fixture() +def context() -> Generator[AlgopyTestContext, None, None]: + with algopy_testing_context() as ctx: + yield ctx + ctx.reset() + + +def test_init_without_key( + context: AlgopyTestContext, # noqa: ARG001 +) -> None: + contract = ATestContract() + assert contract.uint_64_box_ref.key == b"uint_64_box_ref" + + +@pytest.mark.parametrize( + "key", + [ + "key", + b"key", + Bytes(b"key"), + String("key"), + ], +) +def test_init_with_key( + context: AlgopyTestContext, # noqa: ARG001 + key: bytes | str | Bytes | String, +) -> None: + box = BoxRef(key=key) + assert not box + assert len(box.key) > 0 + + key_bytes = ( + String(as_string(key)).bytes if isinstance(key, str | String) else Bytes(as_bytes(key)) + ) + assert box.key == key_bytes + + with pytest.raises(RuntimeError, match=BOX_NOT_CREATED_ERROR): + _ = box.length + + +@pytest.mark.parametrize( + "size", + [ + 0, + 1, + 10, + MAX_BOX_SIZE, + ], +) +def test_create( + context: AlgopyTestContext, # noqa: ARG001 + size: int, +) -> None: + box = BoxRef(key=TEST_BOX_KEY) + + box.create(size=size) + + assert box.length == size + + _assert_box_value(box, b"\x00" * size) + + +def test_create_overflow( + context: AlgopyTestContext, # noqa: ARG001 +) -> None: + box = BoxRef(key=TEST_BOX_KEY) + + with pytest.raises(ValueError, match=f"Box size cannot exceed {MAX_BOX_SIZE}"): + box.create(size=MAX_BOX_SIZE + 1) + + +def test_delete( + context: AlgopyTestContext, # noqa: ARG001 +) -> None: + box = BoxRef(key=TEST_BOX_KEY) + + box.create(size=MAX_BOX_SIZE) + assert box.length == MAX_BOX_SIZE + + box_existed = box.delete() + assert box_existed + assert not box + + with pytest.raises(RuntimeError, match=BOX_NOT_CREATED_ERROR): + _ = box.length + + with pytest.raises(RuntimeError, match=BOX_NOT_CREATED_ERROR): + box.resize(10) + + with pytest.raises(RuntimeError, match=BOX_NOT_CREATED_ERROR): + box.replace(0, b"\x11") + + assert not box.delete() + + +@pytest.mark.parametrize( + ("size", "new_size"), + [ + (1, 0), + (10, 1), + (MAX_BYTES_SIZE, 10), + (MAX_BOX_SIZE, MAX_BYTES_SIZE), + ], +) +def test_resize_to_smaller( + context: AlgopyTestContext, # noqa: ARG001 + size: int, + new_size: int, +) -> None: + box = BoxRef(key=TEST_BOX_KEY) + + box.create(size=size) + _initialise_box_value(box, b"\x11" * size) + + box.resize(new_size) + assert box.length == new_size + _assert_box_value(box, b"\x11" * new_size) + + +@pytest.mark.parametrize( + ("size", "new_size"), + [ + (0, 1), + (1, 10), + (10, MAX_BYTES_SIZE), + (MAX_BYTES_SIZE, MAX_BOX_SIZE), + ], +) +def test_resize_to_bigger( + context: AlgopyTestContext, # noqa: ARG001 + size: int, + new_size: int, +) -> None: + box = BoxRef(key=TEST_BOX_KEY) + + box.create(size=size) + _initialise_box_value(box, b"\x11" * size) + + box.resize(new_size) + assert box.length == new_size + + _assert_box_value(box, b"\x00" * new_size, new_size - 1) + + +def test_resize_overflow( + context: AlgopyTestContext, # noqa: ARG001 +) -> None: + box = BoxRef(key=TEST_BOX_KEY) + + box.create(size=10) + + with pytest.raises(ValueError, match=f"Box size cannot exceed {MAX_BOX_SIZE}"): + box.resize(MAX_BOX_SIZE + 1) + + +def test_replace_extract( + context: AlgopyTestContext, # noqa: ARG001 +) -> None: + box = BoxRef(key=TEST_BOX_KEY) + + box.create(size=MAX_BOX_SIZE) + box_value = b"\x01\x02" * int(MAX_BOX_SIZE / 2) + + _initialise_box_value(box, box_value) + + _assert_box_value(box, box_value) + + +def test_replace_when_box_does_not_exists( + context: AlgopyTestContext, # noqa: ARG001 +) -> None: + box = BoxRef(key=TEST_BOX_KEY) + + with pytest.raises(RuntimeError, match=BOX_NOT_CREATED_ERROR): + box.replace(0, b"\x11") + + +@pytest.mark.parametrize( + ("start", "replacement"), + [ + (0, b"\x11" * 11), + (10, b"\x11"), + (9, b"\x11" * 2), + ], +) +def test_replace_overflow( + context: AlgopyTestContext, # noqa: ARG001 + start: int, + replacement: bytes, +) -> None: + box = BoxRef(key=TEST_BOX_KEY) + + box.create(size=10) + + with pytest.raises(ValueError, match="Replacement content exceeds box size"): + box.replace(start, replacement) + + +def test_maybe( + context: AlgopyTestContext, # noqa: ARG001 +) -> None: + key = b"test_key" + + box = BoxRef(key=key) + box.create(size=10) + box_value = b"\x01\x02" * 5 + box.put(box_value) + + box_content, box_exists = box.maybe() + + op_box_content, op_box_exists = algopy.op.Box.get(key) + + assert box_exists + assert op_box_exists + assert box_content == op_box_content + assert box_content == Bytes(box_value) + + +def test_maybe_when_box_does_not_exist( + context: AlgopyTestContext, # noqa: ARG001 +) -> None: + key = b"test_key" + + box = BoxRef(key=key) + box.create(size=10) + box_value = b"\x01\x02" * 5 + box.put(box_value) + box.delete() + + box_content, box_exists = box.maybe() + assert not box_content + assert not box_exists + + op_box_content, op_box_exists = algopy.op.Box.get(key) + assert not op_box_content + assert not op_box_exists + + +@pytest.mark.parametrize( + ("size", "value"), + [ + (0, b""), + (10, b"\x11" * 10), + (MAX_BYTES_SIZE, b"\x11" * MAX_BYTES_SIZE), + ], +) +def test_put_get( + context: AlgopyTestContext, # noqa: ARG001 + size: int, + value: bytes, +) -> None: + key = b"test_key" + + box = BoxRef(key=key) + box.create(size=size) + + box.put(value) + + box_content = box.get(default=b"\x00" * size) + assert box_content == Bytes(value) + + op_box_content, op_box_exists = algopy.op.Box.get(key) + assert op_box_exists + assert box_content == op_box_content + + +def test_put_when_box_does_not_exist( + context: AlgopyTestContext, # noqa: ARG001 +) -> None: + key = b"test_key" + + box = BoxRef(key=key) + box_value = b"\x01\x02" * 5 + box.put(box_value) + + box_content = box.get(default=b"\x00" * 10) + assert box_content == Bytes(box_value) + + +def test_get_when_box_does_not_exist( + context: AlgopyTestContext, # noqa: ARG001 +) -> None: + key = b"test_key" + box = BoxRef(key=key) + + default_value = b"\x00" * 10 + box_content = box.get(default=default_value) + assert box_content == Bytes(default_value) + + +def test_put_get_overflow( + context: AlgopyTestContext, # noqa: ARG001 +) -> None: + key = b"test_key" + + box = BoxRef(key=key) + box.create(size=MAX_BOX_SIZE) + + with pytest.raises(ValueError, match=f"expected value length <= {MAX_BYTES_SIZE}"): + box.put(b"\x11" * MAX_BOX_SIZE) + with pytest.raises(ValueError, match=f"expected value length <= {MAX_BYTES_SIZE}"): + box.get(default=b"\x00" * MAX_BOX_SIZE) + + +def test_splice_when_new_value_is_longer( + context: AlgopyTestContext, # noqa: ARG001 +) -> None: + size = 10 + box = BoxRef(key=TEST_BOX_KEY) + box.create(size=size) + box_value = b"\x01\x02" * 5 + replacement_value = b"\x11" * 2 + box.put(box_value) + + box.splice(1, 1, replacement_value) + box_content = box.get(default=b"\x00" * size) + + op_box_key = b"another_key" + algopy.op.Box.create(op_box_key, size) + algopy.op.Box.put(op_box_key, box_value) + algopy.op.Box.splice(op_box_key, 1, 1, replacement_value) + op_box_content, _ = algopy.op.Box.get(op_box_key) + op_box_length, _ = algopy.op.Box.length(op_box_key) + assert box_content == Bytes(b"\x01" + replacement_value + b"\x01\x02" * 3 + b"\x01") + assert box_content == op_box_content + assert box.length == size + assert box.length == op_box_length + + +def test_splice_when_new_value_is_shorter( + context: AlgopyTestContext, # noqa: ARG001 +) -> None: + size = 10 + box = BoxRef(key=TEST_BOX_KEY) + box.create(size=size) + box_value = b"\x01\x02" * 5 + replacement_value = b"\x11" * 2 + box.put(box_value) + + box.splice(1, 5, replacement_value) + box_content = box.get(default=b"\x00" * size) + + op_box_key = b"another_key" + algopy.op.Box.create(op_box_key, size) + algopy.op.Box.put(op_box_key, box_value) + algopy.op.Box.splice(op_box_key, 1, 5, replacement_value) + op_box_content, _ = algopy.op.Box.get(op_box_key) + op_box_length, _ = algopy.op.Box.length(op_box_key) + assert box_content == Bytes(b"\x01" + replacement_value + b"\x01\x02" * 2 + b"\x00" * 3) + assert box_content == op_box_content + assert box.length == size + assert box.length == op_box_length + + +def test_splice_when_box_does_not_exist( + context: AlgopyTestContext, # noqa: ARG001 +) -> None: + box = BoxRef(key=TEST_BOX_KEY) + + with pytest.raises(RuntimeError, match=BOX_NOT_CREATED_ERROR): + box.splice(0, 1, b"\x11") + + +def test_splice_out_of_bounds( + context: AlgopyTestContext, # noqa: ARG001 +) -> None: + box = BoxRef(key=TEST_BOX_KEY) + box.create(size=10) + + with pytest.raises(ValueError, match="Start index exceeds box size"): + box.splice(11, 1, b"\x11") + + +def _initialise_box_value(box: BoxRef, value: bytes) -> None: + index = 0 + size = len(value) + while index < size: + length = min((size - index), MAX_BYTES_SIZE) + box.replace(index, value[index : index + length]) + index += length + + +def _assert_box_value(box: BoxRef, expected_value: bytes, start: int = 0) -> None: + index = start + size = len(expected_value) + while index < size: + length = min((size - index), MAX_BYTES_SIZE) + box_content = box.extract(index, length) + op_box_content = algopy.op.Box.extract(box.key, index, length) + assert box_content == op_box_content + assert box_content == Bytes(expected_value[index : index + length]) + index += length