## 3.2 Liquid Tapscript Case Study

Similar to the multisig case study in 3.1, the output built here combines:

* n-of-n MuSig signing policy on outer taproot pubkey
* Taptree, with n-choose-k k-of-k MuSig pubkey tapleaves, weighted greedily to reduce expected witness depth.
* One additional f-of-e checksigadd script tapleaf, spendable only after a CSV (relative timelock) delay of 1 day

First, let's import everything we need and set parameters to your liking:

In [None]:
# Parameters to play with

k = 3 # threshold for regular keys
n = 5 # regular keys
e = 3 # emergency keys
f = 2 # threshold for emergency keys

# Fail fast on an impossible policy. Without these checks a bad combination
# (for example k > n) only surfaces several cells later as an unrelated-looking
# error, because no k-of-k leaves get built.
if not 1 <= k <= n:
    raise ValueError("regular threshold k must satisfy 1 <= k <= n (got k={}, n={})".format(k, n))
if not 1 <= f <= e:
    raise ValueError("emergency threshold f must satisfy 1 <= f <= e (got f={}, e={})".format(f, e))

import hashlib
from io import BytesIO

import util
from test_framework.address import program_to_witness
from test_framework.key import ECKey, ECPubKey, generate_key_pair, generate_schnorr_nonce
from test_framework.messages import CTransaction, COutPoint, CTxIn, CTxOut, CScriptWitness, CTxInWitness, ser_string, sha256
from test_framework.script import TapTree, TapLeaf, Node, CScript, OP_1, TaprootSignatureHash, OP_CHECKSIG, SIGHASH_ALL_TAPROOT
from test_framework.util import assert_equal
from test_framework.musig import generate_musig_key, aggregate_schnorr_nonces, sign_musig, aggregate_musig_signatures

import random
from itertools import combinations

Next, we generate the "base" untweaked keys that each of the n signing participants will use and combine them into the "inner" MuSig key. We also compute the associated tweaked keys (each base key multiplied by its MuSig coefficient), which are the keys actually used when signing for the MuSig key.

In [None]:
# One base (untweaked) keypair per regular signer.
base_key_pairs = [generate_key_pair() for _ in range(n)]
priv_keys_base = [priv for priv, _ in base_key_pairs]
pub_keys_base = [pub for _, pub in base_key_pairs]

print("Create MuSig inner key")
# c_map is indexed by base pubkey and holds the factor each key is multiplied by below.
c_map, inner_pubkey = generate_musig_key(pub_keys_base)

print("Apply tweaks to both priv and public parts")
# Multiply every base key by its own entry in c_map, keeping signer order.
priv_keys_tweaked = [priv.mul(c_map[pub]) for pub, priv in zip(pub_keys_base, priv_keys_base)]
pub_keys_tweaked = [pub.mul(c_map[pub]) for pub in pub_keys_base]

Before making the Taproot output pubkey, we need to generate our taptree, starting with the k-of-k MuSig tapscript-based leaves:

In [None]:
# Parallel lists: entry i of each describes the i-th k-of-k MuSig leaf.
tapscript_leaves = []        # (weight, TapLeaf) pairs fed to the tree builder
tapscript_leaf_signers = []  # indices into pub_keys_base of the leaf's signers
leaf_tweaks = []             # per-leaf MuSig coefficient map
tapscript_leaf_pubkey = []   # per-leaf aggregate MuSig pubkey

print("Making k-of-k taptree leaves")

# Treat every integer below 2**n as a bitmask of signers that are available.
# For each mask, reduce it to its canonical signing subset by clearing the
# lowest set bits until at most k remain, and tally how often each subset is
# the canonical choice. These tallies are later used as weights to prioritize
# placement in the tree.
# TODO: sort the pubkeys lexicographically
counts = [0] * (2 ** n)
for available in range(len(counts)):
    canonical = available
    # range() is empty when the mask already has k or fewer bits set
    for _ in range(bin(available).count("1") - k):
        canonical &= canonical - 1  # clear the lowest set bit
    counts[canonical] += 1

