# Improving measurePPI

Problem: The ml model clearly sucks (sadly this is the right phrase). Going a step back, maybe it's a good idea to improve the measurements now by including the interface definition and only calculating the properties for interface residues.

In [1]:
# created by Andreas Brilka from a code basis from IMB summmer school
# 2024-12-16

__version__ = "1.0.3"

import datetime
import math
import numpy as np
import pathlib
import pandas as pd
import sys
import time
import logging
from io import StringIO
from multiprocessing import Pool, cpu_count, get_logger
import multiprocessing_logging
multiprocessing_logging.install_mp_handler()

import biotite.structure as struc
import biotite.structure.io.pdb as bt_pdb
from Bio.PDB import PDBParser
from Bio.PDB.Structure import Structure as BioPy_PDBStructure
from Bio.PDB.Model import Model as BioPy_PDBModel
from Bio.PDB.PDBExceptions import PDBConstructionException

LOGLEVEL_ADDITIONAL_INFO = 19 # The module logs more information like for example the current processed file with this level

logger = logging.getLogger("measure_PPI")
formatter = logging.Formatter(fmt="[%(asctime)s | %(module)s | %(levelname)s] %(message)s")
streamHandler = logging.StreamHandler(sys.stdout)
streamHandler.setFormatter(formatter)
logger.setLevel(logging.INFO)
logger.addHandler(streamHandler)
logging.addLevelName(LOGLEVEL_ADDITIONAL_INFO, "INFO")

_freesasa_ready = False
try:
    import freesasa
    _freesasa_ready = True
except ModuleNotFoundError:
    logger.warning("You don't have freesasa installed. Falling back to biotite")

parser = PDBParser(QUIET=True)

# This function is included to allow worker threads to output to Jupyter Notebooks as multiprocessing.Pool does not allow to alter sys.stdout of the child processes.
# Therefore this function redirects the log output to a buffer, which is transmitted back to the main thread where it is outputed.
def _WorkerThreadIni(logLevel:int):
    """
        This function is called in the worker threads by multiprocessing.Pool
    """
    global stdout, logger
    stdout = StringIO()
    sys.stdout = stdout
    streamHandler.setStream(sys.stdout)
    logger.setLevel(logLevel)

class ProteinStructureWarning(Exception):
    def __init__(self, message):            
        super().__init__(message)

def OpenStructure(path: pathlib.Path, structure_name: str = "") -> tuple[BioPy_PDBStructure|None, struc.AtomArray|None]:
    """
        Opens the given structure and returns the Bio.PDB and biotite objects.
    """
    t0 = time.perf_counter()
    file_name = path.name
    try:
        structure_biopy = parser.get_structure("structure", file=path)
        structure_biotite = bt_pdb.get_structure(bt_pdb.PDBFile.read(path))
    except PDBConstructionException:
        logger.warning(f"Can't parse structure {structure_name} (file {file_name}) using Biopython")
        return (None, None)
    except ValueError as ex:
        logger.warning(f"Can't parse structure {structure_name} (file {file_name}) due to the following reason: {ex}")
        return (None, None)
    
    if structure_biotite.stack_depth() != 1:
        logger.warning(f"Can't parse structure {structure_name} (file {file_name}) because it contains more than one stack")
        return (None, None)
    
    atomarray_biotite: struc.AtomArray = structure_biotite[0]

    chains = [c for c in structure_biopy.get_chains()]
    if len(chains) != 2:
        logger.warning(f"Can't parse structure {structure_name} (file {file_name}) because it has not 2 chains")
        return (None, None)
    
    t1 = time.perf_counter()
    logger.debug(f"Runtime reading structure {structure_name} (file {file_name}): {round((t1-t0)*1000, 1)}ms")
    return (structure_biopy, atomarray_biotite)

