# Code for extracting peak areas from .D files from HPLC calibration runs

## Import Required Packages

In [1]:
import os
import numpy as np
import pandas as pd
import copy

## Functions for Extracting HPLC Data

In [2]:
def _read_peak_table_wl(file_name, cols):
    """Read one peak report and keep only rows with numeric ``rt`` and ``area``."""
    peaks = pd.read_csv(file_name, names=cols, encoding="utf-16")
    peaks["area"] = pd.to_numeric(peaks["area"], errors="coerce")
    peaks["rt"] = pd.to_numeric(peaks["rt"], errors="coerce")
    return peaks.dropna(subset=["area", "rt"])


def _find_signal_line_wl(lines, start, wl):
    """Return the index (>= ``start``) of the signal line for ``wl``, or None.

    A line containing ``Sig=<wl>`` (not followed by another digit) is
    preferred, so that the wavelength is not confused with other numbers on
    a signal line.  If no such line exists, the first line that merely
    contains ``str(wl)`` is used instead.
    """
    token = f"Sig={wl}"
    for idx in range(start, len(lines)):
        pos = lines[idx].find(token)
        if pos == -1:
            continue
        end = pos + len(token)
        # Reject e.g. "Sig=2540" when looking for 254.
        if not lines[idx][end:end + 1].isdigit():
            return idx

    needle = str(wl)
    for idx in range(start, len(lines)):
        if needle in lines[idx]:
            return idx
    return None


