Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
d6426f5
fix: track transient connections separately
sanity Nov 18, 2025
3511ae8
fix: tidy transient registry formatting
sanity Nov 18, 2025
13851e7
fix: clean transient promotion handling
sanity Nov 18, 2025
a262c27
fix: honor transient budget and promote correctly
sanity Nov 18, 2025
e16fd8b
fix: remove unused transient helpers
sanity Nov 19, 2025
adf89ad
test: add large network soak test with diagnostic snapshots
sanity Nov 19, 2025
7f90160
test: add large network soak test with diagnostic snapshots
sanity Nov 19, 2025
d8742ab
test: harden soak riverctl retries
sanity Nov 19, 2025
5f387be
refactor: rename courtesy links to transient
sanity Nov 20, 2025
107a7d1
test: fix ExpectedInboundTracker helper for transient rename
sanity Nov 20, 2025
985cb17
feat: expose connection tuning and bump test harness
sanity Nov 20, 2025
35b87c3
test: instrument neighbor candidates and live tx tracking
sanity Nov 20, 2025
3bdda45
fix: transient connection handling and viz tooling
sanity Nov 21, 2025
7465425
fix: enforce caps on transient promotion and add cap repro test
sanity Nov 21, 2025
32de923
test: add small cap repro harness
sanity Nov 21, 2025
1034d91
fix: report ring connections in diagnostics and bound soak caps
sanity Nov 21, 2025
47f69f2
test: add warmup and ring snapshots to soak
sanity Nov 21, 2025
aff9286
chore: use published freenet-test-network
sanity Nov 22, 2025
9cec602
fix: overwrite expected inbound for same peer
sanity Nov 22, 2025
210716f
test: avoid unsupported connection cap flags
sanity Nov 22, 2025
93f0222
fix: restore connection helper methods
sanity Nov 22, 2025
367d2e3
fix: promote transient gateway links
sanity Nov 22, 2025
7f8d1ed
test: add settle delay after mesh check
sanity Nov 22, 2025
4d6fe20
fix: transient handshake and routing cleanup
sanity Nov 22, 2025
25777f6
fix(transient): tune defaults and logging per review
sanity Nov 23, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion crates/core/src/config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ pub(crate) const PCK_VERSION: &str = env!("CARGO_PKG_VERSION");
// Initialize the executor once.
static ASYNC_RT: LazyLock<Option<Runtime>> = LazyLock::new(GlobalExecutor::initialize_async_rt);

const DEFAULT_TRANSIENT_BUDGET: usize = 32;
const DEFAULT_TRANSIENT_BUDGET: usize = 2048;
const DEFAULT_TRANSIENT_TTL_SECS: u64 = 30;