def calculate_buried_area(structure_biopy:BioPy_PDBStructure):
    """
        Calculates the buried surface area using freesasa which is defined as surface area of the two chains
        subtracted from the surface area of the complex.
    """
    ti = time.perf_counter()
    chains = [c for c in structure_biopy.get_chains()]
    assert len(chains) == 2

    chain1 = structure_biopy[0][chains[0].id]
    chain2 = structure_biopy[0][chains[1].id]

    strucChain1 = BioPy_PDBStructure('structure')
    modelChain1 = BioPy_PDBModel("1")
    modelChain1.add(chain1)
    strucChain1.add(modelChain1)
    strucChain2 = BioPy_PDBStructure('structure')
    modelChain2 = BioPy_PDBModel("1")
    modelChain2.add(chain2)
    strucChain2.add(modelChain2)
    t1 = time.perf_counter()

    fs_pp = freesasa.structureFromBioPDB(structure_biopy)
    fs_chain1 = freesasa.structureFromBioPDB(strucChain1)
    fs_chain2 = freesasa.structureFromBioPDB(strucChain2)
    t2 = time.perf_counter()

    area_pp = freesasa.calc(fs_pp).totalArea()
    area_chain1 = freesasa.calc(fs_chain1).totalArea()
    area_chain2 = freesasa.calc(fs_chain2).totalArea()
    tf = time.perf_counter()

    buried_area = (area_chain1 + area_chain2 - area_pp)
    tf = time.perf_counter()
    logger.debug(f"Sasa values: Chain 1 = {round(area_chain1, 3)}, Chain 2 = {round(area_chain2, 3)}, Total = {round(area_pp, 3)}")
    logger.debug(f"Runtime calculate_buried_area: {round((tf-ti)*1000, 1)}ms ({round((t1-ti)*1000, 1)}ms model buiilding, {round((t2-t1)*1000, 1)}ms loading, {round((tf-t2)*1000, 1)}ms sasa calc)")
    return round(buried_area, 3)

def calculate_buried_area_biotite(atomarray_biotite:struc.AtomArray, chain1:struc.AtomArray, chain2:struc.AtomArray, probe_radius:float=1.4):
    """
        Calculates the buried surface area using biotite which is defined as surface area of the two chains
        subtracted from the surface area of the complex.
    """
    ti = time.perf_counter()

    sasa12 = np.sum([s for s in struc.sasa(atomarray_biotite, probe_radius=probe_radius) if math.isfinite(s)])
    sasa1 = np.sum([s for s in struc.sasa(chain1, probe_radius=probe_radius) if math.isfinite(s)])
    sasa2 = np.sum([s for s in struc.sasa(chain2, probe_radius=probe_radius) if math.isfinite(s)])

    logger.debug(f"Sasa values: Chain 1 = {round(sasa1, 3)}, Chain 2 = {round(sasa2, 3)}, Total = {round(sasa12, 3)}")
    buried_area = (sasa1 + sasa2 - sasa12)
    tf = time.perf_counter()
    logger.debug(f"Runtime calculate_buried_area: {round((tf-ti)*1000, 1)}ms ")
    return round(buried_area, 3)


def calculate_min_distance(atomarray_biotite:struc.AtomArray, cutoff:float=5.0, max_cutoff:float = 15.0):
    """
        Calculates the minimum distance [Angstrom] between the two chains of a protein complex using biotite.
        The minimum distance is defined as the distance between the backbone (CA atoms) if is subceeds
        the cutoff value. For distances above cutoff the algorithm reports NaN

        You may whish to apply the cutoff value not for the backbone only but for all atoms of the residue.
        For this, set the max_cutoff [Angstrom] to something above cutoff (for example twice) and this function
        will report distances above cutoff if a) at least one pair of atoms in the two residues has a distance
        below cutoff and b) the backbone distance is still below max_cutoff.
        This will require MUCH more computational power and should therefore only be enabled if necessary.
    """
    ti = time.perf_counter()
    chains = struc.get_chains(atomarray_biotite)
    assert len(chains) == 2

    chain1 = atomarray_biotite[atomarray_biotite.chain_id == chains[0]]
    chain2 = atomarray_biotite[atomarray_biotite.chain_id == chains[1]]

    chain1_backbone = chain1[chain1.atom_name == "CA"]
    chain2_backbone = chain2[chain2.atom_name == "CA"]
    
    min_distance = float("inf")
    t1 = time.perf_counter()

    # max_cutoff is implemented to mimic the same behaviour as the ISS code which used pymol.

    for ca1 in chain1_backbone:
        for ca2 in chain2_backbone:
            if (dist := struc.distance(ca1, ca2)) < cutoff:
                min_distance = min(min_distance, dist)
                continue
            elif dist <= max_cutoff and dist < min_distance: # If max_cutoff is set, check the individual atoms
                for a1 in chain1[chain1.res_id == ca1.res_id]:
                    for a2 in chain2[chain2.res_id == ca2.res_id]:
                        if struc.distance(a1, a2) <= cutoff:
                            break
                    else: # Runs after loop finished normally
                        continue
                    break # This only runs if there is a break in the inner loop because of previous continue statement
                else:
                    # Only calculate min_distance if there is the atom wise distance of the residues is below cutoff
                    continue
                min_distance = min(min_distance, dist)

    tf = time.perf_counter()
    logger.debug(f"Runtime calculate_min_distance: {round((tf-ti)*1000, 1)}ms ({round((t1-ti)*1000, 1)}ms generating chains, {round((tf-t1)*1000, 1)}ms calculating distance)")
    
    return round(float(min_distance), 3) if math.isfinite(min_distance) else float('NaN')