for mask, weight in enumerate(counts):
    # Only masks with exactly k signers become leaves
    if bin(mask).count("1") != k:
        continue

    # Reading the mask as an n-digit binary number, most significant digit
    # first, digit j selects pub_keys_base[j].
    subset_index = [j for j in range(n) if (mask >> (n - 1 - j)) & 1]
    subset = [pub_keys_base[j] for j in subset_index]

    c_map, tapscript_pubkey = generate_musig_key(subset)
    leaf_tweaks.append(c_map)
    pk_tapscript = TapLeaf()
    pk_tapscript.construct_pk(tapscript_pubkey)
    tapscript_leaves.append((weight, pk_tapscript)) # try squaring the weight value etc to change tree
    tapscript_leaf_signers.append(subset_index)
    tapscript_leaf_pubkey.append(tapscript_pubkey)

And finally, the emergency f-of-e condition (2-of-3 with the default parameters):

In [None]:
print("Generate emergency keys")
# One independent keypair per emergency signer (e of them).
emergency_key_pairs = [generate_key_pair() for _ in range(e)]
emergency_privkeys = [priv for priv, _ in emergency_key_pairs]
emergency_pubkeys = [pub for _, pub in emergency_key_pairs]

print("Generate taptree emergency spending condition")
csv_delay = 144 # One day, just to make this test fast
# f-of-e checksigadd leaf over the emergency keys, gated by csv_delay.
csa_tapscript = TapLeaf()
csa_tapscript.construct_csaolder(f, emergency_pubkeys, csv_delay)
# Weight 0: low priority, "should never happen"
tapscript_leaves.append((0, csa_tapscript))

Compute the taptree root, then the Taproot output scriptPubKey itself:

In [None]:
print("Generate taptree root")
# Build the tree from the weighted leaves, with the n-of-n MuSig key as its key.
taptree = TapTree()
taptree.key = inner_pubkey
taptree.huffman_constructor(tapscript_leaves)
descriptor = taptree.desc
print("final taptree descriptor: {}\n".format(descriptor))

# construct() yields the output script, the tweak later added to inner_pubkey
# to get the outer taproot key, and a map from leaf script to its control block.
taproot_script, tapscript_tweak, control_map = taptree.construct()
print(taproot_script.hex())

# The witness program is the script minus its first two bytes; the funded
# output is later matched against CScript([OP_1, program]).
segwit_version = 1
program = bytes(taproot_script[2:])
address = program_to_witness(segwit_version, program)
print("Address: {}".format(address))

Now that we have a Taproot output script to spend "from", fire up the Bitcoin Core test suite and stage a transaction to spend it, which we will satisfy in 3 different ways with different signatures:

In [None]:
# Start the test environment
test = util.TestWrapper()
test.setup()

# Mine 101 blocks on the first node and confirm its wallet can fund the
# transaction created next.
node = test.nodes[0]
node.generate(101)
balance = node.getbalance()
print('Balance: {}'.format(balance))

assert balance > 1

# Send wallet transaction to segwit address
amount_btc = 0.05
txid = test.nodes[0].sendtoaddress(address, amount_btc)

# Decode wallet transaction
tx_hex = test.nodes[0].getrawtransaction(txid)
decoded_tx = test.nodes[0].decoderawtransaction(tx_hex)

print("Transaction:\n{}\n".format(decoded_tx))

# Reconstruct wallet transaction locally
tx = CTransaction()
tx.deserialize(BytesIO(bytes.fromhex(tx_hex)))
# rehash() recomputes the hashes (tx.sha256 is used for the outpoint later)
# and returns the txid of the locally deserialized transaction.
local_txid = tx.rehash()

# Check both the node's decoding and our own deserialization against the
# txid the wallet reported. Comparing only with decoded_tx would never
# exercise the local `tx` object.
assert txid == decoded_tx["txid"]
assert txid == local_txid

# The wallet randomizes the change output index for privacy
# Loop through the outputs and return the first where the scriptPubKey matches the segwit v1 output
expected_script = CScript([OP_1, program])
output_index, output = next(
    (index, txout) for index, txout in enumerate(tx.vout)
    if txout.scriptPubKey == expected_script
)

print("Segwit v1 output is {}".format(output))
print("Segwit v1 output value is {}".format(output.nValue))
print("Segwit v1 output index is {}".format(output_index))

# Construct the transaction that spends the segwit v1 output
spending_tx = CTransaction()

# Populate the transaction version
spending_tx.nVersion = 1

# Populate the locktime
spending_tx.nLockTime = 0

# Populate the transaction inputs: a single input pointing at the funded output
outpoint = COutPoint(tx.sha256, output_index)
spending_tx_in = CTxIn(outpoint = outpoint)
spending_tx.vin = [spending_tx_in]

print("Spending transaction:\n{}".format(spending_tx))

