---
# **LAB 4 - Audio Filters**
---

In [None]:
import os

# dirs
base_dir = './' 
wav_dir = os.path.join(base_dir, 'wav/') 
mp3_dir = os.path.join(base_dir, 'mp3/') 
out_dir = os.path.join(base_dir, 'output/') 

# print base dir 
print("Current dir:", os.getcwd())

# ▶️ Classes and functions

In [None]:
import numpy as np
import plotly.graph_objects as go
import plotly.express as px
import scipy.signal as sig

# local libraries
from src.pyaudio2 import *
from src.pyaudio3 import *
from src.utils import *

class Filter(object):
  """ Standard linear recursive filter
      Julius O. Smith III - Introduction to Digital Filters with audio applications
      https://ccrma.stanford.edu/~jos/fp/

                b0 + b1*z^(-1) + b2*z^(-2) + ... + bn*z^(-n)
        H(z) = ---------------------------------------------
                a0 + a1*z^(-1) + a2*z^(-2) + ... + an*z^(-n)
  """

  font = dict(family='Old Standard TT, serif', size=20, color='grey')

  def __init__(self, B=None, A=None, Fs=44100, name='Linear Filter'):
    """Coefficient setting
          
      B  : (array) is feed forward (numerator) zeros
      A  : (array) is feed back (denominator) poles
      Fs : sampling frequency
      name : (string) name of the filter model
    """

    self.Fs = Fs
    self.name = name
    self.set_coefficients(B=B, A=A)

  def __str__(self):
    s  = 'FILTER~INFO~~~~~~~~~~~~~~~~~~~~~~~~~~~~~\n'
    s += ' - classname        : %s\n'          % self.__class__.__name__
    s += ' - type             : %s\n'          % self.name
    s += ' - sample rate      : %.1f [Hz]\n'   % self.Fs
    if self.name == 'II order resonator':
      s += ' - gain             : %s\n'        % str(self.gain)
      s += ' - resonance freq   : %s [Hz]\n'   % str(self.resonance)
      s += ' - radius of pole   : %s \n'   % str(self.radius)
      s += ' - bandwidth        : %s [Hz]\n'   % str(np.round(self.bandwidth,5))
    s += ' - feedforward  (B) : %s\n'          % str(np.round(self.B,5))
    s += ' - feedback     (A) : %s\n'          % str(np.round(self.A,5))
    s += ' - number of zeros  : %i\n'          % (len(self.B)-1)
    s += ' - number of poles  : %i\n'          % (len(self.A)-1)
    s += ' - minimum phase?   : %s\n'          % ("Yes" if self.is_minimum_phase() else "No")
    s += '~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~\n'
    return s

  def __repr__(self):
    return "Filter(B=%s, A=%s, fs=%s)" % (list(self.B), list(self.A), self.Fs)

  def apply(self, wave):
    """ Apply the filter

        Params: 
          wave : (Digital) a digital wave

        Return:
          (Digital) the filtered wave
    """

    assert isinstance(wave, Digital), "Only Digital instances can be used"

    # apply the filter
    x = sig.lfilter(self.B, self.A, wave.x)

    # return a new digital wave using the filtered signal
    return Digital(x, wave.Fs, wave.start)

  def FIR_lowpass(self, numtaps, cutoff, Fs):
    """FIR filter design using the window method, the filter will have linear phase.

      numtaps : (int) Length of the filter (number of coefficients, i.e. the filter order + 1). 
                numtaps must be odd if a passband includes the Nyquist frequency
      cutoff   :  Cutoff frequency of filter (expressed in the same units as Fs)   
      Fs      : (int) sample rate of the supplied signal
    """

    self.name = 'FIR (lowpass)'
    self.B = sig.firwin(numtaps, cutoff, fs=Fs)
    self.A = np.array((1,)) 
    self.Fs = Fs

  def FIR_highpass(self, numtaps, cutoff, Fs):
    """FIR filter design using the window method, the filter will have linear phase.

      numtaps : (int) Length of the filter (number of coefficients, i.e. the filter order + 1). 
                numtaps must be odd if a passband includes the Nyquist frequency
      cutoff   :  Cutoff frequency of filter (expressed in the same units as Fs)   
      Fs      : (int) sample rate of the supplied signal
    """

    self.name = 'FIR (highpass)'
    self.B = sig.firwin(numtaps, cutoff, fs=Fs, pass_zero=False)
    self.A = np.array((1,)) 
    self.Fs = Fs

  def FIR_bandpass(self, numtaps, cutoff1, cutoff2, Fs, ftype='bandpass'):
    """FIR filter design using the window method, the filter will have linear phase.

      numtaps  : (int) Length of the filter (number of coefficients, i.e. the filter order + 1). 
                numtaps must be odd if a passband includes the Nyquist frequency
      cutoff1,2 :  Cutoff frequencies of filter (expressed in the same units as Fs) representing the band edges 
      Fs       : (int) sample rate of the supplied signal
      ftype    : 'bandpass' or 'bandstop'
    """

    self.name = 'FIR (bandpass)'
    pass_zero = False if ftype == 'bandpass' else True

    self.B = sig.firwin(numtaps, [cutoff1, cutoff2], fs=Fs, pass_zero=pass_zero)
    self.A = np.array((1,)) 
    self.Fs = Fs

  def IIR_lowpass(self, cutoff, Fs, order=5):
    """Butterworth IIR lowpass filter design

      cutoff  : (int) frequency in Hz that acts as cutoff for filter.
                All frequencies above cutoff are filtered out.
      Fs     : (int) sample rate of the supplied signal
      order  : (int) filter order, defines the strength of the roll-off around the 
               cutoff frequency. Typically orders above 6 are not used frequently. 
    """

    self.name = 'Butter (lowpass)'
    nyq = 0.5 * Fs
    normal_cutoff = cutoff / nyq
    self.B, self.A = sig.butter(order, normal_cutoff, btype='low', analog=False)
    self.Fs = Fs

  def IIR_highpass(self, cutoff, Fs, order=5):
    """Butterworth IIR highpass filter design

      cutoff  : (int) frequency in Hz that acts as cutoff for filter.
                All frequencies above cutoff are filtered out.
      Fs     : (int) sample rate of the supplied signal
      order  : (int) filter order, defines the strength of the roll-off around the 
              cutoff frequency. Typically orders above 6 are not used frequently. 
    """

    self.name = 'Butter (highpass)'
    nyq = 0.5 * Fs
    normal_cutoff = cutoff / nyq
    self.B, self.A = sig.butter(order, normal_cutoff, btype='high', analog=False)
    self.Fs = Fs

  def IIR_bandpass(self, lowcut, highcut, Fs, order=5):
    """Butterworth IIR bandpass filter design

      lowcut  : (int) Lower frequency bound of the filter in Hz
      highcut : (int) Upper frequency bound of the filter in Hz
      cutoff   : (int) frequency in Hz that acts as cutoff for filter.
                All frequencies above cutoff are filtered out.
      Fs      : (int) sample rate of the supplied signal
      order   : (int) filter order, defines the strength of the roll-off around the 
                cutoff frequency. Typically orders above 6 are not used frequently. 
    """

    self.name = 'Butter (bandpass)'
    nyq = 0.5 * Fs
    low = lowcut / nyq
    high = highcut / nyq
    self.B, self.A = sig.butter(order, [low, high], btype='band')
    self.Fs = Fs

  def resonator(self, f0, R, Fs, G=None):
    """II order resonator filter design

      f0  : (float) resonance frequency of the filter in Hz
      R   : (float) pole module (< 1)
      Fs  : (int) sample rate of the supplied signal
      G   : (float) gain of the filter (if None gain is unitary)
    """

    self.name = 'II order resonator'
    𝜔0 = 2*np.pi*f0/Fs
    a1 = -2*R*np.cos(𝜔0)   # Coefficient a1
    a2 = R**2                         # Coefficient a2
    self.A = [1, a1, a2]
    if not G:
      self.B = [(1-R)*np.sqrt(1-2*R*np.cos(2*𝜔0)+R**2)]
      G = 1
    self.B = [G*(1-R)*np.sqrt(1-2*R*np.cos(2*𝜔0)+R**2)]
    self.Fs = Fs
    self.bandwidth = 2*(1-R)*Fs/(2*np.pi)
    self.gain = G
    self.resonance = f0
    self.radius = R

  def set_coefficients(self, B=None, A=None):
    """Set the filter coefficients.

      B : (array) coefficients in the numerator. 
      A : (array) coefficients in the denominator. 
    """

    # numerator (zeros, feedforward, FIR)
    self.B = np.array((1,)) if B is None else np.asanyarray(B)

    # denominator (poles, feedback, IIR)
    self.A = np.array((1,)) if A is None else np.asanyarray(A)

  def is_minimum_phase(self):
    """
      A filter is defined as _minimum phase_ if all its poles and zeros 
        are inside the unit circle, excluding the unit circle itself.
    """
    isMinPhase = True

    zeros, poles, unused_gain = sig.tf2zpk(self.B, self.A)

    for pole in poles:
      if not np.abs(pole) < 1.0:
          isMinPhase = False

    for zero in zeros:
      if not np.abs(zero) < 1.0:
          isMinPhase = False

    return isMinPhase

  def freq_response(self, nfreqs=2048):
    """Calculate the complex frequency response"""

    assert isinstance(nfreqs, int), "nfreqs must be int"

    # w is normalized to the Nyquist rate 
    w, h = sig.freqz(self.B, self.A, fs=self.Fs, worN=nfreqs)

    return w, h

  def plot_mag_phase(self, npoints=5000, title='Frequency response'):
    """Produce a plot with magnitude and phase response
    """

    w, h = self.freq_response(npoints)

    mag = np.abs(h)
    magdB = 20 * np.log10(mag)
    pha = np.angle(h)

    data= [go.Scatter(x=w, y=magdB, fillcolor='blue', name='Magnitude response (dB)'), 
           go.Scatter(x=w, y=mag, fillcolor='blue', name='Magnitude response'), 
           go.Scatter(x=w, y=pha, fillcolor='red', name='Phase response')]

    menu = [{'label': 'magnitude dB', 
             'method': 'update',
             'args': [{'visible': [True, False, False]}, 
                      {'title': '$20\log_{10}|H(f)|$'}]},
            {'label': 'magnitude', 
             'method': 'update',
             'args': [{'visible': [False, True, False]}, 
                      {'title': '$|H(f)|$'}]},
            {'label': 'phase', 
             'method': 'update',
             'args': [{'visible': [False, False, True]}, 
                      {'title': '$\angle |H(f)|$'}]},
            {'label': 'mag and phase', 
             'method': 'update',
             'args': [{'visible': [False, True, True]}, 
                      {'title': 'Frequency response$'}]}
            ]

    layout=go.Layout(title=title, 
                     xaxis=dict(title='Freqs (Hz)'),
                     updatemenus=list([dict(buttons=menu,type = 'buttons')]),barmode='overlay')
    fig = go.Figure(data,layout)
    fig.show(renderer=rendering())

  def plot_pole_zero(self, filename=None):
    """Produce a plot with the location of all poles and zeros."""

    zeros, poles, gain = sig.tf2zpk(self.B, self.A)
    nzeros = len(self.B)-1
    npoles = len(self.A)-1

    fig = go.Figure()

    fig.add_shape(type="circle", xref="x", yref="y", x0=-1, y0=-1, x1=1, y1=1,
                line=dict(color="black", width=1))

    fig.add_trace(go.Scatter(
        x=zeros.real,
        y=zeros.imag,
        name='Zeros (' + str(nzeros) + ')',
        mode='markers',
        marker_symbol='circle',
        marker_color="red", 
        marker_line_color="red",
        marker_line_width=1, 
        marker_size=10))
    
    fig.add_trace(go.Scatter(
        x=poles.real,
        y=poles.imag,
        name='Poles (' + str(npoles) + ')',
        mode='markers',
        marker_symbol='x',
        marker_color="blue",
        marker_line_color="blue", 
        marker_line_width=.5, 
        marker_size=10))
    
    fig.update_xaxes(title='Real part', range=[-1.05, 1.05])
    fig.update_yaxes(title='Imag part', range=[-1.05, 1.05])
    fig.update_layout(title='Pole-zero plot', width=600, height=600)
    fig.show(renderer=rendering())

  def plot_impulse_resp(self, npoints=5000, name='Inpulse response'):
    """Plot the impulse response"""

    impulse = np.zeros(npoints)  
    impulse[0] = 1                 # delta function
    y = sig.lfilter(self.B, self.A, impulse)
    t = np.arange(0, npoints/self.Fs, 1/self.Fs)

    data= [go.Scatter(x=t, y=y, name=name)]
    layout = go.Layout(
          title=name,
          xaxis=dict(title='$n$',
                  gridwidth=1,
                  titlefont=self.font),
          yaxis=dict(title='$h(n)$',
              showgrid=False,
              zeroline=True,
              showline=True,
              zerolinecolor='#CCC',
              zerolinewidth=2,
              linewidth=2,
              titlefont=self.font))                 
    fig = go.Figure(data,layout)
    fig.show(renderer=rendering())