def calculate_hbonds(atomarray_biotite:struc.AtomArray):
    """
        Calculates the number of hbonds between two chains of a protein complex using biotites AtomArray
    """
    ti = time.perf_counter()
    chains = struc.get_chains(atomarray_biotite)
    assert len(chains) == 2

    chain1_mask = atomarray_biotite.chain_id == chains[0]
    chain2_mask = atomarray_biotite.chain_id == chains[1]
    t1 = time.perf_counter()

    bond_list = struc.bonds.connect_via_distances(atomarray_biotite)
    atomarray_biotite.bonds = bond_list

    t2 = time.perf_counter()

    triplets = struc.hbond(atomarray_biotite, selection1=chain1_mask, selection2=chain2_mask)
    tf = time.perf_counter()
    logger.debug(f"Runtime calculate_hbonds: {round((tf-ti)*1000, 1)}ms ({round((t1-ti)*1000, 1)}ms generating chains, {round((t2-t1)*1000, 1)}ms bond list, {round((tf-t2)*1000, 1)}ms hbonds)")
    
    return triplets.shape[0]


def calculate_saltbridges(structure_biopy:BioPy_PDBStructure, cutoff:float=4.0):
    """
        Calculates the number of saltbridges between the two chains of a protein complex using biopython.
        Saltbridges are defined as a interaction between an acidic residue (ASP, GLU) with a basic residue 
        (ARG, LYS) and found if the distance between the oxygen and nitrogen atoms is below cutoff [Angstrom]
    """
    ti = time.perf_counter()
    chains = [c for c in structure_biopy.get_chains()]
    assert len(chains) == 2

    chain1 = structure_biopy[0][chains[0].id]
    chain2 = structure_biopy[0][chains[1].id]

    saltBridges_ac = {"ASP":"a", "GLU":"a", "ARG":"b", "LYS":"b"} # a: Acidic, b: Basic
    saltBridges_atoms = ['OD1', 'OD2', 'OE1', 'OE2', 'NH1', 'NH2', 'NE', 'NZ'] # 0,1: ASP, 2,3: GLU, 4,5,6: ARG, 7: LYS

    salt_bridges = 0

    for res1 in chain1:
        if res1.resname not in saltBridges_ac.keys():
            continue
        for res2 in chain2:
            if res2.resname not in saltBridges_ac.keys():
                continue
            if saltBridges_ac[res1.resname] == saltBridges_ac[res2.resname]:
                continue
            for atom1 in [a for a in res1 if a.id in saltBridges_atoms]:
                for atom2 in [a for a in res2 if a.id in saltBridges_atoms]:
                    distance = atom1 - atom2
                    if distance <= cutoff:
                        salt_bridges += 1
    tf = time.perf_counter()
    logger.debug(f"Runtime calculate_saltbridges: {round((tf-ti)*1000, 1)}ms")
    
    return salt_bridges

def calculate_hydrophobic_interactions(structure_biopy:BioPy_PDBStructure, cutoff:float=5.0):
    """
        Calculates the number of hydrophobic interactions between two chains of a protein complex using biopython.
        Hydrophobic interactions are defined if the C atoms are below the cutoff value [Angstrom] of the following
        residues: ALA, VAL, LEU, ILE, MET, PHE, PRO, TRP, GLY
    """
    ti = time.perf_counter()
    chains = [c for c in structure_biopy.get_chains()]
    assert len(chains) == 2

    chain1 = structure_biopy[0][chains[0].id]
    chain2 = structure_biopy[0][chains[1].id]

    hydrophobic_interactions = 0

    hydrophobic_residues = {'ALA', 'VAL', 'LEU', 'ILE', 'MET', 'PHE', 'PRO', 'TRP', 'GLY'}

    # Compare each hydrophobic residue in chain1 with each hydrophobic residue in chain2
    for res1 in [r for r in chain1 if r.resname in hydrophobic_residues]:
        for atom1 in [a for a in res1 if a.element == 'C']:
            for res2 in [r for r in chain2 if r.resname in hydrophobic_residues]:
                for atom2 in [a for a in res2 if a.element == 'C']:
                    distance = atom1 - atom2
                    if distance <= cutoff:
                        hydrophobic_interactions += 1

    tf = time.perf_counter()
    logger.debug(f"Runtime calculate_hydrophobic_interactions: {round((tf-ti)*1000, 1)}ms")
    
    return hydrophobic_interactions




