In [None]:
import os
import tqdm
import numpy as np
import matplotlib as mpl
from matplotlib import pyplot as plt
from tools.validator import Validator
from tools.fitting import TriggerFitter, SignalFitter
from utils.file_management import file_reader
from scipy.signal import savgol_filter

In [None]:
folder_path = "data/path/to/folder"

In [None]:
def plot_spectra(
    show=True,
    left_lim=0,
    right_lim=100,
    time_disc_left=0,
    time_disc_right=2,
    peak_left=0,
    peak_right=100,
    bins=60,
):
    with open(os.path.join(folder_path, "tmp/signal.txt"), "r") as file:
        signal = file.readlines()
    with open(os.path.join(folder_path, "tmp/rise_time.txt"), "r") as file:
        rise_time = file.readlines()

    fast_amplitudes = [
        float(signal[i].split(",")[0])
        for i in Validator(folder_path).validated_indices()
    ]
    slow_amplitudes = [
        float(signal[i].split(",")[4])
        for i in Validator(folder_path).validated_indices()
    ]
    rise_times = [
        float(rise_time[i].split(",")[0])
        for i in Validator(folder_path).validated_indices()
    ]

    amplitudes = [
        fast_amplitude + slow_amplitude
        for fast_amplitude, slow_amplitude in zip(fast_amplitudes, slow_amplitudes)
    ]
    amplitudes = [
        amplitude
        for amplitude, rise_time in zip(amplitudes, rise_times)
        if rise_time > time_disc_left and rise_time < time_disc_right
    ]
    plt.hist(amplitudes, bins=bins, range=(left_lim, right_lim))

    y, bin_edges = np.histogram(
        amplitudes,
        bins=int(bins / (right_lim - left_lim) * (peak_right - peak_left)),
        range=(peak_left, peak_right),
    )
    x = (bin_edges[:-1] + bin_edges[1:]) / 2
    tr_fitter = TriggerFitter(x, y)
    popt = tr_fitter.fit(
        tr_fitter.gauss,
        p0=[100, 0.5 * (peak_right - peak_left), 3],
        bounds=([0, 0, 0], [np.inf, np.inf, np.inf]),
    )
    _, mu, sigma = popt
    plt.plot(x, tr_fitter.gauss(x, *popt), "k-", linewidth=2)
    plt.xlabel("Signal amplitude, [mV]")
    plt.ylabel("Counts")
    plt.legend([f"Resolution: {round(100*2.355*abs(sigma/mu),2)}%"])
    if show:
        plt.show()
    else:
        plt.savefig(f"{folder_path}/tmp/amplitude_spectrum.png")
        plt.close()

In [None]:
def plot_rise_time(show=True, left_lim=0, right_lim=2, bins=100):
    with open(os.path.join(folder_path, "tmp/rise_time.txt"), "r") as file:
        rise_time = file.readlines()

    rise_time = [
        float(rise_time[i].split(",")[0])
        for i in Validator(folder_path).validated_indices()
    ]
    plt.hist(rise_time, bins=bins, range=(left_lim, right_lim), alpha=0.75)
    max_bin_index = np.argmax(
        np.histogram(rise_time, bins=bins, range=(left_lim, right_lim))[0]
    )
    bin_edges = np.histogram_bin_edges(
        rise_time, bins=bins, range=(left_lim, right_lim)
    )
    max_bin_center = (bin_edges[max_bin_index] + bin_edges[max_bin_index + 1]) / 2
    plt.axvline(max_bin_center, color="r", linestyle="dashed", linewidth=1)
    plt.xlabel(r"Rise Time, [$\mu$s]")
    plt.legend([f"Max: {max_bin_center}"])
    if show:
        plt.show()
    else:
        plt.savefig(f"{folder_path}/tmp/rise_time.png")
        plt.close()

