In [2]:
# Load necessary libraries
import os
import numpy as np
import pandas as pd
import multiprocessing
from tqdm import tqdm
import warnings
from scipy.spatial.transform import Rotation as R
from Bio.PDB import PDBParser
from Bio import PDB

warnings.filterwarnings("ignore")

In [3]:
# --- PDB Extraction Function ---
def extract_ca_coordinates(pdb_file):
    """
    Loads a PDB file and extracts C-alpha coordinates for each standard amino acid,
    returning them as a list of NumPy arrays, where each array is (1, 3) for a single residue.
    """
    parser = PDBParser(QUIET=True)
    structure = parser.get_structure("prot", pdb_file)
    
    coords_per_residue = [] 
    
    for model in structure:
        for chain in model:
            for residue in chain:
                if PDB.is_aa(residue, standard=True) and residue.has_id('CA'):
                    coords_per_residue.append(np.array([residue['CA'].coord]))
    
    return coords_per_residue

In [4]:
# --- calculate_rmsd_after_superposition function ---
def calculate_rmsd_after_superposition(coords1, coords2):
    """
    Calculates the Root Mean Square Deviation (RMSD) between two sets of 3D coordinates
    after optimally superposing coords2 onto coords1 using the Kabsch algorithm (via scipy).

    Args:
        coords1 (np.ndarray): N x 3 array of 3D coordinates (reference).
        coords2 (np.ndarray): M x 3 array of 3D coordinates (to be superposed).

    Returns:
        float: The minimal RMSD between the two sets of coordinates.
    """
    # Robustness checks for empty arrays
    if coords1.shape[0] == 0 and coords2.shape[0] == 0:
        return 0.0 # Both empty, perfectly "aligned" (no cost)
    elif coords1.shape[0] == 0 or coords2.shape[0] == 0:
        # One is empty, the other is not. This should incur a high cost.
        return 1000.0 # A high penalty, indicating a fundamental mismatch
    
    # Handle single-point comparison directly (avoids R.align_vectors issues for N=1)
    if coords1.shape[0] == 1 and coords2.shape[0] == 1:
        return np.linalg.norm(coords1[0] - coords2[0]) # Euclidean distance for 1 point is its RMSD

    # Standard sanity check for 3D coordinates
    if coords1.shape[1] != 3 or coords2.shape[1] != 3:
        raise ValueError("Input coordinate arrays must be (N, 3) or (M, 3).")

    # Center the coordinates
    centroid1 = np.mean(coords1, axis=0)
    centroid2 = np.mean(coords2, axis=0)
    centered_coords1 = coords1 - centroid1
    centered_coords2 = coords2 - centroid2

    # --- Check for degenerate (all points identical / zero length after centering) inputs ---
    # If the sum of squared magnitudes of the centered vectors is effectively zero,
    # it means all points are identical, and alignment is trivial (RMSD is 0).
    is_coords1_degenerate = np.isclose(np.sum(centered_coords1**2), 0.0)
    is_coords2_degenerate = np.isclose(np.sum(centered_coords2**2), 0.0)

    if is_coords1_degenerate and is_coords2_degenerate:
        return 0.0 # Both sets of points are effectively identical (or collapsed to a single point)
    elif is_coords1_degenerate or is_coords2_degenerate:
        # One set is degenerate (e.g., all its atoms are at the same coordinate),
        # while the other is not. This is a significant structural mismatch.
        return 1000.0 # High penalty

    # Find the optimal rotation (will only be called if N > 1 AND not degenerate)
    rotation, rmsd = R.align_vectors(centered_coords2, centered_coords1)

    return rmsd


