In [2]:
# %% [markdown]
# MS1 Peak Assignment via Neutral Mass → Charge-State Matching
# - Select deconvoluted neutral masses down to a fraction of the max intensity (TOP_CUTOFF_FRACTION; 10% as configured below)
# - Generate synthetic charge states (z = 5..50)
# - Match to raw MS1 within tolerance, iteratively remove matched peaks
# - Return MS1 with assigned peaks + summary
#
# Paths below are prefilled with your uploaded files; change if needed.

# %%
import os
import json
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

# ----------------------------
# User-configurable parameters
# ----------------------------
# Input/output locations. These are absolute paths on the author's machine;
# edit them before running elsewhere.
RAW_MS1_CSV = r"F:/test/5__pos_1__neg_0_pos_runA.csv"       # raw MS1 (m/z, intensity)
DECONV_TXT  = r"F:/test/5__pos_1__neg_0_pos_runA_mass.txt"  # deconvoluted neutral masses (mass, intensity)
OUT_DIR     = r"F:/test/test"                                    # output folder (created by main() if missing)

# Matching parameters
# Charge states z = Z_MIN..Z_MAX (both ends inclusive) are generated per neutral mass.
Z_MIN, Z_MAX = 5, 50                 # charge range
TOP_CUTOFF_FRACTION = 0.1            # keep deconv masses with intensity ≥ (fraction * max); set 0.01 for 1% cutoff
# The effective m/z tolerance for a target is the LARGER of the ppm-derived
# width and ABS_DA_TOL (see _ppm_window), not their sum.
PPM_TOL = 1000.0                     # ppm tolerance for m/z match
ABS_DA_TOL = 1                       # absolute Da tolerance; acts as a floor on the ppm-derived tolerance
MIN_MATCHED_CHARGE_STATES = 4        # require at least this many charge-state matches to accept a protein

# Mass added per charge when converting a neutral mass to m/z: (M + z * PROTON_MASS) / z.
PROTON_MASS = 1.007276466812         # Da
# Constant written to the "bin" column of the assignments summary. The
# identification cell further down reads that column as the retention-time
# query value, so it must be on the same scale as the databank's "rt_aligned".
BIN = 5


# %%
def _read_raw_ms1(path: str) -> pd.DataFrame:
    """
    Robustly read raw MS1 CSV. Expected two columns (m/z, intensity), with or without headers.
    If more columns exist, attempt to pick the best 'mz' and 'intensity' columns.
    """
    # Try with header, fallback to no header
    try:
        df = pd.read_csv(path)
    except Exception:
        df = pd.read_csv(path, header=None)

    # If exactly two columns, assume mz, intensity
    if df.shape[1] == 2:
        df.columns = ["mz", "intensity"]
    else:
        # Try to identify columns by common names
        cols_lower = [str(c).lower() for c in df.columns]
        mz_candidates = []
        for i, c in enumerate(cols_lower):
            if ("mz" in c) or ("m/z" in c) or ("mass/charge" in c) or (c.strip() == "m z"):
                mz_candidates.append(i)
        int_candidates = []
        for i, c in enumerate(cols_lower):
            if ("int" in c) or ("abund" in c) or ("height" in c) or ("signal" in c):
                int_candidates.append(i)

        if not mz_candidates:
            mz_candidates = [0]
        if not int_candidates:
            int_candidates = [1 if df.shape[1] > 1 else 0]

        df = df.iloc[:, [mz_candidates[0], int_candidates[0]]].copy()
        df.columns = ["mz", "intensity"]

    # Clean
    df = df.replace([np.inf, -np.inf], np.nan).dropna()
    df = df[df["intensity"] > 0].copy()
    df["mz"] = pd.to_numeric(df["mz"], errors="coerce")
    df["intensity"] = pd.to_numeric(df["intensity"], errors="coerce")
    df = df.dropna().sort_values("mz").reset_index(drop=True)
    return df


