In [2]:
import LDAQ #custom version of LDAQ --> should be installed properly via pip
import numpy as np
import numpy.typing as npt
import matplotlib.pyplot as plt
import scipy

import pandas as pd

from pathlib import Path

from data_converter import measurement_dict_to_sep005

In [None]:
def extract_data(loaded_data, threshold_V: float = 75e-3, width_s: float = 100e-6, plot: bool = False) -> tuple[npt.ArrayLike, npt.ArrayLike]:
    """
    Cut a fixed-length window out of every channel, starting at its first threshold crossing.

    For each column of ``loaded_data["data"]`` the first sample whose absolute
    value exceeds ``threshold_V`` is located. The ``int(width_s * fs)`` samples
    from that index on are copied out together with the matching time stamps.

    Parameters
    ----------
    loaded_data
        Mapping with the keys ``"time"``, ``"data"`` and ``"fs"``. ``data`` is
        indexed as ``[sample, channel]``; ``time`` is sliced with the same
        sample indices (assumed to be 1-D with one entry per sample -- TODO confirm).
    threshold_V
        Trigger level compared against ``abs(data)``.
    width_s
        Window length; multiplied by ``loaded_data["fs"]`` and truncated to a
        whole number of samples.
    plot
        If true, draw all extracted windows into one new matplotlib figure.

    Returns
    -------
    (extracted_data, extracted_time)
        Two float arrays of shape ``(width, n_channels)``.

    Raises
    ------
    ValueError
        If the window of any channel would run past the end of the recording.

    Warns
    -----
    RuntimeWarning
        If a channel never exceeds the threshold. The window then starts at
        sample 0, exactly as before, but the condition is no longer silent.
    """
    import warnings  # function-scope import keeps the notebook's import cell untouched

    time = loaded_data["time"]
    data = loaded_data["data"]

    width = int(width_s * loaded_data["fs"])
    n_samples = data.shape[0]
    n_channels = data.shape[1]

    above = np.abs(data) > threshold_V
    # argmax on a boolean column gives the index of the first True ...
    start = np.argmax(above, axis=0)
    end = start + width

    # ... but also 0 for an all-False column, which is indistinguishable from a
    # genuine crossing at sample 0 unless it is checked explicitly.
    never_triggered = ~above.any(axis=0)
    if never_triggered.any():
        warnings.warn(
            f"Threshold {threshold_V} never exceeded on channel(s) "
            f"{np.flatnonzero(never_triggered).tolist()}; extracting from sample 0",
            RuntimeWarning,
            stacklevel=2,
        )

    # A truncated slice would either fail with an opaque broadcast error or,
    # when only one sample is left, be silently repeated over the whole window.
    too_late = end > n_samples
    if too_late.any():
        raise ValueError(
            f"Extraction window of {width} samples runs past the end of the recording "
            f"({n_samples} samples) on channel(s) {np.flatnonzero(too_late).tolist()}"
        )

    extracted_data = np.zeros((width, n_channels))
    extracted_time = np.zeros((width, n_channels))

    for i in range(n_channels):
        window = slice(start[i], end[i])
        extracted_data[:, i] = data[window, i]
        extracted_time[:, i] = time[window]

    if plot:
        plt.figure()
        plt.plot(extracted_time, extracted_data)

    return extracted_data, extracted_time


