# Signal Analyzer

This notebook loads single signals and offers a simple GUI to plot them, including some further analysis.

The prerequisite is to have slki as well as its dependencies installed.

In [None]:
%load_ext autoreload
%autoreload 2
%matplotlib inline

In [3]:
from typing import Any

from _defaults import (
    datasets_labels,
    init_notebook,
    load_datasets_chunk_lengths,
    load_signal,
)
import numpy as np
from tabulate import tabulate

In [4]:
# Run the shared notebook setup imported from `_defaults`.
init_notebook()

## Load preprocessed datasets

In [5]:
import _defaults

In [6]:
# Do not enforce resampling here (the resample size is unset); the GUI further
# below has its own option for resampling a signal.
_defaults.resample_size = None

Analyse the data. To be precise: analyse how many signals are available without loading the signals themselves, to save memory.

In [7]:
# One entry per dataset; later cells sum an entry to obtain the number of
# signals available in that dataset.
dataset_lengths = load_datasets_chunk_lengths()

[[1;32mINFO[0m] [37m[slki.data.import][data_import.py:537][0m  7 meta data files found.
[[1;32mINFO[0m] [37m[slki.data.import][data_import.py:594][0m  Loading meta data from '/home/lab/slki/Dataset/preprecessed/v4/fernverkehr-points-kionix-sh-z-dt_1-meta.pkl'...
[[1;32mINFO[0m] [37m[slki.data.import][data_import.py:594][0m  Loading meta data from '/home/lab/slki/Dataset/preprecessed/v4/fernverkehr-points-kionix-sh-z-dt_2-meta.pkl'...
[[1;32mINFO[0m] [37m[slki.data.import][data_import.py:594][0m  Loading meta data from '/home/lab/slki/Dataset/preprecessed/v4/fernverkehr-points-kionix-sh-z-dt_3-meta.pkl'...
[[1;32mINFO[0m] [37m[slki.data.import][data_import.py:594][0m  Loading meta data from '/home/lab/slki/Dataset/preprecessed/v4/fernverkehr-points-kionix-sh-z-dt_4-meta.pkl'...
[[1;32mINFO[0m] [37m[slki.data.import][data_import.py:594][0m  Loading meta data from '/home/lab/slki/Dataset/preprecessed/v4/fernverkehr-points-kionix-sh-z-dt_5-meta.pkl'...
[[1;32mINFO

In [8]:
# Tabulate the total number of signals available in each dataset.
print(
    tabulate(
        [(label, sum(chunk_lengths)) for label, chunk_lengths in zip(datasets_labels, dataset_lengths, strict=False)],
        headers=["Dataset", "Length"],
    )
)

Dataset         Length
------------  --------
Fernverkehr       5572
Güterzüge         1587
Regioverkehr      4248


## Plot some samples

In [9]:
from IPython.display import display
import ipywidgets as widgets
import matplotlib.pyplot as plt
from tslearn.preprocessing import TimeSeriesResampler

In [10]:
# Load signal 0 of dataset 0 together with its meta data; the length of this
# signal is used below to initialise the resample slider.
sample, meta = load_signal(0, 0, dataset_lengths)

[[1;32mINFO[0m] [37m[slki.data.import][data_import.py:532][0m  1 data files and 1 meta data files found.
[[1;32mINFO[0m] [37m[slki.data.import][data_import.py:576][0m  Loading data from '/home/lab/slki/Dataset/preprecessed/v4/fernverkehr-points-kionix-sh-z-dt_1.pkl'...
[[1;32mINFO[0m] [37m[slki.data.import][data_import.py:594][0m  Loading meta data from '/home/lab/slki/Dataset/preprecessed/v4/fernverkehr-points-kionix-sh-z-dt_1-meta.pkl'...
[[1;32mINFO[0m] [37m[slki.data.import][data_import.py:334][0m  Normalizing data with 'mone_one_zero_fix' normalization.


Normalizing:   0%|                                      | 0/843 [00:00<?, ?it/s]

In [None]:
# --- Selection controls ------------------------------------------------------
# One toggle button per dataset label; the button value is the dataset index.
dataset_widget = widgets.ToggleButtons(options=[(label, idx) for idx, label in enumerate(datasets_labels)])
# Sample index input, initially bounded by the size of the first dataset.
sample_idx_widget = widgets.BoundedIntText(value=0, min=0, max=sum(dataset_lengths[0]) - 1, step=1)
sample_idx_max_widget = widgets.Label(value=f"max: {sum(dataset_lengths[0]) - 1}")

# --- Resampling controls -----------------------------------------------------
# The slider starts disabled at the full length of the initially loaded sample;
# the checkbox next to it switches resampling on.
resample_check_widget = widgets.Checkbox(value=False, indent=False)
resample_widget = widgets.IntSlider(
    value=len(sample),
    min=100,
    max=len(sample),
    step=100,
    readout=True,
    readout_format="d",
    disabled=True,
)

# --- Analysis flags ----------------------------------------------------------
# Keyword arguments (including one Layout object) shared by the flag checkboxes.
checkbox_kwargs = dict(value=False, indent=False, layout=widgets.Layout(width="200px"))
outlier_reduction_widget = widgets.Checkbox(description="Outlier reduction", **checkbox_kwargs)
outlier_detection_widget = widgets.Checkbox(description="Outlier detection", **checkbox_kwargs)
peak_detection_widget = widgets.Checkbox(description="Peaks detection", **checkbox_kwargs)
# NOTE: this slider is created here but is not placed in the grid below.
eps_widget = widgets.IntSlider(value=50)

# --- Layout ------------------------------------------------------------------
# Two-column grid: a fixed-width label column followed by the matching control(s).
grid_widget = widgets.GridBox(
    [
        widgets.Label(value="Dataset:"),
        dataset_widget,
        widgets.Label(value="Sample:"),
        widgets.HBox([sample_idx_widget, sample_idx_max_widget]),
        widgets.Label(value="Resample:"),
        widgets.HBox([resample_widget, resample_check_widget]),
        widgets.Label(value="Flags:"),
        widgets.HBox(
            [
                widgets.VBox([outlier_reduction_widget, outlier_detection_widget]),
                peak_detection_widget,
            ]
        ),
    ],
    layout=widgets.Layout(grid_template_columns="100px auto"),
)

In [12]:
def plot(sample: np.ndarray):
    """Draw ``sample`` as a line on a new wide figure.

    The y-axis is fixed to ``[-1.1, 1.1]``.

    Returns:
        The ``(figure, axes)`` pair that was created.
    """
    figure, axes = plt.subplots(figsize=(20, 5))
    axes.plot(sample)
    # ``axes`` is the current axes right after ``plt.subplots``, so setting the
    # limits on it directly is the same as going through ``plt.ylim``.
    axes.set_ylim(-1.1, 1.1)
    figure.tight_layout()
    return figure, axes

In [13]:
# Index of the dataset the sample widgets are currently configured for.
current_dataset_idx = 0


def update_sample_max_value(dataset_idx: int):
    """Re-bound the sample index widgets when a different dataset is selected.

    Nothing happens when ``dataset_idx`` equals the remembered
    ``current_dataset_idx``. Otherwise the new index is remembered, the
    "max: N" label and the upper bound of the index input are set to the last
    valid index of that dataset, and the current index is capped to that bound.
    """
    global current_dataset_idx
    if dataset_idx != current_dataset_idx:
        current_dataset_idx = dataset_idx
        last_valid_idx = sum(dataset_lengths[dataset_idx]) - 1
        sample_idx_max_widget.value = f"max: {last_valid_idx}"
        sample_idx_widget.max = last_valid_idx
        sample_idx_widget.value = min(sample_idx_widget.value, last_valid_idx)

In [14]:
def update_resample_widgets(sample_size: int, resample_size: int, resample: bool) -> int:
    """Synchronise the resample slider with the currently selected signal.

    Args:
        sample_size: Length of the current signal; becomes the slider maximum.
        resample_size: Slider value passed in by the caller.
        resample: Whether resampling is switched on (enables the slider).

    Returns:
        The slider value after the update.
    """
    previous_max = resample_widget.max
    resample_widget.disabled = not resample
    resample_widget.max = sample_size

    # Move the slider to the new maximum if it was sitting at the old maximum
    # or if the requested size is not below the signal length.
    slider_was_at_max = previous_max == resample_size
    if slider_was_at_max or resample_size >= sample_size:
        resample_widget.value = sample_size
    return resample_widget.value

In [15]:
from matplotlib.patches import Rectangle

from slki.utils.peak import detect_upper_peaks

In [16]:
def detect_outlier(sample: np.ndarray, ax: plt.Axes, ax2: plt.Axes, sample_length: int):
    """Mark the peaks of ``sample`` on ``ax`` and box-plot their values on ``ax2``.

    Upper peaks come from ``detect_upper_peaks(sample, ...)`` and lower peaks
    from the same detector applied to the negated signal. The whisker bounds of
    the box plot over all peak values are drawn as horizontal lines on ``ax``,
    the regions outside those bounds are shaded, and a short summary of the
    fliers (values outside the whiskers) is printed.

    Args:
        sample: The signal that is already plotted on ``ax``.
        ax: Axes showing the signal; receives markers, bound lines and shading.
        ax2: Axes that receives the box plot of the peak values.
        sample_length: Passed through to ``detect_upper_peaks``.
    """
    # ``boxplot_stats`` lives in ``matplotlib.cbook``; import it explicitly
    # instead of reaching it through ``pyplot``.
    from matplotlib import cbook

    # detect and plot peaks
    # ``.size`` rather than ``.any()``: a single peak at index 0 is still a peak.
    upper_peaks = detect_upper_peaks(sample, sample_length)
    if upper_peaks.size > 0:
        ax.scatter(upper_peaks, sample[upper_peaks], marker="o", color="tab:purple", s=40, facecolors="none")
    lower_peaks = detect_upper_peaks(-sample, sample_length)
    if lower_peaks.size > 0:
        ax.scatter(lower_peaks, sample[lower_peaks], marker="o", color="tab:cyan", s=40, facecolors="none")
    # ``np.unique`` already returns its result sorted.
    peaks = np.unique(np.hstack((upper_peaks, lower_peaks)))
    if peaks.size == 0:
        # Without peaks there is nothing to compute box plot statistics on.
        print("No peaks detected, skipping outlier detection.")
        return

    # box plot
    sample_peaks = sample[peaks]
    bxpstats = cbook.boxplot_stats(sample_peaks)
    ax2.bxp(
        bxpstats,
        widths=0.8,
        vert=True,
        flierprops={"markeredgecolor": "tab:red", "linestyle": "none", "markerfacecolor": "tab:red"},
    )
    ax2.set_xticklabels(["peaks"])

    # draw boundary lines at the whisker ends
    lower_bound = bxpstats[0]["whislo"]
    upper_bound = bxpstats[0]["whishi"]
    ax.axhline(y=lower_bound, color="r", linestyle="-")
    ax.axhline(y=upper_bound, color="r", linestyle="-")

    # shade the areas outside the whiskers
    x_min, x_max = ax.get_xlim()
    y_min, y_max = ax.get_ylim()
    patch_kwargs = {"linewidth": 1, "edgecolor": "none", "facecolor": "r", "alpha": 0.1}
    # Rectangle takes (xy, width, height): the width must be the x-range, not
    # the right limit, otherwise the band is misaligned whenever x_min != 0.
    x_range = x_max - x_min
    ax.add_patch(Rectangle((x_min, y_min), x_range, lower_bound - y_min, **patch_kwargs))
    ax.add_patch(Rectangle((x_min, upper_bound), x_range, y_max - upper_bound, **patch_kwargs))

    # log details
    fliers = bxpstats[0]["fliers"]
    print(f"Number of outliers: {len(fliers)}")
    print(f"lower bound: {lower_bound},  upper bound: {upper_bound}")
    print(f"Outlier values: {fliers}")

In [17]:
from datetime import datetime

from slki.utils.peak import PeaksClusteringResult, cluster_peaks, detect_peaks

In [None]:
def calc_peak_mean_distance(
    signal: np.ndarray,
    meta: dict[str, Any],
    peaks: np.ndarray | None = None,
    clustering_results: PeaksClusteringResult | None = None,
):
    sample_length: int = meta["sample_length"]
    peaks = peaks if peaks is not None else detect_peaks(signal, sample_length)
    if clustering_results is None:
        clustering_results = cluster_peaks(peaks, len(signal), sample_length)

    unique_labels = set(clustering_results.labels)
    cluster_centers = []
    for label in unique_labels:
        if label == -1:
            continue  # skip noise points
        label_mask = clustering_results.labels == label
        x = peaks[label_mask]
        pos = x[np.argmax(signal[x])]
        cluster_centers.append(pos.tolist())
    mean_dist = sum(b - a for a, b in zip(cluster_centers, cluster_centers[1:], strict=False)) / len(cluster_centers)
    # print([b - a for a, b in zip(cluster_centers, cluster_centers[1:], strict=False)])

    # mean_dist = sum(b - a for a, b in zip(peaks, peaks[1:])) / len(peaks)
    print("Mean distance between peaks in sample rate:", mean_dist)

    # stage "detect" trims signal and sample_length but lets start and end time untouched
    start_time: datetime = meta["start_time"]
    end_time: datetime = meta["end_time"]
    sample_rate_in_hz: float = meta["sample_rate_in_hz"]  # sample rate correct?
    resample_factor = sample_length / len(signal)

    print("signal duration (based on sample rate):", sample_length * resample_factor / sample_rate_in_hz, "s")
    print("recorded signal duration (based on start end time)", (end_time - start_time).total_seconds(), "s")

    print("sample_rate_in_hz:", sample_rate_in_hz)
    print("sample_length:", sample_length)

    mean_dist_in_seconds = mean_dist * resample_factor / (sample_rate_in_hz)
    print("Mean distance between peaks in seconds (based on sample rate):", mean_dist_in_seconds)
    print(f"  =>  {15.0 / mean_dist_in_seconds} m/s  <=>  {15.0 * 3.6 / mean_dist_in_seconds} km/h")

    seconds = (end_time - start_time).total_seconds()
    mean_dist_in_seconds = mean_dist * resample_factor * (seconds / sample_length)
    print("Mean distance between peaks in seconds (based on start end time):", mean_dist_in_seconds)
    print(f"  =>  {15.0 / mean_dist_in_seconds} m/s  <=>  {15.0 * 3.6 / mean_dist_in_seconds} km/h")

In [19]:
from slki.preprocessing.outlier import Outlier as OutlierReduction
from slki.utils.peak import detect_and_cluster_and_plot_peaks

In [20]:
def _normalize_to_unit_range(values: np.ndarray) -> np.ndarray:
    """Scale ``values`` so that its largest absolute value is 1; leave all-zero input unchanged."""
    largest = np.max(np.abs(values))
    return values / largest if largest > 0 else values


def handle_interaction(
    dataset_idx: int,
    sample_idx: int,
    resample_size: int,
    resample: bool,
    peak_detection: bool,
    outlier_detection: bool,
    outlier_reduction: bool,
):
    """Callback of the GUI: load the selected signal, process it and plot it.

    Args:
        dataset_idx: Index of the selected dataset.
        sample_idx: Index of the signal inside that dataset.
        resample_size: Requested signal length when resampling is enabled.
        resample: Whether to resample the signal to ``resample_size``.
        peak_detection: Whether to detect, cluster and plot peaks and print the
            mean peak distance.
        outlier_detection: Whether to add the peak box plot and outlier markers.
        outlier_reduction: Whether to run the default outlier reduction first.
    """
    # update widgets; the signal is loaded once and reused for the slider bounds
    update_sample_max_value(dataset_idx)
    loaded_sample, meta = load_signal(dataset_idx, sample_idx, dataset_lengths)
    resample_size = update_resample_widgets(len(loaded_sample), resample_size, resample)

    # work on a copy so the processing below never touches the loaded array
    sample = loaded_sample.copy()
    if resample and len(sample) != resample_size:
        sample = TimeSeriesResampler(resample_size).fit_transform(sample).flatten()
        sample = _normalize_to_unit_range(sample)  # norm between [-1, 1]
    if outlier_reduction:
        # NOTE(review): the return value is discarded, so this presumably
        # modifies ``sample`` in place - confirm against the Outlier stage.
        OutlierReduction.from_data_and_meta(sample, meta).run_default()
        sample = _normalize_to_unit_range(sample)  # norm between [-1, 1]

    # plot sample
    fig, ax = plot(sample)
    # detect peaks + plotting
    if peak_detection:
        clustering_results, *_, peaks = detect_and_cluster_and_plot_peaks(
            sample, meta["sample_length"], ax, print_console=True
        )
        calc_peak_mean_distance(sample, meta, peaks, clustering_results)
    # detect outlier
    if outlier_detection:
        # narrow extra axes to the right of the signal plot for the box plot
        ax2 = fig.add_axes([1.025, 0.08, 0.05, 0.89])
        ax2.set_ylim(-1.1, 1.1)
        detect_outlier(sample, ax, ax2, meta["sample_length"])
    plt.show()
    # outlier_str = "on" if outlier_reduction else "off"
    # fig.savefig(f"../results/plots/signal-{sample_idx:0>2}-outlier-{outlier_str}.png")
    # print(meta["origin"])

In [21]:
# Bind each control to the keyword argument of ``handle_interaction`` that
# carries the same name as the mapping key.
plot_widget = widgets.interactive_output(
    handle_interaction,
    {
        "dataset_idx": dataset_widget,
        "sample_idx": sample_idx_widget,
        "resample_size": resample_widget,
        "resample": resample_check_widget,
        "peak_detection": peak_detection_widget,
        "outlier_detection": outlier_detection_widget,
        "outlier_reduction": outlier_reduction_widget,
    },
)

Display GUI.

In [22]:
# Show the control grid first, followed by the output area that holds the plot.
display(grid_widget, plot_widget)

GridBox(children=(Label(value='Dataset:'), ToggleButtons(options=(('Fernverkehr', 0), ('Güterzüge', 1), ('Regi…

Output()