# TP1: Conversion of Sampling Frequency and STFT

*By Daniel Jorge Deutsch, Kevin Kuhl and Brayam Santiago Velandia (25/09/2020)*

In [None]:
import os
import struct
import sys
import time
import warnings
import wave
from copy import deepcopy
from math import ceil
import warnings

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import pyaudio
from scipy import signal as sig
from scipy.io import wavfile

# Ignores warnings
warnings.filterwarnings("ignore")

In [None]:
class Base:

    def __init__(self, name, data):
        self.name = name 
        self.data = np.asarray(data)


    #------------------------------#
    #--- SAMPLES ------------------#
    #------------------------------#

    def step_split_sample(self, step):
        datas = [self.data[i::step] for i in range(step)]
        max_len = max([len(data) for data in datas])
        for i, data in enumerate(datas):
            if len(data) != max_len:
                datas[i] = np.append(data, 0)
            datas[i] = self.__class__(f"{self.name}{i}", datas[i])
        return tuple(datas)       


    #------------------------------#
    #--- PLOTS --------------------#
    #------------------------------#
    
    def data_plot(self, xhighlights=None, yhighlights=None, figsize=(15, 4), save=False):
        plt.figure(figsize=figsize)
        plt.plot(self.data)
        plt.xlim(0, len(self.data)-1)
        plt.ylabel("Amplitude")
        plt.xlabel("Index")
        plt.title(f"Plot of {self.name}")
        if xhighlights:
            for xhighlight in xhighlights:
                plt.axvline(x=xhighlight, color="red")
        if yhighlights:
            for yhighlight in yhighlights:
                plt.hline(y=yhighlight, color="red")
        if save:
            plt.savefig(f"./outputs/figures/{self.name}_data.png", dpi=300, bbox_inches="tight")
        plt.show()


    def spectrogram_plot(self, xhighlights=None, yhighlights=None, figsize=(15, 4), save=False):
        
        # Obtain the spectrogram params
        f, t, Sxx = sig.spectrogram(self.data, self.freq)

        # Plot
        plt.figure(figsize=figsize)
        plt.pcolormesh(t, f, 20*np.log10(Sxx), shading="auto")
        plt.ylabel("Frequency [Hz]")
        plt.xlabel("Time [s]")
        plt.title(f"Spectogram of {self.name}")
        if xhighlights:
            for xhighlight in xhighlights:
                plt.axvline(x=xhighlight, color="red")
        if yhighlights:
            for yhighlight in yhighlights:
                plt.hline(y=yhighlight, color="red")
        if save:
            plt.savefig(f"./outputs/figures/{self.name}_spectrogram.png", dpi=300, bbox_inches="tight")
        plt.show()


    # Should use when you have the filter's coefficients
    def freqz_plot(self, xhighlights=None, yhighlights=None, figsize=(15, 4), save=False):

        freq, mag = sig.freqz(self.data)

        # Magnitude
        mag = 20*np.log10(np.abs(mag))
        mag[np.abs(mag-np.mean(mag)) > 2*np.std(mag)] = np.nan   # Remove outliers for the plot

        # Frequency   
        freq = freq/(2*np.pi)
        
        # Plot
        plt.figure(figsize=figsize)
        plt.plot(freq, mag)
        plt.xlim(freq[0], freq[-1])
        plt.ylabel("Magnitude [dB]")
        plt.xlabel("Frequency [(rad/sample)/2π]")
        plt.title(f"Frequency response of {self.name}")
        if xhighlights:
            for xhighlight in xhighlights:
                plt.axvline(x=xhighlight, color="red")
        if yhighlights:
            for yhighlight in yhighlights:
                plt.hline(y=yhighlight, color="red")
        if save:
            plt.savefig(f"./outputs/figures/{self.name}_freqz.png", dpi=300, bbox_inches="tight")
        plt.show() 

    
    # Should use when you have filter's impulse response
    def fft_plot(self, xhighlights=None, yhighlights=None, figsize=(15, 4), save=False):      
        
        # Magnitude
        len_fft = 4096                                           # Length of the transformed axis of the output
        mag = np.fft.fft(self.data, len_fft)                     # Obtains the magnitude of the signal
        mag = np.fft.fftshift(mag)                               # Shifts the zero-frequency component to the center of the spectrum
        mag = 20*np.log10(np.abs(mag))                           # Applies log to the result
        mag[np.abs(mag-np.mean(mag)) > 2*np.std(mag)] = np.nan   # Remove outliers for the plot

        # Frequency
        freq = np.linspace(-0.5, 0.5, len(mag))

        # Plot
        plt.figure(figsize=figsize)
        plt.plot(freq, mag)
        plt.xlim(freq[0], freq[-1])
        plt.ylabel("Magnitude [dB]")
        plt.xlabel("Frequency [(rad/sample)/2π]")
        plt.title(f"Frequency response of {self.name}")
        if xhighlights:
            for xhighlight in xhighlights:
                plt.axvline(x=xhighlight, color="red")
        if yhighlights:
            for yhighlight in yhighlights:
                plt.hline(y=yhighlight, color="red")
        if save:
            plt.savefig(f"./outputs/figures/{self.name}_fft.png", dpi=300, bbox_inches="tight")
        plt.show()

