Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
134 changes: 63 additions & 71 deletions crates/core/src/node/network_bridge/p2p_protoc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use std::{
};
use tokio::net::UdpSocket;
use tokio::sync::mpsc::{self, error::TryRecvError, Receiver, Sender};
use tokio::time::{sleep, timeout};
use tokio::time::timeout;
use tracing::Instrument;

use super::{ConnectionError, EventLoopNotificationsReceiver, NetworkBridge};
Expand Down Expand Up @@ -1275,57 +1275,77 @@ impl P2pConnManager {
);
}

// If a transient transport already exists, promote it without dialing anew.
// If we already have a transport channel, reuse it instead of dialing again. This covers
// transient->normal promotion without tripping duplicate connection errors.
if self.connections.contains_key(&peer) {
tracing::info!(
tx = %tx,
remote = %peer,
transient,
"connect_peer: reusing existing transport / promoting transient if present"
"connect_peer: reusing existing transport"
);
let connection_manager = &self.bridge.op_manager.ring.connection_manager;
if let Some(entry) = connection_manager.drop_transient(&peer) {
let loc = entry
.location
.unwrap_or_else(|| Location::from_address(&peer.addr));
if connection_manager.should_accept(loc, &peer) {
let current = connection_manager.num_connections();
if current >= connection_manager.max_connections {
tracing::warn!(
tx = %tx,
remote = %peer,
current_connections = current,
max_connections = connection_manager.max_connections,
%loc,
"connect_peer: transient promotion rejected due to capacity"
);
} else {
self.bridge
.op_manager
.ring
.add_connection(loc, peer.clone(), true)
.await;
tracing::info!(
tx = %tx,
remote = %peer,
%loc,
"connect_peer: promoted transient after admission check"
);
}
} else {
// Re-run admission + cap guard when promoting a transient connection.
let should_accept = connection_manager.should_accept(loc, &peer);
if !should_accept {
tracing::warn!(
tx = %tx,
remote = %peer,
%peer,
%loc,
"connect_peer: transient failed admission on promotion"
"connect_peer: promotion rejected by admission logic"
);
callback
.send_result(Err(()))
.await
.inspect_err(|err| {
tracing::debug!(
tx = %tx,
remote = %peer,
?err,
"connect_peer: failed to notify rejected-promotion callback"
);
})
.ok();
return Ok(());
}
let current = connection_manager.connection_count();
if current >= connection_manager.max_connections {
tracing::warn!(
tx = %tx,
%peer,
current_connections = current,
max_connections = connection_manager.max_connections,
%loc,
"connect_peer: rejecting transient promotion to enforce cap"
);
callback
.send_result(Err(()))
.await
.inspect_err(|err| {
tracing::debug!(
tx = %tx,
remote = %peer,
?err,
"connect_peer: failed to notify cap-rejection callback"
);
})
.ok();
return Ok(());
}
self.bridge
.op_manager
.ring
.add_connection(loc, peer.clone(), true)
.await;
tracing::info!(tx = %tx, remote = %peer, "connect_peer: promoted transient");
}

// Return the remote peer we are connected to (not our own peer key).
let resolved_peer_id = peer.clone();
callback
.send_result(Ok((resolved_peer_id, None)))
.send_result(Ok((peer.clone(), None)))
.await
.inspect_err(|err| {
tracing::debug!(
Expand Down Expand Up @@ -1459,7 +1479,7 @@ impl P2pConnManager {
connection,
transient,
} => {
let conn_manager = &self.bridge.op_manager.ring.connection_manager;
let _conn_manager = &self.bridge.op_manager.ring.connection_manager;
let remote_addr = connection.remote_addr();

if let Some(blocked_addrs) = &self.blocked_addresses {
Expand All @@ -1474,7 +1494,6 @@ impl P2pConnManager {
}
}

let provided_peer = peer.clone();
let peer_id = peer.unwrap_or_else(|| {
tracing::info!(
remote = %remote_addr,
Expand All @@ -1501,10 +1520,10 @@ impl P2pConnManager {
"Inbound connection established"
);

let is_transient =
conn_manager.is_gateway() && provided_peer.is_none() && transaction.is_none();

self.handle_successful_connection(peer_id, connection, state, None, is_transient)
// Honor the handshake’s transient flag; don’t silently downgrade to transient just
// because this is an unsolicited inbound (that was causing the gateway to never
// register stable links).
self.handle_successful_connection(peer_id, connection, state, None, transient)
.await?;
}
HandshakeEvent::OutboundEstablished {
Expand All @@ -1519,7 +1538,7 @@ impl P2pConnManager {
transaction = %transaction,
"Outbound connection established"
);
self.handle_successful_connection(peer, connection, state, None, false)
self.handle_successful_connection(peer, connection, state, None, transient)
.await?;
}
HandshakeEvent::OutboundFailed {
Expand Down Expand Up @@ -1644,14 +1663,9 @@ impl P2pConnManager {
current = connection_manager.transient_count(),
"Transient connection budget exhausted; dropping inbound connection"
);
if let Some(callbacks) = state.awaiting_connection.remove(&peer_id.addr) {
for mut cb in callbacks {
let _ = cb.send_result(Err(())).await;
}
}
state.awaiting_connection_txs.remove(&peer_id.addr);
return Ok(());
}

let pending_txs = state
.awaiting_connection_txs
.remove(&peer_id.addr)
Expand Down Expand Up @@ -1714,19 +1728,6 @@ impl P2pConnManager {
// Only insert if connection doesn't already exist to avoid dropping existing channel
let mut newly_inserted = false;
if !self.connections.contains_key(&peer_id) {
if is_transient {
let cm = &self.bridge.op_manager.ring.connection_manager;
let current = cm.transient_count();
if current >= cm.transient_budget() {
tracing::warn!(
remote = %peer_id.addr,
budget = cm.transient_budget(),
current,
"Transient connection budget exhausted; dropping inbound connection before insert"
);
return Ok(());
}
}
let (tx, rx) = mpsc::channel(10);
tracing::debug!(self_peer = %self.bridge.op_manager.ring.connection_manager.pub_key, %peer_id, conn_map_size = self.connections.len(), "[CONN_TRACK] INSERT: OutboundConnectionSuccessful - adding to connections HashMap");
self.connections.insert(peer_id.clone(), tx);
Expand All @@ -1742,20 +1743,15 @@ impl P2pConnManager {
tracing::debug!(self_peer = %self.bridge.op_manager.ring.connection_manager.pub_key, %peer_id, conn_map_size = self.connections.len(), "[CONN_TRACK] SKIP INSERT: OutboundConnectionSuccessful - connection already exists in HashMap");
}

let promote_to_ring = !is_transient || connection_manager.is_gateway();

if newly_inserted {
let pending_loc = connection_manager.prune_in_transit_connection(&peer_id);
if promote_to_ring {
if !is_transient {
let loc = pending_loc.unwrap_or_else(|| Location::from_address(&peer_id.addr));
self.bridge
.op_manager
.ring
.add_connection(loc, peer_id.clone(), false)
.await;
if is_transient {
connection_manager.drop_transient(&peer_id);
}
} else {
// Update location now that we know it; budget was reserved before any work.
connection_manager.try_register_transient(peer_id.clone(), pending_loc);
Expand All @@ -1768,18 +1764,14 @@ impl P2pConnManager {
let cm = connection_manager.clone();
let peer = peer_id.clone();
tokio::spawn(async move {
sleep(ttl).await;
tokio::time::sleep(ttl).await;
if cm.drop_transient(&peer).is_some() {
tracing::info!(%peer, "Transient connection expired; dropping");
if let Err(err) = drop_tx
.send(Right(NodeEvent::DropConnection(peer.clone())))
.await
{
tracing::warn!(
%peer,
?err,
"Failed to dispatch DropConnection for expired transient"
);
tracing::warn!(%peer, ?err, "Failed to dispatch DropConnection for expired transient");
}
}
});
Expand Down
7 changes: 3 additions & 4 deletions crates/core/src/operations/connect.rs
Original file line number Diff line number Diff line change
Expand Up @@ -300,10 +300,9 @@ impl RelayContext for RelayEnv<'_> {
}

fn transient_hint(&self, _acceptor: &PeerKeyLocation, _joiner: &PeerKeyLocation) -> bool {
// Courtesy slots still piggyback on regular connections. Flag the first acceptance so the
// joiner can prioritise it, and keep the logic simple until dedicated transient tracking
// is wired in (see transient-connection-budget branch).
self.op_manager.ring.open_connections() == 0
// Treat joiner acceptances as full connections; marking the first link as transient causes
// it to expire under the transient TTL and leaves the ring under-connected.
false
}
}

Expand Down
Loading
Loading