In [None]:
def plot_rise_time_signal(
    show=True,
    left_lim_time=0,
    right_lim_time=2,
    left_lim_signal=0,
    right_lim_signal=100,
    bins=(50, 50),
    is_log=False,
):
    with open(os.path.join(folder_path, "tmp/signal.txt"), "r") as file:
        signal = file.readlines()

    with open(os.path.join(folder_path, "tmp/rise_time.txt"), "r") as file:
        rise_time = file.readlines()

    signal_ampl = [
        float(signal[i].split(",")[0])
        for i in Validator(folder_path).validated_indices()
    ]
    slow_ampl = [
        float(signal[i].split(",")[4])
        for i in Validator(folder_path).validated_indices()
    ]
    rise_time = [
        float(rise_time[i].split(",")[0])
        for i in Validator(folder_path).validated_indices()
    ]
    total_signal = [signal_ampl[i] + slow_ampl[i] for i in range(len(signal_ampl))]

    if is_log:
        plt.hist2d(
            rise_time,
            total_signal,
            bins=bins,
            range=[
                [left_lim_time, right_lim_time],
                [left_lim_signal, right_lim_signal],
            ],
            cmap=plt.cm.jet,
            norm=mpl.colors.LogNorm(),
        )
    else:
        plt.hist2d(
            rise_time,
            total_signal,
            bins=(50, 50),
            range=[
                [left_lim_time, right_lim_time],
                [left_lim_signal, right_lim_signal],
            ],
            cmap=plt.cm.jet,
        )

    plt.colorbar()
    plt.xlabel(r"Rise Time, [$\mu$s]")
    plt.ylabel("Signal Amplitude, [mV]")
    if show:
        plt.show()
    else:
        plt.savefig(f"{folder_path}/tmp/rise_time_signal.png")
        plt.close()

In [None]:
def plot_sample(sample, show=True, left_lim=0, right_lim=-1):
    csv_file = [f for f in os.listdir(folder_path) if f.endswith(f"{sample+1}.csv")][0]

    time, signals, _ = file_reader(os.path.join(folder_path, csv_file))

    with open(os.path.join(folder_path, "tmp/signal.txt"), "r") as file:
        signal = file.readlines()

    try:
        popt = [float(par) for par in signal[sample].split(",")]
    except ValueError:
        return
    if sample in list(Validator(folder_path).validated_indices()):
        plt.plot(time[left_lim:right_lim], signals[left_lim:right_lim])
        signal_fitter = SignalFitter(
            time, signals, Validator(folder_path).get_main_sign()
        )
        plt.plot(
            time[left_lim:right_lim],
            signal_fitter.sigmoid(time, *popt[:4])[left_lim:right_lim],
        )

        signals = savgol_filter(signals, 40, 2)

        left, right = signal_fitter.auto_borders(time, signals)

        if popt[-1] == 1:
            offset = popt[3] + popt[0]
        else:
            offset = popt[3]

        new_time = [t for t in time if t > right]

        plt.plot(
            new_time,
            signal_fitter.one_exponent(new_time, *popt[4:6], right, offset),
        )
        plt.axvline(right, color="r", linestyle="dashed", linewidth=2)
        plt.axvline(left, color="r", linestyle="dashed", linewidth=2)
        plt.xlabel(r"Time, [$\mu$s]")
        plt.ylabel("Signal Amplitude, [mV]")

        samples_folder = os.path.join(folder_path, "tmp/samples")
        os.makedirs(samples_folder, exist_ok=True)

        if show:
            plt.show()
        else:
            plt.savefig(os.path.join(samples_folder, f"{sample}.png"))
            plt.close()


def plot_all_peaks():
    csv_files = [f for f in os.listdir(folder_path) if f.endswith(".csv")]

    for file in tqdm.tqdm(csv_files):
        sample = int(file.split("_")[-1].replace(".csv", ""))
        plot_sample(sample, show=False)

In [None]:
print(list(Validator(folder_path).validated_indices()))
plot_sample(21, show=True)

In [None]:
plot_spectra(
    left_lim=0,
    right_lim=60,
    peak_left=0,
    peak_right=60,
    time_disc_left=0,
    time_disc_right=2,
    bins=60,
)

In [None]:
plot_rise_time(left_lim=0, right_lim=0.2, bins=30)

In [None]:
plot_rise_time_signal(
    left_lim_time=0,
    right_lim_time=0.4,
    left_lim_signal=0,
    right_lim_signal=60,
    bins=(20, 60),
    is_log=False,
)

In [None]:
plot_all_peaks()