From 2fa124016eaf999d4711738fe1edf7d6735d43b6 Mon Sep 17 00:00:00 2001 From: Ian Date: Wed, 15 Oct 2025 09:02:15 +0000 Subject: [PATCH 1/3] intermediate puish --- Cargo.lock | 8 +- Cargo.toml | 4 +- src/community_search/leiden/mod.rs | 1 + src/community_search/leiden/optimizer.rs | 2 +- src/community_search/leiden/parallel.rs | 368 ++++++++++++++++++ src/community_search/leiden/partition/mod.rs | 3 + .../leiden/partition/modularity.rs | 40 ++ src/community_search/leiden/partition/rb.rs | 71 +++- 8 files changed, 475 insertions(+), 22 deletions(-) create mode 100644 src/community_search/leiden/parallel.rs diff --git a/Cargo.lock b/Cargo.lock index 95c1980..507d9ca 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -96,9 +96,9 @@ dependencies = [ [[package]] name = "anyhow" -version = "1.0.98" +version = "1.0.100" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e16d2d3311acee920a9eb8d33b8cbc1787ce4a264e85f964c2404b969bdcd487" +checksum = "a23eb6b1614318a8071c9b2521f36b424b2c83db5eb3a0fead4a6c0809af6e61" [[package]] name = "approx" @@ -1052,9 +1052,9 @@ dependencies = [ [[package]] name = "single-utilities" -version = "0.8.5" +version = "0.8.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f97ec9e4d36daf3ad33945cf05da59e950f4d636b7b5f09ded4b4819e29126aa" +checksum = "af872bc6b62cf87b9a6c5ecebc98137877d127fb6dcb329410b739eff914fa29" dependencies = [ "num-traits", ] diff --git a/Cargo.toml b/Cargo.toml index 27349d3..116278d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -11,14 +11,14 @@ description = "A high-performance network clustering library implementing commun [dependencies] -anyhow = "1.0.98" +anyhow = "1.0.100" kiddo = {version = "5.2.2" } nalgebra-sparse = "0.10.0" ndarray = {version = "0.16.1" , features = ["rayon"]} num-traits = "0.2.19" petgraph = { version = "0.8.2", features = ["rayon"] } rayon = "1.10.0" -single-utilities = "0.8.5" +single-utilities = "0.8.6" rand = "0.9.0" rand_chacha = {version = "0.9.0"} hnsw_rs = {version = "0.3.2", features = ["simdeez_f"]} diff --git a/src/community_search/leiden/mod.rs b/src/community_search/leiden/mod.rs index cd74ca9..0b089dc 100644 --- a/src/community_search/leiden/mod.rs +++ b/src/community_search/leiden/mod.rs @@ -7,6 +7,7 @@ pub mod partition; mod optimizer; pub use optimizer::LeidenOptimizer; +mod parallel; /// Strategy for selecting communities to consider during optimization. #[derive(Debug, Clone, Copy, PartialEq)] diff --git a/src/community_search/leiden/optimizer.rs b/src/community_search/leiden/optimizer.rs index 755ae20..1df3165 100644 --- a/src/community_search/leiden/optimizer.rs +++ b/src/community_search/leiden/optimizer.rs @@ -434,7 +434,7 @@ where for partition in partitions.iter() { if partition.node_count() != n { - panic!("Number of nodes are not equal for all graphs."); + return Err(anyhow::anyhow!("Number of nodes are not equal for all graphs.")); } } diff --git a/src/community_search/leiden/parallel.rs b/src/community_search/leiden/parallel.rs new file mode 100644 index 0000000..3e9e21b --- /dev/null +++ b/src/community_search/leiden/parallel.rs @@ -0,0 +1,368 @@ +use std::{ + collections::{HashSet, VecDeque}, string::ParseError, thread::current +}; + +use num_traits::Float; +use rayon::iter::{IntoParallelRefIterator, ParallelIterator}; +use single_utilities::traits::FloatOpsTS; + +use crate::{ + community_search::leiden::{ + ConsiderComms, + partition::{self, VertexPartition}, + }, + network::{CSRNetwork, grouping::NetworkGrouping}, +}; + +#[derive(Debug, Clone, Copy)] +pub struct ProposedMove { + pub node: usize, + pub from_comm: usize, + pub to_comm: usize, + pub improvement: N, +} + +impl ProposedMove { + #[inline] + pub fn new(node: usize, from_comm: usize, to_comm: usize, improvement: N) -> Self { + Self { + node, + from_comm, + to_comm, + improvement, + } + } + + #[inline] + pub fn is_beneficial(&self) -> bool { + self.from_comm != self.to_comm && self.improvement > N::zero() + } +} + +#[derive(Debug, Clone)] +pub struct ParallelConfig { + pub max_batch_size: usize, + pub max_iterations: usize, +} + +impl Default for ParallelConfig { + fn default() -> Self { + Self { + max_batch_size: 10_000, + max_iterations: 1_000, + } + } +} + +pub struct ConflictFreeBatcher { + max_batch_size: usize, +} + +impl ConflictFreeBatcher { + pub fn new(max_batch_size: usize) -> Self { + Self { max_batch_size } + } + + pub fn create_batches( + &self, + nodes: &[usize], + network: &CSRNetwork, + skip_stable: &[bool], + ) -> Vec> + where + N: FloatOpsTS + 'static, + { + let mut batches = Vec::new(); + let mut remaining: Vec = + nodes.iter().copied().filter(|&n| !skip_stable[n]).collect(); + + while !remaining.is_empty() { + let (batch, leftover) = self.extract_batch(&remaining, network); + + if batch.is_empty() { + break; + } + + batches.push(batch); + remaining = leftover; + } + batches + } + + fn extract_batch( + &self, + candidates: &[usize], + network: &CSRNetwork, + ) -> (Vec, Vec) + where + N: FloatOpsTS + 'static, + { + let mut batch = Vec::new(); + let mut leftover = Vec::new(); + let mut locked = vec![false; network.node_count()]; + + for &node in candidates { + if self.has_conflict(node, network, &locked) { + leftover.push(node); + } else { + batch.push(node); + self.mark_locked(node, network, &mut locked); + if batch.len() >= self.max_batch_size { + leftover.extend_from_slice( + &candidates[candidates.iter().position(|&n| n == node).unwrap() + 1..], + ); + break; + } + } + } + (batch, leftover) + } + + #[inline] + fn has_conflict(&self, node: usize, network: &CSRNetwork, locked: &[bool]) -> bool + where + N: FloatOpsTS + 'static, + { + if locked[node] { + return true; + } + + for (neighbor, _) in network.neighbors(node) { + if locked[neighbor] { + return true; + } + } + false + } + + #[inline] + fn mark_locked(&self, node: usize, network: &CSRNetwork, locked: &mut [bool]) + where + N: FloatOpsTS + 'static, + { + locked[node] = true; + for (neighbor, _) in network.neighbors(node) { + locked[neighbor] = true; + } + } +} + +pub struct ParallelEvaluator; + +impl ParallelEvaluator { + pub fn evaluate_batch( + batch: &[usize], + partitions: &[P], + layer_weights: &[N], + consider_comms: ConsiderComms, + consider_empty: bool, + max_comm_size: Option, + ) -> Vec> + where + N: FloatOpsTS + 'static, + G: NetworkGrouping, + P: VertexPartition, + { + batch + .par_iter() + .filter_map(|&node| { + Self::evaluate_node( + node, + partitions, + layer_weights, + consider_comms, + consider_empty, + max_comm_size, + ) + }) + .collect() + } + + fn evaluate_node( + node: usize, + partitions: &[P], + layer_weights: &[N], + consider_comms: ConsiderComms, + consider_empty: bool, + max_comm_size: Option, + ) -> Option> + where + N: FloatOpsTS + 'static, + G: NetworkGrouping, + P: VertexPartition, + { + let current_comm = partitions[0].membership(node); + + let candidates = Self::get_candidates(node, partitions, consider_comms); + + let (best_comm, best_improv) = Self::find_best( + node, + current_comm, + &candidates, + partitions, + layer_weights, + consider_empty, + max_comm_size, + )?; + + if best_comm != current_comm && best_improv > N::zero() { + Some(ProposedMove { + node, + from_comm: current_comm, + to_comm: best_comm, + improvement: best_improv, + }) + } else { + None + } + } + + fn get_candidates(node: usize, partitions: &[P], stategy: ConsiderComms) -> Vec + where + N: FloatOpsTS + 'static, + G: NetworkGrouping, + P: VertexPartition, + { + let mut comms = HashSet::new(); + match stategy { + ConsiderComms::AllComms => { + for comm in 0..partitions[0].community_count() { + if partitions[0].cnodes(comm) > 0 { + comms.insert(comm); + } + } + } + ConsiderComms::AllNeighComms => { + for partition in partitions { + for (neighbor, _) in partition.network().neighbors(node) { + comms.insert(partition.membership(neighbor)); + } + } + } + ConsiderComms::RandComm | ConsiderComms::RandNeighComm => { + for partition in partitions { + for (neighbor, _) in partition.network().neighbors(node) { + comms.insert(partition.membership(neighbor)); + } + } + } + } + + comms.into_iter().collect() + } + + fn find_best( + node: usize, + current_comm: usize, + candidates: &[usize], + partitions: &[P], + layer_weights: &[N], + consider_empty: bool, + max_comm_size: Option, + ) -> Option<(usize, N)> + where + N: FloatOpsTS + 'static, + G: NetworkGrouping, + P: VertexPartition, + { + let epsilon = N::from(10.0).unwrap() * ::epsilon(); + let mut best_comm = current_comm; + let mut best_improv = epsilon; + + let mut to_check = candidates.to_vec(); + if consider_empty && partitions[0].cnodes(current_comm) > 1 { + to_check.push(partitions[0].community_count()); + } + + for &comm in &to_check { + if let Some(max_size) = max_comm_size { + if comm < partitions[0].community_count() + && partitions[0].csize(comm) + 1 > max_size + { + continue; + } + } + + let mut total_improv = N::zero(); + for (idx, partitions) in partitions.iter().enumerate() { + let layer_improv = partitions.diff_move_readonly(node, comm); + total_improv += layer_weights[idx] * layer_improv; + } + + if total_improv > best_improv { + best_comm = comm; + best_improv = total_improv; + } + } + + Some((best_comm, best_improv)) + } +} + +pub struct MoveApplicator; + +impl MoveApplicator { + pub fn apply_moves( + moves: Vec>, + partitions: &mut [P], + is_stable: &mut [bool], + is_fixed: &[bool], + pending: &mut VecDeque, + ) -> N + where + N: FloatOpsTS + 'static, + G: NetworkGrouping + Clone, + P: VertexPartition, + { + + let mut total_improv = N::zero(); + + if moves.is_empty() { + return N::zero(); + } + + for m in moves { + Self::ensure_community_exists(m.to_comm, partitions); + + for partition in partitions.iter_mut() { + partition.move_node(m.node, m.to_comm); + } + + total_improv += m.improvement; + is_stable[m.node] = true; + + let network = partitions[0].network(); + Self::mark_neighbors_unstable(m.node, network, is_stable, is_fixed, pending); + } + total_improv + } + + fn ensure_community_exists(comm: usize, partitions: &mut [P]) + where + N: FloatOpsTS + 'static, + G: NetworkGrouping + Clone, + P: VertexPartition { + for partition in partitions.iter_mut() { + while partition.community_count() <= comm { + partition.add_empty_community(); + } + } + } + + fn mark_neighbors_unstable( + node: usize, + network: &CSRNetwork, + is_stable: &mut [bool], + is_fixed: &[bool], + pending: &mut VecDeque + ) + where + N: FloatOpsTS + 'static { + for (neighbor, _) in network.neighbors(node) { + if !is_fixed[neighbor] && is_stable[neighbor] { + is_stable[neighbor] = false; + pending.push_back(neighbor); + } + } + } +} diff --git a/src/community_search/leiden/partition/mod.rs b/src/community_search/leiden/partition/mod.rs index cebb0dc..5bbe4cc 100644 --- a/src/community_search/leiden/partition/mod.rs +++ b/src/community_search/leiden/partition/mod.rs @@ -188,5 +188,8 @@ where comms.into_iter().collect() } + + fn diff_move_readonly(&self, node: usize, new_community: usize) -> N; + } diff --git a/src/community_search/leiden/partition/modularity.rs b/src/community_search/leiden/partition/modularity.rs index c0cb3d5..2e6324b 100644 --- a/src/community_search/leiden/partition/modularity.rs +++ b/src/community_search/leiden/partition/modularity.rs @@ -127,6 +127,42 @@ where fn node_strength(&self, node: usize) -> N { self.network.strength(node) } + + pub fn diff_move_readonly(&self, node: usize, new_community: usize) -> N { + let old_comm = self.grouping.get_group(node); + if new_community == old_comm { + return N::zero(); + } + + if self.total_weight == N::zero() { + return N::zero(); + } + + let two_m = N::from(2.0).unwrap() * self.total_weight; + + let w_to_old = self.weight_to_comm(node, old_comm); + let w_to_new = if new_community < self.grouping.group_count() { + self.weight_to_comm(node, new_community) + } else { + N::zero() + }; + + + let k_i = self.network.strength(node); + + let k_old = self.total_weight_from_comm(old_comm); + let k_new = if new_community < self.grouping.group_count() { + self.total_weight_from_comm(new_community) + } else { + N::zero() + }; + + + let delta_edges = w_to_new - w_to_old; + let delta_expected = (k_new * k_i - (k_old - k_i) * k_i) / two_m; + + (delta_edges - delta_expected) / self.total_weight + } } impl VertexPartition for ModularityPartition @@ -225,4 +261,8 @@ where fn create_like_with_membership(&self, network: CSRNetwork, membership: &[usize]) -> Self { Self::create_with_membership(network, membership) } + + fn diff_move_readonly(&self, node: usize, new_community: usize) -> N { + self.diff_move_readonly(node, new_community) + } } diff --git a/src/community_search/leiden/partition/rb.rs b/src/community_search/leiden/partition/rb.rs index f05d48b..d359e00 100644 --- a/src/community_search/leiden/partition/rb.rs +++ b/src/community_search/leiden/partition/rb.rs @@ -6,8 +6,7 @@ use single_utilities::traits::FloatOpsTS; use crate::{ - community_search::leiden::partition::VertexPartition, - network::{grouping::NetworkGrouping, CSRNetwork}, + community_search::leiden::partition::VertexPartition, neighborhood, network::{grouping::NetworkGrouping, CSRNetwork} }; /// Reichardt-Bornholdt configuration model partition for multi-resolution community detection. @@ -152,19 +151,6 @@ where self.community_internal_weights_dirty = true; } - /// Fast weight calculation from node to community. - #[inline] - fn weight_to_comm(&self, node: usize, community: usize) -> N { - let mut weight = N::zero(); - // Use iterator directly to avoid function call overhead - for (neighbor, edge_weight) in self.network.neighbors(node) { - if self.grouping.get_group(neighbor) == community { - weight += edge_weight; - } - } - weight - } - /// Gets cached community strength (fast lookup). #[inline] fn get_community_strength(&self, community: usize) -> N { @@ -188,6 +174,52 @@ where self.node_strengths[node] } + pub fn diff_move_readonly(&self, node: usize, new_community: usize) -> N { + let old_comm = self.grouping.get_group(node); + if new_community == old_comm { + return N::zero(); + } + + if self.two_m == N::zero() { + return N::zero(); + } + + let k_i = self.node_strengths[node]; + let self_weight = self.node_self_weight(node); + + let w_to_old = self.weight_to_comm(node, old_comm); + let w_to_new = if new_community < self.grouping.group_count() { + self.weight_to_comm(node, new_community) + } else { + N::zero() + }; + + let k_old = self.compute_total_weight_from_comm_uncached(old_comm); + let k_new = if new_community < self.grouping.group_count() { + self.compute_total_weight_from_comm_uncached(new_community) + } else { + N::zero() + }; + + let delta_w_in = (w_to_new + self_weight) - w_to_old; + + let delta_k_squared = N::from(2.0).unwrap() * k_i * (k_new - k_old + k_i); + let delta_null_model = self.resolution * delta_k_squared / self.two_m; + + delta_w_in - delta_null_model + } + + #[inline] + fn weight_to_comm(&self, node: usize, community: usize) -> N { + let mut weight = N::zero(); + for (neighbor, edge_weight) in self.network.neighbors(node) { + if self.grouping.get_group(neighbor) == community { + weight += edge_weight; + } + } + weight + } + } impl VertexPartition for RBConfigurationPartition @@ -257,6 +289,15 @@ where fn create_like_with_membership(&self, network: CSRNetwork, membership: &[usize]) -> Self { Self::with_membership_and_resolution(network, membership, self.resolution) } + + fn diff_move_readonly(&self, node: usize, new_community: usize) -> N { + self.diff_move_readonly(node, new_community) + } + + fn add_empty_community(&mut self) { + self.community_strengths.push(N::zero()); + self.community_internal_weights.push(N::zero()); + } } // Helper methods for computing without cache (for const methods) From 9e75ad8ff215f82f9e837156dc6651b70ac9ee73 Mon Sep 17 00:00:00 2001 From: Ian Date: Wed, 15 Oct 2025 10:46:47 +0000 Subject: [PATCH 2/3] incremental update --- src/community_search/leiden/optimizer.rs | 450 ++++++++++++++++++----- src/network/csr_network.rs | 213 ++++++----- 2 files changed, 474 insertions(+), 189 deletions(-) diff --git a/src/community_search/leiden/optimizer.rs b/src/community_search/leiden/optimizer.rs index 1df3165..380fdff 100644 --- a/src/community_search/leiden/optimizer.rs +++ b/src/community_search/leiden/optimizer.rs @@ -3,7 +3,10 @@ //! Contains the core optimization logic for the Leiden community detection algorithm, //! including node movement, community merging, and partition refinement strategies. -use std::{collections::VecDeque, time::Instant}; +use std::{ + collections::VecDeque, + time::{Duration, Instant}, +}; use anyhow::Ok; use num_traits::Float; @@ -12,7 +15,11 @@ use rand_chacha::ChaCha8Rng; use single_utilities::traits::FloatOpsTS; use crate::{ - community_search::leiden::{ConsiderComms, LeidenConfig, partition::VertexPartition}, + community_search::leiden::{ + ConsiderComms, LeidenConfig, + parallel::{ConflictFreeBatcher, ParallelEvaluator}, + partition::VertexPartition, + }, network::{CSRNetwork, grouping::NetworkGrouping}, }; @@ -195,110 +202,105 @@ impl LeidenOptimizer { /// Evaluates the quality improvement for moving a node to each candidate /// community and returns the community and improvement of the best move. fn find_best_community_move( - &self, - v: usize, - v_comm: usize, - comms: &[usize], - partitions: &mut [P], // Changed to mutable slice - layer_weights: &[N], - max_comm_size: Option, -) -> anyhow::Result<(usize, N)> -where - N: FloatOpsTS + 'static, - G: NetworkGrouping, - P: VertexPartition, -{ - let mut max_comm = v_comm; - let time = Instant::now(); - // println!("Finding best community move: {:?}", time.elapsed()); - - // Pre-compute these values once instead of in the loop - let v_comm_size = partitions[0].csize(v_comm); - let epsilon_threshold = N::from(10.0).unwrap() * ::epsilon(); - - let mut max_improv = if let Some(max_size) = max_comm_size { - if max_size < v_comm_size { - ::neg_infinity() + &self, + v: usize, + v_comm: usize, + comms: &[usize], + partitions: &mut [P], + layer_weights: &[N], + max_comm_size: Option, + ) -> anyhow::Result<(usize, N)> + where + N: FloatOpsTS + 'static, + G: NetworkGrouping, + P: VertexPartition, + { + let mut max_comm = v_comm; + let time = Instant::now(); + // println!("Finding best community move: {:?}", time.elapsed()); + + // Pre-compute these values once instead of in the loop + let v_comm_size = partitions[0].csize(v_comm); + let epsilon_threshold = N::from(10.0).unwrap() * ::epsilon(); + + let mut max_improv = if let Some(max_size) = max_comm_size { + if max_size < v_comm_size { + ::neg_infinity() + } else { + epsilon_threshold + } } else { epsilon_threshold + }; + + const V_SIZE: usize = 1; + + if comms.is_empty() { + return Ok((max_comm, max_improv)); } - } else { - epsilon_threshold - }; - const V_SIZE: usize = 1; // Made it a const for better optimization + // println!("Prefiltering valid comms {:?}", time.elapsed()); + let valid_comms: Vec = if let Some(max_size) = max_comm_size { + comms + .iter() + .copied() + .filter(|&comm| partitions[0].csize(comm) + V_SIZE <= max_size) + .collect() + } else { + comms.to_vec() + }; + // println!("Filtered valid comms: {:?}", time.elapsed()); - // Early exit if no communities to check - if comms.is_empty() { - return Ok((max_comm, max_improv)); - } - - // println!("Prefiltering valid comms {:?}", time.elapsed()); - // Pre-filter communities by size constraint to avoid repeated checks - let valid_comms: Vec = if let Some(max_size) = max_comm_size { - comms - .iter() - .copied() - .filter(|&comm| partitions[0].csize(comm) + V_SIZE <= max_size) - .collect() - } else { - comms.to_vec() - }; - // println!("Filtered valid comms: {:?}", time.elapsed()); - - // Early exit if no valid communities - if valid_comms.is_empty() { - return Ok((max_comm, max_improv)); - } + if valid_comms.is_empty() { + return Ok((max_comm, max_improv)); + } - // Optimized single-layer case - if partitions.len() == 1 && layer_weights[0] == N::one() { - // println!("checking valid comms: {:?}", time.elapsed()); - - // Get mutable reference to the single partition - let partition = &mut partitions[0]; - - for &comm in &valid_comms { - let t = Instant::now(); - let possible_improv = partition.diff_move(v, comm); - // println!("Executed diff move, took: {:?}", t.elapsed()); - - if possible_improv > max_improv { - max_comm = comm; - max_improv = possible_improv; + // Optimized single-layer case + if partitions.len() == 1 && layer_weights[0] == N::one() { + // println!("checking valid comms: {:?}", time.elapsed()); + + let partition = &mut partitions[0]; + + for &comm in &valid_comms { + let t = Instant::now(); + let possible_improv = partition.diff_move(v, comm); + // println!("Executed diff move, took: {:?}", t.elapsed()); + + if possible_improv > max_improv { + max_comm = comm; + max_improv = possible_improv; + } } - } - } else { - // Multi-layer case - for &comm in &valid_comms { - let mut possible_improv = N::zero(); - - for layer_idx in 0..partitions.len() { - // Get mutable reference to current partition - let layer_improv = partitions[layer_idx].diff_move(v, comm); - possible_improv += layer_weights[layer_idx] * layer_improv; - - // Early termination optimization - if possible_improv + epsilon_threshold < max_improv { - let remaining_positive = layer_weights[layer_idx + 1..] - .iter() - .all(|&w| w >= N::zero()); - - if remaining_positive && layer_improv <= N::zero() { - break; + } else { + // Multi-layer case + for &comm in &valid_comms { + let mut possible_improv = N::zero(); + + for layer_idx in 0..partitions.len() { + let layer_improv = partitions[layer_idx].diff_move(v, comm); + possible_improv += layer_weights[layer_idx] * layer_improv; + + // Early termination optimization + if possible_improv + epsilon_threshold < max_improv { + let remaining_positive = layer_weights[layer_idx + 1..] + .iter() + .all(|&w| w >= N::zero()); + + if remaining_positive && layer_improv <= N::zero() { + break; + } } } - } - if possible_improv > max_improv { - max_comm = comm; - max_improv = possible_improv; + if possible_improv > max_improv { + max_comm = comm; + max_improv = possible_improv; + } } } - } - Ok((max_comm, max_improv)) -} + Ok((max_comm, max_improv)) + } /// Collects candidate communities that a node can potentially move to. /// @@ -434,7 +436,9 @@ where for partition in partitions.iter() { if partition.node_count() != n { - return Err(anyhow::anyhow!("Number of nodes are not equal for all graphs.")); + return Err(anyhow::anyhow!( + "Number of nodes are not equal for all graphs." + )); } } @@ -580,6 +584,261 @@ where Ok(total_improv) } + fn move_nodes_parallel( + &mut self, + partitions: &mut [P], + layer_weights: &[N], + is_membership_fixed: &[bool], + consider_comms: ConsiderComms, + consider_empty_community: bool, + max_comm_size: Option, + ) -> anyhow::Result + where + N: FloatOpsTS + 'static, + G: NetworkGrouping + Clone + Default, + P: VertexPartition, + { + let n = partitions[0].node_count(); + let network = partitions[0].network().clone(); + + let mut total_improv = N::zero(); + let mut is_node_stable = is_membership_fixed.to_vec(); + + let mut nodes: Vec = (0..n).filter(|&v| !is_membership_fixed[v]).collect(); + nodes.shuffle(&mut self.rng); + + let mut pending_nodes: VecDeque = nodes.into(); + let batcher = ConflictFreeBatcher::new(10_000); + + const MIN_PARALLEL_BATCH_SIZE: usize = 50; + const MIN_PARALLEL_NODES: usize = 100; + + let mut total_candidates_checked = 0usize; + let mut total_nodes_evaluated = 0usize; + + let mut batch_round = 0; + + while !pending_nodes.is_empty() { + let current_nodes: Vec = pending_nodes.drain(..).collect(); + + if current_nodes.len() < MIN_PARALLEL_NODES { + println!(" Sequential evaluation: {} nodes", current_nodes.len()); + + let seq_start = Instant::now(); + let mut seq_beneficial = 0; + + for node in current_nodes { + total_nodes_evaluated += 1; + + let current_comm = partitions[0].membership(node); + + let mut comm_added = vec![false; partitions[0].community_count()]; + let mut candidates = Vec::new(); + + self.collect_candidate_communities( + node, + partitions, + consider_comms, + &mut candidates, + &mut comm_added, + ); + + total_candidates_checked += candidates.len(); + + if consider_empty_community && partitions[0].cnodes(current_comm) > 1 { + let empty_comm = partitions[0].get_empty_community(); + candidates.push(empty_comm); + } + + let (best_comm, improvement) = self.find_best_community_move( + node, + current_comm, + &candidates, + partitions, + layer_weights, + max_comm_size, + )?; + + if best_comm != current_comm && improvement > N::zero() { + seq_beneficial += 1; + total_improv += improvement; + + for partition in partitions.iter_mut() { + partition.move_node(node, best_comm); + } + + is_node_stable[node] = true; + + for (neighbor, _) in network.neighbors(node) { + if is_node_stable[neighbor] + && partitions[0].membership(neighbor) != best_comm + && !is_membership_fixed[neighbor] + { + pending_nodes.push_back(neighbor); + is_node_stable[neighbor] = false; + } + } + } + } + + println!( + " Sequential: {} beneficial moves in {:?}", + seq_beneficial, + seq_start.elapsed() + ); + batch_round += 1; + continue; + } + + let batches = batcher.create_batches(¤t_nodes, &network, &is_node_stable); + + println!( + " Batch #{}: {} nodes -> {} conflict-free batches", + batch_round, + current_nodes.len(), + batches.len() + ); + + for (batch_idx, batch) in batches.iter().enumerate() { + if batch.is_empty() { + continue; + } + + let batch_start = Instant::now(); + + if batch.len() < MIN_PARALLEL_BATCH_SIZE { + let mut seq_beneficial = 0; + + for &node in batch.iter() { + total_nodes_evaluated += 1; + + let current_comm = partitions[0].membership(node); + + let mut comm_added = vec![false; partitions[0].community_count()]; + let mut candidates = Vec::new(); + + self.collect_candidate_communities( + node, + partitions, + consider_comms, + &mut candidates, + &mut comm_added, + ); + + total_candidates_checked += candidates.len(); + + if consider_empty_community && partitions[0].cnodes(current_comm) > 1 { + let empty_comm = partitions[0].get_empty_community(); + candidates.push(empty_comm); + } + + let (best_comm, improvement) = self.find_best_community_move( + node, + current_comm, + &candidates, + partitions, + layer_weights, + max_comm_size, + )?; + + if best_comm != current_comm && improvement > N::zero() { + seq_beneficial += 1; + total_improv += improvement; + + for partition in partitions.iter_mut() { + partition.move_node(node, best_comm); + } + + is_node_stable[node] = true; + + for (neighbor, _) in network.neighbors(node) { + if is_node_stable[neighbor] + && partitions[0].membership(neighbor) != best_comm + && !is_membership_fixed[neighbor] + { + pending_nodes.push_back(neighbor); + is_node_stable[neighbor] = false; + } + } + } + } + + if batch_idx < 3 || seq_beneficial > 0 { + println!( + " Sub-batch #{} (seq): {} nodes, {} beneficial (took {:?})", + batch_idx, + batch.len(), + seq_beneficial, + batch_start.elapsed() + ); + } + } else { + let proposed_moves = ParallelEvaluator::evaluate_batch( + batch, + partitions, + layer_weights, + consider_comms, + consider_empty_community, + max_comm_size, + ); + + total_nodes_evaluated += batch.len(); + + let mut beneficial_count = 0; + for proposed in proposed_moves { + if proposed.is_beneficial() { + beneficial_count += 1; + total_improv += proposed.improvement; + + for partition in partitions.iter_mut() { + partition.move_node(proposed.node, proposed.to_comm); + } + + is_node_stable[proposed.node] = true; + + for (neighbor, _) in network.neighbors(proposed.node) { + if is_node_stable[neighbor] + && partitions[0].membership(neighbor) != proposed.to_comm + && !is_membership_fixed[neighbor] + { + pending_nodes.push_back(neighbor); + is_node_stable[neighbor] = false; + } + } + } + } + + if batch_idx < 3 || beneficial_count > 0 { + println!( + " Sub-batch #{} (par): {} nodes, {} beneficial (took {:?})", + batch_idx, + batch.len(), + beneficial_count, + batch_start.elapsed() + ); + } + } + } + + batch_round += 1; + } + + if total_nodes_evaluated > 0 { + println!( + " Avg candidates per node: {:.1}", + total_candidates_checked as f64 / total_nodes_evaluated as f64 + ); + } + + partitions[0].renumber_communities(); + let membership = partitions[0].membership_vector(); + for partition in partitions.iter_mut().skip(1) { + partition.set_membership(&membership); + } + + Ok(total_improv) + } + fn move_nodes_constrained( &mut self, partitions: &mut [P], @@ -1158,13 +1417,12 @@ where while aggregate_further { println!("Starting iteration {:?}, time: {:?}", i, time.elapsed()); let improvement = match self.config.optimise_routine { - super::OptimiseRoutine::MoveNodes => self.move_nodes( + super::OptimiseRoutine::MoveNodes => self.move_nodes_parallel( &mut collapsed_partitions, layer_weights, &is_collapsed_membership_fixed, self.config.consider_comms, self.config.consider_empty_community, - false, self.config.max_community_size, )?, super::OptimiseRoutine::MergeNodes => self.merge_nodes( diff --git a/src/network/csr_network.rs b/src/network/csr_network.rs index 49477d3..f11575c 100644 --- a/src/network/csr_network.rs +++ b/src/network/csr_network.rs @@ -5,7 +5,7 @@ //! efficient neighbor iteration and community detection operations. use core::num; -use std::collections::HashMap; +use std::{collections::HashMap, sync::Arc}; use nalgebra_sparse::CsrMatrix; use rand::random; @@ -24,7 +24,7 @@ use crate::network::grouping::{self, NetworkGrouping}; /// * `N` - Node weight type (e.g., f32, f64) /// * `E` - Edge weight type (e.g., f32, f64) #[derive(Debug, Clone)] -pub struct CSRNetwork { +pub struct CSRNetworkData { node_ptrs: Vec, neighbors: Vec, weights: Vec, @@ -37,6 +37,11 @@ pub struct CSRNetwork { edge_count: usize, } +#[derive(Debug, Clone)] +pub struct CSRNetwork { + data: Arc>, +} + impl CSRNetwork where N: FloatOpsTS + 'static, @@ -94,7 +99,7 @@ where node_ptrs.push(neighbors.len()); } - Self { + let csr = CSRNetworkData { node_ptrs, neighbors, weights, @@ -103,6 +108,10 @@ where strengths, total_weight, edge_count: edges.len(), + }; + + Self { + data: Arc::new(csr), } } @@ -136,12 +145,12 @@ where pub fn neighbors(&self, node: usize) -> CSRNeighborIterator { debug_assert!(node < self.node_count()); - let start = self.node_ptrs[node]; - let end = self.node_ptrs[node + 1]; + let start = self.data.node_ptrs[node]; + let end = self.data.node_ptrs[node + 1]; CSRNeighborIterator { - neighbor_ptr: unsafe { self.neighbors.as_ptr().add(start) }, - weight_ptr: unsafe { self.weights.as_ptr().add(start) }, + neighbor_ptr: unsafe { self.data.neighbors.as_ptr().add(start) }, + weight_ptr: unsafe { self.data.weights.as_ptr().add(start) }, remaining: end - start, } } @@ -149,32 +158,32 @@ where /// Returns the number of nodes in the network. #[inline] pub fn node_count(&self) -> usize { - self.node_weights.len() + self.data.node_weights.len() } /// Returns the number of edges in the network. #[inline] pub fn edge_count(&self) -> usize { - self.edge_count + self.data.edge_count } /// Returns the degree (number of neighbors) of a node. #[inline] pub fn degree(&self, node: usize) -> usize { - self.degrees[node] + self.data.degrees[node] } /// Returns the strength (sum of edge weights) of a node. #[inline] pub fn strength(&self, node: usize) -> E { - self.strengths[node] + self.data.strengths[node] } /// Returns the weight of a node. #[inline] pub fn node_weight(&self, node: usize) -> N { - self.node_weights[node] + self.data.node_weights[node] } /// Returns the total weight of all edges in the network. #[inline] pub fn total_weight(&self) -> E { - self.total_weight + self.data.total_weight } /// Selects a random neighbor of a node with uniform probability. @@ -187,8 +196,8 @@ where } let random_idx = rng.random_range(0..degree); - let neighbor_idx = self.node_ptrs[node] + random_idx; - Some(self.neighbors[neighbor_idx]) + let neighbor_idx = self.data.node_ptrs[node] + random_idx; + Some(self.data.neighbors[neighbor_idx]) } /// Returns the weight of an edge between two nodes. @@ -202,11 +211,11 @@ where (to, from) }; - let start = self.node_ptrs[search_node]; - let end = self.node_ptrs[search_node + 1]; + let start = self.data.node_ptrs[search_node]; + let end = self.data.node_ptrs[search_node + 1]; - match self.neighbors[start..end].binary_search(&target) { - Ok(pos) => Some(self.weights[start + pos]), + match self.data.neighbors[start..end].binary_search(&target) { + Ok(pos) => Some(self.data.weights[start + pos]), Err(_) => None, } } @@ -217,29 +226,29 @@ where /// appropriately. Used in multilevel clustering algorithms. pub fn aggregate(&self, grouping: &G) -> Self { let new_node_count = grouping.group_count(); - + let mut new_node_weights = vec![N::zero(); new_node_count]; - + for node in 0..self.node_count() { let group = grouping.get_group(node); - new_node_weights[group] += self.node_weights[node]; + new_node_weights[group] += self.data.node_weights[node]; } - + let mut edge_memo = HashMap::new(); let mut self_loop_weights = HashMap::new(); - + for node in 0..self.node_count() { - let start = self.node_ptrs[node]; - let end = self.node_ptrs[node + 1]; - + let start = self.data.node_ptrs[node]; + let end = self.data.node_ptrs[node + 1]; + for i in start..end { - let neighbor = self.neighbors[i]; - let weight = self.weights[i]; - + let neighbor = self.data.neighbors[i]; + let weight = self.data.weights[i]; + if node <= neighbor { let g1 = grouping.get_group(node); let g2 = grouping.get_group(neighbor); - + if g1 == g2 { *self_loop_weights.entry(g1).or_insert(E::zero()) += weight; } else { @@ -249,19 +258,19 @@ where } } } - + let mut edges = Vec::new(); - + for (&group, &weight) in self_loop_weights.iter() { if weight > E::zero() { edges.push((group, group, weight)); } } - + for (&(g1, g2), &weight) in edge_memo.iter() { edges.push((g1, g2, weight)); } - + Self::from_edges(&edges, new_node_weights) } @@ -278,7 +287,7 @@ where for (new_id, &old_id) in group_members.iter().enumerate() { node_map.insert(old_id, new_id); - new_node_weights.push(self.node_weights[old_id]); + new_node_weights.push(self.data.node_weights[old_id]); } let mut edges = Vec::new(); @@ -300,8 +309,8 @@ where pub fn to_csr_matrix(&self) -> CsrMatrix { let n = self.node_count(); let mut row_ptrs = vec![0; n + 1]; - let mut col_indices = Vec::with_capacity(self.neighbors.len()); - let mut values = Vec::with_capacity(self.weights.len()); + let mut col_indices = Vec::with_capacity(self.data.neighbors.len()); + let mut values = Vec::with_capacity(self.data.weights.len()); for node in 0..n { for (neighbor, weight) in self.neighbors(node) { @@ -329,7 +338,7 @@ where /// Calculates the density of the network (ratio of actual to possible edges). pub fn density(&self) -> f64 { let n = self.node_count() as f64; - let m = self.edge_count as f64; + let m = self.data.edge_count as f64; let max_edges = n * (n - 1.0) / 2.0; if max_edges > 0.0 { m / max_edges } else { 0.0 } @@ -340,22 +349,27 @@ where /// Uses optimized unsafe pointer arithmetic for maximum performance in /// clustering algorithms. #[inline] - pub fn weight_to_comm(&self, node: usize, community: usize, grouping: &impl NetworkGrouping) -> E { - let start = self.node_ptrs[node]; - let end = self.node_ptrs[node + 1]; - + pub fn weight_to_comm( + &self, + node: usize, + community: usize, + grouping: &impl NetworkGrouping, + ) -> E { + let start = self.data.node_ptrs[node]; + let end = self.data.node_ptrs[node + 1]; + if start == end { return E::zero(); } - + let mut weight = E::zero(); - + // Direct unsafe pointer access for maximum performance unsafe { - let mut neighbor_ptr = self.neighbors.as_ptr().add(start); - let mut weight_ptr = self.weights.as_ptr().add(start); + let mut neighbor_ptr = self.data.neighbors.as_ptr().add(start); + let mut weight_ptr = self.data.weights.as_ptr().add(start); let mut remaining = end - start; - + while remaining > 0 { let neighbor = *neighbor_ptr; if grouping.get_group(neighbor) == community { @@ -366,110 +380,126 @@ where remaining -= 1; } } - + weight } #[inline] - pub fn weight_to_two_comms(&self, node: usize, comm1: usize, comm2: usize, grouping: &impl NetworkGrouping) -> (E, E) { - let start = self.node_ptrs[node]; - let end = self.node_ptrs[node + 1]; - + pub fn weight_to_two_comms( + &self, + node: usize, + comm1: usize, + comm2: usize, + grouping: &impl NetworkGrouping, + ) -> (E, E) { + let start = self.data.node_ptrs[node]; + let end = self.data.node_ptrs[node + 1]; + if start == end { return (E::zero(), E::zero()); } - + let mut w1 = E::zero(); let mut w2 = E::zero(); - + unsafe { - let mut neighbor_ptr = self.neighbors.as_ptr().add(start); - let mut weight_ptr = self.weights.as_ptr().add(start); + let mut neighbor_ptr = self.data.neighbors.as_ptr().add(start); + let mut weight_ptr = self.data.weights.as_ptr().add(start); let mut remaining = end - start; - + while remaining > 0 { let neighbor = *neighbor_ptr; let neighbor_comm = grouping.get_group(neighbor); let weight = *weight_ptr; - + if neighbor_comm == comm1 { w1 += weight; } else if neighbor_comm == comm2 { w2 += weight; } - + neighbor_ptr = neighbor_ptr.add(1); weight_ptr = weight_ptr.add(1); remaining -= 1; } } - + (w1, w2) } - pub fn weight_to_comms_batch(&self, node: usize, communities: &[usize], grouping: &impl NetworkGrouping) -> Vec { + pub fn weight_to_comms_batch( + &self, + node: usize, + communities: &[usize], + grouping: &impl NetworkGrouping, + ) -> Vec { let mut weights = vec![E::zero(); communities.len()]; - + // Create lookup map for community index let community_to_idx: HashMap = communities .iter() .enumerate() .map(|(i, &c)| (c, i)) .collect(); - - let start = self.node_ptrs[node]; - let end = self.node_ptrs[node + 1]; - + + let start = self.data.node_ptrs[node]; + let end = self.data.node_ptrs[node + 1]; + unsafe { - let mut neighbor_ptr = self.neighbors.as_ptr().add(start); - let mut weight_ptr = self.weights.as_ptr().add(start); + let mut neighbor_ptr = self.data.neighbors.as_ptr().add(start); + let mut weight_ptr = self.data.weights.as_ptr().add(start); let mut remaining = end - start; - + while remaining > 0 { let neighbor = *neighbor_ptr; let neighbor_comm = grouping.get_group(neighbor); - + if let Some(&idx) = community_to_idx.get(&neighbor_comm) { weights[idx] += *weight_ptr; } - + neighbor_ptr = neighbor_ptr.add(1); weight_ptr = weight_ptr.add(1); remaining -= 1; } } - + weights } #[inline] pub fn self_loop_weight(&self, node: usize) -> E { - let start = self.node_ptrs[node]; - let end = self.node_ptrs[node + 1]; - + let start = self.data.node_ptrs[node]; + let end = self.data.node_ptrs[node + 1]; + if start >= end { return E::zero(); } - + // Check if first neighbor is self (common optimization) - if self.neighbors[start] == node { - return self.weights[start]; + if self.data.neighbors[start] == node { + return self.data.weights[start]; } - + // Binary search since neighbors are sorted - match self.neighbors[start..end].binary_search(&node) { - Ok(pos) => self.weights[start + pos], + match self.data.neighbors[start..end].binary_search(&node) { + Ok(pos) => self.data.weights[start + pos], Err(_) => E::zero(), } } - pub fn community_internal_weight(&self, community: usize, grouping: &impl NetworkGrouping) -> E { + pub fn community_internal_weight( + &self, + community: usize, + grouping: &impl NetworkGrouping, + ) -> E { let members = &grouping.get_group_members()[community]; let mut total_weight = E::zero(); - + // Use parallel processing for large communities if members.len() > 100 { - total_weight = members.par_iter() + total_weight = members + .par_iter() .map(|&node| { let mut internal_weight = E::zero(); for (neighbor, weight) in self.neighbors(node) { @@ -497,26 +527,23 @@ where } } } - + total_weight } pub fn community_total_strength(&self, community: usize, grouping: &impl NetworkGrouping) -> E { let members = &grouping.get_group_members()[community]; - + if members.len() > 50 { // Parallel for large communities - members.par_iter() - .map(|&node| self.strength(node)) - .sum() + members.par_iter().map(|&node| self.strength(node)).sum() } else { - members.iter() + members + .iter() .map(|&node| self.strength(node)) .fold(E::zero(), |acc, x| acc + x) } } - - } /// High-performance iterator over neighbors and edge weights. From 7604d81389dc33a92914fea2322f4428d17f04f7 Mon Sep 17 00:00:00 2001 From: Ian Date: Wed, 15 Oct 2025 10:50:20 +0000 Subject: [PATCH 3/3] reverted changes....stays the same --- src/community_search/leiden/optimizer.rs | 257 +++-------------------- 1 file changed, 30 insertions(+), 227 deletions(-) diff --git a/src/community_search/leiden/optimizer.rs b/src/community_search/leiden/optimizer.rs index 380fdff..fc57924 100644 --- a/src/community_search/leiden/optimizer.rs +++ b/src/community_search/leiden/optimizer.rs @@ -3,10 +3,7 @@ //! Contains the core optimization logic for the Leiden community detection algorithm, //! including node movement, community merging, and partition refinement strategies. -use std::{ - collections::VecDeque, - time::{Duration, Instant}, -}; +use std::{collections::VecDeque, time::Instant}; use anyhow::Ok; use num_traits::Float; @@ -15,12 +12,8 @@ use rand_chacha::ChaCha8Rng; use single_utilities::traits::FloatOpsTS; use crate::{ - community_search::leiden::{ - ConsiderComms, LeidenConfig, - parallel::{ConflictFreeBatcher, ParallelEvaluator}, - partition::VertexPartition, - }, - network::{CSRNetwork, grouping::NetworkGrouping}, + community_search::leiden::{parallel::{ConflictFreeBatcher, ParallelEvaluator}, partition::VertexPartition, ConsiderComms, LeidenConfig}, + network::{grouping::NetworkGrouping, CSRNetwork}, }; /// Result of evaluating a potential community move for a node. @@ -584,6 +577,7 @@ impl LeidenOptimizer { Ok(total_improv) } + fn move_nodes_parallel( &mut self, partitions: &mut [P], @@ -591,203 +585,35 @@ impl LeidenOptimizer { is_membership_fixed: &[bool], consider_comms: ConsiderComms, consider_empty_community: bool, - max_comm_size: Option, - ) -> anyhow::Result + max_comm_size: Option + ) -> anyhow::Result where N: FloatOpsTS + 'static, - G: NetworkGrouping + Clone + Default, - P: VertexPartition, - { - let n = partitions[0].node_count(); - let network = partitions[0].network().clone(); - - let mut total_improv = N::zero(); - let mut is_node_stable = is_membership_fixed.to_vec(); - - let mut nodes: Vec = (0..n).filter(|&v| !is_membership_fixed[v]).collect(); - nodes.shuffle(&mut self.rng); - - let mut pending_nodes: VecDeque = nodes.into(); - let batcher = ConflictFreeBatcher::new(10_000); - - const MIN_PARALLEL_BATCH_SIZE: usize = 50; - const MIN_PARALLEL_NODES: usize = 100; - - let mut total_candidates_checked = 0usize; - let mut total_nodes_evaluated = 0usize; - - let mut batch_round = 0; - - while !pending_nodes.is_empty() { - let current_nodes: Vec = pending_nodes.drain(..).collect(); - - if current_nodes.len() < MIN_PARALLEL_NODES { - println!(" Sequential evaluation: {} nodes", current_nodes.len()); - - let seq_start = Instant::now(); - let mut seq_beneficial = 0; - - for node in current_nodes { - total_nodes_evaluated += 1; - - let current_comm = partitions[0].membership(node); - - let mut comm_added = vec![false; partitions[0].community_count()]; - let mut candidates = Vec::new(); - - self.collect_candidate_communities( - node, - partitions, - consider_comms, - &mut candidates, - &mut comm_added, - ); - - total_candidates_checked += candidates.len(); - - if consider_empty_community && partitions[0].cnodes(current_comm) > 1 { - let empty_comm = partitions[0].get_empty_community(); - candidates.push(empty_comm); - } - - let (best_comm, improvement) = self.find_best_community_move( - node, - current_comm, - &candidates, - partitions, - layer_weights, - max_comm_size, - )?; - - if best_comm != current_comm && improvement > N::zero() { - seq_beneficial += 1; - total_improv += improvement; - - for partition in partitions.iter_mut() { - partition.move_node(node, best_comm); - } - - is_node_stable[node] = true; - - for (neighbor, _) in network.neighbors(node) { - if is_node_stable[neighbor] - && partitions[0].membership(neighbor) != best_comm - && !is_membership_fixed[neighbor] - { - pending_nodes.push_back(neighbor); - is_node_stable[neighbor] = false; - } - } - } - } - - println!( - " Sequential: {} beneficial moves in {:?}", - seq_beneficial, - seq_start.elapsed() - ); - batch_round += 1; - continue; - } - - let batches = batcher.create_batches(¤t_nodes, &network, &is_node_stable); - - println!( - " Batch #{}: {} nodes -> {} conflict-free batches", - batch_round, - current_nodes.len(), - batches.len() - ); - - for (batch_idx, batch) in batches.iter().enumerate() { - if batch.is_empty() { - continue; - } - - let batch_start = Instant::now(); - - if batch.len() < MIN_PARALLEL_BATCH_SIZE { - let mut seq_beneficial = 0; - - for &node in batch.iter() { - total_nodes_evaluated += 1; - - let current_comm = partitions[0].membership(node); - - let mut comm_added = vec![false; partitions[0].community_count()]; - let mut candidates = Vec::new(); - - self.collect_candidate_communities( - node, - partitions, - consider_comms, - &mut candidates, - &mut comm_added, - ); - - total_candidates_checked += candidates.len(); + G: NetworkGrouping, + P: VertexPartition { + let n = partitions[0].node_count(); + let network = partitions[0].network().clone(); - if consider_empty_community && partitions[0].cnodes(current_comm) > 1 { - let empty_comm = partitions[0].get_empty_community(); - candidates.push(empty_comm); - } + let mut total_improv = N::zero(); + let mut is_node_stable = is_membership_fixed.to_vec(); - let (best_comm, improvement) = self.find_best_community_move( - node, - current_comm, - &candidates, - partitions, - layer_weights, - max_comm_size, - )?; + let mut nodes: Vec = (0..n) + .filter(|&v| !is_membership_fixed[v]) + .collect(); + nodes.shuffle(&mut self.rng); - if best_comm != current_comm && improvement > N::zero() { - seq_beneficial += 1; - total_improv += improvement; + let mut pending_nodes: VecDeque = nodes.into(); + let batcher = ConflictFreeBatcher::new(10_000); - for partition in partitions.iter_mut() { - partition.move_node(node, best_comm); - } + while !pending_nodes.is_empty() { + let current_nodes: Vec = pending_nodes.drain(..).collect(); + let batches = batcher.create_batches(¤t_nodes, &network, &is_node_stable); - is_node_stable[node] = true; + for batch in batches { + let proposed_moves = ParallelEvaluator::evaluate_batch(&batch, partitions, layer_weights, consider_comms, consider_empty_community, max_comm_size); - for (neighbor, _) in network.neighbors(node) { - if is_node_stable[neighbor] - && partitions[0].membership(neighbor) != best_comm - && !is_membership_fixed[neighbor] - { - pending_nodes.push_back(neighbor); - is_node_stable[neighbor] = false; - } - } - } - } - - if batch_idx < 3 || seq_beneficial > 0 { - println!( - " Sub-batch #{} (seq): {} nodes, {} beneficial (took {:?})", - batch_idx, - batch.len(), - seq_beneficial, - batch_start.elapsed() - ); - } - } else { - let proposed_moves = ParallelEvaluator::evaluate_batch( - batch, - partitions, - layer_weights, - consider_comms, - consider_empty_community, - max_comm_size, - ); - - total_nodes_evaluated += batch.len(); - - let mut beneficial_count = 0; for proposed in proposed_moves { if proposed.is_beneficial() { - beneficial_count += 1; total_improv += proposed.improvement; for partition in partitions.iter_mut() { @@ -797,48 +623,25 @@ impl LeidenOptimizer { is_node_stable[proposed.node] = true; for (neighbor, _) in network.neighbors(proposed.node) { - if is_node_stable[neighbor] - && partitions[0].membership(neighbor) != proposed.to_comm - && !is_membership_fixed[neighbor] - { + if is_node_stable[neighbor] && partitions[0].membership(neighbor) != proposed.to_comm && !is_membership_fixed[neighbor] { pending_nodes.push_back(neighbor); is_node_stable[neighbor] = false; } } } } - - if batch_idx < 3 || beneficial_count > 0 { - println!( - " Sub-batch #{} (par): {} nodes, {} beneficial (took {:?})", - batch_idx, - batch.len(), - beneficial_count, - batch_start.elapsed() - ); - } } } - batch_round += 1; - } - - if total_nodes_evaluated > 0 { - println!( - " Avg candidates per node: {:.1}", - total_candidates_checked as f64 / total_nodes_evaluated as f64 - ); - } + partitions[0].renumber_communities(); + let membership = partitions[0].membership_vector(); + for partition in partitions.iter_mut().skip(1) { + partition.set_membership(&membership); + } - partitions[0].renumber_communities(); - let membership = partitions[0].membership_vector(); - for partition in partitions.iter_mut().skip(1) { - partition.set_membership(&membership); + Ok(total_improv) } - Ok(total_improv) - } - fn move_nodes_constrained( &mut self, partitions: &mut [P],