In [None]:
class Signal(Base):

    def __init__(self, name, data=None, freq=None, file=None):
        if not ((data is None) ^ (file == None)):
            raise Exception("You must provide a data or a .wav file")
        if file:
            freq, data = wavfile.read(file)
        self.freq = int(freq) if freq else freq
        Base.__init__(self, name, data)
    

    #------------------------------#
    #--- SHIFTING -----------------#
    #------------------------------#

    def shift(self, name, power_of_z):
        data = self.data
        data = np.append(data[power_of_z:], power_of_z*[0]) if power_of_z > 0 else np.append(abs(power_of_z)*[0], data[:power_of_z])
        return self.__class__(name, data, self.freq)

    
    #------------------------------#
    #--- SAMPLING -----------------#
    #------------------------------#

    def under_sample(self, name, M):
        datas = self.step_split_sample(M)
        return self.__class__(name=name, data=datas[0].data, freq=self.freq/M)


    def over_sample(self, name, L):
        data = np.insert(self.data, range(1, len(self.data)+1)[::L-1], 0)
        return self.__class__(name=name, data=data, freq=self.freq*L) 
    

    #------------------------------#
    #--- PLOT ---------------------#
    #------------------------------#

    def time_plot(self, xhighlights=None, yhighlights=None, figsize=(15, 4), save=False):

        # Obtains the time axis
        t = np.linspace(0, len(self.data)/self.freq, len(self.data))

        # Plot
        plt.figure(figsize=figsize)
        plt.plot(t, self.data)
        plt.xlim(t[0], t[-1])
        plt.ylabel("Amplitude")
        plt.xlabel("Time [s]")
        plt.title(f"Time plot of {self.name}")
        if xhighlights:
            for xhighlight in xhighlights:
                plt.axvline(x=xhighlight, color="red")
        if yhighlights:
            for yhighlight in yhighlights:
                plt.hline(y=yhighlight, color="red")
        if save:
            plt.savefig(f"./outputs/figures/{self.name}_time.png", dpi=300, bbox_inches="tight")
        plt.show()
            

    #------------------------------#
    #--- LISTEN -------------------#
    #------------------------------#

    def listen(self):
        
        # Saves the .wav
        if not self.freq:
            raise Exception("the sample freq (sample/sec) must be provided")
        wavfile.write(f"./outputs/sounds/{self.name}.wav", self.freq, np.asarray(self.data, dtype=np.int16))

        # Uses pyaudio to play the signal
        chunk = 1024
        pa = pyaudio.PyAudio()
        audio = wave.open(f"./outputs/sounds/{self.name}.wav", "rb")
        stream = pa.open(
            format = pa.get_format_from_width(audio.getsampwidth()),
            channels = audio.getnchannels(),
            rate = audio.getframerate(),
            output = True
        )
        data = audio.readframes(chunk)
        while data:
            stream.write(data)
            data = audio.readframes(chunk)
        stream.stop_stream()
        stream.close()
        pa.terminate()

