Complete aggregated spectra (MS1 from all the scans)

In [None]:
from fisher_py.data.business import Scan
from fisher_py import RawFile
import re
import pandas as pd
import numpy as np
from tqdm import tqdm
import os
import pickle

def wholeCasting(folder_path, cast_path):
    """Sum the MS1 spectra of every RAW file in ``folder_path`` onto a fixed grid.

    For each ``.raw`` file, every scan whose ``scan_type`` string contains
    ``Full ms`` is cast onto a grid of 13690 bins (bin index =
    ``rint(mass * 100) // 10 - 6000``, keeping indices strictly between 6000
    and 19360) and the intensities are summed per bin.  One CSV per RAW file,
    named ``<raw file name>.csv``, is written next to the RAW files.  It holds
    a single row with the columns ``sample_name``, ``group_name`` and
    ``cast spectra`` (the summed grid stored as one list-valued cell).

    Relies on ``RawFile``, ``Scan`` and ``tqdm`` being imported in the
    enclosing module/notebook.

    Args:
        folder_path: Directory scanned (non-recursively) for ``.raw`` files;
            also the directory the CSV files are written to.
        cast_path: Unused; kept so existing calls keep working.
            NOTE(review): the name suggests an output location - confirm
            whether the CSVs should go there instead.

    Returns:
        An empty tuple (kept from the original ``return()``).
    """
    def helper_regex(text):
        # Token that follows "Full" in the scan filter string, e.g. "ms" / "ms2".
        match = re.search(r"Full\s+(\w+)", str(text))
        return match.group(1) if match else None

    def find_matching_keys(sequence: str, substring_dict: dict) -> list:
        return [key for key, substrings in substring_dict.items()
                if any(substring in sequence for substring in substrings)]

    n_bins = 13690
    cast_min = 6000
    cast_max = 19360

    # Work with explicit paths instead of os.chdir(), which would change the
    # working directory of the whole process.
    folder_path = os.path.abspath(folder_path)

    # Only RAW files: the folder also receives the CSVs written below, and
    # those must not be fed back into RawFile on a later run.
    files = [f for f in os.listdir(folder_path)
             if os.path.isfile(os.path.join(folder_path, f)) and f.lower().endswith(".raw")]
    substring_dict_sample = {
        "TreatmentA": ["TreatmentA"],
        "TreatmentB": ["TreatmentB"],
        "TreatmentC": ["TreatmentC"],
    }

    for raw_name in files:
        raw = RawFile(os.path.join(folder_path, raw_name))
        print(raw_name)

        matches = find_matching_keys(raw_name, substring_dict_sample)
        sample_group = matches[0] if matches else "Unknown"

        # Fresh accumulator per file, so one file's CSV never carries the
        # spectra of the files processed before it.
        data_intensities = np.zeros(n_bins, dtype=np.float64)

        # Assumes scan numbers are 1-based and the last one is valid, so the
        # upper bound is inclusive.
        for i in tqdm(range(1, raw.number_of_scans + 1), desc="Processing scans", ncols=100):
            raw_scan = Scan.from_file(raw._raw_file_access, scan_number=i)
            if helper_regex(raw_scan.scan_type) != 'ms':
                continue

            scan_masses = raw_scan.preferred_masses
            scan_intensities = raw_scan.preferred_intensities
            if scan_masses is None or scan_intensities is None or len(scan_masses) == 0:
                continue

            masses = np.asarray(scan_masses, dtype=np.float64)
            intensities = np.asarray(scan_intensities, dtype=np.float64)

            # Integer form of int(round(m, 2) * 10); it avoids the float
            # truncation that can push a peak into the bin below.
            index = np.rint(masses * 100.0).astype(np.int64) // 10
            keep = (index > cast_min) & (index < cast_max)
            # add.at accumulates correctly when several peaks share a bin.
            np.add.at(data_intensities, index[keep] - cast_min, intensities[keep])

        df = pd.DataFrame({
            'sample_name': [raw_name],
            'group_name': [sample_group],
            'cast spectra': [data_intensities.tolist()],
        })
        df.to_csv(os.path.join(folder_path, f"{raw_name}.csv"))

    return ()
     

Perform the data aggregation and save each RAW file's result in a separate CSV

In [10]:
import os
import re
import numpy as np
import pandas as pd
from tqdm import tqdm

