Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Debug CI failures, introduce task groups, error handling improvements #834

Merged
merged 1 commit into from
Oct 26, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 5 additions & 2 deletions client/cli/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -551,8 +551,11 @@ async fn handle_command(
)
}
Command::WaitBlockHeight { height } => {
client.await_consensus_block_height(height).await;
Ok(CliOutput::WaitBlockHeight { reached: (height) })
client.await_consensus_block_height(height).await.transform(
|_| CliOutput::WaitBlockHeight { reached: (height) },
CliErrorKind::Timeout,
"timeout reached",
)
}
Command::ConnectInfo => {
let info = WsFederationConnect::from(client.config().as_ref());
Expand Down
16 changes: 14 additions & 2 deletions client/client-lib/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use bitcoin::{secp256k1, Address, Transaction as BitcoinTransaction};
use bitcoin_hashes::{sha256, Hash};
use fedimint_api::db::Database;
use fedimint_api::module::TransactionItemAmount;
use fedimint_api::task::sleep;
use fedimint_api::task::{self, sleep};
use fedimint_api::tiered::InvalidAmountTierError;
use fedimint_api::TieredMulti;
use fedimint_api::{Amount, FederationModule, OutPoint, PeerId, TransactionId};
Expand Down Expand Up @@ -453,7 +453,17 @@ impl<T: AsRef<ClientConfig> + Clone> Client<T> {
self.reissue(all_coins, rng).await
}

pub async fn await_consensus_block_height(&self, block_height: u64) -> u64 {
pub async fn await_consensus_block_height(
&self,
block_height: u64,
) -> std::result::Result<u64, task::Elapsed> {
task::timeout(Duration::from_secs(30), async {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

At minimum - we should have timeouts for any network operations, at least on the client side. It's better to return an error and have user retry, than just hang forever with no feedback to the user.

self.await_consensus_block_height_inner(block_height).await
})
.await
}

async fn await_consensus_block_height_inner(&self, block_height: u64) -> u64 {
loop {
match self.context.api.fetch_consensus_block_height().await {
Ok(height) if height >= block_height => return height,
Expand Down Expand Up @@ -1159,6 +1169,8 @@ pub enum ClientError {
FailedPaymentNoRefund,
#[error("Failed to delete unknown outgoing contract")]
DeleteUnknownOutgoingContract,
#[error("Timeout")]
Timeout,
}

impl From<InvalidAmountTierError> for ClientError {
Expand Down
21 changes: 16 additions & 5 deletions client/client-lib/src/wallet/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,7 @@ mod tests {
use bitcoin_hashes::Hash;
use fedimint_api::config::BitcoindRpcCfg;
use fedimint_api::db::mem_impl::MemDatabase;
use fedimint_api::task::TaskGroup;
use fedimint_api::{Feerate, OutPoint, TransactionId};
use fedimint_core::epoch::EpochHistory;
use fedimint_core::modules::ln::contracts::incoming::IncomingContractOffer;
Expand Down Expand Up @@ -273,7 +274,9 @@ mod tests {
}
}

async fn new_mint_and_client() -> (
async fn new_mint_and_client(
task_group: &mut TaskGroup,
) -> (
Arc<tokio::sync::Mutex<Fed>>,
WalletClientConfig,
ClientContext,
Expand All @@ -286,11 +289,17 @@ mod tests {
FakeFed::<Wallet, WalletClientConfig>::new(
4,
move |cfg, db| {
let mut task_group = task_group.clone();
let btc_rpc_clone = btc_rpc.clone();
async move {
Wallet::new_with_bitcoind(cfg, db, btc_rpc_clone.clone().into())
.await
.unwrap()
Wallet::new_with_bitcoind(
cfg,
db,
btc_rpc_clone.clone().into(),
&mut task_group,
)
.await
.unwrap()
}
},
&BitcoindRpcCfg {
Expand All @@ -316,7 +325,9 @@ mod tests {

#[test_log::test(tokio::test)]
async fn create_output() {
let (fed, client_config, client_context, btc_rpc) = new_mint_and_client().await;
let mut task_group = TaskGroup::new();
let (fed, client_config, client_context, btc_rpc) =
new_mint_and_client(&mut task_group).await;
let _client = WalletClient {
config: &client_config,
context: &client_context,
Expand Down
7 changes: 5 additions & 2 deletions client/clientd/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use clientd::{
};
use clientd::{Json as JsonExtract, SpendPayload};
use fedimint_core::config::load_from_file;
use mint_client::{Client, UserClientConfig};
use mint_client::{Client, ClientError, UserClientConfig};
use rand::rngs::OsRng;
use tokio::sync::mpsc;
use tokio::sync::mpsc::Sender;
Expand Down Expand Up @@ -127,7 +127,10 @@ async fn wait_block_height(
JsonExtract(payload): JsonExtract<WaitBlockHeightPayload>,
) -> Result<impl IntoResponse, ClientdError> {
let client = &state.client;
client.await_consensus_block_height(payload.height).await;
client
.await_consensus_block_height(payload.height)
.await
.map_err(|_e| ClientError::Timeout)?;
json_success!("done")
}

Expand Down
152 changes: 146 additions & 6 deletions fedimint-api/src/task.rs
Original file line number Diff line number Diff line change
@@ -1,23 +1,164 @@
use std::future::Future;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering::SeqCst;
use std::sync::Arc;
use std::time::{Duration, Instant};

use anyhow::anyhow;
use futures::lock::Mutex;
pub use imp::*;
use thiserror::Error;
#[cfg(not(target_family = "wasm"))]
use tokio::task::JoinHandle;
use tracing::info;
#[cfg(target_family = "wasm")]
type JoinHandle<T> = futures::future::Ready<anyhow::Result<T>>;

#[derive(Debug, Error)]
#[error("deadline has elapsed")]
pub struct Elapsed;

#[derive(Debug, Default)]
struct TaskGroupInner {
/// Was the shutdown requested, either externally or due to any task failure?
is_shutting_down: AtomicBool,
join: Mutex<Vec<(String, JoinHandle<()>)>>,
}

/// A group of task working together
///
/// Using this struct it is possible to spawn one or more
/// main thread collabarating, which can cooperatively gracefully
/// shut down, either due to external request, or failure of
/// one of them.
///
/// Each thread should periodically check [`TaskHandle`] or rely
/// on condition like channel disconnection to detect when it is time
/// to finish.
#[derive(Clone, Default, Debug)]
pub struct TaskGroup {
Copy link
Contributor Author

@dpc dpc Oct 23, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would have to polish this a bit before landing, but that's the idea - instead of spawning thread willy-nilly, have a primitive that keeps track of them, and allows to signal shutdown and wait for all of them to wrap up before exiting.

Passing ThreadGroup around improves ability to track which parts of the code can spawn threads/task, working as a unenforced capability-like resource.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Really cool abstraction. I wonder if we still need a way to get output from tasks in some places? Maybe not for now.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It could be done, where TaskHandle could give out some T: Write to places. Though as we are doing logging via tracing it really wants to do all relevant output to a global stderr so it might be hard.

inner: Arc<TaskGroupInner>,
}

impl TaskGroup {
pub fn new() -> Self {
Self::default()
}

fn make_handle(&self) -> TaskHandle {
TaskHandle {
inner: self.inner.clone(),
}
}

#[cfg(not(target_family = "wasm"))]
pub async fn spawn<Fut>(
&mut self,
name: impl Into<String>,
f: impl FnOnce(TaskHandle) -> Fut + Send + 'static,
) where
Fut: Future<Output = ()> + Send + 'static,
{
let name = name.into();
let mut guard = TaskPanicGuard {
name: name.clone(),
inner: self.inner.clone(),
completed: false,
};
let handle = self.make_handle();

if let Some(handle) = self::imp::spawn(async move {
f(handle).await;
}) {
self.inner.join.lock().await.push((name, handle));
}
guard.completed = true;
}

// TODO: Send vs lack of Send bound; do something about it
#[cfg(target_family = "wasm")]
pub async fn spawn<Fut>(
&mut self,
name: impl Into<String>,
f: impl FnOnce(TaskHandle) -> Fut + 'static,
) where
Fut: Future<Output = ()> + 'static,
{
let name = name.into();
let mut guard = TaskPanicGuard {
name: name.clone(),
inner: self.inner.clone(),
completed: false,
};
let handle = self.make_handle();

if let Some(handle) = self::imp::spawn(async move {
f(handle).await;
}) {
self.inner.join.lock().await.push((name, handle));
}
guard.completed = true;
}

pub fn shutdown(&self) {
self.inner.is_shutting_down.store(true, SeqCst);
}

pub async fn join_all(self) -> anyhow::Result<()> {
for (name, join) in self.inner.join.lock().await.drain(..) {
join.await
.map_err(|e| anyhow!("Thread {name} panicked with: {e}"))?
}
Ok(())
}
}

pub struct TaskPanicGuard {
name: String,
inner: Arc<TaskGroupInner>,
/// Did the future completed successfully (no panic)
completed: bool,
}

impl TaskPanicGuard {
pub fn is_shutting_down(&self) -> bool {
self.inner.is_shutting_down.load(SeqCst)
}
}

impl Drop for TaskPanicGuard {
fn drop(&mut self) {
if !self.completed {
info!(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this be a warning?

"Task {} shut down uncleanly. Shutting down task group.",
self.name
);
self.inner.is_shutting_down.store(true, SeqCst);
}
}
}

pub struct TaskHandle {
inner: Arc<TaskGroupInner>,
}

impl TaskHandle {
pub fn is_shutting_down(&self) -> bool {
self.inner.is_shutting_down.load(SeqCst)
}
}

#[cfg(not(target_family = "wasm"))]
mod imp {
pub use tokio::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard};

use super::*;

pub fn spawn<F>(future: F)
pub(crate) fn spawn<F>(future: F) -> Option<JoinHandle<()>>
where
F: Future<Output = ()> + Send + 'static,
{
tokio::spawn(future);
Some(tokio::spawn(future))
}

pub fn block_in_place<F, R>(f: F) -> R
Expand Down Expand Up @@ -52,12 +193,13 @@ mod imp {

use super::*;

pub fn spawn<F>(future: F)
pub(crate) fn spawn<F>(future: F) -> Option<JoinHandle<()>>
where
// No Send needed on wasm
F: Future<Output = ()> + 'static,
{
wasm_bindgen_futures::spawn_local(future)
wasm_bindgen_futures::spawn_local(future);
Copy link
Contributor Author

@dpc dpc Oct 23, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For wasm no JoinHandle-like thing is returned, so we will just not be able to wait for the tasks to shut down, I guess. 🤷

None
}

pub fn block_in_place<F, R>(f: F) -> R
Expand Down Expand Up @@ -87,5 +229,3 @@ mod imp {
}
}
}

pub use imp::*;
10 changes: 7 additions & 3 deletions fedimintd/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use std::path::PathBuf;

use clap::Parser;
use fedimint_api::db::Database;
use fedimint_api::task::TaskGroup;
use fedimint_core::modules::ln::LightningModule;
use fedimint_mint_server::MintServerModule;
use fedimint_server::config::ServerConfig;
Expand Down Expand Up @@ -86,11 +87,14 @@ async fn main() -> anyhow::Result<()> {
.into();
let btc_rpc = fedimint_bitcoind::bitcoincore_rpc::make_bitcoind_rpc(&cfg.wallet.btc_rpc)?;

let mut task_group = TaskGroup::new();

let mint = fedimint_core::modules::mint::Mint::new(cfg.mint.clone(), db.clone());

let wallet = Wallet::new_with_bitcoind(cfg.wallet.clone(), db.clone(), btc_rpc)
.await
.expect("Couldn't create wallet");
let wallet =
Wallet::new_with_bitcoind(cfg.wallet.clone(), db.clone(), btc_rpc, &mut task_group)
.await
.expect("Couldn't create wallet");

let ln = LightningModule::new(cfg.ln.clone(), db.clone());

Expand Down
1 change: 1 addition & 0 deletions integrationtests/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ name = "fedimint-tests"
path = "tests/tests.rs"

[dev-dependencies]
anyhow = "1.0.65"
assert_matches = "1.5.0"
async-trait = "0.1.42"
bitcoin = "0.29.1"
Expand Down