Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions crates/core/src/ring/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -513,6 +513,7 @@ impl Ring {
&neighbor_locations,
&self.connection_manager.own_location().location,
Instant::now(),
current_connections,
);

tracing::info!(
Expand Down
59 changes: 42 additions & 17 deletions crates/core/src/topology/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,7 @@ impl TopologyManager {
neighbor_locations: &BTreeMap<Location, Vec<Connection>>,
my_location: &Option<Location>,
at_time: Instant,
current_connections: usize,
) -> TopologyAdjustment {
#[cfg(debug_assertions)]
{
Expand All @@ -294,21 +295,22 @@ impl TopologyManager {
{
LAST_LOG.with(|last_log| {
tracing::trace!(
"Adjusting topology at {:?}. Current neighbors: {:?}",
"Adjusting topology at {:?}. Current connections: {}, Filtered neighbors: {}",
at_time,
current_connections,
neighbor_locations.len()
);
*last_log.borrow_mut() = Instant::now();
});
}
}

if neighbor_locations.len() < self.limits.min_connections {
if current_connections < self.limits.min_connections {
let mut locations = Vec::new();
let below_threshold = self.limits.min_connections - neighbor_locations.len();
let below_threshold = self.limits.min_connections - current_connections;
if below_threshold > 0 {
// If we have no connections at all, bootstrap by targeting own location
if neighbor_locations.is_empty() {
if current_connections == 0 {
match my_location {
Some(location) => {
// The first connect message should target the peer's own
Expand All @@ -331,7 +333,7 @@ impl TopologyManager {
LAST_LOG.with(|last_log| {
tracing::trace!(
minimum_num_peers_hard_limit = self.limits.min_connections,
num_peers = neighbor_locations.len(),
num_peers = current_connections,
to_add = below_threshold,
"Bootstrap: adding first connection at own location"
);
Expand All @@ -341,7 +343,7 @@ impl TopologyManager {
}
}
// If we have 1-4 connections, use random locations for diversity
else if neighbor_locations.len() < 5 {
else if current_connections < 5 {
for _i in 0..below_threshold {
locations.push(Location::random());
}
Expand All @@ -356,7 +358,7 @@ impl TopologyManager {
LAST_LOG.with(|last_log| {
tracing::trace!(
minimum_num_peers_hard_limit = self.limits.min_connections,
num_peers = neighbor_locations.len(),
num_peers = current_connections,
to_add = below_threshold,
"Early stage: adding connections at random locations for diversity"
);
Expand All @@ -381,6 +383,16 @@ impl TopologyManager {
return TopologyAdjustment::AddConnections(locations);
}

// Skip resource-based removal in very small networks to avoid destabilizing them
// During startup or in small test networks, we need stability more than optimization
if current_connections < 5 {
debug!(
"Skipping resource-based topology adjustment for small network (connections: {})",
current_connections
);
return TopologyAdjustment::NoChange;
}

let increase_usage_if_below: RateProportion =
RateProportion::new(MINIMUM_DESIRED_RESOURCE_USAGE_PROPORTION);
let decrease_usage_if_above: RateProportion =
Expand All @@ -392,11 +404,10 @@ impl TopologyManager {
debug!(?usage_proportion, "Resource usage information");

let adjustment: anyhow::Result<TopologyAdjustment> =
if neighbor_locations.len() > self.limits.max_connections {
if current_connections > self.limits.max_connections {
debug!(
"Number of neighbors ({:?}) is above maximum ({:?}), removing connections",
neighbor_locations.len(),
self.limits.max_connections
"Number of connections ({:?}) is above maximum ({:?}), removing connections",
current_connections, self.limits.max_connections
);

self.update_connection_acquisition_strategy(ConnectionAcquisitionStrategy::Slow);
Expand Down Expand Up @@ -721,8 +732,12 @@ mod tests {
neighbor_locations.insert(peer.location.unwrap(), vec![]);
}

let adjustment =
resource_manager.adjust_topology(&neighbor_locations, &None, Instant::now());
let adjustment = resource_manager.adjust_topology(
&neighbor_locations,
&None,
Instant::now(),
peers.len(),
);
match adjustment {
TopologyAdjustment::RemoveConnections(peers) => {
assert_eq!(peers.len(), 1);
Expand Down Expand Up @@ -766,8 +781,12 @@ mod tests {
neighbor_locations.insert(peer.location.unwrap(), vec![]);
}

let adjustment =
resource_manager.adjust_topology(&neighbor_locations, &None, Instant::now());
let adjustment = resource_manager.adjust_topology(
&neighbor_locations,
&None,
Instant::now(),
peers.len(),
);

match adjustment {
TopologyAdjustment::AddConnections(locations) => {
Expand Down Expand Up @@ -807,8 +826,12 @@ mod tests {
neighbor_locations.insert(peer.location.unwrap(), vec![]);
}

let adjustment =
resource_manager.adjust_topology(&neighbor_locations, &None, report_time);
let adjustment = resource_manager.adjust_topology(
&neighbor_locations,
&None,
report_time,
peers.len(),
);

match adjustment {
TopologyAdjustment::NoChange => {}
Expand Down Expand Up @@ -848,6 +871,7 @@ mod tests {
&neighbor_locations,
&Some(my_location),
report_time,
peers.len(),
);

match adjustment {
Expand Down Expand Up @@ -992,6 +1016,7 @@ mod tests {
&neighbor_locations,
&Some(Location::new(0.5)),
Instant::now(),
1, // 1 current connection
);

match adjustment {
Expand Down
Loading