def _read_deconv(path: str) -> pd.DataFrame:
    """
    Read deconvoluted neutral masses and keep those above the intensity cutoff.

    Parsers are tried in order: whitespace-delimited without header,
    comma-delimited without header, comma-delimited with header. A parser is
    accepted only if it yields at least two columns. This matters because a
    comma-separated file parses without error as a single column under the
    whitespace parser, so relying on exceptions alone never reaches the CSV
    fallbacks.

    The first two columns are taken as (mass, intensity). Rows that are
    non-numeric (including a textual header row), NaN, infinite, or have
    mass <= 0 are dropped. Rows with intensity below
    ``max(intensity) * TOP_CUTOFF_FRACTION`` are then removed.

    Parameters
    ----------
    path : str
        Path to the deconvolution output file.

    Returns
    -------
    pd.DataFrame
        Columns ``mass`` and ``intensity``, sorted by descending intensity
        with a fresh 0..n-1 index.

    Raises
    ------
    ValueError
        If the file can be parsed but never yields two columns.
    Exception
        If every parser fails outright (e.g. missing or empty file), the last
        pandas error is re-raised.
    """
    attempts = (
        {"sep": r"\s+", "header": None, "engine": "python"},  # txt / tsv / space-separated
        {"header": None},                                      # CSV without header
        {},                                                    # CSV with header
    )

    df = None
    last_error = None
    parsed_but_too_narrow = False
    for kwargs in attempts:
        try:
            candidate = pd.read_csv(path, **kwargs)
        except Exception as exc:
            last_error = exc
            continue
        if candidate.shape[1] >= 2:
            df = candidate
            break
        parsed_but_too_narrow = True

    if df is None:
        if last_error is not None and not parsed_but_too_narrow:
            raise last_error
        raise ValueError(
            f"Could not find two columns (mass, intensity) in deconvolution file: {path}"
        )

    # Only the first two columns are used, whatever else the file contains.
    df = df.iloc[:, :2].copy()
    df.columns = ["mass", "intensity"]

    # Coerce first so that string "inf"/"nan" cells are caught by the scrub below.
    df["mass"] = pd.to_numeric(df["mass"], errors="coerce")
    df["intensity"] = pd.to_numeric(df["intensity"], errors="coerce")
    df = df.replace([np.inf, -np.inf], np.nan).dropna()
    df = df[df["mass"] > 0].copy()

    # Fraction-of-max cutoff
    if len(df) > 0:
        thr = df["intensity"].max() * TOP_CUTOFF_FRACTION
        df = df[df["intensity"] >= thr].copy()

    # Most intense masses first: this is the priority order for peak assignment.
    df = df.sort_values("intensity", ascending=False).reset_index(drop=True)
    return df


def _ppm_window(target_mz: float, ppm: float, abs_da: float) -> tuple[float, float]:
    da = target_mz * ppm * 1e-6
    tol = max(da, abs_da)
    return target_mz - tol, target_mz + tol


def _match_targets(sorted_mz: np.ndarray, targets: np.ndarray, ppm: float, abs_da: float, available_mask: np.ndarray) -> dict:
    """
    Given sorted m/z array and a set of target m/z values, find nearest matches
    within combined tolerance and that are currently 'available' (not yet removed).
    Returns dict: target_index -> matched_raw_index (or None)
    """
    from bisect import bisect_left

    results = {}
    for ti, t in enumerate(targets):
        lo, hi = _ppm_window(t, ppm, abs_da)
        j = bisect_left(sorted_mz, t)
        best_idx = None
        best_delta = float("inf")

        # Explore a small neighborhood
        for k in (j, j-1, j+1, j-2, j+2, j-3, j+3):
            if 0 <= k < len(sorted_mz):
                mz_k = sorted_mz[k]
                if available_mask[k] and (lo <= mz_k <= hi):
                    delta = abs(mz_k - t)
                    if delta < best_delta:
                        best_delta = delta
                        best_idx = k
        results[ti] = best_idx
    return results


def _generate_charge_series(neutral_mass: float, z_min: int, z_max: int) -> pd.DataFrame:
    """
    Build the table of expected m/z values for a neutral mass.

    One row per integer charge from ``z_min`` to ``z_max`` (both inclusive),
    with m/z computed as ``(neutral_mass + z * PROTON_MASS) / z``.
    Returns a DataFrame with columns ``z`` and ``target_mz``.
    """
    charges = np.arange(z_min, z_max + 1, dtype=int)
    table = pd.DataFrame({"z": charges})
    table["target_mz"] = (neutral_mass + charges * PROTON_MASS) / charges
    return table


