In [2]:
import numpy as np
import pandas as pd
from bisect import bisect_left, bisect_right
from pyteomics import mass
import matplotlib.pyplot as plt

# -------------------------
# Нейтральные потери
# -------------------------
NEUTRAL_LOSSES = {
    'H2O': mass.calculate_mass(formula='H2O'),  # -18.01056 Da
    'NH3': mass.calculate_mass(formula='NH3')   # -17.02655 Da
}
# при желании сюда можно добавить специфическую потерю модификации, например:
# 'oxMet': 15.9949, и т.д.

def calc_fragment_masses(sequence, modifications=None, ion_types=('b','y'), max_charge=2,
                         include_losses=True, custom_losses=None):
    """
    Построение b/y фрагментов с учётом модификаций и нейтральных потерь.
    """
    if modifications is None:
        modifications = {}
    if custom_losses is None:
        custom_losses = {}

    n = len(sequence)
    res_masses = [mass.std_aa_mass[aa] for aa in sequence]

    prefix = [0.0] * (n + 1)
    for i in range(1, n+1):
        prefix[i] = prefix[i-1] + res_masses[i-1] + modifications.get(i, 0.0)

    suffix = [0.0] * (n + 1)
    for i in range(n-1, -1, -1):
        suffix[i] = suffix[i+1] + res_masses[i] + modifications.get(i+1, 0.0)

    fragments = []
    # b-ions
    for k in range(1, n):
        neutral_mass = prefix[k] + mass.calculate_mass(formula='H')
        fragments.append(_build_fragment("b", k, neutral_mass, max_charge,
                                         include_losses, custom_losses))
    # y-ions
    for k in range(1, n):
        neutral_mass = suffix[n-k] + mass.calculate_mass(formula='H2O') + mass.calculate_mass(formula='H')
        fragments.append(_build_fragment("y", k, neutral_mass, max_charge,
                                         include_losses, custom_losses))

    return [f for frag_group in fragments for f in frag_group]

def _build_fragment(ion_type, length, neutral_mass, max_charge, include_losses, custom_losses):
    """
    Возвращает список фрагментов (базовый + с нейтральными потерями).
    """
    frags = []
    loss_dict = {}
    if include_losses:
        loss_dict.update(NEUTRAL_LOSSES)
        loss_dict.update(custom_losses)

    variants = {'': 0.0}  # без потери
    for lname, lmass in loss_dict.items():
        variants[lname] = lmass

    for lname, lmass in variants.items():
        nm = neutral_mass - lmass
        for z in range(1, max_charge+1):
            mz = (nm + z * mass.calculate_mass(formula='H')) / z
            ion_name = f"{ion_type}{length}"
            if lname:
                ion_name += f"-{lname}"
            frags.append({
                'ion': ion_name,
                'type': ion_type,
                'length': length,
                'charge': z,
                'mass': nm,
                'mz': mz,
                'loss': lname if lname else None
            })
    return frags


# -------------------------
# Визуализация спектра
# -------------------------
def plot_spectrum_with_annotations(mz, intensity, annotated_df, title=None, tol=20, ppm=True):
    """
    Рисует спектр с аннотированными ионами.
    """
    plt.figure(figsize=(12, 6))
    plt.bar(mz, intensity, width=0.02, color='grey', alpha=0.7, label="Experimental peaks")

    colors = {'b': 'red', 'y': 'blue'}
    for _, row in annotated_df.iterrows():
        color = colors.get(row['type'], 'green')
        plt.vlines(row['obs_mz'], 0, row['obs_intensity'], color=color, linewidth=1.5)
        plt.text(row['obs_mz'], row['obs_intensity']*1.05,
                 f"{row['ion']}^{row['charge']}",
                 rotation=90, color=color, fontsize=8, ha='center', va='bottom')

    plt.xlabel("m/z")
    plt.ylabel("Intensity")
    if title:
        plt.title(title)
    plt.legend()
    plt.tight_layout()
    plt.show()


In [4]:
from pyteomics import mass, mzml
import numpy as np
import pandas as pd
from bisect import bisect_left, bisect_right

# --- Утилиты для поиска пика по m/z с учетом tolerance (ppm или Da) ---
def mz_tol_window(mz, tol, ppm=True):
    if ppm:
        delta = mz * tol / 1e6
    else:
        delta = tol
    return mz - delta, mz + delta

def find_peaks_in_window(sorted_mz, mz_lo, mz_hi):
    """Возвращает индексы пиков (в отсортированном списке mz), попадающих в [mz_lo, mz_hi]."""
    left = bisect_left(sorted_mz, mz_lo)
    right = bisect_right(sorted_mz, mz_hi)
    return range(left, right)

# --- Построение теоретических фрагментов (b и y) с учетом модификаций) ---
AA_MASSES = mass.std_aa_mass  # pyteomics словарь стандартных масс аминокислот

