In [1]:
import bip0327

from hwilib.psbt import PSBT, PartiallySignedInput, PartiallySignedOutput, CTransaction, CTxIn, CTxOut, KeyOriginInfo

import json
import base58
from bip32 import BIP32
from common import hash160
import mnemonic
import segwit_addr
from bip380.descriptors import descriptor_from_str
# --- Constants -------------------------------------------------------------
# Mnemonic from which the seed used throughout this notebook is derived.
DEFAULT_SPECULOS_MNEMONIC = "glory promote mansion idle axis finger extra february uncover one trip resource lawn turtle enact monster seven myth punch hobby comfort wild raise skin"
WALLET_POLICY_SLIP21_LABEL = b"LEDGER-Wallet policy"

# 4-byte version prefixes of serialized BIP32 extended public keys.
BIP32_MAINNET_PUBKEY_VERSION = bytes.fromhex("0488B21E")
BIP32_TESTNET_PUBKEY_VERSION = bytes.fromhex("043587CF")

# --- State derived from the mnemonic -----------------------------------------
mnemo = mnemonic.Mnemonic("english")
speculos_seed = mnemo.to_seed(DEFAULT_SPECULOS_MNEMONIC)
bip32 = BIP32.from_seed(speculos_seed, "test")

# Fingerprint: first 4 bytes of hash160 of the public key of `bip32`;
# FPR is its hex form, used when printing key origins.
master_key_fingerprint = hash160(bip32.pubkey)[:4]
FPR = master_key_fingerprint.hex()




In [2]:
# Read the key aggregation test vectors used by the checks below.
with open("musig/key_agg_vectors.json") as vectors_file:
    keyagg_vectors = json.load(vectors_file)

In [3]:
# Display the loaded test vectors to inspect their structure.
keyagg_vectors

