<a href="https://colab.research.google.com/github/MehrdadJalali-AI/InverseLinkPredcition/blob/main/MLEA_Sparcification.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import pandas as pd
import numpy as np
import networkx as nx
from rdkit import Chem
from rdkit.Chem import AllChem
import networkx.algorithms.community as nx_comm
import time
from typing import Callable, Tuple
import warnings

# Suppress RDKit warnings
from rdkit import rdBase
rdBase.DisableLog("rdApp.*")
warnings.filterwarnings("ignore")

class LotusEffectAlgorithm:
    """A population-based optimization algorithm for graph sparsification."""
    def __init__(self, population_size: int, dimensions: int, lower_bound: float,
                 upper_bound: float, max_evals: int, fitness_function: Callable):
        self.pop_size = population_size
        self.dims = dimensions
        self.lb = lower_bound
        self.ub = upper_bound
        self.max_evals = max_evals
        self.fitness_func = fitness_function
        self.population = np.random.uniform(lower_bound, upper_bound, (population_size, dimensions))
        self.best_solution = self.population[0].copy()
        self.best_fitness = float('inf')
        self.evals = 0

    def mutate(self, individual: np.ndarray, mutation_rate: float = 0.1) -> np.ndarray:
        """Apply random mutation to an individual."""
        mask = np.random.random(self.dims) < mutation_rate
        individual[mask] = np.random.uniform(self.lb, self.ub, mask.sum())
        return np.clip(individual, self.lb, self.ub)

    def optimize(self) -> Tuple[np.ndarray, float, int]:
        """Optimize the edge mask to minimize fitness (maximize modularity)."""
        for _ in range(self.max_evals // self.pop_size):
            for i in range(self.pop_size):
                if self.evals >= self.max_evals:
                    break

                # Evaluate fitness
                fitness = self.fitness_func(self.population[i])
                self.evals += 1

                if fitness < self.best_fitness:
                    self.best_fitness = fitness
                    self.best_solution = self.population[i].copy()

                # Mutate to explore solution space
                self.population[i] = self.mutate(self.population[i])

        return self.best_solution, self.best_fitness, self.evals

def load_edges_list(filename: str) -> pd.DataFrame:
    """Load edge list from a CSV file."""
    return pd.read_csv(filename)

def generate_fingerprint(smiles: str) -> np.ndarray:
    """Generate a Morgan fingerprint from a SMILES string."""
    mol = Chem.MolFromSmiles(smiles)
    if mol is None:
        return np.zeros(1024, dtype=np.float32)
    return np.array(AllChem.GetMorganFingerprintAsBitVect(mol, 2, nBits=1024), dtype=np.float32)

def label_encode_metal_names(metal_names: np.ndarray) -> np.ndarray:
    """Convert metal names to integer labels."""
    unique_metals = np.unique(metal_names)
    metal_dict = {metal: idx for idx, metal in enumerate(unique_metals)}
    return np.array([metal_dict[metal] for metal in metal_names], dtype=np.int32)

def load_summary_data(filename: str, node_labels: np.ndarray) -> pd.DataFrame:
    """Load and process summary data for nodes."""
    summary_data = pd.read_csv(filename, index_col=0)
    linker_smiles = summary_data['linker SMILES']
    linker_features = np.stack(linker_smiles.apply(generate_fingerprint).values)
    metal_features = label_encode_metal_names(summary_data['metal']).reshape(-1, 1)
    other_features = summary_data[['Largest Cavity Diameter', 'Pore Limiting Diameter',
                                  'Largest Free Sphere']].astype(np.float32).values
    features = np.concatenate((linker_features, metal_features, other_features), axis=1)
    return pd.DataFrame(features, index=summary_data.index)

def graph_modularity_score(edge_mask: np.ndarray, edge_list: list, original_graph: nx.Graph) -> float:
    """Calculate the negative modularity score of a sparsified graph."""
    graph = nx.Graph(original_graph)  # Copy the original graph
    for i, keep in enumerate(edge_mask):
        if keep < 0.5 and graph.has_edge(*edge_list[i]):
            graph.remove_edge(*edge_list[i])

    if graph.number_of_edges() == 0:
        return 1.0  # Worst case: no edges
    try:
        communities = nx_comm.greedy_modularity_communities(graph)
        return -nx_comm.modularity(graph, communities)
    except Exception:
        return 1.0  # Handle edge cases gracefully

def main():
    """Main function to sparsify a graph using the Lotus Effect Algorithm."""
    start_time = time.time()
    config = {
        'edges_list_filename': 'edges_list_0.8_Full_2.csv',
        'summary_data_filename': '1M1L3D_summary.csv',
        'output_file': 'mlea_sparsified_graph_refined.csv',
        'population_size': 20,
        'max_function_evaluations': 300
    }

    # Load data
    edges = load_edges_list(config['edges_list_filename'])
    node_labels = pd.concat([edges['source'], edges['target']]).unique()
    summary_data = load_summary_data(config['summary_data_filename'], node_labels)

    # Build initial graph
    G = nx.Graph()
    edge_list = []
    for _, row in edges.iterrows():
        G.add_edge(row['source'], row['target'], weight=row['weight'])
        edge_list.append((row['source'], row['target']))

    # Define fitness function and optimize
    fitness_func = lambda mask: graph_modularity_score(mask, edge_list, G)
    lea = LotusEffectAlgorithm(
        population_size=config['population_size'],
        dimensions=len(edge_list),
        lower_bound=0.0,
        upper_bound=1.0,
        max_evals=config['max_function_evaluations'],
        fitness_function=fitness_func
    )
    best_mask, best_score, evals = lea.optimize()

    # Apply best mask to sparsify graph
    G_final = nx.Graph(G)
    for i, keep in enumerate(best_mask):
        if keep < 0.5 and G_final.has_edge(*edge_list[i]):
            G_final.remove_edge(*edge_list[i])

    # Save and report results
    nx.write_edgelist(G_final, config['output_file'], data=['weight'])
    print(f"Best modularity score: {-best_score:.4f} after {evals} evaluations")
    print(f"Sparsified graph saved to '{config['output_file']}'")
    print(f"Remaining nodes: {G_final.number_of_nodes()}, edges: {G_final.number_of_edges()}")
    print(f"Total runtime: {time.time() - start_time:.2f} seconds")

if __name__ == "__main__":
    main()

Best modularity score: 0.8340 after 300 evaluations
Sparsified graph saved to 'mlea_sparsified_graph_refined.csv'
Remaining nodes: 12561, edges: 103583
Total runtime: 4160.06 seconds
