From ee7fb02c41857e6825c6d041fb1b67ea9783d3a5 Mon Sep 17 00:00:00 2001 From: Carlos V Date: Wed, 23 Oct 2024 18:17:13 +0000 Subject: [PATCH 1/5] wip --- tap-agent/src/backoff.rs | 35 +++++++++++++++++++++ tap-agent/src/lib.rs | 1 + tap-agent/src/tracker/sender_fee_stats.rs | 38 ++--------------------- 3 files changed, 38 insertions(+), 36 deletions(-) create mode 100644 tap-agent/src/backoff.rs diff --git a/tap-agent/src/backoff.rs b/tap-agent/src/backoff.rs new file mode 100644 index 000000000..ce439d1be --- /dev/null +++ b/tap-agent/src/backoff.rs @@ -0,0 +1,35 @@ +use std::time::{Duration, Instant}; + +#[derive(Debug, Clone)] +pub struct BackoffInfo { + failed_count: u32, + failed_backoff_time: Instant, +} + +impl BackoffInfo { + pub fn ok(&mut self) { + self.failed_count = 0; + } + + pub fn fail(&mut self) { + // backoff = min(100ms * 2 ^ retries, 60s) + self.failed_backoff_time = Instant::now() + + (Duration::from_millis(100) * 2u32.pow(self.failed_count)) + .min(Duration::from_secs(60)); + self.failed_count += 1; + } + + pub fn in_backoff(&self) -> bool { + let now = Instant::now(); + now < self.failed_backoff_time + } +} + +impl Default for BackoffInfo { + fn default() -> Self { + Self { + failed_count: 0, + failed_backoff_time: Instant::now(), + } + } +} diff --git a/tap-agent/src/lib.rs b/tap-agent/src/lib.rs index e90671f97..48b675caf 100644 --- a/tap-agent/src/lib.rs +++ b/tap-agent/src/lib.rs @@ -17,6 +17,7 @@ lazy_static! 
{ pub mod adaptative_concurrency; pub mod agent; +pub mod backoff; pub mod config; pub mod database; pub mod metrics; diff --git a/tap-agent/src/tracker/sender_fee_stats.rs b/tap-agent/src/tracker/sender_fee_stats.rs index 9893e6782..9559e9222 100644 --- a/tap-agent/src/tracker/sender_fee_stats.rs +++ b/tap-agent/src/tracker/sender_fee_stats.rs @@ -3,10 +3,10 @@ use std::{ collections::VecDeque, - time::{Duration, Instant, SystemTime, UNIX_EPOCH}, + time::{Duration, SystemTime, UNIX_EPOCH}, }; -use crate::agent::unaggregated_receipts::UnaggregatedReceipts; +use crate::{agent::unaggregated_receipts::UnaggregatedReceipts, backoff::BackoffInfo}; use super::{AllocationStats, DefaultFromExtra, DurationInfo}; @@ -84,31 +84,6 @@ impl BufferInfo { } } -#[derive(Debug, Clone)] -pub struct BackoffInfo { - failed_ravs_count: u32, - failed_rav_backoff_time: Instant, -} - -impl BackoffInfo { - pub fn ok(&mut self) { - self.failed_ravs_count = 0; - } - - pub fn fail(&mut self) { - // backoff = max(100ms * 2 ^ retries, 60s) - self.failed_rav_backoff_time = Instant::now() - + (Duration::from_millis(100) * 2u32.pow(self.failed_ravs_count)) - .min(Duration::from_secs(60)); - self.failed_ravs_count += 1; - } - - pub fn in_backoff(&self) -> bool { - let now = Instant::now(); - now < self.failed_rav_backoff_time - } -} - impl DefaultFromExtra for SenderFeeStats { fn default_from_extra(extra: &DurationInfo) -> Self { SenderFeeStats { @@ -121,15 +96,6 @@ impl DefaultFromExtra for SenderFeeStats { } } -impl Default for BackoffInfo { - fn default() -> Self { - Self { - failed_ravs_count: 0, - failed_rav_backoff_time: Instant::now(), - } - } -} - impl AllocationStats for SenderFeeStats { fn update(&mut self, v: UnaggregatedReceipts) { self.total_fee = v.value; From 24ed49307669900c0e5ef53e1d3e333ae2e3bae7 Mon Sep 17 00:00:00 2001 From: Carlos V Date: Wed, 23 Oct 2024 19:05:40 +0000 Subject: [PATCH 2/5] feat: Add backoff to sender --- tap-agent/src/agent/sender_account.rs | 10 +++++++--- 1 
file changed, 7 insertions(+), 3 deletions(-) diff --git a/tap-agent/src/agent/sender_account.rs b/tap-agent/src/agent/sender_account.rs index 73f1bff96..635dc953c 100644 --- a/tap-agent/src/agent/sender_account.rs +++ b/tap-agent/src/agent/sender_account.rs @@ -30,6 +30,7 @@ use super::sender_allocation::{SenderAllocation, SenderAllocationArgs}; use crate::adaptative_concurrency::AdaptiveLimiter; use crate::agent::sender_allocation::SenderAllocationMessage; use crate::agent::unaggregated_receipts::UnaggregatedReceipts; +use crate::backoff::BackoffInfo; use crate::tracker::{SenderFeeTracker, SimpleFeeTracker}; use crate::{ config::{self}, @@ -161,6 +162,9 @@ pub struct State { config: &'static config::Config, pgpool: PgPool, sender_aggregator: jsonrpsee::http_client::HttpClient, + + // Backoff info + backoff_info: BackoffInfo, } impl State { @@ -546,6 +550,7 @@ impl Actor for SenderAccount { retry_interval, scheduled_rav_request: None, adaptive_limiter: AdaptiveLimiter::new(INITIAL_RAV_REQUEST_CONCURRENT, 1..50), + backoff_info: BackoffInfo::default(), }; for allocation_id in &allocation_ids { @@ -664,17 +669,16 @@ impl Actor for SenderAccount { counter_greater_receipt_limit, total_fee_greater_trigger_value, ) { - (true, _) => { + (true, _) if !state.backoff_info.in_backoff() => { tracing::debug!( total_counter_for_allocation, rav_request_receipt_limit = state.config.tap.rav_request_receipt_limit, %allocation_id, "Total counter greater than the receipt limit per rav. 
Triggering RAV request" ); - state.rav_request_for_allocation(allocation_id).await } - (_, true) => { + (_, true) if !state.backoff_info.in_backoff() => { tracing::debug!( total_fee_outside_buffer, trigger_value = state.config.tap.rav_request_trigger_value, From 0ed53055163612173203d6da743a1f739bf3771d Mon Sep 17 00:00:00 2001 From: Carlos V Date: Wed, 23 Oct 2024 19:10:22 +0000 Subject: [PATCH 3/5] style: add license header --- tap-agent/src/backoff.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/tap-agent/src/backoff.rs b/tap-agent/src/backoff.rs index ce439d1be..56d3faf01 100644 --- a/tap-agent/src/backoff.rs +++ b/tap-agent/src/backoff.rs @@ -1,3 +1,6 @@ +// Copyright 2023-, Edge & Node, GraphOps, and Semiotic Labs. +// SPDX-License-Identifier: Apache-2.0 + use std::time::{Duration, Instant}; #[derive(Debug, Clone)] From a40bf2e9056eaf02b9f2b85eeea7451b9a2e8a2f Mon Sep 17 00:00:00 2001 From: Carlos V Date: Wed, 23 Oct 2024 20:23:00 +0000 Subject: [PATCH 4/5] fix: refactor match to if else statement --- tap-agent/src/agent/sender_account.rs | 65 ++++++++++++++------------- 1 file changed, 35 insertions(+), 30 deletions(-) diff --git a/tap-agent/src/agent/sender_account.rs b/tap-agent/src/agent/sender_account.rs index 635dc953c..73f946799 100644 --- a/tap-agent/src/agent/sender_account.rs +++ b/tap-agent/src/agent/sender_account.rs @@ -215,6 +215,7 @@ impl State { .sender_fee_tracker .get_heaviest_allocation_id() .ok_or_else(|| { + self.backoff_info.fail(); anyhow::anyhow!( "Error while getting the heaviest allocation, \ this is due one of the following reasons: \n @@ -225,6 +226,7 @@ impl State { If this doesn't work, open an issue on our Github." 
) })?; + self.backoff_info.ok(); self.rav_request_for_allocation(allocation_id).await } @@ -655,38 +657,41 @@ impl Actor for SenderAccount { let has_available_slots_for_requests = state.adaptive_limiter.has_limit(); if has_available_slots_for_requests { - let total_counter_for_allocation = state - .sender_fee_tracker - .get_count_outside_buffer_for_allocation(&allocation_id); - let can_trigger_rav = state.sender_fee_tracker.can_trigger_rav(allocation_id); - let counter_greater_receipt_limit = total_counter_for_allocation - >= state.config.tap.rav_request_receipt_limit - && can_trigger_rav; - let total_fee_outside_buffer = state.sender_fee_tracker.get_ravable_total_fee(); - let total_fee_greater_trigger_value = - total_fee_outside_buffer >= state.config.tap.rav_request_trigger_value; - let rav_result = match ( - counter_greater_receipt_limit, - total_fee_greater_trigger_value, - ) { - (true, _) if !state.backoff_info.in_backoff() => { - tracing::debug!( - total_counter_for_allocation, - rav_request_receipt_limit = state.config.tap.rav_request_receipt_limit, - %allocation_id, - "Total counter greater than the receipt limit per rav. Triggering RAV request" - ); - state.rav_request_for_allocation(allocation_id).await - } - (_, true) if !state.backoff_info.in_backoff() => { - tracing::debug!( - total_fee_outside_buffer, - trigger_value = state.config.tap.rav_request_trigger_value, - "Total fee greater than the trigger value. Triggering RAV request" - ); + let rav_result = if !state.backoff_info.in_backoff() { + let total_fee_outside_buffer = + state.sender_fee_tracker.get_ravable_total_fee(); + let total_fee_greater_trigger_value = + total_fee_outside_buffer >= state.config.tap.rav_request_trigger_value; + tracing::debug!( + total_fee_outside_buffer, + trigger_value = state.config.tap.rav_request_trigger_value, + "Total fee greater than the trigger value. 
Triggering RAV request" + ); + if total_fee_greater_trigger_value { state.rav_request_for_heaviest_allocation().await + } else { + Ok(()) + } + } else { + let total_counter_for_allocation = state + .sender_fee_tracker + .get_count_outside_buffer_for_allocation(&allocation_id); + let can_trigger_rav = + state.sender_fee_tracker.can_trigger_rav(allocation_id); + let counter_greater_receipt_limit = total_counter_for_allocation + >= state.config.tap.rav_request_receipt_limit + && can_trigger_rav; + tracing::debug!( + total_counter_for_allocation, + rav_request_receipt_limit = state.config.tap.rav_request_receipt_limit, + %allocation_id, + "Total counter greater than the receipt limit per rav. Triggering RAV request" + ); + if counter_greater_receipt_limit { + state.rav_request_for_allocation(allocation_id).await + } else { + Ok(()) } - _ => Ok(()), }; // In case we fail, we want our actor to keep running if let Err(err) = rav_result { From ee64ff896f65753c5a784ed31be9f070b3c9c993 Mon Sep 17 00:00:00 2001 From: Carlos V Date: Wed, 23 Oct 2024 21:15:05 +0000 Subject: [PATCH 5/5] fix: fix the if statement --- tap-agent/src/agent/sender_account.rs | 40 +++++++++++---------------- 1 file changed, 16 insertions(+), 24 deletions(-) diff --git a/tap-agent/src/agent/sender_account.rs b/tap-agent/src/agent/sender_account.rs index 73f946799..596389ad4 100644 --- a/tap-agent/src/agent/sender_account.rs +++ b/tap-agent/src/agent/sender_account.rs @@ -657,41 +657,33 @@ impl Actor for SenderAccount { let has_available_slots_for_requests = state.adaptive_limiter.has_limit(); if has_available_slots_for_requests { - let rav_result = if !state.backoff_info.in_backoff() { - let total_fee_outside_buffer = - state.sender_fee_tracker.get_ravable_total_fee(); - let total_fee_greater_trigger_value = - total_fee_outside_buffer >= state.config.tap.rav_request_trigger_value; + let total_fee_outside_buffer = state.sender_fee_tracker.get_ravable_total_fee(); + let total_counter_for_allocation = 
state + .sender_fee_tracker + .get_count_outside_buffer_for_allocation(&allocation_id); + let can_trigger_rav = state.sender_fee_tracker.can_trigger_rav(allocation_id); + let counter_greater_receipt_limit = total_counter_for_allocation + >= state.config.tap.rav_request_receipt_limit + && can_trigger_rav; + let rav_result = if !state.backoff_info.in_backoff() + && total_fee_outside_buffer >= state.config.tap.rav_request_trigger_value + { tracing::debug!( total_fee_outside_buffer, trigger_value = state.config.tap.rav_request_trigger_value, "Total fee greater than the trigger value. Triggering RAV request" ); - if total_fee_greater_trigger_value { - state.rav_request_for_heaviest_allocation().await - } else { - Ok(()) - } - } else { - let total_counter_for_allocation = state - .sender_fee_tracker - .get_count_outside_buffer_for_allocation(&allocation_id); - let can_trigger_rav = - state.sender_fee_tracker.can_trigger_rav(allocation_id); - let counter_greater_receipt_limit = total_counter_for_allocation - >= state.config.tap.rav_request_receipt_limit - && can_trigger_rav; + state.rav_request_for_heaviest_allocation().await + } else if counter_greater_receipt_limit { tracing::debug!( total_counter_for_allocation, rav_request_receipt_limit = state.config.tap.rav_request_receipt_limit, %allocation_id, "Total counter greater than the receipt limit per rav. Triggering RAV request" ); - if counter_greater_receipt_limit { - state.rav_request_for_allocation(allocation_id).await - } else { - Ok(()) - } + state.rav_request_for_allocation(allocation_id).await + } else { + Ok(()) }; // In case we fail, we want our actor to keep running if let Err(err) = rav_result {