Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
178 changes: 124 additions & 54 deletions tap-agent/src/agent/sender_account.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use alloy::primitives::U256;
use bigdecimal::num_bigint::ToBigInt;
use bigdecimal::ToPrimitive;
use graphql_client::GraphQLQuery;
use jsonrpsee::http_client::HttpClientBuilder;
use prometheus::{register_gauge_vec, register_int_gauge_vec, GaugeVec, IntGaugeVec};
use std::collections::{HashMap, HashSet};
use std::str::FromStr;
Expand All @@ -17,7 +18,7 @@ use alloy::primitives::Address;
use anyhow::Result;
use eventuals::{Eventual, EventualExt, PipeHandle};
use indexer_common::{escrow_accounts::EscrowAccounts, prelude::SubgraphClient};
use ractor::{call, Actor, ActorProcessingErr, ActorRef, MessagingErr, SupervisionEvent};
use ractor::{Actor, ActorProcessingErr, ActorRef, MessagingErr, SupervisionEvent};
use sqlx::PgPool;
use tap_core::rav::SignedRAV;
use tracing::{error, Level};
Expand Down Expand Up @@ -76,10 +77,11 @@ lazy_static! {
type RavMap = HashMap<Address, u128>;
type Balance = U256;

#[derive(Debug, Eq, PartialEq)]
#[derive(Debug)]
pub enum ReceiptFees {
NewReceipt(u128),
UpdateValue(UnaggregatedReceipts),
RavRequestResponse(anyhow::Result<(UnaggregatedReceipts, Option<SignedRAV>)>),
Retry,
}

Expand Down Expand Up @@ -149,7 +151,7 @@ pub struct State {
domain_separator: Eip712Domain,
config: &'static config::Config,
pgpool: PgPool,
sender_aggregator_endpoint: String,
sender_aggregator: jsonrpsee::http_client::HttpClient,
}

impl State {
Expand All @@ -172,8 +174,8 @@ impl State {
escrow_subgraph: self.escrow_subgraph,
escrow_adapter: self.escrow_adapter.clone(),
domain_separator: self.domain_separator.clone(),
sender_aggregator_endpoint: self.sender_aggregator_endpoint.clone(),
sender_account_ref: sender_account_ref.clone(),
sender_aggregator: self.sender_aggregator.clone(),
};

SenderAllocation::spawn_linked(
Expand Down Expand Up @@ -215,38 +217,16 @@ impl State {
"Error while getting allocation actor {allocation_id} with most unaggregated fees"
);
};
// we call and wait for the response so we don't process anymore update
let Ok(rav_result) = call!(allocation, SenderAllocationMessage::TriggerRAVRequest) else {
anyhow::bail!("Error while sending and waiting message for actor {allocation_id}");
};
let (fees, rav) = match rav_result {
Ok(ok_value) => {
self.rav_tracker.ok_rav_request(allocation_id);
ok_value
}
Err(err) => {
self.rav_tracker.failed_rav_backoff(allocation_id);
anyhow::bail!(
"Error while requesting RAV for sender {} and allocation {}: {}",
self.sender,
allocation_id,
err
);
}
};

let rav_value = rav.map_or(0, |rav| rav.message.valueAggregate);
// update rav tracker
self.rav_tracker.update(allocation_id, rav_value);
PENDING_RAV
.with_label_values(&[&self.sender.to_string(), &allocation_id.to_string()])
.set(rav_value as f64);

// update sender fee tracker
self.sender_fee_tracker.update(allocation_id, fees.value);
UNAGGREGATED_FEES
.with_label_values(&[&self.sender.to_string(), &allocation_id.to_string()])
.set(fees.value as f64);
allocation
.cast(SenderAllocationMessage::TriggerRAVRequest)
.map_err(|e| {
anyhow::anyhow!(
"Error while sending and waiting message for actor {allocation_id}. Error: {e}"
)
})?;
self.sender_fee_tracker.start_rav_request(allocation_id);

Ok(())
}

Expand Down Expand Up @@ -474,6 +454,10 @@ impl Actor for SenderAccount {
.with_label_values(&[&sender_id.to_string()])
.set(config.tap.rav_request_trigger_value as f64);

let sender_aggregator = HttpClientBuilder::default()
.request_timeout(Duration::from_secs(config.tap.rav_request_timeout_secs))
.build(&sender_aggregator_endpoint)?;

let state = State {
sender_fee_tracker: SenderFeeTracker::new(Duration::from_millis(
config.tap.rav_request_timestamp_buffer_ms,
Expand All @@ -488,7 +472,7 @@ impl Actor for SenderAccount {
escrow_subgraph,
escrow_adapter,
domain_separator,
sender_aggregator_endpoint,
sender_aggregator,
config,
pgpool,
sender: sender_id,
Expand Down Expand Up @@ -588,6 +572,42 @@ impl Actor for SenderAccount {
])
.add(value as f64);
}
ReceiptFees::RavRequestResponse(rav_result) => {
state.sender_fee_tracker.finish_rav_request(allocation_id);
match rav_result {
Ok((fees, rav)) => {
state.rav_tracker.ok_rav_request(allocation_id);

let rav_value = rav.map_or(0, |rav| rav.message.valueAggregate);
// update rav tracker
state.rav_tracker.update(allocation_id, rav_value);
PENDING_RAV
.with_label_values(&[
&state.sender.to_string(),
&allocation_id.to_string(),
])
.set(rav_value as f64);

// update sender fee tracker
state.sender_fee_tracker.update(allocation_id, fees.value);
UNAGGREGATED_FEES
.with_label_values(&[
&state.sender.to_string(),
&allocation_id.to_string(),
])
.set(fees.value as f64);
}
Err(err) => {
state.rav_tracker.failed_rav_backoff(allocation_id);
error!(
"Error while requesting RAV for sender {} and allocation {}: {}",
state.sender,
allocation_id,
err
);
}
};
}
ReceiptFees::UpdateValue(unaggregated_fees) => {
state
.sender_fee_tracker
Expand Down Expand Up @@ -891,7 +911,21 @@ pub mod tests {
match (self, other) {
(Self::UpdateAllocationIds(l0), Self::UpdateAllocationIds(r0)) => l0 == r0,
(Self::UpdateReceiptFees(l0, l1), Self::UpdateReceiptFees(r0, r1)) => {
l0 == r0 && l1 == r1
l0 == r0
&& match (l1, r1) {
(ReceiptFees::NewReceipt(l), ReceiptFees::NewReceipt(r)) => r == l,
(ReceiptFees::UpdateValue(l), ReceiptFees::UpdateValue(r)) => r == l,
(
ReceiptFees::RavRequestResponse(l),
ReceiptFees::RavRequestResponse(r),
) => match (l, r) {
(Ok(l), Ok(r)) => l == r,
(Err(l), Err(r)) => l.to_string() == r.to_string(),
_ => false,
},
(ReceiptFees::Retry, ReceiptFees::Retry) => true,
_ => false,
}
}
(
Self::UpdateInvalidReceiptFees(l0, l1),
Expand Down Expand Up @@ -1072,13 +1106,18 @@ pub mod tests {
next_rav_value: Arc<Mutex<u128>>,
next_unaggregated_fees_value: Arc<Mutex<u128>>,
receipts: Arc<Mutex<Vec<NewReceiptNotification>>>,

sender_actor: Option<ActorRef<SenderAccountMessage>>,
}
impl MockSenderAllocation {
pub fn new_with_triggered_rav_request() -> (Self, Arc<AtomicU32>, Arc<Mutex<u128>>) {
pub fn new_with_triggered_rav_request(
sender_actor: ActorRef<SenderAccountMessage>,
) -> (Self, Arc<AtomicU32>, Arc<Mutex<u128>>) {
let triggered_rav_request = Arc::new(AtomicU32::new(0));
let unaggregated_fees = Arc::new(Mutex::new(0));
(
Self {
sender_actor: Some(sender_actor),
triggered_rav_request: triggered_rav_request.clone(),
receipts: Arc::new(Mutex::new(Vec::new())),
next_rav_value: Arc::new(Mutex::new(0)),
Expand All @@ -1089,10 +1128,13 @@ pub mod tests {
)
}

pub fn new_with_next_unaggregated_fees_value() -> (Self, Arc<Mutex<u128>>) {
pub fn new_with_next_unaggregated_fees_value(
sender_actor: ActorRef<SenderAccountMessage>,
) -> (Self, Arc<Mutex<u128>>) {
let unaggregated_fees = Arc::new(Mutex::new(0));
(
Self {
sender_actor: Some(sender_actor),
triggered_rav_request: Arc::new(AtomicU32::new(0)),
receipts: Arc::new(Mutex::new(Vec::new())),
next_rav_value: Arc::new(Mutex::new(0)),
Expand All @@ -1102,10 +1144,13 @@ pub mod tests {
)
}

pub fn new_with_next_rav_value() -> (Self, Arc<Mutex<u128>>) {
pub fn new_with_next_rav_value(
sender_actor: ActorRef<SenderAccountMessage>,
) -> (Self, Arc<Mutex<u128>>) {
let next_rav_value = Arc::new(Mutex::new(0));
(
Self {
sender_actor: Some(sender_actor),
triggered_rav_request: Arc::new(AtomicU32::new(0)),
receipts: Arc::new(Mutex::new(Vec::new())),
next_rav_value: next_rav_value.clone(),
Expand All @@ -1119,6 +1164,7 @@ pub mod tests {
let receipts = Arc::new(Mutex::new(Vec::new()));
(
Self {
sender_actor: None,
triggered_rav_request: Arc::new(AtomicU32::new(0)),
receipts: receipts.clone(),
next_rav_value: Arc::new(Mutex::new(0)),
Expand Down Expand Up @@ -1150,7 +1196,7 @@ pub mod tests {
_state: &mut Self::State,
) -> Result<(), ActorProcessingErr> {
match message {
SenderAllocationMessage::TriggerRAVRequest(reply) => {
SenderAllocationMessage::TriggerRAVRequest => {
self.triggered_rav_request
.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
let signed_rav = create_rav(
Expand All @@ -1159,13 +1205,18 @@ pub mod tests {
4,
*self.next_rav_value.lock().unwrap(),
);
reply.send(Ok((
UnaggregatedReceipts {
value: *self.next_unaggregated_fees_value.lock().unwrap(),
last_id: 0,
},
Some(signed_rav),
)))?;
if let Some(sender_account) = self.sender_actor.as_ref() {
sender_account.cast(SenderAccountMessage::UpdateReceiptFees(
*ALLOCATION_ID_0,
ReceiptFees::RavRequestResponse(Ok((
UnaggregatedReceipts {
value: *self.next_unaggregated_fees_value.lock().unwrap(),
last_id: 0,
},
Some(signed_rav),
))),
))?;
}
}
SenderAllocationMessage::NewReceipt(receipt) => {
self.receipts.lock().unwrap().push(receipt);
Expand All @@ -1180,14 +1231,15 @@ pub mod tests {
prefix: String,
sender: Address,
allocation: Address,
sender_actor: ActorRef<SenderAccountMessage>,
) -> (
Arc<AtomicU32>,
Arc<Mutex<u128>>,
ActorRef<SenderAllocationMessage>,
JoinHandle<()>,
) {
let (mock_sender_allocation, triggered_rav_request, next_unaggregated_fees) =
MockSenderAllocation::new_with_triggered_rav_request();
MockSenderAllocation::new_with_triggered_rav_request(sender_actor);

let name = format!("{}:{}:{}", prefix, sender, allocation);
let (sender_account, join_handle) =
Expand All @@ -1214,7 +1266,13 @@ pub mod tests {
.await;

let (triggered_rav_request, _, allocation, allocation_handle) =
create_mock_sender_allocation(prefix, SENDER.1, *ALLOCATION_ID_0).await;
create_mock_sender_allocation(
prefix,
SENDER.1,
*ALLOCATION_ID_0,
sender_account.clone(),
)
.await;

// create a fake sender allocation
sender_account
Expand Down Expand Up @@ -1250,7 +1308,13 @@ pub mod tests {
.await;

let (triggered_rav_request, _, allocation, allocation_handle) =
create_mock_sender_allocation(prefix, SENDER.1, *ALLOCATION_ID_0).await;
create_mock_sender_allocation(
prefix,
SENDER.1,
*ALLOCATION_ID_0,
sender_account.clone(),
)
.await;

// create a fake sender allocation
sender_account
Expand Down Expand Up @@ -1372,7 +1436,13 @@ pub mod tests {
.await;

let (triggered_rav_request, next_value, allocation, allocation_handle) =
create_mock_sender_allocation(prefix, SENDER.1, *ALLOCATION_ID_0).await;
create_mock_sender_allocation(
prefix,
SENDER.1,
*ALLOCATION_ID_0,
sender_account.clone(),
)
.await;

assert_eq!(
triggered_rav_request.load(std::sync::atomic::Ordering::SeqCst),
Expand Down Expand Up @@ -1549,7 +1619,7 @@ pub mod tests {
.await;

let (mock_sender_allocation, next_rav_value) =
MockSenderAllocation::new_with_next_rav_value();
MockSenderAllocation::new_with_next_rav_value(sender_account.clone());

let name = format!("{}:{}:{}", prefix, SENDER.1, *ALLOCATION_ID_0);
let (allocation, allocation_handle) =
Expand Down Expand Up @@ -1750,7 +1820,7 @@ pub mod tests {
.await;

let (mock_sender_allocation, next_unaggregated_fees) =
MockSenderAllocation::new_with_next_unaggregated_fees_value();
MockSenderAllocation::new_with_next_unaggregated_fees_value(sender_account.clone());

let name = format!("{}:{}:{}", prefix, SENDER.1, *ALLOCATION_ID_0);
let (allocation, allocation_handle) = MockSenderAllocation::spawn_linked(
Expand Down
10 changes: 5 additions & 5 deletions tap-agent/src/agent/sender_accounts_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -598,8 +598,8 @@ mod tests {
use sqlx::postgres::PgListener;
use sqlx::PgPool;
use std::collections::{HashMap, HashSet};
use std::sync::{Arc, Mutex};
use std::time::Duration;
use tokio::sync::mpsc;

const DUMMY_URL: &str = "http://localhost:1234";

Expand Down Expand Up @@ -931,12 +931,12 @@ mod tests {
PREFIX_ID.fetch_add(1, std::sync::atomic::Ordering::SeqCst)
);

let last_message_emitted = Arc::new(Mutex::new(vec![]));
let (last_message_emitted, mut rx) = mpsc::channel(64);

let (sender_account, join_handle) = MockSenderAccount::spawn(
Some(format!("{}:{}", prefix.clone(), SENDER.1,)),
MockSenderAccount {
last_message_emitted: last_message_emitted.clone(),
last_message_emitted,
},
(),
)
Expand All @@ -958,8 +958,8 @@ mod tests {
tokio::time::sleep(Duration::from_millis(10)).await;

assert_eq!(
last_message_emitted.lock().unwrap().last().unwrap(),
&SenderAccountMessage::NewAllocationId(*ALLOCATION_ID_0)
rx.recv().await.unwrap(),
SenderAccountMessage::NewAllocationId(*ALLOCATION_ID_0)
);
sender_account.stop_and_wait(None, None).await.unwrap();
join_handle.await.unwrap();
Expand Down
Loading
Loading