diff --git a/crates/core/src/operations/subscribe.rs b/crates/core/src/operations/subscribe.rs index 4140ea7a8..51b41f6f0 100644 --- a/crates/core/src/operations/subscribe.rs +++ b/crates/core/src/operations/subscribe.rs @@ -11,7 +11,6 @@ use crate::{ message::{InnerMessage, NetMessage, Transaction}, node::{NetworkBridge, OpManager, PeerId}, ring::{Location, PeerKeyLocation, RingError}, - transport::ObservedAddr, }; use freenet_stdlib::{ client_api::{ContractResponse, ErrorKind, HostResponse}, @@ -275,7 +274,7 @@ async fn complete_local_subscription( key: ContractKey, ) -> Result<(), OpError> { let subscriber = op_manager.ring.connection_manager.own_location(); - // Local subscription - no upstream_addr needed since it's our own peer + // Local subscription - no upstream NAT address if let Err(err) = op_manager .ring .add_subscriber(&key, subscriber.clone(), None) @@ -310,7 +309,7 @@ pub(crate) struct SubscribeOp { state: Option, /// The address we received this operation's message from. /// Used for connection-based routing: responses are sent back to this address. - upstream_addr: Option, + upstream_addr: Option, } impl SubscribeOp { @@ -364,16 +363,11 @@ impl Operation for SubscribeOp { } Ok(None) => { // new request to subscribe to a contract, initialize the machine - tracing::debug!( - tx = %id, - ?source_addr, - "subscribe: load_or_init creating new op with source_addr as upstream_addr" - ); Ok(OpInitialization { op: Self { state: Some(SubscribeState::ReceivedRequest), id, - upstream_addr: source_addr.map(ObservedAddr::new), // Connection-based routing: store who sent us this request + upstream_addr: source_addr, // Connection-based routing: store who sent us this request }, source_addr, }) @@ -413,29 +407,31 @@ impl Operation for SubscribeOp { target: _, subscriber, } => { - // ALWAYS use the transport-level source address when available. - // This is critical for NAT peers: they may embed a "known" but wrong address - // (e.g., 127.0.0.1:31337 for loopback). The transport address is the only - // reliable way to route responses back through the NAT. + // Fill in subscriber's external address from transport layer if unknown. + // This is the key step where the first recipient (gateway) determines the + // subscriber's external address from the actual packet source address. + // IMPORTANT: Must fill address BEFORE any .peer() calls to avoid panic. let mut subscriber = subscriber.clone(); + if subscriber.peer_addr.is_unknown() { + if let Some(addr) = source_addr { + subscriber.set_addr(addr); + tracing::debug!( + tx = %id, + %key, + subscriber_addr = %addr, + "subscribe: filled subscriber address from source_addr" + ); + } + } + tracing::debug!( tx = %id, %key, - subscriber_orig = %subscriber.peer(), + subscriber = %subscriber.peer(), source_addr = ?source_addr, "subscribe: processing RequestSub" ); - - if let Some(addr) = source_addr { - subscriber.set_addr(addr); - tracing::debug!( - tx = %id, - %key, - subscriber_updated = %subscriber.peer(), - "subscribe: updated subscriber address from transport source" - ); - } let own_loc = op_manager.ring.connection_manager.own_location(); if !matches!( @@ -462,10 +458,10 @@ impl Operation for SubscribeOp { "subscribe: handling RequestSub locally (contract available)" ); - // Use upstream_addr for NAT routing - subscriber may embed wrong address + // Local registration - no upstream NAT address if op_manager .ring - .add_subscriber(key, subscriber.clone(), self.upstream_addr) + .add_subscriber(key, subscriber.clone(), None) .is_err() { tracing::warn!( @@ -532,13 +528,6 @@ impl Operation for SubscribeOp { subscribed: true, }; - tracing::debug!( - tx = %id, - %key, - upstream_addr = ?self.upstream_addr, - "subscribe: creating ReturnSub with upstream_addr" - ); - return build_op_result( self.id, None, @@ -741,10 +730,7 @@ impl Operation for SubscribeOp { subscribers_before = ?before_direct, "subscribe: attempting to register direct subscriber" ); - // Pass None: subscriber address was already corrected by Gateway at the - // start of the subscribe flow. Using self.upstream_addr here would - // incorrectly overwrite with the forwarder's address instead of the - // original subscriber's Gateway-corrected address. + // Local registration - no upstream NAT address if op_manager .ring .add_subscriber(key, subscriber.clone(), None) @@ -895,7 +881,7 @@ impl Operation for SubscribeOp { subscribers_before = ?before_upstream, "subscribe: attempting to register upstream link" ); - // upstream_subscriber was stored in op state, no transport address available + // Local registration - no upstream NAT address if op_manager .ring .add_subscriber(key, upstream_subscriber.clone(), None) @@ -928,10 +914,7 @@ impl Operation for SubscribeOp { subscribers_before = ?before_provider, "subscribe: registering provider/subscription source" ); - // Pass None: sender was already looked up from source_addr (line ~866), - // so it has the correct transport address. Using self.upstream_addr - // would incorrectly use the original requester's address instead of - // the provider's address. + // Local registration - no upstream NAT address if op_manager .ring .add_subscriber(key, sender.clone(), None) @@ -993,26 +976,17 @@ fn build_op_result( id: Transaction, state: Option, msg: Option, - upstream_addr: Option, + upstream_addr: Option, ) -> Result { // For response messages (ReturnSub), use upstream_addr directly for routing. // This is more reliable than extracting from the message's target field, which // may have been looked up from connection_manager (subject to race conditions). // For forward messages (SeekNode, RequestSub, FetchRouting), use the message's target. let target_addr = match &msg { - // Convert ObservedAddr to SocketAddr at the transport boundary - Some(SubscribeMsg::ReturnSub { .. }) => upstream_addr.map(|a| a.socket_addr()), + Some(SubscribeMsg::ReturnSub { .. }) => upstream_addr, _ => msg.as_ref().and_then(|m| m.target_addr()), }; - tracing::debug!( - tx = %id, - msg_type = ?msg.as_ref().map(|m| std::any::type_name_of_val(m)), - ?upstream_addr, - ?target_addr, - "build_op_result: computed target_addr" - ); - let output_op = state.map(|state| SubscribeOp { id, state: Some(state), diff --git a/crates/core/src/ring/live_tx.rs b/crates/core/src/ring/live_tx.rs index 23bf5a43f..5aa83e8eb 100644 --- a/crates/core/src/ring/live_tx.rs +++ b/crates/core/src/ring/live_tx.rs @@ -47,11 +47,71 @@ impl LiveTransactionTracker { self.tx_per_peer.contains_key(&peer_addr) } - pub(crate) fn still_alive(&self, tx: &Transaction) -> bool { - self.tx_per_peer.iter().any(|e| e.value().contains(tx)) - } - pub(crate) fn len(&self) -> usize { self.tx_per_peer.len() } + + /// Returns the total number of active transactions across all peers. + pub(crate) fn active_transaction_count(&self) -> usize { + self.tx_per_peer + .iter() + .map(|entry| entry.value().len()) + .sum() + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::operations::connect::ConnectMsg; + + #[test] + fn active_transaction_count_empty() { + let tracker = LiveTransactionTracker::new(); + assert_eq!(tracker.active_transaction_count(), 0); + } + + #[test] + fn active_transaction_count_single_peer() { + let tracker = LiveTransactionTracker::new(); + let addr: SocketAddr = "127.0.0.1:8080".parse().unwrap(); + + tracker.add_transaction(addr, Transaction::new::()); + assert_eq!(tracker.active_transaction_count(), 1); + + tracker.add_transaction(addr, Transaction::new::()); + assert_eq!(tracker.active_transaction_count(), 2); + } + + #[test] + fn active_transaction_count_multiple_peers() { + let tracker = LiveTransactionTracker::new(); + let addr1: SocketAddr = "127.0.0.1:8080".parse().unwrap(); + let addr2: SocketAddr = "127.0.0.1:8081".parse().unwrap(); + + tracker.add_transaction(addr1, Transaction::new::()); + tracker.add_transaction(addr1, Transaction::new::()); + tracker.add_transaction(addr2, Transaction::new::()); + + assert_eq!(tracker.active_transaction_count(), 3); + } + + #[test] + fn active_transaction_count_after_removal() { + let tracker = LiveTransactionTracker::new(); + let addr: SocketAddr = "127.0.0.1:8080".parse().unwrap(); + + let tx1 = Transaction::new::(); + let tx2 = Transaction::new::(); + + tracker.add_transaction(addr, tx1); + tracker.add_transaction(addr, tx2); + assert_eq!(tracker.active_transaction_count(), 2); + + tracker.remove_finished_transaction(tx1); + assert_eq!(tracker.active_transaction_count(), 1); + + tracker.remove_finished_transaction(tx2); + assert_eq!(tracker.active_transaction_count(), 0); + } } diff --git a/crates/core/src/ring/mod.rs b/crates/core/src/ring/mod.rs index 7229a8af2..80c962d67 100644 --- a/crates/core/src/ring/mod.rs +++ b/crates/core/src/ring/mod.rs @@ -330,7 +330,8 @@ impl Ring { /// /// The `upstream_addr` parameter is the transport-level address from which the subscribe /// message was received. This is used instead of the address embedded in `subscriber` - /// because NAT peers may embed incorrect addresses in their messages. + /// because NAT peers may embed incorrect (e.g., loopback) addresses in their messages. + /// The transport address is the only reliable way to route back to them. pub fn add_subscriber( &self, contract: &ContractKey, @@ -389,6 +390,11 @@ impl Ring { const REGENERATE_DENSITY_MAP_INTERVAL: Duration = Duration::from_secs(60); + /// Maximum number of concurrent connection acquisition attempts. + /// Allows parallel connection attempts to speed up network formation + /// instead of serial blocking on a single connection at a time. + const MAX_CONCURRENT_CONNECTIONS: usize = 3; + let mut check_interval = tokio::time::interval(CHECK_TICK_DURATION); check_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); let mut refresh_density_map = tokio::time::interval(REGENERATE_DENSITY_MAP_INTERVAL); @@ -399,7 +405,6 @@ impl Ring { tokio::time::sleep(Duration::from_secs(2)).await; tracing::info!("Connection maintenance task: initial sleep completed"); - let mut live_tx = None; let mut pending_conn_adds = BTreeSet::new(); let mut this_peer = None; loop { @@ -422,20 +427,17 @@ impl Ring { let mut skip_list = HashSet::new(); skip_list.insert(this_peer); - // if there are no open connections, we need to acquire more - if let Some(tx) = &live_tx { - if !live_tx_tracker.still_alive(tx) { - let _ = live_tx.take(); - } - } - + // Acquire new connections up to MAX_CONCURRENT_CONNECTIONS limit + let active_count = live_tx_tracker.active_transaction_count(); if let Some(ideal_location) = pending_conn_adds.pop_first() { - if live_tx.is_none() { + if active_count < MAX_CONCURRENT_CONNECTIONS { tracing::info!( + active_connections = active_count, + max_concurrent = MAX_CONCURRENT_CONNECTIONS, "Attempting to acquire new connection for location: {:?}", ideal_location ); - live_tx = self + let tx = self .acquire_new( ideal_location, &skip_list, @@ -451,18 +453,23 @@ impl Ring { ); error })?; - if live_tx.is_none() { + if tx.is_none() { let conns = self.connection_manager.connection_count(); tracing::warn!( "acquire_new returned None - likely no peers to query through (connections: {})", conns ); } else { - tracing::info!("Successfully initiated connection acquisition"); + tracing::info!( + active_connections = active_count + 1, + "Successfully initiated connection acquisition" + ); } } else { tracing::debug!( - "Skipping connection attempt - live transaction still active, re-queuing location {}", + active_connections = active_count, + max_concurrent = MAX_CONCURRENT_CONNECTIONS, + "At max concurrent connections, re-queuing location {}", ideal_location ); pending_conn_adds.insert(ideal_location); diff --git a/crates/core/src/transport/mod.rs b/crates/core/src/transport/mod.rs index e30deee56..53d06908c 100644 --- a/crates/core/src/transport/mod.rs +++ b/crates/core/src/transport/mod.rs @@ -25,16 +25,7 @@ type MessagePayload = Vec; type PacketId = u32; -pub use self::crypto::{TransportKeypair, TransportPublicKey}; -pub(crate) use self::{ - connection_handler::{ - create_connection_handler, InboundConnectionHandler, OutboundConnectionHandler, - }, - peer_connection::PeerConnection, -}; - -/// Address observed at the transport layer (from UDP packet source). -/// +/// A wrapper around SocketAddr that represents an address observed at the transport layer. /// This is the "ground truth" for NAT scenarios - it's the actual address we see /// at the network layer, not what the peer claims in protocol messages. /// @@ -44,11 +35,6 @@ pub(crate) use self::{ pub struct ObservedAddr(SocketAddr); impl ObservedAddr { - /// Create a new observed address from a socket address. - pub fn new(addr: SocketAddr) -> Self { - Self(addr) - } - /// Get the underlying socket address. pub fn socket_addr(&self) -> SocketAddr { self.0 @@ -67,6 +53,14 @@ impl From for ObservedAddr { } } +pub use self::crypto::{TransportKeypair, TransportPublicKey}; +pub(crate) use self::{ + connection_handler::{ + create_connection_handler, InboundConnectionHandler, OutboundConnectionHandler, + }, + peer_connection::PeerConnection, +}; + #[derive(Debug, thiserror::Error)] pub(crate) enum TransportError { #[error("transport handler channel closed, socket likely closed")]