Skip to content
45 changes: 17 additions & 28 deletions crates/core/src/node/network_bridge/p2p_protoc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -311,14 +311,29 @@ impl P2pConnManager {
match *event {
ConnEvent::InboundMessage(inbound) => {
let remote = inbound.remote_addr;
let msg = inbound.msg;
let mut msg = inbound.msg;
tracing::info!(
tx = %msg.id(),
msg_type = %msg,
remote = ?remote,
peer = %ctx.bridge.op_manager.ring.connection_manager.get_peer_key().unwrap(),
"Received inbound message from peer - processing"
);
// Only the hop that owns the transport socket (gateway/first hop in
// practice) knows the UDP source address; tag the connect request here
// so downstream relays don't guess at the joiner's address.
if let (
Some(remote_addr),
NetMessage::V1(NetMessageV1::Connect(ConnectMsg::Request {
payload,
..
})),
) = (remote, &mut msg)
{
if payload.observed_addr.is_none() {
payload.observed_addr = Some(remote_addr);
}
}
ctx.handle_inbound_message(msg, &op_manager, &mut state)
.await?;
}
Expand Down Expand Up @@ -1610,6 +1625,7 @@ impl P2pConnManager {
state: &mut EventListenerState,
handshake_commands: &HandshakeCommandSender,
) -> anyhow::Result<EventResult> {
let _ = state;
match event {
Some(ConnEvent::InboundMessage(mut inbound)) => {
let tx = *inbound.msg.id();
Expand Down Expand Up @@ -1654,33 +1670,6 @@ impl P2pConnManager {
}
}
}

let should_connect =
!self.connections.keys().any(|peer| peer.addr == remote_addr)
&& !state.awaiting_connection.contains_key(&remote_addr);

if should_connect {
if let Some(sender_peer) = extract_sender_from_message(&inbound.msg) {
tracing::info!(
"Received message from unconnected peer {}, establishing connection proactively",
sender_peer.peer
);

let tx = Transaction::new::<crate::operations::connect::ConnectMsg>();
let (callback, _rx) = tokio::sync::mpsc::channel(10);

let _ = self
.handle_connect_peer(
sender_peer.peer.clone(),
Box::new(callback),
tx,
handshake_commands,
state,
false,
)
.await;
}
}
}

tracing::debug!(
Expand Down
103 changes: 68 additions & 35 deletions crates/core/src/operations/connect.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,10 +79,10 @@ impl fmt::Display for ConnectMsg {
match self {
ConnectMsg::Request { target, payload, .. } => write!(
f,
"ConnectRequest {{ target: {target}, desired: {}, ttl: {}, origin: {} }}",
"ConnectRequest {{ target: {target}, desired: {}, ttl: {}, joiner: {} }}",
payload.desired_location,
payload.ttl,
payload.origin
payload.joiner
),
ConnectMsg::Response { sender, target, payload, .. } => write!(
f,
Expand Down Expand Up @@ -112,11 +112,13 @@ pub(crate) struct ConnectRequest {
/// Joiner's advertised location (fallbacks to the joiner's socket address).
pub desired_location: Location,
/// Joiner's identity as observed so far.
pub origin: PeerKeyLocation,
pub joiner: PeerKeyLocation,
/// Remaining hops before the request stops travelling.
pub ttl: u8,
/// Simple visited set to avoid trivial loops.
pub visited: Vec<PeerKeyLocation>,
/// Socket observed by the gateway/relay for the joiner, if known.
pub observed_addr: Option<SocketAddr>,
}

/// Acceptance payload returned by candidates.
Expand Down Expand Up @@ -191,34 +193,35 @@ impl RelayState {
&mut self,
ctx: &C,
observed_remote: &PeerKeyLocation,
observed_addr: SocketAddr,
) -> RelayActions {
let mut actions = RelayActions::default();
push_unique_peer(&mut self.request.visited, observed_remote.clone());
push_unique_peer(&mut self.request.visited, ctx.self_location().clone());

if self.request.origin.peer.addr.ip().is_unspecified()
&& !self.observed_sent
&& observed_remote.peer.pub_key == self.request.origin.peer.pub_key
{
self.request.origin.peer.addr = observed_addr;
if self.request.origin.location.is_none() {
self.request.origin.location = Some(Location::from_address(&observed_addr));
if let Some(joiner_addr) = self.request.observed_addr {
// Always overwrite with observed socket rather than checking routability. If the
// observed socket is loopback, this guard is skipped only in local/unit tests where
// peers share 127.0.0.1, so keep a one-shot overwrite and avoid early returns.
if !self.observed_sent {
self.request.joiner.peer.addr = joiner_addr;
if self.request.joiner.location.is_none() {
self.request.joiner.location = Some(Location::from_address(&joiner_addr));
}
self.observed_sent = true;
actions.observed_address = Some((self.request.joiner.clone(), joiner_addr));
}
self.observed_sent = true;
actions.observed_address = Some((self.request.origin.clone(), observed_addr));
}

if !self.accepted_locally && ctx.should_accept(&self.request.origin) {
if !self.accepted_locally && ctx.should_accept(&self.request.joiner) {
self.accepted_locally = true;
let acceptor = ctx.self_location().clone();
let courtesy = ctx.courtesy_hint(&acceptor, &self.request.origin);
let courtesy = ctx.courtesy_hint(&acceptor, &self.request.joiner);
self.courtesy_hint = courtesy;
actions.accept_response = Some(ConnectResponse {
acceptor: acceptor.clone(),
courtesy,
});
actions.expect_connection_from = Some(self.request.origin.clone());
actions.expect_connection_from = Some(self.request.joiner.clone());
}

if self.forwarded_to.is_none() && self.request.ttl > 0 {
Expand Down Expand Up @@ -444,9 +447,10 @@ impl ConnectOp {
push_unique_peer(&mut visited, target.clone());
let request = ConnectRequest {
desired_location,
origin: own.clone(),
joiner: own.clone(),
ttl,
visited,
observed_addr: None,
};

let tx = Transaction::new::<ConnectMsg>();
Expand Down Expand Up @@ -497,7 +501,6 @@ impl ConnectOp {
ctx: &C,
upstream: PeerKeyLocation,
request: ConnectRequest,
observed_addr: SocketAddr,
) -> RelayActions {
if !matches!(self.state, Some(ConnectState::Relaying(_))) {
self.state = Some(ConnectState::Relaying(Box::new(RelayState {
Expand All @@ -515,7 +518,7 @@ impl ConnectOp {
state.upstream = upstream;
state.request = request;
let upstream_snapshot = state.upstream.clone();
state.handle_request(ctx, &upstream_snapshot, observed_addr)
state.handle_request(ctx, &upstream_snapshot)
}
_ => RelayActions::default(),
}
Expand Down Expand Up @@ -578,8 +581,7 @@ impl Operation for ConnectOp {
match msg {
ConnectMsg::Request { from, payload, .. } => {
let env = RelayEnv::new(op_manager);
let actions =
self.handle_request(&env, from.clone(), payload.clone(), from.peer.addr);
let actions = self.handle_request(&env, from.clone(), payload.clone());

if let Some((target, address)) = actions.observed_address {
let msg = ConnectMsg::ObservedAddress {
Expand Down Expand Up @@ -1026,9 +1028,10 @@ mod tests {
upstream: joiner.clone(),
request: ConnectRequest {
desired_location: Location::random(),
origin: joiner.clone(),
joiner: joiner.clone(),
ttl: 3,
visited: vec![],
observed_addr: Some(joiner.peer.addr),
},
forwarded_to: None,
courtesy_hint: false,
Expand All @@ -1037,8 +1040,7 @@ mod tests {
};

let ctx = TestRelayContext::new(self_loc.clone()).courtesy(true);
let observed_addr = joiner.peer.addr;
let actions = state.handle_request(&ctx, &joiner, observed_addr);
let actions = state.handle_request(&ctx, &joiner);

let response = actions.accept_response.expect("expected acceptance");
assert_eq!(response.acceptor.peer, self_loc.peer);
Expand All @@ -1056,9 +1058,10 @@ mod tests {
upstream: joiner.clone(),
request: ConnectRequest {
desired_location: Location::random(),
origin: joiner.clone(),
joiner: joiner.clone(),
ttl: 2,
visited: vec![],
observed_addr: Some(joiner.peer.addr),
},
forwarded_to: None,
courtesy_hint: false,
Expand All @@ -1069,7 +1072,7 @@ mod tests {
let ctx = TestRelayContext::new(self_loc)
.accept(false)
.next_hop(Some(next_hop.clone()));
let actions = state.handle_request(&ctx, &joiner, joiner.peer.addr);
let actions = state.handle_request(&ctx, &joiner);

assert!(actions.accept_response.is_none());
let (forward_to, request) = actions.forward.expect("expected forward");
Expand All @@ -1078,6 +1081,40 @@ mod tests {
assert!(request.visited.iter().any(|pkl| pkl.peer == joiner.peer));
}

#[test]
fn relay_emits_observed_address_for_private_joiner() {
let self_loc = make_peer(4050);
let joiner = make_peer(5050);
let observed_addr = SocketAddr::new(
IpAddr::V4(Ipv4Addr::new(203, 0, 113, 10)),
joiner.peer.addr.port(),
);
let mut state = RelayState {
upstream: joiner.clone(),
request: ConnectRequest {
desired_location: Location::random(),
joiner: joiner.clone(),
ttl: 3,
visited: vec![],
observed_addr: Some(observed_addr),
},
forwarded_to: None,
courtesy_hint: false,
observed_sent: false,
accepted_locally: false,
};

let ctx = TestRelayContext::new(self_loc);
let actions = state.handle_request(&ctx, &joiner);

let (target, addr) = actions
.observed_address
.expect("expected observed address update");
assert_eq!(addr, observed_addr);
assert_eq!(target.peer.addr, observed_addr);
assert_eq!(state.request.joiner.peer.addr, observed_addr);
}

#[test]
fn joiner_tracks_acceptance() {
let acceptor = make_peer(7000);
Expand Down Expand Up @@ -1138,18 +1175,18 @@ mod tests {

let request = ConnectRequest {
desired_location: Location::random(),
origin: joiner.clone(),
joiner: joiner.clone(),
ttl: 3,
visited: vec![joiner.clone()],
observed_addr: Some(joiner.peer.addr),
};

let tx = Transaction::new::<ConnectMsg>();
let mut relay_op = ConnectOp::new_relay(tx, joiner.clone(), request.clone());
let ctx = TestRelayContext::new(relay_a.clone())
.accept(false)
.next_hop(Some(relay_b.clone()));
let actions =
relay_op.handle_request(&ctx, joiner.clone(), request.clone(), joiner.peer.addr);
let actions = relay_op.handle_request(&ctx, joiner.clone(), request.clone());

let (forward_target, forward_request) = actions
.forward
Expand All @@ -1168,12 +1205,8 @@ mod tests {
let mut accepting_relay =
ConnectOp::new_relay(tx, relay_a.clone(), forward_request.clone());
let ctx_accept = TestRelayContext::new(relay_b.clone());
let accept_actions = accepting_relay.handle_request(
&ctx_accept,
relay_a.clone(),
forward_request,
relay_a.peer.addr,
);
let accept_actions =
accepting_relay.handle_request(&ctx_accept, relay_a.clone(), forward_request);

let response = accept_actions
.accept_response
Expand Down
Loading