# Analyze CPP_V values from MD trajectories

In [4]:
import os
import glob
import itertools

import numpy as np
import pandas as pd
import matplotlib
import matplotlib.pyplot as plt
import MDAnalysis as mda
import tqdm
import joblib
from joblib import Parallel, delayed
from joblib._parallel_backends import LokyBackend
from joblib.parallel import Parallel as JoblibParallel
import seaborn as sns
from scipy.stats import linregress, spearmanr

In [6]:
# User-defined variables
NUM_CPU = 64 # number of CPU cores to allocate to task

PROTONATED_MD_TRAJECTORY_BASE_DIR = "data/MD_trajectory_protonated" # half-protonated systems
NEUTRAL_MD_TRAJECTORY_BASE_DIR = "data/MD_trajectory_neutral" # fully-neutral systems

PROTONATED_OUTPUT_FILE = "results/CPP_V_results_protonated.csv" # where to save CPP results for protonated trajectories
NEUTRAL_OUTPUT_FILE = "results/CPP_V_results_neutral.csv" # where to save CPP results for neutral trajectories

In [None]:
protonated_trajectory_names = [
    name for name in os.listdir(PROTONATED_MD_TRAJECTORY_BASE_DIR)
    if os.path.isdir(os.path.join(PROTONATED_MD_TRAJECTORY_BASE_DIR, name))
]
protonated_trajectory_dirs = [
    os.path.join(PROTONATED_MD_TRAJECTORY_BASE_DIR, name, "openmm")
    for name in protonated_trajectory_names
]

# Filter neutral system directories that contain "YX" or "LM"
neutral_trajectory_names = [
    name for name in os.listdir(NEUTRAL_MD_TRAJECTORY_BASE_DIR)
    if os.path.isdir(os.path.join(NEUTRAL_MD_TRAJECTORY_BASE_DIR, name)) and ("LM" in name)#("YX" in name or "LM" in name)
]
neutral_trajectory_dirs = [
    os.path.join(NEUTRAL_MD_TRAJECTORY_BASE_DIR, name, "openmm")
    for name in neutral_trajectory_names
]

In [None]:
# Load ionizable lipid atom dictionary for LM_2019 LNPs
il_atom_dict = pd.read_csv("/data/il_atom_dict_LM_2019.csv")

In [5]:
# Chunk to define functions relevant to computing CPP_V

# Define excluded residues
excluded_residues = {"CLA", "CHL1", "TIP3", "SOD", "DOPE", "DSPC", "DOTAP"}

# Function to identify residue names
def identify_residues(u, excluded_residues):
    residue_names = set(res.resname for res in u.residues)
    filtered_residues = [res for res in residue_names if res not in excluded_residues]
    # If only one residue, treat it as neutral and leave protonated blank
    if len(filtered_residues) == 1:
        return None, filtered_residues[0]
    # Find potential protonated and neutral residues
    potential_protonated = [res for res in filtered_residues if "H" in res]
    potential_neutral = [res for res in filtered_residues if res not in potential_protonated]
    # Choose protonated residue as the one with more "H" characters
    protonated_residue = max(potential_protonated, key=lambda res: res.count("H"), default=None)
    # Choose neutral residue as any other residue (fallback to None if empty)
    neutral_residue = next((res for res in filtered_residues if res != protonated_residue), None)
    return protonated_residue, neutral_residue

# Function to remove outliers using 1.5*IQR
def remove_outliers(data):
    q1 = np.percentile(data, 25)
    q3 = np.percentile(data, 75)
    iqr = q3 - q1
    lower_bound = q1 - 1.5 * iqr
    upper_bound = q3 + 1.5 * iqr
    return data[(data >= lower_bound) & (data <= upper_bound)]

