Skip to content

Commit

Permalink
Have taker watch for broadcasted contract txes
Browse files Browse the repository at this point in the history
If contract transactions get broadcasted it means someone is deviating from
the protocol and therefore taker should stop with the coinswap.

Right now taker will just print a message and quit. In future the taker could
blacklist the makers fidelity bond who deviated from the protocol.

Also fixed up some clippy messages that were nearby in the code.
  • Loading branch information
chris-belcher committed Dec 18, 2021
1 parent 8c5a4be commit 7c3fc03
Show file tree
Hide file tree
Showing 4 changed files with 100 additions and 39 deletions.
2 changes: 0 additions & 2 deletions src/error.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
use std::error;
use std::io;

use bitcoincore_rpc;

// error enum for the whole project
// try to make functions return this
#[derive(Debug)]
Expand Down
11 changes: 5 additions & 6 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ fn display_wallet_balance(wallet_file_name: &PathBuf, long_form: Option<bool>) {
println!("total balance = {}", balance);

let incomplete_coinswaps = wallet.find_incomplete_coinswaps(&rpc).unwrap();
if incomplete_coinswaps.len() > 0 {
if !incomplete_coinswaps.is_empty() {
println!("= incomplete coinswaps =");
for (hashvalue, (utxo_incoming_swapcoins, utxo_outgoing_swapcoins)) in incomplete_coinswaps
{
Expand Down Expand Up @@ -235,7 +235,7 @@ fn display_wallet_balance(wallet_file_name: &PathBuf, long_form: Option<bool>) {

let (_incoming_contract_utxos, mut outgoing_contract_utxos) =
wallet.find_live_contract_unspents(&rpc).unwrap();
if outgoing_contract_utxos.len() > 0 {
if !outgoing_contract_utxos.is_empty() {
outgoing_contract_utxos.sort_by(|a, b| b.1.confirmations.cmp(&a.1.confirmations));
println!("= live timelocked contracts =");
println!(
Expand Down Expand Up @@ -400,13 +400,12 @@ fn run_watchtower() {
let rpc = match get_bitcoin_rpc() {
Ok(rpc) => rpc,
Err(error) => {
log::trace!(target: "main", "error connecting to bitcoin node: {:?}", error);
log::info!(target: "main", "error connecting to bitcoin node: {:?}", error);
return;
}
};

let rpc_ptr = Arc::new(rpc);
watchtower_protocol::start_watchtower(rpc_ptr);
watchtower_protocol::start_watchtower(&rpc);
}

#[derive(Debug, StructOpt)]
Expand Down Expand Up @@ -523,7 +522,7 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
Subcommand::TestWatchtowerClient {
mut contract_transactions_hex,
} => {
if contract_transactions_hex.len() == 0 {
if contract_transactions_hex.is_empty() {
// https://bitcoin.stackexchange.com/questions/68811/what-is-the-absolute-smallest-size-of-the-data-bytes-that-a-blockchain-transac
contract_transactions_hex = vec![String::from("0200000000010100000000000000000000000000000000000000000000000000000000000000000000000000fdffffff010100000000000000160014ffffffffffffffffffffffffffffffffffffffff022102000000000000000000000000000000000000000000000000000000000000000147304402207777777777777777777777777777777777777777777777777777777777777777022055555555555555555555555555555555555555555555555555555555555555550100000000")];
}
Expand Down
101 changes: 82 additions & 19 deletions src/taker_protocol.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::collections::HashMap;
use std::io::ErrorKind;
use std::iter::once;
use std::time::Duration;

use tokio::io::BufReader;
Expand Down Expand Up @@ -44,6 +45,9 @@ use crate::wallet_sync::{
generate_keypair, CoreAddressLabelType, IncomingSwapCoin, OutgoingSwapCoin, Wallet,
};

use crate::watchtower_client::ContractInfo;
use crate::watchtower_protocol::check_for_broadcasted_contract_txes;

#[tokio::main]
pub async fn start_taker(rpc: &Client, wallet: &mut Wallet) {
match run(rpc, wallet).await {
Expand Down Expand Up @@ -141,14 +145,21 @@ async fn send_coinswap(
.iter()
.map(|tx| tx.txid())
.collect::<Vec<Txid>>(),
&[],
&mut None,
)
.await?;
.await?
.unwrap();
//unwrap the option without checking for Option::None because we passed no contract txes
//to watch and therefore they cant be broadcast

let mut active_maker_addresses = Vec::<String>::new();
let mut previous_maker: Option<OfferAddress> = None;
let mut watchonly_swapcoins = Vec::<Vec<WatchOnlySwapCoin>>::new();
let mut incoming_swapcoins = Vec::<IncomingSwapCoin>::new();

let mut last_checked_block_height: Option<u64> = None;

for maker_index in 0..maker_count {
let maker = maker_offers_addresses.pop().unwrap();
let current_maker_port = maker.address.split(":").collect::<Vec<&str>>()[1];
Expand Down Expand Up @@ -286,7 +297,7 @@ async fn send_coinswap(
active_maker_addresses.push(maker.address.clone());

log::info!("Waiting for funding transaction confirmations",);
let (next_funding_txes, next_funding_tx_merkleproofs) = wait_for_funding_tx_confirmation(
let wait_for_confirm_result = wait_for_funding_tx_confirmation(
rpc,
&maker_sign_sender_and_receiver_contracts
.senders_contract_txes_info
Expand All @@ -297,8 +308,33 @@ async fn send_coinswap(
.txid
})
.collect::<Vec<Txid>>(),
&watchonly_swapcoins
.iter()
.map(|watchonly_swapcoin_list| {
watchonly_swapcoin_list
.iter()
.map(|watchonly_swapcoin| watchonly_swapcoin.contract_tx.clone())
.collect::<Vec<Transaction>>()
})
.chain(once(
outgoing_swapcoins
.iter()
.map(|osc| osc.contract_tx.clone())
.collect::<Vec<Transaction>>(),
))
.collect::<Vec<Vec<Transaction>>>(),
&mut last_checked_block_height,
)
.await?;
if wait_for_confirm_result.is_none() {
log::info!(concat!(
"Somebody deviated from the protocol by broadcasting one or more contract",
" transactions! Use main method `recover-from-incomplete-coinswap` to recover",
" coins"
));
panic!("ending");
}
let (next_funding_txes, next_funding_tx_merkleproofs) = wait_for_confirm_result.unwrap();
funding_txes = next_funding_txes;
funding_tx_merkleproofs = next_funding_tx_merkleproofs;

Expand Down Expand Up @@ -658,10 +694,27 @@ async fn request_receivers_contract_tx_signatures<S: SwapCoin>(
Ok(maker_receiver_contract_sig.sigs)
}

//return a list of the transactions and merkleproofs if the funding txes confirmed
//return None if any of the contract transactions were seen on the network
// if it turns out i want to return data in the contract tx broadcast case, then maybe use an enum
async fn wait_for_funding_tx_confirmation(
rpc: &Client,
funding_txids: &[Txid],
) -> Result<(Vec<Transaction>, Vec<String>), Error> {
contract_txes_to_watch: &[Vec<Transaction>],
last_checked_block_height: &mut Option<u64>,
) -> Result<Option<(Vec<Transaction>, Vec<String>)>, Error> {
let contract_infos_to_watch = contract_txes_to_watch
.iter()
.map(|contract_txes| {
contract_txes
.iter()
.map(|contract_tx| ContractInfo {
contract_tx: contract_tx.clone(),
})
.collect::<Vec<ContractInfo>>()
})
.collect::<Vec<Vec<ContractInfo>>>();

let mut txid_tx_map = HashMap::<Txid, Transaction>::new();
let mut txid_blockhash_map = HashMap::<Txid, BlockHash>::new();
loop {
Expand All @@ -682,27 +735,37 @@ async fn wait_for_funding_tx_confirmation(
}
}
if txid_tx_map.len() == funding_txids.len() {
break;
log::info!("Funding Transaction confirmed");

let txes = funding_txids
.iter()
.map(|txid| txid_tx_map.get(txid).unwrap().clone())
.collect::<Vec<Transaction>>();
let merkleproofs = funding_txids
.iter()
.map(|&txid| {
rpc.get_tx_out_proof(&[txid], Some(&txid_blockhash_map.get(&txid).unwrap()))
.map(|gettxoutproof_result| gettxoutproof_result.to_hex())
})
.collect::<Result<Vec<String>, bitcoincore_rpc::Error>>()?;
return Ok(Some((txes, merkleproofs)));
}
if !contract_infos_to_watch.is_empty() {
let contracts_broadcasted = check_for_broadcasted_contract_txes(
rpc,
&contract_infos_to_watch,
last_checked_block_height,
)?;
if contracts_broadcasted {
log::info!("Contract transactions were broadcasted! Aborting");
return Ok(None);
}
}

sleep(Duration::from_millis(1000)).await;
#[cfg(test)]
crate::test::generate_1_block(&get_bitcoin_rpc().unwrap());
}

log::info!("Funding Transaction confirmed");

let txes = funding_txids
.iter()
.map(|txid| txid_tx_map.get(txid).unwrap().clone())
.collect::<Vec<Transaction>>();
let merkleproofs = funding_txids
.iter()
.map(|&txid| {
rpc.get_tx_out_proof(&[txid], Some(&txid_blockhash_map.get(&txid).unwrap()))
.map(|gettxoutproof_result| gettxoutproof_result.to_hex())
})
.collect::<Result<Vec<String>, bitcoincore_rpc::Error>>()?;
Ok((txes, merkleproofs))
}

fn check_and_apply_maker_private_keys<S: SwapCoin>(
Expand Down
25 changes: 13 additions & 12 deletions src/watchtower_protocol.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use std::collections::HashSet;
use std::iter::FromIterator;
use std::net::Ipv4Addr;
use std::sync::Arc;
use std::time::Duration;

use tokio::io::BufReader;
Expand All @@ -15,7 +14,6 @@ use tokio::time::sleep;
use serde::{Deserialize, Serialize};

use bitcoin::Txid;
use bitcoincore_rpc;
use bitcoincore_rpc::{json::GetBlockResult, Client, RpcApi};

use crate::error::Error;
Expand Down Expand Up @@ -60,15 +58,15 @@ pub enum WatchtowerToMakerMessage {
//pub async fn

#[tokio::main]
pub async fn start_watchtower(rpc: Arc<Client>) {
pub async fn start_watchtower(rpc: &Client) {
match run(rpc).await {
Ok(_o) => log::info!("watchtower ended without error"),
Err(e) => log::info!("watchtower ended with err {:?}", e),
};
}

//TODO i think rpc doesnt need to be wrapped in Arc, because its not used in the spawned task
async fn run(rpc: Arc<Client>) -> Result<(), Error> {
async fn run(rpc: &Client) -> Result<(), Error> {
//TODO port number in config file
let port = 6103;
let listener = TcpListener::bind((Ipv4Addr::LOCALHOST, port)).await?;
Expand Down Expand Up @@ -112,7 +110,7 @@ async fn run(rpc: Arc<Client>) -> Result<(), Error> {
//TODO make a const for this magic number of how often to poll, see similar
// comment in maker_protocol.rs
_ = sleep(Duration::from_secs(10)) => {
let r = check_for_watched_contract_txes(&rpc, &watched_contracts,
let r = check_for_broadcasted_contract_txes(&rpc, &watched_contracts,
&mut last_checked_block_height);
accepting_clients = r.is_ok();
log::info!("Timeout Branch, Accepting Clients @ {}", port);
Expand All @@ -126,7 +124,6 @@ async fn run(rpc: Arc<Client>) -> Result<(), Error> {
}

log::info!("<=== [{}] | Accepted Connection From", addr.port());
let _client_rpc = Arc::clone(&rpc);
let server_loop_err_comms_tx = server_loop_err_comms_tx.clone();
let watched_txes_comms_tx = watched_txes_comms_tx.clone();

Expand Down Expand Up @@ -239,11 +236,11 @@ enum TxidListType {
FromBlock(GetBlockResult),
}

fn check_for_watched_contract_txes(
rpc: &Arc<Client>,
all_contracts_to_watch: &Vec<Vec<ContractInfo>>,
pub fn check_for_broadcasted_contract_txes(
rpc: &Client,
all_contracts_to_watch: &[Vec<ContractInfo>],
last_checked_block_height: &mut Option<u64>,
) -> Result<(), bitcoincore_rpc::Error> {
) -> Result<bool, bitcoincore_rpc::Error> {
let mut network_txids = Vec::<TxidListType>::new();

let mempool_txids = rpc.get_raw_mempool()?;
Expand Down Expand Up @@ -277,6 +274,7 @@ fn check_for_watched_contract_txes(
}
*last_checked_block_height = Some(blockchain_tip_height);

let mut contract_broadcasted = false;
for txid_list_type in network_txids {
let txid_list = match txid_list_type {
TxidListType::FromMempool(txids) => {
Expand Down Expand Up @@ -305,7 +303,10 @@ fn check_for_watched_contract_txes(
"contract_txids_on_network = {:?}",
contract_txids_on_network
);
if contract_txids_on_network.len() == 0
if !contract_txids_on_network.is_empty() {
contract_broadcasted = true;
}
if contract_txids_on_network.is_empty()
|| contract_txids_on_network.len() == contracts_to_watch.len()
{
continue;
Expand Down Expand Up @@ -334,5 +335,5 @@ fn check_for_watched_contract_txes(
}
}

Ok(())
Ok(contract_broadcasted)
}

0 comments on commit 7c3fc03

Please sign in to comment.