# ------------------------------------------------------------
# wholeCasting: per RAW file -> single-row CSV with cast spectrum expanded
# ------------------------------------------------------------
def wholeCasting(folder_path, cast_path):
    """
    For each .raw file in folder_path:
      - Sum casted MS1 intensities across all MS1 scans into a fixed grid
      - Expand the summed vector into columns
      - Save ONE CSV per file (named after the RAW filename) into cast_path

    A scan counts as MS1 when the token after "Full" in its scan_type string is
    "ms" (case-insensitive). Each peak goes to bin
    ``rint(mass * 100) // 10 - CAST_MIN``; only peaks whose index lies strictly
    between CAST_MIN and CAST_MAX are kept.

    RAW files that cannot be opened are skipped with a warning. Scans that
    cannot be read are skipped and their number is reported per file.

    Relies on ``RawFile`` and ``Scan`` being available in the enclosing
    module/notebook.

    Args:
        folder_path: Directory scanned (non-recursively) for ``.raw`` files.
        cast_path: Output directory; created if it does not exist.

    Returns:
        None. The results are the CSV files written to ``cast_path``.

    Output columns:
      ['sample_name', 'group_name', 'cast_00000', 'cast_00001', ..., 'cast_13689']
    """
    # ---- Helpers ----
    def helper_regex(text):
        # Extract token after "Full", e.g. "Full ms"
        m = re.search(r"Full\s+(\w+)", str(text))
        return m.group(1).lower() if m else None

    def find_matching_keys(sequence: str, substring_dict: dict) -> str:
        for key, subs in substring_dict.items():
            if any(s in sequence for s in subs):
                return key
        return "Unknown"

    def safe_stem(name: str) -> str:
        # Drop extension and replace Windows-illegal filename chars
        stem = os.path.splitext(str(name))[0]
        return re.sub(r'[<>:"/\\|?*]+', '_', stem)

    def is_empty(values) -> bool:
        # len() works for lists and numpy arrays alike, whereas `not values`
        # raises for arrays holding more than one element.
        return values is None or len(values) == 0

    # ---- Config: grid logic kept from the original implementation ----
    CAST_MIN = 6000
    # NOTE(review): CAST_MIN + N_BINS is 19690, so with CAST_MAX = 19360 the
    # bins from index 13360 upward are never filled (and bin 0 never is,
    # because the lower bound is exclusive). Confirm which bound is intended.
    CAST_MAX = 19360
    N_BINS   = 13690
    CAST_COLS = [f"cast_{i:05d}" for i in range(N_BINS)]

    # ---- I/O setup ----
    folder_path = os.path.abspath(folder_path)
    out_dir = os.path.abspath(cast_path)  # treat cast_path as output directory
    os.makedirs(out_dir, exist_ok=True)

    # Only .raw files in folder
    files = [f for f in os.listdir(folder_path)
             if os.path.isfile(os.path.join(folder_path, f)) and f.lower().endswith(".raw")]

    # Group inference from filename
    substring_dict_sample = {
        "TreatmentA": ["TreatmentA"],
        "TreatmentB": ["TreatmentB"],
        "TreatmentC": ["TreatmentC"],
    }

    # ---- Process each RAW file ----
    for raw_name in files:
        raw_path = os.path.join(folder_path, raw_name)
        print(f"Processing: {raw_name}")

        try:
            raw = RawFile(raw_path)
        except Exception as e:
            print(f"[WARN] Skipping {raw_name}: cannot open RAW ({e})")
            continue

        # One accumulator per file (single-row output)
        summed = np.zeros(N_BINS, dtype=np.float32)
        group_label = find_matching_keys(raw_name, substring_dict_sample)

        n_scans = getattr(raw, "number_of_scans", 0)
        n_failed = 0
        # Assumes scan numbers are 1-based and the last one is valid, hence
        # the inclusive upper bound.
        for i in tqdm(range(1, n_scans + 1), desc=f"Summing MS1 {raw_name}", ncols=100):
            try:
                raw_scan = Scan.from_file(raw._raw_file_access, scan_number=i)
            except Exception:
                # Best effort: keep going, but report the loss after the loop.
                n_failed += 1
                continue

            if helper_regex(raw_scan.scan_type) != "ms":
                continue

            masses = raw_scan.preferred_masses
            intens = raw_scan.preferred_intensities
            if is_empty(masses) or is_empty(intens):
                continue

            # ---- Vectorized cast & add ----
            m = np.asarray(masses, dtype=np.float64)
            y = np.asarray(intens, dtype=np.float32)
            # Integer equivalent of int(round(m, 2) * 10)
            idx = np.rint(m * 100.0).astype(np.int64) // 10
            bin_idx = idx - CAST_MIN

            # The last term guards against CAST_MAX ever exceeding the grid.
            keep = (idx > CAST_MIN) & (idx < CAST_MAX) & (bin_idx < N_BINS)
            if not np.any(keep):
                continue

            # add.at accumulates correctly when several peaks share a bin.
            np.add.at(summed, bin_idx[keep], y[keep])

        if n_failed:
            print(f"[WARN] {raw_name}: {n_failed} scan(s) could not be read and were skipped")

        # ---- Build single-row DataFrame and save ----
        row_df = pd.DataFrame(
            [[raw_name, group_label] + summed.tolist()],
            columns=["sample_name", "group_name"] + CAST_COLS
        )

        out_path = os.path.join(out_dir, f"{safe_stem(raw_name)}.csv")
        row_df.to_csv(out_path, index=False)
        print("Saved:", out_path)