In [None]:
class Filter(Base):

    def apply(self, name, signal):
        conv = np.convolve(self.data, signal.data)
        return Signal(name, data=conv, freq=signal.freq)

# 2.1

In order to implement the direct resampling, we should use three blocks:

- First the upsampling with L = 2
    
- Then the low pass filter with coefficients given by Remez method
    
- Finally the downsampling with M = 3
This will be implemented on part 2.2

![title](img/03.png)

# 2.2

In [None]:
# Obtains the input signal x from its file
x = Signal(name="x", file="./inputs/caravan_48khz.wav")

# Defines the over and under sampling constants
L = 2
M = 3

# Defines the Remez filter params
numtaps = 100                       # An even number (can't be too big)
trans_width = 1/13                  # ?
cutoff = min( 1/(2*L), 1/(2*M) )    # Cutoff frequency (should be 1/(2*M) = 1/6)

# Obtains the Remez filter
h = Filter("h", sig.remez(numtaps, [0, cutoff, cutoff+trans_width, 1/2], [L, 0]))

# Runs the system
start_dir = time.perf_counter()      # Start counting (for exercise 2.5)
w = x.over_sample("w", L)            # Obtains the over sampled signal w using L=2 (Fw=48*2=96kHz)
v = h.apply("v", w)                  # Calculates the convolution v=w*h
y = v.under_sample("y", M)           # Undersample v to obtain the output of the system using M=3 (Fy=96/3=32kHz)
end_dir = time.perf_counter()        # Ends counting (for exercise 2.5)

# 2.3

![title](img/Proof.png)

<br>

# 2.4

<img src="img/01.png" width="552" height="892" />
<img src="img/02.png" width="552" height="892" />
<br>

# 2.5

In [None]:
# Obtains the filters
H0, H1 = h.step_split_sample(L)
H00, H01, H02 = H0.step_split_sample(M)
H10, H11, H12 = H1.step_split_sample(M)

# Starts counting (for exercise 2.5)
start_opt = time.perf_counter()

# Obtains all the w's
w0 = x.shift("w0", 1)
w1 = w0.shift("w1", -1)
w2 = w1.shift("w2", -1)
w3 = x
w4 = w3.shift("w4", -1)
w5 = w4.shift("w5", -1)

# Obtains all the v's
v0 = w0.under_sample("v0", M)
v1 = w1.under_sample("v1", M)
v2 = w2.under_sample("v2", M)
v3 = w3.under_sample("v3", M)
v4 = w4.under_sample("v4", M)
v5 = w5.under_sample("v5", M)

# Obtains all the u's
u0 = H00.apply("u0", v0)
u1 = H01.apply("u0", v1)
u2 = H02.apply("u0", v2)
u3 = H10.apply("u0", v3)
u4 = H11.apply("u0", v4)
u5 = H12.apply("u0", v5)

# Obtains all the k's
k0 = Signal("k0", u0.data+u1.data+u2.data, u0.freq)
k1 = Signal("k1", u3.data+u4.data+u5.data, u3.freq)

# Obtains all the p's
p0 = k0.over_sample("p0", L)
p1 = k1.over_sample("p1", L)

# Obtains all the q's
q0 = p0.shift("q0", -1)

# Obtains y
y = Signal("y", q0.data+p1.data, q0.freq)

# Ends counting (for exercise 2.5)
end_opt = time.perf_counter()

# Listen to the output
y.listen()

# Compare the execution time
print(f"Direct implementation execution time: {end_dir-start_dir}")
print(f"Optimal implementation execution time: {end_opt-start_opt}")

# 3.1.a

In [None]:
# Defines a Hanning window of length 50
Nw = 50                                 
w = Signal("w", np.hanning(Nw))

# Lower and upper limits of the main lobe (so width = 4/Nw)
xhighlights = [-2/Nw, 2/Nw]

# Plots the DTF of w
w.fft_plot(xhighlights=xhighlights)

# 3.1.b

The STFT referred to as low-pass convention is defined in discrete time as:

$$W_x(\lambda,b)=\sum_{n\in \mathbb Z} x(n)w(n-b)e^{-j2\pi\lambda n}$$

