In [117]:
import dataclasses
from collections import Counter
from typing import Tuple, List, Dict, Union, Set, Iterator, Optional
from enum import Enum, auto
import time

from protein import trypsin
from measurement import read_mgf, PeptideMeasurement
from pyteomics import mass
from common import LYS, BSA


@dataclasses.dataclass
class Mod:
    """A named modification together with the mass it adds.

    Instances are hashable, so they can be stored in sets and used as
    dictionary keys. The hash covers both fields, matching the field-wise
    equality generated by ``dataclass``.
    """

    description: str
    mass: float

    def __hash__(self):
        key = (self.description, self.mass)
        return hash(key)


@dataclasses.dataclass
class Residue:
    """A single residue of a peptide chain and its mass.

    ``mass`` is ``mass.calculate_mass(sequence=name)`` minus the mass of one
    H2O, plus the masses of all modifications passed to the constructor.

    Args:
        name: Residue name as understood by ``mass.calculate_mass``
            (the notebook passes single characters of a sequence string).
        modifications: Modifications whose masses are added to the residue
            mass. ``None`` (the default) means no modifications.
    """

    name: str
    mass: float

    def __init__(self, name: str, modifications: Optional[List[Mod]] = None):
        # A None sentinel replaces the former ``[]`` default, which was a
        # single list object shared by every call.
        applied = modifications if modifications is not None else ()
        self.name = name
        self.mass = (
            mass.calculate_mass(sequence=name)
            - mass.calculate_mass(formula="H2O")
            + sum(m.mass for m in applied)
        )


class Peptide:
    """A contiguous stretch ``[beginning, end)`` of a parent sequence.

    Attributes:
        beginning: Index of the first residue in the parent sequence.
        end: Index one past the last residue in the parent sequence.
        seq: The residue letters of this stretch.

    ``_modifications`` maps a target key (the notebook uses the residue
    letter, e.g. ``"M"``) to a ``(Mod, count)`` pair: the modification and
    how many times it may occur in this peptide.
    """

    beginning: int
    end: int
    seq: str

    _modifications: Dict[str, Tuple[Mod, int]]
    _residues: List[Residue]
    # Lazily computed caches; None means "not computed yet".
    _residue_counts: Dict[str, int] = None
    _mass = None
    _minmass = None
    _maxmass = None

    def __init__(
        self,
        beginning: int,
        end: int,
        seq: str,
        modifications: Dict[str, Tuple[Mod, int]],
    ):
        self.beginning = beginning
        self.end = end
        self.seq = seq
        self._residues = [Residue(resname) for resname in seq]
        self._modifications = modifications

    def __getitem__(self, index: int):
        """Return the residue at parent-sequence position ``index``.

        Returns ``None`` (rather than raising) when ``index`` lies outside
        ``[beginning, end)``.
        """
        if self.beginning <= index < self.end:
            return self._residues[index - self.beginning]
        return None

    def __iter__(self):
        """Iterate over the parent-sequence positions covered by this peptide."""
        return range(self.beginning, self.end).__iter__()

    def __add__(self, other):
        """Concatenate two contiguous peptides into a new ``Peptide``.

        Modification counts for the same target are summed. Neither operand
        is modified.

        Raises:
            ValueError: If ``other`` does not start where ``self`` ends, or
                if both peptides define different modifications for the same
                target.
        """
        if other.beginning != self.end:
            raise ValueError(
                f"Peptides can only be added when they are contiguous. Got {(self.beginning, self.end)} + {other.beginning, other.end} instead."
            )

        # Merge into a copy. The previous code aliased self._modifications
        # and used setdefault on it, so "a + b" silently changed a's counts
        # (even when the compatibility check below then raised).
        merged_mods = dict(self._modifications)
        for target, (mod, c2) in other._modifications.items():
            mod2, c1 = merged_mods.get(target, (mod, 0))
            if mod != mod2:
                raise ValueError(
                    f"Peptides can only be added when they have compatible modifications. These two differ at {target}"
                )
            merged_mods[target] = mod, c1 + c2

        return Peptide(self.beginning, other.end, self.seq + other.seq, merged_mods)

    def count(self, amino_acid):
        """Return how many times ``amino_acid`` occurs in ``seq`` (cached)."""
        if self._residue_counts is None:
            self._residue_counts = Counter(self.seq)
        return self._residue_counts[amino_acid]

    @property
    def zwitterion_mass(self):
        """Mass of the unmodified sequence minus one H2O (cached).

        Computed as ``mass.calculate_mass(sequence=seq, ion_type="M",
        charge=0)`` minus the mass of H2O.
        """
        if self._mass is None:
            self._mass = mass.calculate_mass(
                sequence=self.seq, ion_type="M", charge=0
            ) - mass.calculate_mass(formula="H2O")
        return self._mass

    @property
    def min_mass(self):
        """Lowest reachable mass (cached).

        ``zwitterion_mass`` plus ``mass * count`` of every modification with
        a negative mass.
        """
        if self._minmass is None:
            neg = sum(
                m.mass * count for m, count in self.modifications_anywhere if m.mass < 0
            )
            self._minmass = self.zwitterion_mass + neg
        return self._minmass

    @property
    def max_mass(self):
        """Highest reachable mass (cached).

        ``zwitterion_mass`` plus ``mass * count`` of every modification with
        a positive mass.
        """
        if self._maxmass is None:
            pos = sum(
                m.mass * count for m, count in self.modifications_anywhere if m.mass > 0
            )
            self._maxmass = self.zwitterion_mass + pos
        return self._maxmass

    @property
    def modifications_anywhere(self) -> Iterator[Tuple[Mod, int]]:
        """Iterate over the ``(Mod, count)`` pairs, ignoring their targets."""
        return (x for x in self._modifications.values())

    @property
    def cysteines(self) -> Iterator[int]:
        """Iterate over positions of ``"C"`` residues.

        Positions are 0-based offsets within this peptide (indices into
        ``seq``), not parent-sequence positions.
        """
        return (i for i, res in enumerate(self._residues) if res.name == "C")

    def __repr__(self):
        return f"Peptide(beginning={self.beginning}, end={self.end}, seq={self.seq}, modifications={self._modifications})"


