Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
22ef50d
fix: Enable gateway bootstrap when starting with zero connections
sanity Sep 29, 2025
10b4f00
Fix gateway detection in connection bootstrap logic
sanity Sep 29, 2025
21eec39
test: add comprehensive tests for gateway bootstrap functionality
github-actions[bot] Sep 30, 2025
316a115
fix: address CI failures - formatting and non-gateway connection reje…
github-actions[bot] Sep 30, 2025
ddb8581
test: update gateway bootstrap test to query connections via websocket
github-actions[bot] Sep 30, 2025
9d66869
fix: address test failures and formatting issues
github-actions[bot] Sep 30, 2025
a5027dc
fix: allow gateways to accept connections when below minimum threshold
github-actions[bot] Sep 30, 2025
e6f82d6
test: add comprehensive tests for 1 to MIN_CONNECTIONS scenarios
github-actions[bot] Sep 30, 2025
5461586
fix: properly handle connection acceptance and cleanup reservations
github-actions[bot] Sep 30, 2025
6004326
fix: properly handle connection acceptance and cleanup reservations
github-actions[bot] Sep 30, 2025
e8a5865
fix: compile errors and typos in connect.rs
github-actions[bot] Sep 30, 2025
1b2c463
fix: handle direct connection acceptance in handshake tests
github-actions[bot] Sep 30, 2025
f2de6b4
test: fix flakiness in test_gateway_bootstrap_three_node_network
github-actions[bot] Sep 30, 2025
c175814
fix: compilation error in connectivity test (tuple field access)
sanity Sep 30, 2025
af95577
Fix connection forwarding logic for mesh topology
sanity Sep 30, 2025
7d840f0
Fix race condition in gateway bootstrap
sanity Sep 30, 2025
e8c0542
Implement immediate bootstrap connection registration (PR #1871)
sanity Oct 1, 2025
4c8101e
Remove test_three_node_network_connectivity - deferred to issue #1889
sanity Oct 1, 2025
3a558e3
Merge branch 'main' into fix-connection-maintenance-bootstrap
sanity Oct 1, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
171 changes: 138 additions & 33 deletions crates/core/src/node/network_bridge/handshake.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,9 @@ pub(super) enum Event {
joiner: PeerId,
op: Option<Box<ConnectOp>>,
forward_info: Option<Box<ForwardInfo>>,
/// If true, this is a gateway bootstrap acceptance that should be registered immediately.
/// See forward_conn() in connect.rs for full explanation.
is_bootstrap: bool,
},
/// An outbound connection to a peer was successfully established.
OutboundConnectionSuccessful {
Expand Down Expand Up @@ -99,6 +102,10 @@ pub(super) enum Event {
/// Outcome of attempting to forward (or directly accept) an inbound join request.
///
/// Built from the result of `forward_conn()`: a successful result is classified by
/// whether a forward message was captured and by the `is_bootstrap_acceptance` flag
/// on the returned `ConnectivityInfo`.
#[allow(clippy::large_enum_variant)]
enum ForwardResult {
/// The request is being forwarded: carries the forward target peer, the message to
/// send to it, and the connectivity info for the pending connect operation.
Forward(PeerId, NetMessage, ConnectivityInfo),
/// The connection was accepted directly (no forward message was produced) and it is
/// not a bootstrap acceptance; the normal flow waits for CheckConnectivity.
/// NOTE(review): the transient-connection path currently treats this as a rejection.
DirectlyAccepted(ConnectivityInfo),
/// Gateway bootstrap acceptance - connection should be registered immediately.
/// See forward_conn() in connect.rs and PR #1871 for context.
BootstrapAccepted(ConnectivityInfo),
/// The request was not accepted; handlers release the reserved in-transit
/// connection slot for the joiner when they see this.
Rejected,
}

Expand Down Expand Up @@ -218,6 +225,9 @@ pub(super) struct HandshakeHandler {
/// Used for deterministic testing with a given location. In production this should always be `None`,
/// and locations should be derived from IP addresses.
this_location: Option<Location>,

/// Whether this node is a gateway
is_gateway: bool,
}

impl HandshakeHandler {
Expand All @@ -227,6 +237,7 @@ impl HandshakeHandler {
connection_manager: ConnectionManager,
router: Arc<RwLock<Router>>,
this_location: Option<Location>,
is_gateway: bool,
) -> (Self, HanshakeHandlerMsg, OutboundMessage) {
let (pending_msg_tx, pending_msg_rx) = tokio::sync::mpsc::channel(100);
let (establish_connection_tx, establish_connection_rx) = tokio::sync::mpsc::channel(100);
Expand All @@ -243,6 +254,7 @@ impl HandshakeHandler {
connection_manager,
router,
this_location,
is_gateway,
};
(
connector,
Expand Down Expand Up @@ -366,7 +378,12 @@ impl HandshakeHandler {
Location::from_address(&req.conn.remote_addr())
};
let should_accept = self.connection_manager.should_accept(location, &req.joiner);
if should_accept {
// Check if this is a valid acceptance scenario
// Non-gateways with 0 connections should not accept (they need existing connections to forward through)
let can_accept = should_accept &&
(self.is_gateway || self.connection_manager.num_connections() > 0);

if can_accept {
let accepted_msg = NetMessage::V1(NetMessageV1::Connect(ConnectMsg::Response {
id: req.id,
sender: self.connection_manager.own_location(),
Expand Down Expand Up @@ -400,7 +417,8 @@ impl HandshakeHandler {
..
} = req;

let (ok, forward_info) = {
// Forward the connection or accept it directly
let forward_result = {
// TODO: refactor this so it happens in the background out of the main handler loop
let mut nw_bridge = ForwardPeerMessage {
msg: parking_lot::Mutex::new(None),
Expand All @@ -426,6 +444,7 @@ impl HandshakeHandler {
skip_forwards,
req_peer: my_peer_id.clone(),
joiner: joiner_pk_loc.clone(),
is_gateway: self.is_gateway,
};

let f = forward_conn(
Expand All @@ -441,31 +460,82 @@ impl HandshakeHandler {
tracing::error!(%err, "Error forwarding connection");
continue;
}
Ok(ok) => {
if let Some(ok_value) = ok {
let forward_info = nw_bridge.msg.lock().take().map(|(forward_target, msg)| {
ForwardInfo {
target: forward_target,
msg,
}
});
(Some(ok_value), forward_info)
Ok(Some(conn_state)) => {
let ConnectState::AwaitingConnectivity(info) = conn_state else {
unreachable!("forward_conn should return AwaitingConnectivity if successful")
};

// Check if we have a forward message (forwarding) or not (direct acceptance)
if let Some((forward_target, msg)) = nw_bridge.msg.into_inner() {
(Some(ForwardResult::Forward(forward_target.clone(), msg, info)), Some(forward_target))
} else if info.is_bootstrap_acceptance {
// Gateway bootstrap case: connection should be registered immediately
// This bypasses the normal CheckConnectivity flow. See forward_conn()
// bootstrap logic in connect.rs for full explanation.
(Some(ForwardResult::BootstrapAccepted(info)), None)
} else {
(None, None)
// Normal direct acceptance - will wait for CheckConnectivity
(Some(ForwardResult::DirectlyAccepted(info)), None)
}
}
Ok(None) => (None, None),
}
};

return Ok(Event::InboundConnection {
id,
conn,
joiner,
op: ok.map(|ok_value| Box::new(ConnectOp::new(id, Some(ok_value), None, None))),
forward_info: forward_info.map(Box::new),
})
match forward_result {
(Some(ForwardResult::Forward(forward_target, msg, info)), _) => {
return Ok(Event::InboundConnection {
id,
conn,
joiner,
op: Some(Box::new(ConnectOp::new(id, Some(ConnectState::AwaitingConnectivity(info)), None, None))),
forward_info: Some(Box::new(ForwardInfo { target: forward_target, msg })),
is_bootstrap: false,
});
}
(Some(ForwardResult::BootstrapAccepted(info)), _) => {
return Ok(Event::InboundConnection {
id,
conn,
joiner,
op: Some(Box::new(ConnectOp::new(id, Some(ConnectState::AwaitingConnectivity(info)), None, None))),
forward_info: None,
is_bootstrap: true,
});
}
(Some(ForwardResult::DirectlyAccepted(info)), _) => {
return Ok(Event::InboundConnection {
id,
conn,
joiner,
op: Some(Box::new(ConnectOp::new(id, Some(ConnectState::AwaitingConnectivity(info)), None, None))),
forward_info: None,
is_bootstrap: false,
});
}
(Some(ForwardResult::Rejected), _) | (None, _) => {
return Ok(Event::InboundConnection {
id,
conn,
joiner,
op: None,
forward_info: None,
is_bootstrap: false,
});
}
}

} else {
// If should_accept was true but we can't actually accept (non-gateway with 0 connections),
// we need to clean up the reserved connection
if should_accept && !can_accept {
self.connection_manager.prune_in_transit_connection(&req.joiner);
tracing::debug!(
"Non-gateway with 0 connections cannot accept connection from {:?}",
req.joiner
);
}

let InboundGwJoinRequest {
mut conn,
id,
Expand Down Expand Up @@ -503,7 +573,31 @@ impl HandshakeHandler {
msg: Box::new(msg),
});
}
Ok(ForwardResult::BootstrapAccepted(info)) => {
// Gateway bootstrap: First connection should be registered immediately.
// This bypasses the normal transient connection flow.
// See forward_conn() in connect.rs for full explanation.
return Ok(Event::InboundConnection {
id,
conn,
joiner,
op: Some(Box::new(ConnectOp::new(id, Some(ConnectState::AwaitingConnectivity(info)), None, None))),
forward_info: None,
is_bootstrap: true,
});
}
Ok(ForwardResult::DirectlyAccepted(_info)) => {
// Connection was accepted directly (not forwarded)
// For now, treat this as a rejection since we shouldn't hit this case in transient connections
// Clean up the reserved connection slot
self.connection_manager.prune_in_transit_connection(&joiner);
self.outbound_messages.remove(&remote);
self.connecting.remove(&remote);
return Ok(Event::InboundConnectionRejected { peer_id: joiner });
}
Ok(ForwardResult::Rejected) => {
// Clean up the reserved connection slot
self.connection_manager.prune_in_transit_connection(&joiner);
self.outbound_messages.remove(&remote);
self.connecting.remove(&remote);
return Ok(Event::InboundConnectionRejected { peer_id: joiner });
Expand Down Expand Up @@ -592,6 +686,7 @@ impl HandshakeHandler {
skip_forwards: transaction.skip_forwards.clone(),
req_peer: my_peer_id.clone(),
joiner: joiner_pk_loc.clone(),
is_gateway: self.is_gateway,
};

match forward_conn(
Expand All @@ -604,14 +699,22 @@ impl HandshakeHandler {
.await
{
Ok(Some(conn_state)) => {
let (forward_target, msg) = nw_bridge
.msg
.into_inner()
.expect("target was successfully set");
let ConnectState::AwaitingConnectivity(info) = conn_state else {
unreachable!("forward_conn should return AwaitingConnectivity if successful")
};
Ok(ForwardResult::Forward(forward_target, msg, info))

// Check if we have a forward message (forwarding) or not (direct acceptance)
if let Some((forward_target, msg)) = nw_bridge.msg.into_inner() {
Ok(ForwardResult::Forward(forward_target, msg, info))
} else if info.is_bootstrap_acceptance {
// Gateway bootstrap case: connection should be registered immediately
// This bypasses the normal CheckConnectivity flow. See forward_conn()
// bootstrap logic in connect.rs for full explanation.
Ok(ForwardResult::BootstrapAccepted(info))
} else {
// Normal direct acceptance - will wait for CheckConnectivity
Ok(ForwardResult::DirectlyAccepted(info))
}
}
Ok(None) => {
tracing::debug!(at=?conn.my_address(), from=%conn.remote_addr(), "Rejecting connection, no peers found to forward");
Expand Down Expand Up @@ -1383,6 +1486,7 @@ mod tests {
fn config_handler(
addr: impl Into<SocketAddr>,
existing_connections: Option<Vec<Connection>>,
is_gateway: bool,
) -> (HandshakeHandler, TestVerifier) {
let (outbound_sender, outbound_recv) = mpsc::channel(100);
let outbound_conn_handler = OutboundConnectionHandler::new(outbound_sender);
Expand All @@ -1408,6 +1512,7 @@ mod tests {
mngr,
Arc::new(RwLock::new(router)),
None,
is_gateway,
);
(
handler,
Expand Down Expand Up @@ -1461,7 +1566,7 @@ mod tests {
#[tokio::test]
async fn test_gateway_inbound_conn_success() -> anyhow::Result<()> {
let addr: SocketAddr = ([127, 0, 0, 1], 10000).into();
let (mut handler, mut test) = config_handler(addr, None);
let (mut handler, mut test) = config_handler(addr, None, true);

let remote_addr = ([127, 0, 0, 1], 10001).into();
let test_controller = async {
Expand Down Expand Up @@ -1502,7 +1607,7 @@ mod tests {
let existing_conn =
Connection::new(remote_peer_loc.peer, remote_peer_loc.location.unwrap());

let (mut handler, mut test) = config_handler(addr, Some(vec![existing_conn]));
let (mut handler, mut test) = config_handler(addr, Some(vec![existing_conn]), true);

// Configure the handler to reject connections by setting max_connections to 1
handler.connection_manager.max_connections = 1;
Expand Down Expand Up @@ -1547,7 +1652,7 @@ mod tests {
#[test_log::test(tokio::test)]
async fn test_peer_to_gw_outbound_conn() -> anyhow::Result<()> {
let addr = ([127, 0, 0, 1], 10000).into();
let (mut handler, mut test) = config_handler(addr, None);
let (mut handler, mut test) = config_handler(addr, None, false);

let joiner_key = TransportKeypair::new();
let pub_key = joiner_key.public().clone();
Expand Down Expand Up @@ -1614,7 +1719,7 @@ mod tests {
#[tokio::test]
async fn test_peer_to_gw_outbound_conn_failed() -> anyhow::Result<()> {
let addr = ([127, 0, 0, 1], 10000).into();
let (mut handler, mut test) = config_handler(addr, None);
let (mut handler, mut test) = config_handler(addr, None, false);

let joiner_key = TransportKeypair::new();
let pub_key = joiner_key.public().clone();
Expand Down Expand Up @@ -1659,7 +1764,7 @@ mod tests {
let peer_addr: SocketAddr = ([127, 0, 0, 1], 10001).into();
let joiner_addr: SocketAddr = ([127, 0, 0, 1], 10002).into();

let (mut gw_handler, mut gw_test) = config_handler(gw_addr, None);
let (mut gw_handler, mut gw_test) = config_handler(gw_addr, None, true);

// the gw only will accept one connection
gw_handler.connection_manager.max_connections = 1;
Expand Down Expand Up @@ -1757,7 +1862,7 @@ mod tests {
async fn test_peer_to_gw_outbound_conn_rejected() -> anyhow::Result<()> {
// crate::config::set_logger(Some(tracing::level_filters::LevelFilter::TRACE), None);
let joiner_addr = ([127, 0, 0, 1], 10001).into();
let (mut handler, mut test) = config_handler(joiner_addr, None);
let (mut handler, mut test) = config_handler(joiner_addr, None, false);

let gw_key = TransportKeypair::new();
let gw_pub_key = gw_key.public().clone();
Expand Down Expand Up @@ -1927,7 +2032,7 @@ mod tests {
async fn test_peer_to_gw_outbound_conn_forwarded() -> anyhow::Result<()> {
// crate::config::set_logger(Some(tracing::level_filters::LevelFilter::TRACE), None);
let joiner_addr = ([127, 0, 0, 1], 10001).into();
let (mut handler, mut test) = config_handler(joiner_addr, None);
let (mut handler, mut test) = config_handler(joiner_addr, None, false);

let gw_key = TransportKeypair::new();
let gw_pub_key = gw_key.public().clone();
Expand Down Expand Up @@ -2012,7 +2117,7 @@ mod tests {
#[tokio::test]
async fn test_peer_to_peer_outbound_conn_failed() -> anyhow::Result<()> {
let addr: SocketAddr = ([127, 0, 0, 1], 10001).into();
let (mut handler, mut test) = config_handler(addr, None);
let (mut handler, mut test) = config_handler(addr, None, false);

let peer_key = TransportKeypair::new();
let peer_pub_key = peer_key.public().clone();
Expand Down Expand Up @@ -2057,7 +2162,7 @@ mod tests {
#[tokio::test]
async fn test_peer_to_peer_outbound_conn_succeeded() -> anyhow::Result<()> {
let addr: SocketAddr = ([127, 0, 0, 1], 10001).into();
let (mut handler, mut test) = config_handler(addr, None);
let (mut handler, mut test) = config_handler(addr, None, false);

let peer_key = TransportKeypair::new();
let peer_pub_key = peer_key.public().clone();
Expand Down
Loading
Loading