Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,14 @@ description = "A high-performance network clustering library implementing commun


[dependencies]
anyhow = "1.0.98"
anyhow = "1.0.100"
kiddo = {version = "5.2.2" }
nalgebra-sparse = "0.10.0"
ndarray = {version = "0.16.1" , features = ["rayon"]}
num-traits = "0.2.19"
petgraph = { version = "0.8.2", features = ["rayon"] }
rayon = "1.10.0"
single-utilities = "0.8.5"
single-utilities = "0.8.6"
rand = "0.9.0"
rand_chacha = {version = "0.9.0"}
hnsw_rs = {version = "0.3.2", features = ["simdeez_f"]}
1 change: 1 addition & 0 deletions src/community_search/leiden/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
pub mod partition;
mod optimizer;
pub use optimizer::LeidenOptimizer;
mod parallel;

/// Strategy for selecting communities to consider during optimization.
#[derive(Debug, Clone, Copy, PartialEq)]
Expand Down
253 changes: 157 additions & 96 deletions src/community_search/leiden/optimizer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ use rand_chacha::ChaCha8Rng;
use single_utilities::traits::FloatOpsTS;

use crate::{
community_search::leiden::{ConsiderComms, LeidenConfig, partition::VertexPartition},
network::{CSRNetwork, grouping::NetworkGrouping},
community_search::leiden::{parallel::{ConflictFreeBatcher, ParallelEvaluator}, partition::VertexPartition, ConsiderComms, LeidenConfig},
network::{grouping::NetworkGrouping, CSRNetwork},
};

