Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 60 additions & 0 deletions bsv/utils/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
"""
BSV Utils Package

This package contains various utility functions for BSV blockchain operations.
"""

# Import commonly used utilities from submodules
from bsv.utils.base58_utils import from_base58, to_base58, from_base58_check, to_base58_check
from bsv.utils.binary import to_hex, from_hex, unsigned_to_varint, varint_to_unsigned, to_utf8, encode, to_base64
from bsv.utils.encoding import BytesList, BytesHex, Bytes32Base64, Bytes33Hex, StringBase64, Signature
from bsv.utils.pushdata import encode_pushdata, get_pushdata_code
from bsv.utils.script_chunks import read_script_chunks
from bsv.utils.reader import Reader
from bsv.utils.writer import Writer
from bsv.utils.misc import randbytes, bytes_to_bits, bits_to_bytes
from bsv.hash import hash256
from bsv.utils.address import decode_address, validate_address

# Import legacy functions in a clean, maintainable way
from bsv.utils.legacy import (
decode_wif,
text_digest,
stringify_ecdsa_recoverable,
unstringify_ecdsa_recoverable,
deserialize_ecdsa_recoverable,
serialize_ecdsa_der,
address_to_public_key_hash,
encode_int,
unsigned_to_bytes,
deserialize_ecdsa_der,
to_bytes,
reverse_hex_byte_order,
serialize_ecdsa_recoverable,
)

__all__ = [
# Base58 functions
'from_base58', 'to_base58', 'from_base58_check', 'to_base58_check',
# Binary functions
'to_hex', 'from_hex', 'unsigned_to_varint', 'varint_to_unsigned',
# Encoding classes
'BytesList', 'BytesHex', 'Bytes32Base64', 'Bytes33Hex', 'StringBase64', 'Signature',
# Pushdata functions
'encode_pushdata', 'get_pushdata_code', 'read_script_chunks',
# Reader/Writer classes
'Reader', 'Writer',
# Random bytes utility re-exported from bsv/utils.py
'randbytes', 'bytes_to_bits', 'bits_to_bytes',
# Hash helpers
'hash256',
# Address helpers
'decode_address', 'validate_address',
# Functions from main utils.py
'decode_wif', 'text_digest', 'stringify_ecdsa_recoverable',
'unstringify_ecdsa_recoverable', 'deserialize_ecdsa_recoverable',
'serialize_ecdsa_der', 'address_to_public_key_hash', 'encode_int', 'unsigned_to_bytes', 'deserialize_ecdsa_der', 'to_bytes', 'reverse_hex_byte_order',
'serialize_ecdsa_recoverable',
# binary.py から追加
'to_utf8', 'encode', 'to_base64',
]
39 changes: 39 additions & 0 deletions bsv/utils/address.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
"""
address.py - Utilities for address and WIF decoding/validation.
"""
import re
from typing import Tuple, Optional
from ..constants import Network, ADDRESS_PREFIX_NETWORK_DICT, WIF_PREFIX_NETWORK_DICT
from .base58_utils import from_base58_check

def decode_address(address: str) -> Tuple[bytes, Network]:
if not re.match(r'^[1mn][a-km-zA-HJ-NP-Z1-9]{24,33}$', address):
raise ValueError(f'invalid P2PKH address {address}')
from ..base58 import base58check_decode
decoded = base58check_decode(address)
prefix = decoded[:1]
network = ADDRESS_PREFIX_NETWORK_DICT.get(prefix)
return decoded[1:], network

def validate_address(address: str, network: Optional[Network] = None) -> bool:
from contextlib import suppress
with suppress(Exception):
_, _network = decode_address(address)
if network is not None:
return _network == network
return True
return False

def address_to_public_key_hash(address: str) -> bytes:
return decode_address(address)[0]

def decode_wif(wif: str) -> Tuple[bytes, bool, Network]:
from ..base58 import base58check_decode
decoded = base58check_decode(wif)
prefix = decoded[:1]
network = WIF_PREFIX_NETWORK_DICT.get(prefix)
if not network:
raise ValueError(f'unknown WIF prefix {prefix.hex()}')
if len(wif) == 52 and decoded[-1] == 1:
return decoded[1:-1], True, network
return decoded[1:], False, network
64 changes: 64 additions & 0 deletions bsv/utils/base58_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
"""
base58_utils.py - Utilities for Base58 and Base58Check encoding/decoding.
"""
from typing import List, Optional

base58chars = '123456789ABCDEFGHJKLMNPQRSTUVWXYZabcdefghijkmnopqrstuvwxyz'

def from_base58(str_: str) -> List[int]:
if not str_ or not isinstance(str_, str):
raise ValueError(f"Expected base58 string but got '{str_}'")
if '0' in str_ or 'I' in str_ or 'O' in str_ or 'l' in str_:
raise ValueError(f"Invalid base58 character in '{str_}'")
lz = len(str_) - len(str_.lstrip('1'))
psz = lz
acc = 0
for char in str_:
acc = acc * 58 + base58chars.index(char)
result = []
while acc > 0:
result.append(acc % 256)
acc //= 256
return [0] * psz + list(reversed(result))

