# TP1: Conversion of Sampling Frequency and STFT

*By Daniel Jorge Deutsch, Kevin Kuhl and Brayam Santiago Velandia (25/09/2020)*

In [None]:
import os
import struct
import sys
import wave
import time
from copy import deepcopy
from math import ceil

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import pyaudio
from scipy import signal as sig
from scipy.io import wavfile

In [None]:
class Base:

    def __init__(self, name, data):
        self.name = name 
        self.data = np.asarray(data)


    #------------------------------#
    #--- SAMPLES ------------------#
    #------------------------------#

    def step_split_sample(self, step):
        datas = [self.data[i::step] for i in range(step)]
        max_len = max([len(data) for data in datas])
        for i, data in enumerate(datas):
            if len(data) != max_len:
                datas[i] = np.append(data, 0)
            datas[i] = self.__class__(f"{self.name}{i}", datas[i])
        return tuple(datas)       


    #------------------------------#
    #--- PLOTS --------------------#
    #------------------------------#
    
    def time_plot(self, xhighlights=None, yhighlights=None, figsize=(15, 4), save=False):
        pass


    def spectrogram_plot(self, xhighlights=None, yhighlights=None, figsize=(15, 4), save=False):
        pass


    # Should use when you have the filter's coefficients
    def freqz_plot(self, xhighlights=None, yhighlights=None, figsize=(15, 4), save=False):

        freq, mag = sig.freqz(self.data)

        # Magnitude
        mag = 20*np.log10(np.abs(mag))
        mag[np.abs(mag-np.mean(mag)) > 2*np.std(mag)] = np.nan   # Remove outliers for the plot

        # Frequency   
        freq = freq/(2*np.pi)
        
        # Plot
        plt.figure(figsize=figsize)
        plt.plot(freq, mag)
        plt.xlim(freq[0], freq[-1])
        plt.ylabel("Magnitude [(]dB]")
        plt.xlabel("Frequency [(rad/sample)/2π]")
        plt.title(f"Frequency response of {self.name}")
        if xhighlights:
            for xhighlight in xhighlights:
                plt.axvline(x=xhighlight, color="red")
        if yhighlights:
            for yhighlight in yhighlights:
                plt.hline(y=yhighlight, color="red")
        if save:
            plt.savefig(f"./outputs/figures/{self.name}_freqz.png", dpi=300, bbox_inches="tight")
        plt.show() 

    
    # Should use when you have filter's impulse response
    def fft_plot(self, xhighlights=None, yhighlights=None, figsize=(15, 4), save=False):      
        
        # Magnitude
        len_fft = 4096                                           # Length of the transformed axis of the output
        mag = np.fft.fft(self.data, len_fft)                     # Obtains the magnitude of the signal
        mag = np.fft.fftshift(mag)                               # Shifts the zero-frequency component to the center of the spectrum
        mag = 20*np.log10(np.abs(mag))                           # Applies log to the result
        mag[np.abs(mag-np.mean(mag)) > 2*np.std(mag)] = np.nan   # Remove outliers for the plot

        # Frequency
        freq = np.linspace(-0.5, 0.5, len(mag))

        # Plot
        plt.figure(figsize=figsize)
        plt.plot(freq, mag)
        plt.xlim(freq[0], freq[-1])
        plt.ylabel("Magnitude [dB]")
        plt.xlabel("Frequency [(rad/sample)/2π]")
        plt.title(f"Frequency response of {self.name}")
        if xhighlights:
            for xhighlight in xhighlights:
                plt.axvline(x=xhighlight, color="red")
        if yhighlights:
            for yhighlight in yhighlights:
                plt.hline(y=yhighlight, color="red")
        if save:
            plt.savefig(f"./outputs/figures/{self.name}_fft.png", dpi=300, bbox_inches="tight")
        plt.show()

In [71]:
class Signal(Base):

    def __init__(self, name, data=None, freq=None, file=None):
        if not ((data is None) ^ (file == None)):
            raise Exception("You must provide a data or a .wav file")

        if freq:
            self.freq = int(freq)

        if file:
            freq, data = wavfile.read(file)
            self.freq = int(freq)
        Base.__init__(self, name, data)
    

    #------------------------------#
    #--- SHIFTING -----------------#
    #------------------------------#

    def shift(self, name, power_of_z):
        data = self.data
        data = np.append(data[power_of_z:], power_of_z*[0]) if power_of_z > 0 else np.append(abs(power_of_z)*[0], data[:power_of_z])
        return self.__class__(name, data, self.freq)

    
    #------------------------------#
    #--- SAMPLING -----------------#
    #------------------------------#

    def under_sample(self, name, M):
        datas = self.step_split_sample(M)
        return self.__class__(name=name, data=datas[0].data, freq=self.freq/M)


    def over_sample(self, name, L):
        data = np.insert(self.data, range(1, len(self.data)+1)[::L-1], 0)
        return self.__class__(name=name, data=data, freq=self.freq*L) 
            

    #------------------------------#
    #--- LISTEN -------------------#
    #------------------------------#
    def listen(self):
        
        # Saves the .wav
        if not self.freq:
            raise Exception("the sample freq (sample/sec) must be provided")
        wavfile.write(f"./outputs/sounds/{self.name}.wav", self.freq, np.asarray(self.data, dtype=np.int16))

        # Uses pyaudio to play the signal
        chunk = 1024
        pa = pyaudio.PyAudio()
        audio = wave.open(f"./outputs/sounds/{self.name}.wav", "rb")
        stream = pa.open(
            format = pa.get_format_from_width(audio.getsampwidth()),
            channels = audio.getnchannels(),
            rate = audio.getframerate(),
            output = True
        )
        data = audio.readframes(chunk)
        while data:
            stream.write(data)
            data = audio.readframes(chunk)
        stream.stop_stream()
        stream.close()
        pa.terminate()

