<div style="margin: 0 auto 30px; height: 60px; border: 2px solid gray; border-radius: 6px;">
  <div style="float: left;"><img src="img/epfl.png" /></div>
  <div style="float: right; margin: 20px 30px 0; font-size: 10pt; font-weight: bold;"><a href="https://moodle.epfl.ch/course/view.php?id=18253">COM202 - Signal Processing</a></div>
</div>
<div style="clear: both; font-size: 30pt; font-weight: bold; color: #483D8B;">
    Lab 7: Discrete-time Filters
</div>

In this notebook we will learn how to implement and use two discrete-time  filters called the Leaky Integrator and the Moving Average. In spite of their simplicity, these two lowpass filters are extremely useful and they often represent a good initial stand-in for more complex filters when prototyping a signal processing application.

In [None]:
%matplotlib inline
import matplotlib.pyplot as plt
import numpy as np
import IPython
import scipy.signal as sp
from scipy.io import wavfile

# interactivity library:
from ipywidgets import interact, interactive, fixed, interact_manual
import ipywidgets as widgets

In [None]:
plt.rcParams["figure.figsize"] = (14,4)

# Implementing discrete-time filters

The input-output relationship for a realizable causal filter can always be expressed as a constant-coefficient difference equation (CCDE) of the form
\[
    y[n] = \sum_{k=0}^{M} b_k x[n-k] - \sum_{k=1}^{N} a_k y[n-k] 
\]

The CCDE is an algorithm using only scalar multiplications, additions, and memory cells to implement the required delays. Although ready-made filtering functions are available in SciPy, it's always instructive to try and code a digital filter from scratch in order to understand the finer details of a practical implementation. 

## Filters as Python functions

