




# Projet Fil Rouge 2023 : Reconnaissance de commandes audio


**Noms :** GHAMNIA, MUSSARD

**Prénoms :** Karima, Cassandra

**Nom du binôme :** Cassandra&Karima




Ces 12 séances de TP vont vous permettre de tester l'algorithme de programmation dynamique vu en CTD puis de réaliser la mise en oeuvre d'un système de reconnaissance audio de mots isolés (constituant des
commandes pour les drones).

<img src="files/DroneJS.JPG" width="600" height="500"  >



Ces séances se décomposent en ces parties : 
- Partie I : Prétraitement des données 
- Partie II : Sélection de variables et pénalisation
- Partie III : Classification par méthodes à noyau 
- Partie IV : Apprentissage par ensemble : Adaboost, gradient boosting
- Partie V : Classification par réseaux de neurones
- Partie VI : Votre étude




In [None]:
import matplotlib.pyplot as plt
import numpy as np
import scipy
import sklearn
import math
import numpy.random as rnd
import seaborn as sns
import librosa
from os import listdir
from os.path import isfile, join
import glob
import re

# Preprocessing

Sur l'espace moodle, vous trouverez un dossier d'enregistrements audio de mots de commandes pour un drone quadricoptère constitués de plusieurs locuteurs masculins (notés M01..M13) et locutrice féminines (F01..F05) pour quelques commandes. 


In [None]:
data = [] 
label = [] 
genres = []
min_duration = None
words = ['avance','recule','tournegauche']
list_genres = ['M', 'F']

for file_name in glob.glob('FichierTest/*.wav'):
    record = librosa.load(file_name)[0]
    data.append(record)
    # Computation of the minimal size of recordings
    if min_duration is None or record.shape[0] < min_duration:
        min_duration = record.shape[0] 
    
    # Creation of the vector of label
    for i, word in enumerate(words):
      if re.search(word, file_name):
        label.append(i)

    # Creation of the vector of label
    for i, genre in enumerate(list_genres):
      if re.search(genre, file_name[12:]):# 12 is for ignoring "FichierTest/"
        genres.append(genre)

fs = librosa.load(file_name)[1] # Sampling frequency
genres = np.array(genres)
print(f'The smallest record contains {min_duration} samples, and the sample frequency is {fs} Hz')


### We trim the recordings to isolate the word and have identical durations
The smallest record contains 18 522 samples. We are going to cut all recordings to be of this size.

In [None]:

def trim(record):
    half_duration = 18522//2

    # First, we compute the barycenter of energy along time. We interpret it as the moment when the word appears
    barycenter = int(np.floor(np.multiply(np.power(record,2),np.arange(0,record.shape[0],1)).sum()/np.power(record,2).sum()))

    # Second, we adjust the barycenter to be in the index range
    if barycenter-half_duration < 0:
        barycenter += half_duration-barycenter
    if barycenter+half_duration >= record.shape[0]:
        barycenter -= barycenter+half_duration - record.shape[0]
    
    # Finally, we trim the recording around the barycenter 
    return record[barycenter-half_duration:barycenter+half_duration]

In [None]:
X = np.empty((len(data),min_duration))
for i in range(len(data)):
    X[i,:] = trim(data[i])

y = np.array(label)
print(f'Shape of inputs X is{X.shape} and size of targets class is {y.shape}')

### Spectral representation

1. Apply a Fourrier transform on the signals in $X$ using the function fft of scipy. Explain why the resulting dimension is too large to apply logistic regression.

2. Let $\hat{X}$ be the fourier transform of $X$. Apply a PCA on $|\hat{X}|$ and plot the total explained variance in function of the numer of components.

In [None]:
from scipy import signal
from scipy.fft import fft, fftfreq

# Transformée de Fourier sur le signal X
X_fft = fft(X)
Te = 1/22050
freq = fftfreq(X.size, d=Te)
freq = np.reshape(freq, (54, 18522))
freq.shape

In [None]:
plt.plot(freq[0,:], X.real[0,:], label="Partie réel")
plt.plot(freq[0,:], X.imag[0,:], label="Partie imaginaire")
plt.legend()
plt.show()

In [None]:
X_fft.shape

#### Explication :
L'objectif d'un modèle de régression logistique est de trouver une relation entre les variables et les labels. Cepandent, quand on a un grand nombre de variable avec très peu d'observations, il est difficile de trouver ce lien. De plus, cela peut conduire à de l'overfitting. 

Dans notre cas, on a 18522 variables pour 54 observations, on risque donc  de rencontrer le problème expliquée ci-dessus.

In [None]:
from sklearn.decomposition import PCA
# Appliquer une PCA sur X
n_components=11
pca = PCA(n_components=n_components)
X_fft_pca = pca.fit(abs(X_fft))
var = pca.explained_variance_ratio_.sum()
print(var)
plt.plot(range(0,n_components), pca.explained_variance_ratio_)
plt.ylabel('Explained Variance')
plt.xlabel('Principal Components')
plt.title('Explained Variance Ratio')
plt.show()

On peut remarquer qu'en retenant 11 composantes principales, on atteint environ 80% de la variance expliquée.

3. Apply a Short Term Fourier Transform on $X$. What are the dimension of stft $\hat{X}[t,f]$?

4. Make 2 subplots (3x3) of the stft (as images with function .imshow()) with three instances of each words, one for male and one for female 

In [None]:
nperseg = 253
f, t, X_stft = signal.stft(X, fs=fs, window='hann', nperseg=nperseg, noverlap=None)
X_stft.shape

On peut voir que la dimension de $Xfft$ est de : (54,127,147)

In [None]:
nperseg = 253

