This tutorial breaks down the processing pipeline from raw data up to hits identification, in the `qualiphide` context and algorithms (plugins) of `straxion v0.0.0` (which is mostly a translation and refactoring from [the `Matlab` processor](https://github.com/FaroutYLq/axioph/tree/main/QUALIPHIDE/old_matlab)).

Lanqing, August 05, 2025

# Pre-knowledge

Please make sure you have already followed the instructions in the top-level `README.md` to install `straxion`.

In [None]:
import straxion
import matplotlib.pyplot as plt
import numpy as np

# Apply the plotting style. You can also comment it out to use the default style.
plt.style.use("../.customized_mplstyle")

In this tutorial, we assume that you already have access to the resonator fine scan as well as time stream data prepared in a format as follow. The scope of these data might only make sesne to QUALIPHIDE collaboration for now, and they are not made public.

In [None]:
!tree ../.example_data/

To load data, we will need to define a `strax.Context` object, in which all the detector-specific processing-related information is defined.

In [None]:
# Define the context.
st = straxion.qualiphide()

# Minimally configure the context.
# Other unmentioned options are set to default values inherited from the Matlab codes.
st.set_config(
    dict(
        daq_input_dir="../.example_data/timeS429",  # Where to find raw data.
        iq_finescan_dir="../.example_data/finescan_iq428",  # Where to find IQ fine-scan data.
    )
)

# Load the raw records.
raw_records = st.get_array(run_id="timeS429", targets="raw_records")

For example, all the technical configuration regarding computation can be found already stored in the _context_. It is beyond regular users to understand what they are exactly though, for which avid users want to consult the [`strax` documentation](https://strax.readthedocs.io/en/latest/):

In [None]:
st.show_config()

The processing algorithms are broken down into significant steps called _plugins_, inserted into the _context_. Below are the implemented _plugins_ so far in `straxion v0.0.0`.

In [None]:
st._plugin_class_registry

In the following of this tutorial, we will dive into these plugins in order of dependency. 

# Plugins

## `raw_records`

This is the most fundamental step in processing: Read the raw data into time stream of in-phase and quadrature.

In [None]:
?straxion.DAQReader

The following command loads the `raw_records` data into memory:

In [None]:
raw_records = st.get_array(run_id="timeS429", targets="raw_records")

All the data we computed are saved in the output folder defined by the context:

In [None]:
st.storage

In [None]:
!tree ./strax_data

You can see that the data are named by `<run_id>-<data_type>-<lineage_hash>`, where the hash is uniquely determined by the tracked lineage of configuration. The motivation for this design is that we will not run into problems loading data computed by different configuration (For example, you might have changed a threshold value somewhere and forgot. This will not hurt you in `straxion`).

As an example, here are the tracked lineage for `raw_records`. Higher level plugins might have much more complicated lineage.

In [None]:
st.lineage("timeS429", "raw_records")

It gives the hash:

In [None]:
st.key_for("timeS429", "raw_records")

Now let's see what do we have in `raw_records`:

In [None]:
st.data_info("raw_records")

Each element of the loaded array correspond to one channel. We will just inspect one for illustration.

In [None]:
np.shape(raw_records)

In [None]:
raw_records = raw_records[raw_records["channel"] == 0]

Let's watch the I and Q waveforms before any processing.

In [None]:
plt.figure(figsize=(10, 5))

plt.plot(
    raw_records[0]["time"] + np.arange(raw_records[0]["length"]) * raw_records[0]["dt"],
    raw_records[0]["data_i"],
    label="I",
)
plt.plot(
    raw_records[0]["time"] + np.arange(raw_records[0]["length"]) * raw_records[0]["dt"],
    raw_records[0]["data_q"],
    label="Q",
)
plt.xlabel("Time Since Unix Epoch [ns]")
plt.ylabel("IQ [A.U.]")
plt.legend(ncol=2, loc="upper right")

## `records`

Next, `records` processes these raw IQ data to extract phase information. You can find the important assumptions as well as the processing workflow below.

In [None]:
?straxion.PulseProcessing

In [None]:
records = st.get_array(
    run_id="timeS429",
    targets="records",
)[
    0
]  # Only read the first record, as the rest are the same.

### Understand your configurations

More configurations are involved now, and these are mostly filtering-related parameters.


In [None]:
st.lineage("timeS429", "records")

To understand what these configuration options mean, you can always consult the helpers:

In [None]:
st.show_config("records")

Note that `records` is a pretty heavy datat type! You can check the size in memory by:

In [None]:
st.size_mb(run_id="timeS429", target="records")

### Algorithms

In [None]:
st.data_info("records")

First we load from the fine scan, and perform a geometric circular fit to determine a "baseline" of phases.

In [None]:
from straxion.plugins.records import PulseProcessing

finescan = PulseProcessing.load_finescan_files("../.example_data/finescan_iq428")
finescan_i = finescan[0][:, 1]
finescan_q = finescan[0][:, 2]
x_center, y_center, radius, residuals = PulseProcessing.circfit(finescan_i, finescan_q)

In [None]:
plt.scatter(finescan_i, finescan_q, label="finescan_iq428")
# Create points for circle
theta = np.linspace(0, 2 * np.pi, 100)
circle_x = x_center + radius * np.cos(theta)
circle_y = y_center + radius * np.sin(theta)
plt.plot(circle_x, circle_y, alpha=0.5, label="Fitted circle")
plt.xlabel("I")
plt.ylabel("Q")
plt.scatter(
    [np.mean(raw_records[0]["data_i"])], [np.mean(raw_records[0]["data_q"])], label="Mean", s=100
)

plt.legend(loc="upper left", bbox_to_anchor=(-0.1, 1.1), ncol=3)
# plt.title("Check of the circle fit by iq428.txt")
plt.show()

The next step is to convolve the pulses with a square wave (default width 5 samples) and a truncated EMG pulse kernel. Note that the convolved waveform will be slightly shifted in time.

In [None]:
emg_strax = PulseProcessing.pulse_kernel(
    ns=512, fs=50_000, t0=200_000, tau=600_000, sigma=28_000, truncation_factor=10
)

plt.plot(emg_strax, label="Straxion default parameters")
plt.legend(loc="best")
plt.title("Truncated EMG Pulse Kernel")
plt.xlabel("Samples [20 us]")

Let's take a look at the pulses.

In [None]:
slice_sample_length = 20_000
total_sample_length = records["length"]

for i in range(1):
    sample_range = (i * slice_sample_length, (i + 1) * slice_sample_length)

    times_start = records["time"]
    times_us = (records["time"] + np.arange(records["length"]) * records["dt"] - times_start) / 1e3
    times_us = times_us[sample_range[0] : sample_range[1]]

    plt.figure(figsize=(10, 5))
    plt.plot(
        times_us,
        records["data_theta"][sample_range[0] : sample_range[1]],
        label="Flipped and Shifted",
        # color="black",
        alpha=0.5,
        lw=1,
    )
    plt.plot(
        times_us,
        records["data_theta_moving_average"][sample_range[0] : sample_range[1]],
        label="Moving Averaged",
        # color="tab:blue",
        lw=2,
    )
    plt.plot(
        times_us,
        records["data_theta_convolved"][sample_range[0] : sample_range[1]],
        label="Convolved with Pulse Kernel",
        # color="tab:orange",
        lw=2,
    )

    plt.legend(loc="best", ncol=3)
    plt.xlabel("Time Since Run Start [us]")
    plt.ylabel("Theta [rad]")
    # plt.title(f"Slice {i+1} of {int(total_sample_length / slice_sample_length)}")
    plt.title("Example Records of 0.4 seconds")
    plt.show()

## `hits` and `hit_classification`

In [None]:
?straxion.Hits

In [None]:
hits = st.get_array("timeS429", ["hits", "hit_classification"])
hits = hits[hits["channel"] == 0]

In [None]:
st.lineage("timeS429", "hit_classification")

In [None]:
st.data_info("hits")

In [None]:
st.data_info("hit_classification")

Below we show one example how hits are found.

In [None]:
slice_sample_length = 20_000
total_sample_length = records["length"]

convolved_std = np.std(records["data_theta_convolved"])
hit_id = 0

# for i in range(5):
for i in range(1):
    sample_range = (i * slice_sample_length, (i + 1) * slice_sample_length)

    times_start = records["time"]
    times_us = (records["time"] + np.arange(records["length"]) * records["dt"] - times_start) / 1e3
    times_us = times_us[sample_range[0] : sample_range[1]]

    plt.figure(figsize=(10, 5))
    plt.plot(
        times_us,
        records["data_theta"][sample_range[0] : sample_range[1]],
        label="Flipped and Shifted",
        # color="black",
        alpha=0.5,
        lw=1,
    )
    plt.plot(
        times_us,
        records["data_theta_moving_average"][sample_range[0] : sample_range[1]],
        label="Moving Averaged",
        # color="tab:blue",
        lw=2,
    )
    plt.plot(
        times_us,
        records["data_theta_convolved"][sample_range[0] : sample_range[1]],
        label="Convolved with Pulse Kernel",
        # color="tab:orange",
        lw=2,
    )

    # Add hit time ranges
    slice_hits = hits[
        (hits["time"] >= times_start + sample_range[0] * records["dt"])
        & (hits["time"] < times_start + sample_range[1] * records["dt"])
    ]
    for hit in slice_hits:
        hit_start_us = (hit["time"] - times_start) / 1e3
        hit_end_us = (hit["endtime"] - times_start) / 1e3
        plt.axvspan(hit_start_us, hit_end_us, alpha=0.2, color="red")
        plt.axvline(hit["aligned_at_records_i"] * records["dt"] / 1e3, color="tab:red")
        plt.text(hit_start_us, hit["amplitude_ma_max"], "#" + str(hit_id), color="tab:red")
        hit_id += 1

    # Show threshold and baseline

    plt.axhline(hits[i]["hit_threshold"], color="tab:red", ls=":")
    plt.axhspan(-convolved_std, convolved_std, color="tab:orange", alpha=0.2, zorder=0)

    plt.xlim(times_us[0], times_us[-1])

    plt.legend()
    plt.xlabel("Time Since Run Start [us]")
    plt.ylabel("Theta [rad]")
    plt.title(f"Slice {i+1} of {int(total_sample_length / slice_sample_length)}")
    plt.show()

In [None]:
for i in range(2):
    plt.plot(hits[i]["data_theta"], label="Flipped and Shifted", alpha=0.5, lw=1)
    plt.plot(hits[i]["data_theta_moving_average"], label="Moving Averaged", lw=2)
    plt.plot(hits[i]["data_theta_convolved"], label="Convolved with Pulse Kernel", lw=2)
    plt.axhline(hits[i]["hit_threshold"], color="tab:red", ls=":")
    plt.axhline(0, color="black", ls=":")
    plt.axhline(hits[i]["amplitude_ma_max"], color="#4067b1", ls="--")
    plt.axhline(hits[i]["amplitude_convolved_max"], color="#6ccef5", ls="--")
    plt.axhline(hits[i]["amplitude_convolved_max_ext"], color="#6ccef5", ls=":")
    plt.axhline(hits[i]["amplitude_ma_max_ext"], color="#4067b1", ls=":")

    if hits[i]["is_cr"]:
        type_str = "Cosmic Ray"
    elif hits[i]["is_symmetric_spike"]:
        type_str = "Symmetric Spike"
    else:
        type_str = "Photon Candidate"

    plt.title("Hit #" + str(i) + " (" + type_str + ")")
    plt.xlabel("Samples [20us]")
    plt.ylabel("Phase [rad]")
    plt.legend()
    plt.show()

Let's read some waveforms for the hits we cut out.

In [None]:
for i in np.where(hits["is_cr"])[0][:2]:
    plt.plot(hits[i]["data_theta"], label="Flipped and Shifted", alpha=0.5, lw=1)
    plt.plot(hits[i]["data_theta_moving_average"], label="Moving Averaged", lw=2)
    plt.plot(hits[i]["data_theta_convolved"], label="Convolved with Pulse Kernel", lw=2)
    plt.axhline(hits[i]["hit_threshold"], color="tab:red", ls=":")
    plt.axhline(0, color="black", ls=":")
    plt.axhline(hits[i]["amplitude_ma_max"], color="#4067b1", ls="--")
    plt.axhline(hits[i]["amplitude_convolved_max"], color="#6ccef5", ls="--")
    plt.axhline(hits[i]["amplitude_convolved_max_ext"], color="#6ccef5", ls=":")
    plt.axhline(hits[i]["amplitude_ma_max_ext"], color="#4067b1", ls=":")

    if hits[i]["is_cr"]:
        type_str = "Cosmic Ray"
    elif hits[i]["is_symmetric_spike"]:
        type_str = "Symmetric Spike"
    else:
        type_str = "Photon Candidate"

    plt.title("Example" + " " + type_str)
    plt.xlabel("Samples [20us]")
    plt.ylabel("Phase [rad]")
    plt.legend(loc="best")
    plt.show()

In [None]:
for i in np.where(hits["is_symmetric_spike"])[0][:2]:
    plt.plot(hits[i]["data_theta"], label="Flipped and Shifted", alpha=0.5, lw=1)
    plt.plot(hits[i]["data_theta_moving_average"], label="Moving Averaged", lw=2)
    plt.plot(hits[i]["data_theta_convolved"], label="Convolved with Pulse Kernel", lw=2)
    plt.axhline(hits[i]["hit_threshold"], color="tab:red", ls=":")
    plt.axhline(0, color="black", ls=":")
    plt.axhline(hits[i]["amplitude_ma_max"], color="#4067b1", ls="--")
    plt.axhline(hits[i]["amplitude_convolved_max"], color="#6ccef5", ls="--")
    plt.axhline(hits[i]["amplitude_convolved_max_ext"], color="#6ccef5", ls=":")
    plt.axhline(hits[i]["amplitude_ma_max_ext"], color="#4067b1", ls=":")

    if hits[i]["is_cr"]:
        type_str = "Cosmic Ray"
    elif hits[i]["is_symmetric_spike"]:
        type_str = "Symmetric Spike"
    else:
        type_str = "Photon Candidate"

    plt.title("Example" + " " + type_str)
    plt.xlabel("Samples [20us]")
    plt.ylabel("Phase [rad]")
    plt.legend(loc="best")
    plt.show()

Below we show some basic statistics.

In [None]:
plt.hist(hits["width"], bins=np.linspace(20, 300, 20), label="All Hits")
plt.hist(hits[hits["is_cr"]]["width"], bins=np.linspace(20, 300, 20), label="CR Hits")
plt.hist(
    hits[hits["is_symmetric_spike"]]["width"],
    bins=np.linspace(20, 300, 20),
    label="Symmetric Spikes",
)
plt.xlabel("Width [20us]")
plt.ylabel("Counts")
plt.legend(loc="best")
plt.yscale("log")
plt.show()

In [None]:
plt.hist(hits["amplitude_convolved_max_ext"], bins=np.linspace(0, 1.2, 40), label="All Hits")
plt.hist(
    hits[hits["is_cr"]]["amplitude_convolved_max_ext"],
    bins=np.linspace(0, 1.2, 40),
    label="CR Hits",
)
plt.hist(
    hits[hits["is_symmetric_spike"]]["amplitude_convolved_max_ext"],
    bins=np.linspace(0, 1.2, 40),
    label="Symmetric Spikes",
)
plt.xlabel("Convolved Amplitude (Extended) [20us]")
plt.ylabel("Counts")
plt.legend(loc="best")
plt.yscale("log")
plt.show()

In [None]:
plt.scatter(
    hits["amplitude_convolved_max"],
    hits["amplitude_convolved_max_ext"],
    alpha=0.5,
    label="All Hits",
)
plt.scatter(
    hits["amplitude_convolved_max"][hits["is_cr"]],
    hits["amplitude_convolved_max_ext"][hits["is_cr"]],
    label="CR Hits",
    s=10,
)
plt.scatter(
    hits["amplitude_convolved_max"][hits["is_symmetric_spike"]],
    hits["amplitude_convolved_max_ext"][hits["is_symmetric_spike"]],
    label="Symmetric Spikes",
    s=10,
)
plt.xlabel("Convolved Amplitude [rad]")
plt.ylabel("Convolved Amplitude (Extended) [rad]")
plt.legend(loc="best")
plt.show()

In [None]:
plt.scatter(hits["amplitude_ma_max"], hits["amplitude_ma_max_ext"], alpha=0.5, label="All Hits")
plt.scatter(
    hits["amplitude_ma_max"][hits["is_cr"]],
    hits["amplitude_ma_max_ext"][hits["is_cr"]],
    label="CR Hits",
    s=10,
)
plt.scatter(
    hits["amplitude_ma_max"][hits["is_symmetric_spike"]],
    hits["amplitude_ma_max_ext"][hits["is_symmetric_spike"]],
    label="Symmetric Spikes",
    s=10,
)
plt.xlabel("M.A. Amplitude [rad]")
plt.ylabel("M.A. Amplitude (Extended) [rad]")
plt.legend(loc="best")
plt.show()