diff --git a/bsv/registry/__init__.py b/bsv/registry/__init__.py new file mode 100644 index 0000000..00a56ba --- /dev/null +++ b/bsv/registry/__init__.py @@ -0,0 +1,26 @@ +from .types import ( + DefinitionType, + CertificateFieldDescriptor, + BasketDefinitionData, + ProtocolDefinitionData, + CertificateDefinitionData, + DefinitionData, + TokenData, + RegistryRecord, +) + +from .client import RegistryClient + +__all__ = [ + "DefinitionType", + "CertificateFieldDescriptor", + "BasketDefinitionData", + "ProtocolDefinitionData", + "CertificateDefinitionData", + "DefinitionData", + "TokenData", + "RegistryRecord", + "RegistryClient", +] + + diff --git a/bsv/registry/client.py b/bsv/registry/client.py new file mode 100644 index 0000000..7dc3dce --- /dev/null +++ b/bsv/registry/client.py @@ -0,0 +1,370 @@ +from __future__ import annotations + +from dataclasses import asdict +from typing import Any, Dict, List, Optional, Tuple, Union, cast + +from bsv.registry.types import ( + DefinitionType, + BasketDefinitionData, + ProtocolDefinitionData, + CertificateDefinitionData, + DefinitionData, + TokenData, +) +from bsv.wallet.wallet_interface import WalletInterface +from bsv.wallet.key_deriver import Protocol as WalletProtocol +from bsv.transaction.pushdrop import ( + build_lock_before_pushdrop, + decode_lock_before_pushdrop, + make_pushdrop_unlocker, + SignOutputsMode, +) +from bsv.transaction import Transaction +from bsv.broadcaster_core import default_broadcaster +from bsv.overlay.lookup import LookupResolver, LookupQuestion +from bsv.overlay.topic import TopicBroadcaster, BroadcasterConfig + + +REGISTRANT_TOKEN_AMOUNT = 1 + + +def _map_definition_type_to_wallet_protocol(definition_type: DefinitionType) -> Dict[str, Any]: + if definition_type == "basket": + return {"securityLevel": 1, "protocol": "basketmap"} + if definition_type == "protocol": + return {"securityLevel": 1, "protocol": "protomap"} + if definition_type == "certificate": + return {"securityLevel": 1, "protocol": "certmap"} + raise ValueError(f"Unknown definition type: {definition_type}") + + +def _map_definition_type_to_basket_name(definition_type: DefinitionType) -> str: + return { + "basket": "basketmap", + "protocol": "protomap", + "certificate": "certmap", + }[definition_type] + + +def _build_pushdrop_fields(data: DefinitionData, registry_operator: str) -> List[bytes]: + if isinstance(data, BasketDefinitionData): + fields = [ + data.basketID, + data.name, + data.iconURL, + data.description, + data.documentationURL, + ] + elif isinstance(data, ProtocolDefinitionData): + import json + + fields = [ + json.dumps(data.protocolID), + data.name, + data.iconURL, + data.description, + data.documentationURL, + ] + elif isinstance(data, CertificateDefinitionData): + import json + + fields = [ + data.type, + data.name, + data.iconURL, + data.description, + data.documentationURL, + json.dumps(data.fields), + ] + else: + raise ValueError("Unsupported definition type") + + fields.append(registry_operator) + return [f.encode("utf-8") for f in fields] + + +def _parse_locking_script(definition_type: DefinitionType, locking_script_hex: str) -> DefinitionData: + from bsv.script.script import Script + + script = Script(locking_script_hex) + decoded = decode_lock_before_pushdrop(script.serialize()) + if not decoded or not decoded.get("fields"): + raise ValueError("Not a valid registry pushdrop script") + + fields: List[bytes] = cast(List[bytes], decoded["fields"]) + + # Expect last field is registry operator + if definition_type == "basket": + if len(fields) != 6: + raise ValueError("Unexpected field count for basket type") + return BasketDefinitionData( + definitionType="basket", + basketID=fields[0].decode(), + name=fields[1].decode(), + iconURL=fields[2].decode(), + description=fields[3].decode(), + documentationURL=fields[4].decode(), + registryOperator=fields[5].decode(), + ) + if definition_type == "protocol": + if len(fields) != 6: + raise ValueError("Unexpected field count for protocol type") + import json + + return ProtocolDefinitionData( + definitionType="protocol", + protocolID=json.loads(fields[0].decode()), + name=fields[1].decode(), + iconURL=fields[2].decode(), + description=fields[3].decode(), + documentationURL=fields[4].decode(), + registryOperator=fields[5].decode(), + ) + if definition_type == "certificate": + if len(fields) != 7: + raise ValueError("Unexpected field count for certificate type") + import json + + parsed_fields: Dict[str, Any] + try: + parsed_fields = json.loads(fields[5].decode()) + except Exception: + parsed_fields = {} + return CertificateDefinitionData( + definitionType="certificate", + type=fields[0].decode(), + name=fields[1].decode(), + iconURL=fields[2].decode(), + description=fields[3].decode(), + documentationURL=fields[4].decode(), + fields=cast(Dict[str, Any], parsed_fields), + registryOperator=fields[6].decode(), + ) + raise ValueError(f"Unsupported definition type: {definition_type}") + + +class RegistryClient: + def __init__(self, wallet: WalletInterface, originator: str = "registry-client") -> None: + self.wallet = wallet + self.originator = originator + self._resolver = LookupResolver() + + def register_definition(self, ctx: Any, data: DefinitionData) -> Dict[str, Any]: + pub = self.wallet.get_public_key(ctx, {"identityKey": True}, self.originator) or {} + operator = cast(str, pub.get("publicKey") or "") + + protocol = _map_definition_type_to_wallet_protocol(data.definitionType) + fields = _build_pushdrop_fields(data, operator) + + # Build lock-before pushdrop script + from bsv.keys import PublicKey + + op_bytes = PublicKey(operator).serialize(compressed=True) + locking_script_bytes = build_lock_before_pushdrop(fields, op_bytes, include_signature=False) + + # Create transaction + randomize_outputs = False + ca_res = self.wallet.create_action( + ctx, + { + "description": f"Register a new {data.definitionType} item", + "outputs": [ + { + "satoshis": REGISTRANT_TOKEN_AMOUNT, + "lockingScript": locking_script_bytes, + "outputDescription": f"New {data.definitionType} registration token", + "basket": _map_definition_type_to_basket_name(data.definitionType), + } + ], + "options": {"randomizeOutputs": randomize_outputs}, + }, + self.originator, + ) or {} + + # For now, return create_action-like structure; broadcasting can be done by caller via Transaction.broadcast + return ca_res + + def list_own_registry_entries(self, ctx: Any, definition_type: DefinitionType) -> List[Dict[str, Any]]: + include_instructions = True + include_tags = True + include_labels = True + lo = self.wallet.list_outputs( + ctx, + { + "basket": _map_definition_type_to_basket_name(definition_type), + "include": "entire transactions", + "includeCustomInstructions": include_instructions, + "includeTags": include_tags, + "includeLabels": include_labels, + }, + self.originator, + ) or {} + + outputs = cast(List[Dict[str, Any]], lo.get("outputs") or []) + beef = cast(bytes, lo.get("BEEF") or b"") + results: List[Dict[str, Any]] = [] + if not outputs or not beef: + return results + + try: + tx = Transaction.from_beef(beef) + except Exception: + return results + + for out in outputs: + if not out.get("spendable", False): + continue + idx = int(out.get("outputIndex", 0)) + try: + ls_hex = tx.outputs[idx].locking_script.hex() + except Exception: + continue + try: + record = _parse_locking_script(definition_type, ls_hex) + except Exception: + continue + # Merge with token data + results.append( + { + **asdict(record), + "txid": out.get("txid", ""), + "outputIndex": idx, + "satoshis": int(out.get("satoshis", 0)), + "lockingScript": ls_hex, + "beef": beef, + } + ) + + return results + + def revoke_own_registry_entry(self, ctx: Any, record: Dict[str, Any]) -> Dict[str, Any]: + # Owner check: ensure this wallet controls the registry operator key + me = self.wallet.get_public_key(ctx, {"identityKey": True}, self.originator) or {} + my_pub = cast(str, me.get("publicKey") or "") + operator = cast(str, record.get("registryOperator") or "") + if operator and my_pub and operator.lower() != my_pub.lower(): + raise ValueError("this registry token does not belong to the current wallet") + + txid = cast(str, record.get("txid") or "") + output_index = int(record.get("outputIndex") or 0) + beef = cast(bytes, record.get("beef") or b"") + satoshis = int(record.get("satoshis") or 0) + if not txid or not beef: + raise ValueError("Invalid registry record - missing txid or beef") + + # Create partial transaction that spends the registry UTXO + ca_res = self.wallet.create_action( + ctx, + { + "description": f"Revoke {record.get('definitionType', 'registry')} item", + "inputBEEF": beef, + "inputs": [ + { + "outpoint": f"{txid}.{output_index}", + "unlockingScriptLength": 73, + "inputDescription": "Revoking registry token", + } + ], + }, + self.originator, + ) or {} + + signable = cast(Dict[str, Any], (ca_res.get("signableTransaction") or {})) + reference = signable.get("reference") or b"" + + # Build a real unlocker and sign the partial transaction input + # signableTransaction.tx is expected to be raw tx bytes (WalletWire signable), not BEEF + partial_tx = ( + Transaction.from_hex(cast(bytes, signable.get("tx"))) + if signable.get("tx") + else Transaction() + ) + unlocker = make_pushdrop_unlocker( + self.wallet, + protocol_id=_map_definition_type_to_wallet_protocol(cast(DefinitionType, record.get("definitionType", "basket"))), + key_id="1", + counterparty={"type": 2}, # anyone + sign_outputs_mode=SignOutputsMode.ALL, + anyone_can_pay=False, + prev_txid=txid, + prev_vout=output_index, + prev_satoshis=satoshis, + prev_locking_script=bytes.fromhex(cast(str, record.get("lockingScript", ""))) if record.get("lockingScript") else None, + ) + unlocking_script = unlocker.sign(ctx, partial_tx, output_index) + + spends = {output_index: {"unlockingScript": unlocking_script}} + sign_res = self.wallet.sign_action( + ctx, + { + "reference": reference, + "spends": spends, + "options": {"acceptDelayedBroadcast": False}, + }, + self.originator, + ) or {} + + # Broadcast via default broadcaster if tx present + tx_bytes = cast(bytes, sign_res.get("tx") or b"") + if tx_bytes: + try: + tx = Transaction.from_hex(tx_bytes) + # Broadcast via topic mapping (tm_*) using TopicBroadcaster + topic_map = { + "basket": "tm_basketmap", + "protocol": "tm_protomap", + "certificate": "tm_certmap", + } + topic = topic_map.get(cast(str, record.get("definitionType", "basket")), "tm_basketmap") + # network preset from wallet + net_res = self.wallet.get_network(ctx, {}, self.originator) or {} + network_preset = cast(str, net_res.get("network") or "mainnet") + tb = TopicBroadcaster([topic], BroadcasterConfig(network_preset)) + try: + tb.sync_broadcast(tx) + except Exception: + pass + except Exception: + pass + return sign_res + + def resolve(self, ctx: Any, definition_type: DefinitionType, query: Dict[str, Any], resolver: Optional[Any] = None) -> List[DefinitionData]: + """Resolve registry records using a provided resolver compatible with TS/Go. + + Resolver signature: resolver(ctx, service_name: str, query: Dict) -> List[{"beef": bytes, "outputIndex": int}] + Service names: ls_basketmap | ls_protomap | ls_certmap + """ + if resolver is None: + return [] + + service_name = {"basket": "ls_basketmap", "protocol": "ls_protomap", "certificate": "ls_certmap"}[definition_type] + self._resolver.set_backend(resolver) + ans = self._resolver.query(ctx, LookupQuestion(service=service_name, query=query)) + outputs = [{"beef": o.beef, "outputIndex": o.outputIndex} for o in ans.outputs] + parsed: List[DefinitionData] = [] + for o in outputs: + try: + tx = Transaction.from_beef(cast(bytes, o.get("beef") or b"")) + idx = int(o.get("outputIndex") or 0) + ls_hex = tx.outputs[idx].locking_script.hex() + rec = _parse_locking_script(definition_type, ls_hex) + parsed.append(rec) + except Exception: + continue + if parsed: + return parsed + # Fallback: use list_own_registry_entries and re-parse locking scripts + own = self.list_own_registry_entries(ctx, definition_type) + for it in own: + try: + ls_hex = cast(str, it.get("lockingScript", "")) + rec = _parse_locking_script(definition_type, ls_hex) + parsed.append(rec) + except Exception: + continue + # Apply simple filters if present + if definition_type == "basket" and "basketID" in query: + parsed = [r for r in parsed if getattr(r, "basketID", None) == query.get("basketID")] + return parsed + + diff --git a/bsv/registry/resolver.py b/bsv/registry/resolver.py new file mode 100644 index 0000000..7a6493d --- /dev/null +++ b/bsv/registry/resolver.py @@ -0,0 +1,85 @@ +from __future__ import annotations + +from typing import Any, Dict, List, Optional, cast +import os + +from bsv.registry.types import DefinitionType +from bsv.registry.client import _parse_locking_script +from bsv.transaction import Transaction +from bsv.wallet.wallet_interface import WalletInterface + + +def _basket_name(definition_type: DefinitionType) -> str: + return { + "basket": "basketmap", + "protocol": "protomap", + "certificate": "certmap", + }[definition_type] + + +class WalletWireResolver: + """Simple resolver that uses the wallet wire list_outputs to emulate a lookup service. + + This does not discover global registry entries across the network; it queries the connected + wallet and filters locally by parsed registry fields. + """ + + def __init__(self, wallet: WalletInterface, originator: str = "registry-resolver") -> None: + self.wallet = wallet + self.originator = originator + + def __call__(self, ctx: Any, service_name: str, query: Dict[str, Any]) -> List[Dict[str, Any]]: + # Map service name to definition type (TS/Go alias) + # For responsibility separation and reusability + # __call__(service_name, ...) is the interoperability entry point, query(definition_type, ...) is the actual logic. + # The mapping allows both to be unified, and the internal logic is reusable and readable. + # Even if service names increase or change in the future, only the mapping needs to be updated. + # The design allows for invalid service names to be handled gracefully. + + service_to_type = { + "ls_basketmap": "basket", + "ls_protomap": "protocol", + "ls_certmap": "certificate", + } + definition_type = cast(DefinitionType, service_to_type.get(service_name)) + if not definition_type: + return [] + return self.query(ctx, definition_type, query) + + def query(self, ctx: Any, definition_type: DefinitionType, query: Dict[str, Any]) -> List[Dict[str, Any]]: + lo = self.wallet.list_outputs( + ctx, + { + "basket": _basket_name(definition_type), + "include": "entire transactions", + }, + self.originator, + ) or {} + + outputs = cast(List[Dict[str, Any]], lo.get("outputs") or []) + if os.getenv("REGISTRY_DEBUG") == "1": + print("[DEBUG resolver.outputs]", len(outputs), outputs[:1]) + # For WalletWire-backed resolver, prefer direct lockingScript from outputs (BEEF not required) + + matches: List[Dict[str, Any]] = [] + for out in outputs: + idx = int(out.get("outputIndex", 0)) + try: + ls_field = out.get("lockingScript") or "" + if isinstance(ls_field, str): + ls_hex = ls_field + else: + from bsv.script.script import Script + ls_hex = Script(cast(bytes, ls_field)).hex() + rec = _parse_locking_script(definition_type, ls_hex) + except Exception: + continue + + # NOTE: WalletWireResolver only targets outputs within the wallet for simple interoperability. + # The main Lookup is for global search + detailed filtering, but here we only keep it at the basket level. + + matches.append({"beef": b"", "outputIndex": idx}) + + return matches + + diff --git a/bsv/registry/types.py b/bsv/registry/types.py new file mode 100644 index 0000000..812ab25 --- /dev/null +++ b/bsv/registry/types.py @@ -0,0 +1,72 @@ +from __future__ import annotations + +from dataclasses import dataclass +from typing import Literal, TypedDict, Dict, Union, List, Any + +DefinitionType = Literal["basket", "protocol", "certificate"] + + +class CertificateFieldDescriptor(TypedDict): + friendlyName: str + description: str + type: Literal["text", "imageURL", "other"] + fieldIcon: str + + +@dataclass +class BasketDefinitionData: + definitionType: Literal["basket"] + basketID: str + name: str + iconURL: str + description: str + documentationURL: str + registryOperator: str | None = None + + +@dataclass +class ProtocolDefinitionData: + definitionType: Literal["protocol"] + protocolID: Dict[str, Any] # WalletProtocol-like: {securityLevel, protocol} + name: str + iconURL: str + description: str + documentationURL: str + registryOperator: str | None = None + + +@dataclass +class CertificateDefinitionData: + definitionType: Literal["certificate"] + type: str + name: str + iconURL: str + description: str + documentationURL: str + fields: Dict[str, CertificateFieldDescriptor] + registryOperator: str | None = None + + +DefinitionData = Union[ + BasketDefinitionData, + ProtocolDefinitionData, + CertificateDefinitionData, +] + + +@dataclass +class TokenData: + txid: str + outputIndex: int + satoshis: int + lockingScript: str + beef: bytes + + +RegistryRecord = Union[ + BasketDefinitionData, + ProtocolDefinitionData, + CertificateDefinitionData, +] # will be merged with TokenData at runtime where needed + +