Using Modin (parallel pandas) requires a few changes to typical pandas code before it works:
- Stricter need for numpy-like (1D) objects
- Parallel gotchas: don't let multiple threads access the same object (e.g. use `dict.copy()`)
- GroupBy behaves differently (`reset_index(level=0, drop=True)` is not needed)
- Categorical columns cause some pain

Written to run on a Jupyter server started at the root of the repository. This keeps the code DataLad-compatible, because all paths are relative to the repository root.

In [1]:
%load_ext autoreload
%autoreload 2

import os, glob
from cryoant.daq.xia.listmode import load_and_process
from beest.laser import correct_substrate_heating
import matplotlib.pyplot as plt
import matplotlib as mpl
import cryoant as ct
import numpy as np
from joblib import Parallel, delayed
# import pandas as pd
import modin.pandas as pd
import modin.config as mcfg

# Plot with the style file shipped inside the installed cryoant package, and
# keep the (modin) pandas plotting backend on matplotlib.
plt.style.use(f"{list(ct.__path__)[0]}/plot.mpl")
pd.set_option("plotting.backend", "matplotlib")

# Figure resolutions: DPI_VIS is used for the figures shown in this notebook;
# DPI_SV is presumably meant for saved figures (not used in the visible cells).
DPI_VIS = 200
DPI_SV = 50
# Acquisition date: selects the processed input file and names the output dir.
DATE = 20240811

# Output directory for figures, relative to the repository root.
figdir = f"out/{DATE}/"
os.makedirs(figdir, exist_ok=True)

df = pd.read_feather(f"out/trace-chewing/processed/{DATE}.feather")

2024-12-10 19:07:50,131	INFO worker.py:1786 -- Started a local Ray instance.


In [2]:
#: File names are parsed as ``..._<YYYY-mm-dd>_<HH.MM.SS>_chunk<N>.<ext>``:
#: the two "_"-separated fields before the last one give the acquisition
#: timestamp, and the last field (up to the first ".") is the chunk label.
timestamps = {
    k: pd.to_datetime("_".join(k.split("_")[-3:-1]), format="%Y-%m-%d_%H.%M.%S")
    for k in df.fname.unique()
}
chunk_labels = {k: k.split("_")[-1].split(".")[0] for k in timestamps}

#: Per-file values are attached with a plain dict lookup (the same pattern used
#: for run_id below) instead of groupby().apply(). This avoids copying the
#: timestamp dict once per row. It also avoids depending on whether pandas or
#: modin prepends the group key to the apply() result index, which decides
#: whether reset_index(level=0, drop=True) realigns or scrambles the rows.
df["timestamp"] = df.fname.map(timestamps).astype("category")
df["chunk"] = df.fname.map(
    {k: int(label.removeprefix("chunk")) for k, label in chunk_labels.items()}
).astype("category")

#: chunk0 files mark the start of a run; order them chronologically.
zero_timestamps = dict(
    sorted(
        ((k, v) for k, v in timestamps.items() if chunk_labels[k] == "chunk0"),
        key=lambda kv: kv[1],
    )
)
#: Map every fname to a run_id. The chunk0 files get ids in chronological
#: order. Every other file is assigned to the latest chunk0 file that it is
#: strictly later than.
run_ids = {k: v for v, k in enumerate(zero_timestamps)}
for fname, timestamp in timestamps.items():
    if fname in zero_timestamps:
        continue
    earlier_runs = [
        run_ids[k] for k, zero_ts in zero_timestamps.items() if timestamp > zero_ts
    ]
    if not earlier_runs:
        raise ValueError(
            f"{fname} ({timestamp}) is not later than any chunk0 file; "
            "cannot assign it a run_id"
        )
    run_ids[fname] = earlier_runs[-1]

df["run_id"] = df.fname.map(run_ids).astype("category")

#: A run starts at its chunk0 timestamp. That is also the minimum timestamp in
#: the run, because every other file in it is strictly later. So look it up
#: directly instead of doing a groupby (APPLY does not work with modin).
run_start_by_id = list(zero_timestamps.values())  # list position == run_id
df["run_start"] = df.fname.map(
    {k: run_start_by_id[rid] for k, rid in run_ids.items()}
).astype("category")
df["realtime"] = pd.to_timedelta(df.time, unit="s").add(pd.to_datetime(df.run_start))

In [3]:
"""Do laser tagging"""

# Rows with none of the quality flags (head / clipped / flat / error) set.
_clean = ~df.ib_head & ~df.ib_clipped & ~df.ib_flat & ~df.ib_error
# NOTE(review): as written, ig_laser is just the "clean" mask, and ig_event is
# (not ig_laser) AND that same mask, so ig_event is False on every row. A real
# laser criterion presumably still has to be added to ig_laser. Confirm the
# intended tagging.
df["ig_laser"] = _clean
df["ig_event"] = _clean & ~df["ig_laser"]
del _clean

In [79]:
%%capture
%%
"""Preview laser data.
"""