{'pubkeys': ['02F9308A019258C31049344F85F89D5229B531C845836F99B08601F113BCE036F9',
  '03DFF1D77F2A671C5F36183726DB2341BE58FEAE1DA2DECED843240F7B502BA659',
  '023590A94E768F8E1815C2F24B4D80A8E3149316C3518CE7B7AD338368D038CA66',
  '020000000000000000000000000000000000000000000000000000000000000005',
  '02FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFEFFFFFC30',
  '04F9308A019258C31049344F85F89D5229B531C845836F99B08601F113BCE036F9',
  '03935F972DA013F80AE011890FA89B67A27B7BE6CCB24D3274D18B2D4067F261A9'],
 'tweaks': ['FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFEBAAEDCE6AF48A03BBFD25E8CD0364141',
  '252E4BD67410A76CDF933D30EAA1608214037F1B105A013ECCD3C5C184A6110B'],
 'valid_test_cases': [{'key_indices': [0, 1, 2],
   'expected': '90539EEDE565F5D054F32CC0C220126889ED1E5D193BAF15AEF344FE59D4610C'},
  {'key_indices': [2, 1, 0],
   'expected': '6204DE8B083426DC6EAF9502D27024D53FC826BF7D2012148A0575435DF54B2B'},
  {'key_indices': [0, 0, 0],
   'expected': 'B436E3BAD62B8CD409969A224731C193D051162D8C

In [4]:
def xpub_from_hexkey(pubkey_hex: str) -> str:
    """Wrap a hex-encoded public key into a serialized testnet extended public key.

    The key is placed at depth 0 with an all-zero parent fingerprint, child
    number and chain code, then Base58Check-encoded.

    Args:
        pubkey_hex: hex encoding of the public key; it must decode to exactly
            33 bytes so that the serialized payload is 78 bytes long.

    Returns:
        The Base58Check-encoded extended public key, as a ``str``.

    Raises:
        ValueError: if ``pubkey_hex`` is not valid hex, or if the resulting
            payload is not 78 bytes long.
    """
    version_bytes = BIP32_TESTNET_PUBKEY_VERSION
    depth = b'\x00'
    fingerprint = b'\x00\x00\x00\x00'
    child_number = b'\x00\x00\x00\x00'
    chain_code = b'\x00' * 32
    xpub = version_bytes + depth + fingerprint + child_number + chain_code + bytes.fromhex(pubkey_hex)
    if len(xpub) != 78:
        raise ValueError("Invalid xpub length")
    # b58encode_check returns bytes; decode so the result matches the declared
    # `str` return type (and what `musig` returns).
    return base58.b58encode_check(xpub).decode()

# NOTE(review): despite its name, this holds the raw hex public keys from the
# test vectors, not serialized xpubs (xpub_from_hexkey is not applied) —
# confirm whether that is intended.
xpubs = keyagg_vectors["pubkeys"]

In [5]:
# Aggregate the first three public keys of the test vectors.
key_agg_ctx = bip0327.key_agg([bytes.fromhex(pk_hex) for pk_hex in keyagg_vectors["pubkeys"][:3]])

In [6]:
# Hex encoding of the x-only public key of the aggregation context built above.
agg_xonly_pk = bip0327.get_xonly_pk(key_agg_ctx).hex()

In [7]:
def musig(keys: list[bytes]) -> str:
    """Aggregate `keys` and return the result as a serialized testnet xpub.

    The keys are aggregated with ``bip0327.key_agg``. The aggregate point is
    encoded as a compressed public key (prefix chosen from the parity of its
    y coordinate) and wrapped into an extended public key at depth 0, with an
    all-zero parent fingerprint and child number and a fixed chain code.

    Args:
        keys: the public keys to aggregate, as accepted by ``bip0327.key_agg``.

    Returns:
        The Base58Check-encoded extended public key, as a ``str``.
    """
    # version || depth (0) || parent fingerprint (zeros) || child number (zeros)
    header = BIP32_TESTNET_PUBKEY_VERSION + b'\x00' + bytes(4) + bytes(4)

    agg_ctx = bip0327.key_agg(keys)
    # Q[1] is the y coordinate of the aggregate point: 0x02 if even, 0x03 if odd.
    parity_prefix = b'\x03' if agg_ctx.Q[1] % 2 else b'\x02'
    agg_pubkey = parity_prefix + bip0327.get_xonly_pk(agg_ctx)

    # Fixed chain code shared by every aggregate key
    # (presumably the constant specified by BIP-328 — verify).
    fixed_chain_code = bytes.fromhex("868087ca02a6f974c4598924c36b57762d32cb45717167e300622c7167e38965")

    serialized = header + fixed_chain_code + agg_pubkey
    return base58.b58encode_check(serialized).decode()

In [8]:
# Aggregate xpub of the first three public keys of the test vectors.
test_agg_xpub = musig([bytes.fromhex(pk_hex) for pk_hex in keyagg_vectors["pubkeys"][:3]])
print("Aggregate xpub:", test_agg_xpub)

Aggregate xpub: tpubD6NzVbkrYhZ4XgHkCEtfpuZPJDLaLPxu5ZBEtAbub9GcUX1mTS2t3eCnBXprykFyvNTz7k9MaKXtxr5rYsrLkfbUw1A27n7sVbLeDn2sna5


In [9]:
# The last 33 bytes of the decoded xpub are the compressed public key; its
# x coordinate must equal the expected value of the test case with
# key_indices [0, 1, 2].
test_agg_pubkey_hex = base58.b58decode_check(test_agg_xpub)[-33:].hex()
assert test_agg_pubkey_hex == '0290539eede565f5d054f32cc0c220126889ed1e5d193baf15aef344fe59d4610c'

print("Aggregate pubkey", test_agg_pubkey_hex)

Aggregate pubkey 0290539eede565f5d054f32cc0c220126889ed1e5d193baf15aef344fe59d4610c


In [10]:
# Derivation paths (relative to the master key) of the two keys to aggregate.
path_1, path_2 = "44'/1'/0'", "44'/1'/1'"

# Extended public keys at those paths, and the 33-byte compressed public key
# found at the end of each decoded xpub.
xpub_1, xpub_2 = (bip32.get_xpub_from_path(f"m/{path}") for path in (path_1, path_2))
pk_1, pk_2 = (base58.b58decode_check(xpub)[-33:] for xpub in (xpub_1, xpub_2))

for path, xpub in ((path_1, xpub_1), (path_2, xpub_2)):
    print(f"[{path}]{xpub}")

# Descriptor template; only the first key carries key origin information.
print(f"tr(musig([{FPR}/{path_1}]{xpub_1},{xpub_2})/**)")

[44'/1'/0']tpubDCwYjpDhUdPGP5rS3wgNg13mTrrjBuG8V9VpWbyptX6TRPbNoZVXsoVUSkCjmQ8jJycjuDKBb9eataSymXakTTaGifxR6kmVsfFehH1ZgJT
[44'/1'/1']tpubDCwYjpDhUdPGQWG6wG6hkBJuWFZEtrn7j3xwG3i8XcQabcGC53xWZm1hSXrUPFS5UvZ3QhdPSjXWNfWmFGTioARHuG5J7XguEjgg7p8PxAm
tr(musig([f5acc2fd/44'/1'/0']tpubDCwYjpDhUdPGP5rS3wgNg13mTrrjBuG8V9VpWbyptX6TRPbNoZVXsoVUSkCjmQ8jJycjuDKBb9eataSymXakTTaGifxR6kmVsfFehH1ZgJT,tpubDCwYjpDhUdPGQWG6wG6hkBJuWFZEtrn7j3xwG3i8XcQabcGC53xWZm1hSXrUPFS5UvZ3QhdPSjXWNfWmFGTioARHuG5J7XguEjgg7p8PxAm)/**)


In [11]:
# Aggregate the two derived public keys into a single extended public key.
agg_xpub = musig([pk_1, pk_2])

# Display the resulting xpub.
agg_xpub

'tpubD6NzVbkrYhZ4XgHkCEtfpuZPJDLaLPxu5ZBEtAbub9GcUX1mTS2t3eCnBYbsr3Ya2KGVfoWGNfa65rNS8fx3ssuwAwitJiFN4WPwEUTnqsy'

In [12]:
# Decode the aggregate xpub once: its last 33 bytes are the compressed pubkey.
agg_xpub_raw = base58.b58decode_check(agg_xpub)
print("compressed pubkey after aggregation:", agg_xpub_raw[-33:].hex())

# Full decoded payload, shown as the cell result.
agg_xpub_raw.hex()

compressed pubkey after aggregation: 02f689e19cb157cc096342294b02485cc5d9723a849a36c68a7553fe90020bb53c


'043587cf000000000000000000868087ca02a6f974c4598924c36b57762d32cb45717167e300622c7167e3896502f689e19cb157cc096342294b02485cc5d9723a849a36c68a7553fe90020bb53c'

In [13]:
# Last two derivation steps applied to the aggregate xpub.
change, address_index = 0, 3

derivation_path = f"m/{change}/{address_index}"
der_agg_xpub = bip32.from_xpub(agg_xpub).get_xpub_from_path(derivation_path)
print("Derived aggregate xpub:", der_agg_xpub)

# Keep only the last 32 bytes of the decoded xpub, i.e. the compressed public
# key without its leading parity byte.
der_agg_xpub_rawpk = base58.b58decode_check(der_agg_xpub)[-32:]

desc_str = "tr(" + der_agg_xpub_rawpk.hex() + ")"
print(desc_str)
desc = descriptor_from_str(desc_str)

Derived aggregate xpub: tpubDArULRJ2eVJPp9R9EcfRLjNhZuNmjjRvtZU2ScNAfaFYgcBMejagdYSgzQVmWWv5dKrHdxBN2wtyMYBzbAHHzdZGq5SPp9YdfNTS2YBRwhX
tr(c600d12b3e8cd23fbb2318ceca0f5d7f59c28e7b4227c06a04673f4c3d0ec1c6)


In [14]:
script = desc.script_pubkey

# Expected layout: OP_1 (0x51), then a 32-byte push (0x20), then the key.
assert script[0] == 0x51
assert script[1] == 0x20

witness_program = script[2:]
print(f"final_pubkey: {witness_program.hex()}")

# Encode as a witness version 1 address with the "tb" human-readable part.
segwit_addr.encode("tb", 1, witness_program)

c600d12b3e8cd23fbb2318ceca0f5d7f59c28e7b4227c06a04673f4c3d0ec1c6
final_pubkey: f48bd11a8672e890c5652ba72a9cf42270024ee2d56935a7e6195a61a9079b66


'tb1p7j9azx5xwt5fp3t99wnj4885yfcqynhz645ntflxr9dxr2g8ndnq32xa2m'

We want to spend the UTXO held by the following address: https://mempool.space/testnet/address/tb1p7j9azx5xwt5fp3t99wnj4885yfcqynhz645ntflxr9dxr2g8ndnq32xa2m

In [15]:


# Build the PSBT with version-2 style fields; it is converted to v0 at the end.
psbt = PSBT()
psbt.tx_version = 2

# --- Input: output #1 of the funding transaction ----------------------------
psbt_input = PartiallySignedInput(version=2)
# The txid is written here in its displayed hex form and stored byte-reversed.
psbt_input.prev_txid = bytes.fromhex("f6c35db0ed5dc4e41ab42964a99d0ea115a4fda5c1f499f87f25f37bf2c1dc66")[::-1]
psbt_input.prev_out = 1
psbt_input.sequence = 0xfffffffd
# Amount and scriptPubKey of the output being spent.
psbt_input.witness_utxo = CTxOut(327327, script)
# Key origin of the derived aggregate key: no leaf hashes, and the two
# derivation steps applied to the aggregate xpub.
psbt_input.tap_bip32_paths[der_agg_xpub_rawpk] = (
    set(),
    KeyOriginInfo(
        # there's no meaningful fingerprint for the aggregate key
        fingerprint=bytes(4),
        path=[change, address_index],
    ),
)
psbt.inputs.append(psbt_input)

# --- Output: a zero-amount OP_RETURN carrying a message ----------------------
psbt_output = PartiallySignedOutput(version=2)
opreturn_msg = "This inputs has two pubkeys but you only see one. #mpcgang revenge".encode()
# The message length is written as a single push byte, so it must be 1..75.
assert 0 < len(opreturn_msg) <= 75
psbt_output.amount = 0
# 0x6a (OP_RETURN), then the one-byte length, then the message itself.
psbt_output.script = b''.join([b'\x6a', len(opreturn_msg).to_bytes(1, 'big'), opreturn_msg])
psbt.outputs.append(psbt_output)

psbt.convert_to_v0()

In [16]:
# Print everything needed to reproduce the signing request: the wallet policy
# template, the keys that fill its @0 / @1 placeholders, and the PSBT.
print("wallet policy: tr(musig(@0,@1)/**)")
# Keys are comma-separated, in the same form as in the descriptor printed
# earlier; only the first key carries key origin information. The previous
# format string ended with a stray, unmatched "]".
print(f"keys info: [{FPR}/{path_1}]{xpub_1},{xpub_2}")

print("psbt:", psbt.serialize())

wallet policy: tr(musig(@0,@1)/**)
keys info: [f5acc2fd/44'/1'/0']tpubDCwYjpDhUdPGP5rS3wgNg13mTrrjBuG8V9VpWbyptX6TRPbNoZVXsoVUSkCjmQ8jJycjuDKBb9eataSymXakTTaGifxR6kmVsfFehH1ZgJT,tpubDCwYjpDhUdPGQWG6wG6hkBJuWFZEtrn7j3xwG3i8XcQabcGC53xWZm1hSXrUPFS5UvZ3QhdPSjXWNfWmFGTioARHuG5J7XguEjgg7p8PxAm]
psbt: cHNidP8BAIACAAAAAWbcwfJ78yV/+Jn0waX9pBWhDp2pZCm0GuTEXe2wXcP2AQAAAAD9////AQAAAAAAAAAARGpCVGhpcyBpbnB1dHMgaGFzIHR3byBwdWJrZXlzIGJ1dCB5b3Ugb25seSBzZWUgb25lLiAjbXBjZ2FuZyByZXZlbmdlAAAAAAABASuf/gQAAAAAACJRIPSL0RqGcuiQxWUrpyqc9CJwAk7i1Wk1p+YZWmGpB5tmIRbGANErPozSP7sjGM7KD11/WcKOe0InwGoEZz9MPQ7Bxg0AAAAAAAAAAAADAAAAAAA=
