In [0]:
import os
import numpy as np
from scipy.io import loadmat, wavfile

# Get current notebook directory
notebook_dir = os.getcwd()
file_path = os.path.join(notebook_dir, "test.mat")
speech_file_path = os.path.join(notebook_dir, "speech.wav")

input = loadmat(file_path)
a = input["a"].flatten()
x = input["x"].flatten()

speech_input = wavfile.read(speech_file_path)


def to_float(x):
    if x.dtype == np.int16:
        return x.astype(np.float32) / 32768.0
    if x.dtype == np.int32:
        return x.astype(np.float32) / 2147483648.0
    return x.astype(np.float32)

speech_fs = speech_input[0]
speech_data = speech_input[1]
speech_data = to_float(speech_data)

speech_data -= np.mean(speech_data)

### 1.1

In [0]:
# 1)

import numpy as np
import matplotlib.pyplot as plt
from statsmodels.tsa.stattools import acf
from scipy.signal import lfilter

def lpc(x, k):

    acf_values = acf(x, nlags=k+1)
    
    cov = np.zeros((k,k))
    cross = np.zeros(k)
    for i in range(k):
        # We negate the rhs
        cross[i] = -acf_values[i+1]
        for j in range(k):
            cov[i, j] = acf_values[np.abs(i-j)]

    h = np.linalg.inv(cov) @ cross

    # Here the analysis filter is used, which is the inverse of the synthesis filter.
    # In the synthesis filter we add 1 in front of the filter 
    # We add [0] in front of the coeffecients, since we are using earlier samples to predict the current ([0]) sample
    b = np.concatenate([[0], -h])  # numerator
    a = [1]                        # denominator

    x_hat = lfilter(b, a, x)

    e = x - x_hat

    mse = np.mean(e**2)
    rmse = mse / np.var(x)

    return e, h, x_hat, rmse

e, h, x_hat, rmse  = lpc(x, k=15)

plot_num = 1000

plt.figure()
plt.plot(x[:plot_num], label="x")
plt.plot(x_hat[:plot_num], label="x estimated")
plt.plot(e[:plot_num], label="error")
plt.legend()

plt.figure()
plt.plot(a, label="a")
plt.plot(h, label="a estimated")
plt.legend()
plt.title("Filter coeffecients")

    

### 1.2

In [0]:
from scipy.signal.windows import hann

# a)

# Here we split the speech into chunks and place each chunk as a ROW in the matrix
# This is much easier to work with in Python

chunk_len = 240

def chunks(x, N):
    L = len(x)
    M = L // N          # number of full chunks
    return x[:M*N].reshape(M, N)

speech_matrix = chunks(speech_data, chunk_len)

# b)

speech_energy = np.sum(speech_matrix**2, axis=1)

speech_energy_argmax = np.argmax(speech_energy)

plt.plot()
plt.plot(np.arange(chunk_len)*(1/speech_fs), speech_matrix[speech_energy_argmax,:])
plt.title(f"Max energy chunk idx = {speech_energy_argmax}")

plt.figure()
plt.plot(np.sqrt(speech_energy))
plt.title("Sqrt Speech Energy")

plt.figure()
plt.plot(speech_matrix.ravel())

In [0]:
# Class to mimic MatLab dsp.ZeroCrossingDetector()
class ZeroCrossingDetector:
    """
    Python equivalent of MATLAB dsp.ZeroCrossingDetector.
    - Stateful: retains previous sign across frames
    - Zero-handling matches MATLAB: zeros inherit previous nonzero sign
    """
    def __init__(self):
        self.prev_sign = 0  # same as MATLAB initial state

    def process(self, x):
        x = np.asarray(x)

        # compute sign
        s = np.sign(x).astype(int)

        # propagate sign through zeros, matching MATLAB behavior
        for i in range(len(s)):
            if s[i] == 0:
                s[i] = self.prev_sign
            else:
                break  # once a nonzero sample is found, no need for more

        # count zero-crossings including transition from previous frame
        full = np.concatenate([[self.prev_sign], s])
        zc = np.count_nonzero(np.diff(full))

        # update internal state: last nonzero sign if possible
        if np.any(s != 0):
            self.prev_sign = s[s != 0][-1]

        return zc

In [0]:
# c)

# I could not get the zero crossing method to work reliable so I used energy for voiced detection.

zc_threshold = chunk_len // 2
energy_threshold = 0.025

zcd = ZeroCrossingDetector()

speech_zc = np.array([zcd.process(speech_matrix[i, :]) for i in range(speech_matrix.shape[0])])
#speech_voiced = [1 if speech_zc[i] < zc_threshold else 0 for i in range(len(speech_zc))]
speech_voiced_ = np.array([np.ones(chunk_len) if speech_energy[i] > energy_threshold**2 else np.zeros(chunk_len) for i in range(len(speech_energy))])
speech_voiced = np.array([1 if speech_energy[i] > energy_threshold**2 else 0 for i in range(len(speech_energy))])

plt.figure()
plt.plot(speech_matrix.ravel(), label="Speech")
plt.plot(speech_voiced_.ravel()/10, label="Speech Voiced")
plt.legend()

In [0]:
# d)

excitation = []
coefs = []
predicted = []

for i in range(speech_matrix.shape[0]):

    x_in = speech_matrix[i, :]
    e, h, x_hat, rmse_  = lpc(x_in, k=15)
    excitation += [e]
    coefs += [h]
    predicted += [x_hat]