In [72]:
class Filter(Base):

    def apply(self, name, signal):
        conv = np.convolve(self.data, signal.data)
        return Signal(name, data=conv, freq=signal.freq)

# 2.1

In order to implement the direct resampling, we should use three blocks like shown in the following image:

![title](assets/imgs/img01.png)

<br>

# 2.2

In [75]:
# Obtains the input signal x from its file
x = Signal(name="x", file="./inputs/caravan_48khz.wav")

# Defines the over and under sampling constants
L = 2
M = 3

# Defines the Remez filter params
numtaps = 100                       # An even number (can't be too big)
trans_width = 1/13                  # ?
cutoff = min( 1/(2*L), 1/(2*M) )    # Cutoff frequency (should be 1/(2*M) = 1/6)

# Obtains the Remez filter
h = Filter("h", sig.remez(numtaps, [0, cutoff, cutoff+trans_width, 1/2], [L, 0]))

# Runs the system
start_dir = time.perf_counter()      # Start counting (for exercise 2.5)
w = x.over_sample("w", L)            # Obtains the over sampled signal w using L=2 (Fw=48*2=96kHz)
v = h.apply("v", w)                  # Calculates the convolution v=w*h
y = v.under_sample("y", M)           # Undersample v to obtain the output of the system using M=3 (Fy=96/3=32kHz)
end_dir = time.perf_counter()        # Ends counting (for exercise 2.5)

# 2.3

We have initially:

![title](assets/imgs/img02.png)

Which can be written as:

![title](assets/imgs/img03.png)

Which can also be written as:

![title](assets/imgs/img04.png)

<br>

# 2.4

![title](assets/imgs/img05.png)
![title](assets/imgs/img06.png)
![title](assets/imgs/img07.png)
![title](assets/imgs/img08.png)
![title](assets/imgs/img09.png)
![title](assets/imgs/img10.png)

<br>

# 2.5

In [76]:
# Obtains the filters
H0, H1 = h.step_split_sample(L)
H00, H01, H02 = H0.step_split_sample(M)
H10, H11, H12 = H1.step_split_sample(M)

# Starts counting (for exercise 2.5)
start_opt = time.perf_counter()

# Obtains all the k's
w0 = x.shift("w0", 1)
w1 = w0.shift("w1", -1)
w2 = w1.shift("w2", -1)
w3 = x
w4 = w3.shift("w4", -1)
w5 = w4.shift("w5", -1)

# Obtains all the v's
v0 = w0.under_sample("v0", M)
v1 = w1.under_sample("v1", M)
v2 = w2.under_sample("v2", M)
v3 = w3.under_sample("v3", M)
v4 = w4.under_sample("v4", M)
v5 = w5.under_sample("v5", M)

# Obtains all the u's
u0 = H00.apply("u0", v0)
u1 = H01.apply("u0", v1)
u2 = H02.apply("u0", v2)
u3 = H10.apply("u0", v3)
u4 = H11.apply("u0", v4)
u5 = H12.apply("u0", v5)

# Obtains all the k's
k0 = Signal("k0", u0.data+u1.data+u2.data, u0.freq)
k1 = Signal("k1", u3.data+u4.data+u5.data, u3.freq)

# Obtains all the p's
p0 = k0.over_sample("p0", L)
p1 = k1.over_sample("p1", L)

# Obtains all the q's
q0 = p0.shift("q0", -1)

# Obtains y
y = Signal("y", q0.data+p1.data, q0.freq)

# Ends counting (for exercise 2.5)
end_opt = time.perf_counter()

# Listen to the output
y.listen()

In [77]:
print(f"Direct implementation execution time: {end_dir-start_dir}")
print(f"Optimal implementation execution time: {end_opt-start_opt}")

Direct implementation execution time: 0.1871553999990283
Optimal implementation execution time: 0.13018709999960265