In this equation we consider the input signal $x(n)$ and the analysis window in discrete time $w(n-b)$ (where $b$ is a discret and quantized variable) to obtain the filter-bank interpretation. The spectrum of the signal $x$  is first rotated along unit circle in the $z$ plane in that way we shift $\lambda$ frecuency down to 0. Then, we can see the input signal as:

$$x(n)\overset{\Delta}{=} x(n)e^{-j2\pi\lambda n}$$

Additionally, the signal is convolved with the window to get $W_x(b)$:

$$W_x(b)=\sum_{n\in \mathbb Z} x(n)w(n-b)= (x \ast w)(b)$$

At this point, we consider the window $w$ as a low pass filter impulse response. The output $W_x$ is a time-domain signal with a centered componend $e^{-j2\pi\lambda n}$ removed by frecuency shifting. Since the analysis of the STFT is symetric and the lenght of the impulse response is odd, then we conlude that is a Type I filter, in the case of an even impulse response we define a Type II filter.

<br>

# 3.1.c

Starting by the given expression:

$$\tilde{X}(\lambda,b)=\sum_{n\in \mathbb Z} x(n+b)w(n-b)e^{-j2\pi\lambda n}$$

Doing $m=n+b$:

$$\tilde{X}(\lambda,b)=\sum_{m\in \mathbb Z} x(m)w(m-b)e^{-j2\pi\lambda (m-b)}$$
$$\tilde{X}(\lambda,b)=\sum_{m\in \mathbb Z} x(m)w(m-b)e^{-j2\pi\lambda m}e^{j2\pi\lambda b}$$
$$\tilde{X}(\lambda,b)=e^{j2\pi\lambda b} \sum_{m\in \mathbb Z} x(m)w(m-b)e^{-j2\pi\lambda m}$$
$$\tilde{X}(\lambda,b)=e^{j2\pi\lambda b} W_x(\lambda,b)$$

<br>

# 3.1.d

In [None]:
# Defines the variables k, R and M as requested
k = 3     # Index of the frequency channel
R = 1     # Hop size (increment on analysis times)
M = 32    # TDF order

# Defines a Hanning window of length 31 (< M)
Nw = 31
w = Signal("w", data=np.hanning(Nw))

# Number of tdf's to calculate
Nt = int((len(x.data)-Nw)/R)

# Calculates Xtil
Xtil = np.zeros((M, Nt), dtype=complex)
for u in np.arange(0, Nt).reshape(-1):
    term = np.multiply(x.data[np.arange(int(u*R+1), int(u*R+1+Nw))], w.data)
    Xtil[:, u] = np.fft.fft(term, M)

# Listen to the real part of xk (with k=3)
rexk = np.real(Xtil[k, :])
rexk = Signal("rexk", data=rexk, freq=x.freq)
rexk.listen()

# Plot rexk's spectrogram
rexk.spectrogram_plot()

In [None]:
# Now, we can check the filter equivalence. As we fixed k=3 on last part, this is equal to getting a band-pass filter centered on 4500. We used a butterworth filter to generate the filter and then we used filtfilt as it was suggested as a better approach on scipy's website.
f_desired = 4500
f_total = 48000

# Obtain filter's coefficients
numerator, denumerator = sig.butter(9, [2*f_desired/f_total-2.5/(Nw), 2*f_desired/f_total+2.5/(Nw)], btype="band")
y = sig.filtfilt(numerator, denumerator, x.data, axis=0)
y = Signal("y", y, f_total)

y.spectrogram_plot()

# 3.2.e

