# Computational Pipeline



-----

# Step 1: Import Libraries and Extensions

**Purpose:**  
Set up the computational environment by installing and importing all the necessary libraries and extensions.

**What Happens:**
- **Installation:**  
  In interactive environments (e.g. Jupyter), you might install RDKit using a command such as:
  ```bash
  !pip install rdkit-pypi


In [1]:
# Import standard libraries for iteration, numerical operations, data manipulation, JSON parsing, etc.
import itertools
import numpy as np
import pandas as pd
import json
import random
from collections import defaultdict
import matplotlib.pyplot as plt
import networkx as nx

# Import RDKit modules for chemical representations and reactions.
from rdkit import Chem
from rdkit.Chem import AllChem, Draw, inchi
from rdkit.Chem.rdMolDescriptors import CalcExactMolWt
from IPython.display import display

# Disable RDKit warning messages such as "Proton(s) added/removed" and "Metal was disconnected"
from rdkit import RDLogger
RDLogger.DisableLog('rdApp.warning')

# Import functions for calculating cosine similarity and optimisation routines,
# as well as the KDTree from SciPy for efficient spatial queries.
from sklearn.metrics.pairwise import cosine_similarity
from scipy.optimize import minimize
from scipy.spatial import KDTree


# Step 2: Data Loading and Pre-processing

**Purpose:**  
Read and process compound data from a JSON file to ensure that each chemical record has a valid structural representation.

**What Happens:**
- **Validation Functions:**  
  - `is_valid_smiles(smiles)`: Checks that a SMILES string is non-empty and valid according to RDKit.
  - `inchi_to_smiles(inchi_str)`: Attempts to convert an InChI string to a valid SMILES representation.
- **Data Parsing:**  
  The function `load_compound_data(json_file_path)` reads each compound's information from the JSON file:
  - Extracts the compound name, SMILES/InChI representations, and experimental metadata.
  - Uses the validation functions to verify the chemical structure. If invalid, it attempts conversion or logs and skips the compound.
  - Parses experimental spectral data (peaks) into a Pandas DataFrame.
- **Outcome:**  
  The compounds are organised into a dictionary with each compound’s metadata and experimental spectrum.


In [2]:
# --- Validation and Data Loading Functions ---

def is_valid_smiles(smiles):
    """
    Check if a SMILES string is valid.
    Returns False if the string is empty, "N/A" or if RDKit cannot generate a molecule.
    """
    if not smiles or smiles.strip() in {"", "N/A"}:
        return False
    # Try converting the SMILES to an RDKit molecule
    return Chem.MolFromSmiles(smiles) is not None


def inchi_to_smiles(inchi_str):
    """
    Convert an InChI string to SMILES if possible.
    Returns None for empty or invalid InChI strings.
    """
    if not inchi_str or inchi_str.strip() in {"", "N/A"}:
        return None
    try:
        # Convert InChI to RDKit molecule then to SMILES
        mol = inchi.MolFromInchi(inchi_str)
        return Chem.MolToSmiles(mol) if mol else None
    except Exception:
        # Return None if conversion fails
        return None


def load_compound_data(json_file_path):
    """
    Load compounds from a JSON file and process each record.
    For each compound, validate the SMILES, optionally convert from InChI,
    extract metadata, and parse experimental peak data into a DataFrame.
    """
    with open(json_file_path, 'r') as f:
        data = json.load(f)
        
    compound_data = {}
    for compound in data:
        # Get compound name and chemical structure information
        compound_name = compound.get("Compound_Name", "Unknown")
        smiles = compound.get("Smiles") or compound.get("SMILES")
        inchi_str = compound.get("INCHI")
    
        # If the SMILES string is not valid, attempt to convert InChI to SMILES
        if not is_valid_smiles(smiles):
            smiles = inchi_to_smiles(inchi_str)
    
        # If still invalid, log the issue and skip the compound
        if not is_valid_smiles(smiles):
            with open("missing_smiles.log", "a") as log_file:
                log_file.write(f"Skipping {compound_name}: Missing or invalid SMILES/InChI\n")
            continue
    
        # Extract relevant metadata from the compound record
        metadata = {key: compound.get(key) for key in [
            "spectrum_id", "source_file", "task", "scan", "ms_level",
            "library_membership", "Precursor_MZ", "ExactMass", "Charge",
            "Compound_Source", "Instrument", "Ion_Source", "Ion_Mode",
        ]}
    
        # Convert SMILES to InChI and InChIKey (if possible) to store additional structural info
        mol = Chem.MolFromSmiles(smiles)
        if mol:
            try:
                metadata.update({
                    "smiles": smiles,
                    "inchi": inchi.MolToInchi(mol),
                    "inchikey": inchi.MolToInchiKey(mol)
                })
            except Exception:
                metadata.update({"inchi": None, "inchikey": None})
        else:
            metadata.update({"inchi": None, "inchikey": None})
    
        # Parse the peaks JSON containing the experimental spectral data,
        # and convert it into a Pandas DataFrame with columns "m/z" and "intensity"
        peaks_json = compound.get("peaks_json")
        try:
            if isinstance(peaks_json, str) and peaks_json.strip().lower() != "n/a":
                spectra_df = pd.DataFrame(json.loads(peaks_json), columns=["m/z", "intensity"])
            else:
                spectra_df = pd.DataFrame(columns=["m/z", "intensity"])
        except Exception:
            spectra_df = pd.DataFrame(columns=["m/z", "intensity"])
    
        # Store the metadata and spectra DataFrame for the compound
        compound_data[compound_name] = {"metadata": metadata, "spectra": spectra_df}
    
    return compound_data

# Step 3: Ionisation and Fragmentation

**Purpose:**  
Simulate the chemical reactions that ionise the molecule and subsequently fragment it into smaller pieces, mimicking the process in mass spectrometry.

**What Happens:**
- **Ionisation:**  
  - `perform_ionisation(smiles)`: Converts a SMILES string into an RDKit molecule and applies an ionisation reaction (using a SMARTS reaction) to generate an ionised product.
  - The ionised product is sanitised, its exact mass is calculated, and an initial DataFrame of peaks is created.
- **Fragmentation:**  
  - `build_fragmentation_tree(molecule, product_sets)`: Defines a range of fragmentation reactions (both positive and negative modes, including elimination and rearrangement reactions) using SMARTS patterns.
  - Runs these reactions iteratively on the ionised product(s) to generate fragments.
  - During the process, it constructs:
    - A DataFrame with simulated m/z peaks,
    - A list of edges (representing parent–fragment relationships),
    - A fragmentation tree detailing the reaction that generated each fragment,
    - Optional molecule images for visualisation.


In [3]:
# --- Ionisation and Fragmentation Functions ---

def perform_ionisation(smiles):
    """
    Perform an ionisation reaction on a molecule specified by a SMILES string.
    - Converts the SMILES string to an RDKit molecule.
    - Uses a SMARTS reaction to simulate ionisation.
    - Sanitises the resulting product, calculates its exact mass,
      and stores it in a DataFrame with intensity set to 1.
    Returns the original molecule, the ionised product, a DataFrame of peaks, and the set of products.
    """
    molecule = Chem.MolFromSmiles(smiles)
    if molecule is None:
        raise ValueError("Invalid SMILES provided for ionisation.")
    
    initial_mass = CalcExactMolWt(molecule)
    # Define the ionisation reaction using a SMARTS pattern
    ionisation_rxn = AllChem.ReactionFromSmarts('[O,N,S:2]>>[H][O+,N+,S+:2]')
    product_sets = ionisation_rxn.RunReactants((molecule,))
    
    if not product_sets:
        raise ValueError("Ionisation failed, no products generated.")
    
    # Select the first ionised product, ensure it is sanitised and compute its mass.
    ionised_product = product_sets[0][0]
    Chem.SanitizeMol(ionised_product)
    ionised_mass = CalcExactMolWt(ionised_product)
    
    # Create a simple DataFrame to store the ionised mass peak (with intensity=1)
    peaks_df = pd.DataFrame({'mz': [ionised_mass], 'intensity': [1]})
    
    return molecule, ionised_product, peaks_df, product_sets


def build_fragmentation_tree(molecule, product_sets):
    """
    Run a series of fragmentation reactions on the ionised molecule.
    Returns:
      - peaks_df: a DataFrame with simulated m/z peaks from fragmentation.
      - edges: a list of (parent_mass, fragment_mass) tuples representing the connections.
      - fragmentation_tree: a list of (parent_mass, fragment_mass, reaction_name) tuples indicating which reaction produced each fragment.
      - fragment_images: a list of images for the parent and fragment molecules for visualisation.
    """
    # Define a dictionary of fragmentation reactions using SMARTS patterns
    fragmentations = { 
        # CMF Reactions (Positive Ion Mode):
        'simple_inductive_cleavage': '[O+,N+,S+:2]-[C:1]>>[C+:1].[O,N,S:2]',
        'inductive_cleavage_heteroatom': '[O,N,S:1]-[C:2]-[*+:3]>>[O+,N+,S+:1]=[C+:2].[*:3]',
        'displacement_reaction_positive_a': '[O,N,S:1]-[C:2]-[C:3]-[*+:4]>>[C:3]1-[C:2]-[O+,N+,S:1]-1.[*:4]',
        'displacement_reaction_positive_b': '[O,N,S,C:1]=[C:2]-[C:3]-[*+:4]>>[O+,N+,S+,C+:1]-[C:2]=[C:3].[*:4]',
        'beta_hydrogen_removal_positive': '[O,N,S:1]-[C:2]-[C;H:3]-[C:4]-[*+:5]>>[O+,N+,S+;H1:1]-[C:2]-[C:3]=[C:4].[*:5]',
        'grob_wharton_fragmentation': '[O,N,S:1]-[C:2]-[C:3]-[C:4]-[*+:5]>>[O+,N+,S+:1]=[C:2].[C:3]=[C:4].[*:5]',
        # CMF Reactions (Negative Ion Mode):
        'alpha_elimination': '[C:1]-[C:2](=[O,S,N:3])-[O-,N-,S-:4]>>[C-:1].[C:2](=[O,S,N:3])-[O,N,S:4]',
        'gamma_elimination': '[O-,N-,S-:1]-[C:2]=[C:3]-[C:4]-[*:5]>>[O,N,S:1]=[C:2]-[C:3]=[C:4].[*-:5]',
        'epsilon_elimination': '[O-,N-,S-:1]-[C:2]=[C:3]-[C:4]=[C:5]-[C:6][*:7]>>[O,N,S:1]=[C:2]-[C:3]=[C:4]-[C:5]=[C:6].[*-:7]',
        'displacement_reaction_negative': '[O-,N-,S-:1]-[C:2]-[C:3]-[*:4]>>[O,N,S:1]1-[C:2]-[C:3]-1.[*-:4]',
        'beta_hydrogen_removal_negative': '[O-,N-,S-:1]-[C:2]-[C;H:3]-[C:4]-[*:5]>>[O,N,S;H1:1]-[C:2]-[C:3]=[C:4].[*-:5]',
        # CRF Reactions:
        'remote_hydrogen_rearrangement_a': '[O,N,S:1]-[C:2]-[C:3]-[H:4]>>[C:2]=[C:3].[H:4]-[O,N,S:1]',
        'remote_hydrogen_rearrangement_b': '[C:1]-[C:2]-[O:3]-[H:4]>>[C:1]-[H:4].[C:2]=[O:3]',
        'retro_diels_alder': '[C:1]1=[C:2]-[C:3]-[C:4]-[C:5]-[C:6]-1>>[C:6]=[C:1]-[C:2]=[C:3].[C:4]=[C:5]',
        'retro_ene': '[C:2]=[C:1]-[C:3]-[C:4]-[C:5]-[H:6]>>[H:6]-[C:2]-[C:3]=[C:1].[C:4]=[C:5]',
        'retro_heteroene': '[O,N,S:1]=[C:2]-[C:3]-[C:4]-[C:5]-[H:6]>>[H:6]-[O,N,S:1]-[C:2]=[C:3].[C:4]=[C:5]',
        'charge_remote_fragmentation': '[H:1]-[C:2]-[C:3]-[C:4]-[C:5]-[H:6]>>[C:2]=[C:3].[C:4]=[C:5].[H:1]-[H:6]',
        'aromatic_elimination': '[C:1]-[C:2]-[C:3]-[C:4]-[C:5]-[C:6]-[C:7]-[C:8]-[O,N,S:9]>>[C:1]=[C:8]-[O,N,S:9].[c:2]1[c:3][c:4][c:5][c:6][c:7]1',
        'pericyclic_shift': '[C:1]-[C:2]-[C:3]-[C:4]-[C:5]-[C:6]>>[C:1]=[C:2].[C:5]=[C:6].[C:3]=[C:4]',
        'pericyclic_1_3_shift': '[H:1]-[C:2]-[C:3]=[C:4]-[C:5]=[C:6]>>[C:2]=[C:3]-[C:4](-[H:1])=[C:5]-[C:6]',
        'pericyclic_1_5_shift': '[H:1]-[C:2]-[C:3]=[C:4]-[C:5]=[C:6]>>[C:2]=[C:3]-[C:4]=[C:5]-[C:6](-[H:1])', 
        'carbon_monoxide_elimination_a': '[C:1]1-[C:2]-[C:3]-[C:4]-[C:5]-[C:6](=O)-1>>[C:1]1-[C:2]-[C:3]-[C:4]-[C:5]-1.[C:6](#O)',
        'carbon_monoxide_elimination_b': '[C:6](#[O+1])-[C:1]-[C:2]-[C:3]-[C:4]-[C-:5]>>[C:1]1-[C:2]-[C:3]-[C:4]-[C:5]-1.[C:6](#O)',
        'radical_fragmentation': '[C:1]-[O,N,S:2]>>[C^1:1].[O^1,N^1,S^1:2]',
    }
    # Create RDKit Reaction objects from the SMARTS patterns defined above
    reactions = {name: AllChem.ReactionFromSmarts(smarts) for name, smarts in fragmentations.items()}
    
    # Initialise an empty DataFrame for m/z peaks and lists to record edges, the fragmentation tree, etc.
    peaks_df = pd.DataFrame({'mz': [], 'intensity': []})
    edges = []   # Stores (parent_mass, fragment_mass) tuples
    fragmentation_tree = []   # Stores (parent_mass, fragment_mass, reaction_name)
    processed_fragments = set()  # To avoid reprocessing the same fragment
    fragment_images = []   # For optional visualisation images
    
    # Iterate over all ionised products (each group in product_sets may have multiple products)
    for ionised_product_tuple in product_sets:
        for ionised_product in ionised_product_tuple:
            try:
                Chem.SanitizeMol(ionised_product)  # Ensure the molecule is valid
            except Exception:
                continue
            ionised_mass = CalcExactMolWt(ionised_product)
    
            # If this ionised mass is not already in our DataFrame, add it.
            if not any(abs(ionised_mass - mz) < 0.5 for mz in peaks_df['mz']):
                peaks_df = pd.concat([peaks_df, pd.DataFrame({'mz': [ionised_mass], 'intensity': [1]})], ignore_index=True)
    
            # Initialise a list with the current fragment (as tuple of molecule and its mass)
            new_fragments = [(ionised_product, ionised_mass)]
    
            # Iteratively process fragments
            while new_fragments:
                current_fragments = new_fragments
                new_fragments = []
    
                for parent, parent_mass in current_fragments:
                    # Use the SMILES string as a unique key for the parent fragment.
                    parent_key = Chem.MolToSmiles(parent)
                    if parent_key in processed_fragments:
                        continue
                    processed_fragments.add(parent_key)
    
                    # For each reaction, try to generate fragment products from the parent
                    for loss_name, reaction in reactions.items():
                        try:
                            product_sets_frag = reaction.RunReactants((parent,))
                            if product_sets_frag:
                                for product_tuple in product_sets_frag:
                                    for fragment in product_tuple:
                                        try:
                                            Chem.SanitizeMol(fragment)
                                        except Exception:
                                            continue
                                        fragment_mass = CalcExactMolWt(fragment)
                                        # Add the fragment only if its m/z is not already included
                                        if not any(abs(fragment_mass - mz) < 0.5 for mz in peaks_df['mz']):
                                            peaks_df = pd.concat(
                                                [peaks_df, pd.DataFrame({'mz': [fragment_mass], 'intensity': [1]})],
                                                ignore_index=True
                                            )
                                            new_fragments.append((fragment, fragment_mass))
                                            edges.append((parent_mass, fragment_mass))
                                            fragmentation_tree.append((parent_mass, fragment_mass, loss_name))
                                            
                                            # Generate images for the parent and fragment for visualisation purposes
                                            parent_img = Draw.MolToImage(parent)
                                            fragment_img = Draw.MolToImage(fragment)
                                            fragment_images.append(
                                                (parent_img, fragment_img, parent_mass, fragment_mass, loss_name)
                                            )
                        except Exception as e:
                            print(f"Reaction {loss_name} failed on {Chem.MolToSmiles(parent)}: {e}")
                            continue
    return peaks_df, edges, fragmentation_tree, fragment_images


# Step 4: Experimental Peak Matching

**Purpose:**  
Match the simulated m/z peaks from the fragmentation process with the experimental spectrum data.

**What Happens:**
- **Matching Process:**  
  - The function `match_peaks(peaks_df, experimental_df, tolerance=1)` uses a KDTree to efficiently search for the nearest experimental m/z peak for each simulated peak within a specified tolerance.
- **Intensity Assignment:**  
  - If a match is found, the observed intensity from the experimental data is assigned to the corresponding simulated peak; if not, the observed intensity is set to zero.
- **Outcome:**  
  The simulated peaks DataFrame (`peaks_df`) gains an extra column (`observed_intensity`), aligning simulated data with experimental observations for further optimisation.


In [4]:
def match_peaks(peaks_df, experimental_df, tolerance=1):
    """
    For each simulated peak in peaks_df, find the closest experimental peak
    within a given m/z tolerance using a KDTree.
    """
    # Depending on availability, use the "normalised_intensity" column from experimental data;
    # otherwise use "intensity"
    if "normalised_intensity" in experimental_df.columns:
        experimental_mz = experimental_df["m/z"].values
        experimental_intensities = experimental_df["normalised_intensity"].values
    else:
        experimental_mz = experimental_df["m/z"].values
        experimental_intensities = experimental_df["intensity"].values
        
    # Filter out experimental peaks with zero intensity
    valid_indices = experimental_intensities > 0
    filtered_mz = experimental_mz[valid_indices]
    filtered_intensities = experimental_intensities[valid_indices]
    
    # Build a KDTree to allow fast nearest-neighbour search on the m/z values
    exp_tree = KDTree(filtered_mz.reshape(-1, 1))
    
    # For each simulated peak, find its closest experimental peak within the given tolerance
    matched_peaks = {}
    for sim_peak in peaks_df["mz"].values:
        indices = exp_tree.query_ball_point([sim_peak], r=tolerance)
        if indices:  # If at least one peak is within tolerance, find the best match
            differences = [abs(filtered_mz[idx] - sim_peak) for idx in indices]
            best_idx = indices[np.argmin(differences)]
            matched_peaks[sim_peak] = filtered_intensities[best_idx]
        else:
            matched_peaks[sim_peak] = 0  # No match found: assign intensity 0
    
    # Map the matched experimental intensities back onto the simulated peaks DataFrame
    peaks_df["observed_intensity"] = peaks_df["mz"].map(matched_peaks)
    print("Matched peaks:", matched_peaks)
    return peaks_df

# Step 5: Genetic Algorithm Optimisation

**Purpose:**  
Optimise the transition probabilities associated with the fragmentation reactions so that the simulated spectrum best matches the experimental spectrum.

**What Happens:**
- **Fitness Evaluation:**  
  - The `fitness_function()` computes a score by comparing calculated intensities (obtained by propagating optimised transition probabilities through the fragmentation tree) with the observed experimental intensities.
- **Genetic Algorithm Components:**  
  - **Population Initialisation:**  
    `initialise_population()` generates random candidate solutions (each representing a set of transition probabilities for fragmentation edges).
  - **Selection:**  
    `select_parents()` chooses parent candidates based on their fitness using a roulette-wheel selection method.
  - **Crossover and Mutation:**  
    `crossover()` combines parent solutions, while `mutate()` introduces random variations, ensuring diversity in the population.
- **Evolution Process:**  
  - `genetic_algorithm()` repeatedly evaluates, selects, crosses over, and mutates individuals over numerous generations (or stops early if improvements stall) to obtain optimised transition probabilities.
- **Outcome:**  
  This step outputs optimised transition probabilities along with a fitness score that indicates how closely the simulated spectrum aligns with the experimental data.


In [5]:
# --- Genetic Algorithm Optimisation Functions ---

# Define global parameters for the genetic algorithm
population_size = 50  # Number of individuals per generation
generations = 1000    # Maximum number of generations for evolution
mutation_rate = 0.1   # Mutation chance for each transition probability
tolerance_fitness = 0.05  # Allowed error tolerance for intensity matching in fitness function

def fitness_function(transition_probs, edges, peaks_df, tolerance=tolerance_fitness):
    """
    Compare predicted intensities (based on the fragmentation tree propagation
    using the given transition probabilities) to the observed experimental intensities.
    Returns a fitness score that rewards lower error.
    """
    # Round the m/z values in the DataFrame to a fixed precision
    peaks_df['mz'] = peaks_df['mz'].round(6)
    edges = [(round(i, 6), round(j, 6)) for i, j in edges]
    
    # Create a directed graph from the fragmentation edges
    G = nx.DiGraph()
    G.add_edges_from(edges)
    
    # Do a topological sort to process the precursor before its fragments
    topo_order = list(nx.topological_sort(G))
    
    # Initialise a dictionary for calculated intensities (starting with zero for all m/z)
    calculated_intensities = {mz: 0 for mz in peaks_df['mz'].values}
    precursor_node = topo_order[0]
    if precursor_node not in calculated_intensities:
        print(f"Error: Precursor node {precursor_node} not in peaks_df['mz']")
        return -float('inf')
    
    # Assign the precursor intensity to 1
    calculated_intensities[precursor_node] = 1  
    
    # Normalise each node's transition probabilities so their sum does not exceed 1
    for node in topo_order:
        outgoing_edges = [(i, j) for (i, j) in edges if i == node]
        total_prob = sum(transition_probs.get((node, j), 0) for _, j in outgoing_edges)
        if total_prob > 1:
            for _, j in outgoing_edges:
                transition_probs[(node, j)] /= total_prob
    
    # Propagate the intensity from the precursor down the tree
    for node in topo_order:
        outgoing_edges = [(i, j) for (i, j) in edges if i == node]
        total_prob = sum(transition_probs.get((node, j), 0) for _, j in outgoing_edges)
        total_trans_intensity = calculated_intensities.get(node, 0) * total_prob
        
        for (i, j) in edges:
            if i == node:
                if j not in calculated_intensities:
                    calculated_intensities[j] = 0
                transition_intensity = calculated_intensities[i] * transition_probs.get((i, j), 0)
                calculated_intensities[j] += transition_intensity
        
        # Remove the transiting intensity to simulate loss from the parent node
        calculated_intensities[node] -= total_trans_intensity
    
    # Compute a fitness score by comparing calculated intensities to observed ones
    score = 0
    for mz in peaks_df['mz']:
        observed_intensity = peaks_df.loc[peaks_df['mz'] == mz, 'observed_intensity'].values[0]
        calculated_intensity = calculated_intensities.get(mz, 0)
    
        # Compute relative error if observed intensity is non-zero
        if observed_intensity > 0:
            error = abs(calculated_intensity - observed_intensity) / observed_intensity
        else:
            error = 0 if calculated_intensity == 0 else 1
    
        # Adjust score depending on how close the error is to zero
        if error <= tolerance:
            score += 1 - error
        else:
            score -= error
    return score


def select_parents(population, fitness_scores):
    """
    Select two parent individuals using roulette-wheel selection based on fitness scores.
    Adjust fitness values to be positive if necessary.
    """
    min_fitness = min(fitness_scores)
    if min_fitness < 0:
        fitness_scores = [score - min_fitness + 1 for score in fitness_scores]
    
    total_fitness = sum(fitness_scores)
    selection_probs = [score / total_fitness if total_fitness != 0 else 1/len(fitness_scores) for score in fitness_scores]
    return np.random.choice(population, size=2, p=selection_probs, replace=False)


def initialise_population(edges):
    """
    Create an initial population of random candidate solutions.
    Each candidate is a dictionary mapping each edge to a random transition probability between 0 and 1.
    """
    population = []
    for _ in range(population_size):
        individual = {edge: random.uniform(0, 1) for edge in edges}
        population.append(individual)
    return population


def crossover(parent1, parent2):
    """
    Perform crossover by randomly choosing for each edge a value from either parent.
    """
    return {key: parent1[key] if random.random() < 0.5 else parent2[key] for key in parent1}


def mutate(individual):
    """
    Mutate an individual's transition probabilities by applying a small random change.
    Ensures that probabilities remain between 0 and 1.
    """
    for key in individual:
        if random.random() < mutation_rate:
            individual[key] += random.uniform(-0.1, 0.1)
            individual[key] = max(0, min(1, individual[key]))
    return individual


def genetic_algorithm(edges, peaks_df, patience=100, epsilon=1e-6):
    """
    Run the genetic algorithm to optimise transition probabilities.
    Evolves the population over a specified number of generations or stops early if no improvement is seen.
    Returns the best candidate solution found.
    """
    population = initialise_population(edges)
    best_fitness = -float('inf')
    no_improvement_count = 0

    for generation in range(generations):
        # Evaluate fitness for each candidate solution in the population
        fitness_scores = [fitness_function(ind, edges, peaks_df) for ind in population]
        current_best_fitness = max(fitness_scores)
        
        # Check for improvement; if no significant improvement, increase the no-improvement counter.
        if current_best_fitness > best_fitness + epsilon:
            best_fitness = current_best_fitness
            no_improvement_count = 0
        else:
            no_improvement_count += 1
        
        # Early stopping if improvements have stalled
        if no_improvement_count >= patience:
            print(f"Early stopping after {generation} generations with best fitness: {best_fitness}")
            break
        
        new_population = []
        # Create new candidate solutions using selection, crossover, and mutation.
        for _ in range(population_size // 2):
            parent1, parent2 = select_parents(population, fitness_scores)
            offspring1 = mutate(crossover(parent1, parent2))
            offspring2 = mutate(crossover(parent1, parent2))
            new_population.extend([offspring1, offspring2])
        population = new_population

    best_individual = max(population, key=lambda ind: fitness_function(ind, edges, peaks_df))
    return best_individual



# Step 6: Fragmentation Trees

**Purpose:**  
Create a visual representation of the fragmentation process by constructing and plotting the fragmentation tree.

**What Happens:**
- **Tree Construction:**  
  - The function `plot_fragmentation_tree(peaks_df, fragmentation_tree, transition_probabilities)` uses the parent–fragment relationships (edges) and corresponding m/z values to build a directed graph using NetworkX.
  - Nodes in the graph represent m/z peaks, and edges represent fragmentation reactions.
- **Hierarchical Layout and Annotation:**  
  - The graph is laid out hierarchically, typically with the precursor ion (highest m/z) as the root.
  - Edges are annotated with the difference in m/z (Δm/z) and may optionally display reaction type or optimised transition probabilities.
- **Outcome:**  
  The fragmentation tree plot provides an intuitive visual overview of the fragmentation pathway and highlights key mass differences resulting from specific chemical reactions.


In [6]:
def plot_fragmentation_tree(peaks_df, fragmentation_tree, transition_probabilities):
    """
    Plot the fragmentation tree with Δm/z values and hierarchical layout.
    Constructs a directed graph (using NetworkX) where nodes represent m/z peaks and edges represent fragmentation reactions.
    """
    G = nx.DiGraph()
    node_labels = {}
    node_mz_map = {}
    
    # For each recorded fragmentation, find corresponding nodes by matching m/z values (tolerance 0.01)
    for parent_mass, child_mass, reaction in fragmentation_tree:
        parent_node = peaks_df[peaks_df['mz'].apply(lambda x: abs(x - parent_mass) < 0.01)].index[0]
        child_node = peaks_df[peaks_df['mz'].apply(lambda x: abs(x - child_mass) < 0.01)].index[0]
        
        # Add parent node if not already in the graph, and label it with its m/z value
        if parent_node not in G:
            G.add_node(parent_node)
            node_labels[parent_node] = f"m/z {parent_mass:.4f}"
            node_mz_map[parent_node] = parent_mass
        # Similarly add child node
        if child_node not in G:
            G.add_node(child_node)
            node_labels[child_node] = f"m/z {child_mass:.4f}"
            node_mz_map[child_node] = child_mass
        
        # Create a directed edge from the parent to the fragment (child)
        G.add_edge(parent_node, child_node, reaction=reaction)
    
    # Identify the precursor node (assumed to be the one with highest m/z value)
    precursor_node = peaks_df['mz'].idxmax()
    if precursor_node in G:
        node_depths = nx.single_source_shortest_path_length(G, precursor_node)
    else:
        print(f"Warning: Precursor node {precursor_node} is not in the graph.")
        node_depths = {node: 0 for node in G.nodes()}
    
    # For any nodes not reached, set their depth to one more than the maximum
    for node in G.nodes():
        if node not in node_depths:
            node_depths[node] = max(node_depths.values(), default=0) + 1
    
    # Group nodes by their depth level for layout
    levels = defaultdict(list)
    for node, depth in node_depths.items():
        levels[depth].append(node)
    
    # Define a custom layout: space nodes horizontally and vertically according to their level
    pos = {}
    horizontal_spacing = 2.0
    vertical_spacing = 1.5
    for depth, nodes in levels.items():
        num_nodes = len(nodes)
        for i, node in enumerate(nodes):
            pos[node] = (i * horizontal_spacing - num_nodes * horizontal_spacing / 2, -depth * vertical_spacing)
    
    # Annotate edges with the difference in m/z (Δm/z) and print any associated transition probability
    edge_labels = {}
    for (parent, child), (n, m, loss_name) in zip(G.edges(), fragmentation_tree):
        parent_mass = node_mz_map[parent]
        child_mass = node_mz_map[child]
        mz_diff = abs(parent_mass - child_mass)
        transition_prob = transition_probabilities.get((parent_mass, child_mass), 0)
        print(f"Transition prob for ({parent_mass}, {child_mass}): {transition_prob}")
        edge_labels[(parent, child)] = f"Δm/z {mz_diff:.4f}"
    
    # Draw the graph using NetworkX and matplotlib
    plt.figure(figsize=(30, 25))
    nx.draw_networkx_nodes(G, pos, node_size=1500, node_color='lightblue', edgecolors="black")
    nx.draw_networkx_edges(G, pos, arrowstyle='-|>', arrowsize=15, edge_color='gray', alpha=0.5)
    nx.draw_networkx_labels(G, pos, labels=node_labels, font_size=15, font_color='black')
    nx.draw_networkx_edge_labels(G, pos, edge_labels=edge_labels, font_size=15)
    plt.title("Fragmentation Tree with Δm/z Differences", fontsize=14)
    plt.show()

# Step 7: Qualitative Spectrum Comparison

**Purpose:**  
Visually compare the optimised simulated mass spectrum with the experimental mass spectrum to assess their similarity.

**What Happens:**
- **Simulated Spectrum Plot:**  
  - The `plot_mass_spectrum(peaks_df)` function draws vertical lines at each m/z value in the simulated spectrum, with heights corresponding to optimised intensities.
  - Peaks with intensities above a certain threshold (e.g. intensity > 10) are annotated with their m/z values.
- **Side-by-Side Comparison:**  
  - The `plot_comparison_spectra(peaks_df, experimental_df)` function creates a two-panel plot:
    - The left panel displays the experimental spectrum.
    - The right panel displays the optimised simulated spectrum.
  - Both panels share the same m/z (x-axis) scale to facilitate direct visual comparisons.
- **Outcome:**  
  These visualisations allow for a qualitative assessment of how well the optimised simulation reproduces the key features of the experimental data.


In [7]:
def plot_mass_spectrum(peaks_df):
    """
    Plot a mass spectrum using the optimised intensities from peaks_df.
    Each m/z is plotted as a vertical line whose height corresponds to its intensity.
    Peaks with intensity greater than 10 are annotated with their m/z value.
    """
    mz_values = peaks_df["mz"].values
    optimised_intensities = peaks_df["observed_intensity"].values
    
    plt.figure(figsize=(12, 6))
    plt.vlines(mz_values, ymin=0, ymax=optimised_intensities, color='blue', linewidth=1.5,
               label="Optimised Intensities")
    
    # Annotate peaks with intensity > 10
    for mz, intensity in zip(mz_values, optimised_intensities):
        if intensity > 10:
            plt.annotate(f"{int(mz)}", xy=(mz, intensity), xytext=(mz, intensity + 2),
                         fontsize=8, ha="center", va="bottom", color="black")
    
    plt.xlabel("m/z", fontsize=14)
    plt.ylabel("Intensity", fontsize=14)
    plt.title("Optimised Simulated Mass Spectrum", fontsize=16, y=1.05)
    plt.xticks(fontsize=12)
    plt.yticks(fontsize=12)
    plt.grid(True, linestyle='--', alpha=0.6)
    plt.tight_layout()
    plt.show()


def plot_comparison_spectra(peaks_df, experimental_df):
    """
    Plot the experimental mass spectrum and the optimised simulated spectrum side by side.
    Uses shared y-axis scaling and annotates peaks exceeding a threshold.
    """
    exp_mz_values = experimental_df["m/z"].values
    exp_intensities = experimental_df["normalised_intensity"].values
    opt_mz_values = peaks_df["mz"].values
    opt_intensities = peaks_df["observed_intensity"].values
    
    fig, axes = plt.subplots(1, 2, figsize=(14, 6), sharey=True)
    
    # Plot experimental spectrum on the left
    axes[0].vlines(exp_mz_values, ymin=0, ymax=exp_intensities, color='red', linewidth=1.5)
    axes[0].set_xlabel("m/z", fontsize=12)
    axes[0].set_ylabel("Intensity", fontsize=12)
    axes[0].set_title("Experimental Mass Spectrum", fontsize=14)
    axes[0].grid(True, linestyle='--', alpha=0.5)
    for mz, intensity in zip(exp_mz_values, exp_intensities):
        if intensity > 10:
            axes[0].annotate(f"{int(mz)}", (mz, intensity + 2), fontsize=8, ha="center", va="bottom", color="black")
    
    # Plot simulated spectrum on the right
    axes[1].vlines(opt_mz_values, ymin=0, ymax=opt_intensities, color='blue', linewidth=1.5)
    axes[1].set_xlabel("m/z", fontsize=12)
    axes[1].set_title("Simulated Mass Spectrum", fontsize=14)
    axes[1].grid(True, linestyle='--', alpha=0.5)
    for mz, intensity in zip(opt_mz_values, opt_intensities):
        if intensity > 10:
            axes[1].annotate(f"{int(mz)}", (mz, intensity + 2), fontsize=8, ha="center", va="bottom", color="black")
    
    # Ensure both subplots share the same x-axis limits for comparison
    axes[1].set_xlim(axes[0].get_xlim())
    plt.tight_layout()
    plt.show()


# Step 8: Quantitative Spectrum Comparison

**Purpose:**  
Quantitatively evaluate the similarity between the experimental and simulated spectra by calculating a numerical similarity measure.

**What Happens:**
- **Normalisation:**  
  - `normalize_spectrum(spectrum)` scales the spectrum so that the maximum intensity is 1, ensuring comparability between spectra.
- **Back-calculation:**  
  - `calculate_intensities_from_tree()` and `back_calculate_spectrum_from_tree()` use the optimised transition probabilities to propagate the precursor intensity through the fragmentation tree to re-calculate expected fragment intensities.
- **Alignment and Similarity Calculation:**  
  - `align_spectra()` merges the experimental and re-calculated spectra based on m/z values.
  - `calculate_cosine_similarity()` computes the cosine similarity between the intensity vectors of the two spectra, yielding a quantitative similarity score.
- **Outcome:**  
  The cosine similarity score provides a numeric value (ranging from 0 to 1) that quantifies the match between the simulated and experimental spectra.


In [8]:
# --- Quantitative Spectrum Comparison Functions ---

def normalize_spectrum(spectrum):
    """
    Normalise the spectrum so that the maximum intensity becomes 1.
    Copies the DataFrame to avoid modifying the original.
    """
    spectrum = spectrum.copy()
    max_intensity = spectrum['observed_intensity'].max()
    if max_intensity > 0:
        spectrum['observed_intensity'] /= max_intensity
    return spectrum


def calculate_intensities_from_tree(tree_edges, precursor_intensity, transition_probs):
    """
    Propagate the precursor intensity through the fragmentation tree using the optimised
    transition probabilities. Returns a dictionary mapping each node's m/z to its calculated intensity.
    """
    intensities = {tree_edges[0][0]: precursor_intensity}
    for (u, v) in tree_edges:
        if u in intensities:
            intensities[v] = intensities[u] * transition_probs.get((u, v), 0)
    return intensities


def back_calculate_spectrum_from_tree(tree_nodes, tree_edges, peaks_df, transition_probs):
    """
    Using the fragmentation tree and optimised transition probabilities, back-calculate the spectrum.
    It retrieves the precursor intensity from peaks_df (using the m/z value), calculates propagated
    intensities for each node, normalises the resulting spectrum, and returns it.
    """
    precursor_intensity = peaks_df.loc[tree_nodes[0], 'observed_intensity']
    calculated_intensities = calculate_intensities_from_tree(tree_edges, precursor_intensity, transition_probs)
    # Select rows from peaks_df corresponding to the nodes from the tree.
    back_calculated_spectrum = peaks_df.loc[tree_nodes][['mz']].copy()
    # Map the calculated intensities into the DataFrame (using the index as key)
    back_calculated_spectrum['observed_intensity'] = back_calculated_spectrum.index.map(lambda x: calculated_intensities.get(x, 0))
    back_calculated_spectrum = normalize_spectrum(back_calculated_spectrum)
    return back_calculated_spectrum


def align_spectra(original_spectrum, calculated_spectrum):
    """
    Aligns the experimental (original) and back-calculated spectra by merging on the m/z values.
    Rows not matching are filled with 0.
    """
    merged_spectrum = pd.merge(original_spectrum, calculated_spectrum, on='mz', how='outer', 
                               suffixes=('_orig', '_calc')).fillna(0)
    return merged_spectrum


def calculate_cosine_similarity(merged_spectrum):
    """
    Calculates the cosine similarity between the intensities of the original and calculated spectra.
    This provides a quantitative metric of similarity between the two spectra.
    """
    orig = merged_spectrum['observed_intensity_orig'].values
    calc = merged_spectrum['observed_intensity_calc'].values
    # Only consider rows where at least one intensity is nonzero
    mask = (orig != 0) | (calc != 0)
    if np.sum(mask) == 0:
        return 0
    orig_nonzero = orig[mask].reshape(1, -1)
    calc_nonzero = calc[mask].reshape(1, -1)
    similarity = cosine_similarity(orig_nonzero, calc_nonzero)[0, 0]
    return similarity


def calculate_nodes_from_edges(edges):
    """
    Extract and return a list of unique nodes (m/z values) from the list of edges.
    """
    nodes = set()
    for u, v in edges:
        nodes.add(u)
        nodes.add(v)
    return list(nodes)

# Step 9: Execute the Model

**Purpose:**  
Coordinate and run the entire computational pipeline, from data loading to final spectrum comparison.

**What Happens:**
- **Data Loading and Compound Selection:**  
  - The `main()` function loads the compound data from a specified JSON file and randomly selects (or selects by name) one compound for further analysis.
- **Ionisation & Fragmentation:**  
  - The selected compound's SMILES string is used to generate an ionised molecule and build its fragmentation tree.
- **Peak Matching:**  
  - If experimental spectrum data is available, simulated peaks are matched to experimental peaks.
- **Optimisation:**  
  - The genetic algorithm is run to optimise the transition probabilities, and the best solution is printed along with its fitness score.
- **Visualisation:**  
  - Multiple plots are generated to visualise the optimised mass spectrum, compare it to the experimental spectrum, and display the fragmentation tree.
- **Quantitative Comparison:**  
  - The cosine similarity between the experimental and back-calculated spectra is computed and printed.
- **Outcome:**  
  Executing `main()` drives the entire workflow, providing both visual and numeric outputs that assess the model’s performance in reproducing the experimental mass spectrum.


In [None]:
# --- Main Pipeline Execution ---

def main():
    """
    Execute the complete computational pipeline:
      1. Load and pre-process compound data.
      2. Perform ionisation and fragment the molecule.
      3. Match simulated peaks with experimental data.
      4. Optimise transition probabilities using a genetic algorithm.
      5. Visualise the simulated spectrum and fragmentation tree.
      6. Compute quantitative similarity (cosine similarity) between spectra.
    """
    # 1. Data Loading and Preprocessing
    compound_file = "GNPS-LIBRARY.json"  # Adjust the file path as necessary
    compound_data = load_compound_data(compound_file)
    
    # Select a compound at random (or modify to select a specific compound)
    compound_name = random.choice(list(compound_data.keys()))
    print(f"Selected compound: {compound_name}")
    compound = compound_data[compound_name]
    spectra_df = compound["spectra"]
    smiles = compound["metadata"].get("smiles")
    
    # 2. Ionisation and Fragmentation
    molecule, ionised_product, peaks_df, product_sets = perform_ionisation(smiles)
    peaks_df, edges, fragmentation_tree, fragment_images = build_fragmentation_tree(molecule, product_sets)
    
    # 3. Match simulated peaks with experimental peaks (if experimental data exists)
    if not spectra_df.empty:
        # Assume the experimental spectrum has its intensity column labelled as "normalised_intensity"
        spectra_df = spectra_df.rename(columns={"intensity": "normalised_intensity"})
        peaks_df = match_peaks(peaks_df, spectra_df)
    else:
        print("No experimental spectrum available; proceeding with simulated data.")
        peaks_df["observed_intensity"] = peaks_df["intensity"]
    
    # 4. Genetic Algorithm Optimisation of Transition Probabilities
    transition_probabilities = genetic_algorithm(edges, peaks_df)
    print(f"Optimised transition probabilities: {transition_probabilities}")
    final_fitness_score = fitness_function(transition_probabilities, edges, peaks_df)
    print("Final fitness score:", final_fitness_score)
    
    # 5. Visualisation
    plot_mass_spectrum(peaks_df)
    plot_comparison_spectra(peaks_df, spectra_df)
    plot_fragmentation_tree(peaks_df, fragmentation_tree, transition_probabilities)
    
    # 6. Spectrum Similarity Calculation (Quantitative Comparison)
    selected_nodes = calculate_nodes_from_edges(edges)
    # Normalise the original spectrum taken from peaks_df (experimental intensities)
    original_spectrum = normalize_spectrum(peaks_df[['mz', 'observed_intensity']].copy())
    back_calculated_spectrum = back_calculate_spectrum_from_tree(selected_nodes, edges, peaks_df, transition_probabilities)
    merged_spectrum = align_spectra(original_spectrum, back_calculated_spectrum)
    cosine_sim = calculate_cosine_similarity(merged_spectrum)
    print(f"Cosine Similarity between original and back-calculated spectra (normalised): {cosine_sim:.4f}")

if __name__ == "__main__":
    main()

[10:47:34] ERROR: 

[10:47:34] Explicit valence for atom # 49 Na, 2, is greater than permitted
[10:47:35] SMILES Parse Error: syntax error while parsing: InChI=1S/C16H21NO2/c1-2-3-4-5-6-11-14-16(19)15(18)12-9-7-8-10-13(12)17-14/h7-10,19H,2-6,11H2,1H3,(H,17,18)
[10:47:35] SMILES Parse Error: Failed parsing SMILES ' InChI=1S/C16H21NO2/c1-2-3-4-5-6-11-14-16(19)15(18)12-9-7-8-10-13(12)17-14/h7-10,19H,2-6,11H2,1H3,(H,17,18)' for input: ' InChI=1S/C16H21NO2/c1-2-3-4-5-6-11-14-16(19)15(18)12-9-7-8-10-13(12)17-14/h7-10,19H,2-6,11H2,1H3,(H,17,18)'
[10:47:35] ERROR: 

[10:47:37] Can't kekulize mol.  Unkekulized atoms: 10 11 12 14 16
[10:47:37] Can't kekulize mol.  Unkekulized atoms: 10 11 12 14 16
[10:47:38] SMILES Parse Error: unclosed ring for input: 'OC1=CC(C(OC)=O)=C(OC2=CC(C)=CC(O)=C2C(O)=O)C(OC)=C2'
[10:47:38] SMILES Parse Error: unclosed ring for input: 'O=C1C2=C(C=C(C)C=C2O)OC3=CC(O)=CC(C(OC)=O)=C32'
[10:47:38] SMILES Parse Error: unclosed ring for input: 'O=C([C@H](CC)C)O[C@H]1CCC=C2C1[