<img src="../../img/python-logo-no-text.svg"
     style="display:block;margin:auto;width:10%"/>
<br>
<div style="text-align:center; font-size:200%;">
  <b>Auswertung von Messdaten</b>
</div>
<br/>
<div style="text-align:center;">Dr. Matthias Hölzl</div>
<br/>
<div style="text-align:center;">module_360_gui/topic_210_d5_device_app_poll</div>


# Auswertung von Messdaten

In [None]:
from collections import UserList
from dataclasses import dataclass
from matplotlib.figure import Figure
from matplotlib.backends.backend_tkagg import FigureCanvasTkAgg, NavigationToolbar2Tk
from scipy import signal
from time import sleep
from typing import Optional
import matplotlib.pyplot as plt
import numpy as np
import numpy.typing as npt
import tkinter as tk
import tkinter.ttk as ttk

In [None]:
def sine_wave(
    frequency: float, time: npt.ArrayLike, phase: float = 0.0, amplitude: float = 1.0
):
    """Evaluate a sine wave at the given points in time.

    Args:
        frequency: Number of oscillations per unit of time.
        time: Scalar or array-like (list, tuple, ndarray) of time values.
        phase: Offset in radians that is added to the argument of the sine.
        amplitude: Factor the sine values are multiplied by.

    Returns:
        ``amplitude * sin(2π * frequency * time + phase)``, evaluated element-wise.
    """
    # Convert first: a plain list or tuple cannot be multiplied by a float,
    # although the annotation promises that any array-like is accepted.
    time = np.asarray(time)
    return np.sin(2.0 * np.pi * time * frequency + phase) * amplitude

In [None]:
# Time axis shared by the following plots: 401 evenly spaced points on [0, 4].
xs = np.linspace(start=0.0, stop=4.0, num=401)

In [None]:
# Sine waves that differ in phase, frequency and height, drawn into one figure.
plt.figure(figsize=(12, 5))
plt.plot(xs, sine_wave(frequency=1.0, time=xs), label="basic wave")
plt.plot(xs, sine_wave(frequency=1.0, time=xs, phase=np.pi), label="offset by π")
# The two slower waves are scaled to half height after evaluation.
plt.plot(xs, 0.5 * sine_wave(frequency=0.5, time=xs), label="frequency 0.5")
plt.plot(xs, 0.5 * sine_wave(frequency=0.5, time=xs, phase=np.pi), label="0.5, π")
# Horizontal zero line as a visual reference.
plt.axhline(color="black", y=0)
plt.legend()

In [None]:
def sawtooth_wave(
    frequency: float, time: npt.ArrayLike, phase: float = 0.0, amplitude: float = 1.0
):
    """Evaluate a symmetric sawtooth (triangle) wave at the given points in time.

    The wave is produced by ``scipy.signal.sawtooth`` with ``width=0.5``. Its
    argument is shifted by an additional π/2, so with ``phase=0`` the wave is 0
    at ``time=0`` and rising.

    Args:
        frequency: Number of oscillations per unit of time.
        time: Scalar or array-like (list, tuple, ndarray) of time values.
        phase: Offset in radians that is added to the argument of the wave.
        amplitude: Factor the wave values are multiplied by.

    Returns:
        The wave values, evaluated element-wise.
    """
    # Convert first: a plain list or tuple cannot be multiplied by a float,
    # although the annotation promises that any array-like is accepted.
    time = np.asarray(time)
    return (
        signal.sawtooth(2.0 * np.pi * time * frequency + phase + np.pi / 2, 0.5)
        * amplitude
    )

In [None]:
# Sawtooth waves that differ in phase, frequency and amplitude in one figure.
plt.figure(figsize=(12, 5))
plt.plot(xs, sawtooth_wave(frequency=1.0, time=xs), label="basic wave")
plt.plot(
    xs, sawtooth_wave(frequency=1.0, time=xs, phase=np.pi), label="offset by π"
)
plt.plot(
    xs, sawtooth_wave(frequency=0.5, time=xs, amplitude=0.5), label="frequency 0.5"
)
plt.plot(
    xs,
    sawtooth_wave(frequency=0.5, time=xs, amplitude=0.5, phase=np.pi),
    label="0.5, π",
)
# Horizontal zero line as a visual reference.
plt.axhline(color="black", y=0)
plt.legend()