def assign_ms1_peaks(raw_df: pd.DataFrame, deconv_df: pd.DataFrame) -> tuple[pd.DataFrame, pd.DataFrame]:
    """
    Iteratively assign raw MS1 peaks to deconvoluted neutral masses by
    matching each mass's charge-state series against the raw m/z values.

    Masses are processed in the row order of ``deconv_df``. For each mass the
    expected m/z of every charge state (Z_MIN..Z_MAX) is matched to the
    nearest still-available raw peak within tolerance. If at least
    MIN_MATCHED_CHARGE_STATES distinct raw peaks match, the mass is accepted
    and those peaks are removed from the pool for subsequent masses.

    A single raw peak can fall inside the tolerance window of two adjacent
    charge states. Such a peak is counted once and credited to the charge
    state whose expected m/z is closest, so ``n_matches`` and the captured
    intensity are not inflated by duplicates.

    Parameters
    ----------
    raw_df : pd.DataFrame
        Raw peaks with columns ``mz`` and ``intensity``.
    deconv_df : pd.DataFrame
        Neutral masses with columns ``mass`` and ``intensity``.

    Returns
    -------
    assigned_raw : pd.DataFrame
        ``raw_df`` sorted by m/z, plus ``assigned_mass``, ``assigned_charge``
        (NaN where unassigned) and boolean ``is_assigned``.
    assignments_summary : pd.DataFrame
        One row per accepted neutral mass, sorted by descending
        ``deconv_intensity``, plus a constant ``bin`` column. It has the same
        columns (and zero rows) when nothing is accepted.
    """
    summary_columns = [
        "neutral_mass",
        "deconv_intensity",
        "n_matches",
        "matched_z_list",
        "matched_mz_list",
        "ppm_tol",
        "abs_da_tol",
        "fraction_total_intensity_captured",
    ]

    raw_df = raw_df.sort_values("mz").reset_index(drop=True)
    mz_arr = raw_df["mz"].to_numpy()
    inten_arr = raw_df["intensity"].to_numpy()
    total_intensity = float(np.sum(inten_arr))  # loop-invariant denominator

    # Track availability of peaks (True = not yet removed)
    available = np.ones(len(raw_df), dtype=bool)

    # Per-peak annotations, NaN until a peak is assigned
    assigned_mass = np.full(len(raw_df), np.nan)
    assigned_z    = np.full(len(raw_df), np.nan)

    summary_rows = []

    for r in deconv_df.itertuples(index=False):
        mass = float(r.mass)
        mass_intensity = float(r.intensity)

        # Build expected charge series
        series = _generate_charge_series(mass, Z_MIN, Z_MAX)
        targets = series["target_mz"].to_numpy()
        charges = series["z"].to_numpy()

        # Find matches among currently available peaks
        matches = _match_targets(mz_arr, targets, PPM_TOL, ABS_DA_TOL, available_mask=available)

        # raw index -> (abs m/z error, target position); keeps only the
        # closest charge state when several targets hit the same raw peak.
        claimed = {}
        for ti, k in matches.items():
            if k is None:
                continue
            err = abs(mz_arr[k] - targets[ti])
            previous = claimed.get(k)
            if previous is None or err < previous[0]:
                claimed[k] = (err, ti)

        if len(claimed) < MIN_MATCHED_CHARGE_STATES:
            continue  # not enough distinct charge states: reject this mass

        # Report matches in ascending charge-state order.
        ordered = sorted(claimed.items(), key=lambda item: item[1][1])
        matched_indices = [k for k, _ in ordered]
        matched_z_list = [int(charges[ti]) for _, (_, ti) in ordered]
        matched_mz_list = [mz_arr[k] for k in matched_indices]

        # Accept: consume the peaks and annotate them. Every index is unique
        # and was available when matched, so no per-peak guard is needed.
        available[matched_indices] = False
        assigned_mass[matched_indices] = mass
        assigned_z[matched_indices] = matched_z_list

        captured = float(np.sum(inten_arr[matched_indices]))
        frac_intensity_removed = captured / total_intensity if total_intensity > 0 else 0.0
        summary_rows.append({
            "neutral_mass": mass,
            "deconv_intensity": mass_intensity,
            "n_matches": len(matched_indices),
            "matched_z_list": json.dumps(matched_z_list),
            "matched_mz_list": json.dumps([round(float(x), 1) for x in matched_mz_list]),  # one decimal
            "ppm_tol": PPM_TOL,
            "abs_da_tol": ABS_DA_TOL,
            "fraction_total_intensity_captured": frac_intensity_removed
        })

    assigned_raw = raw_df.copy()
    assigned_raw["assigned_mass"] = assigned_mass
    assigned_raw["assigned_charge"] = assigned_z
    assigned_raw["is_assigned"] = ~np.isnan(assigned_mass)

    # Explicit columns keep sort_values from raising KeyError when no mass was accepted.
    assignments_summary = (
        pd.DataFrame(summary_rows, columns=summary_columns)
        .sort_values("deconv_intensity", ascending=False)
        .reset_index(drop=True)
    )
    assignments_summary["bin"] = BIN
    return assigned_raw, assignments_summary