# ✅ FIR and IIR filters

**Butterworth lowpass** **IIR** filter.

In [None]:
# Filter demo: Butterworth lowpass

# define the waveform
Fs = 500  # sampling rate (=1/Ts)
f1, f2 = 10, 50    # Hz
mix = Sin(f1, Fs) + Cos(f2, Fs, A=2) # obj of type 'Sin' with parent 'Discrete'
mix.plot()

# define and apply the filter
filter = Filter()
fcut = 30    # Hz
filter.IIR_lowpass(fcut, Fs, order=8)
sig_filt = filter.apply(mix)   # filtered signal
sig_filt.plot()

# filter analysis
print(filter)
filter.plot_mag_phase()
filter.plot_impulse_resp()
filter.plot_pole_zero()

**FIR lowpass** filter

In [None]:
# Filter demo: FIR lowpass

filter.FIR_lowpass(105, fcut, Fs)
print(filter)
sig_fil = filter.apply(mix)   # filtered signal
sig_fil.plot()

# filter analysis
filter.plot_mag_phase()
filter.plot_impulse_resp()

**FIR highpass** filter

In [None]:
# Filter demo: FIR highpass

filter.FIR_highpass(105, fcut, Fs)
print(filter)
sig_fil = filter.apply(mix)   # filtered signal
sig_fil.plot()