In [None]:
def square_wave(
    frequency: float, time: npt.ArrayLike, phase: float = 0.0, amplitude: float = 1.0
):
    """Evaluate a square wave with 50% duty cycle at the given points in time.

    The wave is produced by ``scipy.signal.square`` with ``duty=0.5``.

    Args:
        frequency: Number of oscillations per unit of time.
        time: Scalar or array-like (list, tuple, ndarray) of time values.
        phase: Offset in radians that is added to the argument of the wave.
        amplitude: Factor the wave values are multiplied by.

    Returns:
        The wave values (``±amplitude``), evaluated element-wise.
    """
    # Convert first: a plain list or tuple cannot be multiplied by a float,
    # although the annotation promises that any array-like is accepted.
    time = np.asarray(time)
    return signal.square(2.0 * np.pi * time * frequency + phase, 0.5) * amplitude

In [None]:
# Two square waves: the default one and a faster one with half the amplitude.
plt.figure(figsize=(12, 5))
plt.plot(xs, square_wave(frequency=1.0, time=xs), label="basic wave")
plt.plot(xs, square_wave(frequency=2.0, time=xs, amplitude=0.5), label="frequency 2")
# Horizontal zero line as a visual reference.
plt.axhline(color="black", y=0)
plt.legend()

In [None]:
# All three wave shapes at frequency 1 in one figure for direct comparison.
plt.figure(figsize=(12, 5))
plt.plot(xs, sine_wave(frequency=1.0, time=xs), label="sin(x)")
plt.plot(xs, sawtooth_wave(frequency=1.0, time=xs), label="sawtooth(x)")
plt.plot(xs, square_wave(frequency=1.0, time=xs), label="square(x)")
# Horizontal zero line as a visual reference.
plt.axhline(color="black", y=0)
plt.legend()

In [None]:
# Display name -> wave function. The GUI offers the keys in its wave-function
# menu and looks up the selected function here.
WAVE_FUNCTIONS = dict(
    (
        ("Sine Wave", sine_wave),
        ("Sawtooth Wave", sawtooth_wave),
        ("Square Wave", square_wave),
    )
)

In [None]:
def apply_noise(
    fun, noise_scale=0.1, noise_loc=0.0, rng: Optional[np.random.Generator] = None
):
    """Wrap a wave function so that its result is overlaid with Gaussian noise.

    Args:
        fun: Function called as ``fun(frequency, time, phase=..., amplitude=...)``.
        noise_scale: Standard deviation of the normally distributed noise.
        noise_loc: Mean of the normally distributed noise.
        rng: Optional ``numpy.random.Generator`` the noise is drawn from. Pass a
            seeded generator to get reproducible results. When omitted, the
            global ``np.random`` state is used.

    Returns:
        A function with the signature
        ``noisy_fun(frequency, time, phase=0.0, amplitude=1.0)`` that returns
        ``fun(...)`` plus one fresh noise sample per element of ``time``.
    """
    # `Generator.normal` and `np.random.normal` accept the same arguments.
    draw_normal = np.random.normal if rng is None else rng.normal

    def noisy_fun(frequency, time, phase=0.0, amplitude=1.0):
        noise = draw_normal(loc=noise_loc, scale=noise_scale, size=np.shape(time))
        return fun(frequency, time, phase=phase, amplitude=amplitude) + noise

    return noisy_fun

In [None]:
# Shorter time axis for the noisy plots: 201 evenly spaced points on [0, 2].
xs = np.linspace(start=0, stop=2.0, num=201)

In [None]:
# The three wave shapes, each overlaid with Gaussian noise of a different scale.
plt.figure(figsize=(12, 5))
plt.plot(
    xs, apply_noise(sine_wave, noise_scale=0.05)(frequency=1.0, time=xs), label="sin(x)"
)
plt.plot(
    xs,
    apply_noise(sawtooth_wave, noise_scale=0.2)(frequency=0.5, time=xs),
    label="sawtooth(x)",
)
plt.plot(
    xs,
    apply_noise(square_wave, noise_scale=0.2)(frequency=2, time=xs, amplitude=0.7),
    label="square(x)",
)
# Horizontal zero line as a visual reference.
plt.axhline(color="black", y=0)
plt.legend()