In [5]:
# --- dtw_with_rmsd_cost function ---
def dtw_with_rmsd_cost(seq1_coords, seq2_coords):
    """
    Performs Dynamic Time Warping (DTW) on two sequences of 3D backbone coordinates,
    using RMSD as the local cost metric between corresponding residues.

    Args:
        seq1_coords (list of np.ndarray): List where each element is a (1, 3) or (N_atoms, 3)
                                          numpy array representing the coordinates for one residue/segment.
                                          For simple C-alpha, it's (1, 3).
        seq2_coords (list of np.ndarray): Similar list for the second sequence.

    Returns:
        tuple: (dtw_cost, warping_path)
            dtw_cost (float): The total accumulated cost of the optimal warping path.
            warping_path (list): A list of (index_seq1, index_seq2) tuples representing
                                 the optimal alignment path.
    """
    n = len(seq1_coords)
    m = len(seq2_coords)

    # Handle cases where one or both sequences are empty
    if n == 0 and m == 0:
        return 0.0, []
    elif n == 0 or m == 0:
        return np.inf, [] # Infinite cost if one sequence is empty and the other is not

    # Initialize cost matrix
    D = np.full((n + 1, m + 1), np.inf)
    D[0, 0] = 0

    # Fill the cost matrix
    for i in range(1, n + 1):
        for j in range(1, m + 1):
            local_cost = calculate_rmsd_after_superposition(
                seq1_coords[i-1].reshape(-1, 3), # Ensure it's (N_atoms, 3)
                seq2_coords[j-1].reshape(-1, 3)  # Ensure it's (N_atoms, 3)
            )

            # Accumulate cost from previous cells
            D[i, j] = local_cost + min(D[i-1, j],    # Deletion (move right in seq1)
                                      D[i, j-1],    # Insertion (move down in seq2)
                                      D[i-1, j-1])  # Match/Substitution (diagonal)

    # Traceback to find the optimal path
    path = []
    i, j = n, m
    while i > 0 or j > 0:
        path.append((i - 1, j - 1)) # Append 0-indexed values
        if i == 0:
            j -= 1
        elif j == 0:
            i -= 1
        else:
            min_prev = min(D[i-1, j-1], D[i-1, j], D[i, j-1])
            if min_prev == D[i-1, j-1]:
                i -= 1
                j -= 1
            elif min_prev == D[i-1, j]:
                i -= 1
            else: # min_prev == D[i, j-1]:
                j -= 1
    path.reverse() # Path is built backwards, so reverse it

    total_cost = D[n, m]
    length_of_warping_path = len(path)

    if length_of_warping_path > 0:
        normalized_dtw_cost = total_cost / length_of_warping_path
    else:
        # This case should ideally not happen if n > 0 or m > 0 due to path construction,
        # but as a safeguard
        normalized_dtw_cost = np.inf if (n > 0 or m > 0) else 0.0

    return normalized_dtw_cost, path

In [None]:
# # --- Example Usage ---

# # Load PDB and extract coordinates
# seq1_coords = extract_ca_coordinates('data/1avw.pdb')
# seq2_coords = extract_ca_coordinates('data/1cfd.pdb')

# print(f"Sequence 1 has {len(seq1_coords)} residues.")
# print(f"Sequence 2 has {len(seq2_coords)} residues.")

# if seq1_coords and seq2_coords: # Proceed only if both sequences have data
#     total_cost, path = dtw_with_rmsd_cost(seq1_coords, seq2_coords)

#     print(f"\nTotal DTW Cost: {total_cost:.4f} Angstroms")
#     # --- Code to calculate Normalized DTW Distance ---
#     length_of_warping_path = len(path)
#     if length_of_warping_path > 0:
#         normalized_dtw_distance = total_cost / length_of_warping_path
#         print(f"Normalized DTW Distance (by Warping Length Path): {normalized_dtw_distance:.4f} Angstroms")
#     else:
#         print("Warning: Warping path has zero length, cannot normalize DTW distance.")

#     print("\nOptimal Warping Path:")
#     for i, (idx1, idx2) in enumerate(path):
#         print(f"  Path Step {i+1}: seq1_residue[{idx1}] <-> seq2_residue[{idx2}]")
#     if len(path) > 100:
#         print("  ...")
# else:
#     print("\nError: One or both PDB files resulted in empty C-alpha coordinate lists.")
#     print("Please check the PDB files and the `extract_ca_coordinates` function's filtering conditions.")


Sequence 1 has 394 residues.
Sequence 2 has 148 residues.

Total DTW Cost: 12004.2280 Angstroms
Normalized DTW Distance (by Warping Length Path): 30.4676 Angstroms

Optimal Warping Path:
  Path Step 1: seq1_residue[0] <-> seq2_residue[0]
  Path Step 2: seq1_residue[1] <-> seq2_residue[1]
  Path Step 3: seq1_residue[2] <-> seq2_residue[2]
  Path Step 4: seq1_residue[3] <-> seq2_residue[3]
  Path Step 5: seq1_residue[4] <-> seq2_residue[4]
  Path Step 6: seq1_residue[5] <-> seq2_residue[5]
  Path Step 7: seq1_residue[6] <-> seq2_residue[6]
  Path Step 8: seq1_residue[7] <-> seq2_residue[7]
  Path Step 9: seq1_residue[8] <-> seq2_residue[8]
  Path Step 10: seq1_residue[9] <-> seq2_residue[9]
  Path Step 11: seq1_residue[10] <-> seq2_residue[10]
  Path Step 12: seq1_residue[11] <-> seq2_residue[11]
  Path Step 13: seq1_residue[12] <-> seq2_residue[12]
  Path Step 14: seq1_residue[13] <-> seq2_residue[13]
  Path Step 15: seq1_residue[14] <-> seq2_residue[14]
  Path Step 16: seq1_residue[15]