# filter analysis
filter.plot_mag_phase()
filter.plot_impulse_resp()

## Moving Average Smoothing

**Smoothing** is a technique applied to time series to remove the fine-grained variation between time steps.

**Moving averages** are a simple and common type of smoothing used in time series analysis and time series forecasting.

**Calculating** a moving average involves creating a new series where the values are comprised of the average of raw observations in the original time series.

**Window**. A moving average requires that you specify a window size called the window width. This defines the number of raw observations used to calculate the moving average value.

The **“moving”** part in the moving average refers to the fact that the window defined by the window width is slid along the time series to calculate the average values in the new series.

In [None]:
# sound and its spectrum
annoy_file = wav_dir + 'annoying.wav'

a = Audio()
a.read_wav(annoy_file)
annoy = a.get_Digital()
a.play()
a.plot()

# spectrum
freq, amp, phase = spectrum(a.audio, a.samplerate)  # computes the spectrum (magnitude and phase)
plot_spectrum(freq, amp, phase) 
   

In [None]:
# filter order and coefficients
order = 20
B = np.ones(order)/order     # moving average coefficients
filter = Filter(B=B, name='Moving Average')  # moving average filter

# filter analysis
filter.plot_mag_phase()

# get sound and filter it
annoy = a.get_Digital()         # get Digital object from Audio object
annoy_fil = filter.apply(annoy)   # filtered signal: apply to annoy sound

