From 8613a52dbbc4722ea8f46720262ac3dada5c5680 Mon Sep 17 00:00:00 2001 From: Louis Pahlavi Date: Tue, 14 Apr 2026 12:42:56 +0200 Subject: [PATCH 01/11] refactor: single-round timers with immediate rescheduling Process one round of concurrent RPC calls per timer invocation, then reschedule immediately (via set_timer(0)) if work remains, instead of batching all work in a single async task. This avoids holding the timer guard across multiple async rounds and simplifies cancellation. Also simplify tests to use event helpers (events::expire_transaction, events::accept_withdrawal) instead of mocking finalize_transactions as setup, and remove the test-only MAX_SIGNATURES_PER_STATUS_CHECK override (using the real value of 256 with a usize-indexed signature helper instead). Co-Authored-By: Claude Sonnet 4.6 --- minter/src/consolidate/mod.rs | 51 +++--- minter/src/consolidate/tests.rs | 102 +++++++++++- minter/src/constants.rs | 7 +- minter/src/dashboard/tests.rs | 4 +- minter/src/main.rs | 14 +- minter/src/monitor/mod.rs | 154 +++++++++-------- minter/src/monitor/tests.rs | 214 +++++++++++++++++------- minter/src/runtime/mod.rs | 12 +- minter/src/state/tests.rs | 4 +- minter/src/test_fixtures/mod.rs | 21 ++- minter/src/test_fixtures/runtime.rs | 7 + minter/src/withdraw/mod.rs | 62 ++++--- minter/src/withdraw/tests.rs | 246 +++++++++++++--------------- 13 files changed, 553 insertions(+), 345 deletions(-) diff --git a/minter/src/consolidate/mod.rs b/minter/src/consolidate/mod.rs index b6ec5efc..859421e8 100644 --- a/minter/src/consolidate/mod.rs +++ b/minter/src/consolidate/mod.rs @@ -1,5 +1,5 @@ use crate::{ - constants::MAX_CONCURRENT_RPC_CALLS, + constants::{MAX_CONCURRENT_RPC_CALLS, RESCHEDULE_DELAY}, guard::TimerGuard, numeric::LedgerMintIndex, runtime::CanisterRuntime, @@ -36,33 +36,42 @@ pub async fn consolidate_deposits(runtime: R) { Err(_) => return, }; - let consolidation_rounds: Vec> = + let reschedule = scopeguard::guard(runtime.clone(), |runtime| { + 
runtime.set_timer_with_clone(RESCHEDULE_DELAY, consolidate_deposits); + }); + + let batches: Vec> = read_state(|s| group_deposits_by_account(s.deposits_to_consolidate())) .into_iter() .chunks(MAX_TRANSFERS_PER_CONSOLIDATION) .into_iter() + .take(MAX_CONCURRENT_RPC_CALLS) .map(Iterator::collect) .collect(); - for round in &consolidation_rounds - .into_iter() - .chunks(MAX_CONCURRENT_RPC_CALLS) - { - let (slot, recent_blockhash) = match get_recent_slot_and_blockhash(&runtime).await { - Ok((slot, blockhash)) => (slot, blockhash), - Err(e) => { - log!(Priority::Info, "Failed to fetch recent blockhash: {e}"); - return; - } - }; - - futures::future::join_all(round.map(async |funds| { - match submit_consolidation_transaction(&runtime, funds, slot, recent_blockhash).await { - Ok(sig) => log!(Priority::Info, "Submitted consolidation transaction {sig}"), - Err(e) => log!(Priority::Info, "Deposit consolidation failed: {e}"), - } - })) - .await; + if batches.is_empty() { + scopeguard::ScopeGuard::into_inner(reschedule); // nothing to do, defuse reschedule + return; + } + + let (slot, recent_blockhash) = match get_recent_slot_and_blockhash(&runtime).await { + Ok(result) => result, + Err(e) => { + log!(Priority::Info, "Failed to fetch recent blockhash: {e}"); + return; + } + }; + + futures::future::join_all(batches.into_iter().map(async |funds| { + match submit_consolidation_transaction(&runtime, funds, slot, recent_blockhash).await { + Ok(sig) => log!(Priority::Info, "Submitted consolidation transaction {sig}"), + Err(e) => log!(Priority::Info, "Deposit consolidation failed: {e}"), + } + })) + .await; + + if read_state(|s| s.deposits_to_consolidate().is_empty()) { + scopeguard::ScopeGuard::into_inner(reschedule); // nothing left, defuse reschedule } } diff --git a/minter/src/consolidate/tests.rs b/minter/src/consolidate/tests.rs index 193eb03c..491f11ca 100644 --- a/minter/src/consolidate/tests.rs +++ b/minter/src/consolidate/tests.rs @@ -1,10 +1,11 @@ use 
super::{MAX_TRANSFERS_PER_CONSOLIDATION, consolidate_deposits}; use crate::{ + constants::MAX_CONCURRENT_RPC_CALLS, numeric::LedgerMintIndex, state::{ TaskType, event::{DepositId, EventType, TransactionPurpose}, - mutate_state, + mutate_state, read_state, }, test_fixtures::{ EventsAssert, account, confirmed_block, deposit_id, @@ -144,14 +145,14 @@ async fn should_submit_multiple_consolidation_batches() { setup(); let funds: Vec<_> = (0..NUM_DEPOSITS) - .map(|i| (deposit_id(i as u8), (i as u64 + 1) * 1_000_000_000)) + .map(|i| (deposit_id(i), (i as u64 + 1) * 1_000_000_000)) .collect(); add_funds_to_consolidate(&funds); const BATCH_1_SIZE: usize = MAX_TRANSFERS_PER_CONSOLIDATION; let fee_payer_signature_1 = signature(0); - let fee_payer_signature_2 = signature(BATCH_1_SIZE as u8); + let fee_payer_signature_2 = signature(BATCH_1_SIZE); let slot = 100; let mut runtime = TestCanisterRuntime::new() @@ -254,6 +255,101 @@ async fn should_consolidate_multiple_deposits_to_same_account_in_single_transfer .assert_no_more_events(); } +#[tokio::test] +async fn should_consolidate_up_to_max_concurrent_batches_per_invocation() { + setup(); + + // Create enough deposits to require more than MAX_CONCURRENT_RPC_CALLS batches + let num_deposits = MAX_TRANSFERS_PER_CONSOLIDATION * MAX_CONCURRENT_RPC_CALLS + 1; + let funds: Vec<_> = (0..num_deposits) + .map(|i| (deposit_id(i), (i as u64 + 1) * 1_000_000_000)) + .collect(); + add_funds_to_consolidate(&funds); + + let slot = 100; + let mut runtime = TestCanisterRuntime::new() + .with_increasing_time() + .add_stub_response(SlotResult::Consistent(Ok(slot))) + .add_stub_response(BlockResult::Consistent(Ok(confirmed_block()))); + + // Provide signatures and send responses for MAX_CONCURRENT_RPC_CALLS batches + for i in 0..MAX_CONCURRENT_RPC_CALLS { + runtime = + runtime.add_stub_response(SendTransactionResult::Consistent(Ok(signature(i).into()))); + } + for i in 0..(MAX_CONCURRENT_RPC_CALLS + num_deposits) { + runtime = 
runtime.add_signature([i as u8; 64]); + } + + consolidate_deposits(runtime).await; + + // Only MAX_CONCURRENT_RPC_CALLS batches should have been submitted + read_state(|s| { + assert_eq!(s.submitted_transactions().len(), MAX_CONCURRENT_RPC_CALLS); + assert!( + !s.deposits_to_consolidate().is_empty(), + "Some deposits should remain unconsolidated" + ); + }); +} + +#[tokio::test] +async fn should_reschedule_immediately_when_deposits_remain() { + setup(); + + let num_deposits = MAX_TRANSFERS_PER_CONSOLIDATION * MAX_CONCURRENT_RPC_CALLS + 1; + let funds: Vec<_> = (0..num_deposits) + .map(|i| (deposit_id(i), (i as u64 + 1) * 1_000_000_000)) + .collect(); + add_funds_to_consolidate(&funds); + + let slot = 100; + let mut runtime = TestCanisterRuntime::new() + .with_increasing_time() + .add_stub_response(SlotResult::Consistent(Ok(slot))) + .add_stub_response(BlockResult::Consistent(Ok(confirmed_block()))); + for i in 0..MAX_CONCURRENT_RPC_CALLS { + runtime = + runtime.add_stub_response(SendTransactionResult::Consistent(Ok(signature(i).into()))); + } + for i in 0..(MAX_CONCURRENT_RPC_CALLS + num_deposits) { + runtime = runtime.add_signature([i as u8; 64]); + } + + consolidate_deposits(runtime.clone()).await; + + assert_eq!( + runtime.set_timer_call_count(), + 1, + "should reschedule when deposits remain" + ); +} + +#[tokio::test] +async fn should_not_reschedule_when_all_deposits_consolidated() { + setup(); + + add_funds_to_consolidate(&[(deposit_id(0), 1_000_000_000)]); + + let slot = 100; + let runtime = TestCanisterRuntime::new() + .with_increasing_time() + .add_stub_response(SlotResult::Consistent(Ok(slot))) + .add_stub_response(BlockResult::Consistent(Ok(confirmed_block()))) + .add_stub_response(SendTransactionResult::Consistent( + Ok(signature(0x11).into()), + )) + .add_signature(signature(0x11).into()); + + consolidate_deposits(runtime.clone()).await; + + assert_eq!( + runtime.set_timer_call_count(), + 0, + "should not reschedule when all deposits consolidated" + ); 
+} + fn setup() { init_state(); init_schnorr_master_key(); diff --git a/minter/src/constants.rs b/minter/src/constants.rs index 39d12d5d..3e8a2b70 100644 --- a/minter/src/constants.rs +++ b/minter/src/constants.rs @@ -1,9 +1,10 @@ +use std::time::Duration; + /// Maximum number of concurrent calls to the SOL RPC canister. pub const MAX_CONCURRENT_RPC_CALLS: usize = 10; -/// Maximum number of rounds per timer invocation. -/// Each round issues up to [`MAX_CONCURRENT_RPC_CALLS`] parallel RPC calls. -pub const MAX_TIMER_ROUNDS: usize = 5; +/// Short cooldown before rescheduling a timer that has more work to do. +pub const RESCHEDULE_DELAY: Duration = Duration::from_secs(1); /// Matches the ICP HTTPS outcall response limit for variable-length RPC calls /// such as `getTransaction` and `getSignatureStatuses`: diff --git a/minter/src/dashboard/tests.rs b/minter/src/dashboard/tests.rs index be86c2c6..11b2b7cf 100644 --- a/minter/src/dashboard/tests.rs +++ b/minter/src/dashboard/tests.rs @@ -207,8 +207,8 @@ fn should_paginate_minted_deposits_across_multiple_pages() { let remainder = total_deposits - DEFAULT_PAGE_SIZE * 2; for i in 0..total_deposits { - accept_deposit(deposit_id(i as u8), 500_000_000); - mint_deposit(deposit_id(i as u8), i as u64); + accept_deposit(deposit_id(i), 500_000_000); + mint_deposit(deposit_id(i), i as u64); } let page1 = dashboard(); diff --git a/minter/src/main.rs b/minter/src/main.rs index 3da81bbb..b0ec4c2a 100644 --- a/minter/src/main.rs +++ b/minter/src/main.rs @@ -1,16 +1,14 @@ use candid::Principal; use canlog::{Log, Sort}; +use cksol_minter::consolidate::{DEPOSIT_CONSOLIDATION_DELAY, consolidate_deposits}; +use cksol_minter::monitor::{ + FINALIZE_TRANSACTIONS_DELAY, RESUBMIT_TRANSACTIONS_DELAY, finalize_transactions, + resubmit_transactions, +}; use cksol_minter::withdraw::{WITHDRAWAL_PROCESSING_DELAY, process_pending_withdrawals}; use cksol_minter::{ address::lazy_get_schnorr_master_key, runtime::IcCanisterRuntime, state::read_state, }; 
-use cksol_minter::{ - consolidate::{DEPOSIT_CONSOLIDATION_DELAY, consolidate_deposits}, - monitor::{ - FINALIZE_TRANSACTIONS_DELAY, RESUBMIT_TRANSACTIONS_DELAY, finalize_transactions, - resubmit_transactions, - }, -}; use cksol_types::{ Address, DepositStatus, GetDepositAddressArgs, MinterInfo, UpdateBalanceArgs, UpdateBalanceError, WithdrawalArgs, WithdrawalError, WithdrawalOk, WithdrawalStatus, @@ -338,7 +336,7 @@ fn setup_timers() { consolidate_deposits(IcCanisterRuntime::new()).await; }); ic_cdk_timers::set_timer_interval(WITHDRAWAL_PROCESSING_DELAY, async || { - process_pending_withdrawals(&IcCanisterRuntime::new()).await; + process_pending_withdrawals(IcCanisterRuntime::new()).await; }); ic_cdk_timers::set_timer_interval(FINALIZE_TRANSACTIONS_DELAY, async || { finalize_transactions(IcCanisterRuntime::new()).await; diff --git a/minter/src/monitor/mod.rs b/minter/src/monitor/mod.rs index 01b30cf9..92142385 100644 --- a/minter/src/monitor/mod.rs +++ b/minter/src/monitor/mod.rs @@ -1,6 +1,6 @@ use crate::{ address::derivation_path, - constants::{MAX_CONCURRENT_RPC_CALLS, MAX_TIMER_ROUNDS}, + constants::{MAX_CONCURRENT_RPC_CALLS, RESCHEDULE_DELAY}, guard::TimerGuard, runtime::CanisterRuntime, signer::sign_bytes, @@ -57,6 +57,11 @@ pub async fn finalize_transactions(runtime: R) { return; } + // Reschedule unless all submitted transactions were checked in this invocation. + let reschedule = scopeguard::guard(runtime.clone(), |runtime| { + runtime.set_timer_with_clone(RESCHEDULE_DELAY, finalize_transactions); + }); + // Fetch the current slot before checking statuses: if a transaction finalizes // after we snapshot the slot, the status check will see it as finalized rather // than missing, so it will never be incorrectly marked as expired. 
@@ -113,6 +118,10 @@ pub async fn finalize_transactions(runtime: R) { }); } } + + if statuses.len() == all_transactions.len() { + scopeguard::ScopeGuard::into_inner(reschedule); + } } /// Resubmit transactions that have been marked for resubmission by @@ -133,7 +142,14 @@ pub async fn resubmit_transactions(runtime: R) { if to_resubmit.is_empty() { return; } + + let num_remaining = to_resubmit.len().saturating_sub(MAX_CONCURRENT_RPC_CALLS); + resubmit_expired_transactions(&runtime, to_resubmit).await; + + if num_remaining > 0 { + runtime.set_timer_with_clone(RESCHEDULE_DELAY, resubmit_transactions); + } } /// Result of checking transaction statuses. @@ -144,8 +160,18 @@ struct TransactionStatuses { errored: BTreeMap, /// Transactions with no on-chain status (safe to resubmit if expired). not_found: BTreeSet, + /// Number of signatures whose status was received (from successful batch requests). + /// Used to determine whether all submitted transactions were checked in this invocation. + checked_count: usize, // Transactions that are in-flight (Processed/Confirmed) or whose status - // check failed are implicitly excluded — they appear in neither set. + // check failed are implicitly excluded from the above sets. +} + +impl TransactionStatuses { + /// Total number of signatures whose status was received in this invocation. 
+ fn len(&self) -> usize { + self.checked_count + } } async fn check_transaction_statuses( @@ -156,6 +182,7 @@ async fn check_transaction_statuses( .into_iter() .chunks(MAX_SIGNATURES_PER_STATUS_CHECK) .into_iter() + .take(MAX_CONCURRENT_RPC_CALLS) .map(Iterator::collect) .collect(); @@ -163,38 +190,37 @@ async fn check_transaction_statuses( succeeded: BTreeSet::new(), errored: BTreeMap::new(), not_found: BTreeSet::new(), + checked_count: 0, }; - for round in &batches.into_iter().chunks(MAX_CONCURRENT_RPC_CALLS) { - let batch_results: Vec<_> = futures::future::join_all(round.map(async |batch| { - match get_signature_statuses(runtime, &batch).await { - Ok(statuses) => Some((batch, statuses)), - Err(e) => { - log!(Priority::Info, "Failed to check transaction statuses: {e}"); - None - } + let batch_results: Vec<_> = futures::future::join_all(batches.into_iter().map(async |batch| { + match get_signature_statuses(runtime, &batch).await { + Ok(statuses) => Some((batch, statuses)), + Err(e) => { + log!(Priority::Info, "Failed to check transaction statuses: {e}"); + None } - })) - .await; - - for (sigs, statuses) in batch_results.into_iter().flatten() { - for (signature, status) in sigs.iter().zip(statuses) { - match status { - Some(s) - if s.confirmation_status - == Some(TransactionConfirmationStatus::Finalized) => - { - if let Some(err) = s.err { - result.errored.insert(*signature, format!("{err:?}")); - } else { - result.succeeded.insert(*signature); - } - } - Some(_) => {} // in-flight (Processed/Confirmed) - None => { - result.not_found.insert(*signature); + } + })) + .await; + + for (sigs, statuses) in batch_results.into_iter().flatten() { + result.checked_count += sigs.len(); + for (signature, status) in sigs.iter().zip(statuses) { + match status { + Some(s) + if s.confirmation_status == Some(TransactionConfirmationStatus::Finalized) => + { + if let Some(err) = s.err { + result.errored.insert(*signature, format!("{err:?}")); + } else { + 
result.succeeded.insert(*signature); } } + Some(_) => {} // in-flight (Processed/Confirmed) + None => { + result.not_found.insert(*signature); + } } } } @@ -206,48 +232,38 @@ async fn resubmit_expired_transactions( runtime: &R, to_resubmit: Vec<(Signature, VersionedMessage, Vec)>, ) { - let rounds: Vec> = to_resubmit - .into_iter() - .chunks(MAX_CONCURRENT_RPC_CALLS) - .into_iter() - .take(MAX_TIMER_ROUNDS) - .map(Iterator::collect) - .collect(); + let (new_slot, new_blockhash) = match get_recent_slot_and_blockhash(runtime).await { + Ok(result) => result, + Err(e) => { + log!(Priority::Info, "Failed to get recent blockhash: {e}"); + return; + } + }; - for round in rounds { - let (new_slot, new_blockhash) = match get_recent_slot_and_blockhash(runtime).await { - Ok(result) => result, - Err(e) => { - log!(Priority::Info, "Failed to get recent blockhash: {e}"); - return; + futures::future::join_all(to_resubmit.into_iter().take(MAX_CONCURRENT_RPC_CALLS).map( + async |(old_signature, message, signers)| { + match try_resubmit_transaction( + runtime, + old_signature, + message, + signers, + new_slot, + new_blockhash, + ) + .await + { + Ok(new_sig) => log!( + Priority::Info, + "Resubmitted transaction {old_signature} as {new_sig}" + ), + Err(e) => log!( + Priority::Info, + "Failed to resubmit transaction {old_signature}: {e}" + ), } - }; - - futures::future::join_all(round.into_iter().map( - async |(old_signature, message, signers)| { - match try_resubmit_transaction( - runtime, - old_signature, - message, - signers, - new_slot, - new_blockhash, - ) - .await - { - Ok(new_sig) => log!( - Priority::Info, - "Resubmitted transaction {old_signature} as {new_sig}" - ), - Err(e) => log!( - Priority::Info, - "Failed to resubmit transaction {old_signature}: {e}" - ), - } - }, - )) - .await; - } + }, + )) + .await; } async fn try_resubmit_transaction( diff --git a/minter/src/monitor/tests.rs b/minter/src/monitor/tests.rs index 2a8842f7..58db2685 100644 --- 
a/minter/src/monitor/tests.rs +++ b/minter/src/monitor/tests.rs @@ -1,9 +1,14 @@ -use super::{MAX_BLOCKHASH_AGE, finalize_transactions, resubmit_transactions}; +use super::{ + MAX_BLOCKHASH_AGE, MAX_SIGNATURES_PER_STATUS_CHECK, finalize_transactions, + resubmit_transactions, +}; use crate::{ + constants::MAX_CONCURRENT_RPC_CALLS, + state::event::DepositId, state::{TaskType, event::EventType, mutate_state, read_state, reset_state}, storage::reset_events, test_fixtures::{ - EventsAssert, MINTER_ACCOUNT, confirmed_block, deposit_id, events, init_schnorr_master_key, + EventsAssert, MINTER_ACCOUNT, account, confirmed_block, events, init_schnorr_master_key, init_state, runtime::TestCanisterRuntime, signature, }, }; @@ -70,6 +75,77 @@ mod finalization { assert_eq!(events_before, events_after); } + #[tokio::test] + async fn should_process_up_to_max_concurrent_batches_per_invocation() { + setup(); + + let num = MAX_CONCURRENT_RPC_CALLS * MAX_SIGNATURES_PER_STATUS_CHECK + 1; + for i in 0..num { + submit_consolidation_transaction_with_signature(i, RECENT_SLOT); + } + + let mut runtime = TestCanisterRuntime::new() + .with_increasing_time() + .add_stub_response(SlotResult::Consistent(Ok(CURRENT_SLOT))) + .add_stub_response(BlockResult::Consistent(Ok(confirmed_block()))); + for _ in 0..MAX_CONCURRENT_RPC_CALLS { + runtime = runtime.add_stub_response(SignatureStatusesResult::Consistent(Ok( + vec![None; MAX_SIGNATURES_PER_STATUS_CHECK], + ))); + } + + finalize_transactions(runtime).await; + + // All transactions remain: processed ones are not expired (RECENT_SLOT), + // and the last one was not checked at all. 
+ read_state(|s| assert_eq!(s.submitted_transactions().len(), num)); + } + + #[tokio::test] + async fn should_reschedule_immediately_when_transactions_remain() { + setup(); + + let num = MAX_CONCURRENT_RPC_CALLS * MAX_SIGNATURES_PER_STATUS_CHECK + 1; + for i in 0..num { + submit_consolidation_transaction_with_signature(i, RECENT_SLOT); + } + + let mut runtime = TestCanisterRuntime::new() + .with_increasing_time() + .add_stub_response(SlotResult::Consistent(Ok(CURRENT_SLOT))) + .add_stub_response(BlockResult::Consistent(Ok(confirmed_block()))); + for _ in 0..MAX_CONCURRENT_RPC_CALLS { + runtime = runtime.add_stub_response(SignatureStatusesResult::Consistent(Ok( + vec![None; MAX_SIGNATURES_PER_STATUS_CHECK], + ))); + } + + let runtime_ref = runtime.clone(); + finalize_transactions(runtime).await; + + assert_eq!(runtime_ref.set_timer_call_count(), 1); + } + + #[tokio::test] + async fn should_not_reschedule_when_all_transactions_finalized() { + setup(); + + submit_consolidation_transaction(RECENT_SLOT); + + let runtime = TestCanisterRuntime::new() + .with_increasing_time() + .add_stub_response(SlotResult::Consistent(Ok(CURRENT_SLOT))) + .add_stub_response(BlockResult::Consistent(Ok(confirmed_block()))) + .add_stub_response(SignatureStatusesResult::Consistent(Ok(vec![Some( + finalized_status(), + )]))); + + let runtime_ref = runtime.clone(); + finalize_transactions(runtime).await; + + assert_eq!(runtime_ref.set_timer_call_count(), 0); + } + #[tokio::test] async fn should_finalize_transaction_with_finalized_status() { setup(); @@ -271,25 +347,12 @@ mod resubmission { let old_signature = submit_consolidation_transaction(EXPIRED_SLOT); let new_signature = signature(0xAA); - - // Step 1: finalize_transactions fetches slot, checks status, marks expired - let finalize_runtime = TestCanisterRuntime::new() - .with_increasing_time() - .add_stub_response(SlotResult::Consistent(Ok(CURRENT_SLOT))) - .add_stub_response(BlockResult::Consistent(Ok(confirmed_block()))) - 
.add_stub_response(SignatureStatusesResult::Consistent(Ok(vec![None]))); - - finalize_transactions(finalize_runtime).await; - - EventsAssert::from_recorded().expect_contains_event_eq(EventType::ExpiredTransaction { - signature: old_signature, - }); + events::expire_transaction(old_signature); read_state(|s| { assert!(s.transactions_to_resubmit().contains_key(&old_signature)); }); - // Step 2: resubmit_transactions fetches a fresh blockhash and resubmits let resubmit_runtime = TestCanisterRuntime::new() .with_increasing_time() .add_stub_response(SlotResult::Consistent(Ok(RESUBMISSION_SLOT))) @@ -348,14 +411,7 @@ mod resubmission { let old_signature = submit_consolidation_transaction(EXPIRED_SLOT); let new_signature = signature(0xAA); - - let finalize_runtime = TestCanisterRuntime::new() - .with_increasing_time() - .add_stub_response(SlotResult::Consistent(Ok(CURRENT_SLOT))) - .add_stub_response(BlockResult::Consistent(Ok(confirmed_block()))) - .add_stub_response(SignatureStatusesResult::Consistent(Ok(vec![None]))); - - finalize_transactions(finalize_runtime).await; + events::expire_transaction(old_signature); let resubmit_runtime = TestCanisterRuntime::new() .with_increasing_time() @@ -378,63 +434,93 @@ mod resubmission { } #[tokio::test] - async fn should_resubmit_multiple_expired_transactions_in_batches() { - use crate::constants::MAX_CONCURRENT_RPC_CALLS; - + async fn should_process_up_to_max_concurrent_rpc_calls_per_invocation() { setup(); - let num_transactions = MAX_CONCURRENT_RPC_CALLS + 2; // 10+2 = 12 transactions require 2 rounds + let num_transactions = MAX_CONCURRENT_RPC_CALLS + 2; for i in 0..num_transactions { - submit_consolidation_transaction_with_signature(i as u8, EXPIRED_SLOT); + let sig = submit_consolidation_transaction_with_signature(i, EXPIRED_SLOT); + events::expire_transaction(sig); } - // finalize_transactions: marks all as expired - let finalize_runtime = TestCanisterRuntime::new() - .with_increasing_time() - 
.add_stub_response(SlotResult::Consistent(Ok(CURRENT_SLOT))) - .add_stub_response(BlockResult::Consistent(Ok(confirmed_block()))) - // getSignatureStatuses: all not found - .add_stub_response(SignatureStatusesResult::Consistent(Ok( - vec![None; num_transactions], - ))); - - finalize_transactions(finalize_runtime).await; - read_state(|s| { assert_eq!(s.transactions_to_resubmit().len(), num_transactions); }); - // resubmit_transactions: processes in rounds of MAX_CONCURRENT_RPC_CALLS let mut resubmit_runtime = TestCanisterRuntime::new() .with_increasing_time() .add_stub_response(SlotResult::Consistent(Ok(RESUBMISSION_SLOT))) .add_stub_response(BlockResult::Consistent(Ok(confirmed_block()))); - for i in 0..MAX_CONCURRENT_RPC_CALLS { resubmit_runtime = resubmit_runtime - .add_stub_response(SendTransactionResult::Consistent(Ok(signature( - 0xA0 + i as u8, - ) - .into()))) + .add_stub_response(SendTransactionResult::Consistent(Ok( + signature(0xA0 + i).into() + ))) .add_signature([0xA0 + i as u8; 64]); } - resubmit_runtime = resubmit_runtime + resubmit_transactions(resubmit_runtime).await; + + let remaining = num_transactions - MAX_CONCURRENT_RPC_CALLS; + read_state(|s| { + assert_eq!(s.submitted_transactions().len(), MAX_CONCURRENT_RPC_CALLS); + assert_eq!(s.transactions_to_resubmit().len(), remaining); + }); + } + + #[tokio::test] + async fn should_reschedule_immediately_when_transactions_remain() { + setup(); + + let num_transactions = MAX_CONCURRENT_RPC_CALLS + 2; + for i in 0..num_transactions { + let sig = submit_consolidation_transaction_with_signature(i, EXPIRED_SLOT); + events::expire_transaction(sig); + } + + let mut resubmit_runtime = TestCanisterRuntime::new() + .with_increasing_time() .add_stub_response(SlotResult::Consistent(Ok(RESUBMISSION_SLOT))) .add_stub_response(BlockResult::Consistent(Ok(confirmed_block()))); - - for i in 0..2_usize { + for i in 0..MAX_CONCURRENT_RPC_CALLS { resubmit_runtime = resubmit_runtime - 
.add_stub_response(SendTransactionResult::Consistent(Ok(signature( - 0xB0 + i as u8, - ) - .into()))) - .add_signature([0xB0 + i as u8; 64]); + .add_stub_response(SendTransactionResult::Consistent(Ok( + signature(0xA0 + i).into() + ))) + .add_signature([0xA0 + i as u8; 64]); } + let runtime_ref = resubmit_runtime.clone(); resubmit_transactions(resubmit_runtime).await; - read_state(|s| assert_eq!(s.submitted_transactions().len(), num_transactions)); + assert_eq!(runtime_ref.set_timer_call_count(), 1); + } + + #[tokio::test] + async fn should_not_reschedule_when_all_transactions_resubmitted() { + setup(); + + let old_signature = submit_consolidation_transaction(EXPIRED_SLOT); + let new_signature = signature(0xAA); + events::expire_transaction(old_signature); + + let resubmit_runtime = TestCanisterRuntime::new() + .with_increasing_time() + .add_stub_response(SlotResult::Consistent(Ok(RESUBMISSION_SLOT))) + .add_stub_response(BlockResult::Consistent(Ok(confirmed_block()))) + .add_stub_response(SendTransactionResult::Consistent(Ok(new_signature.into()))) + .add_signature(new_signature.into()); + + let runtime_ref = resubmit_runtime.clone(); + resubmit_transactions(resubmit_runtime).await; + + EventsAssert::from_recorded().expect_contains_event_eq(EventType::ResubmittedTransaction { + old_signature, + new_signature, + new_slot: RESUBMISSION_SLOT, + }); + + assert_eq!(runtime_ref.set_timer_call_count(), 0); } } @@ -448,12 +534,16 @@ fn submit_consolidation_transaction(slot: Slot) -> solana_signature::Signature { } fn submit_consolidation_transaction_with_signature( - i: u8, + i: usize, slot: Slot, ) -> solana_signature::Signature { - let signature = signature(i); - events::accept_deposit(deposit_id(i), 1_000_000); - events::mint_deposit(deposit_id(i), i as u64); - events::submit_consolidation(signature, MINTER_ACCOUNT, slot, vec![i as u64]); - signature + let sig = signature(i); + let dep_id = DepositId { + signature: sig, + account: account(0), + }; + 
events::accept_deposit(dep_id, 1_000_000); + events::mint_deposit(dep_id, i as u64); + events::submit_consolidation(sig, MINTER_ACCOUNT, slot, vec![i as u64]); + sig } diff --git a/minter/src/runtime/mod.rs b/minter/src/runtime/mod.rs index fc9660fa..49d9e44b 100644 --- a/minter/src/runtime/mod.rs +++ b/minter/src/runtime/mod.rs @@ -1,7 +1,7 @@ use crate::signer::{IcSchnorrSigner, SchnorrSigner}; use candid::Principal; use ic_canister_runtime::{IcRuntime, Runtime}; -use std::{fmt::Debug, time::Duration}; +use std::{fmt::Debug, future::Future, time::Duration}; pub trait CanisterRuntime: Clone + 'static { fn inter_canister_call_runtime(&self) -> impl Runtime; @@ -17,6 +17,16 @@ pub trait CanisterRuntime: Clone + 'static { delay: Duration, future: impl Future + 'static, ) -> ic_cdk_timers::TimerId; + + fn set_timer_with_clone(&self, delay: Duration, f: F) -> ic_cdk_timers::TimerId + where + Self: Sized, + F: FnOnce(Self) -> Fut + 'static, + Fut: Future + 'static, + { + let clone = self.clone(); + self.set_timer(delay, async move { f(clone).await }) + } } #[derive(Clone, Default, Debug)] diff --git a/minter/src/state/tests.rs b/minter/src/state/tests.rs index c4f6cb57..0a0692c3 100644 --- a/minter/src/state/tests.rs +++ b/minter/src/state/tests.rs @@ -427,7 +427,9 @@ fn should_track_balance_through_deposits_withdrawals_and_failures() { } fn submit_transaction(sig: Signature, num_signers: u8, purpose: TransactionPurpose) { - let signers: Vec<_> = (0..num_signers).map(|i| account(100 + i)).collect(); + let signers: Vec<_> = (0..num_signers) + .map(|i| account(100 + i as usize)) + .collect(); mutate_state(|state| { process_event( state, diff --git a/minter/src/test_fixtures/mod.rs b/minter/src/test_fixtures/mod.rs index 47072fa2..ac46fe6a 100644 --- a/minter/src/test_fixtures/mod.rs +++ b/minter/src/test_fixtures/mod.rs @@ -94,9 +94,16 @@ pub fn init_schnorr_master_key() { }); } -/// Returns a [`Signature`] filled with byte `i`, e.g. `signature(1)` → `[0x01; 64]`. 
-pub fn signature(i: u8) -> solana_signature::Signature { - solana_signature::Signature::from([i; 64]) +/// Returns a [`Signature`] unique for any `usize` index. +/// For `i < 256`, produces `[i as u8; 64]`. +pub fn signature(i: usize) -> solana_signature::Signature { + if i < 256 { + solana_signature::Signature::from([i as u8; 64]) + } else { + let mut bytes = [0u8; 64]; + bytes[..8].copy_from_slice(&(i as u64).to_le_bytes()); + solana_signature::Signature::from(bytes) + } } /// Returns a [`ConfirmedBlock`] with a deterministic blockhash for use in RPC mock stubs. @@ -115,17 +122,17 @@ pub fn confirmed_block() -> sol_rpc_types::ConfirmedBlock { } /// Returns a [`DepositId`] with deterministic signature and account derived from `i`. -pub fn deposit_id(i: u8) -> DepositId { +pub fn deposit_id(i: usize) -> DepositId { DepositId { - signature: solana_signature::Signature::from([i; 64]), + signature: signature(i), account: account(i), } } /// Returns an [`Account`] with a deterministic principal derived from `i`. 
-pub fn account(i: u8) -> Account { +pub fn account(i: usize) -> Account { Account { - owner: Principal::from_slice(&[i; 29]), + owner: Principal::from_slice(&[i as u8; 29]), subaccount: None, } } diff --git a/minter/src/test_fixtures/runtime.rs b/minter/src/test_fixtures/runtime.rs index 7d6f1955..ff217495 100644 --- a/minter/src/test_fixtures/runtime.rs +++ b/minter/src/test_fixtures/runtime.rs @@ -3,6 +3,7 @@ use crate::{runtime::CanisterRuntime, signer::SchnorrSigner}; use candid::{CandidType, Principal}; use ic_canister_runtime::{IcError, Runtime, StubRuntime}; use ic_cdk_management_canister::SignCallError; +use std::sync::{Arc, Mutex}; use std::time::Duration; pub const TEST_CANISTER_ID: Principal = Principal::from_slice(&[0xCA; 10]); @@ -16,6 +17,7 @@ pub struct TestCanisterRuntime { msg_cycles_accept: Stubs, msg_cycles_available: Stubs, msg_cycles_refunded: Stubs, + set_timer_call_count: Arc>, } impl TestCanisterRuntime { @@ -68,6 +70,10 @@ impl TestCanisterRuntime { self.signer = self.signer.add_response(Err(error)); self } + + pub(crate) fn set_timer_call_count(&self) -> usize { + *self.set_timer_call_count.lock().unwrap() + } } impl CanisterRuntime for TestCanisterRuntime { @@ -110,6 +116,7 @@ impl CanisterRuntime for TestCanisterRuntime { _delay: Duration, _future: impl Future + 'static, ) -> ic_cdk_timers::TimerId { + *self.set_timer_call_count.lock().unwrap() += 1; Default::default() } } diff --git a/minter/src/withdraw/mod.rs b/minter/src/withdraw/mod.rs index 933567ea..50af4cd0 100644 --- a/minter/src/withdraw/mod.rs +++ b/minter/src/withdraw/mod.rs @@ -12,7 +12,7 @@ use itertools::Itertools; use sol_rpc_types::Slot; use solana_hash::Hash; -use crate::constants::{MAX_CONCURRENT_RPC_CALLS, MAX_TIMER_ROUNDS}; +use crate::constants::{MAX_CONCURRENT_RPC_CALLS, RESCHEDULE_DELAY}; use crate::ledger::BurnError; use crate::{ consolidate::consolidate_deposits, @@ -91,7 +91,7 @@ pub async fn withdraw( Ok(WithdrawalOk { block_index }) } -pub async fn 
process_pending_withdrawals(runtime: &R) { +pub async fn process_pending_withdrawals(runtime: R) { let _guard = match TimerGuard::new(TaskType::WithdrawalProcessing) { Ok(guard) => guard, Err(_) => { @@ -103,15 +103,18 @@ pub async fn process_pending_withdrawals(runtime: &R) { } }; - let max_per_invocation = MAX_TIMER_ROUNDS * MAX_CONCURRENT_RPC_CALLS * MAX_WITHDRAWALS_PER_TX; + let reschedule = scopeguard::guard(runtime.clone(), |runtime| { + runtime.set_timer_with_clone(RESCHEDULE_DELAY, process_pending_withdrawals); + }); let (affordable_requests, num_pending_withdrawals) = read_state(|state| { let mut available_balance = state.balance(); let pending = state.pending_withdrawal_requests(); + // Scan all pending requests to find what's affordable, so that we can accurately + // detect when balance (not capacity) is the limiting factor. let affordable: Vec<_> = pending .values() - .take(max_per_invocation) .take_while(|r| { if available_balance >= r.request.withdrawal_amount { available_balance -= r.request.withdrawal_amount; @@ -131,42 +134,37 @@ pub async fn process_pending_withdrawals(runtime: &R) { Priority::Info, "Insufficient minter balance for some withdrawal requests, scheduling consolidation" ); - let runtime_clone = runtime.clone(); - runtime.set_timer(Duration::ZERO, async move { - consolidate_deposits(runtime_clone).await; - }); - } - - if affordable_requests.is_empty() { - return; + runtime.set_timer_with_clone(Duration::ZERO, consolidate_deposits); } - let rounds: Vec>> = affordable_requests + let batches: Vec> = affordable_requests .into_iter() .chunks(MAX_WITHDRAWALS_PER_TX) .into_iter() - .map(Iterator::collect) - .collect::>>() - .into_iter() - .chunks(MAX_CONCURRENT_RPC_CALLS) - .into_iter() - .take(MAX_TIMER_ROUNDS) + .take(MAX_CONCURRENT_RPC_CALLS) .map(Iterator::collect) .collect(); - for round in rounds { - let (slot, recent_blockhash) = match get_recent_slot_and_blockhash(runtime).await { - Ok((slot, blockhash)) => (slot, blockhash), - 
Err(e) => { - log!(Priority::Info, "Failed to fetch recent blockhash: {e}"); - return; - } - }; - - futures::future::join_all(round.into_iter().map(async |batch| { - submit_withdrawal_transaction(runtime, batch, slot, recent_blockhash).await - })) - .await; + if batches.is_empty() { + scopeguard::ScopeGuard::into_inner(reschedule); // nothing to do, defuse reschedule + return; + } + + let (slot, recent_blockhash) = match get_recent_slot_and_blockhash(&runtime).await { + Ok(result) => result, + Err(e) => { + log!(Priority::Info, "Failed to fetch recent blockhash: {e}"); + return; + } + }; + + futures::future::join_all(batches.into_iter().map(async |batch| { + submit_withdrawal_transaction(&runtime, batch, slot, recent_blockhash).await + })) + .await; + + if read_state(|s| s.pending_withdrawal_requests().is_empty()) { + scopeguard::ScopeGuard::into_inner(reschedule); // nothing left, defuse reschedule } } diff --git a/minter/src/withdraw/tests.rs b/minter/src/withdraw/tests.rs index 0cfaf0c1..5c639602 100644 --- a/minter/src/withdraw/tests.rs +++ b/minter/src/withdraw/tests.rs @@ -1,5 +1,5 @@ use crate::{ - constants::{MAX_CONCURRENT_RPC_CALLS, MAX_TIMER_ROUNDS}, + constants::MAX_CONCURRENT_RPC_CALLS, guard::{TimerGuard, withdrawal_guard}, sol_transfer::MAX_WITHDRAWALS_PER_TX, state::{TaskType, read_state}, @@ -246,7 +246,7 @@ mod process_pending_withdrawals_tests { // We return early, therefore no RPC calls are made let runtime = TestCanisterRuntime::new(); - process_pending_withdrawals(&runtime).await; + process_pending_withdrawals(runtime).await; EventsAssert::assert_no_events_recorded(); } @@ -259,7 +259,7 @@ mod process_pending_withdrawals_tests { // We return early, therefore no RPC calls are made let runtime = TestCanisterRuntime::new(); - process_pending_withdrawals(&runtime).await; + process_pending_withdrawals(runtime).await; EventsAssert::assert_no_events_recorded(); } @@ -269,7 +269,7 @@ mod process_pending_withdrawals_tests { init_state(); let runtime = 
TestCanisterRuntime::new(); - process_pending_withdrawals(&runtime).await; + process_pending_withdrawals(runtime).await; // Guard should be released, so we can acquire it again let _guard = TimerGuard::new(TaskType::WithdrawalProcessing).unwrap(); @@ -286,7 +286,7 @@ mod process_pending_withdrawals_tests { let events_before = EventsAssert::from_recorded(); let runtime = TestCanisterRuntime::new().with_increasing_time(); - process_pending_withdrawals(&runtime).await; + process_pending_withdrawals(runtime).await; // No new events should be recorded let events_after = EventsAssert::from_recorded(); @@ -319,7 +319,7 @@ mod process_pending_withdrawals_tests { .add_stub_response(SendTransactionResult::Consistent(Ok(tx_signature.into()))) .with_increasing_time(); - process_pending_withdrawals(&runtime).await; + process_pending_withdrawals(runtime).await; // First two withdrawals should be submitted, third should remain pending assert_matches!(withdrawal_status(0), WithdrawalStatus::TxSent(_)); @@ -331,19 +331,6 @@ mod process_pending_withdrawals_tests { assert_eq!(events_after.len(), events_before.len() + 1); } - async fn submit_withdrawals(runtime: &TestCanisterRuntime, count: u8) { - for i in 1..count + 1 { - let _ = withdraw( - runtime, - Principal::from_slice(&[1, i]).into(), - MINIMUM_WITHDRAWAL_AMOUNT, - VALID_ADDRESS.to_string(), - ) - .await - .unwrap(); - } - } - #[tokio::test] async fn should_process_when_pending_withdrawals_exist() { init_state(); @@ -352,22 +339,16 @@ mod process_pending_withdrawals_tests { let tx_signature = signature(0x42); let slot = 1; + events::accept_withdrawal(account(1), 1, MINIMUM_WITHDRAWAL_AMOUNT); let runtime = TestCanisterRuntime::new() - // ledger burn response for withdraw - .add_stub_response(Ok::(Nat::from(1u64))) - // get_recent_slot_and_blockhash calls + .with_increasing_time() .add_stub_response(GetSlotResult::Consistent(Ok(slot))) .add_stub_response(GetBlockResult::Consistent(Ok(confirmed_block()))) - // schnorr signing 
response .add_signature(tx_signature.into()) - // sendTransaction response - .add_stub_response(SendTransactionResult::Consistent(Ok(tx_signature.into()))) - .with_increasing_time(); + .add_stub_response(SendTransactionResult::Consistent(Ok(tx_signature.into()))); - submit_withdrawals(&runtime, 1).await; - - process_pending_withdrawals(&runtime).await; + process_pending_withdrawals(runtime).await; assert_matches!(withdrawal_status(1), WithdrawalStatus::TxSent(_)); } @@ -377,10 +358,12 @@ mod process_pending_withdrawals_tests { init_state(); init_balance(); + events::accept_withdrawal(account(1), 1, MINIMUM_WITHDRAWAL_AMOUNT); + + let events_before = EventsAssert::from_recorded(); + let runtime = TestCanisterRuntime::new() - // ledger burn response for withdraw - .add_stub_response(Ok::(Nat::from(1u64))) - // get_recent_block retries getSlot 3 times before giving up + .with_increasing_time() .add_stub_response(GetSlotResult::Consistent(Err(RpcError::ValidationError( "slot unavailable".to_string(), )))) @@ -389,14 +372,9 @@ mod process_pending_withdrawals_tests { )))) .add_stub_response(GetSlotResult::Consistent(Err(RpcError::ValidationError( "slot unavailable".to_string(), - )))) - .with_increasing_time(); - - submit_withdrawals(&runtime, 1).await; - - let events_before = EventsAssert::from_recorded(); + )))); - process_pending_withdrawals(&runtime).await; + process_pending_withdrawals(runtime).await; // No withdrawal transaction event should be recorded let events_after = EventsAssert::from_recorded(); @@ -422,25 +400,20 @@ mod process_pending_withdrawals_tests { init_schnorr_master_key(); let slot = 1; + events::accept_withdrawal(account(1), 1, MINIMUM_WITHDRAWAL_AMOUNT); + events::accept_withdrawal(account(2), 2, MINIMUM_WITHDRAWAL_AMOUNT); + + let events_before = EventsAssert::from_recorded(); let runtime = TestCanisterRuntime::new() - // responses for burn blocks - .add_stub_response(Ok::(Nat::from(1u64))) - .add_stub_response(Ok::(Nat::from(2u64))) - // 
get_recent_slot_and_blockhash calls + .with_increasing_time() .add_stub_response(GetSlotResult::Consistent(Ok(slot))) .add_stub_response(GetBlockResult::Consistent(Ok(confirmed_block()))) - // signing fails for the batch .add_schnorr_signing_error(SignCallError::CallFailed( CallRejected::with_rejection(4, "signing service unavailable".to_string()).into(), - )) - .with_increasing_time(); + )); - submit_withdrawals(&runtime, 2).await; - - let events_before = EventsAssert::from_recorded(); - - process_pending_withdrawals(&runtime).await; + process_pending_withdrawals(runtime).await; // No transaction event should be recorded (signing failed) let events_after = EventsAssert::from_recorded(); @@ -471,25 +444,20 @@ mod process_pending_withdrawals_tests { let request_count = MAX_WITHDRAWALS_PER_TX as u64 + 1; let slot = 1; - let mut runtime = TestCanisterRuntime::new().with_increasing_time(); - // withdraw ledger burn responses for i in 0..request_count { - runtime = runtime.add_stub_response(Ok::(Nat::from(i))); + events::accept_withdrawal(account(i as usize), i, MINIMUM_WITHDRAWAL_AMOUNT); } - // get_recent_slot_and_blockhash (one round: getSlot + getBlock) - runtime = runtime + + let runtime = TestCanisterRuntime::new() + .with_increasing_time() .add_stub_response(GetSlotResult::Consistent(Ok(slot))) - .add_stub_response(GetBlockResult::Consistent(Ok(confirmed_block()))); - // one signature per batch + sendTransaction: batch 1 (MAX_WITHDRAWALS_PER_TX) + batch 2 (1) - runtime = runtime + .add_stub_response(GetBlockResult::Consistent(Ok(confirmed_block()))) .add_signature(signature(1).into()) .add_stub_response(SendTransactionResult::Consistent(Ok(signature(1).into()))) .add_signature(signature(2).into()) .add_stub_response(SendTransactionResult::Consistent(Ok(signature(2).into()))); - submit_withdrawals(&runtime, request_count as u8).await; - - process_pending_withdrawals(&runtime).await; + process_pending_withdrawals(runtime).await; // All withdrawals should be 
processed in a single invocation // (2 batches in 1 round, both within MAX_CONCURRENT_RPC_CALLS) @@ -502,111 +470,117 @@ mod process_pending_withdrawals_tests { } #[tokio::test] - async fn should_process_multiple_rounds_per_invocation() { + async fn should_process_up_to_max_concurrent_batches_per_invocation() { init_state(); init_balance(); init_schnorr_master_key(); - // Create more withdrawals than fit in one round - // (MAX_WITHDRAWALS_PER_TX * MAX_CONCURRENT_RPC_CALLS) but fewer than - // the per-invocation limit (rounds * concurrent * per_tx). - // This requires 2 rounds within a single invocation. - let max_per_round = (MAX_WITHDRAWALS_PER_TX * MAX_CONCURRENT_RPC_CALLS) as u64; - let request_count = max_per_round + 1; - let slot = 1; + // Create enough requests to require more than MAX_CONCURRENT_RPC_CALLS batches + let num_requests = MAX_WITHDRAWALS_PER_TX * MAX_CONCURRENT_RPC_CALLS + 1; - let mut runtime = TestCanisterRuntime::new().with_increasing_time(); - for i in 0..request_count { - runtime = runtime.add_stub_response(Ok::(Nat::from(i))); + // Insert pending withdrawal requests directly into state + for i in 0..num_requests { + events::accept_withdrawal(account(i), i as u64, MINIMUM_WITHDRAWAL_AMOUNT); } - // Round 1: get_recent_slot_and_blockhash, then signatures + sendTransaction - runtime = runtime + let slot = 1; + let mut runtime = TestCanisterRuntime::new() + .with_increasing_time() .add_stub_response(GetSlotResult::Consistent(Ok(slot))) .add_stub_response(GetBlockResult::Consistent(Ok(confirmed_block()))); + for i in 0..MAX_CONCURRENT_RPC_CALLS { runtime = runtime - .add_signature(signature(i as u8 + 1).into()) - .add_stub_response(SendTransactionResult::Consistent(Ok(signature( - i as u8 + 1, - ) - .into()))); + .add_signature(signature(i + 1).into()) + .add_stub_response(SendTransactionResult::Consistent(Ok( + signature(i + 1).into() + ))); } - // Round 2: fresh get_recent_slot_and_blockhash, then 1 signature + sendTransaction - runtime = 
runtime - .add_stub_response(GetSlotResult::Consistent(Ok(slot))) - .add_stub_response(GetBlockResult::Consistent(Ok(confirmed_block()))) - .add_signature(signature(MAX_CONCURRENT_RPC_CALLS as u8 + 1).into()) - .add_stub_response(SendTransactionResult::Consistent(Ok(signature( - MAX_CONCURRENT_RPC_CALLS as u8 + 1, - ) - .into()))); - - submit_withdrawals(&runtime, request_count as u8).await; - - // All withdrawals should be processed in a single invocation (2 rounds) - process_pending_withdrawals(&runtime).await; + process_pending_withdrawals(runtime).await; - for i in 0..request_count { + // Only MAX_CONCURRENT_RPC_CALLS batches should have been submitted + let processed = MAX_WITHDRAWALS_PER_TX * MAX_CONCURRENT_RPC_CALLS; + read_state(|s| { + assert_eq!(s.submitted_transactions().len(), MAX_CONCURRENT_RPC_CALLS); + assert!( + !s.pending_withdrawal_requests().is_empty(), + "Some withdrawal requests should remain pending" + ); + }); + for i in 0..processed { assert_matches!( - withdrawal_status(i), + withdrawal_status(i as u64), WithdrawalStatus::TxSent(_), "withdrawal {i} should be TxSent" ); } + // The extra request should remain pending + assert_matches!( + withdrawal_status(processed as u64), + WithdrawalStatus::Pending, + "withdrawal beyond limit should still be Pending" + ); } #[tokio::test] - async fn should_respect_max_withdrawal_rounds() { + async fn should_reschedule_immediately_when_withdrawals_remain() { init_state(); init_balance(); init_schnorr_master_key(); - let slot = 1; - let max_per_round = MAX_WITHDRAWALS_PER_TX * MAX_CONCURRENT_RPC_CALLS; - // Create enough requests to fill MAX_TIMER_ROUNDS + 1 rounds. - let request_count = max_per_round * MAX_TIMER_ROUNDS + 1; - - // Insert pending withdrawal requests directly into state. 
- for i in 0..request_count { - events::accept_withdrawal(account(i as u8), i as u64, MINIMUM_WITHDRAWAL_AMOUNT); + let num_requests = MAX_WITHDRAWALS_PER_TX * MAX_CONCURRENT_RPC_CALLS + 1; + for i in 0..num_requests { + events::accept_withdrawal(account(i), i as u64, MINIMUM_WITHDRAWAL_AMOUNT); } - // Set up RPC responses for MAX_TIMER_ROUNDS rounds. - let mut runtime = TestCanisterRuntime::new().with_increasing_time(); - let mut sig_counter: u8 = 0; - for _round in 0..MAX_TIMER_ROUNDS { + let slot = 1; + let mut runtime = TestCanisterRuntime::new() + .with_increasing_time() + .add_stub_response(GetSlotResult::Consistent(Ok(slot))) + .add_stub_response(GetBlockResult::Consistent(Ok(confirmed_block()))); + for i in 0..MAX_CONCURRENT_RPC_CALLS { runtime = runtime - .add_stub_response(GetSlotResult::Consistent(Ok(slot))) - .add_stub_response(GetBlockResult::Consistent(Ok(confirmed_block()))); - for _ in 0..MAX_CONCURRENT_RPC_CALLS { - sig_counter = sig_counter.wrapping_add(1); - runtime = runtime - .add_signature(signature(sig_counter).into()) - .add_stub_response(SendTransactionResult::Consistent(Ok(signature( - sig_counter, - ) - .into()))); - } + .add_signature(signature(i + 1).into()) + .add_stub_response(SendTransactionResult::Consistent(Ok( + signature(i + 1).into() + ))); } - process_pending_withdrawals(&runtime).await; + let runtime_ref = runtime.clone(); + process_pending_withdrawals(runtime).await; - let processed = max_per_round * MAX_TIMER_ROUNDS; - // All requests within MAX_TIMER_ROUNDS rounds should be processed - for i in 0..processed { - assert_matches!( - withdrawal_status(i as u64), - WithdrawalStatus::TxSent(_), - "withdrawal {i} should be TxSent" - ); - } - // The extra request beyond MAX_TIMER_ROUNDS rounds should remain pending - assert_matches!( - withdrawal_status(processed as u64), - WithdrawalStatus::Pending, - "withdrawal beyond max rounds should still be Pending" + assert_eq!( + runtime_ref.set_timer_call_count(), + 1, + "should 
reschedule when withdrawals remain" + ); + } + + #[tokio::test] + async fn should_not_reschedule_when_all_withdrawals_processed() { + init_state(); + init_balance(); + init_schnorr_master_key(); + + events::accept_withdrawal(account(1), 1, MINIMUM_WITHDRAWAL_AMOUNT); + + let runtime = TestCanisterRuntime::new() + .with_increasing_time() + .add_stub_response(GetSlotResult::Consistent(Ok(1))) + .add_stub_response(GetBlockResult::Consistent(Ok(confirmed_block()))) + .add_signature(signature(0x42).into()) + .add_stub_response(SendTransactionResult::Consistent( + Ok(signature(0x42).into()), + )); + + let runtime_ref = runtime.clone(); + process_pending_withdrawals(runtime).await; + + assert_eq!( + runtime_ref.set_timer_call_count(), + 0, + "should not reschedule when all withdrawals processed" ); } } @@ -615,7 +589,7 @@ mod withdrawal_finalization_tests { use super::*; fn setup_sent_withdrawal(burn_block_index: u64) -> Signature { - let tx_signature = signature(burn_block_index as u8 + 1); + let tx_signature = signature(burn_block_index as usize + 1); events::accept_withdrawal(MINTER_ACCOUNT, burn_block_index, MINIMUM_WITHDRAWAL_AMOUNT); events::submit_withdrawal(tx_signature, MINTER_ACCOUNT, 1, vec![burn_block_index]); tx_signature From 6153be45b98421a50f5548767337e1ec594fd782 Mon Sep 17 00:00:00 2001 From: Louis Pahlavi Date: Tue, 14 Apr 2026 14:50:28 +0200 Subject: [PATCH 02/11] refactor: simplify timers, fixtures, and rescheduling - Replace set_timer(delay, future) with set_timer(delay, fn) in CanisterRuntime trait (automatically clones self) - Remove RESCHEDULE_DELAY constant; reschedule timers immediately (Duration::ZERO) - Change signature/deposit_id/account helpers to accept usize with consistent le_bytes encoding - Merge per-timer capacity + reschedule tests into single successive-rounds tests - Drop runtime_ref clone pattern; pass runtime.clone() directly to timer functions Co-Authored-By: Claude Sonnet 4.6 --- minter/src/consolidate/mod.rs | 4 +- 
minter/src/consolidate/tests.rs | 87 ++++-------------- minter/src/constants.rs | 5 -- minter/src/monitor/mod.rs | 6 +- minter/src/monitor/tests.rs | 133 +++++++--------------------- minter/src/runtime/mod.rs | 28 +++--- minter/src/test_fixtures/mod.rs | 14 ++- minter/src/test_fixtures/runtime.rs | 18 ++-- minter/src/withdraw/mod.rs | 8 +- minter/src/withdraw/tests.rs | 97 +++----------------- 10 files changed, 99 insertions(+), 301 deletions(-) diff --git a/minter/src/consolidate/mod.rs b/minter/src/consolidate/mod.rs index 859421e8..a6802879 100644 --- a/minter/src/consolidate/mod.rs +++ b/minter/src/consolidate/mod.rs @@ -1,5 +1,5 @@ use crate::{ - constants::{MAX_CONCURRENT_RPC_CALLS, RESCHEDULE_DELAY}, + constants::MAX_CONCURRENT_RPC_CALLS, guard::TimerGuard, numeric::LedgerMintIndex, runtime::CanisterRuntime, @@ -37,7 +37,7 @@ pub async fn consolidate_deposits(runtime: R) { }; let reschedule = scopeguard::guard(runtime.clone(), |runtime| { - runtime.set_timer_with_clone(RESCHEDULE_DELAY, consolidate_deposits); + runtime.set_timer(Duration::ZERO, consolidate_deposits); }); let batches: Vec> = diff --git a/minter/src/consolidate/tests.rs b/minter/src/consolidate/tests.rs index 491f11ca..bf575a67 100644 --- a/minter/src/consolidate/tests.rs +++ b/minter/src/consolidate/tests.rs @@ -168,7 +168,7 @@ async fn should_submit_multiple_consolidation_batches() { ))); for i in 0..(2 + NUM_DEPOSITS) { - runtime = runtime.add_signature([i as u8; 64]); + runtime = runtime.add_signature(signature(i).into()); } consolidate_deposits(runtime).await; @@ -256,54 +256,19 @@ async fn should_consolidate_multiple_deposits_to_same_account_in_single_transfer } #[tokio::test] -async fn should_consolidate_up_to_max_concurrent_batches_per_invocation() { +async fn should_reschedule_until_all_deposits_consolidated() { setup(); - // Create enough deposits to require more than MAX_CONCURRENT_RPC_CALLS batches let num_deposits = MAX_TRANSFERS_PER_CONSOLIDATION * MAX_CONCURRENT_RPC_CALLS + 1; - 
let funds: Vec<_> = (0..num_deposits) - .map(|i| (deposit_id(i), (i as u64 + 1) * 1_000_000_000)) - .collect(); - add_funds_to_consolidate(&funds); + add_funds_to_consolidate( + &(0..num_deposits) + .map(|i| (deposit_id(i), (i as u64 + 1) * 1_000_000_000)) + .collect::>(), + ); let slot = 100; - let mut runtime = TestCanisterRuntime::new() - .with_increasing_time() - .add_stub_response(SlotResult::Consistent(Ok(slot))) - .add_stub_response(BlockResult::Consistent(Ok(confirmed_block()))); - - // Provide signatures and send responses for MAX_CONCURRENT_RPC_CALLS batches - for i in 0..MAX_CONCURRENT_RPC_CALLS { - runtime = - runtime.add_stub_response(SendTransactionResult::Consistent(Ok(signature(i).into()))); - } - for i in 0..(MAX_CONCURRENT_RPC_CALLS + num_deposits) { - runtime = runtime.add_signature([i as u8; 64]); - } - - consolidate_deposits(runtime).await; - - // Only MAX_CONCURRENT_RPC_CALLS batches should have been submitted - read_state(|s| { - assert_eq!(s.submitted_transactions().len(), MAX_CONCURRENT_RPC_CALLS); - assert!( - !s.deposits_to_consolidate().is_empty(), - "Some deposits should remain unconsolidated" - ); - }); -} -#[tokio::test] -async fn should_reschedule_immediately_when_deposits_remain() { - setup(); - - let num_deposits = MAX_TRANSFERS_PER_CONSOLIDATION * MAX_CONCURRENT_RPC_CALLS + 1; - let funds: Vec<_> = (0..num_deposits) - .map(|i| (deposit_id(i), (i as u64 + 1) * 1_000_000_000)) - .collect(); - add_funds_to_consolidate(&funds); - - let slot = 100; + // Round 1: processes MAX_CONCURRENT_RPC_CALLS batches, 1 deposit remains → reschedule let mut runtime = TestCanisterRuntime::new() .with_increasing_time() .add_stub_response(SlotResult::Consistent(Ok(slot))) @@ -313,41 +278,27 @@ async fn should_reschedule_immediately_when_deposits_remain() { runtime.add_stub_response(SendTransactionResult::Consistent(Ok(signature(i).into()))); } for i in 0..(MAX_CONCURRENT_RPC_CALLS + num_deposits) { - runtime = runtime.add_signature([i as u8; 64]); + 
runtime = runtime.add_signature(signature(i).into()); } consolidate_deposits(runtime.clone()).await; - assert_eq!( - runtime.set_timer_call_count(), - 1, - "should reschedule when deposits remain" - ); -} - -#[tokio::test] -async fn should_not_reschedule_when_all_deposits_consolidated() { - setup(); - - add_funds_to_consolidate(&[(deposit_id(0), 1_000_000_000)]); + assert!(!read_state(|s| s.deposits_to_consolidate().is_empty())); + assert_eq!(runtime.set_timer_call_count(), 1); - let slot = 100; - let runtime = TestCanisterRuntime::new() + // Round 2: processes the remaining 1 deposit → no reschedule + let last_sig = signature(num_deposits); + let runtime2 = TestCanisterRuntime::new() .with_increasing_time() .add_stub_response(SlotResult::Consistent(Ok(slot))) .add_stub_response(BlockResult::Consistent(Ok(confirmed_block()))) - .add_stub_response(SendTransactionResult::Consistent( - Ok(signature(0x11).into()), - )) - .add_signature(signature(0x11).into()); + .add_stub_response(SendTransactionResult::Consistent(Ok(last_sig.into()))) + .add_signature(last_sig.into()); - consolidate_deposits(runtime.clone()).await; + consolidate_deposits(runtime2.clone()).await; - assert_eq!( - runtime.set_timer_call_count(), - 0, - "should not reschedule when all deposits consolidated" - ); + assert!(read_state(|s| s.deposits_to_consolidate().is_empty())); + assert_eq!(runtime2.set_timer_call_count(), 0); } fn setup() { diff --git a/minter/src/constants.rs b/minter/src/constants.rs index 3e8a2b70..d3a87e78 100644 --- a/minter/src/constants.rs +++ b/minter/src/constants.rs @@ -1,11 +1,6 @@ -use std::time::Duration; - /// Maximum number of concurrent calls to the SOL RPC canister. pub const MAX_CONCURRENT_RPC_CALLS: usize = 10; -/// Short cooldown before rescheduling a timer that has more work to do. 
-pub const RESCHEDULE_DELAY: Duration = Duration::from_secs(1); - /// Matches the ICP HTTPS outcall response limit for variable-length RPC calls /// such as `getTransaction` and `getSignatureStatuses`: /// https://docs.internetcomputer.org/references/ic-interface-spec#ic-http_request diff --git a/minter/src/monitor/mod.rs b/minter/src/monitor/mod.rs index 92142385..461e2a11 100644 --- a/minter/src/monitor/mod.rs +++ b/minter/src/monitor/mod.rs @@ -1,6 +1,6 @@ use crate::{ address::derivation_path, - constants::{MAX_CONCURRENT_RPC_CALLS, RESCHEDULE_DELAY}, + constants::MAX_CONCURRENT_RPC_CALLS, guard::TimerGuard, runtime::CanisterRuntime, signer::sign_bytes, @@ -59,7 +59,7 @@ pub async fn finalize_transactions(runtime: R) { // Reschedule unless all submitted transactions were checked in this invocation. let reschedule = scopeguard::guard(runtime.clone(), |runtime| { - runtime.set_timer_with_clone(RESCHEDULE_DELAY, finalize_transactions); + runtime.set_timer(Duration::ZERO, finalize_transactions); }); // Fetch the current slot before checking statuses: if a transaction finalizes @@ -148,7 +148,7 @@ pub async fn resubmit_transactions(runtime: R) { resubmit_expired_transactions(&runtime, to_resubmit).await; if num_remaining > 0 { - runtime.set_timer_with_clone(RESCHEDULE_DELAY, resubmit_transactions); + runtime.set_timer(Duration::ZERO, resubmit_transactions); } } diff --git a/minter/src/monitor/tests.rs b/minter/src/monitor/tests.rs index 58db2685..71692743 100644 --- a/minter/src/monitor/tests.rs +++ b/minter/src/monitor/tests.rs @@ -76,7 +76,7 @@ mod finalization { } #[tokio::test] - async fn should_process_up_to_max_concurrent_batches_per_invocation() { + async fn should_reschedule_until_all_transactions_finalized() { setup(); let num = MAX_CONCURRENT_RPC_CALLS * MAX_SIGNATURES_PER_STATUS_CHECK + 1; @@ -84,55 +84,24 @@ mod finalization { submit_consolidation_transaction_with_signature(i, RECENT_SLOT); } + // Round 1: finalizes MAX_CONCURRENT_RPC_CALLS batches, 1 
transaction unchecked → reschedule let mut runtime = TestCanisterRuntime::new() .with_increasing_time() .add_stub_response(SlotResult::Consistent(Ok(CURRENT_SLOT))) .add_stub_response(BlockResult::Consistent(Ok(confirmed_block()))); for _ in 0..MAX_CONCURRENT_RPC_CALLS { runtime = runtime.add_stub_response(SignatureStatusesResult::Consistent(Ok( - vec![None; MAX_SIGNATURES_PER_STATUS_CHECK], + vec![Some(finalized_status()); MAX_SIGNATURES_PER_STATUS_CHECK], ))); } - finalize_transactions(runtime).await; - - // All transactions remain: processed ones are not expired (RECENT_SLOT), - // and the last one was not checked at all. - read_state(|s| assert_eq!(s.submitted_transactions().len(), num)); - } + finalize_transactions(runtime.clone()).await; - #[tokio::test] - async fn should_reschedule_immediately_when_transactions_remain() { - setup(); - - let num = MAX_CONCURRENT_RPC_CALLS * MAX_SIGNATURES_PER_STATUS_CHECK + 1; - for i in 0..num { - submit_consolidation_transaction_with_signature(i, RECENT_SLOT); - } + assert_eq!(read_state(|s| s.submitted_transactions().len()), 1); + assert_eq!(runtime.set_timer_call_count(), 1); - let mut runtime = TestCanisterRuntime::new() - .with_increasing_time() - .add_stub_response(SlotResult::Consistent(Ok(CURRENT_SLOT))) - .add_stub_response(BlockResult::Consistent(Ok(confirmed_block()))); - for _ in 0..MAX_CONCURRENT_RPC_CALLS { - runtime = runtime.add_stub_response(SignatureStatusesResult::Consistent(Ok( - vec![None; MAX_SIGNATURES_PER_STATUS_CHECK], - ))); - } - - let runtime_ref = runtime.clone(); - finalize_transactions(runtime).await; - - assert_eq!(runtime_ref.set_timer_call_count(), 1); - } - - #[tokio::test] - async fn should_not_reschedule_when_all_transactions_finalized() { - setup(); - - submit_consolidation_transaction(RECENT_SLOT); - - let runtime = TestCanisterRuntime::new() + // Round 2: finalizes the remaining 1 transaction → no reschedule + let runtime2 = TestCanisterRuntime::new() .with_increasing_time() 
.add_stub_response(SlotResult::Consistent(Ok(CURRENT_SLOT))) .add_stub_response(BlockResult::Consistent(Ok(confirmed_block()))) @@ -140,10 +109,10 @@ mod finalization { finalized_status(), )]))); - let runtime_ref = runtime.clone(); - finalize_transactions(runtime).await; + finalize_transactions(runtime2.clone()).await; - assert_eq!(runtime_ref.set_timer_call_count(), 0); + assert!(read_state(|s| s.submitted_transactions().is_empty())); + assert_eq!(runtime2.set_timer_call_count(), 0); } #[tokio::test] @@ -434,7 +403,7 @@ mod resubmission { } #[tokio::test] - async fn should_process_up_to_max_concurrent_rpc_calls_per_invocation() { + async fn should_reschedule_until_all_transactions_resubmitted() { setup(); let num_transactions = MAX_CONCURRENT_RPC_CALLS + 2; @@ -443,84 +412,44 @@ mod resubmission { events::expire_transaction(sig); } - read_state(|s| { - assert_eq!(s.transactions_to_resubmit().len(), num_transactions); - }); - - let mut resubmit_runtime = TestCanisterRuntime::new() + // Round 1: resubmits MAX_CONCURRENT_RPC_CALLS transactions, 2 remain → reschedule + let mut runtime = TestCanisterRuntime::new() .with_increasing_time() .add_stub_response(SlotResult::Consistent(Ok(RESUBMISSION_SLOT))) .add_stub_response(BlockResult::Consistent(Ok(confirmed_block()))); for i in 0..MAX_CONCURRENT_RPC_CALLS { - resubmit_runtime = resubmit_runtime + runtime = runtime .add_stub_response(SendTransactionResult::Consistent(Ok( signature(0xA0 + i).into() ))) - .add_signature([0xA0 + i as u8; 64]); + .add_signature(signature(0xA0 + i).into()); } - resubmit_transactions(resubmit_runtime).await; + resubmit_transactions(runtime.clone()).await; - let remaining = num_transactions - MAX_CONCURRENT_RPC_CALLS; - read_state(|s| { - assert_eq!(s.submitted_transactions().len(), MAX_CONCURRENT_RPC_CALLS); - assert_eq!(s.transactions_to_resubmit().len(), remaining); - }); - } - - #[tokio::test] - async fn should_reschedule_immediately_when_transactions_remain() { - setup(); + assert_eq!( + 
read_state(|s| s.transactions_to_resubmit().len()), + num_transactions - MAX_CONCURRENT_RPC_CALLS + ); + assert_eq!(runtime.set_timer_call_count(), 1); - let num_transactions = MAX_CONCURRENT_RPC_CALLS + 2; - for i in 0..num_transactions { - let sig = submit_consolidation_transaction_with_signature(i, EXPIRED_SLOT); - events::expire_transaction(sig); - } - - let mut resubmit_runtime = TestCanisterRuntime::new() + // Round 2: resubmits remaining 2 transactions → no reschedule + let mut runtime2 = TestCanisterRuntime::new() .with_increasing_time() .add_stub_response(SlotResult::Consistent(Ok(RESUBMISSION_SLOT))) .add_stub_response(BlockResult::Consistent(Ok(confirmed_block()))); - for i in 0..MAX_CONCURRENT_RPC_CALLS { - resubmit_runtime = resubmit_runtime + for i in 0..(num_transactions - MAX_CONCURRENT_RPC_CALLS) { + runtime2 = runtime2 .add_stub_response(SendTransactionResult::Consistent(Ok( - signature(0xA0 + i).into() + signature(0xB0 + i).into() ))) - .add_signature([0xA0 + i as u8; 64]); + .add_signature(signature(0xB0 + i).into()); } - let runtime_ref = resubmit_runtime.clone(); - resubmit_transactions(resubmit_runtime).await; - - assert_eq!(runtime_ref.set_timer_call_count(), 1); - } - - #[tokio::test] - async fn should_not_reschedule_when_all_transactions_resubmitted() { - setup(); - - let old_signature = submit_consolidation_transaction(EXPIRED_SLOT); - let new_signature = signature(0xAA); - events::expire_transaction(old_signature); - - let resubmit_runtime = TestCanisterRuntime::new() - .with_increasing_time() - .add_stub_response(SlotResult::Consistent(Ok(RESUBMISSION_SLOT))) - .add_stub_response(BlockResult::Consistent(Ok(confirmed_block()))) - .add_stub_response(SendTransactionResult::Consistent(Ok(new_signature.into()))) - .add_signature(new_signature.into()); - - let runtime_ref = resubmit_runtime.clone(); - resubmit_transactions(resubmit_runtime).await; - - EventsAssert::from_recorded().expect_contains_event_eq(EventType::ResubmittedTransaction { - 
old_signature,
-            new_signature,
-            new_slot: RESUBMISSION_SLOT,
-        });
+        resubmit_transactions(runtime2.clone()).await;
 
-        assert_eq!(runtime_ref.set_timer_call_count(), 0);
+        assert!(read_state(|s| s.transactions_to_resubmit().is_empty()));
+        assert_eq!(runtime2.set_timer_call_count(), 0);
     }
 }
diff --git a/minter/src/runtime/mod.rs b/minter/src/runtime/mod.rs
index 49d9e44b..6690ba94 100644
--- a/minter/src/runtime/mod.rs
+++ b/minter/src/runtime/mod.rs
@@ -12,21 +12,11 @@ pub trait CanisterRuntime: Clone + 'static {
     fn msg_cycles_accept(&self, amount: u128) -> u128;
     fn msg_cycles_available(&self) -> u128;
     fn msg_cycles_refunded(&self) -> u128;
-    fn set_timer(
-        &self,
-        delay: Duration,
-        future: impl Future + 'static,
-    ) -> ic_cdk_timers::TimerId;
-
-    fn set_timer_with_clone(&self, delay: Duration, f: F) -> ic_cdk_timers::TimerId
+    fn set_timer(&self, delay: Duration, f: F) -> ic_cdk_timers::TimerId
     where
         Self: Sized,
         F: FnOnce(Self) -> Fut + 'static,
-        Fut: Future + 'static,
-    {
-        let clone = self.clone();
-        self.set_timer(delay, async move { f(clone).await })
-    }
+        Fut: Future + 'static;
 }
 
 #[derive(Clone, Default, Debug)]
@@ -71,11 +61,13 @@ impl CanisterRuntime for IcCanisterRuntime {
         ic_cdk::api::msg_cycles_refunded()
     }
 
-    fn set_timer(
-        &self,
-        delay: Duration,
-        future: impl Future + 'static,
-    ) -> ic_cdk_timers::TimerId {
-        ic_cdk_timers::set_timer(delay, future)
+    fn set_timer(&self, delay: Duration, f: F) -> ic_cdk_timers::TimerId
+    where
+        Self: Sized,
+        F: FnOnce(Self) -> Fut + 'static,
+        Fut: Future + 'static,
+    {
+        let clone = self.clone();
+        ic_cdk_timers::set_timer(delay, async move { f(clone).await })
+    }
 }
diff --git a/minter/src/test_fixtures/mod.rs b/minter/src/test_fixtures/mod.rs
index ac46fe6a..1fe72185 100644
--- a/minter/src/test_fixtures/mod.rs
+++ b/minter/src/test_fixtures/mod.rs
@@ -97,13 +97,9 @@ pub fn init_schnorr_master_key() {
 /// Returns a [`Signature`] unique for any `usize` index.
-/// For `i < 256`, produces `[i as u8; 64]`.
+/// Encodes `i` as little-endian bytes into the first 8 bytes; remaining bytes are zero.
pub fn signature(i: usize) -> solana_signature::Signature { - if i < 256 { - solana_signature::Signature::from([i as u8; 64]) - } else { - let mut bytes = [0u8; 64]; - bytes[..8].copy_from_slice(&(i as u64).to_le_bytes()); - solana_signature::Signature::from(bytes) - } + let mut bytes = [0u8; 64]; + bytes[..8].copy_from_slice(&(i as u64).to_le_bytes()); + solana_signature::Signature::from(bytes) } /// Returns a [`ConfirmedBlock`] with a deterministic blockhash for use in RPC mock stubs. @@ -131,8 +127,10 @@ pub fn deposit_id(i: usize) -> DepositId { /// Returns an [`Account`] with a deterministic principal derived from `i`. pub fn account(i: usize) -> Account { + let mut bytes = [0u8; 29]; + bytes[..8].copy_from_slice(&(i as u64).to_le_bytes()); Account { - owner: Principal::from_slice(&[i as u8; 29]), + owner: Principal::from_slice(&bytes), subaccount: None, } } diff --git a/minter/src/test_fixtures/runtime.rs b/minter/src/test_fixtures/runtime.rs index ff217495..c7f2a7f3 100644 --- a/minter/src/test_fixtures/runtime.rs +++ b/minter/src/test_fixtures/runtime.rs @@ -3,8 +3,11 @@ use crate::{runtime::CanisterRuntime, signer::SchnorrSigner}; use candid::{CandidType, Principal}; use ic_canister_runtime::{IcError, Runtime, StubRuntime}; use ic_cdk_management_canister::SignCallError; -use std::sync::{Arc, Mutex}; -use std::time::Duration; +use std::{ + future::Future, + sync::{Arc, Mutex}, + time::Duration, +}; pub const TEST_CANISTER_ID: Principal = Principal::from_slice(&[0xCA; 10]); @@ -111,11 +114,12 @@ impl CanisterRuntime for TestCanisterRuntime { self.msg_cycles_refunded.next() } - fn set_timer( - &self, - _delay: Duration, - _future: impl Future + 'static, - ) -> ic_cdk_timers::TimerId { + fn set_timer(&self, _delay: Duration, _f: F) -> ic_cdk_timers::TimerId + where + Self: Sized, + F: FnOnce(Self) -> Fut + 'static, + Fut: Future + 'static, + { *self.set_timer_call_count.lock().unwrap() += 1; Default::default() } diff --git a/minter/src/withdraw/mod.rs 
b/minter/src/withdraw/mod.rs index 50af4cd0..4bf3abf1 100644 --- a/minter/src/withdraw/mod.rs +++ b/minter/src/withdraw/mod.rs @@ -12,7 +12,7 @@ use itertools::Itertools; use sol_rpc_types::Slot; use solana_hash::Hash; -use crate::constants::{MAX_CONCURRENT_RPC_CALLS, RESCHEDULE_DELAY}; +use crate::constants::MAX_CONCURRENT_RPC_CALLS; use crate::ledger::BurnError; use crate::{ consolidate::consolidate_deposits, @@ -104,15 +104,13 @@ pub async fn process_pending_withdrawals(runtime: R) { }; let reschedule = scopeguard::guard(runtime.clone(), |runtime| { - runtime.set_timer_with_clone(RESCHEDULE_DELAY, process_pending_withdrawals); + runtime.set_timer(Duration::ZERO, process_pending_withdrawals); }); let (affordable_requests, num_pending_withdrawals) = read_state(|state| { let mut available_balance = state.balance(); let pending = state.pending_withdrawal_requests(); - // Scan all pending requests to find what's affordable, so that we can accurately - // detect when balance (not capacity) is the limiting factor. 
let affordable: Vec<_> = pending .values() .take_while(|r| { @@ -134,7 +132,7 @@ pub async fn process_pending_withdrawals(runtime: R) { Priority::Info, "Insufficient minter balance for some withdrawal requests, scheduling consolidation" ); - runtime.set_timer_with_clone(Duration::ZERO, consolidate_deposits); + runtime.set_timer(Duration::ZERO, consolidate_deposits); } let batches: Vec> = affordable_requests diff --git a/minter/src/withdraw/tests.rs b/minter/src/withdraw/tests.rs index 5c639602..de0dd623 100644 --- a/minter/src/withdraw/tests.rs +++ b/minter/src/withdraw/tests.rs @@ -470,71 +470,19 @@ mod process_pending_withdrawals_tests { } #[tokio::test] - async fn should_process_up_to_max_concurrent_batches_per_invocation() { + async fn should_reschedule_until_all_withdrawals_processed() { init_state(); init_balance(); init_schnorr_master_key(); - // Create enough requests to require more than MAX_CONCURRENT_RPC_CALLS batches let num_requests = MAX_WITHDRAWALS_PER_TX * MAX_CONCURRENT_RPC_CALLS + 1; - - // Insert pending withdrawal requests directly into state for i in 0..num_requests { events::accept_withdrawal(account(i), i as u64, MINIMUM_WITHDRAWAL_AMOUNT); } let slot = 1; - let mut runtime = TestCanisterRuntime::new() - .with_increasing_time() - .add_stub_response(GetSlotResult::Consistent(Ok(slot))) - .add_stub_response(GetBlockResult::Consistent(Ok(confirmed_block()))); - - for i in 0..MAX_CONCURRENT_RPC_CALLS { - runtime = runtime - .add_signature(signature(i + 1).into()) - .add_stub_response(SendTransactionResult::Consistent(Ok( - signature(i + 1).into() - ))); - } - - process_pending_withdrawals(runtime).await; - - // Only MAX_CONCURRENT_RPC_CALLS batches should have been submitted - let processed = MAX_WITHDRAWALS_PER_TX * MAX_CONCURRENT_RPC_CALLS; - read_state(|s| { - assert_eq!(s.submitted_transactions().len(), MAX_CONCURRENT_RPC_CALLS); - assert!( - !s.pending_withdrawal_requests().is_empty(), - "Some withdrawal requests should remain pending" - ); 
- }); - for i in 0..processed { - assert_matches!( - withdrawal_status(i as u64), - WithdrawalStatus::TxSent(_), - "withdrawal {i} should be TxSent" - ); - } - // The extra request should remain pending - assert_matches!( - withdrawal_status(processed as u64), - WithdrawalStatus::Pending, - "withdrawal beyond limit should still be Pending" - ); - } - - #[tokio::test] - async fn should_reschedule_immediately_when_withdrawals_remain() { - init_state(); - init_balance(); - init_schnorr_master_key(); - - let num_requests = MAX_WITHDRAWALS_PER_TX * MAX_CONCURRENT_RPC_CALLS + 1; - for i in 0..num_requests { - events::accept_withdrawal(account(i), i as u64, MINIMUM_WITHDRAWAL_AMOUNT); - } - let slot = 1; + // Round 1: processes MAX_CONCURRENT_RPC_CALLS batches, 1 request remains → reschedule let mut runtime = TestCanisterRuntime::new() .with_increasing_time() .add_stub_response(GetSlotResult::Consistent(Ok(slot))) @@ -547,41 +495,24 @@ mod process_pending_withdrawals_tests { ))); } - let runtime_ref = runtime.clone(); - process_pending_withdrawals(runtime).await; + process_pending_withdrawals(runtime.clone()).await; - assert_eq!( - runtime_ref.set_timer_call_count(), - 1, - "should reschedule when withdrawals remain" - ); - } + assert!(!read_state(|s| s.pending_withdrawal_requests().is_empty())); + assert_eq!(runtime.set_timer_call_count(), 1); - #[tokio::test] - async fn should_not_reschedule_when_all_withdrawals_processed() { - init_state(); - init_balance(); - init_schnorr_master_key(); - - events::accept_withdrawal(account(1), 1, MINIMUM_WITHDRAWAL_AMOUNT); - - let runtime = TestCanisterRuntime::new() + // Round 2: processes the remaining 1 request → no reschedule + let last_sig = signature(num_requests); + let runtime2 = TestCanisterRuntime::new() .with_increasing_time() - .add_stub_response(GetSlotResult::Consistent(Ok(1))) + .add_stub_response(GetSlotResult::Consistent(Ok(slot))) .add_stub_response(GetBlockResult::Consistent(Ok(confirmed_block()))) - 
.add_signature(signature(0x42).into()) - .add_stub_response(SendTransactionResult::Consistent( - Ok(signature(0x42).into()), - )); + .add_signature(last_sig.into()) + .add_stub_response(SendTransactionResult::Consistent(Ok(last_sig.into()))); - let runtime_ref = runtime.clone(); - process_pending_withdrawals(runtime).await; + process_pending_withdrawals(runtime2.clone()).await; - assert_eq!( - runtime_ref.set_timer_call_count(), - 0, - "should not reschedule when all withdrawals processed" - ); + assert!(read_state(|s| s.pending_withdrawal_requests().is_empty())); + assert_eq!(runtime2.set_timer_call_count(), 0); } } From 3bbf04af137e9ae92ccfdf59608032b5a967609e Mon Sep 17 00:00:00 2001 From: Louis Pahlavi Date: Tue, 14 Apr 2026 14:52:01 +0200 Subject: [PATCH 03/11] refactor: rename runtime2 to runtime in merged timer tests Co-Authored-By: Claude Sonnet 4.6 --- minter/src/consolidate/tests.rs | 6 +++--- minter/src/monitor/tests.rs | 14 +++++++------- minter/src/withdraw/tests.rs | 6 +++--- 3 files changed, 13 insertions(+), 13 deletions(-) diff --git a/minter/src/consolidate/tests.rs b/minter/src/consolidate/tests.rs index bf575a67..3279cec9 100644 --- a/minter/src/consolidate/tests.rs +++ b/minter/src/consolidate/tests.rs @@ -288,17 +288,17 @@ async fn should_reschedule_until_all_deposits_consolidated() { // Round 2: processes the remaining 1 deposit → no reschedule let last_sig = signature(num_deposits); - let runtime2 = TestCanisterRuntime::new() + let runtime = TestCanisterRuntime::new() .with_increasing_time() .add_stub_response(SlotResult::Consistent(Ok(slot))) .add_stub_response(BlockResult::Consistent(Ok(confirmed_block()))) .add_stub_response(SendTransactionResult::Consistent(Ok(last_sig.into()))) .add_signature(last_sig.into()); - consolidate_deposits(runtime2.clone()).await; + consolidate_deposits(runtime.clone()).await; assert!(read_state(|s| s.deposits_to_consolidate().is_empty())); - assert_eq!(runtime2.set_timer_call_count(), 0); + 
assert_eq!(runtime.set_timer_call_count(), 0); } fn setup() { diff --git a/minter/src/monitor/tests.rs b/minter/src/monitor/tests.rs index 71692743..9ed68313 100644 --- a/minter/src/monitor/tests.rs +++ b/minter/src/monitor/tests.rs @@ -101,7 +101,7 @@ mod finalization { assert_eq!(runtime.set_timer_call_count(), 1); // Round 2: finalizes the remaining 1 transaction → no reschedule - let runtime2 = TestCanisterRuntime::new() + let runtime = TestCanisterRuntime::new() .with_increasing_time() .add_stub_response(SlotResult::Consistent(Ok(CURRENT_SLOT))) .add_stub_response(BlockResult::Consistent(Ok(confirmed_block()))) @@ -109,10 +109,10 @@ mod finalization { finalized_status(), )]))); - finalize_transactions(runtime2.clone()).await; + finalize_transactions(runtime.clone()).await; assert!(read_state(|s| s.submitted_transactions().is_empty())); - assert_eq!(runtime2.set_timer_call_count(), 0); + assert_eq!(runtime.set_timer_call_count(), 0); } #[tokio::test] @@ -434,22 +434,22 @@ mod resubmission { assert_eq!(runtime.set_timer_call_count(), 1); // Round 2: resubmits remaining 2 transactions → no reschedule - let mut runtime2 = TestCanisterRuntime::new() + let mut runtime = TestCanisterRuntime::new() .with_increasing_time() .add_stub_response(SlotResult::Consistent(Ok(RESUBMISSION_SLOT))) .add_stub_response(BlockResult::Consistent(Ok(confirmed_block()))); for i in 0..(num_transactions - MAX_CONCURRENT_RPC_CALLS) { - runtime2 = runtime2 + runtime = runtime .add_stub_response(SendTransactionResult::Consistent(Ok( signature(0xB0 + i).into() ))) .add_signature(signature(0xB0 + i).into()); } - resubmit_transactions(runtime2.clone()).await; + resubmit_transactions(runtime.clone()).await; assert!(read_state(|s| s.transactions_to_resubmit().is_empty())); - assert_eq!(runtime2.set_timer_call_count(), 0); + assert_eq!(runtime.set_timer_call_count(), 0); } } diff --git a/minter/src/withdraw/tests.rs b/minter/src/withdraw/tests.rs index de0dd623..bd57e5c3 100644 --- 
a/minter/src/withdraw/tests.rs +++ b/minter/src/withdraw/tests.rs @@ -502,17 +502,17 @@ mod process_pending_withdrawals_tests { // Round 2: processes the remaining 1 request → no reschedule let last_sig = signature(num_requests); - let runtime2 = TestCanisterRuntime::new() + let runtime = TestCanisterRuntime::new() .with_increasing_time() .add_stub_response(GetSlotResult::Consistent(Ok(slot))) .add_stub_response(GetBlockResult::Consistent(Ok(confirmed_block()))) .add_signature(last_sig.into()) .add_stub_response(SendTransactionResult::Consistent(Ok(last_sig.into()))); - process_pending_withdrawals(runtime2.clone()).await; + process_pending_withdrawals(runtime.clone()).await; assert!(read_state(|s| s.pending_withdrawal_requests().is_empty())); - assert_eq!(runtime2.set_timer_call_count(), 0); + assert_eq!(runtime.set_timer_call_count(), 0); } } From eb397cabfed76b17981a4d33a0d78581a1735ab1 Mon Sep 17 00:00:00 2001 From: Louis Pahlavi Date: Tue, 14 Apr 2026 14:56:22 +0200 Subject: [PATCH 04/11] test: add queue size assertions to merged timer reschedule tests Co-Authored-By: Claude Sonnet 4.6 --- minter/src/consolidate/tests.rs | 13 +++++++++++-- minter/src/monitor/tests.rs | 11 +++++++---- minter/src/withdraw/tests.rs | 5 ++++- 3 files changed, 22 insertions(+), 7 deletions(-) diff --git a/minter/src/consolidate/tests.rs b/minter/src/consolidate/tests.rs index 3279cec9..cb52de76 100644 --- a/minter/src/consolidate/tests.rs +++ b/minter/src/consolidate/tests.rs @@ -283,7 +283,10 @@ async fn should_reschedule_until_all_deposits_consolidated() { consolidate_deposits(runtime.clone()).await; - assert!(!read_state(|s| s.deposits_to_consolidate().is_empty())); + read_state(|s| { + assert_eq!(s.submitted_transactions().len(), MAX_CONCURRENT_RPC_CALLS); + assert_eq!(s.deposits_to_consolidate().len(), 1); + }); assert_eq!(runtime.set_timer_call_count(), 1); // Round 2: processes the remaining 1 deposit → no reschedule @@ -297,7 +300,13 @@ async fn 
should_reschedule_until_all_deposits_consolidated() { consolidate_deposits(runtime.clone()).await; - assert!(read_state(|s| s.deposits_to_consolidate().is_empty())); + read_state(|s| { + assert_eq!( + s.submitted_transactions().len(), + MAX_CONCURRENT_RPC_CALLS + 1 + ); + assert!(s.deposits_to_consolidate().is_empty()); + }); assert_eq!(runtime.set_timer_call_count(), 0); } diff --git a/minter/src/monitor/tests.rs b/minter/src/monitor/tests.rs index 9ed68313..c714526e 100644 --- a/minter/src/monitor/tests.rs +++ b/minter/src/monitor/tests.rs @@ -427,10 +427,13 @@ mod resubmission { resubmit_transactions(runtime.clone()).await; - assert_eq!( - read_state(|s| s.transactions_to_resubmit().len()), - num_transactions - MAX_CONCURRENT_RPC_CALLS - ); + read_state(|s| { + assert_eq!(s.submitted_transactions().len(), MAX_CONCURRENT_RPC_CALLS); + assert_eq!( + s.transactions_to_resubmit().len(), + num_transactions - MAX_CONCURRENT_RPC_CALLS + ); + }); assert_eq!(runtime.set_timer_call_count(), 1); // Round 2: resubmits remaining 2 transactions → no reschedule diff --git a/minter/src/withdraw/tests.rs b/minter/src/withdraw/tests.rs index bd57e5c3..c89f4c5c 100644 --- a/minter/src/withdraw/tests.rs +++ b/minter/src/withdraw/tests.rs @@ -497,7 +497,10 @@ mod process_pending_withdrawals_tests { process_pending_withdrawals(runtime.clone()).await; - assert!(!read_state(|s| s.pending_withdrawal_requests().is_empty())); + read_state(|s| { + assert_eq!(s.submitted_transactions().len(), MAX_CONCURRENT_RPC_CALLS); + assert_eq!(s.pending_withdrawal_requests().len(), 1); + }); assert_eq!(runtime.set_timer_call_count(), 1); // Round 2: processes the remaining 1 request → no reschedule From e7d8ccb535cdadc1d1d7e9a4ce1d0b575d75dbba Mon Sep 17 00:00:00 2001 From: Louis Pahlavi Date: Tue, 14 Apr 2026 15:01:10 +0200 Subject: [PATCH 05/11] refactor: simplify finalize_transactions rescheduling Use the same num_remaining pattern as resubmit_transactions instead of scopeguard + checked_count 
tracking. Co-Authored-By: Claude Sonnet 4.6 --- minter/src/monitor/mod.rs | 27 +++++++-------------------- 1 file changed, 7 insertions(+), 20 deletions(-) diff --git a/minter/src/monitor/mod.rs b/minter/src/monitor/mod.rs index 461e2a11..52ebf4ce 100644 --- a/minter/src/monitor/mod.rs +++ b/minter/src/monitor/mod.rs @@ -57,10 +57,9 @@ pub async fn finalize_transactions(runtime: R) { return; } - // Reschedule unless all submitted transactions were checked in this invocation. - let reschedule = scopeguard::guard(runtime.clone(), |runtime| { - runtime.set_timer(Duration::ZERO, finalize_transactions); - }); + let num_remaining = all_transactions + .len() + .saturating_sub(MAX_CONCURRENT_RPC_CALLS * MAX_SIGNATURES_PER_STATUS_CHECK); // Fetch the current slot before checking statuses: if a transaction finalizes // after we snapshot the slot, the status check will see it as finalized rather @@ -119,8 +118,8 @@ pub async fn finalize_transactions(runtime: R) { } } - if statuses.len() == all_transactions.len() { - scopeguard::ScopeGuard::into_inner(reschedule); + if num_remaining > 0 { + runtime.set_timer(Duration::ZERO, finalize_transactions); } } @@ -153,6 +152,8 @@ pub async fn resubmit_transactions(runtime: R) { } /// Result of checking transaction statuses. +// Transactions that are in-flight (Processed/Confirmed) or whose status +// check failed are implicitly excluded from the below sets. struct TransactionStatuses { /// Transactions confirmed as finalized on-chain without errors. succeeded: BTreeSet, @@ -160,18 +161,6 @@ struct TransactionStatuses { errored: BTreeMap, /// Transactions with no on-chain status (safe to resubmit if expired). not_found: BTreeSet, - /// Number of signatures whose status was received (from successful batch requests). - /// Used to determine whether all submitted transactions were checked in this invocation. 
- checked_count: usize, - // Transactions that are in-flight (Processed/Confirmed) or whose status - // check failed are implicitly excluded from the above sets. -} - -impl TransactionStatuses { - /// Total number of signatures whose status was received in this invocation. - fn len(&self) -> usize { - self.checked_count - } } async fn check_transaction_statuses( @@ -190,7 +179,6 @@ async fn check_transaction_statuses( succeeded: BTreeSet::new(), errored: BTreeMap::new(), not_found: BTreeSet::new(), - checked_count: 0, }; let batch_results: Vec<_> = futures::future::join_all(batches.into_iter().map(async |batch| { @@ -205,7 +193,6 @@ async fn check_transaction_statuses( .await; for (sigs, statuses) in batch_results.into_iter().flatten() { - result.checked_count += sigs.len(); for (signature, status) in sigs.iter().zip(statuses) { match status { Some(s) From ab4a0b723375cc0c5b341b68b87f102d8a1af6ea Mon Sep 17 00:00:00 2001 From: Louis Pahlavi Date: Tue, 14 Apr 2026 15:04:24 +0200 Subject: [PATCH 06/11] refactor: use scopeguard for both monitor timer rescheduling Also fix signature() doc comment and merge MAX_CONCURRENT_RPC_CALLS import into the crate:: block in withdraw/mod.rs. 
Co-Authored-By: Claude Sonnet 4.6 --- minter/src/monitor/mod.rs | 19 +++++++++++-------- minter/src/test_fixtures/mod.rs | 3 +-- minter/src/withdraw/mod.rs | 2 +- 3 files changed, 13 insertions(+), 11 deletions(-) diff --git a/minter/src/monitor/mod.rs b/minter/src/monitor/mod.rs index 52ebf4ce..b4c5beb8 100644 --- a/minter/src/monitor/mod.rs +++ b/minter/src/monitor/mod.rs @@ -57,9 +57,9 @@ pub async fn finalize_transactions(runtime: R) { return; } - let num_remaining = all_transactions - .len() - .saturating_sub(MAX_CONCURRENT_RPC_CALLS * MAX_SIGNATURES_PER_STATUS_CHECK); + let reschedule = scopeguard::guard(runtime.clone(), |runtime| { + runtime.set_timer(Duration::ZERO, finalize_transactions); + }); // Fetch the current slot before checking statuses: if a transaction finalizes // after we snapshot the slot, the status check will see it as finalized rather @@ -118,8 +118,8 @@ pub async fn finalize_transactions(runtime: R) { } } - if num_remaining > 0 { - runtime.set_timer(Duration::ZERO, finalize_transactions); + if all_transactions.len() <= MAX_CONCURRENT_RPC_CALLS * MAX_SIGNATURES_PER_STATUS_CHECK { + scopeguard::ScopeGuard::into_inner(reschedule); } } @@ -142,12 +142,15 @@ pub async fn resubmit_transactions(runtime: R) { return; } - let num_remaining = to_resubmit.len().saturating_sub(MAX_CONCURRENT_RPC_CALLS); + let reschedule = scopeguard::guard(runtime.clone(), |runtime| { + runtime.set_timer(Duration::ZERO, resubmit_transactions); + }); + let fits_in_one_round = to_resubmit.len() <= MAX_CONCURRENT_RPC_CALLS; resubmit_expired_transactions(&runtime, to_resubmit).await; - if num_remaining > 0 { - runtime.set_timer(Duration::ZERO, resubmit_transactions); + if fits_in_one_round { + scopeguard::ScopeGuard::into_inner(reschedule); } } diff --git a/minter/src/test_fixtures/mod.rs b/minter/src/test_fixtures/mod.rs index 1fe72185..812596c6 100644 --- a/minter/src/test_fixtures/mod.rs +++ b/minter/src/test_fixtures/mod.rs @@ -94,8 +94,7 @@ pub fn 
init_schnorr_master_key() { }); } -/// Returns a [`Signature`] unique for any `usize` index. -/// For `i < 256`, produces `[i as u8; 64]`. +/// Returns a [`Signature`] unique for any `usize` index, derived from `i as u64` via le_bytes. pub fn signature(i: usize) -> solana_signature::Signature { let mut bytes = [0u8; 64]; bytes[..8].copy_from_slice(&(i as u64).to_le_bytes()); diff --git a/minter/src/withdraw/mod.rs b/minter/src/withdraw/mod.rs index 4bf3abf1..17069565 100644 --- a/minter/src/withdraw/mod.rs +++ b/minter/src/withdraw/mod.rs @@ -12,10 +12,10 @@ use itertools::Itertools; use sol_rpc_types::Slot; use solana_hash::Hash; -use crate::constants::MAX_CONCURRENT_RPC_CALLS; use crate::ledger::BurnError; use crate::{ consolidate::consolidate_deposits, + constants::MAX_CONCURRENT_RPC_CALLS, guard::{TimerGuard, withdrawal_guard}, ledger::burn, runtime::CanisterRuntime, From 3712cd6387be4e2da910c3f9f67212aeb0744731 Mon Sep 17 00:00:00 2001 From: Louis Pahlavi Date: Tue, 14 Apr 2026 15:14:02 +0200 Subject: [PATCH 07/11] refactor: add scopeguards to all timers and clean up imports - Add scopeguard + more_to_process pattern to process_pending_withdrawals - Add comments above ScopeGuard::into_inner defuse calls in all timers - Use deposit_id(i) helper in submit_consolidation_transaction_with_signature - Merge use crate:: statements in withdraw/mod.rs - Merge use cksol_minter:: statements in main.rs Co-Authored-By: Claude Sonnet 4.6 --- minter/src/consolidate/mod.rs | 26 +++++++++++++++----------- minter/src/main.rs | 16 +++++++++------- minter/src/monitor/mod.rs | 10 +++++++--- minter/src/monitor/tests.rs | 13 ++++--------- minter/src/withdraw/mod.rs | 21 ++++++++++++--------- 5 files changed, 47 insertions(+), 39 deletions(-) diff --git a/minter/src/consolidate/mod.rs b/minter/src/consolidate/mod.rs index a6802879..3f96cb94 100644 --- a/minter/src/consolidate/mod.rs +++ b/minter/src/consolidate/mod.rs @@ -36,21 +36,24 @@ pub async fn consolidate_deposits(runtime: R) 
{ Err(_) => return, }; + let all_deposits = read_state(|s| group_deposits_by_account(s.deposits_to_consolidate())); + let more_to_process = + all_deposits.len() > MAX_CONCURRENT_RPC_CALLS * MAX_TRANSFERS_PER_CONSOLIDATION; let reschedule = scopeguard::guard(runtime.clone(), |runtime| { runtime.set_timer(Duration::ZERO, consolidate_deposits); }); - let batches: Vec> = - read_state(|s| group_deposits_by_account(s.deposits_to_consolidate())) - .into_iter() - .chunks(MAX_TRANSFERS_PER_CONSOLIDATION) - .into_iter() - .take(MAX_CONCURRENT_RPC_CALLS) - .map(Iterator::collect) - .collect(); + let batches: Vec> = all_deposits + .into_iter() + .chunks(MAX_TRANSFERS_PER_CONSOLIDATION) + .into_iter() + .take(MAX_CONCURRENT_RPC_CALLS) + .map(Iterator::collect) + .collect(); if batches.is_empty() { - scopeguard::ScopeGuard::into_inner(reschedule); // nothing to do, defuse reschedule + // Nothing to process + scopeguard::ScopeGuard::into_inner(reschedule); return; } @@ -70,8 +73,9 @@ pub async fn consolidate_deposits(runtime: R) { })) .await; - if read_state(|s| s.deposits_to_consolidate().is_empty()) { - scopeguard::ScopeGuard::into_inner(reschedule); // nothing left, defuse reschedule + if !more_to_process { + // All work fits in this round + scopeguard::ScopeGuard::into_inner(reschedule); } } diff --git a/minter/src/main.rs b/minter/src/main.rs index b0ec4c2a..33315c7d 100644 --- a/minter/src/main.rs +++ b/minter/src/main.rs @@ -1,13 +1,15 @@ use candid::Principal; use canlog::{Log, Sort}; -use cksol_minter::consolidate::{DEPOSIT_CONSOLIDATION_DELAY, consolidate_deposits}; -use cksol_minter::monitor::{ - FINALIZE_TRANSACTIONS_DELAY, RESUBMIT_TRANSACTIONS_DELAY, finalize_transactions, - resubmit_transactions, -}; -use cksol_minter::withdraw::{WITHDRAWAL_PROCESSING_DELAY, process_pending_withdrawals}; use cksol_minter::{ - address::lazy_get_schnorr_master_key, runtime::IcCanisterRuntime, state::read_state, + address::lazy_get_schnorr_master_key, + 
consolidate::{DEPOSIT_CONSOLIDATION_DELAY, consolidate_deposits}, + monitor::{ + FINALIZE_TRANSACTIONS_DELAY, RESUBMIT_TRANSACTIONS_DELAY, finalize_transactions, + resubmit_transactions, + }, + runtime::IcCanisterRuntime, + state::read_state, + withdraw::{WITHDRAWAL_PROCESSING_DELAY, process_pending_withdrawals}, }; use cksol_types::{ Address, DepositStatus, GetDepositAddressArgs, MinterInfo, UpdateBalanceArgs, diff --git a/minter/src/monitor/mod.rs b/minter/src/monitor/mod.rs index b4c5beb8..b42e1a23 100644 --- a/minter/src/monitor/mod.rs +++ b/minter/src/monitor/mod.rs @@ -57,6 +57,8 @@ pub async fn finalize_transactions(runtime: R) { return; } + let more_to_process = + all_transactions.len() > MAX_CONCURRENT_RPC_CALLS * MAX_SIGNATURES_PER_STATUS_CHECK; let reschedule = scopeguard::guard(runtime.clone(), |runtime| { runtime.set_timer(Duration::ZERO, finalize_transactions); }); @@ -118,7 +120,8 @@ pub async fn finalize_transactions(runtime: R) { } } - if all_transactions.len() <= MAX_CONCURRENT_RPC_CALLS * MAX_SIGNATURES_PER_STATUS_CHECK { + if !more_to_process { + // All work fits in this round scopeguard::ScopeGuard::into_inner(reschedule); } } @@ -142,14 +145,15 @@ pub async fn resubmit_transactions(runtime: R) { return; } + let more_to_process = to_resubmit.len() > MAX_CONCURRENT_RPC_CALLS; let reschedule = scopeguard::guard(runtime.clone(), |runtime| { runtime.set_timer(Duration::ZERO, resubmit_transactions); }); - let fits_in_one_round = to_resubmit.len() <= MAX_CONCURRENT_RPC_CALLS; resubmit_expired_transactions(&runtime, to_resubmit).await; - if fits_in_one_round { + if !more_to_process { + // All work fits in this round scopeguard::ScopeGuard::into_inner(reschedule); } } diff --git a/minter/src/monitor/tests.rs b/minter/src/monitor/tests.rs index c714526e..8a89a8b2 100644 --- a/minter/src/monitor/tests.rs +++ b/minter/src/monitor/tests.rs @@ -4,11 +4,10 @@ use super::{ }; use crate::{ constants::MAX_CONCURRENT_RPC_CALLS, - state::event::DepositId, 
state::{TaskType, event::EventType, mutate_state, read_state, reset_state}, storage::reset_events, test_fixtures::{ - EventsAssert, MINTER_ACCOUNT, account, confirmed_block, events, init_schnorr_master_key, + EventsAssert, MINTER_ACCOUNT, confirmed_block, deposit_id, events, init_schnorr_master_key, init_state, runtime::TestCanisterRuntime, signature, }, }; @@ -469,13 +468,9 @@ fn submit_consolidation_transaction_with_signature( i: usize, slot: Slot, ) -> solana_signature::Signature { - let sig = signature(i); - let dep_id = DepositId { - signature: sig, - account: account(0), - }; + let dep_id = deposit_id(i); events::accept_deposit(dep_id, 1_000_000); events::mint_deposit(dep_id, i as u64); - events::submit_consolidation(sig, MINTER_ACCOUNT, slot, vec![i as u64]); - sig + events::submit_consolidation(dep_id.signature, MINTER_ACCOUNT, slot, vec![i as u64]); + dep_id.signature } diff --git a/minter/src/withdraw/mod.rs b/minter/src/withdraw/mod.rs index 17069565..122d9c34 100644 --- a/minter/src/withdraw/mod.rs +++ b/minter/src/withdraw/mod.rs @@ -12,12 +12,11 @@ use itertools::Itertools; use sol_rpc_types::Slot; use solana_hash::Hash; -use crate::ledger::BurnError; use crate::{ consolidate::consolidate_deposits, constants::MAX_CONCURRENT_RPC_CALLS, guard::{TimerGuard, withdrawal_guard}, - ledger::burn, + ledger::{BurnError, burn}, runtime::CanisterRuntime, sol_transfer::{MAX_WITHDRAWALS_PER_TX, create_signed_batch_withdrawal_transaction}, state::{ @@ -103,10 +102,6 @@ pub async fn process_pending_withdrawals(runtime: R) { } }; - let reschedule = scopeguard::guard(runtime.clone(), |runtime| { - runtime.set_timer(Duration::ZERO, process_pending_withdrawals); - }); - let (affordable_requests, num_pending_withdrawals) = read_state(|state| { let mut available_balance = state.balance(); let pending = state.pending_withdrawal_requests(); @@ -135,6 +130,12 @@ pub async fn process_pending_withdrawals(runtime: R) { runtime.set_timer(Duration::ZERO, consolidate_deposits); } + 
let more_to_process = + affordable_requests.len() > MAX_CONCURRENT_RPC_CALLS * MAX_WITHDRAWALS_PER_TX; + let reschedule = scopeguard::guard(runtime.clone(), |runtime| { + runtime.set_timer(Duration::ZERO, process_pending_withdrawals); + }); + let batches: Vec> = affordable_requests .into_iter() .chunks(MAX_WITHDRAWALS_PER_TX) @@ -144,7 +145,8 @@ pub async fn process_pending_withdrawals(runtime: R) { .collect(); if batches.is_empty() { - scopeguard::ScopeGuard::into_inner(reschedule); // nothing to do, defuse reschedule + // Nothing to process + scopeguard::ScopeGuard::into_inner(reschedule); return; } @@ -161,8 +163,9 @@ pub async fn process_pending_withdrawals(runtime: R) { })) .await; - if read_state(|s| s.pending_withdrawal_requests().is_empty()) { - scopeguard::ScopeGuard::into_inner(reschedule); // nothing left, defuse reschedule + if !more_to_process { + // All work fits in this round + scopeguard::ScopeGuard::into_inner(reschedule); } } From 27173fb4e7a88d9680d7b27fc1fc61a32a91c682 Mon Sep 17 00:00:00 2001 From: Louis Pahlavi Date: Tue, 14 Apr 2026 15:18:08 +0200 Subject: [PATCH 08/11] refactor: extract sig variable in submit_consolidation_transaction_with_signature Co-Authored-By: Claude Sonnet 4.6 --- minter/src/monitor/tests.rs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/minter/src/monitor/tests.rs b/minter/src/monitor/tests.rs index 8a89a8b2..86b195f7 100644 --- a/minter/src/monitor/tests.rs +++ b/minter/src/monitor/tests.rs @@ -469,8 +469,9 @@ fn submit_consolidation_transaction_with_signature( slot: Slot, ) -> solana_signature::Signature { let dep_id = deposit_id(i); + let sig = dep_id.signature; events::accept_deposit(dep_id, 1_000_000); events::mint_deposit(dep_id, i as u64); - events::submit_consolidation(dep_id.signature, MINTER_ACCOUNT, slot, vec![i as u64]); - dep_id.signature + events::submit_consolidation(sig, MINTER_ACCOUNT, slot, vec![i as u64]); + sig } From e4c31c29ab6566e14fd2f3edfd2f241fe460b96f Mon Sep 17 
00:00:00 2001 From: Louis Pahlavi Date: Tue, 14 Apr 2026 15:19:44 +0200 Subject: [PATCH 09/11] refactor: use signature(i) directly in submit_consolidation_transaction_with_signature Co-Authored-By: Claude Sonnet 4.6 --- minter/src/monitor/tests.rs | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/minter/src/monitor/tests.rs b/minter/src/monitor/tests.rs index 86b195f7..f0d74ac8 100644 --- a/minter/src/monitor/tests.rs +++ b/minter/src/monitor/tests.rs @@ -468,10 +468,9 @@ fn submit_consolidation_transaction_with_signature( i: usize, slot: Slot, ) -> solana_signature::Signature { - let dep_id = deposit_id(i); - let sig = dep_id.signature; - events::accept_deposit(dep_id, 1_000_000); - events::mint_deposit(dep_id, i as u64); + let sig = signature(i); + events::accept_deposit(deposit_id(i), 1_000_000); + events::mint_deposit(deposit_id(i), i as u64); events::submit_consolidation(sig, MINTER_ACCOUNT, slot, vec![i as u64]); sig } From c40f18c0e3812fda6c0d26c056a80375c1d2dff8 Mon Sep 17 00:00:00 2001 From: Louis Pahlavi Date: Tue, 14 Apr 2026 15:20:21 +0200 Subject: [PATCH 10/11] refactor: rename sig to signature in submit_consolidation_transaction_with_signature Co-Authored-By: Claude Sonnet 4.6 --- minter/src/monitor/tests.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/minter/src/monitor/tests.rs b/minter/src/monitor/tests.rs index f0d74ac8..3ff3132f 100644 --- a/minter/src/monitor/tests.rs +++ b/minter/src/monitor/tests.rs @@ -468,9 +468,9 @@ fn submit_consolidation_transaction_with_signature( i: usize, slot: Slot, ) -> solana_signature::Signature { - let sig = signature(i); + let signature = signature(i); events::accept_deposit(deposit_id(i), 1_000_000); events::mint_deposit(deposit_id(i), i as u64); - events::submit_consolidation(sig, MINTER_ACCOUNT, slot, vec![i as u64]); - sig + events::submit_consolidation(signature, MINTER_ACCOUNT, slot, vec![i as u64]); + signature } From 
4860f8179e482377da0e591e09087fcbfe7594b6 Mon Sep 17 00:00:00 2001 From: Louis Pahlavi Date: Tue, 14 Apr 2026 17:09:46 +0200 Subject: [PATCH 11/11] Use `MAX_CONCURRENT_RPC_CALLS + 1` transactions in `should_reschedule_until_all_transactions_resubmitted` --- minter/src/monitor/tests.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/minter/src/monitor/tests.rs b/minter/src/monitor/tests.rs index 3ff3132f..b737f605 100644 --- a/minter/src/monitor/tests.rs +++ b/minter/src/monitor/tests.rs @@ -405,13 +405,13 @@ mod resubmission { async fn should_reschedule_until_all_transactions_resubmitted() { setup(); - let num_transactions = MAX_CONCURRENT_RPC_CALLS + 2; + let num_transactions = MAX_CONCURRENT_RPC_CALLS + 1; for i in 0..num_transactions { let sig = submit_consolidation_transaction_with_signature(i, EXPIRED_SLOT); events::expire_transaction(sig); } - // Round 1: resubmits MAX_CONCURRENT_RPC_CALLS transactions, 2 remain → reschedule + // Round 1: resubmits MAX_CONCURRENT_RPC_CALLS transactions, 1 remains → reschedule let mut runtime = TestCanisterRuntime::new() .with_increasing_time() .add_stub_response(SlotResult::Consistent(Ok(RESUBMISSION_SLOT))) @@ -435,7 +435,7 @@ mod resubmission { }); assert_eq!(runtime.set_timer_call_count(), 1); - // Round 2: resubmits remaining 2 transactions → no reschedule + // Round 2: resubmits remaining transaction → no reschedule let mut runtime = TestCanisterRuntime::new() .with_increasing_time() .add_stub_response(SlotResult::Consistent(Ok(RESUBMISSION_SLOT)))