def EvaluateStructure(path: pathlib.Path, structure_name: str = "") -> dict|None:
    """
        Measures the pdb file given by path
    """
    ti = time.perf_counter()
    file_name = path.name
    structure_biopy, atomarray_biotite = OpenStructure(path, structure_name)
    if structure_biopy is None or atomarray_biotite is None: raise ProteinStructureWarning(f"The strucuture {structure_name} (file {path.name}) can't be opened")

    buried_area = calculate_buried_area(structure_biopy) if _freesasa_ready else calculate_buried_area_biotite(atomarray_biotite)
    hbonds = calculate_hbonds(atomarray_biotite)
    min_distance = calculate_min_distance(atomarray_biotite)
    salt_bridges = calculate_saltbridges(structure_biopy)
    hydrophobic_interactions = calculate_hydrophobic_interactions(structure_biopy)

    tf = time.perf_counter()
    logger.log(level=LOGLEVEL_ADDITIONAL_INFO, msg=f"parsed {structure_name} (file {file_name}) in {round((tf-ti), 3)}s")
    return {
        'structure_name': structure_name,
        'file': file_name,
        'hbonds': hbonds,
        'salt_bridges': salt_bridges,
        'buried_area': buried_area,
        'min_distance': min_distance,
        'hydrophobic_interactions': hydrophobic_interactions
    }

def WalkFolder(basePath: str, 
               pathObj:dict[str, dict[str, pathlib.Path]]={},
               structures: None|str|list[str] = None,
               files: None|bool|str|list[str] = None
               ) -> dict[str, dict[str, pathlib.Path]]:
    """
        Add the path basePath/structure/file.pdb to the pathObj provided (or create a new one if omitted).
        If files and/or structures are None, search inside the directory for all pdb files.
        Returns:
            pathObj: dict[name:str, tuple[path: pathlib.Path, structure_name: str]]
    """
    structures_count = 0
    basePath = pathlib.Path(basePath).absolute()
    if not basePath.is_dir():
        raise ValueError("The given basePath is not a valid directory")
    
    if structures is None:
        structures: list[pathlib.Path] = [p for p in basePath.iterdir()]
    elif isinstance(structures, str):
        structures: list[pathlib.Path] = [basePath / structures]
    elif isinstance(structures, list):
        structures: list[pathlib.Path] = [basePath / p for p in structures]
    else:
        raise ValueError("Invalid argument for structures")

    for structure in structures:
        if not structure.exists():
            raise ValueError(f"The structure {structure} does not point to a valid path")
        structure_name = str(structure.name)
        if structure.is_file():
            if structure.suffix.lower() == ".pdb":
                structure_name = str(structure.stem)
                if structure_name in pathObj.keys():
                    raise ValueError(f"Duplicate structure and file {structure}")
                pathObj[structure_name] = (structure.absolute(), structure_name)
                structures_count += 1
            continue

        if files is None:
            filesF: list[pathlib.Path] = [f for f in structure.iterdir() if f.is_file()]
        elif isinstance(files, str):
            filesF: list[pathlib.Path] = [structure / f"{files}.pdb"]
        elif isinstance(files, list):
            filesF: list[pathlib.Path] = [structure / f"{f}.pdb" for f in files]
        else:
            raise ValueError("Invalid argument for files")
        
        for file in filesF:
            if not file.exists() or not file.is_file():
                raise ValueError(f"{structure}/{file} does not point to a valid file")
            if not file.suffix.lower() == ".pdb":
                continue
            file_name = file.stem
            name = f"{structure_name}-{file_name}"
            if name in pathObj.keys():
                raise ValueError(f"Duplicate structure and file {structure}/{file_name}.pdb")
            pathObj[name] = (file.absolute(), structure_name)
            structures_count += 1
    logger.info(f"Found {structures_count} structures")
    return pathObj


# This function is called from the main processes with a pathObj
def _run_task(args) -> tuple[dict|None, str]:
    """
        Helper function called from the main process using multiprocessing.Pool and pool.imap_unordered.
        Returns a tuple. The first value is either or a dict containing the measurement parameters or None on a error, while the second value
        is the output of the logging function.
    """
    path, structure_name = args
    try:
        r = EvaluateStructure(path, structure_name)
    except ProteinStructureWarning as ex:
        logger.warning(str(ex))
        r = None
    output = stdout.getvalue().strip("\n")
    return (r, output)

