In [1]:
# Import libraries
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import librosa
import os
import pickle
import math

from classification.datasets import Dataset_augmented
from classification.utils.audio_student import AudioUtil, Feature_vector_DS
from classification.utils.plots import (
    plot_decision_boundaries,
    plot_specgram,
    show_confusion_matrix,
)
from classification.utils.utils import accuracy

In [2]:
# Hyperparameters other than those of the model
Nft = 512 # number of samples in the Fourier transform
nmel = 20 # number of mel bands
pca_components = int(math.ceil(11025/Nft) * nmel / 2) # number of components in PCA
# threshold = 0.05 # threshold under which we discard the input

In [3]:
# Loading the dataset and splitting it into training and test sets

# Load the dataset
dataset = Dataset_augmented()
classnames = dataset.list_classes() #['chainsaw', 'fire', 'fireworks', 'gun']
np.random.seed(42)  # For reproducibility

for i in range(4):
    random_indexes = [np.random.choice(280, 196, replace=False) for _ in range(4)]
    test_indexes = [np.setdiff1d(np.arange(280), random_indexes[i]) for i in range(4)]

In [4]:
# This whole cell is to compute the PCA on the training set and create the feature vectors, not optimized at all but it works

# Computing the mel spectrogram of each audio file and saving in a folder
folder_path = "src/classification/datasets/melspectrograms/"
number_audio_files = 280
n_win_files = np.zeros(4*number_audio_files)

