Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Avoid loading all notes in memory #2088

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions client/cli/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -699,6 +699,7 @@ impl FedimintCli {
),
Command::Info => {
let client = cli.build_client(&self.module_gens).await?;
// FIXME: replace by methods that don't load everything in memory
let notes = client.notes().await;
let details_vec = notes
.iter()
Expand Down
4 changes: 4 additions & 0 deletions client/client-lib/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -745,6 +745,10 @@ impl<T: AsRef<ClientConfig> + Clone + Send> Client<T> {
}
}

pub async fn total_amount(&self) -> Amount {
self.mint_client().total_amount().await
}

pub async fn notes(&self) -> TieredMulti<SpendableNote> {
self.mint_client().notes().await
}
Expand Down
276 changes: 263 additions & 13 deletions client/client-lib/src/mint/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use fedimint_core::encoding::{Decodable, Encodable};
use fedimint_core::module::registry::ModuleDecoderRegistry;
use fedimint_core::module::{ModuleCommon, TransactionItemAmount};
use fedimint_core::tiered::InvalidAmountTierError;
use fedimint_core::{Amount, OutPoint, Tiered, TieredMulti, TransactionId};
use fedimint_core::{Amount, OutPoint, Tiered, TieredMulti, TieredSummary, TransactionId};
use fedimint_mint_client::MintModuleTypes;
use futures::{Future, StreamExt};
use secp256k1_zkp::{KeyPair, Secp256k1, Signing};
Expand Down Expand Up @@ -260,14 +260,14 @@ impl MintClient {
) -> (NoteIssuanceRequests, TieredMulti<BlindNonce>) {
let mut amount_requests: Vec<((Amount, NoteIssuanceRequest), (Amount, BlindNonce))> =
Vec::new();
let denominations = TieredMulti::represent_amount(
let denominations = TieredSummary::represent_amount(
amount,
&self.notes().await,
&self.current_denominations().await,
&self.config.tbs_pks,
notes_per_denomination,
);
for (amt, num) in denominations.iter() {
for _ in 0..*num {
for _ in 0..num {
let (request, blind_nonce) =
self.new_ecash_note(&self.context.secp, amt, dbtx).await;
amount_requests.push(((amt, request), (amt, blind_nonce)));
Expand Down Expand Up @@ -383,13 +383,123 @@ impl MintClient {
NoteIssuanceRequest::new(ctx, secret)
}

async fn current_denominations(&self) -> TieredSummary {
self.start_dbtx()
.await
.find_by_prefix(&NoteKeyPrefix)
.await
.fold(
TieredSummary::default(),
|mut acc, (key, _note)| async move {
acc.inc(key.amount, 1);
acc
},
)
.await
}

pub async fn total_amount(&self) -> Amount {
self.start_dbtx()
.await
.find_by_prefix(&NoteKeyPrefix)
.await
.fold(
Amount::ZERO,
|acc, (key, _note)| async move { acc + key.amount },
)
.await
}

/// Select notes with total amount of *at least* `amount`. If more than
/// requested amount of notes are returned it was because exact change
/// couldn't be made, and the next smallest amount will be returned.
///
/// The caller can request change from the federation.
pub async fn select_notes(&self, amount: Amount) -> Result<TieredMulti<SpendableNote>> {
let notes = self.notes().await;
let selected_notes = notes.select_notes(amount).ok_or_else(|| {
MintClientError::InsufficientBalance(amount, TieredMulti::total_amount(&notes))
})?;
let mut t = self.start_dbtx().await;
let note_stream = t
.find_by_prefix_sorted_reverse(&NoteKeyPrefix)
.await
.map(|(key, note)| (key.amount, note));
Self::select_notes_from_stream(note_stream, amount).await
}

Ok(selected_notes)
// FIXME: delete after validation
async fn _select_notes_from_stream_total_amount<C>(
stream: impl futures::Stream<Item = (Amount, C)>,
total_amount: Amount,
mut amount: Amount,
) -> Result<TieredMulti<C>> {
if amount > total_amount {
return Err(MintClientError::InsufficientBalance(amount, total_amount));
}
let mut stream = Box::pin(stream);
let mut selected = vec![];
let mut remaining = total_amount;
let mut previous_amount: Option<Amount> = None; // used to assert descending order
while let Some((note_amount, note)) = stream.next().await {
remaining -= note_amount;
assert!(
previous_amount.map_or(true, |previous| previous >= note_amount),
"notes are not sorted in descending order"
);
previous_amount = Some(note_amount);
if note_amount <= amount {
amount -= note_amount;
selected.push((note_amount, note))
} else if remaining < amount {
// we can't make exact change, so just use this note
selected.push((note_amount, note));
break;
} // else there are still notes with smaller amounts, so keep
// looking
}
Ok(selected.into_iter().collect::<TieredMulti<C>>())
}

async fn select_notes_from_stream<Note>(
stream: impl futures::Stream<Item = (Amount, Note)>,
requested_amount: Amount,
) -> Result<TieredMulti<Note>> {
if requested_amount == Amount::ZERO {
return Ok(TieredMulti::default());
}
let mut stream = Box::pin(stream);
let mut selected = vec![];
let mut total_amount = Amount::ZERO;
let mut last_big_note_checkpoint: Option<(Amount, Note, usize)> = None;
let mut pending_amount = requested_amount;
let mut previous_amount: Option<Amount> = None; // used to assert descending order
while let Some((note_amount, note)) = stream.next().await {
assert!(
previous_amount.map_or(true, |previous| previous >= note_amount),
"notes are not sorted in descending order"
);
previous_amount = Some(note_amount);
total_amount += note_amount;
if note_amount < pending_amount {
pending_amount -= note_amount;
selected.push((note_amount, note))
} else if note_amount == pending_amount {
selected.push((note_amount, note));
return Ok(selected.into_iter().collect::<TieredMulti<Note>>());
} else {
last_big_note_checkpoint = Some((note_amount, note, selected.len()));
}
}
assert!(pending_amount > Amount::ZERO);
if requested_amount > total_amount {
return Err(MintClientError::InsufficientBalance(
requested_amount,
total_amount,
));
}
let (note_amount, note, checkpoint) =
last_big_note_checkpoint.expect("to have at least one big note");
// we can't make exact change, so just use this big note
selected[checkpoint] = (note_amount, note);
selected.truncate(checkpoint + 1);
Ok(selected.into_iter().collect::<TieredMulti<Note>>())
}

pub async fn receive_notes<'a, F, Fut>(
Expand Down Expand Up @@ -662,12 +772,15 @@ mod tests {
use fedimint_core::db::Database;
use fedimint_core::module::registry::ModuleDecoderRegistry;
use fedimint_core::outcome::{SerdeOutputOutcome, TransactionStatus};
use fedimint_core::{Amount, OutPoint, ServerModule, Tiered, TransactionId};
use fedimint_core::{Amount, OutPoint, ServerModule, Tiered, TieredMulti, TransactionId};
use fedimint_mint_server::{Mint, MintGen, MintGenParams};
use fedimint_testing::FakeFed;
use futures::executor::block_on;
use futures::StreamExt;
use itertools::Itertools;
use tokio::sync::Mutex;

use super::*;
use crate::api::fake::FederationApiFaker;
use crate::mint::db::NextECashNoteIndexKey;
use crate::mint::MintClient;
Expand Down Expand Up @@ -815,7 +928,11 @@ mod tests {
const ISSUE_AMOUNT: Amount = Amount::from_sats(12);
issue_notes(&fed, &client, &context.db, ISSUE_AMOUNT).await;

assert_eq!(client.notes().await.total_amount(), ISSUE_AMOUNT)
assert_eq!(client.total_amount().await, ISSUE_AMOUNT)
}

fn notes_distribution(notes: &TieredMulti<SpendableNote>) -> Vec<(Amount, usize)> {
notes.summary().iter().collect::<Vec<_>>()
}

#[test_log::test(tokio::test)]
Expand All @@ -841,6 +958,10 @@ mod tests {
let _tbs_pks = &client.config.tbs_pks;
let rng = rand::rngs::OsRng;
let notes = client.select_notes(SPEND_AMOUNT).await.unwrap();
assert_eq!(
notes_distribution(&notes),
vec![(Amount::from_sats(1), 1), (Amount::from_sats(20), 1)]
);
let (spend_keys, ecash_input) = MintClient::ecash_input(notes.clone()).unwrap();

builder.input(&mut spend_keys.clone(), ecash_input.clone());
Expand Down Expand Up @@ -873,7 +994,11 @@ mod tests {
.await;

// The right amount of money is left
assert_eq!(client.notes().await.total_amount(), SPEND_AMOUNT);
assert_eq!(client.total_amount().await, SPEND_AMOUNT);
assert_eq!(
notes_distribution(&client.notes().await),
vec![(Amount::from_sats(1), 1), (Amount::from_sats(10), 2)]
);

// Double spends aren't possible
assert!(fed.lock().await.verify_input(&input).await.is_err());
Expand All @@ -883,6 +1008,10 @@ mod tests {
let mut dbtx = client.context.db.begin_transaction().await;
let mut builder = TransactionBuilder::default();
let notes = client.select_notes(SPEND_AMOUNT).await.unwrap();
assert_eq!(
notes_distribution(&notes),
vec![(Amount::from_sats(1), 1), (Amount::from_sats(10), 2)]
);
let rng = rand::rngs::OsRng;
let (spend_keys, ecash_input) = MintClient::ecash_input(notes).unwrap();

Expand Down Expand Up @@ -910,7 +1039,7 @@ mod tests {
);

// No money is left
assert_eq!(client.notes().await.total_amount(), Amount::ZERO);
assert_eq!(client.total_amount().await, Amount::ZERO);
}
}

Expand Down Expand Up @@ -994,4 +1123,125 @@ mod tests {
// Ensure we didn't skip any keys
assert_eq!(last_idx, result_count as u64);
}

#[test_log::test(tokio::test)]
async fn select_notes_avg_test() {
let max_amount = Amount::from_sats(1000000);
let tiers = Tiered::gen_denominations(max_amount);
let tiered =
TieredSummary::represent_amount::<()>(max_amount, &Default::default(), &tiers, 3);

let mut total_notes = 0;
for multiplier in 1..100 {
let stream = sorted_note_stream(tiered.iter().collect());
let select =
MintClient::select_notes_from_stream(stream, Amount::from_sats(multiplier * 1000))
.await;
total_notes += select.unwrap().into_iter_items().count();
}
assert_eq!(total_notes / 100, 10);
}

#[test_log::test(tokio::test)]
async fn select_notes_returns_exact_amount_with_minimum_notes() {
let f = || {
sorted_note_stream(vec![
(Amount::from_sats(1), 10),
(Amount::from_sats(5), 10),
(Amount::from_sats(20), 10),
])
};
let total = total(f()).await;
assert_eq!(
MintClient::_select_notes_from_stream_total_amount(f(), total, Amount::from_sats(7))
.await
.unwrap(),
notes(vec![(Amount::from_sats(1), 2), (Amount::from_sats(5), 1)])
);
assert_eq!(
MintClient::_select_notes_from_stream_total_amount(f(), total, Amount::from_sats(20))
.await
.unwrap(),
notes(vec![(Amount::from_sats(20), 1)])
);
assert_eq!(
MintClient::select_notes_from_stream(f(), Amount::from_sats(7))
.await
.unwrap(),
notes(vec![(Amount::from_sats(1), 2), (Amount::from_sats(5), 1)])
);
assert_eq!(
MintClient::select_notes_from_stream(f(), Amount::from_sats(20))
.await
.unwrap(),
notes(vec![(Amount::from_sats(20), 1)])
);
}

#[test_log::test(tokio::test)]
async fn select_notes_returns_next_smallest_amount_if_exact_change_cannot_be_made() {
let f = || {
sorted_note_stream(vec![
(Amount::from_sats(1), 1),
(Amount::from_sats(5), 5),
(Amount::from_sats(20), 5),
])
};
let total = total(f()).await;
assert_eq!(
MintClient::_select_notes_from_stream_total_amount(f(), total, Amount::from_sats(7))
.await
.unwrap(),
notes(vec![(Amount::from_sats(5), 2)])
);
assert_eq!(
MintClient::select_notes_from_stream(f(), Amount::from_sats(7))
.await
.unwrap(),
notes(vec![(Amount::from_sats(5), 2)])
);
}

#[test_log::test(tokio::test)]
async fn select_notes_returns_none_if_amount_is_too_large() {
let f = || sorted_note_stream(vec![(Amount::from_sats(10), 1)]);
let total = total(f()).await;
assert!(matches!(
MintClient::_select_notes_from_stream_total_amount(f(), total, Amount::from_sats(100))
.await
.unwrap_err(),
MintClientError::InsufficientBalance(_, _)
));
assert!(matches!(
MintClient::select_notes_from_stream(f(), Amount::from_sats(100))
.await
.unwrap_err(),
MintClientError::InsufficientBalance(_, _)
));
}

fn sorted_note_stream(
notes: Vec<(Amount, usize)>,
) -> impl futures::Stream<Item = (Amount, usize)> {
futures::stream::iter(
notes
.into_iter()
.flat_map(|(amount, number)| vec![(amount, 0_usize); number])
.sorted()
.rev(),
)
}

fn notes(notes: Vec<(Amount, usize)>) -> TieredMulti<usize> {
notes
.into_iter()
.flat_map(|(amount, number)| vec![(amount, 0_usize); number])
.collect()
}

async fn total(stream: impl futures::Stream<Item = (Amount, usize)>) -> Amount {
stream
.fold(Amount::ZERO, |acc, (amount, _)| async move { acc + amount })
.await
}
}