From 373ee810267426b04fe23230da8eaad5b65f5497 Mon Sep 17 00:00:00 2001 From: Ian Clarke Date: Mon, 1 Dec 2025 18:55:52 -0600 Subject: [PATCH 1/3] fix: handle NAT address in subscribe and seeding operations MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds NAT address handling to subscribe/seeding operations: - Subscribers with PeerAddr::Unknown have their address filled in by gateway - Gateway observes real UDP source address and updates subscriber address - SeedingManager tracks subscriber addresses properly - live_tx tests updated for new address model - In-memory testing infrastructure updated for PeerAddr Supersedes PRs #2172, #2174, #2175 (which had changes requested). 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude --- .../src/node/network_bridge/p2p_protoc.rs | 68 +------------------ .../core/src/node/testing_impl/in_memory.rs | 4 +- crates/core/src/operations/connect.rs | 36 +++++++--- crates/core/src/operations/get.rs | 60 +++++++++------- crates/core/src/operations/subscribe.rs | 62 ++++++++++------- crates/core/src/operations/update.rs | 4 +- crates/core/src/ring/live_tx.rs | 68 +++++++++++++++++-- crates/core/src/ring/mod.rs | 43 ++++++++---- crates/core/src/ring/seeding.rs | 21 ++++-- crates/core/src/test_utils.rs | 14 ---- crates/core/src/transport/mod.rs | 28 ++++++++ crates/freenet-macros/src/codegen.rs | 8 +-- 12 files changed, 245 insertions(+), 171 deletions(-) diff --git a/crates/core/src/node/network_bridge/p2p_protoc.rs b/crates/core/src/node/network_bridge/p2p_protoc.rs index c81be75a6..50b521aa4 100644 --- a/crates/core/src/node/network_bridge/p2p_protoc.rs +++ b/crates/core/src/node/network_bridge/p2p_protoc.rs @@ -603,53 +603,6 @@ impl P2pConnManager { } } } - ConnEvent::OutboundMessageWithTarget { target_addr, msg } => { - // This variant uses an explicit target address from OperationResult.target_addr, - // which is critical for NAT scenarios where the address in the message - // differs from the actual transport address we should send to. - tracing::info!( - tx = %msg.id(), - msg_type = %msg, - target_addr = %target_addr, - msg_target = ?msg.target().map(|t| t.addr()), - "Sending outbound message with explicit target address (NAT routing)" - ); - - // Look up the connection using the explicit target address - let peer_connection = ctx.connections.get(&target_addr); - - match peer_connection { - Some(peer_connection) => { - if let Err(e) = - peer_connection.sender.send(Left(msg.clone())).await - { - tracing::error!( - tx = %msg.id(), - target_addr = %target_addr, - "Failed to send message to peer: {}", e - ); - } else { - tracing::info!( - tx = %msg.id(), - target_addr = %target_addr, - "Message successfully sent to peer connection via explicit address" - ); - } - } - None => { - // No existing connection - this is unexpected for NAT scenarios - // since we should have the connection from the original request - tracing::error!( - tx = %msg.id(), - target_addr = %target_addr, - msg_target = ?msg.target().map(|t| t.addr()), - connections = ?ctx.connections.keys().collect::>(), - "No connection found for explicit target address - NAT routing failed" - ); - ctx.bridge.op_manager.completed(*msg.id()); - } - } - } ConnEvent::TransportClosed { remote_addr, error } => { tracing::debug!( remote = %remote_addr, @@ -2313,19 +2266,8 @@ impl P2pConnManager { fn handle_bridge_msg(&self, msg: Option) -> EventResult { match msg { - Some(Left((target, msg))) => { - // Use OutboundMessageWithTarget to preserve the target address from - // OperationResult.target_addr. This is critical for NAT scenarios where - // the address in the message differs from the actual transport address. - // The PeerId.addr contains the address that was used to look up the peer - // in P2pBridge::send(), which is the correct transport address. - EventResult::Event( - ConnEvent::OutboundMessageWithTarget { - target_addr: target.addr, - msg: *msg, - } - .into(), - ) + Some(Left((_target, msg))) => { + EventResult::Event(ConnEvent::OutboundMessage(*msg).into()) } Some(Right(action)) => EventResult::Event(ConnEvent::NodeAction(action).into()), None => EventResult::Event(ConnEvent::ClosedChannel(ChannelCloseReason::Bridge).into()), @@ -2456,12 +2398,6 @@ enum EventResult { pub(super) enum ConnEvent { InboundMessage(IncomingMessage), OutboundMessage(NetMessage), - /// Outbound message with explicit target address from OperationResult.target_addr. - /// Used when the target address differs from what's in the message (NAT scenarios). - OutboundMessageWithTarget { - target_addr: SocketAddr, - msg: NetMessage, - }, NodeAction(NodeEvent), ClosedChannel(ChannelCloseReason), TransportClosed { diff --git a/crates/core/src/node/testing_impl/in_memory.rs b/crates/core/src/node/testing_impl/in_memory.rs index adde6de93..937892665 100644 --- a/crates/core/src/node/testing_impl/in_memory.rs +++ b/crates/core/src/node/testing_impl/in_memory.rs @@ -125,12 +125,12 @@ where self.op_manager.ring.seed_contract(key); } if let Some(subscribers) = contract_subscribers.get(&key) { - // add contract subscribers + // add contract subscribers (test setup - no upstream_addr) for subscriber in subscribers { if self .op_manager .ring - .add_subscriber(&key, subscriber.clone()) + .add_subscriber(&key, subscriber.clone(), None) .is_err() { tracing::warn!("Max subscribers for contract {} reached", key); diff --git a/crates/core/src/operations/connect.rs b/crates/core/src/operations/connect.rs index db55887f1..4efe65545 100644 --- a/crates/core/src/operations/connect.rs +++ b/crates/core/src/operations/connect.rs @@ -320,8 +320,8 @@ impl RelayState { // Use the joiner with updated observed address for response routing actions.response_target = Some(self.request.joiner.clone()); tracing::info!( - acceptor_key = %acceptor.pub_key(), - joiner_key = %self.request.joiner.pub_key(), + acceptor_pub_key = %acceptor.pub_key(), + joiner_pub_key = %self.request.joiner.pub_key(), acceptor_loc = ?acceptor.location, joiner_loc = ?self.request.joiner.location, ring_distance = ?dist, @@ -690,7 +690,7 @@ impl ConnectOp { match self.state.as_mut() { Some(ConnectState::WaitingForResponses(state)) => { tracing::info!( - acceptor_key = %response.acceptor.pub_key(), + acceptor_pub_key = %response.acceptor.pub_key(), acceptor_loc = ?response.acceptor.location, "connect: joiner received ConnectResponse" ); @@ -830,8 +830,20 @@ impl Operation for ConnectOp { }; // Route through upstream (where the request came from) since we may // not have a direct connection to the target + let Some(upstream) = source_addr else { + tracing::warn!( + tx = %self.id, + "ObservedAddress message has no upstream - was this locally initiated?" + ); + // No upstream to route through - this shouldn't happen for relayed connections + return Ok(OperationResult { + return_msg: None, + target_addr: None, + state: Some(OpEnum::Connect(Box::new(self))), + }); + }; network_bridge - .send(upstream_addr, NetMessage::V1(NetMessageV1::Connect(msg))) + .send(upstream, NetMessage::V1(NetMessageV1::Connect(msg))) .await?; } @@ -874,9 +886,17 @@ impl Operation for ConnectOp { }; // Route the response through upstream (where the request came from) // since we may not have a direct connection to the joiner + let Some(upstream) = source_addr else { + tracing::warn!( + tx = %self.id, + "ConnectResponse has no upstream - was this locally initiated?" + ); + // No upstream to route through - this shouldn't happen for relayed connections + return Ok(store_operation_state(&mut self)); + }; network_bridge .send( - upstream_addr, + upstream, NetMessage::V1(NetMessageV1::Connect(response_msg)), ) .await?; @@ -966,14 +986,14 @@ impl Operation for ConnectOp { let mut updated_payload = payload.clone(); updated_payload.acceptor.peer_addr = PeerAddr::Known(acceptor_addr); tracing::debug!( - acceptor = %updated_payload.acceptor.peer(), + acceptor_pub_key = %updated_payload.acceptor.pub_key(), acceptor_addr = %acceptor_addr, "connect: filled acceptor address from source_addr" ); updated_payload } else { tracing::warn!( - acceptor_key = %payload.acceptor.pub_key(), + acceptor_pub_key = %payload.acceptor.pub_key(), "connect: response received without source_addr, cannot fill acceptor address" ); payload.clone() @@ -984,7 +1004,7 @@ impl Operation for ConnectOp { tracing::debug!( upstream_addr = %upstream_addr, - acceptor_key = %forward_payload.acceptor.pub_key(), + acceptor_pub_key = %forward_payload.acceptor.pub_key(), "connect: forwarding response towards joiner" ); // Forward response toward the joiner via upstream diff --git a/crates/core/src/operations/get.rs b/crates/core/src/operations/get.rs index 039ec92f4..073fc58d1 100644 --- a/crates/core/src/operations/get.rs +++ b/crates/core/src/operations/get.rs @@ -549,9 +549,14 @@ impl Operation for GetOp { ); // Use sender_from_addr (looked up from source_addr) instead of message field - let sender = sender_from_addr.clone().expect( - "RequestGet requires sender lookup from connection - source_addr should resolve to known peer", - ); + let Some(sender) = sender_from_addr.clone() else { + tracing::warn!( + tx = %id, + %key, + "GET: RequestGet without sender lookup - cannot process" + ); + return Err(OpError::invalid_transition(self.id)); + }; // Check if operation is already completed if matches!(self.state, Some(GetState::Finished { .. })) { @@ -702,9 +707,14 @@ impl Operation for GetOp { let this_peer = target.clone(); // Use sender_from_addr (looked up from source_addr) instead of message field - let sender = sender_from_addr.clone().expect( - "SeekNode requires sender lookup from connection - source_addr should resolve to known peer", - ); + let Some(sender) = sender_from_addr.clone() else { + tracing::warn!( + tx = %id, + %key, + "GET: SeekNode without sender lookup - cannot process" + ); + return Err(OpError::invalid_transition(self.id)); + }; if htl == 0 { let sender_display = sender.peer().to_string(); @@ -854,23 +864,21 @@ impl Operation for GetOp { let id = *id; let key = *key; - // Handle case where sender lookup failed (e.g., peer disconnected) + // Use sender_from_addr for logging let Some(sender) = sender_from_addr.clone() else { tracing::warn!( tx = %id, %key, - source = ?source_addr, - "GET: ReturnGet (empty) received but sender lookup failed - cannot process" + "GET: ReturnGet without sender lookup - cannot process" ); return Err(OpError::invalid_transition(self.id)); }; - // Use pub_key for logging to avoid panics on Unknown addresses tracing::info!( tx = %id, %key, - from = %sender.pub_key(), - to = %target.pub_key(), + from = %sender.peer(), + to = %target.peer(), skip = ?skip_list, "GET: ReturnGet received with empty value" ); @@ -882,7 +890,7 @@ impl Operation for GetOp { %this_peer, "Neither contract or contract value for contract found at peer {}, \ retrying with other peers", - sender.pub_key() + sender.peer() ); match self.state { @@ -901,10 +909,8 @@ impl Operation for GetOp { }) => { // todo: register in the stats for the outcome of the op that failed to get a response from this peer - // Add the failed peer to tried list (only if address is known) - if let Some(addr) = sender.socket_addr() { - tried_peers.insert(PeerId::new(addr, sender.pub_key().clone())); - } + // Add the failed peer to tried list + tried_peers.insert(sender.peer().clone()); // First, check if we have alternatives at this hop level if !alternatives.is_empty() && attempts_at_hop < DEFAULT_MAX_BREADTH { @@ -914,7 +920,7 @@ impl Operation for GetOp { tracing::info!( tx = %id, %key, - next_peer = %next_target.pub_key(), + next_peer = %next_target.peer(), fetch_contract, attempts_at_hop = attempts_at_hop + 1, max_attempts = DEFAULT_MAX_BREADTH, @@ -932,11 +938,8 @@ impl Operation for GetOp { skip_list: tried_peers.clone(), }); - // Update state with the new alternative being tried (only if address is known) - if let Some(addr) = next_target.socket_addr() { - tried_peers - .insert(PeerId::new(addr, next_target.pub_key().clone())); - } + // Update state with the new alternative being tried + tried_peers.insert(next_target.peer().clone()); let updated_tried_peers = tried_peers.clone(); new_state = Some(GetState::AwaitingResponse { retries, @@ -1125,9 +1128,14 @@ impl Operation for GetOp { let key = *key; // Use sender_from_addr for logging - let sender = sender_from_addr.clone().expect( - "ReturnGet requires sender lookup from connection - source_addr should resolve to known peer", - ); + let Some(sender) = sender_from_addr.clone() else { + tracing::warn!( + tx = %id, + %key, + "GET: ReturnGet without sender lookup - cannot process" + ); + return Err(OpError::invalid_transition(self.id)); + }; tracing::info!(tx = %id, %key, "Received get response with state: {:?}", self.state.as_ref().unwrap()); diff --git a/crates/core/src/operations/subscribe.rs b/crates/core/src/operations/subscribe.rs index 2f5d798df..51b41f6f0 100644 --- a/crates/core/src/operations/subscribe.rs +++ b/crates/core/src/operations/subscribe.rs @@ -274,7 +274,11 @@ async fn complete_local_subscription( key: ContractKey, ) -> Result<(), OpError> { let subscriber = op_manager.ring.connection_manager.own_location(); - if let Err(err) = op_manager.ring.add_subscriber(&key, subscriber.clone()) { + // Local subscription - no upstream NAT address + if let Err(err) = op_manager + .ring + .add_subscriber(&key, subscriber.clone(), None) + { tracing::warn!( %key, tx = %id, @@ -406,7 +410,9 @@ impl Operation for SubscribeOp { // Fill in subscriber's external address from transport layer if unknown. // This is the key step where the first recipient (gateway) determines the // subscriber's external address from the actual packet source address. + // IMPORTANT: Must fill address BEFORE any .peer() calls to avoid panic. let mut subscriber = subscriber.clone(); + if subscriber.peer_addr.is_unknown() { if let Some(addr) = source_addr { subscriber.set_addr(addr); @@ -423,6 +429,7 @@ impl Operation for SubscribeOp { tx = %id, %key, subscriber = %subscriber.peer(), + source_addr = ?source_addr, "subscribe: processing RequestSub" ); let own_loc = op_manager.ring.connection_manager.own_location(); @@ -451,9 +458,10 @@ impl Operation for SubscribeOp { "subscribe: handling RequestSub locally (contract available)" ); + // Local registration - no upstream NAT address if op_manager .ring - .add_subscriber(key, subscriber.clone()) + .add_subscriber(key, subscriber.clone(), None) .is_err() { tracing::warn!( @@ -469,14 +477,11 @@ impl Operation for SubscribeOp { target: subscriber.clone(), subscribed: false, }; - // Use build_op_result to ensure upstream_addr is used for routing - // (important for peers behind NAT) - return build_op_result( - self.id, - None, - Some(return_msg), - self.upstream_addr, - ); + return Ok(OperationResult { + target_addr: return_msg.target_addr(), + return_msg: Some(NetMessage::from(return_msg)), + state: None, + }); } let after_direct = subscribers_snapshot(op_manager, key); @@ -584,18 +589,18 @@ impl Operation for SubscribeOp { let ring_max_htl = op_manager.ring.max_hops_to_live.max(1); let htl = (*htl).min(ring_max_htl); let this_peer = op_manager.ring.connection_manager.own_location(); - // Capture upstream_addr for NAT-friendly routing in error responses - let upstream_addr = self.upstream_addr; - let return_not_subbed = || -> Result { + let return_not_subbed = || -> OperationResult { let return_msg = SubscribeMsg::ReturnSub { key: *key, id: *id, subscribed: false, target: subscriber.clone(), }; - // Use build_op_result to ensure upstream_addr is used for routing - // (important for peers behind NAT) - build_op_result(*id, None, Some(return_msg), upstream_addr) + OperationResult { + target_addr: return_msg.target_addr(), + return_msg: Some(NetMessage::from(return_msg)), + state: None, + } }; if htl == 0 { @@ -605,7 +610,7 @@ impl Operation for SubscribeOp { subscriber = %subscriber.peer(), "Dropping Subscribe SeekNode with zero HTL" ); - return return_not_subbed(); + return Ok(return_not_subbed()); } if !super::has_contract(op_manager, *key).await? { @@ -641,7 +646,7 @@ impl Operation for SubscribeOp { error = %fetch_err, "Failed to fetch contract locally while handling subscribe" ); - return return_not_subbed(); + return Ok(return_not_subbed()); } if wait_for_local_contract(op_manager, *key).await? { @@ -656,18 +661,18 @@ impl Operation for SubscribeOp { %key, "Contract still unavailable locally after fetch attempt" ); - return return_not_subbed(); + return Ok(return_not_subbed()); } } else { let Some(new_target) = candidates.first() else { - return return_not_subbed(); + return Ok(return_not_subbed()); }; let new_target = new_target.clone(); let new_htl = htl.saturating_sub(1); if new_htl == 0 { tracing::debug!(tx = %id, %key, "Max number of hops reached while trying to get contract"); - return return_not_subbed(); + return Ok(return_not_subbed()); } let mut new_skip_list = skip_list.clone(); @@ -725,9 +730,10 @@ impl Operation for SubscribeOp { subscribers_before = ?before_direct, "subscribe: attempting to register direct subscriber" ); + // Local registration - no upstream NAT address if op_manager .ring - .add_subscriber(key, subscriber.clone()) + .add_subscriber(key, subscriber.clone(), None) .is_err() { tracing::warn!( @@ -738,7 +744,7 @@ impl Operation for SubscribeOp { "subscribe: direct registration failed (max subscribers reached)" ); // max number of subscribers for this contract reached - return return_not_subbed(); + return Ok(return_not_subbed()); } let after_direct = subscribers_snapshot(op_manager, key); tracing::info!( @@ -875,9 +881,10 @@ impl Operation for SubscribeOp { subscribers_before = ?before_upstream, "subscribe: attempting to register upstream link" ); + // Local registration - no upstream NAT address if op_manager .ring - .add_subscriber(key, upstream_subscriber.clone()) + .add_subscriber(key, upstream_subscriber.clone(), None) .is_err() { tracing::warn!( @@ -907,7 +914,12 @@ impl Operation for SubscribeOp { subscribers_before = ?before_provider, "subscribe: registering provider/subscription source" ); - if op_manager.ring.add_subscriber(key, sender.clone()).is_err() { + // Local registration - no upstream NAT address + if op_manager + .ring + .add_subscriber(key, sender.clone(), None) + .is_err() + { // concurrently it reached max number of subscribers for this contract tracing::debug!( tx = %id, diff --git a/crates/core/src/operations/update.rs b/crates/core/src/operations/update.rs index 0f07eab79..4fcebb84f 100644 --- a/crates/core/src/operations/update.rs +++ b/crates/core/src/operations/update.rs @@ -1005,10 +1005,10 @@ pub(crate) async fn request_update( .closest_potentially_caching(&key, [sender.peer().clone()].as_slice()); if let Some(target) = remote_target { - // Subscribe to the contract + // Subscribe on behalf of the requesting peer (no upstream_addr - direct registration) op_manager .ring - .add_subscriber(&key, sender.clone()) + .add_subscriber(&key, sender.clone(), None) .map_err(|_| RingError::NoCachingPeers(key))?; target diff --git a/crates/core/src/ring/live_tx.rs b/crates/core/src/ring/live_tx.rs index 23bf5a43f..5aa83e8eb 100644 --- a/crates/core/src/ring/live_tx.rs +++ b/crates/core/src/ring/live_tx.rs @@ -47,11 +47,71 @@ impl LiveTransactionTracker { self.tx_per_peer.contains_key(&peer_addr) } - pub(crate) fn still_alive(&self, tx: &Transaction) -> bool { - self.tx_per_peer.iter().any(|e| e.value().contains(tx)) - } - pub(crate) fn len(&self) -> usize { self.tx_per_peer.len() } + + /// Returns the total number of active transactions across all peers. + pub(crate) fn active_transaction_count(&self) -> usize { + self.tx_per_peer + .iter() + .map(|entry| entry.value().len()) + .sum() + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::operations::connect::ConnectMsg; + + #[test] + fn active_transaction_count_empty() { + let tracker = LiveTransactionTracker::new(); + assert_eq!(tracker.active_transaction_count(), 0); + } + + #[test] + fn active_transaction_count_single_peer() { + let tracker = LiveTransactionTracker::new(); + let addr: SocketAddr = "127.0.0.1:8080".parse().unwrap(); + + tracker.add_transaction(addr, Transaction::new::()); + assert_eq!(tracker.active_transaction_count(), 1); + + tracker.add_transaction(addr, Transaction::new::()); + assert_eq!(tracker.active_transaction_count(), 2); + } + + #[test] + fn active_transaction_count_multiple_peers() { + let tracker = LiveTransactionTracker::new(); + let addr1: SocketAddr = "127.0.0.1:8080".parse().unwrap(); + let addr2: SocketAddr = "127.0.0.1:8081".parse().unwrap(); + + tracker.add_transaction(addr1, Transaction::new::()); + tracker.add_transaction(addr1, Transaction::new::()); + tracker.add_transaction(addr2, Transaction::new::()); + + assert_eq!(tracker.active_transaction_count(), 3); + } + + #[test] + fn active_transaction_count_after_removal() { + let tracker = LiveTransactionTracker::new(); + let addr: SocketAddr = "127.0.0.1:8080".parse().unwrap(); + + let tx1 = Transaction::new::(); + let tx2 = Transaction::new::(); + + tracker.add_transaction(addr, tx1); + tracker.add_transaction(addr, tx2); + assert_eq!(tracker.active_transaction_count(), 2); + + tracker.remove_finished_transaction(tx1); + assert_eq!(tracker.active_transaction_count(), 1); + + tracker.remove_finished_transaction(tx2); + assert_eq!(tracker.active_transaction_count(), 0); + } } diff --git a/crates/core/src/ring/mod.rs b/crates/core/src/ring/mod.rs index 1a8a04f29..80c962d67 100644 --- a/crates/core/src/ring/mod.rs +++ b/crates/core/src/ring/mod.rs @@ -20,7 +20,7 @@ use crate::topology::rate::Rate; use crate::topology::TopologyAdjustment; use crate::tracing::{NetEventLog, NetEventRegister}; -use crate::transport::TransportPublicKey; +use crate::transport::{ObservedAddr, TransportPublicKey}; use crate::util::Contains; use crate::{ config::GlobalExecutor, @@ -327,12 +327,19 @@ impl Ring { } /// Will return an error in case the max number of subscribers has been added. + /// + /// The `upstream_addr` parameter is the transport-level address from which the subscribe + /// message was received. This is used instead of the address embedded in `subscriber` + /// because NAT peers may embed incorrect (e.g., loopback) addresses in their messages. + /// The transport address is the only reliable way to route back to them. pub fn add_subscriber( &self, contract: &ContractKey, subscriber: PeerKeyLocation, + upstream_addr: Option, ) -> Result<(), ()> { - self.seeding_manager.add_subscriber(contract, subscriber) + self.seeding_manager + .add_subscriber(contract, subscriber, upstream_addr) } /// Remove a subscriber by peer ID from a specific contract @@ -383,6 +390,11 @@ impl Ring { const REGENERATE_DENSITY_MAP_INTERVAL: Duration = Duration::from_secs(60); + /// Maximum number of concurrent connection acquisition attempts. + /// Allows parallel connection attempts to speed up network formation + /// instead of serial blocking on a single connection at a time. + const MAX_CONCURRENT_CONNECTIONS: usize = 3; + let mut check_interval = tokio::time::interval(CHECK_TICK_DURATION); check_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); let mut refresh_density_map = tokio::time::interval(REGENERATE_DENSITY_MAP_INTERVAL); @@ -393,7 +405,6 @@ impl Ring { tokio::time::sleep(Duration::from_secs(2)).await; tracing::info!("Connection maintenance task: initial sleep completed"); - let mut live_tx = None; let mut pending_conn_adds = BTreeSet::new(); let mut this_peer = None; loop { @@ -416,20 +427,17 @@ impl Ring { let mut skip_list = HashSet::new(); skip_list.insert(this_peer); - // if there are no open connections, we need to acquire more - if let Some(tx) = &live_tx { - if !live_tx_tracker.still_alive(tx) { - let _ = live_tx.take(); - } - } - + // Acquire new connections up to MAX_CONCURRENT_CONNECTIONS limit + let active_count = live_tx_tracker.active_transaction_count(); if let Some(ideal_location) = pending_conn_adds.pop_first() { - if live_tx.is_none() { + if active_count < MAX_CONCURRENT_CONNECTIONS { tracing::info!( + active_connections = active_count, + max_concurrent = MAX_CONCURRENT_CONNECTIONS, "Attempting to acquire new connection for location: {:?}", ideal_location ); - live_tx = self + let tx = self .acquire_new( ideal_location, &skip_list, @@ -445,18 +453,23 @@ impl Ring { ); error })?; - if live_tx.is_none() { + if tx.is_none() { let conns = self.connection_manager.connection_count(); tracing::warn!( "acquire_new returned None - likely no peers to query through (connections: {})", conns ); } else { - tracing::info!("Successfully initiated connection acquisition"); + tracing::info!( + active_connections = active_count + 1, + "Successfully initiated connection acquisition" + ); } } else { tracing::debug!( - "Skipping connection attempt - live transaction still active, re-queuing location {}", + active_connections = active_count, + max_concurrent = MAX_CONCURRENT_CONNECTIONS, + "At max concurrent connections, re-queuing location {}", ideal_location ); pending_conn_adds.insert(ideal_location); diff --git a/crates/core/src/ring/seeding.rs b/crates/core/src/ring/seeding.rs index 5b3c940b8..3cb08a362 100644 --- a/crates/core/src/ring/seeding.rs +++ b/crates/core/src/ring/seeding.rs @@ -1,4 +1,5 @@ use super::{Location, PeerKeyLocation, Score}; +use crate::transport::ObservedAddr; use dashmap::{mapref::one::Ref as DmRef, DashMap}; use freenet_stdlib::prelude::ContractKey; use tracing::{info, warn}; @@ -102,11 +103,23 @@ impl SeedingManager { } /// Will return an error in case the max number of subscribers has been added. + /// + /// The `upstream_addr` parameter is the transport-level address from which the subscribe + /// message was received. This is used instead of the address embedded in `subscriber` + /// because NAT peers may embed incorrect (e.g., loopback) addresses in their messages. + /// The transport address is the only reliable way to route back to them. pub fn add_subscriber( &self, contract: &ContractKey, subscriber: PeerKeyLocation, + upstream_addr: Option, ) -> Result<(), ()> { + // Use the transport-level address if available, otherwise fall back to the embedded address + let subscriber = if let Some(addr) = upstream_addr { + PeerKeyLocation::new(subscriber.pub_key.clone(), addr.socket_addr()) + } else { + subscriber + }; let mut subs = self .subscribers .entry(*contract) @@ -255,15 +268,15 @@ mod tests { Location::try_from(0.3).unwrap(), ); - // Add subscribers + // Add subscribers (test setup - no upstream_addr) assert!(seeding_manager - .add_subscriber(&contract_key, peer_loc1.clone()) + .add_subscriber(&contract_key, peer_loc1.clone(), None) .is_ok()); assert!(seeding_manager - .add_subscriber(&contract_key, peer_loc2.clone()) + .add_subscriber(&contract_key, peer_loc2.clone(), None) .is_ok()); assert!(seeding_manager - .add_subscriber(&contract_key, peer_loc3.clone()) + .add_subscriber(&contract_key, peer_loc3.clone(), None) .is_ok()); // Verify all subscribers are present diff --git a/crates/core/src/test_utils.rs b/crates/core/src/test_utils.rs index c41034d4a..6fa10ae6d 100644 --- a/crates/core/src/test_utils.rs +++ b/crates/core/src/test_utils.rs @@ -829,20 +829,6 @@ static RESERVED_SOCKETS: Lazy anyhow::Result { const MAX_ATTEMPTS: usize = 128; for _ in 0..MAX_ATTEMPTS { diff --git a/crates/core/src/transport/mod.rs b/crates/core/src/transport/mod.rs index d833a27cf..53d06908c 100644 --- a/crates/core/src/transport/mod.rs +++ b/crates/core/src/transport/mod.rs @@ -25,6 +25,34 @@ type MessagePayload = Vec; type PacketId = u32; +/// A wrapper around SocketAddr that represents an address observed at the transport layer. +/// This is the "ground truth" for NAT scenarios - it's the actual address we see +/// at the network layer, not what the peer claims in protocol messages. +/// +/// Using a newtype instead of raw `SocketAddr` makes the address semantics explicit +/// and prevents accidental confusion with advertised/claimed addresses. +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +pub struct ObservedAddr(SocketAddr); + +impl ObservedAddr { + /// Get the underlying socket address. + pub fn socket_addr(&self) -> SocketAddr { + self.0 + } +} + +impl std::fmt::Display for ObservedAddr { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}", self.0) + } +} + +impl From for ObservedAddr { + fn from(addr: SocketAddr) -> Self { + Self(addr) + } +} + pub use self::crypto::{TransportKeypair, TransportPublicKey}; pub(crate) use self::{ connection_handler::{ diff --git a/crates/freenet-macros/src/codegen.rs b/crates/freenet-macros/src/codegen.rs index c4547a04f..152c483ea 100644 --- a/crates/freenet-macros/src/codegen.rs +++ b/crates/freenet-macros/src/codegen.rs @@ -410,14 +410,12 @@ fn generate_node_builds(args: &FreenetTestArgs) -> TokenStream { builds.push(quote! { tracing::info!("Building node: {}", #node_label); + // Release reserved ports just before binding to minimize race window + freenet::test_utils::release_local_port(#network_port_var); + freenet::test_utils::release_local_port(#ws_port_var); let built_config = #config_var.build().await?; let mut node_config = freenet::local_node::NodeConfig::new(built_config.clone()).await?; #connection_tuning - - // Release ports immediately before building node (minimizes race window) - freenet::test_utils::release_local_port(#network_port_var); - freenet::test_utils::release_local_port(#ws_port_var); - let (#node_var, #flush_handle_var) = node_config .build_with_flush_handle(freenet::server::serve_gateway(built_config.ws_api).await) .await?; From 2e6e7526d551e4f5ee2e1efed4cd9b044f56422e Mon Sep 17 00:00:00 2001 From: Ian Clarke Date: Mon, 1 Dec 2025 22:19:19 -0600 Subject: [PATCH 2/3] refactor: remove redundant source_addr checks in connect.rs MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Address Copilot review feedback: after validating source_addr at the start of the ConnectMsg::Request match arm, subsequent checks are redundant. Use upstream_addr directly instead of re-checking source_addr. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude --- crates/core/src/operations/connect.rs | 30 ++++++--------------------- 1 file changed, 6 insertions(+), 24 deletions(-) diff --git a/crates/core/src/operations/connect.rs b/crates/core/src/operations/connect.rs index 4efe65545..abd059d33 100644 --- a/crates/core/src/operations/connect.rs +++ b/crates/core/src/operations/connect.rs @@ -829,21 +829,10 @@ impl Operation for ConnectOp { address, }; // Route through upstream (where the request came from) since we may - // not have a direct connection to the target - let Some(upstream) = source_addr else { - tracing::warn!( - tx = %self.id, - "ObservedAddress message has no upstream - was this locally initiated?" - ); - // No upstream to route through - this shouldn't happen for relayed connections - return Ok(OperationResult { - return_msg: None, - target_addr: None, - state: Some(OpEnum::Connect(Box::new(self))), - }); - }; + // not have a direct connection to the target. + // Note: upstream_addr is already validated from source_addr at the start of this match arm. network_bridge - .send(upstream, NetMessage::V1(NetMessageV1::Connect(msg))) + .send(upstream_addr, NetMessage::V1(NetMessageV1::Connect(msg))) .await?; } @@ -885,18 +874,11 @@ impl Operation for ConnectOp { payload: response, }; // Route the response through upstream (where the request came from) - // since we may not have a direct connection to the joiner - let Some(upstream) = source_addr else { - tracing::warn!( - tx = %self.id, - "ConnectResponse has no upstream - was this locally initiated?" - ); - // No upstream to route through - this shouldn't happen for relayed connections - return Ok(store_operation_state(&mut self)); - }; + // since we may not have a direct connection to the joiner. + // Note: upstream_addr is already validated from source_addr at the start of this match arm. network_bridge .send( - upstream, + upstream_addr, NetMessage::V1(NetMessageV1::Connect(response_msg)), ) .await?; From 46e30d62b13877a6d28aeaf4598a86662246e175 Mon Sep 17 00:00:00 2001 From: Ian Clarke Date: Tue, 2 Dec 2025 09:11:36 -0600 Subject: [PATCH 3/3] fix: count only Connect transactions for MAX_CONCURRENT_CONNECTIONS limit MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Address review feedback: active_transaction_count() was counting all transaction types (Get/Put/Subscribe/Update/Connect), but MAX_CONCURRENT_CONNECTIONS should only limit concurrent connection acquisition attempts. Added filtered method active_connect_transaction_count() that only counts Connect transactions. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude --- crates/core/src/ring/live_tx.rs | 56 ++++++++++++++++++++++++++++++++- crates/core/src/ring/mod.rs | 3 +- 2 files changed, 57 insertions(+), 2 deletions(-) diff --git a/crates/core/src/ring/live_tx.rs b/crates/core/src/ring/live_tx.rs index 5aa83e8eb..d51cc9309 100644 --- a/crates/core/src/ring/live_tx.rs +++ b/crates/core/src/ring/live_tx.rs @@ -1,4 +1,4 @@ -use crate::message::Transaction; +use crate::message::{Transaction, TransactionType}; use dashmap::DashMap; use std::net::SocketAddr; use std::sync::Arc; @@ -52,18 +52,36 @@ impl LiveTransactionTracker { } /// Returns the total number of active transactions across all peers. + #[cfg(test)] pub(crate) fn active_transaction_count(&self) -> usize { self.tx_per_peer .iter() .map(|entry| entry.value().len()) .sum() } + + /// Returns the number of active Connect transactions across all peers. + /// Used to limit concurrent connection acquisition attempts. + pub(crate) fn active_connect_transaction_count(&self) -> usize { + self.tx_per_peer + .iter() + .map(|entry| { + entry + .value() + .iter() + .filter(|tx| tx.transaction_type() == TransactionType::Connect) + .count() + }) + .sum() + } } #[cfg(test)] mod tests { use super::*; use crate::operations::connect::ConnectMsg; + use crate::operations::get::GetMsg; + use crate::operations::put::PutMsg; #[test] fn active_transaction_count_empty() { @@ -114,4 +132,40 @@ mod tests { tracker.remove_finished_transaction(tx2); assert_eq!(tracker.active_transaction_count(), 0); } + + #[test] + fn active_connect_transaction_count_filters_by_type() { + let tracker = LiveTransactionTracker::new(); + let addr: SocketAddr = "127.0.0.1:8080".parse().unwrap(); + + // Add mixed transaction types + tracker.add_transaction(addr, Transaction::new::()); + tracker.add_transaction(addr, Transaction::new::()); + tracker.add_transaction(addr, Transaction::new::()); + tracker.add_transaction(addr, Transaction::new::()); + + // Total count should be 4 + assert_eq!(tracker.active_transaction_count(), 4); + // Connect count should only be 2 + assert_eq!(tracker.active_connect_transaction_count(), 2); + } + + #[test] + fn active_connect_transaction_count_empty() { + let tracker = LiveTransactionTracker::new(); + assert_eq!(tracker.active_connect_transaction_count(), 0); + } + + #[test] + fn active_connect_transaction_count_no_connects() { + let tracker = LiveTransactionTracker::new(); + let addr: SocketAddr = "127.0.0.1:8080".parse().unwrap(); + + // Add only non-connect transactions + tracker.add_transaction(addr, Transaction::new::()); + tracker.add_transaction(addr, Transaction::new::()); + + assert_eq!(tracker.active_transaction_count(), 2); + assert_eq!(tracker.active_connect_transaction_count(), 0); + } } diff --git a/crates/core/src/ring/mod.rs b/crates/core/src/ring/mod.rs index 80c962d67..f31eb0d76 100644 --- a/crates/core/src/ring/mod.rs +++ b/crates/core/src/ring/mod.rs @@ -428,7 +428,8 @@ impl Ring { skip_list.insert(this_peer); // Acquire new connections up to MAX_CONCURRENT_CONNECTIONS limit - let active_count = live_tx_tracker.active_transaction_count(); + // Only count Connect transactions, not all operations (Get/Put/Subscribe/Update) + let active_count = live_tx_tracker.active_connect_transaction_count(); if let Some(ideal_location) = pending_conn_adds.pop_first() { if active_count < MAX_CONCURRENT_CONNECTIONS { tracing::info!(