diff --git a/tap-agent/src/agent/sender_account.rs b/tap-agent/src/agent/sender_account.rs index 73f1bff96..596389ad4 100644 --- a/tap-agent/src/agent/sender_account.rs +++ b/tap-agent/src/agent/sender_account.rs @@ -30,6 +30,7 @@ use super::sender_allocation::{SenderAllocation, SenderAllocationArgs}; use crate::adaptative_concurrency::AdaptiveLimiter; use crate::agent::sender_allocation::SenderAllocationMessage; use crate::agent::unaggregated_receipts::UnaggregatedReceipts; +use crate::backoff::BackoffInfo; use crate::tracker::{SenderFeeTracker, SimpleFeeTracker}; use crate::{ config::{self}, @@ -161,6 +162,9 @@ pub struct State { config: &'static config::Config, pgpool: PgPool, sender_aggregator: jsonrpsee::http_client::HttpClient, + + // Backoff info + backoff_info: BackoffInfo, } impl State { @@ -211,6 +215,7 @@ impl State { .sender_fee_tracker .get_heaviest_allocation_id() .ok_or_else(|| { + self.backoff_info.fail(); anyhow::anyhow!( "Error while getting the heaviest allocation, \ this is due one of the following reasons: \n @@ -221,6 +226,7 @@ impl State { If this doesn't work, open an issue on our Github." ) })?; + self.backoff_info.ok(); self.rav_request_for_allocation(allocation_id).await } @@ -546,6 +552,7 @@ impl Actor for SenderAccount { retry_interval, scheduled_rav_request: None, adaptive_limiter: AdaptiveLimiter::new(INITIAL_RAV_REQUEST_CONCURRENT, 1..50), + backoff_info: BackoffInfo::default(), }; for allocation_id in &allocation_ids { @@ -650,6 +657,7 @@ impl Actor for SenderAccount { let has_available_slots_for_requests = state.adaptive_limiter.has_limit(); if has_available_slots_for_requests { + let total_fee_outside_buffer = state.sender_fee_tracker.get_ravable_total_fee(); let total_counter_for_allocation = state .sender_fee_tracker .get_count_outside_buffer_for_allocation(&allocation_id); @@ -657,32 +665,25 @@ impl Actor for SenderAccount { let counter_greater_receipt_limit = total_counter_for_allocation >= state.config.tap.rav_request_receipt_limit && can_trigger_rav; - let total_fee_outside_buffer = state.sender_fee_tracker.get_ravable_total_fee(); - let total_fee_greater_trigger_value = - total_fee_outside_buffer >= state.config.tap.rav_request_trigger_value; - let rav_result = match ( - counter_greater_receipt_limit, - total_fee_greater_trigger_value, - ) { - (true, _) => { - tracing::debug!( - total_counter_for_allocation, - rav_request_receipt_limit = state.config.tap.rav_request_receipt_limit, - %allocation_id, - "Total counter greater than the receipt limit per rav. Triggering RAV request" - ); - - state.rav_request_for_allocation(allocation_id).await - } - (_, true) => { - tracing::debug!( - total_fee_outside_buffer, - trigger_value = state.config.tap.rav_request_trigger_value, - "Total fee greater than the trigger value. Triggering RAV request" - ); - state.rav_request_for_heaviest_allocation().await - } - _ => Ok(()), + let rav_result = if !state.backoff_info.in_backoff() + && total_fee_outside_buffer >= state.config.tap.rav_request_trigger_value + { + tracing::debug!( + total_fee_outside_buffer, + trigger_value = state.config.tap.rav_request_trigger_value, + "Total fee greater than the trigger value. Triggering RAV request" + ); + state.rav_request_for_heaviest_allocation().await + } else if counter_greater_receipt_limit { + tracing::debug!( + total_counter_for_allocation, + rav_request_receipt_limit = state.config.tap.rav_request_receipt_limit, + %allocation_id, + "Total counter greater than the receipt limit per rav. Triggering RAV request" + ); + state.rav_request_for_allocation(allocation_id).await + } else { + Ok(()) }; // In case we fail, we want our actor to keep running if let Err(err) = rav_result { diff --git a/tap-agent/src/backoff.rs b/tap-agent/src/backoff.rs new file mode 100644 index 000000000..56d3faf01 --- /dev/null +++ b/tap-agent/src/backoff.rs @@ -0,0 +1,38 @@ +// Copyright 2023-, Edge & Node, GraphOps, and Semiotic Labs. +// SPDX-License-Identifier: Apache-2.0 + +use std::time::{Duration, Instant}; + +#[derive(Debug, Clone)] +pub struct BackoffInfo { + failed_count: u32, + failed_backoff_time: Instant, +} + +impl BackoffInfo { + pub fn ok(&mut self) { + self.failed_count = 0; + } + + pub fn fail(&mut self) { + // backoff = max(100ms * 2 ^ retries, 60s) + self.failed_backoff_time = Instant::now() + + (Duration::from_millis(100) * 2u32.pow(self.failed_count)) + .min(Duration::from_secs(60)); + self.failed_count += 1; + } + + pub fn in_backoff(&self) -> bool { + let now = Instant::now(); + now < self.failed_backoff_time + } +} + +impl Default for BackoffInfo { + fn default() -> Self { + Self { + failed_count: 0, + failed_backoff_time: Instant::now(), + } + } +} diff --git a/tap-agent/src/lib.rs b/tap-agent/src/lib.rs index e90671f97..48b675caf 100644 --- a/tap-agent/src/lib.rs +++ b/tap-agent/src/lib.rs @@ -17,6 +17,7 @@ lazy_static! { pub mod adaptative_concurrency; pub mod agent; +pub mod backoff; pub mod config; pub mod database; pub mod metrics; diff --git a/tap-agent/src/tracker/sender_fee_stats.rs b/tap-agent/src/tracker/sender_fee_stats.rs index 9893e6782..9559e9222 100644 --- a/tap-agent/src/tracker/sender_fee_stats.rs +++ b/tap-agent/src/tracker/sender_fee_stats.rs @@ -3,10 +3,10 @@ use std::{ collections::VecDeque, - time::{Duration, Instant, SystemTime, UNIX_EPOCH}, + time::{Duration, SystemTime, UNIX_EPOCH}, }; -use crate::agent::unaggregated_receipts::UnaggregatedReceipts; +use crate::{agent::unaggregated_receipts::UnaggregatedReceipts, backoff::BackoffInfo}; use super::{AllocationStats, DefaultFromExtra, DurationInfo}; @@ -84,31 +84,6 @@ impl BufferInfo { } } -#[derive(Debug, Clone)] -pub struct BackoffInfo { - failed_ravs_count: u32, - failed_rav_backoff_time: Instant, -} - -impl BackoffInfo { - pub fn ok(&mut self) { - self.failed_ravs_count = 0; - } - - pub fn fail(&mut self) { - // backoff = max(100ms * 2 ^ retries, 60s) - self.failed_rav_backoff_time = Instant::now() - + (Duration::from_millis(100) * 2u32.pow(self.failed_ravs_count)) - .min(Duration::from_secs(60)); - self.failed_ravs_count += 1; - } - - pub fn in_backoff(&self) -> bool { - let now = Instant::now(); - now < self.failed_rav_backoff_time - } -} - impl DefaultFromExtra for SenderFeeStats { fn default_from_extra(extra: &DurationInfo) -> Self { SenderFeeStats { @@ -121,15 +96,6 @@ impl DefaultFromExtra for SenderFeeStats { } } -impl Default for BackoffInfo { - fn default() -> Self { - Self { - failed_ravs_count: 0, - failed_rav_backoff_time: Instant::now(), - } - } -} - impl AllocationStats for SenderFeeStats { fn update(&mut self, v: UnaggregatedReceipts) { self.total_fee = v.value;