def Run(pathObj: list[tuple[pathlib.Path, str]], num_threads=None) -> pd.DataFrame|None:
    """
        Measures the given paths and returns the result as pandas Dataframe.
        pathObj is a list of tuples of (path_to_pdf: pathlib.Path, structure_name: str)
        [The structurename is used for the output as filenames often are not unique]
    """

    if len(pathObj) == 0:
        logger.warning(f"You provided an empty pathObj")
        return
    logger.info(f"Started Taskpool of {num_threads} processes for {len(pathObj)} files")
    t0 = time.perf_counter()
    with Pool(initializer=_WorkerThreadIni, initargs=[logger.level], processes=(num_threads if num_threads is not None else cpu_count())) as p:
        results = []
        _ti = t0
        _ti_n = 0
        for i, rmsg in enumerate(p.imap_unordered(_run_task, pathObj)):
            r, output = rmsg
            if len(output) > 0:
                print(output)
            _ti_n += 1
            if time.perf_counter() - _ti > 5:
                _speed = ((time.perf_counter() - _ti)/_ti_n)**-1 if _ti_n > 0 else 0
                _speed_avg = ((time.perf_counter() - t0)/i)**-1 if i > 0 else 0
                _eta = (len(pathObj) - i)/_speed_avg if _speed_avg != 0 else -1
                _ti = time.perf_counter()
                _ti_n = 0
                logger.info(f"{int(100*i/len(pathObj))}% - ETA {str(datetime.timedelta(seconds=int(_eta))) if _eta >= 0 else '?'} | current speed {round(_speed, 3)} s⁻¹ | average speed {round(_speed_avg, 3)} s⁻¹")
            if r is not None:
                results.append(r)
    _speed_avg = ((time.perf_counter() - t0)/len(pathObj))**-1
    num_errors = len(pathObj) - len(results)
    logger.info(f"Finished processing {len(pathObj)} objects{f' ({num_errors} objects produced an error)' if num_errors > 0 else ''} in {str(datetime.timedelta(seconds=int(time.perf_counter() - t0)))} | average speed {round(_speed_avg, 3)} s⁻¹")
    if len(results) == 0:
        return None
    return pd.DataFrame(results).sort_values(["structure_name", "file"])

In [4]:
from pathlib import Path

In [5]:
AF_prediction_results = Path("../ressources/AF_predictions/AF_prediction_randomized_DMI_results.xlsx").resolve()
AF_prediction_metrics = Path("../ressources/AF_predictions/AF_metrics_all_structures.tsv").resolve()
AF_DMI_structures_folders = [Path("../ressources/AF_DMI_structures").resolve() / p for p in ['AF_DMI_structures1', 'AF_DMI_structures2', 'AF_DMI_structures3', "AF_DMI_mutated_structures"]]
AF_DDI_structures_path = Path("../ressources/AF_DDI_structures").resolve()
solved_DMI_structures_path = Path("../ressources/DMI_solved_structures_hydrogens").resolve()
solved_DDI_structures_path = Path("../ressources/DDI_solved_structures_hydrogens").resolve()

for p in [AF_prediction_results, AF_prediction_metrics, AF_DDI_structures_path, solved_DMI_structures_path, solved_DDI_structures_path] + AF_DMI_structures_folders:
    if not p.exists():
        print(f"{p} does not point to a valid path")

In [58]:
logger.setLevel(logging.INFO)

In [55]:
#Loading sample structure
sampleStructure_name = "LIG_NRP_CendR_1_2ORZ"
sampleStructure_path = AF_DMI_structures_folders[1] / sampleStructure_name / "ranked_0.pdb"
structure_biopy, atomarray_biotite = OpenStructure(sampleStructure_path, sampleStructure_name)

[2025-01-27 15:12:25,438 | 3073387823 | DEBUG] Runtime reading structure LIG_NRP_CendR_1_2ORZ (file ranked_0.pdb): 28.3ms


In [49]:
# Stopwatch
from typing import Self
import numpy as np