# Generate new Bitcoin Core wallet address
dest_addr = test.nodes[0].getnewaddress(address_type="bech32")
scriptpubkey = bytes.fromhex(test.nodes[0].getaddressinfo(dest_addr)['scriptPubKey'])

# Determine minimum fee required for mempool acceptance
min_fee = int(test.nodes[0].getmempoolinfo()['mempoolminfee'] * 100000000)

# Complete output which returns funds to Bitcoin Core wallet.
# amount_btc is a float, so round to the nearest satoshi: int() truncates and
# would drop a satoshi whenever the product lands just below a whole number
# (e.g. 0.29 * 100000000 == 28999999.999999996).
amount_sat = round(amount_btc * 100000000)
dest_output = CTxOut(nValue=amount_sat - min_fee, scriptPubKey=scriptpubkey)
spending_tx.vout = [dest_output]

print("Spending transaction:\n{}".format(spending_tx))

1) Spend the output using the n-of-n Taproot output pubkey (key path) spending path:

In [None]:
# Signature hash for a key-path spend of the output
sighash_musig = TaprootSignatureHash(spending_tx, [output], SIGHASH_ALL_TAPROOT)

# One nonce per signer; aggregate their public parts into R_agg. When the
# aggregation reports a negation, every signer negates its own nonce to match.
nonces = [generate_schnorr_nonce() for _ in pub_keys_base]
nonce_points = [nonce.get_pubkey() for nonce in nonces]
R_agg, negated = aggregate_schnorr_nonces(nonce_points)
if negated:
    for nonce in nonces:
        nonce.negate()

# Outer taproot key = inner MuSig key tweaked by the taptree commitment
output_pubkey = inner_pubkey.tweak_add(tapscript_tweak)

# Every signer produces a partial signature. Exactly one of them (the first)
# adds the taproot tweak to its key.
signatures = []
for position, (key, nonce) in enumerate(zip(priv_keys_tweaked, nonces)):
    signing_key = key.add(tapscript_tweak) if position == 0 else key
    signatures.append(sign_musig(signing_key, nonce, R_agg, output_pubkey, sighash_musig))

aggregated_sig = aggregate_musig_signatures(signatures, R_agg)

assert output_pubkey.verify_schnorr(aggregated_sig, sighash_musig)

# Key-path witness: the aggregated signature is the only stack element
witness = CScriptWitness()
witness.stack.append(aggregated_sig)
witness_in = CTxInWitness()
witness_in.scriptWitness = witness
spending_tx.wit.vtxinwit.append(witness_in)

print("spending_tx: {}\n".format(spending_tx))

# Test mempool acceptance
spending_tx_str = spending_tx.serialize().hex()
accept_result = test.nodes[0].testmempoolaccept([spending_tx_str])
assert accept_result[0]['allowed']

decoded_spend = test.nodes[0].decoderawtransaction(spending_tx_str)
print("Key path spending transaction weight: {}".format(decoded_spend['weight']))

print("Success!")

2) Spend the output using a randomly-chosen k-of-k MuSig spending path. Note that this is not optimal for production; the least-deep viable spending path should be published to reduce transaction weight:

In [None]:
# Generate the taproot signature hash for a random tapscript MuSig path

# Pick a random k-of-k MuSig leaf, excluding the emergency leaf.
# tapscript_leaves also contains the emergency leaf (appended last), which has
# no entry in the per-leaf MuSig lists. random.randint includes both
# endpoints, so the upper bound must come from tapscript_leaf_signers;
# bounding by len(tapscript_leaves) - 1 could select the emergency leaf and
# raise IndexError below.
chosen_leaf_index = random.randint(0, len(tapscript_leaf_signers) - 1)
chosen_signer_indices = tapscript_leaf_signers[chosen_leaf_index]
chosen_leaf = tapscript_leaves[chosen_leaf_index][1]
chosen_key_map = leaf_tweaks[chosen_leaf_index]
chosen_pubkey = tapscript_leaf_pubkey[chosen_leaf_index]

sighash_musig_leaf = TaprootSignatureHash(spending_tx,
                               [output],
                               SIGHASH_ALL_TAPROOT,
                               0,
                               scriptpath=True,
                               tapscript=chosen_leaf.script)