a = Audio(annoy_fil)
a.play()
a.plot()
a.write()

# spectrum of filtered
freq, amp, phase = spectrum(a.audio, a.samplerate)  # computes the spectrum (magnitude and phase)
plot_spectrum(freq, amp, phase) 
   

## 🔴 TODO

**Ex 1**

How to remove (by filtering) the annoying sound from the piece `annoying.wav`?

In [None]:
#### TODO

# sound and its spectrum
annoy_file = wav_dir + 'annoying.wav'

# play and plot audio orig

# spectrum of orig

# define and apply a lowpass filter

# play and plot audio filtered

# spectrum of filtered


In [None]:
#### SOLUTION

# sound and its spectrum
annoy_file = wav_dir + 'annoying.wav'

# play and plot audio orig
a = Audio()
a.read_wav(annoy_file)
a.play()
a.plot()

# spectrum of orig
freq, amp, phase = spectrum(a.audio, a.samplerate)  # computes the spectrum (magnitude and phase)
plot_spectrum(freq, amp, phase)       

# define and apply a lowpass filter
filter = Filter()
Fs = a.samplerate
Fc = 1000        # cutoff frequency
filter.IIR_lowpass(Fc, Fs, order=8)
annoy = a.get_Digital()          # get Digital object from Audio object
filt = filter.apply(annoy)       # filtered signal: apply to annoy sound

