In [448]:
import dataclasses
from collections import Counter
from typing import Tuple, List
from enum import Enum, auto
import time

from protein import trypsin
from measurement import read_mgf, PeptideMeasurement
from pyteomics import mass


@dataclasses.dataclass
class Modification:
    """A named mass shift that may be applied to a peptide.

    The code in this file multiplies ``mass`` by ``count`` when bounding a
    peptide's mass, so ``count`` is the maximum number of times the
    modification can occur on that peptide.
    """

    # Label reported alongside a match (e.g. "met_ox").
    name: str
    # Mass delta of a single occurrence; may be negative.
    # Units are presumably daltons — TODO confirm.
    mass: float
    # Number of sites on the peptide where the modification can occur.
    count: int


@dataclasses.dataclass
class Peptide:
    """A fragment ``seq`` of a parent sequence plus its candidate modifications.

    As constructed in this notebook, ``beginning`` and ``end`` are the slice
    bounds of ``seq`` inside the parent sequence.  Derived values (residue
    counts and masses) are computed on first use and cached on the instance;
    they are not refreshed if ``seq`` or ``modifications`` change afterwards.
    """

    beginning: int
    end: int
    seq: str
    modifications: List[Modification]

    # Deliberately un-annotated: these are plain class-level defaults rather
    # than dataclass fields, so they stay out of __init__, __repr__ and __eq__.
    _aas = None
    _mass = None
    _minmass = None
    _maxmass = None

    def count(self, amino_acid):
        """Return how many times ``amino_acid`` occurs in ``seq``."""
        residue_counts = self._aas
        if residue_counts is None:
            residue_counts = self._aas = Counter(self.seq)
        return residue_counts[amino_acid]

    @property
    def zwitterion_mass(self):
        """Mass of ``seq`` (ion type "M", charge 0) with one H2O subtracted."""
        if self._mass is not None:
            return self._mass
        whole = mass.calculate_mass(sequence=self.seq, ion_type="M", charge=0)
        water = mass.calculate_mass(formula="H2O")
        self._mass = whole - water
        return self._mass

    @property
    def min_mass(self):
        """Lowest reachable mass: all negative-mass modifications fully applied."""
        if self._minmass is not None:
            return self._minmass
        losses = [mod.mass * mod.count for mod in self.modifications if mod.mass < 0]
        total_loss = sum(losses)
        self._minmass = self.zwitterion_mass + total_loss
        return self._minmass

    @property
    def max_mass(self):
        """Highest reachable mass: all positive-mass modifications fully applied."""
        if self._maxmass is not None:
            return self._maxmass
        gains = [mod.mass * mod.count for mod in self.modifications if mod.mass > 0]
        total_gain = sum(gains)
        self._maxmass = self.zwitterion_mass + total_gain
        return self._maxmass


In [390]:
# One-letter amino-acid sequence that is digested with trypsin() in the next
# cell. Presumably lysozyme, going by the name and the MGF file — TODO confirm.
LYS = "KVFGRCELAAAMKRHGLDNYRGYSLGNWVCAAKFESNFNTQATNRNTDGSTDYGILQINSRWWCNDGRTPGSRNLCNIPCSALLSSDITASVNCAKKIVSDGNGMNAWVAWRNRCKGTDVQAWIRGCRL"

# Second one-letter amino-acid sequence; not referenced by the cells visible
# here. NOTE(review): presumably bovine serum albumin — confirm.
BSA = "DTHKSEIAHRFKDLGEEHFKGLVLIAFSQYLQQCPFDEHVKLVNELTEFAKTCVADESHAGCEKSLHTLFGDELCKVASLRETYGDMADCCEKQEPERNECFLSHKDDSPDLPKLKPDPNTLCDEFKADEKKFWGKYLYEIARRHPYFYAPELLYYANKYNGVFQECCQAEDKGACLLPKIETMREKVLTSSARQRLRCASIQKFGERALKAWSVARLSQKFPKAEFVEVTKLVTDLTKVHKECCHGDLLECADDRADLAKYICDNQDTISSKLKECCDKPLLEKSHCIAEVEKDAIPENLPPLTADFAEDKDVCKNYQEAKDAFLGSFLYEYSRRHPEYAVSVLLRLAKEYEATLEECCAKDDPHACYSTVFDKLKHLVDEPQNLIKQNCDQFEKLGEYGFQNALIVRYTRKVPQVSTPTLVEVSRSLGKVGTRCCTKPESERMPCTEDYLSLILNRLCVLHEKTPVSEKVTKCCTESLVNRRPCFSALTPDETYVPKAFDEKLFTFHADICTLPDTEKQIKKQTALVELLKHKPKATEEQLKTVMENFVAFVDKCCAADDKEACFAVEGPKLVVSTQTALA"

