In [1]:
import pandas as pd
import numpy as np

import patato as pat

from paiskintonetools.correction_factor import get_correction_factor_interpolator

from tqdm.auto import tqdm

In [2]:
df_scans = pd.read_parquet("scan_table.parquet")

In [3]:
cal_curve_file = "../Fluence Correction/cali_curve.csv"
correction_factor_spline = get_correction_factor_interpolator(cal_curve_file)

In [4]:
def unmix(rec):
    wl = rec.wavelengths[rec.wavelengths < 900]
    um = pat.SpectralUnmixer(chromophores=["Hb", "HbO2"], wavelengths=wl)
    u, _, _ = um.run(rec, None)

    thber = pat.THbCalculator()
    so2er = pat.SO2Calculator()

    thb, _, _ = thber.run(u, None)
    so2, _, _ = so2er.run(u, None)

    return thb, so2


def get_measurements(file, row):
    pa = pat.PAData.from_hdf5(file)
    pa.set_default_recon(("Model Based", "0"))
    results = []
    ita = row["ITA"]
    for n, roi in pa.get_rois().items():
        row_output = dict(row)
        row_output["ROI Name"] = " ".join(n[0].split("_"))
        row_output["ROI Number"] = n[1]
        for correct in [False, True]:
            # Apply correction factor or not
            rec = pa.get_scan_reconstructions()
            rec = rec.copy()
            rec.raw_data = np.copy(rec.raw_data)
            if correct:
                mvf = (19.028 - 0.3692 * ita + 0.001685 * ita**2) / 100
                rec.raw_data *= np.exp(
                    correction_factor_spline((rec.wavelengths, mvf))[
                        None, :, None, None, None
                    ]
                )

            thb, so2 = unmix(rec)
            for measurement, positive in [
                ("spectrum", False),
                ("spectrum", True),
                ("thb", False),
                ("thb", True),
                ("so2", None),
            ]:
                if measurement == "spectrum":
                    calc = rec
                elif measurement == "thb":
                    calc = thb
                elif measurement == "so2":
                    calc = so2
                if measurement == "so2":
                    r = calc.raw_data
                    r[thb.raw_data < 0] = np.nan
                    r[r > 1.5] = np.nan
                    r[r < 0] = np.nan
                elif positive:
                    r = calc.raw_data
                    r[r < 0] = np.nan
                mask, _ = roi.to_mask_slice(calc)
                s = np.squeeze(calc.raw_data.T[mask.T].T)

                for agg in [np.nanmean, np.nanmedian, np.nanstd]:
                    n = agg.__name__[3:]
                    f = (
                        ("corrected_" if correct else "")
                        + measurement
                        + ("_positive" if positive else "")
                        + "_"
                        + n
                    )
                    row_output[f] = agg(s, axis=-1)
        results.append(row_output)
    return pd.DataFrame(results)

In [5]:
import warnings

# N.b. catching warnings because sometimes no pixels contain positive values
with warnings.catch_warnings(action="ignore"):
    scans = []
    for _, row in tqdm(df_scans.iterrows(), total=df_scans.shape[0]):
        scans.append(get_measurements(row["File"], row))

  0%|          | 0/1504 [00:00<?, ?it/s]

In [6]:
df_result = pd.concat(scans)

In [7]:
df_result["ROI Name"] = df_result["ROI Name"].str.strip()

In [8]:
df_result.to_parquet("pa_values_extracted.parquet")