In [None]:
# Run the per-file aggregation: RAW files are read from the first folder,
# the per-file CSVs are written to the second.
wholeCasting(
    folder_path="D:/TreatmentA/",
    cast_path='D:/databank/databank',
)

Cast the MS1 spectra in 10-minute retention-time intervals

In [27]:
import os
import re
import math
import numpy as np
import pandas as pd
from tqdm import tqdm
from collections import defaultdict

# ------------------------------------------------------------
# wholeCasting: per RAW file -> multi-row CSV (10-min RT bins)
# ------------------------------------------------------------
def wholeCasting(folder_path, cast_path, rt_bin_minutes: float = 10.0):
    """
    For each .raw file in folder_path:
      - Group MS1 scans by retention time (RT) bins of `rt_bin_minutes` (default 10.0)
      - Sum casted MS1 intensities across scans within each RT bin into a fixed grid
      - Expand each summed vector into columns
      - Save ONE CSV per file into cast_path, with one row per RT bin

    A scan counts as MS1 when the token after "Full" in its scan_type string is
    "ms" (case-insensitive). Every MS1 scan with a finite start time is counted
    in its RT bin, even when none of its peaks falls on the grid; such a bin
    yields an all-zero spectrum row.

    RAW files that cannot be opened are skipped with a warning. Scans that
    cannot be read are skipped and their number is reported per file.

    Relies on ``RawFile`` and ``Scan`` being available in the enclosing
    module/notebook.

    Args:
        folder_path: Directory scanned (non-recursively) for ``.raw`` files.
        cast_path: Output directory; created if it does not exist.
        rt_bin_minutes: Width of one RT bin; must be greater than zero.

    Returns:
        None. The results are the CSV files written to ``cast_path``.

    Raises:
        ValueError: If ``rt_bin_minutes`` is not a positive number.

    Output columns:
      ['sample_name', 'group_name', 'rt_start_min', 'rt_end_min', 'rt_center_min', 'n_scans',
       'cast_00000', 'cast_00001', ..., 'cast_13689']
      'rt_start_min' / 'rt_end_min' hold the smallest / largest scan start time
      observed in the bin (not the nominal bin edges).
    """
    # `not (x > 0)` also rejects NaN, which a plain `x <= 0` would let through.
    if not rt_bin_minutes > 0:
        raise ValueError(f"rt_bin_minutes must be a positive number, got {rt_bin_minutes!r}")

    # ---- Helpers ----
    def helper_regex(text):
        # Extract token after "Full", e.g. "Full ms"
        m = re.search(r"Full\s+(\w+)", str(text))
        return m.group(1).lower() if m else None

    def find_matching_keys(sequence: str, substring_dict: dict) -> str:
        for key, subs in substring_dict.items():
            if any(s in sequence for s in subs):
                return key
        return "Unknown"

    def safe_stem(name: str) -> str:
        # Drop extension and replace Windows-illegal filename chars
        stem = os.path.splitext(str(name))[0]
        return re.sub(r'[<>:"/\\|?*]+', '_', stem)

    def is_empty(values) -> bool:
        # len() works for lists and numpy arrays alike, whereas `not values`
        # raises for arrays holding more than one element.
        return values is None or len(values) == 0

    # ---- Cast grid config ----
    # Index = rint(mass * 100) // 10, i.e. 0.1-wide steps; 19690 - 6000 = 13,690 bins
    CAST_MIN = 6000   # corresponds to 600.0 when /10
    CAST_MAX = 19690  # corresponds to 1969.0 when /10
    N_BINS   = 13690
    CAST_COLS = [f"cast_{i:05d}" for i in range(N_BINS)]

    # ---- I/O setup ----
    folder_path = os.path.abspath(folder_path)
    out_dir = os.path.abspath(cast_path)  # treat cast_path as output directory
    os.makedirs(out_dir, exist_ok=True)

    # Only .raw files in folder
    files = [f for f in os.listdir(folder_path)
             if os.path.isfile(os.path.join(folder_path, f)) and f.lower().endswith(".raw")]

    # Group inference from filename
    substring_dict_sample = {
        "TreatmentA": ["TreatmentA"],
        "TreatmentB": ["TreatmentB"],
        "TreatmentC": ["TreatmentC"],
    }

    # ---- Process each RAW file ----
    for raw_name in files:
        raw_path = os.path.join(folder_path, raw_name)
        print(f"\nProcessing: {raw_name}")

        try:
            raw = RawFile(raw_path)
        except Exception as e:
            print(f"[WARN] Skipping {raw_name}: cannot open RAW ({e})")
            continue

        group_label = find_matching_keys(raw_name, substring_dict_sample)

        # Per-RT-bin accumulators, all keyed by the RT bin index
        bin_sums = {}                  # summed grid; only bins that received signal
        bin_counts = defaultdict(int)  # number of MS1 scans seen
        bin_rt_min = {}                # smallest scan start time seen
        bin_rt_max = {}                # largest scan start time seen

        n_scans = getattr(raw, "number_of_scans", 0)
        n_failed = 0
        # Assumes scan numbers are 1-based and the last one is valid, hence
        # the inclusive upper bound.
        for i in tqdm(range(1, n_scans + 1), desc=f"Binning MS1 by RT ({rt_bin_minutes} min) - {raw_name}", ncols=100):
            try:
                raw_scan = Scan.from_file(raw._raw_file_access, scan_number=i)
            except Exception:
                # Best effort: keep going, but report the loss after the loop.
                n_failed += 1
                continue

            if helper_regex(raw_scan.scan_type) != "ms":
                continue

            # Retention time (assumed minutes)
            rt = raw_scan.scan_statistics.start_time
            if rt is None or not math.isfinite(rt):
                continue
            rt_bin = int(math.floor(rt / rt_bin_minutes))

            # Bookkeeping happens once, up front: every MS1 scan counts toward
            # its RT bin whether or not it ends up contributing signal.
            bin_counts[rt_bin] += 1
            bin_rt_min[rt_bin] = min(rt, bin_rt_min.get(rt_bin, rt))
            bin_rt_max[rt_bin] = max(rt, bin_rt_max.get(rt_bin, rt))

            masses = raw_scan.preferred_masses
            intens = raw_scan.preferred_intensities
            if is_empty(masses) or is_empty(intens):
                continue

            # ---- Vectorized cast for this scan ----
            m = np.asarray(masses, dtype=np.float64)
            y = np.asarray(intens, dtype=np.float32)
            # Integer equivalent of int(round(m, 2) * 10)
            idx = np.rint(m * 100.0).astype(np.int64) // 10
            grid_idx = idx - CAST_MIN

            keep = (idx > CAST_MIN) & (idx < CAST_MAX) & (grid_idx < N_BINS)
            if not np.any(keep):
                continue

            # Lazily allocate the sum vector for this RT bin
            if rt_bin not in bin_sums:
                bin_sums[rt_bin] = np.zeros(N_BINS, dtype=np.float32)

            # add.at accumulates correctly when several peaks share a bin.
            np.add.at(bin_sums[rt_bin], grid_idx[keep], y[keep])

        if n_failed:
            print(f"[WARN] {raw_name}: {n_failed} scan(s) could not be read and were skipped")

        # ---- Build DataFrame with one row per RT bin (only bins with scans) ----
        rows = []
        # Every bin that received signal was also counted, so the keys of
        # bin_counts are the complete set of bins encountered.
        for b in sorted(bin_counts):
            rt_obs_min = bin_rt_min[b]
            rt_obs_max = bin_rt_max[b]
            # NOTE(review): int() truncates the centre to whole minutes -
            # confirm this is intended rather than a float centre.
            rt_center = int(0.5 * (rt_obs_min + rt_obs_max))

            summed_vec = bin_sums.get(b)
            if summed_vec is None:
                # MS1 scans were seen in this bin, but no peak fell on the grid
                summed_vec = np.zeros(N_BINS, dtype=np.float32)

            meta = [raw_name, group_label, rt_obs_min, rt_obs_max, rt_center, int(bin_counts[b])]
            rows.append(meta + summed_vec.tolist())

        if not rows:
            print(f"[WARN] No MS1 bins produced for {raw_name}. Skipping CSV.")
            continue

        col_meta = ["sample_name", "group_name", "rt_start_min", "rt_end_min", "rt_center_min", "n_scans"]
        df = pd.DataFrame(rows, columns=col_meta + CAST_COLS)

        out_path = os.path.join(out_dir, f"{safe_stem(raw_name)}.csv")
        df.to_csv(out_path, index=False)
        print("Saved:", out_path)


