Skip to content

Commit

Permalink
Add ability for taker to connect to .onion makers
Browse files Browse the repository at this point in the history
Edit offerbook_sync.rs to set the tor port
  • Loading branch information
chris-belcher committed Feb 10, 2022
1 parent 73145d8 commit 0f75c1d
Show file tree
Hide file tree
Showing 6 changed files with 100 additions and 42 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ rand = "0.7.3"
itertools = "0.9.0"
structopt = "0.3.21"
dirs = "3.0.1"
tokio-socks = "0.5"

#Empty default feature set, (helpful to generalise in github actions)
[features]
Expand Down
7 changes: 7 additions & 0 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ pub enum Error {
Disk(io::Error),
Protocol(&'static str),
Rpc(bitcoincore_rpc::Error),
Socks(tokio_socks::Error),
}

impl From<Box<dyn error::Error + Send>> for Error {
Expand All @@ -28,3 +29,9 @@ impl From<bitcoincore_rpc::Error> for Error {
Error::Rpc(e)
}
}

impl From<tokio_socks::Error> for Error {
fn from(e: tokio_socks::Error) -> Error {
Error::Socks(e)
}
}
3 changes: 3 additions & 0 deletions src/maker_protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,9 @@ async fn run(
Error::Rpc(e) => {
server_loop_comms_tx.send(Error::Rpc(e)).await.unwrap()
}
Error::Socks(e) => {
server_loop_comms_tx.send(Error::Socks(e)).await.unwrap()
}
};
break;
}
Expand Down
46 changes: 38 additions & 8 deletions src/offerbook_sync.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,23 @@
use std::fmt;

use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
use tokio::net::TcpStream;
use tokio::sync::mpsc;

use crate::messages::{GiveOffer, MakerToTakerMessage, Offer, TakerHello, TakerToMakerMessage};

const TOR_ADDR: &str = "127.0.0.1:9150";