def within_bounds(reference_mass, measured_mass, ppm_error: float = 10):
    """Tell whether ``measured_mass`` is within ``ppm_error`` ppm of ``reference_mass``.

    The tolerance is taken relative to ``reference_mass`` (via ``err_margin``)
    and the comparison is inclusive.
    """
    tolerance = err_margin(reference_mass, ppm_error)
    deviation = abs(reference_mass - measured_mass)
    return deviation <= tolerance


def err_margin(reference_mass, ppm_error: float = 10):
    """Return the absolute tolerance that ``ppm_error`` parts per million
    represent at ``reference_mass``."""
    relative_tolerance = ppm_error / 1e6
    return relative_tolerance * reference_mass


def compute_error(reference_mass, measured_mass):
    """Return the absolute deviation of ``measured_mass`` from
    ``reference_mass`` in parts per million of ``reference_mass``."""
    deviation = abs(measured_mass - reference_mass)
    return 1e6 * deviation / reference_mass


class State(Enum):
    """Two-valued marker with members ``BEFORE`` and ``DURING``.

    NOTE(review): nothing in the visible cells references this enum;
    confirm its intended use before relying on it.
    """

    BEFORE = 1
    DURING = 2


def set_tuple(t, i, x):
    """Return a copy of tuple ``t`` in which position ``i`` holds ``x``.

    ``t`` itself is left untouched; the result is assembled from the slice
    before ``i``, the new element, and the slice after ``i``.
    """
    head = t[:i]
    tail = t[i + 1 :]
    return head + (x,) + tail