def compute_cpp_for_residue(dcd_files, topology_file, residue_name, head_atoms, anchor_atom, tail1_ends, tail2_ends, tail3_ends):
    """
     Compute the CPP_V metric for a specific residue across multiple DCD files.

     Parameters:
         dcd_files (list): List of DCD file paths.
         topology_file (str): Path to the topology file.
         residue_name (str): Name of the residue to analyze.
         head_atoms (list): List of head atom names.
         anchor_atom (list): List containing the anchor atom name.
         tail1_ends (list): List of tail1 end atom names.
         tail2_ends (list): List of tail2 end atom names.
         tail3_ends (list): List of tail3 end atom names.

     Returns:
         np.ndarray: Final concatenated CPP array across all DCD files.
     """
    cpp_arrays = []
    for dcd_file in dcd_files:
        u = mda.Universe(topology_file, dcd_file)
        n_frames = len(u.trajectory)
        residue_group = u.select_atoms(f"resname {residue_name}")
        n_molecules = len(residue_group.residues)
    
        coordinates_array = []  # List to store coordinates per molecule per frame
    
        for frame_idx, ts in enumerate(u.trajectory):
            frame_coords = []
            for mol_idx, residue in enumerate(residue_group.residues):
                atom_group = residue.atoms
                all_atom_names = [atom.name for atom in atom_group]
    
                # Collect coordinates for all atoms in the molecule
                molecule_coords = np.full((len(all_atom_names), 3), np.nan)  # Initialize with NaN
                for atom_idx, atom_name in enumerate(all_atom_names):
                    atom = atom_group.select_atoms(f"name {atom_name}")
                    if len(atom) == 1:
                        molecule_coords[atom_idx, :] = atom.positions[0]
                frame_coords.append(molecule_coords)
            coordinates_array.append(frame_coords)
    
        coordinates_array = np.array(coordinates_array)  # Convert to numpy array
    
        cpp_array = np.zeros((n_frames, n_molecules))
        for frame_idx in range(n_frames):
            for mol_idx in range(n_molecules):
                molecule_coords = coordinates_array[frame_idx, mol_idx]
                all_atom_names = [atom.name for atom in residue_group.residues[mol_idx].atoms]
    
                # Ensure anchor_atom exists in all_atom_names
                if anchor_atom[0] not in all_atom_names:
                    cpp_array[frame_idx, mol_idx] = np.nan
                    continue
    
                anchor_pos = molecule_coords[all_atom_names.index(anchor_atom[0])]
    
                # **Filter out NaN values before calculating distances**
                valid_tail1_ends = [t for t in tail1_ends if t in all_atom_names]
                valid_tail2_ends = [t for t in tail2_ends if t in all_atom_names]
                valid_tail3_ends = [t for t in tail3_ends if t in all_atom_names]
    
                tail1_distances = np.linalg.norm(
                    molecule_coords[[all_atom_names.index(t) for t in valid_tail1_ends]] - anchor_pos, axis=1
                ) if valid_tail1_ends else np.array([])
    
                tail2_distances = np.linalg.norm(
                    molecule_coords[[all_atom_names.index(t) for t in valid_tail2_ends]] - anchor_pos, axis=1
                ) if valid_tail2_ends else np.array([])
    
                tail3_distances = np.linalg.norm(
                    molecule_coords[[all_atom_names.index(t) for t in valid_tail3_ends]] - anchor_pos, axis=1
                ) if valid_tail3_ends else np.array([])
    
                # Skip if any tail distances are missing
                if len(tail1_distances) == 0 and len(tail2_distances) == 0 and len(tail3_distances) == 0:
                    cpp_array[frame_idx, mol_idx] = np.nan
                    continue
    
                #lc = 0.3 + np.mean([np.max(tail1_distances), np.max(tail2_distances), np.max(tail3_distances)])
                lc = 0.3 + np.mean([np.max(dist) for dist in [tail1_distances, tail2_distances, tail3_distances] if dist.size > 0])
    
                # Compute head and tail radii
                head_xy_coords = molecule_coords[[all_atom_names.index(t) for t in head_atoms if t in all_atom_names]][:, :2]
                if len(head_xy_coords) < 2:
                    cpp_array[frame_idx, mol_idx] = np.nan
                    continue
                max_head_distance = np.max(np.linalg.norm(head_xy_coords[:, np.newaxis] - head_xy_coords[np.newaxis, :], axis=-1))
                head_radius = (max_head_distance + 0.3) / 2
    
                tail_xy_coords = molecule_coords[[all_atom_names.index(t) for t in valid_tail1_ends + valid_tail2_ends + valid_tail3_ends]][:, :2]
                if len(tail_xy_coords) < 2:
                    cpp_array[frame_idx, mol_idx] = np.nan
                    continue
                max_tail_distance = np.max(np.linalg.norm(tail_xy_coords[:, np.newaxis] - tail_xy_coords[np.newaxis, :], axis=-1))
                tail_radius = (max_tail_distance + 0.3) / 2
    
                # Compute CPP metric
                a0 = np.pi * (head_radius**2)
                v = (1 / 3) * np.pi * lc * ((head_radius**2) + (head_radius * tail_radius) + (tail_radius**2))
                cpp_array[frame_idx, mol_idx] = v / (a0 * lc)
    
        cpp_arrays.append(cpp_array)
    
    final_cpp_array = np.concatenate(cpp_arrays, axis=0)
    return final_cpp_array

## Half-protonated systems