fig, axs = plt.subplots(3, 3, constrained_layout=True, figsize=(15,5))
for i,word in enumerate(words):
    for instance in range(3):
        record = X[(y==i) & (genres == 'M')][instance]
        f, t, Zxx = signal.stft(record, fs=fs, window='hann', nperseg=200, noverlap=None)
        axs[i,instance].imshow(np.absolute(Zxx[:80]))
        axs[i,instance].set_title(f'mfcc{instance,word}')

        
print(Zxx.shape)
fig.suptitle('STFT de chaque mot pour les hommes')
plt.show()

In [None]:
nperseg = 253

fig, axs = plt.subplots(3, 3, constrained_layout=True, figsize=(15,5))
for i,word in enumerate(words):
    for instance in range(3):
        record = X[(y==i) & (genres == 'M')][instance]
        f, t, Zxx = signal.stft(record, fs=fs, window='hann', nperseg=200, noverlap=None)
        axs[i,instance].imshow(np.absolute(Zxx[:40]))
        axs[i,instance].set_title(f'mfcc{instance,word}')

        
print(Zxx.shape)
fig.suptitle('STFT de chaque mot pour les femmes')
plt.show()

####  Remarque  :
Pour les enregistrements des femmes, on remarque que les stft ont une forme différente pour chaque mot. Cependant, on remarque que certains stft peuvent être confondus avec d'autres (par exemple le 3ème stft de la classe avancé peut être confondu avec le 2ème et le 3ème  de la classe reculé).

#### Now we will build sklearn transformers to extract features

Create a class STFT in the same spirit as FFT. 
Add a first argument to choose between returning different statistics (mean, quantile, max...) along time such that each signal. Add a second argument that gives the maximum frequency index 

In [None]:
from sklearn.base import BaseEstimator, TransformerMixin

In [None]:
class FFT(BaseEstimator, TransformerMixin):
    def __init__(self, idx_frequence_max=None):
        self.idx_frequence_max = idx_frequence_max    #def split_train_test():
        
    def fit(self, X, y=None):
        return self
        # Perform arbitary transformation 
    def transform(self,X,y=None):       
        return np.absolute(fft(X)[:self.idx_frequence_max])

In [None]:
Fft = FFT()
X_ = Fft.transform(X)
X_.shape

In [None]:
class STFT(BaseEstimator, TransformerMixin):
    def __init__(self, idx_frequence_max=None, statistique = np.mean):
        self.idx_frequence_max = idx_frequence_max    #def split_train_test():
        self.statistique = statistique
    def fit(self, X, y=None):
        return self
        # Perform arbitary transformation 
    def transform(self,X,y=None):    
        f, t, Zxx = signal.stft(X, fs=fs, window='hann', nperseg=200, noverlap=None)   
        return self.statistique(np.absolute(Zxx[:,:self.idx_frequence_max,:]),axis=2)

In [None]:
Fft = STFT(idx_frequence_max=40)
X_ = STFT(idx_frequence_max=40).fit(X, y).transform(X)
X_.shape


# Partie I : Sélection de modèles et pénalisation

### 1. Multiclass regression


Apply a **multiclass regression** model.

We model the probabilities by the following form :

$$
\mathbb{P}(Y_i = j) = \frac { \exp^{-\beta_j^{T} X_i } } {1 + \sum_{\ell = 1}^{K-1} \exp^{-\beta_\ell^{T} X_i }}, 
$$
For all $j$ in $\{ 1,2, \dots , K-1 \}$.

### Objective

Try to apply a logistic regression with **Leave one out Cross validation** on :

1. The first PCA components of FFT (try multiple "n_compenents")
2. Different statistics and maximum frequency of the STFT
3. The same as before with scaling

In each situations try different regularization coefficient C.

To simplify use the **pipeline** function of sklearn. You can also use the function **GridSearchCV** with cv = X.shape[0] to vary the parameters of preprocessing and logistic regression. You can acess to all results with "cv_results_"



In [None]:
from sklearn.linear_model import LogisticRegression
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import GridSearchCV
from sklearn.model_selection import train_test_split
from sklearn.pipeline import Pipeline
from sklearn.model_selection import LeaveOneOut

On applique une regression logistique sur $X$, auquel on applique une FFT puis, une PCA. On effectue une validation croisée ainsi qu'une recherche de grille $('Grid Search')$  pour retrouver les meilleurs hyperparamètres. On choisit de varier les hyperparamètres comme suit :

$n\_components$ :
* 11 composantes (80% de la variance)
* 18 composantes (90% de la variance)
* 54 composantes (100% de la variance)

$logistic\_solver$ : On teste tous les algorithmes d'optimisation possibles.



$logistic\_C$ : On l'utilise pour éviter l'overfitting. On le varie d'une façon loglinéaire.



In [None]:
pca = PCA()
Fft = FFT()

logistic = LogisticRegression(multi_class='multinomial', max_iter = 6000)
cv = LeaveOneOut()

pipe = Pipeline(steps=[("fft", Fft), ("pca", pca), ("logistic", logistic)])

param_grid = {
    "pca__n_components": [11, 18, 54],
    "logistic__C": np.logspace(-4, 4, 4),
    "logistic__solver": ['lbfgs', 'saga', 'sag', 'newton-cg'],
}
search = GridSearchCV(pipe, param_grid, verbose =1, cv=cv, n_jobs=-1)
res = search.fit(X, y)
print("Best parameter (CV score=%0.3f):" % search.best_score_)
print(search.best_params_)

Résultat : On obtient  81% d'accuracy avec $c = 0.0001$, lbfgs et $n\_components = 18$