def combine_modifications_2(
    modifications: List[List[Union[Mod, None]]],
    starting_mass: float,
    target_mass: float,
    ppm_error: float = 10,
) -> List[Tuple[Mod, ...]]:
    """Enumerate modification choices that bring ``starting_mass`` to ``target_mass``.

    Exactly one option is picked from every inner list of ``modifications``.
    Picking a ``Mod`` adds its ``mass``; picking ``None`` adds nothing, so
    include ``None`` in an inner list to make that modification optional.

    Args:
        modifications: One inner list of options per modification slot.
        starting_mass: Mass before any modification is applied.
        target_mass: Mass the modified total is compared against.
        ppm_error: Allowed deviation in ppm, forwarded to ``within_bounds``
            (with the running total as the reference mass).

    Returns:
        The distinct selections (tuples of the chosen ``Mod`` objects, in
        slot order) whose total mass is within bounds. Duplicates are
        dropped and the order of first discovery is kept, so the result is
        the same on every run.
    """
    # A dict is used as an insertion-ordered set. The former list(set(...))
    # returned the same elements in an order that depended on string hashing.
    unique: Dict[Tuple[Mod, ...], None] = {}

    def go(i, current, selection):
        if i == len(modifications):
            # Every slot has been decided; keep the selection if it fits.
            if within_bounds(current, target_mass, ppm_error):
                unique[selection] = None
            return
        for m in modifications[i]:
            if m is None:
                go(i + 1, current, selection)
            else:
                go(i + 1, current + m.mass, selection + (m,))

    go(0, current=starting_mass, selection=())
    return list(unique)


In [118]:
# Index every measurement returned by read_mgf by its `scan` attribute,
# then preview the first ten (scan, measurement) pairs.
measurements = dict(
    (m.scan, m) for m in read_mgf("../data/mgf/190318_LYS_AT_50x_05.mgf")
)
list(measurements.items())[:10]

[(3, <measurement.PeptideMeasurement at 0x11801d790>),
 (7, <measurement.PeptideMeasurement at 0x11801d250>),
 (9, <measurement.PeptideMeasurement at 0x11801d160>),
 (12, <measurement.PeptideMeasurement at 0x11801d880>),
 (13, <measurement.PeptideMeasurement at 0x11801d9d0>),
 (29, <measurement.PeptideMeasurement at 0x11801df10>),
 (37, <measurement.PeptideMeasurement at 0x11837c7c0>),
 (42, <measurement.PeptideMeasurement at 0x11837c760>),
 (54, <measurement.PeptideMeasurement at 0x11837c610>),
 (57, <measurement.PeptideMeasurement at 0x11837cd90>)]

In [119]:
# Build one Peptide per (begin, end) range produced by trypsin(LYS).
# A peptide containing "M" gets a "met_ox" modification entry whose count
# is the number of "M" residues in it; other peptides get no modifications.
peptides = []
for b, e in trypsin(LYS):
    seq = LYS[b:e]
    met_ox = (Mod("met_ox", 15.9949), seq.count("M"))
    if met_ox[1] > 0:
        mods = {"M": met_ox}
    else:
        mods = {}
    peptides.append(Peptide(b, e, seq, modifications=mods))

peptides