#[derive(Debug, Clone)]
pub struct OfferAddress {
pub enum MakerAddress {
Clearnet { address: String },
Tor { address: String },
}

#[derive(Debug, Clone)]
pub struct OfferAndAddress {
pub offer: Offer,
pub address: String, //string for now when its "localhost:port"
pub address: MakerAddress,
}

const MAKER_HOSTS: [&str; 5] = [
Expand All @@ -18,6 +28,24 @@ const MAKER_HOSTS: [&str; 5] = [
"localhost:46102",
];

impl MakerAddress {
pub fn get_tcpstream_address(&self) -> String {
match &self {
MakerAddress::Clearnet { address } => address.to_string(),
MakerAddress::Tor { address: _ } => String::from(TOR_ADDR),
}
}
}

impl fmt::Display for MakerAddress {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match &self {
MakerAddress::Clearnet { address } => write!(f, "{}", address),
MakerAddress::Tor { address } => write!(f, "{}", address),
}
}
}

fn parse_message(line: &str) -> Option<MakerToTakerMessage> {
let message: MakerToTakerMessage = match serde_json::from_str(line) {
Ok(r) => r,
Expand All @@ -26,7 +54,7 @@ fn parse_message(line: &str) -> Option<MakerToTakerMessage> {
Some(message)
}

async fn download_maker_offer(host: &str) -> Option<OfferAddress> {
async fn download_maker_offer(host: &str) -> Option<OfferAndAddress> {
//TODO add timeouts to deal with indefinite hangs
let mut socket = match TcpStream::connect(host).await {
Ok(s) => s,
Expand Down Expand Up @@ -82,14 +110,16 @@ async fn download_maker_offer(host: &str) -> Option<OfferAddress> {
return None;
};

Some(OfferAddress {
Some(OfferAndAddress {
offer,
address: String::from(host),
address: MakerAddress::Clearnet {
address: String::from(host),
},
})
}

pub async fn sync_offerbook() -> Vec<OfferAddress> {
let (offers_writer_m, mut offers_reader) = mpsc::channel::<Option<OfferAddress>>(100);
pub async fn sync_offerbook() -> Vec<OfferAndAddress> {
let (offers_writer_m, mut offers_reader) = mpsc::channel::<Option<OfferAndAddress>>(100);
//unbounded_channel makes more sense here, but results in a compile
//error i cant figure out

Expand All @@ -102,7 +132,7 @@ pub async fn sync_offerbook() -> Vec<OfferAddress> {
});
}

let mut result = Vec::<OfferAddress>::new();
let mut result = Vec::<OfferAndAddress>::new();
for _ in 0..MAKER_HOSTS.len() {
if let Some(offer_addr) = offers_reader.recv().await.unwrap() {
result.push(offer_addr);
Expand Down
81 changes: 47 additions & 34 deletions src/taker_protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ use tokio::net::tcp::{ReadHalf, WriteHalf};
use tokio::net::TcpStream;
use tokio::time::sleep;

use tokio_socks::tcp::Socks5Stream;

use bitcoin::consensus::encode::deserialize;
use bitcoin::hashes::hash160::Hash as Hash160;
use bitcoin::hashes::{hex::ToHex, Hash};
Expand Down Expand Up @@ -36,7 +38,7 @@ use crate::messages::{
SignSendersContractTx, SwapCoinPrivateKey, TakerHello, TakerToMakerMessage, PREIMAGE_LEN,
};

use crate::offerbook_sync::{sync_offerbook, OfferAddress};
use crate::offerbook_sync::{sync_offerbook, MakerAddress, OfferAndAddress};
use crate::wallet_sync::{
generate_keypair, import_watchonly_redeemscript, IncomingSwapCoin, OutgoingSwapCoin, Wallet,
};
Expand Down Expand Up @@ -97,7 +99,7 @@ async fn send_coinswap(
rpc: &Client,
wallet: &mut Wallet,
config: TakerConfig,
all_maker_offers_addresses: &Vec<OfferAddress>,
all_maker_offers_addresses: &Vec<OfferAndAddress>,
) -> Result<(), Error> {
let mut preimage = [0u8; PREIMAGE_LEN];
OsRng.fill_bytes(&mut preimage);
Expand All @@ -107,7 +109,7 @@ async fn send_coinswap(

let mut maker_offers_addresses = all_maker_offers_addresses
.iter()
.collect::<Vec<&OfferAddress>>();
.collect::<Vec<&OfferAndAddress>>();

let (
first_maker,
Expand Down Expand Up @@ -199,9 +201,9 @@ async fn send_coinswap(
//unwrap the option without checking for Option::None because we passed no contract txes
//to watch and therefore they cant be broadcast

let mut active_maker_addresses = Vec::<String>::new();
let mut active_maker_addresses = Vec::<&MakerAddress>::new();
let mut next_maker = first_maker;
let mut previous_maker: Option<&OfferAddress> = None;
let mut previous_maker: Option<&OfferAndAddress> = None;

let mut watchonly_swapcoins = Vec::<Vec<WatchOnlySwapCoin>>::new();
let mut incoming_swapcoins = Vec::<IncomingSwapCoin>::new();
Expand Down Expand Up @@ -254,7 +256,7 @@ async fn send_coinswap(
)
.await?;
next_maker = found_next_maker;
active_maker_addresses.push(this_maker.address.clone());
active_maker_addresses.push(&this_maker.address);

let wait_for_confirm_result = wait_for_funding_tx_confirmation(
rpc,
Expand Down Expand Up @@ -386,9 +388,9 @@ async fn send_coinswap(
}

fn choose_next_maker<'a>(
maker_offers_addresses: &mut Vec<&'a OfferAddress>,
maker_offers_addresses: &mut Vec<&'a OfferAndAddress>,
amount: u64,
) -> Option<&'a OfferAddress> {
) -> Option<&'a OfferAndAddress> {
loop {
let m = maker_offers_addresses.pop()?;
if amount < m.offer.min_size || amount > m.offer.max_size {
Expand Down Expand Up @@ -428,9 +430,16 @@ async fn read_message(reader: &mut BufReader<ReadHalf<'_>>) -> Result<MakerToTak
Ok(message)
}

async fn handshake_maker(
socket: &mut TcpStream,
) -> Result<(BufReader<ReadHalf<'_>>, WriteHalf<'_>), Error> {
async fn handshake_maker<'a>(
socket: &'a mut TcpStream,
maker_address: &MakerAddress,
) -> Result<(BufReader<ReadHalf<'a>>, WriteHalf<'a>), Error> {
let socket = match maker_address {
MakerAddress::Clearnet { address: _ } => socket,
MakerAddress::Tor { address } => Socks5Stream::connect_with_socket(socket, address.clone())
.await?
.into_inner(),
};
let (reader, mut socket_writer) = socket.split();
let mut socket_reader = BufReader::new(reader);
send_message(
Expand Down Expand Up @@ -495,7 +504,7 @@ fn generate_my_multisig_and_hashlock_keys(
}

async fn request_senders_contract_tx_signatures<S: SwapCoin>(
maker_address: &str,
maker_address: &MakerAddress,
outgoing_swapcoins: &[S],
maker_multisig_nonces: &[SecretKey],
maker_hashlock_nonces: &[SecretKey],
Expand Down Expand Up @@ -533,14 +542,15 @@ async fn request_senders_contract_tx_signatures<S: SwapCoin>(
}

async fn request_senders_contract_tx_signatures_attempt_once<S: SwapCoin>(
maker_address: &str,
maker_address: &MakerAddress,
outgoing_swapcoins: &[S],
maker_multisig_nonces: &[SecretKey],
maker_hashlock_nonces: &[SecretKey],
locktime: u16,
) -> Result<Vec<Signature>, Error> {
let mut socket = TcpStream::connect(maker_address).await?;
let (mut socket_reader, mut socket_writer) = handshake_maker(&mut socket).await?;
let mut socket = TcpStream::connect(maker_address.get_tcpstream_address()).await?;
let (mut socket_reader, mut socket_writer) =
handshake_maker(&mut socket, maker_address).await?;
log::info!("===> Sending SignSendersContractTx to {}", maker_address);
send_message(
&mut socket_writer,
Expand Down Expand Up @@ -591,7 +601,7 @@ async fn request_senders_contract_tx_signatures_attempt_once<S: SwapCoin>(
}

async fn request_receivers_contract_tx_signatures<S: SwapCoin>(
maker_address: &str,
maker_address: &MakerAddress,
incoming_swapcoins: &[S],
receivers_contract_txes: &[Transaction],
) -> Result<Vec<Signature>, Error> {
Expand Down Expand Up @@ -632,12 +642,13 @@ async fn request_receivers_contract_tx_signatures<S: SwapCoin>(
}

async fn request_receivers_contract_tx_signatures_attempt_once<S: SwapCoin>(
maker_address: &str,
maker_address: &MakerAddress,
incoming_swapcoins: &[S],
receivers_contract_txes: &[Transaction],
) -> Result<Vec<Signature>, Error> {
let mut socket = TcpStream::connect(maker_address).await?;
let (mut socket_reader, mut socket_writer) = handshake_maker(&mut socket).await?;
let mut socket = TcpStream::connect(maker_address.get_tcpstream_address()).await?;
let (mut socket_reader, mut socket_writer) =
handshake_maker(&mut socket, maker_address).await?;
send_message(
&mut socket_writer,
TakerToMakerMessage::SignReceiversContractTx(SignReceiversContractTx {
Expand Down Expand Up @@ -797,9 +808,9 @@ fn get_swapcoin_multisig_contract_redeemscripts_txes<S: SwapCoin>(
async fn exchange_signatures_and_find_next_maker<'a>(
rpc: &Client,
config: &TakerConfig,
maker_offers_addresses: &mut Vec<&'a OfferAddress>,
this_maker: &'a OfferAddress,
previous_maker: Option<&'a OfferAddress>,
maker_offers_addresses: &mut Vec<&'a OfferAndAddress>,
this_maker: &'a OfferAndAddress,
previous_maker: Option<&'a OfferAndAddress>,
is_taker_previous_peer: bool,
is_taker_next_peer: bool,
funding_txes: &[Transaction],
Expand All @@ -820,7 +831,7 @@ async fn exchange_signatures_and_find_next_maker<'a>(
Vec<SecretKey>,
SignSendersAndReceiversContractTxes,
Vec<Script>,
&'a OfferAddress,
&'a OfferAndAddress,
),
Error,
> {
Expand Down Expand Up @@ -878,9 +889,9 @@ async fn exchange_signatures_and_find_next_maker<'a>(
async fn exchange_signatures_and_find_next_maker_attempt_once<'a>(
rpc: &Client,
config: &TakerConfig,
maker_offers_addresses: &mut Vec<&'a OfferAddress>,
this_maker: &'a OfferAddress,
previous_maker: Option<&'a OfferAddress>,
maker_offers_addresses: &mut Vec<&'a OfferAndAddress>,
this_maker: &'a OfferAndAddress,
previous_maker: Option<&'a OfferAndAddress>,
is_taker_previous_peer: bool,
is_taker_next_peer: bool,
funding_txes: &[Transaction],
Expand All @@ -901,15 +912,16 @@ async fn exchange_signatures_and_find_next_maker_attempt_once<'a>(
Vec<SecretKey>,
SignSendersAndReceiversContractTxes,
Vec<Script>,
&'a OfferAddress,
&'a OfferAndAddress,
),
Error,
> {
//return next_peer_multisig_pubkeys, next_peer_multisig_keys_or_nonces,
// next_peer_hashlock_keys_or_nonces, (), next_swap_contract_redeemscripts, found_next_maker

let mut socket = TcpStream::connect(this_maker.address.clone()).await?;
let (mut socket_reader, mut socket_writer) = handshake_maker(&mut socket).await?;
let mut socket = TcpStream::connect(this_maker.address.get_tcpstream_address()).await?;
let (mut socket_reader, mut socket_writer) =
handshake_maker(&mut socket, &this_maker.address).await?;
let mut next_maker = this_maker;
let (
next_peer_multisig_pubkeys,
Expand Down Expand Up @@ -1059,7 +1071,7 @@ async fn exchange_signatures_and_find_next_maker_attempt_once<'a>(
async fn send_proof_of_funding_and_check_reply(
socket_reader: &mut BufReader<ReadHalf<'_>>,
socket_writer: &mut WriteHalf<'_>,
this_maker: &OfferAddress,
this_maker: &OfferAndAddress,
funding_txes: &[Transaction],
funding_tx_merkleproofs: &[String],
this_maker_multisig_redeemscripts: &[Script],
Expand Down Expand Up @@ -1409,7 +1421,7 @@ fn check_and_apply_maker_private_keys<S: SwapCoin>(
async fn settle_all_coinswaps_send_hash_preimage_and_privkeys(
config: &TakerConfig,
preimage: Preimage,
active_maker_addresses: &Vec<String>,
active_maker_addresses: &Vec<&MakerAddress>,
outgoing_swapcoins: &Vec<OutgoingSwapCoin>,
watchonly_swapcoins: &mut Vec<Vec<WatchOnlySwapCoin>>,
incoming_swapcoins: &mut Vec<IncomingSwapCoin>,
Expand Down Expand Up @@ -1474,7 +1486,7 @@ async fn settle_all_coinswaps_send_hash_preimage_and_privkeys(
}

async fn settle_one_coinswap(
maker_address: &String,
maker_address: &MakerAddress,
index: usize,
is_taker_previous_peer: bool,
is_taker_next_peer: bool,
Expand All @@ -1486,8 +1498,9 @@ async fn settle_one_coinswap(
receivers_multisig_redeemscripts: &Vec<Script>,
preimage: Preimage,
) -> Result<(), Error> {
let mut socket = TcpStream::connect(maker_address).await?;
let (mut socket_reader, mut socket_writer) = handshake_maker(&mut socket).await?;
let mut socket = TcpStream::connect(maker_address.get_tcpstream_address()).await?;
let (mut socket_reader, mut socket_writer) =
handshake_maker(&mut socket, maker_address).await?;

log::info!("===> Sending HashPreimage to {}", maker_address);
let maker_private_key_handover = send_hash_preimage_and_get_private_keys(
Expand Down
4 changes: 4 additions & 0 deletions src/watchtower_protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,10 @@ async fn run(rpc: &Client, kill_flag: Arc<RwLock<bool>>) -> Result<(), Error> {
Error::Rpc(e) => {
server_loop_err_comms_tx.send(Error::Rpc(e)).await.unwrap()
}
Error::Socks(e) => server_loop_err_comms_tx
.send(Error::Socks(e))
.await
.unwrap(),
};
break;
}
Expand Down

0 comments on commit 0f75c1d

Please sign in to comment.