const QUALIFIER: &str = "";
Expand Down
129 changes: 106 additions & 23 deletions crates/core/src/node/network_bridge/p2p_protoc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -635,6 +635,12 @@ impl P2pConnManager {
"Failed to enqueue DropConnection command"
);
}
// Immediately prune topology counters so we don't leak open connection slots.
ctx.bridge
.op_manager
.ring
.prune_connection(peer.clone())
.await;
if let Some(conn) = ctx.connections.remove(&peer) {
// TODO: review: this could potentially leave garbage tasks in the background with peer listener
match timeout(
Expand Down Expand Up @@ -828,15 +834,23 @@ impl P2pConnManager {

// Collect network information
if config.include_network_info {
let connected_peers: Vec<_> = ctx
.connections
.keys()
.map(|p| (p.to_string(), p.addr.to_string()))
.collect();
let cm = &op_manager.ring.connection_manager;
let connections_by_loc = cm.get_connections_by_location();
let mut connected_peers = Vec::new();
for conns in connections_by_loc.values() {
for conn in conns {
connected_peers.push((
conn.location.peer.to_string(),
conn.location.peer.addr.to_string(),
));
}
}
connected_peers.sort_by(|a, b| a.0.cmp(&b.0));
connected_peers.dedup_by(|a, b| a.0 == b.0);

response.network_info = Some(NetworkInfo {
active_connections: connected_peers.len(),
connected_peers,
active_connections: ctx.connections.len(),
});
}

Expand Down Expand Up @@ -915,28 +929,43 @@ impl P2pConnManager {
}
}

// Collect topology-backed connection info (exclude transient transports).
let cm = &op_manager.ring.connection_manager;
let connections_by_loc = cm.get_connections_by_location();
let mut connected_peer_ids = Vec::new();
if config.include_detailed_peer_info {
use freenet_stdlib::client_api::ConnectedPeerInfo;
for conns in connections_by_loc.values() {
for conn in conns {
connected_peer_ids.push(conn.location.peer.to_string());
response.connected_peers_detailed.push(
ConnectedPeerInfo {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why all this conversions to string? seem unnecessary

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Kept the conversions because NodeDiagnosticsResponse uses String for peer_id/address fields; the source types here are PeerId/SocketAddr so we need owned strings for serialization. If we want to send the raw types instead we would need to change the diagnostics payload schema in stdlib.

peer_id: conn.location.peer.to_string(),
address: conn.location.peer.addr.to_string(),
},
);
}
}
} else {
for conns in connections_by_loc.values() {
connected_peer_ids.extend(
conns.iter().map(|c| c.location.peer.to_string()),
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why do we need to convert to string here?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same reason as above: diagnostics payload is String-typed, so we need owned strings for the public key and addresses. Left the conversions in-place.

);
}
}
connected_peer_ids.sort();
connected_peer_ids.dedup();

// Collect system metrics
if config.include_system_metrics {
let seeding_contracts =
op_manager.ring.all_network_subscriptions().len() as u32;
response.system_metrics = Some(SystemMetrics {
active_connections: ctx.connections.len() as u32,
active_connections: connected_peer_ids.len() as u32,
seeding_contracts,
});
}

// Collect detailed peer information if requested
if config.include_detailed_peer_info {
use freenet_stdlib::client_api::ConnectedPeerInfo;
// Populate detailed peer information from actual connections
for peer in ctx.connections.keys() {
response.connected_peers_detailed.push(ConnectedPeerInfo {
peer_id: peer.to_string(),
address: peer.addr.to_string(),
});
}
}

match timeout(
Duration::from_secs(2),
callback.send(QueryResult::NodeDiagnostics(response)),
Expand Down Expand Up @@ -976,6 +1005,13 @@ impl P2pConnManager {
} => {
tracing::debug!(%tx, %key, "local subscribe complete");

// If this is a child operation, complete it and let the parent flow handle result delivery.
if op_manager.is_sub_operation(tx) {
tracing::info!(%tx, %key, "completing child subscribe operation");
op_manager.completed(tx);
continue;
}

if !op_manager.is_sub_operation(tx) {
let response = Ok(HostResponse::ContractResponse(
ContractResponse::SubscribeResponse { key, subscribed },
Expand Down Expand Up @@ -1281,10 +1317,57 @@ impl P2pConnManager {
let loc = entry
.location
.unwrap_or_else(|| Location::from_address(&peer.addr));
// Re-run admission + cap guard when promoting a transient connection.
let should_accept = connection_manager.should_accept(loc, &peer);
if !should_accept {
tracing::warn!(
tx = %tx,
%peer,
%loc,
"connect_peer: promotion rejected by admission logic"
);
callback
.send_result(Err(()))
.await
.inspect_err(|err| {
tracing::debug!(
tx = %tx,
remote = %peer,
?err,
"connect_peer: failed to notify rejected-promotion callback"
);
})
.ok();
return Ok(());
}
let current = connection_manager.connection_count();
if current >= connection_manager.max_connections {
tracing::warn!(
tx = %tx,
%peer,
current_connections = current,
max_connections = connection_manager.max_connections,
%loc,
"connect_peer: rejecting transient promotion to enforce cap"
);
callback
.send_result(Err(()))
.await
.inspect_err(|err| {
tracing::debug!(
tx = %tx,
remote = %peer,
?err,
"connect_peer: failed to notify cap-rejection callback"
);
})
.ok();
return Ok(());
}
self.bridge
.op_manager
.ring
.add_connection(loc, peer.clone(), false)
.add_connection(loc, peer.clone(), true)
.await;
tracing::info!(tx = %tx, remote = %peer, "connect_peer: promoted transient");
}
Expand Down Expand Up @@ -1734,7 +1817,7 @@ impl P2pConnManager {
);
return Ok(());
}
let current = connection_manager.num_connections();
let current = connection_manager.connection_count();
if current >= connection_manager.max_connections {
tracing::warn!(
%peer_id,
Expand All @@ -1749,7 +1832,7 @@ impl P2pConnManager {
self.bridge
.op_manager
.ring
.add_connection(loc, peer_id.clone(), false)
.add_connection(loc, peer_id.clone(), true)
.await;
if is_transient {
connection_manager.drop_transient(&peer_id);
Expand All @@ -1760,7 +1843,7 @@ impl P2pConnManager {
let should_accept = connection_manager.should_accept(loc, &peer_id);
if should_accept {
connection_manager.drop_transient(&peer_id);
let current = connection_manager.num_connections();
let current = connection_manager.connection_count();
if current >= connection_manager.max_connections {
tracing::warn!(
%peer_id,
Expand Down
76 changes: 50 additions & 26 deletions crates/core/src/operations/connect.rs
Original file line number Diff line number Diff line change
Expand Up @@ -842,6 +842,15 @@ pub(crate) async fn join_ring_request(
OpError::ConnError(ConnectionError::LocationUnknown)
})?;

tracing::debug!(
peer = %gateway.peer,
reserved_connections = op_manager
.ring
.connection_manager
.get_reserved_connections(),
"join_ring_request: evaluating gateway connection attempt"
);

if !op_manager
.ring
.connection_manager
Expand Down Expand Up @@ -926,56 +935,71 @@ pub(crate) async fn initial_join_procedure(
gateways.len()
);

let mut in_flight_gateways = HashSet::new();

loop {
let open_conns = op_manager.ring.open_connections();
let unconnected_gateways: Vec<_> =
op_manager.ring.is_not_connected(gateways.iter()).collect();
let available_gateways: Vec<_> = unconnected_gateways
.into_iter()
.filter(|gateway| !in_flight_gateways.contains(&gateway.peer))
.collect();

tracing::debug!(
"Connection status: open_connections = {}, unconnected_gateways = {}",
open_conns,
unconnected_gateways.len()
open_connections = open_conns,
inflight_gateway_dials = in_flight_gateways.len(),
available_gateways = available_gateways.len(),
"Connection status before join attempt"
);

let unconnected_count = unconnected_gateways.len();
let available_count = available_gateways.len();

if open_conns < BOOTSTRAP_THRESHOLD && unconnected_count > 0 {
if open_conns < BOOTSTRAP_THRESHOLD && available_count > 0 {
tracing::info!(
"Below bootstrap threshold ({} < {}), attempting to connect to {} gateways",
open_conns,
BOOTSTRAP_THRESHOLD,
number_of_parallel_connections.min(unconnected_count)
number_of_parallel_connections.min(available_count)
);
let select_all = FuturesUnordered::new();
for gateway in unconnected_gateways
let mut select_all = FuturesUnordered::new();
for gateway in available_gateways
.into_iter()
.shuffle()
.take(number_of_parallel_connections)
{
tracing::info!(%gateway, "Attempting connection to gateway");
in_flight_gateways.insert(gateway.peer.clone());
let op_manager = op_manager.clone();
let gateway_clone = gateway.clone();
select_all.push(async move {
(join_ring_request(None, gateway, &op_manager).await, gateway)
(
join_ring_request(None, &gateway_clone, &op_manager).await,
gateway_clone,
)
});
}
select_all
.for_each(|(res, gateway)| async move {
if let Err(error) = res {
if !matches!(
error,
OpError::ConnError(
crate::node::ConnectionError::UnwantedConnection
)
) {
tracing::error!(
%gateway,
%error,
"Failed while attempting connection to gateway"
);
}
while let Some((res, gateway)) = select_all.next().await {
if let Err(error) = res {
if !matches!(
error,
OpError::ConnError(crate::node::ConnectionError::UnwantedConnection)
) {
tracing::error!(
%gateway,
%error,
"Failed while attempting connection to gateway"
);
}
})
.await;
}
in_flight_gateways.remove(&gateway.peer);
}
} else if open_conns < BOOTSTRAP_THRESHOLD && available_count == 0 {
tracing::debug!(
open_connections = open_conns,
inflight = in_flight_gateways.len(),
"Below threshold but all gateways are already connected or in-flight"
);
} else if open_conns >= BOOTSTRAP_THRESHOLD {
tracing::trace!(
"Have {} connections (>= threshold of {}), not attempting gateway connections",
Expand Down
Loading
Loading