In [None]:
#On plot l'accuracy en fonction de c
tab_c = []
tab_mean= []
for i in range(3):
    tab_c.append(search.cv_results_["mean_test_score"][i])
    tab_mean.append([1.00000000e-04, 4.64158883e-02, 2.15443469e+01, 1.00000000e+04][i])
plt.plot(tab_mean, tab_c)
plt.xlabel('C')
plt.ylabel('Accuracy')

In [None]:
from mpl_toolkits.mplot3d import Axes3D

# Plot des résultats de GridSearch
mean_scores = res.cv_results_['mean_test_score']
params = res.cv_results_['params']

n_components = [p['pca__n_components'] for p in params]
C_val = [p['logistic__C'] for p in params]

fig = plt.figure(figsize=(12, 8))
ax = fig.add_subplot(111, projection='3d')
scatter = ax.scatter(n_components, C_val, mean_scores, c=mean_scores, cmap='viridis')

ax.set_xlabel('Nombre de composantes d ACP')
ax.set_ylabel('C')
ax.set_zlabel('Score')
ax.set_title('Résultats de Grid Search')

ax.set_xticks(n_components)
ax.set_yticks(C_val)

cbar = fig.colorbar(scatter)
cbar.set_label('Score')

plt.show()

2. Different statistics and maximum frequency of the STFT

On applique une regression logistique sur $X$, auquel on applique une STFT. On effectue une validation croisée ainsi qu'une recherche de grille $('Grid Search')$  pour retrouver les meilleurs hyperparamètres. On choisit de varier les hyperparamètres comme suit :


$logistic\_solver$ : On garde lbfgs car, généralement les solvers n'influencent pas trop sur l'accuracy, mais plus sur le temps d'exécution.

$logistic\_C$ : On l'utilise pour éviter l'overfitting. On le varie d'une façon loglinéaire.


$Stft\_idx\_frequence_max$ :
$Stft\_statistique$ : On teste des statistiques classiques.


In [None]:
Stft = STFT()
# set the tolerance to a large value to make the example faster
logistic = LogisticRegression(multi_class='multinomial', max_iter = 10000)
cv = LeaveOneOut()
pipe2 = Pipeline(steps=[("Stft", Stft), ("logistic", logistic)])

# Parameters of pipelines can be set using '__' separated parameter names:
param_grid2 = {
    "logistic__C": np.logspace(-4, 4, 4),
    "logistic__solver": ['lbfgs'],
    "Stft__idx_frequence_max": [10, 20, 80],
    "Stft__statistique": [np.mean, np.median, np.max],
}
search2 = GridSearchCV(pipe2, param_grid2, verbose =1, cv=cv, n_jobs=-1)
res2 = search2.fit(X, y)
print("Best parameter (CV score=%0.3f):" % search2.best_score_)
print(search2.best_params_)

Résultat : On obtient une accuracy de 81% avec la statistique max, c=10000, Stft__idx_frequence_max = 80 et l'algorithme lbfgs

In [None]:
#On plot l'accuracy en fonction de c
tab_c = []
tab_mean= []
for i in range(3):
    tab_c.append(search2.cv_results_["mean_test_score"][i])
    tab_mean.append(search2.cv_results_["params"][i]["logistic__C"])
plt.plot(tab_mean, tab_c)
plt.xlabel('C')
plt.ylabel('Accuracy')

In [None]:
# Plot des résultats de GridSearch
mean_scores = res2.cv_results_['mean_test_score']
params = res2.cv_results_['params']

freq = [p['Stft__idx_frequence_max'] for p in params]
C_val = [p['logistic__C'] for p in params]

fig = plt.figure(figsize=(12, 8))
ax = fig.add_subplot(111, projection='3d')
scatter = ax.scatter(freq, C_val, mean_scores, c=mean_scores, cmap='viridis')

ax.set_xlabel('Nombre de frequences')
ax.set_ylabel('C')
ax.set_zlabel('Score')
ax.set_title('Résultats de Grid Search')

ax.set_xticks(n_components)
ax.set_yticks(C_val)

cbar = fig.colorbar(scatter)
cbar.set_label('Score')

plt.show()

3. The same as before with scaling

On varie les hyperparamètres de la même façon que précedemment après avoir effectuer un scaling sur les $X$.

In [None]:
from sklearn.preprocessing import StandardScaler

scale = StandardScaler()
Stft = STFT()
# set the tolerance to a large value to make the example faster
logistic = LogisticRegression(multi_class='multinomial', max_iter = 8000)
cv = LeaveOneOut()
pipe3 = Pipeline(steps=[("scale", scale),("Stft", Stft), ("logistic", logistic)])

# Parameters of pipelines can be set using '__' separated parameter names:
param_grid3 = {
    "logistic__C": np.logspace(-4, 4, 4),
    "logistic__solver": ['lbfgs'],
    "Stft__idx_frequence_max": [10, 20,80],
    "Stft__statistique": [np.mean, np.median, np.max],

}
search3 = GridSearchCV(pipe3, param_grid3, verbose =1, cv=cv, n_jobs=-1)
res3 = search3.fit(X, y)
print("Best parameter (CV score=%0.3f):" % search3.best_score_)
print(search3.best_params_)

Résultat : On obtient 88% d'accuracy avec une fréquence max de 80, une statistique = median, C=10000 et l'algorithme lbfgs.

In [None]:
#On plot l'accuracy en fonction de c
tab_c = []
tab_mean= []
for i in range(3):
    tab_c.append(search3.cv_results_["mean_test_score"][i])
    tab_mean.append(search3.cv_results_["params"][i]["logistic__C"])
plt.plot(tab_mean, tab_c)
plt.xlabel('C')
plt.ylabel('Accuracy')

