diff --git a/integration_tests/src/fixtures.rs b/integration_tests/src/fixtures.rs index 62840e2b..647fe752 100644 --- a/integration_tests/src/fixtures.rs +++ b/integration_tests/src/fixtures.rs @@ -147,6 +147,12 @@ impl MockBuilder { self.get_transaction(get_deposit_transaction_response()) } + /// Mocks for `getSlot` → `getBlock`. + pub fn get_slot_and_block(self, slot: u64, blockhash: &str) -> Self { + self.expect(get_slot_request(), get_slot_response(slot)) + .expect(get_block_request(slot), get_block_response(blockhash)) + } + /// Mocks for `getSlot` → `getBlock` → `sendTransaction`. pub fn submit_transaction(self, slot: u64, blockhash: &str, tx_signature: &str) -> Self { self.expect(get_slot_request(), get_slot_response(slot)) @@ -279,6 +285,9 @@ fn get_slot_response(slot: u64) -> JsonRpcResponse { } fn get_block_request(slot: u64) -> JsonRpcRequestMatcher { + // The SOL RPC canister rounds the slot down to the nearest multiple of 20 + // before making getBlock requests, so we match that behavior here. + let slot = slot / 20 * 20; JsonRpcRequestMatcher::with_method("getBlock").with_params(json!([ slot, { diff --git a/integration_tests/tests/tests.rs b/integration_tests/tests/tests.rs index 59866fa7..beae039a 100644 --- a/integration_tests/tests/tests.rs +++ b/integration_tests/tests/tests.rs @@ -26,6 +26,11 @@ use sol_rpc_types::{CommitmentLevel, ConsensusStrategy, GetTransactionEncoding, use std::time::Duration; use tokio::join; +const WITHDRAWAL_PROCESSING_DELAY: Duration = Duration::from_mins(1); +const FINALIZE_TRANSACTIONS_DELAY: Duration = Duration::from_mins(2); +const RESUBMIT_TRANSACTIONS_DELAY: Duration = Duration::from_mins(3); +const DEPOSIT_CONSOLIDATION_DELAY: Duration = Duration::from_mins(10); + /// Deposits funds into the minter via `update_balance`, consolidates them, /// and finalizes the consolidation so the minter's internal balance is credited. /// @@ -39,7 +44,7 @@ async fn deposit_and_consolidate_funds(setup: &Setup) { assert_matches!(result, Ok(DepositStatus::Minted { .. })); // Consolidate - setup.advance_time(Duration::from_mins(10)).await; + setup.advance_time(DEPOSIT_CONSOLIDATION_DELAY).await; setup .execute_http_mocks( MockBuilder::with_start_id(4) @@ -53,7 +58,7 @@ async fn deposit_and_consolidate_funds(setup: &Setup) { .await; // Finalize - setup.advance_time(Duration::from_secs(60)).await; + setup.advance_time(FINALIZE_TRANSACTIONS_DELAY).await; setup .execute_http_mocks( MockBuilder::with_start_id(16) @@ -255,8 +260,10 @@ mod withdrawal_tests { use super::*; - const WITHDRAWAL_PROCESSING_DELAY: Duration = Duration::from_mins(1); const MAX_BLOCKHASH_AGE: Slot = 150; + /// The SOL RPC canister rounds the slot returned by getSlot down to the nearest multiple + /// of this value before querying getBlock and returning the slot to callers. + const SOL_RPC_SLOT_ROUNDING: u64 = 20; #[tokio::test] async fn should_validate_solana_address() { @@ -663,15 +670,22 @@ mod withdrawal_tests { other => panic!("Expected TxSent, got: {other:?}"), }; - // Advance time to trigger resubmission. The mocked slot exceeds - // INITIAL_SLOT + MAX_PROCESSING_AGE, so the original transaction - // is now considered expired. - const MONITOR_DELAY: Duration = Duration::from_secs(60); - setup.advance_time(MONITOR_DELAY).await; + // Advance time to trigger finalize_transactions, which fetches the current slot, + // checks statuses (not found), and marks the expired transaction for resubmission. + // The SOL RPC canister rounds the slot down to SOL_RPC_SLOT_ROUNDING before returning + // it, so we add SOL_RPC_SLOT_ROUNDING + 1 to ensure the rounded slot is strictly + // greater than INITIAL_SLOT + MAX_BLOCKHASH_AGE (the expiry threshold). + let resubmission_slot = INITIAL_SLOT + MAX_BLOCKHASH_AGE + SOL_RPC_SLOT_ROUNDING + 1; + setup.advance_time(FINALIZE_TRANSACTIONS_DELAY).await; setup - .execute_http_mocks(resubmit_withdrawal_http_mocks( - INITIAL_SLOT + MAX_BLOCKHASH_AGE + 50, - )) + .execute_http_mocks(mark_expired_withdrawal_http_mocks(resubmission_slot)) + .await; + + // Advance time to trigger resubmit_transactions. finalize_transactions also + // fires but has no pending transactions, so it makes no HTTP outcalls. + setup.advance_time(RESUBMIT_TRANSACTIONS_DELAY).await; + setup + .execute_http_mocks(resubmit_withdrawal_http_mocks(resubmission_slot)) .await; // Withdrawal status should now have a different signature @@ -687,11 +701,11 @@ mod withdrawal_tests { other => panic!("Expected TxSent after resubmission, got: {other:?}"), }; - // Advance time to trigger finalization. The monitor checks signature statuses - // and this time the transaction is reported as finalized. - setup.advance_time(MONITOR_DELAY).await; + // Advance time to trigger finalize_transactions again. This time the + // transaction is reported as finalized. + setup.advance_time(FINALIZE_TRANSACTIONS_DELAY).await; setup - .execute_http_mocks(finalize_withdrawal_http_mocks()) + .execute_http_mocks(finalize_withdrawal_http_mocks(resubmission_slot)) .await; // Withdrawal status should now be TxFinalized with Success @@ -721,10 +735,19 @@ mod withdrawal_tests { .build() } - /// HTTP mocks for resubmitting an expired withdrawal transaction. - fn resubmit_withdrawal_http_mocks(current_slot: u64) -> MockHttpOutcalls { + /// HTTP mocks for finalize_transactions detecting an expired transaction: + /// fetches slot, checks status (not found), marks for resubmission. + fn mark_expired_withdrawal_http_mocks(current_slot: u64) -> MockHttpOutcalls { MockBuilder::with_start_id(40) - .resubmit_transaction( + .get_current_slot(current_slot, "9ZNTfG4NyQgxy2SWjSiQoUyBPEvXT2xo7fKc5hPYYJ7b") + .check_signature_statuses_not_found(1) + .build() + } + + /// HTTP mocks for resubmit_transactions sending the replacement transaction. + fn resubmit_withdrawal_http_mocks(current_slot: u64) -> MockHttpOutcalls { + MockBuilder::with_start_id(52) + .submit_transaction( current_slot, "9ZNTfG4NyQgxy2SWjSiQoUyBPEvXT2xo7fKc5hPYYJ7b", "5VERv8NMvzbJMEkV8xnrLkEaWRtSz9CosKDYjCJjBRnbJLgp8uirBgmQpjKhoR4tjF3ZpRzrFmBV6UjKdiSZkQUW", @@ -732,10 +755,10 @@ mod withdrawal_tests { .build() } - /// HTTP mocks for finalizing a withdrawal transaction. - fn finalize_withdrawal_http_mocks() -> MockHttpOutcalls { + /// HTTP mocks for finalize_transactions confirming the resubmitted transaction. + fn finalize_withdrawal_http_mocks(current_slot: u64) -> MockHttpOutcalls { MockBuilder::with_start_id(64) - .get_current_slot(350_000_200, "9ZNTfG4NyQgxy2SWjSiQoUyBPEvXT2xo7fKc5hPYYJ7b") + .get_current_slot(current_slot, "9ZNTfG4NyQgxy2SWjSiQoUyBPEvXT2xo7fKc5hPYYJ7b") .check_signature_statuses_finalized(1) .build() } @@ -1040,8 +1063,6 @@ mod anonymous_caller_tests { mod consolidation_tests { use super::*; - const DEPOSIT_CONSOLIDATION_DELAY: Duration = Duration::from_mins(10); - #[tokio::test] async fn should_consolidate_deposits_after_timer() { let setup = SetupBuilder::new().with_proxy_canister().build().await; diff --git a/minter/src/constants.rs b/minter/src/constants.rs index d3a87e78..39d12d5d 100644 --- a/minter/src/constants.rs +++ b/minter/src/constants.rs @@ -1,6 +1,10 @@ /// Maximum number of concurrent calls to the SOL RPC canister. pub const MAX_CONCURRENT_RPC_CALLS: usize = 10; +/// Maximum number of rounds per timer invocation. +/// Each round issues up to [`MAX_CONCURRENT_RPC_CALLS`] parallel RPC calls. +pub const MAX_TIMER_ROUNDS: usize = 5; + /// Matches the ICP HTTPS outcall response limit for variable-length RPC calls /// such as `getTransaction` and `getSignatureStatuses`: /// https://docs.internetcomputer.org/references/ic-interface-spec#ic-http_request diff --git a/minter/src/main.rs b/minter/src/main.rs index c91bffcd..3da81bbb 100644 --- a/minter/src/main.rs +++ b/minter/src/main.rs @@ -1,11 +1,16 @@ use candid::Principal; use canlog::{Log, Sort}; -use cksol_minter::consolidate::{DEPOSIT_CONSOLIDATION_DELAY, consolidate_deposits}; -use cksol_minter::monitor::{MONITOR_SUBMITTED_TRANSACTIONS_DELAY, monitor_submitted_transactions}; use cksol_minter::withdraw::{WITHDRAWAL_PROCESSING_DELAY, process_pending_withdrawals}; use cksol_minter::{ address::lazy_get_schnorr_master_key, runtime::IcCanisterRuntime, state::read_state, }; +use cksol_minter::{ + consolidate::{DEPOSIT_CONSOLIDATION_DELAY, consolidate_deposits}, + monitor::{ + FINALIZE_TRANSACTIONS_DELAY, RESUBMIT_TRANSACTIONS_DELAY, finalize_transactions, + resubmit_transactions, + }, +}; use cksol_types::{ Address, DepositStatus, GetDepositAddressArgs, MinterInfo, UpdateBalanceArgs, UpdateBalanceError, WithdrawalArgs, WithdrawalError, WithdrawalOk, WithdrawalStatus, @@ -335,8 +340,11 @@ fn setup_timers() { ic_cdk_timers::set_timer_interval(WITHDRAWAL_PROCESSING_DELAY, async || { process_pending_withdrawals(&IcCanisterRuntime::new()).await; }); - ic_cdk_timers::set_timer_interval(MONITOR_SUBMITTED_TRANSACTIONS_DELAY, async || { - monitor_submitted_transactions(IcCanisterRuntime::new()).await; + ic_cdk_timers::set_timer_interval(FINALIZE_TRANSACTIONS_DELAY, async || { + finalize_transactions(IcCanisterRuntime::new()).await; + }); + ic_cdk_timers::set_timer_interval(RESUBMIT_TRANSACTIONS_DELAY, async || { + resubmit_transactions(IcCanisterRuntime::new()).await; }); } diff --git a/minter/src/monitor/mod.rs b/minter/src/monitor/mod.rs index b8214e61..01b30cf9 100644 --- a/minter/src/monitor/mod.rs +++ b/minter/src/monitor/mod.rs @@ -1,6 +1,6 @@ use crate::{ address::derivation_path, - constants::MAX_CONCURRENT_RPC_CALLS, + constants::{MAX_CONCURRENT_RPC_CALLS, MAX_TIMER_ROUNDS}, guard::TimerGuard, runtime::CanisterRuntime, signer::sign_bytes, @@ -31,19 +31,22 @@ use thiserror::Error; #[cfg(test)] mod tests; -pub const MONITOR_SUBMITTED_TRANSACTIONS_DELAY: Duration = Duration::from_secs(60); +pub const FINALIZE_TRANSACTIONS_DELAY: Duration = Duration::from_mins(2); +pub const RESUBMIT_TRANSACTIONS_DELAY: Duration = Duration::from_mins(3); const MAX_BLOCKHASH_AGE: Slot = 150; /// Maximum number of signatures per `getSignatureStatuses` RPC call. /// See https://solana.com/docs/rpc/http/getsignaturestatuses const MAX_SIGNATURES_PER_STATUS_CHECK: usize = 256; -pub async fn monitor_submitted_transactions(runtime: R) { - let _guard = match TimerGuard::new(TaskType::MonitorSubmittedTransactions) { +/// Check the status of all submitted transactions, finalize succeeded/failed +/// ones, and mark expired transactions for resubmission. +pub async fn finalize_transactions(runtime: R) { + let _guard = match TimerGuard::new(TaskType::FinalizeTransactions) { Ok(guard) => guard, Err(_) => return, }; - let all_transactions: Vec<(Signature, Slot)> = read_state(|state| { + let all_transactions: BTreeMap = read_state(|state| { state .submitted_transactions() .iter() @@ -51,16 +54,6 @@ pub async fn monitor_submitted_transactions(runtime: R) { .collect() }); if all_transactions.is_empty() { - let to_resubmit: Vec<_> = read_state(|state| { - state - .transactions_to_resubmit() - .iter() - .map(|(sig, tx)| (*sig, tx.message.clone(), tx.signers.clone())) - .collect() - }); - if !to_resubmit.is_empty() { - resubmit_expired_transactions(&runtime, to_resubmit).await; - } return; } @@ -75,7 +68,8 @@ pub async fn monitor_submitted_transactions(runtime: R) { } }; - let statuses = check_transaction_statuses(&runtime, &all_transactions).await; + let signatures: Vec = all_transactions.keys().copied().collect(); + let statuses = check_transaction_statuses(&runtime, signatures).await; for (signature, error) in &statuses.errored { log!( @@ -106,30 +100,28 @@ pub async fn monitor_submitted_transactions(runtime: R) { }); } - if statuses.not_found.is_empty() { - return; - } - - let expired_signatures: BTreeSet = all_transactions - .into_iter() - .filter(|(sig, slot)| { - statuses.not_found.contains(sig) && slot + MAX_BLOCKHASH_AGE < current_slot - }) - .map(|(sig, _)| sig) - .collect(); - - if expired_signatures.is_empty() { - return; + for signature in &statuses.not_found { + if all_transactions[signature] + MAX_BLOCKHASH_AGE < current_slot { + mutate_state(|state| { + process_event( + state, + EventType::ExpiredTransaction { + signature: *signature, + }, + &runtime, + ) + }); + } } +} - for signature in expired_signatures { - mutate_state(|state| { - // Skip if the transaction was finalized concurrently. - if state.submitted_transactions().contains_key(&signature) { - process_event(state, EventType::ExpiredTransaction { signature }, &runtime); - } - }); - } +/// Resubmit transactions that have been marked for resubmission by +/// [`finalize_transactions`]. +pub async fn resubmit_transactions(runtime: R) { + let _guard = match TimerGuard::new(TaskType::ResubmitTransactions) { + Ok(guard) => guard, + Err(_) => return, + }; let to_resubmit: Vec<_> = read_state(|state| { state @@ -138,7 +130,9 @@ pub async fn monitor_submitted_transactions(runtime: R) { .map(|(sig, tx)| (*sig, tx.message.clone(), tx.signers.clone())) .collect() }); - + if to_resubmit.is_empty() { + return; + } resubmit_expired_transactions(&runtime, to_resubmit).await; } @@ -156,10 +150,9 @@ struct TransactionStatuses { async fn check_transaction_statuses( runtime: &R, - transactions: &[(Signature, Slot)], + signatures: Vec, ) -> TransactionStatuses { - let signatures: Vec = transactions.iter().map(|(sig, _)| *sig).collect(); - let batches: Vec> = signatures + let batches: Vec> = signatures .into_iter() .chunks(MAX_SIGNATURES_PER_STATUS_CHECK) .into_iter() @@ -211,9 +204,17 @@ async fn check_transaction_statuses( async fn resubmit_expired_transactions( runtime: &R, - expired: Vec<(Signature, VersionedMessage, Vec)>, + to_resubmit: Vec<(Signature, VersionedMessage, Vec)>, ) { - for round in &expired.into_iter().chunks(MAX_CONCURRENT_RPC_CALLS) { + let rounds: Vec> = to_resubmit + .into_iter() + .chunks(MAX_CONCURRENT_RPC_CALLS) + .into_iter() + .take(MAX_TIMER_ROUNDS) + .map(Iterator::collect) + .collect(); + + for round in rounds { let (new_slot, new_blockhash) = match get_recent_slot_and_blockhash(runtime).await { Ok(result) => result, Err(e) => { @@ -222,27 +223,29 @@ async fn resubmit_expired_transactions( } }; - futures::future::join_all(round.map(async |(old_signature, message, signers)| { - match try_resubmit_transaction( - runtime, - old_signature, - message, - signers, - new_slot, - new_blockhash, - ) - .await - { - Ok(new_sig) => log!( - Priority::Info, - "Resubmitted transaction {old_signature} as {new_sig}" - ), - Err(e) => log!( - Priority::Info, - "Failed to resubmit transaction {old_signature}: {e}" - ), - } - })) + futures::future::join_all(round.into_iter().map( + async |(old_signature, message, signers)| { + match try_resubmit_transaction( + runtime, + old_signature, + message, + signers, + new_slot, + new_blockhash, + ) + .await + { + Ok(new_sig) => log!( + Priority::Info, + "Resubmitted transaction {old_signature} as {new_sig}" + ), + Err(e) => log!( + Priority::Info, + "Failed to resubmit transaction {old_signature}: {e}" + ), + } + }, + )) .await; } } diff --git a/minter/src/monitor/tests.rs b/minter/src/monitor/tests.rs index cf170a12..2a8842f7 100644 --- a/minter/src/monitor/tests.rs +++ b/minter/src/monitor/tests.rs @@ -1,4 +1,4 @@ -use super::{MAX_BLOCKHASH_AGE, monitor_submitted_transactions}; +use super::{MAX_BLOCKHASH_AGE, finalize_transactions, resubmit_transactions}; use crate::{ state::{TaskType, event::EventType, mutate_state, read_state, reset_state}, storage::reset_events, @@ -22,57 +22,53 @@ const RECENT_SLOT: Slot = CURRENT_SLOT - 10; const EXPIRED_SLOT: Slot = CURRENT_SLOT - MAX_BLOCKHASH_AGE - 1; const RESUBMISSION_SLOT: Slot = CURRENT_SLOT + 5; -#[tokio::test] -async fn should_return_early_if_no_submitted_transactions() { - setup(); - - monitor_submitted_transactions(TestCanisterRuntime::new().with_increasing_time()).await; +mod finalization { + use super::*; - EventsAssert::assert_no_events_recorded(); -} + #[tokio::test] + async fn should_return_early_if_no_submitted_transactions() { + setup(); -#[tokio::test] -async fn should_return_early_if_task_already_active() { - setup(); - submit_consolidation_transaction(CURRENT_SLOT); + finalize_transactions(TestCanisterRuntime::new().with_increasing_time()).await; - mutate_state(|s| { - s.active_tasks_mut() - .insert(TaskType::MonitorSubmittedTransactions); - }); + EventsAssert::assert_no_events_recorded(); + } - let events_before = EventsAssert::from_recorded(); + #[tokio::test] + async fn should_return_early_if_task_already_active() { + setup(); + submit_consolidation_transaction(CURRENT_SLOT); - // We return early, therefore no RPC calls are made - let runtime = TestCanisterRuntime::new(); + mutate_state(|s| { + s.active_tasks_mut().insert(TaskType::FinalizeTransactions); + }); - monitor_submitted_transactions(runtime).await; + let events_before = EventsAssert::from_recorded(); - let events_after = EventsAssert::from_recorded(); - assert_eq!(events_before, events_after); -} + finalize_transactions(TestCanisterRuntime::new()).await; -#[tokio::test] -async fn should_return_early_if_fetching_current_slot_fails() { - setup(); - submit_consolidation_transaction(EXPIRED_SLOT); + let events_after = EventsAssert::from_recorded(); + assert_eq!(events_before, events_after); + } - let events_before = EventsAssert::from_recorded(); + #[tokio::test] + async fn should_return_early_if_fetching_current_slot_fails() { + setup(); + submit_consolidation_transaction(EXPIRED_SLOT); - let error = SlotResult::Consistent(Err(RpcError::ValidationError("Error".to_string()))); - let runtime = TestCanisterRuntime::new() - .add_stub_response(error.clone()) - .add_stub_response(error.clone()) - .add_stub_response(error); + let events_before = EventsAssert::from_recorded(); - monitor_submitted_transactions(runtime).await; + let error = SlotResult::Consistent(Err(RpcError::ValidationError("Error".to_string()))); + let runtime = TestCanisterRuntime::new() + .add_stub_response(error.clone()) + .add_stub_response(error.clone()) + .add_stub_response(error); - let events_after = EventsAssert::from_recorded(); - assert_eq!(events_before, events_after); -} + finalize_transactions(runtime).await; -mod finalization { - use super::*; + let events_after = EventsAssert::from_recorded(); + assert_eq!(events_before, events_after); + } #[tokio::test] async fn should_finalize_transaction_with_finalized_status() { @@ -88,7 +84,7 @@ mod finalization { finalized_status(), )]))); - monitor_submitted_transactions(runtime).await; + finalize_transactions(runtime).await; EventsAssert::from_recorded() .expect_contains_event_eq(EventType::SucceededTransaction { signature }); @@ -132,7 +128,7 @@ mod finalization { let events_before = EventsAssert::from_recorded(); - monitor_submitted_transactions(runtime).await; + finalize_transactions(runtime).await; let events_after = EventsAssert::from_recorded(); assert_eq!(events_before, events_after); @@ -159,7 +155,7 @@ mod finalization { }, )]))); - monitor_submitted_transactions(runtime).await; + finalize_transactions(runtime).await; EventsAssert::from_recorded() .expect_contains_event_eq(EventType::FailedTransaction { signature }); @@ -193,7 +189,7 @@ mod finalization { ]))); // sig_b is not_found but RECENT_SLOT is not expired, so no resubmission. - monitor_submitted_transactions(runtime).await; + finalize_transactions(runtime).await; EventsAssert::from_recorded() .expect_contains_event_eq(EventType::SucceededTransaction { @@ -206,6 +202,8 @@ mod finalization { read_state(|s| { assert_eq!(s.submitted_transactions().len(), 1); assert!(s.submitted_transactions().contains_key(&signature(sig_b))); + // sig_b is not yet expired (RECENT_SLOT), so not marked for resubmission + assert!(s.transactions_to_resubmit().is_empty()); }); } @@ -240,25 +238,66 @@ mod finalization { mod resubmission { use super::*; + #[tokio::test] + async fn should_return_early_if_no_transactions_to_resubmit() { + setup(); + + resubmit_transactions(TestCanisterRuntime::new().with_increasing_time()).await; + + EventsAssert::assert_no_events_recorded(); + } + + #[tokio::test] + async fn should_return_early_if_task_already_active() { + setup(); + let sig = submit_consolidation_transaction(EXPIRED_SLOT); + events::expire_transaction(sig); + + mutate_state(|s| { + s.active_tasks_mut().insert(TaskType::ResubmitTransactions); + }); + + let events_before = EventsAssert::from_recorded(); + + resubmit_transactions(TestCanisterRuntime::new()).await; + + let events_after = EventsAssert::from_recorded(); + assert_eq!(events_before, events_after); + } + #[tokio::test] async fn should_resubmit_expired_transaction_with_no_status() { setup(); let old_signature = submit_consolidation_transaction(EXPIRED_SLOT); - let new_signature = signature(0xAA); - let runtime = TestCanisterRuntime::new() + // Step 1: finalize_transactions fetches slot, checks status, marks expired + let finalize_runtime = TestCanisterRuntime::new() .with_increasing_time() .add_stub_response(SlotResult::Consistent(Ok(CURRENT_SLOT))) .add_stub_response(BlockResult::Consistent(Ok(confirmed_block()))) - .add_stub_response(SignatureStatusesResult::Consistent(Ok(vec![None]))) + .add_stub_response(SignatureStatusesResult::Consistent(Ok(vec![None]))); + + finalize_transactions(finalize_runtime).await; + + EventsAssert::from_recorded().expect_contains_event_eq(EventType::ExpiredTransaction { + signature: old_signature, + }); + + read_state(|s| { + assert!(s.transactions_to_resubmit().contains_key(&old_signature)); + }); + + // Step 2: resubmit_transactions fetches a fresh blockhash and resubmits + let resubmit_runtime = TestCanisterRuntime::new() + .with_increasing_time() .add_stub_response(SlotResult::Consistent(Ok(RESUBMISSION_SLOT))) .add_stub_response(BlockResult::Consistent(Ok(confirmed_block()))) .add_stub_response(SendTransactionResult::Consistent(Ok(new_signature.into()))) .add_signature(new_signature.into()); - monitor_submitted_transactions(runtime).await; + resubmit_transactions(resubmit_runtime).await; EventsAssert::from_recorded() .expect_contains_event_eq(EventType::ExpiredTransaction { @@ -284,7 +323,7 @@ mod resubmission { let events_before = EventsAssert::from_recorded(); - let runtime = TestCanisterRuntime::new() + let finalize_runtime = TestCanisterRuntime::new() .with_increasing_time() .add_stub_response(SlotResult::Consistent(Ok(CURRENT_SLOT))) .add_stub_response(BlockResult::Consistent(Ok(confirmed_block()))) @@ -292,12 +331,15 @@ mod resubmission { RpcError::ValidationError("Error".to_string()), ))); - monitor_submitted_transactions(runtime).await; + finalize_transactions(finalize_runtime).await; let events_after = EventsAssert::from_recorded(); assert_eq!(events_before, events_after); - read_state(|s| assert_eq!(s.submitted_transactions().len(), 1)); + read_state(|s| { + assert_eq!(s.submitted_transactions().len(), 1); + assert!(s.transactions_to_resubmit().is_empty()); + }); } #[tokio::test] @@ -305,20 +347,24 @@ mod resubmission { setup(); let old_signature = submit_consolidation_transaction(EXPIRED_SLOT); - let new_signature = signature(0xAA); - let runtime = TestCanisterRuntime::new() + let finalize_runtime = TestCanisterRuntime::new() .with_increasing_time() .add_stub_response(SlotResult::Consistent(Ok(CURRENT_SLOT))) .add_stub_response(BlockResult::Consistent(Ok(confirmed_block()))) - .add_stub_response(SignatureStatusesResult::Consistent(Ok(vec![None]))) + .add_stub_response(SignatureStatusesResult::Consistent(Ok(vec![None]))); + + finalize_transactions(finalize_runtime).await; + + let resubmit_runtime = TestCanisterRuntime::new() + .with_increasing_time() .add_stub_response(SlotResult::Consistent(Ok(RESUBMISSION_SLOT))) .add_stub_response(BlockResult::Consistent(Ok(confirmed_block()))) .add_stub_response(SendTransactionResult::Inconsistent(vec![])) .add_signature(new_signature.into()); - monitor_submitted_transactions(runtime).await; + resubmit_transactions(resubmit_runtime).await; EventsAssert::from_recorded() .expect_contains_event_eq(EventType::ExpiredTransaction { @@ -342,20 +388,30 @@ mod resubmission { submit_consolidation_transaction_with_signature(i as u8, EXPIRED_SLOT); } - let mut runtime = TestCanisterRuntime::new() + // finalize_transactions: marks all as expired + let finalize_runtime = TestCanisterRuntime::new() .with_increasing_time() .add_stub_response(SlotResult::Consistent(Ok(CURRENT_SLOT))) .add_stub_response(BlockResult::Consistent(Ok(confirmed_block()))) // getSignatureStatuses: all not found .add_stub_response(SignatureStatusesResult::Consistent(Ok( vec![None; num_transactions], - ))) - // Round 1: get_recent_slot_and_blockhash for resubmission (getSlot + getBlock) + ))); + + finalize_transactions(finalize_runtime).await; + + read_state(|s| { + assert_eq!(s.transactions_to_resubmit().len(), num_transactions); + }); + + // resubmit_transactions: processes in rounds of MAX_CONCURRENT_RPC_CALLS + let mut resubmit_runtime = TestCanisterRuntime::new() + .with_increasing_time() .add_stub_response(SlotResult::Consistent(Ok(RESUBMISSION_SLOT))) .add_stub_response(BlockResult::Consistent(Ok(confirmed_block()))); for i in 0..MAX_CONCURRENT_RPC_CALLS { - runtime = runtime + resubmit_runtime = resubmit_runtime .add_stub_response(SendTransactionResult::Consistent(Ok(signature( 0xA0 + i as u8, ) @@ -363,12 +419,12 @@ mod resubmission { .add_signature([0xA0 + i as u8; 64]); } - runtime = runtime + resubmit_runtime = resubmit_runtime .add_stub_response(SlotResult::Consistent(Ok(RESUBMISSION_SLOT))) .add_stub_response(BlockResult::Consistent(Ok(confirmed_block()))); for i in 0..2_usize { - runtime = runtime + resubmit_runtime = resubmit_runtime .add_stub_response(SendTransactionResult::Consistent(Ok(signature( 0xB0 + i as u8, ) @@ -376,7 +432,7 @@ mod resubmission { .add_signature([0xB0 + i as u8; 64]); } - monitor_submitted_transactions(runtime).await; + resubmit_transactions(resubmit_runtime).await; read_state(|s| assert_eq!(s.submitted_transactions().len(), num_transactions)); } diff --git a/minter/src/state/mod.rs b/minter/src/state/mod.rs index fccce1d3..dbf48ccc 100644 --- a/minter/src/state/mod.rs +++ b/minter/src/state/mod.rs @@ -799,7 +799,8 @@ pub struct MintedDeposit { pub enum TaskType { DepositConsolidation, Mint, - MonitorSubmittedTransactions, + FinalizeTransactions, + ResubmitTransactions, WithdrawalProcessing, } diff --git a/minter/src/withdraw/mod.rs b/minter/src/withdraw/mod.rs index 8434ff83..933567ea 100644 --- a/minter/src/withdraw/mod.rs +++ b/minter/src/withdraw/mod.rs @@ -12,7 +12,7 @@ use itertools::Itertools; use sol_rpc_types::Slot; use solana_hash::Hash; -use crate::constants::MAX_CONCURRENT_RPC_CALLS; +use crate::constants::{MAX_CONCURRENT_RPC_CALLS, MAX_TIMER_ROUNDS}; use crate::ledger::BurnError; use crate::{ consolidate::consolidate_deposits, @@ -30,7 +30,6 @@ use crate::{ }; pub const WITHDRAWAL_PROCESSING_DELAY: Duration = Duration::from_mins(1); -pub(crate) const MAX_WITHDRAWAL_ROUNDS: usize = 5; #[cfg(test)] mod tests; @@ -104,8 +103,7 @@ pub async fn process_pending_withdrawals(runtime: &R) { } }; - let max_per_invocation = - MAX_WITHDRAWAL_ROUNDS * MAX_CONCURRENT_RPC_CALLS * MAX_WITHDRAWALS_PER_TX; + let max_per_invocation = MAX_TIMER_ROUNDS * MAX_CONCURRENT_RPC_CALLS * MAX_WITHDRAWALS_PER_TX; let (affordable_requests, num_pending_withdrawals) = read_state(|state| { let mut available_balance = state.balance(); @@ -152,7 +150,7 @@ pub async fn process_pending_withdrawals(runtime: &R) { .into_iter() .chunks(MAX_CONCURRENT_RPC_CALLS) .into_iter() - .take(MAX_WITHDRAWAL_ROUNDS) + .take(MAX_TIMER_ROUNDS) .map(Iterator::collect) .collect(); diff --git a/minter/src/withdraw/tests.rs b/minter/src/withdraw/tests.rs index b0129f7a..0cfaf0c1 100644 --- a/minter/src/withdraw/tests.rs +++ b/minter/src/withdraw/tests.rs @@ -1,5 +1,5 @@ use crate::{ - constants::MAX_CONCURRENT_RPC_CALLS, + constants::{MAX_CONCURRENT_RPC_CALLS, MAX_TIMER_ROUNDS}, guard::{TimerGuard, withdrawal_guard}, sol_transfer::MAX_WITHDRAWALS_PER_TX, state::{TaskType, read_state}, @@ -8,7 +8,7 @@ use crate::{ confirmed_block, events, init_balance, init_balance_to, init_schnorr_master_key, init_state, runtime::TestCanisterRuntime, signature, }, - withdraw::{MAX_WITHDRAWAL_ROUNDS, process_pending_withdrawals, withdraw, withdrawal_status}, + withdraw::{process_pending_withdrawals, withdraw, withdrawal_status}, }; use assert_matches::assert_matches; use candid::{Nat, Principal}; @@ -565,18 +565,18 @@ mod process_pending_withdrawals_tests { let slot = 1; let max_per_round = MAX_WITHDRAWALS_PER_TX * MAX_CONCURRENT_RPC_CALLS; - // Create enough requests to fill MAX_WITHDRAWAL_ROUNDS + 1 rounds. - let request_count = max_per_round * MAX_WITHDRAWAL_ROUNDS + 1; + // Create enough requests to fill MAX_TIMER_ROUNDS + 1 rounds. + let request_count = max_per_round * MAX_TIMER_ROUNDS + 1; // Insert pending withdrawal requests directly into state. for i in 0..request_count { events::accept_withdrawal(account(i as u8), i as u64, MINIMUM_WITHDRAWAL_AMOUNT); } - // Set up RPC responses for MAX_WITHDRAWAL_ROUNDS rounds. + // Set up RPC responses for MAX_TIMER_ROUNDS rounds. let mut runtime = TestCanisterRuntime::new().with_increasing_time(); let mut sig_counter: u8 = 0; - for _round in 0..MAX_WITHDRAWAL_ROUNDS { + for _round in 0..MAX_TIMER_ROUNDS { runtime = runtime .add_stub_response(GetSlotResult::Consistent(Ok(slot))) .add_stub_response(GetBlockResult::Consistent(Ok(confirmed_block()))); @@ -593,8 +593,8 @@ mod process_pending_withdrawals_tests { process_pending_withdrawals(&runtime).await; - let processed = max_per_round * MAX_WITHDRAWAL_ROUNDS; - // All requests within MAX_WITHDRAWAL_ROUNDS rounds should be processed + let processed = max_per_round * MAX_TIMER_ROUNDS; + // All requests within MAX_TIMER_ROUNDS rounds should be processed for i in 0..processed { assert_matches!( withdrawal_status(i as u64), @@ -602,7 +602,7 @@ mod process_pending_withdrawals_tests { "withdrawal {i} should be TxSent" ); } - // The extra request beyond MAX_WITHDRAWAL_ROUNDS rounds should remain pending + // The extra request beyond MAX_TIMER_ROUNDS rounds should remain pending assert_matches!( withdrawal_status(processed as u64), WithdrawalStatus::Pending,