In [1]:
# -*- coding: utf-8 -*-

# Importation des librairies nécessaires

import numpy as np
import matplotlib.pyplot as plt
import random

# Pour le traitement des données
import torch 
import h5py # pour gérer les formats de données utilisés ici 
import torch.nn as nn
import torch.nn.functional as F
from sklearn.metrics import accuracy_score, confusion_matrix
import torchmetrics


In [2]:
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# TP3: Reconnaissance de signaux de communication par apprentissage profond 

<div class=consignes> Listez les noms des étudiants (2 au maximum) ayant participé à ce notebook dans la cellule suivante (prénom, nom).<br/>
Au moment du rendu, le notebook doit être nommé nom1_nom2_dlts_tp3.ipynb 

2 séances de TP sur ce sujet : le 6 novembre (1h00), le 13 novembre (3h). 
Le cours du 19 novembre sera partagé en 3 : cours sur la séparation de sources audio / présentation des mini projets et organisation des soutenances / fin de ce TP.<br> 
Deadline pour le rendu du TP: 26 novembre 2024, 13h59, par mail à deepetsignal.mva@gmail.com <br> 

Pour installer les paquets nécessaires à la réalisation de ce TP vous pouvez utiliser dans le notebook 
    
```
!pip install \< nom_du_paquet \>
```
merci de regrouper toutes les installations dans la première cellule du notebook. 
Essayez de faire en sorte que votre notebook puisse se lire comme un compte rendu, évitez de laisser du code mort et prennez le temps de commenter vos observations et résultats.

## Problématique

On cherche à identifier un type d'émetteur de communication à partir de l'observation d'un signal provenant de l'émetteur 
de 2048 échantillons IQ (In Phase / Quadrature) ie le signal prend des valeurs complexes. On représente la partie 
réelle et la partie imaginaire du signal par deux canaux réel d'un signal multivarié. 

L'émetteur peut provenir de 6 catégories différentes. 
Les paramètres différenciant les différentes catégories sont 
- le type de modulation 
- la présence ou non de séquences pilotes et le cas échéant la structure de trame pilotes / données 
- le débit de la transmission 

Les signaux se propagent en champs libre et sont enregistrés par une antenne. Le signal reçu est transposé en bande de base c'est à dire que si le signal est transmis autour d'une fréquence centrale f0, une première étape de traitement du signal à la réception recentre le signal autour de la fréquence 0. 


Les différents signaux observés dans ce TP sont entachés de différentes erreurs caractéristiques de la propagation 
électromagnétique comme : 
- modification aléatoire de la phase du signal lors de la transmission
- imperfection de la transposition en bande de base qui laisse le signal transposé à une fréquence df0 << f0
- présence d'interférence entre les symboles transmis (dûes par exemple à plusieurs chemins de propagation)
- présence d'un bruit blanc additif gaussien