In [None]:
# Plot des résultats de GridSearch
mean_scores = res3.cv_results_['mean_test_score']
params = res3.cv_results_['params']

freq = [p['Stft__idx_frequence_max'] for p in params]
C_val = [p['logistic__C'] for p in params]

fig = plt.figure(figsize=(12, 8))
ax = fig.add_subplot(111, projection='3d')
scatter = ax.scatter(freq, C_val, mean_scores, c=mean_scores, cmap='viridis')

ax.set_xlabel('Nombre de frequences')
ax.set_ylabel('C')
ax.set_zlabel('Score')
ax.set_title('Résultats de Grid Search')

ax.set_xticks(n_components)
ax.set_yticks(C_val)

cbar = fig.colorbar(scatter)
cbar.set_label('Score')

plt.show()

###  Evaluation des résultats 

Evaluer le résultat par matrice de confusion et pourcentage de bonne classification.


In [None]:
# Evaluation des résultats par matrices de confusion 
from sklearn.metrics import accuracy_score
from sklearn.metrics import confusion_matrix

def res_score(search) : 
    y_pred = search.best_estimator_.predict(X)
    confusion_matrice = confusion_matrix(y, y_pred)
    score = search.best_score_
    return confusion_matrice, score

In [None]:
# Calcul des matrices de confusion et les scores
conf_matrix1, score1 = res_score(search)
conf_matrix2, score2 = res_score(search2)
conf_matrix3, score3 = res_score(search3)

In [None]:
# fonction pour afficher les matrice de confusion

def affiche_conf_mtx(conf, couleur):
    sns.heatmap(conf, annot=True, fmt="d", cmap=couleur)
    plt.xlabel("Prédictions")
    plt.ylabel("Valeurs réelles")
    plt.show()


In [None]:

affiche_conf_mtx(conf_matrix1, 'Blues')
affiche_conf_mtx(conf_matrix2, 'Greens')
affiche_conf_mtx(conf_matrix3, 'Reds')

print("Question 1 \n", "\n", score1)
print("Question 2 \n", "\n", score2)
print("Question 3 \n", "\n", score3)


# Partie II : Classification par méthodes à noyau

**Rappel** Les méthodes à noyau consistent à plonger les données dans un espace de dimension de Hilbert $\mathcal{H}$ ou les donnés pourront être séparé linéairement. 

**Theorème de Représentation :** La solution du problème de séparation en dimension infinie est contenue dans un sous espace vectoriel de dimension finie de $\mathcal{H}$ 

### 1. Réaliser une classification par SVM à noyau

    1) Varier le noyau
    2) Varier le paramètre de régularisation.

On applique une classification par SVM à noyau sur $X$, auquel on applique une FFT, pui une PCA. On effectue une validation croisée ainsi qu'une recherche de grille $('Grid Search')$  pour retrouver les meilleurs hyperparamètres suivants : $svc\_kernel$ et $svc\_C$


In [None]:
from sklearn.svm import SVC
pca = PCA()
Fft = FFT()

svc = SVC()
cv = LeaveOneOut()
pipe = Pipeline(steps=[("fft", Fft), ("pca", pca), ("svc", svc)])


param_grid = {
    "pca__n_components": [15],
    "svc__C": np.logspace(-4, 4, 4),
    "svc__kernel": ['linear', 'poly', 'rbf', 'sigmoid', 'precomputed']
}
search = GridSearchCV(pipe, param_grid, verbose =1, cv=cv, n_jobs=-1, refit=True)
res_svc = search.fit(X, y)
print("Best parameter (CV score=%0.3f):" % search.best_score_)
print(search.best_params_)

In [None]:
# Plot des résultats de GridSearch
mean_scores = res_svc.cv_results_['mean_test_score']
params = res_svc.cv_results_['params']

n_components = [p['pca__n_components'] for p in params]
svc_c = [p['svc__C'] for p in params]

fig = plt.figure(figsize=(12, 8))
ax = fig.add_subplot(111, projection='3d')
scatter = ax.scatter(n_components, svc_c, mean_scores, c=mean_scores, cmap='viridis')

ax.set_xlabel('Nombre de composantes d ACP')
ax.set_ylabel('C')
ax.set_zlabel('Score')
ax.set_title('Résultats de Grid Search')

ax.set_xticks(n_components)
ax.set_yticks(C_val)

cbar = fig.colorbar(scatter)
cbar.set_label('Score')

plt.show()

Résultat : On obtient 77% d'accuracy avec $C=0.046$, $n\_components = 15$, et un noyeau linéaire. 

In [None]:
# le modèle avec les meilleurs hyperparamètres
best_model = res_svc.best_estimator_

### Decision boundary

Ici, on affiche les frontières entre les 3 classes :

In [None]:
from sklearn.inspection import DecisionBoundaryDisplay

def plot_decision_boundary(X, y, model):
    feature_1, feature_2 = np.meshgrid(
            np.linspace(X[:, 0].min(), X[:, 0].max()),
            np.linspace(X[:, 1].min(), X[:, 1].max())
        )
    grid = np.vstack([feature_1.ravel(), feature_2.ravel()]).T
    tree = SVC().fit(X[:, :2], y)
    y_pred = np.reshape(tree.predict(grid), feature_1.shape)
    display = DecisionBoundaryDisplay(
        xx0=feature_1, xx1=feature_2, response=y_pred)
    display.plot()
    plt.title("Frontières de décision")
    plt.show()

plot_decision_boundary(X, y, best_model)

### Bagging et Boosting

## 2. Adaptative boosting : AdaBoost

Here is the algorithm Adaboost