# ----------------------------
# Plotting helpers
# ----------------------------
def plot_neutral_mass_spectrum(deconv_df: pd.DataFrame, out_dir: str, filename: str = "neutral_mass_spectrum.png"):
    """
    Save a stick plot of neutral mass against intensity.

    Reads the ``mass`` and ``intensity`` columns of ``deconv_df`` and writes
    the figure to ``out_dir/filename`` at 200 dpi. Prints a notice and
    returns without plotting when the table is empty.
    """
    if deconv_df.empty:
        print("No neutral masses to plot.")
        return

    fig, ax = plt.subplots(figsize=(9, 4.5))
    ax.vlines(
        deconv_df["mass"].to_numpy(),
        0,
        deconv_df["intensity"].to_numpy(),
        linewidth=1,
    )
    ax.set_xlabel("Neutral mass (Da)")
    ax.set_ylabel("Intensity (arb.)")
    ax.set_title("Neutral Mass Spectrum (filtered)")
    fig.tight_layout()

    out_path = os.path.join(out_dir, filename)
    fig.savefig(out_path, dpi=200)
    plt.close(fig)
    print(f"Saved plot: {out_path}")


def plot_mirror_assigned_vs_total(assigned_raw: pd.DataFrame, out_dir: str, filename: str = "mirror_assigned_vs_total.png"):
    """
    Save a mirror plot of all raw peaks against the assigned subset.

    Upper half: every raw MS1 peak. Lower half (drawn negative): only peaks
    whose ``is_assigned`` flag is True. The figure is written to
    ``out_dir/filename`` at 200 dpi. Prints a notice and returns without
    plotting when the table is empty.
    """
    if assigned_raw.empty:
        print("No assigned/raw data to plot.")
        return

    all_mz = assigned_raw["mz"].to_numpy()
    all_int = assigned_raw["intensity"].to_numpy()
    picked = assigned_raw["is_assigned"].to_numpy(dtype=bool)
    picked_int = np.where(picked, all_int, 0.0)

    fig, ax = plt.subplots(figsize=(10, 5.2))
    ax.vlines(all_mz, 0, all_int, linewidth=0.6)                      # upper half: total
    ax.vlines(all_mz[picked], 0, -picked_int[picked], linewidth=0.8)  # lower half: assigned

    # Scale each half to its own maximum; keep a small lower margin when
    # nothing is assigned.
    upper = all_int.max() if len(all_int) else 1.0
    if picked_int.any():
        lower = -picked_int.max()
    else:
        lower = -0.1 * upper
    ax.set_ylim(lower * 1.05, upper * 1.05)

    ax.set_xlabel("m/z")
    ax.set_ylabel("Intensity (arb.)")
    ax.set_title("Mirror Plot: Total (top) vs Assigned (bottom)")
    fig.tight_layout()

    out_path = os.path.join(out_dir, filename)
    fig.savefig(out_path, dpi=200)
    plt.close(fig)
    print(f"Saved plot: {out_path}")