In [None]:
@dataclass
class Device:
    """Simulated measurement device that delivers noisy samples of a wave.

    Every call to `get_measurements` returns the samples of the next
    `cycle_time` long time interval and advances the internal clock.

    Attributes:
        name: Display name of the device.
        amplitude: Amplitude passed to `value_function`.
        frequency: Frequency passed to `value_function`.
        phase: Phase passed to `value_function`.
        noise_scale: Standard deviation of the noise added to the samples.
        noise_loc: Mean of the noise added to the samples.
        value_function: Wave function called as
            ``value_function(frequency, time, phase=..., amplitude=...)``.
        n_samples_per_cycle: Number of samples returned per call.
        cycle_time: Length of the time interval covered by one call.
    """

    name: str
    amplitude: float = 1.0
    frequency: float = 1.0
    phase: float = 0.0
    noise_scale: float = 0.1
    noise_loc: float = 0.0
    value_function: callable = sine_wave
    # Used as the sample count of `np.linspace`, hence an integer.
    n_samples_per_cycle: int = 20
    cycle_time: float = 1.0
    # The two attributes below carry no annotation, so `dataclass` does not turn
    # them into fields: they are absent from `__init__`, `repr` and `==`.
    # Start of the time interval returned by the next `get_measurements` call.
    _current_time = 0.0
    # Seconds to sleep at the start of every `get_measurements` call.
    _delay_per_call = 0.0

    def get_measurements(self):
        """Return the noisy samples of the next cycle as a list.

        Sleeps for `_delay_per_call` seconds, then advances the internal clock
        by `cycle_time`. The samples are evenly spaced over the half-open
        interval ``[start, start + cycle_time)``.
        """
        sleep(self._delay_per_call)
        start = self._current_time
        self._current_time = start + self.cycle_time
        ts = np.linspace(
            start,
            start + self.cycle_time,
            # `np.linspace` rejects a float count such as 20.0.
            int(self.n_samples_per_cycle),
            endpoint=False,
        )
        fun = apply_noise(
            self.value_function, noise_loc=self.noise_loc, noise_scale=self.noise_scale
        )
        return list(fun(self.frequency, ts, phase=self.phase, amplitude=self.amplitude))

    def reset(self):
        """Rewind the internal clock so that the next measurement starts at 0."""
        self._current_time = 0.0

In [None]:
# A default sine-wave device and a slower, noisier square-wave device.
device_1 = Device(name="device 1", cycle_time=1.0)
device_2 = Device(
    name="device 2",
    value_function=square_wave,
    frequency=0.5,
    amplitude=0.8,
    noise_scale=0.2,
    cycle_time=1.0,
)

In [None]:
# Rewind both devices first. Without this, re-running the cell would continue
# at a later device time although `xs` below always starts at 0.
device_1.reset()
device_2.reset()

ys_1, ys_2 = [], []
n_measurements = 4
for t in range(n_measurements):
    ys_1.extend(device_1.get_measurements())
    ys_2.extend(device_2.get_measurements())
# One x value per collected sample, spread over the measured cycles.
xs = np.linspace(0.0, n_measurements, len(ys_1), endpoint=False)

In [None]:
# Both devices' samples over the common time axis; one column per device.
plt.figure(figsize=(12, 5))
plt.plot(xs, np.array([ys_1, ys_2]).T, label=["device 1", "device 2"])
# Horizontal zero line as a visual reference.
plt.axhline(color="black", y=0)
plt.legend()

In [None]:
class DeviceList(UserList):
    """List of devices that can be polled and reset together.

    Args:
        devices: Devices to store. The sequence is copied, so later changes to
            the passed list do not affect this instance. ``None`` creates an
            empty list.
        n_samples_per_second: Value that is written to the
            ``n_samples_per_cycle`` attribute of every passed device.
    """

    def __init__(
        self, devices: Optional[list["Device"]] = None, n_samples_per_second: int = 20
    ):
        # `UserList.__init__` sets up `self.data` as a fresh list (empty for
        # `None`) instead of aliasing the caller's list.
        super().__init__(devices)
        self.n_samples_per_second = n_samples_per_second
        for device in self.data:
            device.n_samples_per_cycle = n_samples_per_second

    def get_measurements(self):
        """Poll every device once and return the results in list order."""
        return [device.get_measurements() for device in self]

    def reset(self):
        """Call `reset()` on every device."""
        for device in self:
            device.reset()

    @property
    def names(self):
        """Names of the devices in list order."""
        return [device.name for device in self]

In [None]:
# Bundle both devices; the trailing expression displays the resulting list.
ds = DeviceList(devices=[device_1, device_2])
ds

In [None]:
# One poll yields one row of samples per device.
np.asarray(ds.get_measurements()).shape

In [None]:
n_measurements = 4
# Start from time 0 so that the cell gives comparable results on every run.
ds.reset()
# One (n_devices, n_samples) array per poll.
measurements = [np.array(ds.get_measurements()) for _ in range(n_measurements)]
np.shape(measurements)

