# Step Zero:

## Import Libraries:

In [None]:
!pip install librosa numpy soundfile matplotlib noisereduce scipy
!pip install hmmlearn colorama collections scipy

In [None]:
import librosa
import numpy as np
import os
import soundfile as sf
import matplotlib.pyplot as plt
import noisereduce as nr
from scipy.spatial.distance import euclidean
from sklearn.preprocessing import MinMaxScaler, StandardScaler
from hmmlearn import hmm
from sklearn.model_selection import train_test_split
from colorama import Fore, Style
from collections import defaultdict
from sklearn.metrics import accuracy_score
import scipy.stats

# Step One:

## Set Parameters 

In [None]:
initial_recordings = 'recordings'
preprocessed_recordings = 'preprocessed_recordings'
mfcc_features = 'mfcc_features'

TARGET_SAMPLING_RATE = 16000

N_MFCC = 13
N_TIME_MFCC = 30



## Preprocessing:

In [None]:
def preprocess_audio(input_path, output_path, target_sampling_rate = TARGET_SAMPLING_RATE):
    # Load the audio file
    audio, sampling_rate = librosa.load(input_path, sr = target_sampling_rate)

    # Noise reduction
    reduced_noise_audio = nr.reduce_noise(y=audio, sr = sampling_rate)

    # Silence removal
    non_silent_audio, _ = librosa.effects.trim(audio)

    # Normalize the audio to a standard volume level
    normalized_audio = librosa.util.normalize(non_silent_audio)

    # Save the processed audio file
    sf.write(output_path, normalized_audio, target_sampling_rate)

In [None]:
def preprocess_all_audio(input_folder, output_folder):
    if not os.path.exists(output_folder):
        os.makedirs(output_folder)

    for file_name in os.listdir(input_folder):
        if file_name.endswith('.wav'):
            file_path = os.path.join(input_folder, file_name)
            output_path = os.path.join(output_folder, file_name)
            preprocess_audio(file_path, output_path)
            #print(f"Processed {file_name}")

In [None]:
preprocess_all_audio(initial_recordings, preprocessed_recordings)

## Feature Extraction

### Extract MFCC

In [None]:
def pad_mfcc(mfcc_feature, n_time_mfcc = N_TIME_MFCC):
    temp = np.tile(mfcc_feature, (1 ,int(np.ceil(n_time_mfcc / mfcc_feature.shape[1])) ))
    padded_mfcc_features= temp[:,:n_time_mfcc]
    return padded_mfcc_features

In [None]:
def extract_mfcc(file_path, target_sampling_rate = TARGET_SAMPLING_RATE, n_mfcc = N_MFCC, n_time_mfcc = N_TIME_MFCC):
    audio, _ = librosa.load(file_path)
    mfcc_feature = librosa.feature.mfcc(y = audio, sr = target_sampling_rate, n_mfcc = n_mfcc)
    mfcc_feature = pad_mfcc(mfcc_feature, n_time_mfcc)
    return mfcc_feature

In [None]:
def extract_features_for_all_files(recordings_folder, mfcc_folder):
    if not os.path.exists(mfcc_folder):
        os.makedirs(mfcc_folder)

    for file_name in os.listdir(recordings_folder):
        if file_name.endswith('.wav'):
            file_path = os.path.join(recordings_folder, file_name)
            mfcc_feature = extract_mfcc(file_path)
            # Save mfcc_feature as file
            output_file_path = os.path.join(mfcc_folder, file_name.replace('.wav', '.npy'))
            np.save(output_file_path, mfcc_feature)
            print(f"MFCC features extracted and saved for {file_name} , shape = {mfcc_feature.shape}")

In [None]:
input_folder = 'preprocessed_recordings'
output_folder = 'mfcc_features'
extract_features_for_all_files(preprocess_all_audio, output_folder)

### Heat Map:

In [None]:
def plot_mfcc_heatmaps(mfcc_dict):
    for file_name, mfcc in mfcc_dict.items():
        plt.figure(figsize=(15, 4))
        plt.title(f'MFCC Heatmap for {file_name}')
        librosa.display.specshow(mfcc, x_axis='time')
        plt.yticks(range(0, 13))
        plt.ylabel('MFCC')
        plt.colorbar(format='%+2.0f dB')
        plt.tight_layout()
        plt.show()

