Skip to content

Commit

Permalink
chore: speed up recovery by batching and streaming
Browse files Browse the repository at this point in the history
This is effectively undoing some parts of
21da635
hopefully to improve the real life recovery performance,
which people are reporting as very slow.

The WIP recovery refactoring should render this irrelevant, but
might be a good bandaid for 0.2.
  • Loading branch information
dpc committed Jan 15, 2024
1 parent a8422b8 commit ee60015
Showing 1 changed file with 48 additions and 23 deletions.
71 changes: 48 additions & 23 deletions modules/fedimint-mint-client/src/backup/recovery.rs
@@ -1,6 +1,6 @@
use std::cmp::max;
use std::cmp::{self, max};
use std::collections::BTreeMap;
use std::fmt;
use std::{fmt, ops};

use fedimint_client::sm::{State, StateTransition};
use fedimint_client::DynGlobalClientContext;
Expand All @@ -16,6 +16,7 @@ use fedimint_core::{Amount, NumPeers, OutPoint, PeerId, Tiered, TieredMulti};
use fedimint_derive_secret::DerivableSecret;
use fedimint_logging::LOG_CLIENT_RECOVERY_MINT;
use fedimint_mint_common::{MintInput, MintOutput, Nonce};
use futures::StreamExt as _;
use serde::{Deserialize, Serialize};
use tbs::{AggregatePublicKey, BlindedMessage, PublicKeyShare};
use threshold_crypto::G1Affine;
Expand Down Expand Up @@ -247,20 +248,32 @@ impl MintRestoreInProgressState {
decoders: ModuleDecoderRegistry,
secret: DerivableSecret,
) -> Self {
const PROGRESS_SNAPSHOT_BLOCKS: u64 = 50;

assert_eq!(secret.level(), 2);

info!(target: LOG_CLIENT_RECOVERY_MINT, "Processing block {}", self.next_epoch);
let block_range = self.next_epoch
..cmp::min(
self.next_epoch.wrapping_add(PROGRESS_SNAPSHOT_BLOCKS),
self.end_epoch,
);
debug!(
target: LOG_CLIENT_RECOVERY_MINT,
?block_range,
"Processing blocks"
);

for accepted_item in Self::await_block(api, decoders, self.next_epoch)
.await
.items
{
if let ConsensusItem::Transaction(transaction) = accepted_item.item {
self.handle_transaction(&transaction, &secret);
let mut block_stream = Self::fetch_block_stream(api, decoders, block_range);
while let Some((block_idx, block)) = block_stream.next().await {
assert_eq!(self.next_epoch, block_idx);
for accepted_item in block.items {
if let ConsensusItem::Transaction(transaction) = accepted_item.item {
self.handle_transaction(&transaction, &secret);
}
}
}

self.next_epoch += 1;
self.next_epoch += 1;
}

self
}
Expand All @@ -271,20 +284,32 @@ impl MintRestoreInProgressState {
/// errors via `sender` itself.
///
/// TODO: could be internal to recovery_loop?
async fn await_block<'a>(
fn fetch_block_stream<'a>(
api: DynGlobalApi,
decoders: ModuleDecoderRegistry,
index: u64,
) -> SessionOutcome {
loop {
info!(target: LOG_CLIENT_RECOVERY_MINT, index, "Awaiting block {index}");
match api.await_block(index, &decoders).await {
Ok(block) => return block,
Err(e) => {
info!(e = %e, index, "Error trying to fetch signed block");
}
}
}
epoch_range: ops::Range<u64>,
) -> impl futures::Stream<Item = (u64, SessionOutcome)> + 'a {
futures::stream::iter(epoch_range)
.map(move |block_idx| {
let api = api.clone();
let decoders = decoders.clone();
Box::pin(async move {
info!(block_idx, "Fetching epoch");

let block = loop {
info!(target: LOG_CLIENT_RECOVERY_MINT, block_idx, "Awaiting signed block");
match api.await_block(block_idx, &decoders).await {
Ok(block) => break block,
Err(e) => {
info!(e = %e, block_idx, "Error trying to fetch signed block");
}
}
};

(block_idx, block)
})
})
.buffered(8)
}
}

Expand Down

0 comments on commit ee60015

Please sign in to comment.