def to_base58(bin_: List[int]) -> str:
acc = 0
for byte in bin_:
acc = acc * 256 + byte
result = ''
while acc > 0:
acc, mod = divmod(acc, 58)
result = base58chars[mod] + result
for byte in bin_:
if byte == 0:
result = '1' + result
else:
break
return result

def to_base58_check(bin_: List[int], prefix: Optional[List[int]] = None) -> str:
import hashlib
if prefix is None:
prefix = [0]
hash_ = hashlib.sha256(hashlib.sha256(bytes(prefix + bin_)).digest()).digest()
return to_base58(prefix + bin_ + list(hash_[:4]))

def from_base58_check(str_: str, enc: Optional[str] = None, prefix_length: int = 1):
import hashlib
try:
from .binary import to_hex
except ImportError:
# Fallback if relative import fails
def to_hex(data):
return data.hex()
bin_ = from_base58(str_)
prefix = bin_[:prefix_length]
data = bin_[prefix_length:-4]
checksum = bin_[-4:]
hash_ = hashlib.sha256(hashlib.sha256(bytes(prefix + data)).digest()).digest()
if list(hash_[:4]) != checksum:
raise ValueError('Invalid checksum')
if enc == 'hex':
prefix = to_hex(bytes(prefix))
data = to_hex(bytes(data))
return {'prefix': prefix, 'data': data}
86 changes: 86 additions & 0 deletions bsv/utils/binary.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
"""
binary.py - Utilities for byte/number conversion, varint, and encoding/decoding.
"""
import math
from typing import Union, List, Optional, Literal

def unsigned_to_varint(num: int) -> bytes:
if num < 0 or num > 0xffffffffffffffff:
raise OverflowError(f"can't convert {num} to varint")
if num <= 0xfc:
return num.to_bytes(1, 'little')
elif num <= 0xffff:
return b'\xfd' + num.to_bytes(2, 'little')
elif num <= 0xffffffff:
return b'\xfe' + num.to_bytes(4, 'little')
else:
return b'\xff' + num.to_bytes(8, 'little')

def varint_to_unsigned(data: bytes) -> tuple[int, int]:
"""Convert varint bytes to unsigned int. Returns (value, bytes_consumed)"""
if not data:
raise ValueError("Empty data for varint")

first_byte = data[0]
if first_byte <= 0xfc:
return first_byte, 1
elif first_byte == 0xfd:
if len(data) < 3:
raise ValueError("Insufficient data for 2-byte varint")
return int.from_bytes(data[1:3], 'little'), 3
elif first_byte == 0xfe:
if len(data) < 5:
raise ValueError("Insufficient data for 4-byte varint")
return int.from_bytes(data[1:5], 'little'), 5
elif first_byte == 0xff:
if len(data) < 9:
raise ValueError("Insufficient data for 8-byte varint")
return int.from_bytes(data[1:9], 'little'), 9
else:
raise ValueError(f"Invalid varint prefix: {first_byte}")

def unsigned_to_bytes(num: int, byteorder: Literal['big', 'little'] = 'big') -> bytes:
return num.to_bytes(math.ceil(num.bit_length() / 8) or 1, byteorder)

def to_hex(byte_array: bytes) -> str:
return byte_array.hex()

def from_hex(hex_string: str) -> bytes:
"""Convert hex string to bytes"""
# Remove any whitespace and ensure even length
hex_string = ''.join(hex_string.split())
if len(hex_string) % 2 != 0:
hex_string = '0' + hex_string
return bytes.fromhex(hex_string)

def to_bytes(msg: Union[bytes, str], enc: Optional[str] = None) -> bytes:
if isinstance(msg, bytes):
return msg
if not msg:
return bytes()
if isinstance(msg, str):
if enc == 'hex':
msg = ''.join(filter(str.isalnum, msg))
if len(msg) % 2 != 0:
msg = '0' + msg
return bytes(int(msg[i:i + 2], 16) for i in range(0, len(msg), 2))
elif enc == 'base64':
import base64
return base64.b64decode(msg)
else: # UTF-8 encoding
return msg.encode('utf-8')
return bytes(msg)

def to_utf8(arr: List[int]) -> str:
return bytes(arr).decode('utf-8')

def encode(arr: List[int], enc: Optional[str] = None) -> Union[str, List[int]]:
if enc == 'hex':
return to_hex(bytes(arr))
elif enc == 'utf8':
return to_utf8(arr)
return arr

def to_base64(byte_array: List[int]) -> str:
import base64
return base64.b64encode(bytes(byte_array)).decode('ascii')
69 changes: 69 additions & 0 deletions bsv/utils/ecdsa.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
"""
ecdsa.py - Utilities for ECDSA signature serialization/deserialization.
"""
from base64 import b64encode, b64decode
from typing import Tuple
from ..constants import NUMBER_BYTE_LENGTH
from ..curve import curve

