# Signal functions in depth

* Geometry example, raw acc/gyro data from Wave and compute pose, 3D cube? Gravity? Linear acceleration?
* Filtering / spectrogram example with mic data / wave forms
* ML inference example
* Windowed signal functions: delay? FFT?
* Creating custom signal functions, serialization quirks

Genki Signals offers a wide range of `SignalFunction`s and this notebook is mostly meant to demonstrate some of them.

In [None]:
import os

# NOTE: this isn't required when the library has been installed from PyPI
os.chdir('../')

`SignalFunction`s are functions on signal and can be classified into two classes:

    1. one-to-one
    2. many-to-one (Windowed)

one-to-one `SignalFunction`s take in one sample and return one sample. They can still depend on former samples e.g. `Differentiate` but the number of samples out should be the same as number of samples in.

many-to-one/Windowed `SignalFunction`s take in multiple samples and return one sample. They can still receive a single sample at a time but will only return a value when the window has been filled. As well as defining what to do with the window they need to define the window overlap. 

In [None]:
from genki_signals.system import System
from genki_signals.sources import *
from genki_signals.functions import *
from genki_signals.frontends import *

A good example of a windowed signal is short-time Fourier transform (stft)

In [None]:
mic = MicSource()
stft = FourierTransform("audio", "fourier", window_size=1024, window_overlap=512)
fourier_system = System(mic, [stft])

fourier_system.start()

One way to view data from a `System` is to connect it to a buffer with `System.register_data_feed`. This can be useful to see if a `SignalFunction` is behaving properly

In [None]:
from genki_signals.buffers import DataBuffer

buffer = DataBuffer()

fourier_system.register_data_feed(id(buffer), lambda d: buffer.extend(d))

Now all data that goes through our `System` will be sent to the buffer. Below we can view our buffer.

In [None]:
buffer

In [None]:
fourier_system.stop()

Another example of a `SignalFunction` are the `Inference` / `WindowedInference` functions which take in a onnx file and calculate inference on it.

Let's load a model which predicts the relative depth of an image.

Note: You can also create your own `Inference` function but more on custom `SignalFunctions` later.

In [None]:
import torch

#load the model from torch.hub
midas = torch.hub.load("intel-isl/MiDaS", "MiDaS_small")

input_resolution = (256, 256)
dummy_input = torch.randn((1, 3, *input_resolution))
model_path = "./examples/midas.onnx"

# export the model to onnx
torch.onnx.export(
    midas,
    dummy_input,
    model_path,
    input_names=["input"],
    output_names=["output"],
    # dynamic_axes=({"input": [0]})
)

To connect the model to our camera we need to create a `CameraSource`, a `Sampler` to sample it, and connect it to a `System` which also computes the inference

In [None]:
camera = CameraSource(resolution=input_resolution)
camera_sampler = Sampler({"model_input": camera}, 10)
model_inference = Inference("model_input", "model_output", model_path, stateful=False)
inference_system = System(camera_sampler, [model_inference], update_rate=50)
inference_system.start()

Now the depth image is under the key "model_output" and we can visualize it with our `WidgetFrontend`

In [None]:
from genki_signals.frontends import Video, WidgetFrontend

video = Video("model_input")
depth = Video("model_output")

frontend = WidgetFrontend(inference_system, [video, depth])

frontend

In [None]:
inference_system.stop()

Notes on using multiple signal functions:

If one `SignalFunction` depends on the output of another `SignalFunction` then it needs to come after the other in the initialization of the `System`

An example of a sequence of `SignalFunction`s:

In [None]:
pos_to_vel = Differentiate("mouse_pos", "timestamp", "mouse_vel")
vel_to_acc = Differentiate("mouse_vel", "timestamp", "mouse_acc")
acc_to_vel = Integrate("mouse_acc", "timestamp", "mouse_vel_2")
vel_to_pos = Integrate("mouse_vel_2", "timestamp", "mouse_pos_2", use_trapz=False)

mouse_source = MouseSource()
sampler = Sampler({"mouse_pos": mouse_source}, 100)
system = System(sampler, [pos_to_vel, vel_to_acc, acc_to_vel, vel_to_pos])

system.start()

In [None]:
from genki_signals.frontends import Line, WidgetFrontend

pos =  Line("timestamp", "mouse_pos")
pos2 = Line("timestamp", "mouse_pos_2")
vel = Line("timestamp", "mouse_vel")
vel2 = Line("timestamp", "mouse_vel_2")

frontend = WidgetFrontend(system, [pos, pos2, vel, vel2])
frontend

In [None]:
system.stop()

Note that this method will store all intermediate results.

To prevent that we can wrap these functions in the `SignalFunction`: `Combine`

In [None]:
pos_to_acc_to_pos = Combine([pos_to_vel, vel_to_acc, acc_to_vel, vel_to_pos], name="mouse_pos_2")

system = System(sampler, [pos_to_acc_to_pos])
system.start()

In [None]:
from genki_signals.frontends import Line, WidgetFrontend

pos =  Line("timestamp", "mouse_pos")
pos2 = Line("timestamp", "mouse_pos_2")

frontend = WidgetFrontend(system, [pos, pos2])
frontend

In [None]:
system.stop()

Lastly let's create a custom `SignalFunction`. 

To do that we need to extend the `SignalFunction` base class which takes three arguments input_signals, name and params. This is done for serialization purposes which we will cover further in the next example notebook

Then we only need to implement a `__call__` method for our class.

This method takes in a batch of samples of our input_signals and returns a batch of outputs. 

The signals passed into the function are defined in the `__init__` method (order matters).



In [None]:
import numpy as np
from genki_signals.functions import SignalFunction
from scipy import ndimage, signal

class ConvGrayscaleImage(SignalFunction):
    def __init__(self, input_signal, name, kernel, inverse=False):
        super().__init__(input_signal, name=name, params={"kernel": kernel, "inverse": inverse})
        self.kernel = kernel / np.linalg.norm(kernel)
        self.inverse = inverse

    def __call__(self, signal):
        grayscale = np.average(signal, axis=0, weights=[0.2989, 0.5870, 0.1140])
        l = [ndimage.convolve(grayscale[..., i], self.kernel) for i in range(grayscale.shape[-1])]
        if self.inverse:
            return 255 - np.stack(l, axis=-1)
        return np.stack(l, axis=-1)


def gaussian_kernel(n, std):
    '''
    Generates a n x n matrix with a centered gaussian 
    of standard deviation std centered on it. If normalised,
    its volume equals 1.'''
    gaussian1D = signal.gaussian(n, std)
    gaussian2D = np.outer(gaussian1D, gaussian1D)
    return gaussian2D

kernel = np.array([[-1, -1, -1, -1, -1],
                   [-1,  1,  2,  1, -1],
                   [-1,  2,  4,  2, -1],
                   [-1,  1,  2,  1, -1],
                   [-1, -1, -1, -1, -1]])

gaussian = gaussian_kernel(5, 1)

gaussian_edges = ndimage.convolve(gaussian, kernel)

conv = ConvGrayscaleImage("video", "video_edges", gaussian_edges, True)

In [None]:
video = CameraSource(0)
sampler = Sampler({"video": video}, sample_rate=60)
system = System(sampler, [conv])

system.start()

In [None]:
video_normal = Video("video")
video_edges = Video("video_edges")

frontend = WidgetFrontend(system, [video_normal, video_edges])

frontend

In [None]:
system.stop()