diff --git a/pycardano/certificate.py b/pycardano/certificate.py index 84aa04d1..b4717224 100644 --- a/pycardano/certificate.py +++ b/pycardano/certificate.py @@ -4,8 +4,20 @@ from enum import Enum, unique from typing import Optional, Tuple, Type, Union -from pycardano.exception import DeserializeException -from pycardano.hash import AnchorDataHash, PoolKeyHash, ScriptHash, VerificationKeyHash +from pycardano.crypto.bech32 import bech32_decode, convertbits, encode +from pycardano.exception import ( + DecodingException, + DeserializeException, + SerializeException, +) +from pycardano.hash import ( + CIP129_PAYLOAD_SIZE, + VERIFICATION_KEY_HASH_SIZE, + AnchorDataHash, + PoolKeyHash, + ScriptHash, + VerificationKeyHash, +) from pycardano.serialization import ( ArrayCBORSerializable, CodedSerializable, @@ -36,6 +48,8 @@ "RegDRepCert", "UnregDRepCertificate", "UpdateDRepCertificate", + "GovernanceCredential", + "GovernanceKeyType", ] from pycardano.pool_params import PoolParams @@ -92,15 +106,162 @@ def __hash__(self): return hash(self.to_cbor()) +class IdFormat(Enum): + """ + Id format definition. + """ + + CIP129 = "cip129" + CIP105 = "cip105" + + +class CredentialType(Enum): + """ + Credential type definition. + """ + + KEY_HASH = 0b0010 + """Key hash""" + + SCRIPT_HASH = 0b0011 + """Script hash""" + + +class GovernanceKeyType(Enum): + """ + Governance key type definition. + """ + + CC_HOT = 0b0000 + """Committee cold hot key""" + + CC_COLD = 0b0001 + """Committee cold key""" + + DREP = 0b0010 + """DRep key""" + + +@dataclass(repr=False) +class GovernanceCredential(StakeCredential): + """Represents a governance credential.""" + + governance_key_type: GovernanceKeyType = field(init=False) + """Governance key type.""" + + id_format: IdFormat = field(default=IdFormat.CIP129, compare=False) + """Id format.""" + + def __repr__(self): + return f"{self.encode()}" + + def __bytes__(self): + if self.id_format == IdFormat.CIP129: + return self._compute_header_byte() + bytes(self.credential.payload) + else: + return bytes(self.credential.payload) + + @property + def credential_type(self) -> CredentialType: + """Credential type.""" + if isinstance(self.credential, VerificationKeyHash): + return CredentialType.KEY_HASH + else: + return CredentialType.SCRIPT_HASH + + def _compute_header_byte(self) -> bytes: + """Compute the header byte.""" + return ( + self.governance_key_type.value << 4 | self.credential_type.value + ).to_bytes(1, byteorder="big") + + def _compute_hrp(self, id_format: IdFormat = IdFormat.CIP129) -> str: + """Compute human-readable prefix for bech32 encoder. + + Based on + `miscellaneous section `_ + in CIP-5. + """ + prefix = "" + if self.governance_key_type == GovernanceKeyType.CC_HOT: + prefix = "cc_hot" + elif self.governance_key_type == GovernanceKeyType.CC_COLD: + prefix = "cc_cold" + elif self.governance_key_type == GovernanceKeyType.DREP: + prefix = "drep" + + suffix = "" + if isinstance(self.credential, VerificationKeyHash): + suffix = "" + elif isinstance(self.credential, ScriptHash): + suffix = "_script" + + return prefix + suffix if id_format == IdFormat.CIP105 else prefix + + def encode(self) -> str: + """Encode the governance credential in Bech32 format. + + More info about Bech32 `here `_. + + Returns: + str: Encoded governance credential in Bech32 format. + """ + data = bytes(self) + return encode(self._compute_hrp(self.id_format), data) + + @classmethod + def decode(cls: Type[GovernanceCredential], data: str) -> GovernanceCredential: + """Decode a bech32 string into a governance credential object. + + Args: + data (str): Bech32-encoded string. + + Returns: + GovernanceCredential: Decoded governance credential. + + Raises: + DecodingException: When the input string is not a valid governance credential. + """ + hrp, checksum, _ = bech32_decode(data) + value = bytes(convertbits(checksum, 5, 8, False)) + if len(value) == VERIFICATION_KEY_HASH_SIZE: + # CIP-105 + if "script" in hrp: + return cls(credential=ScriptHash(value)) + else: + return cls(credential=VerificationKeyHash(value)) + elif len(value) == CIP129_PAYLOAD_SIZE: + header = value[0] + payload = value[1:] + + key_type = GovernanceKeyType((header & 0xF0) >> 4) + credential_type = CredentialType(header & 0x0F) + + if key_type != cls.governance_key_type: + raise DecodingException(f"Invalid key type: {key_type}") + + if credential_type == CredentialType.KEY_HASH: + return cls(credential=VerificationKeyHash(payload)) + elif credential_type == CredentialType.SCRIPT_HASH: + return cls(credential=ScriptHash(payload)) + else: + raise DecodingException(f"Invalid credential type: {credential_type}") + else: + raise DecodingException(f"Invalid data length: {len(value)}") + + def to_primitive(self): + return [self._CODE, self.credential.to_primitive()] + + @dataclass(repr=False) -class DRepCredential(StakeCredential): +class DRepCredential(GovernanceCredential): """Represents a Delegate Representative (DRep) credential. This credential type is specifically used for DReps in the governance system, - inheriting from StakeCredential. + inheriting from GovernanceCredential. """ - pass + governance_key_type: GovernanceKeyType = GovernanceKeyType.DREP @unique @@ -135,13 +296,26 @@ class DRep(ArrayCBORSerializable): ) """The credential associated with this DRep, if applicable""" + id_format: IdFormat = field(default=IdFormat.CIP129, compare=False) + + def __repr__(self): + return f"{self.encode()}" + + def __bytes__(self): + if self.credential is not None: + drep_credential = DRepCredential( + credential=self.credential, id_format=self.id_format + ) + return bytes(drep_credential) + return b"" + @classmethod @limit_primitive_type(list) def from_primitive(cls: Type[DRep], values: Union[list, tuple]) -> DRep: try: kind = DRepKind(values[0]) - except ValueError: - raise DeserializeException(f"Invalid DRep type {values[0]}") + except ValueError as e: + raise DeserializeException(f"Invalid DRep type {values[0]}") from e if kind == DRepKind.VERIFICATION_KEY_HASH: return cls(kind=kind, credential=VerificationKeyHash(values[1])) @@ -159,6 +333,65 @@ def to_primitive(self): return [self.kind.value, self.credential.to_primitive()] return [self.kind.value] + def encode(self) -> str: + """Encode the DRep in Bech32 format. + + More info about Bech32 `here `_. + + Returns: + str: Encoded DRep in Bech32 format. + + Examples: + >>> vkey_bytes = bytes.fromhex("00000000000000000000000000000000000000000000000000000000") + >>> credential = VerificationKeyHash(vkey_bytes) + >>> print(DRep(kind=DRepKind.VERIFICATION_KEY_HASH, credential=credential).encode()) + drep1ygqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqq7vlc9n + """ + if self.kind == DRepKind.ALWAYS_ABSTAIN: + return "drep_always_abstain" + elif self.kind == DRepKind.ALWAYS_NO_CONFIDENCE: + return "drep_always_no_confidence" + elif self.credential is not None: + drep_credential = DRepCredential( + credential=self.credential, id_format=self.id_format + ) + return drep_credential.encode() + else: + raise SerializeException("DRep credential is None") + + @classmethod + def decode(cls: Type[DRep], data: str) -> DRep: + """Decode a bech32 string into a DRep object. + + Args: + data (str): Bech32-encoded string. + + Returns: + DRep: Decoded DRep. + + Raises: + DecodingException: When the input string is not a valid DRep. + + Examples: + >>> credential = DRep.decode("drep1ygqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqq7vlc9n") + >>> khash = VerificationKeyHash(bytes.fromhex("00000000000000000000000000000000000000000000000000000000")) + >>> assert credential == DRep(DRepKind.VERIFICATION_KEY_HASH, khash) + """ + if data == "drep_always_abstain": + return cls(kind=DRepKind.ALWAYS_ABSTAIN) + elif data == "drep_always_no_confidence": + return cls(kind=DRepKind.ALWAYS_NO_CONFIDENCE) + else: + drep_credential = DRepCredential.decode(data) + return cls( + kind=( + DRepKind.VERIFICATION_KEY_HASH + if isinstance(drep_credential.credential, VerificationKeyHash) + else DRepKind.SCRIPT_HASH + ), + credential=drep_credential.credential, + ) + @dataclass(repr=False) class StakeRegistration(CodedSerializable): diff --git a/pycardano/governance.py b/pycardano/governance.py index ab51646f..c217fc4f 100644 --- a/pycardano/governance.py +++ b/pycardano/governance.py @@ -5,8 +5,13 @@ from fractions import Fraction from typing import Dict, Optional, Tuple, Type, Union -from pycardano.certificate import Anchor, StakeCredential -from pycardano.exception import DeserializeException +from pycardano.certificate import Anchor, GovernanceCredential, GovernanceKeyType +from pycardano.crypto.bech32 import bech32_decode, convertbits, encode +from pycardano.exception import ( + DecodingException, + DeserializeException, + InvalidDataException, +) from pycardano.hash import PolicyHash, ScriptHash, TransactionId, VerificationKeyHash from pycardano.plutus import ExecutionUnits from pycardano.serialization import ( @@ -21,6 +26,7 @@ __all__ = [ "CommitteeColdCredential", + "CommitteeHotCredential", "CommitteeColdCredentialEpochMap", "ParameterChangeAction", "HardForkInitiationAction", @@ -47,10 +53,16 @@ ] -class CommitteeColdCredential(StakeCredential): +class CommitteeColdCredential(GovernanceCredential): """Represents a cold credential for a committee member.""" - pass + governance_key_type: GovernanceKeyType = GovernanceKeyType.CC_COLD + + +class CommitteeHotCredential(GovernanceCredential): + """Represents a hot credential for a committee member.""" + + governance_key_type: GovernanceKeyType = GovernanceKeyType.CC_HOT @unique @@ -89,6 +101,78 @@ def from_primitive( def __hash__(self): return hash((self.transaction_id, self.gov_action_index)) + def __repr__(self): + return f"{self.encode()}" + + def __bytes__(self): + # Convert index to hex (no prefix, lowercase) + idx_hex = f"{self.gov_action_index:x}" + + # Pad to even-length hex + if len(idx_hex) % 2 != 0: + idx_hex = f"0{idx_hex}" + + try: + idx_bytes = bytes.fromhex(idx_hex) + return self.transaction_id.payload + idx_bytes + except ValueError as e: + raise InvalidDataException(f"Error encoding data: {idx_hex}") from e + + def encode(self) -> str: + """Encode the governance action ID in Bech32 format. + + More info about Bech32 `here `_. + + Returns: + str: Encoded pool key hash in Bech32. + + Examples: + >>> transaction_id = TransactionId(bytes.fromhex("00" * 32)) + >>> gov_action_id = GovActionId(transaction_id=transaction_id, gov_action_index=17) + >>> print(gov_action_id.encode()) + gov_action1qqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqpzklpgpf + """ + return encode( + "gov_action", + bytes(self), + ) + + @classmethod + def decode(cls, data: str) -> GovActionId: + """Decode a bech32 string into a governance action ID object. + + Args: + data (str): Bech32-encoded string. + + Returns: + GovActionId: Decoded governance action ID. + + Raises: + DecodingException: When the input string is not a valid governance action ID. + + Examples: + >>> bech32_id = "gov_action1qqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqpzklpgpf" + >>> gov_action_id = GovActionId.decode(bech32_id) + >>> transaction_id = TransactionId(bytes.fromhex("00" * 32)) + >>> assert gov_action_id == GovActionId(transaction_id, 17) + """ + hrp, checksum, _ = bech32_decode(data) + value = bytes(convertbits(checksum, 5, 8, False)) + + if hrp != "gov_action": + raise DecodingException("Invalid GovActionId bech32 string") + + # Transaction ID is always 32 bytes, the rest is the index + if len(value) < 33: + raise DecodingException( + f"Invalid GovActionId length: {len(value)}, expected at least 33 bytes" + ) + + tx_id = TransactionId(value[:32]) + index_bytes = value[32:] + index = int.from_bytes(index_bytes, "big") + return cls(transaction_id=tx_id, gov_action_index=index) + @dataclass(repr=False) class ExUnitPrices(ArrayCBORSerializable): diff --git a/pycardano/hash.py b/pycardano/hash.py index 793c982e..e1bcfe4f 100644 --- a/pycardano/hash.py +++ b/pycardano/hash.py @@ -15,6 +15,7 @@ "VRF_KEY_HASH_SIZE", "POOL_METADATA_HASH_SIZE", "REWARD_ACCOUNT_HASH_SIZE", + "CIP129_PAYLOAD_SIZE", "ConstrainedBytes", "VerificationKeyHash", "ScriptHash", @@ -42,6 +43,7 @@ VRF_KEY_HASH_SIZE = 32 REWARD_ACCOUNT_HASH_SIZE = 29 ANCHOR_DATA_HASH_SIZE = 32 +CIP129_PAYLOAD_SIZE = 29 T = TypeVar("T", bound="ConstrainedBytes") diff --git a/pycardano/pool_params.py b/pycardano/pool_params.py index 418b9a72..c2f48c33 100644 --- a/pycardano/pool_params.py +++ b/pycardano/pool_params.py @@ -9,8 +9,8 @@ from fractions import Fraction from typing import List, Optional, Type, Union -from pycardano.crypto.bech32 import bech32_decode -from pycardano.exception import DeserializeException +from pycardano.crypto.bech32 import bech32_decode, decode, encode +from pycardano.exception import DecodingException, DeserializeException from pycardano.hash import ( PoolKeyHash, PoolMetadataHash, @@ -28,6 +28,7 @@ __all__ = [ "PoolId", "PoolMetadata", + "PoolOperator", "PoolParams", "Relay", "SingleHostAddr", @@ -247,3 +248,75 @@ class PoolParams(ArrayCBORSerializable): relays: Optional[List[Relay]] = None pool_metadata: Optional[PoolMetadata] = None id: Optional[PoolId] = field(default=None, metadata={"optional": True}) + + +@dataclass(repr=False) +class PoolOperator(CBORSerializable): + pool_key_hash: PoolKeyHash + + def __init__(self, pool_key_hash: PoolKeyHash): + self.pool_key_hash = pool_key_hash + + def __repr__(self): + return f"{self.encode()}" + + def __bytes__(self): + return self.pool_key_hash.payload + + def encode(self) -> str: + """Encode the pool key hash in Bech32 format. + + More info about Bech32 `here `_. + + Returns: + str: Encoded pool key hash in Bech32. + + Examples: + >>> pool_key_hash = PoolKeyHash(bytes.fromhex("cc30497f4ff962f4c1dca54cceefe39f86f1d7179668009f8eb71e59")) + >>> pool_operator = PoolOperator(pool_key_hash=pool_key_hash) + >>> print(pool_operator.encode()) + pool1escyjl60l930fswu54xvamlrn7r0r4chje5qp8uwku09j7x68x6 + """ + return encode("pool", self.pool_key_hash.payload) + + @classmethod + def decode(cls, data: str) -> PoolOperator: + """Decode a bech32 string into a pool operator object. + + Args: + data (str): Bech32-encoded string. + + Returns: + PoolOperator: Decoded pool operator. + + Raises: + DecodingException: When the input string is not a valid Shelley address. + + Examples: + >>> pool_operator = PoolOperator.decode("pool1escyjl60l930fswu54xvamlrn7r0r4chje5qp8uwku09j7x68x6") + >>> pool_key_hash = PoolKeyHash(bytes.fromhex("cc30497f4ff962f4c1dca54cceefe39f86f1d7179668009f8eb71e59")) + >>> assert pool_operator == PoolOperator(pool_key_hash) + """ + return cls.from_primitive(data) + + def to_shallow_primitive(self) -> bytes: + return self.pool_key_hash.to_primitive() + + @classmethod + @limit_primitive_type(bytes, str) + def from_primitive( + cls: Type[PoolOperator], value: Union[bytes, str] + ) -> PoolOperator: + # Convert string to bytes + if isinstance(value, str): + # Check if bech32 poolid + if value.startswith("pool"): + value = bytes(decode(value)) + else: + try: + value = bytes.fromhex(value) + except Exception as e: + raise DecodingException( + f"Failed to decode pool id string: {e}" + ) from e + return cls(PoolKeyHash.from_primitive(value)) diff --git a/test/pycardano/test_certificate.py b/test/pycardano/test_certificate.py index 322b5af1..bf904c97 100644 --- a/test/pycardano/test_certificate.py +++ b/test/pycardano/test_certificate.py @@ -10,6 +10,7 @@ DRep, DRepCredential, DRepKind, + IdFormat, PoolRegistration, PoolRetirement, ResignCommitteeColdCertificate, @@ -188,6 +189,145 @@ def test_anchor(): assert Anchor.from_cbor(anchor_cbor_hex) == anchor +def test_drep_decode(): + staking_key = StakeSigningKey.from_cbor( + "5820ff3a330df8859e4e5f42a97fcaee73f6a00d0cf864f4bca902bd106d423f02c0" + ) + vkey_hash = staking_key.to_verification_key().hash() + drep = DRep(kind=DRepKind.VERIFICATION_KEY_HASH, credential=vkey_hash) + + drep.id_format = IdFormat.CIP105 + drep1 = DRep.decode(str(drep)) + drep.id_format = IdFormat.CIP129 + drep2 = DRep.decode(str(drep)) + + drep_id_cip105 = "drep1fq529kkm4972nlgvmjvewkyeguxzrx7upkpge7ndmakkjnstaxx" + drep_id_cip129 = "drep1yfyz3gk6mw5he20apnwfn96cn9rscgvmmsxc9r86dh0k66gr0rurp" + drep_id_hex = "4828a2dadba97ca9fd0cdc99975899470c219bdc0d828cfa6ddf6d69" + + drep1.id_format = IdFormat.CIP105 + assert str(drep1) == drep_id_cip105 + drep1.id_format = IdFormat.CIP129 + assert str(drep1) == drep_id_cip129 + + drep2.id_format = IdFormat.CIP105 + assert str(drep2) == drep_id_cip105 + drep2.id_format = IdFormat.CIP129 + assert str(drep2) == drep_id_cip129 + + drep1.id_format = IdFormat.CIP105 + assert bytes(drep1).hex() == drep_id_hex + drep2.id_format = IdFormat.CIP105 + assert bytes(drep2).hex() == drep_id_hex + + assert drep == drep1 + assert drep == drep2 + assert drep1 == drep2 + + +@pytest.mark.parametrize( + "drep_kind,credential,expected_cip105,expected_cip129,expected_hex,test_id", + [ + # Happy path: VERIFICATION_KEY_HASH + ( + DRepKind.VERIFICATION_KEY_HASH, + VerificationKeyHash( + bytes.fromhex( + "4828a2dadba97ca9fd0cdc99975899470c219bdc0d828cfa6ddf6d69" + ) + ), + "drep1fq529kkm4972nlgvmjvewkyeguxzrx7upkpge7ndmakkjnstaxx", + "drep1yfyz3gk6mw5he20apnwfn96cn9rscgvmmsxc9r86dh0k66gr0rurp", + "4828a2dadba97ca9fd0cdc99975899470c219bdc0d828cfa6ddf6d69", + "verification_key_hash", + ), + # Happy path: SCRIPT_HASH + ( + DRepKind.SCRIPT_HASH, + ScriptHash(b"1" * SCRIPT_HASH_SIZE), + "drep_script1xycnzvf3xycnzvf3xycnzvf3xycnzvf3xycnzvf3xycnzarvrfk", + "drep1yvcnzvf3xycnzvf3xycnzvf3xycnzvf3xycnzvf3xycnzvg4m7mek", + "31313131313131313131313131313131313131313131313131313131", + "script_hash", + ), + # Happy path: ALWAYS_ABSTAIN + ( + DRepKind.ALWAYS_ABSTAIN, + None, + "drep_always_abstain", + "drep_always_abstain", + "", + "always_abstain", + ), + # Happy path: ALWAYS_NO_CONFIDENCE + ( + DRepKind.ALWAYS_NO_CONFIDENCE, + None, + "drep_always_no_confidence", + "drep_always_no_confidence", + "", + "always_no_confidence", + ), + ], + ids=lambda p: p if isinstance(p, str) else None, +) +def test_drep_encode_decode_parametrized( + drep_kind, credential, expected_cip105, expected_cip129, expected_hex, test_id +): + """Parametrized test for DRep encode/decode functionality. + + Tests all DRep kinds with both CIP105 and CIP129 encoding formats. + """ + # Arrange + drep = DRep(kind=drep_kind, credential=credential) + + # Act - Encode + drep.id_format = IdFormat.CIP105 + encoded_cip105 = str(drep) + encoded_hex = bytes(drep).hex() + + drep.id_format = IdFormat.CIP129 + encoded_cip129 = str(drep) + + # Assert - Encoding + assert encoded_cip105 == expected_cip105 + assert encoded_cip129 == expected_cip129 + assert encoded_hex == expected_hex + + # Act - Decode and verify round-trip + if drep_kind in (DRepKind.ALWAYS_ABSTAIN, DRepKind.ALWAYS_NO_CONFIDENCE): + # Special DReps only have one encoding format + decoded = DRep.decode(encoded_cip105) + assert decoded == drep + assert decoded.kind == drep_kind + assert decoded.credential is None + else: + # Decode from CIP105 format + decoded_cip105 = DRep.decode(expected_cip105) + assert decoded_cip105 == drep + assert decoded_cip105.kind == drep_kind + assert decoded_cip105.credential == credential + decoded_cip105.id_format = IdFormat.CIP105 + assert str(decoded_cip105) == expected_cip105 + assert bytes(decoded_cip105).hex() == expected_hex + decoded_cip105.id_format = IdFormat.CIP129 + assert str(decoded_cip105) == expected_cip129 + + # Decode from CIP129 format + decoded_cip129 = DRep.decode(expected_cip129) + assert decoded_cip129 == drep + assert decoded_cip129.kind == drep_kind + assert decoded_cip129.credential == credential + decoded_cip129.id_format = IdFormat.CIP105 + assert str(decoded_cip129) == expected_cip105 + assert bytes(decoded_cip129).hex() == expected_hex + decoded_cip129.id_format = IdFormat.CIP129 + assert str(decoded_cip129) == expected_cip129 + + # Verify both decoded objects are equal + assert decoded_cip105 == decoded_cip129 + + def test_drep_credential(): staking_key = StakeSigningKey.from_cbor( "5820ff3a330df8859e4e5f42a97fcaee73f6a00d0cf864f4bca902bd106d423f02c0" @@ -280,7 +420,6 @@ def test_drep_credential(): def test_drep_from_primitive( input_values, expected_kind, expected_credential, expected_exception, case_id ): - # Arrange # (All input values are provided via test parameters) diff --git a/test/pycardano/test_governance.py b/test/pycardano/test_governance.py index 0e883995..2f9b933f 100644 --- a/test/pycardano/test_governance.py +++ b/test/pycardano/test_governance.py @@ -3,11 +3,12 @@ import pytest from pycardano.address import Address -from pycardano.certificate import Anchor, StakeCredential +from pycardano.certificate import Anchor, IdFormat, StakeCredential from pycardano.exception import DeserializeException from pycardano.governance import ( CommitteeColdCredential, CommitteeColdCredentialEpochMap, + CommitteeHotCredential, ExUnitPrices, GovActionId, GovActionIdToVotingProcedure, @@ -69,6 +70,87 @@ def test_gov_action_id_serialization(self): assert deserialized == gov_action_id + @pytest.mark.parametrize( + "tx_id_hex,gov_action_index,expected_bech32,expected_hex,test_id", + [ + # Test case 1: Index 17 (single byte) + ( + "0000000000000000000000000000000000000000000000000000000000000000", + 17, + "gov_action1qqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqpzklpgpf", + "000000000000000000000000000000000000000000000000000000000000000011", + "index_17", + ), + # Test case 2: Index 0 (single byte) + ( + "1111111111111111111111111111111111111111111111111111111111111111", + 0, + "gov_action1zyg3zyg3zyg3zyg3zyg3zyg3zyg3zyg3zyg3zyg3zyg3zyg3zygsq6dmejn", + "111111111111111111111111111111111111111111111111111111111111111100", + "index_0", + ), + # Test case 3: Index 255 (max single byte) + ( + "0000000000000000000000000000000000000000000000000000000000000000", + 255, + "gov_action1qqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqq076z8mus", + "0000000000000000000000000000000000000000000000000000000000000000ff", + "index_255", + ), + # Test case 4: Index 256 (requires 2 bytes) + ( + "0000000000000000000000000000000000000000000000000000000000000000", + 256, + "gov_action1qqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqzqqhppxxg", + "00000000000000000000000000000000000000000000000000000000000000000100", + "index_256", + ), + # Test case 5: Index 1000 (requires 2 bytes) + ( + "0000000000000000000000000000000000000000000000000000000000000000", + 1000, + "gov_action1qqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqq86qxn7ng9", + "000000000000000000000000000000000000000000000000000000000000000003e8", + "index_1000", + ), + # Test case 6: Index 65535 (max value, 2 bytes) + ( + "0000000000000000000000000000000000000000000000000000000000000000", + 65535, + "gov_action1qqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqq0llc8mzc0y", + "0000000000000000000000000000000000000000000000000000000000000000ffff", + "index_65535", + ), + ], + ids=lambda p: p if isinstance(p, str) and p.startswith("index_") else None, + ) + def test_gov_action_id_decode( + self, tx_id_hex, gov_action_index, expected_bech32, expected_hex, test_id + ): + """Parametrized test for GovActionId encode/decode functionality. + + Tests various indices including single-byte (0-255) and two-byte (256-65535) encodings. + """ + # Arrange + transaction_id = TransactionId(bytes.fromhex(tx_id_hex)) + gov_action_id = GovActionId( + transaction_id=transaction_id, gov_action_index=gov_action_index + ) + + # Act - Decode from bech32 + decoded_gov_action_id = GovActionId.decode(expected_bech32) + + # Assert - Equality + assert gov_action_id == decoded_gov_action_id + + # Assert - Encoding matches expected + assert str(gov_action_id) == expected_bech32 + assert bytes(gov_action_id).hex() == expected_hex + + # Assert - Decoded values match original + assert decoded_gov_action_id.transaction_id == transaction_id + assert decoded_gov_action_id.gov_action_index == gov_action_index + class TestVote: def test_vote_values(self): @@ -529,3 +611,33 @@ def test_proposal_procedure_serialization(self): assert deserialized.deposit == procedure.deposit assert deserialized.reward_account == procedure.reward_account assert deserialized.anchor == procedure.anchor + + +class TestCommitteeCredentialEpochMap: + def test_committee_cold_credential(self): + key_hash = "00000000000000000000000000000000000000000000000000000000" + identifier = "1300000000000000000000000000000000000000000000000000000000" + bech32 = "cc_cold1zvqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqq6kflvs" + + credential = CommitteeColdCredential.decode(bech32) + + assert str(credential) == bech32 + + credential.id_format = IdFormat.CIP105 + assert bytes(credential).hex() == key_hash + credential.id_format = IdFormat.CIP129 + assert bytes(credential).hex() == identifier + + def test_committee_hot_credential(self): + key_hash = "00000000000000000000000000000000000000000000000000000000" + identifier = "0200000000000000000000000000000000000000000000000000000000" + bech32 = "cc_hot1qgqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqvcdjk7" + + credential = CommitteeHotCredential.decode(bech32) + + assert str(credential) == bech32 + + credential.id_format = IdFormat.CIP105 + assert bytes(credential).hex() == key_hash + credential.id_format = IdFormat.CIP129 + assert bytes(credential).hex() == identifier diff --git a/test/pycardano/test_pool_params.py b/test/pycardano/test_pool_params.py index 70d797ec..27340a0b 100644 --- a/test/pycardano/test_pool_params.py +++ b/test/pycardano/test_pool_params.py @@ -20,6 +20,7 @@ MultiHostName, PoolId, PoolMetadata, + PoolOperator, PoolParams, SingleHostAddr, SingleHostName, @@ -252,3 +253,186 @@ def test_pool_params( ] assert PoolParams.from_primitive(primitive_values).to_primitive() == primitive_out + + +# PoolOperator Tests +def test_pool_operator_initialization(): + """Test PoolOperator can be initialized with a PoolKeyHash.""" + # Arrange + pool_key_hash = PoolKeyHash(b"1" * POOL_KEY_HASH_SIZE) + + # Act + pool_operator = PoolOperator(pool_key_hash) + + # Assert + assert pool_operator.pool_key_hash == pool_key_hash + + +def test_pool_operator_encode(): + """Test PoolOperator can encode to bech32 format.""" + # Arrange + pool_key_hash_hex = "dacf06a23e4aaf119024e63deb79861ca175b24e7d44d97fb92b1a22" + pool_key_hash = PoolKeyHash(bytes.fromhex(pool_key_hash_hex)) + pool_operator = PoolOperator(pool_key_hash) + + # Act + encoded = pool_operator.encode() + + # Assert + assert encoded.startswith("pool") + assert isinstance(encoded, str) + assert encoded == TEST_POOL_ID + + +def test_pool_operator_decode(): + """Test PoolOperator can decode from bech32 format.""" + # Act + pool_operator = PoolOperator.decode(TEST_POOL_ID) + + # Assert + assert isinstance(pool_operator, PoolOperator) + assert pool_operator.encode() == TEST_POOL_ID + + +def test_pool_operator_id_property(): + """Test PoolOperator.id property returns a PoolId.""" + # Arrange + pool_key_hash_hex = "dacf06a23e4aaf119024e63deb79861ca175b24e7d44d97fb92b1a22" + pool_key_hash = PoolKeyHash(bytes.fromhex(pool_key_hash_hex)) + pool_operator = PoolOperator(pool_key_hash) + + # Act + pool_id = str(pool_operator) + + # Assert + assert pool_id == TEST_POOL_ID + + +def test_pool_operator_id_hex_property(): + """Test PoolOperator.id_hex property returns hex string.""" + # Arrange + pool_key_hash_hex = "dacf06a23e4aaf119024e63deb79861ca175b24e7d44d97fb92b1a22" + pool_key_hash = PoolKeyHash(bytes.fromhex(pool_key_hash_hex)) + pool_operator = PoolOperator(pool_key_hash) + + # Act + id_hex = bytes(pool_operator).hex() + + # Assert + assert id_hex == pool_key_hash_hex + + +def test_pool_operator_repr(): + """Test PoolOperator __repr__ returns encoded string.""" + # Arrange + pool_key_hash_hex = "dacf06a23e4aaf119024e63deb79861ca175b24e7d44d97fb92b1a22" + pool_key_hash = PoolKeyHash(bytes.fromhex(pool_key_hash_hex)) + pool_operator = PoolOperator(pool_key_hash) + + # Act + repr_str = repr(pool_operator) + + # Assert + assert repr_str == TEST_POOL_ID + + +def test_pool_operator_bytes(): + """Test PoolOperator __bytes__ returns pool key hash payload.""" + # Arrange + pool_key_hash_hex = "dacf06a23e4aaf119024e63deb79861ca175b24e7d44d97fb92b1a22" + pool_key_hash = PoolKeyHash(bytes.fromhex(pool_key_hash_hex)) + pool_operator = PoolOperator(pool_key_hash) + + # Act + bytes_result = bytes(pool_operator) + + # Assert + assert isinstance(bytes_result, bytes) + assert bytes_result == bytes.fromhex(pool_key_hash_hex) + + +def test_pool_operator_to_primitive(): + """Test PoolOperator.to_primitive returns bytes.""" + # Arrange + pool_key_hash_hex = "dacf06a23e4aaf119024e63deb79861ca175b24e7d44d97fb92b1a22" + pool_key_hash = PoolKeyHash(bytes.fromhex(pool_key_hash_hex)) + pool_operator = PoolOperator(pool_key_hash) + + # Act + primitive = pool_operator.to_primitive() + + # Assert + assert isinstance(primitive, bytes) + assert primitive == bytes.fromhex(pool_key_hash_hex) + + +@pytest.mark.parametrize( + "input_value, description", + [ + (TEST_POOL_ID, "bech32 pool id"), + ("dacf06a23e4aaf119024e63deb79861ca175b24e7d44d97fb92b1a22", "hex string"), + ( + bytes.fromhex("dacf06a23e4aaf119024e63deb79861ca175b24e7d44d97fb92b1a22"), + "bytes", + ), + ], +) +def test_pool_operator_from_primitive(input_value, description): + """Test PoolOperator.from_primitive handles different input formats.""" + # Act + pool_operator = PoolOperator.from_primitive(input_value) + + # Assert + assert isinstance(pool_operator, PoolOperator) + assert ( + bytes(pool_operator).hex() + == "dacf06a23e4aaf119024e63deb79861ca175b24e7d44d97fb92b1a22" + ) + + +def test_pool_operator_roundtrip_bech32(): + """Test PoolOperator can encode and decode maintaining consistency.""" + # Arrange + pool_key_hash = PoolKeyHash(b"1" * POOL_KEY_HASH_SIZE) + original_operator = PoolOperator(pool_key_hash) + + # Act + encoded = original_operator.encode() + decoded_operator = PoolOperator.decode(encoded) + + # Assert + assert decoded_operator.pool_key_hash == original_operator.pool_key_hash + assert decoded_operator.encode() == encoded + + +def test_pool_operator_roundtrip_primitive(): + """Test PoolOperator serialization roundtrip.""" + # Arrange + pool_key_hash = PoolKeyHash(b"2" * POOL_KEY_HASH_SIZE) + original_operator = PoolOperator(pool_key_hash) + + # Act + primitive = original_operator.to_primitive() + restored_operator = PoolOperator.from_primitive(primitive) + + # Assert + assert restored_operator.pool_key_hash == original_operator.pool_key_hash + assert restored_operator.to_primitive() == primitive + + +@pytest.mark.parametrize( + "invalid_input, expected_exception", + [ + ("invalid_pool_id", Exception), # Invalid hex string + ( + "stake1uxtr5m6kygt77399zxqrykkluqr0grr4yrjtl5xplza6k8q5fghrp", + Exception, + ), # Wrong prefix + ("", Exception), # Empty string + ], +) +def test_pool_operator_from_primitive_errors(invalid_input, expected_exception): + """Test PoolOperator.from_primitive raises errors for invalid inputs.""" + # Act & Assert + with pytest.raises(expected_exception): + PoolOperator.from_primitive(invalid_input)