<style>       
    hr{
        height: 4px;
        background-color: rgb(247,148,9);
        border: none;
    }
</style>
<div style="color=white;
           display:fill;
           border-radius:5px;
           background-color:rgb(34,41,49)">
<hr>
<div align="right"><i>BTE5034 - Digital Signal Processing &nbsp;</i></div>
<div align="right">EIT - BFH &nbsp;</div>

<div style="clear: both; font-size: 30pt; font-weight: bold;">
    Practical introduction to discrete-time Filters
</div>
<hr>
</div>

In [None]:
import matplotlib.pyplot as plt
import numpy as np
import IPython

import ipywidgets as wg

In [None]:
plt.rcParams["figure.figsize"] = (14,4)

# Denoising

In a denoising scenario we have a "clean" signal $x[n]$ that has been corrupted by an additive noise signal $\eta[n]$; we only have access to $\hat{x}[n] = x[n] + \eta[n]$ and we would like to recover $x[n]$.

In general, without further assumptions, this is not a solvable problem. However, it is generally the case that the signal and the noise have very different characteristics and, in this case, we can try to reduce the amount of noise via filtering. Typically, if we look in the time domain:
 * the clean signal is varying slowly and smoothly
 * the noise is low-amplitude with respect to the signal and it varies very fast from one sample to the next.

## A generator for smooth noisy signals

The following function can be used to generate an $N$-point smooth signal together with a noise-corrupted version at the specified signal to noise ratio; the spectrum of the smooth signal will contain most of its energy in the $[-B\pi, B\pi]$ range. You don't need to worry about how the function works, simply use it as a black box.

In [None]:
# Only for the very curious students!!

def sig_gen(N: int, SNR: float, B=0.02, x=None) -> [np.ndarray, np.ndarray]:
    # if you pass a clean signal x, the function will just add noise
    if x is None:
        # build a smooth signal by creating a DFT vector X of length 2 * int(N * B) + 2 and
        #  taking a 2N-point inverse DFT. The resulting signal will be bandlimited to 2pi * B.
        # X[0] = 0 to ensure zero mean; other values randomly distributed over [-1, 1]
        X = np.r_[0, np.random.uniform(-1, 1, 2 * int(N * B) + 1)]
        # at this point the signal's energy is the number of nonzero DFT coefficients times
        #  their variance, divided by the number of time samples: (2NB)(1/3)/(2N) = B / 3
        # We take the real part only, so energy is B / 6
        # To get a signal with approx unit peak, normalize by sqrt of twice the power
        #  (pretending the signal is sinusoidal so that peak = RMS * sqrt(2))
        x = np.real(np.fft.ifft(X, 2*N))[:N] / np.sqrt(2 * B / 3 / N)
    # amplitude of the noise from the desired SNR, knowing that now signal energy is 0.125
    a = np.sqrt((3.0 / 8.0) / np.power(10, SNR / 10)) 
    return x, x + np.random.uniform(-a, a, len(x))

Use the following interactive widget to play with the SNR and the B parameters and try to get a feel for their effect on the  signal generated by the function:

In [None]:
def display(SNR=10, Bw=20, dfilt=None, param=None):
    LEN, SHOW = 1000, slice(200, 800)
    if display.data['x'] is None or display.data['Bw'] != Bw:
        x, x_hat = sig_gen(LEN, SNR, Bw / 1000)
        display.data.update({'SNR': SNR, 'Bw': Bw, 'x': x, 'x_hat': x_hat})
    elif display.data['SNR'] != SNR:
        x, x_hat = sig_gen(LEN, SNR, Bw / 1000, display.data['x'])
        display.data.update({'SNR': SNR, 'x_hat': x_hat})
    else:
        x, x_hat = display.data['x'], display.data['x_hat']
    plt.plot(x_hat[SHOW], color=('red', 0.5), lw=1, label='noisy');
    plt.plot(x[SHOW], 'C3', lw=2, label='clean');
    if dfilt is not None:
        plt.plot(dfilt(x_hat, param)[SHOW], 'C0', lw=3, label='denoised');
    plt.ylim(-1.2,1.2);
    plt.legend(loc="upper right");
    
