Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 39 additions & 26 deletions minter/src/consolidate/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,33 +36,46 @@ pub async fn consolidate_deposits<R: CanisterRuntime>(runtime: R) {
Err(_) => return,
};

let consolidation_rounds: Vec<Vec<_>> =
read_state(|s| group_deposits_by_account(s.deposits_to_consolidate()))
.into_iter()
.chunks(MAX_TRANSFERS_PER_CONSOLIDATION)
.into_iter()
.map(Iterator::collect)
.collect();

for round in &consolidation_rounds
let all_deposits = read_state(|s| group_deposits_by_account(s.deposits_to_consolidate()));
let more_to_process =
all_deposits.len() > MAX_CONCURRENT_RPC_CALLS * MAX_TRANSFERS_PER_CONSOLIDATION;
let reschedule = scopeguard::guard(runtime.clone(), |runtime| {
runtime.set_timer(Duration::ZERO, consolidate_deposits);
});
Comment thread
lpahlavi marked this conversation as resolved.

let batches: Vec<Vec<_>> = all_deposits
.into_iter()
.chunks(MAX_TRANSFERS_PER_CONSOLIDATION)
.into_iter()
.chunks(MAX_CONCURRENT_RPC_CALLS)
{
let (slot, recent_blockhash) = match get_recent_slot_and_blockhash(&runtime).await {
Ok((slot, blockhash)) => (slot, blockhash),
Err(e) => {
log!(Priority::Info, "Failed to fetch recent blockhash: {e}");
return;
}
};

futures::future::join_all(round.map(async |funds| {
match submit_consolidation_transaction(&runtime, funds, slot, recent_blockhash).await {
Ok(sig) => log!(Priority::Info, "Submitted consolidation transaction {sig}"),
Err(e) => log!(Priority::Info, "Deposit consolidation failed: {e}"),
}
}))
.await;
.take(MAX_CONCURRENT_RPC_CALLS)
.map(Iterator::collect)
.collect();

if batches.is_empty() {
// Nothing to process
scopeguard::ScopeGuard::into_inner(reschedule);
return;
}

let (slot, recent_blockhash) = match get_recent_slot_and_blockhash(&runtime).await {
Ok(result) => result,
Err(e) => {
log!(Priority::Info, "Failed to fetch recent blockhash: {e}");
return;
}
};

futures::future::join_all(batches.into_iter().map(async |funds| {
match submit_consolidation_transaction(&runtime, funds, slot, recent_blockhash).await {
Ok(sig) => log!(Priority::Info, "Submitted consolidation transaction {sig}"),
Err(e) => log!(Priority::Info, "Deposit consolidation failed: {e}"),
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This code was not changed by this PR, but maybe we should log errors with error priority?

}
}))
.await;

if !more_to_process {
// All work fits in this round
scopeguard::ScopeGuard::into_inner(reschedule);
}
}

Expand Down
64 changes: 60 additions & 4 deletions minter/src/consolidate/tests.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
use super::{MAX_TRANSFERS_PER_CONSOLIDATION, consolidate_deposits};
use crate::{
constants::MAX_CONCURRENT_RPC_CALLS,
numeric::LedgerMintIndex,
state::{
TaskType,
event::{DepositId, EventType, TransactionPurpose},
mutate_state,
mutate_state, read_state,
},
test_fixtures::{
EventsAssert, account, confirmed_block, deposit_id,
Expand Down Expand Up @@ -144,14 +145,14 @@ async fn should_submit_multiple_consolidation_batches() {
setup();

let funds: Vec<_> = (0..NUM_DEPOSITS)
.map(|i| (deposit_id(i as u8), (i as u64 + 1) * 1_000_000_000))
.map(|i| (deposit_id(i), (i as u64 + 1) * 1_000_000_000))
.collect();
add_funds_to_consolidate(&funds);

const BATCH_1_SIZE: usize = MAX_TRANSFERS_PER_CONSOLIDATION;

let fee_payer_signature_1 = signature(0);
let fee_payer_signature_2 = signature(BATCH_1_SIZE as u8);
let fee_payer_signature_2 = signature(BATCH_1_SIZE);
let slot = 100;

let mut runtime = TestCanisterRuntime::new()
Expand All @@ -167,7 +168,7 @@ async fn should_submit_multiple_consolidation_batches() {
)));

for i in 0..(2 + NUM_DEPOSITS) {
runtime = runtime.add_signature([i as u8; 64]);
runtime = runtime.add_signature(signature(i).into());
}

consolidate_deposits(runtime).await;
Expand Down Expand Up @@ -254,6 +255,61 @@ async fn should_consolidate_multiple_deposits_to_same_account_in_single_transfer
.assert_no_more_events();
}