[Peptide(beginning=0, end=1, seq=K, modifications={}),
 Peptide(beginning=1, end=5, seq=VFGR, modifications={}),
 Peptide(beginning=5, end=13, seq=CELAAAMK, modifications={'M': (Mod(description='met_ox', mass=15.9949), 1)}),
 Peptide(beginning=13, end=14, seq=R, modifications={}),
 Peptide(beginning=14, end=21, seq=HGLDNYR, modifications={}),
 Peptide(beginning=21, end=33, seq=GYSLGNWVCAAK, modifications={}),
 Peptide(beginning=33, end=45, seq=FESNFNTQATNR, modifications={}),
 Peptide(beginning=45, end=61, seq=NTDGSTDYGILQINSR, modifications={}),
 Peptide(beginning=61, end=68, seq=WWCNDGR, modifications={}),
 Peptide(beginning=68, end=73, seq=TPGSR, modifications={}),
 Peptide(beginning=73, end=96, seq=NLCNIPCSALLSSDITASVNCAK, modifications={}),
 Peptide(beginning=96, end=97, seq=K, modifications={}),
 Peptide(beginning=97, end=112, seq=IVSDGNGMNAWVAWR, modifications={'M': (Mod(description='met_ox', mass=15.9949), 1)}),
 Peptide(beginning=112, end=114, seq=NR, modifications={}),
 Pepti

In [149]:
# `peptides` should be the digestion peptides; their masses are taken with
# H2O loss (the original note called this argument `peptide_masses`).
# TODO: Rewrite this into a more readable version
def precursor_mass_matches(
    peptides: List[Peptide],
    measurement: PeptideMeasurement,
    alkylation_mass: float,
    max_inter_bonds: int,
    ppm_error: int = 10,
) -> List[Dict[str, object]]:
    """Find peptide selections whose mass matches a measured precursor mass.

    A selection is one or more runs of consecutive entries of ``peptides``.
    A further run may be started only while ``inter_bonds_left`` and the
    number of free cysteines are both positive; doing so uses up one free
    cysteine of the selection so far and one cysteine of the new run. For
    every selection whose reachable mass window contains the target,
    ``combine_modifications_2`` enumerates the modification choices that hit
    the target.

    Args:
        peptides: Candidate peptides, in the order in which they may be
            joined into runs.
        measurement: Measurement whose ``peptide_mass_estimate`` is the
            target mass.
        alkylation_mass: Mass added per alkylated cysteine.
        max_inter_bonds: Maximum number of links between runs.
        ppm_error: Mass tolerance in ppm.

    Returns:
        One dict per match with the keys ``"sequence"`` (runs joined by
        ``"+"``), ``"ranges"`` (list of ``(begin, end)`` index pairs into
        ``peptides``), ``"cysteine_bonds"``, ``"error"`` (ppm, from
        ``compute_error``) and ``"mods"`` (the chosen modifications).
    """
    target = measurement.peptide_mass_estimate
    h2o = mass.calculate_mass(formula="H2O")
    h2 = mass.calculate_mass(formula="H2")

    result = []

    def go(
        i: int,
        current: float,
        min_raw_mass: float,
        max_raw_mass: float,
        selection: Tuple[int, ...],
        inter_bonds_left: int,
        free_cys: int,
        waiting_for_cys: bool,
    ) -> None:
        """Evaluate the selection whose open run ends before peptide ``i``, then extend it.

        Args:
            i: Index of the next peptide that could be appended; also the
                exclusive end of the currently open run.
            current: Mass of the selection without any modification.
            min_raw_mass: Sum of the peptides' ``min_mass`` (plus offsets).
            max_raw_mass: Sum of the peptides' ``max_mass`` (plus offsets).
            selection: Flattened run boundaries: start, end, start, end, ...,
                ending with the start of the open run.
            inter_bonds_left: How many more runs may still be linked.
            free_cys: Cysteines in the selection not used up by a link.
            waiting_for_cys: True while the open run still has to supply the
                cysteine for the link that started it.
        """
        if not waiting_for_cys:
            # Upper end of the reachable mass window: every free cysteine alkylated.
            max_posibble_mass = max_raw_mass + alkylation_mass * free_cys
            upper_bound = max_posibble_mass + err_margin(max_posibble_mass, ppm_error)

            if target <= upper_bound:
                # With an odd number of free cysteines one of them cannot be
                # paired, so at least one alkylation is always present.
                has_alkylated_cys = free_cys % 2 == 1
                min_possible_mass = min_raw_mass + alkylation_mass * has_alkylated_cys
                lower_bound = min_possible_mass - err_margin(
                    min_possible_mass, ppm_error
                )

                if lower_bound <= target:
                    # Close the open run at i and pair up starts with ends.
                    ranges = list(zip(selection[::2], (selection + (i,))[1::2]))
                    possible_mods: List[List[Mod]] = []

                    # One optional slot per allowed occurrence of each
                    # peptide-level modification.
                    for b, e in ranges:
                        for p in peptides[b:e]:
                            for m, count in p.modifications_anywhere:
                                possible_mods += [
                                    [Mod(m.description, m.mass), None]
                                ] * count

                    # Each pair of free cysteines is either left unalkylated
                    # (counted as an intra bond below) or alkylated as a pair.
                    max_intra_bonds = free_cys // 2
                    for _ in range(max_intra_bonds):
                        possible_mods.append(
                            [Mod("cys_pair_alk", alkylation_mass * 2), None]
                        )

                    seq = "+".join(
                        "".join(p.seq for p in peptides[b:e]) for b, e in ranges
                    )

                    if has_alkylated_cys:
                        # One Cys has to be alkylated, as it can't be in a bond
                        # (single-option slot, so it is always applied)
                        possible_mods.append([Mod("cys_alk", alkylation_mass)])

                    combinations = combine_modifications_2(
                        possible_mods,
                        starting_mass=current,
                        target_mass=target,
                        ppm_error=ppm_error,
                    )

                    for modifications in combinations:
                        total_mass = current + sum(m.mass for m in modifications)

                        alkylated_pairs = sum(
                            m.description == "cys_pair_alk" for m in modifications
                        )
                        # Pairs that were not alkylated are reported as intra bonds.
                        intra_bonds = max_intra_bonds - alkylated_pairs
                        # Links between runs used so far in this selection.
                        inter_bonds = max_inter_bonds - inter_bonds_left

                        result.append(
                            {
                                "sequence": seq,
                                "ranges": ranges,
                                "cysteine_bonds": intra_bonds + inter_bonds,
                                # "mass": total_mass,
                                "error": compute_error(total_mass, target),
                                "mods": modifications,
                            }
                        )
                    # return  -- TODO: why can't we return here?

        if (
            i == len(peptides)
            or min_raw_mass - err_margin(min_raw_mass, ppm_error) > target
        ):
            # Either we're out of peptides to add
            # Or our mass is too high, beyond repair
            return
        else:
            if (
                not waiting_for_cys
                and min(inter_bonds_left, free_cys) > 0
                and selection[-1] != i
            ):
                # End this run, begin next one
                # (only when the open run is non-empty: selection[-1] is its start).
                # Every running mass gains one H2O and loses one H2 --
                # presumably the new chain's own water and the hydrogens lost
                # when the link forms (TODO confirm).
                find_run_start(
                    i,
                    current + h2o - h2,
                    min_raw_mass + h2o - h2,
                    max_raw_mass + h2o - h2,
                    selection + (i,),
                    inter_bonds_left - 1,
                    free_cys - 1,
                    True,
                )

            # Take this one
            # While waiting for the linking cysteine, one of this peptide's
            # cysteines is used up; a result of -1 means it had none and the
            # run keeps waiting.
            new_free_cys = peptides[i].count("C") - waiting_for_cys
            go(
                i + 1,
                current + peptides[i].zwitterion_mass,
                min_raw_mass + peptides[i].min_mass,
                max_raw_mass + peptides[i].max_mass,
                selection,
                inter_bonds_left,
                free_cys=free_cys + max(new_free_cys, 0),
                waiting_for_cys=new_free_cys < 0,
            )

    def find_run_start(
        i: int,
        current: float,
        min_raw_mass: float,
        max_raw_mass: float,
        selection: Tuple[int, ...],
        inter_bonds_left: int,
        free_cys: int,
        waiting_for_cys: bool = False,
    ):
        """Open a new run at every possible start index from ``i`` onwards."""

        for beginning in range(i, len(peptides)):
            go(
                beginning,
                current,
                min_raw_mass,
                max_raw_mass,
                selection + (beginning,),
                inter_bonds_left,
                free_cys,
                waiting_for_cys,
            )

    # All running masses start at one H2O; Peptide.zwitterion_mass has one
    # H2O subtracted per peptide.
    find_run_start(
        0,
        current=h2o,
        min_raw_mass=h2o,
        max_raw_mass=h2o,
        selection=(),
        inter_bonds_left=max_inter_bonds,
        free_cys=0,
        waiting_for_cys=False,
    )

    return result


In [150]:

# Run the precursor search for every scan and print each scan's matches
# ordered by their "sequence" string; the elapsed wall-clock time is
# reported once the whole pass is done.
start_time = time.time()

for scan, measurement in measurements.items():
    for match in sorted(
        precursor_mass_matches(
            peptides,
            measurement,
            max_inter_bonds=2,
            alkylation_mass=57.0214,
            ppm_error=15,
        ),
        key=lambda found: found["sequence"],
    ):
        print(f"{scan}: {match}")

end_time = time.time()
print(f"This takes {end_time - start_time} seconds")

845: {'sequence': 'CELAAAMK+GCR', 'ranges': [(2, 3), (16, 17)], 'cysteine_bonds': 1, 'error': 0.17497809442811452, 'mods': (Mod(description='met_ox', mass=15.9949),)}
846: {'sequence': 'RHGLDNYR', 'ranges': [(3, 5)], 'cysteine_bonds': 0, 'error': 0.046330920919352085, 'mods': ()}
848: {'sequence': 'CELAAAMK+GCR', 'ranges': [(2, 3), (16, 17)], 'cysteine_bonds': 1, 'error': 0.7012779318072832, 'mods': (Mod(description='met_ox', mass=15.9949),)}
849: {'sequence': 'CELAAAMK+CK', 'ranges': [(2, 3), (14, 15)], 'cysteine_bonds': 1, 'error': 0.45653075413820915, 'mods': (Mod(description='met_ox', mass=15.9949),)}
852: {'sequence': 'CELAAAMKR+CK', 'ranges': [(2, 4), (14, 15)], 'cysteine_bonds': 1, 'error': 0.363565438838716, 'mods': ()}
854: {'sequence': 'CELAAAMKR+GCR', 'ranges': [(2, 4), (16, 17)], 'cysteine_bonds': 1, 'error': 0.07791330145198574, 'mods': (Mod(description='met_ox', mass=15.9949),)}
858: {'sequence': 'NRCK+GCR', 'ranges': [(13, 15), (16, 17)], 'cysteine_bonds': 1, 'error': 3.

In [147]:
# Inspect the matches for a single scan (5332), using the same parameter
# values as the timed loop over all scans.
precursor_mass_matches(
    peptides,
    measurements[5332],
    alkylation_mass=57.0214,
    max_inter_bonds=2,
    ppm_error=15,
)


15 1 False -0.04435176620972925


[{'sequence': 'VFGRCELAAAMKRHGLDNYR+NLCNIPCSALLSSDITASVNCAK',
  'ranges': [(1, 5), (10, 11)],
  'cysteine_bonds': 1,
  'error': 0.6603902488132445,
  'mods': (Mod(description='cys_pair_alk', mass=114.0428),)}]

In [None]:
import pickle
from tqdm import tqdm

# The file name is derived from the bond limit so the two cannot drift
# apart. Before, the name said "2_inter_bonds" while the search ran with
# max_inter_bonds=1.
# NOTE(review): 1 is kept because that is what the search used; confirm
# whether 2 was intended.
MAX_INTER_BONDS = 1
FILE_PATH = f"../out/precursor_matches_lys_at_{MAX_INTER_BONDS}_inter_bonds.pickle"

start_time = time.time()

with open(FILE_PATH, "wb") as f:
    for scan, measurement in measurements.items():
        matches = precursor_mass_matches(
            peptides,
            measurement,
            alkylation_mass=57.0214,
            max_inter_bonds=MAX_INTER_BONDS,
            ppm_error=10,
        )
        for match in matches:
            print(f"{scan}: {match}")
        # Write one record per scan. Previously the dump was commented out,
        # so opening with "wb" truncated the file and left it empty.
        # Read back with repeated pickle.load(f) until EOFError.
        pickle.dump({"scan": scan, "matches": matches}, f)

end_time = time.time()

print(f"This takes {end_time - start_time} seconds")