In [None]:
# Run the RT-binned aggregation with the default bin width: RAW files are
# read from the first folder, the per-file CSVs are written to the second.
wholeCasting(
    folder_path="D:/TreatmentA/",
    cast_path='D:/databank/databank',
)

Combine all the per-file CSV files into a single file

In [None]:
import os
import glob
import shutil
import tempfile
import numpy as np
import pandas as pd

def _infer_dtype_map(ref_cols):
    """Build the column -> dtype mapping used when reading a per-RAW CSV.

    Only columns present in ``ref_cols`` receive an entry:
      - every column whose name starts with ``cast_``     -> ``np.float32``
      - ``rt_start_min`` / ``rt_end_min`` / ``rt_center_min`` -> ``np.float32``
      - ``n_scans``                                       -> ``"Int32"`` (pandas nullable int)
      - ``sample_name`` / ``group_name``                  -> ``"string"``

    Any other column is left out of the mapping.
    """
    rt_names = ("rt_start_min", "rt_end_min", "rt_center_min")
    label_names = ("sample_name", "group_name")

    # Spectrum columns first, in the order they appear in ref_cols
    mapping = {col: np.float32 for col in ref_cols if str(col).startswith("cast_")}
    mapping.update({col: np.float32 for col in rt_names if col in ref_cols})
    if "n_scans" in ref_cols:
        mapping["n_scans"] = "Int32"
    mapping.update({col: "string" for col in label_names if col in ref_cols})
    return mapping

