From 5c64b00551bb37595351228816424a4167740246 Mon Sep 17 00:00:00 2001 From: defiant1708 Date: Fri, 22 Aug 2025 11:17:10 +0900 Subject: [PATCH] fix(utils): Update utils module with latest changes and absolute imports --- bsv/utils/__init__.py | 60 +++++++++++++ bsv/utils/address.py | 39 +++++++++ bsv/utils/base58_utils.py | 64 ++++++++++++++ bsv/utils/binary.py | 86 +++++++++++++++++++ bsv/utils/ecdsa.py | 69 +++++++++++++++ bsv/utils/encoding.py | 63 ++++++++++++++ bsv/utils/misc.py | 23 +++++ bsv/utils/pushdata.py | 41 +++++++++ bsv/utils/reader.py | 101 ++++++++++++++++++++++ bsv/utils/reader_writer.py | 169 +++++++++++++++++++++++++++++++++++++ bsv/utils/script.py | 40 +++++++++ bsv/utils/script_chunks.py | 59 +++++++++++++ bsv/utils/writer.py | 77 +++++++++++++++++ 13 files changed, 891 insertions(+) create mode 100644 bsv/utils/__init__.py create mode 100644 bsv/utils/address.py create mode 100644 bsv/utils/base58_utils.py create mode 100644 bsv/utils/binary.py create mode 100644 bsv/utils/ecdsa.py create mode 100644 bsv/utils/encoding.py create mode 100644 bsv/utils/misc.py create mode 100644 bsv/utils/pushdata.py create mode 100644 bsv/utils/reader.py create mode 100644 bsv/utils/reader_writer.py create mode 100644 bsv/utils/script.py create mode 100644 bsv/utils/script_chunks.py create mode 100644 bsv/utils/writer.py diff --git a/bsv/utils/__init__.py b/bsv/utils/__init__.py new file mode 100644 index 0000000..97f1735 --- /dev/null +++ b/bsv/utils/__init__.py @@ -0,0 +1,60 @@ +""" +BSV Utils Package + +This package contains various utility functions for BSV blockchain operations. +""" + +# Import commonly used utilities from submodules +from bsv.utils.base58_utils import from_base58, to_base58, from_base58_check, to_base58_check +from bsv.utils.binary import to_hex, from_hex, unsigned_to_varint, varint_to_unsigned, to_utf8, encode, to_base64 +from bsv.utils.encoding import BytesList, BytesHex, Bytes32Base64, Bytes33Hex, StringBase64, Signature +from bsv.utils.pushdata import encode_pushdata, get_pushdata_code +from bsv.utils.script_chunks import read_script_chunks +from bsv.utils.reader import Reader +from bsv.utils.writer import Writer +from bsv.utils.misc import randbytes, bytes_to_bits, bits_to_bytes +from bsv.hash import hash256 +from bsv.utils.address import decode_address, validate_address + +# Import legacy functions in a clean, maintainable way +from bsv.utils.legacy import ( + decode_wif, + text_digest, + stringify_ecdsa_recoverable, + unstringify_ecdsa_recoverable, + deserialize_ecdsa_recoverable, + serialize_ecdsa_der, + address_to_public_key_hash, + encode_int, + unsigned_to_bytes, + deserialize_ecdsa_der, + to_bytes, + reverse_hex_byte_order, + serialize_ecdsa_recoverable, +) + +__all__ = [ + # Base58 functions + 'from_base58', 'to_base58', 'from_base58_check', 'to_base58_check', + # Binary functions + 'to_hex', 'from_hex', 'unsigned_to_varint', 'varint_to_unsigned', + # Encoding classes + 'BytesList', 'BytesHex', 'Bytes32Base64', 'Bytes33Hex', 'StringBase64', 'Signature', + # Pushdata functions + 'encode_pushdata', 'get_pushdata_code', 'read_script_chunks', + # Reader/Writer classes + 'Reader', 'Writer', + # Random bytes utility re-exported from bsv/utils.py + 'randbytes', 'bytes_to_bits', 'bits_to_bytes', + # Hash helpers + 'hash256', + # Address helpers + 'decode_address', 'validate_address', + # Functions from main utils.py + 'decode_wif', 'text_digest', 'stringify_ecdsa_recoverable', + 'unstringify_ecdsa_recoverable', 'deserialize_ecdsa_recoverable', + 'serialize_ecdsa_der', 'address_to_public_key_hash', 'encode_int', 'unsigned_to_bytes', 'deserialize_ecdsa_der', 'to_bytes', 'reverse_hex_byte_order', + 'serialize_ecdsa_recoverable', + # binary.py から追加 + 'to_utf8', 'encode', 'to_base64', +] diff --git a/bsv/utils/address.py b/bsv/utils/address.py new file mode 100644 index 0000000..6426cab --- /dev/null +++ b/bsv/utils/address.py @@ -0,0 +1,39 @@ +""" +address.py - Utilities for address and WIF decoding/validation. +""" +import re +from typing import Tuple, Optional +from ..constants import Network, ADDRESS_PREFIX_NETWORK_DICT, WIF_PREFIX_NETWORK_DICT +from .base58_utils import from_base58_check + +def decode_address(address: str) -> Tuple[bytes, Network]: + if not re.match(r'^[1mn][a-km-zA-HJ-NP-Z1-9]{24,33}$', address): + raise ValueError(f'invalid P2PKH address {address}') + from ..base58 import base58check_decode + decoded = base58check_decode(address) + prefix = decoded[:1] + network = ADDRESS_PREFIX_NETWORK_DICT.get(prefix) + return decoded[1:], network + +def validate_address(address: str, network: Optional[Network] = None) -> bool: + from contextlib import suppress + with suppress(Exception): + _, _network = decode_address(address) + if network is not None: + return _network == network + return True + return False + +def address_to_public_key_hash(address: str) -> bytes: + return decode_address(address)[0] + +def decode_wif(wif: str) -> Tuple[bytes, bool, Network]: + from ..base58 import base58check_decode + decoded = base58check_decode(wif) + prefix = decoded[:1] + network = WIF_PREFIX_NETWORK_DICT.get(prefix) + if not network: + raise ValueError(f'unknown WIF prefix {prefix.hex()}') + if len(wif) == 52 and decoded[-1] == 1: + return decoded[1:-1], True, network + return decoded[1:], False, network \ No newline at end of file diff --git a/bsv/utils/base58_utils.py b/bsv/utils/base58_utils.py new file mode 100644 index 0000000..144f4b9 --- /dev/null +++ b/bsv/utils/base58_utils.py @@ -0,0 +1,64 @@ +""" +base58_utils.py - Utilities for Base58 and Base58Check encoding/decoding. +""" +from typing import List, Optional + +base58chars = '123456789ABCDEFGHJKLMNPQRSTUVWXYZabcdefghijkmnopqrstuvwxyz' + +def from_base58(str_: str) -> List[int]: + if not str_ or not isinstance(str_, str): + raise ValueError(f"Expected base58 string but got '{str_}'") + if '0' in str_ or 'I' in str_ or 'O' in str_ or 'l' in str_: + raise ValueError(f"Invalid base58 character in '{str_}'") + lz = len(str_) - len(str_.lstrip('1')) + psz = lz + acc = 0 + for char in str_: + acc = acc * 58 + base58chars.index(char) + result = [] + while acc > 0: + result.append(acc % 256) + acc //= 256 + return [0] * psz + list(reversed(result)) + +def to_base58(bin_: List[int]) -> str: + acc = 0 + for byte in bin_: + acc = acc * 256 + byte + result = '' + while acc > 0: + acc, mod = divmod(acc, 58) + result = base58chars[mod] + result + for byte in bin_: + if byte == 0: + result = '1' + result + else: + break + return result + +def to_base58_check(bin_: List[int], prefix: Optional[List[int]] = None) -> str: + import hashlib + if prefix is None: + prefix = [0] + hash_ = hashlib.sha256(hashlib.sha256(bytes(prefix + bin_)).digest()).digest() + return to_base58(prefix + bin_ + list(hash_[:4])) + +def from_base58_check(str_: str, enc: Optional[str] = None, prefix_length: int = 1): + import hashlib + try: + from .binary import to_hex + except ImportError: + # Fallback if relative import fails + def to_hex(data): + return data.hex() + bin_ = from_base58(str_) + prefix = bin_[:prefix_length] + data = bin_[prefix_length:-4] + checksum = bin_[-4:] + hash_ = hashlib.sha256(hashlib.sha256(bytes(prefix + data)).digest()).digest() + if list(hash_[:4]) != checksum: + raise ValueError('Invalid checksum') + if enc == 'hex': + prefix = to_hex(bytes(prefix)) + data = to_hex(bytes(data)) + return {'prefix': prefix, 'data': data} \ No newline at end of file diff --git a/bsv/utils/binary.py b/bsv/utils/binary.py new file mode 100644 index 0000000..46cda9a --- /dev/null +++ b/bsv/utils/binary.py @@ -0,0 +1,86 @@ +""" +binary.py - Utilities for byte/number conversion, varint, and encoding/decoding. +""" +import math +from typing import Union, List, Optional, Literal + +def unsigned_to_varint(num: int) -> bytes: + if num < 0 or num > 0xffffffffffffffff: + raise OverflowError(f"can't convert {num} to varint") + if num <= 0xfc: + return num.to_bytes(1, 'little') + elif num <= 0xffff: + return b'\xfd' + num.to_bytes(2, 'little') + elif num <= 0xffffffff: + return b'\xfe' + num.to_bytes(4, 'little') + else: + return b'\xff' + num.to_bytes(8, 'little') + +def varint_to_unsigned(data: bytes) -> tuple[int, int]: + """Convert varint bytes to unsigned int. Returns (value, bytes_consumed)""" + if not data: + raise ValueError("Empty data for varint") + + first_byte = data[0] + if first_byte <= 0xfc: + return first_byte, 1 + elif first_byte == 0xfd: + if len(data) < 3: + raise ValueError("Insufficient data for 2-byte varint") + return int.from_bytes(data[1:3], 'little'), 3 + elif first_byte == 0xfe: + if len(data) < 5: + raise ValueError("Insufficient data for 4-byte varint") + return int.from_bytes(data[1:5], 'little'), 5 + elif first_byte == 0xff: + if len(data) < 9: + raise ValueError("Insufficient data for 8-byte varint") + return int.from_bytes(data[1:9], 'little'), 9 + else: + raise ValueError(f"Invalid varint prefix: {first_byte}") + +def unsigned_to_bytes(num: int, byteorder: Literal['big', 'little'] = 'big') -> bytes: + return num.to_bytes(math.ceil(num.bit_length() / 8) or 1, byteorder) + +def to_hex(byte_array: bytes) -> str: + return byte_array.hex() + +def from_hex(hex_string: str) -> bytes: + """Convert hex string to bytes""" + # Remove any whitespace and ensure even length + hex_string = ''.join(hex_string.split()) + if len(hex_string) % 2 != 0: + hex_string = '0' + hex_string + return bytes.fromhex(hex_string) + +def to_bytes(msg: Union[bytes, str], enc: Optional[str] = None) -> bytes: + if isinstance(msg, bytes): + return msg + if not msg: + return bytes() + if isinstance(msg, str): + if enc == 'hex': + msg = ''.join(filter(str.isalnum, msg)) + if len(msg) % 2 != 0: + msg = '0' + msg + return bytes(int(msg[i:i + 2], 16) for i in range(0, len(msg), 2)) + elif enc == 'base64': + import base64 + return base64.b64decode(msg) + else: # UTF-8 encoding + return msg.encode('utf-8') + return bytes(msg) + +def to_utf8(arr: List[int]) -> str: + return bytes(arr).decode('utf-8') + +def encode(arr: List[int], enc: Optional[str] = None) -> Union[str, List[int]]: + if enc == 'hex': + return to_hex(bytes(arr)) + elif enc == 'utf8': + return to_utf8(arr) + return arr + +def to_base64(byte_array: List[int]) -> str: + import base64 + return base64.b64encode(bytes(byte_array)).decode('ascii') \ No newline at end of file diff --git a/bsv/utils/ecdsa.py b/bsv/utils/ecdsa.py new file mode 100644 index 0000000..aaf3379 --- /dev/null +++ b/bsv/utils/ecdsa.py @@ -0,0 +1,69 @@ +""" +ecdsa.py - Utilities for ECDSA signature serialization/deserialization. +""" +from base64 import b64encode, b64decode +from typing import Tuple +from ..constants import NUMBER_BYTE_LENGTH +from ..curve import curve + +def deserialize_ecdsa_der(signature: bytes) -> Tuple[int, int]: + try: + assert signature[0] == 0x30 + assert int(signature[1]) == len(signature) - 2 + assert signature[2] == 0x02 + r_len = int(signature[3]) + r = int.from_bytes(signature[4: 4 + r_len], 'big') + assert signature[4 + r_len] == 0x02 + s_len = int(signature[5 + r_len]) + s = int.from_bytes(signature[-s_len:], 'big') + return r, s + except Exception: + raise ValueError(f'invalid DER encoded {signature.hex()}') + +def serialize_ecdsa_der(signature: Tuple[int, int]) -> bytes: + r, s = signature + if s > curve.n // 2: + s = curve.n - s + r_bytes = r.to_bytes(NUMBER_BYTE_LENGTH, 'big').lstrip(b'\x00') + if r_bytes[0] & 0x80: + r_bytes = b'\x00' + r_bytes + serialized = bytes([2, len(r_bytes)]) + r_bytes + s_bytes = s.to_bytes(NUMBER_BYTE_LENGTH, 'big').lstrip(b'\x00') + if s_bytes[0] & 0x80: + s_bytes = b'\x00' + s_bytes + serialized += bytes([2, len(s_bytes)]) + s_bytes + return bytes([0x30, len(serialized)]) + serialized + +def deserialize_ecdsa_recoverable(signature: bytes) -> Tuple[int, int, int]: + assert len(signature) == 65, 'invalid length of recoverable ECDSA signature' + rec_id = signature[-1] + assert 0 <= rec_id <= 3, f'invalid recovery id {rec_id}' + r = int.from_bytes(signature[:NUMBER_BYTE_LENGTH], 'big') + s = int.from_bytes(signature[NUMBER_BYTE_LENGTH:-1], 'big') + return r, s, rec_id + +def serialize_ecdsa_recoverable(signature: Tuple[int, int, int]) -> bytes: + _r, _s, _rec_id = signature + assert 0 <= _rec_id < 4, f'invalid recovery id {_rec_id}' + r = _r.to_bytes(NUMBER_BYTE_LENGTH, 'big') + s = _s.to_bytes(NUMBER_BYTE_LENGTH, 'big') + rec_id = _rec_id.to_bytes(1, 'big') + return r + s + rec_id + +def stringify_ecdsa_recoverable(signature: bytes, compressed: bool = True) -> str: + r, s, recovery_id = deserialize_ecdsa_recoverable(signature) + prefix: int = 27 + recovery_id + (4 if compressed else 0) + signature: bytes = prefix.to_bytes(1, 'big') + signature[:-1] + return b64encode(signature).decode('ascii') + +def unstringify_ecdsa_recoverable(signature: str) -> Tuple[bytes, bool]: + serialized = b64decode(signature) + assert len(serialized) == 65, 'invalid length of recoverable ECDSA signature' + prefix = serialized[0] + assert 27 <= prefix < 35, f'invalid recoverable ECDSA signature prefix {prefix}' + compressed = False + if prefix >= 31: + compressed = True + prefix -= 4 + recovery_id = prefix - 27 + return serialized[1:] + recovery_id.to_bytes(1, 'big'), compressed \ No newline at end of file diff --git a/bsv/utils/encoding.py b/bsv/utils/encoding.py new file mode 100644 index 0000000..a4da7ca --- /dev/null +++ b/bsv/utils/encoding.py @@ -0,0 +1,63 @@ +import base64 +import json +from typing import Any, List, Union + +class BytesList(bytes): + def to_json(self) -> str: + # JSON array of numbers + return json.dumps([b for b in self]) + @staticmethod + def from_json(data: str) -> 'BytesList': + arr = json.loads(data) + return BytesList(bytes(arr)) + +class BytesHex(bytes): + def to_json(self) -> str: + return json.dumps(self.hex()) + @staticmethod + def from_json(data: str) -> 'BytesHex': + s = json.loads(data) + return BytesHex(bytes.fromhex(s)) + +class Bytes32Base64(bytes): + def __new__(cls, b: bytes): + if len(b) != 32: + raise ValueError(f"Bytes32Base64: expected 32 bytes, got {len(b)}") + return super().__new__(cls, b) + def to_json(self) -> str: + return json.dumps(base64.b64encode(self).decode('ascii')) + @staticmethod + def from_json(data: str) -> 'Bytes32Base64': + s = json.loads(data) + b = base64.b64decode(s) + return Bytes32Base64(b) + +class Bytes33Hex(bytes): + def __new__(cls, b: bytes): + if len(b) != 33: + raise ValueError(f"Bytes33Hex: expected 33 bytes, got {len(b)}") + return super().__new__(cls, b) + def to_json(self) -> str: + return json.dumps(self.hex()) + @staticmethod + def from_json(data: str) -> 'Bytes33Hex': + s = json.loads(data) + return Bytes33Hex(bytes.fromhex(s)) + +class StringBase64(str): + def to_array(self) -> bytes: + return base64.b64decode(self) + @staticmethod + def from_array(arr: bytes) -> 'StringBase64': + return StringBase64(base64.b64encode(arr).decode('ascii')) + +class Signature: + def __init__(self, sig_bytes: bytes): + self.sig_bytes = sig_bytes + def to_json(self) -> str: + # serialize as array of numbers + return json.dumps([b for b in self.sig_bytes]) + @staticmethod + def from_json(data: str) -> 'Signature': + arr = json.loads(data) + return Signature(bytes(arr)) diff --git a/bsv/utils/misc.py b/bsv/utils/misc.py new file mode 100644 index 0000000..5038177 --- /dev/null +++ b/bsv/utils/misc.py @@ -0,0 +1,23 @@ +""" +misc.py - Utilities for random generation, bits<->bytes conversion, and reverse hex byte order. +""" +import math +from secrets import randbits +from typing import Union + +def bytes_to_bits(octets: Union[str, bytes]) -> str: + b: bytes = octets if isinstance(octets, bytes) else bytes.fromhex(octets) + bits: str = bin(int.from_bytes(b, 'big'))[2:] + if len(bits) < len(b) * 8: + bits = '0' * (len(b) * 8 - len(bits)) + bits + return bits + +def bits_to_bytes(bits: str) -> bytes: + byte_length = math.ceil(len(bits) / 8) or 1 + return int(bits, 2).to_bytes(byte_length, byteorder='big') + +def randbytes(length: int) -> bytes: + return randbits(length * 8).to_bytes(length, 'big') + +def reverse_hex_byte_order(hex_str: str): + return bytes.fromhex(hex_str)[::-1].hex() \ No newline at end of file diff --git a/bsv/utils/pushdata.py b/bsv/utils/pushdata.py new file mode 100644 index 0000000..c89ba51 --- /dev/null +++ b/bsv/utils/pushdata.py @@ -0,0 +1,41 @@ +""" +Pushdata encoding utilities from main utils.py +""" + +import sys +import os +sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) +from constants import OpCode + + +def get_pushdata_code(length: int) -> bytes: + """get the pushdata opcode based on length of data you want to push onto the stack""" + if length <= 75: + return length.to_bytes(1, 'little') + elif length <= 255: + return OpCode.OP_PUSHDATA1 + length.to_bytes(1, 'little') + elif length <= 65535: + return OpCode.OP_PUSHDATA2 + length.to_bytes(2, 'little') + elif length <= 4294967295: + return OpCode.OP_PUSHDATA4 + length.to_bytes(4, 'little') + else: + raise ValueError("data too long to encode in a PUSHDATA opcode") + + +def encode_pushdata(pushdata: bytes, minimal_push: bool = True) -> bytes: + """encode pushdata with proper opcode + https://github.com/bitcoin-sv/bitcoin-sv/blob/v1.0.10/src/script/interpreter.cpp#L310-L337 + :param pushdata: bytes you want to push onto the stack in bitcoin script + :param minimal_push: if True then push data following the minimal push rule + """ + if minimal_push: + if pushdata == b'': + return OpCode.OP_0 + if len(pushdata) == 1 and 1 <= pushdata[0] <= 16: + return bytes([OpCode.OP_1[0] + pushdata[0] - 1]) + if len(pushdata) == 1 and pushdata[0] == 0x81: + return OpCode.OP_1NEGATE + else: + # non-minimal push requires pushdata != b'' + assert pushdata, 'empty pushdata' + return get_pushdata_code(len(pushdata)) + pushdata diff --git a/bsv/utils/reader.py b/bsv/utils/reader.py new file mode 100644 index 0000000..026f6ae --- /dev/null +++ b/bsv/utils/reader.py @@ -0,0 +1,101 @@ +""" +reader.py - Reader class (binary reading utilities). +""" +from io import BytesIO +from typing import Optional, Literal + +class Reader(BytesIO): + def __init__(self, data: bytes): + super().__init__(data) + + def eof(self) -> bool: + return self.tell() >= len(self.getvalue()) + + def read(self, length: int = None) -> bytes: + result = super().read(length) + return result if result else None + + def read_reverse(self, length: int = None) -> bytes: + data = self.read(length) + return data[::-1] if data else None + + def read_uint8(self) -> Optional[int]: + data = self.read(1) + return data[0] if data else None + + def read_int8(self) -> Optional[int]: + data = self.read(1) + return int.from_bytes(data, byteorder='big', signed=True) if data else None + + def read_uint16_be(self) -> Optional[int]: + data = self.read(2) + return int.from_bytes(data, byteorder='big') if data else None + + def read_int16_be(self) -> Optional[int]: + data = self.read(2) + return int.from_bytes(data, byteorder='big', signed=True) if data else None + + def read_uint16_le(self) -> Optional[int]: + data = self.read(2) + return int.from_bytes(data, byteorder='little') if data else None + + def read_int16_le(self) -> Optional[int]: + data = self.read(2) + return int.from_bytes(data, byteorder='little', signed=True) if data else None + + def read_uint32_be(self) -> Optional[int]: + data = self.read(4) + return int.from_bytes(data, byteorder='big') if data else None + + def read_int32_be(self) -> Optional[int]: + data = self.read(4) + return int.from_bytes(data, byteorder='big', signed=True) if data else None + + def read_uint32_le(self) -> Optional[int]: + data = self.read(4) + return int.from_bytes(data, byteorder='little') if data else None + + def read_int32_le(self) -> Optional[int]: + data = self.read(4) + return int.from_bytes(data, byteorder='little', signed=True) if data else None + + def read_var_int_num(self) -> Optional[int]: + first_byte = self.read_uint8() + if first_byte is None: + return None + if first_byte < 253: + return first_byte + elif first_byte == 253: + return self.read_uint16_le() + elif first_byte == 254: + return self.read_uint32_le() + elif first_byte == 255: + data = self.read(8) + return int.from_bytes(data, byteorder='little') if data else None + else: + raise ValueError("Invalid varint encoding") + + def read_var_int(self) -> Optional[bytes]: + first_byte = self.read(1) + if not first_byte: + return None + if first_byte[0] == 0xfd: + return first_byte + (self.read(2) or b'') + elif first_byte[0] == 0xfe: + return first_byte + (self.read(4) or b'') + elif first_byte[0] == 0xff: + return first_byte + (self.read(8) or b'') + else: + return first_byte + + def read_bytes(self, byte_length: Optional[int] = None) -> bytes: + result = self.read(byte_length) + return result if result else b'' + + def read_int( + self, byte_length: int, byteorder: Literal["big", "little"] = "little" + ) -> Optional[int]: + octets = self.read_bytes(byte_length) + if not octets: + return None + return int.from_bytes(octets, byteorder=byteorder) \ No newline at end of file diff --git a/bsv/utils/reader_writer.py b/bsv/utils/reader_writer.py new file mode 100644 index 0000000..f9d047f --- /dev/null +++ b/bsv/utils/reader_writer.py @@ -0,0 +1,169 @@ +""" +Reader and Writer utilities from main utils.py +""" + +from io import BytesIO +from typing import Optional + + +def unsigned_to_varint(num: int) -> bytes: + """ + convert an unsigned int to varint. + """ + if num < 0 or num > 0xffffffffffffffff: + raise OverflowError(f"can't convert {num} to varint") + if num <= 0xfc: + return num.to_bytes(1, 'little') + elif num <= 0xffff: + return b'\xfd' + num.to_bytes(2, 'little') + elif num <= 0xffffffff: + return b'\xfe' + num.to_bytes(4, 'little') + else: + return b'\xff' + num.to_bytes(8, 'little') + + +class Writer(BytesIO): + """ + A writer for binary data + """ + + def write_bytes(self, data: bytes) -> None: + self.write(data) + + def write_uint8(self, num: int) -> None: + self.write(num.to_bytes(1, 'little')) + + def write_int8(self, num: int) -> None: + self.write(num.to_bytes(1, 'little', signed=True)) + + def write_uint16_le(self, num: int) -> None: + self.write(num.to_bytes(2, 'little')) + + def write_int16_le(self, num: int) -> None: + self.write(num.to_bytes(2, 'little', signed=True)) + + def write_uint32_le(self, num: int) -> None: + self.write(num.to_bytes(4, 'little')) + + def write_int32_le(self, num: int) -> None: + self.write(num.to_bytes(4, 'little', signed=True)) + + def write_uint64_le(self, num: int) -> None: + self.write(num.to_bytes(8, 'little')) + + def write_int64_le(self, num: int) -> None: + self.write(num.to_bytes(8, 'little', signed=True)) + + def write_uint16_be(self, num: int) -> None: + self.write(num.to_bytes(2, 'big')) + + def write_int16_be(self, num: int) -> None: + self.write(num.to_bytes(2, 'big', signed=True)) + + def write_uint32_be(self, num: int) -> None: + self.write(num.to_bytes(4, 'big')) + + def write_int32_be(self, num: int) -> None: + self.write(num.to_bytes(4, 'big', signed=True)) + + def write_uint64_be(self, num: int) -> None: + self.write(num.to_bytes(8, 'big')) + + def write_int64_be(self, num: int) -> None: + self.write(num.to_bytes(8, 'big', signed=True)) + + def write_var_int_num(self, n: int) -> None: + self.write(unsigned_to_varint(n)) + + @staticmethod + def var_int_num(n: int) -> bytes: + return unsigned_to_varint(n) + + +class Reader(BytesIO): + def __init__(self, data: bytes): + super().__init__(data) + + def eof(self) -> bool: + return self.tell() >= len(self.getvalue()) + + def read(self, length: int = None) -> bytes: + result = super().read(length) + return result if result else None + + def read_reverse(self, length: int = None) -> bytes: + data = self.read(length) + return data[::-1] if data else None + + def read_uint8(self) -> Optional[int]: + data = self.read(1) + return data[0] if data else None + + def read_int8(self) -> Optional[int]: + data = self.read(1) + return int.from_bytes(data, byteorder='big', signed=True) if data else None + + def read_uint16_be(self) -> Optional[int]: + data = self.read(2) + return int.from_bytes(data, byteorder='big') if data else None + + def read_int16_be(self) -> Optional[int]: + data = self.read(2) + return int.from_bytes(data, byteorder='big', signed=True) if data else None + + def read_uint32_be(self) -> Optional[int]: + data = self.read(4) + return int.from_bytes(data, byteorder='big') if data else None + + def read_int32_be(self) -> Optional[int]: + data = self.read(4) + return int.from_bytes(data, byteorder='big', signed=True) if data else None + + def read_uint64_be(self) -> Optional[int]: + data = self.read(8) + return int.from_bytes(data, byteorder='big') if data else None + + def read_int64_be(self) -> Optional[int]: + data = self.read(8) + return int.from_bytes(data, byteorder='big', signed=True) if data else None + + def read_uint16_le(self) -> Optional[int]: + data = self.read(2) + return int.from_bytes(data, byteorder='little') if data else None + + def read_int16_le(self) -> Optional[int]: + data = self.read(2) + return int.from_bytes(data, byteorder='little', signed=True) if data else None + + def read_uint32_le(self) -> Optional[int]: + data = self.read(4) + return int.from_bytes(data, byteorder='little') if data else None + + def read_int32_le(self) -> Optional[int]: + data = self.read(4) + return int.from_bytes(data, byteorder='little', signed=True) if data else None + + def read_uint64_le(self) -> Optional[int]: + data = self.read(8) + return int.from_bytes(data, byteorder='little') if data else None + + def read_int64_le(self) -> Optional[int]: + data = self.read(8) + return int.from_bytes(data, byteorder='little', signed=True) if data else None + + def read_var_int_num(self) -> Optional[int]: + """read varint""" + first_byte = self.read_uint8() + if first_byte is None: + return None + + if first_byte <= 0xfc: + return first_byte + elif first_byte == 0xfd: + return self.read_uint16_le() + elif first_byte == 0xfe: + return self.read_uint32_le() + elif first_byte == 0xff: + return self.read_uint64_le() + else: + return None diff --git a/bsv/utils/script.py b/bsv/utils/script.py new file mode 100644 index 0000000..e9f2062 --- /dev/null +++ b/bsv/utils/script.py @@ -0,0 +1,40 @@ +""" +script.py - Utilities for Bitcoin Script pushdata and integer encoding. +""" +from ..constants import OpCode +from .binary import unsigned_to_bytes + +def get_pushdata_code(byte_length: int) -> bytes: + if byte_length <= 0x4b: + return byte_length.to_bytes(1, 'little') + elif byte_length <= 0xff: + return OpCode.OP_PUSHDATA1 + byte_length.to_bytes(1, 'little') + elif byte_length <= 0xffff: + return OpCode.OP_PUSHDATA2 + byte_length.to_bytes(2, 'little') + elif byte_length <= 0xffffffff: + return OpCode.OP_PUSHDATA4 + byte_length.to_bytes(4, 'little') + else: + raise ValueError("data too long to encode in a PUSHDATA opcode") + +def encode_pushdata(pushdata: bytes, minimal_push: bool = True) -> bytes: + if minimal_push: + if pushdata == b'': + return OpCode.OP_0 + if len(pushdata) == 1 and 1 <= pushdata[0] <= 16: + return bytes([OpCode.OP_1[0] + pushdata[0] - 1]) + if len(pushdata) == 1 and pushdata[0] == 0x81: + return OpCode.OP_1NEGATE + else: + assert pushdata, 'empty pushdata' + return get_pushdata_code(len(pushdata)) + pushdata + +def encode_int(num: int) -> bytes: + if num == 0: + return OpCode.OP_0 + negative: bool = num < 0 + octets: bytearray = bytearray(unsigned_to_bytes(-num if negative else num, 'little')) + if octets[-1] & 0x80: + octets += b'\x00' + if negative: + octets[-1] |= 0x80 + return encode_pushdata(octets) \ No newline at end of file diff --git a/bsv/utils/script_chunks.py b/bsv/utils/script_chunks.py new file mode 100644 index 0000000..c3612d5 --- /dev/null +++ b/bsv/utils/script_chunks.py @@ -0,0 +1,59 @@ +from dataclasses import dataclass +from typing import Optional, List + + +@dataclass +class ScriptChunk: + op: int + data: Optional[bytes] + + +def read_script_chunks(script: bytes) -> List[ScriptChunk]: + chunks: List[ScriptChunk] = [] + i = 0 + n = len(script) + while i < n: + op = script[i] + i += 1 + if op <= 75: # direct push + ln = op + if i + ln > n: + break + chunks.append(ScriptChunk(op=op, data=script[i:i+ln])) + i += ln + continue + if op == 0x4C: # OP_PUSHDATA1 + if i >= n: + break + ln = script[i] + i += 1 + if i + ln > n: + break + chunks.append(ScriptChunk(op=op, data=script[i:i+ln])) + i += ln + continue + if op == 0x4D: # OP_PUSHDATA2 + if i + 1 >= n: + break + ln = int.from_bytes(script[i:i+2], 'little') + i += 2 + if i + ln > n: + break + chunks.append(ScriptChunk(op=op, data=script[i:i+ln])) + i += ln + continue + if op == 0x4E: # OP_PUSHDATA4 + if i + 3 >= n: + break + ln = int.from_bytes(script[i:i+4], 'little') + i += 4 + if i + ln > n: + break + chunks.append(ScriptChunk(op=op, data=script[i:i+ln])) + i += ln + continue + # Non-push opcodes + chunks.append(ScriptChunk(op=op, data=None)) + return chunks + + diff --git a/bsv/utils/writer.py b/bsv/utils/writer.py new file mode 100644 index 0000000..4876253 --- /dev/null +++ b/bsv/utils/writer.py @@ -0,0 +1,77 @@ +""" +writer.py - Writer class (binary writing utilities). +""" +import struct +from io import BytesIO + +class Writer(BytesIO): + def __init__(self): + super().__init__() + + def write(self, buf: bytes) -> 'Writer': + super().write(buf) + return self + + def write_reverse(self, buf: bytes) -> 'Writer': + super().write(buf[::-1]) + return self + + def write_uint8(self, n: int) -> 'Writer': + self.write(struct.pack('B', n)) + return self + + def write_int8(self, n: int) -> 'Writer': + self.write(struct.pack('b', n)) + return self + + def write_uint16_be(self, n: int) -> 'Writer': + self.write(struct.pack('>H', n)) + return self + + def write_int16_be(self, n: int) -> 'Writer': + self.write(struct.pack('>h', n)) + return self + + def write_uint16_le(self, n: int) -> 'Writer': + self.write(struct.pack(' 'Writer': + self.write(struct.pack(' 'Writer': + self.write(struct.pack('>I', n)) + return self + + def write_int32_be(self, n: int) -> 'Writer': + self.write(struct.pack('>i', n)) + return self + + def write_uint32_le(self, n: int) -> 'Writer': + self.write(struct.pack(' 'Writer': + self.write(struct.pack(' 'Writer': + self.write(struct.pack('>Q', n)) + return self + + def write_uint64_le(self, n: int) -> 'Writer': + self.write(struct.pack(' 'Writer': + self.write(self.var_int_num(n)) + return self + + def to_bytes(self) -> bytes: + return self.getvalue() + + @staticmethod + def var_int_num(n: int) -> bytes: + from .binary import unsigned_to_varint + return unsigned_to_varint(n) \ No newline at end of file