In [None]:
mfcc_folder = 'mfcc_features'
index_of_interest = 0

files_with_index_0 = [f for f in os.listdir(mfcc_folder) if f.endswith('.npy') and f.split('_')[-1].replace('.npy', '') == str(index_of_interest)]

mfcc_features0 = {file: np.load(os.path.join(mfcc_folder, file)) for file in files_with_index_0}
plot_mfcc_heatmaps(mfcc_features0)

# Step Two:

## Prepare Data

In [None]:
def prepare_data(mfcc_folder, m):
    train_features, test_features = [], []
    train_labels, test_labels = [], []

    split_index = m // 5
    for file_name in os.listdir(mfcc_folder):
        digit_label, speaker_name, index_str =  file_name[:-4].split('_')
        index = int(index_str)
        file_path = os.path.join(mfcc_folder, file_name)
        file_path = os.path.join(mfcc_folder, file_name)
        mfcc = np.load(file_path)

        if index < split_index:
            train_features.append(mfcc.T)
            train_labels.append(file_name)
        else:
            test_features.append(mfcc.T)
            test_labels.append(file_name)

    return train_features, train_labels, test_features, test_labels

In [None]:
m = 50
mfcc_folder = 'mfcc_features'
train_features, train_labels, test_features, test_labels = prepare_data(mfcc_folder, m)

In [None]:
train_labels

In [None]:
test_labels

## HMM:

# Step Three: Training and Testing

In [None]:
# Function to extract just the digit from the filename
def extract_digit(label):
    digit_label = label.split('_')[0]  # Extracting digit from the filename
    return digit_label

In [None]:
# Organize training data by digit
training_data_by_digit = defaultdict(list)
for mfcc, label in zip(train_features, train_labels):
    digit = extract_digit(label)
    training_data_by_digit[digit].append(mfcc)

# Similar for testing data
# testing_data_by_digit = defaultdict(list)
# for mfcc, label in zip(test_features, test_labels):
#     digit = extract_digit(label)
#     testing_data_by_digit[digit].append(mfcc)

## Part 1:

#### Digit:

In [None]:
# Initialize HMM parameters
n_components = 30
hmm_models_digit = {}

for digit, data in training_data_by_digit.items():
    # Combine the sequences for this digit and find lengths for fitting
    lengths = [len(sequence) for sequence in data]
    X = np.concatenate(data)

    # Define and train the HMM
    model = hmm.GaussianHMM(n_components=n_components)
    model.fit(X, lengths)
    hmm_models_digit[digit] = model


In [None]:
# Prepare true labels for accuracy calculation
test_true_digits = [extract_digit(label) for label in test_labels]

# Predictions
test_predictions_digits = []

for mfcc in test_features:
    best_score, best_digit = float("-inf"), None
    for digit, model in hmm_models_digit.items():
        score = model.score(mfcc)
        if score > best_score:
            best_score, best_digit = score, digit
    test_predictions_digits.append(best_digit)


In [None]:

for i in range(len(test_predictions_digits)):
    if test_true_digits[i] != test_predictions_digits[i]:
        print(Fore.RED + f"\033[1m{test_true_digits[i], test_predictions_digits[i]}\033[0m"+ Style.RESET_ALL)
    else:
        print(Fore.BLUE  + f"{test_true_digits[i], test_predictions_digits[i]}"+ Style.RESET_ALL)

In [None]:
# Calculate accuracy
accuracy = accuracy_score(test_true_digits, test_predictions_digits)
print(f"Test Accuracy: {accuracy * 100:.2f}%")

#### Speaker:

In [None]:
# Function to extract just the digit from the filename
def extract_speaker(label):
    speaker_label = label.split('_')[1]
    return speaker_label

# Organize training data by digit
training_data_by_speaker = defaultdict(list)
for mfcc, label in zip(train_features, train_labels):
    speaker = extract_speaker(label)
    training_data_by_speaker[speaker].append(mfcc)

# Similar for testing data (might be used later)
# testing_data_by_speaker = defaultdict(list)
# for mfcc, label in zip(test_features, test_labels):
#     speaker = extract_speaker(label)
#     testing_data_by_speaker[speaker].append(mfcc)