# hold the current data in a dict as a function attribute (hacky, I know...)
display.data = {'SNR': None, 'Bw': None, 'x': None, 'x_hat': None}    

In [None]:
wg.interact(display, SNR=(0.0, 50.0), Bw=(5, 40, 1), dfilt=wg.fixed(None), param=wg.fixed(None));

### Exercise: check the SNR

Given a noise-corrupted signal $\hat{x}[n] = x[n] + \eta[n]$, the signal-to-noise ratio is expressed in dB and is computed as 

$$
    \text{SNR}_{\hat{x}} = 10 \log_{10}\left(\frac{E_x}{E_\eta}\right)
$$

where $E_x$ is the energy of the clean signal and $E_\eta$ is the energy of the noise. 

Generate a noisy signal and verify numerically that the SNR of the sequence returned by `sig_gen()` is indeed close to the SNR passed as an argument to the function.

In [None]:
N, SNR = 1000, 5
x, x_hat = sig_gen(N, SNR)

In [None]:
plt.plot(x);
plt.plot(x_hat);

In [None]:
E_x = np.sum(x ** 2)
E_eta = np.sum((x - x_hat) ** 2)
SNR_exp = 10 * np.log10(E_x/E_eta)
print(SNR_exp)

## Denoising via averaging

As a first idea, try to remove the noise by replacing each sample with a local average of the past $M$ input points; the resulting signal is
$$
  y[n] = (x[n] + x[n-1] + \ldots + x[n-M+1])/M
$$ 

Let's implement this in the simplest possible way:

In [None]:
# vector to store the output
y = np.zeros(len(x))

# number of points to use in the average
M = 20

# let's go
for n in range(len(x)):
    for m in range (M):
        y[n] += x_hat[n-m] 
    y[n] /= M  # average of x[n], x[n-1], x[n-2]  for M points

plt.plot(x_hat, color=('red', .5), lw=1, label='noisy');
plt.plot(x, 'C3', lw=2, label='clean');
plt.plot(y, 'C0', lw=3, label='denoised');
plt.legend(loc="upper right");

### A better implementation

Usually we want to implement our processing algorithms as functions; also, we want to use NumPy's built-ins whenever we can:

In [None]:
def mavg(x: np.ndarray, M: int) -> np.ndarray:
    N = len(x)
    y = np.zeros(N)
    # prepend M-1 zeros to x so that we can always "go back" M samples to compute the average
    x = np.r_[np.zeros(M-1), x]
    for n in range(0, N):
        # we can use the built-in averaging function in numpy over the previous M samples
        y[n] = np.mean(x[n:n+M])        
    return y

With this, we can play around interactively and see how the number of points used in the average affects the result:

In [None]:
wg.interact(display, SNR=(0.0, 50.0), Bw=(5, 40, 1), dfilt=wg.fixed(mavg), param=wg.IntSlider(min=2, max=100, value=3, description='M'));

## Denoising with a recursive filter

The idea here is to _update_ the previous estimate of the average as we go along; we do this by keeping a large proportion of the previous estimate and adding a small amount of the current input sample
$$
    y[n] = \lambda y[n-1] + (1-\lambda)x[n]
$$
where $\lambda$ is a parameter between zero and one. The system that implements this recursive update is called a _leaky integrator_.

As we have seen in class, this recursive relation also describes a discretized RC lowpass circuit; it's not a coincidence that this works for denoising since here we want to remove the fast variations due to the noise while preserving the smooth, slowly-varying signal envelope and this is what a lowpass filter does.

In [None]:
def leaky(x: np.ndarray, lam: float) -> np.ndarray:
    y = np.zeros(len(x))
    for n in range(0, len(x)):
        y[n] = lam * y[n-1] + (1 - lam) * x[n]        
    return y

In [None]:
interact(display, SNR=(0.0, 50.0), Bw=(5, 40, 1), dfilt=wg.fixed(leaky), param=wg.FloatSlider(min=0, max=0.999, value=0.5, description='$\\lambda$'));