# Introduction to signal processing (Master Erasmus Mundus CYBER)


https://github.com/deepcharles/master-erasmus-mundus-cyber

This session is related to all pre-processing steps that can be used to transform a raw signal into a signal of interest for machine learning algorithms : denoising, detrending, removal of impulsive noise and interpolation.

<div class="alert alert-block alert-info">

1. **Library and data loading**
2. **Exploratory study**
3. **Denoising**
4. **Detrending**
5. **Detection and removal of impulsive noise**
</div>

## 1. Library and data loading

In [None]:
from dataclasses import dataclass

import numpy as np
import scipy
import statsmodels.api as sm
from matplotlib import pyplot as plt
from numpy.typing import NDArray
from statsmodels.regression.linear_model import yule_walker
from statsmodels.tsa.tsatools import detrend

SAMPLING_FREQUENCY = 100  # Hz

In [None]:
def fig_ax(figsize=(20, 5)):
    fig, ax = plt.subplots(figsize=figsize)
    ax.set_xmargin(0)
    return fig, ax

In [None]:
def plot_spectrogram(signal: NDArray, nperseg: int, noverlap: int):
    """Compute and display the spectrogram.

    nperseg is the window length and noverlap is the overlap length.
    """
    # Compute spectrogram
    freqs, time_array, Sxx = scipy.signal.stft(
        x=signal, fs=SAMPLING_FREQUENCY, nperseg=nperseg, noverlap=noverlap
    )
    # Display spectrogram
    fig, ax = fig_ax(figsize=(10, 5))
    ax.pcolormesh(time_array, freqs, abs(Sxx))
    ax.set_xlabel("Time (second)")
    ax.set_ylabel("Frequency (Hz)")

In [None]:
def plot_periodogram(signal: NDArray, nperseg: int, ax=None):
    # Compute spectrogram
    freqs, Pxx_den = scipy.signal.welch(signal, fs=SAMPLING_FREQUENCY, nperseg=nperseg)
    # Display spectrogram
    if ax is None:
        fig, ax = fig_ax(figsize=(10, 5))
    ax.semilogy(freqs, Pxx_den)
    ax.set_xlabel("Frequency (Hz)")
    ax.set_ylabel("PSD [V**2/Hz]")

In [None]:
@dataclass
class Data:
    x: NDArray
    t: NDArray
    y1: NDArray
    y2: NDArray
    y3: NDArray
    y4: NDArray
    n: int
    n2: int
    Fs: int

    @property
    def corrupted_index(self):
        return (np.sort(np.round(self.n)) - 1).astype(int)

    @property
    def signal(self):
        return self.x

    @property
    def signal_with_noise(self):
        return self.y1

    @property
    def signal_with_trend(self):
        return self.y2

    @property
    def signal_with_impulsive_noise(self):
        return self.y3

    @property
    def time_array(self):
        return self.t

    @classmethod
    def load(cls, fname: str) -> "Data":
        data = np.load(fname)
        return cls(**dict(list(data.items())))

In [None]:
# Data loading
data = Data.load("Session 2.npz")

## 2. Exploratory study

In [None]:
print(f"Sampling Frequency : {SAMPLING_FREQUENCY} Hz")
print(f"Number of samples : {data.signal.shape[0]}")


fig, ax = fig_ax()
ax.plot(data.time_array, data.signal)
_ = ax.set_xlabel("Time (seconds)")

<div class="alert alert-success" role="alert">
    <p><b>Question</b></p>
    <p>What is the main frequency of the signal? (Use code from the previous session.)</p>
</div>

## 3. Denoising

A popular solution for denoising a signal consists in filtering the signal with appropriate cut frequencies. One practical procedure is to study the PSD of the noisy signal in order to estimate the frequency bands that are unlikely to belong to the signal.

In [None]:
# Plot of the original and noisy signals
fig, ax = fig_ax()
ax.plot(data.time_array, data.signal, label="Original signal")
ax.plot(data.time_array, data.signal_with_noise, label="Noisy signal")
_ = ax.set_xlabel("Time (seconds)")
_ = plt.legend()

In [None]:
fig, ax = fig_ax(figsize=(10, 5))
plot_periodogram(signal=data.signal, nperseg=256, ax=ax)
plot_periodogram(signal=data.signal_with_noise, nperseg=256, ax=ax)
_ = ax.set_xlim(0, 20)

According to the PSD, it appears that most of the power of the original signal lies in the frequency band before 10 Hz: the remaining content is likely to be only composed of noise. Therefore, we can perform denoising by applying a low-pass filter

<div class="alert alert-success" role="alert">
    <p><b>Question</b></p>
    <p>Apply a low-pass filter? (Use the code from the previous session.)</p>
</div>

In [None]:
# filter signal
cutoff_freq = ...  # your code
filtered = ...  # your code

fig, ax = fig_ax()

# Plot angular velocities
ax.plot(data.time_array, data.signal_with_noise, label="Noisy signal")
ax.plot(data.time_array, filtered, label="Filtered")

# add labels to axis and title
ax.set_xlabel("Time (s)")
_ = plt.legend()

## 4. Detrending

Trends can be removed either by using low-pass filtering or by using a regression on a set of smooth functions such as polynoms (see Session 3) so as to extract the trend component

