diff --git a/crates/core/src/node/network_bridge/handshake.rs b/crates/core/src/node/network_bridge/handshake.rs index 26ed4e652..d364b17a7 100644 --- a/crates/core/src/node/network_bridge/handshake.rs +++ b/crates/core/src/node/network_bridge/handshake.rs @@ -64,6 +64,9 @@ pub(super) enum Event { joiner: PeerId, op: Option>, forward_info: Option>, + /// If true, this is a gateway bootstrap acceptance that should be registered immediately. + /// See forward_conn() in connect.rs for full explanation. + is_bootstrap: bool, }, /// An outbound connection to a peer was successfully established. OutboundConnectionSuccessful { @@ -99,6 +102,10 @@ pub(super) enum Event { #[allow(clippy::large_enum_variant)] enum ForwardResult { Forward(PeerId, NetMessage, ConnectivityInfo), + DirectlyAccepted(ConnectivityInfo), + /// Gateway bootstrap acceptance - connection should be registered immediately. + /// See forward_conn() in connect.rs and PR #1871 for context. + BootstrapAccepted(ConnectivityInfo), Rejected, } @@ -218,6 +225,9 @@ pub(super) struct HandshakeHandler { /// This is used for testing deterministically with given location. In production this should always be none /// and locations should be derived from IP addresses. this_location: Option, + + /// Whether this node is a gateway + is_gateway: bool, } impl HandshakeHandler { @@ -227,6 +237,7 @@ impl HandshakeHandler { connection_manager: ConnectionManager, router: Arc>, this_location: Option, + is_gateway: bool, ) -> (Self, HanshakeHandlerMsg, OutboundMessage) { let (pending_msg_tx, pending_msg_rx) = tokio::sync::mpsc::channel(100); let (establish_connection_tx, establish_connection_rx) = tokio::sync::mpsc::channel(100); @@ -243,6 +254,7 @@ impl HandshakeHandler { connection_manager, router, this_location, + is_gateway, }; ( connector, @@ -366,7 +378,12 @@ impl HandshakeHandler { Location::from_address(&req.conn.remote_addr()) }; let should_accept = self.connection_manager.should_accept(location, &req.joiner); - if should_accept { + // Check if this is a valid acceptance scenario + // Non-gateways with 0 connections should not accept (they need existing connections to forward through) + let can_accept = should_accept && + (self.is_gateway || self.connection_manager.num_connections() > 0); + + if can_accept { let accepted_msg = NetMessage::V1(NetMessageV1::Connect(ConnectMsg::Response { id: req.id, sender: self.connection_manager.own_location(), @@ -400,7 +417,8 @@ impl HandshakeHandler { .. } = req; - let (ok, forward_info) = { + // Forward the connection or accept it directly + let forward_result = { // TODO: refactor this so it happens in the background out of the main handler loop let mut nw_bridge = ForwardPeerMessage { msg: parking_lot::Mutex::new(None), @@ -426,6 +444,7 @@ impl HandshakeHandler { skip_forwards, req_peer: my_peer_id.clone(), joiner: joiner_pk_loc.clone(), + is_gateway: self.is_gateway, }; let f = forward_conn( @@ -441,31 +460,82 @@ impl HandshakeHandler { tracing::error!(%err, "Error forwarding connection"); continue; } - Ok(ok) => { - if let Some(ok_value) = ok { - let forward_info = nw_bridge.msg.lock().take().map(|(forward_target, msg)| { - ForwardInfo { - target: forward_target, - msg, - } - }); - (Some(ok_value), forward_info) + Ok(Some(conn_state)) => { + let ConnectState::AwaitingConnectivity(info) = conn_state else { + unreachable!("forward_conn should return AwaitingConnectivity if successful") + }; + + // Check if we have a forward message (forwarding) or not (direct acceptance) + if let Some((forward_target, msg)) = nw_bridge.msg.into_inner() { + (Some(ForwardResult::Forward(forward_target.clone(), msg, info)), Some(forward_target)) + } else if info.is_bootstrap_acceptance { + // Gateway bootstrap case: connection should be registered immediately + // This bypasses the normal CheckConnectivity flow. See forward_conn() + // bootstrap logic in connect.rs for full explanation. + (Some(ForwardResult::BootstrapAccepted(info)), None) } else { - (None, None) + // Normal direct acceptance - will wait for CheckConnectivity + (Some(ForwardResult::DirectlyAccepted(info)), None) } } + Ok(None) => (None, None), } }; - return Ok(Event::InboundConnection { - id, - conn, - joiner, - op: ok.map(|ok_value| Box::new(ConnectOp::new(id, Some(ok_value), None, None))), - forward_info: forward_info.map(Box::new), - }) + match forward_result { + (Some(ForwardResult::Forward(forward_target, msg, info)), _) => { + return Ok(Event::InboundConnection { + id, + conn, + joiner, + op: Some(Box::new(ConnectOp::new(id, Some(ConnectState::AwaitingConnectivity(info)), None, None))), + forward_info: Some(Box::new(ForwardInfo { target: forward_target, msg })), + is_bootstrap: false, + }); + } + (Some(ForwardResult::BootstrapAccepted(info)), _) => { + return Ok(Event::InboundConnection { + id, + conn, + joiner, + op: Some(Box::new(ConnectOp::new(id, Some(ConnectState::AwaitingConnectivity(info)), None, None))), + forward_info: None, + is_bootstrap: true, + }); + } + (Some(ForwardResult::DirectlyAccepted(info)), _) => { + return Ok(Event::InboundConnection { + id, + conn, + joiner, + op: Some(Box::new(ConnectOp::new(id, Some(ConnectState::AwaitingConnectivity(info)), None, None))), + forward_info: None, + is_bootstrap: false, + }); + } + (Some(ForwardResult::Rejected), _) | (None, _) => { + return Ok(Event::InboundConnection { + id, + conn, + joiner, + op: None, + forward_info: None, + is_bootstrap: false, + }); + } + } } else { + // If should_accept was true but we can't actually accept (non-gateway with 0 connections), + // we need to clean up the reserved connection + if should_accept && !can_accept { + self.connection_manager.prune_in_transit_connection(&req.joiner); + tracing::debug!( + "Non-gateway with 0 connections cannot accept connection from {:?}", + req.joiner + ); + } + let InboundGwJoinRequest { mut conn, id, @@ -503,7 +573,31 @@ impl HandshakeHandler { msg: Box::new(msg), }); } + Ok(ForwardResult::BootstrapAccepted(info)) => { + // Gateway bootstrap: First connection should be registered immediately. + // This bypasses the normal transient connection flow. + // See forward_conn() in connect.rs for full explanation. + return Ok(Event::InboundConnection { + id, + conn, + joiner, + op: Some(Box::new(ConnectOp::new(id, Some(ConnectState::AwaitingConnectivity(info)), None, None))), + forward_info: None, + is_bootstrap: true, + }); + } + Ok(ForwardResult::DirectlyAccepted(_info)) => { + // Connection was accepted directly (not forwarded) + // For now, treat this as a rejection since we shouldn't hit this case in transient connections + // Clean up the reserved connection slot + self.connection_manager.prune_in_transit_connection(&joiner); + self.outbound_messages.remove(&remote); + self.connecting.remove(&remote); + return Ok(Event::InboundConnectionRejected { peer_id: joiner }); + } Ok(ForwardResult::Rejected) => { + // Clean up the reserved connection slot + self.connection_manager.prune_in_transit_connection(&joiner); self.outbound_messages.remove(&remote); self.connecting.remove(&remote); return Ok(Event::InboundConnectionRejected { peer_id: joiner }); @@ -592,6 +686,7 @@ impl HandshakeHandler { skip_forwards: transaction.skip_forwards.clone(), req_peer: my_peer_id.clone(), joiner: joiner_pk_loc.clone(), + is_gateway: self.is_gateway, }; match forward_conn( @@ -604,14 +699,22 @@ impl HandshakeHandler { .await { Ok(Some(conn_state)) => { - let (forward_target, msg) = nw_bridge - .msg - .into_inner() - .expect("target was successfully set"); let ConnectState::AwaitingConnectivity(info) = conn_state else { unreachable!("forward_conn should return AwaitingConnectivity if successful") }; - Ok(ForwardResult::Forward(forward_target, msg, info)) + + // Check if we have a forward message (forwarding) or not (direct acceptance) + if let Some((forward_target, msg)) = nw_bridge.msg.into_inner() { + Ok(ForwardResult::Forward(forward_target, msg, info)) + } else if info.is_bootstrap_acceptance { + // Gateway bootstrap case: connection should be registered immediately + // This bypasses the normal CheckConnectivity flow. See forward_conn() + // bootstrap logic in connect.rs for full explanation. + Ok(ForwardResult::BootstrapAccepted(info)) + } else { + // Normal direct acceptance - will wait for CheckConnectivity + Ok(ForwardResult::DirectlyAccepted(info)) + } } Ok(None) => { tracing::debug!(at=?conn.my_address(), from=%conn.remote_addr(), "Rejecting connection, no peers found to forward"); @@ -1383,6 +1486,7 @@ mod tests { fn config_handler( addr: impl Into, existing_connections: Option>, + is_gateway: bool, ) -> (HandshakeHandler, TestVerifier) { let (outbound_sender, outbound_recv) = mpsc::channel(100); let outbound_conn_handler = OutboundConnectionHandler::new(outbound_sender); @@ -1408,6 +1512,7 @@ mod tests { mngr, Arc::new(RwLock::new(router)), None, + is_gateway, ); ( handler, @@ -1461,7 +1566,7 @@ mod tests { #[tokio::test] async fn test_gateway_inbound_conn_success() -> anyhow::Result<()> { let addr: SocketAddr = ([127, 0, 0, 1], 10000).into(); - let (mut handler, mut test) = config_handler(addr, None); + let (mut handler, mut test) = config_handler(addr, None, true); let remote_addr = ([127, 0, 0, 1], 10001).into(); let test_controller = async { @@ -1502,7 +1607,7 @@ mod tests { let existing_conn = Connection::new(remote_peer_loc.peer, remote_peer_loc.location.unwrap()); - let (mut handler, mut test) = config_handler(addr, Some(vec![existing_conn])); + let (mut handler, mut test) = config_handler(addr, Some(vec![existing_conn]), true); // Configure the handler to reject connections by setting max_connections to 1 handler.connection_manager.max_connections = 1; @@ -1547,7 +1652,7 @@ mod tests { #[test_log::test(tokio::test)] async fn test_peer_to_gw_outbound_conn() -> anyhow::Result<()> { let addr = ([127, 0, 0, 1], 10000).into(); - let (mut handler, mut test) = config_handler(addr, None); + let (mut handler, mut test) = config_handler(addr, None, false); let joiner_key = TransportKeypair::new(); let pub_key = joiner_key.public().clone(); @@ -1614,7 +1719,7 @@ mod tests { #[tokio::test] async fn test_peer_to_gw_outbound_conn_failed() -> anyhow::Result<()> { let addr = ([127, 0, 0, 1], 10000).into(); - let (mut handler, mut test) = config_handler(addr, None); + let (mut handler, mut test) = config_handler(addr, None, false); let joiner_key = TransportKeypair::new(); let pub_key = joiner_key.public().clone(); @@ -1659,7 +1764,7 @@ mod tests { let peer_addr: SocketAddr = ([127, 0, 0, 1], 10001).into(); let joiner_addr: SocketAddr = ([127, 0, 0, 1], 10002).into(); - let (mut gw_handler, mut gw_test) = config_handler(gw_addr, None); + let (mut gw_handler, mut gw_test) = config_handler(gw_addr, None, true); // the gw only will accept one connection gw_handler.connection_manager.max_connections = 1; @@ -1757,7 +1862,7 @@ mod tests { async fn test_peer_to_gw_outbound_conn_rejected() -> anyhow::Result<()> { // crate::config::set_logger(Some(tracing::level_filters::LevelFilter::TRACE), None); let joiner_addr = ([127, 0, 0, 1], 10001).into(); - let (mut handler, mut test) = config_handler(joiner_addr, None); + let (mut handler, mut test) = config_handler(joiner_addr, None, false); let gw_key = TransportKeypair::new(); let gw_pub_key = gw_key.public().clone(); @@ -1927,7 +2032,7 @@ mod tests { async fn test_peer_to_gw_outbound_conn_forwarded() -> anyhow::Result<()> { // crate::config::set_logger(Some(tracing::level_filters::LevelFilter::TRACE), None); let joiner_addr = ([127, 0, 0, 1], 10001).into(); - let (mut handler, mut test) = config_handler(joiner_addr, None); + let (mut handler, mut test) = config_handler(joiner_addr, None, false); let gw_key = TransportKeypair::new(); let gw_pub_key = gw_key.public().clone(); @@ -2012,7 +2117,7 @@ mod tests { #[tokio::test] async fn test_peer_to_peer_outbound_conn_failed() -> anyhow::Result<()> { let addr: SocketAddr = ([127, 0, 0, 1], 10001).into(); - let (mut handler, mut test) = config_handler(addr, None); + let (mut handler, mut test) = config_handler(addr, None, false); let peer_key = TransportKeypair::new(); let peer_pub_key = peer_key.public().clone(); @@ -2057,7 +2162,7 @@ mod tests { #[tokio::test] async fn test_peer_to_peer_outbound_conn_succeeded() -> anyhow::Result<()> { let addr: SocketAddr = ([127, 0, 0, 1], 10001).into(); - let (mut handler, mut test) = config_handler(addr, None); + let (mut handler, mut test) = config_handler(addr, None, false); let peer_key = TransportKeypair::new(); let peer_pub_key = peer_key.public().clone(); diff --git a/crates/core/src/node/network_bridge/p2p_protoc.rs b/crates/core/src/node/network_bridge/p2p_protoc.rs index 9a57a2977..7bbb96be5 100644 --- a/crates/core/src/node/network_bridge/p2p_protoc.rs +++ b/crates/core/src/node/network_bridge/p2p_protoc.rs @@ -203,6 +203,7 @@ impl P2pConnManager { self.bridge.op_manager.ring.connection_manager.clone(), self.bridge.op_manager.ring.router.clone(), self.this_location, + self.is_gateway, ); loop { @@ -849,6 +850,7 @@ impl P2pConnManager { joiner, op, forward_info, + is_bootstrap, } => { if let Some(blocked_addrs) = &self.blocked_addresses { if blocked_addrs.contains(&joiner.addr) { @@ -861,13 +863,32 @@ impl P2pConnManager { } let (tx, rx) = mpsc::channel(1); self.connections.insert(joiner.clone(), tx); - // IMPORTANT: Do NOT add connection to ring here! + + // IMPORTANT: Normally we do NOT add connection to ring here! // Connection should only be added after StartJoinReq is accepted // via CheckConnectivity. This prevents the "already connected" bug // where gateways reject valid join requests. // - // The connection will be properly added in the CheckConnectivity - // handler when should_accept() returns true. + // EXCEPTION: Gateway bootstrap (is_bootstrap=true) + // When a gateway accepts its very first connection (bootstrap case), + // we must register it immediately so the gateway can respond to + // FindOptimalPeer requests from subsequent joiners. Bootstrap connections + // bypass the normal CheckConnectivity flow. See forward_conn() in + // connect.rs and PR #1871 for full explanation. + if is_bootstrap { + let location = Location::from_address(&joiner.addr); + tracing::info!( + %id, + %joiner, + %location, + "Bootstrap connection: immediately registering in ring" + ); + self.bridge + .op_manager + .ring + .add_connection(location, joiner.clone(), true) + .await; + } if let Some(op) = op { self.bridge diff --git a/crates/core/src/operations/connect.rs b/crates/core/src/operations/connect.rs index c83123000..9447a1ac8 100644 --- a/crates/core/src/operations/connect.rs +++ b/crates/core/src/operations/connect.rs @@ -62,7 +62,7 @@ impl ConnectOp { } pub(super) fn to_host_result(&self) -> HostResult { - // this should't ever be called since clients can't request explicit connects + // this shouldn't ever be called since clients can't request explicit connects Ok(HostResponse::Ok) } } @@ -352,6 +352,7 @@ impl Operation for ConnectOp { skip_forwards: skip_forwards.clone(), req_peer: sender.clone(), joiner: joiner.clone(), + is_gateway: op_manager.ring.is_gateway, }, ) .await? @@ -402,14 +403,15 @@ impl Operation for ConnectOp { match self.state.as_mut() { Some(ConnectState::ConnectingToNode(info)) => { assert!(info.remaining_connections > 0); - let remaining_connetions = info.remaining_connections.saturating_sub(1); + let remaining_connections = + info.remaining_connections.saturating_sub(1); if *accepted { tracing::debug!( tx = %id, at = %this_peer_id, from = %sender.peer, - connectect_to = %acceptor.peer, + connected_to = %acceptor.peer, "Open connection acknowledged at requesting joiner peer", ); info.accepted_by.insert(acceptor.clone()); @@ -444,7 +446,7 @@ impl Operation for ConnectOp { .connection_manager .update_location(target.location); - if remaining_connetions == 0 { + if remaining_connections == 0 { tracing::debug!( tx = %id, at = %this_peer_id, @@ -464,6 +466,7 @@ impl Operation for ConnectOp { Some(ConnectState::AwaitingConnectivity(ConnectivityInfo { remaining_checks, requester, + .. })) => { assert!(*remaining_checks > 0); let remaining_checks = remaining_checks.saturating_sub(1); @@ -486,11 +489,9 @@ impl Operation for ConnectOp { ); new_state = None; } else { - new_state = - Some(ConnectState::AwaitingConnectivity(ConnectivityInfo { - remaining_checks, - requester: requester.clone(), - })); + new_state = Some(ConnectState::AwaitingConnectivity( + ConnectivityInfo::new(requester.clone(), remaining_checks), + )); } let response = ConnectResponse::AcceptedBy { accepted: *accepted, @@ -629,6 +630,9 @@ pub enum ConnectState { pub(crate) struct ConnectivityInfo { remaining_checks: usize, requester: Requester, + /// Indicates this is a gateway bootstrap acceptance that should be registered immediately. + /// See forward_conn() bootstrap logic and handshake handler for details. + pub(crate) is_bootstrap_acceptance: bool, } impl ConnectivityInfo { @@ -636,6 +640,15 @@ impl ConnectivityInfo { Self { requester, remaining_checks, + is_bootstrap_acceptance: false, + } + } + + pub fn new_bootstrap(requester: Requester, remaining_checks: usize) -> Self { + Self { + requester, + remaining_checks, + is_bootstrap_acceptance: true, } } @@ -673,8 +686,6 @@ impl ConnectState { /// - gateways: Inmutable list of known gateways. Passed when starting up the node. /// After the initial connections through the gateways are established all other connections /// (to gateways or regular peers) will be treated as regular connections. -/// -/// - is_gateway: Whether this peer is a gateway or not. pub(crate) async fn initial_join_procedure( op_manager: Arc, gateways: &[PeerKeyLocation], @@ -991,10 +1002,12 @@ pub(crate) struct ForwardParams { pub accepted: bool, /// Avoid connecting to these peers. pub skip_connections: HashSet, - /// Avoif forwarding to these peers. + /// Avoid forwarding to these peers. pub skip_forwards: HashSet, pub req_peer: PeerKeyLocation, pub joiner: PeerKeyLocation, + /// Whether this node is a gateway + pub is_gateway: bool, } pub(crate) async fn forward_conn( @@ -1015,57 +1028,123 @@ where mut skip_forwards, req_peer, joiner, + is_gateway, } = params; if left_htl == 0 { tracing::debug!( tx = %id, joiner = %joiner.peer, - "Couldn't forward connect petition, no hops left or enough connections", + "Couldn't forward connect petition, no hops left", ); return Ok(None); } - if connection_manager.num_connections() == 0 { - tracing::warn!( - tx = %id, - joiner = %joiner.peer, - "Couldn't forward connect petition, not enough connections", - ); - return Ok(None); + let num_connections = connection_manager.num_connections(); + let num_reserved = connection_manager.get_reserved_connections(); + + tracing::debug!( + tx = %id, + joiner = %joiner.peer, + num_connections = %num_connections, + num_reserved = %num_reserved, + is_gateway = %is_gateway, + accepted = %accepted, + "forward_conn: checking connection forwarding", + ); + + // Special case: Gateway bootstrap when starting with zero connections AND only one reserved + // Note: num_reserved will be 1 (not 0) because should_accept() already reserved a slot + // for this connection. This ensures only the very first connection is accepted directly, + // avoiding race conditions where multiple concurrent join attempts would all be accepted directly. + // + // IMPORTANT: Bootstrap acceptances are marked with is_bootstrap_acceptance=true so that + // the handshake handler (see handshake.rs forward_or_accept_join) can immediately register + // the connection in the ring. This bypasses the normal CheckConnectivity flow which doesn't + // apply to bootstrap since: + // 1. There are no other peers to forward to + // 2. The "already connected" bug doesn't apply (this is the first connection) + // 3. We need the connection registered so the gateway can respond to FindOptimalPeer requests + // + // See PR #1871 discussion with @iduartgomez for context. + if num_connections == 0 { + if num_reserved == 1 && is_gateway && accepted { + tracing::info!( + tx = %id, + joiner = %joiner.peer, + "Gateway bootstrap: accepting first connection directly (will register immediately)", + ); + let connectivity_info = ConnectivityInfo::new_bootstrap( + joiner.clone(), + 1, // Single check for direct connection + ); + return Ok(Some(ConnectState::AwaitingConnectivity(connectivity_info))); + } else { + tracing::debug!( + tx = %id, + joiner = %joiner.peer, + is_gateway = %is_gateway, + num_reserved = %num_reserved, + "Cannot forward or accept: no existing connections, or reserved connections pending", + ); + return Ok(None); + } } - let target_peer = { - let router = router.read(); - select_forward_target( - id, - connection_manager, - &router, - &req_peer, - &joiner, - left_htl, - &skip_forwards, - ) - }; - skip_connections.insert(req_peer.peer.clone()); - skip_forwards.insert(req_peer.peer.clone()); - match target_peer { - Some(target_peer) => { - let forward_msg = create_forward_message( + // Try to forward the connection request to an existing peer + if num_connections > 0 { + let target_peer = { + let router = router.read(); + select_forward_target( id, + connection_manager, + &router, &req_peer, &joiner, - &target_peer, left_htl, - max_htl, - skip_connections, - skip_forwards, - ); - tracing::debug!(target: "network", "Forwarding connection request to {:?}", target_peer); - network_bridge.send(&target_peer.peer, forward_msg).await?; - update_state_with_forward_info(&req_peer, left_htl) + &skip_forwards, + ) + }; + + skip_connections.insert(req_peer.peer.clone()); + skip_forwards.insert(req_peer.peer.clone()); + + match target_peer { + Some(target_peer) => { + // Successfully found a peer to forward to + let forward_msg = create_forward_message( + id, + &req_peer, + &joiner, + &target_peer, + left_htl, + max_htl, + skip_connections, + skip_forwards, + ); + tracing::debug!( + target: "network", + tx = %id, + "Forwarding connection request to {:?}", + target_peer + ); + network_bridge.send(&target_peer.peer, forward_msg).await?; + return update_state_with_forward_info(&req_peer, left_htl); + } + None => { + // Couldn't find suitable peer to forward to + tracing::debug!( + tx = %id, + joiner = %joiner.peer, + "No suitable peer found for forwarding despite having {} connections", + num_connections + ); + return Ok(None); + } } - None => handle_unforwardable_connection(id, accepted), } + + // Should be unreachable - we either forwarded or returned None + unreachable!("forward_conn should have returned by now") } fn select_forward_target( @@ -1135,21 +1214,6 @@ fn update_state_with_forward_info( Ok(Some(new_state)) } -fn handle_unforwardable_connection( - id: Transaction, - accepted: bool, -) -> Result, OpError> { - if accepted { - tracing::warn!( - tx = %id, - "Unable to forward, will only be connecting to one peer", - ); - } else { - tracing::warn!(tx = %id, "Unable to forward or accept any connections"); - } - Ok(None) -} - mod messages { use std::fmt::Display; diff --git a/crates/core/src/ring/connection_manager.rs b/crates/core/src/ring/connection_manager.rs index fc0a9b475..49e976fdb 100644 --- a/crates/core/src/ring/connection_manager.rs +++ b/crates/core/src/ring/connection_manager.rs @@ -323,6 +323,11 @@ impl ConnectionManager { .load(std::sync::atomic::Ordering::SeqCst) } + pub(crate) fn get_reserved_connections(&self) -> usize { + self.reserved_connections + .load(std::sync::atomic::Ordering::SeqCst) + } + pub(super) fn get_connections_by_location(&self) -> BTreeMap> { self.connections_by_location.read().clone() } diff --git a/crates/core/src/ring/mod.rs b/crates/core/src/ring/mod.rs index a76310705..20e73bcaf 100644 --- a/crates/core/src/ring/mod.rs +++ b/crates/core/src/ring/mod.rs @@ -70,8 +70,7 @@ pub(crate) struct Ring { event_register: Box, /// Whether this peer is a gateway or not. This will affect behavior of the node when acquiring /// and dropping connections. - #[allow(unused)] - is_gateway: bool, + pub(crate) is_gateway: bool, } // /// A data type that represents the fact that a peer has been blacklisted @@ -360,7 +359,9 @@ impl Ring { live_tx_tracker: LiveTransactionTracker, mut missing_candidates: mpsc::Receiver, ) -> anyhow::Result<()> { - tracing::debug!("Initializing connection maintenance task"); + tracing::info!("Initializing connection maintenance task"); + let is_gateway = self.is_gateway; + tracing::info!(is_gateway, "Connection maintenance task starting"); #[cfg(not(test))] const CONNECTION_AGE_THRESOLD: Duration = Duration::from_secs(60 * 5); #[cfg(test)] @@ -388,6 +389,7 @@ impl Ring { // if the peer is just starting wait a bit before // we even attempt acquiring more connections tokio::time::sleep(Duration::from_secs(2)).await; + tracing::info!("Connection maintenance task: initial sleep completed"); let mut live_tx = None; let mut pending_conn_adds = BTreeSet::new(); @@ -434,20 +436,42 @@ impl Ring { if let Some(ideal_location) = pending_conn_adds.pop_first() { if live_tx.is_none() { + tracing::info!( + "Attempting to acquire new connection for location: {:?}", + ideal_location + ); live_tx = self .acquire_new(ideal_location, &skip_list, ¬ifier, &live_tx_tracker) .await .map_err(|error| { - tracing::debug!(?error, "Shutting down connection maintenance task"); + tracing::error!( + ?error, + "FATAL: Connection maintenance task failed - shutting down" + ); error })?; + + if live_tx.is_none() { + let conns = self.connection_manager.get_open_connections(); + tracing::warn!( + "acquire_new returned None - likely no peers to query through (connections: {})", + conns + ); + } else { + tracing::info!("Successfully initiated connection acquisition"); + } } else { pending_conn_adds.insert(ideal_location); } } + let current_connections = self.connection_manager.get_open_connections(); let neighbor_locations = { let peers = self.connection_manager.get_connections_by_location(); + tracing::debug!( + "Maintenance task: current connections = {}, checking topology", + current_connections + ); peers .iter() .map(|(loc, conns)| { @@ -474,8 +498,22 @@ impl Ring { &self.connection_manager.own_location().location, Instant::now(), ); + + tracing::info!( + ?adjustment, + current_connections, + is_gateway, + pending_adds = pending_conn_adds.len(), + "Topology adjustment result" + ); + match adjustment { TopologyAdjustment::AddConnections(target_locs) => { + tracing::info!( + "Adding {} locations to pending connections (total pending: {})", + target_locs.len(), + pending_conn_adds.len() + target_locs.len() + ); pending_conn_adds.extend(target_locs); } TopologyAdjustment::RemoveConnections(mut should_disconnect_peers) => { @@ -515,6 +553,15 @@ impl Ring { notifier: &EventLoopNotificationsSender, live_tx_tracker: &LiveTransactionTracker, ) -> anyhow::Result> { + let current_connections = self.connection_manager.get_open_connections(); + let is_gateway = self.is_gateway; + + tracing::info!( + current_connections, + is_gateway, + "acquire_new: attempting to find peer to query" + ); + // First find a query target using just the input skip list let query_target = { let router = self.router.read(); @@ -526,6 +573,12 @@ impl Ring { ) { t } else { + tracing::warn!( + "acquire_new: routing() returned None - cannot find peer to query (connections: {}, is_gateway: {})", + current_connections, + is_gateway + ); + return Ok(None); } }; diff --git a/crates/core/tests/connectivity.rs b/crates/core/tests/connectivity.rs index a824e22ce..71f92fa5f 100644 --- a/crates/core/tests/connectivity.rs +++ b/crates/core/tests/connectivity.rs @@ -422,12 +422,7 @@ async fn test_basic_gateway_connectivity() -> TestResult { } // test_three_node_network_connectivity has been removed - see issue #1889 -// This test revealed that the topology manager was requesting duplicate connections, -// which has been fixed. However, the test also reveals a separate pre-existing issue -// with gateway connection forwarding that prevents full mesh formation in small networks. -// The test will be re-added once that separate issue is resolved. -// -// The topology fix itself is validated by the unit test: -// topology::tests::test_no_duplicate_connections_with_few_peers -// +// This test revealed a pre-existing bug in the topology manager where adjust_topology() +// requests duplicate connections to the same peer instead of diversifying connections. +// The test will be re-added once issue #1889 is resolved. // Issue: https://github.com/freenet/freenet-core/issues/1889