def load_csv_data_wl(directory, wl):
    """Collect the two main peaks at one wavelength from every run folder.

    The first sub-folder (in sorted order) whose name contains ``"50"`` and
    that holds a ``REPORT00.csv`` is the reference: its ``REPORT00.csv`` is
    searched for the signal line of ``wl`` to learn which ``REPORT0<n>.CSV``
    holds that wavelength, and its two largest peaks (by area) define the
    IS (largest) and analyte (second largest) retention times.

    Every other sub-folder (name without ``"50"``, at least 5 characters,
    containing the same report file, with at least two valid peaks) yields
    one row: the largest peak is taken as IS; when more than two peaks are
    present the analyte is the highest-ranked peak, excluding the largest,
    whose RT lies within +/-10 % of the reference analyte RT (NaN if none).

    Parameters
    ----------
    directory : str
        Folder containing one sub-folder per run.
    wl : int or str
        Wavelength to extract; used for the lookup and in column names.

    Returns
    -------
    (list, pandas.DataFrame)
        The raw rows ``[folder, RT IS, RT analyte, IS area, analyte area]``
        (reference first) and the same data as a table sorted by analyte
        area, descending.  The table is also written to
        ``f"{directory}_largest_areas_{wl}.csv"``.

    Raises
    ------
    ValueError
        No usable reference folder, no "Number of Signals" line, no line for
        ``wl``, or fewer than two valid peaks in the reference report.
    FileNotFoundError
        The report file derived from the signal line does not exist.
    """
    cols = ["No.", "rt", "type", "peak", "area", "height", "%area"]
    cols2 = ["File", "RT IS", "RT analyte", f"IS int {wl}", f"A int {wl}"]
    area_col = f"A int {wl}"

    # os.listdir order is arbitrary; sort so the chosen reference folder and
    # the row order are reproducible.
    folders = sorted(
        f for f in os.listdir(directory)
        if os.path.isdir(os.path.join(directory, f))
    )

    data_ = []
    report_no = None
    first = None

    # --- PASS 1: reference folder containing '50' ---
    for folder in folders:
        if "50" not in folder:
            continue

        folder_path = os.path.join(directory, folder)
        report00 = os.path.join(folder_path, "REPORT00.csv")
        if not os.path.exists(report00):
            continue

        dat = pd.read_table(report00, encoding="utf-16", header=None)
        lines = [str(value) for value in dat[dat.columns[0]]]

        header_idx = next(
            (i for i, text in enumerate(lines)
             if text.startswith("Number of Signals")),
            None,
        )
        if header_idx is None:
            raise ValueError(f"Could not find 'Number of Signals' in {report00}")

        signal_idx = _find_signal_line_wl(lines, header_idx + 1, wl)
        if signal_idx is None:
            raise ValueError(f"No data for wavelength {wl} in {report00}")

        line = lines[signal_idx]

        # Vendor-format specific: the report number is the single character
        # at position 7.  Slicing (not indexing) keeps a too-short line from
        # raising IndexError; the existence check below reports it instead.
        report_no = line[7:8].strip()
        file_name = os.path.join(folder_path, f"REPORT0{report_no}.CSV")
        if not os.path.exists(file_name):
            raise FileNotFoundError(
                f"Computed report file does not exist: {file_name}\n"
                f"(Parsed report_no='{report_no}' from line: {line!r})"
            )

        data = _read_peak_table_wl(file_name, cols)
        if len(data) < 2:
            raise ValueError(f"Reference file {file_name} has fewer than 2 valid peaks.")

        # Largest peak -> IS, second largest -> analyte.
        top2 = data.nlargest(2, "area").reset_index(drop=True)
        first = [
            folder,
            float(top2.loc[0, "rt"]),
            float(top2.loc[1, "rt"]),
            float(top2.loc[0, "area"]),
            float(top2.loc[1, "area"]),
        ]
        data_.append(first)
        break

    if report_no is None or first is None:
        raise ValueError("No reference folder containing '50' was found (or reference parsing failed).")

    # --- PASS 2: process other folders ---
    for folder in folders:
        if "50" in folder or len(folder) < 5:
            continue

        file_name = os.path.join(directory, folder, f"REPORT0{report_no}.CSV")
        if not os.path.exists(file_name):
            continue

        data = _read_peak_table_wl(file_name, cols)
        if len(data) < 2:
            continue

        # Peaks ranked by area, largest first (stable for equal areas).
        data_sorted = data.sort_values(
            "area", ascending=False, kind="stable"
        ).reset_index(drop=True)

        # Default candidates: the two largest peaks.
        rt_ = float(data_sorted.loc[0, "rt"])
        rt_2 = float(data_sorted.loc[1, "rt"])
        max_ = float(data_sorted.loc[0, "area"])
        max_2 = float(data_sorted.loc[1, "area"])

        # If >2 peaks, look for the analyte within ±10% of the reference
        # analyte RT.  Rank 0 is skipped because it is taken as the IS.
        if len(data_sorted) > 2 and not pd.isna(first[2]):
            ref_rt = float(first[2])
            if ref_rt != 0:
                rt_2 = np.nan
                max_2 = np.nan
                for k in range(1, len(data_sorted)):
                    cand_rt = float(data_sorted.loc[k, "rt"])
                    if 0.9 <= cand_rt / ref_rt <= 1.1:
                        rt_2 = cand_rt
                        max_2 = float(data_sorted.loc[k, "area"])
                        break

        # If identical RTs, pick next candidate for IS peak (if possible)
        if rt_ == rt_2 and len(data_sorted) >= 3:
            rt_ = float(data_sorted.loc[1, "rt"])
            max_ = float(data_sorted.loc[1, "area"])

        data_.append([folder, rt_, rt_2, max_, max_2])

    table = pd.DataFrame(data_, columns=cols2)

    # NaN analyte areas must be numeric NaN so sorting puts them last.
    table[area_col] = pd.to_numeric(table[area_col], errors="coerce")

    table2 = table.sort_values(by=area_col, ascending=False)
    table2.to_csv(f"{directory}_largest_areas_{wl}.csv", index=False)

    return data_, table2


