diff --git a/tap-agent/src/agent/sender_account.rs b/tap-agent/src/agent/sender_account.rs
index bbf295e53..7248c6680 100644
--- a/tap-agent/src/agent/sender_account.rs
+++ b/tap-agent/src/agent/sender_account.rs
@@ -6,6 +6,7 @@ use alloy::primitives::U256;
use bigdecimal::num_bigint::ToBigInt;
use bigdecimal::ToPrimitive;
use graphql_client::GraphQLQuery;
+use jsonrpsee::http_client::HttpClientBuilder;
use prometheus::{register_gauge_vec, register_int_gauge_vec, GaugeVec, IntGaugeVec};
use std::collections::{HashMap, HashSet};
use std::str::FromStr;
@@ -17,7 +18,7 @@ use alloy::primitives::Address;
use anyhow::Result;
use eventuals::{Eventual, EventualExt, PipeHandle};
use indexer_common::{escrow_accounts::EscrowAccounts, prelude::SubgraphClient};
-use ractor::{call, Actor, ActorProcessingErr, ActorRef, MessagingErr, SupervisionEvent};
+use ractor::{Actor, ActorProcessingErr, ActorRef, MessagingErr, SupervisionEvent};
use sqlx::PgPool;
use tap_core::rav::SignedRAV;
use tracing::{error, Level};
@@ -76,10 +77,11 @@ lazy_static! {
type RavMap = HashMap
;
type Balance = U256;
-#[derive(Debug, Eq, PartialEq)]
+#[derive(Debug)]
pub enum ReceiptFees {
NewReceipt(u128),
UpdateValue(UnaggregatedReceipts),
+ RavRequestResponse(anyhow::Result<(UnaggregatedReceipts, Option)>),
Retry,
}
@@ -149,7 +151,7 @@ pub struct State {
domain_separator: Eip712Domain,
config: &'static config::Config,
pgpool: PgPool,
- sender_aggregator_endpoint: String,
+ sender_aggregator: jsonrpsee::http_client::HttpClient,
}
impl State {
@@ -172,8 +174,8 @@ impl State {
escrow_subgraph: self.escrow_subgraph,
escrow_adapter: self.escrow_adapter.clone(),
domain_separator: self.domain_separator.clone(),
- sender_aggregator_endpoint: self.sender_aggregator_endpoint.clone(),
sender_account_ref: sender_account_ref.clone(),
+ sender_aggregator: self.sender_aggregator.clone(),
};
SenderAllocation::spawn_linked(
@@ -215,38 +217,16 @@ impl State {
"Error while getting allocation actor {allocation_id} with most unaggregated fees"
);
};
- // we call and wait for the response so we don't process anymore update
- let Ok(rav_result) = call!(allocation, SenderAllocationMessage::TriggerRAVRequest) else {
- anyhow::bail!("Error while sending and waiting message for actor {allocation_id}");
- };
- let (fees, rav) = match rav_result {
- Ok(ok_value) => {
- self.rav_tracker.ok_rav_request(allocation_id);
- ok_value
- }
- Err(err) => {
- self.rav_tracker.failed_rav_backoff(allocation_id);
- anyhow::bail!(
- "Error while requesting RAV for sender {} and allocation {}: {}",
- self.sender,
- allocation_id,
- err
- );
- }
- };
- let rav_value = rav.map_or(0, |rav| rav.message.valueAggregate);
- // update rav tracker
- self.rav_tracker.update(allocation_id, rav_value);
- PENDING_RAV
- .with_label_values(&[&self.sender.to_string(), &allocation_id.to_string()])
- .set(rav_value as f64);
-
- // update sender fee tracker
- self.sender_fee_tracker.update(allocation_id, fees.value);
- UNAGGREGATED_FEES
- .with_label_values(&[&self.sender.to_string(), &allocation_id.to_string()])
- .set(fees.value as f64);
+ allocation
+ .cast(SenderAllocationMessage::TriggerRAVRequest)
+ .map_err(|e| {
+ anyhow::anyhow!(
+ "Error while sending and waiting message for actor {allocation_id}. Error: {e}"
+ )
+ })?;
+ self.sender_fee_tracker.start_rav_request(allocation_id);
+
Ok(())
}
@@ -474,6 +454,10 @@ impl Actor for SenderAccount {
.with_label_values(&[&sender_id.to_string()])
.set(config.tap.rav_request_trigger_value as f64);
+ let sender_aggregator = HttpClientBuilder::default()
+ .request_timeout(Duration::from_secs(config.tap.rav_request_timeout_secs))
+ .build(&sender_aggregator_endpoint)?;
+
let state = State {
sender_fee_tracker: SenderFeeTracker::new(Duration::from_millis(
config.tap.rav_request_timestamp_buffer_ms,
@@ -488,7 +472,7 @@ impl Actor for SenderAccount {
escrow_subgraph,
escrow_adapter,
domain_separator,
- sender_aggregator_endpoint,
+ sender_aggregator,
config,
pgpool,
sender: sender_id,
@@ -588,6 +572,42 @@ impl Actor for SenderAccount {
])
.add(value as f64);
}
+ ReceiptFees::RavRequestResponse(rav_result) => {
+ state.sender_fee_tracker.finish_rav_request(allocation_id);
+ match rav_result {
+ Ok((fees, rav)) => {
+ state.rav_tracker.ok_rav_request(allocation_id);
+
+ let rav_value = rav.map_or(0, |rav| rav.message.valueAggregate);
+ // update rav tracker
+ state.rav_tracker.update(allocation_id, rav_value);
+ PENDING_RAV
+ .with_label_values(&[
+ &state.sender.to_string(),
+ &allocation_id.to_string(),
+ ])
+ .set(rav_value as f64);
+
+ // update sender fee tracker
+ state.sender_fee_tracker.update(allocation_id, fees.value);
+ UNAGGREGATED_FEES
+ .with_label_values(&[
+ &state.sender.to_string(),
+ &allocation_id.to_string(),
+ ])
+ .set(fees.value as f64);
+ }
+ Err(err) => {
+ state.rav_tracker.failed_rav_backoff(allocation_id);
+ error!(
+ "Error while requesting RAV for sender {} and allocation {}: {}",
+ state.sender,
+ allocation_id,
+ err
+ );
+ }
+ };
+ }
ReceiptFees::UpdateValue(unaggregated_fees) => {
state
.sender_fee_tracker
@@ -891,7 +911,21 @@ pub mod tests {
match (self, other) {
(Self::UpdateAllocationIds(l0), Self::UpdateAllocationIds(r0)) => l0 == r0,
(Self::UpdateReceiptFees(l0, l1), Self::UpdateReceiptFees(r0, r1)) => {
- l0 == r0 && l1 == r1
+ l0 == r0
+ && match (l1, r1) {
+ (ReceiptFees::NewReceipt(l), ReceiptFees::NewReceipt(r)) => r == l,
+ (ReceiptFees::UpdateValue(l), ReceiptFees::UpdateValue(r)) => r == l,
+ (
+ ReceiptFees::RavRequestResponse(l),
+ ReceiptFees::RavRequestResponse(r),
+ ) => match (l, r) {
+ (Ok(l), Ok(r)) => l == r,
+ (Err(l), Err(r)) => l.to_string() == r.to_string(),
+ _ => false,
+ },
+ (ReceiptFees::Retry, ReceiptFees::Retry) => true,
+ _ => false,
+ }
}
(
Self::UpdateInvalidReceiptFees(l0, l1),
@@ -1072,13 +1106,18 @@ pub mod tests {
next_rav_value: Arc>,
next_unaggregated_fees_value: Arc>,
receipts: Arc>>,
+
+ sender_actor: Option>,
}
impl MockSenderAllocation {
- pub fn new_with_triggered_rav_request() -> (Self, Arc, Arc>) {
+ pub fn new_with_triggered_rav_request(
+ sender_actor: ActorRef,
+ ) -> (Self, Arc, Arc>) {
let triggered_rav_request = Arc::new(AtomicU32::new(0));
let unaggregated_fees = Arc::new(Mutex::new(0));
(
Self {
+ sender_actor: Some(sender_actor),
triggered_rav_request: triggered_rav_request.clone(),
receipts: Arc::new(Mutex::new(Vec::new())),
next_rav_value: Arc::new(Mutex::new(0)),
@@ -1089,10 +1128,13 @@ pub mod tests {
)
}
- pub fn new_with_next_unaggregated_fees_value() -> (Self, Arc>) {
+ pub fn new_with_next_unaggregated_fees_value(
+ sender_actor: ActorRef,
+ ) -> (Self, Arc>) {
let unaggregated_fees = Arc::new(Mutex::new(0));
(
Self {
+ sender_actor: Some(sender_actor),
triggered_rav_request: Arc::new(AtomicU32::new(0)),
receipts: Arc::new(Mutex::new(Vec::new())),
next_rav_value: Arc::new(Mutex::new(0)),
@@ -1102,10 +1144,13 @@ pub mod tests {
)
}
- pub fn new_with_next_rav_value() -> (Self, Arc>) {
+ pub fn new_with_next_rav_value(
+ sender_actor: ActorRef,
+ ) -> (Self, Arc>) {
let next_rav_value = Arc::new(Mutex::new(0));
(
Self {
+ sender_actor: Some(sender_actor),
triggered_rav_request: Arc::new(AtomicU32::new(0)),
receipts: Arc::new(Mutex::new(Vec::new())),
next_rav_value: next_rav_value.clone(),
@@ -1119,6 +1164,7 @@ pub mod tests {
let receipts = Arc::new(Mutex::new(Vec::new()));
(
Self {
+ sender_actor: None,
triggered_rav_request: Arc::new(AtomicU32::new(0)),
receipts: receipts.clone(),
next_rav_value: Arc::new(Mutex::new(0)),
@@ -1150,7 +1196,7 @@ pub mod tests {
_state: &mut Self::State,
) -> Result<(), ActorProcessingErr> {
match message {
- SenderAllocationMessage::TriggerRAVRequest(reply) => {
+ SenderAllocationMessage::TriggerRAVRequest => {
self.triggered_rav_request
.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
let signed_rav = create_rav(
@@ -1159,13 +1205,18 @@ pub mod tests {
4,
*self.next_rav_value.lock().unwrap(),
);
- reply.send(Ok((
- UnaggregatedReceipts {
- value: *self.next_unaggregated_fees_value.lock().unwrap(),
- last_id: 0,
- },
- Some(signed_rav),
- )))?;
+ if let Some(sender_account) = self.sender_actor.as_ref() {
+ sender_account.cast(SenderAccountMessage::UpdateReceiptFees(
+ *ALLOCATION_ID_0,
+ ReceiptFees::RavRequestResponse(Ok((
+ UnaggregatedReceipts {
+ value: *self.next_unaggregated_fees_value.lock().unwrap(),
+ last_id: 0,
+ },
+ Some(signed_rav),
+ ))),
+ ))?;
+ }
}
SenderAllocationMessage::NewReceipt(receipt) => {
self.receipts.lock().unwrap().push(receipt);
@@ -1180,6 +1231,7 @@ pub mod tests {
prefix: String,
sender: Address,
allocation: Address,
+ sender_actor: ActorRef,
) -> (
Arc,
Arc>,
@@ -1187,7 +1239,7 @@ pub mod tests {
JoinHandle<()>,
) {
let (mock_sender_allocation, triggered_rav_request, next_unaggregated_fees) =
- MockSenderAllocation::new_with_triggered_rav_request();
+ MockSenderAllocation::new_with_triggered_rav_request(sender_actor);
let name = format!("{}:{}:{}", prefix, sender, allocation);
let (sender_account, join_handle) =
@@ -1214,7 +1266,13 @@ pub mod tests {
.await;
let (triggered_rav_request, _, allocation, allocation_handle) =
- create_mock_sender_allocation(prefix, SENDER.1, *ALLOCATION_ID_0).await;
+ create_mock_sender_allocation(
+ prefix,
+ SENDER.1,
+ *ALLOCATION_ID_0,
+ sender_account.clone(),
+ )
+ .await;
// create a fake sender allocation
sender_account
@@ -1250,7 +1308,13 @@ pub mod tests {
.await;
let (triggered_rav_request, _, allocation, allocation_handle) =
- create_mock_sender_allocation(prefix, SENDER.1, *ALLOCATION_ID_0).await;
+ create_mock_sender_allocation(
+ prefix,
+ SENDER.1,
+ *ALLOCATION_ID_0,
+ sender_account.clone(),
+ )
+ .await;
// create a fake sender allocation
sender_account
@@ -1372,7 +1436,13 @@ pub mod tests {
.await;
let (triggered_rav_request, next_value, allocation, allocation_handle) =
- create_mock_sender_allocation(prefix, SENDER.1, *ALLOCATION_ID_0).await;
+ create_mock_sender_allocation(
+ prefix,
+ SENDER.1,
+ *ALLOCATION_ID_0,
+ sender_account.clone(),
+ )
+ .await;
assert_eq!(
triggered_rav_request.load(std::sync::atomic::Ordering::SeqCst),
@@ -1549,7 +1619,7 @@ pub mod tests {
.await;
let (mock_sender_allocation, next_rav_value) =
- MockSenderAllocation::new_with_next_rav_value();
+ MockSenderAllocation::new_with_next_rav_value(sender_account.clone());
let name = format!("{}:{}:{}", prefix, SENDER.1, *ALLOCATION_ID_0);
let (allocation, allocation_handle) =
@@ -1750,7 +1820,7 @@ pub mod tests {
.await;
let (mock_sender_allocation, next_unaggregated_fees) =
- MockSenderAllocation::new_with_next_unaggregated_fees_value();
+ MockSenderAllocation::new_with_next_unaggregated_fees_value(sender_account.clone());
let name = format!("{}:{}:{}", prefix, SENDER.1, *ALLOCATION_ID_0);
let (allocation, allocation_handle) = MockSenderAllocation::spawn_linked(
diff --git a/tap-agent/src/agent/sender_accounts_manager.rs b/tap-agent/src/agent/sender_accounts_manager.rs
index 135e4dc63..3543d74f6 100644
--- a/tap-agent/src/agent/sender_accounts_manager.rs
+++ b/tap-agent/src/agent/sender_accounts_manager.rs
@@ -598,8 +598,8 @@ mod tests {
use sqlx::postgres::PgListener;
use sqlx::PgPool;
use std::collections::{HashMap, HashSet};
- use std::sync::{Arc, Mutex};
use std::time::Duration;
+ use tokio::sync::mpsc;
const DUMMY_URL: &str = "http://localhost:1234";
@@ -931,12 +931,12 @@ mod tests {
PREFIX_ID.fetch_add(1, std::sync::atomic::Ordering::SeqCst)
);
- let last_message_emitted = Arc::new(Mutex::new(vec![]));
+ let (last_message_emitted, mut rx) = mpsc::channel(64);
let (sender_account, join_handle) = MockSenderAccount::spawn(
Some(format!("{}:{}", prefix.clone(), SENDER.1,)),
MockSenderAccount {
- last_message_emitted: last_message_emitted.clone(),
+ last_message_emitted,
},
(),
)
@@ -958,8 +958,8 @@ mod tests {
tokio::time::sleep(Duration::from_millis(10)).await;
assert_eq!(
- last_message_emitted.lock().unwrap().last().unwrap(),
- &SenderAccountMessage::NewAllocationId(*ALLOCATION_ID_0)
+ rx.recv().await.unwrap(),
+ SenderAccountMessage::NewAllocationId(*ALLOCATION_ID_0)
);
sender_account.stop_and_wait(None, None).await.unwrap();
join_handle.await.unwrap();
diff --git a/tap-agent/src/agent/sender_allocation.rs b/tap-agent/src/agent/sender_allocation.rs
index 1504e5f1a..34601f200 100644
--- a/tap-agent/src/agent/sender_allocation.rs
+++ b/tap-agent/src/agent/sender_allocation.rs
@@ -12,9 +12,9 @@ use anyhow::{anyhow, ensure, Result};
use bigdecimal::num_bigint::BigInt;
use eventuals::Eventual;
use indexer_common::{escrow_accounts::EscrowAccounts, prelude::SubgraphClient};
-use jsonrpsee::{core::client::ClientT, http_client::HttpClientBuilder, rpc_params};
+use jsonrpsee::{core::client::ClientT, rpc_params};
use prometheus::{register_counter_vec, register_histogram_vec, CounterVec, HistogramVec};
-use ractor::{Actor, ActorProcessingErr, ActorRef, RpcReplyPort};
+use ractor::{Actor, ActorProcessingErr, ActorRef};
use sqlx::{types::BigDecimal, PgPool};
use tap_aggregator::jsonrpsee_helpers::JsonRpcResponse;
use tap_core::{
@@ -105,7 +105,7 @@ pub struct SenderAllocationState {
domain_separator: Eip712Domain,
sender_account_ref: ActorRef,
- http_client: jsonrpsee::http_client::HttpClient,
+ sender_aggregator: jsonrpsee::http_client::HttpClient,
}
pub struct SenderAllocationArgs {
@@ -117,16 +117,16 @@ pub struct SenderAllocationArgs {
pub escrow_subgraph: &'static SubgraphClient,
pub escrow_adapter: EscrowAdapter,
pub domain_separator: Eip712Domain,
- pub sender_aggregator_endpoint: String,
pub sender_account_ref: ActorRef,
+ pub sender_aggregator: jsonrpsee::http_client::HttpClient,
}
#[derive(Debug)]
pub enum SenderAllocationMessage {
NewReceipt(NewReceiptNotification),
- TriggerRAVRequest(RpcReplyPort)>>),
+ TriggerRAVRequest,
#[cfg(test)]
- GetUnaggregatedReceipts(RpcReplyPort),
+ GetUnaggregatedReceipts(ractor::RpcReplyPort),
}
#[async_trait::async_trait]
@@ -250,8 +250,7 @@ impl Actor for SenderAllocation {
))?;
}
}
- // we use a blocking call here to ensure that only one RAV request is running at a time.
- SenderAllocationMessage::TriggerRAVRequest(reply) => {
+ SenderAllocationMessage::TriggerRAVRequest => {
let rav_result = if state.unaggregated_fees.value > 0 {
state
.request_rav()
@@ -261,9 +260,12 @@ impl Actor for SenderAllocation {
Err(anyhow!("Unaggregated fee equals zero"))
};
- if !reply.is_closed() {
- let _ = reply.send(rav_result);
- }
+ state
+ .sender_account_ref
+ .cast(SenderAccountMessage::UpdateReceiptFees(
+ state.allocation_id,
+ ReceiptFees::RavRequestResponse(rav_result),
+ ))?;
}
#[cfg(test)]
SenderAllocationMessage::GetUnaggregatedReceipts(reply) => {
@@ -288,8 +290,8 @@ impl SenderAllocationState {
escrow_subgraph,
escrow_adapter,
domain_separator,
- sender_aggregator_endpoint,
sender_account_ref,
+ sender_aggregator,
}: SenderAllocationArgs,
) -> anyhow::Result {
let required_checks: Vec> = vec![
@@ -318,10 +320,6 @@ impl SenderAllocationState {
CheckList::new(required_checks),
);
- let http_client = HttpClientBuilder::default()
- .request_timeout(Duration::from_secs(config.tap.rav_request_timeout_secs))
- .build(&sender_aggregator_endpoint)?;
-
Ok(Self {
pgpool,
tap_manager,
@@ -334,7 +332,7 @@ impl SenderAllocationState {
unaggregated_fees: UnaggregatedReceipts::default(),
invalid_receipts_fees: UnaggregatedReceipts::default(),
latest_rav,
- http_client,
+ sender_aggregator,
})
}
@@ -512,7 +510,7 @@ impl SenderAllocationState {
.collect();
let rav_response_time_start = Instant::now();
let response: JsonRpcResponse> = self
- .http_client
+ .sender_aggregator
.request(
"aggregate_receipts",
rpc_params!(
@@ -805,6 +803,7 @@ pub mod tests {
escrow_accounts::EscrowAccounts,
subgraph_client::{DeploymentDetails, SubgraphClient},
};
+ use jsonrpsee::http_client::HttpClientBuilder;
use ractor::{
call, cast, concurrency::JoinHandle, Actor, ActorProcessingErr, ActorRef, ActorStatus,
};
@@ -813,7 +812,7 @@ pub mod tests {
use sqlx::PgPool;
use std::{
collections::HashMap,
- sync::{Arc, Mutex},
+ sync::Arc,
time::{SystemTime, UNIX_EPOCH},
};
use tap_aggregator::{jsonrpsee_helpers::JsonRpcResponse, server::run_server};
@@ -822,6 +821,7 @@ pub mod tests {
state::Checking,
ReceiptWithState,
};
+ use tokio::sync::mpsc;
use wiremock::{
matchers::{body_string_contains, method},
Mock, MockServer, Respond, ResponseTemplate,
@@ -830,7 +830,7 @@ pub mod tests {
const DUMMY_URL: &str = "http://localhost:1234";
pub struct MockSenderAccount {
- pub last_message_emitted: Arc>>,
+ pub last_message_emitted: tokio::sync::mpsc::Sender,
}
#[async_trait::async_trait]
@@ -853,17 +853,17 @@ pub mod tests {
message: Self::Msg,
_state: &mut Self::State,
) -> std::result::Result<(), ActorProcessingErr> {
- self.last_message_emitted.lock().unwrap().push(message);
+ self.last_message_emitted.send(message).await.unwrap();
Ok(())
}
}
async fn create_mock_sender_account() -> (
- Arc>>,
+ mpsc::Receiver,
ActorRef,
JoinHandle<()>,
) {
- let last_message_emitted = Arc::new(Mutex::new(vec![]));
+ let (last_message_emitted, rx) = mpsc::channel(64);
let (sender_account, join_handle) = MockSenderAccount::spawn(
None,
@@ -874,7 +874,7 @@ pub mod tests {
)
.await
.unwrap();
- (last_message_emitted, sender_account, join_handle)
+ (rx, sender_account, join_handle)
}
async fn create_sender_allocation_args(
@@ -916,6 +916,9 @@ pub mod tests {
None => create_mock_sender_account().await.1,
};
+ let sender_aggregator = HttpClientBuilder::default()
+ .build(&sender_aggregator_endpoint)
+ .unwrap();
SenderAllocationArgs {
config,
pgpool: pgpool.clone(),
@@ -925,8 +928,8 @@ pub mod tests {
escrow_subgraph,
escrow_adapter,
domain_separator: TAP_EIP712_DOMAIN_SEPARATOR.clone(),
- sender_aggregator_endpoint,
sender_account_ref,
+ sender_aggregator,
}
}
@@ -953,7 +956,7 @@ pub mod tests {
#[sqlx::test(migrations = "../migrations")]
async fn should_update_unaggregated_fees_on_start(pgpool: PgPool) {
- let (last_message_emitted, sender_account, _join_handle) =
+ let (mut last_message_emitted, sender_account, _join_handle) =
create_mock_sender_account().await;
// Add receipts to the database.
for i in 1..=10 {
@@ -986,9 +989,8 @@ pub mod tests {
value: 55u128,
}),
);
- let last_message_emitted = last_message_emitted.lock().unwrap();
- assert_eq!(last_message_emitted.len(), 1);
- assert_eq!(last_message_emitted.last(), Some(&expected_message));
+ let last_message_emitted = last_message_emitted.recv().await.unwrap();
+ assert_eq!(last_message_emitted, expected_message);
// Check that the unaggregated fees are correct.
assert_eq!(total_unaggregated_fees.value, 55u128);
@@ -996,7 +998,7 @@ pub mod tests {
#[sqlx::test(migrations = "../migrations")]
async fn should_return_invalid_receipts_on_startup(pgpool: PgPool) {
- let (last_message_emitted, sender_account, _join_handle) =
+ let (mut message_receiver, sender_account, _join_handle) =
create_mock_sender_account().await;
// Add receipts to the database.
for i in 1..=10 {
@@ -1029,9 +1031,16 @@ pub mod tests {
value: 55u128,
},
);
- let last_message_emitted = last_message_emitted.lock().unwrap();
- assert_eq!(last_message_emitted.len(), 2);
- assert_eq!(last_message_emitted.first(), Some(&expected_message));
+ let update_invalid_msg = message_receiver.recv().await.unwrap();
+ assert_eq!(update_invalid_msg, expected_message);
+ let last_message_emitted = message_receiver.recv().await.unwrap();
+ assert_eq!(
+ last_message_emitted,
+ SenderAccountMessage::UpdateReceiptFees(
+ *ALLOCATION_ID_0,
+ ReceiptFees::UpdateValue(UnaggregatedReceipts::default())
+ )
+ );
// Check that the unaggregated fees are correct.
assert_eq!(total_unaggregated_fees.value, 0u128);
@@ -1039,7 +1048,7 @@ pub mod tests {
#[sqlx::test(migrations = "../migrations")]
async fn test_receive_new_receipt(pgpool: PgPool) {
- let (last_message_emitted, sender_account, _join_handle) =
+ let (mut message_receiver, sender_account, _join_handle) =
create_mock_sender_account().await;
let sender_allocation = create_sender_allocation(
@@ -1082,9 +1091,19 @@ pub mod tests {
*ALLOCATION_ID_0,
ReceiptFees::NewReceipt(20u128),
);
- let last_message_emitted = last_message_emitted.lock().unwrap();
- assert_eq!(last_message_emitted.len(), 2);
- assert_eq!(last_message_emitted.last(), Some(&expected_message));
+ let startup_load_msg = message_receiver.recv().await.unwrap();
+ assert_eq!(
+ startup_load_msg,
+ SenderAccountMessage::UpdateReceiptFees(
+ *ALLOCATION_ID_0,
+ ReceiptFees::UpdateValue(UnaggregatedReceipts {
+ value: 0,
+ last_id: 0
+ })
+ )
+ );
+ let last_message_emitted = message_receiver.recv().await.unwrap();
+ assert_eq!(last_message_emitted, expected_message);
}
#[sqlx::test(migrations = "../migrations")]
@@ -1130,7 +1149,7 @@ pub mod tests {
.unwrap();
}
- let (last_message_emitted, sender_account, _join_handle) =
+ let (mut message_receiver, sender_account, _join_handle) =
create_mock_sender_account().await;
// Create a sender_allocation.
@@ -1143,16 +1162,33 @@ pub mod tests {
.await;
// Trigger a RAV request manually and wait for updated fees.
- let (total_unaggregated_fees, _rav) = call!(
+ sender_allocation
+ .cast(SenderAllocationMessage::TriggerRAVRequest)
+ .unwrap();
+
+ tokio::time::sleep(std::time::Duration::from_millis(20)).await;
+
+ let total_unaggregated_fees = call!(
sender_allocation,
- SenderAllocationMessage::TriggerRAVRequest
+ SenderAllocationMessage::GetUnaggregatedReceipts
)
- .unwrap()
.unwrap();
// Check that the unaggregated fees are correct.
assert_eq!(total_unaggregated_fees.value, 0u128);
+ let startup_msg = message_receiver.recv().await.unwrap();
+ assert_eq!(
+ startup_msg,
+ SenderAccountMessage::UpdateReceiptFees(
+ *ALLOCATION_ID_0,
+ ReceiptFees::UpdateValue(UnaggregatedReceipts {
+ value: 90,
+ last_id: 20
+ })
+ )
+ );
+
// Check if the sender received invalid receipt fees
let expected_message = SenderAccountMessage::UpdateInvalidReceiptFees(
*ALLOCATION_ID_0,
@@ -1161,10 +1197,12 @@ pub mod tests {
value: 45u128,
},
);
- {
- let last_message_emitted = last_message_emitted.lock().unwrap();
- assert_eq!(last_message_emitted.last(), Some(&expected_message));
- }
+ assert_eq!(message_receiver.recv().await.unwrap(), expected_message);
+
+ assert!(matches!(
+ message_receiver.recv().await.unwrap(),
+ SenderAccountMessage::UpdateReceiptFees(_, ReceiptFees::RavRequestResponse(_))
+ ));
// Stop the TAP aggregator server.
handle.stop().unwrap();
@@ -1173,7 +1211,7 @@ pub mod tests {
#[sqlx::test(migrations = "../migrations")]
async fn test_close_allocation_no_pending_fees(pgpool: PgPool) {
- let (last_message_emitted, sender_account, _join_handle) =
+ let (mut message_receiver, sender_account, _join_handle) =
create_mock_sender_account().await;
// create allocation
@@ -1193,11 +1231,11 @@ pub mod tests {
// check if message is sent to sender account
assert_eq!(
- last_message_emitted.lock().unwrap().last(),
- Some(&SenderAccountMessage::UpdateReceiptFees(
+ message_receiver.recv().await.unwrap(),
+ SenderAccountMessage::UpdateReceiptFees(
*ALLOCATION_ID_0,
ReceiptFees::UpdateValue(UnaggregatedReceipts::default())
- ))
+ )
);
}
@@ -1457,17 +1495,49 @@ pub mod tests {
.unwrap();
}
- // Create a sender_allocation.
- let sender_allocation =
- create_sender_allocation(pgpool.clone(), DUMMY_URL.to_string(), DUMMY_URL, None).await;
+ let (mut message_receiver, sender_account, _join_handle) =
+ create_mock_sender_account().await;
- let rav_response = call!(
- sender_allocation,
- SenderAllocationMessage::TriggerRAVRequest
+ // Create a sender_allocation.
+ let sender_allocation = create_sender_allocation(
+ pgpool.clone(),
+ DUMMY_URL.to_string(),
+ DUMMY_URL,
+ Some(sender_account),
)
- .unwrap();
+ .await;
+
+ // Trigger a RAV request manually and wait for updated fees.
+ // this should fail because there's no receipt with valid timestamp
+ sender_allocation
+ .cast(SenderAllocationMessage::TriggerRAVRequest)
+ .unwrap();
+
+ tokio::time::sleep(std::time::Duration::from_millis(20)).await;
+
// If it is an error then rav request failed
- assert!(rav_response.is_err());
+
+ let startup_msg = message_receiver.recv().await.unwrap();
+ assert_eq!(
+ startup_msg,
+ SenderAccountMessage::UpdateReceiptFees(
+ *ALLOCATION_ID_0,
+ ReceiptFees::UpdateValue(UnaggregatedReceipts {
+ value: 45,
+ last_id: 10
+ })
+ )
+ );
+ let rav_response_message = message_receiver.recv().await.unwrap();
+ match rav_response_message {
+ SenderAccountMessage::UpdateReceiptFees(
+ _,
+ ReceiptFees::RavRequestResponse(rav_response),
+ ) => {
+ assert!(rav_response.is_err());
+ }
+ v => panic!("Expecting RavRequestResponse as last message, found: {v:?}"),
+ }
// expect the actor to keep running
assert_eq!(sender_allocation.get_status(), ActorStatus::Running);
@@ -1512,36 +1582,75 @@ pub mod tests {
.expect("Time went backwards")
.as_nanos() as u64
- 10000;
+ const RECEIPT_VALUE: u128 = 1622018441284756158;
+ const TOTAL_RECEIPTS: u64 = 10;
+ const TOTAL_SUM: u128 = RECEIPT_VALUE * TOTAL_RECEIPTS as u128;
- for i in 0..10 {
- let receipt = create_received_receipt(
- &ALLOCATION_ID_0,
- &SIGNER.0,
- i,
- timestamp,
- 1622018441284756158,
- );
+ for i in 0..TOTAL_RECEIPTS {
+ let receipt =
+ create_received_receipt(&ALLOCATION_ID_0, &SIGNER.0, i, timestamp, RECEIPT_VALUE);
store_receipt(&pgpool, receipt.signed_receipt())
.await
.unwrap();
}
+ let (mut message_receiver, sender_account, _join_handle) =
+ create_mock_sender_account().await;
+
let sender_allocation = create_sender_allocation(
pgpool.clone(),
"http://".to_owned() + &aggregator_endpoint.to_string(),
&mock_server.uri(),
- None,
+ Some(sender_account),
)
.await;
+
+ tokio::time::sleep(std::time::Duration::from_millis(100)).await;
// Trigger a RAV request manually and wait for updated fees.
// this should fail because there's no receipt with valid timestamp
- let rav_response = call!(
- sender_allocation,
- SenderAllocationMessage::TriggerRAVRequest
- )
- .unwrap();
+ sender_allocation
+ .cast(SenderAllocationMessage::TriggerRAVRequest)
+ .unwrap();
+
+ tokio::time::sleep(std::time::Duration::from_millis(100)).await;
+
// If it is an error then rav request failed
- assert!(rav_response.is_err());
+
+ let startup_msg = message_receiver.recv().await.unwrap();
+ assert_eq!(
+ startup_msg,
+ SenderAccountMessage::UpdateReceiptFees(
+ *ALLOCATION_ID_0,
+ ReceiptFees::UpdateValue(UnaggregatedReceipts {
+ value: 16220184412847561580,
+ last_id: 10
+ })
+ )
+ );
+
+ let invalid_receipts = message_receiver.recv().await.unwrap();
+
+ assert_eq!(
+ invalid_receipts,
+ SenderAccountMessage::UpdateInvalidReceiptFees(
+ *ALLOCATION_ID_0,
+ UnaggregatedReceipts {
+ value: TOTAL_SUM,
+ last_id: 0
+ }
+ )
+ );
+
+ let rav_response_message = message_receiver.recv().await.unwrap();
+ match rav_response_message {
+ SenderAccountMessage::UpdateReceiptFees(
+ _,
+ ReceiptFees::RavRequestResponse(rav_response),
+ ) => {
+ assert!(rav_response.is_err());
+ }
+ v => panic!("Expecting RavRequestResponse as last message, found: {v:?}"),
+ }
let invalid_receipts = sqlx::query!(
r#"
diff --git a/tap-agent/src/agent/sender_fee_tracker.rs b/tap-agent/src/agent/sender_fee_tracker.rs
index 1a57b38a9..5f83652cf 100644
--- a/tap-agent/src/agent/sender_fee_tracker.rs
+++ b/tap-agent/src/agent/sender_fee_tracker.rs
@@ -34,6 +34,9 @@ pub struct SenderFeeTracker {
id_to_fee: HashMap,
total_fee: u128,
+ fees_requesting: u128,
+ ids_requesting: HashSet,
+
buffer_window_fee: HashMap,
buffer_window_duration: Duration,
// there are some allocations that we don't want it to be
@@ -120,6 +123,7 @@ impl SenderFeeTracker {
self.id_to_fee
.iter()
.filter(|(addr, _)| !self.blocked_addresses.contains(*addr))
+ .filter(|(addr, _)| !self.ids_requesting.contains(*addr))
.filter(|(addr, _)| {
self.failed_ravs
.get(*addr)
@@ -157,11 +161,11 @@ impl SenderFeeTracker {
}
pub fn get_total_fee(&self) -> u128 {
- self.total_fee
+ self.total_fee - self.fees_requesting
}
pub fn get_total_fee_outside_buffer(&mut self) -> u128 {
- self.total_fee - self.get_buffer_fee().min(self.total_fee)
+ self.get_total_fee() - self.get_buffer_fee().min(self.total_fee)
}
pub fn get_buffer_fee(&mut self) -> u128 {
@@ -171,6 +175,20 @@ impl SenderFeeTracker {
acc + expiring.get_sum(&self.buffer_window_duration)
})
}
+
+ pub fn start_rav_request(&mut self, allocation_id: Address) {
+ let current_fee = self.id_to_fee.entry(allocation_id).or_default();
+ self.ids_requesting.insert(allocation_id);
+ self.fees_requesting += *current_fee;
+ }
+
+ /// Should be called before `update`
+ pub fn finish_rav_request(&mut self, allocation_id: Address) {
+ let current_fee = self.id_to_fee.entry(allocation_id).or_default();
+ self.fees_requesting -= *current_fee;
+ self.ids_requesting.remove(&allocation_id);
+ }
+
pub fn failed_rav_backoff(&mut self, allocation_id: Address) {
// backoff = max(100ms * 2 ^ retries, 60s)
let failed_rav = self.failed_ravs.entry(allocation_id).or_default();
@@ -353,4 +371,35 @@ mod tests {
assert_eq!(tracker.get_heaviest_allocation_id(), Some(allocation_id_1));
assert_eq!(tracker.get_total_fee(), 30);
}
+
+ #[test]
+ fn test_ongoing_rav_requests() {
+ let allocation_id_0 = address!("abababababababababababababababababababab");
+ let allocation_id_1 = address!("bcbcbcbcbcbcbcbcbcbcbcbcbcbcbcbcbcbcbcbc");
+ let allocation_id_2 = address!("cdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcd");
+
+ let mut tracker = SenderFeeTracker::default();
+ assert_eq!(tracker.get_heaviest_allocation_id(), None);
+ assert_eq!(tracker.get_total_fee_outside_buffer(), 0);
+ assert_eq!(tracker.get_total_fee(), 0);
+
+ tracker.add(allocation_id_0, 10);
+ tracker.add(allocation_id_1, 20);
+ tracker.add(allocation_id_2, 30);
+ assert_eq!(tracker.get_heaviest_allocation_id(), Some(allocation_id_2));
+ assert_eq!(tracker.get_total_fee(), 60);
+ assert_eq!(tracker.get_total_fee_outside_buffer(), 60);
+
+ tracker.start_rav_request(allocation_id_2);
+
+ assert_eq!(tracker.get_heaviest_allocation_id(), Some(allocation_id_1));
+ assert_eq!(tracker.get_total_fee(), 30);
+ assert_eq!(tracker.get_total_fee_outside_buffer(), 30);
+
+ tracker.finish_rav_request(allocation_id_2);
+
+ assert_eq!(tracker.get_heaviest_allocation_id(), Some(allocation_id_2));
+ assert_eq!(tracker.get_total_fee(), 60);
+ assert_eq!(tracker.get_total_fee_outside_buffer(), 60);
+ }
}