diff --git a/crates/core/src/message.rs b/crates/core/src/message.rs
index 4481ad204..babf7b6f7 100644
--- a/crates/core/src/message.rs
+++ b/crates/core/src/message.rs
@@ -367,6 +367,7 @@ pub(crate) enum NodeEvent {
     /// Register expectation for an inbound connection from the given peer.
     ExpectPeerConnection {
         peer: PeerId,
+        courtesy: bool,
     },
 }
 
@@ -444,8 +445,11 @@ impl Display for NodeEvent {
                     "Local subscribe complete (tx: {tx}, key: {key}, subscribed: {subscribed})"
                 )
             }
-            NodeEvent::ExpectPeerConnection { peer } => {
-                write!(f, "ExpectPeerConnection (from {peer})")
+            NodeEvent::ExpectPeerConnection { peer, courtesy } => {
+                write!(
+                    f,
+                    "ExpectPeerConnection (from {peer}, courtesy: {courtesy})"
+                )
             }
         }
     }
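The hunk above threads the new `courtesy` flag into the event's `Display` output so log lines can distinguish courtesy joins at a glance. A minimal standalone sketch of that formatting, with a plain `String` standing in for `PeerId` (not code from this patch):

```rust
use std::fmt::{self, Display};

// Stand-in enum mirroring the shape of the patched NodeEvent variant.
enum NodeEvent {
    ExpectPeerConnection { peer: String, courtesy: bool },
}

impl Display for NodeEvent {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            NodeEvent::ExpectPeerConnection { peer, courtesy } => {
                write!(f, "ExpectPeerConnection (from {peer}, courtesy: {courtesy})")
            }
        }
    }
}

fn main() {
    let event = NodeEvent::ExpectPeerConnection { peer: "gateway-1".into(), courtesy: true };
    assert_eq!(
        event.to_string(),
        "ExpectPeerConnection (from gateway-1, courtesy: true)"
    );
}
```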
diff --git a/crates/core/src/node/network_bridge/p2p_protoc.rs b/crates/core/src/node/network_bridge/p2p_protoc.rs
index a8b10ccce..4fb266f92 100644
--- a/crates/core/src/node/network_bridge/p2p_protoc.rs
+++ b/crates/core/src/node/network_bridge/p2p_protoc.rs
@@ -311,7 +311,7 @@ impl P2pConnManager {
             match *event {
                 ConnEvent::InboundMessage(inbound) => {
                     let remote = inbound.remote_addr;
-                    let mut msg = inbound.msg;
+                    let msg = inbound.msg;
                     tracing::info!(
                         tx = %msg.id(),
                         msg_type = %msg,
@@ -319,21 +319,6 @@ impl P2pConnManager {
                         peer = %ctx.bridge.op_manager.ring.connection_manager.get_peer_key().unwrap(),
                         "Received inbound message from peer - processing"
                     );
-                    // Only the hop that owns the transport socket (gateway/first hop in
-                    // practice) knows the UDP source address; tag the connect request here
-                    // so downstream relays don't guess at the joiner's address.
-                    if let (
-                        Some(remote_addr),
-                        NetMessage::V1(NetMessageV1::Connect(ConnectMsg::Request {
-                            payload,
-                            ..
-                        })),
-                    ) = (remote, &mut msg)
-                    {
-                        if payload.observed_addr.is_none() {
-                            payload.observed_addr = Some(remote_addr);
-                        }
-                    }
                     ctx.handle_inbound_message(msg, &op_manager, &mut state)
                         .await?;
                 }
@@ -649,14 +634,14 @@ impl P2pConnManager {
                     )
                     .await?;
                 }
-                NodeEvent::ExpectPeerConnection { peer } => {
-                    tracing::debug!(%peer, "ExpectPeerConnection event received; registering inbound expectation via handshake driver");
+                NodeEvent::ExpectPeerConnection { peer, courtesy } => {
+                    tracing::debug!(%peer, ?courtesy, "ExpectPeerConnection event received; registering inbound expectation via handshake driver");
                     state.outbound_handler.expect_incoming(peer.addr);
                     if let Err(error) = handshake_cmd_sender
                         .send(HandshakeCommand::ExpectInbound {
                             peer: peer.clone(),
                             transaction: None,
-                            courtesy: false,
+                            courtesy,
                         })
                         .await
                     {
@@ -1393,7 +1378,7 @@ impl P2pConnManager {
                     "Inbound connection established"
                 );
 
-                self.handle_successful_connection(peer_id, connection, state, None)
+                self.handle_successful_connection(peer_id, connection, state, None, courtesy)
                     .await?;
             }
             HandshakeEvent::OutboundEstablished {
@@ -1408,7 +1393,7 @@ impl P2pConnManager {
                     transaction = %transaction,
                     "Outbound connection established"
                 );
-                self.handle_successful_connection(peer, connection, state, None)
+                self.handle_successful_connection(peer, connection, state, None, courtesy)
                     .await?;
             }
             HandshakeEvent::OutboundFailed {
@@ -1417,11 +1402,12 @@ impl P2pConnManager {
                 error,
                 courtesy,
             } => {
-                tracing::info!(
+                tracing::warn!(
                     remote = %peer.addr,
                     courtesy,
                     transaction = %transaction,
                     ?error,
+                    open_connections = self.bridge.op_manager.ring.open_connections(),
                     "Outbound connection failed"
                 );
 
@@ -1523,6 +1509,7 @@ impl P2pConnManager {
         connection: PeerConnection,
         state: &mut EventListenerState,
         remaining_checks: Option<usize>,
+        courtesy: bool,
     ) -> anyhow::Result<()> {
         let pending_txs = state
             .awaiting_connection_txs
@@ -1613,7 +1600,7 @@ impl P2pConnManager {
                 self.bridge
                     .op_manager
                     .ring
-                    .add_connection(loc, peer_id.clone(), false)
+                    .add_connection(loc, peer_id.clone(), false, courtesy)
                     .await;
             }
             Ok(())
@@ -1625,7 +1612,6 @@ impl P2pConnManager {
         state: &mut EventListenerState,
         handshake_commands: &HandshakeCommandSender,
     ) -> anyhow::Result {
-        let _ = state;
        match event {
            Some(ConnEvent::InboundMessage(mut inbound)) => {
                let tx = *inbound.msg.id();
@@ -1670,6 +1656,33 @@ impl P2pConnManager {
                        }
                    }
                }
+
+                let should_connect =
+                    !self.connections.keys().any(|peer| peer.addr == remote_addr)
+                        && !state.awaiting_connection.contains_key(&remote_addr);
+
+                if should_connect {
+                    if let Some(sender_peer) = extract_sender_from_message(&inbound.msg) {
+                        tracing::info!(
+                            "Received message from unconnected peer {}, establishing connection proactively",
+                            sender_peer.peer
+                        );
+
+                        let tx = Transaction::new::<ConnectMsg>();
+                        let (callback, _rx) = tokio::sync::mpsc::channel(10);
+
+                        let _ = self
+                            .handle_connect_peer(
+                                sender_peer.peer.clone(),
+                                Box::new(callback),
+                                tx,
+                                handshake_commands,
+                                state,
+                                false,
+                            )
+                            .await;
+                    }
+                }
                }

                tracing::debug!(
@@ -1683,9 +1696,10 @@ impl P2pConnManager {
                ))
            }
            Some(ConnEvent::TransportClosed { remote_addr, error }) => {
-                tracing::debug!(
+                tracing::warn!(
                    remote = %remote_addr,
                    ?error,
+                    open_connections = self.bridge.op_manager.ring.open_connections(),
                    "peer_connection_listener reported transport closure"
                );
                if let Some(peer) = self
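The proactive-connect branch above only dials when the remote address is neither already connected nor mid-handshake. A self-contained sketch of that dedupe guard, with std collections standing in for `self.connections` and `state.awaiting_connection` (the real maps are keyed by `PeerId` and hold richer state):

```rust
use std::collections::{HashMap, HashSet};
use std::net::SocketAddr;

// Only dial when the address is neither connected nor awaiting a handshake.
fn should_connect(
    connected_addrs: &HashSet<SocketAddr>,
    awaiting_connection: &HashMap<SocketAddr, ()>,
    remote_addr: SocketAddr,
) -> bool {
    !connected_addrs.contains(&remote_addr) && !awaiting_connection.contains_key(&remote_addr)
}

fn main() {
    let mut connected = HashSet::new();
    let awaiting: HashMap<SocketAddr, ()> = HashMap::new();
    let addr: SocketAddr = "127.0.0.1:31337".parse().unwrap();
    assert!(should_connect(&connected, &awaiting, addr));
    connected.insert(addr);
    assert!(!should_connect(&connected, &awaiting, addr)); // no duplicate dials
}
```

Note that the connect attempt itself is fire-and-forget: the patch drops the callback receiver (`_rx`) immediately, so a failure surfaces only through the handshake driver's own events.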
diff --git a/crates/core/src/node/testing_impl.rs b/crates/core/src/node/testing_impl.rs
index 75a49cbb9..c77a1aef0 100644
--- a/crates/core/src/node/testing_impl.rs
+++ b/crates/core/src/node/testing_impl.rs
@@ -940,8 +940,8 @@ where
                 NodeEvent::QueryNodeDiagnostics { .. } => {
                     unimplemented!()
                 }
-                NodeEvent::ExpectPeerConnection { peer } => {
-                    tracing::debug!(%peer, "ExpectPeerConnection ignored in testing impl");
+                NodeEvent::ExpectPeerConnection { peer, courtesy } => {
+                    tracing::debug!(%peer, courtesy, "ExpectPeerConnection ignored in testing impl");
                     continue;
                 }
             },
diff --git a/crates/core/src/operations/connect.rs b/crates/core/src/operations/connect.rs
index f0d055715..ec2347f43 100644
--- a/crates/core/src/operations/connect.rs
+++ b/crates/core/src/operations/connect.rs
@@ -166,7 +166,7 @@ pub(crate) trait RelayContext {
     fn self_location(&self) -> &PeerKeyLocation;
 
     /// Determine whether we should accept the joiner immediately.
-    fn should_accept(&self, joiner: &PeerKeyLocation) -> bool;
+    fn should_accept(&self, joiner: &PeerKeyLocation, courtesy: bool) -> bool;
 
     /// Choose the next hop for the request, avoiding peers already visited.
     fn select_next_hop(
@@ -180,10 +186,16 @@
 }
 
 /// Result of processing a request at a relay.
+#[derive(Debug, Clone)]
+pub(crate) struct ExpectedConnection {
+    pub peer: PeerKeyLocation,
+    pub courtesy: bool,
+}
+
 #[derive(Debug, Default)]
 pub(crate) struct RelayActions {
     pub accept_response: Option<ConnectResponse>,
-    pub expect_connection_from: Option<PeerKeyLocation>,
+    pub expect_connection_from: Option<ExpectedConnection>,
     pub forward: Option<(PeerKeyLocation, ConnectRequest)>,
     pub observed_address: Option<(PeerKeyLocation, SocketAddr)>,
 }
@@ -212,16 +218,20 @@ impl RelayState {
             }
         }
 
-        if !self.accepted_locally && ctx.should_accept(&self.request.joiner) {
+        let acceptor = ctx.self_location().clone();
+        let courtesy = ctx.courtesy_hint(&acceptor, &self.request.joiner);
+
+        if !self.accepted_locally && ctx.should_accept(&self.request.joiner, courtesy) {
             self.accepted_locally = true;
-            let acceptor = ctx.self_location().clone();
-            let courtesy = ctx.courtesy_hint(&acceptor, &self.request.joiner);
             self.courtesy_hint = courtesy;
             actions.accept_response = Some(ConnectResponse {
                 acceptor: acceptor.clone(),
                 courtesy,
             });
-            actions.expect_connection_from = Some(self.request.joiner.clone());
+            actions.expect_connection_from = Some(ExpectedConnection {
+                peer: self.request.joiner.clone(),
+                courtesy,
+            });
         }
 
         if self.forwarded_to.is_none() && self.request.ttl > 0 {
@@ -276,14 +286,14 @@ impl RelayContext for RelayEnv<'_> {
         &self.self_location
     }
 
-    fn should_accept(&self, joiner: &PeerKeyLocation) -> bool {
+    fn should_accept(&self, joiner: &PeerKeyLocation, courtesy: bool) -> bool {
         let location = joiner
             .location
             .unwrap_or_else(|| Location::from_address(&joiner.peer.addr));
         self.op_manager
             .ring
             .connection_manager
-            .should_accept(location, &joiner.peer)
+            .should_accept(location, &joiner.peer, courtesy)
     }
 
     fn select_next_hop(
@@ -300,10 +310,7 @@ impl RelayContext for RelayEnv<'_> {
     }
 
     fn courtesy_hint(&self, _acceptor: &PeerKeyLocation, _joiner: &PeerKeyLocation) -> bool {
-        // Courtesy slots still piggyback on regular connections. Flag the first acceptance so the
-        // joiner can prioritise it, and keep the logic simple until dedicated courtesy tracking
-        // is wired in (see courtesy-connection-budget branch).
-        self.op_manager.ring.open_connections() == 0
+        self.op_manager.ring.is_gateway()
     }
 }
 
@@ -594,10 +601,11 @@ impl Operation for ConnectOp {
                         .await?;
                 }
 
-                if let Some(peer) = actions.expect_connection_from {
+                if let Some(expected) = actions.expect_connection_from {
                     op_manager
                         .notify_node_event(NodeEvent::ExpectPeerConnection {
-                            peer: peer.peer.clone(),
+                            peer: expected.peer.peer.clone(),
+                            courtesy: expected.courtesy,
                         })
                         .await?;
                 }
@@ -656,6 +664,7 @@ impl Operation for ConnectOp {
                     .notify_node_event(
                         crate::message::NodeEvent::ExpectPeerConnection {
                             peer: new_acceptor.peer.peer.clone(),
+                            courtesy: new_acceptor.courtesy,
                         },
                     )
                     .await?;
@@ -783,7 +792,7 @@ pub(crate) async fn join_ring_request(
     if !op_manager
         .ring
         .connection_manager
-        .should_accept(location, &gateway.peer)
+        .should_accept(location, &gateway.peer, false)
     {
         return Err(OpError::ConnError(ConnectionError::UnwantedConnection));
     }
@@ -994,7 +1003,7 @@ mod tests {
         &self.self_loc
     }
 
-    fn should_accept(&self, _joiner: &PeerKeyLocation) -> bool {
+    fn should_accept(&self, _joiner: &PeerKeyLocation, _courtesy: bool) -> bool {
         self.accept
     }
 
@@ -1045,7 +1054,10 @@ mod tests {
         let response = actions.accept_response.expect("expected acceptance");
         assert_eq!(response.acceptor.peer, self_loc.peer);
         assert!(response.courtesy);
-        assert_eq!(actions.expect_connection_from.unwrap().peer, joiner.peer);
+        assert_eq!(
+            actions.expect_connection_from.unwrap().peer.peer,
+            joiner.peer
+        );
         assert!(actions.forward.is_none());
     }
 
@@ -1215,6 +1227,6 @@ mod tests {
         let expect_conn = accept_actions
             .expect_connection_from
             .expect("acceptance should request inbound connection from joiner");
-        assert_eq!(expect_conn.peer, joiner.peer);
+        assert_eq!(expect_conn.peer, joiner);
     }
 }
diff --git a/crates/core/src/operations/get.rs b/crates/core/src/operations/get.rs
index 1963e87b3..3fcd6c689 100644
--- a/crates/core/src/operations/get.rs
+++ b/crates/core/src/operations/get.rs
@@ -82,6 +82,16 @@ pub(crate) async fn request_get(
     get_op: GetOp,
     skip_list: HashSet<PeerId>,
 ) -> Result<(), OpError> {
+    let mut skip_list = skip_list;
+    // Always avoid bouncing straight back to ourselves.
+    skip_list.insert(
+        op_manager
+            .ring
+            .connection_manager
+            .own_location()
+            .peer
+            .clone(),
+    );
     let (mut candidates, id, key_val, _fetch_contract) = if let Some(GetState::PrepareRequest {
         key,
         id,
@@ -1271,6 +1281,7 @@ async fn try_forward_or_return(
 
     let mut new_skip_list = skip_list.clone();
     new_skip_list.insert(this_peer.peer.clone());
+    new_skip_list.insert(sender.peer.clone());
 
     let new_htl = htl.saturating_sub(1);
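The two `get.rs` hunks above keep a GET from bouncing back along the edge it arrived on: the requester skips itself, and a forwarder skips both itself and the sender. A self-contained sketch of that skip-list discipline, with plain strings standing in for `PeerId`:

```rust
use std::collections::HashSet;

// Pick a forwarding target, excluding ourselves, the message sender, and
// anything already recorded in the skip list.
fn forward_target<'a>(
    own: &str,
    sender: &str,
    candidates: &[&'a str],
    skip_list: &HashSet<String>,
) -> Option<&'a str> {
    candidates
        .iter()
        .copied()
        .find(|c| *c != own && *c != sender && !skip_list.contains(*c))
}

fn main() {
    let mut skip_list = HashSet::new();
    skip_list.insert("visited".to_string());
    let next = forward_target("me", "sender", &["sender", "me", "visited", "fresh"], &skip_list);
    assert_eq!(next, Some("fresh")); // never the sender, ourselves, or a visited peer
}
```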
diff --git a/crates/core/src/ring/connection_manager.rs b/crates/core/src/ring/connection_manager.rs
index 4f1d7023c..1be05a688 100644
--- a/crates/core/src/ring/connection_manager.rs
+++ b/crates/core/src/ring/connection_manager.rs
@@ -1,6 +1,9 @@
+use dashmap::DashMap;
 use parking_lot::Mutex;
 use rand::prelude::IndexedRandom;
-use std::collections::{btree_map::Entry, BTreeMap};
+use std::collections::{btree_map::Entry, BTreeMap, HashSet, VecDeque};
+use std::net::SocketAddr;
+use std::time::{Duration, Instant};
 
 use crate::topology::{Limits, TopologyManager};
 
@@ -22,9 +25,136 @@ pub(crate) struct ConnectionManager {
     pub max_connections: usize,
     pub rnd_if_htl_above: usize,
     pub pub_key: Arc<TransportPublicKey>,
+    courtesy_links: Arc<Mutex<VecDeque<CourtesyLink>>>,
+    max_courtesy_links: usize,
+    pending_courtesy: Arc<Mutex<HashSet<PeerId>>>,
+    pending_courtesy_addr: Arc<Mutex<HashSet<SocketAddr>>>,
+    pending_connections: DashMap<PeerId, PendingConnection>,
+}
+
+#[derive(Clone)]
+struct CourtesyLink {
+    peer: PeerId,
+}
+
+const MAX_COURTESY_LINKS: usize = 10;
+const PENDING_CONNECTION_TTL: Duration = Duration::from_secs(30);
+
+#[derive(Clone)]
+struct PendingConnection {
+    inserted_at: Instant,
+    reserved: bool,
 }
 
 impl ConnectionManager {
+    fn cleanup_stale_pending(&self) {
+        let now = Instant::now();
+        let mut expired = Vec::new();
+        for entry in self.pending_connections.iter() {
+            if now.duration_since(entry.value().inserted_at) > PENDING_CONNECTION_TTL {
+                expired.push(entry.key().clone());
+            }
+        }
+        if expired.is_empty() {
+            return;
+        }
+        let mut locations = self.location_for_peer.write();
+        for peer in expired {
+            if let Some((peer_id, meta)) = self.pending_connections.remove(&peer) {
+                tracing::warn!(%peer_id, "pending connection timed out; releasing slot");
+                if meta.reserved {
+                    self.release_reserved_slot(Some(&peer_id), "pending_gc");
+                }
+                locations.remove(&peer_id);
+            }
+        }
+    }
+
+    fn register_pending_connection(&self, peer: &PeerId, reserved: bool) {
+        self.cleanup_stale_pending();
+        let previous = self.pending_connections.insert(
+            peer.clone(),
+            PendingConnection {
+                inserted_at: Instant::now(),
+                reserved,
+            },
+        );
+        if let Some(prev) = previous {
+            tracing::debug!(%peer, reserved_previous = prev.reserved, reserved_new = reserved, "Replacing existing pending connection entry");
+            if prev.reserved && !reserved {
+                self.release_reserved_slot(Some(peer), "pending_replaced");
+            }
+        }
+    }
+
+    fn take_pending_connection(&self, peer: &PeerId) -> Option<PendingConnection> {
+        self.pending_connections.remove(peer).map(|(_, meta)| meta)
+    }
+
+    fn release_reserved_slot(&self, peer: Option<&PeerId>, context: &'static str) {
+        let mut current = self
+            .reserved_connections
+            .load(std::sync::atomic::Ordering::SeqCst);
+        loop {
+            if current == 0 {
+                tracing::warn!(
+                    ?peer,
+                    context,
+                    "release_reserved_slot: counter already at zero"
+                );
+                return;
+            }
+            match self.reserved_connections.compare_exchange(
+                current,
+                current - 1,
+                std::sync::atomic::Ordering::SeqCst,
+                std::sync::atomic::Ordering::SeqCst,
+            ) {
+                Ok(_) => {
+                    tracing::debug!(
+                        ?peer,
+                        previous = current,
+                        context,
+                        "release_reserved_slot: decremented reserved counter"
+                    );
+                    return;
+                }
+                Err(actual) => current = actual,
+            }
+        }
+    }
+
+    fn reserve_connection_slot(&self, peer_id: &PeerId) -> Option<usize> {
+        loop {
+            let current = self
+                .reserved_connections
+                .load(std::sync::atomic::Ordering::SeqCst);
+            if current == usize::MAX {
+                tracing::error!(
+                    %peer_id,
+                    "reserved connection counter overflowed; rejecting new connection"
+                );
+                return None;
+            }
+            match self.reserved_connections.compare_exchange(
+                current,
+                current + 1,
+                std::sync::atomic::Ordering::SeqCst,
+                std::sync::atomic::Ordering::SeqCst,
+            ) {
+                Ok(_) => return Some(current),
+                Err(actual) => {
+                    tracing::debug!(
+                        %peer_id,
+                        expected = current,
+                        actual,
+                        "reserved connection counter changed concurrently; retrying"
+                    );
+                }
+            }
+        }
+    }
+
     pub fn new(config: &NodeConfig) -> Self {
         let min_connections = if let Some(v) = config.min_number_conn {
             v
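`register_pending_connection` above pairs every reservation with a TTL so a handshake that never completes cannot pin a slot forever; `cleanup_stale_pending` sweeps expired entries and releases the reserved slots they held. A standalone sketch of that pattern, assuming a plain `HashMap` in place of the `DashMap` and string keys in place of `PeerId`:

```rust
use std::collections::HashMap;
use std::time::{Duration, Instant};

const PENDING_CONNECTION_TTL: Duration = Duration::from_secs(30);

struct PendingConnection {
    inserted_at: Instant,
    reserved: bool,
}

/// Sweep expired entries; return the peers whose reserved slots must be released.
fn cleanup_stale_pending(
    pending: &mut HashMap<String, PendingConnection>,
    now: Instant,
) -> Vec<String> {
    let expired: Vec<String> = pending
        .iter()
        .filter(|(_, meta)| now.duration_since(meta.inserted_at) > PENDING_CONNECTION_TTL)
        .map(|(peer, _)| peer.clone())
        .collect();
    expired
        .into_iter()
        .filter(|peer| pending.remove(peer).map_or(false, |meta| meta.reserved))
        .collect()
}

fn main() {
    let base = Instant::now();
    let mut pending = HashMap::new();
    pending.insert("stale".to_string(), PendingConnection { inserted_at: base, reserved: true });
    pending.insert(
        "fresh".to_string(),
        PendingConnection { inserted_at: base + Duration::from_secs(29), reserved: true },
    );
    // Thirty-one seconds after `base`, only the first entry has outlived the TTL.
    let to_release = cleanup_stale_pending(&mut pending, base + Duration::from_secs(31));
    assert_eq!(to_release, vec!["stale".to_string()]);
    assert!(pending.contains_key("fresh"));
}
```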
@@ -111,6 +241,133 @@ impl ConnectionManager {
             max_connections,
             rnd_if_htl_above,
             pub_key: Arc::new(pub_key),
+            courtesy_links: Arc::new(Mutex::new(VecDeque::new())),
+            max_courtesy_links: if is_gateway { MAX_COURTESY_LINKS } else { 0 },
+            pending_courtesy: Arc::new(Mutex::new(HashSet::new())),
+            pending_courtesy_addr: Arc::new(Mutex::new(HashSet::new())),
+            pending_connections: DashMap::new(),
+        }
+    }
+
+    fn remember_courtesy_intent(&self, peer: &PeerId) {
+        if !self.is_gateway {
+            return;
+        }
+        let mut pending = self.pending_courtesy.lock();
+        pending.insert(peer.clone());
+        tracing::info!(
+            %peer,
+            pending = pending.len(),
+            "remember_courtesy_intent: recorded pending courtesy join"
+        );
+        if !peer.addr.ip().is_unspecified() {
+            let mut addr_set = self.pending_courtesy_addr.lock();
+            addr_set.insert(peer.addr);
+            tracing::info!(
+                %peer,
+                addr = %peer.addr,
+                pending = addr_set.len(),
+                "remember_courtesy_intent: tracking courtesy addr"
+            );
+        }
+    }
+
+    fn take_pending_courtesy(&self, peer: &PeerId) -> bool {
+        if !self.is_gateway {
+            return false;
+        }
+        let mut pending = self.pending_courtesy.lock();
+        let removed = pending.remove(peer);
+        if removed {
+            tracing::debug!(
+                %peer,
+                pending = pending.len(),
+                "take_pending_courtesy: consuming pending courtesy flag"
+            );
+            if !peer.addr.ip().is_unspecified() {
+                let mut addr_set = self.pending_courtesy_addr.lock();
+                if addr_set.remove(&peer.addr) {
+                    tracing::debug!(
+                        %peer,
+                        addr = %peer.addr,
+                        remaining = addr_set.len(),
+                        "take_pending_courtesy: removed pending courtesy addr"
+                    );
+                }
+            }
+        }
+        removed
+    }
+
+    #[allow(dead_code)]
+    pub(crate) fn take_pending_courtesy_by_addr(&self, addr: &SocketAddr) -> bool {
+        if !self.is_gateway {
+            return false;
+        }
+        let mut addr_set = self.pending_courtesy_addr.lock();
+        if addr_set.remove(addr) {
+            tracing::info!(
+                addr = %addr,
+                remaining = addr_set.len(),
+                "take_pending_courtesy_by_addr: consuming pending courtesy flag"
+            );
+            true
+        } else {
+            false
+        }
+    }
+
+    fn register_courtesy_connection(&self, peer: &PeerId) -> Option<PeerId> {
+        if !self.is_gateway || self.max_courtesy_links == 0 {
+            return None;
+        }
+        let mut links = self.courtesy_links.lock();
+        if links.len() == self.max_courtesy_links && links.iter().all(|entry| entry.peer != *peer) {
+            tracing::debug!(
+                %peer,
+                max = self.max_courtesy_links,
+                "register_courtesy_connection: budget full before inserting"
+            );
+        }
+        links.retain(|entry| entry.peer != *peer);
+        links.push_back(CourtesyLink { peer: peer.clone() });
+        tracing::info!(
+            %peer,
+            len = links.len(),
+            max = self.max_courtesy_links,
+            "register_courtesy_connection: tracked courtesy link"
+        );
+        if links.len() > self.max_courtesy_links {
+            let evicted = links.pop_front().map(|entry| entry.peer);
+            if let Some(ref victim) = evicted {
+                tracing::info!(
+                    %victim,
+                    %peer,
+                    "register_courtesy_connection: evicting oldest courtesy link to stay under budget"
+                );
+            }
+            evicted
+        } else {
+            None
+        }
+    }
+
+    fn unregister_courtesy_connection(&self, peer: &PeerId) {
+        if !self.is_gateway {
+            return;
+        }
+        let mut links = self.courtesy_links.lock();
+        if links.is_empty() {
+            return;
+        }
+        let before = links.len();
+        links.retain(|entry| entry.peer != *peer);
+        if links.len() != before {
+            tracing::debug!(
+                %peer,
+                remaining = links.len(),
+                "unregister_courtesy_connection: removed courtesy tracking entry"
+            );
         }
     }
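`register_courtesy_connection` above keeps gateway courtesy links in a bounded FIFO: re-registering a peer refreshes its position, and overflowing the budget returns the oldest peer so the caller can evict that connection. A self-contained sketch of the same bookkeeping with `String` keys in place of `PeerId`:

```rust
use std::collections::VecDeque;

const MAX_COURTESY_LINKS: usize = 10;

// Track a courtesy link; return the evicted peer when the budget overflows.
fn register(links: &mut VecDeque<String>, peer: &str) -> Option<String> {
    links.retain(|p| p != peer);       // refresh: drop any existing entry for this peer
    links.push_back(peer.to_string()); // newest entries live at the back
    if links.len() > MAX_COURTESY_LINKS {
        links.pop_front()              // evict the oldest link
    } else {
        None
    }
}

fn main() {
    let mut links = VecDeque::new();
    for i in 0..=MAX_COURTESY_LINKS {
        let evicted = register(&mut links, &format!("peer-{i}"));
        if i < MAX_COURTESY_LINKS {
            assert!(evicted.is_none());
        } else {
            // The eleventh insertion pushes out the oldest entry.
            assert_eq!(evicted.as_deref(), Some("peer-0"));
        }
    }
}
```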
@@ -119,14 +376,28 @@ impl ConnectionManager {
     ///
     /// # Panic
     /// Will panic if the node checking for this condition has no location assigned.
-    pub fn should_accept(&self, location: Location, peer_id: &PeerId) -> bool {
-        tracing::info!("Checking if should accept connection");
+    pub fn should_accept(&self, location: Location, peer_id: &PeerId, courtesy: bool) -> bool {
+        let courtesy_join = courtesy && self.is_gateway;
+        tracing::info!(
+            courtesy = courtesy_join,
+            "Checking if should accept connection"
+        );
         let open = self
             .open_connections
             .load(std::sync::atomic::Ordering::SeqCst);
-        let reserved_before = self
-            .reserved_connections
-            .load(std::sync::atomic::Ordering::SeqCst);
+
+        if self.location_for_peer.read().contains_key(peer_id) {
+            if courtesy_join {
+                self.remember_courtesy_intent(peer_id);
+            }
+            tracing::debug!(%peer_id, "Peer already pending/connected; acknowledging acceptance");
+            return true;
+        }
+
+        let reserved_before = match self.reserve_connection_slot(peer_id) {
+            Some(val) => val,
+            None => return false,
+        };
 
         tracing::info!(
             %peer_id,
@@ -148,35 +419,6 @@ impl ConnectionManager {
             );
         }
 
-        let reserved_before = loop {
-            let current = self
-                .reserved_connections
-                .load(std::sync::atomic::Ordering::SeqCst);
-            if current == usize::MAX {
-                tracing::error!(
-                    %peer_id,
-                    "reserved connection counter overflowed; rejecting new connection"
-                );
-                return false;
-            }
-            match self.reserved_connections.compare_exchange(
-                current,
-                current + 1,
-                std::sync::atomic::Ordering::SeqCst,
-                std::sync::atomic::Ordering::SeqCst,
-            ) {
-                Ok(_) => break current,
-                Err(actual) => {
-                    tracing::debug!(
-                        %peer_id,
-                        expected = current,
-                        actual,
-                        "reserved connection counter changed concurrently; retrying"
-                    );
-                }
-            }
-        };
-
         let total_conn = match reserved_before
             .checked_add(1)
             .and_then(|val| val.checked_add(open))
         {
             Some(val) => val,
             None => {
                 tracing::error!(
                     %peer_id,
                     reserved = reserved_before,
                     open,
                     "connection counters would overflow; rejecting connection"
                 );
-                self.reserved_connections
-                    .fetch_sub(1, std::sync::atomic::Ordering::SeqCst);
+                self.release_reserved_slot(Some(peer_id), "should_accept_overflow_guard");
                 return false;
             }
         };
 
         if open == 0 {
+            if courtesy_join {
+                self.remember_courtesy_intent(peer_id);
+            }
+            self.record_pending_location(peer_id, location);
+            self.register_pending_connection(peer_id, true);
             tracing::debug!(%peer_id, "should_accept: first connection -> accepting");
             return true;
         }
 
-        const GATEWAY_DIRECT_ACCEPT_LIMIT: usize = 2;
-        if self.is_gateway {
+        const GATEWAY_DIRECT_ACCEPT_LIMIT: usize = 10;
+        if self.is_gateway && !courtesy_join {
             let direct_total = open + reserved_before;
             if direct_total >= GATEWAY_DIRECT_ACCEPT_LIMIT {
                 tracing::info!(
                     %peer_id,
                     open,
                     reserved = reserved_before,
                     limit = GATEWAY_DIRECT_ACCEPT_LIMIT,
                     "Gateway reached direct-accept limit; forwarding join request instead"
                 );
-                self.reserved_connections
-                    .fetch_sub(1, std::sync::atomic::Ordering::SeqCst);
+                self.release_reserved_slot(Some(peer_id), "gateway_direct_accept_limit");
                 tracing::info!(%peer_id, "should_accept: gateway direct-accept limit hit, forwarding instead");
                 return false;
             }
         }
 
         if self.location_for_peer.read().get(peer_id).is_some() {
+            if courtesy_join {
+                self.remember_courtesy_intent(peer_id);
+            }
             // We've already accepted this peer (pending or active); treat as a no-op acceptance.
             tracing::debug!(%peer_id, "Peer already pending/connected; acknowledging acceptance");
             return true;
         }
 
+        if courtesy_join {
+            tracing::info!(%peer_id, "should_accept: marking courtesy intent");
+            self.remember_courtesy_intent(peer_id);
+            tracing::debug!(
+                %peer_id,
+                open,
+                reserved = reserved_before,
+                "should_accept: accepting courtesy connection despite topology limits"
+            );
+            self.record_pending_location(peer_id, location);
+            self.register_pending_connection(peer_id, true);
+            return true;
+        }
+
         let accepted = if total_conn < self.min_connections {
             tracing::info!(%peer_id, total_conn, "should_accept: accepted (below min connections)");
             true
@@ -256,11 +518,11 @@ impl ConnectionManager {
             "should_accept: final decision"
         );
         if !accepted {
-            self.reserved_connections
-                .fetch_sub(1, std::sync::atomic::Ordering::SeqCst);
+            self.release_reserved_slot(Some(peer_id), "should_accept_rejected");
         } else {
             tracing::info!(%peer_id, total_conn, "should_accept: accepted (reserving spot)");
             self.record_pending_location(peer_id, location);
+            self.register_pending_connection(peer_id, true);
         }
         accepted
     }
@@ -350,21 +612,33 @@ impl ConnectionManager {
         self.prune_connection(peer, false)
     }
 
-    pub fn add_connection(&self, loc: Location, peer: PeerId, was_reserved: bool) {
-        tracing::info!(%peer, %loc, %was_reserved, "Adding connection to topology");
+    pub fn add_connection(
+        &self,
+        loc: Location,
+        peer: PeerId,
+        was_reserved: bool,
+        courtesy: bool,
+    ) -> Option<PeerId> {
+        tracing::info!(
+            %peer,
+            %loc,
+            %was_reserved,
+            courtesy,
+            "Adding connection to topology"
+        );
         debug_assert!(self.get_peer_key().expect("should be set") != peer);
-        if was_reserved {
-            let old = self
-                .reserved_connections
-                .fetch_sub(1, std::sync::atomic::Ordering::SeqCst);
-            #[cfg(debug_assertions)]
-            {
-                tracing::debug!(old, "Decremented reserved connections");
-                if old == 0 {
-                    panic!("Underflow of reserved connections");
-                }
-            }
-            let _ = old;
+        let pending_meta = self.take_pending_connection(&peer);
+        let reserved_slot = pending_meta
+            .as_ref()
+            .map(|meta| meta.reserved)
+            .unwrap_or(was_reserved);
+        if reserved_slot {
+            self.release_reserved_slot(Some(&peer), "add_connection");
+        } else if was_reserved {
+            tracing::warn!(
+                %peer,
+                "add_connection: expected reserved slot but pending entry missing"
+            );
         }
         let mut lop = self.location_for_peer.write();
         lop.insert(peer.clone(), loc);
@@ -381,6 +655,34 @@ impl ConnectionManager {
         self.open_connections
             .fetch_add(1, std::sync::atomic::Ordering::SeqCst);
         std::mem::drop(lop);
+
+        let courtesy = if courtesy {
+            // Clear any pending markers so they don't leak.
+            let _ = self.take_pending_courtesy(&peer);
+            true
+        } else {
+            self.take_pending_courtesy(&peer)
+        };
+
+        if courtesy {
+            self.register_courtesy_connection(&peer)
+        } else {
+            self.unregister_courtesy_connection(&peer);
+            None
+        }
+    }
+
+    #[allow(dead_code)]
+    pub fn register_outbound_pending(&self, peer: &PeerId, location: Option<Location>) {
+        if let Some(loc) = location {
+            self.record_pending_location(peer, loc);
+        }
+        self.register_pending_connection(peer, false);
+    }
+
+    #[allow(dead_code)]
+    pub fn pending_location_hint(&self, peer: &PeerId) -> Option<Location> {
+        self.location_for_peer.read().get(peer).copied()
     }
 
     pub fn update_peer_identity(&self, old_peer: &PeerId, new_peer: PeerId) -> bool {
@@ -432,14 +734,22 @@ impl ConnectionManager {
         tracing::debug!(%peer, "Pruning {} connection", connection_type);
 
         let mut locations_for_peer = self.location_for_peer.write();
 
+        let pending_meta = if is_alive {
+            None
+        } else {
+            self.take_pending_connection(peer)
+        };
         let Some(loc) = locations_for_peer.remove(peer) else {
             if is_alive {
                 tracing::debug!("no location found for peer, skip pruning");
                 return None;
+            } else if let Some(meta) = pending_meta {
+                if meta.reserved {
+                    self.release_reserved_slot(Some(peer), "prune_missing_in_transit");
+                }
             } else {
-                self.reserved_connections
-                    .fetch_sub(1, std::sync::atomic::Ordering::SeqCst);
+                tracing::warn!(%peer, "prune_missing_in_transit: no pending entry found while releasing");
             }
             return None;
         };
@@ -452,11 +762,15 @@ impl ConnectionManager {
         }
 
         if is_alive {
+            self.unregister_courtesy_connection(peer);
             self.open_connections
                 .fetch_sub(1, std::sync::atomic::Ordering::SeqCst);
+        } else if let Some(meta) = pending_meta {
+            if meta.reserved {
+                self.release_reserved_slot(Some(peer), "prune_in_transit");
+            }
         } else {
-            self.reserved_connections
-                .fetch_sub(1, std::sync::atomic::Ordering::SeqCst);
+            tracing::warn!(%peer, "prune_in_transit: missing pending entry while releasing");
         }
 
         Some(loc)
diff --git a/crates/core/src/ring/mod.rs b/crates/core/src/ring/mod.rs
index 16ce71be8..ca90810eb 100644
--- a/crates/core/src/ring/mod.rs
+++ b/crates/core/src/ring/mod.rs
@@ -4,7 +4,6 @@
 //! and routes requests to the optimal peers.
 
 use std::collections::{BTreeSet, HashSet};
-use std::net::SocketAddr;
 use std::{
     sync::{
         atomic::{AtomicU64, AtomicUsize},
@@ -228,14 +227,28 @@ impl Ring {
             .record_request(recipient, target, request_type);
     }
 
-    pub async fn add_connection(&self, loc: Location, peer: PeerId, was_reserved: bool) {
-        tracing::info!(%peer, this = ?self.connection_manager.get_peer_key(), %was_reserved, "Adding connection to peer");
-        self.connection_manager
-            .add_connection(loc, peer.clone(), was_reserved);
+    pub async fn add_connection(
+        &self,
+        loc: Location,
+        peer: PeerId,
+        was_reserved: bool,
+        courtesy: bool,
+    ) -> Option<PeerId> {
+        tracing::info!(
+            %peer,
+            this = ?self.connection_manager.get_peer_key(),
+            %was_reserved,
+            courtesy,
+            "Adding connection to peer"
+        );
+        let eviction_candidate =
+            self.connection_manager
+                .add_connection(loc, peer.clone(), was_reserved, courtesy);
         self.event_register
             .register_events(Either::Left(NetEventLog::connected(self, peer, loc)))
             .await;
-        self.refresh_density_request_cache()
+        self.refresh_density_request_cache();
+        eviction_candidate
     }
 
     pub fn update_connection_identity(&self, old_peer: &PeerId, new_peer: PeerId) {
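Most of the counter churn in this patch funnels through `reserve_connection_slot` and `release_reserved_slot`, which replace blind `fetch_add`/`fetch_sub` calls with CAS loops that refuse to overflow on reserve or underflow on release. A standalone sketch of that discipline on a bare `AtomicUsize` (std only; not the patched code itself):

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

// Reserve a slot; return the value before the increment, or None on overflow.
fn reserve(counter: &AtomicUsize) -> Option<usize> {
    let mut current = counter.load(Ordering::SeqCst);
    loop {
        if current == usize::MAX {
            return None; // would overflow: reject the reservation
        }
        match counter.compare_exchange(current, current + 1, Ordering::SeqCst, Ordering::SeqCst) {
            Ok(_) => return Some(current),
            Err(actual) => current = actual, // raced with another thread; retry
        }
    }
}

// Release a slot; report (rather than wrap around) if the counter is already zero.
fn release(counter: &AtomicUsize) -> bool {
    let mut current = counter.load(Ordering::SeqCst);
    loop {
        if current == 0 {
            return false; // double release: refuse to underflow
        }
        match counter.compare_exchange(current, current - 1, Ordering::SeqCst, Ordering::SeqCst) {
            Ok(_) => return true,
            Err(actual) => current = actual,
        }
    }
}

fn main() {
    let reserved = AtomicUsize::new(0);
    assert_eq!(reserve(&reserved), Some(0));
    assert!(release(&reserved));
    assert!(!release(&reserved)); // second release is refused, not wrapped around
}
```

This is also why `Ring::add_connection` now returns an `Option<PeerId>` eviction candidate: the slot accounting and the courtesy budget are resolved inside the connection manager, and the caller only has to act on the surfaced eviction, if any.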