for class_index in range (len(classnames)):
    for audio_index in range(number_audio_files):
        current_sound = dataset[classnames[class_index], audio_index]
        current_audio = AudioUtil.open(current_sound)
        current_audio = AudioUtil.resample(current_audio, 11025)
        
        # we will split the audio into 1 second window, and compute the mel spectrogram of each clip
        n_win = (len(current_audio[0]) // 11025) + 1
        n_win_files[class_index * 280 + audio_index] = int(n_win)
        for window in range(n_win):
            sub_aud = (current_audio[0][window * 11025 :], current_audio[1])
            sub_aud = AudioUtil.pad_trunc(sub_aud, 950)
            sgram = AudioUtil.melspectrogram(sub_aud, Nmel=nmel, Nft=Nft)
            ncol = int(11025 / Nft)
            sgram = sgram[:, :ncol]
            fv = sgram.reshape(-1)
            # saving the mel spectrogram in .npy format
            np.save(folder_path + classnames[class_index] + str(audio_index) + "_" + str(window) + ".npy", fv)

fv_len = len(fv)


# Creating all the feature vectors to compute the PCA
total_number_window_training = np.sum(n_win_files[random_indexes[0]]) + np.sum(n_win_files[random_indexes[1]]) + np.sum(n_win_files[random_indexes[2]]) + np.sum(n_win_files[random_indexes[3]])
X_train = np.zeros((int(total_number_window_training), int(fv_len)))
y_train = np.zeros(int(total_number_window_training))

# we will use the indexes to load the mel spectrograms and compute the PCA
index = 0
for class_index in range (len(classnames)):
    for audio_index in random_indexes[class_index]:
        for window in range(int(n_win_files[class_index * 280 + audio_index])):
            X_train[index, :] = np.load(folder_path + classnames[class_index] + str(audio_index) + "_" + str(window) + ".npy")
            y_train[index] = class_index
            index += 1

# normalisation before PCA since the amplitude is not a valuable information
X_train_pca = X_train / np.linalg.norm(X_train, axis=0)
     
# PCA
from sklearn.decomposition import PCA
pca = PCA(n_components=pca_components)
pca.fit(X_train_pca)


In [5]:
def audio_to_fv(class_index, audio_indexes):
    """
    This function takes a class index and a list of audio indexes and returns a matrix of size (len(audio_indexes), n_win_files, fv_len)
    containing the mel spectrogram of each audio file. It also returns a list of labels (which is the same for all the windows of the same audio file)
    """
    X = np.array([np.zeros((int(n_win_files[idx + class_index * number_audio_files]), fv_len)) for idx in audio_indexes], dtype=object)
    label = np.array([[classnames[class_index]] * int(n_win_files[idx + class_index * number_audio_files]) for idx in audio_indexes],dtype=object)
    for i, audio_index in enumerate(audio_indexes):
        for window in range(int(n_win_files[audio_index + class_index * number_audio_files])):
            X[i][window, :] = np.load(folder_path + classnames[class_index] + str(audio_index) + "_" + str(window) + ".npy")
            label[i][window] = classnames[class_index]
    return X, label

In [6]:
import random
import numpy as np

def manual_kfold_random(index_list, n_splits, seed=None):
    """
    Génère des indices pour un k-fold manuel avec répartition aléatoire.

    :param index_list: Liste ou array NumPy des indices totaux
    :param n_splits: Nombre de partitions (doit être inférieur à la taille de la liste)
    :param seed: Optionnel, permet de fixer la graine pour la reproductibilité
    :return: Liste de tuples (learning_set, validation_set)
    """
    assert n_splits < len(index_list), "n_splits doit être inférieur au nombre total d'indices."

    if seed is not None:
        random.seed(seed)  # Fixe la graine pour des résultats reproductibles

    index_list = list(index_list)  # Conversion en liste au cas où c'est un array NumPy
    random.shuffle(index_list)  # Mélange les index aléatoirement

    fold_size = len(index_list) // n_splits
    folds = [index_list[i * fold_size:(i + 1) * fold_size] for i in range(n_splits)]
    
    if len(index_list) % n_splits != 0:  # Gestion des restes
        folds[-1] = list(folds[-1]) + index_list[n_splits * fold_size:]  # Conversion et ajout correct

    split_indices = []
    
    for i in range(n_splits):
        validation_set = folds[i]
        learning_set = [idx for j, fold in enumerate(folds) if j != i for idx in fold]
        split_indices.append((learning_set, validation_set))
    
    return split_indices


def predict_all_probabilities(clf, X):
    """
    Prédit les probabilités pour tous les feature vectors dans X.

    :param clf: Classifieur entraîné avec une méthode `.predict_proba()`
    :param X: Array de shape (len(classnames), variable, n_win, fv_len)
    :return: Liste des probabilités prédites, réorganisées par classes, échantillons et fenêtres
    """
    len_classes = X.shape[0]  # Nombre de classes
    all_features = []
    indices = []  # Pour garder la trace des indices originaux

    # 1. Extraction des features sous forme de liste
    for class_idx in range(len_classes):
        for sample_idx, feature_matrix in enumerate(X[class_idx]):  # X[class_idx] est de shape (variable, n_win, fv_len)
            num_windows = feature_matrix.shape[0]  # Nombre de fenêtres pour cet échantillon
            all_features.append(feature_matrix)  # Stocke les features
            indices.extend([(class_idx, sample_idx)] * num_windows)  # Associe chaque fenêtre à son (class, sample)

    # 2. Conversion en un array unique pour accélérer la prédiction
    X_flattened = np.vstack(all_features)  # Shape: (total_windows, fv_len)
    X_flattened = X_flattened / np.linalg.norm(X_flattened, axis=0)  # Normal
    X_flattened = pca.transform(X_flattened)  # Réduit les dimensions

    # 3. Prédiction des probabilités en batch
    probas_flattened = clf.predict_proba(X_flattened)  # Sortie: (total_windows, n_classes)

    # 4. Réorganisation des probabilités
    probabilities = [[] for _ in range(len_classes)]
    for (class_idx, sample_idx), proba in zip(indices, probas_flattened):
        if len(probabilities[class_idx]) <= sample_idx:
            probabilities[class_idx].append([])  # Assure que la liste existe
        probabilities[class_idx][sample_idx].append(proba)  # Ajoute la probabilité pour chaque fenêtre

    return probabilities  # Liste [classes][samples][windows][probs]

In [None]:
def decision_naive(probs):
    """Règle naïve : Choisir la classe avec la probabilité la plus élevée."""
    return classnames[np.argmax(probs) % len(classnames)]

def decision_majority(probs):
    """Règle du vote majoritaire : Choisir la classe avec le plus grand nombre de votes."""
    votes = np.argmax(probs, axis=1)
    count = np.bincount(votes)
    max_indices = np.where(count == count.max())[0]
    return classnames[max_indices[0]]  # En cas d'égalité, on prend la première classe

def decision_weighted(probs):
    """Règle du vote pondéré : Choisir la classe avec la somme maximale des probabilités."""
    sum_probs = np.sum(probs, axis=0)
    return classnames[np.argmax(sum_probs)]

def decision_maxlikelihood(probs):
    """Règle du maximum de vraisemblance : Choisir la classe avec le produit maximal des probabilités."""
    prod_probs = np.prod(probs, axis=0)
    return classnames[np.argmax(prod_probs)]

# Application de la règle de décision à chaque échantillon d'un audio
def apply_decision_rules(probabilities, decision_method):
    """
    Applique une règle de décision sur toutes les fenêtres d'un échantillon.

    :param probabilities: Probabilités des fenêtres pour un échantillon (n_win, n_classes)
    :param decision_method: Méthode de décision à utiliser parmi 'naive', 'majority', 'weighted', 'maxlikelihood' [0, 1, 2, 3]
    :return: Classe prédite pour l'échantillon
    """
    if decision_method == 0:
        return decision_naive(probabilities)
    elif decision_method == 1:
        return decision_majority(probabilities)
    elif decision_method == 2:
        return decision_weighted(probabilities)
    elif decision_method == 3:
        return decision_maxlikelihood(probabilities)
    else:
        raise ValueError("Unknown decision method: " + decision_method)
    
dict_decision_methods = {
    0: "Naive",
    1: "Majority",
    2: "Weighted",
    3: "MaxLikelihood"
}


def predict_for_all_samples(probabilities, decision_method):
    """
    Applique une règle de décision sur tous les échantillons (classes, samples) du dataset.

    :param probabilities: Liste des probabilités pour chaque fenêtre de chaque échantillon.
    :param decision_method: Méthode de décision ('naive', 'majority', 'weighted', 'maxlikelihood')
    :return: Liste des classes prédites pour chaque échantillon
    """
    n_classes = len(probabilities)
    n_samples = len(probabilities[0])
    predictions = []

    for class_idx in range(n_classes):
        class_predictions = []
        for sample_idx in range(n_samples):
            # Probabilités des fenêtres pour un échantillon donné
            probs = probabilities[class_idx][sample_idx]
            # Applique la règle de décision sur l'échantillon
            predicted_class = apply_decision_rules(probs, decision_method)
            class_predictions.append(predicted_class)
        predictions.append(class_predictions)

    return predictions

import numpy as np

def compute_accuracy(predictions, y_val):
    """
    Calcule l'accuracy en comparant les prédictions avec les labels réels.

    :param predictions: Liste des prédictions pour chaque échantillon et chaque classe
                        (n_classes, n_samples) - chaque élément est la classe prédite pour un échantillon.
    :param y_val: Array des labels réels pour chaque échantillon (n_classes, n_samples)
                  Chaque élément est la classe réelle de cet échantillon.
    :return: L'accuracy du modèle en pourcentage
    """
    correct_predictions = 0
    total_predictions = 0

    n_classes = len(predictions)

    for class_idx in range(n_classes):
        for sample_idx in range(len(predictions[class_idx])):
            # Comparer la prédiction avec le label réel
            predicted_class = predictions[class_idx][sample_idx]
            true_class = y_val[class_idx][sample_idx][0]
            
            # Si la prédiction est correcte, on incrémente le compteur
            if predicted_class == true_class:
                correct_predictions += 1
            total_predictions += 1

    # Calculer l'accuracy
    accuracy = correct_predictions / total_predictions * 100
    return accuracy



In [None]:
# we will try training in a different way now, basically we want that the models fits to melspectrogram of the audio files of a subset of the training set
# but for the validation, we want to evaluate it on a sequence of melspectrogram from the complementary subset of the training set

# Model training
# we will test three models, CNN, SVM and Random Forest
from sklearn.svm import SVC
from sklearn.ensemble import RandomForestClassifier
# TO DO : implement the CNN model

# 5-fold cross validation
from sklearn.model_selection import KFold
n_splits = 5
seed = 42

kfold_indexes = [[], [], [], []]
for class_index in range (len(classnames)):
    kfold_indexes[class_index] = manual_kfold_random(random_indexes[class_index], n_splits, seed=seed)


# Random Forest
n_estimators = [10, 50, 100, 200]
max_depth = [5, 10, 20, 50, 100]
min_samples_split = [2, 5, 10, 15]

results_RF = pd.DataFrame(columns=["model", "n_estimators", "max_depth", "min_samples_split", "accuracy", "decision_method"])

for n in n_estimators:
    for d in max_depth:
        for s in min_samples_split:
            clf = RandomForestClassifier(n_estimators=n, max_depth=d, min_samples_split=s)
            accuracies = np.zeros(4)
            
            # creating the indexes for each split
            kfold_indexes = [[], [], [], []]
            for class_index in range (len(classnames)):
                kfold_indexes[class_index] = manual_kfold_random(random_indexes[class_index], n_splits, seed=seed)
            # kfold cross validation
            for split in range(n_splits):
                # Creating the training and validation sets
                X_train = np.empty(len(classnames), dtype=object)
                y_train = np.empty(len(classnames), dtype=object)
                X_val = np.empty(len(classnames), dtype=object)
                y_val = np.empty(len(classnames), dtype=object)
                for class_index in range (len(classnames)):
                    X_train[class_index], y_train[class_index] = audio_to_fv(class_index, kfold_indexes[class_index][split][0])
                    X_val[class_index], y_val[class_index] = audio_to_fv(class_index, kfold_indexes[class_index][split][1])
                    
                # Training
                X_train = np.vstack([sample for class_samples in X_train for sample in class_samples])
                y_train = np.hstack([np.repeat(label, len(samples)) for samples, labels in zip(X_train, y_train) for label in labels])
                y_train = y_train[::fv_len]
                X_train = X_train/np.linalg.norm(X_train, axis=0)
                X_train = pca.transform(X_train)
                clf.fit(X_train, y_train)
                
                # Validation
                probas = predict_all_probabilities(clf, X_val)
                for decision_index in range(4):
                    accuracies[decision_index] += compute_accuracy(predict_for_all_samples(probas,decision_index), y_val)
                    
            # averaging the accuracies
            accuracies /= n_splits
            for decision_index in range(4):
                results_RF = results_RF._append({"model": "Random Forest", "n_estimators": n, "max_depth": d, "min_samples_split": s, "accuracy": accuracies[decision_index], "decision_method": dict_decision_methods[decision_index]}, ignore_index=True)
        print(results_RF.loc[results_RF["accuracy"].idxmax()])

# Save the results in .csv
results_RF.to_csv("results_RF.csv")
                
                    

AttributeError: 'list' object has no attribute 'shape'

In [None]:
# OLD Model training
# we will test three models, CNN, SVM and Random Forest
from sklearn.svm import SVC
from sklearn.ensemble import RandomForestClassifier
# TO DO : implement the CNN model

# 5-fold cross validation
from sklearn.model_selection import KFold
kf = KFold(n_splits=5)

# Random Forest
n_estimators = [10, 50, 100, 200]
max_depth = [5, 10, 20, 50, 100]
min_samples_split = [2, 5, 10, 15]

results_RF = pd.DataFrame(columns=["model", "n_estimators", "max_depth", "min_samples_split", "accuracy"])

for n in n_estimators:
    for d in max_depth:
        for s in min_samples_split:
            clf = RandomForestClassifier(n_estimators=n, max_depth=d, min_samples_split=s)
            acc = 0
            for train_index, test_index in kf.split(X_train):
                X_train_kf, X_test_kf = X_train[train_index], X_train[test_index]
                y_train_kf, y_test_kf = y_train[train_index], y_train[test_index]
                clf.fit(X_train_kf, y_train_kf)
                acc += accuracy(y_test_kf, clf.predict(X_test_kf))
            acc /= 5
            results_RF = results_RF._append({"model": "Random Forest", "n_estimators": n, "max_depth": d, "min_samples_split": s, "accuracy": acc}, ignore_index=True)
        print(results_RF.loc[results_RF["accuracy"].idxmax()])
        
# save the results in a .csv file
results_RF.to_csv("results_RF.csv")


# SVM
kernel = ["rbf"]
C = [0.001, 0.01, 0.1, 1, 10, 100, 1000]
gamma = ["scale", "auto", 0.1, 0.5, 1, 5, 10]

# the results will be saved in a pandas dataframe
results_SVM = pd.DataFrame(columns=["model", "kernel", "C", "gamma", "accuracy"])

for k in kernel:
    for c in C:
        for g in gamma:
            clf = SVC(kernel=k, C=c, gamma=g)
            acc = 0
            for train_index, test_index in kf.split(X_train):
                X_train_kf, X_test_kf = X_train[train_index], X_train[test_index]
                y_train_kf, y_test_kf = y_train[train_index], y_train[test_index]
                clf.fit(X_train_kf, y_train_kf)
                acc += accuracy(y_test_kf, clf.predict(X_test_kf))
            acc /= 5
            results_SVM = results_SVM._append({"model": "SVM", "kernel": k, "C": c, "gamma": g, "accuracy": acc}, ignore_index=True)
        print(results_SVM.loc[results_SVM["accuracy"].idxmax()])

# save the results in a .csv file
results_SVM.to_csv("results_SVM.csv")