# Index everything read_mgf() yields from the MGF file by its `scan` attribute.
# If two entries share a scan value, the later one replaces the earlier one.
measurements = {m.scan: m for m in read_mgf("../data/mgf/190318_LYS_AT_50x_05.mgf")}

In [392]:
# Cut LYS at the (begin, end) pairs produced by trypsin() and wrap each
# fragment in a Peptide whose only candidate modification is "met_ox",
# with one possible site per "M" in the fragment.
peptides = []
for b, e in trypsin(LYS):
    seq = LYS[b:e]
    met_ox = Modification(name="met_ox", mass=15.9949, count=seq.count("M"))
    peptides.append(Peptide(beginning=b, end=e, seq=seq, modifications=[met_ox]))

peptides

[Peptide(beginning=0, end=1, seq='K', modifications=[Modification(name='met_ox', mass=15.9949, count=0)]),
 Peptide(beginning=1, end=5, seq='VFGR', modifications=[Modification(name='met_ox', mass=15.9949, count=0)]),
 Peptide(beginning=5, end=13, seq='CELAAAMK', modifications=[Modification(name='met_ox', mass=15.9949, count=1)]),
 Peptide(beginning=13, end=14, seq='R', modifications=[Modification(name='met_ox', mass=15.9949, count=0)]),
 Peptide(beginning=14, end=21, seq='HGLDNYR', modifications=[Modification(name='met_ox', mass=15.9949, count=0)]),
 Peptide(beginning=21, end=33, seq='GYSLGNWVCAAK', modifications=[Modification(name='met_ox', mass=15.9949, count=0)]),
 Peptide(beginning=33, end=45, seq='FESNFNTQATNR', modifications=[Modification(name='met_ox', mass=15.9949, count=0)]),
 Peptide(beginning=45, end=61, seq='NTDGSTDYGILQINSR', modifications=[Modification(name='met_ox', mass=15.9949, count=0)]),
 Peptide(beginning=61, end=68, seq='WWCNDGR', modifications=[Modification(name='

In [561]:

def within_bounds(reference_mass, measured_mass, ppm_error=10):
    """Tell whether ``measured_mass`` is within ``ppm_error`` ppm of ``reference_mass``.

    The tolerance is computed relative to ``reference_mass`` and the
    comparison is inclusive.
    """
    deviation = abs(reference_mass - measured_mass)
    tolerance = err_margin(reference_mass, ppm_error)
    return deviation <= tolerance


def err_margin(reference_mass, ppm_error=10):
    """Return ``ppm_error`` parts per million of ``reference_mass`` as an absolute value."""
    ppm_fraction = ppm_error / 1e6
    return ppm_fraction * reference_mass


def compute_error(reference_mass, measured_mass):
    """Return the absolute deviation between the two masses in ppm of ``reference_mass``."""
    deviation = abs(measured_mass - reference_mass)
    scaled = 1e6 * deviation
    return scaled / reference_mass


class State(Enum):
    """Phase of the peptide walk performed by ``precursor_mass_matches``."""

    # Not inside a run of consecutive peptides: either nothing has been
    # started yet, or the previous run has just been closed.
    BEFORE = auto()
    # Currently extending a run of consecutive peptides.
    DURING = auto()


def set_tuple(t, i, x):
    """Return a new tuple made of ``t[:i]``, then ``x``, then ``t[i + 1:]``.

    For an in-range ``i`` this is ``t`` with the element at ``i`` replaced.
    """
    before = t[:i]
    after = t[i + 1 :]
    return before + (x,) + after


def combine_modifications(
    modifications: List[Modification],
    starting_mass: float,
    target_mass: float,
    ppm_error: float = 10,
) -> List[Tuple[Tuple[str, float], ...]]:
    """Find the modification subsets that bring ``starting_mass`` onto ``target_mass``.

    Every subset of ``modifications`` is tried (each list entry is one
    optional occurrence, so a modification with several sites should appear
    several times).  A subset is kept when ``starting_mass`` plus the sum of
    its masses is within ``ppm_error`` ppm of ``target_mass``.

    Args:
        modifications: Candidate occurrences; only ``name`` and ``mass`` are read.
        starting_mass: Mass before any modification is applied.
        target_mass: Mass to match.
        ppm_error: Tolerance in parts per million, relative to the candidate mass.

    Returns:
        The distinct matching selections, each a tuple of ``(name, mass)``
        pairs in input order.  Duplicates are removed and the result is in
        first-found order, so repeated runs give the same ordering.
    """
    result = []

    def go(i, current, selection):
        # The check runs at every depth, so a selection can be recorded more
        # than once; duplicates are dropped before returning.
        if within_bounds(current, target_mass, ppm_error):
            result.append(selection)

        if i == len(modifications):
            return
        m = modifications[i]
        # Branch 1: apply this occurrence.  Branch 2: skip it.
        go(i + 1, current + m.mass, selection + ((m.name, m.mass),))
        go(i + 1, current, selection)

    go(0, current=starting_mass, selection=())
    # dict.fromkeys de-duplicates while keeping insertion order.  A set would
    # order the tuples by hash, which varies between interpreter runs because
    # string hashing is randomized.
    return list(dict.fromkeys(result))


def precursor_mass_matches(
    peptides: List[Peptide],
    measurement: PeptideMeasurement,
    cysteine_mod: float,
    jumps: int,
    ppm_error: int = 10,
) -> List[dict]:
    """List peptide combinations whose computed mass matches ``measurement``.

    ``peptides`` is walked in order and runs of consecutive peptides are
    built.  Up to ``jumps`` times a run may be closed and another one started
    at the same or a later index.  Each such jump is treated as a disulfide
    link between the two runs: the mass gains H2O (new peptide ends) and loses
    H2 (the S-S bond), and one cysteine on each side stops counting as
    modified.  Every remaining cysteine adds ``cysteine_mod``.  The optional
    modifications of the selected peptides are then combined to hit the
    target within ``ppm_error`` ppm.

    Args:
        peptides: Digestion peptides in sequence order.  Their
            ``zwitterion_mass`` (water already removed) is summed, and one
            H2O is added back per run.
        measurement: Only ``peptide_mass_estimate`` is read; it is the target mass.
        cysteine_mod: Mass added for each cysteine not used in a disulfide bond.
        jumps: Maximum number of disulfide links between runs.
        ppm_error: Tolerance in parts per million.

    Returns:
        One dict per match with keys ``"sequence"`` (runs joined by ``"+"``),
        ``"cysteine_mods"``, ``"mass"``, ``"error"`` (ppm) and ``"mods"``
        (tuple of ``(name, mass)`` pairs).
    """
    target = measurement.peptide_mass_estimate
    h2o = mass.calculate_mass(formula="H2O")
    h2 = mass.calculate_mass(formula="H2")

    result = []

    # TODO: Optimize with dynamic programming if possible
    def go(
        i: int,
        current: float,
        selection: Tuple[int, ...],
        state: State,
        jumps: int,
        cysteines_before: int,
        cysteines_now: int,
        min_mod_bound: float,
        max_mod_bound: float,
    ) -> None:
        # i:                index of the next peptide to consider
        # current:          mass of the selection without optional modifications
        # selection:        flat tuple of run boundaries (start0, end0, start1, ...)
        # cysteines_before: modifiable cysteines carried over from closed runs
        # cysteines_now:    cysteines in the current run; -1 right after a jump
        # min/max_mod_bound: mass range reachable through optional modifications

        # Non-bonded cysteines are alkylated, or modified in another way
        cysteines = cysteines_before + cysteines_now
        c_mod_add = cysteine_mod * cysteines

        final_mass = current + c_mod_add

        min_with_c = min_mod_bound + c_mod_add
        lowest = min_with_c - err_margin(min_with_c, ppm_error)

        max_with_c = max_mod_bound + c_mod_add
        highest = max_with_c + err_margin(max_with_c, ppm_error)

        # cysteines_now stays negative after a jump until the new run contains
        # a cysteine, so a second run without a cysteine is never reported.
        if cysteines_now >= 0 and lowest <= target <= highest:
            # Pair up run starts with run ends; the open run ends at i.
            ranges = list(zip(selection[::2], (selection + (i,))[1::2]))

            mod_array = []

            for b, e in ranges:
                for p in peptides[b:e]:
                    for m in p.modifications:
                        # One entry per possible site so each can be toggled.
                        mod_array += [m] * m.count

            combinations = combine_modifications(
                mod_array,
                starting_mass=final_mass,
                target_mass=target,
                ppm_error=ppm_error,
            )

            seq = "+".join("".join(p.seq for p in peptides[b:e]) for b, e in ranges)

            for c in combinations:
                superfinal_mass = final_mass + sum(m for _, m in c)
                result.append(
                    {
                        "sequence": seq,
                        "cysteine_mods": cysteines,
                        "mass": superfinal_mass,
                        "error": compute_error(superfinal_mass, target),
                        "mods": c,
                    }
                )

        if (
            i == len(peptides)
            or min_mod_bound - err_margin(min_mod_bound, ppm_error) > target
        ):
            # Either we're out of peptides to add
            # Or we're too high and we'll never correct it
            return
        else:
            if state == State.BEFORE:
                # Don't start yet
                go(
                    i + 1,
                    current,
                    selection,
                    State.BEFORE,
                    jumps,
                    cysteines_before,
                    cysteines_now,
                    min_mod_bound,
                    max_mod_bound,
                )
            elif state == State.DURING and min(jumps, cysteines) > 0:
                # End this run, begin next one
                go(
                    i,
                    current + h2o - h2,  # +H2O for peptide ends, –H2 for the S-S bond
                    selection + (i,),
                    State.BEFORE,
                    jumps - 1,
                    # One cysteine of the closed run goes into the bond ...
                    cysteines - 1,
                    # ... and the next run owes one cysteine as well.
                    -1,
                    min_mod_bound + h2o - h2,
                    max_mod_bound + h2o - h2,
                )

            # Take this one, and either begin or continue this run
            go(
                i + 1,
                current + peptides[i].zwitterion_mass,
                selection + (i,) if state == State.BEFORE else selection,
                State.DURING,
                jumps,
                cysteines_before,
                cysteines_now + peptides[i].count("C"),
                min_mod_bound + peptides[i].min_mass,
                max_mod_bound + peptides[i].max_mass,
            )

    go(
        0,
        current=h2o,
        selection=(),
        state=State.BEFORE,
        jumps=jumps,
        cysteines_before=0,
        cysteines_now=0,
        min_mod_bound=h2o,
        max_mod_bound=h2o,
    )

    return result


In [566]:
import csv

FILE_PATH = "../out/precursor_matches_lys_at.csv"

# Search parameters for this run, named so they can be tuned in one place.
CYSTEINE_MOD = 57.0214  # mass added per cysteine that is not in a disulfide bond
MAX_JUMPS = 1  # maximum number of disulfide links per match
PPM_ERROR = 15  # mass tolerance in parts per million

# perf_counter is a monotonic clock meant for measuring elapsed time.
start_time = time.perf_counter()

# newline="" lets the csv module control line endings itself; without it,
# platforms that translate "\n" produce an extra blank line after every row.
with open(FILE_PATH, "w", newline="") as f:
    field_names = ["sequence", "mass", "error", "cysteine_mods", "mods"]
    writer = csv.DictWriter(f, fieldnames=field_names)
    writer.writeheader()

    # Only the measurement objects are needed; the scan keys are not written.
    for measurement in measurements.values():
        writer.writerows(
            precursor_mass_matches(
                peptides,
                measurement,
                cysteine_mod=CYSTEINE_MOD,
                jumps=MAX_JUMPS,
                ppm_error=PPM_ERROR,
            )
        )
end_time = time.perf_counter()

print(f"This takes {end_time - start_time} seconds")

This takes 26.122879028320312 seconds