In [3]:
def _ranked_peaks_several(file_name, cols):
    """Return ``(rts, areas)`` of one peak report, ordered by descending area.

    Rows whose ``rt`` or ``area`` is not numeric are dropped.  Sorting the
    rows themselves (rather than looking areas up by value) keeps each area
    paired with its own RT even when two peaks have identical areas.
    """
    peaks = pd.read_csv(file_name, names=cols, encoding="utf-16")
    peaks["area"] = pd.to_numeric(peaks["area"], errors="coerce")
    peaks["rt"] = pd.to_numeric(peaks["rt"], errors="coerce")
    peaks = peaks.dropna(subset=["area", "rt"])
    peaks = peaks.sort_values("area", ascending=False, kind="stable")
    return peaks["rt"].tolist(), peaks["area"].tolist()


def _match_peak_several(ref_rt, rank, rts, areas, sens):
    """Pick the peak matching ``ref_rt`` within a relative tolerance ``sens``.

    The peak at ``rank`` is tried first, then ranks 1, 2, ... in area order
    (rank 0 is only ever tried when ``rank`` itself is 0).  If the reference
    RT is NaN the peak at ``rank`` is returned unchecked; if nothing matches,
    ``(nan, nan)`` is returned.
    """
    if pd.isna(ref_rt):
        return rts[rank], areas[rank]
    if ref_rt == 0:
        # A ratio against zero can never fall inside the tolerance window.
        return np.nan, np.nan

    low, high = 1 - sens, 1 + sens
    for idx in [rank, *range(1, len(rts))]:
        if low <= rts[idx] / ref_rt <= high:
            return rts[idx], areas[idx]
    return np.nan, np.nan