In [None]:
# Initialize HMM parameters
n_components = 30
hmm_models_speaker = {}

for speaker, data in training_data_by_speaker.items():
    # Combine the sequences for this digit and find lengths for fitting
    lengths = [len(sequence) for sequence in data]
    X = np.concatenate(data)

    # Define and train the HMM
    model = hmm.GaussianHMM(n_components=n_components)
    model.fit(X, lengths)
    hmm_models_speaker[speaker] = model


In [None]:
# Prepare true labels for accuracy calculation
test_true_speakers = [extract_speaker(label) for label in test_labels]

# Predictions
test_predictions_speakers = []

for mfcc in test_features:
    best_score, best_speaker = float("-inf"), None
    for speaker, model in hmm_models_speaker.items():
        score = model.score(mfcc)
        if score > best_score:
            best_score, best_speaker = score, speaker
    test_predictions_speakers.append(best_speaker)


In [None]:
   for i in range(len(test_predictions_speakers)):
    if test_true_speakers[i] != test_predictions_speakers[i]:
        print(Fore.RED + f"\033[1m{test_true_speakers[i], test_predictions_speakers[i]}\033[0m"+ Style.RESET_ALL)
    else:
        print(Fore.BLUE  + f"{test_true_speakers[i], test_predictions_speakers[i]}"+ Style.RESET_ALL)

In [None]:
# Calculate accuracy
accuracy = accuracy_score(test_true_speakers, test_predictions_speakers)
print(f"Test Accuracy: {accuracy * 100:.2f}%")

## Part 2:

### Define HMM class

In [None]:
class HMM:
    def __init__(self, num_hidden_states):
        self.num_hidden_states = num_hidden_states
        self.rand_state = np.random.RandomState(1)

        self.initial_prob = self._normalize(self.rand_state.rand(self.num_hidden_states, 1))
        self.transition_matrix = self._stochasticize(self.rand_state.rand(self.num_hidden_states, self.num_hidden_states))

        self.mean = None
        self.covariances = None
        self.num_dimensions = None

    def _forward(self, observation_matrix):
        log_likelihood = 0.
        T = observation_matrix.shape[1]
        alpha = np.zeros(observation_matrix.shape)

        for t in range(T):
            if t == 0:
                alpha[:, t] = observation_matrix[:, t] * self.initial_prob.flatten() * observation_matrix[:, t]
                ## TODO: Forward algorithm for the first time step
            else:
                alpha[:, t] = observation_matrix[:, t] * np.dot(alpha[:, t-1], self.transition_matrix)
                ## TODO: Forward algorithm for the next time steps

            alpha_sum = np.sum(alpha[:, t])
            alpha[:, t] /= alpha_sum
            log_likelihood += np.log(alpha_sum)

        return log_likelihood, alpha

    def _backward(self, observation_matrix):
        T = observation_matrix.shape[1]
        beta = np.zeros(observation_matrix.shape)

        beta[:, -1] = np.ones(observation_matrix.shape[0])

        for t in range(T - 1)[::-1]:
            beta[:, t] = np.dot(self.transition_matrix.T,(observation_matrix[:, t+1] * beta[:, t+1]))
            ## TODO: Backward algorithm for the time steps of the HMM
            beta[:, t] /= np.sum(beta[:, t])

        return beta

    def _state_likelihood(self, obs):
        obs = np.atleast_2d(obs)
        B = np.zeros((self.num_hidden_states, obs.shape[1]))

        for s in range(self.num_hidden_states):
            mean_T =self.mean[:, s].T
            covariance_T = self.covariances[:, :, s].T
            obs_T = obs.T
            B[s, :] = multivariate_normal.pdf(x = obs_T, mean=mean_T, cov=covariance_T)
            ## TODO: Compute the likelihood of observations with multivariate normal pdf
        return B

    def _normalize(self, x):
        return (x + (x == 0)) / np.sum(x)

    def _stochasticize(self, x):
        return (x + (x == 0)) / np.sum(x, axis=0)

    def _em_init(self, obs):
        if self.num_dimensions is None:
            self.num_dimensions = obs.shape[0]
        if self.mean is None:
            subset = self.rand_state.choice(
                np.arange(self.num_dimensions), size=self.num_hidden_states, replace=False)
            self.mean = obs[:, subset]
        if self.covariances is None:
            self.covariances = np.zeros(
                (self.num_dimensions, self.num_dimensions, self.num_hidden_states))
            self.covariances += np.diag(np.diag(np.cov(obs)))[:, :, None]

        return self

    def _em_step(self, obs):
        obs = np.atleast_2d(obs)
        T = obs.shape[1]

        B = self._state_likelihood(obs) ## TODO

        log_likelihood, alpha = self._forward(B) ## TODO
        beta = self._backward(B) ## TODO

        xi_sum = np.zeros((self.num_hidden_states, self.num_hidden_states))
        gamma = np.zeros((self.num_hidden_states, T))

        for t in range(T - 1):
            partial_sum = self.transition_matrix.T * np.inner(np.outer(B[:, t + 1],beta[:, t + 1]), alpha[:, t]) ## TODO
            xi_sum += self._normalize(partial_sum)
            partial_g = alpha[:, t] * beta[:, t] ## TODO
            gamma[:, t] = self._normalize(partial_g)
        partial_g = alpha[:, -1] * beta[:, -1] ## TODO
        gamma[:, -1] = self._normalize(partial_g)

        expected_prior = gamma[:, 0] ## TODO
        expected_transition = self._stochasticize(xi_sum)

        expected_covariances = np.zeros(
            (self.num_dimensions, self.num_dimensions, self.num_hidden_states))
        expected_covariances += .01 * np.eye(self.num_dimensions)[:, :, None]

        gamma_state_sum = np.sum(gamma, axis=1)
        gamma_state_sum = gamma_state_sum + (gamma_state_sum == 0)

        expected_mean = np.zeros((self.num_dimensions, self.num_hidden_states))
        for s in range(self.num_hidden_states):
            gamma_obs = obs * gamma[s, :]
            expected_mean[:, s] = np.sum(
                gamma_obs, axis=1) / gamma_state_sum[s]

        self.initial_prob = expected_prior
        self.mean = expected_mean
        self.transition_matrix = expected_transition

        return log_likelihood

    def train(self, obs, num_iterations=1):
        for i in range(num_iterations):


            self._em_init(obs)
            self._em_step(obs)
        return self

    def score(self, obs):
        B = self._state_likelihood(obs)
        log_likelihood, _ = self._forward(B)
        return log_likelihood

