diff --git a/crates/core/src/node/network_bridge/p2p_protoc.rs b/crates/core/src/node/network_bridge/p2p_protoc.rs index 91f2444cb..a8b10ccce 100644 --- a/crates/core/src/node/network_bridge/p2p_protoc.rs +++ b/crates/core/src/node/network_bridge/p2p_protoc.rs @@ -311,7 +311,7 @@ impl P2pConnManager { match *event { ConnEvent::InboundMessage(inbound) => { let remote = inbound.remote_addr; - let msg = inbound.msg; + let mut msg = inbound.msg; tracing::info!( tx = %msg.id(), msg_type = %msg, @@ -319,6 +319,21 @@ impl P2pConnManager { peer = %ctx.bridge.op_manager.ring.connection_manager.get_peer_key().unwrap(), "Received inbound message from peer - processing" ); + // Only the hop that owns the transport socket (gateway/first hop in + // practice) knows the UDP source address; tag the connect request here + // so downstream relays don't guess at the joiner's address. + if let ( + Some(remote_addr), + NetMessage::V1(NetMessageV1::Connect(ConnectMsg::Request { + payload, + .. + })), + ) = (remote, &mut msg) + { + if payload.observed_addr.is_none() { + payload.observed_addr = Some(remote_addr); + } + } ctx.handle_inbound_message(msg, &op_manager, &mut state) .await?; } @@ -1610,6 +1625,7 @@ impl P2pConnManager { state: &mut EventListenerState, handshake_commands: &HandshakeCommandSender, ) -> anyhow::Result { + let _ = state; match event { Some(ConnEvent::InboundMessage(mut inbound)) => { let tx = *inbound.msg.id(); @@ -1654,33 +1670,6 @@ impl P2pConnManager { } } } - - let should_connect = - !self.connections.keys().any(|peer| peer.addr == remote_addr) - && !state.awaiting_connection.contains_key(&remote_addr); - - if should_connect { - if let Some(sender_peer) = extract_sender_from_message(&inbound.msg) { - tracing::info!( - "Received message from unconnected peer {}, establishing connection proactively", - sender_peer.peer - ); - - let tx = Transaction::new::(); - let (callback, _rx) = tokio::sync::mpsc::channel(10); - - let _ = self - .handle_connect_peer( - sender_peer.peer.clone(), - Box::new(callback), - tx, - handshake_commands, - state, - false, - ) - .await; - } - } } tracing::debug!( diff --git a/crates/core/src/operations/connect.rs b/crates/core/src/operations/connect.rs index cc799c174..f0d055715 100644 --- a/crates/core/src/operations/connect.rs +++ b/crates/core/src/operations/connect.rs @@ -79,10 +79,10 @@ impl fmt::Display for ConnectMsg { match self { ConnectMsg::Request { target, payload, .. } => write!( f, - "ConnectRequest {{ target: {target}, desired: {}, ttl: {}, origin: {} }}", + "ConnectRequest {{ target: {target}, desired: {}, ttl: {}, joiner: {} }}", payload.desired_location, payload.ttl, - payload.origin + payload.joiner ), ConnectMsg::Response { sender, target, payload, .. } => write!( f, @@ -112,11 +112,13 @@ pub(crate) struct ConnectRequest { /// Joiner's advertised location (fallbacks to the joiner's socket address). pub desired_location: Location, /// Joiner's identity as observed so far. - pub origin: PeerKeyLocation, + pub joiner: PeerKeyLocation, /// Remaining hops before the request stops travelling. pub ttl: u8, /// Simple visited set to avoid trivial loops. pub visited: Vec, + /// Socket observed by the gateway/relay for the joiner, if known. + pub observed_addr: Option, } /// Acceptance payload returned by candidates. @@ -191,34 +193,35 @@ impl RelayState { &mut self, ctx: &C, observed_remote: &PeerKeyLocation, - observed_addr: SocketAddr, ) -> RelayActions { let mut actions = RelayActions::default(); push_unique_peer(&mut self.request.visited, observed_remote.clone()); push_unique_peer(&mut self.request.visited, ctx.self_location().clone()); - if self.request.origin.peer.addr.ip().is_unspecified() - && !self.observed_sent - && observed_remote.peer.pub_key == self.request.origin.peer.pub_key - { - self.request.origin.peer.addr = observed_addr; - if self.request.origin.location.is_none() { - self.request.origin.location = Some(Location::from_address(&observed_addr)); + if let Some(joiner_addr) = self.request.observed_addr { + // Always overwrite with observed socket rather than checking routability. If the + // observed socket is loopback, this guard is skipped only in local/unit tests where + // peers share 127.0.0.1, so keep a one-shot overwrite and avoid early returns. + if !self.observed_sent { + self.request.joiner.peer.addr = joiner_addr; + if self.request.joiner.location.is_none() { + self.request.joiner.location = Some(Location::from_address(&joiner_addr)); + } + self.observed_sent = true; + actions.observed_address = Some((self.request.joiner.clone(), joiner_addr)); } - self.observed_sent = true; - actions.observed_address = Some((self.request.origin.clone(), observed_addr)); } - if !self.accepted_locally && ctx.should_accept(&self.request.origin) { + if !self.accepted_locally && ctx.should_accept(&self.request.joiner) { self.accepted_locally = true; let acceptor = ctx.self_location().clone(); - let courtesy = ctx.courtesy_hint(&acceptor, &self.request.origin); + let courtesy = ctx.courtesy_hint(&acceptor, &self.request.joiner); self.courtesy_hint = courtesy; actions.accept_response = Some(ConnectResponse { acceptor: acceptor.clone(), courtesy, }); - actions.expect_connection_from = Some(self.request.origin.clone()); + actions.expect_connection_from = Some(self.request.joiner.clone()); } if self.forwarded_to.is_none() && self.request.ttl > 0 { @@ -444,9 +447,10 @@ impl ConnectOp { push_unique_peer(&mut visited, target.clone()); let request = ConnectRequest { desired_location, - origin: own.clone(), + joiner: own.clone(), ttl, visited, + observed_addr: None, }; let tx = Transaction::new::(); @@ -497,7 +501,6 @@ impl ConnectOp { ctx: &C, upstream: PeerKeyLocation, request: ConnectRequest, - observed_addr: SocketAddr, ) -> RelayActions { if !matches!(self.state, Some(ConnectState::Relaying(_))) { self.state = Some(ConnectState::Relaying(Box::new(RelayState { @@ -515,7 +518,7 @@ impl ConnectOp { state.upstream = upstream; state.request = request; let upstream_snapshot = state.upstream.clone(); - state.handle_request(ctx, &upstream_snapshot, observed_addr) + state.handle_request(ctx, &upstream_snapshot) } _ => RelayActions::default(), } @@ -578,8 +581,7 @@ impl Operation for ConnectOp { match msg { ConnectMsg::Request { from, payload, .. } => { let env = RelayEnv::new(op_manager); - let actions = - self.handle_request(&env, from.clone(), payload.clone(), from.peer.addr); + let actions = self.handle_request(&env, from.clone(), payload.clone()); if let Some((target, address)) = actions.observed_address { let msg = ConnectMsg::ObservedAddress { @@ -1026,9 +1028,10 @@ mod tests { upstream: joiner.clone(), request: ConnectRequest { desired_location: Location::random(), - origin: joiner.clone(), + joiner: joiner.clone(), ttl: 3, visited: vec![], + observed_addr: Some(joiner.peer.addr), }, forwarded_to: None, courtesy_hint: false, @@ -1037,8 +1040,7 @@ mod tests { }; let ctx = TestRelayContext::new(self_loc.clone()).courtesy(true); - let observed_addr = joiner.peer.addr; - let actions = state.handle_request(&ctx, &joiner, observed_addr); + let actions = state.handle_request(&ctx, &joiner); let response = actions.accept_response.expect("expected acceptance"); assert_eq!(response.acceptor.peer, self_loc.peer); @@ -1056,9 +1058,10 @@ mod tests { upstream: joiner.clone(), request: ConnectRequest { desired_location: Location::random(), - origin: joiner.clone(), + joiner: joiner.clone(), ttl: 2, visited: vec![], + observed_addr: Some(joiner.peer.addr), }, forwarded_to: None, courtesy_hint: false, @@ -1069,7 +1072,7 @@ mod tests { let ctx = TestRelayContext::new(self_loc) .accept(false) .next_hop(Some(next_hop.clone())); - let actions = state.handle_request(&ctx, &joiner, joiner.peer.addr); + let actions = state.handle_request(&ctx, &joiner); assert!(actions.accept_response.is_none()); let (forward_to, request) = actions.forward.expect("expected forward"); @@ -1078,6 +1081,40 @@ mod tests { assert!(request.visited.iter().any(|pkl| pkl.peer == joiner.peer)); } + #[test] + fn relay_emits_observed_address_for_private_joiner() { + let self_loc = make_peer(4050); + let joiner = make_peer(5050); + let observed_addr = SocketAddr::new( + IpAddr::V4(Ipv4Addr::new(203, 0, 113, 10)), + joiner.peer.addr.port(), + ); + let mut state = RelayState { + upstream: joiner.clone(), + request: ConnectRequest { + desired_location: Location::random(), + joiner: joiner.clone(), + ttl: 3, + visited: vec![], + observed_addr: Some(observed_addr), + }, + forwarded_to: None, + courtesy_hint: false, + observed_sent: false, + accepted_locally: false, + }; + + let ctx = TestRelayContext::new(self_loc); + let actions = state.handle_request(&ctx, &joiner); + + let (target, addr) = actions + .observed_address + .expect("expected observed address update"); + assert_eq!(addr, observed_addr); + assert_eq!(target.peer.addr, observed_addr); + assert_eq!(state.request.joiner.peer.addr, observed_addr); + } + #[test] fn joiner_tracks_acceptance() { let acceptor = make_peer(7000); @@ -1138,9 +1175,10 @@ mod tests { let request = ConnectRequest { desired_location: Location::random(), - origin: joiner.clone(), + joiner: joiner.clone(), ttl: 3, visited: vec![joiner.clone()], + observed_addr: Some(joiner.peer.addr), }; let tx = Transaction::new::(); @@ -1148,8 +1186,7 @@ mod tests { let ctx = TestRelayContext::new(relay_a.clone()) .accept(false) .next_hop(Some(relay_b.clone())); - let actions = - relay_op.handle_request(&ctx, joiner.clone(), request.clone(), joiner.peer.addr); + let actions = relay_op.handle_request(&ctx, joiner.clone(), request.clone()); let (forward_target, forward_request) = actions .forward @@ -1168,12 +1205,8 @@ mod tests { let mut accepting_relay = ConnectOp::new_relay(tx, relay_a.clone(), forward_request.clone()); let ctx_accept = TestRelayContext::new(relay_b.clone()); - let accept_actions = accepting_relay.handle_request( - &ctx_accept, - relay_a.clone(), - forward_request, - relay_a.peer.addr, - ); + let accept_actions = + accepting_relay.handle_request(&ctx_accept, relay_a.clone(), forward_request); let response = accept_actions .accept_response