---
---

In [None]:
# --- Worker function for parallel processing ---
# This function must be defined at the top level (not inside if __name__ == "__main__":) so it can be pickled and sent to other processes.
def _compare_pair(args):
    pdb_id1, pdb_id2, coords1, coords2 = args
    try:
        dtw_distance, _ = dtw_with_rmsd_cost(coords1, coords2)
        return {'PDB1': pdb_id1, 'PDB2': pdb_id2, 'DTW_Distance_Normalized': dtw_distance}
    except Exception as e:
        # Handle potential errors during comparison, return NaN or a high penalty
        print(f"Error comparing {pdb_id1} and {pdb_id2}: {e}")
        return {'PDB1': pdb_id1, 'PDB2': pdb_id2, 'DTW_Distance_Normalized': np.nan}

In [None]:
# --- Main Automation Block ---
if __name__ == "__main__":
    pdb_folder_path = "/Users/dvoulgari/Desktop/Structural Bioinfromatics/AISB_Final_Project/20250614_0356239"

    if not os.path.exists(pdb_folder_path):
        print(f"Error: PDB folder not found at '{pdb_folder_path}'. Please check the path.")
        exit()

    # Get list of all PDB files
    pdb_files = [f for f in os.listdir(pdb_folder_path) if f.endswith('.pdb')]
    pdb_files.sort() # Ensure consistent order

    print(f"Found {len(pdb_files)} PDB files in '{pdb_folder_path}'.")
    if len(pdb_files) < 2:
        print("Need at least two PDB files for comparison. Exiting.")
        exit()

    # Cache extracted coordinates to avoid re-parsing PDBs for each comparison
    # This step will still run sequentially first.
    parsed_pdb_coords = {}
    print("Step 1/2: Extracting C-alpha coordinates from all PDB files (sequential)...")
    for pdb_file_name in tqdm(pdb_files, desc="Extracting Coordinates"):
        pdb_full_path = os.path.join(pdb_folder_path, pdb_file_name)
        pdb_id = os.path.splitext(pdb_file_name)[0]
        coords = extract_ca_coordinates(pdb_full_path)
        if len(coords) > 0:
            parsed_pdb_coords[pdb_id] = coords
        else:
            print(f"Skipping {pdb_id} due to empty or problematic C-alpha extraction.")

    valid_pdb_ids = list(parsed_pdb_coords.keys())
    print(f"Successfully extracted C-alpha coordinates for {len(valid_pdb_ids)} valid PDBs.")

    # Prepare tasks for the multiprocessing pool
    tasks = []
    for i in range(len(valid_pdb_ids)):
        pdb_id1 = valid_pdb_ids[i]
        coords1 = parsed_pdb_coords[pdb_id1]
        for j in range(i + 1, len(valid_pdb_ids)): # Compare each unique pair
            pdb_id2 = valid_pdb_ids[j]
            coords2 = parsed_pdb_coords[pdb_id2]
            tasks.append((pdb_id1, pdb_id2, coords1, coords2)) # Store arguments as a tuple

    total_comparisons = len(tasks)
    print(f"Step 2/2: Starting pairwise DTW comparisons ({total_comparisons} in total) using multiprocessing...")

    # Determine the number of CPU cores to use
    num_processes = os.cpu_count()
    if num_processes is None: # Fallback for systems that don't report CPU count
        num_processes = 4 # Default to 4 if not detected
    print(f"Using {num_processes} CPU cores for parallel processing.")

    comparison_results = []
    # Use multiprocessing Pool to distribute tasks
    with multiprocessing.Pool(processes=num_processes) as pool:
        # imap_unordered is good for progress bars as it yields results as they complete
        for result in tqdm(pool.imap_unordered(_compare_pair, tasks), total=total_comparisons, desc="DTW Comparisons"):
            if result is not None: # Collect results, skipping any None from errors
                comparison_results.append(result)

    # Store results in a Pandas DataFrame
    dtw_df = pd.DataFrame(comparison_results)

    # Save to CSV
    output_csv_path = "pairwise_dtw_distances_parallel.csv"
    dtw_df.to_csv(output_csv_path, index=False)
    print(f"\nAll pairwise DTW distances saved to '{output_csv_path}'")

    # Display a sample of the results
    print("\nSample of Pairwise DTW Distances:")
    print(dtw_df.head())

    # Create and save the square distance matrix
    pdb_ids_list = sorted(list(parsed_pdb_coords.keys())) # Ensure consistent order
    distance_matrix = np.full((len(pdb_ids_list), len(pdb_ids_list)), np.nan) # Use nan for self-comparison/uncomputed

    # Create mapping from PDB ID to index
    pdb_id_to_idx = {pdb_id: i for i, pdb_id in enumerate(pdb_ids_list)}

    # Fill diagonal with 0.0 (distance to self)
    np.fill_diagonal(distance_matrix, 0.0)

    for _, row in dtw_df.iterrows():
        idx1 = pdb_id_to_idx[row['PDB1']]
        idx2 = pdb_id_to_idx[row['PDB2']]
        distance_matrix[idx1, idx2] = row['DTW_Distance_Normalized']
        distance_matrix[idx2, idx1] = row['DTW_Distance_Normalized'] # Symmetric matrix

    # Save the distance matrix
    np.save("dtw_distance_matrix_parallel.npy", distance_matrix)
    print(f"DTW distance matrix (NumPy array) saved to 'dtw_distance_matrix_parallel.npy'")
    
    # # Save as CSV for easier viewing (optional, can be very large)
    # distance_matrix_df = pd.DataFrame(distance_matrix, index=pdb_ids_list, columns=pdb_ids_list)
    # distance_matrix_df.to_csv("dtw_distance_matrix_parallel.csv")
    # print(f"DTW distance matrix (CSV) saved to 'dtw_distance_matrix_parallel.csv'")

    print("\nParallel processing complete!")