From the definition of an overlap-add operation:
\begin{equation}
\begin{split}
y(n)&= \sum_{u \in \mathbb Z} y_s(u,n-uR)\\
&= \sum_{u \in \mathbb Z} \left( \frac{1}{M} \sum_{k=0}^{M-1} \tilde{X}_{k,u}e^{j2\pi\frac{k(n-uR)}{m}}w_{s}(n-uR)\right)\\
&= \sum_{u \in \mathbb Z} \left[ \frac{1}{M} \sum_{k=0}^{M-1} \left(\sum_{m\in \mathbb Z}x_{(m)}w_{(m-uR)}e^{-j2\pi\frac{k(m-uR)}{M}}\right)e^{j2\pi\frac{k(n-uR)}{M}}w_{s_{(n-uR)}}\right]\\
&= \sum_{u \in \mathbb Z} \left[ \frac{1}{M} \sum_{k=0}^{M-1} \sum_{m\in \mathbb Z}x_{(m)}w_{(m-uR)}e^{-j2\pi\frac{k(m-uR)}{M}}e^{j2\pi\frac{k(n-uR)}{M}}w_{s_{(n-uR)}}\right]\\
&= \sum_{u \in \mathbb Z} \left[ \frac{1}{M} \sum_{k=0}^{M-1} \sum_{m\in \mathbb Z}x_{(m)}w_{(m-uR)}w_{s_{(n-uR)}}e^{j2\pi\frac{k(n-m)}{M}}\right]\\
&= \sum_{u \in \mathbb Z} \left[ \frac{1}{M} \sum_{m\in \mathbb Z} \sum_{k=0}^{M-1} x_{(m)}w_{(m-uR)}w_{s_{(n-uR)}}e^{j2\pi\frac{k(n-m)}{M}}\right]\\
&= \sum_{u \in \mathbb Z} \left[ \frac{1}{M} \sum_{m\in \mathbb Z} x_{(m)}w_{(m-uR)}w_{s_{(n-uR)}}\sum_{k=0}^{M-1}e^{j2\pi\frac{k(n-m)}{M}}\right]\\
\end{split}
\end{equation}

If $m \ne n$, we can write

\begin{equation}
\begin{split}
&= \sum_{u \in \mathbb Z} \left[ \frac{1}{M} \sum_{m\in \mathbb Z}  x_{(m)}w_{(m-uR)}w_{s_{(n-uR)}}\frac{1-e^{j2\pi k(n-m)}}{1-e^{j2\pi\frac{k(n-m)}{M}}}\right]\\
\end{split}
\end{equation}

Noting that $e^{j2\pi k(n-m)}$ is a M-unitary root, then, if $m\ne n$, $e^{j2\pi k(n-m)}=1$. Which give us:

$$\frac{1-e^{j2\pi k(n-m)}}{1-e^{j2\pi\frac{k(n-m)}{M}}}=0 \ \ \ \ \ for \ \ n\ne m$$

Then, if $m=n$ 

$$\sum_{k=0}^{M-1} e^{j2\pi\frac{k(n-m)}{M}}= \sum_{k=0}^{M-1}1 = M$$

Therefore, we can write:

$$y(m)=\sum_{u \in \mathbb Z} \left( \frac{1}{M} \sum_{m \in \mathbb Z} X_{(m)}w(m-uR)w_{s}{(n-uR)}M \delta_{m,n}\right)$$

So, we can consider only $m=n$ $(\delta_{m,n}=1)$

\begin{equation}
\begin{split}
y(n)&= \sum_{u \in \mathbb Z} \left[\frac{M}{M} x(n) w(n-uR) w_s(n-uR)\right]\\
&=\sum_{u \in \mathbb Z} x(n)w(n-uR)w_s(n-uR) \\
&= x(n) \sum_{u \in \mathbb Z}w(n-uR)w_s(m-uR)
\end{split}
\end{equation}

For perfect reconstruction $x(n)=y(n)$

$$1=\sum_{u \in \mathbb Z}w(n-uR)w_s(m-uR)\overset{\Delta}{=} f(n)$$

therefore $f(n)=1$  $\forall$ $n$.

In [None]:
# Sets the desired overlap
overlap = 0.75

# Defines a Hanning window of length 512
Nw = 512
w = Signal("w", np.hanning(Nw))


# Defines a function that performs the overlap
def ola(w=None, hop=None, Nb=10):  
    w = w[:, np.newaxis]
    N = len(w)
    output = np.zeros(((Nb - 1) * hop + N, 1))
    for k in np.arange(0, Nb).reshape(-1):
        start = k*hop
        end = start + N
        output[np.arange(start, end)] = output[np.arange(start, end)] + w 
    return output