1. Initialize the data weighting coefficients ${w_n}$ by setting $w_n^{(1)} = 1/N$ for $n = 1,...,N$.
2. For $m = 1,...,M$:
    
    **(a)** Fit a classifier $y_m(x)$ to the training data by minimizing the weighted
error function
    
    $J_m = \sum_{n=1}^N{w_n^{(m)}I(y_m(x)\neq t_n)}$

    where $I(y_m(x)\neq t_n)$ is the indicator function and equals $1$ when $y_m(x_n) 	= t_n$ and $0$ otherwise

    **(b)** Evaluate the quantities

    $\epsilon_m = \frac{\sum_{n=1}^N{w_n^{(m)}I(y_m(x)\neq t_n)}}{\sum_{n=1}^N{w_n^{(m)}}}$

    and then use these to evaluate

    $\alpha_m = \textit{ln}\left({\frac{1-\epsilon_m}{\epsilon_m}}\right)$

    **(c)** Update the data weighting coefficients
    
    $w_n^{(m+1)} = w_n^{(m)} \textit{exp}\left({\alpha_m I(y_m(x_n) \neq t_n)}\right)$

3. Make predictions using the final model, which is given by

    $Y_M(x) = \text{sign}\left(\sum_{m=1}^M {\alpha_m y_m(x)}\right)$


**Question 1 :** Code from scratch the Adaboost algorithm in the same configuration as Bagging in the previous section. Use the sklearn decision tree classifier and its argument *sample_weight*. Compare its performances with Bagging.


In [None]:
from sklearn.model_selection import train_test_split
from sklearn.tree import DecisionTreeClassifier

Dans un premier temps on modifie les labels. On transforme les labels de sorte à avoir des labels binaire (-1 et 1). On met 1 si y (correspond aux labels de base) = 1 et -1 sinon. 

In [None]:
y1 = np.zeros((len(y)))
for i in range(len(y)):
    if y[i]==1:
        y1[i]= 1
    else :
        y1[i]= 0
y2=y1*2-1

Ici on réalise le bagging. À chaque itération m de la boucle, un nouvel arbre de décision de profondeur maximale 1 est créé et ajusté sur les données d'apprentissage X_train avec des poids w_i. L'arbre est ajouté à la liste J_m. Les prédictions du modèle sur les données d'apprentissage sont calculées et comparées aux étiquettes de classe réelles pour calculer l'erreur err_m. Ensuite, le coefficient alpha_m est calculé en utilisant l'erreur err_m. Les poids w_i sont mis à jour en multipliant les poids actuels par l'exponentielle de alpha_m pour les échantillons mal classés.

In [None]:
# Bagging 
from sklearn.metrics import accuracy_score
X_train, X_test, y_train, y_test = train_test_split(X,y1,test_size=0.4, random_state=42)
np.random.seed(seed=42)
N = X_train.shape[0]
trees = []
y_test_pred = []

for b in range(N):
    sample = np.random.choice(np.arange(N), size = N, replace = True) # On tire un échantillon 
    X_train_b = X_train[sample] #On selectionne cet echantillon dans les données trains
    y_train_b = y_train[sample] # On récupère le label associé
    modele = DecisionTreeClassifier()
    tree = modele.fit(X_train_b, y_train_b) #On entraine l'arbre sur cet echantillon
    trees.append(tree)
    y_test_pred.append(modele.predict(X_test))

In [None]:
acc = (y_test_pred == y_test).mean() #On compare la prédiction avec le label
print(acc)

On obtient une accuracy d'environ 55%.

In [None]:
from sklearn.metrics import accuracy_score
X_train, X_test, y_train, y_test = train_test_split(X,y2,test_size=0.4, random_state=42)
np.random.seed(seed=42)
M = 1000
N = X_train.shape[0]
J_m = []
err= []
alphas = []
y_test_pred = []
w_i = np.ones(N)/ N

for m in range(0,M):
    modele = DecisionTreeClassifier(max_depth = 1) #On crée un arbre 
    tree = modele.fit(X_train, y_train, sample_weight = w_i) #On entraine l'arbre sur X_train
    J_m.append(tree)
    y_train_pred = tree.predict(X_train) #On prédit 
    err_m= (sum(w_i*(np.not_equal(y_train, y_train_pred)).astype(int)))/sum(w_i) #On calcule l'erreur
    err.append(err_m)
    alpha_m = np.log((1 - err_m) / err_m) #On update alpha
    alphas.append(alpha_m)
    w_i= w_i * np.exp(alpha_m* (np.not_equal(y_train, y_train_pred)).astype(int)) #On update les poids


In [None]:
y_test_pred = np.zeros((len(J_m), len(y_test)))
for i in range(len(J_m)):
    y_test_pred[i] = alphas[i]*J_m[i].predict(X_test) #Pour chaque arbre on fait la prediction moyenne
Ym = np.sign(np.sum(y_test_pred, axis=0))
accuracy = np.mean(Ym == y_test) 
accuracy

On réalise maintenant le bagging avec la librairie Sklearn

**Question 2 :** 
With sklearn library, apply adaboost with decision tree (*max_depth=2*) on the 3-class classification problem. Find good parameters with the leave one out cross validation. Do the same thing with Gradient bossting.
If you have the time, you can test with XGBoost.

In [None]:
from sklearn.ensemble import AdaBoostClassifier
AdaBoostClassifier = AdaBoostClassifier(DecisionTreeClassifier(max_depth=1))
AdaBoostClassifier.fit(X_train, y_train)
y_pred = AdaBoostClassifier.predict(X_test)
accuracy = np.mean(y_pred == y_test)
accuracy 

On voit qu'on obtient à peu près le même résultat que précédemment.

### Random Forest 

Nous réalisons maintenant un random forest qui repose sur le bagging

