***************************************************

**Definitions and Imports:**

In [None]:
####################################
## DO NOT EDIT THIS CODE SECTION
import numpy as np
from matplotlib import pyplot as plt
from scipy.io.wavfile import read, write
from scipy.signal import correlate, find_peaks, lfilter, freqz, resample
from scipy.linalg import solve_toeplitz
from IPython.display import Audio
from IPython.display import display
from pydub import AudioSegment
from math import pi

##additional imports for metrics
import torch
import torchmetrics
from torchmetrics.functional.audio.pesq import perceptual_evaluation_speech_quality
from torchmetrics.functional import signal_noise_ratio
# For displaying interactive plots in the notebook
%matplotlib widget
####################################

**Useful functions:**

In [None]:
####################################
## DO NOT EDIT THIS CODE SECTION
def read_wav_file(filename: str) -> (int, np.ndarray):
    Fs, signal = read(filename)
    signal = signal.astype(np.float32)
    signal = signal/(2**15)
    return Fs, signal

def write_wav_file(filename: str, sampling_rate: int, signal: np.ndarray):
    signal = signal*(2**15)
    signal = signal.astype(np.int16)    
    write(filename, sampling_rate, signal)

def lpc(x: np.ndarray, p: int) -> np.ndarray:
    n = len(x)
    # Compute autocorrelation vector or matrix
    nextpow2 = 2**(2*n-1).bit_length()
    x_pad = np.pad(array=x, pad_width=(0, nextpow2-n))
    X = np.fft.fft(x_pad)
    R = np.fft.ifft(np.abs(X)**2)
    R = R/n # Biased autocorrelation estimate
    a = np.real(solve_toeplitz(R[0:p],-R[1:(p+1)]))
    a = np.insert(a,0,1)
    return a
    