def extract_peak_times(extracted_time, extracted_data, smoothing: int = 4, plot: bool = False) -> tuple[npt.ArrayLike, npt.ArrayLike]:
    """
    Locate the dominant peak of every channel on a spline-smoothed copy of the signal.

    Per column, a cubic spline is fitted through every ``smoothing``-th sample
    (e.g. every 4th) and evaluated on the original time grid. The sample at
    which the absolute spline value is largest is taken as the peak.

    Parameters
    ----------
    extracted_time, extracted_data
        Arrays of identical shape, indexed ``[sample, channel]``.
    smoothing
        Stride used to pick the spline knots (``t[::smoothing]``).
    plot
        If true, open one figure per channel showing the spline, the raw data
        (dashed) and the selected peak (cross).

    Returns
    -------
    (peak_times, peak_vals)
        1-D arrays with one entry per channel: the time stamp of the peak
        sample and the *raw* data value at that sample (not the spline value).

    Raises
    ------
    AssertionError
        If the two input arrays differ in shape.
    """

    assert extracted_time.shape == extracted_data.shape, "Shapes of time and data don't match"

    peak_times = []
    peak_vals = []

    for i in range(extracted_data.shape[1]):
        t = extracted_time[:, i]
        d = extracted_data[:, i]

        cs = scipy.interpolate.CubicSpline(t[::smoothing], d[::smoothing])

        # Samples after the last knot are extrapolated by the spline, and a cubic
        # extrapolation can overshoot and produce a spurious maximum. Search for
        # the peak only where the spline actually interpolates.
        last_knot = ((len(t) - 1) // smoothing) * smoothing
        t_fit = t[: last_knot + 1]
        smoothed = cs(t_fit)

        # t_fit is a prefix of t, so this index is valid for t and d as well.
        peak_idx = int(np.argmax(np.abs(smoothed)))
        peak_time = t[peak_idx]

        peak_times.append(peak_time)
        peak_vals.append(d[peak_idx])

        if plot:
            plt.figure()
            plt.plot(t_fit, smoothed, t, d, "--", peak_time, d[peak_idx], "x")

    return np.asarray(peak_times), np.asarray(peak_vals)

def df_from_folder(data_folder: str, locations: dict, columns: list[str]) -> pd.DataFrame:
    """
    Load every ``*.pkl`` measurement in a folder and build one DataFrame row per file.

    The last path component of ``data_folder`` is used as the impact type and
    as the key into ``locations``. Each row is laid out as
    ``[*peak time offsets, *peak values, *locations[impact_type]]``, where the
    offsets are the per-channel peak times relative to the first channel.

    Parameters
    ----------
    data_folder
        Folder containing the measurement files; a trailing separator is tolerated.
    locations
        Maps an impact type (folder name) to the values appended at the end of each row.
    columns
        Column names for the resulting frame; must match the row length.

    Returns
    -------
    pandas.DataFrame
        One row per measurement file, in sorted file-name order. Empty (but
        with ``columns``) when the folder holds no ``*.pkl`` file.

    Raises
    ------
    KeyError
        If the folder name is not a key of ``locations`` and at least one file was found.
    """
    folder = Path(data_folder)

    # Path.name ignores a trailing "/" (and handles OS-specific separators),
    # whereas splitting the string on "/" would give an empty impact type.
    impact_type = folder.name
    raw_df_data = []

    # glob yields files in arbitrary, filesystem-dependent order; sort so the
    # row order (and anything derived from it) is reproducible.
    for file in sorted(folder.glob('*.pkl')):
        # NOTE(review): .pkl files are presumably unpickled inside LDAQ -- only
        # point this at measurement folders you trust.
        loaded_data = LDAQ.utils.load_measurement(file.name, directory=data_folder)
        loaded_data = measurement_dict_to_sep005(loaded_data)

        extracted_data, extracted_time = extract_data(loaded_data)

        peak_times, peak_vals = extract_peak_times(extracted_time, extracted_data)
        # arrival time of each channel's peak relative to the first channel
        time_offset = peak_times - peak_times[0]
        raw_df_data.append([*time_offset, *peak_vals, *locations[impact_type]])

    return pd.DataFrame(raw_df_data, columns=columns)

# Values written to the X / Y columns for each impact type. The key has to be
# identical to the name of the folder the measurements are loaded from.
# NOTE(review): "back" has an entry here but no matching frame is loaded below --
# confirm whether that is intentional.
locations = dict(
    back=(0, -60),
    center=(0, 0),
    front=(0, 60),
)

# Row layout produced by df_from_folder: peak-time offsets (S1..S6), peak
# values (A1..A6), then the impact location. Assumes six channels per measurement.
columns = (
    [f"S{n}" for n in range(1, 7)]
    + [f"A{n}" for n in range(1, 7)]
    + ["X", "Y"]
)

df_center = df_from_folder(data_folder="data/impacts/center", locations=locations, columns=columns)
df_front = df_from_folder(data_folder="data/impacts/front", locations=locations, columns=columns)

# All loaded impacts in one frame with a fresh 0..N-1 index.
df = pd.concat([df_center, df_front], ignore_index=True)

### Split off the last entries into the test dataset

In [8]:
# Fraction of each impact type that is held out for testing. The held-out rows
# are the LAST rows of each frame, in the order the frame was built.
test_split = 0.25

# Index of the first test row. Computed per frame so the fraction is honoured
# even when the two folders contain a different number of measurements.
# (Previously the first 25 % went to *train* and the remaining 75 % to test.)
split_indx = len(df_center) - int(test_split * len(df_center))
split_indx_front = len(df_front) - int(test_split * len(df_front))

train_center, test_center = df_center.iloc[:split_indx], df_center.iloc[split_indx:]
train_front, test_front = df_front.iloc[:split_indx_front], df_front.iloc[split_indx_front:]

df_train = pd.concat([train_center, train_front], ignore_index=True)
df_test = pd.concat([test_center, test_front], ignore_index=True)

# Every measurement must end up in exactly one of the two sets.
assert len(df_train) + len(df_test) == len(df_center) + len(df_front)

In [None]:
# Persist the train split, the test split and the full dataset as CSV files;
# index=False leaves the positional row index out of the files.
df_train.to_csv(path_or_buf="./data/dataset_train.csv", index=False)
df_test.to_csv(path_or_buf="./data/dataset_test.csv", index=False)
df.to_csv(path_or_buf="./data/dataset.csv", index=False)