From 3973ed2e07901fe2f0b5f170e27b9da02e15ac58 Mon Sep 17 00:00:00 2001 From: Gustavo Inacio Date: Wed, 16 Oct 2024 22:03:40 +0200 Subject: [PATCH 1/8] perf: trigger rav request concurrently Signed-off-by: Gustavo Inacio --- tap-agent/src/agent/sender_account.rs | 169 +++++++++++++++-------- tap-agent/src/agent/sender_allocation.rs | 137 ++++++++++++------ 2 files changed, 208 insertions(+), 98 deletions(-) diff --git a/tap-agent/src/agent/sender_account.rs b/tap-agent/src/agent/sender_account.rs index bbf295e53..e1795de8c 100644 --- a/tap-agent/src/agent/sender_account.rs +++ b/tap-agent/src/agent/sender_account.rs @@ -6,6 +6,7 @@ use alloy::primitives::U256; use bigdecimal::num_bigint::ToBigInt; use bigdecimal::ToPrimitive; use graphql_client::GraphQLQuery; +use jsonrpsee::http_client::HttpClientBuilder; use prometheus::{register_gauge_vec, register_int_gauge_vec, GaugeVec, IntGaugeVec}; use std::collections::{HashMap, HashSet}; use std::str::FromStr; @@ -17,7 +18,7 @@ use alloy::primitives::Address; use anyhow::Result; use eventuals::{Eventual, EventualExt, PipeHandle}; use indexer_common::{escrow_accounts::EscrowAccounts, prelude::SubgraphClient}; -use ractor::{call, Actor, ActorProcessingErr, ActorRef, MessagingErr, SupervisionEvent}; +use ractor::{Actor, ActorProcessingErr, ActorRef, MessagingErr, SupervisionEvent}; use sqlx::PgPool; use tap_core::rav::SignedRAV; use tracing::{error, Level}; @@ -76,10 +77,11 @@ lazy_static! { type RavMap = HashMap; type Balance = U256; -#[derive(Debug, Eq, PartialEq)] +#[derive(Debug)] pub enum ReceiptFees { NewReceipt(u128), UpdateValue(UnaggregatedReceipts), + RavRequestResponse(anyhow::Result<(UnaggregatedReceipts, Option)>), Retry, } @@ -149,7 +151,7 @@ pub struct State { domain_separator: Eip712Domain, config: &'static config::Config, pgpool: PgPool, - sender_aggregator_endpoint: String, + http_client: jsonrpsee::http_client::HttpClient, } impl State { @@ -172,8 +174,8 @@ impl State { escrow_subgraph: self.escrow_subgraph, escrow_adapter: self.escrow_adapter.clone(), domain_separator: self.domain_separator.clone(), - sender_aggregator_endpoint: self.sender_aggregator_endpoint.clone(), sender_account_ref: sender_account_ref.clone(), + http_client: self.http_client.clone(), }; SenderAllocation::spawn_linked( @@ -215,38 +217,8 @@ impl State { "Error while getting allocation actor {allocation_id} with most unaggregated fees" ); }; - // we call and wait for the response so we don't process anymore update - let Ok(rav_result) = call!(allocation, SenderAllocationMessage::TriggerRAVRequest) else { - anyhow::bail!("Error while sending and waiting message for actor {allocation_id}"); - }; - let (fees, rav) = match rav_result { - Ok(ok_value) => { - self.rav_tracker.ok_rav_request(allocation_id); - ok_value - } - Err(err) => { - self.rav_tracker.failed_rav_backoff(allocation_id); - anyhow::bail!( - "Error while requesting RAV for sender {} and allocation {}: {}", - self.sender, - allocation_id, - err - ); - } - }; + let _ = allocation.cast(SenderAllocationMessage::TriggerRAVRequest); - let rav_value = rav.map_or(0, |rav| rav.message.valueAggregate); - // update rav tracker - self.rav_tracker.update(allocation_id, rav_value); - PENDING_RAV - .with_label_values(&[&self.sender.to_string(), &allocation_id.to_string()]) - .set(rav_value as f64); - - // update sender fee tracker - self.sender_fee_tracker.update(allocation_id, fees.value); - UNAGGREGATED_FEES - .with_label_values(&[&self.sender.to_string(), &allocation_id.to_string()]) - .set(fees.value as f64); Ok(()) } @@ -474,6 +446,10 @@ impl Actor for SenderAccount { .with_label_values(&[&sender_id.to_string()]) .set(config.tap.rav_request_trigger_value as f64); + let http_client = HttpClientBuilder::default() + .request_timeout(Duration::from_secs(config.tap.rav_request_timeout_secs)) + .build(&sender_aggregator_endpoint)?; + let state = State { sender_fee_tracker: SenderFeeTracker::new(Duration::from_millis( config.tap.rav_request_timestamp_buffer_ms, @@ -488,7 +464,7 @@ impl Actor for SenderAccount { escrow_subgraph, escrow_adapter, domain_separator, - sender_aggregator_endpoint, + http_client, config, pgpool, sender: sender_id, @@ -588,6 +564,41 @@ impl Actor for SenderAccount { ]) .add(value as f64); } + ReceiptFees::RavRequestResponse(rav_result) => { + match rav_result { + Ok((fees, rav)) => { + state.rav_tracker.ok_rav_request(allocation_id); + + let rav_value = rav.map_or(0, |rav| rav.message.valueAggregate); + // update rav tracker + state.rav_tracker.update(allocation_id, rav_value); + PENDING_RAV + .with_label_values(&[ + &state.sender.to_string(), + &allocation_id.to_string(), + ]) + .set(rav_value as f64); + + // update sender fee tracker + state.sender_fee_tracker.update(allocation_id, fees.value); + UNAGGREGATED_FEES + .with_label_values(&[ + &state.sender.to_string(), + &allocation_id.to_string(), + ]) + .set(fees.value as f64); + } + Err(err) => { + state.rav_tracker.failed_rav_backoff(allocation_id); + error!( + "Error while requesting RAV for sender {} and allocation {}: {}", + state.sender, + allocation_id, + err + ); + } + }; + } ReceiptFees::UpdateValue(unaggregated_fees) => { state .sender_fee_tracker @@ -891,7 +902,21 @@ pub mod tests { match (self, other) { (Self::UpdateAllocationIds(l0), Self::UpdateAllocationIds(r0)) => l0 == r0, (Self::UpdateReceiptFees(l0, l1), Self::UpdateReceiptFees(r0, r1)) => { - l0 == r0 && l1 == r1 + l0 == r0 + && match (l1, r1) { + (ReceiptFees::NewReceipt(l), ReceiptFees::NewReceipt(r)) => r == l, + (ReceiptFees::UpdateValue(l), ReceiptFees::UpdateValue(r)) => r == l, + ( + ReceiptFees::RavRequestResponse(l), + ReceiptFees::RavRequestResponse(r), + ) => match (l, r) { + (Ok(l), Ok(r)) => l == r, + (Err(l), Err(r)) => l.to_string() == r.to_string(), + _ => false, + }, + (ReceiptFees::Retry, ReceiptFees::Retry) => true, + _ => false, + } } ( Self::UpdateInvalidReceiptFees(l0, l1), @@ -1072,13 +1097,18 @@ pub mod tests { next_rav_value: Arc>, next_unaggregated_fees_value: Arc>, receipts: Arc>>, + + sender_actor: Option>, } impl MockSenderAllocation { - pub fn new_with_triggered_rav_request() -> (Self, Arc, Arc>) { + pub fn new_with_triggered_rav_request( + sender_actor: ActorRef, + ) -> (Self, Arc, Arc>) { let triggered_rav_request = Arc::new(AtomicU32::new(0)); let unaggregated_fees = Arc::new(Mutex::new(0)); ( Self { + sender_actor: Some(sender_actor), triggered_rav_request: triggered_rav_request.clone(), receipts: Arc::new(Mutex::new(Vec::new())), next_rav_value: Arc::new(Mutex::new(0)), @@ -1089,10 +1119,13 @@ pub mod tests { ) } - pub fn new_with_next_unaggregated_fees_value() -> (Self, Arc>) { + pub fn new_with_next_unaggregated_fees_value( + sender_actor: ActorRef, + ) -> (Self, Arc>) { let unaggregated_fees = Arc::new(Mutex::new(0)); ( Self { + sender_actor: Some(sender_actor), triggered_rav_request: Arc::new(AtomicU32::new(0)), receipts: Arc::new(Mutex::new(Vec::new())), next_rav_value: Arc::new(Mutex::new(0)), @@ -1102,10 +1135,13 @@ pub mod tests { ) } - pub fn new_with_next_rav_value() -> (Self, Arc>) { + pub fn new_with_next_rav_value( + sender_actor: ActorRef, + ) -> (Self, Arc>) { let next_rav_value = Arc::new(Mutex::new(0)); ( Self { + sender_actor: Some(sender_actor), triggered_rav_request: Arc::new(AtomicU32::new(0)), receipts: Arc::new(Mutex::new(Vec::new())), next_rav_value: next_rav_value.clone(), @@ -1119,6 +1155,7 @@ pub mod tests { let receipts = Arc::new(Mutex::new(Vec::new())); ( Self { + sender_actor: None, triggered_rav_request: Arc::new(AtomicU32::new(0)), receipts: receipts.clone(), next_rav_value: Arc::new(Mutex::new(0)), @@ -1150,7 +1187,7 @@ pub mod tests { _state: &mut Self::State, ) -> Result<(), ActorProcessingErr> { match message { - SenderAllocationMessage::TriggerRAVRequest(reply) => { + SenderAllocationMessage::TriggerRAVRequest => { self.triggered_rav_request .fetch_add(1, std::sync::atomic::Ordering::SeqCst); let signed_rav = create_rav( @@ -1159,13 +1196,18 @@ pub mod tests { 4, *self.next_rav_value.lock().unwrap(), ); - reply.send(Ok(( - UnaggregatedReceipts { - value: *self.next_unaggregated_fees_value.lock().unwrap(), - last_id: 0, - }, - Some(signed_rav), - )))?; + if let Some(sender_account) = self.sender_actor.as_ref() { + sender_account.cast(SenderAccountMessage::UpdateReceiptFees( + *ALLOCATION_ID_0, + ReceiptFees::RavRequestResponse(Ok(( + UnaggregatedReceipts { + value: *self.next_unaggregated_fees_value.lock().unwrap(), + last_id: 0, + }, + Some(signed_rav), + ))), + ))?; + } } SenderAllocationMessage::NewReceipt(receipt) => { self.receipts.lock().unwrap().push(receipt); @@ -1180,6 +1222,7 @@ pub mod tests { prefix: String, sender: Address, allocation: Address, + sender_actor: ActorRef, ) -> ( Arc, Arc>, @@ -1187,7 +1230,7 @@ pub mod tests { JoinHandle<()>, ) { let (mock_sender_allocation, triggered_rav_request, next_unaggregated_fees) = - MockSenderAllocation::new_with_triggered_rav_request(); + MockSenderAllocation::new_with_triggered_rav_request(sender_actor); let name = format!("{}:{}:{}", prefix, sender, allocation); let (sender_account, join_handle) = @@ -1214,7 +1257,13 @@ pub mod tests { .await; let (triggered_rav_request, _, allocation, allocation_handle) = - create_mock_sender_allocation(prefix, SENDER.1, *ALLOCATION_ID_0).await; + create_mock_sender_allocation( + prefix, + SENDER.1, + *ALLOCATION_ID_0, + sender_account.clone(), + ) + .await; // create a fake sender allocation sender_account @@ -1250,7 +1299,13 @@ pub mod tests { .await; let (triggered_rav_request, _, allocation, allocation_handle) = - create_mock_sender_allocation(prefix, SENDER.1, *ALLOCATION_ID_0).await; + create_mock_sender_allocation( + prefix, + SENDER.1, + *ALLOCATION_ID_0, + sender_account.clone(), + ) + .await; // create a fake sender allocation sender_account @@ -1372,7 +1427,13 @@ pub mod tests { .await; let (triggered_rav_request, next_value, allocation, allocation_handle) = - create_mock_sender_allocation(prefix, SENDER.1, *ALLOCATION_ID_0).await; + create_mock_sender_allocation( + prefix, + SENDER.1, + *ALLOCATION_ID_0, + sender_account.clone(), + ) + .await; assert_eq!( triggered_rav_request.load(std::sync::atomic::Ordering::SeqCst), @@ -1549,7 +1610,7 @@ pub mod tests { .await; let (mock_sender_allocation, next_rav_value) = - MockSenderAllocation::new_with_next_rav_value(); + MockSenderAllocation::new_with_next_rav_value(sender_account.clone()); let name = format!("{}:{}:{}", prefix, SENDER.1, *ALLOCATION_ID_0); let (allocation, allocation_handle) = @@ -1750,7 +1811,7 @@ pub mod tests { .await; let (mock_sender_allocation, next_unaggregated_fees) = - MockSenderAllocation::new_with_next_unaggregated_fees_value(); + MockSenderAllocation::new_with_next_unaggregated_fees_value(sender_account.clone()); let name = format!("{}:{}:{}", prefix, SENDER.1, *ALLOCATION_ID_0); let (allocation, allocation_handle) = MockSenderAllocation::spawn_linked( diff --git a/tap-agent/src/agent/sender_allocation.rs b/tap-agent/src/agent/sender_allocation.rs index 1504e5f1a..9e8dfb3af 100644 --- a/tap-agent/src/agent/sender_allocation.rs +++ b/tap-agent/src/agent/sender_allocation.rs @@ -12,9 +12,9 @@ use anyhow::{anyhow, ensure, Result}; use bigdecimal::num_bigint::BigInt; use eventuals::Eventual; use indexer_common::{escrow_accounts::EscrowAccounts, prelude::SubgraphClient}; -use jsonrpsee::{core::client::ClientT, http_client::HttpClientBuilder, rpc_params}; +use jsonrpsee::{core::client::ClientT, rpc_params}; use prometheus::{register_counter_vec, register_histogram_vec, CounterVec, HistogramVec}; -use ractor::{Actor, ActorProcessingErr, ActorRef, RpcReplyPort}; +use ractor::{Actor, ActorProcessingErr, ActorRef}; use sqlx::{types::BigDecimal, PgPool}; use tap_aggregator::jsonrpsee_helpers::JsonRpcResponse; use tap_core::{ @@ -117,16 +117,16 @@ pub struct SenderAllocationArgs { pub escrow_subgraph: &'static SubgraphClient, pub escrow_adapter: EscrowAdapter, pub domain_separator: Eip712Domain, - pub sender_aggregator_endpoint: String, pub sender_account_ref: ActorRef, + pub http_client: jsonrpsee::http_client::HttpClient, } #[derive(Debug)] pub enum SenderAllocationMessage { NewReceipt(NewReceiptNotification), - TriggerRAVRequest(RpcReplyPort)>>), + TriggerRAVRequest, #[cfg(test)] - GetUnaggregatedReceipts(RpcReplyPort), + GetUnaggregatedReceipts(ractor::RpcReplyPort), } #[async_trait::async_trait] @@ -250,8 +250,7 @@ impl Actor for SenderAllocation { ))?; } } - // we use a blocking call here to ensure that only one RAV request is running at a time. - SenderAllocationMessage::TriggerRAVRequest(reply) => { + SenderAllocationMessage::TriggerRAVRequest => { let rav_result = if state.unaggregated_fees.value > 0 { state .request_rav() @@ -261,9 +260,12 @@ impl Actor for SenderAllocation { Err(anyhow!("Unaggregated fee equals zero")) }; - if !reply.is_closed() { - let _ = reply.send(rav_result); - } + state + .sender_account_ref + .cast(SenderAccountMessage::UpdateReceiptFees( + state.allocation_id, + ReceiptFees::RavRequestResponse(rav_result), + ))?; } #[cfg(test)] SenderAllocationMessage::GetUnaggregatedReceipts(reply) => { @@ -288,8 +290,8 @@ impl SenderAllocationState { escrow_subgraph, escrow_adapter, domain_separator, - sender_aggregator_endpoint, sender_account_ref, + http_client, }: SenderAllocationArgs, ) -> anyhow::Result { let required_checks: Vec> = vec![ @@ -318,10 +320,6 @@ impl SenderAllocationState { CheckList::new(required_checks), ); - let http_client = HttpClientBuilder::default() - .request_timeout(Duration::from_secs(config.tap.rav_request_timeout_secs)) - .build(&sender_aggregator_endpoint)?; - Ok(Self { pgpool, tap_manager, @@ -805,6 +803,7 @@ pub mod tests { escrow_accounts::EscrowAccounts, subgraph_client::{DeploymentDetails, SubgraphClient}, }; + use jsonrpsee::http_client::HttpClientBuilder; use ractor::{ call, cast, concurrency::JoinHandle, Actor, ActorProcessingErr, ActorRef, ActorStatus, }; @@ -916,6 +915,9 @@ pub mod tests { None => create_mock_sender_account().await.1, }; + let http_client = HttpClientBuilder::default() + .build(&sender_aggregator_endpoint) + .unwrap(); SenderAllocationArgs { config, pgpool: pgpool.clone(), @@ -925,8 +927,8 @@ pub mod tests { escrow_subgraph, escrow_adapter, domain_separator: TAP_EIP712_DOMAIN_SEPARATOR.clone(), - sender_aggregator_endpoint, sender_account_ref, + http_client, } } @@ -1143,27 +1145,37 @@ pub mod tests { .await; // Trigger a RAV request manually and wait for updated fees. - let (total_unaggregated_fees, _rav) = call!( + sender_allocation + .cast(SenderAllocationMessage::TriggerRAVRequest) + .unwrap(); + + tokio::time::sleep(std::time::Duration::from_millis(20)).await; + + let total_unaggregated_fees = call!( sender_allocation, - SenderAllocationMessage::TriggerRAVRequest + SenderAllocationMessage::GetUnaggregatedReceipts ) - .unwrap() .unwrap(); // Check that the unaggregated fees are correct. assert_eq!(total_unaggregated_fees.value, 0u128); - // Check if the sender received invalid receipt fees - let expected_message = SenderAccountMessage::UpdateInvalidReceiptFees( - *ALLOCATION_ID_0, - UnaggregatedReceipts { - last_id: 0, - value: 45u128, - }, - ); { - let last_message_emitted = last_message_emitted.lock().unwrap(); - assert_eq!(last_message_emitted.last(), Some(&expected_message)); + let mut last_message_emitted = last_message_emitted.lock().unwrap(); + assert!(matches!( + last_message_emitted.pop().unwrap(), + SenderAccountMessage::UpdateReceiptFees(..) + )); + + // Check if the sender received invalid receipt fees + let expected_message = SenderAccountMessage::UpdateInvalidReceiptFees( + *ALLOCATION_ID_0, + UnaggregatedReceipts { + last_id: 0, + value: 45u128, + }, + ); + assert_eq!(last_message_emitted.pop(), Some(expected_message)); } // Stop the TAP aggregator server. @@ -1457,17 +1469,39 @@ pub mod tests { .unwrap(); } - // Create a sender_allocation. - let sender_allocation = - create_sender_allocation(pgpool.clone(), DUMMY_URL.to_string(), DUMMY_URL, None).await; + let (last_message_emitted, sender_account, _join_handle) = + create_mock_sender_account().await; - let rav_response = call!( - sender_allocation, - SenderAllocationMessage::TriggerRAVRequest + // Create a sender_allocation. + let sender_allocation = create_sender_allocation( + pgpool.clone(), + DUMMY_URL.to_string(), + DUMMY_URL, + Some(sender_account), ) - .unwrap(); + .await; + + // Trigger a RAV request manually and wait for updated fees. + // this should fail because there's no receipt with valid timestamp + sender_allocation + .cast(SenderAllocationMessage::TriggerRAVRequest) + .unwrap(); + + tokio::time::sleep(std::time::Duration::from_millis(20)).await; + // If it is an error then rav request failed - assert!(rav_response.is_err()); + { + let mut last_message = last_message_emitted.lock().unwrap(); + match last_message.pop() { + Some(SenderAccountMessage::UpdateReceiptFees( + _, + ReceiptFees::RavRequestResponse(rav_response), + )) => { + assert!(rav_response.is_err()); + } + v => panic!("Expecting RavRequestResponse as last message, found: {v:?}"), + } + } // expect the actor to keep running assert_eq!(sender_allocation.get_status(), ActorStatus::Running); @@ -1526,22 +1560,37 @@ pub mod tests { .unwrap(); } + let (last_message_emitted, sender_account, _join_handle) = + create_mock_sender_account().await; + let sender_allocation = create_sender_allocation( pgpool.clone(), "http://".to_owned() + &aggregator_endpoint.to_string(), &mock_server.uri(), - None, + Some(sender_account), ) .await; // Trigger a RAV request manually and wait for updated fees. // this should fail because there's no receipt with valid timestamp - let rav_response = call!( - sender_allocation, - SenderAllocationMessage::TriggerRAVRequest - ) - .unwrap(); + sender_allocation + .cast(SenderAllocationMessage::TriggerRAVRequest) + .unwrap(); + + tokio::time::sleep(std::time::Duration::from_millis(20)).await; + // If it is an error then rav request failed - assert!(rav_response.is_err()); + { + let mut last_message = last_message_emitted.lock().unwrap(); + match last_message.pop() { + Some(SenderAccountMessage::UpdateReceiptFees( + _, + ReceiptFees::RavRequestResponse(rav_response), + )) => { + assert!(rav_response.is_err()); + } + v => panic!("Expecting RavRequestResponse as last message, found: {v:?}"), + } + } let invalid_receipts = sqlx::query!( r#" From 97b56790f577679d580ea50e86eeed2bccccbe34 Mon Sep 17 00:00:00 2001 From: Gustavo Inacio Date: Thu, 17 Oct 2024 18:55:04 +0200 Subject: [PATCH 2/8] refactor: rename to sender_aggregator Signed-off-by: Gustavo Inacio --- tap-agent/src/agent/sender_account.rs | 8 ++++---- tap-agent/src/agent/sender_allocation.rs | 14 +++++++------- 2 files changed, 11 insertions(+), 11 deletions(-) diff --git a/tap-agent/src/agent/sender_account.rs b/tap-agent/src/agent/sender_account.rs index e1795de8c..132314e71 100644 --- a/tap-agent/src/agent/sender_account.rs +++ b/tap-agent/src/agent/sender_account.rs @@ -151,7 +151,7 @@ pub struct State { domain_separator: Eip712Domain, config: &'static config::Config, pgpool: PgPool, - http_client: jsonrpsee::http_client::HttpClient, + sender_aggregator: jsonrpsee::http_client::HttpClient, } impl State { @@ -175,7 +175,7 @@ impl State { escrow_adapter: self.escrow_adapter.clone(), domain_separator: self.domain_separator.clone(), sender_account_ref: sender_account_ref.clone(), - http_client: self.http_client.clone(), + sender_aggregator: self.sender_aggregator.clone(), }; SenderAllocation::spawn_linked( @@ -446,7 +446,7 @@ impl Actor for SenderAccount { .with_label_values(&[&sender_id.to_string()]) .set(config.tap.rav_request_trigger_value as f64); - let http_client = HttpClientBuilder::default() + let sender_aggregator = HttpClientBuilder::default() .request_timeout(Duration::from_secs(config.tap.rav_request_timeout_secs)) .build(&sender_aggregator_endpoint)?; @@ -464,7 +464,7 @@ impl Actor for SenderAccount { escrow_subgraph, escrow_adapter, domain_separator, - http_client, + sender_aggregator, config, pgpool, sender: sender_id, diff --git a/tap-agent/src/agent/sender_allocation.rs b/tap-agent/src/agent/sender_allocation.rs index 9e8dfb3af..e408db558 100644 --- a/tap-agent/src/agent/sender_allocation.rs +++ b/tap-agent/src/agent/sender_allocation.rs @@ -105,7 +105,7 @@ pub struct SenderAllocationState { domain_separator: Eip712Domain, sender_account_ref: ActorRef, - http_client: jsonrpsee::http_client::HttpClient, + sender_aggregator: jsonrpsee::http_client::HttpClient, } pub struct SenderAllocationArgs { @@ -118,7 +118,7 @@ pub struct SenderAllocationArgs { pub escrow_adapter: EscrowAdapter, pub domain_separator: Eip712Domain, pub sender_account_ref: ActorRef, - pub http_client: jsonrpsee::http_client::HttpClient, + pub sender_aggregator: jsonrpsee::http_client::HttpClient, } #[derive(Debug)] @@ -291,7 +291,7 @@ impl SenderAllocationState { escrow_adapter, domain_separator, sender_account_ref, - http_client, + sender_aggregator, }: SenderAllocationArgs, ) -> anyhow::Result { let required_checks: Vec> = vec![ @@ -332,7 +332,7 @@ impl SenderAllocationState { unaggregated_fees: UnaggregatedReceipts::default(), invalid_receipts_fees: UnaggregatedReceipts::default(), latest_rav, - http_client, + sender_aggregator, }) } @@ -510,7 +510,7 @@ impl SenderAllocationState { .collect(); let rav_response_time_start = Instant::now(); let response: JsonRpcResponse> = self - .http_client + .sender_aggregator .request( "aggregate_receipts", rpc_params!( @@ -915,7 +915,7 @@ pub mod tests { None => create_mock_sender_account().await.1, }; - let http_client = HttpClientBuilder::default() + let sender_aggregator = HttpClientBuilder::default() .build(&sender_aggregator_endpoint) .unwrap(); SenderAllocationArgs { @@ -928,7 +928,7 @@ pub mod tests { escrow_adapter, domain_separator: TAP_EIP712_DOMAIN_SEPARATOR.clone(), sender_account_ref, - http_client, + sender_aggregator, } } From ab6fbda8b6eb3bb6b98d92a0dc876790e3f9230a Mon Sep 17 00:00:00 2001 From: Gustavo Inacio Date: Thu, 17 Oct 2024 18:57:30 +0200 Subject: [PATCH 3/8] refactor: return as an error Signed-off-by: Gustavo Inacio --- tap-agent/src/agent/sender_account.rs | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/tap-agent/src/agent/sender_account.rs b/tap-agent/src/agent/sender_account.rs index 132314e71..c8fc9b53c 100644 --- a/tap-agent/src/agent/sender_account.rs +++ b/tap-agent/src/agent/sender_account.rs @@ -217,9 +217,13 @@ impl State { "Error while getting allocation actor {allocation_id} with most unaggregated fees" ); }; - let _ = allocation.cast(SenderAllocationMessage::TriggerRAVRequest); - - Ok(()) + allocation + .cast(SenderAllocationMessage::TriggerRAVRequest) + .map_err(|e| { + anyhow::anyhow!( + "Error while sending and waiting message for actor {allocation_id}. Error: {e}" + ) + }) } fn deny_condition_reached(&self) -> bool { From 3625028e9b6bbaa97c7241be591fb68971a0193d Mon Sep 17 00:00:00 2001 From: Gustavo Inacio Date: Thu, 17 Oct 2024 20:07:21 +0200 Subject: [PATCH 4/8] fix: track ongoing requests Signed-off-by: Gustavo Inacio --- tap-agent/src/agent/sender_account.rs | 7 ++++++- tap-agent/src/agent/sender_fee_tracker.rs | 18 ++++++++++++++++++ 2 files changed, 24 insertions(+), 1 deletion(-) diff --git a/tap-agent/src/agent/sender_account.rs b/tap-agent/src/agent/sender_account.rs index c8fc9b53c..7248c6680 100644 --- a/tap-agent/src/agent/sender_account.rs +++ b/tap-agent/src/agent/sender_account.rs @@ -217,13 +217,17 @@ impl State { "Error while getting allocation actor {allocation_id} with most unaggregated fees" ); }; + allocation .cast(SenderAllocationMessage::TriggerRAVRequest) .map_err(|e| { anyhow::anyhow!( "Error while sending and waiting message for actor {allocation_id}. Error: {e}" ) - }) + })?; + self.sender_fee_tracker.start_rav_request(allocation_id); + + Ok(()) } fn deny_condition_reached(&self) -> bool { @@ -569,6 +573,7 @@ impl Actor for SenderAccount { .add(value as f64); } ReceiptFees::RavRequestResponse(rav_result) => { + state.sender_fee_tracker.finish_rav_request(allocation_id); match rav_result { Ok((fees, rav)) => { state.rav_tracker.ok_rav_request(allocation_id); diff --git a/tap-agent/src/agent/sender_fee_tracker.rs b/tap-agent/src/agent/sender_fee_tracker.rs index 1a57b38a9..d9c8b2ebe 100644 --- a/tap-agent/src/agent/sender_fee_tracker.rs +++ b/tap-agent/src/agent/sender_fee_tracker.rs @@ -34,6 +34,9 @@ pub struct SenderFeeTracker { id_to_fee: HashMap, total_fee: u128, + fees_requesting: u128, + ids_requesting: HashSet
, + buffer_window_fee: HashMap, buffer_window_duration: Duration, // there are some allocations that we don't want it to be @@ -120,6 +123,7 @@ impl SenderFeeTracker { self.id_to_fee .iter() .filter(|(addr, _)| !self.blocked_addresses.contains(*addr)) + .filter(|(addr, _)| !self.ids_requesting.contains(*addr)) .filter(|(addr, _)| { self.failed_ravs .get(*addr) @@ -171,6 +175,20 @@ impl SenderFeeTracker { acc + expiring.get_sum(&self.buffer_window_duration) }) } + + pub fn start_rav_request(&mut self, allocation_id: Address) { + let current_fee = self.id_to_fee.entry(allocation_id).or_default(); + self.ids_requesting.insert(allocation_id); + self.fees_requesting += *current_fee; + } + + /// Should be called before `update` + pub fn finish_rav_request(&mut self, allocation_id: Address) { + let current_fee = self.id_to_fee.entry(allocation_id).or_default(); + self.fees_requesting -= *current_fee; + self.ids_requesting.remove(&allocation_id); + } + pub fn failed_rav_backoff(&mut self, allocation_id: Address) { // backoff = max(100ms * 2 ^ retries, 60s) let failed_rav = self.failed_ravs.entry(allocation_id).or_default(); From bb4b9fb400fb1c98656e4c66f8c0eba01f25a0d9 Mon Sep 17 00:00:00 2001 From: Gustavo Inacio Date: Thu, 17 Oct 2024 20:31:08 +0200 Subject: [PATCH 5/8] test: delay sleep duration Signed-off-by: Gustavo Inacio --- tap-agent/src/agent/sender_allocation.rs | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/tap-agent/src/agent/sender_allocation.rs b/tap-agent/src/agent/sender_allocation.rs index e408db558..7ea1d094d 100644 --- a/tap-agent/src/agent/sender_allocation.rs +++ b/tap-agent/src/agent/sender_allocation.rs @@ -1570,17 +1570,20 @@ pub mod tests { Some(sender_account), ) .await; + + tokio::time::sleep(std::time::Duration::from_millis(100)).await; // Trigger a RAV request manually and wait for updated fees. // this should fail because there's no receipt with valid timestamp sender_allocation .cast(SenderAllocationMessage::TriggerRAVRequest) .unwrap(); - tokio::time::sleep(std::time::Duration::from_millis(20)).await; + tokio::time::sleep(std::time::Duration::from_millis(100)).await; // If it is an error then rav request failed { let mut last_message = last_message_emitted.lock().unwrap(); + println!("{:?}", *last_message); match last_message.pop() { Some(SenderAccountMessage::UpdateReceiptFees( _, From dcb80a4994e47836be824d48e393125d947b22bf Mon Sep 17 00:00:00 2001 From: Gustavo Inacio Date: Thu, 17 Oct 2024 20:46:03 +0200 Subject: [PATCH 6/8] test: add tracker test for ongoing ravs Signed-off-by: Gustavo Inacio --- tap-agent/src/agent/sender_fee_tracker.rs | 35 +++++++++++++++++++++-- 1 file changed, 33 insertions(+), 2 deletions(-) diff --git a/tap-agent/src/agent/sender_fee_tracker.rs b/tap-agent/src/agent/sender_fee_tracker.rs index d9c8b2ebe..5f83652cf 100644 --- a/tap-agent/src/agent/sender_fee_tracker.rs +++ b/tap-agent/src/agent/sender_fee_tracker.rs @@ -161,11 +161,11 @@ impl SenderFeeTracker { } pub fn get_total_fee(&self) -> u128 { - self.total_fee + self.total_fee - self.fees_requesting } pub fn get_total_fee_outside_buffer(&mut self) -> u128 { - self.total_fee - self.get_buffer_fee().min(self.total_fee) + self.get_total_fee() - self.get_buffer_fee().min(self.total_fee) } pub fn get_buffer_fee(&mut self) -> u128 { @@ -371,4 +371,35 @@ mod tests { assert_eq!(tracker.get_heaviest_allocation_id(), Some(allocation_id_1)); assert_eq!(tracker.get_total_fee(), 30); } + + #[test] + fn test_ongoing_rav_requests() { + let allocation_id_0 = address!("abababababababababababababababababababab"); + let allocation_id_1 = address!("bcbcbcbcbcbcbcbcbcbcbcbcbcbcbcbcbcbcbcbc"); + let allocation_id_2 = address!("cdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcd"); + + let mut tracker = SenderFeeTracker::default(); + assert_eq!(tracker.get_heaviest_allocation_id(), None); + assert_eq!(tracker.get_total_fee_outside_buffer(), 0); + assert_eq!(tracker.get_total_fee(), 0); + + tracker.add(allocation_id_0, 10); + tracker.add(allocation_id_1, 20); + tracker.add(allocation_id_2, 30); + assert_eq!(tracker.get_heaviest_allocation_id(), Some(allocation_id_2)); + assert_eq!(tracker.get_total_fee(), 60); + assert_eq!(tracker.get_total_fee_outside_buffer(), 60); + + tracker.start_rav_request(allocation_id_2); + + assert_eq!(tracker.get_heaviest_allocation_id(), Some(allocation_id_1)); + assert_eq!(tracker.get_total_fee(), 30); + assert_eq!(tracker.get_total_fee_outside_buffer(), 30); + + tracker.finish_rav_request(allocation_id_2); + + assert_eq!(tracker.get_heaviest_allocation_id(), Some(allocation_id_2)); + assert_eq!(tracker.get_total_fee(), 60); + assert_eq!(tracker.get_total_fee_outside_buffer(), 60); + } } From 9336743f5d52737237d191eedcf5113535c28a46 Mon Sep 17 00:00:00 2001 From: Gustavo Inacio Date: Thu, 17 Oct 2024 20:55:43 +0200 Subject: [PATCH 7/8] test: add some println stmt Signed-off-by: Gustavo Inacio --- tap-agent/src/agent/sender_allocation.rs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/tap-agent/src/agent/sender_allocation.rs b/tap-agent/src/agent/sender_allocation.rs index 7ea1d094d..b2b3afc40 100644 --- a/tap-agent/src/agent/sender_allocation.rs +++ b/tap-agent/src/agent/sender_allocation.rs @@ -251,6 +251,7 @@ impl Actor for SenderAllocation { } } SenderAllocationMessage::TriggerRAVRequest => { + println!("Triggering RAV request!"); let rav_result = if state.unaggregated_fees.value > 0 { state .request_rav() @@ -260,12 +261,15 @@ impl Actor for SenderAllocation { Err(anyhow!("Unaggregated fee equals zero")) }; + println!("Returning the result!"); state .sender_account_ref .cast(SenderAccountMessage::UpdateReceiptFees( state.allocation_id, ReceiptFees::RavRequestResponse(rav_result), ))?; + println!("Finished"); + } #[cfg(test)] SenderAllocationMessage::GetUnaggregatedReceipts(reply) => { From 49d0e47a45733d783e6906c94744017cb418c617 Mon Sep 17 00:00:00 2001 From: Gustavo Inacio Date: Thu, 17 Oct 2024 21:39:52 +0200 Subject: [PATCH 8/8] test: refactor and use channels Signed-off-by: Gustavo Inacio --- .../src/agent/sender_accounts_manager.rs | 10 +- tap-agent/src/agent/sender_allocation.rs | 199 +++++++++++------- 2 files changed, 131 insertions(+), 78 deletions(-) diff --git a/tap-agent/src/agent/sender_accounts_manager.rs b/tap-agent/src/agent/sender_accounts_manager.rs index 135e4dc63..3543d74f6 100644 --- a/tap-agent/src/agent/sender_accounts_manager.rs +++ b/tap-agent/src/agent/sender_accounts_manager.rs @@ -598,8 +598,8 @@ mod tests { use sqlx::postgres::PgListener; use sqlx::PgPool; use std::collections::{HashMap, HashSet}; - use std::sync::{Arc, Mutex}; use std::time::Duration; + use tokio::sync::mpsc; const DUMMY_URL: &str = "http://localhost:1234"; @@ -931,12 +931,12 @@ mod tests { PREFIX_ID.fetch_add(1, std::sync::atomic::Ordering::SeqCst) ); - let last_message_emitted = Arc::new(Mutex::new(vec![])); + let (last_message_emitted, mut rx) = mpsc::channel(64); let (sender_account, join_handle) = MockSenderAccount::spawn( Some(format!("{}:{}", prefix.clone(), SENDER.1,)), MockSenderAccount { - last_message_emitted: last_message_emitted.clone(), + last_message_emitted, }, (), ) @@ -958,8 +958,8 @@ mod tests { tokio::time::sleep(Duration::from_millis(10)).await; assert_eq!( - last_message_emitted.lock().unwrap().last().unwrap(), - &SenderAccountMessage::NewAllocationId(*ALLOCATION_ID_0) + rx.recv().await.unwrap(), + SenderAccountMessage::NewAllocationId(*ALLOCATION_ID_0) ); sender_account.stop_and_wait(None, None).await.unwrap(); join_handle.await.unwrap(); diff --git a/tap-agent/src/agent/sender_allocation.rs b/tap-agent/src/agent/sender_allocation.rs index b2b3afc40..34601f200 100644 --- a/tap-agent/src/agent/sender_allocation.rs +++ b/tap-agent/src/agent/sender_allocation.rs @@ -251,7 +251,6 @@ impl Actor for SenderAllocation { } } SenderAllocationMessage::TriggerRAVRequest => { - println!("Triggering RAV request!"); let rav_result = if state.unaggregated_fees.value > 0 { state .request_rav() @@ -261,15 +260,12 @@ impl Actor for SenderAllocation { Err(anyhow!("Unaggregated fee equals zero")) }; - println!("Returning the result!"); state .sender_account_ref .cast(SenderAccountMessage::UpdateReceiptFees( state.allocation_id, ReceiptFees::RavRequestResponse(rav_result), ))?; - println!("Finished"); - } #[cfg(test)] SenderAllocationMessage::GetUnaggregatedReceipts(reply) => { @@ -816,7 +812,7 @@ pub mod tests { use sqlx::PgPool; use std::{ collections::HashMap, - sync::{Arc, Mutex}, + sync::Arc, time::{SystemTime, UNIX_EPOCH}, }; use tap_aggregator::{jsonrpsee_helpers::JsonRpcResponse, server::run_server}; @@ -825,6 +821,7 @@ pub mod tests { state::Checking, ReceiptWithState, }; + use tokio::sync::mpsc; use wiremock::{ matchers::{body_string_contains, method}, Mock, MockServer, Respond, ResponseTemplate, @@ -833,7 +830,7 @@ pub mod tests { const DUMMY_URL: &str = "http://localhost:1234"; pub struct MockSenderAccount { - pub last_message_emitted: Arc>>, + pub last_message_emitted: tokio::sync::mpsc::Sender, } #[async_trait::async_trait] @@ -856,17 +853,17 @@ pub mod tests { message: Self::Msg, _state: &mut Self::State, ) -> std::result::Result<(), ActorProcessingErr> { - self.last_message_emitted.lock().unwrap().push(message); + self.last_message_emitted.send(message).await.unwrap(); Ok(()) } } async fn create_mock_sender_account() -> ( - Arc>>, + mpsc::Receiver, ActorRef, JoinHandle<()>, ) { - let last_message_emitted = Arc::new(Mutex::new(vec![])); + let (last_message_emitted, rx) = mpsc::channel(64); let (sender_account, join_handle) = MockSenderAccount::spawn( None, @@ -877,7 +874,7 @@ pub mod tests { ) .await .unwrap(); - (last_message_emitted, sender_account, join_handle) + (rx, sender_account, join_handle) } async fn create_sender_allocation_args( @@ -959,7 +956,7 @@ pub mod tests { #[sqlx::test(migrations = "../migrations")] async fn should_update_unaggregated_fees_on_start(pgpool: PgPool) { - let (last_message_emitted, sender_account, _join_handle) = + let (mut last_message_emitted, sender_account, _join_handle) = create_mock_sender_account().await; // Add receipts to the database. for i in 1..=10 { @@ -992,9 +989,8 @@ pub mod tests { value: 55u128, }), ); - let last_message_emitted = last_message_emitted.lock().unwrap(); - assert_eq!(last_message_emitted.len(), 1); - assert_eq!(last_message_emitted.last(), Some(&expected_message)); + let last_message_emitted = last_message_emitted.recv().await.unwrap(); + assert_eq!(last_message_emitted, expected_message); // Check that the unaggregated fees are correct. assert_eq!(total_unaggregated_fees.value, 55u128); @@ -1002,7 +998,7 @@ pub mod tests { #[sqlx::test(migrations = "../migrations")] async fn should_return_invalid_receipts_on_startup(pgpool: PgPool) { - let (last_message_emitted, sender_account, _join_handle) = + let (mut message_receiver, sender_account, _join_handle) = create_mock_sender_account().await; // Add receipts to the database. for i in 1..=10 { @@ -1035,9 +1031,16 @@ pub mod tests { value: 55u128, }, ); - let last_message_emitted = last_message_emitted.lock().unwrap(); - assert_eq!(last_message_emitted.len(), 2); - assert_eq!(last_message_emitted.first(), Some(&expected_message)); + let update_invalid_msg = message_receiver.recv().await.unwrap(); + assert_eq!(update_invalid_msg, expected_message); + let last_message_emitted = message_receiver.recv().await.unwrap(); + assert_eq!( + last_message_emitted, + SenderAccountMessage::UpdateReceiptFees( + *ALLOCATION_ID_0, + ReceiptFees::UpdateValue(UnaggregatedReceipts::default()) + ) + ); // Check that the unaggregated fees are correct. assert_eq!(total_unaggregated_fees.value, 0u128); @@ -1045,7 +1048,7 @@ pub mod tests { #[sqlx::test(migrations = "../migrations")] async fn test_receive_new_receipt(pgpool: PgPool) { - let (last_message_emitted, sender_account, _join_handle) = + let (mut message_receiver, sender_account, _join_handle) = create_mock_sender_account().await; let sender_allocation = create_sender_allocation( @@ -1088,9 +1091,19 @@ pub mod tests { *ALLOCATION_ID_0, ReceiptFees::NewReceipt(20u128), ); - let last_message_emitted = last_message_emitted.lock().unwrap(); - assert_eq!(last_message_emitted.len(), 2); - assert_eq!(last_message_emitted.last(), Some(&expected_message)); + let startup_load_msg = message_receiver.recv().await.unwrap(); + assert_eq!( + startup_load_msg, + SenderAccountMessage::UpdateReceiptFees( + *ALLOCATION_ID_0, + ReceiptFees::UpdateValue(UnaggregatedReceipts { + value: 0, + last_id: 0 + }) + ) + ); + let last_message_emitted = message_receiver.recv().await.unwrap(); + assert_eq!(last_message_emitted, expected_message); } #[sqlx::test(migrations = "../migrations")] @@ -1136,7 +1149,7 @@ pub mod tests { .unwrap(); } - let (last_message_emitted, sender_account, _join_handle) = + let (mut message_receiver, sender_account, _join_handle) = create_mock_sender_account().await; // Create a sender_allocation. @@ -1164,23 +1177,32 @@ pub mod tests { // Check that the unaggregated fees are correct. assert_eq!(total_unaggregated_fees.value, 0u128); - { - let mut last_message_emitted = last_message_emitted.lock().unwrap(); - assert!(matches!( - last_message_emitted.pop().unwrap(), - SenderAccountMessage::UpdateReceiptFees(..) - )); - - // Check if the sender received invalid receipt fees - let expected_message = SenderAccountMessage::UpdateInvalidReceiptFees( + let startup_msg = message_receiver.recv().await.unwrap(); + assert_eq!( + startup_msg, + SenderAccountMessage::UpdateReceiptFees( *ALLOCATION_ID_0, - UnaggregatedReceipts { - last_id: 0, - value: 45u128, - }, - ); - assert_eq!(last_message_emitted.pop(), Some(expected_message)); - } + ReceiptFees::UpdateValue(UnaggregatedReceipts { + value: 90, + last_id: 20 + }) + ) + ); + + // Check if the sender received invalid receipt fees + let expected_message = SenderAccountMessage::UpdateInvalidReceiptFees( + *ALLOCATION_ID_0, + UnaggregatedReceipts { + last_id: 0, + value: 45u128, + }, + ); + assert_eq!(message_receiver.recv().await.unwrap(), expected_message); + + assert!(matches!( + message_receiver.recv().await.unwrap(), + SenderAccountMessage::UpdateReceiptFees(_, ReceiptFees::RavRequestResponse(_)) + )); // Stop the TAP aggregator server. handle.stop().unwrap(); @@ -1189,7 +1211,7 @@ pub mod tests { #[sqlx::test(migrations = "../migrations")] async fn test_close_allocation_no_pending_fees(pgpool: PgPool) { - let (last_message_emitted, sender_account, _join_handle) = + let (mut message_receiver, sender_account, _join_handle) = create_mock_sender_account().await; // create allocation @@ -1209,11 +1231,11 @@ pub mod tests { // check if message is sent to sender account assert_eq!( - last_message_emitted.lock().unwrap().last(), - Some(&SenderAccountMessage::UpdateReceiptFees( + message_receiver.recv().await.unwrap(), + SenderAccountMessage::UpdateReceiptFees( *ALLOCATION_ID_0, ReceiptFees::UpdateValue(UnaggregatedReceipts::default()) - )) + ) ); } @@ -1473,7 +1495,7 @@ pub mod tests { .unwrap(); } - let (last_message_emitted, sender_account, _join_handle) = + let (mut message_receiver, sender_account, _join_handle) = create_mock_sender_account().await; // Create a sender_allocation. @@ -1494,17 +1516,27 @@ pub mod tests { tokio::time::sleep(std::time::Duration::from_millis(20)).await; // If it is an error then rav request failed - { - let mut last_message = last_message_emitted.lock().unwrap(); - match last_message.pop() { - Some(SenderAccountMessage::UpdateReceiptFees( - _, - ReceiptFees::RavRequestResponse(rav_response), - )) => { - assert!(rav_response.is_err()); - } - v => panic!("Expecting RavRequestResponse as last message, found: {v:?}"), + + let startup_msg = message_receiver.recv().await.unwrap(); + assert_eq!( + startup_msg, + SenderAccountMessage::UpdateReceiptFees( + *ALLOCATION_ID_0, + ReceiptFees::UpdateValue(UnaggregatedReceipts { + value: 45, + last_id: 10 + }) + ) + ); + let rav_response_message = message_receiver.recv().await.unwrap(); + match rav_response_message { + SenderAccountMessage::UpdateReceiptFees( + _, + ReceiptFees::RavRequestResponse(rav_response), + ) => { + assert!(rav_response.is_err()); } + v => panic!("Expecting RavRequestResponse as last message, found: {v:?}"), } // expect the actor to keep running @@ -1550,21 +1582,19 @@ pub mod tests { .expect("Time went backwards") .as_nanos() as u64 - 10000; + const RECEIPT_VALUE: u128 = 1622018441284756158; + const TOTAL_RECEIPTS: u64 = 10; + const TOTAL_SUM: u128 = RECEIPT_VALUE * TOTAL_RECEIPTS as u128; - for i in 0..10 { - let receipt = create_received_receipt( - &ALLOCATION_ID_0, - &SIGNER.0, - i, - timestamp, - 1622018441284756158, - ); + for i in 0..TOTAL_RECEIPTS { + let receipt = + create_received_receipt(&ALLOCATION_ID_0, &SIGNER.0, i, timestamp, RECEIPT_VALUE); store_receipt(&pgpool, receipt.signed_receipt()) .await .unwrap(); } - let (last_message_emitted, sender_account, _join_handle) = + let (mut message_receiver, sender_account, _join_handle) = create_mock_sender_account().await; let sender_allocation = create_sender_allocation( @@ -1585,18 +1615,41 @@ pub mod tests { tokio::time::sleep(std::time::Duration::from_millis(100)).await; // If it is an error then rav request failed - { - let mut last_message = last_message_emitted.lock().unwrap(); - println!("{:?}", *last_message); - match last_message.pop() { - Some(SenderAccountMessage::UpdateReceiptFees( - _, - ReceiptFees::RavRequestResponse(rav_response), - )) => { - assert!(rav_response.is_err()); + + let startup_msg = message_receiver.recv().await.unwrap(); + assert_eq!( + startup_msg, + SenderAccountMessage::UpdateReceiptFees( + *ALLOCATION_ID_0, + ReceiptFees::UpdateValue(UnaggregatedReceipts { + value: 16220184412847561580, + last_id: 10 + }) + ) + ); + + let invalid_receipts = message_receiver.recv().await.unwrap(); + + assert_eq!( + invalid_receipts, + SenderAccountMessage::UpdateInvalidReceiptFees( + *ALLOCATION_ID_0, + UnaggregatedReceipts { + value: TOTAL_SUM, + last_id: 0 } - v => panic!("Expecting RavRequestResponse as last message, found: {v:?}"), + ) + ); + + let rav_response_message = message_receiver.recv().await.unwrap(); + match rav_response_message { + SenderAccountMessage::UpdateReceiptFees( + _, + ReceiptFees::RavRequestResponse(rav_response), + ) => { + assert!(rav_response.is_err()); } + v => panic!("Expecting RavRequestResponse as last message, found: {v:?}"), } let invalid_receipts = sqlx::query!(