def _load_and_align(path: str, ref_cols, dtype_map):
    """Read one CSV and reshape it to exactly the columns in ``ref_cols``.

    The file is first read with ``dtype_map``; if that read raises, it is read
    again without explicit dtypes.  Afterwards:
      - columns listed in ``ref_cols`` but absent from the file are added with
        a default: 0.0 for ``cast_*``, NaN for the three ``rt_*_min`` columns,
        ``<NA>`` (Int32) for ``n_scans``, the file's base name for
        ``sample_name``, ``"Unknown"`` for ``group_name`` and ``pd.NA`` for
        anything else
      - columns not listed in ``ref_cols`` are dropped
      - the remaining columns are put in ``ref_cols`` order
      - every ``cast_*`` column is coerced to numeric (unparseable values
        become NaN) and stored as float32

    Returns the aligned DataFrame.
    """
    rt_names = ("rt_start_min", "rt_end_min", "rt_center_min")

    try:
        frame = pd.read_csv(path, dtype=dtype_map)
    except Exception:
        # Fall back to pandas' own dtype inference
        frame = pd.read_csv(path)

    def default_for(col):
        # Value used to fill a reference column the file does not contain
        if str(col).startswith("cast_"):
            return np.float32(0.0)
        if col in rt_names:
            return np.float32(np.nan)
        if col == "n_scans":
            return pd.Series([pd.NA] * len(frame), dtype="Int32")
        if col == "sample_name":
            return os.path.basename(path)
        if col == "group_name":
            return "Unknown"
        return pd.NA

    # Decide what is missing before adding anything
    absent = [col for col in ref_cols if col not in frame.columns]
    for col in absent:
        frame[col] = default_for(col)

    # Remove columns the reference schema does not know, then reorder
    surplus = [col for col in frame.columns if col not in ref_cols]
    if surplus:
        frame = frame.drop(columns=surplus)
    frame = frame[ref_cols]

    # Spectrum columns are always float32, whatever the reader produced
    spectrum_cols = [col for col in ref_cols if str(col).startswith("cast_")]
    for col in spectrum_cols:
        frame[col] = pd.to_numeric(frame[col], errors="coerce").astype("float32")
    return frame