def load_csv_data_several(directory, no, sens, filename=None):
    """Build one calibration table per detector signal and save them to Excel.

    The signal list is read from ``REPORT00.csv`` of the first sub-folder
    (sorted order): after the "Number of Signals" line, each line starting
    with ``"S"`` supplies a wavelength (characters 22-24) and a report
    number (character 7).  For every distinct wavelength:

    * folders whose name contains ``"60"`` are references: their ``no + 1``
      largest peaks give the IS (largest) and analyte retention times; the
      last such folder provides the reference RTs used for matching;
    * every other folder (skipping names containing ``"blank"`` and names
      shorter than 5 characters that contain ``"B"``) with at least two
      valid peaks contributes a row whose peaks are matched to the
      reference RTs within a relative tolerance of ``sens``.

    Parameters
    ----------
    directory : str
        Folder containing one sub-folder per run.
    no : int
        Number of analyte peaks to extract in addition to the IS peak.
    sens : float
        Relative retention-time tolerance, e.g. ``0.15`` accepts ratios in
        ``[0.85, 1.15]``.
    filename : str, optional
        Output workbook; defaults to ``f"{directory}_calibration_data.xlsx"``.

    Returns
    -------
    list of pandas.DataFrame
        One table per wavelength, sorted by the area of analyte 1
        (descending); each is also written to a sheet ``"Cal data <wl>"``.

    Raises
    ------
    ValueError
        No sub-folders, or no "Number of Signals" line in ``REPORT00.csv``.
    FileNotFoundError
        ``REPORT00.csv`` is missing from the first sub-folder.
    """
    if filename is None:
        filename = f"{directory}_calibration_data.xlsx"

    # Sorted once: os.listdir order is arbitrary and the directory does not
    # need to be re-read for every wavelength.
    folders = sorted(
        f for f in os.listdir(directory)
        if os.path.isdir(os.path.join(directory, f))
    )
    if not folders:
        raise ValueError(f"No subfolders found in directory: {directory}")

    report00 = os.path.join(directory, folders[0], "REPORT00.csv")
    if not os.path.exists(report00):
        raise FileNotFoundError(f"Missing REPORT00.csv at: {report00}")

    dat = pd.read_table(report00, encoding="utf-16", header=None)
    lines = dat[dat.columns[0]].tolist()

    header_idx = next(
        (i for i, raw in enumerate(lines)
         if not pd.isna(raw) and str(raw).startswith("Number of Signals")),
        None,
    )
    if header_idx is None:
        raise ValueError(f'"Number of Signals" not found in {report00}')

    # --- parse the signal lines that follow the header ---
    wls, nos = [], []
    for raw in lines[header_idx + 1:]:
        if pd.isna(raw):
            continue
        s = str(raw)
        if not s or s[0] != "S":
            break
        wl = s[22:25]
        if wl not in wls:
            wls.append(wl)
            # Vendor-format specific: report number is one char at pos 7.
            nos.append(s[7])

    cols = ["No.", "rt", "type", "peak", "area", "height", "%area"]
    datas = []

    # The writer is opened only after the inputs were validated and is
    # closed by the context manager even if processing fails part-way.
    with pd.ExcelWriter(filename, engine="xlsxwriter") as writer:
        for wl, report_no in zip(wls, nos):
            cols2 = ["File", "RT IS"]
            cols2 += [f"RT analyte {k + 1}" for k in range(no)]
            cols2.append(f"IS int at {wl}")
            cols2 += [f"A int analyte {k + 1} at {wl}" for k in range(no)]

            report_name = f"REPORT0{report_no}.CSV"
            data_ = []

            # --- PASS 1: reference (folders containing '60') ---
            rts1 = None
            for folder in folders:
                if "60" not in folder:
                    continue
                file_name = os.path.join(directory, folder, report_name)
                if not os.path.exists(file_name):
                    continue

                rts_all, areas_all = _ranked_peaks_several(file_name, cols)
                rts1 = rts_all[: no + 1]
                maxs1 = areas_all[: no + 1]

                # Pad to a fixed width when the report has too few peaks.
                missing = no + 1 - len(rts1)
                rts1 = rts1 + [np.nan] * missing
                maxs1 = maxs1 + [np.nan] * missing

                data_.append([folder] + rts1 + maxs1)

            # --- PASS 2: other folders ---
            for folder in folders:
                if "60" in folder or "blank" in folder:
                    continue
                if len(folder) < 5 and "B" in folder:
                    continue

                file_name = os.path.join(directory, folder, report_name)
                if not os.path.exists(file_name):
                    continue

                rt_full, areas = _ranked_peaks_several(file_name, cols)
                if len(areas) <= 1:
                    continue

                new_rts, new_maxs = [], []
                for h in range(no + 1):
                    if rts1 is None or h >= len(rt_full):
                        new_rts.append(np.nan)
                        new_maxs.append(np.nan)
                        continue
                    cand_rt, cand_area = _match_peak_several(
                        rts1[h], h, rt_full, areas, sens
                    )
                    new_rts.append(cand_rt)
                    new_maxs.append(cand_area)

                data_.append([folder] + new_rts + new_maxs)

            # Built from the row list directly so that a wavelength without
            # any rows yields an empty table instead of a shape error.
            table = pd.DataFrame(data_, columns=cols2)

            for w in range(no):
                col = f"A int analyte {w + 1} at {wl}"
                table[col] = pd.to_numeric(table[col], errors="coerce")

            table2 = table.sort_values(
                by=f"A int analyte 1 at {wl}", ascending=False
            )
            table2.to_excel(writer, sheet_name=f"Cal data {wl}")
            datas.append(table2)

    return datas


## Execution for example data (1a and 3aa)

In [4]:
# Build the calibration tables for the example data set:
# no=7 analyte peaks per run, relative RT tolerance sens=0.15.
data_4SO2Me_1a_3aa = load_csv_data_several(
    directory='4SO2MeArOPh 1a and 3aa calibration',
    no=7,
    sens=0.15,
)

In [5]:
# Extract the two main peaks at wl=254 for the same example data set.
data_4SO2Me_1a_3aa_254 = load_csv_data_wl(
    directory='4SO2MeArOPh 1a and 3aa calibration',
    wl=254,
)