def deserialize_ecdsa_der(signature: bytes) -> Tuple[int, int]:
try:
assert signature[0] == 0x30
assert int(signature[1]) == len(signature) - 2
assert signature[2] == 0x02
r_len = int(signature[3])
r = int.from_bytes(signature[4: 4 + r_len], 'big')
assert signature[4 + r_len] == 0x02
s_len = int(signature[5 + r_len])
s = int.from_bytes(signature[-s_len:], 'big')
return r, s
except Exception:
raise ValueError(f'invalid DER encoded {signature.hex()}')

def serialize_ecdsa_der(signature: Tuple[int, int]) -> bytes:
r, s = signature
if s > curve.n // 2:
s = curve.n - s
r_bytes = r.to_bytes(NUMBER_BYTE_LENGTH, 'big').lstrip(b'\x00')
if r_bytes[0] & 0x80:
r_bytes = b'\x00' + r_bytes
serialized = bytes([2, len(r_bytes)]) + r_bytes
s_bytes = s.to_bytes(NUMBER_BYTE_LENGTH, 'big').lstrip(b'\x00')
if s_bytes[0] & 0x80:
s_bytes = b'\x00' + s_bytes
serialized += bytes([2, len(s_bytes)]) + s_bytes
return bytes([0x30, len(serialized)]) + serialized

def deserialize_ecdsa_recoverable(signature: bytes) -> Tuple[int, int, int]:
assert len(signature) == 65, 'invalid length of recoverable ECDSA signature'
rec_id = signature[-1]
assert 0 <= rec_id <= 3, f'invalid recovery id {rec_id}'
r = int.from_bytes(signature[:NUMBER_BYTE_LENGTH], 'big')
s = int.from_bytes(signature[NUMBER_BYTE_LENGTH:-1], 'big')
return r, s, rec_id

def serialize_ecdsa_recoverable(signature: Tuple[int, int, int]) -> bytes:
_r, _s, _rec_id = signature
assert 0 <= _rec_id < 4, f'invalid recovery id {_rec_id}'
r = _r.to_bytes(NUMBER_BYTE_LENGTH, 'big')
s = _s.to_bytes(NUMBER_BYTE_LENGTH, 'big')
rec_id = _rec_id.to_bytes(1, 'big')
return r + s + rec_id

def stringify_ecdsa_recoverable(signature: bytes, compressed: bool = True) -> str:
r, s, recovery_id = deserialize_ecdsa_recoverable(signature)
prefix: int = 27 + recovery_id + (4 if compressed else 0)
signature: bytes = prefix.to_bytes(1, 'big') + signature[:-1]
return b64encode(signature).decode('ascii')

def unstringify_ecdsa_recoverable(signature: str) -> Tuple[bytes, bool]:
serialized = b64decode(signature)
assert len(serialized) == 65, 'invalid length of recoverable ECDSA signature'
prefix = serialized[0]
assert 27 <= prefix < 35, f'invalid recoverable ECDSA signature prefix {prefix}'
compressed = False
if prefix >= 31:
compressed = True
prefix -= 4
recovery_id = prefix - 27
return serialized[1:] + recovery_id.to_bytes(1, 'big'), compressed
63 changes: 63 additions & 0 deletions bsv/utils/encoding.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
import base64
import json
from typing import Any, List, Union

class BytesList(bytes):
def to_json(self) -> str:
# JSON array of numbers
return json.dumps([b for b in self])
@staticmethod
def from_json(data: str) -> 'BytesList':
arr = json.loads(data)
return BytesList(bytes(arr))

class BytesHex(bytes):
def to_json(self) -> str:
return json.dumps(self.hex())
@staticmethod
def from_json(data: str) -> 'BytesHex':
s = json.loads(data)
return BytesHex(bytes.fromhex(s))

class Bytes32Base64(bytes):
def __new__(cls, b: bytes):
if len(b) != 32:
raise ValueError(f"Bytes32Base64: expected 32 bytes, got {len(b)}")
return super().__new__(cls, b)
def to_json(self) -> str:
return json.dumps(base64.b64encode(self).decode('ascii'))
@staticmethod
def from_json(data: str) -> 'Bytes32Base64':
s = json.loads(data)
b = base64.b64decode(s)
return Bytes32Base64(b)

class Bytes33Hex(bytes):
def __new__(cls, b: bytes):
if len(b) != 33:
raise ValueError(f"Bytes33Hex: expected 33 bytes, got {len(b)}")
return super().__new__(cls, b)
def to_json(self) -> str:
return json.dumps(self.hex())
@staticmethod
def from_json(data: str) -> 'Bytes33Hex':
s = json.loads(data)
return Bytes33Hex(bytes.fromhex(s))

class StringBase64(str):
def to_array(self) -> bytes:
return base64.b64decode(self)
@staticmethod
def from_array(arr: bytes) -> 'StringBase64':
return StringBase64(base64.b64encode(arr).decode('ascii'))

class Signature:
def __init__(self, sig_bytes: bytes):
self.sig_bytes = sig_bytes
def to_json(self) -> str:
# serialize as array of numbers
return json.dumps([b for b in self.sig_bytes])
@staticmethod
def from_json(data: str) -> 'Signature':
arr = json.loads(data)
return Signature(bytes(arr))
Loading