def calc_fragment_masses(sequence, modifications=None, ion_types=('b','y'), max_charge=2):
    """
    sequence: "PEPTIDE"
    modifications: dict {position (1-based): delta_mass}, например {5: 15.9949}
    returns list of dicts: [{'ion': 'b5', 'type':'b', 'length':5, 'mass':m_mono, 'mz_list':[mz_for_charge1, ...]}, ...]
    """
    if modifications is None:
        modifications = {}
    seq = sequence
    n = len(seq)

    # precompute residue masses
    res_masses = [mass.calculate_mass(sequence=aa) for aa in seq]  # single AA masses
    # but better: use mass.std_aa_mass lookup for speed:
    res_masses = [AA_MASSES[aa] for aa in seq]

    # accumulate prefix/suffix masses (monoisotopic neutral masses)
    prefix = [0.0] * (n + 1)   # prefix[i] mass of first i residues
    for i in range(1, n+1):
        prefix[i] = prefix[i-1] + res_masses[i-1] + (modifications.get(i, 0.0) if i in modifications else 0.0)

    suffix = [0.0] * (n + 1)   # suffix[i] mass of residues i..n
    for i in range(n-1, -1, -1):
        suffix[i] = suffix[i+1] + res_masses[i] + (modifications.get(i+1, 0.0) if (i+1) in modifications else 0.0)

    fragments = []
    # b-ions: masses of first k residues + mass of proton (mass of b-ion neutral composition = prefix[k] + mass of H)
    for k in range(1, n):
        neutral_mass = prefix[k] + mass.calculate_mass(formula='H')  # b ion neutral
        ion_name = f"b{k}"
        entry = {'ion': ion_name, 'type':'b', 'length':k, 'mass': neutral_mass}
        entry['mz_list'] = [(neutral_mass + z*mass.calculate_mass(formula='H'))/z for z in range(1, max_charge+1)]
        fragments.append(entry)

    # y-ions: last k residues + mass of water + H (common conv: y = suffix[pos] + mass(H2O) + H ?)
    # Using standard: y_k neutral mass = suffix[n-k] + mass(H2O) + mass(H)
    for k in range(1, n):
        # y_k corresponds to suffix starting at position n-k -> suffix_index = n-k
        neutral_mass = suffix[n-k] + mass.calculate_mass(formula='H2O') + mass.calculate_mass(formula='H')
        ion_name = f"y{k}"
        entry = {'ion': ion_name, 'type':'y', 'length':k, 'mass': neutral_mass}
        entry['mz_list'] = [(neutral_mass + z*mass.calculate_mass(formula='H'))/z for z in range(1, max_charge+1)]
        fragments.append(entry)

    return fragments

# --- Аннотирование одного спектра ---
def annotate_spectrum(spectrum_mz, spectrum_intensity, sequence, modifications=None,
                      precursor_charge=2, max_frag_charge=2,
                      tol=20, ppm=True, min_rel_intensity=0.01):
    """
    spectrum_mz, spectrum_intensity: numpy arrays or lists (not necessarily sorted)
    modifications: dict {1-based position: delta_mass}
    tol: tolerance in ppm (if ppm=True) or Da (ppm=False)
    min_rel_intensity: минимальная относительная интенсивность пика для учёта (фракция от base peak)
    Returns: DataFrame с аннотацией и summary dict
    """
    # sort spectrum by mz to allow efficient window search
    idx = np.argsort(spectrum_mz)
    mz_sorted = np.array(spectrum_mz)[idx]
    inten_sorted = np.array(spectrum_intensity)[idx]
    base_int = inten_sorted.max() if len(inten_sorted)>0 else 1.0
    # optional: filter out very small peaks
    valid_mask = inten_sorted >= (min_rel_intensity * base_int)
    if not valid_mask.any():
        # nothing above threshold; but continue with full array to not fail
        pass

    fragments = calc_fragment_masses(sequence, modifications=modifications, ion_types=('b','y'), max_charge=max_frag_charge)
    matches = []
    for frag in fragments:
        for charge_idx, mz_expected in enumerate(frag['mz_list'], start=1):
            mz_lo, mz_hi = mz_tol_window(mz_expected, tol, ppm=ppm)
            for j in find_peaks_in_window(mz_sorted, mz_lo, mz_hi):
                matched_mz = float(mz_sorted[j])
                matched_int = float(inten_sorted[j])
                ppm_err = (matched_mz - mz_expected) / mz_expected * 1e6
                da_err = matched_mz - mz_expected
                matches.append({
                    'ion': frag['ion'],
                    'type': frag['type'],
                    'length': frag['length'],
                    'charge': charge_idx,
                    'theor_mz': mz_expected,
                    'obs_mz': matched_mz,
                    'obs_intensity': matched_int,
                    'ppm_err': ppm_err,
                    'da_err': da_err
                })

    annotated_df = pd.DataFrame(matches).sort_values(by=['type','length','charge'])
    # Summary metrics: coverage, site-determining ions etc.
    n_theor = len(fragments) * max_frag_charge
    n_matched = annotated_df.shape[0]
    coverage = n_matched / n_theor if n_theor>0 else 0.0

    # Site-determining ions: ions that include the modified residue (if we know modified position)
    site_score = None
    if modifications:
        # collect modified positions
        mod_positions = set(modifications.keys())
        # For each ion, decide whether it includes any modified position.
        def ion_includes_mod(ion_name, length, ion_type):
            # b_k includes residues 1..k
            # y_k includes residues n-k+1 .. n
            n = len(sequence)
            if ion_type == 'b':
                included = set(range(1, length+1))
            else:  # y
                included = set(range(n-length+1, n+1))
            return bool(included & mod_positions)

        # mark which theoretical ions are site-determining
        site_determining_theor = []
        for frag in fragments:
            sd = ion_includes_mod(frag['ion'], frag['length'], frag['type'])
            site_determining_theor.append((frag['ion'], sd))

        sd_theor_set = set(ion for ion, sd in site_determining_theor if sd)
        sd_matched = annotated_df[annotated_df['ion'].isin(sd_theor_set)]
        n_sd_theor = len([1 for ion, sd in site_determining_theor if sd])
        n_sd_matched = sd_matched['ion'].nunique()
        site_score = {
            'n_sd_theor': n_sd_theor,
            'n_sd_matched_unique': int(n_sd_matched),
            'n_sd_matched_total_peaks': int(sd_matched.shape[0])
        }
    summary = {
        'n_theoretical_fragments_total': n_theor,
        'n_matched_peaks': int(n_matched),
        'coverage_fraction': coverage,
        'site_score': site_score
    }

    return annotated_df, summary


In [None]:
annotate_spectrum()