class Stopwatch:
    """
        A class to measure performance of many steps with as less code as possible. After creating a Stopwatch, simply call start() and stop() and get the runtime
        via the runtime property. Set laps by calling lap(name) and provide a subwatch by calling lap(name, stopwatch) to further detail the runtime in this lap.
        To display the times, call to_string() or have a look at MultiStopwatch, which can average over a list of stopwatches
    """

    def __init__(self):
        self._times: dict[str, float] = {} # Timestamps of starting and stoping the loop as well as of the laps.
        self._tstopwatches: dict[str, Self] = {} # Stopwatches, that further detail a lap

    @property
    def runtime(self) -> float:
        """ The total runtime (in seconds) of this stopwatch or np.inf if not started or stopped yet """
        if "_initial" not in self._times.keys() or "_final" not in self._times.keys():
            return np.inf
        return self._times["_final"] - self._times["_initial"]
        
    @property
    def runtime_laps(self) -> dict[str, float]:
        """ The runtime of the laps as a dictionary """
        ks, vs = list(self._times.keys()), list(self._times.values())
        return {k: (t1-t0) for (k, t0, t1) in zip(ks[1:-1], vs[0:-2], vs[1:-1])}
    
    @property
    def times_stopwatches(self) -> dict[str|Self]:
        """ Dictionary of stopwatches assoiated to a lap """
        return self._tstopwatches
        
    def lap(self, name:str, stopwatch:Self|None=None) -> Self:
        """
            Create a lap in the current stopwatch of the given name. Supports associating the lap with a substopwatch which further details the runtime of this step
        """
        if "_initial" not in self._times.keys(): raise RuntimeError("The stopwatch must first be started")
        if name == "_final" or name=="_initial": raise KeyError(f"'{name}' is an invalid name")
        if name in self._times.keys(): raise KeyError(f"Duplicate key {name}")
        self._times[name] = time.perf_counter()
        if stopwatch is not None:
            self._tstopwatches[name] = stopwatch
        return self

    def start(self) -> Self:
        """ Start the stopwatch. Note that a stopwatch can only be started once """
        if "_initial" in self._times.keys(): return self
        self._times["_initial"] = time.perf_counter()
        return self
    
    def stop(self) -> Self:
        """ Stop the stopwatch (and all it's lap stopwatches). Note that a stopwatch can only be stoped once once it has been started """
        if "_initial" not in self._times.keys(): return self
        for c in self._tstopwatches.values():
            c.stop()
        if "_final" not in self._times.keys():
            self._times["_final"] = time.perf_counter()
        return self
    
    def _format_tdelta_float(tdelta_float: float, unit=True) -> str:
        """ Helper function to format a time delta to a string """
        if tdelta_float == np.inf:
            return f"running"
        if tdelta_float < 0:
            raise ValueError(f"A timedelta must not be negative")
        elif np.abs(tdelta_float) <= 1e-3:
            return f"{round(1e6*tdelta_float, 1)}{' µs' if unit else ''}" 
        elif np.abs(tdelta_float) <= 1.0:
            return f"{round(1e3*tdelta_float, 1)}{' ms' if unit else ''}" 
        elif np.abs(tdelta_float) <= 10.0:
            return f"{round(tdelta_float, 2)}{' s' if unit else ''}"  
        return str(datetime.timedelta(seconds=tdelta_float))
    
    def to_string(self, print_=True, max_level:int=3, level:int=0) -> str|None:
        """
            Display the stopwatch as a string. Example output:

            Total Runtime: 0:00:14
                Step 1: 34,5 ms
                Step 2: 0:00:12
                    Substep 1: 292.3 µs
                    Substep 2: 0:00:11
                Step 3: 2.45 s

            IMPORTANT: Do not change the level parameter as it is for internal use only
        """
        returnStr = []
        if level == 0:
            returnStr.append(f"Total Runtime: {Stopwatch._format_tdelta_float(self.runtime)}")
        if level < max_level:
            for name, rtime in self.runtime_laps.items():
                returnStr.append(f"{'\t'*(level+1)}{name}: {Stopwatch._format_tdelta_float(rtime)}")
                if name in self._tstopwatches.keys():
                    returnStr.extend(self._tstopwatches[name].to_string(max_level=max_level, level=(level+1)))
        if level == 0:
            if print_: 
                print('\n'.join(returnStr))
                return 
            return '\n'.join(returnStr)
        return returnStr