def combine_cast_outputs(
    input_dir: str,
    output_csv: str,
    recursive: bool = False,
    streaming: bool = True,
    write_parquet: bool = False,
):
    """
    Combine all per-RAW CSVs (from wholeCasting) into one CSV.
    - Aligns schemas to the first file (in sorted path order).
    - Streaming append by default (low memory); the in-memory mode also sorts
      the rows by sample_name / rt_start_min / rt_center_min where present.
    - ``output_csv`` itself is never used as an input, so the function can be
      re-run when the combined file lives inside ``input_dir``.
    - The result is built in a temporary file next to ``output_csv`` and moved
      into place with a single replace, so an existing combined file stays
      intact until the new one is complete.

    Args:
        input_dir: Folder holding the per-RAW CSV files.
        output_csv: Path of the combined CSV; its folder is created if needed.
        recursive: Also pick up CSV files in subfolders of ``input_dir``.
        streaming: Append file by file instead of concatenating in memory.
        write_parquet: Additionally write ``<output_csv stem>.parquet``.

    Returns:
        None. Prints a warning and returns early when no input CSV is found.
    """
    pattern = "**/*.csv" if recursive else "*.csv"
    target = os.path.normcase(os.path.abspath(output_csv))
    # Leave out a combined file from an earlier run; re-ingesting it would
    # duplicate every row.
    files = sorted(
        f for f in glob.glob(os.path.join(input_dir, pattern), recursive=recursive)
        if os.path.normcase(os.path.abspath(f)) != target
    )
    if not files:
        print(f"[WARN] No CSV files found in: {input_dir}")
        return

    # Reference schema
    ref_header = pd.read_csv(files[0], nrows=0)
    ref_cols = list(ref_header.columns)
    dtype_map = _infer_dtype_map(ref_cols)

    out_dir = os.path.dirname(os.path.abspath(output_csv)) or "."
    os.makedirs(out_dir, exist_ok=True)

    # Temp file in the output folder, so the final replace stays on one filesystem
    tmp_fd, tmp_path = tempfile.mkstemp(dir=out_dir, prefix=".combine_tmp_", suffix=".csv")
    os.close(tmp_fd)

    try:
        if streaming:
            # write header once
            pd.DataFrame(columns=ref_cols).to_csv(tmp_path, index=False)
            total_rows = 0
            for i, f in enumerate(files, 1):
                df = _load_and_align(f, ref_cols, dtype_map)
                df.to_csv(tmp_path, mode="a", index=False, header=False)
                total_rows += len(df)
                if i % 10 == 0 or i == len(files):
                    print(f"[INFO] Appended {len(df):6d} rows from {os.path.basename(f)} "
                          f"({i}/{len(files)}) → total {total_rows}")
            print(f"[OK] Combined CSV rows: {total_rows}")
        else:
            # in-memory (lets you sort)
            parts = [_load_and_align(f, ref_cols, dtype_map) for f in files]
            big = pd.concat(parts, ignore_index=True)
            sort_cols = [c for c in ("sample_name", "rt_start_min", "rt_center_min") if c in big.columns]
            if sort_cols:
                big = big.sort_values(sort_cols, kind="mergesort", ignore_index=True)
            big.to_csv(tmp_path, index=False)
            print(f"[OK] Combined CSV rows: {len(big)}")

        # One-step overwrite: no moment at which the output file is missing
        os.replace(tmp_path, output_csv)

        if write_parquet:
            big2 = pd.read_csv(output_csv, dtype=dtype_map)
            pq_path = os.path.splitext(output_csv)[0] + ".parquet"
            big2.to_parquet(pq_path, index=False)
            print(f"[OK] Parquet written: {pq_path}")

        print(f"[SAVED] {output_csv}")

    finally:
        # Only reached with the temp file still present when something failed
        if os.path.exists(tmp_path):
            try:
                os.remove(tmp_path)
            except OSError:
                pass

# ====== EDIT THESE PATHS AND RUN ======
# 1st argument: folder with your per-RAW CSVs
# 2nd argument: combined output path
combine_cast_outputs(
    r"D:\databank\databank",
    r"D:\ALL_casts_combined.csv",
    recursive=False,      # True if CSVs are in subfolders
    streaming=True,       # memory-friendly
    write_parquet=False,  # set True to also write Parquet
)
