77 changes: 63 additions & 14 deletions crates/core/src/node/network_bridge/p2p_protoc.rs
@@ -14,7 +14,7 @@ use std::{
};
use tokio::net::UdpSocket;
use tokio::sync::mpsc::{self, error::TryRecvError, Receiver, Sender};
-use tokio::time::timeout;
+use tokio::time::{sleep, timeout};
use tracing::Instrument;

use super::{ConnectionError, EventLoopNotificationsReceiver, NetworkBridge};
@@ -1275,30 +1275,57 @@ impl P2pConnManager {
);
}

-// If we already have a transport channel, reuse it instead of dialing again. This covers
-// transient->normal promotion without tripping duplicate connection errors.
+// If a transient transport already exists, promote it without dialing anew.
if self.connections.contains_key(&peer) {
tracing::info!(
tx = %tx,
remote = %peer,
courtesy,
"connect_peer: reusing existing transport"
"connect_peer: reusing existing transport / promoting transient if present"
);
let connection_manager = &self.bridge.op_manager.ring.connection_manager;
if let Some(entry) = connection_manager.drop_transient(&peer) {
let loc = entry
.location
.unwrap_or_else(|| Location::from_address(&peer.addr));
-self.bridge
-.op_manager
-.ring
-.add_connection(loc, peer.clone(), false)
-.await;
-tracing::info!(tx = %tx, remote = %peer, "connect_peer: promoted transient");
if connection_manager.should_accept(loc, &peer) {
let current = connection_manager.num_connections();
if current >= connection_manager.max_connections {
tracing::warn!(
tx = %tx,
remote = %peer,
current_connections = current,
max_connections = connection_manager.max_connections,
%loc,
"connect_peer: transient promotion rejected due to capacity"
);
} else {
self.bridge
Review comment (Collaborator):
As per the Claude suggestion, beware of races between dropping and adding. Apply the suggested fix to transient connections: add an atomic flag or better synchronization, or re-check should_accept after the drop.

Reply (Collaborator, Author):
The promotion flow now re-runs should_accept after dropping the transient and then calls add_connection(.., was_reserved = true), so the reservation counters and topology bookkeeping stay in sync. The drop + add happen on the same task (no concurrent promotion), so we are not racing another add; if we start doing concurrent promotions we can wrap this in a small guard, but for the current single-task path this keeps the state consistent.
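A minimal sketch of that promotion sequence, for illustration only: promote_transient is a hypothetical helper name, the Ring/PeerId/Location types and ConnectionManager methods are the ones this diff already touches, and single-task execution is assumed so the drop + add pair cannot race another promotion.

```rust
// Hedged sketch, not the actual patch. Assumes the crate's Ring, PeerId,
// and Location types plus the ConnectionManager methods used in this diff
// (drop_transient, should_accept, num_connections, max_connections).
async fn promote_transient(ring: &Ring, peer: PeerId) -> bool {
    let cm = &ring.connection_manager;
    // Step 1: release the transient slot first...
    let Some(entry) = cm.drop_transient(&peer) else {
        return false; // nothing transient to promote
    };
    let loc = entry
        .location
        .unwrap_or_else(|| Location::from_address(&peer.addr));
    // Step 2: ...then re-run admission so the check sees post-drop counters.
    if !cm.should_accept(loc, &peer) {
        return false;
    }
    // Step 3: enforce the hard cap before taking a normal slot.
    if cm.num_connections() >= cm.max_connections {
        return false;
    }
    // Step 4: was_reserved = true keeps reservation bookkeeping in sync.
    ring.add_connection(loc, peer, true).await;
    true
}
```

Returning a bool would let the caller distinguish a real promotion from plain transport reuse when logging the result.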

+.op_manager
+.ring
+.add_connection(loc, peer.clone(), true)
+.await;
+tracing::info!(
+tx = %tx,
+remote = %peer,
+%loc,
+"connect_peer: promoted transient after admission check"
+);
+}
+} else {
+tracing::warn!(
+tx = %tx,
+remote = %peer,
+%loc,
+"connect_peer: transient failed admission on promotion"
+);
+}
}

+// Return the remote peer we are connected to (not our own peer key).
+let resolved_peer_id = peer.clone();
callback
-.send_result(Ok((peer.clone(), None)))
+.send_result(Ok((resolved_peer_id, None)))
.await
.inspect_err(|err| {
tracing::debug!(
@@ -1617,9 +1644,14 @@ impl P2pConnManager {
current = connection_manager.transient_count(),
"Transient connection budget exhausted; dropping inbound connection"
);
if let Some(callbacks) = state.awaiting_connection.remove(&peer_id.addr) {
for mut cb in callbacks {
let _ = cb.send_result(Err(())).await;
}
}
state.awaiting_connection_txs.remove(&peer_id.addr);
return Ok(());
}

let pending_txs = state
.awaiting_connection_txs
.remove(&peer_id.addr)
@@ -1682,6 +1714,19 @@
// Only insert if connection doesn't already exist to avoid dropping existing channel
let mut newly_inserted = false;
if !self.connections.contains_key(&peer_id) {
+if is_transient {
Review comment (Collaborator):
Can all of this logic leaking into here really not be avoided? It sounds like an architectural issue.

Reply (Collaborator, Author):
Agreed, it is not ideal for the bridge to know so much about transient promotion; for now the admission/cap logic lives in ConnectionManager, and this block just orchestrates the promotion for the existing transport. If we keep expanding this path we should extract a helper on the ring side and have the bridge call that instead of threading the details here.
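One possible shape for that extraction, sketched under assumptions: TransientBudget and its methods are hypothetical names, not the crate's actual API. Reserving via an atomic compare-and-swap also closes the check-then-act window raised in the race discussion above.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

// Hedged sketch of a ring-side helper so the bridge only asks a yes/no
// question instead of reading transient counts and budgets directly.
pub struct TransientBudget {
    count: AtomicUsize,
    budget: usize,
}

impl TransientBudget {
    pub fn new(budget: usize) -> Self {
        Self { count: AtomicUsize::new(0), budget }
    }

    /// Atomically claim a transient slot; false means the budget is exhausted.
    pub fn try_reserve(&self) -> bool {
        self.count
            .fetch_update(Ordering::AcqRel, Ordering::Acquire, |n| {
                (n < self.budget).then_some(n + 1)
            })
            .is_ok()
    }

    /// Give the slot back when the transient expires or is promoted.
    pub fn release(&self) {
        self.count.fetch_sub(1, Ordering::AcqRel);
    }
}
```

With something like this on the ring side, the bridge's insert path would reduce to a single try_reserve call instead of comparing counts against the budget inline.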

let cm = &self.bridge.op_manager.ring.connection_manager;
let current = cm.transient_count();
if current >= cm.transient_budget() {
tracing::warn!(
remote = %peer_id.addr,
budget = cm.transient_budget(),
current,
"Transient connection budget exhausted; dropping inbound connection before insert"
);
return Ok(());
}
}
let (tx, rx) = mpsc::channel(10);
tracing::debug!(self_peer = %self.bridge.op_manager.ring.connection_manager.pub_key, %peer_id, conn_map_size = self.connections.len(), "[CONN_TRACK] INSERT: OutboundConnectionSuccessful - adding to connections HashMap");
self.connections.insert(peer_id.clone(), tx);
@@ -1718,14 +1763,18 @@
let cm = connection_manager.clone();
let peer = peer_id.clone();
tokio::spawn(async move {
-tokio::time::sleep(ttl).await;
+sleep(ttl).await;
if cm.drop_transient(&peer).is_some() {
tracing::info!(%peer, "Transient connection expired; dropping");
if let Err(err) = drop_tx
.send(Right(NodeEvent::DropConnection(peer.clone())))
.await
{
tracing::warn!(%peer, ?err, "Failed to dispatch DropConnection for expired transient");
tracing::warn!(
%peer,
?err,
"Failed to dispatch DropConnection for expired transient"
);
}
}
});