From 5e2ae5d97b33b740ca5f6f30677ad67e64eb4820 Mon Sep 17 00:00:00 2001
From: Joseph Livesey
Date: Mon, 1 Dec 2025 15:18:25 -0500
Subject: [PATCH 1/3] feat(tap-agent): add periodic allocation reconciliation

Add a background task that periodically triggers allocation
reconciliation to ensure recovery after subgraph connectivity issues.

Previously, the allocation watcher was purely event-driven and only
fired when the allocation list changed. If allocations were closed
during a connectivity outage and the list became static afterward, no
messages would fire and stale SenderAllocation actors would keep
running, never triggering mark_rav_last().

This change adds:
- New config option `tap.allocation_reconciliation_interval_secs` (default: 5 min)
- ReconcileAllocations message type for SenderAccount
- Periodic task that sends ReconcileAllocations every interval
- Handler that forces a re-check of all allocations against the watcher

This ensures closed allocations are detected and RAVs are properly
marked as 'last' for redemption, even after connectivity issues.
---
 crates/config/src/config.rs                  | 11 +++
 crates/tap-agent/src/agent/sender_account.rs | 71 ++++++++++++++++++++
 crates/tap-agent/src/test.rs                 |  2 +
 crates/tap-agent/tests/tap_agent_test.rs     |  1 +
 4 files changed, 85 insertions(+)

diff --git a/crates/config/src/config.rs b/crates/config/src/config.rs
index e635a2484..a1d6470fb 100644
--- a/crates/config/src/config.rs
+++ b/crates/config/src/config.rs
@@ -441,6 +441,17 @@ pub struct TapConfig {
     /// over the escrow balance
     #[serde(default)]
     pub trusted_senders: HashSet<Address>,
+
+    /// Interval in seconds for periodic allocation reconciliation.
+    /// This ensures stale allocations are detected after subgraph connectivity issues.
+    /// Default: 300 (5 minutes)
+    #[serde(default = "default_allocation_reconciliation_interval_secs")]
+    #[serde_as(as = "DurationSecondsWithFrac")]
+    pub allocation_reconciliation_interval_secs: Duration,
+}
+
+fn default_allocation_reconciliation_interval_secs() -> Duration {
+    Duration::from_secs(300)
 }
 
 #[derive(Debug, Deserialize)]
diff --git a/crates/tap-agent/src/agent/sender_account.rs b/crates/tap-agent/src/agent/sender_account.rs
index 673614ad6..06d990f22 100644
--- a/crates/tap-agent/src/agent/sender_account.rs
+++ b/crates/tap-agent/src/agent/sender_account.rs
@@ -232,6 +232,9 @@ pub enum SenderAccountMessage {
     UpdateInvalidReceiptFees(AllocationId, UnaggregatedReceipts),
     /// Update rav tracker
     UpdateRav(RavInformation),
+    /// Periodic reconciliation to detect stale allocations.
+    /// This ensures recovery after subgraph connectivity issues.
+    ReconcileAllocations,
     #[cfg(test)]
     /// Returns the sender fee tracker, used for tests
     GetSenderFeeTracker(
@@ -392,6 +395,12 @@ pub struct State {
 
     // Config forwarded to [SenderAllocation]
     config: &'static SenderAccountConfig,
+
+    /// Watcher for allocation IDs, used for periodic reconciliation
+    indexer_allocations: Receiver<HashSet<AllocationId>>,
+
+    /// Handle for the periodic reconciliation task
+    reconciliation_handle: Option<tokio::task::JoinHandle<()>>,
 }
 
 /// Configuration derived from config.toml
@@ -425,6 +434,10 @@ pub struct SenderAccountConfig {
     /// Defines whether the indexer operates in legacy mode (V1 TAP receipts only)
     /// or horizon mode (hybrid V1/V2 TAP receipts support).
     pub tap_mode: indexer_config::TapMode,
+
+    /// Interval for periodic allocation reconciliation.
+    /// This ensures stale allocations are detected after subgraph connectivity issues.
+    pub allocation_reconciliation_interval: Duration,
 }
 
 impl SenderAccountConfig {
@@ -443,6 +456,8 @@ impl SenderAccountConfig {
             // Derive TapMode from horizon configuration
             tap_mode: config.tap_mode(),
+
+            allocation_reconciliation_interval: config.tap.allocation_reconciliation_interval_secs,
         }
     }
 }
 
 impl Actor for SenderAccount {
@@ -828,6 +843,9 @@ impl Actor for SenderAccount {
             sender_type,
         }: Self::Arguments,
     ) -> Result<Self::State, ActorProcessingErr> {
+        // Clone the receiver for later use in State
+        let indexer_allocations_for_state = indexer_allocations.clone();
+
         // Pass-through normalized allocation IDs for this sender type
         let myself_clone = myself.clone();
         watch_pipe(indexer_allocations, move |allocation_ids| {
@@ -1151,6 +1169,31 @@ impl Actor for SenderAccount {
         // wiremock_grpc used for tests doesn't support Zstd compression
         #[cfg(not(test))]
         let aggregator_v2 = aggregator_v2.send_compressed(tonic::codec::CompressionEncoding::Zstd);
+        // Spawn periodic reconciliation task
+        let reconciliation_interval = config.allocation_reconciliation_interval;
+        let myself_reconcile = myself.clone();
+        let sender_for_log = sender_id;
+        let reconciliation_handle = tokio::spawn(async move {
+            let mut interval = tokio::time::interval(reconciliation_interval);
+            // Skip the first tick (which fires immediately)
+            interval.tick().await;
+            loop {
+                interval.tick().await;
+                tracing::debug!(
+                    sender = %sender_for_log,
+                    "Running periodic allocation reconciliation"
+                );
+                if let Err(e) = myself_reconcile.cast(SenderAccountMessage::ReconcileAllocations) {
+                    tracing::error!(
+                        error = ?e,
+                        sender = %sender_for_log,
+                        "Error sending ReconcileAllocations message"
+                    );
+                    break;
+                }
+            }
+        });
+
         let state = State {
             prefix,
             sender_fee_tracker: SenderFeeTracker::new(config.rav_request_buffer),
@@ -1175,6 +1218,8 @@ impl Actor for SenderAccount {
             trusted_sender: config.trusted_senders.contains(&sender_id),
             config,
             sender_type,
+            indexer_allocations: indexer_allocations_for_state,
+            reconciliation_handle: Some(reconciliation_handle),
         };
 
         stream::iter(allocation_ids)
@@ -1552,6 +1597,20 @@ impl Actor for SenderAccount {
                     (_, _) => {}
                 }
             }
+            SenderAccountMessage::ReconcileAllocations => {
+                // Get current allocations from the watcher and trigger UpdateAllocationIds
+                // This forces a re-check of all allocations even if the watcher data hasn't changed,
+                // ensuring we recover from missed closure events during connectivity issues.
+                let current_allocations = state.indexer_allocations.borrow().clone();
+                tracing::debug!(
+                    sender = %state.sender,
+                    allocation_count = current_allocations.len(),
+                    "Triggering allocation reconciliation"
+                );
+                myself.cast(SenderAccountMessage::UpdateAllocationIds(
+                    current_allocations,
+                ))?;
+            }
             #[cfg(test)]
             SenderAccountMessage::GetSenderFeeTracker(reply) => {
                 if !reply.is_closed() {
@@ -1687,6 +1746,18 @@ impl Actor for SenderAccount {
         }
         Ok(())
     }
+
+    async fn post_stop(
+        &self,
+        _myself: ActorRef<Self::Msg>,
+        state: &mut Self::State,
+    ) -> Result<(), ActorProcessingErr> {
+        // Abort the reconciliation task on stop
+        if let Some(handle) = state.reconciliation_handle.take() {
+            handle.abort();
+        }
+        Ok(())
+    }
 }
 
 impl SenderAccount {
diff --git a/crates/tap-agent/src/test.rs b/crates/tap-agent/src/test.rs
index 57243cb6b..f6ccca4a9 100644
--- a/crates/tap-agent/src/test.rs
+++ b/crates/tap-agent/src/test.rs
@@ -97,6 +97,7 @@ pub fn get_sender_account_config() -> &'static SenderAccountConfig {
         tap_sender_timeout: Duration::from_secs(63),
         trusted_senders: HashSet::new(),
         tap_mode: indexer_config::TapMode::Legacy,
+        allocation_reconciliation_interval: Duration::from_secs(300),
     }))
 }
 
@@ -134,6 +135,7 @@ pub async fn create_sender_account(
         tap_sender_timeout: TAP_SENDER_TIMEOUT,
         trusted_senders,
         tap_mode: indexer_config::TapMode::Legacy,
+        allocation_reconciliation_interval: Duration::from_secs(300),
     }));
 
     let network_subgraph = Box::leak(Box::new(
diff --git a/crates/tap-agent/tests/tap_agent_test.rs b/crates/tap-agent/tests/tap_agent_test.rs
index e81963ae2..e285e3a9b 100644
--- a/crates/tap-agent/tests/tap_agent_test.rs
+++ b/crates/tap-agent/tests/tap_agent_test.rs
@@ -95,6 +95,7 @@ pub async fn start_agent(
         tap_sender_timeout: Duration::from_secs(30),
         trusted_senders: HashSet::new(),
         tap_mode: indexer_config::TapMode::Legacy,
+        allocation_reconciliation_interval: Duration::from_secs(300),
     }));
 
     let args = SenderAccountsManagerArgs {
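For operators, the new knob is read from the indexer's config.toml through the
`config.tap.allocation_reconciliation_interval_secs` path used in this patch. A
minimal sketch of how it might be set (section and key names inferred from the
Rust field definitions above; the value is a plain number of seconds, per
`DurationSecondsWithFrac`):

    [tap]
    # Re-check allocations against the network subgraph every 5 minutes (the default).
    allocation_reconciliation_interval_secs = 300
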
From a5aa1cdc1e1e5597f500307de1be482b7f13cea6 Mon Sep 17 00:00:00 2001
From: Joseph Livesey
Date: Mon, 1 Dec 2025 17:31:03 -0500
Subject: [PATCH 2/3] test(tap-agent): add tests and validation for allocation reconciliation

- Add unit test for ReconcileAllocations message handling that verifies
  it triggers UpdateAllocationIds with current allocations from watcher
- Add config validation for allocation_reconciliation_interval_secs:
  error if 0, warn if < 60s
- Add test for periodic task lifecycle verifying task spawn/abort
- Update create_sender_account() test helper to expose
  indexer_allocations_tx and configurable reconciliation interval
- Add tokio test-util feature for time control in tests
---
 crates/config/src/config.rs                   |  14 +
 crates/tap-agent/Cargo.toml                   |   1 +
 crates/tap-agent/src/agent/sender_account.rs  | 283 +++++++++++++++++-
 crates/tap-agent/src/test.rs                  |  16 +-
 crates/tap-agent/tests/sender_account_test.rs |   2 +-
 5 files changed, 296 insertions(+), 20 deletions(-)

diff --git a/crates/config/src/config.rs b/crates/config/src/config.rs
index a1d6470fb..eaced1496 100644
--- a/crates/config/src/config.rs
+++ b/crates/config/src/config.rs
@@ -226,6 +226,20 @@ impl Config {
             );
         }
 
+        if self.tap.allocation_reconciliation_interval_secs == Duration::ZERO {
+            return Err(
+                "tap.allocation_reconciliation_interval_secs must be greater than 0".to_string(),
+            );
+        }
+
+        if self.tap.allocation_reconciliation_interval_secs < Duration::from_secs(60) {
+            tracing::warn!(
+                "Your `tap.allocation_reconciliation_interval_secs` value is too low. 
\ + This may cause unnecessary load on the system. \ + A recommended value is at least 60 seconds." + ); + } + // Horizon configuration validation // Explicit toggle via `horizon.enabled`. When enabled, require both // `blockchain.subgraph_service_address` and diff --git a/crates/tap-agent/Cargo.toml b/crates/tap-agent/Cargo.toml index dd7113751..61aa023b7 100644 --- a/crates/tap-agent/Cargo.toml +++ b/crates/tap-agent/Cargo.toml @@ -67,3 +67,4 @@ test-log = { workspace = true, features = ["trace"] } rstest.workspace = true stdext.workspace = true insta.workspace = true +tokio = { workspace = true, features = ["test-util"] } diff --git a/crates/tap-agent/src/agent/sender_account.rs b/crates/tap-agent/src/agent/sender_account.rs index 06d990f22..87a36db32 100644 --- a/crates/tap-agent/src/agent/sender_account.rs +++ b/crates/tap-agent/src/agent/sender_account.rs @@ -1821,7 +1821,7 @@ pub mod tests { Mock, MockServer, ResponseTemplate, }; - use super::{RavInformation, SenderAccountMessage}; + use super::{RavInformation, SenderAccountMessage, ALLOCATION_RECONCILIATION_RUNS}; use crate::{ agent::{ sender_account::ReceiptFees, sender_accounts_manager::AllocationId, @@ -1887,7 +1887,7 @@ pub mod tests { ) .await; - let (sender_account, mut msg_receiver, prefix, _) = create_sender_account() + let (sender_account, mut msg_receiver, prefix, _, _) = create_sender_account() .pgpool(pgpool) .escrow_subgraph_endpoint(&mock_escrow_subgraph.uri()) .network_subgraph_endpoint(&mock_server.uri()) @@ -1978,7 +1978,7 @@ pub mod tests { ) .await; - let (sender_account, mut msg_receiver, prefix, _) = create_sender_account() + let (sender_account, mut msg_receiver, prefix, _, _) = create_sender_account() .pgpool(pgpool) .escrow_subgraph_endpoint(&mock_escrow_subgraph.uri()) .network_subgraph_endpoint(&mock_server.uri()) @@ -2071,7 +2071,7 @@ pub mod tests { async fn test_update_receipt_fees_no_rav() { let test_db = test_assets::setup_shared_test_db().await; let pgpool = test_db.pool; - let (sender_account, msg_receiver, prefix, _) = + let (sender_account, msg_receiver, prefix, _, _) = create_sender_account().pgpool(pgpool).call().await; let basic_sender_account = TestSenderAccount { sender_account, @@ -2105,7 +2105,7 @@ pub mod tests { async fn test_update_receipt_fees_trigger_rav() { let test_db = test_assets::setup_shared_test_db().await; let pgpool = test_db.pool; - let (sender_account, msg_receiver, prefix, _) = + let (sender_account, msg_receiver, prefix, _, _) = create_sender_account().pgpool(pgpool).call().await; let mut basic_sender_account = TestSenderAccount { sender_account, @@ -2151,7 +2151,7 @@ pub mod tests { async fn test_counter_greater_limit_trigger_rav() { let test_db = test_assets::setup_shared_test_db().await; let pgpool = test_db.pool; - let (sender_account, mut msg_receiver, prefix, _) = create_sender_account() + let (sender_account, mut msg_receiver, prefix, _, _) = create_sender_account() .pgpool(pgpool.clone()) .rav_request_receipt_limit(2) .call() @@ -2204,7 +2204,7 @@ pub mod tests { let test_db = test_assets::setup_shared_test_db().await; let pgpool = test_db.pool; let mock_escrow_subgraph = setup_mock_escrow_subgraph().await; - let (sender_account, _, prefix, _) = create_sender_account() + let (sender_account, _, prefix, _, _) = create_sender_account() .pgpool(pgpool) .initial_allocation( vec![AllocationId::Legacy(AllocationIdCore::from( @@ -2264,7 +2264,7 @@ pub mod tests { .await .unwrap(); - let (sender_account, _notify, _, _) = + let (sender_account, _notify, _, _, _) = 
create_sender_account().pgpool(pgpool.clone()).call().await; let deny = call!(sender_account, SenderAccountMessage::GetDeny).unwrap(); @@ -2293,7 +2293,7 @@ pub mod tests { // we set to zero to block the sender, no matter the fee let max_unaggregated_fees_per_sender: u128 = 0; - let (sender_account, mut msg_receiver, prefix, _) = create_sender_account() + let (sender_account, mut msg_receiver, prefix, _, _) = create_sender_account() .pgpool(pgpool) .max_amount_willing_to_lose_grt(max_unaggregated_fees_per_sender) .call() @@ -2342,7 +2342,7 @@ pub mod tests { let max_unaggregated_fees_per_sender: u128 = 1000; // Making sure no RAV is going to be triggered during the test - let (sender_account, mut msg_receiver, _, _) = create_sender_account() + let (sender_account, mut msg_receiver, _, _, _) = create_sender_account() .pgpool(pgpool.clone()) .rav_request_trigger_value(u128::MAX) .max_amount_willing_to_lose_grt(max_unaggregated_fees_per_sender) @@ -2444,7 +2444,7 @@ pub mod tests { .await .unwrap(); - let (sender_account, _notify, _, _) = create_sender_account() + let (sender_account, _notify, _, _, _) = create_sender_account() .pgpool(pgpool.clone()) .max_amount_willing_to_lose_grt(u128::MAX) .call() @@ -2485,7 +2485,7 @@ pub mod tests { let trigger_rav_request = ESCROW_VALUE * 2; // initialize with no trigger value and no max receipt deny - let (sender_account, mut msg_receiver, prefix, _) = create_sender_account() + let (sender_account, mut msg_receiver, prefix, _, _) = create_sender_account() .pgpool(pgpool.clone()) .rav_request_trigger_value(trigger_rav_request) .max_amount_willing_to_lose_grt(u128::MAX) @@ -2565,7 +2565,7 @@ pub mod tests { let pgpool = test_db.pool; let max_amount_willing_to_lose_grt = ESCROW_VALUE / 10; // initialize with no trigger value and no max receipt deny - let (sender_account, mut msg_receiver, prefix, _) = create_sender_account() + let (sender_account, mut msg_receiver, prefix, _, _) = create_sender_account() .pgpool(pgpool) .trusted_sender(true) .rav_request_trigger_value(u128::MAX) @@ -2674,7 +2674,7 @@ pub mod tests { .await .unwrap(); - let (sender_account, mut msg_receiver, _, escrow_accounts_tx) = create_sender_account() + let (sender_account, mut msg_receiver, _, escrow_accounts_tx, _) = create_sender_account() .pgpool(pgpool.clone()) .max_amount_willing_to_lose_grt(u128::MAX) .escrow_subgraph_endpoint(&mock_server.uri()) @@ -2734,7 +2734,7 @@ pub mod tests { .await .unwrap(); - let (sender_account, mut msg_receiver, _, escrow_accounts_tx) = create_sender_account() + let (sender_account, mut msg_receiver, _, escrow_accounts_tx, _) = create_sender_account() .pgpool(pgpool.clone()) .max_amount_willing_to_lose_grt(u128::MAX) .call() @@ -2779,7 +2779,7 @@ pub mod tests { // we set to 1 to block the sender on a really low value let max_unaggregated_fees_per_sender: u128 = 1; - let (sender_account, mut msg_receiver, prefix, _) = create_sender_account() + let (sender_account, mut msg_receiver, prefix, _, _) = create_sender_account() .pgpool(pgpool) .max_amount_willing_to_lose_grt(max_unaggregated_fees_per_sender) .call() @@ -2835,4 +2835,255 @@ pub mod tests { sender_account.stop_and_wait(None, None).await.unwrap(); } + + #[tokio::test] + async fn test_reconcile_allocations_triggers_update() { + // Test that ReconcileAllocations message triggers UpdateAllocationIds + // with the current allocations from the watcher + let test_db = test_assets::setup_shared_test_db().await; + let pgpool = test_db.pool; + + let allocation_set = HashSet::from_iter([ + 
AllocationId::Legacy(AllocationIdCore::from(ALLOCATION_ID_0)), + AllocationId::Legacy(AllocationIdCore::from(ALLOCATION_ID_1)), + ]); + + let (sender_account, mut msg_receiver, _, _, indexer_allocations_tx) = + create_sender_account() + .pgpool(pgpool) + .initial_allocation(allocation_set.clone()) + .call() + .await; + + // Verify initial state - indexer_allocations should have the allocations + // Send ReconcileAllocations message + sender_account + .cast(SenderAccountMessage::ReconcileAllocations) + .unwrap(); + + // Should receive the ReconcileAllocations message first (from TestableActor) + let message = msg_receiver.recv().await.expect("Channel failed"); + assert!( + matches!(message, SenderAccountMessage::ReconcileAllocations), + "Expected ReconcileAllocations message, got {message:?}" + ); + + // Then should receive UpdateAllocationIds with current allocations + let message = msg_receiver.recv().await.expect("Channel failed"); + match message { + SenderAccountMessage::UpdateAllocationIds(allocations) => { + assert_eq!( + allocations, allocation_set, + "UpdateAllocationIds should contain current allocations from watcher" + ); + } + _ => panic!("Expected UpdateAllocationIds message, got {message:?}"), + } + + // Test that updating the watcher changes what ReconcileAllocations sends + let new_allocation_set = HashSet::from_iter([AllocationId::Legacy( + AllocationIdCore::from(ALLOCATION_ID_0), + )]); + indexer_allocations_tx + .send(new_allocation_set.clone()) + .unwrap(); + + // Send another ReconcileAllocations + sender_account + .cast(SenderAccountMessage::ReconcileAllocations) + .unwrap(); + + // Skip ReconcileAllocations echo + let _ = msg_receiver.recv().await.expect("Channel failed"); + + // Should receive UpdateAllocationIds with updated allocations + let message = msg_receiver.recv().await.expect("Channel failed"); + match message { + SenderAccountMessage::UpdateAllocationIds(allocations) => { + assert_eq!( + allocations, new_allocation_set, + "UpdateAllocationIds should reflect updated watcher state" + ); + } + _ => panic!("Expected UpdateAllocationIds message, got {message:?}"), + } + + sender_account.stop_and_wait(None, None).await.unwrap(); + } + + #[tokio::test] + async fn test_periodic_reconciliation_task_lifecycle() { + // Test that the reconciliation task is spawned on actor start + // and is aborted when post_stop is called + let test_db = test_assets::setup_shared_test_db().await; + let pgpool = test_db.pool; + + // Use a short reconciliation interval for testing + let reconciliation_interval = Duration::from_millis(100); + + let (sender_account, mut msg_receiver, _, _, _) = create_sender_account() + .pgpool(pgpool) + .allocation_reconciliation_interval(reconciliation_interval) + .call() + .await; + + // Pause time after actor creation (can't pause before because DB setup needs real time) + tokio::time::pause(); + + // The first tick is skipped, so we need to advance past the first interval + tokio::time::advance(reconciliation_interval).await; + tokio::task::yield_now().await; + + // Advance time to trigger the periodic reconciliation + tokio::time::advance(reconciliation_interval).await; + tokio::task::yield_now().await; + + // Should receive ReconcileAllocations message from the periodic task + let message = tokio::time::timeout(Duration::from_millis(50), msg_receiver.recv()) + .await + .expect("Should receive message within timeout") + .expect("Channel should not be closed"); + + assert!( + matches!(message, SenderAccountMessage::ReconcileAllocations), + "Expected 
ReconcileAllocations from periodic task, got {message:?}"
+        );
+
+        // Drain any UpdateAllocationIds message that follows ReconcileAllocations
+        flush_messages(&mut msg_receiver).await;
+
+        // Resume time for shutdown operations
+        tokio::time::resume();
+
+        // Stop the actor (this should abort the reconciliation task)
+        sender_account.stop_and_wait(None, None).await.unwrap();
+
+        // Pause again to test that no more messages come after stop
+        tokio::time::pause();
+
+        // Advance time again - no more messages should be received since task is aborted
+        tokio::time::advance(reconciliation_interval * 2).await;
+        tokio::task::yield_now().await;
+
+        // Count any remaining ReconcileAllocations messages (should be none)
+        let mut reconcile_count = 0;
+        while let Ok(Some(msg)) =
+            tokio::time::timeout(Duration::from_millis(10), msg_receiver.recv()).await
+        {
+            if matches!(msg, SenderAccountMessage::ReconcileAllocations) {
+                reconcile_count += 1;
+            }
+        }
+
+        assert_eq!(
+            reconcile_count, 0,
+            "Should not receive ReconcileAllocations messages after actor stop"
+        );
+    }
+
+    /// Verifies reconciliation correctly handles the empty allocation set case.
+    /// This is the most critical scenario: all allocations close during a connectivity
+    /// outage, and reconciliation must detect this to trigger RAV finalization.
+    #[tokio::test]
+    async fn test_reconcile_allocations_handles_empty_set() {
+        let test_db = test_assets::setup_shared_test_db().await;
+        let pgpool = test_db.pool;
+
+        // Start with one allocation
+        let initial_allocation_set = HashSet::from_iter([AllocationId::Legacy(
+            AllocationIdCore::from(ALLOCATION_ID_0),
+        )]);
+
+        let (sender_account, mut msg_receiver, _, _, indexer_allocations_tx) =
+            create_sender_account()
+                .pgpool(pgpool)
+                .initial_allocation(initial_allocation_set)
+                .call()
+                .await;
+
+        // Simulate all allocations closing during connectivity outage:
+        // update watcher to empty set
+        let empty_allocation_set: HashSet<AllocationId> = HashSet::new();
+        indexer_allocations_tx
+            .send(empty_allocation_set.clone())
+            .unwrap();
+
+        // Trigger reconciliation
+        sender_account
+            .cast(SenderAccountMessage::ReconcileAllocations)
+            .unwrap();
+
+        // Skip ReconcileAllocations echo
+        let message = msg_receiver.recv().await.expect("Channel failed");
+        assert!(
+            matches!(message, SenderAccountMessage::ReconcileAllocations),
+            "Expected ReconcileAllocations message, got {message:?}"
+        );
+
+        // Should receive UpdateAllocationIds with empty set
+        let message = msg_receiver.recv().await.expect("Channel failed");
+        match message {
+            SenderAccountMessage::UpdateAllocationIds(allocations) => {
+                assert!(
+                    allocations.is_empty(),
+                    "UpdateAllocationIds should contain empty set when all allocations closed"
+                );
+            }
+            _ => panic!("Expected UpdateAllocationIds message, got {message:?}"),
+        }
+
+        sender_account.stop_and_wait(None, None).await.unwrap();
+    }
+
+    /// Verifies that the reconciliation metric is incremented on each run.
+    /// Observability is critical for production monitoring of this safety mechanism.
+    #[tokio::test]
+    async fn test_reconcile_allocations_increments_metric() {
+        let test_db = test_assets::setup_shared_test_db().await;
+        let pgpool = test_db.pool;
+
+        let (sender_account, mut msg_receiver, _, _, _) =
+            create_sender_account().pgpool(pgpool).call().await;
+
+        // Get metric value before reconciliation
+        // Note: Prometheus metrics are shared across tests, so we test relative increments
+        let sender_label = SENDER.1.to_string();
+        let before = ALLOCATION_RECONCILIATION_RUNS
+            .with_label_values(&[&sender_label])
+            .get();
+
+        // Trigger reconciliation
+        sender_account
+            .cast(SenderAccountMessage::ReconcileAllocations)
+            .unwrap();
+
+        // Wait for message processing
+        flush_messages(&mut msg_receiver).await;
+
+        // Verify metric was incremented (check relative increase, not absolute value)
+        let after_first = ALLOCATION_RECONCILIATION_RUNS
+            .with_label_values(&[&sender_label])
+            .get();
+        assert!(
+            after_first > before,
+            "Reconciliation metric should increment after first run (before={before}, after={after_first})"
+        );
+
+        // Trigger another reconciliation to verify it increments again
+        sender_account
+            .cast(SenderAccountMessage::ReconcileAllocations)
+            .unwrap();
+
+        flush_messages(&mut msg_receiver).await;
+
+        let after_second = ALLOCATION_RECONCILIATION_RUNS
+            .with_label_values(&[&sender_label])
+            .get();
+        assert!(
+            after_second > after_first,
+            "Reconciliation metric should increment on each run (after_first={after_first}, after_second={after_second})"
+        );
+
+        sender_account.stop_and_wait(None, None).await.unwrap();
+    }
 }
diff --git a/crates/tap-agent/src/test.rs b/crates/tap-agent/src/test.rs
index f6ccca4a9..e11ddf615 100644
--- a/crates/tap-agent/src/test.rs
+++ b/crates/tap-agent/src/test.rs
@@ -113,11 +113,13 @@ pub async fn create_sender_account(
     #[builder(default = RECEIPT_LIMIT)] rav_request_receipt_limit: u64,
     aggregator_endpoint: Option<Url>,
     #[builder(default = false)] trusted_sender: bool,
+    #[builder(default = Duration::from_secs(300))] allocation_reconciliation_interval: Duration,
 ) -> (
     ActorRef<SenderAccountMessage>,
     mpsc::Receiver<SenderAccountMessage>,
     String,
     Sender<EscrowAccounts>,
+    Sender<HashSet<AllocationId>>,
 ) {
     let trusted_senders = if trusted_sender {
         HashSet::from([SENDER.1])
@@ -135,7 +137,7 @@ pub async fn create_sender_account(
         tap_sender_timeout: TAP_SENDER_TIMEOUT,
         trusted_senders,
         tap_mode: indexer_config::TapMode::Legacy,
-        allocation_reconciliation_interval: Duration::from_secs(300),
+        allocation_reconciliation_interval,
     }));
 
     let network_subgraph = Box::leak(Box::new(
@@ -171,12 +173,14 @@ pub async fn create_sender_account(
         None => Url::parse(&get_grpc_url().await).unwrap(),
     };
 
+    let (indexer_allocations_tx, indexer_allocations_rx) = watch::channel(initial_allocation);
+
     let args = SenderAccountArgs {
         config,
         pgpool,
         sender_id: SENDER.1,
         escrow_accounts: escrow_accounts_rx,
-        indexer_allocations: watch::channel(initial_allocation).1,
+        indexer_allocations: indexer_allocations_rx,
         escrow_subgraph,
         network_subgraph,
         domain_separator: TAP_EIP712_DOMAIN_SEPARATOR.clone(),
@@ -198,7 +202,13 @@ pub async fn create_sender_account(
     // flush all messages
     flush_messages(&mut receiver).await;
 
-    (sender, receiver, prefix, escrow_accounts_tx)
+    (
+        sender,
+        receiver,
+        prefix,
+        escrow_accounts_tx,
+        indexer_allocations_tx,
+    )
 }
 
 #[bon::builder]
diff --git a/crates/tap-agent/tests/sender_account_test.rs b/crates/tap-agent/tests/sender_account_test.rs
index 648922568..4ff306f3a 100644
--- a/crates/tap-agent/tests/sender_account_test.rs
+++ b/crates/tap-agent/tests/sender_account_test.rs
@@ -40,7 +40,7 @@ async fn sender_account_layer_test() {
         .await
         .unwrap();
 
-    let (sender_account, mut msg_receiver, _, _) = create_sender_account()
+    let (sender_account, mut msg_receiver, _, _, _) = create_sender_account()
         .pgpool(pgpool.clone())
         .max_amount_willing_to_lose_grt(TRIGGER_VALUE + 1000)
         .escrow_subgraph_endpoint(&mock_escrow_subgraph_server.uri())

From 7983f8fb796f839d75d2ea1b8005f7e355ea01b2 Mon Sep 17 00:00:00 2001
From: Joseph Livesey
Date: Mon, 1 Dec 2025 21:48:16 -0500
Subject: [PATCH 3/3] chore(tap-agent): add observability improvements for allocation reconciliation

- Add ALLOCATION_RECONCILIATION_RUNS counter metric
- Elevate reconciliation logs from debug to info
- Enhance config documentation explaining the connectivity failure
  scenario
---
 crates/config/src/config.rs                  |  8 +++++++-
 crates/tap-agent/src/agent/sender_account.rs | 20 +++++++++++++++++---
 2 files changed, 24 insertions(+), 4 deletions(-)

diff --git a/crates/config/src/config.rs b/crates/config/src/config.rs
index eaced1496..f42f29471 100644
--- a/crates/config/src/config.rs
+++ b/crates/config/src/config.rs
@@ -457,7 +457,13 @@ pub struct TapConfig {
     pub trusted_senders: HashSet<Address>,
 
     /// Interval in seconds for periodic allocation reconciliation.
-    /// This ensures stale allocations are detected after subgraph connectivity issues.
+    ///
+    /// Allocation state is normally updated via watcher events from the network subgraph.
+    /// However, if connectivity to the subgraph is lost, allocation closure events may be
+    /// missed. This periodic reconciliation forces a re-check of all allocations against
+    /// the current subgraph state, ensuring stale allocations are detected and processed
+    /// even after connectivity failures.
+    ///
     /// Default: 300 (5 minutes)
     #[serde(default = "default_allocation_reconciliation_interval_secs")]
     #[serde_as(as = "DurationSecondsWithFrac")]
diff --git a/crates/tap-agent/src/agent/sender_account.rs b/crates/tap-agent/src/agent/sender_account.rs
index 87a36db32..6ae8e545a 100644
--- a/crates/tap-agent/src/agent/sender_account.rs
+++ b/crates/tap-agent/src/agent/sender_account.rs
@@ -17,7 +17,10 @@ use indexer_query::{
     unfinalized_transactions, UnfinalizedTransactions,
 };
 use indexer_watcher::watch_pipe;
-use prometheus::{register_gauge_vec, register_int_gauge_vec, GaugeVec, IntGaugeVec};
+use prometheus::{
+    register_gauge_vec, register_int_counter_vec, register_int_gauge_vec, GaugeVec, IntCounterVec,
+    IntGaugeVec,
+};
 use ractor::{Actor, ActorProcessingErr, ActorRef, MessagingErr, SupervisionEvent};
 use reqwest::Url;
 use sqlx::PgPool;
@@ -118,6 +121,14 @@ static RAV_REQUEST_TRIGGER_VALUE: LazyLock<GaugeVec> = LazyLock::new(|| {
     )
     .unwrap()
 });
+static ALLOCATION_RECONCILIATION_RUNS: LazyLock<IntCounterVec> = LazyLock::new(|| {
+    register_int_counter_vec!(
+        "tap_allocation_reconciliation_runs_total",
+        "Number of allocation reconciliation runs",
+        &["sender"]
+    )
+    .unwrap()
+});
 
 const INITIAL_RAV_REQUEST_CONCURRENT: usize = 1;
 const TAP_V1: &str = "v1";
@@ -1179,7 +1190,7 @@ impl Actor for SenderAccount {
             interval.tick().await;
             loop {
                 interval.tick().await;
-                tracing::debug!(
+                tracing::info!(
                     sender = %sender_for_log,
                     "Running periodic allocation reconciliation"
                 );
@@ -1601,8 +1612,11 @@ impl Actor for SenderAccount {
                 // Get current allocations from the watcher and trigger UpdateAllocationIds
                 // This forces a re-check of all allocations even if the watcher data hasn't changed,
                 // ensuring we recover from missed closure events during connectivity issues.
+                ALLOCATION_RECONCILIATION_RUNS
+                    .with_label_values(&[&state.sender.to_string()])
+                    .inc();
                 let current_allocations = state.indexer_allocations.borrow().clone();
-                tracing::debug!(
+                tracing::info!(
                     sender = %state.sender,
                     allocation_count = current_allocations.len(),
                     "Triggering allocation reconciliation"