def plot_mirror_unassigned_vs_total(assigned_raw: pd.DataFrame, out_dir: str, filename: str = "mirror_unassigned_vs_total.png"):
    """
    Save a mirror plot of all raw peaks against the non-assigned subset.

    Upper half: every raw MS1 peak. Lower half (drawn negative): only peaks
    whose ``is_assigned`` flag is False. The figure is written to
    ``out_dir/filename`` at 200 dpi. Prints a notice and returns without
    plotting when the table is empty.
    """
    if assigned_raw.empty:
        print("No assigned/raw data to plot.")
        return

    all_mz = assigned_raw["mz"].to_numpy()
    all_int = assigned_raw["intensity"].to_numpy()
    leftover = ~assigned_raw["is_assigned"].to_numpy(dtype=bool)
    leftover_int = np.where(leftover, all_int, 0.0)

    fig, ax = plt.subplots(figsize=(10, 5.2))
    ax.vlines(all_mz, 0, all_int, linewidth=0.6)                            # upper half: total
    ax.vlines(all_mz[leftover], 0, -leftover_int[leftover], linewidth=0.8)  # lower half: unassigned

    # Scale each half to its own maximum; keep a small lower margin when
    # every peak is assigned.
    upper = all_int.max() if len(all_int) else 1.0
    if leftover_int.any():
        lower = -leftover_int.max()
    else:
        lower = -0.1 * upper
    ax.set_ylim(lower * 1.05, upper * 1.05)

    ax.set_xlabel("m/z")
    ax.set_ylabel("Intensity (arb.)")
    ax.set_title("Mirror Plot: Total (top) vs Non-assigned (bottom)")
    fig.tight_layout()

    out_path = os.path.join(out_dir, filename)
    fig.savefig(out_path, dpi=200)
    plt.close(fig)
    print(f"Saved plot: {out_path}")


def plot_mirror_assigned_by_protein_vs_total(
    assigned_raw: pd.DataFrame,
    out_dir: str,
    filename: str = "mirror_assigned_by_protein_vs_total.png",
    max_legend_items: int = 20
):
    """
    Save a mirror plot with assigned peaks coloured per protein.

    Upper half: every raw MS1 peak. Lower half (drawn negative): assigned
    peaks, one colour per distinct ``assigned_mass``, cycling through the
    active matplotlib colour cycle. Proteins are drawn in descending order of
    assigned-peak count and only the first ``max_legend_items`` get a legend
    entry. When no peak is assigned, only the upper half is drawn. The figure
    is written to ``out_dir/filename`` at 200 dpi. Prints a notice and
    returns without plotting when the table is empty.
    """
    if assigned_raw.empty:
        print("No assigned/raw data to plot.")
        return

    all_mz = assigned_raw["mz"].to_numpy()
    all_int = assigned_raw["intensity"].to_numpy()

    fig, ax = plt.subplots(figsize=(11, 5.6))
    ax.vlines(all_mz, 0, all_int, linewidth=0.5)  # upper half: total

    assigned_only = assigned_raw[assigned_raw["is_assigned"]].copy()
    if not assigned_only.empty:
        # Proteins with the most assigned peaks come first, so they are the
        # ones that make it into the (capped) legend.
        peaks_per_protein = (
            assigned_only.groupby("assigned_mass", dropna=True)["is_assigned"]
            .count()
            .sort_values(ascending=False)
        )

        palette = plt.rcParams['axes.prop_cycle'].by_key().get(
            'color', ['C0','C1','C2','C3','C4','C5','C6','C7','C8','C9']
        )

        shown_in_legend = 0
        for rank, mass in enumerate(peaks_per_protein.index.tolist()):
            rows = (assigned_raw["assigned_mass"] == mass)
            protein_mz = assigned_raw.loc[rows, "mz"].to_numpy()
            protein_int = assigned_raw.loc[rows, "intensity"].to_numpy()

            if shown_in_legend < max_legend_items:
                label = f"{mass/1000:.2f} kDa (n={len(protein_mz)})"
                shown_in_legend += 1
            else:
                label = None

            ax.vlines(
                protein_mz, 0, -protein_int,
                linewidth=0.8,
                color=palette[rank % len(palette)],
                label=label
            )

        upper = all_int.max() if len(all_int) else 1.0
        lower = -assigned_only["intensity"].max() if len(assigned_only) else -0.1 * upper
        ax.set_ylim(lower * 1.05, upper * 1.05)

        if shown_in_legend:
            ax.legend(title="Assigned proteins", loc="upper right", fontsize=8, ncol=1)

    ax.set_xlabel("m/z")
    ax.set_ylabel("Intensity (arb.)")
    ax.set_title("Mirror Plot: Total (top) vs Assigned by Protein (bottom)")
    fig.tight_layout()

    out_path = os.path.join(out_dir, filename)
    fig.savefig(out_path, dpi=200)
    plt.close(fig)
    print(f"Saved plot: {out_path}")