In [None]:
batch_names = protonated_trajectory_names
batch_dirs = protonated_trajectory_dirs

In [None]:
def process_trajectory(protonated_trajectory_name, protonated_trajectory_dir):
    try:
        dcd_files = [f for f in os.listdir(protonated_trajectory_dir) if f.endswith(".dcd")]
        dcd_indices = sorted([
            int(f.split("_")[1].split(".")[0])
            for f in dcd_files
            if f.split("_")[1].split(".")[0].isdigit()
        ], reverse=True)
        last50_files = [os.path.join(protonated_trajectory_dir, f"step3_{i}.dcd") for i in range(150, 100, -1)]

        topology_file = os.path.join(os.path.dirname(protonated_trajectory_dir), "openmm", "system.psf")

        u = mda.Universe(topology_file, last50_files[0])
        protonated_residue, neutral_residue = identify_residues(u, excluded_residues)

        il_row = il_atom_dict[il_atom_dict["folder_name"] == protonated_trajectory_name].iloc[0]
        head_atoms = il_row["head_total"].split(",") if isinstance(il_row["head_total"], str) else []
        anchor_atom = il_row["anchor_atom"].split(",") if isinstance(il_row["anchor_atom"], str) else []
        tail1_ends = il_row["tail1_terminal"].split(",") if isinstance(il_row["tail1_terminal"], str) else []
        tail2_ends = il_row["tail2_terminal"].split(",") if isinstance(il_row["tail2_terminal"], str) else []
        tail3_ends = []

        def compute_stats(dcd_list, residue):
            cpp = compute_cpp_for_residue(
                dcd_list, topology_file, residue, head_atoms, anchor_atom, tail1_ends, tail2_ends, tail3_ends
            )
        
            # cpp is (n_frames, n_molecules)
            frame_means = np.nanmean(cpp, axis=1)  #row-wise mean across molecules for each frame
            molecule_means = np.nanmean(cpp, axis=0) #column-wise mean across frames for each molecule
            std_per_frame = np.nanstd(frame_means)
            std_per_molecule = np.nanstd(molecule_means)
        
            #cpp_clean = remove_outliers(cpp)
            cpp_clean = cpp
        
            return {
                "mean": np.mean(cpp_clean),
                "std": np.std(cpp_clean),
                "sem": np.std(cpp_clean) / np.sqrt(len(cpp_clean)) if len(cpp_clean) > 0 else np.nan,
                "std_per_frame": std_per_frame,
                "std_per_molecule": std_per_molecule
            }
    
        # Compute stats for each chunk + combined "all"
        stats = {
            "all": {
                "protonated": compute_stats(last50_files, protonated_residue),
                "neutral": compute_stats(last50_files, neutral_residue)
            }
        }

        result_entry = {"folder_name": protonated_trajectory_name}
        for chunk in ["all"]:
            for res_type in ["protonated", "neutral"]:
                result_entry[f"mean_cpp_{res_type}_{chunk}"] = stats[chunk][res_type]["mean"]
                result_entry[f"std_cpp_{res_type}_{chunk}"] = stats[chunk][res_type]["std"]
                result_entry[f"sem_cpp_{res_type}_{chunk}"] = stats[chunk][res_type]["sem"]
                result_entry[f"std_per_frame_cpp_{res_type}_{chunk}"] = stats[chunk][res_type]["std_per_frame"]
                result_entry[f"std_per_molecule_cpp_{res_type}_{chunk}"] = stats[chunk][res_type]["std_per_molecule"]

        u = None
        del u

        return result_entry

    except Exception as e:
        print(f"Error processing {protonated_trajectory_name}: {e}")
        return None

