diff --git a/crates/core/src/node/network_bridge/p2p_protoc.rs b/crates/core/src/node/network_bridge/p2p_protoc.rs index 4cc9d99d1..627e99707 100644 --- a/crates/core/src/node/network_bridge/p2p_protoc.rs +++ b/crates/core/src/node/network_bridge/p2p_protoc.rs @@ -5,7 +5,7 @@ use futures::FutureExt; use futures::StreamExt; use std::convert::Infallible; use std::future::Future; -use std::net::{IpAddr, Ipv4Addr, SocketAddr}; +use std::net::{IpAddr, SocketAddr}; use std::pin::Pin; use std::time::Duration; use std::{ @@ -148,21 +148,19 @@ impl P2pConnManager { let gateways = config.get_gateways()?; let key_pair = config.key_pair.clone(); - // Initialize our peer identity before any connection attempts so join requests can - // reference the correct address. - let advertised_addr = { + // Initialize our peer identity. + // - Gateways must know their public address upfront (required) + // - Peers with configured public_address use that + // - Peers behind NAT start with a placeholder (127.0.0.1) which will be updated + // when they receive ObservedAddress from a gateway + let advertised_addr = if config.is_gateway { + // Gateways must have a public address configured let advertised_ip = config .peer_id .as_ref() .map(|peer| peer.addr.ip()) .or(config.config.network_api.public_address) - .unwrap_or_else(|| { - if listener_ip.is_unspecified() { - IpAddr::V4(Ipv4Addr::LOCALHOST) - } else { - listener_ip - } - }); + .expect("Gateway must have public_address configured"); let advertised_port = config .peer_id .as_ref() @@ -170,6 +168,14 @@ impl P2pConnManager { .or(config.config.network_api.public_port) .unwrap_or(listen_port); SocketAddr::new(advertised_ip, advertised_port) + } else if let Some(public_addr) = config.config.network_api.public_address { + // Non-gateway peer with explicitly configured public address + let port = config.config.network_api.public_port.unwrap_or(listen_port); + SocketAddr::new(public_addr, port) + } else { + // Non-gateway peer behind NAT: use placeholder address. + // This will be updated when we receive ObservedAddress from gateway. + SocketAddr::new(std::net::Ipv4Addr::new(127, 0, 0, 1).into(), listen_port) }; bridge .op_manager diff --git a/crates/core/src/node/p2p_impl.rs b/crates/core/src/node/p2p_impl.rs index b3a0802ee..577a179a1 100644 --- a/crates/core/src/node/p2p_impl.rs +++ b/crates/core/src/node/p2p_impl.rs @@ -159,6 +159,7 @@ impl NodeP2P { .min(u8::MAX as usize) as u8; let target_connections = self.op_manager.ring.connection_manager.min_connections; + let is_gateway = self.op_manager.ring.connection_manager.is_gateway(); let (tx, op, msg) = ConnectOp::initiate_join_request( joiner, query_target.clone(), @@ -166,6 +167,7 @@ impl NodeP2P { ttl, target_connections, self.op_manager.connect_forward_estimator.clone(), + is_gateway, ); tracing::debug!( diff --git a/crates/core/src/operations/connect.rs b/crates/core/src/operations/connect.rs index b99d7013c..6e462d0e7 100644 --- a/crates/core/src/operations/connect.rs +++ b/crates/core/src/operations/connect.rs @@ -22,7 +22,7 @@ use crate::node::{IsOperationCompleted, NetworkBridge, OpManager, PeerId}; use crate::operations::{OpEnum, OpError, OpInitialization, OpOutcome, Operation, OperationResult}; use crate::ring::PeerKeyLocation; use crate::router::{EstimatorType, IsotonicEstimator, IsotonicEvent}; -use crate::transport::TransportKeypair; +use crate::transport::{TransportKeypair, TransportPublicKey}; use crate::util::{Backoff, Contains, IterExt}; use freenet_stdlib::client_api::HostResponse; @@ -126,8 +126,9 @@ impl ConnectMsg { pub(crate) struct ConnectRequest { /// Joiner's advertised location (fallbacks to the joiner's socket address). pub desired_location: Location, - /// Joiner's identity as observed so far. - pub joiner: PeerKeyLocation, + /// Joiner's identity. NAT peers start as Unknown (just public key) until + /// a gateway observes their address and upgrades them to Known. + pub joiner: Joiner, /// Remaining hops before the request stops travelling. pub ttl: u8, /// Simple visited set to avoid trivial loops. @@ -143,6 +144,89 @@ pub(crate) struct ConnectResponse { pub acceptor: PeerKeyLocation, } +/// Represents a peer joining the network. +/// +/// NAT peers don't know their public address until a gateway observes it, +/// so we distinguish between: +/// - `Unknown`: Only have the public key (NAT peer before address discovery) +/// - `Known`: Have full PeerId with known address (gateway or after ObservedAddress) +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +pub(crate) enum Joiner { + /// Peer that doesn't yet know its public address (NAT peer before discovery). + Unknown(TransportPublicKey), + /// Peer with a known address (gateway, or NAT peer after ObservedAddress). + Known(PeerId), +} + +impl Joiner { + /// Returns the public key of the joiner. + #[allow(dead_code)] + pub fn pub_key(&self) -> &TransportPublicKey { + match self { + Joiner::Unknown(key) => key, + Joiner::Known(peer_id) => &peer_id.pub_key, + } + } + + /// Returns the PeerId if known, None if address is unknown. + pub fn peer_id(&self) -> Option<&PeerId> { + match self { + Joiner::Unknown(_) => None, + Joiner::Known(peer_id) => Some(peer_id), + } + } + + /// Returns true if this joiner has a known address. + #[allow(dead_code)] + pub fn has_known_address(&self) -> bool { + matches!(self, Joiner::Known(_)) + } + + /// Upgrades an Unknown joiner to Known once we observe their address. + pub fn with_observed_address(&self, addr: SocketAddr) -> Self { + match self { + Joiner::Unknown(key) => Joiner::Known(PeerId::new(addr, key.clone())), + Joiner::Known(peer_id) => { + // Avoid allocation if address hasn't changed + if peer_id.addr == addr { + self.clone() + } else { + Joiner::Known(PeerId::new(addr, peer_id.pub_key.clone())) + } + } + } + } + + /// Converts to a PeerKeyLocation if we have a known address. + /// Returns None if address is unknown. + pub fn to_peer_key_location(&self) -> Option { + match self { + Joiner::Unknown(_) => None, + Joiner::Known(peer_id) => Some(PeerKeyLocation { + peer: peer_id.clone(), + location: Some(Location::from_address(&peer_id.addr)), + }), + } + } + + /// Returns the location if we have a known address. + pub fn location(&self) -> Option { + match self { + Joiner::Unknown(_) => None, + Joiner::Known(peer_id) => Some(Location::from_address(&peer_id.addr)), + } + } +} + +impl fmt::Display for Joiner { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + Joiner::Unknown(key) => write!(f, "Unknown({})", key), + Joiner::Known(peer_id) => write!(f, "Known({})", peer_id), + } + } +} + /// New minimal state machine the joiner tracks. #[derive(Debug, Clone)] pub(crate) enum ConnectState { @@ -178,7 +262,8 @@ pub(crate) trait RelayContext { fn self_location(&self) -> &PeerKeyLocation; /// Determine whether we should accept the joiner immediately. - fn should_accept(&self, joiner: &PeerKeyLocation) -> bool; + /// Takes a Joiner which may or may not have a known address yet. + fn should_accept(&self, joiner: &Joiner) -> bool; /// Choose the next hop for the request, avoiding peers already visited. fn select_next_hop( @@ -277,37 +362,45 @@ impl RelayState { push_unique_peer(&mut self.request.visited, ctx.self_location().clone()); if let Some(joiner_addr) = self.request.observed_addr { - // Always overwrite with observed socket rather than checking routability. If the - // observed socket is loopback, this guard is skipped only in local/unit tests where - // peers share 127.0.0.1, so keep a one-shot overwrite and avoid early returns. + // Upgrade the joiner to Known with the observed address. + // This is critical for NAT peers who start as Unknown. if !self.observed_sent { - self.request.joiner.peer.addr = joiner_addr; - if self.request.joiner.location.is_none() { - self.request.joiner.location = Some(Location::from_address(&joiner_addr)); - } + self.request.joiner = self.request.joiner.with_observed_address(joiner_addr); self.observed_sent = true; - actions.observed_address = Some((self.request.joiner.clone(), joiner_addr)); + // Now that we have a known address, we can create a PeerKeyLocation + if let Some(joiner_pkl) = self.request.joiner.to_peer_key_location() { + actions.observed_address = Some((joiner_pkl, joiner_addr)); + } } } if !self.accepted_locally && ctx.should_accept(&self.request.joiner) { self.accepted_locally = true; let acceptor = ctx.self_location().clone(); - let dist = ring_distance(acceptor.location, self.request.joiner.location); + let joiner_location = self.request.joiner.location(); + let dist = ring_distance(acceptor.location, joiner_location); actions.accept_response = Some(ConnectResponse { acceptor: acceptor.clone(), }); - actions.expect_connection_from = Some(self.request.joiner.clone()); - // Use the joiner with updated observed address for response routing - actions.response_target = Some(self.request.joiner.clone()); - tracing::info!( - acceptor_peer = %acceptor.peer, - joiner_peer = %self.request.joiner.peer, - acceptor_loc = ?acceptor.location, - joiner_loc = ?self.request.joiner.location, - ring_distance = ?dist, - "connect: acceptance issued" - ); + // Get PeerKeyLocation for the joiner - should always succeed after observed_addr upgrade + if let Some(joiner_pkl) = self.request.joiner.to_peer_key_location() { + actions.expect_connection_from = Some(joiner_pkl.clone()); + // Use the joiner with updated observed address for response routing + actions.response_target = Some(joiner_pkl.clone()); + tracing::info!( + acceptor_peer = %acceptor.peer, + joiner_peer = %joiner_pkl.peer, + acceptor_loc = ?acceptor.location, + joiner_loc = ?joiner_pkl.location, + ring_distance = ?dist, + "connect: acceptance issued" + ); + } else { + tracing::warn!( + joiner = %self.request.joiner, + "connect: cannot accept joiner without known address" + ); + } } if self.forwarded_to.is_none() && self.request.ttl > 0 { @@ -378,14 +471,18 @@ impl RelayContext for RelayEnv<'_> { &self.self_location } - fn should_accept(&self, joiner: &PeerKeyLocation) -> bool { + fn should_accept(&self, joiner: &Joiner) -> bool { + // We can only accept joiners with known addresses + let Some(peer_id) = joiner.peer_id() else { + return false; + }; let location = joiner - .location - .unwrap_or_else(|| Location::from_address(&joiner.peer.addr)); + .location() + .unwrap_or_else(|| Location::from_address(&peer_id.addr)); self.op_manager .ring .connection_manager - .should_accept(location, &joiner.peer) + .should_accept(location, peer_id) } fn select_next_hop( @@ -626,12 +723,20 @@ impl ConnectOp { ttl: u8, target_connections: usize, connect_forward_estimator: Arc>, + is_gateway: bool, ) -> (Transaction, Self, ConnectMsg) { let mut visited = vec![own.clone()]; push_unique_peer(&mut visited, target.clone()); + // Gateways know their address, NAT peers don't until observed + let joiner = if is_gateway { + Joiner::Known(own.peer.clone()) + } else { + // NAT peer: we only know our public key, not our external address + Joiner::Unknown(own.peer.pub_key.clone()) + }; let request = ConnectRequest { desired_location, - joiner: own.clone(), + joiner, ttl, visited, observed_addr: None, @@ -941,6 +1046,14 @@ impl Operation for ConnectOp { } ConnectMsg::ObservedAddress { address, .. } => { self.handle_observed_address(*address, Instant::now()); + // Update our peer address now that we know our external address. + // This is critical for peers behind NAT who start with a placeholder + // address (127.0.0.1) and need to update it when a gateway observes + // their actual public address. + op_manager + .ring + .connection_manager + .update_peer_address(*address); Ok(store_operation_state(&mut self)) } } @@ -1050,6 +1163,7 @@ pub(crate) async fn join_ring_request( .min(u8::MAX as usize) as u8; let target_connections = op_manager.ring.connection_manager.min_connections; + let is_gateway = op_manager.ring.connection_manager.is_gateway(); let (tx, mut op, msg) = ConnectOp::initiate_join_request( own.clone(), gateway.clone(), @@ -1057,6 +1171,7 @@ pub(crate) async fn join_ring_request( ttl, target_connections, op_manager.connect_forward_estimator.clone(), + is_gateway, ); op.gateway = Some(Box::new(gateway.clone())); @@ -1225,7 +1340,7 @@ mod tests { &self.self_loc } - fn should_accept(&self, _joiner: &PeerKeyLocation) -> bool { + fn should_accept(&self, _joiner: &Joiner) -> bool { self.accept } @@ -1249,6 +1364,11 @@ mod tests { } } + /// Helper to create a Joiner::Known from a PeerKeyLocation + fn make_joiner(pkl: &PeerKeyLocation) -> Joiner { + Joiner::Known(pkl.peer.clone()) + } + #[test] fn forward_estimator_handles_missing_location() { let mut estimator = ConnectForwardEstimator::new(); @@ -1292,7 +1412,7 @@ mod tests { upstream: joiner.clone(), request: ConnectRequest { desired_location: Location::random(), - joiner: joiner.clone(), + joiner: make_joiner(&joiner), ttl: 3, visited: vec![], observed_addr: Some(joiner.peer.addr), @@ -1324,7 +1444,7 @@ mod tests { upstream: joiner.clone(), request: ConnectRequest { desired_location: Location::random(), - joiner: joiner.clone(), + joiner: make_joiner(&joiner), ttl: 2, visited: vec![], observed_addr: Some(joiner.peer.addr), @@ -1362,7 +1482,7 @@ mod tests { upstream: joiner.clone(), request: ConnectRequest { desired_location: Location::random(), - joiner: joiner.clone(), + joiner: make_joiner(&joiner), ttl: 3, visited: vec![], observed_addr: Some(observed_addr), @@ -1384,7 +1504,13 @@ mod tests { .expect("expected observed address update"); assert_eq!(addr, observed_addr); assert_eq!(target.peer.addr, observed_addr); - assert_eq!(state.request.joiner.peer.addr, observed_addr); + // After handling, the joiner should be upgraded to Known with the observed address + let joiner_peer = state + .request + .joiner + .peer_id() + .expect("joiner should be Known after observed_addr"); + assert_eq!(joiner_peer.addr, observed_addr); } #[test] @@ -1419,6 +1545,7 @@ mod tests { ttl, 2, Arc::new(RwLock::new(ConnectForwardEstimator::new())), + true, // is_gateway for test ); match msg { @@ -1451,7 +1578,7 @@ mod tests { let request = ConnectRequest { desired_location: Location::random(), - joiner: joiner.clone(), + joiner: make_joiner(&joiner), ttl: 3, visited: vec![joiner.clone()], observed_addr: Some(joiner.peer.addr), @@ -1531,7 +1658,7 @@ mod tests { upstream: joiner.clone(), request: ConnectRequest { desired_location: Location::random(), - joiner: joiner.clone(), + joiner: make_joiner(&joiner), ttl: 3, visited: vec![], observed_addr: Some(observed_public_addr), diff --git a/crates/core/src/ring/connection_manager.rs b/crates/core/src/ring/connection_manager.rs index 2ffadb5e2..138a9fb06 100644 --- a/crates/core/src/ring/connection_manager.rs +++ b/crates/core/src/ring/connection_manager.rs @@ -395,6 +395,43 @@ impl ConnectionManager { } } + /// Updates the address of the peer key. + /// + /// This is used when a peer behind NAT learns its actual public address from a gateway + /// via the ObservedAddress message. The peer initially starts with a placeholder address + /// (127.0.0.1) and updates it here when the real address is discovered. + /// + /// Returns true if the address was updated, false if the address was already set to a + /// non-placeholder value. + pub fn update_peer_address(&self, new_addr: SocketAddr) -> bool { + let mut this_peer = self.peer_key.lock(); + if let Some(ref mut peer) = *this_peer { + // Only update if current address is a placeholder (localhost) + if peer.addr.ip().is_loopback() { + tracing::info!( + old_addr = %peer.addr, + %new_addr, + "Updating peer address from placeholder to observed address" + ); + peer.addr = new_addr; + true + } else { + tracing::debug!( + current_addr = %peer.addr, + %new_addr, + "Peer address already set to non-placeholder, not updating" + ); + false + } + } else { + tracing::warn!( + %new_addr, + "Attempted to update peer address but peer key not set" + ); + false + } + } + pub fn prune_alive_connection(&self, peer: &PeerId) -> Option { self.prune_connection(peer, true) } diff --git a/crates/core/src/ring/mod.rs b/crates/core/src/ring/mod.rs index e11321b90..3f39ed89f 100644 --- a/crates/core/src/ring/mod.rs +++ b/crates/core/src/ring/mod.rs @@ -653,6 +653,7 @@ impl Ring { let ttl = self.max_hops_to_live.max(1).min(u8::MAX as usize) as u8; let target_connections = self.connection_manager.min_connections; + let is_gateway = self.connection_manager.is_gateway(); let (tx, op, msg) = ConnectOp::initiate_join_request( joiner, query_target.clone(), @@ -660,6 +661,7 @@ impl Ring { ttl, target_connections, op_manager.connect_forward_estimator.clone(), + is_gateway, ); live_tx_tracker.add_transaction(query_target.peer.clone(), tx);