def plot_pitch(signal: np.ndarray, pitch_vec: np.ndarray, sampling_frequency: int, frame_length: int):
    stvec = create_time_vector(signal.size, sampling_frequency)
    ptvec = np.arange(frame_length//2,signal.size,frame_length)/sampling_frequency
    pitch_vec_hz = np.zeros(pitch_vec.size)
    pitch_vec_hz[pitch_vec != 0] = sampling_frequency/pitch_vec[pitch_vec != 0]
    # plot the results
    fig = plt.figure()

    plt.subplot(211)
    plt.plot(stvec,signal)
    plt.xlabel('Time [Sec]')
    plt.ylabel('Amplitude [Volt]')
    plt.title('Speech Signal')

    plt.subplot(212)
    plt.stem(ptvec,pitch_vec_hz)
    plt.xlabel('Time [Sec]')
    plt.ylabel('Frequency [Hz]')
    plt.title('Pitch Frequency values')

    plt.tight_layout()

def plot_frame_and_filter_spectrum(fvec: np.ndarray, frameSpec_dB: np.ndarray, filterSpec_dB: np.ndarray, filterOrder: int):
    basic_plot(fvec, frameSpec_dB, 'Frequency [Hz]', 'Amplitude [dB]', f'Spectrum of voiced frame and spectrum of predictor filter of order {filterOrder}')
    plt.plot(fvec, filterSpec_dB)
    y_max = np.max([np.max(frameSpec_dB), np.max(filterSpec_dB)])
    y_min = np.min([np.min(frameSpec_dB), np.min(filterSpec_dB)])
    y_range = y_max-y_min
    _,_,_,_ = plt.axis((fvec[0], fvec[fvec.size-1], y_min-0.1*y_range, y_max+0.1*y_range))         
    

def create_signals_for_check(signal: np.ndarray) -> (np.ndarray, np.ndarray, np.ndarray):
    frame_len = 256
    reminder_samp = signal.size%frame_len
    if(reminder_samp > 0):
        pad_num = (frame_len-reminder_samp) # number of zeros for padding
        clean = np.pad(array=signal, pad_width=(0, pad_num))
    else:
        clean = signal      

    # (1) ennode and decode the signal
    N,p,lp,e = encoder(signal)
    reconstructed = decoder(N, p, lp, e)

    # (2) create a broken signal
    noisy = clean + np.sqrt(4*10e-6)*np.random.randn(clean.size)
    frames_num = noisy.size//frame_len
    noisy_frames = np.reshape(a=noisy, newshape=(frames_num, frame_len)) #Each 256 slice is a row   
    noisy_frames[np.arange(4,frames_num,8),:] = 0
    noisy_frames[np.arange(5,frames_num,8),:] = 0
    noisy_frames[np.arange(6,frames_num,8),:] = 0
    noisy_frames[np.arange(7,frames_num,8),:] = 0

    broken = noisy_frames.flatten()
    
    return clean, reconstructed, broken  

def PESQ_quality_check(original_signal:np.ndarray,distorted_signal:np.ndarray,fs:int,frame_len:int,) -> float:
    ### both the original and the reconstructed must be of the size n*frame_len where n is a natural number
    original_signal_interpolated = resample(original_signal, int(original_signal.size*2))
    distorted_signal_interpolated = resample(distorted_signal, int(distorted_signal.size*2))
    fs = fs*2
    N = int(original_signal_interpolated.size//frame_len)
    original_tensor = torch.from_numpy(original_signal_interpolated)
    distorted_tensor = torch.from_numpy(distorted_signal_interpolated)
    pesq_loss = perceptual_evaluation_speech_quality(original_tensor,distorted_tensor,fs,"wb")

    return pesq_loss.item()

def SegSNR_quality_check(original_signal:np.ndarray,distorted_signal:np.ndarray,fs:int,frame_len:int,) -> float:
    ### both the original and the reconstructed must be of the size n*frame_len where n is a natural number
    N = int(original_signal.size//frame_len)
    noise_signal = distorted_signal-original_signal
    original_tensor = torch.from_numpy(original_signal)
    distorted_tensor = torch.from_numpy(distorted_signal)
  
    pesq_loss = perceptual_evaluation_speech_quality(original_tensor,distorted_tensor,fs,"nb")
    
    SNRseg = np.zeros(original_signal.size)
    original_signal_reshaped = original_signal.reshape((N,frame_len))
    noise_signal_reshaped = noise_signal.reshape((N,frame_len))
    for i in range(N):
        P_original = sum(original_signal_reshaped[i,:] ** 2)
        P_noise = sum(noise_signal_reshaped[i,:] ** 2)
        SNRseg[i] = 10*np.log10(P_original/P_noise)
    segmental_snr_value = sum(SNRseg)/N

    return segmental_snr_value        
####################################

************************************************************

## Question 14

According to your ID numbers, load the chosen speech file.  
Pad the signal with zeros so it will contain an integer number of frames.  
Re-define the 'voiced' and 'unvoiced' frames, the same way you did in the last HW.

In [None]:
id_digit = 1 # insert your ID digit

# load speech signal
speechfilename = 'speech'+str(id_digit)+'.wav'
fs, speech_signal = ...

# Pad the signal with zeros
frameLen = 256 
speech_signal_padded = ...

# re-define the voiced and unvoiced exaple frames frm last meeting
voiced_frame_index = int(...)
voiced_example_frame = speech_signal_padded[(voiced_frame_index*frameLen):((voiced_frame_index+1)*frameLen)]

unvoiced_frame_index = int(...)
unvoiced_example_frame = speech_signal_padded[(unvoiced_frame_index*frameLen):((unvoiced_frame_index+1)*frameLen)]

Write a loop that will apply 'vu_classify' function, frame-by-frame, on the whole signal.  
plot the signal and the pitch values using the 'plot_pitch' function.

************************************************************

## Question 15

Copy your implementation of the function 'residual_energy' (from the previous parts).

In [None]:
def residual_energy(signal: np.ndarray, FIR_coeffs: np.ndarray) -> int:
    # insert code here

Calculate the lpc coefficients using 'lpc' function.  
Calculate the residual error using the function above.

In [None]:
a = lpc(...)
e = residual_energy(...)
print(f'The prediction filter coefficients are:')
print(a)
print(f'The prediction error is: {e}')

Plot the spectrum of the frame, along with the spectrum of the lpc filter using the function 'plot_frame_and_filter_spectrum'.

In [None]:
fvec = ...

_,H = freqz([1], a, frameLen//2)
H_dB = ...

voiced_example_frame_F_dB = ...

plot_frame_and_filter_spectrum(...)

*************************************************************

## Section 17.2

Repeat last section with other 'p' values, according to the booklet.

************************************************************

## Question 18

Complete the code for the encoder function:

In [None]:
def encoder(signal: np.ndarray) -> (int,np.ndarray,np.ndarray,np.ndarray):
    frame_len = 256
    sampling_freq = 8000    
    LPCorder = ...
    
    # Pad signal with zeros to contain an integer number of frames
    reminder_samp = signal.size%frame_len
    if(reminder_samp > 0):
        pad_num = (frame_len-reminder_samp) # number of zeros for padding
        signal_padded = np.pad(array=signal, pad_width=(0, pad_num))
    else:
        signal_padded = signal           
    
    # Divide the signals to frames
    speech_framed = np.reshape(a=signal_padded, newshape=(signal_padded.size//frame_len, frame_len)) #Each 256 slice is a row    
    
    # Encoder's variables init:
    ##### complete the dimensions ###
    N = ...                   # N is frames_num
    vu_flag = np.zeros(...)   # representing all frames 
    p = np.zeros(...)         # pitch duration for all frames. unvoiced frames are set to 0 pitch values. voiced frames are represented by the *index value* of the pitch (not in Hz)
    lp = np.zeros((..., ...)) # estimated LPC filter's coeffs for all frames
    e = np.zeros(...)         # the square root of the residual energies for all frames
    
    # Encoding process
    for i in range(N): # for each frame
        vu_flag[i] = ...
        if( ... ):
            p[i] = ...
        
        lp[:,i] = lpc(...,...) # estimate the transfer function of the mouth
        e[i] = np.sqrt(...) # average energy using residual_energy function

    return (N,p,lp,e)

Run the encoder function on the original signal (not the padded version) to see that it is working:

In [None]:
N,p,lp,e = encoder(...)

*************************************************************

## Section 18.2

Copy your implementation of the function 'generate_impulse_train_frame' (from the perliminary report).

In [None]:
def generate_impulse_train_frame(frame_len: int, pitch_period: int, reminder_from_last_frame: int) -> (np.ndarray, int):
    # insert code here
    return frame, reminder

Complete the code for the decoder function:

In [None]:
def decoder(N: int, p: np.ndarray ,lp: np.ndarray, e: np.ndarray) -> (np.ndarray):
    frame_len = 256
    estimated_frames = np.zeros((..., frame_len))  # representation of the synthesized frames of the decoded signal as a matrix (according to the number of frames)
    rem_last_frame = 0 # init of the the reminder-index (the gap from the end of the last frame to the location of the last impulse in the last frame)
    final_cond = np.zeros(...) # according to question 4 in the preliminary questions section in the booklet

    # Decoding process
    for i in range(N):   # for each frame
        # create exitation signal according to u/v decision
        if( ... ): # unvoiced    
            excitation_frame = np.random.randn(frame_len) # white noise
            rem_last_frame = ...
        else: # voiced
            excitation_frame, rem_next_frame = generate_impulse_train_frame(...) # impulse train
            rem_last_frame = ...

        # normalize the excitation signal and amplify it with the gain value. 
        # *DO NOT* forget to use square root of 'normalization_energy' in order to normalize the excitation signal
        normalization_energy = np.sum(excitation_frame**2)/frame_len
        excitation_frame_normalized = ... 
        excitation_frame_amplified = excitation_frame_normalized * ...
        
        # filter the exitation signal with the LPC coefficients, this time use it as an 'all-pole' filter
        estimated_frames[i,:], final_cond = lfilter(..., ..., ..., zi=final_cond)

    # concatenate all frames to a 1-dimentional signal
    return estimated_frames.flatten()

Run the decoder to get a synthesized version of the signal:

In [None]:
estimated_signal = decoder(...)

Plot the original and the synthesized signal (you can remove the padded section from 'estimated_signal' so both vectors will have the same size):

Play the two audio files and compare how they sound:

Save sythesized signal to file.

In [None]:
destinationfilename = 'speech'+str(id_digit)+'_synthesized.wav'
write_wav_file(destinationfilename, fs, estimated_signal)