Skip to content
Merged
388 changes: 182 additions & 206 deletions crates/core/src/client_events/mod.rs

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion crates/core/src/client_events/session_actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ impl SessionActor {
}
}

/// Handle result delivery with a specific RequestId (for actor mode)
/// Handle result delivery with a specific RequestId
async fn handle_result_delivery_with_request_id(
&mut self,
tx: Transaction,
Expand Down
1 change: 0 additions & 1 deletion crates/core/src/contract/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -329,7 +329,6 @@ impl ContractHandlerChannel<SenderHalve> {
// Route to session actor if session adapter is installed
if let Some(session_tx) = &self.session_adapter_tx {
// Register all Transaction variants with the session actor
// Note: Subscription variant is legacy and should not be used in actor mode
if let WaitingTransaction::Transaction(tx) = waiting_tx {
let msg = SessionMessage::RegisterTransaction {
tx,
Expand Down
12 changes: 3 additions & 9 deletions crates/core/src/node/message_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,21 +50,15 @@ impl MessageProcessor {
// Convert operation result to host result
let host_result = match op_result {
Ok(Some(op_res)) => {
debug!(
"Actor mode: converting network result for transaction {}",
tx
);
debug!("Converting network result for transaction {}", tx);
Arc::new(op_res.to_host_result())
}
Ok(None) => {
debug!("Actor mode: no result to forward for transaction {}", tx);
debug!("No result to forward for transaction {}", tx);
return Ok(()); // No result to forward
}
Err(e) => {
error!(
"Actor mode: network operation error for transaction {}: {}",
tx, e
);
error!("Network operation error for transaction {}: {}", tx, e);
// Create a generic client error for operation failures
use freenet_stdlib::client_api::{ClientError, ErrorKind};
Arc::new(Err(ClientError::from(ErrorKind::OperationError {
Expand Down
26 changes: 24 additions & 2 deletions crates/core/src/node/network_bridge/handshake.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1125,7 +1125,18 @@ async fn gw_peer_connection_listener(
}) else {
break Err(HandshakeError::ConnectionClosed(conn.remote_addr()));
};
let net_message = decode_msg(&msg).unwrap();
let net_message = match decode_msg(&msg) {
Ok(msg) => msg,
Err(e) => {
tracing::error!(
at=?conn.my_address(),
from=%conn.remote_addr(),
error=%e,
"Failed to decode message - closing connection"
);
break Err(HandshakeError::ConnectionClosed(conn.remote_addr()));
}
};
tracing::debug!(at=?conn.my_address(), from=%conn.remote_addr(), %net_message, "Received message from peer");
match net_message {
NetMessage::V1(NetMessageV1::Connect(ConnectMsg::Request {
Expand Down Expand Up @@ -1188,7 +1199,18 @@ async fn gw_transient_peer_conn(
incoming_result = timeout(TIMEOUT, conn.recv()) => {
match incoming_result {
Ok(Ok(msg)) => {
let net_msg = decode_msg(&msg).unwrap();
let net_msg = match decode_msg(&msg) {
Ok(msg) => msg,
Err(e) => {
tracing::error!(
at=?conn.my_address(),
from=%conn.remote_addr(),
error=%e,
"Failed to decode message from transient peer - closing connection"
);
break Err(HandshakeError::ConnectionClosed(conn.remote_addr()));
}
};
if transaction.is_drop_connection_message(&net_msg) {
tracing::debug!("Received drop connection message");
break Ok((InternalEvent::DropInboundConnection(conn.remote_addr()), outbound));
Expand Down
98 changes: 90 additions & 8 deletions crates/core/src/node/network_bridge/p2p_protoc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -303,9 +303,65 @@ impl P2pConnManager {
}
}
}
ConnEvent::ClosedChannel => {
tracing::info!("Notification channel closed");
break;
ConnEvent::ClosedChannel(reason) => {
match reason {
ChannelCloseReason::Handshake
| ChannelCloseReason::Bridge
| ChannelCloseReason::Controller => {
// Critical internal channels closed - perform cleanup and shutdown gracefully
tracing::error!(
?reason,
is_gateway = self.bridge.op_manager.ring.is_gateway(),
num_connections = self.connections.len(),
"🔴 CRITICAL CHANNEL CLOSED - performing cleanup and shutting down"
);

// Clean up all active connections
let peers_to_cleanup: Vec<_> =
self.connections.keys().cloned().collect();
for peer in peers_to_cleanup {
tracing::debug!(%peer, "Cleaning up active connection due to critical channel closure");

// Clean up ring state
self.bridge
.op_manager
.ring
.prune_connection(peer.clone())
.await;

// Remove from connection map
self.connections.remove(&peer);

// Notify handshake handler to clean up
if let Err(e) = handshake_handler_msg
.drop_connection(peer.clone())
.await
{
tracing::warn!(%peer, error = ?e, "Failed to drop connection during cleanup");
}
}

// Clean up reservations for in-progress connections
// These are connections that started handshake but haven't completed yet
// Notifying the callbacks will trigger the calling code to clean up reservations
tracing::debug!(
awaiting_count = state.awaiting_connection.len(),
"Cleaning up in-progress connection reservations"
);

for (addr, mut callback) in state.awaiting_connection.drain() {
tracing::debug!(%addr, "Notifying awaiting connection of shutdown");
// Best effort notification - ignore errors since we're shutting down anyway
// The callback sender will handle cleanup on their side
let _ = callback
.send_result(Err(HandshakeError::ChannelClosed))
.await;
}

tracing::info!("Cleanup complete, exiting event loop");
break;
}
}
}
ConnEvent::NodeAction(action) => match action {
NodeEvent::DropConnection(peer) => {
Expand Down Expand Up @@ -662,7 +718,14 @@ impl P2pConnManager {
self.handle_handshake_action(event, state, handshake_handler_msg).await?;
Ok(EventResult::Continue)
}
Err(HandshakeError::ChannelClosed) => Ok(EventResult::Event(ConnEvent::ClosedChannel.into())),
Err(HandshakeError::ChannelClosed) => {
tracing::error!(
"🔴 HANDSHAKE CHANNEL CLOSED - handshake handler's channel has closed"
);
Ok(EventResult::Event(
ConnEvent::ClosedChannel(ChannelCloseReason::Handshake).into(),
))
}
Err(e) => {
tracing::warn!("Handshake error: {:?}", e);
Ok(EventResult::Continue)
Expand Down Expand Up @@ -1098,14 +1161,22 @@ impl P2pConnManager {
match msg {
Some(Left((_, msg))) => EventResult::Event(ConnEvent::OutboundMessage(*msg).into()),
Some(Right(action)) => EventResult::Event(ConnEvent::NodeAction(action).into()),
None => EventResult::Event(ConnEvent::ClosedChannel.into()),
None => {
tracing::error!("🔴 BRIDGE CHANNEL CLOSED - P2P bridge channel has closed");
EventResult::Event(ConnEvent::ClosedChannel(ChannelCloseReason::Bridge).into())
}
}
}

fn handle_node_controller_msg(&self, msg: Option<NodeEvent>) -> EventResult {
match msg {
Some(msg) => EventResult::Event(ConnEvent::NodeAction(msg).into()),
None => EventResult::Event(ConnEvent::ClosedChannel.into()),
None => {
tracing::error!(
"🔴 CONTROLLER CHANNEL CLOSED - node controller channel has closed"
);
EventResult::Event(ConnEvent::ClosedChannel(ChannelCloseReason::Controller).into())
}
}
}

Expand Down Expand Up @@ -1239,7 +1310,17 @@ enum ConnEvent {
InboundMessage(NetMessage),
OutboundMessage(NetMessage),
NodeAction(NodeEvent),
ClosedChannel,
ClosedChannel(ChannelCloseReason),
}

#[derive(Debug)]
enum ChannelCloseReason {
/// Handshake channel closed - potentially transient, continue operation
Handshake,
/// Internal bridge channel closed - critical, must shutdown gracefully
Bridge,
/// Node controller channel closed - critical, must shutdown gracefully
Controller,
}

#[allow(dead_code)]
Expand Down Expand Up @@ -1275,7 +1356,8 @@ async fn peer_connection_listener(
Right(action) => {
tracing::debug!(to=%conn.remote_addr(), "Received action from channel");
match action {
ConnEvent::NodeAction(NodeEvent::DropConnection(_)) | ConnEvent::ClosedChannel => {
ConnEvent::NodeAction(NodeEvent::DropConnection(_))
| ConnEvent::ClosedChannel(_) => {
break Err(TransportError::ConnectionClosed(conn.remote_addr()));
}
other => {
Expand Down
20 changes: 17 additions & 3 deletions crates/core/src/operations/connect.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1043,6 +1043,7 @@ where

let num_connections = connection_manager.num_connections();
let num_reserved = connection_manager.get_reserved_connections();
let max_connections = connection_manager.max_connections;

tracing::debug!(
tx = %id,
Expand All @@ -1068,19 +1069,32 @@ where
// 3. We need the connection registered so the gateway can respond to FindOptimalPeer requests
//
// See PR #1871 discussion with @iduartgomez for context.
if num_connections == 0 {
//
// IMPORTANT (issue #1908): Extended to cover early network formation (first few peers)
// During early network formation, the gateway should accept connections directly to ensure
// bidirectional connections are established. Without this, peers 2+ only get unidirectional
// connections (peer → gateway) but not the reverse (gateway → peer).
//
// However, we still respect max_connections - this only applies when there's capacity.
const EARLY_NETWORK_THRESHOLD: usize = 4;
let has_capacity = num_connections + num_reserved < max_connections;
let is_early_network = is_gateway && accepted && num_connections < EARLY_NETWORK_THRESHOLD;

if num_connections == 0 || (is_early_network && has_capacity) {
if num_reserved == 1 && is_gateway && accepted {
tracing::info!(
tx = %id,
joiner = %joiner.peer,
"Gateway bootstrap: accepting first connection directly (will register immediately)",
connections = num_connections,
has_capacity = %has_capacity,
"Gateway early network: accepting connection directly (will register immediately)",
);
let connectivity_info = ConnectivityInfo::new_bootstrap(
joiner.clone(),
1, // Single check for direct connection
);
return Ok(Some(ConnectState::AwaitingConnectivity(connectivity_info)));
} else {
} else if num_connections == 0 {
tracing::debug!(
tx = %id,
joiner = %joiner.peer,
Expand Down
Loading
Loading