# One figure per run and one hexbin panel per channel: height_mV vs. otherV for
# laser-tagged rows. Only values within [min, 99th percentile] of that
# channel's data in that run are kept, which trims the upper tail.
plt.close("all")
channels = df.channel.unique()
# observed=True: run_id is categorical, so only iterate categories that occur.
# It also avoids pandas' deprecation warning about the observed default.
for run, rungroup in df.groupby("run_id", observed=True):
    # squeeze=False always returns a 2-D array of Axes. Without it, a single
    # channel would give a bare Axes and the zip below would fail.
    fig, axes = plt.subplots(
        len(channels),
        1,
        figsize=(5, 3 * len(channels)),
        dpi=DPI_VIS,
        constrained_layout=True,
        squeeze=False,
    )
    fig.suptitle(f"Run {run}")

    for (ch, chgroup), ax in zip(
        rungroup.groupby("channel", observed=True), axes[:, 0]
    ):
        data = chgroup[
            (chgroup.height_mV.between(*chgroup.height_mV.quantile([0, 0.99])))
            & (chgroup.otherV.between(*chgroup.otherV.quantile([0, 0.99])))
            & chgroup.ig_laser
        ]
        hb = ax.hexbin(
            data.height_mV,
            data.otherV,
            gridsize=500,
            lw=0,
            cmap="inferno",
            norm=mpl.colors.LogNorm(),
        )
        cb = fig.colorbar(hb, ax=ax)
        cb.set_label("log10(N)")
        ax.set(
            xlabel="Height (mV)",
            ylabel="Other (V)",
            title=f"Channel {ch}",
        )
    display(fig)
    # fig.savefig(f"{figdir}/run{run}-laser-corrected.png")
    plt.close(fig)

In [80]:
%%capture
%%
# NOTE(review): the ``%%`` line above is not a complete cell magic and may be
# there to stop this cell from running. The substrate correction is also
# computed per (run_id, channel) in a later cell. Confirm whether this cell is
# meant to run at all.
#: Per-channel substrate-heating correction via groupby().apply().
#: correct_substrate_heating returns a (correct, gradient) pair; wrapping it in
#: pd.DataFrame(...).T turns it into a two-column frame per group.
#: reset_index(level=0, drop=True) presumably drops the "channel" level that
#: pandas prepends to apply() results, so the frame aligns with df's index.
#: Per the notes at the top of this notebook, modin may not add that level.
df[["correct", "gradient"]] = (
    df.groupby("channel")
    .apply(lambda x: pd.DataFrame(correct_substrate_heating(x.height_mV, x.sumV)).T)
    .reset_index(level=0, drop=True)
)
df["gradient"] = df["gradient"].astype("category")

In [None]:
"""Do substrate correction.

Takes a while. May need parallelization.
Should I use groupby or simply filter by channel/run?
"""

# TRY: Parallelize per channel and per run to correct individually
#: Probably won't work because you can't parallelize modin, which already parallelizes
#: This spun up multiple ray instances and crashed
# with Parallel(n_jobs=-1) as parallel:
#     results = parallel(
#         delayed(
#             lambda run_id, channel, group: (
#                 run_id,
#                 channel,
#                 *correct_substrate_heating(group.height_mV, group.sumV),
#             )
#         )(run_id, channel, group)
#         for (run_id, channel), group in df.groupby(["run_id", "channel"])
#     )
#     run_ids, channels, corrects, gradients = zip(*results)

#     corrects_df = pd.concat(
#         [
#             pd.Series(
#                 correct, index=df[(df.run_id == run_id) & (df.channel == channel)].index
#             )
#             for run_id, channel, correct in zip(run_ids, channels, corrects)
#         ]
#     )
#     gradients_df = pd.concat(
#         [
#             pd.Series(
#                 gradient,
#                 index=df[(df.run_id == run_id) & (df.channel == channel)].index,
#             )
#             for run_id, channel, gradient in zip(run_ids, channels, gradients)
#         ]
#     )

#     df["correct"] = corrects_df
#     df["gradient"] = gradients_df.astype("category")

# del run_ids, channels, corrects, gradients, corrects_df, gradients_df

# TRY: No parallel
#: AFAICT, this is working but slow
# corrects = []
# gradients = []

# for run, rungroup in df.groupby("run_id"):
#     for channel, group in rungroup.groupby("channel"):
#         correct, gradient = correct_substrate_heating(
#             group.height_mV,
#             group.sumV,
#             dev_plot=True,
#             dev_dir=figdir,
#             dev_name=f"substrateCorrection-run{run}-ch{channel}",
#         )
#         print(gradient)
#         corrects.append(pd.Series(correct, index=group.index))
#         gradients.append(pd.Series(gradient, index=group.index).astype("category"))

# df["correct"] = pd.concat(corrects)
# df["gradient"] = pd.concat(gradients)

# del corrects, gradients


# TRY: Parallelize AFTER groupby
#: If running pandas operations in parallel confuses modin, then
#: run numpy operations in parallel instead. This makes mapping results back to the DataFrame more difficult.
# NOTE: correct_substrate_heating is currently written to work with pandas objects to conserve indices
# NOTE: Disabling modin may be as easy as df._to_pandas() which creates a non-parallelized pandas copy.
def process_group(run_id, channel, *args, **kwargs):
    """Pass grouby indicators to results for concatenation."""
    correct, gradient = correct_substrate_heating(*args, **kwargs)
    return (run_id, channel, correct, gradient)