#[tokio::test]
async fn should_reschedule_until_all_deposits_consolidated() {
setup();

let num_deposits = MAX_TRANSFERS_PER_CONSOLIDATION * MAX_CONCURRENT_RPC_CALLS + 1;
add_funds_to_consolidate(
&(0..num_deposits)
.map(|i| (deposit_id(i), (i as u64 + 1) * 1_000_000_000))
.collect::<Vec<_>>(),
);

let slot = 100;

// Round 1: processes MAX_CONCURRENT_RPC_CALLS batches, 1 deposit remains → reschedule
let mut runtime = TestCanisterRuntime::new()
.with_increasing_time()
.add_stub_response(SlotResult::Consistent(Ok(slot)))
.add_stub_response(BlockResult::Consistent(Ok(confirmed_block())));
for i in 0..MAX_CONCURRENT_RPC_CALLS {
runtime =
runtime.add_stub_response(SendTransactionResult::Consistent(Ok(signature(i).into())));
}
for i in 0..(MAX_CONCURRENT_RPC_CALLS + num_deposits) {
runtime = runtime.add_signature(signature(i).into());
}

consolidate_deposits(runtime.clone()).await;

read_state(|s| {
assert_eq!(s.submitted_transactions().len(), MAX_CONCURRENT_RPC_CALLS);
assert_eq!(s.deposits_to_consolidate().len(), 1);
});
assert_eq!(runtime.set_timer_call_count(), 1);

// Round 2: processes the remaining 1 deposit → no reschedule
let last_sig = signature(num_deposits);
let runtime = TestCanisterRuntime::new()
.with_increasing_time()
.add_stub_response(SlotResult::Consistent(Ok(slot)))
.add_stub_response(BlockResult::Consistent(Ok(confirmed_block())))
.add_stub_response(SendTransactionResult::Consistent(Ok(last_sig.into())))
.add_signature(last_sig.into());

consolidate_deposits(runtime.clone()).await;

read_state(|s| {
assert_eq!(
s.submitted_transactions().len(),
MAX_CONCURRENT_RPC_CALLS + 1
);
assert!(s.deposits_to_consolidate().is_empty());
});
assert_eq!(runtime.set_timer_call_count(), 0);
}

fn setup() {
init_state();
init_schnorr_master_key();
Expand Down
4 changes: 0 additions & 4 deletions minter/src/constants.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,6 @@
/// Maximum number of concurrent calls to the SOL RPC canister.
pub const MAX_CONCURRENT_RPC_CALLS: usize = 10;

/// Maximum number of rounds per timer invocation.
/// Each round issues up to [`MAX_CONCURRENT_RPC_CALLS`] parallel RPC calls.
pub const MAX_TIMER_ROUNDS: usize = 5;

/// Matches the ICP HTTPS outcall response limit for variable-length RPC calls
/// such as `getTransaction` and `getSignatureStatuses`:
/// https://docs.internetcomputer.org/references/ic-interface-spec#ic-http_request
Expand Down
4 changes: 2 additions & 2 deletions minter/src/dashboard/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -207,8 +207,8 @@ fn should_paginate_minted_deposits_across_multiple_pages() {
let remainder = total_deposits - DEFAULT_PAGE_SIZE * 2;

for i in 0..total_deposits {
accept_deposit(deposit_id(i as u8), 500_000_000);
mint_deposit(deposit_id(i as u8), i as u64);
accept_deposit(deposit_id(i), 500_000_000);
mint_deposit(deposit_id(i), i as u64);
}

let page1 = dashboard();
Expand Down
10 changes: 5 additions & 5 deletions minter/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
use candid::Principal;
use canlog::{Log, Sort};
use cksol_minter::withdraw::{WITHDRAWAL_PROCESSING_DELAY, process_pending_withdrawals};
use cksol_minter::{
address::lazy_get_schnorr_master_key, runtime::IcCanisterRuntime, state::read_state,
};
use cksol_minter::{
address::lazy_get_schnorr_master_key,
consolidate::{DEPOSIT_CONSOLIDATION_DELAY, consolidate_deposits},
monitor::{
FINALIZE_TRANSACTIONS_DELAY, RESUBMIT_TRANSACTIONS_DELAY, finalize_transactions,
resubmit_transactions,
},
runtime::IcCanisterRuntime,
state::read_state,
withdraw::{WITHDRAWAL_PROCESSING_DELAY, process_pending_withdrawals},
};
use cksol_types::{
Address, DepositStatus, GetDepositAddressArgs, MinterInfo, UpdateBalanceArgs,
Expand Down Expand Up @@ -338,7 +338,7 @@ fn setup_timers() {
consolidate_deposits(IcCanisterRuntime::new()).await;
});
ic_cdk_timers::set_timer_interval(WITHDRAWAL_PROCESSING_DELAY, async || {
process_pending_withdrawals(&IcCanisterRuntime::new()).await;
process_pending_withdrawals(IcCanisterRuntime::new()).await;
Comment thread
lpahlavi marked this conversation as resolved.
});
ic_cdk_timers::set_timer_interval(FINALIZE_TRANSACTIONS_DELAY, async || {
finalize_transactions(IcCanisterRuntime::new()).await;
Expand Down
Loading
Loading