class MultiStopwatch:
    """
        A class to hold a list of stopwatches and create average runtime statistics from them. Supports nested laps as provided by the Stopwatch class

        The MultiStopwatch can measure the total runtime by calling start() and stop(), but it isn't needed for a valid output. To add a child, call add_child(stopwatch)
        or view them by accessing the childs property. Call to_string(max_level) to display an average output of all it's childs.
    """

    def __init__(self):
        self._starttime: float|None = None
        self._stoptime: float|None = None
        self._childs: list[Stopwatch] = []

    def start(self) -> Self:
        """ Start the Stopwatch. Note that a stopwatch can only be started once """
        if self._starttime is not None: return self
        self._starttime = time.perf_counter()
        return self
    
    def stop(self) -> Self:
        """ Stop the stopwatch and all it's childrens. Note that a stopwatch can only be stoped once"""
        if self._starttime is None: return self
        for c in self._childs:
            c.stop()
        if self._stoptime is None: 
            self._stoptime = time.perf_counter()
        return self
    
    @property
    def runtime(self) -> float:
        """ The total runtime of this stopwatch or np.inf if not started or stopped yet """
        if self._starttime is None or self._stoptime is None:
            return np.inf
        return self._stoptime - self._starttime
    
    def add_child(self, stopwatch:Stopwatch) -> Self:
        """ Add a stopwatch """
        if not isinstance(stopwatch, Stopwatch): raise TypeError(f"add_childs needs a Stopwatch, but you provided {type(stopwatch)}")
        self._childs.append(stopwatch)
        return self

    @property
    def childs(self) -> list[Stopwatch]:
        return self._childs
    
    def to_string(self, print_=True, max_level:int=3) -> str|None:
        """ Display averaged runtime statistics for all it's child stopwatches. Example output: 
        
            Total Runtime: 0:00:14 ± 2.39 s (n=13)
                Step 1: 34,5 ms ± 6.5 ms (n=13)
                Step 2: 0:00:12 ± 1.35 s (n=13)
                    Substep 1: 292.3 µs ± 1.1 ms (n=9)
                    Substep 2: 0:00:11 ± 982.3 ms (n=7)
                Step 3: 2.45 s ± 162 ms (n=2)
        """

        def _deref(stopwatches:list[Stopwatch], max_level=max_level, level=0):
            returnStr = []

            runtimesDict = {} # Dictionary of all runtimes per lap name in the list of stopwatches
            lapStopwatchesDict = {} # Dictionary of all lap stopwatches per lap name in the list of stopwatches
            for s in stopwatches:
                for k,v in s.runtime_laps.items():
                    runtimesDict.setdefault(k, []).append(v)
                for k,v in s.times_stopwatches.items():
                    lapStopwatchesDict.setdefault(k, []).append(v)
            for k, runtimes in runtimesDict.items():
                if len(runtimes) == 0:
                    continue
                mean = np.mean(runtimes)
                std = np.std(runtimes)
                returnStr.append(f"{'\t'*level}{k}: {Stopwatch._format_tdelta_float(mean)} ± {Stopwatch._format_tdelta_float(std)} (n={len(runtimes)})")
                if k in lapStopwatchesDict.keys() and level < max_level:
                    returnStr.extend(_deref(lapStopwatchesDict[k], max_level=max_level, level=(level+1)))
            return returnStr
        
        returnStr = [f"Total Runtime: {Stopwatch._format_tdelta_float(self.runtime)} (n={len(self._childs)})"]
        returnStr.extend(_deref(self._childs, max_level=max_level, level=1))
        if print_: 
            print('\n'.join(returnStr))
            return 
        return '\n'.join(returnStr)

In [56]:
from sklearn.metrics import pairwise_distances
from Bio.PDB.Atom import Atom as BioPy_Atom

class ProteinStructureWarning(Exception):
    def __init__(self, message):            
        super().__init__(message)

def get_distance_matrix(structure_biopy:BioPy_PDBStructure) -> tuple[np.ndarray, list[BioPy_Atom], list[BioPy_Atom]]:
    stopwatch = Stopwatch().start()
    chains = [c for c in structure_biopy.get_chains()]
    if not len(chains) == 2: raise ProteinStructureWarning(f"The protein needs to have 2 chains but it has {len(chains)}")
    chain1 = structure_biopy[0][chains[0].id]
    chain2 = structure_biopy[0][chains[1].id]
    

    chain1_atoms = np.array([a for a in chain1.get_atoms()])
    chain2_atoms = np.array([a for a in chain2.get_atoms()])
    chain1_coords = [a.coord for a in chain1_atoms]
    chain2_coords = [a.coord for a in chain2_atoms]
    stopwatch.lap("Parsing the chains and atom arrays")

    distance_matrix = pairwise_distances(chain1_coords,chain2_coords)
    stopwatch.lap("Distance Matrix")

    return (distance_matrix, chain1_atoms, chain2_atoms, stopwatch.stop())