# play and plot audio filtered
a = Audio(filt)
a.play()
a.plot()

# spectrum of filtered
freq, amp, phase = spectrum(a.audio, a.samplerate)  # computes the spectrum (magnitude and phase)
plot_spectrum(freq, amp, phase) 
        


**Ex 2**

Compare both a **IIR bandpass** filter and a **FIR bandpass** filter to extract the second and third harmonics of the signal `Flute_C4.wav`

In [None]:
#### TODO

# filter comprison

# sound and its spectrum
annoy_file = wav_dir + 'Flute_C4.wav'

# play and plot audio orig

# spectrum of orig

# define and apply a lowpass filter

# play and plot audio filtered

# spectrum of filtered

In [1]:
#### SOLUTION

# filter comprison

# sound and its spectrum
annoy_file = wav_dir + 'Flute_C4.wav'

# play and plot audio orig
a = Audio()
a.read_wav(annoy_file)
a.play()
a.plot()

# spectrum of orig
freq, amp, phase = spectrum(a.audio, a.samplerate)  # computes the spectrum (magnitude and phase)
plot_spectrum(freq, amp, phase)   

# define and apply a lowpass filter
filter = Filter()
Fs = a.samplerate
F1 = 400        # left cutoff frequency 
F2 = 900        # right cutoff frequency 
filter.IIR_bandpass(F1, F2, Fs, order=8)
flaute = a.get_Digital()        # get Digital object from Audio object
filt = filter.apply(flaute)       # filtered signal: apply to annoy sound

# play and plot audio filtered
a = Audio(filt)
a.play()
a.plot()

# spectrum of filtered
freq, amp, phase = spectrum(a.audio, a.samplerate)  # computes the spectrum (magnitude and phase)
plot_spectrum(freq, amp, phase) 
       

NameError: name 'wav_dir' is not defined

# ✅ Acoustic impulse response

To characterize the acoustic response of a room or open space, a simple way
to generate an impulse is to pop a balloon or fire a gun. A gunshot puts an
impulse into the system; the sound you hear is the impulse response.

As an example, let's use a recording of a gunshot to characterize the room where
the gun was fired, then use the impulse response to simulate the effect of that
room on a violin recording.

In [None]:
# waveform and spectrum of a gunshot
gun_file = wav_dir + 'gunshot.wav'

gun_audio = Audio()
gun_audio.read_wav(gun_file)
gun_audio.play()
gun_audio.plot()

# spectrum
#gunshot, Fs = a.audio, a.samplerate  # get the wave signal and sampling freq
freq, amp, phase = spectrum(gun_audio.audio, gun_audio.samplerate)  # computes the spectrum (magnitude and phase)
plot_spectrum(freq, amp, phase) 

We can simulate what a recording would sound like if it were played in the same room and recorded in the same way. Here's the violin recording 

In [None]:
# waveform and spectrum of a violin
vio_file = wav_dir + 'violin.wav'
vio_audio = Audio()
vio_audio.read_wav(vio_file)
vio_audio.play()
vio_audio.plot()

# spectrum
freq, amp, phase = spectrum(vio_audio.audio, vio_audio.samplerate)  # computes the spectrum (magnitude and phase)
plot_spectrum(freq, amp, phase)     

Let's compute the convolution between the impulse response (gunshot) and the violin recording to hear the effect