In [None]:
# compute trend signal
detrended = detrend(data.signal_with_trend, order=1)

fig, ax = fig_ax()
# Plot angular velocities
ax.plot(data.time_array, data.signal, label="Original signal")
ax.plot(data.time_array, detrended, label="Detrended signal")

# add labels to axis and title
ax.set_xlabel("Time (s)")
_ = plt.legend()

In [None]:
fig, ax = fig_ax()
# Plot angular velocities
ax.plot(data.time_array, data.signal, label="Original signal")
ax.plot(data.time_array, data.signal_with_trend - detrended, label="Trend")

# add labels to axis and title
ax.set_xlabel("Time (s)")
_ = plt.legend()

<div class="alert alert-success" role="alert">
    <p><b>Question</b></p>
    <p>What is the best polynomial degree for the trend?</p>
</div>

## 5. Detection and removal of impulsive noise

Impulsive noise of small lengths (several samples) can be suppressed by applying non linear filters such as median filters. In case of large bursts (more than 10 samples), impulsive noise has to be handled in two phases : detection (finding the locations of the corrupted samples) and interpolation (replacing corrupted samples with more appropriate values). 

* **Detection phase :** given estimates of the AR parameters, computation of the quantity $$d[n]=x[n]+\sum_{i=1}^p \hat{a}_i x[n-i]$$ The set of corrupted samples can determined as $$\mathcal{T} = \left\lbrace n \mbox{ s.t. } |d[n]|>\lambda \right\rbrace$$
* **Interpolation phase :** minimization of the quantity $$\sum_{n=p+1}^{N-1}\left|x[n] + \sum_{i=1}^p \hat{a}_i x[n-i]\right|^2$$

In [None]:
# Plot of the original and noisy signals

fig, ax = fig_ax()
ax.plot(data.time_array, data.signal, label="Signal")
ax.plot(
    data.time_array,
    data.signal_with_impulsive_noise,
    label="Signal with impulsive noise",
)

plt.xlabel("Time (seconds)")
_ = plt.legend()

In [None]:
# Naive approach : Median filtering
filtered = scipy.signal.medfilt(data.signal_with_impulsive_noise, kernel_size=3)

fig, ax = fig_ax()
ax.plot(
    data.time_array,
    data.signal_with_impulsive_noise,
    label="Signal with impulsive noise",
)
ax.plot(data.time_array, filtered, label="Median filter")
ax.set_xlabel("Time (seconds)")
_ = plt.legend()

<div class="alert alert-success" role="alert">
    <p><b>Question</b></p>
    <p>What happens when the kernel size increases?</p>
</div>

In [None]:
# Detection of corrupted samples based on AR model
def detection_impulsive(x, p, K):
    # Estimation of the AR parameters
    a, sigma_e = yule_walker(x, p, method="mle", demean=False)
    Nw = np.size(x)
    # Computation of the prediction error
    d = np.zeros((Nw,))
    for j in range(p, Nw):
        d[j] = x[j] - np.sum(np.dot(x[j - p : j], np.flip(a)))
    d = np.abs(d)
    # Thresholding the prediction error to find the locations of the corrupted samples
    lambda_K = K * sigma_e
    T = np.where(d > lambda_K)
    T = T[0]
    T = T[(T > p - 1) & (T < Nw - p)]
    return T

In [None]:
p = 10  # Order of the model
K = 1  # Detection threshold
T = detection_impulsive(data.signal_with_impulsive_noise, p, K)

fig, ax = fig_ax()
ax.plot(data.time_array, data.signal_with_impulsive_noise)
ax.plot(data.time_array[T], data.signal_with_impulsive_noise[T], "o")
_ = ax.set_xlabel("Time (seconds)")

In [None]:
# Interpolation based on AR model
def interpolation_impulsive(x, p, T):
    a, sigma_e = yule_walker(x, p, method="mle", demean=False)
    Nw = np.size(x)
    a = np.concatenate(([1], -a))
    m = np.size(T)
    b = np.zeros((p + 1,))
    for i in range(p + 1):
        b[i] = np.sum(np.dot(a[0 : p - i + 1], a[i : p + 1]))

    B = np.zeros((m, m))
    for i in range(m):
        for j in range(m):
            if np.abs(T[i] - T[j]) < p + 1:
                B[i, j] = b[np.abs(T[i] - T[j])]

    z = np.zeros((m,))
    for i in range(m):
        for k in range(-p, p + 1):
            if np.all(T != T[i] - k):
                z[i] = z[i] + b[np.abs(k)] * x[T[i] - k]

    y = -np.dot(np.linalg.inv(B), z)
    return y

In [None]:
# Interpolation when the locations of corrupted samples are known

p = 1  # Order of the model
filtered_points = interpolation_impulsive(
    data.signal_with_impulsive_noise, p, data.corrupted_index
)

filtered = np.copy(data.signal_with_impulsive_noise)
filtered[data.corrupted_index] = filtered_points

fig, ax = fig_ax()
ax.plot(data.time_array, data.signal, label="Original")
ax.plot(data.time_array, filtered, label="Denoized")
plt.xlabel("Time (seconds)")
_ = plt.legend()

<div class="alert alert-success" role="alert">
    <p><b>Question</b></p>
    <p>What is the best filter order?</p>
</div>