def get_interface(distance_matrix: np.ndarray, chain1_atoms: list[BioPy_Atom], chain2_atoms: list[BioPy_Atom], cutoff=5.0):
    stopwatch = Stopwatch().start()
    pair_dist = np.argwhere(distance_matrix <= cutoff) # List of matrix indices where distance is below cutoff
    intf1_atoms = set(chain1_atoms[pair_dist[:, 0]]) # Set of the chain1 atoms with an atom from chain2 closer than cutoff distance
    intf2_atoms = set(chain2_atoms[pair_dist[:, 1]])
    stopwatch.lap("Interface atoms")

    # List of backbone atoms in the interface
    intf1_backbone = [a for a in intf1_atoms if a.name == "CA" and a in intf1_atoms]
    intf2_backbone = [a for a in intf2_atoms if a.name == "CA" and a in intf2_atoms]
    stopwatch.lap("Interface backbone")

    intf1_residues = [a.parent for a in intf1_backbone]
    intf2_residues = [a.parent for a in intf2_backbone]

    # A list for each chain combined to a tuple with the indices of the interface residues. Example : ([23, 445, 470], [26]) 
    backbone_indices_tuple = ([i for i,a in enumerate(chain1_atoms) if a in intf1_backbone], [i for i,a in enumerate(chain2_atoms) if a in intf2_backbone]) 

    # The distance matrix sliced to only the interface backbone atoms. So shape is (len(intf1_backbone), len(intf2_backbone))
    local_dist_matrix = distance_matrix[backbone_indices_tuple[0], :][:, backbone_indices_tuple[1]]

    stopwatch.lap("local distance matrix")

    min_distance = round(np.min(local_dist_matrix), 3)

    return (intf1_residues, intf2_residues, min_distance, stopwatch.stop())


In [60]:
def EvaluateStructure(path: pathlib.Path, structure_name: str = "") -> tuple[dict|None, Stopwatch] :
    """
        Measures the pdb file given by path
    """
    stopwatch = Stopwatch().start()
    file_name = path.name
    structure_biopy, atomarray_biotite = OpenStructure(path, structure_name)
    stopwatch.lap("Loading structure")
    if structure_biopy is None or atomarray_biotite is None: return (None, stopwatch.stop())

    distance_matrix, chain1_atoms, chain2_atoms, s_dist_matrix = get_distance_matrix(structure_biopy)
    stopwatch.lap("Distance Matrix", s_dist_matrix)
    intf1_residues, intf2_residues, min_distance, s_interface = get_interface(distance_matrix, chain1_atoms, chain2_atoms)

    stopwatch.lap("Defining Interface", s_interface)

    buried_area = calculate_buried_area(structure_biopy) if _freesasa_ready else calculate_buried_area_biotite(atomarray_biotite)
    stopwatch.lap("Surface Area")
    hbonds = calculate_hbonds(atomarray_biotite)
    stopwatch.lap("H-Bonds")
    min_distance = calculate_min_distance(atomarray_biotite)
    stopwatch.lap("Interface Distance")
    salt_bridges = calculate_saltbridges(structure_biopy)
    stopwatch.lap("Salt bridges")
    hydrophobic_interactions = calculate_hydrophobic_interactions(structure_biopy)
    stopwatch.lap("Hydropphobic interactions")

    return ({
        'structure_name': structure_name,
        'file': file_name,
        'chainA_intf_res': len(intf1_residues),
        'chainB_intf_res': len(intf2_residues),
        'interface_distance': min_distance,
        'buried_area': buried_area,

        'hbonds': hbonds,
        'salt_bridges': salt_bridges,
        'hydrophobic_interactions': hydrophobic_interactions
    }, stopwatch.stop())

multiStopwatch = MultiStopwatch().start()
result, stopwatch = EvaluateStructure(sampleStructure_path, sampleStructure_name)
multiStopwatch.add_child(stopwatch)
multiStopwatch.stop()
stopwatch.to_string()

Total Runtime: 162.9 ms
	Loading structure: 22.1 ms
	Distance Matrix: 3.5 ms
		Parsing the chains and atom arrays: 2.1 ms
		Distance Matrix: 1.3 ms
	Defining Interface: 5.2 ms
		Interface atoms: 771.8 µs
		Interface backbone: 14.6 µs
		local distance matrix: 4.4 ms
	Surface Area: 78.7 ms
	H-Bonds: 14.5 ms
	Interface Distance: 33.6 ms
	Salt bridges: 626.0 µs
	Hydropphobic interactions: 4.7 ms


In [None]:
bondList = struc.BondList()