From 24581d5b743b154e0a5cf02a03e1f31b0814ad02 Mon Sep 17 00:00:00 2001 From: neithanmo Date: Mon, 14 Apr 2025 12:24:47 -0500 Subject: [PATCH 1/7] feat(tap-agent): Add horizon config flag safety checks Add conditional initialization of pglistener_v2 and escrow account listener based on horizon_enabled flag, this as an extray safety measure. Improve error messages specificity for v2 components. --- .../src/agent/sender_accounts_manager.rs | 107 ++++++++++++------ 1 file changed, 71 insertions(+), 36 deletions(-) diff --git a/crates/tap-agent/src/agent/sender_accounts_manager.rs b/crates/tap-agent/src/agent/sender_accounts_manager.rs index 1ad28617e..aa5fd0cea 100644 --- a/crates/tap-agent/src/agent/sender_accounts_manager.rs +++ b/crates/tap-agent/src/agent/sender_accounts_manager.rs @@ -191,13 +191,20 @@ impl Actor for SenderAccountsManager { allocation_id .keys() .cloned() - // TODO map based on the allocation type returned by the subgraph + // TODO: map based on the allocation type returned by the subgraph .map(AllocationId::Legacy) .collect::>() }); // we need two connections because each one will listen to different notify events let pglistener_v1 = PgListener::connect_with(&pgpool.clone()).await.unwrap(); - let pglistener_v2 = PgListener::connect_with(&pgpool.clone()).await.unwrap(); + + // Extra safety, we don't want to have a listener if horizon is not enabled + let pglistener_v2 = if config.horizon_enabled { + Some(PgListener::connect_with(&pgpool.clone()).await.unwrap()) + } else { + None + }; + let myself_clone = myself.clone(); let accounts_clone = escrow_accounts_v1.clone(); watch_pipe(accounts_clone, move |escrow_accounts| { @@ -212,19 +219,23 @@ impl Actor for SenderAccountsManager { async {} }); - let myself_clone = myself.clone(); - let _escrow_accounts_v2 = escrow_accounts_v2.clone(); - watch_pipe(_escrow_accounts_v2, move |escrow_accounts| { - let senders = escrow_accounts.get_senders(); - myself_clone - .cast(SenderAccountsManagerMessage::UpdateSenderAccountsV2( - senders, - )) - .unwrap_or_else(|e| { - tracing::error!("Error while updating sender_accounts v2: {:?}", e); - }); - async {} - }); + // Extra safety, we don't want to have a + // escrow account listener if horizon is not enabled + if config.horizon_enabled { + let myself_clone = myself.clone(); + let _escrow_accounts_v2 = escrow_accounts_v2.clone(); + watch_pipe(_escrow_accounts_v2, move |escrow_accounts| { + let senders = escrow_accounts.get_senders(); + myself_clone + .cast(SenderAccountsManagerMessage::UpdateSenderAccountsV2( + senders, + )) + .unwrap_or_else(|e| { + tracing::error!("Error while updating sender_accounts v2: {:?}", e); + }); + async {} + }); + } let mut state = State { config, @@ -305,17 +316,18 @@ impl Actor for SenderAccountsManager { // after starting all senders state.new_receipts_watcher_handle_v2 = None; - if state.config.horizon_enabled { + // Extra safety, we don't want to have a listener if horizon is not enabled + if let Some(listener_v2) = pglistener_v2 { state.new_receipts_watcher_handle_v2 = Some(tokio::spawn( new_receipts_watcher() .actor_cell(myself.get_cell()) - .pglistener(pglistener_v2) + .pglistener(listener_v2) .escrow_accounts_rx(escrow_accounts_v2) .sender_type(SenderType::Horizon) .maybe_prefix(prefix) .call(), )); - } + }; tracing::info!("SenderAccountManager created!"); Ok(state) @@ -446,18 +458,41 @@ impl Actor for SenderAccountsManager { } }; - let mut sender_allocation = select! { - sender_allocation = state.get_pending_sender_allocation_id_v1() => sender_allocation, - _ = tokio::time::sleep(state.config.tap_sender_timeout) => { - tracing::error!("Timeout while getting pending sender allocation ids"); - return Ok(()); + // Get the sender's allocations taking into account + // the sender type + let allocations = match sender_type { + SenderType::Legacy => { + let mut sender_allocation = select! { + sender_allocation = state.get_pending_sender_allocation_id_v1() => sender_allocation, + _ = tokio::time::sleep(state.config.tap_sender_timeout) => { + tracing::error!("Timeout while getting pending sender allocation ids"); + return Ok(()); + } + }; + sender_allocation + .remove(&sender_id) + .unwrap_or(HashSet::new()) + } + SenderType::Horizon => { + if !state.config.horizon_enabled { + tracing::info!(%sender_id, "Horizon sender failed but horizon is disabled, not restarting"); + + return Ok(()); + } + + let mut sender_allocation = select! { + sender_allocation = state.get_pending_sender_allocation_id_v2() => sender_allocation, + _ = tokio::time::sleep(state.config.tap_sender_timeout) => { + tracing::error!("Timeout while getting pending V2 sender allocation ids"); + return Ok(()); + } + }; + sender_allocation + .remove(&sender_id) + .unwrap_or(HashSet::new()) } }; - let allocations = sender_allocation - .remove(&sender_id) - .unwrap_or(HashSet::new()); - state .create_or_deny_sender(myself.get_cell(), sender_id, allocations, sender_type) .await; @@ -575,12 +610,12 @@ impl State { ) .fetch_all(&self.pgpool) .await - .expect("should be able to fetch pending receipts from the database"); + .expect("should be able to fetch pending receipts V1 from the database"); for row in receipts_signer_allocations_in_db { let allocation_ids = row .allocation_ids - .expect("all receipts should have an allocation_id") + .expect("all receipts V1 should have an allocation_id") .iter() .map(|allocation_id| { AllocationId::Legacy( @@ -615,12 +650,12 @@ impl State { ) .fetch_all(&self.pgpool) .await - .expect("should be able to fetch unfinalized RAVs from the database"); + .expect("should be able to fetch unfinalized RAVs V1 from the database"); for row in nonfinal_ravs_sender_allocations_in_db { let allocation_ids = row .allocation_ids - .expect("all RAVs should have an allocation_id") + .expect("all RAVs V1 should have an allocation_id") .iter() .map(|allocation_id| { AllocationId::Legacy( @@ -668,12 +703,12 @@ impl State { ) .fetch_all(&self.pgpool) .await - .expect("should be able to fetch pending receipts from the database"); + .expect("should be able to fetch pending V2 receipts from the database"); for row in receipts_signer_allocations_in_db { let allocation_ids = row .allocation_ids - .expect("all receipts should have an allocation_id") + .expect("all receipts V2 should have an allocation_id") .iter() .map(|allocation_id| { AllocationId::Legacy( @@ -708,12 +743,12 @@ impl State { ) .fetch_all(&self.pgpool) .await - .expect("should be able to fetch unfinalized RAVs from the database"); + .expect("should be able to fetch unfinalized V2 RAVs from the database"); for row in nonfinal_ravs_sender_allocations_in_db { let allocation_ids = row .allocation_ids - .expect("all RAVs should have an allocation_id") + .expect("all RAVs V2 should have an allocation_id") .iter() .map(|allocation_id| { AllocationId::Legacy( @@ -1081,7 +1116,7 @@ mod tests { } #[sqlx::test(migrations = "../../migrations")] - async fn test_update_sender_allocation(pgpool: PgPool) { + async fn test_update_sender_account(pgpool: PgPool) { let (prefix, mut notify, (actor, join_handle)) = create_sender_accounts_manager().pgpool(pgpool).call().await; From 28aadbe9da47a4446141289821eef466c75506ba Mon Sep 17 00:00:00 2001 From: neithanmo Date: Mon, 14 Apr 2025 12:26:33 -0500 Subject: [PATCH 2/7] feat(receipt): Improve logs to provide more context about receipt types --- crates/service/src/tap/receipt_store.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/crates/service/src/tap/receipt_store.rs b/crates/service/src/tap/receipt_store.rs index 3efa63ccf..e244bb325 100644 --- a/crates/service/src/tap/receipt_store.rs +++ b/crates/service/src/tap/receipt_store.rs @@ -112,7 +112,7 @@ impl InnerContext { .execute(&self.pgpool) .await .map_err(|e| { - tracing::error!("Failed to store receipt: {}", e); + tracing::error!("Failed to store V1 receipt: {}", e); anyhow!(e) })?; @@ -180,7 +180,7 @@ impl InnerContext { .execute(&self.pgpool) .await .map_err(|e| { - tracing::error!("Failed to store receipt: {}", e); + tracing::error!("Failed to store V2 receipt: {}", e); anyhow!(e) })?; From 523783c072421ea86c993f32bd80d420bcba2caf Mon Sep 17 00:00:00 2001 From: neithanmo Date: Mon, 14 Apr 2025 12:28:02 -0500 Subject: [PATCH 3/7] fix(horizon): Extra safety measure to ensure horizon is disabled, even when enable by configuration. --- crates/tap-agent/src/agent.rs | 16 +++++++++++++++- 1 file changed, 15 insertions(+), 1 deletion(-) diff --git a/crates/tap-agent/src/agent.rs b/crates/tap-agent/src/agent.rs index 9fa6143f5..b1939ae96 100644 --- a/crates/tap-agent/src/agent.rs +++ b/crates/tap-agent/src/agent.rs @@ -174,7 +174,21 @@ pub async fn start_agent() -> (ActorRef, JoinHandl .await .expect("Error creating escrow_accounts channel"); - let config = Box::leak(Box::new(SenderAccountConfig::from_config(&CONFIG))); + let config = Box::leak(Box::new({ + let mut config = SenderAccountConfig::from_config(&CONFIG); + // FIXME: This is a temporary measure to disable + // Horizon, even if enable through our configuration file. + // Force disable Horizon support + config.horizon_enabled = false; + // Add a warning log so operators know their setting was ignore + if CONFIG.horizon.enabled { + tracing::warn!( + "Horizon support is configured as enabled but has been forcibly disabled as it's not fully supported yet. \ + This is a temporary measure until Horizon support is stable." + ); + } + config + })); let args = SenderAccountsManagerArgs { config, From 30a13fc656a40600fd33a126d1455a603c43fc2f Mon Sep 17 00:00:00 2001 From: neithanmo Date: Wed, 16 Apr 2025 10:17:29 -0500 Subject: [PATCH 4/7] fix(tap-agent): Prevent panic on NULL allocation_ids from RAV query The `ARRAY_AGG(...) FILTER (WHERE NOT last)` query for RAVs can return NULL. Changed `.expect()` to `if let Some` to handle this valid case where a sender has only finalized RAVs, preventing a panic. --- .../src/agent/sender_accounts_manager.rs | 84 ++++++++++--------- 1 file changed, 46 insertions(+), 38 deletions(-) diff --git a/crates/tap-agent/src/agent/sender_accounts_manager.rs b/crates/tap-agent/src/agent/sender_accounts_manager.rs index aa5fd0cea..fe4528cec 100644 --- a/crates/tap-agent/src/agent/sender_accounts_manager.rs +++ b/crates/tap-agent/src/agent/sender_accounts_manager.rs @@ -465,7 +465,7 @@ impl Actor for SenderAccountsManager { let mut sender_allocation = select! { sender_allocation = state.get_pending_sender_allocation_id_v1() => sender_allocation, _ = tokio::time::sleep(state.config.tap_sender_timeout) => { - tracing::error!("Timeout while getting pending sender allocation ids"); + tracing::error!(version = "V1", "Timeout while getting pending sender allocation ids"); return Ok(()); } }; @@ -483,7 +483,7 @@ impl Actor for SenderAccountsManager { let mut sender_allocation = select! { sender_allocation = state.get_pending_sender_allocation_id_v2() => sender_allocation, _ = tokio::time::sleep(state.config.tap_sender_timeout) => { - tracing::error!("Timeout while getting pending V2 sender allocation ids"); + tracing::error!(version = "V2", "Timeout while getting pending sender allocation ids"); return Ok(()); } }; @@ -653,25 +653,29 @@ impl State { .expect("should be able to fetch unfinalized RAVs V1 from the database"); for row in nonfinal_ravs_sender_allocations_in_db { - let allocation_ids = row - .allocation_ids - .expect("all RAVs V1 should have an allocation_id") - .iter() - .map(|allocation_id| { - AllocationId::Legacy( - Address::from_str(allocation_id) - .expect("allocation_id should be a valid address"), - ) - }) - .collect::>(); - let sender_id = Address::from_str(&row.sender_address) - .expect("sender_address should be a valid address"); + // Check if allocation_ids is Some before processing, + // as ARRAY_AGG with FILTER can return NULL + if let Some(allocation_id_strings) = row.allocation_ids { + let allocation_ids = allocation_id_strings + .iter() + .map(|allocation_id| { + AllocationId::Legacy( + Address::from_str(allocation_id) + .expect("allocation_id should be a valid address"), + ) + }) + .collect::>(); - // Accumulate allocations for the sender - unfinalized_sender_allocations_map - .entry(sender_id) - .or_default() - .extend(allocation_ids); + if !allocation_ids.is_empty() { + let sender_id = Address::from_str(&row.sender_address) + .expect("sender_address should be a valid address"); + + unfinalized_sender_allocations_map + .entry(sender_id) + .or_default() + .extend(allocation_ids); + } + } } unfinalized_sender_allocations_map } @@ -746,25 +750,29 @@ impl State { .expect("should be able to fetch unfinalized V2 RAVs from the database"); for row in nonfinal_ravs_sender_allocations_in_db { - let allocation_ids = row - .allocation_ids - .expect("all RAVs V2 should have an allocation_id") - .iter() - .map(|allocation_id| { - AllocationId::Legacy( - Address::from_str(allocation_id) - .expect("allocation_id should be a valid address"), - ) - }) - .collect::>(); - let sender_id = - Address::from_str(&row.payer).expect("sender_address should be a valid address"); + // Check if allocation_ids is Some before processing, + // as ARRAY_AGG with FILTER can return NULL + if let Some(allocation_id_strings) = row.allocation_ids { + let allocation_ids = allocation_id_strings // Use the unwrapped Vec + .iter() + .map(|allocation_id| { + AllocationId::Legacy( + Address::from_str(allocation_id) + .expect("allocation_id should be a valid address"), + ) + }) + .collect::>(); - // Accumulate allocations for the sender - unfinalized_sender_allocations_map - .entry(sender_id) - .or_default() - .extend(allocation_ids); + if !allocation_ids.is_empty() { + let sender_id = Address::from_str(&row.payer) + .expect("sender_address should be a valid address"); + + unfinalized_sender_allocations_map + .entry(sender_id) + .or_default() + .extend(allocation_ids); + } + } } unfinalized_sender_allocations_map } From 3326d95a039bae407716b8e9277c08ff7b17b3a5 Mon Sep 17 00:00:00 2001 From: neithanmo Date: Wed, 16 Apr 2025 10:18:41 -0500 Subject: [PATCH 5/7] fix(agent): Fix typo --- crates/tap-agent/src/agent.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/tap-agent/src/agent.rs b/crates/tap-agent/src/agent.rs index b1939ae96..5c9fa3724 100644 --- a/crates/tap-agent/src/agent.rs +++ b/crates/tap-agent/src/agent.rs @@ -177,7 +177,7 @@ pub async fn start_agent() -> (ActorRef, JoinHandl let config = Box::leak(Box::new({ let mut config = SenderAccountConfig::from_config(&CONFIG); // FIXME: This is a temporary measure to disable - // Horizon, even if enable through our configuration file. + // Horizon, even if enabled through our configuration file. // Force disable Horizon support config.horizon_enabled = false; // Add a warning log so operators know their setting was ignore From 633ad43e74cfbdf65e9d8d5f846b47667c718acc Mon Sep 17 00:00:00 2001 From: neithanmo Date: Fri, 18 Apr 2025 09:22:34 -0500 Subject: [PATCH 6/7] chore(agent): Add warning to tell when null allocations are found for senders --- .../src/agent/sender_accounts_manager.rs | 20 ++++++++++++++++--- 1 file changed, 17 insertions(+), 3 deletions(-) diff --git a/crates/tap-agent/src/agent/sender_accounts_manager.rs b/crates/tap-agent/src/agent/sender_accounts_manager.rs index fe4528cec..d1cf92913 100644 --- a/crates/tap-agent/src/agent/sender_accounts_manager.rs +++ b/crates/tap-agent/src/agent/sender_accounts_manager.rs @@ -654,7 +654,8 @@ impl State { for row in nonfinal_ravs_sender_allocations_in_db { // Check if allocation_ids is Some before processing, - // as ARRAY_AGG with FILTER can return NULL + // as ARRAY_AGG with FILTER returns NULL + // instead of an empty array if let Some(allocation_id_strings) = row.allocation_ids { let allocation_ids = allocation_id_strings .iter() @@ -675,6 +676,12 @@ impl State { .or_default() .extend(allocation_ids); } + } else { + // Log the case when allocation_ids is NULL + tracing::warn!( + sender_address = %row.sender_address, + "Found NULL allocation_ids. This may indicate all RAVs are finalized." + ); } } unfinalized_sender_allocations_map @@ -751,9 +758,10 @@ impl State { for row in nonfinal_ravs_sender_allocations_in_db { // Check if allocation_ids is Some before processing, - // as ARRAY_AGG with FILTER can return NULL + // as ARRAY_AGG with FILTER returns NULL instead of an + // empty array if let Some(allocation_id_strings) = row.allocation_ids { - let allocation_ids = allocation_id_strings // Use the unwrapped Vec + let allocation_ids = allocation_id_strings .iter() .map(|allocation_id| { AllocationId::Legacy( @@ -772,6 +780,12 @@ impl State { .or_default() .extend(allocation_ids); } + } else { + // Log the case when allocation_ids is NULL + tracing::warn!( + sender_address = %row.sender_address, + "Found NULL allocation_ids. This may indicate all RAVs are finalized." + ); } } unfinalized_sender_allocations_map From 8e13cb4406729e9194d47f5a409e3996c1a35da8 Mon Sep 17 00:00:00 2001 From: neithanmo Date: Fri, 18 Apr 2025 09:39:52 -0500 Subject: [PATCH 7/7] fix(agent): Remove unknown field usage in logs --- crates/tap-agent/src/agent/sender_accounts_manager.rs | 2 -- 1 file changed, 2 deletions(-) diff --git a/crates/tap-agent/src/agent/sender_accounts_manager.rs b/crates/tap-agent/src/agent/sender_accounts_manager.rs index d1cf92913..25d761ad5 100644 --- a/crates/tap-agent/src/agent/sender_accounts_manager.rs +++ b/crates/tap-agent/src/agent/sender_accounts_manager.rs @@ -679,7 +679,6 @@ impl State { } else { // Log the case when allocation_ids is NULL tracing::warn!( - sender_address = %row.sender_address, "Found NULL allocation_ids. This may indicate all RAVs are finalized." ); } @@ -783,7 +782,6 @@ impl State { } else { // Log the case when allocation_ids is NULL tracing::warn!( - sender_address = %row.sender_address, "Found NULL allocation_ids. This may indicate all RAVs are finalized." ); }