# Compute tweaked keys for the signing subset: each base key is multiplied by
# its coefficient in this leaf's MuSig key map.
priv_keys_subset_base = [priv_keys_base[i] for i in chosen_signer_indices]
pub_keys_subset_base = [pub_keys_base[i] for i in chosen_signer_indices]
priv_keys_subset_tweaked = []
pub_keys_subset_tweaked = []
for pubkey, privkey in zip(pub_keys_subset_base, priv_keys_subset_base):
    priv_keys_subset_tweaked.append(privkey.mul(chosen_key_map[pubkey]))
    pub_keys_subset_tweaked.append(pubkey.mul(chosen_key_map[pubkey]))

# Generate one new nonce per subset signer for this signing session, then
# aggregate. When the aggregation reports a negation, every signer negates
# its own nonce to match.
subset_nonces = [generate_schnorr_nonce() for _ in pub_keys_subset_base]
R_agg, negated = aggregate_schnorr_nonces([nonce.get_pubkey() for nonce in subset_nonces])
if negated:
    for nonce in subset_nonces:
        nonce.negate()

subset_signatures = []
for key, nonce in zip(priv_keys_subset_tweaked, subset_nonces):
    subset_signatures.append(sign_musig(key, nonce, R_agg, chosen_pubkey, sighash_musig_leaf))

aggregated_subset_sig = aggregate_musig_signatures(subset_signatures, R_agg)

assert chosen_pubkey.verify_schnorr(aggregated_subset_sig, sighash_musig_leaf)

# Construct transaction witness: signature, then the leaf script, then the
# control block for that leaf.
witness = CScriptWitness()
witness.stack = [aggregated_subset_sig] + [chosen_leaf.script, control_map[chosen_leaf.script]]
witness_in = CTxInWitness()
witness_in.scriptWitness = witness
spending_tx.wit.vtxinwit[0] = witness_in # replace old witness

print("spending_tx: {}\n".format(spending_tx))

# Test mempool acceptance
spending_tx_str = spending_tx.serialize().hex()
assert test.nodes[0].testmempoolaccept([spending_tx_str])[0]['allowed']

# Report weight and success, as the other two spending paths do
print("k-of-k MuSig script path spending transaction weight: {}".format(test.nodes[0].decoderawtransaction(spending_tx_str)['weight']))

print("Success!")

3) Last, we spend the output using the emergency keys, which only becomes possible once the output has reached the required age:

In [None]:
# Spend via the timelocked emergency tapscript path.
# This needs a fresh transaction: version 2 with the input's nSequence set to
# the CSV delay (relative lock-time per BIP 68 applies to version >= 2).
spending_tx = CTransaction()
spending_tx.nVersion = 2
spending_tx.nLockTime = 0

outpoint = COutPoint(tx.sha256, output_index)
spending_tx_in = CTxIn(outpoint=outpoint, nSequence=csv_delay)
spending_tx.vin = [spending_tx_in]
spending_tx.vout = [dest_output]

csv_sighash = TaprootSignatureHash(
    spending_tx,
    [output],
    SIGHASH_ALL_TAPROOT,
    0,
    scriptpath=True,
    tapscript=csa_tapscript.script,
)

# Sign with every emergency key, in reverse key order for the witness stack
witness_elements = [key.sign_schnorr(csv_sighash) for key in reversed(emergency_privkeys)]
# We only need f of the e sigs, so blank out the first e - f of them
for blank_index in range(e - f):
    witness_elements[blank_index] = b''

# Witness stack: signatures, then the leaf script, then its control block
witness = CScriptWitness()
witness.stack = witness_elements + [csa_tapscript.script, control_map[csa_tapscript.script]]
witness_in = CTxInWitness()
witness_in.scriptWitness = witness
spending_tx.wit.vtxinwit.append(witness_in)
spending_tx_str = spending_tx.serialize().hex()

print("Testing timelock transaction without moving chain forward.")
# Before the output is old enough the node must reject the spend
expected_rejection = [
    {'txid': spending_tx.rehash(), 'allowed': False, 'reject-reason': '64: non-BIP68-final'},
]
assert_equal(expected_rejection, test.nodes[0].testmempoolaccept([spending_tx_str]))

print("Generating blocks to move chain forward.")
test.nodes[0].generate(csv_delay)
accept_result = test.nodes[0].testmempoolaccept([spending_tx_str])
assert accept_result[0]["allowed"]

decoded_spend = test.nodes[0].decoderawtransaction(spending_tx_str)
print("Long delay script path spending transaction weight: {}".format(decoded_spend['weight']))

print("Success!")

All done! Shut down the test environment.

In [None]:
# Shut down the test environment created by util.TestWrapper() / test.setup().
test.shutdown()