####################################
# Obtains h2 = w**4/1.02**2
h2 = np.multiply(w.data/1.02, w.data/1.02)
h2 = np.multiply(h2, h2)

# Obtains and plots the result of the ola function
overlap_add = ola(h2, hop=int((1-overlap)*len(w.data)), Nb=500)
overlap_add = Signal("overlap_add", overlap_add, x.freq)
overlap_add.data_plot()

# 3.2.f

In [None]:
# Defines the variables k, R, M and N (notice that R = 7 instead of 1 in this case)
M = 32               # TDF order
k = 3                # Index of the frequency channel
R = ceil( M/(2*k) )  # Hop size (increment on analysis times)
N = len(x.data)      # Length of the original signal 

# Defines a Hanning window of length 31 (< M)
Nw = 31
w = Signal("w", data=np.hanning(Nw))

# Number of tdf's to calculate
Nt = int((len(x.data)-Nw)/R)

# Calculates Xtil
Xtil = np.zeros((M, Nt), dtype=complex)
for u in np.arange(0, Nt).reshape(-1):
    start = int(u*R + 1)      # Start of the interval
    end = int(u*R + 1 + Nw)   # End of the interval
    term = np.multiply(x.data[np.arange(start, end)], w.data)
    Xtil[:, u] = np.fft.fft(term, M)


# Defines a function that reconstructs the signal
def reconst(Xtil, w, R, M, Nw, N):
    xrecons = np.zeros(N)
    for b in np.arange(0, Xtil.shape[1]):
        start = int(R*b + 1)                  # Start of the interval 
        end = int(R*b + 1 + Nw)               # End of the interval
        inv = np.fft.ifft(Xtil[:, b])         # Calculates the inverse FFT
        term = np.multiply(inv[:Nw], w.data)  # Multiply the inverse with the window
        xrecons[np.arange(start, end)] = xrecons[np.arange(start, end)] + term
    return xrecons 

# Obtains the reconstructed signal
xrecons = reconst(Xtil, w, R, M, Nw, N)
xrecons = Signal("xrecons", xrecons, x.freq)

# Plots the reconstructed signal over the original one
t = np.linspace(0, len(x.data)/x.freq, len(x.data))
plt.figure(figsize=(15, 4))
plt.plot(t, xrecons.data)
plt.plot(t, x.data)
plt.xlim(t[0], t[-1])
plt.xlabel("Time [s]")
plt.legend("Reconstructed signal", "Original Signal")
plt.title("Comparison between original and reconstructed signal")
plt.show()

# Listen to the reconstructed signal
xrecons.listen()

# 3.3.g

In [None]:
# Defining the equalizer function
def equalizer(xtilde, weights):
    # Copying input
    a = xtilde.copy()
    # Iterating through half the elements in input (as it is symmetric)
    for k in np.arange(0, int(xtilde.shape[0]/2)+1):
        # Updating value on filtered output
        a[k, :] = xtilde[k, :]*weights[k]
        a[-k, :] = xtilde[k,:]*weights[k]
        # Returning filtered output
    return a

# Defining weights to be used in sound equalization
weights = np.ones(int(M/2)+1)

# Displaying frequency channels
print("Frequency channels:")
for i in range(int(M/2)):
    print("{} - {}Hz until {}Hz".format(i, x.freq*i/int(M/2), x.freq*(i+1)/int(M/2)))

# Selecting which channels to filter
start_filtering = 0
end_filtering = 7
weights[start_filtering:end_filtering+1]=0

# Copying Xtil
b = np.real(Xtil).copy()

# Equalizing based on the weights defined above
Y = equalizer(b,weights)

# Reconstruction of the sound wave from a filtered signal in frequency domain
Y2 = reconst(Y, w, R=R, M=M, Nw=Nw,N=N)

# Normalizing output sound to avoid amplitude peaks
Y2 = Y2 * max(x.data)/max(Y2)

# Presenting which channels are being filtered
print("Listening song filtering from channel {} to channel {}".format(start_filtering, end_filtering))

# Converting to an object of Class Signal
Y2 = Signal("Y2", Y2, freq=48000)

# Listening to the equalized sound
Y2.listen()