In [None]:
# convolve violin sound with room impulse response 
violin = vio_audio.get_Digital()
gunshot = gun_audio.get_Digital()
violin_room = violin.convolve(gunshot)
a = Audio(violin_room)
a.normalize()
a.play()
a.plot()

# spectrum
freq, amp, phase = spectrum(a.audio, a.samplerate)  # computes the spectrum (magnitude and phase)
plot_spectrum(freq, amp, phase) 

# ✅ Resonator



**II order Resonator**: Design a 2-pole resonator filter with 
* peak at $f_0 = 500 Hz$
* gain = 2
* operating at the sampling rate of $F_s = 10 kHz$

In [None]:
# define and apply the resonator filter
filter = Filter()
f0 = 1500
R = 0.98
G = 2
Fs = 10000
filter.resonator(f0=f0, R=R, Fs=Fs, G=G)

# filter analysis
print(filter)
filter.plot_mag_phase()
filter.plot_impulse_resp()
filter.plot_pole_zero()

### Popcorn effect

<img src="https://github.com/giulianogrossi/imgs/blob/main/misc/pops.png?raw=true" align="center" width=300px >

* Popcorn sound simulator playing pops with randomized properties 
* An exponantially decaying white noise is filtered with a second-order resonator with center frequency about 400 Hz with some random variation and bandwidth between 50 and 150 Hz



In [None]:
Fs = 10000    # Sampling rate (Hz)

# -- Design excitation signal
envelop = Exp(Fs=Fs, a=5)
envelop.plot(title='Envelope decay')

# White noise sequence
noise = Noise(Fs=Fs)           
noise.plot(title='White noise')

# excitation
excit = envelop.multilpy(noise)              # Digital object
excit.plot(title='Excitation signal')

# spectrum
freq, amp, phase = spectrum(excit.x, Fs)  # computes the spectrum (magnitude and phase)
plot_spectrum(freq, amp, phase) 

second-order resonator filter specs...

In [None]:
# define and apply the resonator filter
f0 = 100 + 300 * np.random.rand()  # frequency of resonance (randomized in 100, ..., 400 Hz)
R = 0.99                           # Radius of pole
G = 10
filter = Filter()
filter.resonator(f0=f0, R=R, Fs=Fs, G=G)

# -- Generate pop by filtering the noise sequence with the resonator
pop = filter.apply(excit)
a = Audio(pop)
a.play()

# spectrum
freq, amp, phase = spectrum(a.audio, a.samplerate)  # computes the spectrum (magnitude and phase)
plot_spectrum(freq, amp, phase) 

## 🔴 TODO

**Repeated pops:** Design a resonator filter (2nd order) and apply it repeatedly to obtain a list of pops. Turn the list into an array by stacking all the pops together into a single signal.   

**Hints**
Use the following code to create the final signal that collects all the pops and to play it back
```
pops_list = [] # list of pops
...
pops_array = np.array(pops_list).reshape(-1)
POPS = Digital(pops_array, Fs)
a = Audio(POPS)
```

In [None]:
#### TODO

# Sampling rate (Hz)

# design excitation signal

# White noise sequence     

# make excitation signal

# generate pop list

# play all pops together

In [2]:
#### SOLUTION

Fs = 10000                      # Sampling rate (Hz)
end = 0.5
envelop = Exp(Fs=Fs, a=5, end=end)       # design excitation signal
noise = Noise(Fs=Fs, end=end)            # White noise sequence     
excit = envelop.multilpy(noise)          # excitation

# generate pop list
pops_list = []  # a list of pops
M = 20     # number of pops
for i in range(M):
  # define and apply the resonator filter
  f0 = 100 + 300 * np.random.rand()  # frequency of resonance (randomized in 100, ..., 400 Hz)
  R = 0.99                           # Radius of pole
  G = 10
  filter = Filter()
  filter.resonator(f0=f0, R=R, Fs=Fs, G=G)

  # generate pop by filtering the noise sequence with the resonator
  pop = filter.apply(excit)
  pops_list.append(pop.x)

# play all pops together
pops_array = np.array(pops_list).reshape(-1)
POPS = Digital(pops_array, Fs)
a = Audio(POPS)
a.play()

NameError: name 'Exp' is not defined