From 51cf49e7a7dd760c4e642da9320b48398a5189f0 Mon Sep 17 00:00:00 2001 From: Ian Clarke Date: Sat, 15 Nov 2025 12:55:01 -0600 Subject: [PATCH 1/2] fix: persist peer connection listener --- .../src/node/network_bridge/p2p_protoc.rs | 420 +++++++++++------- .../node/network_bridge/priority_select.rs | 44 +- .../network_bridge/priority_select/tests.rs | 36 +- crates/core/src/operations/put.rs | 101 ----- 4 files changed, 300 insertions(+), 301 deletions(-) diff --git a/crates/core/src/node/network_bridge/p2p_protoc.rs b/crates/core/src/node/network_bridge/p2p_protoc.rs index bd18f56f0..b4f26078b 100644 --- a/crates/core/src/node/network_bridge/p2p_protoc.rs +++ b/crates/core/src/node/network_bridge/p2p_protoc.rs @@ -1,7 +1,5 @@ use dashmap::DashSet; use either::{Either, Left, Right}; -use futures::future::BoxFuture; -use futures::stream::FuturesUnordered; use futures::FutureExt; use futures::StreamExt; use std::convert::Infallible; @@ -114,6 +112,7 @@ pub(in crate::node) struct P2pConnManager { conn_bridge_rx: Receiver, event_listener: Box, connections: HashMap, + conn_event_tx: Option>, key_pair: TransportKeypair, listening_ip: IpAddr, listening_port: u16, @@ -183,6 +182,7 @@ impl P2pConnManager { conn_bridge_rx: rx_bridge_cmd, event_listener: Box::new(event_listener), connections: HashMap::new(), + conn_event_tx: None, key_pair, listening_ip: listener_ip, listening_port: listen_port, @@ -212,6 +212,7 @@ impl P2pConnManager { conn_bridge_rx, event_listener, connections, + conn_event_tx: _, key_pair, listening_ip, listening_port, @@ -243,10 +244,7 @@ impl P2pConnManager { let mut state = EventListenerState::new(outbound_conn_handler.clone()); - // Separate peer_connections to allow independent borrowing by the stream - let peer_connections: FuturesUnordered< - BoxFuture<'static, Result>, - > = FuturesUnordered::new(); + let (conn_event_tx, conn_event_rx) = mpsc::channel(1024); // For non-gateway peers, pass the peer_ready flag so it can be set after first handshake // For gateways, pass None (they're always ready) @@ -274,7 +272,7 @@ impl P2pConnManager { node_controller, client_wait_for_transaction, executor_listener, - peer_connections, + conn_event_rx, ); // Pin the stream on the stack @@ -288,6 +286,7 @@ impl P2pConnManager { conn_bridge_rx: tokio::sync::mpsc::channel(1).1, // Dummy, won't be used event_listener, connections, + conn_event_tx: Some(conn_event_tx.clone()), key_pair, listening_ip, listening_port, @@ -302,22 +301,20 @@ impl P2pConnManager { while let Some(result) = select_stream.as_mut().next().await { // Process the result using the existing handler let event = ctx - .process_select_result( - result, - &mut state, - &mut select_stream, - &handshake_cmd_sender, - ) + .process_select_result(result, &mut state, &handshake_cmd_sender) .await?; match event { EventResult::Continue => continue, EventResult::Event(event) => { match *event { - ConnEvent::InboundMessage(msg) => { + ConnEvent::InboundMessage(inbound) => { + let remote = inbound.remote_addr; + let msg = inbound.msg; tracing::info!( tx = %msg.id(), msg_type = %msg, + remote = ?remote, peer = %ctx.bridge.op_manager.ring.connection_manager.get_peer_key().unwrap(), "Received inbound message from peer - processing" ); @@ -498,6 +495,13 @@ impl P2pConnManager { } } } + ConnEvent::TransportClosed { remote_addr, error } => { + tracing::debug!( + remote = %remote_addr, + ?error, + "Transport closed event received in event loop" + ); + } ConnEvent::ClosedChannel(reason) => { match reason { ChannelCloseReason::Bridge @@ -986,7 +990,6 @@ impl P2pConnManager { &mut self, result: priority_select::SelectResult, state: &mut EventListenerState, - select_stream: &mut priority_select::ProductionPrioritySelectStream, handshake_commands: &HandshakeCommandSender, ) -> anyhow::Result { let peer_id = &self.bridge.op_manager.ring.connection_manager.pub_key; @@ -1011,9 +1014,9 @@ impl P2pConnManager { SelectResult::PeerConnection(msg) => { tracing::debug!( peer = %peer_id, - "PrioritySelect: peer_connections READY" + "PrioritySelect: connection events READY" ); - self.handle_peer_connection_msg(msg, state, select_stream, handshake_commands) + self.handle_transport_event(msg, state, handshake_commands) .await } SelectResult::ConnBridge(msg) => { @@ -1026,12 +1029,11 @@ impl P2pConnManager { SelectResult::Handshake(result) => { tracing::debug!( peer = %peer_id, - "PrioritySelect: handshake event READY" + "PrioritySelect: handshake event READY" ); match result { Some(event) => { - self.handle_handshake_action(event, state, select_stream) - .await?; + self.handle_handshake_action(event, state).await?; Ok(EventResult::Continue) } None => { @@ -1326,7 +1328,6 @@ impl P2pConnManager { &mut self, event: HandshakeEvent, state: &mut EventListenerState, - select_stream: &mut priority_select::ProductionPrioritySelectStream, ) -> anyhow::Result<()> { tracing::info!(?event, "handle_handshake_action: received handshake event"); match event { @@ -1376,7 +1377,7 @@ impl P2pConnManager { "Inbound connection established" ); - self.handle_successful_connection(peer_id, connection, state, select_stream, None) + self.handle_successful_connection(peer_id, connection, state, None) .await?; } HandshakeEvent::OutboundEstablished { @@ -1391,7 +1392,7 @@ impl P2pConnManager { transaction = %transaction, "Outbound connection established" ); - self.handle_successful_connection(peer, connection, state, select_stream, None) + self.handle_successful_connection(peer, connection, state, None) .await?; } HandshakeEvent::OutboundFailed { @@ -1505,7 +1506,6 @@ impl P2pConnManager { peer_id: PeerId, connection: PeerConnection, state: &mut EventListenerState, - select_stream: &mut priority_select::ProductionPrioritySelectStream, remaining_checks: Option, ) -> anyhow::Result<()> { let pending_txs = state @@ -1574,8 +1574,13 @@ impl P2pConnManager { let (tx, rx) = mpsc::channel(10); tracing::debug!(self_peer = %self.bridge.op_manager.ring.connection_manager.pub_key, %peer_id, conn_map_size = self.connections.len(), "[CONN_TRACK] INSERT: OutboundConnectionSuccessful - adding to connections HashMap"); self.connections.insert(peer_id.clone(), tx); - let task = peer_connection_listener(rx, connection).boxed(); - select_stream.push_peer_connection(task); + let Some(conn_events) = self.conn_event_tx.as_ref().cloned() else { + anyhow::bail!("Connection event channel not initialized"); + }; + let listener_peer = peer_id.clone(); + tokio::spawn(async move { + peer_connection_listener(rx, connection, listener_peer, conn_events).await; + }); newly_inserted = true; } else { tracing::debug!(self_peer = %self.bridge.op_manager.ring.connection_manager.pub_key, %peer_id, conn_map_size = self.connections.len(), "[CONN_TRACK] SKIP INSERT: OutboundConnectionSuccessful - connection already exists in HashMap"); @@ -1598,130 +1603,132 @@ impl P2pConnManager { Ok(()) } - async fn handle_peer_connection_msg( + async fn handle_transport_event( &mut self, - msg: Option>, + event: Option, state: &mut EventListenerState, - select_stream: &mut priority_select::ProductionPrioritySelectStream, handshake_commands: &HandshakeCommandSender, ) -> anyhow::Result { - match msg { - Some(Ok(peer_conn)) => { - let mut peer_conn = peer_conn; - // Get the remote address from the connection - let remote_addr = peer_conn.conn.remote_addr(); - let tx = *peer_conn.msg.id(); - if let Some(sender_peer) = extract_sender_from_message(&peer_conn.msg) { - if sender_peer.peer.addr == remote_addr - || sender_peer.peer.addr.ip().is_unspecified() - { - let mut new_peer_id = sender_peer.peer.clone(); - if new_peer_id.addr.ip().is_unspecified() { - new_peer_id.addr = remote_addr; - if let Some(sender_mut) = - extract_sender_from_message_mut(&mut peer_conn.msg) - { - if sender_mut.peer.addr.ip().is_unspecified() { - sender_mut.peer.addr = remote_addr; + match event { + Some(ConnEvent::InboundMessage(mut inbound)) => { + let tx = *inbound.msg.id(); + + if let Some(remote_addr) = inbound.remote_addr { + if let Some(sender_peer) = extract_sender_from_message(&inbound.msg) { + if sender_peer.peer.addr == remote_addr + || sender_peer.peer.addr.ip().is_unspecified() + { + let mut new_peer_id = sender_peer.peer.clone(); + if new_peer_id.addr.ip().is_unspecified() { + new_peer_id.addr = remote_addr; + if let Some(sender_mut) = + extract_sender_from_message_mut(&mut inbound.msg) + { + if sender_mut.peer.addr.ip().is_unspecified() { + sender_mut.peer.addr = remote_addr; + } } } - } - if let Some(existing_key) = self - .connections - .keys() - .find(|peer| { - peer.addr == remote_addr && peer.pub_key != new_peer_id.pub_key - }) - .cloned() - { - if let Some(channel) = self.connections.remove(&existing_key) { - tracing::info!( - remote = %remote_addr, - old_peer = %existing_key, - new_peer = %new_peer_id, - "Updating provisional peer identity after inbound message" - ); - self.bridge - .op_manager - .ring - .update_connection_identity(&existing_key, new_peer_id.clone()); - self.connections.insert(new_peer_id, channel); + if let Some(existing_key) = self + .connections + .keys() + .find(|peer| { + peer.addr == remote_addr && peer.pub_key != new_peer_id.pub_key + }) + .cloned() + { + if let Some(channel) = self.connections.remove(&existing_key) { + tracing::info!( + remote = %remote_addr, + old_peer = %existing_key, + new_peer = %new_peer_id, + "Updating provisional peer identity after inbound message" + ); + self.bridge.op_manager.ring.update_connection_identity( + &existing_key, + new_peer_id.clone(), + ); + self.connections.insert(new_peer_id, channel); + } } } } - } - // Check if we need to establish a connection back to the sender - let should_connect = !self.connections.keys().any(|peer| peer.addr == remote_addr) - && !state.awaiting_connection.contains_key(&remote_addr); + let should_connect = + !self.connections.keys().any(|peer| peer.addr == remote_addr) + && !state.awaiting_connection.contains_key(&remote_addr); - if should_connect { - // Try to extract sender information from the message to establish connection - if let Some(sender_peer) = extract_sender_from_message(&peer_conn.msg) { - tracing::info!( - "Received message from unconnected peer {}, establishing connection proactively", - sender_peer.peer - ); + if should_connect { + if let Some(sender_peer) = extract_sender_from_message(&inbound.msg) { + tracing::info!( + "Received message from unconnected peer {}, establishing connection proactively", + sender_peer.peer + ); - let tx = Transaction::new::(); - let (callback, _rx) = tokio::sync::mpsc::channel(10); + let tx = Transaction::new::(); + let (callback, _rx) = tokio::sync::mpsc::channel(10); - // Don't await - let it happen in the background - let _ = self - .handle_connect_peer( - sender_peer.peer.clone(), - Box::new(callback), - tx, - handshake_commands, - state, - false, // not a courtesy connection - ) - .await; + let _ = self + .handle_connect_peer( + sender_peer.peer.clone(), + Box::new(callback), + tx, + handshake_commands, + state, + false, + ) + .await; + } } } tracing::debug!( - peer_addr = %remote_addr, + peer_addr = ?inbound.remote_addr, %tx, tx_type = ?tx.transaction_type(), "Queueing inbound NetMessage from peer connection" ); - let task = peer_connection_listener(peer_conn.rx, peer_conn.conn).boxed(); - select_stream.push_peer_connection(task); Ok(EventResult::Event( - ConnEvent::InboundMessage(peer_conn.msg).into(), + ConnEvent::InboundMessage(inbound).into(), )) } - Some(Err(err)) => { - if let TransportError::ConnectionClosed(socket_addr) = err { - if let Some(peer) = self - .connections - .keys() - .find_map(|k| (k.addr == socket_addr).then(|| k.clone())) + Some(ConnEvent::TransportClosed { remote_addr, error }) => { + tracing::debug!( + remote = %remote_addr, + ?error, + "peer_connection_listener reported transport closure" + ); + if let Some(peer) = self + .connections + .keys() + .find_map(|k| (k.addr == remote_addr).then(|| k.clone())) + { + tracing::debug!(self_peer = %self.bridge.op_manager.ring.connection_manager.pub_key, %peer, socket_addr = %remote_addr, conn_map_size = self.connections.len(), "[CONN_TRACK] REMOVE: TransportClosed - removing from connections HashMap"); + self.bridge + .op_manager + .ring + .prune_connection(peer.clone()) + .await; + self.connections.remove(&peer); + if let Err(error) = handshake_commands + .send(HandshakeCommand::DropConnection { peer: peer.clone() }) + .await { - tracing::debug!(self_peer = %self.bridge.op_manager.ring.connection_manager.pub_key, %peer, socket_addr = %socket_addr, conn_map_size = self.connections.len(), "[CONN_TRACK] REMOVE: TransportError::ConnectionClosed - removing from connections HashMap"); - self.bridge - .op_manager - .ring - .prune_connection(peer.clone()) - .await; - self.connections.remove(&peer); - if let Err(error) = handshake_commands - .send(HandshakeCommand::DropConnection { peer: peer.clone() }) - .await - { - tracing::warn!( - remote = %socket_addr, - ?error, - "Failed to notify handshake driver about dropped connection" - ); - } + tracing::warn!( + remote = %remote_addr, + ?error, + "Failed to notify handshake driver about dropped connection" + ); } } Ok(EventResult::Continue) } + Some(other) => { + tracing::warn!(?other, "Unexpected event from peer connection listener"); + Ok(EventResult::Continue) + } None => { - tracing::error!("All peer connections closed"); + tracing::error!("All peer connection event channels closed"); Ok(EventResult::Continue) } } @@ -1767,7 +1774,7 @@ impl P2pConnManager { msg_type = %msg, "handle_notification_msg: Received NetMessage notification, converting to InboundMessage" ); - EventResult::Event(ConnEvent::InboundMessage(msg).into()) + EventResult::Event(ConnEvent::InboundMessage(msg.into()).into()) } Some(Right(action)) => { tracing::info!( @@ -1790,7 +1797,7 @@ impl P2pConnManager { match msg { Some((callback, msg)) => { state.pending_op_results.insert(*msg.id(), callback); - EventResult::Event(ConnEvent::InboundMessage(msg).into()) + EventResult::Event(ConnEvent::InboundMessage(msg.into()).into()) } None => { EventResult::Event(ConnEvent::ClosedChannel(ChannelCloseReason::OpExecution).into()) @@ -1900,7 +1907,6 @@ impl ConnectResultSender for mpsc::Sender), ()>> { struct EventListenerState { outbound_handler: OutboundConnectionHandler, - // Note: peer_connections has been moved out to allow separate borrowing by the stream pending_from_executor: HashSet, // FIXME: we are potentially leaving trash here when transacrions are completed tx_to_client: HashMap>, @@ -1931,10 +1937,38 @@ enum EventResult { #[derive(Debug)] pub(super) enum ConnEvent { - InboundMessage(NetMessage), + InboundMessage(IncomingMessage), OutboundMessage(NetMessage), NodeAction(NodeEvent), ClosedChannel(ChannelCloseReason), + TransportClosed { + remote_addr: SocketAddr, + error: TransportError, + }, +} + +#[derive(Debug)] +pub(super) struct IncomingMessage { + pub msg: NetMessage, + pub remote_addr: Option, +} + +impl IncomingMessage { + fn with_remote(msg: NetMessage, remote_addr: SocketAddr) -> Self { + Self { + msg, + remote_addr: Some(remote_addr), + } + } +} + +impl From for IncomingMessage { + fn from(msg: NetMessage) -> Self { + Self { + msg, + remote_addr: None, + } + } } #[derive(Debug)] @@ -1957,14 +1991,6 @@ enum ProtocolStatus { Failed, } -#[derive(Debug)] -pub(super) struct PeerConnectionInbound { - pub conn: PeerConnection, - /// Receiver for inbound messages for the peer connection - pub rx: Receiver>, - pub msg: NetMessage, -} - async fn handle_peer_channel_message( conn: &mut PeerConnection, msg: Either, @@ -2016,17 +2042,50 @@ async fn handle_peer_channel_message( Ok(()) } +async fn notify_transport_closed( + sender: &Sender, + remote_addr: SocketAddr, + error: TransportError, +) { + if sender + .send(ConnEvent::TransportClosed { remote_addr, error }) + .await + .is_err() + { + tracing::debug!( + remote = %remote_addr, + "[CONN_LIFECYCLE] conn_events receiver dropped before handling closure event" + ); + } +} + async fn peer_connection_listener( mut rx: PeerConnChannelRecv, mut conn: PeerConnection, -) -> Result { + peer_id: PeerId, + conn_events: Sender, +) { const MAX_IMMEDIATE_SENDS: usize = 32; + let remote_addr = conn.remote_addr(); + tracing::debug!( + to = %remote_addr, + peer = %peer_id, + "[CONN_LIFECYCLE] Starting peer_connection_listener task" + ); loop { let mut drained = 0; loop { match rx.try_recv() { Ok(msg) => { - handle_peer_channel_message(&mut conn, msg).await?; + if let Err(error) = handle_peer_channel_message(&mut conn, msg).await { + tracing::debug!( + to = %remote_addr, + ?error, + "[CONN_LIFECYCLE] Shutting down connection after send failure" + ); + notify_transport_closed(&conn_events, remote_addr, error).await; + return; + } drained += 1; if drained >= MAX_IMMEDIATE_SENDS { break; @@ -2038,7 +2097,13 @@ async fn peer_connection_listener( to = %conn.remote_addr(), "[CONN_LIFECYCLE] peer_connection_listener channel closed without explicit DropConnection" ); - return Err(TransportError::ConnectionClosed(conn.remote_addr())); + notify_transport_closed( + &conn_events, + remote_addr, + TransportError::ConnectionClosed(remote_addr), + ) + .await; + return; } } } @@ -2050,32 +2115,71 @@ async fn peer_connection_listener( to = %conn.remote_addr(), "[CONN_LIFECYCLE] peer_connection_listener channel closed without explicit DropConnection" ); - break Err(TransportError::ConnectionClosed(conn.remote_addr())); + notify_transport_closed( + &conn_events, + remote_addr, + TransportError::ConnectionClosed(remote_addr), + ) + .await; + return; }; - handle_peer_channel_message(&mut conn, msg).await?; - } - msg = conn.recv() => { - let Ok(msg) = msg - .inspect_err(|error| { - tracing::error!(from=%conn.remote_addr(), "Error while receiving message: {error}"); - }) - else { + if let Err(error) = handle_peer_channel_message(&mut conn, msg).await { tracing::debug!( - from = %conn.remote_addr(), - "[CONN_LIFECYCLE] peer_connection_listener terminating after recv error" + to = %remote_addr, + ?error, + "[CONN_LIFECYCLE] Connection closed after channel command" ); - break Err(TransportError::ConnectionClosed(conn.remote_addr())); - }; - let net_message = decode_msg(&msg).unwrap(); - let tx = *net_message.id(); - tracing::debug!( - from = %conn.remote_addr(), - %tx, - tx_type = ?tx.transaction_type(), - msg_type = %net_message, - "[CONN_LIFECYCLE] Received inbound NetMessage from peer" - ); - break Ok(PeerConnectionInbound { conn, rx, msg: net_message }); + notify_transport_closed(&conn_events, remote_addr, error).await; + return; + } + } + msg = conn.recv() => { + match msg { + Ok(msg) => { + match decode_msg(&msg) { + Ok(net_message) => { + let tx = *net_message.id(); + tracing::debug!( + from = %conn.remote_addr(), + %tx, + tx_type = ?tx.transaction_type(), + msg_type = %net_message, + "[CONN_LIFECYCLE] Received inbound NetMessage from peer" + ); + if conn_events.send(ConnEvent::InboundMessage(IncomingMessage::with_remote(net_message, remote_addr))).await.is_err() { + tracing::debug!( + from = %remote_addr, + "[CONN_LIFECYCLE] conn_events receiver dropped; stopping listener" + ); + return; + } + } + Err(error) => { + tracing::error!( + from = %conn.remote_addr(), + ?error, + "[CONN_LIFECYCLE] Failed to deserialize inbound message; closing connection" + ); + notify_transport_closed( + &conn_events, + remote_addr, + TransportError::ConnectionClosed(remote_addr), + ) + .await; + return; + } + } + } + Err(error) => { + tracing::debug!( + from = %conn.remote_addr(), + ?error, + "[CONN_LIFECYCLE] peer_connection_listener terminating after recv error" + ); + notify_transport_closed(&conn_events, remote_addr, error).await; + return; + } + } } } } diff --git a/crates/core/src/node/network_bridge/priority_select.rs b/crates/core/src/node/network_bridge/priority_select.rs index 677e22555..cdd1ac091 100644 --- a/crates/core/src/node/network_bridge/priority_select.rs +++ b/crates/core/src/node/network_bridge/priority_select.rs @@ -2,20 +2,19 @@ //! This avoids waker registration issues that can occur with nested tokio::select! macros. use either::Either; -use futures::{future::BoxFuture, stream::FuturesUnordered, Stream}; +use futures::Stream; use std::future::Future; use std::pin::Pin; use std::task::{Context, Poll}; use tokio::sync::mpsc::Receiver; -use super::p2p_protoc::PeerConnectionInbound; +use super::p2p_protoc::ConnEvent; use crate::contract::{ ContractHandlerChannel, ExecutorToEventLoopChannel, NetworkEventListenerHalve, WaitingResolution, }; use crate::dev_tool::{PeerId, Transaction}; use crate::message::{NetMessage, NodeEvent}; -use crate::transport::TransportError; // P2pBridgeEvent type alias for the event bridge channel pub type P2pBridgeEvent = Either<(PeerId, Box), NodeEvent>; @@ -25,7 +24,7 @@ pub type P2pBridgeEvent = Either<(PeerId, Box), NodeEvent>; pub(super) enum SelectResult { Notification(Option>), OpExecution(Option<(tokio::sync::mpsc::Sender, NetMessage)>), - PeerConnection(Option>), + PeerConnection(Option), ConnBridge(Option), Handshake(Option), NodeController(Option), @@ -110,10 +109,7 @@ where tokio_stream::wrappers::ReceiverStream<(tokio::sync::mpsc::Sender, NetMessage)>, conn_bridge: tokio_stream::wrappers::ReceiverStream, node_controller: tokio_stream::wrappers::ReceiverStream, - - // FuturesUnordered already implements Stream (owned) - peer_connections: - FuturesUnordered>>, + conn_events: tokio_stream::wrappers::ReceiverStream, // HandshakeHandler now implements Stream directly - maintains state across polls // Generic to allow testing with mocks @@ -129,6 +125,7 @@ where op_execution_closed: bool, conn_bridge_closed: bool, node_controller_closed: bool, + conn_events_closed: bool, } impl PrioritySelectStream @@ -146,9 +143,7 @@ where node_controller: Receiver, client_wait_for_transaction: C, executor_listener: E, - peer_connections: FuturesUnordered< - BoxFuture<'static, Result>, - >, + conn_events: Receiver, ) -> Self { use tokio_stream::wrappers::ReceiverStream; @@ -157,7 +152,7 @@ where op_execution: ReceiverStream::new(op_execution_rx), conn_bridge: ReceiverStream::new(conn_bridge_rx), node_controller: ReceiverStream::new(node_controller), - peer_connections, + conn_events: ReceiverStream::new(conn_events), handshake_handler, client_wait_for_transaction, executor_listener, @@ -165,16 +160,9 @@ where op_execution_closed: false, conn_bridge_closed: false, node_controller_closed: false, + conn_events_closed: false, } } - - /// Add a new peer connection task to the stream - pub fn push_peer_connection( - &mut self, - task: BoxFuture<'static, Result>, - ) { - self.peer_connections.push(task); - } } impl Stream for PrioritySelectStream @@ -225,10 +213,18 @@ where } } - // Priority 3: Peer connections (only if not empty) - if !this.peer_connections.is_empty() { - match Pin::new(&mut this.peer_connections).poll_next(cx) { - Poll::Ready(msg) => return Poll::Ready(Some(SelectResult::PeerConnection(msg))), + // Priority 3: Peer connection events + if !this.conn_events_closed { + match Pin::new(&mut this.conn_events).poll_next(cx) { + Poll::Ready(Some(event)) => { + return Poll::Ready(Some(SelectResult::PeerConnection(Some(event)))) + } + Poll::Ready(None) => { + this.conn_events_closed = true; + if first_closed_channel.is_none() { + first_closed_channel = Some(SelectResult::PeerConnection(None)); + } + } Poll::Pending => {} } } diff --git a/crates/core/src/node/network_bridge/priority_select/tests.rs b/crates/core/src/node/network_bridge/priority_select/tests.rs index 071ca67cc..8326919bf 100644 --- a/crates/core/src/node/network_bridge/priority_select/tests.rs +++ b/crates/core/src/node/network_bridge/priority_select/tests.rs @@ -49,7 +49,7 @@ async fn test_priority_select_future_wakeup() { let (notif_tx, notif_rx) = mpsc::channel(10); let (_op_tx, op_rx) = mpsc::channel(10); - let peers = FuturesUnordered::new(); + let (_conn_event_tx, conn_event_rx) = mpsc::channel(10); let (_bridge_tx, bridge_rx) = mpsc::channel(10); let (_node_tx, node_rx) = mpsc::channel(10); @@ -72,7 +72,7 @@ async fn test_priority_select_future_wakeup() { node_rx, MockClient, MockExecutor, - peers, + conn_event_rx, ); tokio::pin!(stream); @@ -128,7 +128,7 @@ async fn test_priority_select_future_priority_ordering() { let (notif_tx, notif_rx) = mpsc::channel(10); let (op_tx, op_rx) = mpsc::channel(10); - let peers = FuturesUnordered::new(); + let (_conn_event_tx, conn_event_rx) = mpsc::channel(10); let (bridge_tx, bridge_rx) = mpsc::channel(10); let (_, node_rx) = mpsc::channel(10); @@ -157,7 +157,7 @@ async fn test_priority_select_future_priority_ordering() { node_rx, MockClient, MockExecutor, - peers, + conn_event_rx, ); tokio::pin!(stream); @@ -199,7 +199,7 @@ async fn test_priority_select_future_concurrent_messages() { } let (notif_tx, notif_rx) = mpsc::channel(100); - let peers = FuturesUnordered::new(); + let (_conn_event_tx, conn_event_rx) = mpsc::channel(10); // Send all 15 messages for _ in 0..15 { @@ -222,7 +222,7 @@ async fn test_priority_select_future_concurrent_messages() { node_rx, MockClient, MockExecutor, - peers, + conn_event_rx, ); tokio::pin!(stream); @@ -263,7 +263,7 @@ async fn test_priority_select_future_buffered_messages() { } let (notif_tx, notif_rx) = mpsc::channel(10); - let peers = FuturesUnordered::new(); + let (_conn_event_tx, conn_event_rx) = mpsc::channel(10); // Send message BEFORE creating stream let test_msg = NetMessage::V1(crate::message::NetMessageV1::Aborted( @@ -284,7 +284,7 @@ async fn test_priority_select_future_buffered_messages() { node_rx, MockClient, MockExecutor, - peers, + conn_event_rx, ); tokio::pin!(stream); @@ -331,7 +331,7 @@ async fn test_priority_select_future_rapid_cancellations() { } let (notif_tx, notif_rx) = mpsc::channel(100); - let peers = FuturesUnordered::new(); + let (_conn_event_tx, conn_event_rx) = mpsc::channel(10); // Send 10 messages for _ in 0..10 { @@ -354,7 +354,7 @@ async fn test_priority_select_future_rapid_cancellations() { node_rx, MockClient, MockExecutor, - peers, + conn_event_rx, ); tokio::pin!(stream); @@ -415,7 +415,7 @@ async fn test_priority_select_event_loop_simulation() { // Create channels once (like in wait_for_event) let (notif_tx, notif_rx) = mpsc::channel::>(10); let (op_tx, op_rx) = mpsc::channel::<(tokio::sync::mpsc::Sender, NetMessage)>(10); - let peers = FuturesUnordered::new(); + let (_conn_event_tx, conn_event_rx) = mpsc::channel(10); let (bridge_tx, bridge_rx) = mpsc::channel::(10); let (node_tx, node_rx) = mpsc::channel::(10); @@ -472,7 +472,7 @@ async fn test_priority_select_event_loop_simulation() { node_rx, MockClient, MockExecutor, - peers, + conn_event_rx, ); tokio::pin!(stream); @@ -660,7 +660,7 @@ async fn test_with_seed(seed: u64) { // Create channels once (like in wait_for_event) let (notif_tx, notif_rx) = mpsc::channel::>(100); let (op_tx, op_rx) = mpsc::channel::<(tokio::sync::mpsc::Sender, NetMessage)>(100); - let peers = FuturesUnordered::new(); + let (_conn_event_tx, conn_event_rx) = mpsc::channel(100); let (bridge_tx, bridge_rx) = mpsc::channel::(100); let (node_tx, node_rx) = mpsc::channel::(100); let (client_tx, client_rx) = mpsc::channel::< @@ -862,7 +862,7 @@ async fn test_with_seed(seed: u64) { rx: executor_rx, closed: false, }, - peers, + conn_event_rx, ); tokio::pin!(stream); @@ -1166,7 +1166,7 @@ async fn test_priority_select_all_pending_waker_registration() { // Create all 8 channels let (notif_tx, notif_rx) = mpsc::channel::>(10); let (op_tx, op_rx) = mpsc::channel::<(tokio::sync::mpsc::Sender, NetMessage)>(10); - let peers = FuturesUnordered::new(); + let (_conn_event_tx, conn_event_rx) = mpsc::channel(10); let (bridge_tx, bridge_rx) = mpsc::channel::(10); let (node_tx, node_rx) = mpsc::channel::(10); let (client_tx, client_rx) = mpsc::channel::< @@ -1242,7 +1242,7 @@ async fn test_priority_select_all_pending_waker_registration() { rx: executor_rx, closed: false, }, - peers, + conn_event_rx, ); tokio::pin!(stream); @@ -1320,7 +1320,7 @@ async fn test_sparse_messages_reproduce_race() { let (notif_tx, notif_rx) = mpsc::channel::>(10); let (_, op_rx) = mpsc::channel(1); - let peers = FuturesUnordered::new(); + let (_conn_event_tx, conn_event_rx) = mpsc::channel(10); let (_, bridge_rx) = mpsc::channel(1); let (_, node_rx) = mpsc::channel(1); @@ -1356,7 +1356,7 @@ async fn test_sparse_messages_reproduce_race() { node_rx, MockClient, MockExecutor, - peers, + conn_event_rx, ); tokio::pin!(stream); diff --git a/crates/core/src/operations/put.rs b/crates/core/src/operations/put.rs index e4c2f96ed..1ef1c7e35 100644 --- a/crates/core/src/operations/put.rs +++ b/crates/core/src/operations/put.rs @@ -322,29 +322,6 @@ impl Operation for PutOp { origin: origin.clone(), }); - // Best-effort: notify the original requester directly when we are the final hop. - if origin.peer != sender.peer { - let direct_ack = PutMsg::SuccessfulPut { - id: *id, - target: origin.clone(), - key, - sender: own_location.clone(), - origin: origin.clone(), - }; - - if let Err(err) = conn_manager - .send(&origin.peer, NetMessage::from(direct_ack)) - .await - { - tracing::warn!( - tx = %id, - %key, - origin_peer = %origin.peer, - "Failed to send direct SuccessfulPut to origin from final hop: {err}" - ); - } - } - // Mark operation as finished new_state = Some(PutState::Finished { key }); } @@ -441,7 +418,6 @@ impl Operation for PutOp { // Broadcast changes to subscribers let broadcast_to = op_manager.get_broadcast_targets(&key, &sender.peer); - let broadcast_was_empty = broadcast_to.is_empty(); match try_to_broadcast( *id, last_hop, @@ -460,31 +436,6 @@ impl Operation for PutOp { } Err(err) => return Err(err), } - - // When we are the last hop and have no additional broadcast targets, notify the - // original requester directly to avoid relying solely on intermediate hops. - if last_hop && broadcast_was_empty && origin.peer != sender.peer { - let sender_loc = op_manager.ring.connection_manager.own_location(); - let direct_ack = PutMsg::SuccessfulPut { - id: *id, - target: origin.clone(), - key, - sender: sender_loc.clone(), - origin: origin.clone(), - }; - - if let Err(err) = conn_manager - .send(&origin.peer, NetMessage::from(direct_ack)) - .await - { - tracing::warn!( - tx = %id, - %key, - origin_peer = %origin.peer, - "Failed to send direct SuccessfulPut to origin from SeekNode final hop: {err}" - ); - } - } } PutMsg::BroadcastTo { id, @@ -585,30 +536,6 @@ impl Operation for PutOp { conn_manager .send(&upstream.peer, NetMessage::from(ack)) .await?; - - // Also ack the original requester so they don't depend on upstream propagation. - if origin.peer != sender.peer && origin.peer != upstream.peer { - let direct_ack = PutMsg::SuccessfulPut { - id: *id, - target: origin.clone(), - key: *key, - sender: sender.clone(), - origin: origin.clone(), - }; - - if let Err(err) = conn_manager - .send(&origin.peer, NetMessage::from(direct_ack)) - .await - { - tracing::warn!( - tx = %id, - %key, - origin_peer = %origin.peer, - "Failed to send direct SuccessfulPut to origin from broadcast start: {err}" - ); - } - } - new_state = None; } @@ -778,34 +705,6 @@ impl Operation for PutOp { ); return_msg = None; } - - // Send a direct acknowledgement to the original requester if we are not it - if state_origin.peer != local_peer.peer - && !upstream - .as_ref() - .map(|u| u.peer == state_origin.peer) - .unwrap_or(false) - { - let direct_ack = PutMsg::SuccessfulPut { - id: *id, - target: state_origin.clone(), - key, - sender: local_peer, - origin: state_origin.clone(), - }; - - if let Err(err) = conn_manager - .send(&state_origin.peer, NetMessage::from(direct_ack)) - .await - { - tracing::warn!( - tx = %id, - %key, - origin_peer = %state_origin.peer, - "Failed to send direct SuccessfulPut to origin: {err}" - ); - } - } } Some(PutState::Finished { .. }) => { // Operation already completed - this is a duplicate SuccessfulPut message From b067d99ce9b9df9739587f15201f4d2dde3009f6 Mon Sep 17 00:00:00 2001 From: Ian Clarke Date: Sun, 16 Nov 2025 09:03:58 -0600 Subject: [PATCH 2/2] fix: surface decode errors when closing transport --- crates/core/src/node/network_bridge/p2p_protoc.rs | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/crates/core/src/node/network_bridge/p2p_protoc.rs b/crates/core/src/node/network_bridge/p2p_protoc.rs index b4f26078b..91f2444cb 100644 --- a/crates/core/src/node/network_bridge/p2p_protoc.rs +++ b/crates/core/src/node/network_bridge/p2p_protoc.rs @@ -1,3 +1,4 @@ +use anyhow::anyhow; use dashmap::DashSet; use either::{Either, Left, Right}; use futures::FutureExt; @@ -2160,10 +2161,13 @@ async fn peer_connection_listener( ?error, "[CONN_LIFECYCLE] Failed to deserialize inbound message; closing connection" ); + let transport_error = TransportError::Other(anyhow!( + "Failed to deserialize inbound message from {remote_addr}: {error:?}" + )); notify_transport_closed( &conn_events, remote_addr, - TransportError::ConnectionClosed(remote_addr), + transport_error, ) .await; return;