diff --git a/crates/service/src/tap/receipt_store.rs b/crates/service/src/tap/receipt_store.rs index 3efa63ccf..e244bb325 100644 --- a/crates/service/src/tap/receipt_store.rs +++ b/crates/service/src/tap/receipt_store.rs @@ -112,7 +112,7 @@ impl InnerContext { .execute(&self.pgpool) .await .map_err(|e| { - tracing::error!("Failed to store receipt: {}", e); + tracing::error!("Failed to store V1 receipt: {}", e); anyhow!(e) })?; @@ -180,7 +180,7 @@ impl InnerContext { .execute(&self.pgpool) .await .map_err(|e| { - tracing::error!("Failed to store receipt: {}", e); + tracing::error!("Failed to store V2 receipt: {}", e); anyhow!(e) })?; diff --git a/crates/tap-agent/src/agent.rs b/crates/tap-agent/src/agent.rs index 9fa6143f5..5c9fa3724 100644 --- a/crates/tap-agent/src/agent.rs +++ b/crates/tap-agent/src/agent.rs @@ -174,7 +174,21 @@ pub async fn start_agent() -> (ActorRef, JoinHandl .await .expect("Error creating escrow_accounts channel"); - let config = Box::leak(Box::new(SenderAccountConfig::from_config(&CONFIG))); + let config = Box::leak(Box::new({ + let mut config = SenderAccountConfig::from_config(&CONFIG); + // FIXME: This is a temporary measure to disable + // Horizon, even if enabled through our configuration file. + // Force disable Horizon support + config.horizon_enabled = false; + // Add a warning log so operators know their setting was ignore + if CONFIG.horizon.enabled { + tracing::warn!( + "Horizon support is configured as enabled but has been forcibly disabled as it's not fully supported yet. \ + This is a temporary measure until Horizon support is stable." + ); + } + config + })); let args = SenderAccountsManagerArgs { config, diff --git a/crates/tap-agent/src/agent/sender_accounts_manager.rs b/crates/tap-agent/src/agent/sender_accounts_manager.rs index 1ad28617e..25d761ad5 100644 --- a/crates/tap-agent/src/agent/sender_accounts_manager.rs +++ b/crates/tap-agent/src/agent/sender_accounts_manager.rs @@ -191,13 +191,20 @@ impl Actor for SenderAccountsManager { allocation_id .keys() .cloned() - // TODO map based on the allocation type returned by the subgraph + // TODO: map based on the allocation type returned by the subgraph .map(AllocationId::Legacy) .collect::>() }); // we need two connections because each one will listen to different notify events let pglistener_v1 = PgListener::connect_with(&pgpool.clone()).await.unwrap(); - let pglistener_v2 = PgListener::connect_with(&pgpool.clone()).await.unwrap(); + + // Extra safety, we don't want to have a listener if horizon is not enabled + let pglistener_v2 = if config.horizon_enabled { + Some(PgListener::connect_with(&pgpool.clone()).await.unwrap()) + } else { + None + }; + let myself_clone = myself.clone(); let accounts_clone = escrow_accounts_v1.clone(); watch_pipe(accounts_clone, move |escrow_accounts| { @@ -212,19 +219,23 @@ impl Actor for SenderAccountsManager { async {} }); - let myself_clone = myself.clone(); - let _escrow_accounts_v2 = escrow_accounts_v2.clone(); - watch_pipe(_escrow_accounts_v2, move |escrow_accounts| { - let senders = escrow_accounts.get_senders(); - myself_clone - .cast(SenderAccountsManagerMessage::UpdateSenderAccountsV2( - senders, - )) - .unwrap_or_else(|e| { - tracing::error!("Error while updating sender_accounts v2: {:?}", e); - }); - async {} - }); + // Extra safety, we don't want to have a + // escrow account listener if horizon is not enabled + if config.horizon_enabled { + let myself_clone = myself.clone(); + let _escrow_accounts_v2 = escrow_accounts_v2.clone(); + watch_pipe(_escrow_accounts_v2, move |escrow_accounts| { + let senders = escrow_accounts.get_senders(); + myself_clone + .cast(SenderAccountsManagerMessage::UpdateSenderAccountsV2( + senders, + )) + .unwrap_or_else(|e| { + tracing::error!("Error while updating sender_accounts v2: {:?}", e); + }); + async {} + }); + } let mut state = State { config, @@ -305,17 +316,18 @@ impl Actor for SenderAccountsManager { // after starting all senders state.new_receipts_watcher_handle_v2 = None; - if state.config.horizon_enabled { + // Extra safety, we don't want to have a listener if horizon is not enabled + if let Some(listener_v2) = pglistener_v2 { state.new_receipts_watcher_handle_v2 = Some(tokio::spawn( new_receipts_watcher() .actor_cell(myself.get_cell()) - .pglistener(pglistener_v2) + .pglistener(listener_v2) .escrow_accounts_rx(escrow_accounts_v2) .sender_type(SenderType::Horizon) .maybe_prefix(prefix) .call(), )); - } + }; tracing::info!("SenderAccountManager created!"); Ok(state) @@ -446,18 +458,41 @@ impl Actor for SenderAccountsManager { } }; - let mut sender_allocation = select! { - sender_allocation = state.get_pending_sender_allocation_id_v1() => sender_allocation, - _ = tokio::time::sleep(state.config.tap_sender_timeout) => { - tracing::error!("Timeout while getting pending sender allocation ids"); - return Ok(()); + // Get the sender's allocations taking into account + // the sender type + let allocations = match sender_type { + SenderType::Legacy => { + let mut sender_allocation = select! { + sender_allocation = state.get_pending_sender_allocation_id_v1() => sender_allocation, + _ = tokio::time::sleep(state.config.tap_sender_timeout) => { + tracing::error!(version = "V1", "Timeout while getting pending sender allocation ids"); + return Ok(()); + } + }; + sender_allocation + .remove(&sender_id) + .unwrap_or(HashSet::new()) + } + SenderType::Horizon => { + if !state.config.horizon_enabled { + tracing::info!(%sender_id, "Horizon sender failed but horizon is disabled, not restarting"); + + return Ok(()); + } + + let mut sender_allocation = select! { + sender_allocation = state.get_pending_sender_allocation_id_v2() => sender_allocation, + _ = tokio::time::sleep(state.config.tap_sender_timeout) => { + tracing::error!(version = "V2", "Timeout while getting pending sender allocation ids"); + return Ok(()); + } + }; + sender_allocation + .remove(&sender_id) + .unwrap_or(HashSet::new()) } }; - let allocations = sender_allocation - .remove(&sender_id) - .unwrap_or(HashSet::new()); - state .create_or_deny_sender(myself.get_cell(), sender_id, allocations, sender_type) .await; @@ -575,12 +610,12 @@ impl State { ) .fetch_all(&self.pgpool) .await - .expect("should be able to fetch pending receipts from the database"); + .expect("should be able to fetch pending receipts V1 from the database"); for row in receipts_signer_allocations_in_db { let allocation_ids = row .allocation_ids - .expect("all receipts should have an allocation_id") + .expect("all receipts V1 should have an allocation_id") .iter() .map(|allocation_id| { AllocationId::Legacy( @@ -615,28 +650,38 @@ impl State { ) .fetch_all(&self.pgpool) .await - .expect("should be able to fetch unfinalized RAVs from the database"); + .expect("should be able to fetch unfinalized RAVs V1 from the database"); for row in nonfinal_ravs_sender_allocations_in_db { - let allocation_ids = row - .allocation_ids - .expect("all RAVs should have an allocation_id") - .iter() - .map(|allocation_id| { - AllocationId::Legacy( - Address::from_str(allocation_id) - .expect("allocation_id should be a valid address"), - ) - }) - .collect::>(); - let sender_id = Address::from_str(&row.sender_address) - .expect("sender_address should be a valid address"); + // Check if allocation_ids is Some before processing, + // as ARRAY_AGG with FILTER returns NULL + // instead of an empty array + if let Some(allocation_id_strings) = row.allocation_ids { + let allocation_ids = allocation_id_strings + .iter() + .map(|allocation_id| { + AllocationId::Legacy( + Address::from_str(allocation_id) + .expect("allocation_id should be a valid address"), + ) + }) + .collect::>(); - // Accumulate allocations for the sender - unfinalized_sender_allocations_map - .entry(sender_id) - .or_default() - .extend(allocation_ids); + if !allocation_ids.is_empty() { + let sender_id = Address::from_str(&row.sender_address) + .expect("sender_address should be a valid address"); + + unfinalized_sender_allocations_map + .entry(sender_id) + .or_default() + .extend(allocation_ids); + } + } else { + // Log the case when allocation_ids is NULL + tracing::warn!( + "Found NULL allocation_ids. This may indicate all RAVs are finalized." + ); + } } unfinalized_sender_allocations_map } @@ -668,12 +713,12 @@ impl State { ) .fetch_all(&self.pgpool) .await - .expect("should be able to fetch pending receipts from the database"); + .expect("should be able to fetch pending V2 receipts from the database"); for row in receipts_signer_allocations_in_db { let allocation_ids = row .allocation_ids - .expect("all receipts should have an allocation_id") + .expect("all receipts V2 should have an allocation_id") .iter() .map(|allocation_id| { AllocationId::Legacy( @@ -708,28 +753,38 @@ impl State { ) .fetch_all(&self.pgpool) .await - .expect("should be able to fetch unfinalized RAVs from the database"); + .expect("should be able to fetch unfinalized V2 RAVs from the database"); for row in nonfinal_ravs_sender_allocations_in_db { - let allocation_ids = row - .allocation_ids - .expect("all RAVs should have an allocation_id") - .iter() - .map(|allocation_id| { - AllocationId::Legacy( - Address::from_str(allocation_id) - .expect("allocation_id should be a valid address"), - ) - }) - .collect::>(); - let sender_id = - Address::from_str(&row.payer).expect("sender_address should be a valid address"); + // Check if allocation_ids is Some before processing, + // as ARRAY_AGG with FILTER returns NULL instead of an + // empty array + if let Some(allocation_id_strings) = row.allocation_ids { + let allocation_ids = allocation_id_strings + .iter() + .map(|allocation_id| { + AllocationId::Legacy( + Address::from_str(allocation_id) + .expect("allocation_id should be a valid address"), + ) + }) + .collect::>(); - // Accumulate allocations for the sender - unfinalized_sender_allocations_map - .entry(sender_id) - .or_default() - .extend(allocation_ids); + if !allocation_ids.is_empty() { + let sender_id = Address::from_str(&row.payer) + .expect("sender_address should be a valid address"); + + unfinalized_sender_allocations_map + .entry(sender_id) + .or_default() + .extend(allocation_ids); + } + } else { + // Log the case when allocation_ids is NULL + tracing::warn!( + "Found NULL allocation_ids. This may indicate all RAVs are finalized." + ); + } } unfinalized_sender_allocations_map } @@ -1081,7 +1136,7 @@ mod tests { } #[sqlx::test(migrations = "../../migrations")] - async fn test_update_sender_allocation(pgpool: PgPool) { + async fn test_update_sender_account(pgpool: PgPool) { let (prefix, mut notify, (actor, join_handle)) = create_sender_accounts_manager().pgpool(pgpool).call().await;