### Digit:

In [None]:
# Initialize HMM parameters
num_hidden_states = 13
hmm_models_digit_2 = {}
i = 0
for digit, data in training_data_by_digit.items():
    model = HMM(num_hidden_states)
    X = np.concatenate(data)
    model.train(X.T, 1)
    hmm_models_digit_2[digit] = model

In [None]:
# Prepare true labels for accuracy calculation
test_true_digits = [extract_digit(label) for label in test_labels]

# Predictions
test_predictions_digits = []

for mfcc in test_features:
    best_score, best_digit = float("-inf"), None
    for digit, model in hmm_models_digit_2.items():
        score = model.score(mfcc.T)
        if score > best_score:
            best_score, best_digit = score, digit
    test_predictions_digits.append(best_digit)


In [None]:

for i in range(len(test_predictions_digits)):
    if test_true_digits[i] != test_predictions_digits[i]:
        print(Fore.RED + f"\033[1m{test_true_digits[i], test_predictions_digits[i]}\033[0m"+ Style.RESET_ALL)
    else:
        print(Fore.BLUE  + f"{test_true_digits[i], test_predictions_digits[i]}"+ Style.RESET_ALL)

In [None]:
# Calculate accuracy
accuracy = accuracy_score(test_true_digits, test_predictions_digits)
print(f"Test Accuracy: {accuracy * 100:.2f}%")

### Speaker:

In [None]:
 hmm_models = []

## TODO
# Iterate through each unique digti in the training data
# extract features from the audio files, and train a HMM for
# each genre, storing the trained models in the hmm_models list.


In [None]:
## TODO
# In this section, iterate through the audio files, extract
# features, and use the trained HMMs to predict the genre
# for each audio file

# Step Four: Results