# def construct_delayed(run, ch, *modin, **kwargs):
#     """Change the function signature to allow for delayed."""
#     with mcfg.context(Engine="python", BenchmarkMode=True):
#         return delayed(process_group)(
#             run, ch, *[m._to_pandas() for m in modin], **kwargs
#         )

#: Only laser-tagged rows are corrected; each (run_id, channel) group is one job.
grouped = df[df.ig_laser].groupby(["run_id", "channel"], observed=True)

#: mVmin for every (run_id, channel): 2.6 by default, overridden per pair below.
mVmins = {
    rid: dict.fromkeys(df.channel.unique(), 2.6) for rid in df.run_id.unique()
}
mVmins[0][14] = 3
mVmins[0][18] = 3


def _substrate_job(run, ch, group):
    """Build one delayed ``process_group`` call for a (run, channel) group.

    The group's columns are converted to plain pandas with ``_to_pandas()``
    before they are handed to joblib.
    """
    dev_name = f"substrateCorrection-run{run}-ch{ch}"
    # NOTE(review): dev_plot, dev_dir and dev_name are passed both as keyword
    # arguments and inside ``kwdict``. Confirm which ones
    # correct_substrate_heating actually reads.
    return delayed(process_group)(
        run,
        ch,
        group.height_mV._to_pandas(),
        group.sumV._to_pandas(),
        dev_plot=True,
        dev_dir=figdir,
        dev_name=dev_name,
        kwdict={
            "mVmin": mVmins[run][ch],
            "dev_plot": True,
            "dev_dir": figdir,
            "dev_name": dev_name,
        },
    )


#: The generator is consumed sequentially, so only the work inside
#: process_group runs in the joblib workers.
results = Parallel(n_jobs=-1)(
    _substrate_job(run, ch, group) for (run, ch), group in grouped
)

runs, channels, corrects, gradients = zip(*results)

# #: I don't trust that filtration or even re-grouping will maintain the index
# #: So, use the same grouped and map the results
# correct_map = {(run, ch): correct for run, ch, correct in zip(runs, channels, corrects)}
# gradient_map = {
#     (run, ch): gradient for run, ch, gradient in zip(runs, channels, gradients)
# }
# df["correct"] = pd.concat(
#     [
#         pd.Series(
#             correct_map[g[0]],
#             index=g[1].index,
#         )
#         for g in grouped
#     ]
# )
# df["gradient"] = pd.concat(
#     [
#         pd.Series(
#             gradient_map[g[0]],
#             index=g[1].index,
#         )
#         for g in grouped
#     ]
# ).astype("category")
#: Assign the per-group results as *columns*. Writing ``df.loc["correct"] = ...``
#: would instead set a row labelled "correct". Column assignment aligns on
#: index, so rows missing from the concatenated results become NaN.
#: NOTE(review): this assumes both outputs of correct_substrate_heating are
#: Series carrying their group's index. If ``gradient`` is a scalar per group,
#: pd.concat will reject it and it must first be broadcast over the group's
#: index.
df["correct"] = pd.concat(corrects)
df["gradient"] = pd.concat(gradients).astype("category")

In [None]:
"""Validate the correction.

Using filters takes 8m 30s
Using groups takes 3m 13s!
"""

# One figure per run and one hexbin panel per channel: corrected height vs.
# otherV for laser-tagged rows. Only values within [min, 99th percentile] of
# that channel's data in that run are kept.
plt.close("all")
channels = df.channel.unique()
# observed=True: run_id is categorical, so only iterate categories that occur.
for run, rungroup in df.groupby("run_id", observed=True):
    # squeeze=False always returns a 2-D array of Axes, so this also works
    # when there is only one channel.
    fig, axes = plt.subplots(
        len(channels),
        1,
        figsize=(5, 3 * len(channels)),
        dpi=DPI_VIS,
        constrained_layout=True,
        squeeze=False,
    )
    fig.suptitle(f"Run {run}")

    for (ch, chgroup), ax in zip(
        rungroup.groupby("channel", observed=True), axes[:, 0]
    ):
        # ig_laser is the same "no ib_* flag set" mask the original code
        # spelled out here; using the column keeps this loop consistent with
        # the preview plots.
        data = chgroup[
            (chgroup.correct.between(*chgroup.correct.quantile([0, 0.99])))
            & (chgroup.otherV.between(*chgroup.otherV.quantile([0, 0.99])))
            & chgroup.ig_laser
        ]
        hb = ax.hexbin(
            data.correct,
            data.otherV,
            gridsize=500,
            lw=0,
            cmap="inferno",
            norm=mpl.colors.LogNorm(),
        )
        cb = fig.colorbar(hb, ax=ax)
        cb.set_label("log10(N)")
        ax.set(
            xlabel="Height (mV)",
            ylabel="Other (V)",
            title=f"Channel {ch}",
        )
    display(fig)
    # fig.savefig(f"{figdir}/run{run}-laser-corrected.png")
    plt.close(fig)