Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions crates/service/src/tap/receipt_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ impl InnerContext {
.execute(&self.pgpool)
.await
.map_err(|e| {
tracing::error!("Failed to store receipt: {}", e);
tracing::error!("Failed to store V1 receipt: {}", e);
anyhow!(e)
})?;

Expand Down Expand Up @@ -180,7 +180,7 @@ impl InnerContext {
.execute(&self.pgpool)
.await
.map_err(|e| {
tracing::error!("Failed to store receipt: {}", e);
tracing::error!("Failed to store V2 receipt: {}", e);
anyhow!(e)
})?;

Expand Down
16 changes: 15 additions & 1 deletion crates/tap-agent/src/agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,21 @@ pub async fn start_agent() -> (ActorRef<SenderAccountsManagerMessage>, JoinHandl
.await
.expect("Error creating escrow_accounts channel");

let config = Box::leak(Box::new(SenderAccountConfig::from_config(&CONFIG)));
let config = Box::leak(Box::new({
let mut config = SenderAccountConfig::from_config(&CONFIG);
// FIXME: This is a temporary measure to disable
// Horizon, even if enabled through our configuration file.
// Force disable Horizon support
config.horizon_enabled = false;
// Add a warning log so operators know their setting was ignore
if CONFIG.horizon.enabled {
tracing::warn!(
"Horizon support is configured as enabled but has been forcibly disabled as it's not fully supported yet. \
This is a temporary measure until Horizon support is stable."
);
}
config
}));

let args = SenderAccountsManagerArgs {
config,
Expand Down
195 changes: 125 additions & 70 deletions crates/tap-agent/src/agent/sender_accounts_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -191,13 +191,20 @@ impl Actor for SenderAccountsManager {
allocation_id
.keys()
.cloned()
// TODO map based on the allocation type returned by the subgraph
// TODO: map based on the allocation type returned by the subgraph
.map(AllocationId::Legacy)
.collect::<HashSet<_>>()
});
// we need two connections because each one will listen to different notify events
let pglistener_v1 = PgListener::connect_with(&pgpool.clone()).await.unwrap();
let pglistener_v2 = PgListener::connect_with(&pgpool.clone()).await.unwrap();

// Extra safety, we don't want to have a listener if horizon is not enabled
let pglistener_v2 = if config.horizon_enabled {
Some(PgListener::connect_with(&pgpool.clone()).await.unwrap())
} else {
None
};

let myself_clone = myself.clone();
let accounts_clone = escrow_accounts_v1.clone();
watch_pipe(accounts_clone, move |escrow_accounts| {
Expand All @@ -212,19 +219,23 @@ impl Actor for SenderAccountsManager {
async {}
});

let myself_clone = myself.clone();
let _escrow_accounts_v2 = escrow_accounts_v2.clone();
watch_pipe(_escrow_accounts_v2, move |escrow_accounts| {
let senders = escrow_accounts.get_senders();
myself_clone
.cast(SenderAccountsManagerMessage::UpdateSenderAccountsV2(
senders,
))
.unwrap_or_else(|e| {
tracing::error!("Error while updating sender_accounts v2: {:?}", e);
});
async {}
});
// Extra safety, we don't want to have a
// escrow account listener if horizon is not enabled
if config.horizon_enabled {
let myself_clone = myself.clone();
let _escrow_accounts_v2 = escrow_accounts_v2.clone();
watch_pipe(_escrow_accounts_v2, move |escrow_accounts| {
let senders = escrow_accounts.get_senders();
myself_clone
.cast(SenderAccountsManagerMessage::UpdateSenderAccountsV2(
senders,
))
.unwrap_or_else(|e| {
tracing::error!("Error while updating sender_accounts v2: {:?}", e);
});
async {}
});
}

let mut state = State {
config,
Expand Down Expand Up @@ -305,17 +316,18 @@ impl Actor for SenderAccountsManager {
// after starting all senders
state.new_receipts_watcher_handle_v2 = None;

if state.config.horizon_enabled {
// Extra safety, we don't want to have a listener if horizon is not enabled
if let Some(listener_v2) = pglistener_v2 {
state.new_receipts_watcher_handle_v2 = Some(tokio::spawn(
new_receipts_watcher()
.actor_cell(myself.get_cell())
.pglistener(pglistener_v2)
.pglistener(listener_v2)
.escrow_accounts_rx(escrow_accounts_v2)
.sender_type(SenderType::Horizon)
.maybe_prefix(prefix)
.call(),
));
}
};

tracing::info!("SenderAccountManager created!");
Ok(state)
Expand Down Expand Up @@ -446,18 +458,41 @@ impl Actor for SenderAccountsManager {
}
};

let mut sender_allocation = select! {
sender_allocation = state.get_pending_sender_allocation_id_v1() => sender_allocation,
_ = tokio::time::sleep(state.config.tap_sender_timeout) => {
tracing::error!("Timeout while getting pending sender allocation ids");
return Ok(());
// Get the sender's allocations taking into account
// the sender type
let allocations = match sender_type {
SenderType::Legacy => {
let mut sender_allocation = select! {
sender_allocation = state.get_pending_sender_allocation_id_v1() => sender_allocation,
_ = tokio::time::sleep(state.config.tap_sender_timeout) => {
tracing::error!(version = "V1", "Timeout while getting pending sender allocation ids");
return Ok(());
}
};
sender_allocation
.remove(&sender_id)
.unwrap_or(HashSet::new())
}
SenderType::Horizon => {
if !state.config.horizon_enabled {
tracing::info!(%sender_id, "Horizon sender failed but horizon is disabled, not restarting");

return Ok(());
}

let mut sender_allocation = select! {
sender_allocation = state.get_pending_sender_allocation_id_v2() => sender_allocation,
_ = tokio::time::sleep(state.config.tap_sender_timeout) => {
tracing::error!(version = "V2", "Timeout while getting pending sender allocation ids");
return Ok(());
}
};
sender_allocation
.remove(&sender_id)
.unwrap_or(HashSet::new())
}
};

let allocations = sender_allocation
.remove(&sender_id)
.unwrap_or(HashSet::new());

state
.create_or_deny_sender(myself.get_cell(), sender_id, allocations, sender_type)
.await;
Expand Down Expand Up @@ -575,12 +610,12 @@ impl State {
)
.fetch_all(&self.pgpool)
.await
.expect("should be able to fetch pending receipts from the database");
.expect("should be able to fetch pending receipts V1 from the database");

for row in receipts_signer_allocations_in_db {
let allocation_ids = row
.allocation_ids
.expect("all receipts should have an allocation_id")
.expect("all receipts V1 should have an allocation_id")
.iter()
.map(|allocation_id| {
AllocationId::Legacy(
Expand Down Expand Up @@ -615,28 +650,38 @@ impl State {
)
.fetch_all(&self.pgpool)
.await
.expect("should be able to fetch unfinalized RAVs from the database");
.expect("should be able to fetch unfinalized RAVs V1 from the database");

for row in nonfinal_ravs_sender_allocations_in_db {
let allocation_ids = row
.allocation_ids
.expect("all RAVs should have an allocation_id")
.iter()
.map(|allocation_id| {
AllocationId::Legacy(
Address::from_str(allocation_id)
.expect("allocation_id should be a valid address"),
)
})
.collect::<HashSet<_>>();
let sender_id = Address::from_str(&row.sender_address)
.expect("sender_address should be a valid address");
// Check if allocation_ids is Some before processing,
// as ARRAY_AGG with FILTER returns NULL
// instead of an empty array
if let Some(allocation_id_strings) = row.allocation_ids {
let allocation_ids = allocation_id_strings
.iter()
.map(|allocation_id| {
AllocationId::Legacy(
Address::from_str(allocation_id)
.expect("allocation_id should be a valid address"),
)
})
.collect::<HashSet<_>>();

// Accumulate allocations for the sender
unfinalized_sender_allocations_map
.entry(sender_id)
.or_default()
.extend(allocation_ids);
if !allocation_ids.is_empty() {
let sender_id = Address::from_str(&row.sender_address)
.expect("sender_address should be a valid address");

unfinalized_sender_allocations_map
.entry(sender_id)
.or_default()
.extend(allocation_ids);
}
} else {
// Log the case when allocation_ids is NULL
tracing::warn!(
"Found NULL allocation_ids. This may indicate all RAVs are finalized."
);
}
}
unfinalized_sender_allocations_map
}
Expand Down Expand Up @@ -668,12 +713,12 @@ impl State {
)
.fetch_all(&self.pgpool)
.await
.expect("should be able to fetch pending receipts from the database");
.expect("should be able to fetch pending V2 receipts from the database");

for row in receipts_signer_allocations_in_db {
let allocation_ids = row
.allocation_ids
.expect("all receipts should have an allocation_id")
.expect("all receipts V2 should have an allocation_id")
.iter()
.map(|allocation_id| {
AllocationId::Legacy(
Expand Down Expand Up @@ -708,28 +753,38 @@ impl State {
)
.fetch_all(&self.pgpool)
.await
.expect("should be able to fetch unfinalized RAVs from the database");
.expect("should be able to fetch unfinalized V2 RAVs from the database");

for row in nonfinal_ravs_sender_allocations_in_db {
let allocation_ids = row
.allocation_ids
.expect("all RAVs should have an allocation_id")
.iter()
.map(|allocation_id| {
AllocationId::Legacy(
Address::from_str(allocation_id)
.expect("allocation_id should be a valid address"),
)
})
.collect::<HashSet<_>>();
let sender_id =
Address::from_str(&row.payer).expect("sender_address should be a valid address");
// Check if allocation_ids is Some before processing,
// as ARRAY_AGG with FILTER returns NULL instead of an
// empty array
if let Some(allocation_id_strings) = row.allocation_ids {
let allocation_ids = allocation_id_strings
.iter()
.map(|allocation_id| {
AllocationId::Legacy(
Address::from_str(allocation_id)
.expect("allocation_id should be a valid address"),
)
})
.collect::<HashSet<_>>();

// Accumulate allocations for the sender
unfinalized_sender_allocations_map
.entry(sender_id)
.or_default()
.extend(allocation_ids);
if !allocation_ids.is_empty() {
let sender_id = Address::from_str(&row.payer)
.expect("sender_address should be a valid address");

unfinalized_sender_allocations_map
.entry(sender_id)
.or_default()
.extend(allocation_ids);
}
} else {
// Log the case when allocation_ids is NULL
tracing::warn!(
"Found NULL allocation_ids. This may indicate all RAVs are finalized."
);
}
}
unfinalized_sender_allocations_map
}
Expand Down Expand Up @@ -1081,7 +1136,7 @@ mod tests {
}

#[sqlx::test(migrations = "../../migrations")]
async fn test_update_sender_allocation(pgpool: PgPool) {
async fn test_update_sender_account(pgpool: PgPool) {
let (prefix, mut notify, (actor, join_handle)) =
create_sender_accounts_manager().pgpool(pgpool).call().await;

Expand Down
Loading