# %%
def main():
    """
    Run the MS1 assignment pipeline end to end.

    Reads the raw MS1 peak list and the deconvoluted masses from the
    configured paths, assigns peaks, writes ``assigned_ms1_with_peaks.csv``
    and ``assignments_summary.csv`` plus four PNG plots into OUT_DIR
    (created if missing), and prints a short report.
    """
    os.makedirs(OUT_DIR, exist_ok=True)

    raw_df = _read_raw_ms1(RAW_MS1_CSV)
    deconv_df = _read_deconv(DECONV_TXT)

    assigned_raw, summary = assign_ms1_peaks(raw_df, deconv_df)

    out_assigned = os.path.join(OUT_DIR, "assigned_ms1_with_peaks.csv")
    out_summary  = os.path.join(OUT_DIR, "assignments_summary.csv")
    assigned_raw.to_csv(out_assigned, index=False)
    summary.to_csv(out_summary, index=False)

    # --- Plots ---
    plot_neutral_mass_spectrum(deconv_df, OUT_DIR, filename="neutral_mass_spectrum.png")
    plot_mirror_assigned_vs_total(assigned_raw, OUT_DIR, filename="mirror_assigned_vs_total.png")
    plot_mirror_unassigned_vs_total(assigned_raw, OUT_DIR, filename="mirror_unassigned_vs_total.png")
    plot_mirror_assigned_by_protein_vs_total(
        assigned_raw, OUT_DIR, filename="mirror_assigned_by_protein_vs_total.png", max_legend_items=20
    )

    # Minimal console report
    print(f"Raw MS1 peaks: {len(raw_df):,}")
    # ":g" instead of int(): int() truncates float products, so 0.29 would be
    # reported as 28% and 0.005 as 0%.
    print(f"Deconvoluted proteins (≥{TOP_CUTOFF_FRACTION * 100:g}% of max): {len(deconv_df):,}")
    print(f"Assigned peaks: {int(assigned_raw['is_assigned'].sum()):,}")
    print(f"Non-assigned peaks: {int((~assigned_raw['is_assigned']).sum()):,}")
    print(f"Saved: {out_assigned}")
    print(f"Saved: {out_summary}")

# %%
# Run the pipeline when this cell (or the exported script) is executed
# directly; importing the module does not trigger it.
if __name__ == "__main__":
    main()


Saved plot: F:/test/test\neutral_mass_spectrum.png
Saved plot: F:/test/test\mirror_assigned_vs_total.png
Saved plot: F:/test/test\mirror_unassigned_vs_total.png
Saved plot: F:/test/test\mirror_assigned_by_protein_vs_total.png
Raw MS1 peaks: 7,346
Deconvoluted proteins (≥10% of max): 205
Assigned peaks: 4,136
Non-assigned peaks: 3,210
Saved: F:/test/test\assigned_ms1_with_peaks.csv
Saved: F:/test/test\assignments_summary.csv


Identification module

In [None]:
import pandas as pd
import numpy as np
from typing import Optional, Dict
import ast

def _num(s: pd.Series) -> pd.Series:
    """Coerce a Series to numeric; values that cannot be parsed become NaN."""
    return pd.to_numeric(s, errors="coerce")