Le niveau du bruit relativement à celui du signal utile est décrit par le SNR (Signal to Noise Ratio) et exprimé en dB. On suppose que le SNR est connu lors de l'acquisition d'un signal. Lors de ce TP nous rencontrerons 4 niveaux de SNR: 30 dB (facile), 20 dB, 10 dB et 0 dB (en espérant qu'on puisse faire quelque chose de ces données). 
Un de nos objectifs sera de qualifier la performance des algorithmes mis en place en fonction du SNR.

Les objectifs de ce TP sont: 

1. Définir une ou plusieurs architectures de réseaux de neurones profonds et les implémenter en PyTorch
2. Entrainer ces architectures, la fonction de perte employée pourra être la log vraisemblance négative: https://pytorch.org/docs/stable/generated/torch.nn.NLLLoss.html. 
3. Qualifier les performances de votre réseau de neurones sur l'ensemble de test via: 
   - Le calcul de l'accuracy implémentée par exemple dans le package TorchMetrics (https://torchmetrics.readthedocs.io/en/stable/classification/accuracy.html)
   - La réalisation d'un graphique accuracy vs SNR 
   - La réalisation des matrices de confusion entre les différentes classes pour les différents SNR (https://scikit-learn.org/stable/modules/generated/sklearn.metrics.confusion_matrix.html#sklearn.metrics.confusion_matrix)
4. Rapporter les performances de vos architectures à leur complexité, la complexité d'un réseau de neurones étant ici résumé au nombre de paramètres libres qu'il fait intervenir

Durant l'entraînement on observera l'évolution de la fonction de perte et de l'accuracy sur l'ensemble d'entraînement et sur l'ensemble de validation. 


Les 4 premières parties sont un échauffement sur lequel vous pouvez passer vite si vous êtes à l'aise avec le sujet. 
Le gros du travail est dans la partie 5 "Entraînemenent d'un réseau de neurones". 

Surtout privilégiez dans un premier temps la simplicité quitte à complexifier votre approche ensuite pour doper ses performances. Ne restez pas bloqué sur des réseaux qui "mettent trop de temps à apprendre"

## Chargement des données en numpy

Le TP est composé de trois jeux de données : 
- train.hdf5 destiné à nourrir l'entrainement de réseaux de neurones 
- test.hdf5 destiné à évaluer les algorithmes après entrainement
- samples.hdf5 qui est beaucoup plus petit que train.hdf5 et destiné à servir de modèle de données dans une phase de prototypage 
des algorithmes et de la pipeline d'entrainement

Les trois jeux de données sont au format hdf5 qui peut être manipulé via l'API Python h5py https://docs.h5py.org/en/stable/quick.html.
Un fichier hdf5 est consitué d'une arborescence de datasets et de groups. Un dataset hdf5 représente un tenseur n dimensionnel. Un dataset se convertit très facilement en numpy array.

Par exemple vous pouvez charger les données samples selon la cellule suivante:

Sample pour le wifi. Validation optionnel. Cela ne sert à rien d'être intelligent sur les données.

In [3]:
data_path = 'train.hdf5'

data = h5py.File(data_path , 'r')

signals = np.array(data['signaux'])
snr =  np.array(data['snr'])
labels_id = np.array(data['labels'])

data.close()

Vous pouvez récupérer le nom de la correspondance entre un label et le nom du standard d'émetteur correspondant via:

In [4]:
def get_labels(open_h5_file): 
    return {
        open_h5_file['label_name'].attrs[k] : k
        for k in open_h5_file['label_name'].attrs.keys()
    }

### Visualisation des données 

Commencez par étudier les données: 

    - observez leur taille 
    - la distribution des différentes classes et des différents SNR dans l'ensemble d'entrainement 
    - visualisez quelques signaux bien choisis selon une ou des représentations que vous choisirez 

Remarque : dans ce TP il n'y a pas beaucoup à gagner à faire du feature engineering 

In [5]:
data_array = np.array(data)
print(len(signals))
print(len(snr))
print(len(labels_id))

30000
30000
30000


In [6]:
print(signals)
print(np.average(signals))
print(np.min(signals))
print(np.max(signals))

[[[-1.6314461   0.10213946]
  [-1.8610548   1.0587581 ]
  [-1.3227473  -0.3087512 ]
  ...
  [ 2.5524678  -1.1176909 ]
  [-2.144728   -0.14183743]
  [ 1.5685402   1.4364138 ]]

 [[ 2.2401392   0.3780224 ]
  [-0.8386898   0.41933286]
  [-0.31032094 -0.25639293]
  ...
  [-0.06527975 -2.5865684 ]
  [-1.4600252  -1.2727835 ]
  [ 0.3735278  -0.942739  ]]

 [[ 0.7964652   0.3639845 ]
  [-0.17897135  0.5694222 ]
  [-0.384163    0.90277517]
  ...
  [ 0.6548506  -0.40047857]
  [ 0.5892955  -0.43906537]
  [ 0.12116134 -0.32299644]]

 ...

 [[-2.515238   -0.08754388]
  [-1.0148655  -0.28562468]
  [-0.42127475  2.0521984 ]
  ...
  [ 0.22316395 -2.762959  ]
  [ 0.23331423 -0.07616359]
  [-0.09802152 -1.241122  ]]

 [[ 1.6766893   0.99218315]
  [ 0.6713277   0.8992305 ]
  [-2.2675726   1.1803068 ]
  ...
  [-0.23502544  0.63387656]
  [-0.767505   -0.58275646]
  [ 1.7550484  -1.5197784 ]]

 [[ 0.7512635   0.29957932]
  [ 0.5410202  -1.8889133 ]
  [ 0.14981823  1.7470977 ]
  ...
  [ 0.4695632   0.029245

In [7]:
print(snr)
print(np.min(snr))
print(np.max(snr))
print(np.average(snr))

[ 0  0 30 ...  0  0  0]
0
30
14.906666666666666


In [8]:
print(labels_id)
print(np.max(labels_id))
print(np.min(labels_id))
print(np.average(labels_id))

[4 3 3 ... 3 2 0]
5
0
2.4875333333333334


## Chargement des données en Pytorch

Pour entrainer des réseaux de neurones profond sur nos données nous allons utiliser le framework Pytorch. 
Une première étape va consister à transférer les données de numpy à PyTorch, cela passe par deux objets : 
    - un Dataset qui modélise le dataset à haut niveau dans la mémoire de l'ordinateur
    - un Dataloader qui permet d'échantillonner le Dataset Pytorch dans les itérations de l'optimisation du réseau de neurones 
    
Un dataset prend la forme 
```python
class MyDataset(torch.utils.data.Dataset):
    
    def __init__(self, path_to_data):
        ...
    def __len__(self): #retourne le nombre de données dans le dataset
        ...
    def __getitem__(self,i): #retourne pour chaque indice i un couple (data_i, lablel_i), data_i étant un signal et label_i le label associé au signal
        ...
```

Implémentez une classe Dataset pour le dataset considéré ici 

In [9]:
class MyDataset(torch.utils.data.Dataset):
    
    def __init__(self, path_to_data):
        self.data = h5py.File(path_to_data, 'r')
        self.labels = get_labels(self.data)
        self.signals = np.array(self.data['signaux'])
        self.snr = np.array(self.data['snr'])
        self.labels_id = np.array(self.data['labels'])
        self.data.close()

    def __len__(self): #retourne le nombre de données dans le dataset
        return len(self.signals)
        ...
    def __getitem__(self,i): #retourne pour chaque indice i un couple (data_i, lablel_i), data_i étant un signal et label_i le label associé au signal
        return self.signals[i], self.labels_id[i]

Instanciez un objet dataset et testez le sur les données samples
```python
dataset = MyDataset(...)
```

In [10]:
dataset = MyDataset(data_path)

Pytorch propose une classe Dataloader qui permet d'échantillonner des batchs de taille fixe à partir d'un dataset. 
La cellule suivante donne un exemple d'utilisation

In [11]:
from torch.utils.data import DataLoader

dataloader = DataLoader(dataset, 
                        batch_size=10, 
                        shuffle=True
                       )

for i, (data, label) in enumerate(dataloader):
    print(i, data, label)
    if i > 5:
        break

0 tensor([[[ 0.4481, -0.7993],
         [ 0.1755, -0.9998],
         [ 1.7816,  0.1814],
         ...,
         [-1.2973, -0.6685],
         [-1.1556,  1.9490],
         [-0.1869, -0.0865]],

        [[ 0.4443,  0.1804],
         [ 0.3837, -0.6346],
         [-0.3020, -1.1736],
         ...,
         [ 1.3421,  0.2865],
         [ 0.7873, -0.4197],
         [-0.7349, -0.7868]],

        [[ 0.3555, -0.3041],
         [-0.4527, -0.2785],
         [-0.9770, -1.3957],
         ...,
         [ 0.9386,  0.0441],
         [ 0.8415, -0.4073],
         [ 0.0168,  0.3605]],

        ...,

        [[-1.2628, -0.3563],
         [-0.3169, -0.6170],
         [ 0.9412,  0.0583],
         ...,
         [ 0.2827,  0.6412],
         [-0.3050,  0.4586],
         [ 0.7993, -0.0764]],

        [[ 0.0683, -0.8869],
         [ 0.9536, -1.1645],
         [ 0.6606, -0.8335],
         ...,
         [-0.1914, -0.5499],
         [-0.5127, -0.4803],
         [ 0.1377,  0.6138]],

        [[-3.5910,  2.7428],
     

Testez le dataloader pour différentes valeurs de batch_size 

In [12]:
for batch_size in [1, 10, 100, 1000]:
    dataloader = DataLoader(dataset, 
                            batch_size=batch_size, 
                            shuffle=True
                           )
    for i, (data, label) in enumerate(dataloader):
        print(i, data, label)
        if i > 5:
            break

0 tensor([[[ 0.3586, -0.4452],
         [-0.1619, -0.6541],
         [-0.2681, -0.7529],
         ...,
         [ 1.0476,  0.0058],
         [ 0.5146, -0.2484],
         [ 0.3076, -0.5168]]]) tensor([4], dtype=torch.int8)
1 tensor([[[-0.3768, -0.2032],
         [-0.6254, -0.2036],
         [-0.7560, -0.3614],
         ...,
         [-0.8199,  0.4308],
         [-0.3401,  0.5197],
         [-0.7075,  0.0626]]]) tensor([5], dtype=torch.int8)
2 tensor([[[ 1.8558, -0.4627],
         [ 1.3487,  0.0484],
         [ 0.6687, -0.5672],
         ...,
         [ 1.3390, -1.3692],
         [ 0.4961, -1.0729],
         [-0.9861,  0.4877]]]) tensor([2], dtype=torch.int8)
3 tensor([[[-0.8961,  0.4463],
         [-1.0730,  0.3067],
         [-0.2819,  0.2916],
         ...,
         [ 0.0651,  0.2872],
         [ 0.1821, -0.6458],
         [-0.4147,  0.3094]]]) tensor([0], dtype=torch.int8)
4 tensor([[[ 1.9102,  1.7257],
         [ 2.2360, -1.5433],
         [-0.0471,  0.0150],
         ...,
         

## Mise en place d'un réseau "dumb" pour tester la pipeline d'entrainement

Définissez un premier modèle Pytorch qui prend en entrée un batch de données (tenseur de dimensions [B , C, T] avec B la taille du batch, C le nombre de canaux des signaux et T le nombre d'échantillons dans les signaux) et renvoie un batch de vecteur de probabilités (ou de log probabilités si vous préférez) (tenseur de dimensions [B,N] où N est le nombre de classe à identifier). 

Ce modèle doit comporter moins de 10000 paramètres libres. 

Ce Modèle doit être très simple, il doit être rapide à exécuter, il servira à tester et éventuellement débugger la pipeline d'entrainement que vous mettrez en place un peu plus loin. Un template d'implémentation d'une classe Model se trouve dans les diapositives du cours.

In [13]:
class MonModeleQuiTorche(torch.nn.Module):
    def __init__(self,delta_chan=4,verbose=False):
        if verbose:
            self.print = print
        else:
            self.print = lambda x:None
        self.print('Initialisation classe mère \n')
        torch.nn.Module.__init__(self) 
        
        self.print('\n Initialisation classe courante \n')
        self.delta_chan=delta_chan
        self.learnable_param = torch.nn.Parameter(torch.rand([1,delta_chan,1]))
        self.not_learnable_param = torch.rand((1,delta_chan,1))

    def __setattr__(self,name,value):
        super().__setattr__(name,value)
        self.print(f'Enregistrement de: {name} à la valeur {value}')
        
    def forward(self,x): 
        #x is [B,input_chan,T]
        # output is [B,self.output_chan,T]
        x_reduced = torch.mean(x , axis = 1 , keepdim=True)
        x_duplicated = torch.tile(x_reduced , dims = (1, self.delta_chan, 1))

        y0 = self.learnable_param *x_duplicated 
        y1 = y0+ self.not_learnable_param

        y2 = torch.abs(y1)
        
        y3 = torch.concat([x, y2], axis=1)
        return y3
    
    def __call__(self,x):
    # Défini dans la classe mère
        return self.forward(x)

In [14]:
mon_modele = MonModeleQuiTorche(delta_chan=4,verbose=True)

Enregistrement de: print à la valeur <built-in function print>
Initialisation classe mère 


 Initialisation classe courante 

Enregistrement de: delta_chan à la valeur 4
Enregistrement de: learnable_param à la valeur Parameter containing:
tensor([[[0.7843],
         [0.2532],
         [0.2976],
         [0.9456]]], requires_grad=True)
Enregistrement de: not_learnable_param à la valeur tensor([[[0.0581],
         [0.7892],
         [0.5902],
         [0.8185]]])


Instanciez votre modèle et testez la consistence de ses entrées / sorties vis à vis des données étudiées (test purement fonctionnel, pas besoin de chercher à réaliser un entraînement à ce point).

In [15]:
# Define input_channels
input_channels = 2  # Number of channels in the input signal

# Create a dummy input tensor with the same shape as the data
dummy_input = torch.randn(10, input_channels, signals.shape[1])

# Test the consistency of MonModeleQuiTorche
mon_modele = MonModeleQuiTorche(delta_chan=4, verbose=True)
mon_modele.eval()  # Set the model to evaluation mode

# Pass the dummy input through the model
mon_modele_output = mon_modele(dummy_input)
print("MonModeleQuiTorche output shape:", mon_modele_output.shape)

Enregistrement de: print à la valeur <built-in function print>
Initialisation classe mère 


 Initialisation classe courante 

Enregistrement de: delta_chan à la valeur 4
Enregistrement de: learnable_param à la valeur Parameter containing:
tensor([[[0.9758],
         [0.6871],
         [0.6460],
         [0.7800]]], requires_grad=True)
Enregistrement de: not_learnable_param à la valeur tensor([[[0.0627],
         [0.0089],
         [0.7380],
         [0.2831]]])
Enregistrement de: training à la valeur False
MonModeleQuiTorche output shape: torch.Size([10, 6, 2048])


Estimez par un calcul "théorique" le nombre de paramètres du modèle que vous avez défini et vérifié que le modèle a bien ce nombre de paramètres en pratique par exemple en utilisant la fonction suivante : 

In [16]:
def count_n_param(model):
    return sum([p.numel() for p in model.parameters()])

In [17]:
print("MonModeleQuiTorche nombre de paramètres:", count_n_param(mon_modele))

MonModeleQuiTorche nombre de paramètres: 4


## Mise en place de la pipeline d'entraînement

La pipeline d'entrainement consiste à 
- charger les données 
- les batcher 
- réaliser des itération (epochs) de descente de gradient pour optimiser les paramètres d'un algorithme selon une fonction de perte (loss)
- logger l'évolution au fil des epochs  de la loss sur l'ensemble train et l'ensemble de validation et éventuellement de métriques complémentaires 

Un cavnevas d'implémentation pourrait être:

```python
device = 'cpu' # set so 'cuda:xx' if you have a GPU, xx is GPU index. L'entraînement des réseaux de neurones est grandement accéléré par l'utilisation d'un GPU 

model = ...  # vous instanciez ici votre modèle

loss = .... # définissez la fonction de perte selon laquelle le modèle sera optimisé

optimizer = torch.optim.Adam(model.parameters()) # en pratique on utilise pas une simple descente de gradient mais une procédure d'optimisation plus sophistiquée qui est implémentée sous la forme d'un objet Optimizer. Il en existe beaucoup d'optimizers différents, vous pouvez en tester différents, je vous propose d'utiliser en premier lieu l'algorithme Adam

n_epochs = ... # le nombre d'itérations dans l'entrainement 

chemin_vers_sauvegarde_model = # chemin vers un fichier où vous sauvegarderez votre modèle après optimisation pour le réutiliser plus tard. 

model.to(device) # on place le modèle dans le GPU si nécessaire

for epoch in range(n_epochs):
    
    for batch_x,batch_y in dataloader_train:
        
        batch_x.to(device)
        batch_y.to(device)
        
        optimizer.zero_grad()
        
        batch_y_predicted = model(batch_x)
        
        l = loss(batch_y_predicted, batch_y)
        # loggez la loss sur le batch d'entraînement
        
        l.backward()
        
        optimizer.step()
        
    for batch_x,batch_y in dataloader_valid:
        
        batch_x.to(device)
        batch_y.to(device)
        
        with torch.no_grad():
            batch_y_predicted = model(batch_x)  
            
        # loggez la loss et les métriques sur le batch de validation

torch.save(model, chemin_vers_sauvegarde_model)

```

Mettez en place votre pipeline et testez là sur votre modèle dumb. Faites en sorte que votre façon de logger les loss et les métriques vous permette de visualiser l'évolution de ces différents indicateurs sur l'ensemble d'entrainement et de validation au fil des epochs. 

In [18]:
if torch.cuda.is_available():
    device = 'cuda:0'
else:
    device = 'cpu'

model0 = MonModeleQuiTorche(delta_chan=4, verbose=True) 
 # vous instanciez ici votre modèle

#loss = nn.L1Loss()
loss = nn.CrossEntropyLoss()
     # définissez la fonction de perte selon laquelle le modèle sera optimisé

optimizer = torch.optim.Adam(model0.parameters()) # en pratique on utilise pas une simple descente de gradient mais une procédure d'optimisation plus sophistiquée qui est implémentée sous la forme d'un objet Optimizer. Il en existe beaucoup d'optimizers différents, vous pouvez en tester différents, je vous propose d'utiliser en premier lieu l'algorithme Adam

n_epochs = 10 # le nombre d'itérations dans l'entrainement 

chemin_vers_sauvegarde_model0= 'model0.pth'
# chemin vers un fichier où vous sauvegarderez votre modèle après optimisation pour le réutiliser plus tard. 

model0.to(device) # on place le modèle dans le GPU si nécessaire

model = model0

dataset = MyDataset(data_path)

dataloader_train = DataLoader(dataset,
                                batch_size=10,
                                shuffle=True
                                 )

def evaluate_loss(loss, batch_y, batch_y_predicted):
    l = loss(batch_y_predicted, batch_y)
    return l

for epoch in range(n_epochs):

    epoch_loss = 0
    train_losses = []
    
    for batch_x, batch_y in dataloader_train:
        
        batch_x = batch_x.to(device)
        batch_y = batch_y.to(device).long()
        
        optimizer.zero_grad()

        batch_y_predicted = model(batch_x)
        
        # Ensure the predicted tensor has the same shape as the target tensor
        if batch_y_predicted.shape != batch_y.shape:
            batch_y_predicted = batch_y_predicted.view(batch_y.size(0), -1)
        
        l = evaluate_loss(loss, batch_y, batch_y_predicted)
        # loggez la loss sur le batch d'entraînement
        
        l.backward()
        
        optimizer.step()
    epoch_loss += l.item()

    avg_epoch_loss = epoch_loss / len(dataloader_train)
    train_losses.append(avg_epoch_loss)
    print(f'Epoch {epoch+1}/{n_epochs}, Loss: {avg_epoch_loss:.4f}')

torch.save(model, chemin_vers_sauvegarde_model0)

Enregistrement de: print à la valeur <built-in function print>
Initialisation classe mère 


 Initialisation classe courante 

Enregistrement de: delta_chan à la valeur 4
Enregistrement de: learnable_param à la valeur Parameter containing:
tensor([[[0.1699],
         [0.4248],
         [0.4499],
         [0.9194]]], requires_grad=True)
Enregistrement de: not_learnable_param à la valeur tensor([[[0.2619],
         [0.5998],
         [0.2669],
         [0.9883]]])


Epoch 1/10, Loss: 0.0028
Epoch 2/10, Loss: 0.0030
Epoch 3/10, Loss: 0.0030
Epoch 4/10, Loss: 0.0029
Epoch 5/10, Loss: 0.0029
Epoch 6/10, Loss: 0.0028
Epoch 7/10, Loss: 0.0030
Epoch 8/10, Loss: 0.0030
Epoch 9/10, Loss: 0.0029
Epoch 10/10, Loss: 0.0029


Vérifiez que vous avez bien enregistré votre modèle en fin d'entrainement. Chargez le avec la fonction 
```python
modele = torch.load(...) 
```
et vérifiez que vous pouvez l'utiliser sur des données du problème.

In [19]:
# Load the saved models
model0_loaded = torch.load(chemin_vers_sauvegarde_model0)

# Set the models to evaluation mode
model0_loaded.eval()

# Test the loaded models on a batch of data
with torch.no_grad():
    output0 = model0_loaded(batch_x)

print("Sortie du modèle très simple:", output0)

Enregistrement de: training à la valeur False
Sortie du modèle très simple: tensor([[[-1.1161,  0.6032],
         [-0.6636, -0.2503],
         [ 0.5572, -1.2712],
         ...,
         [ 0.5986,  0.6010],
         [ 0.2657,  0.2681],
         [ 0.9870,  0.9897]],

        [[ 0.2026, -1.0024],
         [ 0.6852, -0.3340],
         [ 1.0626,  0.2496],
         ...,
         [ 0.5944,  0.6002],
         [ 0.2615,  0.2674],
         [ 0.9822,  0.9888]],

        [[ 2.4645, -1.2924],
         [ 1.2611, -0.7624],
         [ 0.3174, -0.7226],
         ...,
         [ 0.5928,  0.5974],
         [ 0.2599,  0.2645],
         [ 0.9804,  0.9856]],

        ...,

        [[ 0.6297, -0.1072],
         [ 0.1320, -0.0280],
         [-1.2504,  0.0179],
         ...,
         [ 0.5954,  0.5989],
         [ 0.2625,  0.2660],
         [ 0.9833,  0.9873]],

        [[ 0.6955,  0.8705],
         [ 0.0686,  0.6918],
         [-0.7994,  0.5787],
         ...,
         [ 0.5975,  0.5938],
         [ 0.2646,  

  model0_loaded = torch.load(chemin_vers_sauvegarde_model0)


## Entraînement de réseaux de neurones

Dans cette partie vous définissez une ou plusieurs architecture de réseaux de neurones profonds et vous les réglez sur les données d'entrainement. 
Vous pouvez notamment utiliser des réseaux à base de convolutions et/ou de couches réurrentes. Vous pouvez vous inspirer de ce qui a été dit en cours sur la reconnaissance vocale.

Dans un deuxième temps (facultatif), si vous le souhaitez vous pouvez mettre en place des stratégies d'augmentation de données pour améliorer vos résultats. Pour mettre l'augmentation de données en pratique pouvez vous renseigner sur l'argument collate_fn du dataloader standard de Pytorch. 

### Entraînement avec un réseau à couche récurrente simple

In [None]:
class SimpleCNNModel(nn.Module):
    def __init__(self, input_channels, num_classes):
        super(SimpleCNNModel, self).__init__()
        self.conv1 = nn.Conv1d(in_channel=input_channels, out_channel=16, kernel_size=3, padding="same")

        self.conv2 = nn.Conv1d(16, 32, kernel_size=3, padding="same")
        self.fc1 = nn.Linear(32 * 2048, 128)  # 2048 is the number of samples in the signal
        self.fc2 = nn.Linear(128, num_classes)
        
    def forward(self, x):
        x = F.relu(self.conv1(x))
        x = F.relu(self.conv2(x))
        x = x.view(x.size(0), -1)  # Flatten the tensor
        x = F.relu(self.fc1(x))
        x = self.fc2(x)
        return F.log_softmax(x, dim=1)

#Paramètres du problème (reste fixé)
input_channels = 2  # Number of channels in the input signal
num_classes = 6     # Number of classes to identify

model1 = SimpleCNNModel(input_channels, num_classes)
print(model1)

print("SimpleCNNModel nombre de paramètres:", count_n_param(model1))


SimpleCNNModel(
  (conv1): Conv1d(2, 16, kernel_size=(3,), stride=(1,), padding=(1,))
  (conv2): Conv1d(16, 32, kernel_size=(3,), stride=(1,), padding=(1,))
  (fc1): Linear(in_features=65536, out_features=128, bias=True)
  (fc2): Linear(in_features=128, out_features=6, bias=True)
)
SimpleCNNModel nombre de paramètres: 8391190


In [None]:
#loss = nn.L1Loss()
loss = nn.CrossEntropyLoss()
     # définissez la fonction de perte selon laquelle le modèle sera optimisé

optimizer = torch.optim.Adam(model1.parameters()) # en pratique on utilise pas une simple descente de gradient mais une procédure d'optimisation plus sophistiquée qui est implémentée sous la forme d'un objet Optimizer. Il en existe beaucoup d'optimizers différents, vous pouvez en tester différents, je vous propose d'utiliser en premier lieu l'algorithme Adam

n_epochs = 10 # le nombre d'itérations dans l'entrainement 

dataset = MyDataset(data_path)

dataloader_train = DataLoader(dataset,
                                batch_size=32,
                                shuffle=True
                                 )


chemin_vers_sauvegarde_model1 = 'model1.pth'

model = model1
model1.to(device) # on place le modèle dans le GPU si nécessaire

train_losses = []

for epoch in range(n_epochs):
    epoch_loss = 0  # Initialize epoch_loss at the beginning of each epoch
    
    for batch_x,batch_y in dataloader_train:
        batch_x = batch_x.permute(0, 2, 1).to(device)
        batch_y = batch_y.to(device).long()
        
        optimizer.zero_grad()
        
        batch_y_predicted = model(batch_x)
        
        l = loss(batch_y_predicted, batch_y)
        # loggez la loss sur le batch d'entraînement
        
        l.backward()
        
        optimizer.step()
    
        
        with torch.no_grad():
            batch_y_predicted = model(batch_x)  
        
        epoch_loss += l.item()

    avg_epoch_loss = epoch_loss / len(dataloader_train)
    train_losses.append(avg_epoch_loss)
    print(f'Epoch {epoch+1}/{n_epochs}, Loss: {avg_epoch_loss:.4f}')
            
        # loggez la loss et les métriques sur le batch de validation

torch.save(model1, chemin_vers_sauvegarde_model1)

Epoch 1/10, Loss: 0.9250
Epoch 2/10, Loss: 0.4976
Epoch 3/10, Loss: 0.2170
Epoch 4/10, Loss: 0.1177
Epoch 5/10, Loss: 0.0606
Epoch 6/10, Loss: 0.0452
Epoch 7/10, Loss: 0.0295
Epoch 8/10, Loss: 0.0191
Epoch 9/10, Loss: 0.0101
Epoch 10/10, Loss: 0.0158


In [None]:
# Tracé de la loss au cours des epochs
plt.plot(range(1, n_epochs + 1), train_losses, marker='o')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.title('Training Loss Over Epochs')
plt.show()

### Avec un réseau convolutionel LSTM

In [None]:
class CNN_LSTM_Model(nn.Module):
    def __init__(self, input_channels, num_classes):
        super(CNN_LSTM_Model, self).__init__()
        self.conv1 = nn.Conv1d(input_channels, 32, kernel_size=3, padding="same")
        self.conv2 = nn.Conv1d(32, 64, kernel_size=3, padding="same")
        self.lstm = nn.LSTM(input_size=64, hidden_size=128, num_layers=2, batch_first=True)
        self.fc = nn.Linear(128, num_classes)
        
    def forward(self, x):
        x = F.relu(self.conv1(x))
        x = F.relu(self.conv2(x))
        x = x.permute(0, 2, 1)  # Change shape to (batch_size, sequence_length, input_size)
        x, _ = self.lstm(x)
        x = x[:, -1, :]  # Take the output of the last time step
        x = self.fc(x)
        return F.log_softmax(x, dim=1)

# Paramètres du problème (reste fixé)
input_channels = 2  # Number of channels in the input signal
num_classes = 6     # Number of classes to identify

cnn_lstm_model = CNN_LSTM_Model(input_channels, num_classes)
print(cnn_lstm_model)

print("LSTM_CNN_Model nombre de paramètres:", count_n_param(cnn_lstm_model))

CNN_LSTM_Model(
  (conv1): Conv1d(2, 32, kernel_size=(3,), stride=(1,), padding=(1,))
  (conv2): Conv1d(32, 64, kernel_size=(3,), stride=(1,), padding=(1,))
  (lstm): LSTM(64, 128, num_layers=2, batch_first=True)
  (fc): Linear(in_features=128, out_features=6, bias=True)
)
LSTM_CNN_Model nombre de paramètres: 238630


In [None]:
# Initialisation du modèle
model = cnn_lstm_model
loss_fn = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters())

chemin_vers_sauvegarde_modelLSTM = 'modelCNNLSTM.pth'


# Paramètres d'entrainement
n_epochs = 10
train_losses = []

model.to(device)  #Placer le modèle sur le GPU si possible

# Boucle d'entraînement
for epoch in range(n_epochs):
    model.train()
    epoch_loss = 0
    for batch_x, batch_y in dataloader_train:
        batch_x, batch_y = batch_x.to(device), batch_y.to(device)

        batch_x = batch_x.permute(0, 2, 1).to(device)
        batch_y = batch_y.to(device).long()

        
        optimizer.zero_grad()
        batch_y_predicted = model(batch_x)
        loss = loss_fn(batch_y_predicted, batch_y)
        loss.backward()
        optimizer.step()
        
        epoch_loss += loss.item()
    
    avg_epoch_loss = epoch_loss / len(dataloader_train)
    train_losses.append(avg_epoch_loss)
    print(f'Epoch {epoch+1}/{n_epochs}, Loss: {avg_epoch_loss:.4f}')

torch.save(model, chemin_vers_sauvegarde_modelLSTM)

In [None]:
# Tracé de la loss au cours des epochs
plt.plot(range(1, n_epochs + 1), train_losses, marker='o')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.title('Training Loss Over Epochs')
plt.show()

### Mise en place de l'augmentation de données

In [None]:
def augment_data(signal, snr):
    # Example augmentation: Add Gaussian noise based on SNR
    noise = torch.randn_like(signal) * (10 ** (-snr / 20))
    augmented_signal = signal + noise
    return augmented_signal

def custom_collate_fn(batch):
    signals, labels, snrs = zip(*batch)
    augmented_signals = []
    for signal, snr in zip(signals, snrs):
        augmented_signal = augment_data(signal, snr)
        augmented_signals.append(augmented_signal)
    return torch.stack(augmented_signals), torch.tensor(labels)

# Create a new DataLoader with the custom collate function
dataloader_augmented = DataLoader(dataset, 
                                  batch_size=batch_size, 
                                  shuffle=True, 
                                  collate_fn=custom_collate_fn)

# Example usage of the augmented DataLoader
for i, (data, label) in enumerate(dataloader_augmented):
    print(i, data, label)
    if i > 5:
        break

#### Test avec le CNN simple

In [None]:
#loss = nn.L1Loss()
loss = nn.CrossEntropyLoss()
     # définissez la fonction de perte selon laquelle le modèle sera optimisé

optimizer = torch.optim.Adam(model1.parameters()) # en pratique on utilise pas une simple descente de gradient mais une procédure d'optimisation plus sophistiquée qui est implémentée sous la forme d'un objet Optimizer. Il en existe beaucoup d'optimizers différents, vous pouvez en tester différents, je vous propose d'utiliser en premier lieu l'algorithme Adam

n_epochs = 10 # le nombre d'itérations dans l'entrainement 


chemin_vers_sauvegarde_model1 = 'model1_augmented.pth'

model = model1
model1.to(device) # on place le modèle dans le GPU si nécessaire

train_losses = []

for epoch in range(n_epochs):
    epoch_loss = 0  # Initialize epoch_loss at the beginning of each epoch
    
    for batch_x,batch_y in dataloader_augmented:
        batch_x = batch_x.permute(0, 2, 1).to(device)
        batch_y = batch_y.to(device).long()
        
        optimizer.zero_grad()
        
        batch_y_predicted = model(batch_x)
        
        l = loss(batch_y_predicted, batch_y)
        # loggez la loss sur le batch d'entraînement
        
        l.backward()
        
        optimizer.step()
    
        
        with torch.no_grad():
            batch_y_predicted = model(batch_x)  
        
        epoch_loss += l.item()

    avg_epoch_loss = epoch_loss / len(dataloader_augmented)
    train_losses.append(avg_epoch_loss)
    print(f'Epoch {epoch+1}/{n_epochs}, Loss: {avg_epoch_loss:.4f}')
            
        # loggez la loss et les métriques sur le batch de validation

torch.save(model1, chemin_vers_sauvegarde_model1)

## Synthèse des résultats
Une fois que votre ou vos réseaux sont entrainez vous comparez leurs performances selon les métriques définies en introduction sur l'ensemble de test sans oublier de mesurer également la complexité de chaque approche en termes de nombre de paramètres. Si vous avez testé des approches qui vous semblent avoir échoué vous pouvez rédiger un petit paragraphe pour expliquer votre analyse de cet échec. 

In [None]:
# Load the test dataset
test_dataset = MyDataset('test.hdf5')
test_dataloader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

# Load the trained models
model0 = torch.load(chemin_vers_sauvegarde_model0)
model1 = torch.load(chemin_vers_sauvegarde_model1)
model_lstm = torch.load(chemin_vers_sauvegarde_modelLSTM)

# Set the models to evaluation mode
model0.eval()
model1.eval()
model_lstm.eval()

# Move models to the appropriate device
model0.to(device)
model1.to(device)
model_lstm.to(device)

# Initialize lists to store true and predicted labels
true_labels = []
predicted_labels_model0 = []
predicted_labels_model1 = []
predicted_labels_modelLSTM = []

# Evaluate model0
with torch.no_grad():
    for batch_x, batch_y in test_dataloader:
        batch_x, batch_y = batch_x.to(device), batch_y.to(device)
        outputs = model0(batch_x)
        _, predicted = torch.max(outputs, 1)
        true_labels.extend(batch_y.cpu().numpy())
        predicted_labels_model0.extend(predicted.cpu().numpy())

# Evaluate model1
with torch.no_grad():
    for batch_x, batch_y in test_dataloader:
        batch_x, batch_y = batch_x.to(device), batch_y.to(device)
        outputs = model1(batch_x)
        _, predicted = torch.max(outputs, 1)
        predicted_labels_model1.extend(predicted.cpu().numpy())

# Evaluate modelLSTM
with torch.no_grad():
    for batch_x, batch_y in test_dataloader:
        batch_x, batch_y = batch_x.to(device), batch_y.to(device)
        outputs = model_lstm(batch_x)
        _, predicted = torch.max(outputs, 1)
        predicted_labels_modelLSTM.extend(predicted.cpu().numpy())

# Calculate accuracy
accuracy_model0 = accuracy_score(true_labels, predicted_labels_model0)
accuracy_model1 = accuracy_score(true_labels, predicted_labels_model1)
accuracy_modelLSTM = accuracy_score(true_labels, predicted_labels_modelLSTM)

# Calculate confusion matrix
confusion_matrix_model0 = confusion_matrix(true_labels, predicted_labels_model0)
confusion_matrix_model1 = confusion_matrix(true_labels, predicted_labels_model1)
confusion_matrix_modelLSTM = confusion_matrix(true_labels, predicted_labels_modelLSTM)

# Print results
print(f"Model0 Accuracy: {accuracy_model0:.4f}")
print(f"Model1 Accuracy: {accuracy_model1:.4f}")
print(f"ModelLSTM Accuracy: {accuracy_modelLSTM:.4f}")
print("Model0 Confusion Matrix:")
print(confusion_matrix_model0)
print("Model1 Confusion Matrix:")
print(confusion_matrix_model1)
print("ModelLSTM Confusion Matrix:")
print(confusion_matrix_modelLSTM)

# Measure model complexity
num_params_model0 = count_n_param(model0)
num_params_model1 = count_n_param(model1)
num_params_modelLSTM = count_n_param(model_lstm)
print(f"Model0 Number of Parameters: {num_params_model0}")
print(f"Model1 Number of Parameters: {num_params_model1}")
print(f"ModelLSTM Number of Parameters: {num_params_modelLSTM}")

## Reste à faire

- Faire de l'hyperparameter tuning
- Réentrainer les modèles avec de l'augmentation de données
- Compléter/rédiger la synthèse
- Améliorer la présentation du notebook et du rapport
- Tracer graphs analyse signal au début
- Padding sur le champ réceptif ( padding = "same"     
        #stride=2, # T’ = T//2
        #dilation=2, # paramètre de dilatation)
- Parralélisation avec JAX

### Optional
- Eventuellement tester d'autres modèles (pourquoi pas un transformer)