diff --git a/bsv/keystore/interfaces.py b/bsv/keystore/interfaces.py index 4719d99..83f0264 100644 --- a/bsv/keystore/interfaces.py +++ b/bsv/keystore/interfaces.py @@ -103,6 +103,8 @@ class KVStoreConfig: # Optional TS/GO-style defaults for call arguments fee_rate: int | None = None default_ca: dict | None = None + # Optional options parity with TS + accept_delayed_broadcast: bool = False @dataclass diff --git a/bsv/keystore/local_kv_store.py b/bsv/keystore/local_kv_store.py index 5329c55..dd4a549 100644 --- a/bsv/keystore/local_kv_store.py +++ b/bsv/keystore/local_kv_store.py @@ -93,9 +93,42 @@ def __init__(self, config: KVStoreConfig): self._lock_position: str = getattr(config, "lock_position", "before") or "before" # Remove _use_local_store and _store except for test hooks self._lock = Lock() + # Key-level locks (per-key serialization) + self._key_locks: dict[str, Lock] = {} + self._key_locks_guard: Lock = Lock() + # Options + self._accept_delayed_broadcast: bool = bool( + getattr(config, "accept_delayed_broadcast", False) + or getattr(config, "acceptDelayedBroadcast", False) + ) # Cache: recently created BEEF per key to avoid WOC on immediate get self._recent_beef_by_key: dict[str, tuple[list, bytes]] = {} + # --------------------------------------------------------------------- + # Helper methods + # --------------------------------------------------------------------- + + def _get_protocol(self, key: str) -> dict: + """Returns the wallet protocol for the given key (GO pattern). + + This method mirrors the Go SDK's getProtocol() implementation. + It returns only the protocol structure, as keyID is always the same + as the key parameter and should be passed separately. + + Args: + key: The key string (not used in protocol generation, but kept for API consistency) + + Returns: + dict: Protocol dict with 'securityLevel' and 'protocol' keys. + securityLevel is 2 (SecurityLevelEveryAppAndCounterparty). + protocol is derived from the context. + + Note: + keyID is not included in the return value as it's always the same + as the key parameter. This follows the Go SDK pattern. + """ + return {"securityLevel": 2, "protocol": self._protocol} + # --------------------------------------------------------------------- # Public API # --------------------------------------------------------------------- @@ -103,10 +136,14 @@ def __init__(self, config: KVStoreConfig): def get(self, ctx: Any, key: str, default_value: str = "") -> str: if not key: raise ErrInvalidKey(KEY_EMPTY_MSG) - value = self._get_onchain_value(ctx, key) - if value is not None: - return value - return default_value + self._acquire_key_lock(key) + try: + value = self._get_onchain_value(ctx, key) + if value is not None: + return value + return default_value + finally: + self._release_key_lock(key) def _get_onchain_value(self, ctx: Any, key: str) -> str | None: """Retrieve value from on-chain outputs (BEEF/PushDrop).""" @@ -480,36 +517,55 @@ def set(self, ctx: Any, key: str, value: str, ca_args: dict = None) -> str: raise ErrInvalidKey(KEY_EMPTY_MSG) if not value: raise ErrInvalidValue("Value cannot be empty") - ca_args = self._merge_default_ca(ca_args) - print(f"[TRACE] [set] ca_args: {ca_args}") - outs, input_beef = self._lookup_outputs_for_set(ctx, key, ca_args) - locking_script = self._build_locking_script(ctx, key, value, ca_args) - inputs_meta = self._prepare_inputs_meta(ctx, key, outs, ca_args) - print(f"[TRACE] [set] inputs_meta after _prepare_inputs_meta: {inputs_meta}") - create_args = self._build_create_action_args_set(key, value, locking_script, inputs_meta, input_beef, ca_args) - # Ensure 'inputs' is included for test compatibility - create_args["inputs"] = inputs_meta - # Pass use_woc from ca_args to create_action for test compatibility - if ca_args and "use_woc" in ca_args: - create_args["use_woc"] = ca_args["use_woc"] - ca = self._wallet.create_action(ctx, create_args, self._originator) or {} - signable = (ca.get("signableTransaction") or {}) if isinstance(ca, dict) else {} - signable_tx_bytes = signable.get("tx") or b"" - signed_tx_bytes: bytes | None = None - if inputs_meta: - signed_tx_bytes = self._sign_and_relinquish_set(ctx, key, outs, inputs_meta, signable, signable_tx_bytes, input_beef) - # Build immediate BEEF from the (signed or signable) transaction to avoid WOC on immediate get + self._acquire_key_lock(key) try: - tx_bytes = signed_tx_bytes or signable_tx_bytes - if tx_bytes: + ca_args = self._merge_default_ca(ca_args) + print(f"[TRACE] [set] ca_args: {ca_args}") + outs, input_beef = self._lookup_outputs_for_set(ctx, key, ca_args) + locking_script = self._build_locking_script(ctx, key, value, ca_args) + inputs_meta = self._prepare_inputs_meta(ctx, key, outs, ca_args) + print(f"[TRACE] [set] inputs_meta after _prepare_inputs_meta: {inputs_meta}") + create_args = self._build_create_action_args_set(key, value, locking_script, inputs_meta, input_beef, ca_args) + # Ensure 'inputs' is included for test compatibility + create_args["inputs"] = inputs_meta + # Pass use_woc from ca_args to create_action for test compatibility + if ca_args and "use_woc" in ca_args: + create_args["use_woc"] = ca_args["use_woc"] + ca = self._wallet.create_action(ctx, create_args, self._originator) or {} + signable = (ca.get("signableTransaction") or {}) if isinstance(ca, dict) else {} + signable_tx_bytes = signable.get("tx") or b"" + signed_tx_bytes: bytes | None = None + if inputs_meta: + signed_tx_bytes = self._sign_and_relinquish_set(ctx, key, outs, inputs_meta, signable, signable_tx_bytes, input_beef) + # Build immediate BEEF from the (signed or signable) transaction to avoid WOC on immediate get + try: + tx_bytes = signed_tx_bytes or signable_tx_bytes import binascii from bsv.beef import build_beef_v2_from_raw_hexes - from bsv.transaction import Transaction + from bsv.transaction import Transaction, TransactionOutput + from bsv.script.script import Script from bsv.utils import Reader - tx = Transaction.from_reader(Reader(tx_bytes)) - tx_hex = binascii.hexlify(tx_bytes).decode() + tx = None + tx_hex = None + if tx_bytes: + try: + tx = Transaction.from_reader(Reader(tx_bytes)) + tx_hex = binascii.hexlify(tx_bytes).decode() + except Exception: + tx = None + tx_hex = None + # Fallback: synthesize a minimal transaction with the KV locking script if wallet didn't return bytes + if tx is None: + try: + ls_bytes = locking_script if isinstance(locking_script, (bytes, bytearray)) else bytes.fromhex(str(locking_script)) + except Exception: + ls_bytes = b"" + t = Transaction() + t.outputs = [TransactionOutput(Script(ls_bytes), 1)] + tx = t + tx_hex = t.serialize().hex() # Minimal BEEF V2 (raw tx only) to avoid needing source transactions - beef_now = build_beef_v2_from_raw_hexes([tx_hex]) + beef_now = build_beef_v2_from_raw_hexes([tx_hex]) if isinstance(tx_hex, str) else b"" # Prepare minimal outputs descriptor for KV output (assumed vout 0) locking_script_hex = locking_script.hex() if isinstance(locking_script, (bytes, bytearray)) else str(locking_script) recent_outs = [{ @@ -519,41 +575,56 @@ def set(self, ctx: Any, key: str, value: str, ca_args: dict = None) -> str: "spendable": True, "outputDescription": "KV set (local)", "basket": self._context, - "tags": ["kv", "set"], + "tags": [key, "kv", "set"], "customInstructions": None, - "txid": getattr(tx, "txid", lambda: "").__call__() if hasattr(tx, "txid") else "", + "txid": tx.txid() if hasattr(tx, "txid") else "", }] - self._recent_beef_by_key[key] = (recent_outs, beef_now) - except Exception as e_beef: - print(f"[KV set] build immediate BEEF failed: {e_beef}") - # Broadcast - self._wallet.internalize_action(ctx, {"tx": signed_tx_bytes or signable_tx_bytes}, self._originator) - # Return outpoint format: key.vout (assuming vout 0 for KV outputs) - return f"{key}.0" + if beef_now: + self._recent_beef_by_key[key] = (recent_outs, beef_now) + except Exception as e_beef: + print(f"[KV set] build immediate BEEF failed: {e_beef}") + # Broadcast + self._wallet.internalize_action(ctx, {"tx": signed_tx_bytes or signable_tx_bytes}, self._originator) + # Return outpoint using resulting txid when available (vout=0) + try: + from bsv.transaction import Transaction + from bsv.utils import Reader + tx_bytes_final = signed_tx_bytes or signable_tx_bytes + if tx_bytes_final: + t = Transaction.from_reader(Reader(tx_bytes_final)) + return f"{t.txid()}.0" + except Exception: + pass + # Fallback + return f"{key}.0" + finally: + self._release_key_lock(key) def _build_locking_script(self, ctx: Any, key: str, value: str, ca_args: dict = None) -> str: ca_args = self._merge_default_ca(ca_args) # Encrypt the value if encryption is enabled if self._encrypt: - # Use the same encryption args as for PushDrop + # Use the same encryption args as for PushDrop; default-derive if missing protocol_id = ( ca_args.get("protocol_id") or ca_args.get("protocolID") + or self._get_protocol(key) ) key_id = ( ca_args.get("key_id") or ca_args.get("keyID") + or key ) - counterparty = ca_args.get("counterparty") - + counterparty = ca_args.get("counterparty") or {"type": 0} + if protocol_id and key_id: # Encrypt the value using wallet.encrypt encrypt_args = { "encryption_args": { "protocol_id": protocol_id, "key_id": key_id, - "counterparty": counterparty or {"type": 2} + "counterparty": counterparty }, "plaintext": value.encode('utf-8') } @@ -661,12 +732,16 @@ def _build_create_action_args_set(self, key: str, value: str, locking_script: by { "lockingScript": locking_script_hex, "satoshis": 1, - "tags": ["kv", "set"], + "tags": [key, "kv", "set"], "basket": self._context, "outputDescription": ({"retentionSeconds": self._retention_period} if int(self._retention_period or 0) > 0 else "") } ], "feeRate": fee_rate, + "options": { + "acceptDelayedBroadcast": self._accept_delayed_broadcast, + "randomizeOutputs": False, + }, } def _sign_and_relinquish_set(self, ctx: Any, key: str, outs: list, inputs_meta: list, signable: dict, signable_tx_bytes: bytes, input_beef: bytes) -> bytes | None: @@ -700,26 +775,37 @@ def _sign_and_relinquish_set(self, ctx: Any, key: str, outs: list, inputs_meta: def remove(self, ctx: Any, key: str) -> List[str]: if not key: raise ErrInvalidKey(KEY_EMPTY_MSG) + self._acquire_key_lock(key) removed: List[str] = [] loop_guard = 0 last_count = None - while True: - if loop_guard > 10: - break - loop_guard += 1 - outs, input_beef = self._lookup_outputs_for_remove(ctx, key) - count = len(outs) - if count == 0: - break - if last_count is not None and count >= last_count: - break - last_count = count - inputs_meta = self._prepare_inputs_meta(ctx, key, outs) - self._onchain_remove_flow(ctx, key, inputs_meta, input_beef) - removed.append(f"removed:{key}") - return removed - - def _lookup_outputs_for_remove(self, ctx: Any, key: str) -> tuple[list, bytes]: + try: + while True: + if loop_guard > 10: + break + loop_guard += 1 + outs, input_beef, total_outputs = self._lookup_outputs_for_remove(ctx, key) + count = len(outs) + if count == 0: + break + if last_count is not None and count >= last_count: + break + last_count = count + inputs_meta = self._prepare_inputs_meta(ctx, key, outs) + txid = self._onchain_remove_flow(ctx, key, inputs_meta, input_beef) + if isinstance(txid, str) and txid: + removed.append(txid) + # TS parity: break when outputs processed equals totalOutputs + try: + if isinstance(total_outputs, int) and count == total_outputs: + break + except Exception: + pass + return removed + finally: + self._release_key_lock(key) + + def _lookup_outputs_for_remove(self, ctx: Any, key: str) -> tuple[list, bytes, int | None]: lo = self._wallet.list_outputs(ctx, { "basket": self._context, "tags": [key], @@ -728,20 +814,30 @@ def _lookup_outputs_for_remove(self, ctx: Any, key: str) -> tuple[list, bytes]: }, self._originator) or {} outs = lo.get("outputs") or [] input_beef = lo.get("BEEF") or b"" + total_outputs = None + try: + total_outputs = lo.get("totalOutputs") or lo.get("total_outputs") + if isinstance(total_outputs, str) and total_outputs.isdigit(): + total_outputs = int(total_outputs) + except Exception: + total_outputs = None if not input_beef and outs: try: timeout = int(os.getenv("WOC_TIMEOUT", "10")) input_beef = self._build_beef_v2_from_woc_outputs(outs, timeout=timeout) except Exception: input_beef = b"" - return outs, input_beef + return outs, input_beef, total_outputs - def _onchain_remove_flow(self, ctx: Any, key: str, inputs_meta: list, input_beef: bytes) -> None: + def _onchain_remove_flow(self, ctx: Any, key: str, inputs_meta: list, input_beef: bytes) -> str | None: ca_res = self._wallet.create_action(ctx, { "labels": ["kv", "remove"], "description": f"kvstore remove {key}", "inputs": inputs_meta, "inputBEEF": input_beef, + "options": { + "acceptDelayedBroadcast": self._accept_delayed_broadcast + }, }, self._originator) or {} signable = (ca_res.get("signableTransaction") or {}) if isinstance(ca_res, dict) else {} signable_tx_bytes = signable.get("tx") or b"" @@ -751,6 +847,38 @@ def _onchain_remove_flow(self, ctx: Any, key: str, inputs_meta: list, input_beef res = self._wallet.sign_action(ctx, {"spends": spends_str, "reference": reference}, self._originator) or {} signed_tx_bytes = res.get("tx") if isinstance(res, dict) else None self._wallet.internalize_action(ctx, {"tx": signed_tx_bytes or signable_tx_bytes}, self._originator) + try: + from bsv.transaction import Transaction + from bsv.utils import Reader + tx_bytes_final = signed_tx_bytes or signable_tx_bytes + if tx_bytes_final: + t = Transaction.from_reader(Reader(tx_bytes_final)) + return t.txid() + except Exception: + return None + return None + + # ------------------------------ + # Key-level locking helpers + # ------------------------------ + def _acquire_key_lock(self, key: str) -> None: + try: + with self._key_locks_guard: + lk = self._key_locks.get(key) + if lk is None: + lk = Lock() + self._key_locks[key] = lk + lk.acquire() + except Exception: + pass + + def _release_key_lock(self, key: str) -> None: + try: + lk = self._key_locks.get(key) + if lk: + lk.release() + except Exception: + pass # ------------------------------------------------------------------ # Introspection helpers @@ -771,7 +899,9 @@ def _prepare_inputs_meta(self, ctx: Any, key: str, outs: list, ca_args: dict = N print(f"[TRACE] [_prepare_inputs_meta] ca_args: {ca_args}") print(f"[TRACE] [_prepare_inputs_meta] protocol: {protocol}, key_id: {key_id}, counterparty: {counterparty}") pd = PushDrop(self._wallet, self._originator) - unlocker = pd.unlock({"securityLevel": 2, "protocol": self._protocol}, key, {"type": 0}, sign_outputs='all') + # Use protocol from ca_args if available, otherwise use default protocol + unlock_protocol = protocol if protocol is not None else self._get_protocol(key) + unlocker = pd.unlock(unlock_protocol, key, {"type": 0}, sign_outputs='all') inputs_meta = [] for o in outs: txid_val = o.get("txid", "") @@ -811,15 +941,28 @@ def _prepare_spends(self, ctx, key, inputs_meta, signable_tx_bytes, input_beef, Prepare spends dict for sign_action: {idx: {"unlockingScript": ...}} Go/TS parity: use PushDrop unlocker and signable transaction. """ - from bsv.transaction import Transaction + from bsv.transaction import Transaction, parse_beef_ex from bsv.utils import Reader spends = {} + # Try to link the signable tx using provided BEEF to ensure SourceTransaction is available try: tx = Transaction.from_reader(Reader(signable_tx_bytes)) + if input_beef: + try: + beef, _subject, _last = parse_beef_ex(input_beef) + finder = getattr(beef, "find_transaction_for_signing", None) + if callable(finder): + linked = finder(tx.txid()) + if linked is not None: + tx = linked + except Exception: + pass except Exception: return spends pd = PushDrop(self._wallet, self._originator) - unlocker = pd.unlock({"securityLevel": 2, "protocol": self._protocol}, key, {"type": 0}, sign_outputs='all') + # Use default protocol for unlocking (GO pattern: protocol and key are separate) + unlock_protocol = self._get_protocol(key) + unlocker = pd.unlock(unlock_protocol, key, {"type": 0}, sign_outputs='all') # Only prepare spends for inputs whose outpoint matches the tx input at the same index for idx, meta in enumerate(inputs_meta): try: