In [19]:
import copy
from io import BytesIO
import random

import util
from test_framework.key import generate_key_pair, generate_bip340_key_pair, generate_schnorr_nonce, ECKey, ECPubKey, SECP256K1_FIELD_SIZE, SECP256K1, SECP256K1_ORDER
from test_framework.musig import aggregate_musig_signatures, aggregate_schnorr_nonces, generate_musig_key, musig_digest, sign_musig
from test_framework.script import TapLeaf, TapTree, TaprootSignatureHash, SIGHASH_ALL_TAPROOT
from test_framework.address import program_to_witness
from test_framework.messages import CTransaction, COutPoint, CTxIn, CTxOut, CTxInWitness
from test_framework.util import assert_equal

## 3.1 Degrading Multisig Output

In this case study, we consider a degrading multisig output, which provides recovery spending paths if the main wallet keys are lost or cannot sign. This output is expected to be spent soon after being created.
The recovery spending paths include delays in case the back-up keys are compromised.

![test](images/degrading_multisig0.jpg)

#### Locking conditions

* **multisig( 3/3 main wallet keys )** - spendable immediately; or
* **multisig( 2/3 main wallet keys + 1/2 backup keys )** - spendable after 3 days; or
* **multisig( 1/3 main wallet keys + 2/2 backup keys )** - spendable after 10 days.


#### Signers

* **Main wallet keys** - Keys A, B, C
* **Backup keys** - Keys D, E

#### Privacy Requirements

No unused public keys should be revealed during spending.

#### Other considerations

Since the backup keys are stored on simple HSMs, they are not able to interactively co-sign MuSig aggregate signatures.

#### _Exercise 3.1.1:_ Determine different signing scenarios and their likelihoods

**(This is not a coding exercise)**

Before we construct the Taptree, sketch out different signing scenarios and their likelihoods.

##### Spending paths
Assumed likelihoods:
1. Using all 3 main keys: 1 path, 55%
2. Using 2 of the 3 main keys and 1 of the 2 backup keys: 6 paths, 30%
3. Using 1 of the 3 main keys and both backup keys: 3 paths, 15%

##### Sketch out Taproot Descriptors
- 1 × 3-of-3 aggregated Schnorr (MuSig) key path: pk(ABC)
- 6 × csa_delay(3, <3 keys>, 3 days)
- 3 × csa_delay(3, <3 keys>, 10 days)


#### Example 3.1.2: Set up keys for the taproot output

Here we prepare key pairs for all participants, and create an aggregate MuSig pubkey.

In [20]:
# Generate main wallet key pairs (keys A, B and C).
main_privkeyA, main_pubkeyA = generate_bip340_key_pair()
main_privkeyB, main_pubkeyB = generate_bip340_key_pair()
main_privkeyC, main_pubkeyC = generate_bip340_key_pair()
main_pubkeys = [main_pubkeyA.get_bytes().hex(),
                main_pubkeyB.get_bytes().hex(),
                main_pubkeyC.get_bytes().hex()]

print("Main pubkeys: {}\n".format(main_pubkeys))

# Generate back-up wallet key pairs (keys D and E).
backup_privkeyD, backup_pubkeyD = generate_bip340_key_pair()
backup_privkeyE, backup_pubkeyE = generate_bip340_key_pair()
backup_pubkeys = [backup_pubkeyD.get_bytes().hex(),
                  backup_pubkeyE.get_bytes().hex()]

print("Backup pubkeys: {}\n".format(backup_pubkeys))

# 3-of-3 main key (MuSig public key).
# generate_musig_key() returns a map from each public key to its factor,
# together with the aggregate public key.
c_map, musig_ABC = generate_musig_key([main_pubkeyA, main_pubkeyB, main_pubkeyC])

# Scale every participant's private and public key by that participant's own
# factor from c_map. Each *_c public key must be derived from the matching
# public key (B from B, C from C), not from key A.
main_privkeyA_c = main_privkeyA.mul(c_map[main_pubkeyA])
main_privkeyB_c = main_privkeyB.mul(c_map[main_pubkeyB])
main_privkeyC_c = main_privkeyC.mul(c_map[main_pubkeyC])
main_pubkeyA_c = main_pubkeyA.mul(c_map[main_pubkeyA])
main_pubkeyB_c = main_pubkeyB.mul(c_map[main_pubkeyB])
main_pubkeyC_c = main_pubkeyC.mul(c_map[main_pubkeyC])

# If the aggregate key has an odd y-coordinate, negate it together with every
# scaled key so that private keys, public keys and the aggregate stay consistent.
if musig_ABC.get_y() % 2 != 0:
    musig_ABC.negate()
    main_privkeyA_c.negate()
    main_privkeyB_c.negate()
    main_privkeyC_c.negate()
    main_pubkeyA_c.negate()
    main_pubkeyB_c.negate()
    main_pubkeyC_c.negate()

print("MuSig pubkey: {}".format(musig_ABC.get_bytes().hex()))

Main pubkeys: ['cd831524749064b24a9ad06df09cda49fecf79c79d74e3365cf02a4587dfbe3a', 'ff7e3cae73a84637b19e0d516a67727607677a4266916f1406fc0830f9a93b75', '7a10f251ce4b39d2bb31a21ace2c76654f2aaf01a2047d64aebe6180159ea2e1']

Backup pubkeys: ['8ace72d6faf4025e51a9070ddd5be3d1227f46488168f800eb41c999d46255a2', '309c32519916a2c7d650e2dc4a226728d5d35008557f190d5a02efe9322a7ac8']

MuSig pubkey: d6c5baf5c387ed5067b3d9fa165fa9966568cd163201a2b5598c0ae9a89253d5


#### _Programming Exercise 3.1.3:_ Build a taproot output

In this exercise, we'll build a taptree according to the spending paths and their likelihoods, and then derive the segwit address for the taproot.

In [21]:
# Relative timelocks, expressed in blocks (1 block every 10 minutes,
# i.e. 6 blocks per hour).
delay = 432            # 3 days  = 3 * 24 * 6 blocks
long_delay = 10*24*6   # 10 days

# Tapscripts - 2 main keys & 1 backup key, spendable after `delay` blocks.
# Every leaf is a 3-of-3 csa_delay script over exactly the keys that sign.
short_delay_signers = [
    [main_pubkeyA, main_pubkeyB, backup_pubkeyD],  # ABD
    [main_pubkeyA, main_pubkeyC, backup_pubkeyD],  # ACD
    [main_pubkeyB, main_pubkeyC, backup_pubkeyD],  # BCD
    [main_pubkeyA, main_pubkeyB, backup_pubkeyE],  # ABE
    [main_pubkeyA, main_pubkeyC, backup_pubkeyE],  # ACE
    [main_pubkeyB, main_pubkeyC, backup_pubkeyE],  # BCE
]
(tapscript_2a, tapscript_2b, tapscript_2c,
 tapscript_2d, tapscript_2e, tapscript_2f) = [
    TapLeaf().construct_csa_delay(3, signers, delay)
    for signers in short_delay_signers
]

# Tapscripts - 1 main key & 2 backup keys, spendable after `long_delay` blocks.
long_delay_signers = [
    [main_pubkeyA, backup_pubkeyD, backup_pubkeyE],  # ADE
    [main_pubkeyB, backup_pubkeyD, backup_pubkeyE],  # BDE
    [main_pubkeyC, backup_pubkeyD, backup_pubkeyE],  # CDE
]
tapscript_3a, tapscript_3b, tapscript_3c = [
    TapLeaf().construct_csa_delay(3, signers, long_delay)
    for signers in long_delay_signers
]

# All backup tapscripts: the six 3-day leaves first, then the three 10-day leaves.
backup_tapscripts = [
    tapscript_2a, tapscript_2b, tapscript_2c,
    tapscript_2d, tapscript_2e, tapscript_2f,
    tapscript_3a, tapscript_3b, tapscript_3c,
]
assert len(backup_tapscripts) == 9

# Weights for the huffman constructor: 2 for each short-delay leaf,
# 1 for each long-delay leaf.
tapscript_weights = (
    [(2, leaf) for leaf in backup_tapscripts[:6]]
    + [(1, leaf) for leaf in backup_tapscripts[6:]]
)

# The MuSig aggregate of the main keys is the taproot internal key.
multisig_taproot = TapTree(key=musig_ABC)
multisig_taproot.huffman_constructor(tapscript_weights)

print(f"Taproot descriptor {multisig_taproot.desc}\n")

# Derive segwit v1 address.
# NOTE: despite its name, `tapscript` is the value that a later cell compares
# against the funding output's scriptPubKey.
tapscript, taptweak, control_map = multisig_taproot.construct()
taptweak = int.from_bytes(taptweak, 'big')  # tweak as an integer from here on
output_pubkey = musig_ABC.tweak_add(taptweak)
output_pubkey_b = output_pubkey.get_bytes()
segwit_address = program_to_witness(1, output_pubkey_b)
print("Segwit Address:", segwit_address)

Taproot descriptor tp(d6c5baf5c387ed5067b3d9fa165fa9966568cd163201a2b5598c0ae9a89253d5,[[[ts(csa_delay(3,cd831524749064b24a9ad06df09cda49fecf79c79d74e3365cf02a4587dfbe3a,8ace72d6faf4025e51a9070ddd5be3d1227f46488168f800eb41c999d46255a2,309c32519916a2c7d650e2dc4a226728d5d35008557f190d5a02efe9322a7ac8,1440)),ts(csa_delay(3,cd831524749064b24a9ad06df09cda49fecf79c79d74e3365cf02a4587dfbe3a,ff7e3cae73a84637b19e0d516a67727607677a4266916f1406fc0830f9a93b75,8ace72d6faf4025e51a9070ddd5be3d1227f46488168f800eb41c999d46255a2,432))],[ts(csa_delay(3,cd831524749064b24a9ad06df09cda49fecf79c79d74e3365cf02a4587dfbe3a,7a10f251ce4b39d2bb31a21ace2c76654f2aaf01a2047d64aebe6180159ea2e1,309c32519916a2c7d650e2dc4a226728d5d35008557f190d5a02efe9322a7ac8,432)),ts(csa_delay(3,cd831524749064b24a9ad06df09cda49fecf79c79d74e3365cf02a4587dfbe3a,ff7e3cae73a84637b19e0d516a67727607677a4266916f1406fc0830f9a93b75,309c32519916a2c7d650e2dc4a226728d5d35008557f190d5a02efe9322a7ac8,432))]],[[ts(csa_delay(3,cd831524749064b24a9ad06d

#### Start TestNodes

In [22]:
# Create the test wrapper and run its setup; the following cells issue RPC
# calls through test.nodes[0].
test = util.TestWrapper()
test.setup()

TestWrapper is already running!


#### Generate Wallet Balance

In [23]:
# Generate 101 blocks on node 0, then query and show the wallet balance.
test.nodes[0].generate(101)
balance = test.nodes[0].getbalance()
print(f"Balance: {balance}")

Balance: 5099.50000000


#### Send funds from the Bitcoin Core wallet to the taproot output

In [24]:
# Fund the taproot output from the Bitcoin Core wallet.
txid = test.nodes[0].sendtoaddress(address=segwit_address, amount=0.5, fee_rate=25)
print("Funding tx:", txid)

# Fetch the raw funding transaction and parse it into a CTransaction.
tx_hex = test.nodes[0].getrawtransaction(txid)
tx = CTransaction()
tx.deserialize(BytesIO(bytes.fromhex(tx_hex)))
tx.rehash()

print(tapscript.hex())

print(tx.vout)

# The wallet randomizes the change output index for privacy, so search the
# outputs for the first one whose scriptPubKey equals the segwit v1 script.
output_index, output = next(
    (position, txout)
    for position, txout in enumerate(tx.vout)
    if txout.scriptPubKey == tapscript
)
output_value = output.nValue

print(f"Segwit v1 output is {output}")
print(f"Segwit v1 output value is {output_value}")
print(f"Segwit v1 output index is {output_index}")

Funding tx: 1425b81428230e6a4f5763b97401722dcfa916c2c700b01ba49352ab1ea38f3a
512067f76fe72bf59de321c7bbd97dd7962f28110e33c3da57a89a3a5e18ada483b9
[CTxOut(nValue=0.50000000 scriptPubKey=512067f76fe72bf59de321c7bbd97dd7962f28110e33c3da57a89a3a5e18ada483b9), CTxOut(nValue=48.99990400 scriptPubKey=0014c5729da6434a5525da1f909d9124e135ab78b658)]
Segwit v1 output is CTxOut(nValue=0.50000000 scriptPubKey=512067f76fe72bf59de321c7bbd97dd7962f28110e33c3da57a89a3a5e18ada483b9)
Segwit v1 output value is 50000000
Segwit v1 output index is 0


## Test spending paths of the taproot

In the next section, we'll construct three taproot spends:

- one using the 3-of-3 musig key spending path (exercise)
- one using one of the 3-of-5 short delay backup script spending paths (example)
- one using one of the 3-of-5 long delay backup script spending paths (exercise)

In each case we'll test the tx validity with the `testmempoolaccept()`, and verify that the timelock requirements work as intended. We'll also compute the weight of each spending transaction and compare.

#### Construct a spending transaction

In [25]:
# Start from an empty transaction: version 1, no absolute locktime.
spending_tx = CTransaction()
spending_tx.nVersion = 1
spending_tx.nLockTime = 0

# The only input spends the taproot output of the funding transaction.
outpoint = COutPoint(tx.sha256, output_index)
spending_tx_in = CTxIn(outpoint=outpoint)
spending_tx.vin = [spending_tx_in]

print(f"Spending transaction:\n{spending_tx}")

Spending transaction:
CTransaction(nVersion=1 vin=[CTxIn(prevout=COutPoint(hash=1425b81428230e6a4f5763b97401722dcfa916c2c700b01ba49352ab1ea38f3a n=0) scriptSig= nSequence=0)] vout=[] wit=CTxWitness() nLockTime=0)


#### Populate outputs

We'll generate an output address in the Bitcoin Core wallet to send the funds to, determine the fee, and then populate the spending_tx with an output to that address.

In [26]:
# Ask the Bitcoin Core wallet for a fresh bech32 address and look up the
# scriptPubKey that pays to it.
dest_addr = test.nodes[0].getnewaddress(address_type="bech32")
scriptpubkey = bytes.fromhex(
    test.nodes[0].getaddressinfo(dest_addr)['scriptPubKey']
)

# Minimum fee required for mempool acceptance: the node's 'mempoolminfee'
# value scaled by 100,000,000 and truncated to an integer.
min_fee = int(test.nodes[0].getmempoolinfo()['mempoolminfee'] * 100_000_000)

# Single output returning the funds (minus the fee) to the Bitcoin Core wallet.
dest_output = CTxOut(nValue=output_value - min_fee, scriptPubKey=scriptpubkey)
spending_tx.vout = [dest_output]

print(f"Spending transaction:\n{spending_tx}")

Spending transaction:
CTransaction(nVersion=1 vin=[CTxIn(prevout=COutPoint(hash=1425b81428230e6a4f5763b97401722dcfa916c2c700b01ba49352ab1ea38f3a n=0) scriptSig= nSequence=0)] vout=[CTxOut(nValue=0.49999000 scriptPubKey=0014859a2c1abd905da3cc866f16803123e664df1626)] wit=CTxWitness() nLockTime=0)


#### 3.1.4 _Programming Exercise:_ Create a valid key path output

In this exercise, we'll spend the taproot output using the key path. Since the key path is used, there is no control block to indicate whether or not the public key (Q) has an even or odd y-coordinate and so it is assumed that the y-coordinate is even. Therefore, if Q has an odd y-coordinate and needs to be negated, then so do all the private keys as well as the tweak.

In [27]:
# Negate keys if necessary.
# negate() mutates the key object in place, so work on deep copies: the keys
# created in earlier cells stay untouched and this cell gives the same result
# every time it is run. (With plain aliases, a second run would see the
# already-negated output key, skip the branch below and leave the tweak
# un-negated while the private keys were still negated.)
output_keyPath = copy.deepcopy(output_pubkey)
privKeyA_keyPath = copy.deepcopy(main_privkeyA_c)
privKeyB_keyPath = copy.deepcopy(main_privkeyB_c)
privKeyC_keyPath = copy.deepcopy(main_privkeyC_c)
tweak_keyPath = taptweak

if output_keyPath.get_y() % 2 != 0:
    # Odd y-coordinate: negate the output key, every signer's private key and
    # the tweak so that they all stay consistent with each other.
    output_keyPath.negate()
    privKeyA_keyPath.negate()
    privKeyB_keyPath.negate()
    privKeyC_keyPath.negate()
    tweak_keyPath = SECP256K1_ORDER - taptweak

# Create sighash for ALL
sighash_musig = TaprootSignatureHash(spending_tx, [output], SIGHASH_ALL_TAPROOT)

# Generate individual nonces for participants and an aggregate nonce point
nonceA = generate_schnorr_nonce()
nonceB = generate_schnorr_nonce()
nonceC = generate_schnorr_nonce()
# Remember to negate the individual nonces if necessary
R_agg, negated = aggregate_schnorr_nonces([nonceA.get_pubkey(), nonceB.get_pubkey(), nonceC.get_pubkey()])
if negated:
    nonceA.negate()
    nonceB.negate()
    nonceC.negate()

# Create an aggregate signature.
# Every partial signature and the digest commit to the same (possibly negated)
# output key, output_keyPath.
sA = sign_musig(privKeyA_keyPath, nonceA, R_agg, output_keyPath, sighash_musig)
sB = sign_musig(privKeyB_keyPath, nonceB, R_agg, output_keyPath, sighash_musig)
sC = sign_musig(privKeyC_keyPath, nonceC, R_agg, output_keyPath, sighash_musig)
# Remember to add a factor for the tweak: e * tweak is aggregated alongside
# the three partial signatures.
e = musig_digest(R_agg, output_keyPath, sighash_musig)
sig_agg = aggregate_musig_signatures([sA, sB, sC, e * tweak_keyPath], R_agg)

print("Aggregate signature is {}\n".format(sig_agg.hex()))

assert output_keyPath.verify_schnorr(sig_agg, sighash_musig)

# Construct transaction witness.
# Assign the list instead of appending, so running this cell again does not
# leave a stale witness from a previous run in the transaction.
spending_tx.wit.vtxinwit = [CTxInWitness([sig_agg])]

print("spending_tx: {}\n".format(spending_tx))

# Test mempool acceptance
spending_tx_str = spending_tx.serialize().hex()
assert test.nodes[0].testmempoolaccept([spending_tx_str])[0]['allowed']

print("Key path spending transaction weight: {}".format(test.nodes[0].decoderawtransaction(spending_tx_str)['weight']))

print("Success!")

Aggregate signature is 89ef3ac2792d63c686ac9e570dbf2fc196c091ae4429761e30ac80f418def26a7b4d0117b3cec265816b0c09397c1d28a0def7325c9b5102495720805d28bc69

spending_tx: CTransaction(nVersion=1 vin=[CTxIn(prevout=COutPoint(hash=1425b81428230e6a4f5763b97401722dcfa916c2c700b01ba49352ab1ea38f3a n=0) scriptSig= nSequence=0)] vout=[CTxOut(nValue=0.49999000 scriptPubKey=0014859a2c1abd905da3cc866f16803123e664df1626)] wit=CTxWitness(CScriptWitness(89ef3ac2792d63c686ac9e570dbf2fc196c091ae4429761e30ac80f418def26a7b4d0117b3cec265816b0c09397c1d28a0def7325c9b5102495720805d28bc69)) nLockTime=0)

Key path spending transaction weight: 396
Success!


#### 3.1.5 Example: Create a valid script path output for a short delay script

In this example, we'll spend the output using a script path for the short delay script. This will not be accepted to the mempool initially, because the locktime has not been reached.

In [30]:
# Build a fresh version-2 transaction whose single input carries the short
# relative delay in its nSequence field.
spending_tx = CTransaction()
spending_tx.nVersion = 2
spending_tx.nLockTime = 0
outpoint = COutPoint(tx.sha256, output_index)
spending_tx_in = CTxIn(outpoint=outpoint, nSequence=delay)
spending_tx.vin = [spending_tx_in]
spending_tx.vout = [dest_output]

# Script-path sighash for input 0, committing to tapscript_2a (keys A, B, D).
sighash = TaprootSignatureHash(
    spending_tx, [output], SIGHASH_ALL_TAPROOT, 0,
    scriptpath=True, script=tapscript_2a.script,
)

# One Schnorr signature per key in the leaf, produced with the unscaled
# private keys.
sigA, sigB, sigD = (
    signer.sign_schnorr(sighash)
    for signer in (main_privkeyA, main_privkeyB, backup_privkeyD)
)

# Witness: signatures in reverse key order, then the leaf script, then the
# control map entry for that script.
witness_elements = [sigD, sigB, sigA, tapscript_2a.script, control_map[tapscript_2a.script]]
spending_tx.wit.vtxinwit.append(CTxInWitness(witness_elements))
spending_tx_str = spending_tx.serialize().hex()

# Test timelock: show the node's verdict on the transaction.
print(test.nodes[0].testmempoolaccept([spending_tx_str]))

print(f"Short delay script path spending transaction weight: {test.nodes[0].decoderawtransaction(spending_tx_str)['weight']}")

print("Success!")

[{'txid': '7cfb41ff678d9fcd316453f72dd7502d7b6bf4e30bd2df3b41ff92130c558c7c', 'wtxid': 'cbda675285a813f4fb51d42f1e5b8e9515af8dade026c786021bb6174fa719eb', 'allowed': False, 'reject-reason': 'non-BIP68-final'}]
Short delay script path spending transaction weight: 766
Success!


#### Generate enough blocks to satisfy timelock and retest mempool acceptance

Do not do this until you have completed the exercise above!

In [31]:
# Mine up to one block short of the short delay.
test.nodes[0].generate(delay - 1)

# Timelock not satisfied - transaction not accepted
assert not test.nodes[0].testmempoolaccept(
    [spending_tx.serialize().hex()]
)[0]['allowed']

# One more block completes the delay.
test.nodes[0].generate(1)

# Transaction should be accepted now that the timelock is satisfied
assert test.nodes[0].testmempoolaccept(
    [spending_tx.serialize().hex()]
)[0]['allowed']

print("Success!")

Success!


#### 3.1.6 _Programming Exercise:_ Create a valid script path output for a long delay script

In this exercise, we'll spend the output using a script path for the long delay script. This will not be accepted to the mempool initially, because the locktime has not been reached.

In [33]:
# Build a fresh version-2 transaction whose single input carries the long
# relative delay in its nSequence field.
spending_tx = CTransaction()
spending_tx.nVersion = 2
spending_tx.nLockTime = 0
outpoint = COutPoint(tx.sha256, output_index)
spending_tx_in = CTxIn(outpoint=outpoint, nSequence=long_delay)
spending_tx.vin = [spending_tx_in]
spending_tx.vout = [dest_output]

# Derive the script-path sighash for input 0, committing to tapscript_3a
# (keys A, D, E).
sighash = TaprootSignatureHash(
    spending_tx, [output], SIGHASH_ALL_TAPROOT, 0,
    scriptpath=True, script=tapscript_3a.script,
)

# One Schnorr signature per key in the leaf, produced with the unscaled
# private keys.
sigA, sigD, sigE = (
    signer.sign_schnorr(sighash)
    for signer in (main_privkeyA, backup_privkeyD, backup_privkeyE)
)

# Witness: signatures in reverse key order, then the leaf script, then the
# control map entry for that script.
witness_elements = [sigE, sigD, sigA, tapscript_3a.script, control_map[tapscript_3a.script]]

# Construct transaction witness
spending_tx.wit.vtxinwit.append(CTxInWitness(witness_elements))
spending_tx_str = spending_tx.serialize().hex()

# Test timelock: the transaction must be rejected at this point.
print(test.nodes[0].testmempoolaccept([spending_tx_str]))
assert not test.nodes[0].testmempoolaccept([spending_tx_str])[0]['allowed']

print(f"Long delay script path spending transaction weight: {test.nodes[0].decoderawtransaction(spending_tx_str)['weight']}")

print("Success!")

[{'txid': '69a0fc24a2fd6819b33af3cc923672e1ee03299fc1d3901ceebfb3f50ee30ba1', 'wtxid': 'b08bcd2d5abbc001252941d21c9211219d3dc3266d4f608c4ae3199745f64c09', 'allowed': False, 'reject-reason': 'non-BIP68-final'}]
Long delay script path spending transaction weight: 766
Success!


#### Generate enough blocks to satisfy timelock and retest mempool acceptance

Do not do this until you have completed the exercise above!

In [34]:
# `delay` blocks were already generated by the short-delay timelock cell, so
# only the difference to `long_delay` is needed; stop one block short of it.
test.nodes[0].generate(long_delay - delay - 1)

# Timelock not satisfied - transaction not accepted
assert not test.nodes[0].testmempoolaccept(
    [spending_tx.serialize().hex()]
)[0]['allowed']

# One more block completes the long delay.
test.nodes[0].generate(1)

# Transaction should be accepted now that the timelock is satisfied
assert test.nodes[0].testmempoolaccept(
    [spending_tx.serialize().hex()]
)[0]['allowed']

print("Success!")

Success!


#### Shutdown

In [35]:
# Shut down the test wrapper started earlier in the notebook; no cell that
# uses test.nodes[0] should be run after this one.
test.shutdown()

2023-05-02T06:28:22.988000Z TestFramework./tmp/bitcoin_func_test_1p5ap6lo (INFO): Stopping nodes
2023-05-02T06:28:23.090000Z TestFramework./tmp/bitcoin_func_test_1p5ap6lo (INFO): Cleaning up /tmp/bitcoin_func_test_1p5ap6lo on exit
2023-05-02T06:28:23.091000Z TestFramework./tmp/bitcoin_func_test_1p5ap6lo (INFO): Tests successful
