In [3]:
import os
import numpy as np
import h5py
import matplotlib.pyplot as plt

from scipy.signal import butter, filtfilt
from scipy import sparse
from scipy.sparse.linalg import spsolve

try:
    from sklearn.linear_model import Lasso
except Exception:
    Lasso = None
from pybaselines import Baseline
plt.rcParams["figure.figsize"] = (14, 4)
plt.rcParams["axes.grid"] = True


In [4]:
def list_doric_channels(path):
    with h5py.File(path, "r") as f:
        base = f["DataAcquisition"]["FPConsole"]["Signals"]["Series0001"]
        chans = []
        if "LockInAOUT02" in base:
            for k in base["LockInAOUT02"].keys():
                if k.startswith("AIN"):
                    chans.append(k)
        chans = sorted(chans)

        digital = []
        if "DigitalIO" in base:
            for k in base["DigitalIO"].keys():
                if k.startswith("DIO"):
                    digital.append(k)
        return chans, digital

def load_doric(path, channel="AIN01", signal_folder="LockInAOUT02", ref_folder="LockInAOUT01",
              trigger_name=None):
    """
    Returns dict with:
      time, sig465, ref405, fs, (optional) trig_time, trig
    """
    with h5py.File(path, "r") as f:
        base = f["DataAcquisition"]["FPConsole"]["Signals"]["Series0001"]

        sig = np.asarray(base[signal_folder][channel][()], float)
        ref = np.asarray(base[ref_folder][channel][()], float)

        # time: prefer the matching folder time if size matches
        t_sig = np.asarray(base[signal_folder]["Time"][()], float) if "Time" in base[signal_folder] else np.array([])
        t_ref = np.asarray(base[ref_folder]["Time"][()], float) if "Time" in base[ref_folder] else np.array([])

        if t_sig.size == sig.size:
            t = t_sig
        elif t_ref.size == sig.size:
            t = t_ref
        else:
            # fallback
            dt = np.nanmedian(np.diff(t_sig)) if t_sig.size > 2 else 1/1000
            t = np.arange(sig.size) * dt

        # if ref length differs, interpolate onto t if possible
        if ref.size != sig.size:
            if t_ref.size == ref.size:
                ref = np.interp(t, t_ref, ref)
            else:
                ref = np.resize(ref, sig.size)

        # sampling rate
        fs = 1.0 / float(np.nanmedian(np.diff(t))) if t.size > 2 else np.nan

        # optional digital trigger overlay
        trig_time = None
        trig = None
        if trigger_name:
            if "DigitalIO" in base and trigger_name in base["DigitalIO"]:
                dio = base["DigitalIO"]
                trig = np.asarray(dio[trigger_name][()], float)
                trig_time = np.asarray(dio["Time"][()], float) if "Time" in dio else None

                # if lengths mismatch, interpolate signals to trigger time (like your Doric logic)
                if trig_time is not None and trig_time.size and trig_time.size != t.size:
                    sig = np.interp(trig_time, t, sig)
                    ref = np.interp(trig_time, t, ref)
                    t = trig_time
                    fs = 1.0 / float(np.nanmedian(np.diff(t))) if t.size > 2 else fs

    out = {"time": t, "sig465": sig, "ref405": ref, "fs": fs}
    if trig is not None and trig_time is not None:
        out["trig_time"] = trig_time
        out["trig"] = trig
    return out
import numpy as np

def ols_fit(x, y):
    x = np.asarray(x, float)
    y = np.asarray(y, float)
    m = np.isfinite(x) & np.isfinite(y)
    if np.sum(m) < 10:
        return 1.0, 0.0
    X = np.vstack([x[m], np.ones(np.sum(m))]).T
    coef, *_ = np.linalg.lstsq(X, y[m], rcond=None)
    return float(coef[0]), float(coef[1])



def compute_arpls_baselines(
    baseline_fitter,
    sig_f,
    ref_f,
    lam=1e9,
    diff_order=2,
    max_iter=50,
    tol=1e-3,
):
    """
    Compute arPLS baselines for signal and reference.

    Parameters:
        baseline_fitter: pybaselines.Baseline instance, already set up with x_data.
        sig_f: 1D array-like, preprocessed signal channel.
        ref_f: 1D array-like, preprocessed reference channel.
        lam: smoothing parameter.
        diff_order: difference order for the penalty.
        max_iter: maximum iterations for arPLS.
        tol: convergence tolerance.

    Returns:
        b_sig_arpls, b_ref_arpls
    """
    b_sig_arpls, _ = baseline_fitter.arpls(
        sig_f, lam=lam, diff_order=diff_order,
        max_iter=max_iter, tol=tol
    )
    b_ref_arpls, _ = baseline_fitter.arpls(
        ref_f, lam=lam, diff_order=diff_order,
        max_iter=max_iter, tol=tol
    )
    return b_sig_arpls, b_ref_arpls




def compute_motion_corrected_dff(sig_f, ref_f, b_sig, b_ref):
    """
    Computes motion-corrected dF/F using the 'Standardized' method:
    1) Calculate dF/F for both signal and reference channels separately.
       dff_sig_raw = (sig - b_sig) / b_sig
       dff_ref_raw = (ref - b_ref) / b_ref
    2) Fit dff_ref_raw to dff_sig_raw using OLS (y = ax + b).
    3) Subtract the fitted reference from the signal dF/F.
       dff_mc = dff_sig_raw - (a * dff_ref_raw + b)
    """

    # --- 1. Calculate raw dF/F for each channel ---
    # Prepare denominators (avoid division by zero)
    den_sig = np.asarray(b_sig, float).copy()
    den_sig[np.abs(den_sig) < 1e-12] = np.nan

    den_ref = np.asarray(b_ref, float).copy()
    den_ref[np.abs(den_ref) < 1e-12] = np.nan

    # Calculate standard dF/F (percent change) for each
    dff_sig_raw = (sig_f - b_sig) / den_sig
    dff_ref_raw = (ref_f - b_ref) / den_ref

    # --- 2. Fit Reference dF/F to Signal dF/F ---
    # We fit: dff_sig_raw ~ a * dff_ref_raw + b
    a, b = ols_fit(dff_ref_raw, dff_sig_raw)

    # --- 3. Subtract to get Motion-Corrected dF/F ---
    # The fitted reference represents the motion/artifact component in dF/F space
    fitted_ref = (a * dff_ref_raw + b)
    dff_mc = dff_sig_raw - fitted_ref

    # Return dictionary with keys compatible with your analysis pipeline
    # Note: 'sig_det' and 'ref_det' now refer to the raw dF/F traces
    return {
        "sig_det": dff_sig_raw,   # Now holds raw dF/F of signal
        "ref_det": dff_ref_raw,   # Now holds raw dF/F of reference
        "a": a,                   # Slope of the regression in dF/F space
        "b": b,                   # Intercept of the regression
        "delta_mc": dff_mc,       # The final motion-corrected dF/F
        "dff": dff_mc,            # Same as delta_mc (kept for compatibility)
    }



In [5]:
path = r"C:\Analysis\fiber_photometry_app\test_data\30545-lick_0002.doric"

channels, dio = list_doric_channels(path)
channels, dio

(['AIN01'], ['DIO01', 'DIO02'])