Using Python and NumPy, filters can be implemented as standalone [pure functions](https://en.wikipedia.org/wiki/Pure_function); the arguments are going to be

 * an array containing the entire input signal
 * the CCDE coefficients $a_k$ and $b_k$

and the return value is going to be an array, with the same size as the input, containing the entire output signal, as shown in this template:

In [None]:
def dt_filter(x: np.ndarray, filter_parameters) -> np.ndarray:
    y = np.zeros(len(x))
    for n in range(0, len(x)):
        y[n] = ...  # compute each output sample        
    return y

### Causality and zero initial conditions

Filters are considered causal by default, and therefore the computation of each output sample $y[n]$ involves only _past_ input and output samples.

The input passed as an argument is interpreted as an array containing the values $x[0], x[1], \ldots, x[N-1]$. We assume _zero initial conditions_, which means:

 * the input is a causal sequence, i.e., $x[n] = 0$ for $n <  0$; if the algorithm requires past values of $x[n]$ for negative values of the index $x$, these values are assumed to be zero
 * for recursive filters whose CCDE uses past _output_ values, we set $y[n] = 0$ for $n < 0$; this is equivalent to initializing all delay elements at zero

### Termination

The input to the filtering function is an array of length $N$ and, by convention, the return value is also an array of length $N$. Note however that this does not mean that $y[n] = 0$ for $n \ge N$, it simply means that no more output samples can be computed unless more input samples are provided.

In fact, if for instance we were to assume that $x[n]=0$ for $n \ge N$, an IIR filter would produce an infinite-length output sequence $y[n]$. Sometimes, it is useful to append a suitable amount of zeros to the input array so that the output can naturally decay to a small amplitude once the actual data in the array has been processed.

## The Leaky Integrator

The following function implements a Leaky Integrator described by the constant-coefficient difference equation (CCDE) 

$$
    y[n] = \lambda y[n-1] + (1-\lambda)x[n], \qquad 0 < \lambda < 1.
$$

In [None]:
def leaky(x: np.ndarray, lam: float) -> np.ndarray:
    y = np.zeros(len(x))
    for n in range(0, len(x)):
        y[n] = lam * y[n-1] + (1 - lam) * x[n]        
    return y

### Ensuring zero initial conditions 

The Leaky Integrator is a recursive (IIR) filter and its output at time $n$ depends on the output at time $n-1$; assuming zero initial conditions, when $n=0$ the required "previous output value" is $y[-1] = 0$. 

In the implementation above, when `n > 0` the expression `y[n-1]` is indeed pointing to the previously-computed output value. In the first iteration, however, `n` is equal to zero and so the expression `y[n-1]` is equivalent to `y[-1]`. Contrary to many other programming languages Python allows negative indexing so that  `y[-1]` actually points to the _last_ element in the array `y`. Since the output array `y` is pre-allocated and filled with zeros, `y[-1]` is indeed equal to zero as required.

### Testing the code

The impulse response of the Leaky Integrator is $h[n] = (1-\lambda)\lambda^n u[n]$ and we can verify that the above implementation is correct by comparing the theoretical values to the output of the function when the input is a (truncated) delta sequence:

In [None]:
N = 100
lam = 0.95
plt.stem(leaky(np.r_[1, np.zeros(N-1)], lam), 'C0', basefmt='C0-', label=r"$h[n]$ (using the filter)");
plt.plot((1 - lam) * lam ** np.arange(0, N), 'C1', linewidth=2, label=r"$(1-\lambda)\lambda^n$ (theoretical impulse response)");
plt.legend();

## The Moving Average

The causal Moving Average filter of length $M$ is described by the CCDE

$$
    y[n] = \frac{1}{M}\sum_{k=0}^{M-1}x[n-k].
$$

The Moving Average is an FIR filter, that is, each output sample is computed using only past input samples; in a practical implementation, values for $x[n]$ when $n<0$ are equal to zero because of the causality assumption for $x[n]$.

### Exercise: implement the Moving Average filter

Complete the code below

In [None]:
def mavg(x: np.ndarray, M: int) -> np.ndarray:
    ...        
    return y

In [None]:
# SOLUTION

def mavg(x: np.ndarray, M: int) -> np.ndarray:
    N = len(x)
    y = np.zeros(N)
    # prepend M-1 zeros to x so that we can always "go back" M samples to compute the average
    x = np.r_[np.zeros(M-1), x]
    for n in range(0, N):
        # we can use the built-in averaging function in numpy over the previous M samples
        y[n] = np.mean(x[n:n+M])        
    return y

Let's test your implementation:

In [None]:
y = mavg((-1) ** np.arange(0, 40), 20)  # test signal, filtered
print('good job!' if np.sum(y[1::2]) == 0 and np.sum(y[:20:2]) == 0.5 else 'Sorry, try again!')

# Applications: Denoising

In a denoising scenario we have a "clean" signal $x[n]$ that has been corrupted by an additive noise signal $\eta[n]$; we only have access to $\hat{x}[n] = x[n] + \eta[n]$ and we would like to recover $x[n]$.

In general, without further assumptions, this is not a solvable problem. However, it is generally the case that the signal and the noise have very different characteristics and, in this case, we can try to reduce the amount of noise via filtering. Typically, if we look in the time domain:
 * the clean signal is varying slowly and smoothly
 * the noise is low-amplitude with respect to the signal and it varies very fast from one sample to the next.
 
These two characteristics translate to the following properties in the frequency domain:
 * the clean signal contains most of its energy in the low frequencies around zero
 * the noise has a full-band spectrum, with almost equal energy at all frequencies.

## A signal generator

The following function can be used to generate an $N$-point smooth signal together with a noise-corrupted version at the specified signal to noise ratio; the spectrum of the smooth signal will contain most of its energy in the $[-B\pi, B\pi]$ range. You don't need to worry about how the function works, simply use it as a black box with the syntax

`x = sig_gen(N, SRN, B)`

where
 * `N` is the number of samples
 * `SNR` is the signal to noise ratio in dB
 * `B` is the positive bandwidth of the clean signal expressed as a percentage of the maximum bandwidth; small values (e.g. 2%) will yield slow-varying signals, and vice-versa

In [None]:
# Only for the very curious students!!

def sig_gen(N: int, SNR: float, B=4, x=None) -> [np.ndarray, np.ndarray]:
    B /= 100.0
    if x is None:
        # build a smooth signal by creating a DFT vector X of length 2 * int(N * B) + 2 and
        #  taking a 2N-point inverse DFT. The resulting signal will be bandlimited to 2pi * B.
        # X[0] = 0 to ensure zero mean; other values randomly distributed over [-1, 1]
        X = np.r_[0, np.random.uniform(-1, 1, 2 * int(N * B) + 1)]
        # at this point the signal's energy is the number of nonzero DFT coefficients times
        #  their variance, divided by the number of time samples: (2NB)(1/3)/(2N) = B / 3
        # We take the real part only, so energy is B / 6
        # To get a signal with approx unit peak, normalize by sqrt of twice the power
        #  (pretending the signal is sinusoidal so that peak = RMS * sqrt(2))
        x = np.real(np.fft.ifft(X, 2*N))[:N] / np.sqrt(2 * B / 3 / N)
    # amplitude of the noise from the desired SNR, knowing that now signal energy is 0.125
    a = np.sqrt((3.0 / 8.0) / np.power(10, SNR / 10)) 
    return x, x + np.random.uniform(-a, a, len(x))

Use the following interactive widget to play with the SNR and the B parameters and try to get a feel for their effect on the  signal generated by the function:

In [None]:
class DisplaySignal:
    def __init__(self):
        self.N = 500
        self.SNR, self.B = 0, 2
        self.x, self.x_hat = None, None

    def display(self, SNR=12, B=2, param=None):
        if SNR != self.SNR or B != self.B:
            self.x, self.x_hat = sig_gen(self.N, SNR, B, x=(self.x if B == self.B else None)) 
        self.SNR, self.B = SNR, B
        plt.plot(self.x_hat, 'C3', lw=1, label='noisy');
        plt.plot(self.x, 'C0', lw=2, label='clean');
        y = self.filter(param)
        if y is not None:
            plt.plot(y, 'C2', lw=4, label='denoised');
        plt.ylim(-1.2,1.2)
        plt.legend(loc="upper right");

    def filter(self, param):
        return None

In [None]:
interact(DisplaySignal().display, SNR=(0.0, 50.0), B=(1, 10, 1), param=fixed(None));

### Exercise: checking the SNR

Given a noise-corrupted signal $\hat{x}[n] = x[n] + \eta[n]$, the signal-to-noise ratio is expressed in dB and is computed as 

$$
    \text{SNR}_{\hat{x}} = 10 \log_{10}\left(\frac{E_x}{E_\eta}\right)
$$

where $E_x$ is the energy of the clean signal and $E_\eta$ is the energy of the noise. 

Generate a noisy signal and verify numerically that the SNR of the sequence returned by `sig_gen()` is indeed close to the SNR passed as an argument to the function.

In [None]:
N, SNR = 1000, 30
x, x_hat = sig_gen(N, SNR)

E_x = ...
E_eta = ...
SNR_exp = ...
print(SNR_exp)

In [None]:
# SOLUTION

x, x_hat = sig_gen(N, SNR)
E_x = np.sum(x ** 2)
E_eta = np.sum((x_hat - x) ** 2)
SNR_exp = 10 * np.log10(E_x / E_eta)
print(SNR_exp)

## Denoising with a leaky integrator

The following interactive widget allows you to play with the SNR of the noisy signal and with the parameter $\lambda$ of a leaky integrator to see the denoising performance of the filter in the time domain. Try to find a value for $\lambda$ that provides a good compromise between removal of the noise and preservation of the original clean signal.

In [None]:
class DenoiseLeaky(DisplaySignal):
    def filter(self, param):
        return leaky(self.x_hat, param)

In [None]:
interact(DenoiseLeaky().display, SNR=(0.0, 50.0), B=(1, 10, 1), 
         param=widgets.FloatSlider(min=0.48, max=0.99, step=0.02, value=0.6, description='lambda'));

### Exercise: denoising in frequency

<div style="float: right; margin: 0 10px 0 30px;"><img src="img/plot.png" width="250"></div>

Plot the magnitude spectra of the clean, noisy, and denoised signals, together with the magnitude response of the leaky integrator, using the values for SNR, $B$, and $\lambda$ that you found satisfactory by using the widget. 

Remember that the magnitude response of the Leaky Integrator is 

$$
    |H(\omega)| = \frac{(1-\lambda)}{\sqrt{1 - 2\lambda \cos\omega + \lambda^2}}.
$$

To obtain the plot:
 * plot the filter's magnitude response over the $[-\pi, \pi]$ interval using the above formula for $|H(\omega)|$
 * compute $E_x$, the energy of the clean signal, then normalize the clean, noisy and denoised signals by $2\sqrt{E_x}$ before computing their DFTs
 * plot the magnitude of the DFTs so that they are aligned with the frequency response of the filter (using the techniques we developed in Lab 2.b for DFT plots)

In the end you should obtain three plots similar to those shown here on the right.

In [None]:
PTS, SNR, B, lam = 500, 12, 2, 0.8
x, x_hat = sig_gen(PTS, SNR, B)

# your code here
...

In [None]:
# SOLUTION
PTS, SNR, B, lam = 500, 12, 2, 0.8
x, x_hat = sig_gen(PTS, SNR, B)

d = leaky(x_hat, lam)
norm = 2 * np.sqrt(np.sum(x ** 2))

w = np.linspace(-np.pi, np.pi, 2000)
H = (1 - lam) / np.sqrt(1 - 2 * lam * np.cos(w) + lam**2)

for sig, label, color in [(x, 'clean', 'C0'), (x_hat, 'noisy', 'C3'), (d, 'denoised', 'C2')]:
    plt.figure()
    S = np.abs(np.fft.fftshift(np.fft.fft(sig / norm)))
    plt.plot(np.linspace(-np.pi, np.pi, len(S)), S, c=color, label=label);  # linspace(-pi, pi) is correct if PTS is even.
                                                                            # For PTS odd the bounds would have to be shifted slightly.
    plt.plot(w, H, 'C1', lw=3, label='mag resp')
    plt.xlabel(f"$\\omega$")
    plt.legend();

## Denoising with a Moving Average filter

In this section we will explore the denoising properties of a moving average filter by adapting the code of the previous section.

### Exercise: MA denoising in time

Modify the code for the [leaky integrator denoising widget](#Denoising-with-a-leaky-integrator) so that it uses a moving average filter instead; in the new function, the interactive widget will need to pass the desired length for the filter. 

In [None]:
class DenoiseMA(DisplaySignal):
    ... # your code here

In [None]:
# SOLUTION

class DenoiseMA(DisplaySignal):
    def filter(self, param):
        return mavg(self.x_hat, param)

In [None]:
interact(DenoiseMA().display, SNR=(0.0, 50.0), B=(1, 10, 1), 
         param=widgets.IntSlider(min=2, max=100, step=1, value=5, description='M'));

### Exercise: MA denoising in frequency

Modify the code in you wrote for the exercise ["denoising in frequency"](#Exercise:-denoising-in-frequency) so that the plots show the magnitude response and the filtering results of a moving average filter.

The magnitude response in this case is going to be

$$
    |H(\omega)| = \frac{1}{M}\left|\frac{\sin(M \omega/2)}{\sin(\omega / 2)}\right|.
$$

In [None]:
# SOLUTION
PTS, SNR, B, M = 500, 12, 2, 20
x, x_hat = sig_gen(PTS, SNR, B)

d = mavg(x_hat, M)
norm = 2 * np.sqrt(np.sum(x ** 2))

w = np.linspace(-np.pi, np.pi, 2000)
H = np.abs(np.sin(M * w / 2) / np.sin(w / 2) / M)

for sig, label, color in [(x, 'clean', 'C0'), (x_hat, 'noisy', 'C3'), (d, 'denoised', 'C2')]:
    plt.figure()
    S = np.abs(np.fft.fftshift(np.fft.fft(sig / norm)))
    plt.plot(np.linspace(-np.pi, np.pi, len(S)), S, c=color, label=label);  
    plt.plot(w, H, 'C1', lw=3, label='mag resp')
    plt.legend();

# Applications: Detrending (aka DC removal)

In many (if not all) signal processing applications, we prefer signals to be _balanced,_ that is, we want the average value of the signal to be zero. Indeed, all physical processing devices (and digital devices in particular) can only deal with a finite range of possible input values before things like distortion or breakdown start to happen, and this nominal input range is usually centered around zero. If a signal is not balanced, it will not be able to fully use the available input range of a processing device.

As an example, assume that a processing device can only accept input values in the interval $[-1, 1]$; a signal $x[n]$ such that $\max_n\{x[n]\} = 0.8$ and $\min_n\{x[n]\} = -0.8$ will be processed without problems; but the unbalanced signal $y[n] = x[n] + 0.5$ will exceed the device's input limits even though the signal's range, $\max_n\{y[n]\} - \min_n\{y[n]\}$, is the same as for $x[n]$.

## DC removal

The mean of a signal is also known as its DC (direct current) component, since a physical signal with a constant voltage offset will cause a direct (i.e. non-alternating) current to flow into a resistive load. This is obviously a waste of power, hence the need to remove such an offset.

As you may have noticed while playing with the interactive widgets before, if you push the value of $\lambda$ very close to one in a Leaky integrator or if you use a large value of $M$ in the moving average, the output of the filter tends to converge to the mean of the signal.

The idea, therefore, is to use a Leaky Integrator or a Moving Average filter to locally _detrend_ a signal, that is, remove its estimated local mean value to balanced the signal.

### Exercise: average estimation with Leaky Integrator and Moving Average

The following cell creates a signal and offsets it by a constant amount. Use a Leaky Integrator and a Moving Average filter to estimate the value of the offset in order to balance back the signal. Compute the empirical mean of the entire signal and find the values for $\lambda$ and $M$ that provide a good estimate over time

In [None]:
def sig_gen_DC(dc_offset, N=2000, SNR=100, B=4):
    x, _ = sig_gen(N, SNR, B)
    return x + dc_offset

In [None]:
# change these two values until you get a reasonably good estimate of the offset
lam = 0.5
M = 2

In [None]:
offset = 0.4
x = sig_gen_DC(offset)
m = np.mean(x)

avg_li = leaky(x, lam)
avg_ma = mavg(x, M)

plt.axhline(offset, label='true offset')
plt.axhline(m, label='empirical average')
plt.plot(avg_li, 'C2', label='LI estimate' )
plt.plot(avg_ma, 'C3', label='MA estimate')
plt.legend();

In [None]:
# this shows the deviation from the actual offset (percentage)

plt.plot(100 * (offset - avg_li) / offset, 'C2', label='leaky' )
plt.plot(100 * (offset - avg_ma) / offset, 'C3', label='m-avg')
plt.legend();

In [None]:
# SOLUTION

# these are reasonable values
lam = 0.995
M = 300

# Filtering Music

The lowpass filters provided by the leaky integrator and the moving average are very simple, too simple in fact to be used in applications such as audio processing. In this section we will use more advanced filters to extract specific audio frequencies from a music file.

Let's load a short audio clip and play it

In [None]:
music_sf, music = wavfile.read('data/sm.wav')
IPython.display.Audio(music, rate=music_sf)

## Extracting the treble

Suppose we are interested in the high-frequency content of the music signal, which usually contain very interesting information about percussive instruments such as the drums. Usually, we can get a good feel for the parts played by the hi-hat and cymbals if we discard all frequencies below 7KHz. 

Let's use an FIR highpass; note that, to design a highpass, we choose an odd number of taps in order to obtain a type I filter (symmetric with integer delay):

In [None]:
# Transition band and high cutoff frequency
tb = 100  # Transition band (Hz)
fh = 4000  # High cutoff frequency for the high-pass filter (Hz)

# Length of the filter
M = 1601  # Number of taps in the filter (higher number means more precise filtering)

# Design the filter using the Remez (Parks-McClellan) algorithm
treble_filter = sp.remez(M, [0, fh - tb, fh, music_sf/2], [0, 1], fs=music_sf, maxiter=50)

- **Transition Band (tb**): Frequency range over which the filter transitions from stop-band to pass-band.

- **High Cutoff Frequency (fh)**: Frequencies below this will be attenuated.

- **Length of Filter (M)**: Number of taps in the filter, affecting precision.

Let's plot the filter's magnitude response on a dB scale:

In [None]:
w, HH = sp.freqz(treble_filter, 1, 1024)
plt.semilogy(w/np.pi * (music_sf/2), np.abs(HH));

Now we can apply the filter and listen to the results:

In [None]:
treble = sp.lfilter(treble_filter, [1], music)
IPython.display.Audio(data = treble, rate = music_sf)

## Extracting and enhacing the bass

Imagine you are a recording engineer and you want to enhance the low end of the music I am producing. My idea is to extract the low frequencies, amplify them, and then add them back to the signal.

Let's start by desining a low pass filter.

In [None]:
music_sf

### Exercise: low-frequency extraction

Design a Type I FIR lowpass filter of length 3501 taps, with cutoff frequency 200 Hz and a 50-Hz-wide transition band. 

In [None]:
tb = 50  # Transition band (Hz)
fc = 200  # Cutoff frequency for the low-pass filter (Hz)
M = 3501  # Length of the filter (number of taps)

# your code here :
bass_filter = ...


In [None]:
# SOLUTION 

bass_filter = sp.remez(M, [0, fc, fc+tb, music_sf/2], [1, 0], fs=music_sf, maxiter=50)

Let's plot the magnitude response :

In [None]:
w, H = sp.freqz(bass_filter, 1, 1024)
plt.semilogy(w[0:200]/np.pi * (music_sf/2), np.abs(H[0:200]));

Now apply the filter and extract the bass :

In [None]:
#your code here :
bass = ...

In [None]:
# SOLUTION

bass = sp.lfilter(bass_filter, 1.0, music)

In [None]:
IPython.display.Audio(data = bass, rate = music_sf)

### Enhancing the bass

Cool, we have the bass part! In order to enhance its presence in the final mix, the idea is to add back to the audio file a scaled-up version of it. There is one important detail, however: the filter introduces a processing delay so the audio and the bass are now out of sync!

We can see this if we plot the very first note of the audio excerpt:

In [None]:
s = slice(0, 120000)
plt.plot(music[s]);
plt.plot(bass[s]);

If we zoom in and start plotting from the beginning of the first note, we can see that there is a delay of about half the FIR's length:

In [None]:
s = slice(16600, 50000)
plt.plot(music[s]);
plt.plot(bass[s]);
plt.axvline(0, color='C3')
plt.axvline((M-1)/2, color='C3')

Indeed, a Type I FIR with $M$ taps ($M$ odd) introduces a delay of $D = (M-1)/2$ samples. We can compensate for this in two ways:
 * here, since we're working "offline", we can just discard the first $D$ samples from the bass signal
 * in a real-time application, we would need to delay the original signal by $D$ samples to re-align it with the lowpass output

In [None]:
shifted_bass = bass[(M-1)//2:]

plt.plot(music[s]);
plt.plot(shifted_bass[s]);

Nice. We can now amplify the bass if we want, add it back to our original signal and we are done ! Feel free to play around with the values to see how it changes !

In [None]:
amplification_factor = 2

valid_range = slice(0, len(shifted_bass))
enhanced_bass = amplification_factor * shifted_bass
enhanced_audio = music[valid_range] + enhanced_bass

IPython.display.Audio(enhanced_audio, rate = music_sf)

In [None]:
# Time vector for plotting
t = np.arange(len(shifted_bass)) / music_sf

plt.figure(figsize=(12, 6))

#Plot the bass vs bass enhanced signal
plt.subplot(2, 1, 1)
plt.plot(t, bass[valid_range], label='Bass Signal', color='red')
plt.plot(t, enhanced_bass[valid_range], label='Bass Enhanced Signal', alpha=0.7)
plt.title('Bass vs Bass Enhanced Signal')
plt.xlabel('Time [s]')
plt.grid(True)
plt.legend()


# Plot the results
plt.subplot(2, 1, 2)
plt.plot(t, music[valid_range], label='Original Signal', color='blue')
plt.plot(t, enhanced_audio[valid_range], label='Bass Enhanced Audio', alpha=0.7)
plt.title('Original vs Bass Enhanced Audio')
plt.xlabel('Time [s]')
plt.grid(True)
plt.legend()

plt.tight_layout()

# A last trick for the road

Let's finish with a fun and surprising trick. Let's load an audio file and play it; you shouldn't hear anything:

In [None]:
fs, s = wavfile.read('data/testing.wav')
IPython.display.Audio(s, rate=fs)

What if we filter the signal before playing? 

In [None]:
M = 36
IPython.display.Audio(mavg(s, M), rate=fs)

Still nothing. But check this out:

In [None]:
IPython.display.Audio(mavg(s ** 2, M), rate=fs)

Cool, isn't it? Can you figure out what happened and why squaring the signal made the audio magically appear? 

If you feel like investigating, start by looking at plots of the signal both in the time and in the frequency domain, and then try to understand how the original signal was generated. If after a while you're still clueless, you may want to use this [hint](https://en.wikipedia.org/wiki/Crystal_detector). 

## Solution

OK, this is going to be long so take it as a little "detective story" in Signal Processing and proceed only if you are interested!

### An initial time-domain exploration

As a first step in our investigation, let's look at the mystery signal in the time domain:

In [None]:
plt.plot(s);

This plot is not helping much even though, if we zoom in to look at the details we can see that it is oscillating very fast:

In [None]:
chunk = s[50000:50480] 
plt.plot(chunk);

Note that the sampling frequency of the signal is 96 kHz, which is very high, and so the 480 samples shown in the previous plot correspond to only 5 ms of audio data. Yet, over this short time span the signal appears to oscillate more than 200 times, which corresponds to a frequency of at least 40 kHz, totally outside of the human hearing range.

By the way, you don't need to count the oscillations by hand; a good estimate is provided by half the number of zero crossings of the signal, and to find the number of zero crossings we need to simply count the number of times two successive samples have opposite sign:

In [None]:
np.sum(chunk[:-1] * chunk[1:] < 0)

### Moving to the frequency domain

If we move to the frequency domain, we can confirm that the mystery signal is indeed bandpass; we can use the plotting tricks developed in lab 4 to label the frequency axis to see that the spectral content occupies a small region centered around 44 kHz and extending by approximately 4000 Hz to the left and to the right of the center frequency.

In [None]:
L = int(len(s)/2)
S = np.abs(np.fft.fft(s))[:L]
plt.semilogy(np.linspace(0, fs/2, L), S);
print('peak at ', np.argmax(S) / L / 2 * fs)

A plausible hypothesis at this point is that the signal is a modulated audio signal and we can check this quickly by demodulating and playing the result:

In [None]:
demod = s * np.cos(2 * np.pi * (44000.0 / 96000.0) * np.arange(0, len(s)))
IPython.display.Audio(demod, rate=fs)

Bingo! But there are two more issues: how does the squaring operation manage to replace demodulation and why do we hear an annoying whistle?

### Demodulation via squaring

The first part is relatively easy: if we assume that the original signal is of the form $s[n] = x[n]\cos(\omega_c n)$, where $\omega_c = 2\pi (44000 / 96000) = (11/12)\pi$ is the carrier frequency, then 

$$
   s^2[n] = x^2[n]\cos^2(\omega_c n) = \frac{1}{2}x^2[n] + \frac{1}{2}x^2[n]\cos(2\omega_c n)
$$

The squaring operation, therefore, performs a demodulation of sorts and returns a baseband signal ($x^2[n]$) plus a modulated copy at twice the original carrier frequency. There are some details to keep in mind though: in general, the square of a _balanced_ audio signal will be a severely distorted version of the original, since the negative portions of the waveform become positive. You can check this using the original signal used to create this example:

In [None]:
fs_o, x_orig = wavfile.read('data/speech.wav')
x_orig = x_orig / 32768.0  # rescale the range over [-1, 1]
IPython.display.Audio(x_orig, rate=fs_o)

In [None]:
IPython.display.Audio(x_orig * x_orig, rate=fs_o)

However, we can avoid this problem if we ensure that the signal is always positive:

In [None]:
offset = 1
x = x_orig + offset
IPython.display.Audio(x * x, rate=fs_o)

Now, when this positive signal is modulated, we will obtain

$$
    s[n] = x[n]\cos(\omega_c n) = \cos(\omega_c n) + x_{\text{orig}}[n]\cos(\omega_c n)
$$

and the first term in the expression for $s[n]$ explains the prominent spectral lines at $\pm\omega_c$ that were apparent in the spectrum of the modulated signal. But, with this trick, when we "demodulate" with a squaring operation, the baseband copy $x^2[n]$ won't sound distorted. 

### The irritating "whistle"

We still have to understand (and fix) the fact that we hear an audible whistle if we try to play $s^2[n]$ as-is, without filtering. It turns out that this problem is due the combined effects of the signal's amplitude offset and of aliasing. 

Remember that the carrier frequency here is $\omega_c = (11/12)\pi$ so that $2\omega_c > \pi$; because of the $2\pi$ periodicity of discrete-time sinusoids, $\cos(2\omega_c n) = \cos(2(11/12)\pi n) = \cos((\pi/6) n)$ and so the spectrum of the demodulated signal will contain a strong spectral line at $\omega_w = \pi/6$.

At our sampling rate of 96 kHz, this corresponds to a frequency $f_w = 8$ kHz, which is perfectly (and disturbingly) audible. 

In [None]:
S = np.abs(np.fft.fft(s * s))[:L]
plt.plot(np.linspace(0, fs/2, L), S);

To remove this component we need to filter out the cross-modulation terms around $\omega_w$; unfortunately, since the original signal extends up to 4 kHz, a simple Leaky Integrator would not work because its transition band is too wide. You can try it here and see that no value of $\lambda$ will be able to remove enough of the whistle while preserving the speech signal.

In [None]:
lam = 0.98
IPython.display.Audio(leaky(s * s, lam), rate=fs)

In [None]:
plt.plot(np.linspace(0, fs/2, L), S / np.max(S), label="signal spectrum");

w = np.linspace(0, np.pi, L)[1:]
plt.plot(
    fs * w / np.pi / 2, 
    (1 - lam) / np.sqrt(1 - 2 * lam * np.cos(w) + lam**2),
    label="leaky integrator response",
);
plt.legend();

Normally in this situation we should use a more advanced filter with a steep transition band, such as an elliptic filter with cutoff frequency above 4 kHz:

In [None]:
b, a = sp.ellip(6, .1, 60, 5000 / fs)
IPython.display.Audio(sp.lfilter(b, a, s * s), rate=fs)

In [None]:
plt.plot(np.linspace(0, fs/2, L), S / np.max(S), label="signal spectrum");

wb, Hb = sp.freqz(b, a, L);
plt.plot(fs * wb / np.pi / 2, np.abs(Hb), label="filter spectrum");
plt.legend();

In this case, however, we can be clever and use a simple Moving Average because:
 * the frequency of the whistle is a rational multiple of $2\pi$.
 * we know that the frequency response of a Moving Average of length $M$ will be exactly zero at all multiples of $2\pi/M$. 
 
The spectral line causing the whistle sound is at $\omega_w = \pi/6$, so any MA filter where $M$ is a multiple of 12 will kill it. We want to choose $M$ as large as possible but sufficiently small to preserve the speech signal; $M=36$ seems to be a good compromise:

In [None]:
plt.plot(np.linspace(0, fs/2, L), S / np.max(S), label="signal spectrum");

M = 36
w = np.linspace(0, np.pi, L)[1:]
plt.plot(
    fs * w / np.pi / 2, 
    np.abs(np.sin(M * w / 2) / np.sin(w / 2) / M),
    label="MA spectrum",
);
plt.legend();