The time it takes will depend on a few key factors:

- Average Length of Your Antibody Fv Domains: Your Fv sequences (combined VH and VL) will likely range from 200-300 residues. The core DTW algorithm has a time complexity of roughly O(L1×L2), where L1 and L2are the lengths of the two sequences being compared.

- Efficiency of the Local Cost Calculation (calculate_rmsd_after_superposition): Your implementation of calculate_rmsd_after_superposition for single C-alpha atoms effectively boils down to a fast Euclidean distance calculation (due to the if coords1.shape[0] == 1 check), which is very efficient.

In [None]:
import numpy as np
from scipy.spatial.transform import Rotation as R

def calculate_global_rmsd(coords1, coords2):
    """
    Calculates the Global Root Mean Square Deviation (RMSD) between two sets of 3D coordinates
    after finding the optimal rigid body superposition using the Kabsch algorithm.

    This function treats the entire set of coordinates as a single rigid body and finds
    the best superposition to minimize the overall distance between corresponding points.
    It's different from local RMSD calculations which might compare smaller segments or
    allow for sequence warping.

    Args:
        coords1 (list of np.ndarray): A list of 3D coordinates for the first structure,
                                       where each element is a (1, 3) numpy array
                                       (e.g., C-alpha coordinates of residues).
        coords2 (list of np.ndarray): A list of 3D coordinates for the second structure,
                                       formatted similarly to coords1.

    Returns:
        float: The global RMSD between the two superposed coordinate sets.
               Returns 0.0 if both inputs are empty.
               Returns a very high value (e.g., 1000.0) if one input is empty,
               indicating a non-comparable state.
    """
    # Convert lists of (1, 3) arrays into single (N, 3) arrays
    # np.vstack stacks the (1, 3) arrays into an (N, 3) array
    np_coords1 = np.vstack(coords1) if coords1 else np.array([])
    np_coords2 = np.vstack(coords2) if coords2 else np.array([])

    n1 = np_coords1.shape[0]
    n2 = np_coords2.shape[0]

    # Handle empty input cases
    if n1 == 0 and n2 == 0:
        return 0.0  # Both empty, RMSD is 0
    if n1 == 0 or n2 == 0:
        return 1000.0  # One empty, cannot compare meaningfully, return a high penalty

    # Ensure the number of points is the same for global RMSD.
    # If not, it means the comparison is inherently different and might need
    # a different metric (like DTW). For a *strict* global RMSD, lengths must match.
    # In this context, we assume they should be compared globally, so if lengths differ,
    # it's a non-sensical global RMSD.
    if n1 != n2:
        print(f"Warning: Global RMSD expects same number of points. Found {n1} and {n2}.")
        # You might choose to raise an error, return NaN, or use a high penalty.
        # Returning a high penalty aligns with how calculate_rmsd_after_superposition
        # handles some edge cases, making it consistent for "non-comparable" states.
        return 1000.0


    # Perform optimal superposition using the Kabsch algorithm
    # R.align_vectors returns a Rotation object and the RMSD
    try:
        _, global_rmsd = R.align_vectors(np_coords1, np_coords2)
        return global_rmsd
    except ValueError as e:
        # Catch potential errors from R.align_vectors (e.g., singular matrix if all points are identical)
        print(f"Error during global RMSD calculation: {e}")
        # If alignment fails, it indicates a problematic comparison, return a high penalty
        return 1000.0