# -- Progress bar integration --
class TqdmBackend(LokyBackend):
    def __init__(self, tqdm_bar, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.tqdm_bar = tqdm_bar

    def apply_async(self, func, callback=None):
        return super().apply_async(
            func,
            callback=lambda *a, **k: (
                callback and callback(*a, **k),
                self.tqdm_bar.update()
            )[1]
        )

# Initialize tqdm bar
progress_bar = tqdm(total=len(batch_names), desc="Processing batch", position=0)

# Use custom backend instance manually
backend = TqdmBackend(progress_bar)

with JoblibParallel(n_jobs=NUM_CPU, backend=backend) as parallel:
    results = parallel(
        delayed(process_trajectory)(name, dir_)
        for name, dir_ in zip(batch_names, batch_dirs)
    )

progress_bar.close()

# Remove any failed jobs
results = [r for r in results if r is not None]

# Save to CSV
pd.DataFrame(results).to_csv(PROTONATED_OUTPUT_FILE, index=False)

## Fully-neutral systems

In [None]:
batch_names = neutral_trajectory_names
batch_dirs = neutral_trajectory_dirs

In [None]:
def process_trajectory(neutral_trajectory_name, neutral_trajectory_dir):
    try:
        dcd_files = [f for f in os.listdir(neutral_trajectory_dir) if f.endswith(".dcd")]
        dcd_indices = sorted([
            int(f.split("_")[1].split(".")[0])
            for f in dcd_files
            if f.split("_")[1].split(".")[0].isdigit()
        ], reverse=True)
        last50_files = [os.path.join(neutral_trajectory_dir, f"step3_{i}.dcd") for i in range(150, 100, -1)]

        topology_file = os.path.join(os.path.dirname(neutral_trajectory_dir), "openmm", "system.psf")

        u = mda.Universe(topology_file, last50_files[0])
        protonated_residue, neutral_residue = identify_residues(u, excluded_residues)

        il_row = il_atom_dict[il_atom_dict["folder_name"] == neutral_trajectory_name].iloc[0]
        head_atoms = il_row["head_total"].split(",") if isinstance(il_row["head_total"], str) else []
        anchor_atom = il_row["anchor_atom"].split(",") if isinstance(il_row["anchor_atom"], str) else []
        tail1_ends = il_row["tail1_terminal"].split(",") if isinstance(il_row["tail1_terminal"], str) else []
        tail2_ends = il_row["tail2_terminal"].split(",") if isinstance(il_row["tail2_terminal"], str) else []
        tail3_ends = []

        def compute_stats(dcd_list, residue):
            cpp = compute_cpp_for_residue(
                dcd_list, topology_file, residue, head_atoms, anchor_atom, tail1_ends, tail2_ends, tail3_ends
            )
        
            # cpp is (n_frames, n_molecules)
            frame_means = np.nanmean(cpp, axis=1)  #row-wise mean across molecules for each frame
            molecule_means = np.nanmean(cpp, axis=0) #column-wise mean across frames for each molecule
            std_per_frame = np.nanstd(frame_means)
            std_per_molecule = np.nanstd(molecule_means)
        
            #cpp_clean = remove_outliers(cpp)
            cpp_clean = cpp
        
            return {
                "mean": np.mean(cpp_clean),
                "std": np.std(cpp_clean),
                "sem": np.std(cpp_clean) / np.sqrt(len(cpp_clean)) if len(cpp_clean) > 0 else np.nan,
                "std_per_frame": std_per_frame,
                "std_per_molecule": std_per_molecule
            }
    
        # Compute stats for each chunk + combined "all"
        stats = {
            "all": {
                "neutral": compute_stats(last50_files, neutral_residue)
            }
        }

        result_entry = {"folder_name": neutral_trajectory_name}
        for chunk in ["all"]:
            for res_type in ["neutral"]:
                result_entry[f"mean_cpp_{res_type}_{chunk}"] = stats[chunk][res_type]["mean"]
                result_entry[f"std_cpp_{res_type}_{chunk}"] = stats[chunk][res_type]["std"]
                result_entry[f"sem_cpp_{res_type}_{chunk}"] = stats[chunk][res_type]["sem"]
                result_entry[f"std_per_frame_cpp_{res_type}_{chunk}"] = stats[chunk][res_type]["std_per_frame"]
                result_entry[f"std_per_molecule_cpp_{res_type}_{chunk}"] = stats[chunk][res_type]["std_per_molecule"]

        u = None
        del u
        
        return result_entry

    except Exception as e:
        print(f"Error processing {neutral_trajectory_name}: {e}")
        return None

# -- Progress bar integration --
class TqdmBackend(LokyBackend):
    def __init__(self, tqdm_bar, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.tqdm_bar = tqdm_bar

    def apply_async(self, func, callback=None):
        return super().apply_async(
            func,
            callback=lambda *a, **k: (
                callback and callback(*a, **k),
                self.tqdm_bar.update()
            )[1]
        )

# Initialize tqdm bar
progress_bar = tqdm(total=len(batch_names), desc="Processing batch", position=0)

# Use custom backend instance manually
backend = TqdmBackend(progress_bar)

with JoblibParallel(n_jobs=NUM_CPU, backend=backend) as parallel:
    results = parallel(
        delayed(process_trajectory)(name, dir_)
        for name, dir_ in zip(batch_names, batch_dirs)
    )

progress_bar.close()

# Remove any failed jobs
results = [r for r in results if r is not None]

# Save to CSV
pd.DataFrame(results).to_csv(NEUTRAL_OUTPUT_FILE, index=False)