# Create arrays
excitation = np.array(excitation)
coefs = np.array(coefs)
predicted = np.array(predicted)
rmse = np.array(rmse)

start = 4400
stop = 5000

idx = speech_energy_argmax

plt.figure()
plt.plot(speech_matrix.ravel()[start:stop], label="Signal")
plt.plot(predicted.ravel()[start:stop], label="Predicted")
plt.plot(excitation.ravel()[start:stop], label="Excitation")
plt.legend()

In [0]:
# e)

speech_matrix = speech_matrix.astype(np.float64)
coefs = coefs.astype(np.float64)

org_bytes = speech_matrix.nbytes
comp_bytes = coefs.nbytes

print(f"Original signal bytes: {org_bytes}, compressed bytes: {comp_bytes}, compression ration {org_bytes/comp_bytes:.2f}")


### 1.3


In [0]:
# a, b and c

def UV_exicte(N):
    return np.random.normal(0, 1, N)

def V_exicte(N, pitch_hz, fs):
    
    # Calculate sample spacing between pitches
    excitation_delta_time = 1/pitch_hz
    sample_delta_time = 1/fs

    # We calculate the delta samples needed for the pitch
    Np = int(np.round(excitation_delta_time / sample_delta_time))

    exc = np.zeros(N, dtype=float)

    # Find impulse indices
    impulse_indices = np.arange(0, N, Np)

    # Set impulse indices to 1
    exc[impulse_indices] = 1

    return exc

def synth(exc, h):
    a_full = np.concatenate([[1], h])
    b = [1]  # all-pole filter
    x_synth = lfilter(b, a_full, exc)
    return x_synth

# Determines which exication to use
def get_excite(idx, speech_voiced):
    if speech_voiced[idx] == 1:
        return V_exicte(chunk_len, pitch_hz=133, fs=speech_fs)
    else:
        return UV_exicte(chunk_len)
    
# Create excitation matrix
speech_exication = np.array([get_excite(idx=i, speech_voiced=speech_voiced) for i in range(speech_voiced.shape[0])])
print(speech_exication.shape)

# Create speech synth matrix
speech_synth = np.array([synth(exc=speech_exication[i], h=coefs[i, :]) for i in range(speech_exication.shape[0])])

#Rescale energy
speech_synth = (speech_synth.T * np.sqrt(speech_energy/np.sum(speech_synth**2, axis=1))).T

#excitation_ = get_excite(idx=speech_energy_argmax, speech_voiced=speech_voiced)  
#print(excitation_.shape)
#x_synth = synth(exc=excitation_, h=coefs[speech_energy_argmax, ])


plt.figure()
plt.plot(speech_synth[speech_energy_argmax, :], label="Speech synth")
plt.plot(speech_matrix[speech_energy_argmax, :], label="Speech Original")
plt.legend()

plt.figure()
plt.plot(speech_synth.ravel(), label="Speech synth")
plt.plot(speech_matrix.ravel(), label="Speech Original")
plt.legend()

int16_audio = np.int16(speech_synth.ravel() * 32767)
wavfile.write("speech_synth.wav", speech_fs, int16_audio)


There is a clear difference between the original and synthesized signal, but the harmonics seems to match up. The quality of the signal is not the best, it sounds like a robot speaking but the speech can be understood.

To make the speech easier to understand, a better voiced detector and some overlaps in the speech frames could be used.

### 2.1


The LS estimator in closed form is

$$\hat{\boldsymbol{\theta}} = (\mathbf{H}^T \mathbf{H})^{-1} \mathbf{H}^T \mathbf{y}$$

Where \\(\mathbf{H}\\) is the design matrix with rows \\(\mathbf{h_1}=[1, x_1, x_1^2]\\) to \\(\mathbf{h_N}=[1, x_N, x_N^2]\\) for \\(M=3\\)

In [0]:
import numpy as np
import matplotlib.pyplot as plt

N = 100

i = np.arange(1,N+1)

x = np.sin(2*np.pi*i/N)

w = np.random.normal(0, 0.2, N)

y = x + w

train_samples = 10
M = 3

train_idx = np.random.randint(low=0, high=N, size=train_samples)

y_train = y[train_idx]
x_train = x[train_idx]

### 2.2

In [0]:
H = np.zeros((N, M+1))

for t in range(N):
    for m in range(M+1):
        H[t, m] = i[t]**m

H_train = H[train_idx, :]

theta_hat = np.linalg.pinv(H_train) @ x_train

x_hat = H @ theta_hat



### 2.3

In [0]:
plt.figure()
plt.plot(x, label="x")
plt.plot(y, label="y")
plt.plot(x_hat, label="x_hat")
plt.plot(x-x_hat, label="Error")
plt.legend()
plt.title(f"M={M}")

The estimated model is very close to the ground truth

### 2.4

Here 10 samples is used for training on a 10 order polynomial. It is seen that we're overfitting.

In [0]:
plt.figure()
plt.plot(x, label="x")
plt.plot(y, label="y")
plt.plot(x_hat, label="x_hat")
plt.plot(x-x_hat, label="Error")
plt.legend()
plt.title(f"M={M}")

### 2.5

If we increase the number of training samples to 50, we're decreasing overfit, but the model is still not as good as when M=3. 

In [0]:
plt.figure()
plt.plot(x, label="x")
plt.plot(y, label="y")
plt.plot(x_hat, label="x_hat")
plt.plot(x-x_hat, label="Error")
plt.legend()
plt.title(f"M={M}")