/// Result of evaluating a potential community move for a node.
Expand Down Expand Up @@ -195,110 +195,105 @@ impl LeidenOptimizer {
/// Evaluates the quality improvement for moving a node to each candidate
/// community and returns the community and improvement of the best move.
fn find_best_community_move<N, G, P>(
&self,
v: usize,
v_comm: usize,
comms: &[usize],
partitions: &mut [P], // Changed to mutable slice
layer_weights: &[N],
max_comm_size: Option<usize>,
) -> anyhow::Result<(usize, N)>
where
N: FloatOpsTS + 'static,
G: NetworkGrouping,
P: VertexPartition<N, G>,
{
let mut max_comm = v_comm;
let time = Instant::now();
// println!("Finding best community move: {:?}", time.elapsed());

// Pre-compute these values once instead of in the loop
let v_comm_size = partitions[0].csize(v_comm);
let epsilon_threshold = N::from(10.0).unwrap() * <N as Float>::epsilon();

let mut max_improv = if let Some(max_size) = max_comm_size {
if max_size < v_comm_size {
<N as Float>::neg_infinity()
&self,
v: usize,
v_comm: usize,
comms: &[usize],
partitions: &mut [P],
layer_weights: &[N],
max_comm_size: Option<usize>,
) -> anyhow::Result<(usize, N)>
where
N: FloatOpsTS + 'static,
G: NetworkGrouping,
P: VertexPartition<N, G>,
{
let mut max_comm = v_comm;
let time = Instant::now();
// println!("Finding best community move: {:?}", time.elapsed());

// Pre-compute these values once instead of in the loop
let v_comm_size = partitions[0].csize(v_comm);
let epsilon_threshold = N::from(10.0).unwrap() * <N as Float>::epsilon();

let mut max_improv = if let Some(max_size) = max_comm_size {
if max_size < v_comm_size {
<N as Float>::neg_infinity()
} else {
epsilon_threshold
}
} else {
epsilon_threshold
};

const V_SIZE: usize = 1;

if comms.is_empty() {
return Ok((max_comm, max_improv));
}
} else {
epsilon_threshold
};

const V_SIZE: usize = 1; // Made it a const for better optimization
// println!("Prefiltering valid comms {:?}", time.elapsed());
let valid_comms: Vec<usize> = if let Some(max_size) = max_comm_size {
comms
.iter()
.copied()
.filter(|&comm| partitions[0].csize(comm) + V_SIZE <= max_size)
.collect()
} else {
comms.to_vec()
};
// println!("Filtered valid comms: {:?}", time.elapsed());

// Early exit if no communities to check
if comms.is_empty() {
return Ok((max_comm, max_improv));
}

// println!("Prefiltering valid comms {:?}", time.elapsed());
// Pre-filter communities by size constraint to avoid repeated checks
let valid_comms: Vec<usize> = if let Some(max_size) = max_comm_size {
comms
.iter()
.copied()
.filter(|&comm| partitions[0].csize(comm) + V_SIZE <= max_size)
.collect()
} else {
comms.to_vec()
};
// println!("Filtered valid comms: {:?}", time.elapsed());

// Early exit if no valid communities
if valid_comms.is_empty() {
return Ok((max_comm, max_improv));
}
if valid_comms.is_empty() {
return Ok((max_comm, max_improv));
}

// Optimized single-layer case
if partitions.len() == 1 && layer_weights[0] == N::one() {
// println!("checking valid comms: {:?}", time.elapsed());
// Get mutable reference to the single partition
let partition = &mut partitions[0];

for &comm in &valid_comms {
let t = Instant::now();
let possible_improv = partition.diff_move(v, comm);
// println!("Executed diff move, took: {:?}", t.elapsed());

if possible_improv > max_improv {
max_comm = comm;
max_improv = possible_improv;
// Optimized single-layer case
if partitions.len() == 1 && layer_weights[0] == N::one() {
// println!("checking valid comms: {:?}", time.elapsed());

let partition = &mut partitions[0];

for &comm in &valid_comms {
let t = Instant::now();
let possible_improv = partition.diff_move(v, comm);
// println!("Executed diff move, took: {:?}", t.elapsed());

if possible_improv > max_improv {
max_comm = comm;
max_improv = possible_improv;
}
}
}
} else {
// Multi-layer case
for &comm in &valid_comms {
let mut possible_improv = N::zero();

for layer_idx in 0..partitions.len() {
// Get mutable reference to current partition
let layer_improv = partitions[layer_idx].diff_move(v, comm);
possible_improv += layer_weights[layer_idx] * layer_improv;

// Early termination optimization
if possible_improv + epsilon_threshold < max_improv {
let remaining_positive = layer_weights[layer_idx + 1..]
.iter()
.all(|&w| w >= N::zero());

if remaining_positive && layer_improv <= N::zero() {
break;
} else {
// Multi-layer case
for &comm in &valid_comms {
let mut possible_improv = N::zero();

for layer_idx in 0..partitions.len() {
let layer_improv = partitions[layer_idx].diff_move(v, comm);
possible_improv += layer_weights[layer_idx] * layer_improv;

// Early termination optimization
if possible_improv + epsilon_threshold < max_improv {
let remaining_positive = layer_weights[layer_idx + 1..]
.iter()
.all(|&w| w >= N::zero());

if remaining_positive && layer_improv <= N::zero() {
break;
}
}
}
}

if possible_improv > max_improv {
max_comm = comm;
max_improv = possible_improv;
if possible_improv > max_improv {
max_comm = comm;
max_improv = possible_improv;
}
}
}
}

Ok((max_comm, max_improv))
}
Ok((max_comm, max_improv))
}

/// Collects candidate communities that a node can potentially move to.
///
Expand Down Expand Up @@ -434,7 +429,9 @@ where

for partition in partitions.iter() {
if partition.node_count() != n {
panic!("Number of nodes are not equal for all graphs.");
return Err(anyhow::anyhow!(
"Number of nodes are not equal for all graphs."
));
}
}

Expand Down Expand Up @@ -580,6 +577,71 @@ where
Ok(total_improv)
}


fn move_nodes_parallel<N, G, P>(
&mut self,
partitions: &mut [P],
layer_weights: &[N],
is_membership_fixed: &[bool],
consider_comms: ConsiderComms,
consider_empty_community: bool,
max_comm_size: Option<usize>
) -> anyhow::Result<N>
where
N: FloatOpsTS + 'static,
G: NetworkGrouping,
P: VertexPartition<N, G> {
let n = partitions[0].node_count();
let network = partitions[0].network().clone();

let mut total_improv = N::zero();
let mut is_node_stable = is_membership_fixed.to_vec();

let mut nodes: Vec<usize> = (0..n)
.filter(|&v| !is_membership_fixed[v])
.collect();
nodes.shuffle(&mut self.rng);

let mut pending_nodes: VecDeque<usize> = nodes.into();
let batcher = ConflictFreeBatcher::new(10_000);

while !pending_nodes.is_empty() {
let current_nodes: Vec<usize> = pending_nodes.drain(..).collect();
let batches = batcher.create_batches(&current_nodes, &network, &is_node_stable);

for batch in batches {
let proposed_moves = ParallelEvaluator::evaluate_batch(&batch, partitions, layer_weights, consider_comms, consider_empty_community, max_comm_size);

for proposed in proposed_moves {
if proposed.is_beneficial() {
total_improv += proposed.improvement;

for partition in partitions.iter_mut() {
partition.move_node(proposed.node, proposed.to_comm);
}

is_node_stable[proposed.node] = true;

for (neighbor, _) in network.neighbors(proposed.node) {
if is_node_stable[neighbor] && partitions[0].membership(neighbor) != proposed.to_comm && !is_membership_fixed[neighbor] {
pending_nodes.push_back(neighbor);
is_node_stable[neighbor] = false;
}
}
}
}
}
}

partitions[0].renumber_communities();
let membership = partitions[0].membership_vector();
for partition in partitions.iter_mut().skip(1) {
partition.set_membership(&membership);
}

Ok(total_improv)
}

fn move_nodes_constrained<N, G, P>(
&mut self,
partitions: &mut [P],
Expand Down Expand Up @@ -1158,13 +1220,12 @@ where
while aggregate_further {
println!("Starting iteration {:?}, time: {:?}", i, time.elapsed());
let improvement = match self.config.optimise_routine {
super::OptimiseRoutine::MoveNodes => self.move_nodes(
super::OptimiseRoutine::MoveNodes => self.move_nodes_parallel(
&mut collapsed_partitions,
layer_weights,
&is_collapsed_membership_fixed,
self.config.consider_comms,
self.config.consider_empty_community,
false,
self.config.max_community_size,
)?,
super::OptimiseRoutine::MergeNodes => self.merge_nodes(
Expand Down
Loading