def search_best(
    df: pd.DataFrame,
    rt_query: float,
    mz_query: float,
    mass_query: float,
    rt_window: float = 10.0,
    mz_tol: float = 2.0,
    mass_tol: float = 20.0
) -> Optional[Dict]:
    """
    Return the single best match (row as dict) if ALL three criteria match:
      |rt - rt_query| <= rt_window
      |mz - mz_query| <= mz_tol
      |mass - mass_query| <= mass_tol
    Otherwise returns None.

    The columns compared are ``rt_aligned``, ``precursor_mz`` and ``MASS``.
    They are coerced to numeric; a missing column is treated as all-NaN, so
    nothing can match. Among the candidates the row with the smallest score
    ``d_rt / rt_window + d_mz / mz_tol + d_mass / mass_tol`` wins, and the
    earliest row wins a tie.

    The returned dict holds every column of the matched row, with the three
    key columns in numeric form (added as NaN if they were missing), plus
    ``score``.

    ``df`` is not modified and only the candidate rows are copied. This
    function is called once per matched m/z value, so copying the whole
    databank on every call would be wasteful. Rows are selected by position,
    so duplicate index labels in ``df`` are handled correctly.
    """
    key_cols = ("rt_aligned", "precursor_mz", "MASS")
    coerced = {
        col: (
            _num(df[col])
            if col in df.columns
            else pd.Series(np.nan, index=df.index, dtype=float)
        )
        for col in key_cols
    }

    def _as_float(col: str) -> np.ndarray:
        # Plain float arrays: no index alignment, NaN for anything missing.
        return coerced[col].to_numpy(dtype=float, na_value=np.nan)

    # errstate: a zero tolerance yields inf/NaN scores; stay quiet about it,
    # as the pandas arithmetic this replaces did.
    with np.errstate(divide="ignore", invalid="ignore"):
        d_rt = np.abs(_as_float("rt_aligned") - rt_query)
        d_mz = np.abs(_as_float("precursor_mz") - mz_query)
        d_mass = np.abs(_as_float("MASS") - mass_query)

        hit = (d_rt <= rt_window) & (d_mz <= mz_tol) & (d_mass <= mass_tol)
        if not hit.any():
            return None

        score = d_rt[hit] / rt_window + d_mz[hit] / mz_tol + d_mass[hit] / mass_tol

    cand = df.loc[hit].copy()
    for col in key_cols:
        cand[col] = coerced[col].to_numpy()[hit]
    cand["score"] = score

    # Stable sort: the earliest row wins an exact tie, deterministically.
    best_row = cand.sort_values("score", kind="stable").iloc[0]
    return best_row.to_dict()

def _safe_parse_list(val):
    """Convert string repr of list into Python list safely."""
    if isinstance(val, str):
        try:
            return ast.literal_eval(val)
        except Exception:
            return []
    if isinstance(val, (list, tuple, np.ndarray)):
        return list(val)
    return []


def best_match_formatter(row, df2):
    """
    Build the ``best_match`` text for one assignments-summary row.

    For every m/z in the row's ``matched_mz_list``, ``search_best`` is run
    against ``df2`` using the row's ``bin`` as the retention-time query and
    its ``neutral_mass`` as the mass query. Each hit contributes
    ``"<mz>: <Accession>, <neutral_mass>"`` (``NA`` when the hit has no
    ``Accession`` key). Returns the hits joined as ``"[a, b, ...]"``, or
    ``None`` when there is no hit.
    """
    neutral_mass = row["neutral_mass"]
    rt_bin = row["bin"]

    hits = []
    for mz_value in _safe_parse_list(row["matched_mz_list"]):
        match = search_best(
            df2,
            rt_query=rt_bin,
            mz_query=float(mz_value),
            mass_query=float(neutral_mass),
        )
        if match is None:
            continue
        accession = match.get("Accession", "NA")
        hits.append(f"{mz_value}: {accession}, {neutral_mass}")

    if not hits:
        return None
    return "[" + ", ".join(hits) + "]"



In [None]:

# Load your CSVs
# charge_file_path_1: assignments summary; best_match_formatter reads its
#   "neutral_mass", "bin" and "matched_mz_list" columns.
# charge_file_path_2: identification databank; search_best compares its
#   "rt_aligned", "precursor_mz" and "MASS" columns and the formatter reports
#   its "Accession" column.
# NOTE(review): the assignment cell above writes "assignments_summary.csv"
#   into OUT_DIR, whereas this cell reads "F:/test/assignments_summary_1.csv".
#   Confirm how that file is produced (renamed/copied by hand?).
charge_file_path_1 = r"F:/test/assignments_summary_1.csv"
charge_file_path_2 = r"F:/test/databank_with_ids.csv"

df1 = pd.read_csv(charge_file_path_1)
df2 = pd.read_csv(charge_file_path_2)

# Create the new column
# One databank search per matched m/z per summary row; the result is a
# formatted string, or None when no m/z of that row has a databank hit.

df1["best_match"] = df1.apply(lambda row: best_match_formatter(row, df2), axis=1)


# Save to CSV
output_path = r"F:/test/assignments_with_best_matches.csv"
df1.to_csv(output_path, index=False)

print(f"Saved with formatted best_match column → {output_path}")



Saved with formatted best_match column → F:/test/assignments_with_best_matches.csv
