Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

do not retry if transactions exist in blockchain #675

Merged
merged 3 commits into from
Oct 1, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

30 changes: 16 additions & 14 deletions integrationtests/tests/fixtures/fake.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use rand::rngs::OsRng;

use fedimint_api::Amount;
use fedimint_server::modules::ln::contracts::Preimage;
use fedimint_wallet::bitcoind::IBitcoindRpc;
use fedimint_wallet::bitcoind::{IBitcoindRpc, Result as BitcoinRpcResult};
use fedimint_wallet::txoproof::TxOutProof;
use fedimint_wallet::Feerate;
use ln_gateway::ln::{LightningError, LnRpc};
Expand Down Expand Up @@ -197,38 +197,40 @@ impl BitcoinTest for FakeBitcoinTest {

#[async_trait]
impl IBitcoindRpc for FakeBitcoinTest {
async fn get_network(&self) -> Network {
Network::Regtest
async fn get_network(&self) -> BitcoinRpcResult<Network> {
Ok(Network::Regtest)
}

async fn get_block_height(&self) -> u64 {
self.blocks.lock().unwrap().len() as u64
async fn get_block_height(&self) -> BitcoinRpcResult<u64> {
Ok(self.blocks.lock().unwrap().len() as u64)
}

async fn get_block_hash(&self, height: u64) -> BlockHash {
self.blocks.lock().unwrap()[(height - 1) as usize]
async fn get_block_hash(&self, height: u64) -> BitcoinRpcResult<BlockHash> {
Ok(self.blocks.lock().unwrap()[(height - 1) as usize]
.header
.block_hash()
.block_hash())
}

async fn get_block(&self, hash: &BlockHash) -> Block {
self.blocks
async fn get_block(&self, hash: &BlockHash) -> BitcoinRpcResult<Block> {
Ok(self
.blocks
.lock()
.unwrap()
.iter()
.find(|block| *hash == block.header.block_hash())
.unwrap()
.clone()
.clone())
}

async fn get_fee_rate(&self, _confirmation_target: u16) -> Option<Feerate> {
None
async fn get_fee_rate(&self, _confirmation_target: u16) -> BitcoinRpcResult<Option<Feerate>> {
Ok(None)
}

async fn submit_transaction(&self, transaction: Transaction) {
async fn submit_transaction(&self, transaction: Transaction) -> BitcoinRpcResult<()> {
let mut pending = self.pending.lock().unwrap();
if !pending.contains(&transaction) {
pending.push(transaction);
}
Ok(())
}
}
1 change: 1 addition & 0 deletions modules/fedimint-mint/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ name = "fedimint_mint"
path = "src/lib.rs"

[dependencies]
anyhow = "1.0.65"
async-trait = "0.1"
bincode = "1.3.1"
counter = "0.5.2"
Expand Down
168 changes: 130 additions & 38 deletions modules/fedimint-wallet/src/bitcoincore_rpc.rs
Original file line number Diff line number Diff line change
@@ -1,117 +1,209 @@
use crate::bitcoind::IBitcoindRpc;
use crate::bitcoind::Result;
use crate::{bitcoind::BitcoindRpc, Feerate};
use async_trait::async_trait;
use bitcoin::{Block, BlockHash, Network, Transaction};
use bitcoincore_rpc::bitcoincore_rpc_json::EstimateMode;
use bitcoincore_rpc::Auth;
use bitcoincore_rpc::jsonrpc::error::RpcError;
use bitcoincore_rpc::{jsonrpc, Auth, Error};
use fedimint_api::config::BitcoindRpcCfg;
use fedimint_api::module::__reexports::serde_json::Value;
use jsonrpc::error::Error as JsonError;
use serde::Deserialize;
use std::sync::atomic::Ordering;
use std::future::Future;
use std::time::Duration;
use tracing::info;
use tracing::warn;

pub fn make_bitcoind_rpc(cfg: &BitcoindRpcCfg) -> Result<BitcoindRpc, bitcoincore_rpc::Error> {
/// https://github.com/bitcoin/bitcoin/blob/ec0a4ad67769109910e3685da9c56c1b9f42414e/src/rpc/protocol.h#L48
const RPC_VERIFY_ALREADY_IN_CHAIN: i32 = -27;

pub fn make_bitcoind_rpc(
cfg: &BitcoindRpcCfg,
) -> std::result::Result<BitcoindRpc, bitcoincore_rpc::Error> {
let bitcoind_client = bitcoincore_rpc::Client::new(
&cfg.btc_rpc_address,
Auth::UserPass(cfg.btc_rpc_user.clone(), cfg.btc_rpc_pass.clone()),
)?;
let retry_client = RetryClient {
client: bitcoind_client,
retries: Default::default(),
inner: ErrorReporting::new(bitcoind_client),
max_retries: 10,
base_sleep: Duration::from_millis(10),
};

Ok(retry_client.into())
}

/// Wrapper around [`bitcoincore_rpc::Client`] logging failures
///
/// In the future we might tweak which errors are worth reporting exactly.
#[derive(Debug)]
struct RetryClient {
client: bitcoincore_rpc::Client,
retries: std::sync::atomic::AtomicU16,
max_retries: u16,
base_sleep: Duration,
struct ErrorReporting<C> {
inner: C,
}

impl<C> ErrorReporting<C> {
fn new(inner: C) -> Self
where
C: bitcoincore_rpc::RpcApi,
{
Self { inner }
}
}

impl bitcoincore_rpc::RpcApi for RetryClient {
impl<C> bitcoincore_rpc::RpcApi for ErrorReporting<C>
where
C: bitcoincore_rpc::RpcApi,
{
fn call<T: for<'a> Deserialize<'a>>(
&self,
cmd: &str,
args: &[Value],
) -> bitcoincore_rpc::Result<T> {
self.inner.call(cmd, args).map_err(|e| {
warn!("bitcoind returned error on cmd '{}': {}", cmd, e);
e
})
}
}

/// Wrapper around [`IBitcoindRpc`] that will retry failed calls
#[derive(Debug)]
struct RetryClient<C> {
inner: C,
max_retries: u16,
base_sleep: Duration,
}

impl<C> RetryClient<C> {
async fn retry_call<T, F, R>(&self, call_fn: F) -> Result<T>
where
F: Fn() -> R,
R: Future<Output = Result<T>>,
{
let mut retries = 0;
let mut fail_sleep = self.base_sleep;
let ret = loop {
match self.client.call(cmd, args) {
match call_fn().await {
Ok(ret) => {
break ret;
}
Err(e) => {
warn!("bitcoind returned error on cmd '{}': {}", cmd, e);
let retries = self.retries.fetch_add(1, Ordering::Relaxed);
retries += 1;

if retries > self.max_retries {
return Err(e);
}

info!("Will retry rpc after {}ms", fail_sleep.as_millis());
std::thread::sleep(fail_sleep);
fail_sleep *= 2;
}
}
};
self.retries.store(0, Ordering::Relaxed);
Ok(ret)
}
}

#[async_trait]
impl<C> IBitcoindRpc for RetryClient<C>
where
C: IBitcoindRpc,
{
async fn get_network(&self) -> Result<Network> {
self.retry_call(|| async { self.inner.get_network().await })
.await
}

async fn get_block_height(&self) -> Result<u64> {
self.retry_call(|| async { self.inner.get_block_height().await })
.await
}

async fn get_block_hash(&self, height: u64) -> Result<BlockHash> {
self.retry_call(|| async { self.inner.get_block_hash(height).await })
.await
}

async fn get_block(&self, hash: &BlockHash) -> Result<Block> {
self.retry_call(|| async { self.inner.get_block(hash).await })
.await
}

async fn get_fee_rate(&self, confirmation_target: u16) -> Result<Option<Feerate>> {
self.retry_call(|| async { self.inner.get_fee_rate(confirmation_target).await })
.await
}

async fn submit_transaction(&self, transaction: Transaction) -> Result<()> {
self.retry_call(|| async { self.inner.submit_transaction(transaction.clone()).await })
.await
}
}

#[async_trait]
impl<T> IBitcoindRpc for T
where
T: bitcoincore_rpc::RpcApi + Send + Sync,
{
async fn get_network(&self) -> Network {
let network = fedimint_api::task::block_in_place(|| self.get_blockchain_info())
.expect("Bitcoind returned an error");
match network.chain.as_str() {
async fn get_network(&self) -> Result<Network> {
let network = fedimint_api::task::block_in_place(|| {
self.get_blockchain_info().map_err(anyhow::Error::from)
})?;
Ok(match network.chain.as_str() {
"main" => Network::Bitcoin,
"test" => Network::Testnet,
"regtest" => Network::Regtest,
"signet" => Network::Signet,
n => panic!("Unknown Network \"{}\"", n),
}
})
}

async fn get_block_height(&self) -> u64 {
fedimint_api::task::block_in_place(|| self.get_block_count())
.expect("Bitcoind returned an error")
async fn get_block_height(&self) -> Result<u64> {
fedimint_api::task::block_in_place(|| {
self.get_block_count()
.map_err(anyhow::Error::from)
.map_err(Into::into)
})
}

async fn get_block_hash(&self, height: u64) -> BlockHash {
fedimint_api::task::block_in_place(|| bitcoincore_rpc::RpcApi::get_block_hash(self, height))
.expect("Bitcoind returned an error")
async fn get_block_hash(&self, height: u64) -> Result<BlockHash> {
fedimint_api::task::block_in_place(|| {
bitcoincore_rpc::RpcApi::get_block_hash(self, height)
.map_err(anyhow::Error::from)
.map_err(Into::into)
})
}

async fn get_block(&self, hash: &BlockHash) -> Block {
fedimint_api::task::block_in_place(|| bitcoincore_rpc::RpcApi::get_block(self, hash))
.expect("Bitcoind returned an error")
async fn get_block(&self, hash: &BlockHash) -> Result<Block> {
fedimint_api::task::block_in_place(|| {
bitcoincore_rpc::RpcApi::get_block(self, hash)
.map_err(anyhow::Error::from)
.map_err(Into::into)
})
}

async fn get_fee_rate(&self, confirmation_target: u16) -> Option<Feerate> {
fedimint_api::task::block_in_place(|| {
async fn get_fee_rate(&self, confirmation_target: u16) -> Result<Option<Feerate>> {
Ok(fedimint_api::task::block_in_place(|| {
self.estimate_smart_fee(confirmation_target, Some(EstimateMode::Conservative))
.map_err(anyhow::Error::from)
})
.expect("Bitcoind returned an error") // TODO: implement retry logic in case bitcoind is temporarily unreachable
.fee_rate
.map(|per_kb| Feerate {
sats_per_kvb: per_kb.as_sat(),
})
}))
}

async fn submit_transaction(&self, transaction: Transaction) {
if let Err(error) =
fedimint_api::task::block_in_place(|| self.send_raw_transaction(&transaction))
{
warn!(?error, "Submitting transaction failed");
}
async fn submit_transaction(&self, transaction: Transaction) -> Result<()> {
fedimint_api::task::block_in_place(|| match self.send_raw_transaction(&transaction) {
// for our purposes, this is not an error
Err(Error::JsonRpc(JsonError::Rpc(RpcError {
code: RPC_VERIFY_ALREADY_IN_CHAIN,
..
}))) => Ok(()),
Err(e) => Err(anyhow::Error::from(e).into()),
Ok(_) => Ok(()),
})
}
}