In [None]:
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score
X_train, X_test, y_train, y_test = train_test_split(X_,y2,test_size=0.4, random_state=42)
randomForest = RandomForestClassifier()
randomForest.fit(X_train, y_train)
y_pred_rf = randomForest.predict(X_test)
accuracy = accuracy_score(y_test, y_pred_rf)
print("Accuracy:", accuracy)

On retrouve à peu près la même accuracy que celle obtenue pour le bagging (72% d'accuracy)

## 2. Gradient Boosting

Le Gradient Boosting permet l'optimisation de fonctions de perte différentiables arbitraires. Il permet d'optimiser la fonction de perte de l'apprenant précédent en ajoutant un nouveau modèle adaptatif qui combine des apprenants faibles.

Etudier sur la fonction de perte et le taux d'apprentissage.

In [None]:
from numba.core.types.containers import ListTypeIterableType
from sklearn.ensemble import GradientBoostingClassifier

gradient = GradientBoostingClassifier()
cv = LeaveOneOut()
pipe = Pipeline(steps=[("gradient", gradient)])

param_grid = {
    "gradient__n_estimators": [20],
    "gradient__learning_rate": [1e-2],
    "gradient__loss": ["log_loss"],
    "gradient__criterion": ["friedman_mse"]
}
search = GridSearchCV(pipe, param_grid, verbose =1, cv=cv, n_jobs=-1)
res = search.fit(X, y)
print("Best parameter (CV score=%0.3f):" % search.best_score_)
print(search.best_params_)

On obtient une accuracy de 45%.

## XGBOOST

In [None]:
from xgboost import XGBClassifier
XGBClassifier = XGBClassifier(DecisionTreeClassifier(max_depth=2))
X_train, X_test, y_train, y_test = train_test_split(X_,y,test_size=0.4, random_state=42) # On sépare les données en train/test (60%/40%)
y_train = y_train
XGBClassifier.fit(X_train, y_train)
y_pred_xgb = XGBClassifier.predict(X_test)
accuracy_XGB= np.mean(y_pred_xgb == y_test)
accuracy_XGB

On obtient une accuracy d'environ : 68%.

# Partie IV : Neural Network with pytorch

Below we create torch tensor with the shape $(N,B,F)$, where
    
$N$ is the number of recordings in the set (train/test)

$B$ the size of batch, we choose $B=1$ because the dataset is really small

$F$ is the number of features

The tensors are converted to float type

The train set and test set constitute 50% of the initial dataset

**Transform X with your preprocessing**

In [None]:
import torch
### transform X with your preprocessing
X_train, X_test, y_train, y_test = train_test_split(X_,y,test_size=0.5, random_state=42)
X_train = torch.tensor(X_train).reshape((X_train.shape[0],1,-1)).float()
X_test = torch.tensor(X_test).reshape((X_test.shape[0],1,-1)).float()
y_train = torch.nn.functional.one_hot(torch.tensor(y_train), num_classes=- 1).reshape((X_train.shape[0],1,-1)).float()
y_test = torch.nn.functional.one_hot(torch.tensor(y_test), num_classes=- 1).reshape((X_test.shape[0],1,-1)).float()

**Question 1:** : Create a model class (descending from torch.nn.Module). In a first time choose the appropriate architecture and the appropriate loss (the loss appear later) to reproduce logistic regression

Usually a FNN is a succession of blocks (linear -> ReLU). Finally the networks transforms the initial vector into the output $\hat{y} \in \mathbb{R}^3, \hat{y}=(\mathbb{P}(y=0|x),\mathbb{P}(y=1|x),\mathbb{P}(y=2|x))$ where $y$ is the word we want to predict and $x \in \mathbb{R}^{18522}$ is the accoustic signal

    
    

In [None]:
X_train.shape

Ici on prend une couche Linéaire avec 18522 neurones sur la première couche (car cela correspondant à la taille de X en entrée) et en sortie on ne veut que 3 classes. On applique ensuite une Softmax pour obtenir une matrice avec des probabilités pour chaque classe. 

In [None]:
class NNClassification(torch.nn.Module):
    def __init__(self):
        super().__init__()
        self.network = torch.nn.Sequential(
        torch.nn.Linear(40, 3),
        torch.nn.Softmax(dim = 1)
        )
        
            ### Define here the succession of torch.nn modules that will constitutes your network
            ### building blocks are torch.nn.ReLU, torch.nn.Linear

    
    def forward(self, xb):
        ### the forward method will be called each time you will write model(x). 
        ### It's equivalent to the function predict of sklearn
        return self.network(xb)

On réalise ensuite la boucle d'entrainement suivie de la boucle de test.

Pour la boucle d'entrainement, on réalise 2000 epochs (on montre 2000 fois le dataset en entier au réseau), on utilise un pas de 0.001 (ce pas à été changé pour obtenir une bonne accuracy), la cross-entropy comme fonction d'activation (cette loss est très utilisée pour la classification multi-classe).

In [None]:

model = NNClassification()

num_epochs = 2000
result_test_loss = []
result_train_loss=[]
result_train_accuracy=[]
result_test_accuracy=[]

lr = 8e-2
optimizer = torch.optim.Adam(model.parameters(),lr)
loss = torch.nn.CrossEntropyLoss()### What loss do you think is well suited for the classification problem (same as logistic regression)

for epoch in range(num_epochs):
    model.train()
    train_losses = []
    predictions_train= []
    predictions_test = []
    for i in range(X_train.shape[0]):
        optimizer.zero_grad()
        outputs = model(X_train[i]) #Les images train sont données en entrée du réseau
        loss_ =  loss(outputs, y_train[i]) # On calcule la loss
        train_losses.append(loss_)
        res = torch.argmax(outputs) == torch.argmax(y_train[i])#On prend l'argmax de la prédiction pour obtenir l'indice qui correspond à la classe prédite
        predictions_train.append(res)
        loss_.backward() # backpropagation
        optimizer.step()
    model.eval()
    test_losses = [] ##Phase de test 
    for i in range(X_test.shape[0]):
        outputs = model(X_test[i])#On passe les données dans le réseau
        res_test = torch.argmax(outputs) == torch.argmax(y_test[i])  #On prend l'argmax de la prédiction pour obtenir l'indice qui correspond à la classe prédite
        predictions_test.append(res_test)
        loss_test = loss(outputs, y_test[i]) #On calcule la loss de test sans faire la backpropagation
        test_losses.append(loss_test)
    
    result_train_loss.append(torch.stack(train_losses).mean().item())
    result_test_loss.append( torch.stack(test_losses).mean().item())
    result_train_accuracy.append(100*torch.stack(predictions_train).sum().item()/len(y_train))
    result_test_accuracy.append( 100*torch.stack(predictions_test).sum().item()/len(y_test))
    
    

**Question 2:** Plot the train and test loss. What do you observe?

On affiche ensuite les loss de train et de test en fonction du nombre d'epochs. On voit que les loss diminuent bien au fur et à mesure que les epochs augmentent. Cependant, on peut commencer à observer un phénomène d'overfitting car, on remarque une légère augmentation de la loss de test et une diminution de la loss de train. 

In [None]:
plt.close()
plt.plot(result_train_loss, label='train')
plt.plot(result_test_loss, label='test')
plt.legend()
plt.title("Loss de train et de test en fonction du nombre d'epochs")

**Question 3 :** Compute the accuracy.

Ici on affiche l'accuracy du train et du test en fonction des epochs. On peut confirmer les remarques observées dans la question précédente.

In [None]:
plt.close()
plt.plot(result_train_accuracy, label='train')
plt.plot(result_test_accuracy, label='test')
plt.title("Accuracy de train et de test en fonction du nombre d'epochs")

**Question 4:** If you encounter overfitting try to regularize your model with Dropout and/or L2/L1 Regularization

In [None]:
import torch
### transform X with your preprocessing
X_train, X_test, y_train, y_test = train_test_split(X_,y,test_size=0.5, random_state=42)
X_train = torch.tensor(X_train).reshape((X_train.shape[0],1,-1)).float()
X_test = torch.tensor(X_test).reshape((X_test.shape[0],1,-1)).float()
y_train = torch.nn.functional.one_hot(torch.tensor(y_train), num_classes=- 1).reshape((X_train.shape[0],1,-1)).float()
y_test = torch.nn.functional.one_hot(torch.tensor(y_test), num_classes=- 1).reshape((X_test.shape[0],1,-1)).float()

On change l'architecture du réseau de neurones pour ne pas avoir d'overfitting. On rajoute la fonction d'activation ReLU et on ajoute aussi des couches. Pour éviter l'overfitting, on met une couche de Dropout qui permet de faire de la régularisation. Ici, 10% des neurones vont être désactivés à chaque epoch pour éviter que cela soit toujours les mêmes neurones qui reçoivent l'information.

In [None]:
class NNClassification(torch.nn.Module):
    def __init__(self):
        super().__init__()
        self.network = torch.nn.Sequential(
        torch.nn.Linear(40, 100),
        torch.nn.ReLU(),
        torch.nn.Dropout(0.2),
        torch.nn.Linear(100, 60),
        torch.nn.ReLU(),
        torch.nn.Dropout(0.2),
        torch.nn.Linear(60, 20),
        torch.nn.ReLU(),
        torch.nn.Dropout(0.2),
        torch.nn.Linear(20, 3),
        torch.nn.Softmax(dim = 1),
        torch.nn.Dropout(0.2)
        )


    
    def forward(self, xb):
        return self.network(xb)

On réalise ensuite la boucle d'entrainement suivie de la boucle de test.

Pour la boucle d'entrainement, on réalise 2000 epochs, on utilise un pas de 0.001 (ce pas à été changé pour obtenir une bonne accuracy), la cross-entropy comme fonction d'activation (cette loss est très utilisée pour la classification multi-classe).

In [None]:
model = NNClassification()

num_epochs = 1000
result_test_loss = []
result_train_loss=[]
result_train_accuracy=[]
result_test_accuracy=[]

lr = 1e-3
optimizer = torch.optim.Adam(model.parameters(),lr)
loss = torch.nn.CrossEntropyLoss()### What loss do you think is well suited for the classification problem (same as logistic regression)

for epoch in range(num_epochs):
    model.train()
    train_losses = []
    predictions_train= []
    predictions_test = []
    for i in range(X_train.shape[0]):
        optimizer.zero_grad()
        outputs = model(X_train[i]) #Les images train sont données en entrée du réseau
        loss_ =  loss(outputs, y_train[i]) # On calcule la loss
        train_losses.append(loss_)
        res = torch.argmax(outputs) == torch.argmax(y_train[i]) #On prend l'argmax de la prédiction pour obtenir l'indice qui correspond à la classe prédite
        predictions_train.append(res)
        loss_.backward() # backpropagation
        optimizer.step()
    model.eval()
    test_losses = []
    for i in range(X_test.shape[0]):
        outputs = model(X_test[i])
        res_test = torch.argmax(outputs) == torch.argmax(y_test[i]) #On prend l'argmax de la prédiction pour obtenir l'indice qui correspond à la classe prédite
        predictions_test.append(res_test)
        loss_test = loss(outputs, y_test[i]) #On calcule la loss pour le test sans faire la backpropagation
        test_losses.append(loss_test)
    
    result_train_loss.append(torch.stack(train_losses).mean().item())
    result_test_loss.append( torch.stack(test_losses).mean().item())
    result_train_accuracy.append(100*torch.stack(predictions_train).sum().item()/len(y_train))
    result_test_accuracy.append( 100*torch.stack(predictions_test).sum().item()/len(y_test))
    
    

On affiche les loss de train et de test en fonction du nombre d'epochs.
On voit que la loss de train fait beaucoup d'oscillation (tout en diminuant progressivement en fonction du nombre d'epochs). On a là aussi éviter l'overfitting car on voit bien que la loss de test d'augmente pas quand la loss de train diminue.