In [None]:
# Join the per-poll arrays along the sample axis: one row per device.
ys = np.hstack(measurements)
# One x value per sample, spread over the measured cycles.
xs = np.linspace(0.0, n_measurements, ys.shape[1], endpoint=False)

xs.shape, ys.shape

In [None]:
# `plot` draws one line per column, so the devices have to be the columns.
plt.figure(figsize=(12, 5))
plt.plot(xs, np.transpose(ys), label=ds.names)
# Horizontal zero line as a visual reference.
plt.axhline(color="black", y=0)
plt.legend()

In [None]:
class MeasurementApp:
    """Tk GUI that polls simulated devices and plots their latest samples.

    The window shows one settings panel per device (name, wave function,
    frequency, amplitude, phase) next to a Matplotlib plot. While running, the
    devices are polled periodically and the plot shows the most recent
    ``n_samples_per_cycle * n_displayed_cycles`` samples of every device.

    Args:
        n_devices: Number of simulated devices to create.
        n_samples_per_cycle: Number of samples every device delivers per poll.
        n_displayed_cycles: Number of polls whose samples stay visible.
    """

    # Delay in milliseconds between two polls while the devices are running.
    POLL_INTERVAL_MS = 100

    def __init__(
        self, n_devices: int = 3, n_samples_per_cycle=4, n_displayed_cycles=100
    ):
        devices = DeviceList(
            [
                Device(
                    f"Device {n + 1}",
                    n_samples_per_cycle=n_samples_per_cycle,
                    cycle_time=0.1,
                )
                for n in range(n_devices)
            ],
            # DeviceList overwrites the sample count of every device with this
            # value. Without passing it, its default would silently replace
            # the requested `n_samples_per_cycle`.
            n_samples_per_second=n_samples_per_cycle,
        )
        self.n_samples_per_cycle = n_samples_per_cycle
        self.displayed_cycles = n_displayed_cycles
        self.n_displayed_samples = n_samples_per_cycle * n_displayed_cycles
        self.xs = np.linspace(
            0.0, n_displayed_cycles, self.n_displayed_samples, endpoint=False
        )
        # One row of displayed samples per device, initially all zero.
        self.ys = np.zeros((n_devices, self.n_displayed_samples))
        self.should_devices_run = False
        # Identifier of the scheduled `update_devices` call, if there is one.
        self._pending_update = None

        self.devices = devices
        # An array of Variable instances so that they don't get garbage collected.
        self._variables = []

        root = tk.Tk()
        root.title("Perform Measurements")
        root.columnconfigure(1, weight=1, minsize=200)
        root.rowconfigure(1, weight=1)
        self.root = root

        main_frame = ttk.Frame(root, padding="3 3 12 12")
        main_frame.grid(column=1, row=1, sticky="NSEW")
        main_frame.columnconfigure(1, weight=1, minsize=200)
        main_frame.columnconfigure(2, weight=2, minsize=400)
        main_frame.rowconfigure(999, weight=1)

        for row, device in enumerate(devices, 1):
            self.add_device_panel(main_frame, device, row)

        self.add_plot_frame(main_frame)

        self.run_button = ttk.Button(main_frame, command=self.run_devices, text="Run")
        # First row below the device panels. A fixed row number would collide
        # with a panel as soon as there are more devices than that number.
        self.run_button.grid(column=2, row=len(devices) + 1, padx=12, sticky="E")

    def add_device_panel(self, parent, device, row):
        """Add the settings panel for `device` to row `row` of `parent`.

        Every widget is bound to a Tk variable whose write trace copies the
        new value into the corresponding attribute of `device`.
        """

        def update_device_name(*args):
            device.name = name_var.get()

        def update_device_function(*args):
            device.value_function = WAVE_FUNCTIONS[function_var.get()]

        frame = ttk.Frame(parent)
        frame["borderwidth"] = 2
        frame["relief"] = "raised"
        frame.grid(column=1, row=row, sticky="NSEW")
        frame.rowconfigure(999, weight=1)
        frame.columnconfigure(2, weight=1)

        ttk.Label(frame, text="Name:").grid(column=1, row=1, sticky="E")
        name_var = tk.StringVar(value=device.name)
        name_var.trace_add("write", update_device_name)
        self._variables.append(name_var)
        name_entry = ttk.Entry(frame, width=6, textvariable=name_var)
        name_entry.grid(column=2, row=1, sticky="EW")

        # The label has to be placed with `grid`, otherwise it never shows up.
        ttk.Label(frame, text="Wave Function:").grid(column=1, row=2, sticky="E")
        # Preselect the function the device currently uses. The menu writes its
        # default into the variable, and the trace copies it to the device.
        initial_function = next(
            (
                name
                for name, function in WAVE_FUNCTIONS.items()
                if function is device.value_function
            ),
            "Sine Wave",
        )
        function_var = tk.StringVar()
        function_var.trace_add("write", update_device_function)
        self._variables.append(function_var)
        function_menu = ttk.OptionMenu(
            frame, function_var, initial_function, *list(WAVE_FUNCTIONS.keys())
        )
        function_menu.grid(column=2, row=2, sticky="EW")

        self._add_scale_row(
            frame, device, "frequency", "Frequency:", row=3, from_=0.1, to=4.0
        )
        self._add_scale_row(
            frame, device, "amplitude", "Amplitude:", row=4, from_=0.5, to=4.0
        )
        self._add_scale_row(
            frame, device, "phase", "Phase:", row=5, from_=0.0, to=2 * np.pi
        )

    def _add_scale_row(self, frame, device, attribute, label, row, from_, to):
        """Add a labeled scale that edits the float `attribute` of `device`."""

        def update_device(*args):
            setattr(device, attribute, variable.get())

        ttk.Label(frame, text=label).grid(column=1, row=row, sticky="E", padx=12)
        # Remember the value before the scale is created: the scale is created
        # with an empty variable, and the value is written back at the end.
        initial_value = getattr(device, attribute)
        variable = tk.DoubleVar()
        variable.trace_add("write", update_device)
        self._variables.append(variable)
        scale = ttk.LabeledScale(frame, variable=variable, from_=from_, to=to)
        scale.grid(column=2, row=row, sticky="EW", padx=12)
        variable.set(initial_value)

    def add_plot_frame(self, parent):
        """Create the Matplotlib figure, canvas and toolbar next to the panels."""
        plot_frame = ttk.Frame(parent, padding="3 3 12 12")
        plot_frame.grid(column=2, row=1, sticky="NSEW", rowspan=len(self.devices))

        # TODO: Should this be set to device DPI?
        self.figure = Figure(figsize=(8, 4), dpi=120)
        self.figure_canvas = FigureCanvasTkAgg(figure=self.figure, master=plot_frame)
        NavigationToolbar2Tk(self.figure_canvas, plot_frame)
        self.axes = self.figure.add_subplot()
        self.axes.set_title("Device Measurements")
        self.figure_canvas.get_tk_widget().pack(side=tk.TOP, fill=tk.BOTH, expand=1)

    def run_devices(self, *args):
        """Start polling the devices and turn the button into a stop button."""
        self.should_devices_run = True
        self.run_button["command"] = self.stop_devices
        self.run_button["text"] = "Stop"
        self.update_devices()

    def stop_devices(self, *args):
        """Stop polling the devices and turn the button into a run button."""
        self.should_devices_run = False
        # Drop the already scheduled poll. Otherwise it would still run after
        # the stop, and a quick restart would leave two polling chains active.
        self._cancel_pending_update()
        self.run_button["command"] = self.run_devices
        self.run_button["text"] = "Run"

    def _cancel_pending_update(self):
        """Cancel the scheduled `update_devices` call, if there is one."""
        if self._pending_update is not None:
            self.root.after_cancel(self._pending_update)
            self._pending_update = None

    def update_devices(self):
        """Poll all devices once, redraw the plot and schedule the next poll.

        The next poll is only scheduled while `should_devices_run` is true.
        """
        # Make sure that at most one scheduled call exists at any time.
        self._cancel_pending_update()
        new_measurements = self.devices.get_measurements()
        # Append the new samples and keep only the newest ones that fit the plot.
        self.ys = np.concatenate([self.ys, new_measurements], axis=1)[
            :, -self.n_displayed_samples :
        ]
        self.axes.cla()
        # `cla()` also removes the title, so it has to be set again.
        self.axes.set_title("Device Measurements")
        self.axes.plot(self.xs, self.ys.T, label=self.devices.names)
        self.axes.legend(loc="upper right")
        self.figure_canvas.draw()
        if self.should_devices_run:
            self._pending_update = self.root.after(
                self.POLL_INTERVAL_MS, self.update_devices
            )

    def run(self):
        """Enter the Tk main loop."""
        self.root.mainloop()

In [None]:
# Build the GUI with its default settings and enter the Tk main loop.
app = MeasurementApp()
app.run()