From 9d4fa2b189fb3d0786fdd0c898cd6d4c8a670a53 Mon Sep 17 00:00:00 2001 From: Ian Clarke Date: Sat, 15 Nov 2025 11:16:16 -0600 Subject: [PATCH 01/10] fix: propagate observed joiner address through connect --- .../src/node/network_bridge/p2p_protoc.rs | 433 +++++++----------- crates/core/src/operations/connect.rs | 125 +++-- 2 files changed, 256 insertions(+), 302 deletions(-) diff --git a/crates/core/src/node/network_bridge/p2p_protoc.rs b/crates/core/src/node/network_bridge/p2p_protoc.rs index 91f2444cb..1133964a8 100644 --- a/crates/core/src/node/network_bridge/p2p_protoc.rs +++ b/crates/core/src/node/network_bridge/p2p_protoc.rs @@ -1,6 +1,7 @@ -use anyhow::anyhow; use dashmap::DashSet; use either::{Either, Left, Right}; +use futures::future::BoxFuture; +use futures::stream::FuturesUnordered; use futures::FutureExt; use futures::StreamExt; use std::convert::Infallible; @@ -113,7 +114,6 @@ pub(in crate::node) struct P2pConnManager { conn_bridge_rx: Receiver, event_listener: Box, connections: HashMap, - conn_event_tx: Option>, key_pair: TransportKeypair, listening_ip: IpAddr, listening_port: u16, @@ -183,7 +183,6 @@ impl P2pConnManager { conn_bridge_rx: rx_bridge_cmd, event_listener: Box::new(event_listener), connections: HashMap::new(), - conn_event_tx: None, key_pair, listening_ip: listener_ip, listening_port: listen_port, @@ -213,7 +212,6 @@ impl P2pConnManager { conn_bridge_rx, event_listener, connections, - conn_event_tx: _, key_pair, listening_ip, listening_port, @@ -245,7 +243,10 @@ impl P2pConnManager { let mut state = EventListenerState::new(outbound_conn_handler.clone()); - let (conn_event_tx, conn_event_rx) = mpsc::channel(1024); + // Separate peer_connections to allow independent borrowing by the stream + let peer_connections: FuturesUnordered< + BoxFuture<'static, Result>, + > = FuturesUnordered::new(); // For non-gateway peers, pass the peer_ready flag so it can be set after first handshake // For gateways, pass None (they're always ready) @@ -273,7 
+274,7 @@ impl P2pConnManager { node_controller, client_wait_for_transaction, executor_listener, - conn_event_rx, + peer_connections, ); // Pin the stream on the stack @@ -287,7 +288,6 @@ impl P2pConnManager { conn_bridge_rx: tokio::sync::mpsc::channel(1).1, // Dummy, won't be used event_listener, connections, - conn_event_tx: Some(conn_event_tx.clone()), key_pair, listening_ip, listening_port, @@ -302,20 +302,22 @@ impl P2pConnManager { while let Some(result) = select_stream.as_mut().next().await { // Process the result using the existing handler let event = ctx - .process_select_result(result, &mut state, &handshake_cmd_sender) + .process_select_result( + result, + &mut state, + &mut select_stream, + &handshake_cmd_sender, + ) .await?; match event { EventResult::Continue => continue, EventResult::Event(event) => { match *event { - ConnEvent::InboundMessage(inbound) => { - let remote = inbound.remote_addr; - let msg = inbound.msg; + ConnEvent::InboundMessage(msg) => { tracing::info!( tx = %msg.id(), msg_type = %msg, - remote = ?remote, peer = %ctx.bridge.op_manager.ring.connection_manager.get_peer_key().unwrap(), "Received inbound message from peer - processing" ); @@ -496,13 +498,6 @@ impl P2pConnManager { } } } - ConnEvent::TransportClosed { remote_addr, error } => { - tracing::debug!( - remote = %remote_addr, - ?error, - "Transport closed event received in event loop" - ); - } ConnEvent::ClosedChannel(reason) => { match reason { ChannelCloseReason::Bridge @@ -991,6 +986,7 @@ impl P2pConnManager { &mut self, result: priority_select::SelectResult, state: &mut EventListenerState, + select_stream: &mut priority_select::ProductionPrioritySelectStream, handshake_commands: &HandshakeCommandSender, ) -> anyhow::Result { let peer_id = &self.bridge.op_manager.ring.connection_manager.pub_key; @@ -1015,9 +1011,9 @@ impl P2pConnManager { SelectResult::PeerConnection(msg) => { tracing::debug!( peer = %peer_id, - "PrioritySelect: connection events READY" + "PrioritySelect: 
peer_connections READY" ); - self.handle_transport_event(msg, state, handshake_commands) + self.handle_peer_connection_msg(msg, state, select_stream, handshake_commands) .await } SelectResult::ConnBridge(msg) => { @@ -1030,11 +1026,12 @@ impl P2pConnManager { SelectResult::Handshake(result) => { tracing::debug!( peer = %peer_id, - "PrioritySelect: handshake event READY" + "PrioritySelect: handshake event READY" ); match result { Some(event) => { - self.handle_handshake_action(event, state).await?; + self.handle_handshake_action(event, state, select_stream) + .await?; Ok(EventResult::Continue) } None => { @@ -1329,6 +1326,7 @@ impl P2pConnManager { &mut self, event: HandshakeEvent, state: &mut EventListenerState, + select_stream: &mut priority_select::ProductionPrioritySelectStream, ) -> anyhow::Result<()> { tracing::info!(?event, "handle_handshake_action: received handshake event"); match event { @@ -1378,7 +1376,7 @@ impl P2pConnManager { "Inbound connection established" ); - self.handle_successful_connection(peer_id, connection, state, None) + self.handle_successful_connection(peer_id, connection, state, select_stream, None) .await?; } HandshakeEvent::OutboundEstablished { @@ -1393,7 +1391,7 @@ impl P2pConnManager { transaction = %transaction, "Outbound connection established" ); - self.handle_successful_connection(peer, connection, state, None) + self.handle_successful_connection(peer, connection, state, select_stream, None) .await?; } HandshakeEvent::OutboundFailed { @@ -1507,6 +1505,7 @@ impl P2pConnManager { peer_id: PeerId, connection: PeerConnection, state: &mut EventListenerState, + select_stream: &mut priority_select::ProductionPrioritySelectStream, remaining_checks: Option, ) -> anyhow::Result<()> { let pending_txs = state @@ -1575,13 +1574,8 @@ impl P2pConnManager { let (tx, rx) = mpsc::channel(10); tracing::debug!(self_peer = %self.bridge.op_manager.ring.connection_manager.pub_key, %peer_id, conn_map_size = self.connections.len(), "[CONN_TRACK] INSERT: 
OutboundConnectionSuccessful - adding to connections HashMap"); self.connections.insert(peer_id.clone(), tx); - let Some(conn_events) = self.conn_event_tx.as_ref().cloned() else { - anyhow::bail!("Connection event channel not initialized"); - }; - let listener_peer = peer_id.clone(); - tokio::spawn(async move { - peer_connection_listener(rx, connection, listener_peer, conn_events).await; - }); + let task = peer_connection_listener(rx, connection).boxed(); + select_stream.push_peer_connection(task); newly_inserted = true; } else { tracing::debug!(self_peer = %self.bridge.op_manager.ring.connection_manager.pub_key, %peer_id, conn_map_size = self.connections.len(), "[CONN_TRACK] SKIP INSERT: OutboundConnectionSuccessful - connection already exists in HashMap"); @@ -1604,132 +1598,139 @@ impl P2pConnManager { Ok(()) } - async fn handle_transport_event( + async fn handle_peer_connection_msg( &mut self, - event: Option, + msg: Option>, state: &mut EventListenerState, + select_stream: &mut priority_select::ProductionPrioritySelectStream, handshake_commands: &HandshakeCommandSender, ) -> anyhow::Result { - match event { - Some(ConnEvent::InboundMessage(mut inbound)) => { - let tx = *inbound.msg.id(); - - if let Some(remote_addr) = inbound.remote_addr { - if let Some(sender_peer) = extract_sender_from_message(&inbound.msg) { - if sender_peer.peer.addr == remote_addr - || sender_peer.peer.addr.ip().is_unspecified() - { - let mut new_peer_id = sender_peer.peer.clone(); - if new_peer_id.addr.ip().is_unspecified() { - new_peer_id.addr = remote_addr; - if let Some(sender_mut) = - extract_sender_from_message_mut(&mut inbound.msg) - { - if sender_mut.peer.addr.ip().is_unspecified() { - sender_mut.peer.addr = remote_addr; - } - } - } - if let Some(existing_key) = self - .connections - .keys() - .find(|peer| { - peer.addr == remote_addr && peer.pub_key != new_peer_id.pub_key - }) - .cloned() + match msg { + Some(Ok(peer_conn)) => { + let mut peer_conn = peer_conn; + // Get the 
remote address from the connection + let remote_addr = peer_conn.conn.remote_addr(); + let tx = *peer_conn.msg.id(); + if let Some(sender_peer) = extract_sender_from_message(&peer_conn.msg) { + if sender_peer.peer.addr == remote_addr + || sender_peer.peer.addr.ip().is_unspecified() + { + let mut new_peer_id = sender_peer.peer.clone(); + if new_peer_id.addr.ip().is_unspecified() { + new_peer_id.addr = remote_addr; + if let Some(sender_mut) = + extract_sender_from_message_mut(&mut peer_conn.msg) { - if let Some(channel) = self.connections.remove(&existing_key) { - tracing::info!( - remote = %remote_addr, - old_peer = %existing_key, - new_peer = %new_peer_id, - "Updating provisional peer identity after inbound message" - ); - self.bridge.op_manager.ring.update_connection_identity( - &existing_key, - new_peer_id.clone(), - ); - self.connections.insert(new_peer_id, channel); + if sender_mut.peer.addr.ip().is_unspecified() { + sender_mut.peer.addr = remote_addr; } } } + if let Some(existing_key) = self + .connections + .keys() + .find(|peer| { + peer.addr == remote_addr && peer.pub_key != new_peer_id.pub_key + }) + .cloned() + { + if let Some(channel) = self.connections.remove(&existing_key) { + tracing::info!( + remote = %remote_addr, + old_peer = %existing_key, + new_peer = %new_peer_id, + "Updating provisional peer identity after inbound message" + ); + self.bridge + .op_manager + .ring + .update_connection_identity(&existing_key, new_peer_id.clone()); + self.connections.insert(new_peer_id, channel); + } + } } + } - let should_connect = - !self.connections.keys().any(|peer| peer.addr == remote_addr) - && !state.awaiting_connection.contains_key(&remote_addr); + if let NetMessage::V1(NetMessageV1::Connect(ConnectMsg::Request { + payload, .. 
+ })) = &mut peer_conn.msg + { + if payload.observed_addr.is_none() { + payload.observed_addr = Some(remote_addr); + } + } - if should_connect { - if let Some(sender_peer) = extract_sender_from_message(&inbound.msg) { - tracing::info!( - "Received message from unconnected peer {}, establishing connection proactively", - sender_peer.peer - ); + // Check if we need to establish a connection back to the sender + let should_connect = !self.connections.keys().any(|peer| peer.addr == remote_addr) + && !state.awaiting_connection.contains_key(&remote_addr); - let tx = Transaction::new::(); - let (callback, _rx) = tokio::sync::mpsc::channel(10); + if should_connect { + // Try to extract sender information from the message to establish connection + if let Some(sender_peer) = extract_sender_from_message(&peer_conn.msg) { + tracing::info!( + "Received message from unconnected peer {}, establishing connection proactively", + sender_peer.peer + ); - let _ = self - .handle_connect_peer( - sender_peer.peer.clone(), - Box::new(callback), - tx, - handshake_commands, - state, - false, - ) - .await; - } + let tx = Transaction::new::(); + let (callback, _rx) = tokio::sync::mpsc::channel(10); + + // Don't await - let it happen in the background + let _ = self + .handle_connect_peer( + sender_peer.peer.clone(), + Box::new(callback), + tx, + handshake_commands, + state, + false, // not a courtesy connection + ) + .await; } } tracing::debug!( - peer_addr = ?inbound.remote_addr, + peer_addr = %remote_addr, %tx, tx_type = ?tx.transaction_type(), "Queueing inbound NetMessage from peer connection" ); + let task = peer_connection_listener(peer_conn.rx, peer_conn.conn).boxed(); + select_stream.push_peer_connection(task); Ok(EventResult::Event( - ConnEvent::InboundMessage(inbound).into(), + ConnEvent::InboundMessage(peer_conn.msg).into(), )) } - Some(ConnEvent::TransportClosed { remote_addr, error }) => { - tracing::debug!( - remote = %remote_addr, - ?error, - "peer_connection_listener reported 
transport closure" - ); - if let Some(peer) = self - .connections - .keys() - .find_map(|k| (k.addr == remote_addr).then(|| k.clone())) - { - tracing::debug!(self_peer = %self.bridge.op_manager.ring.connection_manager.pub_key, %peer, socket_addr = %remote_addr, conn_map_size = self.connections.len(), "[CONN_TRACK] REMOVE: TransportClosed - removing from connections HashMap"); - self.bridge - .op_manager - .ring - .prune_connection(peer.clone()) - .await; - self.connections.remove(&peer); - if let Err(error) = handshake_commands - .send(HandshakeCommand::DropConnection { peer: peer.clone() }) - .await + Some(Err(err)) => { + if let TransportError::ConnectionClosed(socket_addr) = err { + if let Some(peer) = self + .connections + .keys() + .find_map(|k| (k.addr == socket_addr).then(|| k.clone())) { - tracing::warn!( - remote = %remote_addr, - ?error, - "Failed to notify handshake driver about dropped connection" - ); + tracing::debug!(self_peer = %self.bridge.op_manager.ring.connection_manager.pub_key, %peer, socket_addr = %socket_addr, conn_map_size = self.connections.len(), "[CONN_TRACK] REMOVE: TransportError::ConnectionClosed - removing from connections HashMap"); + self.bridge + .op_manager + .ring + .prune_connection(peer.clone()) + .await; + self.connections.remove(&peer); + if let Err(error) = handshake_commands + .send(HandshakeCommand::DropConnection { peer: peer.clone() }) + .await + { + tracing::warn!( + remote = %socket_addr, + ?error, + "Failed to notify handshake driver about dropped connection" + ); + } } } Ok(EventResult::Continue) } - Some(other) => { - tracing::warn!(?other, "Unexpected event from peer connection listener"); - Ok(EventResult::Continue) - } None => { - tracing::error!("All peer connection event channels closed"); + tracing::error!("All peer connections closed"); Ok(EventResult::Continue) } } @@ -1775,7 +1776,7 @@ impl P2pConnManager { msg_type = %msg, "handle_notification_msg: Received NetMessage notification, converting to 
InboundMessage" ); - EventResult::Event(ConnEvent::InboundMessage(msg.into()).into()) + EventResult::Event(ConnEvent::InboundMessage(msg).into()) } Some(Right(action)) => { tracing::info!( @@ -1798,7 +1799,7 @@ impl P2pConnManager { match msg { Some((callback, msg)) => { state.pending_op_results.insert(*msg.id(), callback); - EventResult::Event(ConnEvent::InboundMessage(msg.into()).into()) + EventResult::Event(ConnEvent::InboundMessage(msg).into()) } None => { EventResult::Event(ConnEvent::ClosedChannel(ChannelCloseReason::OpExecution).into()) @@ -1908,6 +1909,7 @@ impl ConnectResultSender for mpsc::Sender), ()>> { struct EventListenerState { outbound_handler: OutboundConnectionHandler, + // Note: peer_connections has been moved out to allow separate borrowing by the stream pending_from_executor: HashSet, // FIXME: we are potentially leaving trash here when transacrions are completed tx_to_client: HashMap>, @@ -1938,38 +1940,10 @@ enum EventResult { #[derive(Debug)] pub(super) enum ConnEvent { - InboundMessage(IncomingMessage), + InboundMessage(NetMessage), OutboundMessage(NetMessage), NodeAction(NodeEvent), ClosedChannel(ChannelCloseReason), - TransportClosed { - remote_addr: SocketAddr, - error: TransportError, - }, -} - -#[derive(Debug)] -pub(super) struct IncomingMessage { - pub msg: NetMessage, - pub remote_addr: Option, -} - -impl IncomingMessage { - fn with_remote(msg: NetMessage, remote_addr: SocketAddr) -> Self { - Self { - msg, - remote_addr: Some(remote_addr), - } - } -} - -impl From for IncomingMessage { - fn from(msg: NetMessage) -> Self { - Self { - msg, - remote_addr: None, - } - } } #[derive(Debug)] @@ -1992,6 +1966,14 @@ enum ProtocolStatus { Failed, } +#[derive(Debug)] +pub(super) struct PeerConnectionInbound { + pub conn: PeerConnection, + /// Receiver for inbound messages for the peer connection + pub rx: Receiver>, + pub msg: NetMessage, +} + async fn handle_peer_channel_message( conn: &mut PeerConnection, msg: Either, @@ -2043,50 +2025,17 @@ 
async fn handle_peer_channel_message( Ok(()) } -async fn notify_transport_closed( - sender: &Sender, - remote_addr: SocketAddr, - error: TransportError, -) { - if sender - .send(ConnEvent::TransportClosed { remote_addr, error }) - .await - .is_err() - { - tracing::debug!( - remote = %remote_addr, - "[CONN_LIFECYCLE] conn_events receiver dropped before handling closure event" - ); - } -} - async fn peer_connection_listener( mut rx: PeerConnChannelRecv, mut conn: PeerConnection, - peer_id: PeerId, - conn_events: Sender, -) { +) -> Result { const MAX_IMMEDIATE_SENDS: usize = 32; - let remote_addr = conn.remote_addr(); - tracing::debug!( - to = %remote_addr, - peer = %peer_id, - "[CONN_LIFECYCLE] Starting peer_connection_listener task" - ); loop { let mut drained = 0; loop { match rx.try_recv() { Ok(msg) => { - if let Err(error) = handle_peer_channel_message(&mut conn, msg).await { - tracing::debug!( - to = %remote_addr, - ?error, - "[CONN_LIFECYCLE] Shutting down connection after send failure" - ); - notify_transport_closed(&conn_events, remote_addr, error).await; - return; - } + handle_peer_channel_message(&mut conn, msg).await?; drained += 1; if drained >= MAX_IMMEDIATE_SENDS { break; @@ -2098,13 +2047,7 @@ async fn peer_connection_listener( to = %conn.remote_addr(), "[CONN_LIFECYCLE] peer_connection_listener channel closed without explicit DropConnection" ); - notify_transport_closed( - &conn_events, - remote_addr, - TransportError::ConnectionClosed(remote_addr), - ) - .await; - return; + return Err(TransportError::ConnectionClosed(conn.remote_addr())); } } } @@ -2116,74 +2059,32 @@ async fn peer_connection_listener( to = %conn.remote_addr(), "[CONN_LIFECYCLE] peer_connection_listener channel closed without explicit DropConnection" ); - notify_transport_closed( - &conn_events, - remote_addr, - TransportError::ConnectionClosed(remote_addr), - ) - .await; - return; + break Err(TransportError::ConnectionClosed(conn.remote_addr())); }; - if let Err(error) = 
handle_peer_channel_message(&mut conn, msg).await { - tracing::debug!( - to = %remote_addr, - ?error, - "[CONN_LIFECYCLE] Connection closed after channel command" - ); - notify_transport_closed(&conn_events, remote_addr, error).await; - return; - } + handle_peer_channel_message(&mut conn, msg).await?; } msg = conn.recv() => { - match msg { - Ok(msg) => { - match decode_msg(&msg) { - Ok(net_message) => { - let tx = *net_message.id(); - tracing::debug!( - from = %conn.remote_addr(), - %tx, - tx_type = ?tx.transaction_type(), - msg_type = %net_message, - "[CONN_LIFECYCLE] Received inbound NetMessage from peer" - ); - if conn_events.send(ConnEvent::InboundMessage(IncomingMessage::with_remote(net_message, remote_addr))).await.is_err() { - tracing::debug!( - from = %remote_addr, - "[CONN_LIFECYCLE] conn_events receiver dropped; stopping listener" - ); - return; - } - } - Err(error) => { - tracing::error!( - from = %conn.remote_addr(), - ?error, - "[CONN_LIFECYCLE] Failed to deserialize inbound message; closing connection" - ); - let transport_error = TransportError::Other(anyhow!( - "Failed to deserialize inbound message from {remote_addr}: {error:?}" - )); - notify_transport_closed( - &conn_events, - remote_addr, - transport_error, - ) - .await; - return; - } - } - } - Err(error) => { - tracing::debug!( - from = %conn.remote_addr(), - ?error, - "[CONN_LIFECYCLE] peer_connection_listener terminating after recv error" - ); - notify_transport_closed(&conn_events, remote_addr, error).await; - return; - } - } + let Ok(msg) = msg + .inspect_err(|error| { + tracing::error!(from=%conn.remote_addr(), "Error while receiving message: {error}"); + }) + else { + tracing::debug!( + from = %conn.remote_addr(), + "[CONN_LIFECYCLE] peer_connection_listener terminating after recv error" + ); + break Err(TransportError::ConnectionClosed(conn.remote_addr())); + }; + let net_message = decode_msg(&msg).unwrap(); + let tx = *net_message.id(); + tracing::debug!( + from = %conn.remote_addr(), + 
%tx, + tx_type = ?tx.transaction_type(), + msg_type = %net_message, + "[CONN_LIFECYCLE] Received inbound NetMessage from peer" + ); + break Ok(PeerConnectionInbound { conn, rx, msg: net_message }); } } } diff --git a/crates/core/src/operations/connect.rs b/crates/core/src/operations/connect.rs index cc799c174..c6ab88ffd 100644 --- a/crates/core/src/operations/connect.rs +++ b/crates/core/src/operations/connect.rs @@ -5,7 +5,7 @@ use std::collections::HashSet; use std::fmt; -use std::net::SocketAddr; +use std::net::{IpAddr, SocketAddr}; use std::sync::Arc; use std::time::{Duration, Instant}; @@ -79,10 +79,10 @@ impl fmt::Display for ConnectMsg { match self { ConnectMsg::Request { target, payload, .. } => write!( f, - "ConnectRequest {{ target: {target}, desired: {}, ttl: {}, origin: {} }}", + "ConnectRequest {{ target: {target}, desired: {}, ttl: {}, joiner: {} }}", payload.desired_location, payload.ttl, - payload.origin + payload.joiner ), ConnectMsg::Response { sender, target, payload, .. } => write!( f, @@ -106,17 +106,42 @@ impl ConnectMsg { } } +fn is_publicly_routable(ip: IpAddr) -> bool { + match ip { + IpAddr::V4(addr) => { + let octets = addr.octets(); + let is_loopback = octets[0] == 127; + let is_link_local = octets[0] == 169 && octets[1] == 254; + let is_private = octets[0] == 10 + || (octets[0] == 172 && (16..=31).contains(&octets[1])) + || (octets[0] == 192 && octets[1] == 168); + let is_unspecified = addr.octets() == [0, 0, 0, 0]; + !(is_loopback || is_link_local || is_private || is_unspecified) + } + IpAddr::V6(addr) => { + let segments = addr.segments(); + let is_loopback = addr == std::net::Ipv6Addr::LOCALHOST; + let is_unspecified = addr == std::net::Ipv6Addr::UNSPECIFIED; + let is_unique_local = (segments[0] & 0xfe00) == 0xfc00; + let is_link_local = (segments[0] & 0xffc0) == 0xfe80; + !(is_loopback || is_unspecified || is_unique_local || is_link_local) + } + } +} + /// Two-message request payload. 
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] pub(crate) struct ConnectRequest { /// Joiner's advertised location (fallbacks to the joiner's socket address). pub desired_location: Location, /// Joiner's identity as observed so far. - pub origin: PeerKeyLocation, + pub joiner: PeerKeyLocation, /// Remaining hops before the request stops travelling. pub ttl: u8, /// Simple visited set to avoid trivial loops. pub visited: Vec, + /// Socket observed by the gateway/relay for the joiner, if known. + pub observed_addr: Option, } /// Acceptance payload returned by candidates. @@ -191,34 +216,32 @@ impl RelayState { &mut self, ctx: &C, observed_remote: &PeerKeyLocation, - observed_addr: SocketAddr, ) -> RelayActions { let mut actions = RelayActions::default(); push_unique_peer(&mut self.request.visited, observed_remote.clone()); push_unique_peer(&mut self.request.visited, ctx.self_location().clone()); - if self.request.origin.peer.addr.ip().is_unspecified() - && !self.observed_sent - && observed_remote.peer.pub_key == self.request.origin.peer.pub_key - { - self.request.origin.peer.addr = observed_addr; - if self.request.origin.location.is_none() { - self.request.origin.location = Some(Location::from_address(&observed_addr)); + if let Some(joiner_addr) = self.request.observed_addr { + if !self.observed_sent && !is_publicly_routable(self.request.joiner.peer.addr.ip()) { + self.request.joiner.peer.addr = joiner_addr; + if self.request.joiner.location.is_none() { + self.request.joiner.location = Some(Location::from_address(&joiner_addr)); + } + self.observed_sent = true; + actions.observed_address = Some((self.request.joiner.clone(), joiner_addr)); } - self.observed_sent = true; - actions.observed_address = Some((self.request.origin.clone(), observed_addr)); } - if !self.accepted_locally && ctx.should_accept(&self.request.origin) { + if !self.accepted_locally && ctx.should_accept(&self.request.joiner) { self.accepted_locally = true; let acceptor = 
ctx.self_location().clone(); - let courtesy = ctx.courtesy_hint(&acceptor, &self.request.origin); + let courtesy = ctx.courtesy_hint(&acceptor, &self.request.joiner); self.courtesy_hint = courtesy; actions.accept_response = Some(ConnectResponse { acceptor: acceptor.clone(), courtesy, }); - actions.expect_connection_from = Some(self.request.origin.clone()); + actions.expect_connection_from = Some(self.request.joiner.clone()); } if self.forwarded_to.is_none() && self.request.ttl > 0 { @@ -444,9 +467,10 @@ impl ConnectOp { push_unique_peer(&mut visited, target.clone()); let request = ConnectRequest { desired_location, - origin: own.clone(), + joiner: own.clone(), ttl, visited, + observed_addr: None, }; let tx = Transaction::new::(); @@ -497,7 +521,6 @@ impl ConnectOp { ctx: &C, upstream: PeerKeyLocation, request: ConnectRequest, - observed_addr: SocketAddr, ) -> RelayActions { if !matches!(self.state, Some(ConnectState::Relaying(_))) { self.state = Some(ConnectState::Relaying(Box::new(RelayState { @@ -515,7 +538,7 @@ impl ConnectOp { state.upstream = upstream; state.request = request; let upstream_snapshot = state.upstream.clone(); - state.handle_request(ctx, &upstream_snapshot, observed_addr) + state.handle_request(ctx, &upstream_snapshot) } _ => RelayActions::default(), } @@ -578,8 +601,7 @@ impl Operation for ConnectOp { match msg { ConnectMsg::Request { from, payload, .. 
} => { let env = RelayEnv::new(op_manager); - let actions = - self.handle_request(&env, from.clone(), payload.clone(), from.peer.addr); + let actions = self.handle_request(&env, from.clone(), payload.clone()); if let Some((target, address)) = actions.observed_address { let msg = ConnectMsg::ObservedAddress { @@ -1026,9 +1048,10 @@ mod tests { upstream: joiner.clone(), request: ConnectRequest { desired_location: Location::random(), - origin: joiner.clone(), + joiner: joiner.clone(), ttl: 3, visited: vec![], + observed_addr: Some(joiner.peer.addr), }, forwarded_to: None, courtesy_hint: false, @@ -1037,8 +1060,7 @@ mod tests { }; let ctx = TestRelayContext::new(self_loc.clone()).courtesy(true); - let observed_addr = joiner.peer.addr; - let actions = state.handle_request(&ctx, &joiner, observed_addr); + let actions = state.handle_request(&ctx, &joiner); let response = actions.accept_response.expect("expected acceptance"); assert_eq!(response.acceptor.peer, self_loc.peer); @@ -1056,9 +1078,10 @@ mod tests { upstream: joiner.clone(), request: ConnectRequest { desired_location: Location::random(), - origin: joiner.clone(), + joiner: joiner.clone(), ttl: 2, visited: vec![], + observed_addr: Some(joiner.peer.addr), }, forwarded_to: None, courtesy_hint: false, @@ -1069,7 +1092,7 @@ mod tests { let ctx = TestRelayContext::new(self_loc) .accept(false) .next_hop(Some(next_hop.clone())); - let actions = state.handle_request(&ctx, &joiner, joiner.peer.addr); + let actions = state.handle_request(&ctx, &joiner); assert!(actions.accept_response.is_none()); let (forward_to, request) = actions.forward.expect("expected forward"); @@ -1078,6 +1101,40 @@ mod tests { assert!(request.visited.iter().any(|pkl| pkl.peer == joiner.peer)); } + #[test] + fn relay_emits_observed_address_for_private_joiner() { + let self_loc = make_peer(4050); + let joiner = make_peer(5050); + let observed_addr = SocketAddr::new( + IpAddr::V4(Ipv4Addr::new(203, 0, 113, 10)), + joiner.peer.addr.port(), + ); + let 
mut state = RelayState { + upstream: joiner.clone(), + request: ConnectRequest { + desired_location: Location::random(), + joiner: joiner.clone(), + ttl: 3, + visited: vec![], + observed_addr: Some(observed_addr), + }, + forwarded_to: None, + courtesy_hint: false, + observed_sent: false, + accepted_locally: false, + }; + + let ctx = TestRelayContext::new(self_loc); + let actions = state.handle_request(&ctx, &joiner); + + let (target, addr) = actions + .observed_address + .expect("expected observed address update"); + assert_eq!(addr, observed_addr); + assert_eq!(target.peer.addr, observed_addr); + assert_eq!(state.request.joiner.peer.addr, observed_addr); + } + #[test] fn joiner_tracks_acceptance() { let acceptor = make_peer(7000); @@ -1138,9 +1195,10 @@ mod tests { let request = ConnectRequest { desired_location: Location::random(), - origin: joiner.clone(), + joiner: joiner.clone(), ttl: 3, visited: vec![joiner.clone()], + observed_addr: Some(joiner.peer.addr), }; let tx = Transaction::new::(); @@ -1148,8 +1206,7 @@ mod tests { let ctx = TestRelayContext::new(relay_a.clone()) .accept(false) .next_hop(Some(relay_b.clone())); - let actions = - relay_op.handle_request(&ctx, joiner.clone(), request.clone(), joiner.peer.addr); + let actions = relay_op.handle_request(&ctx, joiner.clone(), request.clone()); let (forward_target, forward_request) = actions .forward @@ -1168,12 +1225,8 @@ mod tests { let mut accepting_relay = ConnectOp::new_relay(tx, relay_a.clone(), forward_request.clone()); let ctx_accept = TestRelayContext::new(relay_b.clone()); - let accept_actions = accepting_relay.handle_request( - &ctx_accept, - relay_a.clone(), - forward_request, - relay_a.peer.addr, - ); + let accept_actions = + accepting_relay.handle_request(&ctx_accept, relay_a.clone(), forward_request); let response = accept_actions .accept_response From 5c89c649f60ba5c1a4a03317b22a03c807864527 Mon Sep 17 00:00:00 2001 From: Ian Clarke Date: Sun, 16 Nov 2025 09:29:18 -0600 Subject: [PATCH 
02/10] fix: always use observed joiner socket --- crates/core/src/operations/connect.rs | 27 ++------------------------- 1 file changed, 2 insertions(+), 25 deletions(-) diff --git a/crates/core/src/operations/connect.rs b/crates/core/src/operations/connect.rs index c6ab88ffd..65063739b 100644 --- a/crates/core/src/operations/connect.rs +++ b/crates/core/src/operations/connect.rs @@ -5,7 +5,7 @@ use std::collections::HashSet; use std::fmt; -use std::net::{IpAddr, SocketAddr}; +use std::net::SocketAddr; use std::sync::Arc; use std::time::{Duration, Instant}; @@ -106,29 +106,6 @@ impl ConnectMsg { } } -fn is_publicly_routable(ip: IpAddr) -> bool { - match ip { - IpAddr::V4(addr) => { - let octets = addr.octets(); - let is_loopback = octets[0] == 127; - let is_link_local = octets[0] == 169 && octets[1] == 254; - let is_private = octets[0] == 10 - || (octets[0] == 172 && (16..=31).contains(&octets[1])) - || (octets[0] == 192 && octets[1] == 168); - let is_unspecified = addr.octets() == [0, 0, 0, 0]; - !(is_loopback || is_link_local || is_private || is_unspecified) - } - IpAddr::V6(addr) => { - let segments = addr.segments(); - let is_loopback = addr == std::net::Ipv6Addr::LOCALHOST; - let is_unspecified = addr == std::net::Ipv6Addr::UNSPECIFIED; - let is_unique_local = (segments[0] & 0xfe00) == 0xfc00; - let is_link_local = (segments[0] & 0xffc0) == 0xfe80; - !(is_loopback || is_unspecified || is_unique_local || is_link_local) - } - } -} - /// Two-message request payload. 
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] pub(crate) struct ConnectRequest { @@ -222,7 +199,7 @@ impl RelayState { push_unique_peer(&mut self.request.visited, ctx.self_location().clone()); if let Some(joiner_addr) = self.request.observed_addr { - if !self.observed_sent && !is_publicly_routable(self.request.joiner.peer.addr.ip()) { + if !self.observed_sent { self.request.joiner.peer.addr = joiner_addr; if self.request.joiner.location.is_none() { self.request.joiner.location = Some(Location::from_address(&joiner_addr)); From 2e7919d0ed9a5d0baa26ea0aadd4685180fe4d3f Mon Sep 17 00:00:00 2001 From: Ian Clarke Date: Sun, 16 Nov 2025 09:41:38 -0600 Subject: [PATCH 03/10] fix: add comment explaining observed socket rationale --- crates/core/src/operations/connect.rs | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/crates/core/src/operations/connect.rs b/crates/core/src/operations/connect.rs index 65063739b..d07799fd6 100644 --- a/crates/core/src/operations/connect.rs +++ b/crates/core/src/operations/connect.rs @@ -199,6 +199,11 @@ impl RelayState { push_unique_peer(&mut self.request.visited, ctx.self_location().clone()); if let Some(joiner_addr) = self.request.observed_addr { + // Always overwrite with observed socket rather than checking is_publicly_routable. + // The observed UDP source address is ground truth - it's the address that actually + // delivered the packet to the relay. If the joiner advertised a different address, + // we have no way to verify it's reachable. Even multi-homed, dual-stack, or NAT + // scenarios benefit from using the provably-working observed address. 
if !self.observed_sent { self.request.joiner.peer.addr = joiner_addr; if self.request.joiner.location.is_none() { From 3fc04d699e45d5524e8b5e96bba7b61fb238ee8b Mon Sep 17 00:00:00 2001 From: Ian Clarke Date: Sun, 16 Nov 2025 10:13:11 -0600 Subject: [PATCH 04/10] fix: skip loopback observed socket overwrite --- crates/core/src/operations/connect.rs | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/crates/core/src/operations/connect.rs b/crates/core/src/operations/connect.rs index d07799fd6..1378dbdef 100644 --- a/crates/core/src/operations/connect.rs +++ b/crates/core/src/operations/connect.rs @@ -199,11 +199,13 @@ impl RelayState { push_unique_peer(&mut self.request.visited, ctx.self_location().clone()); if let Some(joiner_addr) = self.request.observed_addr { - // Always overwrite with observed socket rather than checking is_publicly_routable. - // The observed UDP source address is ground truth - it's the address that actually - // delivered the packet to the relay. If the joiner advertised a different address, - // we have no way to verify it's reachable. Even multi-homed, dual-stack, or NAT - // scenarios benefit from using the provably-working observed address. + // Always overwrite with observed socket rather than checking routability, with one + // exception: if the observed socket is loopback, keep the advertised address. Loopback + // shows up in localhost test topologies; in production the observed socket will be a + // real external address and is the only ground truth we have. 
+ if joiner_addr.ip().is_loopback() { + return actions; + } if !self.observed_sent { self.request.joiner.peer.addr = joiner_addr; if self.request.joiner.location.is_none() { From 9966b679976d69392de475f22d8617d4a3b2794f Mon Sep 17 00:00:00 2001 From: Ian Clarke Date: Sun, 16 Nov 2025 14:13:24 -0600 Subject: [PATCH 05/10] fix: align connect tests with observed socket handling --- crates/core/src/operations/connect.rs | 10 +++------- 1 file changed, 3 insertions(+), 7 deletions(-) diff --git a/crates/core/src/operations/connect.rs b/crates/core/src/operations/connect.rs index 1378dbdef..f0d055715 100644 --- a/crates/core/src/operations/connect.rs +++ b/crates/core/src/operations/connect.rs @@ -199,13 +199,9 @@ impl RelayState { push_unique_peer(&mut self.request.visited, ctx.self_location().clone()); if let Some(joiner_addr) = self.request.observed_addr { - // Always overwrite with observed socket rather than checking routability, with one - // exception: if the observed socket is loopback, keep the advertised address. Loopback - // shows up in localhost test topologies; in production the observed socket will be a - // real external address and is the only ground truth we have. - if joiner_addr.ip().is_loopback() { - return actions; - } + // Always overwrite with observed socket rather than checking routability. A loopback + // observed socket only appears in local/unit tests where peers share 127.0.0.1, so it + // is not special-cased: keep the one-shot overwrite and avoid early returns.
if !self.observed_sent { self.request.joiner.peer.addr = joiner_addr; if self.request.joiner.location.is_none() { From ff92a60cd8a37aefa90c05127202607520a5c440 Mon Sep 17 00:00:00 2001 From: Ian Clarke Date: Sun, 16 Nov 2025 14:50:25 -0600 Subject: [PATCH 06/10] fix: tag observed addr using conn event remote --- .../src/node/network_bridge/p2p_protoc.rs | 440 +++++++++++------- 1 file changed, 273 insertions(+), 167 deletions(-) diff --git a/crates/core/src/node/network_bridge/p2p_protoc.rs b/crates/core/src/node/network_bridge/p2p_protoc.rs index 1133964a8..97fe19291 100644 --- a/crates/core/src/node/network_bridge/p2p_protoc.rs +++ b/crates/core/src/node/network_bridge/p2p_protoc.rs @@ -1,7 +1,6 @@ +use anyhow::anyhow; use dashmap::DashSet; use either::{Either, Left, Right}; -use futures::future::BoxFuture; -use futures::stream::FuturesUnordered; use futures::FutureExt; use futures::StreamExt; use std::convert::Infallible; @@ -114,6 +113,7 @@ pub(in crate::node) struct P2pConnManager { conn_bridge_rx: Receiver, event_listener: Box, connections: HashMap, + conn_event_tx: Option>, key_pair: TransportKeypair, listening_ip: IpAddr, listening_port: u16, @@ -183,6 +183,7 @@ impl P2pConnManager { conn_bridge_rx: rx_bridge_cmd, event_listener: Box::new(event_listener), connections: HashMap::new(), + conn_event_tx: None, key_pair, listening_ip: listener_ip, listening_port: listen_port, @@ -212,6 +213,7 @@ impl P2pConnManager { conn_bridge_rx, event_listener, connections, + conn_event_tx: _, key_pair, listening_ip, listening_port, @@ -243,10 +245,7 @@ impl P2pConnManager { let mut state = EventListenerState::new(outbound_conn_handler.clone()); - // Separate peer_connections to allow independent borrowing by the stream - let peer_connections: FuturesUnordered< - BoxFuture<'static, Result>, - > = FuturesUnordered::new(); + let (conn_event_tx, conn_event_rx) = mpsc::channel(1024); // For non-gateway peers, pass the peer_ready flag so it can be set after first handshake // For 
gateways, pass None (they're always ready) @@ -274,7 +273,7 @@ impl P2pConnManager { node_controller, client_wait_for_transaction, executor_listener, - peer_connections, + conn_event_rx, ); // Pin the stream on the stack @@ -288,6 +287,7 @@ impl P2pConnManager { conn_bridge_rx: tokio::sync::mpsc::channel(1).1, // Dummy, won't be used event_listener, connections, + conn_event_tx: Some(conn_event_tx.clone()), key_pair, listening_ip, listening_port, @@ -302,25 +302,30 @@ impl P2pConnManager { while let Some(result) = select_stream.as_mut().next().await { // Process the result using the existing handler let event = ctx - .process_select_result( - result, - &mut state, - &mut select_stream, - &handshake_cmd_sender, - ) + .process_select_result(result, &mut state, &handshake_cmd_sender) .await?; match event { EventResult::Continue => continue, EventResult::Event(event) => { match *event { - ConnEvent::InboundMessage(msg) => { + ConnEvent::InboundMessage(inbound) => { + let remote = inbound.remote_addr; + let mut msg = inbound.msg; tracing::info!( tx = %msg.id(), msg_type = %msg, + remote = ?remote, peer = %ctx.bridge.op_manager.ring.connection_manager.get_peer_key().unwrap(), "Received inbound message from peer - processing" ); + if let (Some(remote_addr), NetMessage::V1(NetMessageV1::Connect(ConnectMsg::Request { payload, .. 
}))) = + (remote, &mut msg) + { + if payload.observed_addr.is_none() { + payload.observed_addr = Some(remote_addr); + } + } ctx.handle_inbound_message(msg, &op_manager, &mut state) .await?; } @@ -498,6 +503,13 @@ impl P2pConnManager { } } } + ConnEvent::TransportClosed { remote_addr, error } => { + tracing::debug!( + remote = %remote_addr, + ?error, + "Transport closed event received in event loop" + ); + } ConnEvent::ClosedChannel(reason) => { match reason { ChannelCloseReason::Bridge @@ -986,7 +998,6 @@ impl P2pConnManager { &mut self, result: priority_select::SelectResult, state: &mut EventListenerState, - select_stream: &mut priority_select::ProductionPrioritySelectStream, handshake_commands: &HandshakeCommandSender, ) -> anyhow::Result { let peer_id = &self.bridge.op_manager.ring.connection_manager.pub_key; @@ -1011,9 +1022,9 @@ impl P2pConnManager { SelectResult::PeerConnection(msg) => { tracing::debug!( peer = %peer_id, - "PrioritySelect: peer_connections READY" + "PrioritySelect: connection events READY" ); - self.handle_peer_connection_msg(msg, state, select_stream, handshake_commands) + self.handle_transport_event(msg, state, handshake_commands) .await } SelectResult::ConnBridge(msg) => { @@ -1026,12 +1037,11 @@ impl P2pConnManager { SelectResult::Handshake(result) => { tracing::debug!( peer = %peer_id, - "PrioritySelect: handshake event READY" + "PrioritySelect: handshake event READY" ); match result { Some(event) => { - self.handle_handshake_action(event, state, select_stream) - .await?; + self.handle_handshake_action(event, state).await?; Ok(EventResult::Continue) } None => { @@ -1326,7 +1336,6 @@ impl P2pConnManager { &mut self, event: HandshakeEvent, state: &mut EventListenerState, - select_stream: &mut priority_select::ProductionPrioritySelectStream, ) -> anyhow::Result<()> { tracing::info!(?event, "handle_handshake_action: received handshake event"); match event { @@ -1376,7 +1385,7 @@ impl P2pConnManager { "Inbound connection established" ); - 
self.handle_successful_connection(peer_id, connection, state, select_stream, None) + self.handle_successful_connection(peer_id, connection, state, None) .await?; } HandshakeEvent::OutboundEstablished { @@ -1391,7 +1400,7 @@ impl P2pConnManager { transaction = %transaction, "Outbound connection established" ); - self.handle_successful_connection(peer, connection, state, select_stream, None) + self.handle_successful_connection(peer, connection, state, None) .await?; } HandshakeEvent::OutboundFailed { @@ -1505,7 +1514,6 @@ impl P2pConnManager { peer_id: PeerId, connection: PeerConnection, state: &mut EventListenerState, - select_stream: &mut priority_select::ProductionPrioritySelectStream, remaining_checks: Option, ) -> anyhow::Result<()> { let pending_txs = state @@ -1574,8 +1582,13 @@ impl P2pConnManager { let (tx, rx) = mpsc::channel(10); tracing::debug!(self_peer = %self.bridge.op_manager.ring.connection_manager.pub_key, %peer_id, conn_map_size = self.connections.len(), "[CONN_TRACK] INSERT: OutboundConnectionSuccessful - adding to connections HashMap"); self.connections.insert(peer_id.clone(), tx); - let task = peer_connection_listener(rx, connection).boxed(); - select_stream.push_peer_connection(task); + let Some(conn_events) = self.conn_event_tx.as_ref().cloned() else { + anyhow::bail!("Connection event channel not initialized"); + }; + let listener_peer = peer_id.clone(); + tokio::spawn(async move { + peer_connection_listener(rx, connection, listener_peer, conn_events).await; + }); newly_inserted = true; } else { tracing::debug!(self_peer = %self.bridge.op_manager.ring.connection_manager.pub_key, %peer_id, conn_map_size = self.connections.len(), "[CONN_TRACK] SKIP INSERT: OutboundConnectionSuccessful - connection already exists in HashMap"); @@ -1598,139 +1611,132 @@ impl P2pConnManager { Ok(()) } - async fn handle_peer_connection_msg( + async fn handle_transport_event( &mut self, - msg: Option>, + event: Option, state: &mut EventListenerState, - 
select_stream: &mut priority_select::ProductionPrioritySelectStream, handshake_commands: &HandshakeCommandSender, ) -> anyhow::Result { - match msg { - Some(Ok(peer_conn)) => { - let mut peer_conn = peer_conn; - // Get the remote address from the connection - let remote_addr = peer_conn.conn.remote_addr(); - let tx = *peer_conn.msg.id(); - if let Some(sender_peer) = extract_sender_from_message(&peer_conn.msg) { - if sender_peer.peer.addr == remote_addr - || sender_peer.peer.addr.ip().is_unspecified() - { - let mut new_peer_id = sender_peer.peer.clone(); - if new_peer_id.addr.ip().is_unspecified() { - new_peer_id.addr = remote_addr; - if let Some(sender_mut) = - extract_sender_from_message_mut(&mut peer_conn.msg) - { - if sender_mut.peer.addr.ip().is_unspecified() { - sender_mut.peer.addr = remote_addr; + match event { + Some(ConnEvent::InboundMessage(mut inbound)) => { + let tx = *inbound.msg.id(); + + if let Some(remote_addr) = inbound.remote_addr { + if let Some(sender_peer) = extract_sender_from_message(&inbound.msg) { + if sender_peer.peer.addr == remote_addr + || sender_peer.peer.addr.ip().is_unspecified() + { + let mut new_peer_id = sender_peer.peer.clone(); + if new_peer_id.addr.ip().is_unspecified() { + new_peer_id.addr = remote_addr; + if let Some(sender_mut) = + extract_sender_from_message_mut(&mut inbound.msg) + { + if sender_mut.peer.addr.ip().is_unspecified() { + sender_mut.peer.addr = remote_addr; + } } } - } - if let Some(existing_key) = self - .connections - .keys() - .find(|peer| { - peer.addr == remote_addr && peer.pub_key != new_peer_id.pub_key - }) - .cloned() - { - if let Some(channel) = self.connections.remove(&existing_key) { - tracing::info!( - remote = %remote_addr, - old_peer = %existing_key, - new_peer = %new_peer_id, - "Updating provisional peer identity after inbound message" - ); - self.bridge - .op_manager - .ring - .update_connection_identity(&existing_key, new_peer_id.clone()); - self.connections.insert(new_peer_id, channel); + if 
let Some(existing_key) = self + .connections + .keys() + .find(|peer| { + peer.addr == remote_addr && peer.pub_key != new_peer_id.pub_key + }) + .cloned() + { + if let Some(channel) = self.connections.remove(&existing_key) { + tracing::info!( + remote = %remote_addr, + old_peer = %existing_key, + new_peer = %new_peer_id, + "Updating provisional peer identity after inbound message" + ); + self.bridge.op_manager.ring.update_connection_identity( + &existing_key, + new_peer_id.clone(), + ); + self.connections.insert(new_peer_id, channel); + } } } } - } - if let NetMessage::V1(NetMessageV1::Connect(ConnectMsg::Request { - payload, .. - })) = &mut peer_conn.msg - { - if payload.observed_addr.is_none() { - payload.observed_addr = Some(remote_addr); - } - } + let should_connect = + !self.connections.keys().any(|peer| peer.addr == remote_addr) + && !state.awaiting_connection.contains_key(&remote_addr); - // Check if we need to establish a connection back to the sender - let should_connect = !self.connections.keys().any(|peer| peer.addr == remote_addr) - && !state.awaiting_connection.contains_key(&remote_addr); - - if should_connect { - // Try to extract sender information from the message to establish connection - if let Some(sender_peer) = extract_sender_from_message(&peer_conn.msg) { - tracing::info!( - "Received message from unconnected peer {}, establishing connection proactively", - sender_peer.peer - ); + if should_connect { + if let Some(sender_peer) = extract_sender_from_message(&inbound.msg) { + tracing::info!( + "Received message from unconnected peer {}, establishing connection proactively", + sender_peer.peer + ); - let tx = Transaction::new::(); - let (callback, _rx) = tokio::sync::mpsc::channel(10); + let tx = Transaction::new::(); + let (callback, _rx) = tokio::sync::mpsc::channel(10); - // Don't await - let it happen in the background - let _ = self - .handle_connect_peer( - sender_peer.peer.clone(), - Box::new(callback), - tx, - handshake_commands, - state, 
- false, // not a courtesy connection - ) - .await; + let _ = self + .handle_connect_peer( + sender_peer.peer.clone(), + Box::new(callback), + tx, + handshake_commands, + state, + false, + ) + .await; + } } } tracing::debug!( - peer_addr = %remote_addr, + peer_addr = ?inbound.remote_addr, %tx, tx_type = ?tx.transaction_type(), "Queueing inbound NetMessage from peer connection" ); - let task = peer_connection_listener(peer_conn.rx, peer_conn.conn).boxed(); - select_stream.push_peer_connection(task); Ok(EventResult::Event( - ConnEvent::InboundMessage(peer_conn.msg).into(), + ConnEvent::InboundMessage(inbound).into(), )) } - Some(Err(err)) => { - if let TransportError::ConnectionClosed(socket_addr) = err { - if let Some(peer) = self - .connections - .keys() - .find_map(|k| (k.addr == socket_addr).then(|| k.clone())) + Some(ConnEvent::TransportClosed { remote_addr, error }) => { + tracing::debug!( + remote = %remote_addr, + ?error, + "peer_connection_listener reported transport closure" + ); + if let Some(peer) = self + .connections + .keys() + .find_map(|k| (k.addr == remote_addr).then(|| k.clone())) + { + tracing::debug!(self_peer = %self.bridge.op_manager.ring.connection_manager.pub_key, %peer, socket_addr = %remote_addr, conn_map_size = self.connections.len(), "[CONN_TRACK] REMOVE: TransportClosed - removing from connections HashMap"); + self.bridge + .op_manager + .ring + .prune_connection(peer.clone()) + .await; + self.connections.remove(&peer); + if let Err(error) = handshake_commands + .send(HandshakeCommand::DropConnection { peer: peer.clone() }) + .await { - tracing::debug!(self_peer = %self.bridge.op_manager.ring.connection_manager.pub_key, %peer, socket_addr = %socket_addr, conn_map_size = self.connections.len(), "[CONN_TRACK] REMOVE: TransportError::ConnectionClosed - removing from connections HashMap"); - self.bridge - .op_manager - .ring - .prune_connection(peer.clone()) - .await; - self.connections.remove(&peer); - if let Err(error) = handshake_commands 
- .send(HandshakeCommand::DropConnection { peer: peer.clone() }) - .await - { - tracing::warn!( - remote = %socket_addr, - ?error, - "Failed to notify handshake driver about dropped connection" - ); - } + tracing::warn!( + remote = %remote_addr, + ?error, + "Failed to notify handshake driver about dropped connection" + ); } } Ok(EventResult::Continue) } + Some(other) => { + tracing::warn!(?other, "Unexpected event from peer connection listener"); + Ok(EventResult::Continue) + } None => { - tracing::error!("All peer connections closed"); + tracing::error!("All peer connection event channels closed"); Ok(EventResult::Continue) } } @@ -1776,7 +1782,7 @@ impl P2pConnManager { msg_type = %msg, "handle_notification_msg: Received NetMessage notification, converting to InboundMessage" ); - EventResult::Event(ConnEvent::InboundMessage(msg).into()) + EventResult::Event(ConnEvent::InboundMessage(msg.into()).into()) } Some(Right(action)) => { tracing::info!( @@ -1799,7 +1805,7 @@ impl P2pConnManager { match msg { Some((callback, msg)) => { state.pending_op_results.insert(*msg.id(), callback); - EventResult::Event(ConnEvent::InboundMessage(msg).into()) + EventResult::Event(ConnEvent::InboundMessage(msg.into()).into()) } None => { EventResult::Event(ConnEvent::ClosedChannel(ChannelCloseReason::OpExecution).into()) @@ -1909,7 +1915,6 @@ impl ConnectResultSender for mpsc::Sender), ()>> { struct EventListenerState { outbound_handler: OutboundConnectionHandler, - // Note: peer_connections has been moved out to allow separate borrowing by the stream pending_from_executor: HashSet, // FIXME: we are potentially leaving trash here when transacrions are completed tx_to_client: HashMap>, @@ -1940,10 +1945,38 @@ enum EventResult { #[derive(Debug)] pub(super) enum ConnEvent { - InboundMessage(NetMessage), + InboundMessage(IncomingMessage), OutboundMessage(NetMessage), NodeAction(NodeEvent), ClosedChannel(ChannelCloseReason), + TransportClosed { + remote_addr: SocketAddr, + error: 
TransportError, + }, +} + +#[derive(Debug)] +pub(super) struct IncomingMessage { + pub msg: NetMessage, + pub remote_addr: Option, +} + +impl IncomingMessage { + fn with_remote(msg: NetMessage, remote_addr: SocketAddr) -> Self { + Self { + msg, + remote_addr: Some(remote_addr), + } + } +} + +impl From for IncomingMessage { + fn from(msg: NetMessage) -> Self { + Self { + msg, + remote_addr: None, + } + } } #[derive(Debug)] @@ -1966,14 +1999,6 @@ enum ProtocolStatus { Failed, } -#[derive(Debug)] -pub(super) struct PeerConnectionInbound { - pub conn: PeerConnection, - /// Receiver for inbound messages for the peer connection - pub rx: Receiver>, - pub msg: NetMessage, -} - async fn handle_peer_channel_message( conn: &mut PeerConnection, msg: Either, @@ -2025,17 +2050,50 @@ async fn handle_peer_channel_message( Ok(()) } +async fn notify_transport_closed( + sender: &Sender, + remote_addr: SocketAddr, + error: TransportError, +) { + if sender + .send(ConnEvent::TransportClosed { remote_addr, error }) + .await + .is_err() + { + tracing::debug!( + remote = %remote_addr, + "[CONN_LIFECYCLE] conn_events receiver dropped before handling closure event" + ); + } +} + async fn peer_connection_listener( mut rx: PeerConnChannelRecv, mut conn: PeerConnection, -) -> Result { + peer_id: PeerId, + conn_events: Sender, +) { const MAX_IMMEDIATE_SENDS: usize = 32; + let remote_addr = conn.remote_addr(); + tracing::debug!( + to = %remote_addr, + peer = %peer_id, + "[CONN_LIFECYCLE] Starting peer_connection_listener task" + ); loop { let mut drained = 0; loop { match rx.try_recv() { Ok(msg) => { - handle_peer_channel_message(&mut conn, msg).await?; + if let Err(error) = handle_peer_channel_message(&mut conn, msg).await { + tracing::debug!( + to = %remote_addr, + ?error, + "[CONN_LIFECYCLE] Shutting down connection after send failure" + ); + notify_transport_closed(&conn_events, remote_addr, error).await; + return; + } drained += 1; if drained >= MAX_IMMEDIATE_SENDS { break; @@ -2047,7 
+2105,13 @@ async fn peer_connection_listener( to = %conn.remote_addr(), "[CONN_LIFECYCLE] peer_connection_listener channel closed without explicit DropConnection" ); - return Err(TransportError::ConnectionClosed(conn.remote_addr())); + notify_transport_closed( + &conn_events, + remote_addr, + TransportError::ConnectionClosed(remote_addr), + ) + .await; + return; } } } @@ -2059,32 +2123,74 @@ async fn peer_connection_listener( to = %conn.remote_addr(), "[CONN_LIFECYCLE] peer_connection_listener channel closed without explicit DropConnection" ); - break Err(TransportError::ConnectionClosed(conn.remote_addr())); + notify_transport_closed( + &conn_events, + remote_addr, + TransportError::ConnectionClosed(remote_addr), + ) + .await; + return; }; - handle_peer_channel_message(&mut conn, msg).await?; - } - msg = conn.recv() => { - let Ok(msg) = msg - .inspect_err(|error| { - tracing::error!(from=%conn.remote_addr(), "Error while receiving message: {error}"); - }) - else { + if let Err(error) = handle_peer_channel_message(&mut conn, msg).await { tracing::debug!( - from = %conn.remote_addr(), - "[CONN_LIFECYCLE] peer_connection_listener terminating after recv error" + to = %remote_addr, + ?error, + "[CONN_LIFECYCLE] Connection closed after channel command" ); - break Err(TransportError::ConnectionClosed(conn.remote_addr())); - }; - let net_message = decode_msg(&msg).unwrap(); - let tx = *net_message.id(); - tracing::debug!( - from = %conn.remote_addr(), - %tx, - tx_type = ?tx.transaction_type(), - msg_type = %net_message, - "[CONN_LIFECYCLE] Received inbound NetMessage from peer" - ); - break Ok(PeerConnectionInbound { conn, rx, msg: net_message }); + notify_transport_closed(&conn_events, remote_addr, error).await; + return; + } + } + msg = conn.recv() => { + match msg { + Ok(msg) => { + match decode_msg(&msg) { + Ok(net_message) => { + let tx = *net_message.id(); + tracing::debug!( + from = %conn.remote_addr(), + %tx, + tx_type = ?tx.transaction_type(), + msg_type = 
%net_message, + "[CONN_LIFECYCLE] Received inbound NetMessage from peer" + ); + if conn_events.send(ConnEvent::InboundMessage(IncomingMessage::with_remote(net_message, remote_addr))).await.is_err() { + tracing::debug!( + from = %remote_addr, + "[CONN_LIFECYCLE] conn_events receiver dropped; stopping listener" + ); + return; + } + } + Err(error) => { + tracing::error!( + from = %conn.remote_addr(), + ?error, + "[CONN_LIFECYCLE] Failed to deserialize inbound message; closing connection" + ); + let transport_error = TransportError::Other(anyhow!( + "Failed to deserialize inbound message from {remote_addr}: {error:?}" + )); + notify_transport_closed( + &conn_events, + remote_addr, + transport_error, + ) + .await; + return; + } + } + } + Err(error) => { + tracing::debug!( + from = %conn.remote_addr(), + ?error, + "[CONN_LIFECYCLE] peer_connection_listener terminating after recv error" + ); + notify_transport_closed(&conn_events, remote_addr, error).await; + return; + } + } } } } From e3fc524c87767e04fba30b7bc022731f3ad043e4 Mon Sep 17 00:00:00 2001 From: Ian Clarke Date: Sun, 16 Nov 2025 16:15:56 -0600 Subject: [PATCH 07/10] fix: fmt after rebase --- crates/core/src/node/network_bridge/p2p_protoc.rs | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/crates/core/src/node/network_bridge/p2p_protoc.rs b/crates/core/src/node/network_bridge/p2p_protoc.rs index 97fe19291..a61986cb8 100644 --- a/crates/core/src/node/network_bridge/p2p_protoc.rs +++ b/crates/core/src/node/network_bridge/p2p_protoc.rs @@ -319,8 +319,13 @@ impl P2pConnManager { peer = %ctx.bridge.op_manager.ring.connection_manager.get_peer_key().unwrap(), "Received inbound message from peer - processing" ); - if let (Some(remote_addr), NetMessage::V1(NetMessageV1::Connect(ConnectMsg::Request { payload, .. }))) = - (remote, &mut msg) + if let ( + Some(remote_addr), + NetMessage::V1(NetMessageV1::Connect(ConnectMsg::Request { + payload, + .. 
+ })), + ) = (remote, &mut msg) { if payload.observed_addr.is_none() { payload.observed_addr = Some(remote_addr); From 0a3d6598c9b09bc65f733592e9c697bcb19d3d88 Mon Sep 17 00:00:00 2001 From: Ian Clarke Date: Sun, 16 Nov 2025 17:35:47 -0600 Subject: [PATCH 08/10] fix: stop redundant proactive connects on inbound --- .../src/node/network_bridge/p2p_protoc.rs | 28 +------------------ 1 file changed, 1 insertion(+), 27 deletions(-) diff --git a/crates/core/src/node/network_bridge/p2p_protoc.rs b/crates/core/src/node/network_bridge/p2p_protoc.rs index a61986cb8..e77eccafd 100644 --- a/crates/core/src/node/network_bridge/p2p_protoc.rs +++ b/crates/core/src/node/network_bridge/p2p_protoc.rs @@ -1622,6 +1622,7 @@ impl P2pConnManager { state: &mut EventListenerState, handshake_commands: &HandshakeCommandSender, ) -> anyhow::Result { + let _ = state; match event { Some(ConnEvent::InboundMessage(mut inbound)) => { let tx = *inbound.msg.id(); @@ -1666,33 +1667,6 @@ impl P2pConnManager { } } } - - let should_connect = - !self.connections.keys().any(|peer| peer.addr == remote_addr) - && !state.awaiting_connection.contains_key(&remote_addr); - - if should_connect { - if let Some(sender_peer) = extract_sender_from_message(&inbound.msg) { - tracing::info!( - "Received message from unconnected peer {}, establishing connection proactively", - sender_peer.peer - ); - - let tx = Transaction::new::(); - let (callback, _rx) = tokio::sync::mpsc::channel(10); - - let _ = self - .handle_connect_peer( - sender_peer.peer.clone(), - Box::new(callback), - tx, - handshake_commands, - state, - false, - ) - .await; - } - } } tracing::debug!( From 30311f64594d10208e8218730cdc6c0240aa7861 Mon Sep 17 00:00:00 2001 From: Ian Clarke Date: Sun, 16 Nov 2025 18:27:42 -0600 Subject: [PATCH 09/10] docs: clarify observed addr tagging hop --- crates/core/src/node/network_bridge/p2p_protoc.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/crates/core/src/node/network_bridge/p2p_protoc.rs 
b/crates/core/src/node/network_bridge/p2p_protoc.rs index e77eccafd..7ee2ae527 100644 --- a/crates/core/src/node/network_bridge/p2p_protoc.rs +++ b/crates/core/src/node/network_bridge/p2p_protoc.rs @@ -319,6 +319,9 @@ impl P2pConnManager { peer = %ctx.bridge.op_manager.ring.connection_manager.get_peer_key().unwrap(), "Received inbound message from peer - processing" ); + // Only the hop that owns the transport socket knows the UDP source + // address; tag the connect request here so downstream relays don't + // guess at the joiner's address. if let ( Some(remote_addr), NetMessage::V1(NetMessageV1::Connect(ConnectMsg::Request { From 5cf680646d0522e3ffff34065395f89cd8b4f1a9 Mon Sep 17 00:00:00 2001 From: Ian Clarke Date: Sun, 16 Nov 2025 18:28:56 -0600 Subject: [PATCH 10/10] docs: clarify observed addr tagging hop --- crates/core/src/node/network_bridge/p2p_protoc.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/crates/core/src/node/network_bridge/p2p_protoc.rs b/crates/core/src/node/network_bridge/p2p_protoc.rs index 7ee2ae527..a8b10ccce 100644 --- a/crates/core/src/node/network_bridge/p2p_protoc.rs +++ b/crates/core/src/node/network_bridge/p2p_protoc.rs @@ -319,9 +319,9 @@ impl P2pConnManager { peer = %ctx.bridge.op_manager.ring.connection_manager.get_peer_key().unwrap(), "Received inbound message from peer - processing" ); - // Only the hop that owns the transport socket knows the UDP source - // address; tag the connect request here so downstream relays don't - // guess at the joiner's address. + // Only the hop that owns the transport socket (gateway/first hop in + // practice) knows the UDP source address; tag the connect request here + // so downstream relays don't guess at the joiner's address. if let ( Some(remote_addr), NetMessage::V1(NetMessageV1::Connect(ConnectMsg::Request {