In [None]:
plt.close()
plt.plot(result_train_loss, label='train')
plt.plot(result_test_loss, label='test')
plt.title("Loss de train et de test en fonction du nombre d'epochs")
plt.xlabel("nombre d'epochs")
plt.ylabel("Loss")
plt.legend()

On affiche l'accuracy de train et de test en fonction du nombre d'epochs. On voit que l'accuracy de test atteint 90% comme précédemment (le modèle semble donc performant).

In [None]:
plt.close()
plt.plot(result_train_accuracy, label='train_accuracy')
plt.plot(result_test_accuracy, label = 'test_accuracy')
plt.title("Accuracy de train et de test en fonction du nombre d'epochs")
plt.xlabel("nombre d'epochs")
plt.ylabel("Loss")
plt.legend()

**Question 5(Bonus)** : Create a CNN that takes in input the accoustic signal without preprocessing

In [None]:
import torch
### transform X with your preprocessing
X_train, X_test, y_train, y_test = train_test_split(X,y,test_size=0.5, random_state=42)
X_train = torch.tensor(X_train).reshape((X_train.shape[0],1,-1)).float()
X_test = torch.tensor(X_test).reshape((X_test.shape[0],1,-1)).float()
y_train = torch.nn.functional.one_hot(torch.tensor(y_train), num_classes=- 1).reshape((X_train.shape[0],1,-1)).float()
y_test = torch.nn.functional.one_hot(torch.tensor(y_test), num_classes=- 1).reshape((X_test.shape[0],1,-1)).float()

In [None]:
X_train.shape

Pour les CNN, on n'applique pas de transformations sur nos données, car on va extraire les features du signal entier directement dans les couches convolutionnelles.

In [None]:
import torch
X_train, X_test, y_train, y_test = train_test_split(X,y,test_size=0.5, random_state=40)
X_train = torch.tensor(X_train).reshape((X_train.shape[0],1,-1)).float()
X_test = torch.tensor(X_test).reshape((X_test.shape[0],1,-1)).float()
y_train = torch.nn.functional.one_hot(torch.tensor(y_train), num_classes=- 1).reshape((X_train.shape[0],1,-1)).float()
y_test = torch.nn.functional.one_hot(torch.tensor(y_test), num_classes=- 1).reshape((X_test.shape[0],1,-1)).float()

In [None]:
class NNClassification(torch.nn.Module):
    def __init__(self):
        super().__init__()
        self.network = torch.nn.Sequential(
           
            torch.nn.Conv1d(1, 20, kernel_size=4, stride=3), 
            torch.nn.ReLU(),
            torch.nn.Dropout(0.7),
            torch.nn.MaxPool1d(kernel_size=3, stride=2), 
            torch.nn.ReLU(),
            torch.nn.Dropout(0.7),
            torch.nn.Conv1d(20, 1, kernel_size=4, stride=3), 
            torch.nn.ReLU(),
            torch.nn.Dropout(0.7),
            torch.nn.Linear(1028, 3) # 1 x 3
        )
    
    def forward(self, xb):

        return torch.nn.functional.softmax(self.network(xb), 1)

        
  

In [None]:
model = NNClassification()
num_epochs = 300

result_test_loss = []
result_train_loss= []
train_accuracy = []
test_accuracy = []

lr = 0.001
optimizer = torch.optim.Adam(model.parameters(),lr)
loss = torch.nn.CrossEntropyLoss() ### What loss do you think is well suited for the classification problem (same as logistic regression)

for epoch in range(num_epochs):

    model.train()
    train_losses = []
    predictions_train = []
    for i in range(X_train.shape[0]):
       ### code the training step (compute loss -> optimization step -> save the loss )
        optimizer.zero_grad()
        sortie = model(X_train[i])
        loss_train = loss(sortie, y_train[i])
        loss_train.backward()
        optimizer.step()
        train_losses.append(loss_train)
        predictions_train.append(torch.argmax(sortie) == torch.argmax(y_train[i]))
        
    model.eval()
    test_losses = []
    predictions_test = []
    for i in range(X_test.shape[0]):
        ### code the eval step  (compute loss -> save the loss )
        sortie = model(X_test[i])
        predictions_test.append(torch.argmax(sortie) == torch.argmax(y_test[i]))
        loss_test = loss(sortie, y_test[i])
        test_losses.append(loss_test)
        
    result_train_loss.append(torch.stack(train_losses).mean().item())
    result_test_loss.append( torch.stack(test_losses).mean().item())
    test_accuracy.append(100 * torch.stack(predictions_test).sum().item() / len(y_test))
    train_accuracy.append(100 * torch.stack(predictions_train).sum().item() / len(y_train))


In [None]:
plt.close()
plt.plot(train_accuracy, label='train_accuracy')
plt.plot(test_accuracy, label = 'test_accuracy')
plt.title("Accuracy de train et de test en fonction du nombre d'epochs")
plt.xlabel("nombre d'epochs")
plt.ylabel("Loss")
plt.legend()