Ce bloc de code importe plusieurs bibliothèques de Python nécessaires pour l'analyse de données et la création de modèles de classification. Voici ce que font les bibliothèques importées :

sklearn.model_selection : cette bibliothèque fournit des fonctions pour la sélection de modèles, la validation croisée et la sélection de paramètres.

sklearn.linear_model : cette bibliothèque fournit des classes pour la régression linéaire.

sklearn.neural_network : cette bibliothèque fournit des classes pour les réseaux de neurones.

sklearn.neighbors : cette bibliothèque fournit des classes pour la classification kNN.

sklearn.svm : cette bibliothèque fournit des classes pour les machines à vecteurs de support.

sklearn.gaussian_process : cette bibliothèque fournit des classes pour les processus gaussiens.

sklearn.tree : cette bibliothèque fournit des classes pour les arbres de décision.

sklearn.ensemble : cette bibliothèque fournit des classes pour les méthodes d'ensemble, comme Random Forest et AdaBoost.

sklearn.naive_bayes : cette bibliothèque fournit des classes pour la classification bayésienne naïve.

sklearn.discriminant_analysis : cette bibliothèque fournit des classes pour l'analyse discriminante linéaire et quadratique.

sklearn.preprocessing : cette bibliothèque fournit des classes pour la prétraitement des données, telles que la normalisation et la mise à l'échelle.

sklearn.pipeline : cette bibliothèque fournit des classes pour la construction de pipelines de traitement de données.

scipy.signal : cette bibliothèque fournit des fonctions pour le traitement du signal.

tensorflow : cette bibliothèque fournit des classes pour la création de modèles de réseau de neurones.

tensorflow.keras : cette bibliothèque fournit des fonctions pour la création de modèles de réseau de neurones avec Keras.

matplotlib.pyplot : cette bibliothèque fournit des fonctions pour la création de graphiques et de visualisations.

scipy.io : cette bibliothèque fournit des fonctions pour lire et écrire des fichiers MATLAB.

neurokit2 : cette bibliothèque fournit des fonctions pour l'analyse de données psychophysiologiques.

seaborn : cette bibliothèque fournit des fonctions pour la visualisation de données statistiques.

pandas : cette bibliothèque fournit des classes pour la manipulation de données en tableau.

numpy : cette bibliothèque fournit des fonctions pour le calcul numérique.

time : cette bibliothèque fournit des fonctions pour la mesure du temps.

mne : cette bibliothèque fournit des fonctions pour l'analyse de données d'électroencéphalographie (EEG).

In [None]:
from sklearn.model_selection import GroupKFold
from sklearn.linear_model import SGDClassifier
from sklearn.neural_network import MLPClassifier
from sklearn.neighbors import KNeighborsClassifier
from sklearn.svm import SVC
from sklearn.gaussian_process import GaussianProcessClassifier
from sklearn.gaussian_process.kernels import RBF
from sklearn.tree import DecisionTreeClassifier
from sklearn.ensemble import RandomForestClassifier, AdaBoostClassifier
from sklearn.naive_bayes import GaussianNB
from sklearn.discriminant_analysis import QuadraticDiscriminantAnalysis
from sklearn.preprocessing import StandardScaler
from sklearn.preprocessing import MinMaxScaler
from sklearn.pipeline import make_pipeline
from sklearn import svm
from scipy import signal
import tensorflow as tf
from tensorflow.keras.models import Sequential
from keras.layers.core import Dense, Dropout
from tensorflow.keras.layers import LSTM

import matplotlib.pyplot as plt
import scipy.io as sio

!pip install neurokit2
import neurokit2 as nk

import seaborn as sns

import pandas as pd
import numpy as np
import time
!pip install mne
import mne

Ce bloc de code permet de monter votre lecteur Google Drive dans Google Colab. Cela vous permet de charger des fichiers à partir de votre lecteur Google Drive directement dans votre environnement Colab.

La fonction drive.mount() prend en paramètre le chemin d'accès au dossier de montage. Une fois que la commande est exécutée, vous serez invité à autoriser l'accès à votre lecteur Google Drive. Lorsque vous aurez donné votre autorisation, le lecteur sera monté et vous pourrez accéder à son contenu à partir de votre environnement Colab.

In [None]:
from google.colab import drive
drive.mount('/content/drive')

Ce bloc de code charge un fichier de données DREAMERedit.mat à partir du chemin d'accès spécifié dans la variable path en utilisant la fonction sio.loadmat() de la bibliothèque scipy.io.

La fonction sio.loadmat() charge les données à partir d'un fichier MATLAB .mat et les stocke sous forme de dictionnaire Python. La variable raw contient les données chargées à partir du fichier.

In [None]:
path = "/content/drive/My Drive/DREAMER.mat"
raw = sio.loadmat(path)

La section ci-dessous concerne les signaux EEG

# EEG SIGNALS 

In [None]:
import plotly.graph_objs as go
from plotly.subplots import make_subplots

electrode = 0
video = 0
participant = 0

stim = raw["DREAMER"][0, 0]["Data"][0, participant]["EEG"][0, 0]["stimuli"][0, 0][video, 0][:, electrode]
basl = raw["DREAMER"][0, 0]["Data"][0, participant]["EEG"][0, 0]["baseline"][0, 0][video, 0][:, electrode]


fig = make_subplots(rows=2, cols=1, shared_xaxes=True)


fig.add_trace(
    go.Scatter(x=list(range(len(stim))), y=stim, name="Stimulus Signal"),
    row=1, col=1
)


fig.add_trace(
    go.Scatter(x=list(range(len(basl))), y=basl, name="Baseline Signal"),
    row=2, col=1
)

fig.update_xaxes(title_text="Time (samples)", row=2, col=1)
fig.update_yaxes(title_text="Signal Amplitude", row=1, col=1)
fig.update_yaxes(title_text="Signal Amplitude", row=2, col=1)

fig.update_layout(title_text="EEG Signals")

fig.show()


Cette fonction effectue un prétraitement sur un signal EEG brut. Le signal est découpé en 12 segments de 640 points de données chacun, puis chaque segment est filtré avec un filtre passe-bas de fréquence de coupure de 40 Hz. Ensuite, la fonction calcule la densité spectrale de puissance (PSD) de chaque segment en utilisant la méthode Welch. Les PSDs sont ensuite utilisées pour calculer la moyenne de la puissance dans les bandes de fréquences theta (4-7.5 Hz), alpha (7.5-13 Hz) et beta (13-20 Hz) pour chaque segment. Ces moyennes sont stockées dans une liste de caractéristiques. La fonction renvoie cette liste de caractéristiques.

In [None]:
def preprocess_EEG(raw, feature):

  frame = [[0 for x in range(640)] for y in range(12)] 

  for i in range(0,7680):
    j = i//640
    k = i%640
    frame[j][k] = raw[i]
  
  samplingRate = 128 
  nyquest = samplingRate/2.0
  n=213

  f1 = 40/nyquest
  

  fnew = [[0 for x in range(640)] for y in range(18)]
  pnew = [[0 for x in range(640)] for y in range(18)]
  thetamean =[]
  alphamean =[]
  betamean  =[]

  overall_filter = signal.firwin(n, f1, pass_zero=True, window="hamming") 

  for i in range(0,12):

    filt_overall = signal.filtfilt(overall_filter, 1, frame[i], method ='pad')
    f,p = signal.welch(filt_overall,fs =128, nperseg=256)
    
    fnew[i] = f
    pnew[i] = p
    
    plt.semilogy(fnew[i], pnew[i])
    plt.xlabel('frequency [Hz]')
    plt.ylabel('PSD')

    c =[]
    for k in range(0,129):
      x=[pnew[i][k],fnew[i][k]]
      c.append(x)

    count = 0.0; sum = 0.0; mean = 0.0
    for a in range(0,129):
      if(c[a][1] >4.0 and c[a][1] <=7.5 ): #theta
        count +=1
        sum += c[a][0] 
    mean = sum/count
    thetamean.append(mean)

    count = 0.0; sum = 0.0; mean = 0.0
    for a in range(0,129):
      if(c[a][1] >7.5 and c[a][1]<=13):   #alpha
        count +=1
        sum += c[a][0] 
    mean = sum/count
    alphamean.append(mean)

    count = 0.0; sum = 0.0; mean = 0.0
    for a in range(0,129):
      if(c[a][1] >13 and c[a][1]<=20):    #beta
        count +=1
        sum += c[a][0] 
    mean = sum/count
    betamean.append(mean)


    fe=[]
    fe.append(thetamean[i])
    fe.append(alphamean[i])
    fe.append(betamean[i])
      
    feature.append(fe)

  return feature

Ce code définit une fonction plot_spectral_density qui prend trois paramètres en entrée - electrode, video et participant. Il calcule la densité spectrale des signaux EEG du jeu de données DREAMER pour une électrode, une vidéo et un participant spécifiques. Il applique un filtre FIR fenêtré de type Hamming aux signaux EEG, calcule l'estimation de la densité spectrale de puissance de Welch, et trace les résultats sur une échelle logarithmique. Le graphique comprend deux lignes représentant la densité spectrale des stimuli et de la ligne de base, respectivement. L'axe des x représente la fréquence en Hertz et l'axe des y représente la densité spectrale de puissance en décibels. Le titre du graphique inclut les informations sur l'électrode et le participant.

In [None]:
def plot_spectral_density(electrode, video, participant):
    stim = raw["DREAMER"][0, 0]["Data"][0, participant]["EEG"][0, 0]["stimuli"][0, 0][video, 0][:, electrode]
    basl = raw["DREAMER"][0, 0]["Data"][0, participant]["EEG"][0, 0]["baseline"][0, 0][video, 0][:, electrode]
    frame_stim = [[0 for x in range(640)] for y in range(12)] 
    frame_basl = [[0 for x in range(640)] for y in range(12)]
    for i in range(0,7680):
        j = i//640
        k = i%640
        frame_stim[j][k] = stim[i]
    for i in range(0,7680):
        j = i//640
        k = i%640
        frame_basl[j][k] = basl[i]  
    samplingRate = 128 
    nyquist = samplingRate/2.0
    n = 213

    f1 = 40/nyquist

    fnew_stim = []
    pnew_stim = []
    fnew_basl = []
    pnew_basl = []

    overall_filter = signal.firwin(n, f1, pass_zero=True, window="hamming") 

    for i in range(0, 12):

        filt_overall_stim = signal.filtfilt(overall_filter, 1, frame_stim[i], method='pad')
        f_stim, p_stim = signal.welch(filt_overall_stim, fs=128, nperseg=256)

        fnew_stim.append(f_stim)
        pnew_stim.append(p_stim)
        
    for i in range(0, 12):

        filt_overall_basl = signal.filtfilt(overall_filter, 1, frame_basl[i], method='pad')
        f_basl, p_basl = signal.welch(filt_overall_basl, fs=128, nperseg=256)

        fnew_basl.append(f_basl)
        pnew_basl.append(p_basl)

    plt.figure(figsize=(10, 6))
    plt.grid(True)
    plt.semilogy(fnew_stim[electrode], pnew_stim[electrode], label='Stimuli')
    plt.semilogy(fnew_basl[electrode], pnew_basl[electrode], label='Baseline')
    plt.xlabel('Frequency [Hz]')
    plt.ylabel('PSD [dB]')
    plt.legend()
    plt.title('Electrode: {}, Participant: {}, Video {}'.format(electrode, participant,video))
    plt.show()

In [None]:
def plot_spectral_density_alpha_beta_theta(electrode, video, participant):
    stim = raw["DREAMER"][0, 0]["Data"][0, participant]["EEG"][0, 0]["stimuli"][0, 0][video, 0][:, electrode]
    basl = raw["DREAMER"][0, 0]["Data"][0, participant]["EEG"][0, 0]["baseline"][0, 0][video, 0][:, electrode]
    frame_stim = [[0 for x in range(640)] for y in range(12)] 
    frame_basl = [[0 for x in range(640)] for y in range(12)]
    for i in range(0,7680):
        j = i//640
        k = i%640
        frame_stim[j][k] = stim[i]
    for i in range(0,7680):
        j = i//640
        k = i%640
        frame_basl[j][k] = basl[i]  
    samplingRate = 128 
    nyquist = samplingRate/2.0
    n = 213

    f1 = 40/nyquist

    fnew_stim = []
    pnew_stim = []
    fnew_basl = []
    pnew_basl = []

    overall_filter = signal.firwin(n, f1, pass_zero=True, window="hamming") 

    for i in range(0, 12):

        filt_overall_stim = signal.filtfilt(overall_filter, 1, frame_stim[i], method='pad')
        f_stim, p_stim = signal.welch(filt_overall_stim, fs=128, nperseg=256)

        fnew_stim.append(f_stim)
        pnew_stim.append(p_stim)
        
    for i in range(0, 12):

        filt_overall_basl = signal.filtfilt(overall_filter, 1, frame_basl[i], method='pad')
        f_basl, p_basl = signal.welch(filt_overall_basl, fs=128, nperseg=256)

        fnew_basl.append(f_basl)
        pnew_basl.append(p_basl)

    plt.figure(figsize=(10, 6))
    plt.grid(True)
    alpha_band = np.where((fnew_stim[electrode] >= 8) & (fnew_stim[electrode] <= 12))
    beta_band = np.where((fnew_stim[electrode] >= 12) & (fnew_stim[electrode] <= 30))
    theta_band = np.where((fnew_stim[electrode] >= 4) & (fnew_stim[electrode] <= 8))
    
    plt.semilogy(fnew_stim[electrode][alpha_band], pnew_stim[electrode][alpha_band], label='Alpha')
    plt.semilogy(fnew_stim[electrode][beta_band], pnew_stim[electrode][beta_band], label='Beta')
    plt.semilogy(fnew_stim[electrode][theta_band], pnew_stim[electrode][alpha_band], label='Theta')
    plt.xlabel('Frequency [Hz]')
    plt.ylabel('PSD [dB]')
    plt.legend()
    plt.title('Electrode: {}, Participant: {}, Video {}'.format(electrode, participant,video))
    plt.show()

In [None]:
plot_spectral_density(2,2,1)
plot_spectral_density_alpha_beta_theta(2,2,1)

Cette fonction a pour but d'extraire des caractéristiques à partir de signaux EEG brut. Les données brutes sont stockées dans une variable "raw" et les électrodes à partir desquelles les caractéristiques seront extraites sont stockées dans une variable "electrode".

La fonction commence par initialiser une matrice EEG_tmp de dimensions (23,18,36). Cette matrice sera utilisée pour stocker les caractéristiques extraites de chaque participant pour chaque vidéo.

La fonction parcourt ensuite les participants et les vidéos et extrait les signaux de base et de stimulation pour chaque participant et vidéo. Les signaux de base et de stimulation sont ensuite envoyés à la fonction preprocess_EEG qui calcule la densité spectrale de puissance (PSD) pour chaque électrode. Les PSD de la stimulation sont divisées par les PSD de base pour obtenir la densité relative. Les valeurs de densité relative sont stockées dans la matrice EEG_tmp.

La fonction se termine en créant un tableau de données pandas à partir de la matrice EEG_tmp, en redimensionnant le tableau de données pour avoir 36 colonnes et en normalisant les données à l'aide de la classe StandardScaler de scikit-learn. Enfin, la fonction renvoie le tableau de données pandas.

In [None]:
def feat_extract_EEG(raw, electrode):
    EEG_tmp = np.zeros((23, 18, 36))
    for participant in range(0, 23):
        for video in range(0, 18):

              B, S = [], []
                
              basl = (raw["DREAMER"][0, 0]["Data"][0, participant]["EEG"][0, 0]["baseline"][0, 0][video, 0][:, electrode])
              stim = (raw["DREAMER"][0, 0]["Data"][0, participant]["EEG"][0, 0]["stimuli"][0, 0][video, 0][:, electrode])
                
              B = preprocess_EEG(basl, B) 
              S = preprocess_EEG(stim, S) 

              A = np.divide(S, B)   
              column = 0

              for k in range(0,3):
                for l in range(0,12):
                  EEG_tmp[participant, video,column] = A[l][k]
                  column+=1
                                

    col = []

    for i in range(0, 12):
      col.append("psdtheta_"+str(i + 1)+"_un")
    for i in range(0, 12):
      col.append("psdalpha_"+str(i + 1)+"_un")
    for i in range(0, 12):
     col.append("psdbeta_"+str(i + 1)+"_un")

    EEG_tmp = EEG_tmp.reshape(-1,EEG_tmp.shape[2])
    data_EEG = pd.DataFrame(EEG_tmp, columns=col)
    scaler = StandardScaler()

    
    for i in range(len(col)):
        data_EEG[col[i][:-3]] = data_EEG[[col[i]]]
        
    data_EEG.drop(col, axis=1, inplace=True)
    return data_EEG

Cette fonction prend en entrée un objet 'raw' et crée un tableau 'a' de taille (23, 18, 9). Ensuite, elle parcourt les 23 participants et les 18 vidéos pour lesquels les données de valence, arousal et dominance doivent être prédites. Dans chaque itération de la boucle, la fonction remplit les premières colonnes de 'a' avec l'âge, le genre, le numéro de participant et le numéro de vidéo correspondants. La fonction remplit également les noms des vidéos et les émotions cibles pour chaque vidéo, ainsi que les scores de valence, d'excitation et de domination correspondants extraits de l'objet 'raw'.

Enfin, la fonction convertit 'a' en un objet DataFrame de pandas en réorganisant les données en 18*23 lignes et 9 colonnes correspondant à l'âge, le genre, le numéro de participant, le numéro de vidéo, le nom de la vidéo, l'émotion cible, la valence, l'excitation et la domination. La fonction retourne le DataFrame 'b'.

In [None]:
def participant_affective(raw):
    a = np.zeros((23, 18, 9), dtype=object)
    for participant in range(0, 23):
        for video in range(0, 18):
            a[participant, video, 0] = (raw["DREAMER"][0, 0]["Data"]
                                        [0, participant]["Age"][0][0][0])
            
            a[participant, video, 1] = (raw["DREAMER"][0, 0]["Data"]
                                        [0, participant]["Gender"][0][0][0])
            
            a[participant, video, 2] = int(participant+1)
            
            a[participant, video, 3] = int(video+1)
            
            a[participant, video, 4] = ["Searching for Bobby Fischer",
                                        "D.O.A.", "The Hangover", "The Ring",
                                        "300", "National Lampoon\'s VanWilder",
                                        "Wall-E", "Crash", "My Girl",
                                        "The Fly", "Pride and Prejudice",
                                        "Modern Times", "Remember the Titans",
                                        "Gentlemans Agreement", "Psycho",
                                        "The Bourne Identitiy",
                                        "The Shawshank Redemption",
                                        "The Departed"][video]
            
            a[participant, video, 5] = ["calmness", "surprise", "amusement",
                                        "fear", "excitement", "disgust",
                                        "happiness", "anger", "sadness",
                                        "disgust", "calmness", "amusement",
                                        "happiness", "anger", "fear",
                                        "excitement", "sadness",
                                        "surprise"][video]
            a[participant, video, 6] = int(raw["DREAMER"][0, 0]["Data"]
                                           [0, participant]["ScoreValence"]
                                           [0, 0][video, 0])
            a[participant, video, 7] = int(raw["DREAMER"][0, 0]["Data"]
                                           [0, participant]["ScoreArousal"]
                                           [0, 0][video, 0])
            a[participant, video, 8] = int(raw["DREAMER"][0, 0]["Data"]
                                           [0, participant]["ScoreDominance"]
                                           [0, 0][video, 0])
    b = pd.DataFrame(a.reshape((23*18, a.shape[2])),
                     columns=["age", "gender", "participant",
                              "video", "video_name", "target_emotion",
                              "valence", "arousal", "dominance"])
    return b

Cette cellule doit être exécuté ligne par ligne pour sauvegarder chaque fois les caractéristiques(features) de l'électrode dans une variable séparée.

In [None]:
#RUN Each one by one till cell where is it savd as csv

def_EEG1 = feat_extract_EEG(raw, 0)
#def_EEG2 = feat_extract_EEG(raw, 1)
#def_EEG3 = feat_extract_EEG(raw, 2)
#def_EEG4 = feat_extract_EEG(raw, 3)
#def_EEG5 = feat_extract_EEG(raw, 4)
#def_EEG6 = feat_extract_EEG(raw, 5)
#def_EEG7 = feat_extract_EEG(raw, 6)
#def_EEG8 = feat_extract_EEG(raw, 7)
#def_EEG9 = feat_extract_EEG(raw, 8)
#def_EEG10 = feat_extract_EEG(raw, 9)
#def_EEG11 = feat_extract_EEG(raw, 10)
#def_EEG12 = feat_extract_EEG(raw, 11)
#def_EEG13 = feat_extract_EEG(raw, 12)
#def_EEG14 = feat_extract_EEG(raw, 13)

Ce code concatène deux dataframes df_features et df_participant_affective en utilisant la fonction pd.concat(), pour créer le dataframe final df. Les colonnes valence, arousal et dominance de df_participant_affective sont converties en entiers en utilisant la méthode astype(int). Le dataframe final est stocké dans la variable df.

Le deuxième dataframe df2 est également créé en concaténant df_features et df_participant_affective avec la fonction pd.concat().

In [None]:
#Choisir l'électrode voulu
df_features = def_EEG1

df_participant_affective = participant_affective(raw)

df_participant_affective["valence"] = (df_participant_affective
                                       ["valence"].astype(int))
df_participant_affective["arousal"] = (df_participant_affective
                                       ["arousal"].astype(int))
df_participant_affective["dominance"] = (df_participant_affective
                                         ["dominance"].astype(int))

df = pd.concat([df_features, df_participant_affective], axis=1)
df

Ce code crée un nouveau DataFrame appelé data2 qui est une copie de df contenant uniquement les émotions cibles suivantes : 'anger', 'fear', 'calmness', 'surprise', 'excitement', 'amusement', 'happiness', 'sadness' et 'disgust'. Ensuite, il crée trois nouvelles colonnes dans data2 : 'class', 'valencehigh' et 'arousalhigh'. La colonne 'class' est créée en utilisant un dictionnaire qui associe chaque émotion à une classe numérique allant de 0 à 3. Les colonnes 'valencehigh' et 'arousalhigh' sont créées en utilisant des dictionnaires qui associent les valeurs de valence et d'excitation à des valeurs binaires de 0 ou 1. Enfin, le DataFrame data2 est enregistré en tant que fichier CSV sous le nom 'DE3_NS.csv'.

In [None]:
data2 = df.loc[(df['target_emotion'] == 'anger') |
                (df['target_emotion'] == 'fear') |
                (df['target_emotion'] == 'calmness') |
                (df['target_emotion'] == 'surprise') |
                (df['target_emotion'] == 'excitement') |
                (df['target_emotion'] == 'amusement') |
                (df['target_emotion'] == 'happiness') |
                (df['target_emotion'] == 'sadness') |
                (df['target_emotion'] == 'disgust')].copy()

d={'surprise': 0, 'excitement': 0, 'amusement': 0, 'happiness': 0, 'fear': 1, 'anger': 1, 'sadness':2, 'disgust':2 ,'calmness': 3}
data2['class'] = data2.target_emotion.map(d)

e={0: 0, 1: 0, 2: 0, 3: 1, 4: 1, 5: 1 }
data2['valencehigh'] = data2.valence.map(e)

f={0: 0, 1: 0, 2: 0, 3: 1, 4: 1, 5: 1 }
data2['arousalhigh'] = data2.arousal.map(e)

data2.to_csv('DE3_NS.csv')

Ce code correspond à la sélection de données EEG brutes (non traitées) pour une paire participant-vidéo-électrode donnée à partir d'un ensemble de données stockées dans un dictionnaire multidimensionnel. Les données sont ensuite prétraitées (filtrage, normalisation) en utilisant une fonction nommée preprocess_EEG(). Les données de base (basl) et de stimulation (stim) sont stockées dans deux listes B et S. 

In [None]:
participant =0
video = 0
electrode = 0
B, S = [], []
                
basl = (raw["DREAMER"][0, 0]["Data"][0, participant]["EEG"][0, 0]["baseline"][0, 0][video, 0][:, electrode])
stim = (raw["DREAMER"][0, 0]["Data"][0, participant]["EEG"][0, 0]["stimuli"][0, 0][video, 0][:, electrode])
                
B = preprocess_EEG(basl, B) 
S = preprocess_EEG(stim, S)

Cette fonction prend des données EEG brutes (non traitées) en entrée et extrait des caractéristiques (features) pertinentes de ces données. Elle organise ces caractéristiques dans un tableau et normalise les valeurs de ces caractéristiques pour qu'elles soient toutes comprises entre 0 et 1. La fonction renvoie ce tableau contenant les caractéristiques extraites normalisées.

In [None]:
def feat_extract_EEG_All(raw):
    
    EEG_tmp = np.zeros((23, 18, 107520))
    for participant in range(0, 23):
        for video in range(0, 18):
          column=0
          for electrode in range(0,14):
            stim = (raw["DREAMER"][0, 0]["Data"][0, participant]["EEG"][0, 0]["stimuli"][0, 0][video, 0][:, electrode])
            for i in range(0,7680):
              EEG_tmp[participant,video,column]= stim[i]
              column+=1
                                

    col = []

    for i in range(0, 7680):
      col.append("Electrode1_"+str(i + 1))
    for i in range(0, 7680):
      col.append("Electrode2_"+str(i + 1))
    for i in range(0, 7680):
      col.append("Electrode3_"+str(i + 1))
    for i in range(0, 7680):
     col.append("Electrode4_"+str(i + 1))
    for i in range(0, 7680):
      col.append("Electrode5_"+str(i + 1))
    for i in range(0, 7680):
      col.append("Electrode6_"+str(i + 1))
    for i in range(0, 7680):
      col.append("Electrode7_"+str(i + 1))
    for i in range(0, 7680):
      col.append("Electrode8_"+str(i + 1))
    for i in range(0, 7680):
     col.append("Electrode9_"+str(i + 1))
    for i in range(0, 7680):
      col.append("Electrode10_"+str(i + 1))
    for i in range(0, 7680):
      col.append("Electrode11_"+str(i + 1))
    for i in range(0, 7680):
      col.append("Electrode12_"+str(i + 1))
    for i in range(0, 7680):
      col.append("Electrode13_"+str(i + 1))
    for i in range(0, 7680):
     col.append("Electrode14_"+str(i + 1))


    EEG_tmp = EEG_tmp.reshape(-1,EEG_tmp.shape[2])
    data_EEG = pd.DataFrame(EEG_tmp, columns=col)
    print(data_EEG)
    from sklearn import preprocessing

    x = data_EEG.values
    min_max_scaler = preprocessing.MinMaxScaler()
    x_scaled = min_max_scaler.fit_transform(x)
    data_EEG = pd.DataFrame(x_scaled)
    data_EEG.columns = col
    print(data_EEG)
    return data_EEG

Ce code commence par extraire les caractéristiques des signaux EEG bruts en utilisant la fonction "feat_extract_EEG_All". Ensuite, il crée des dataframes à partir des caractéristiques et des scores d'affect de chaque participant. Il convertit ensuite les scores de valence, d'excitation et de domination en entiers et concatène les deux dataframes.

Le code crée ensuite un nouveau dataframe "data1" en filtrant les émotions cibles à partir d'une colonne nommée "target_emotion". Il attribue un numéro de classe à chaque émotion cible et crée deux colonnes binaires "valencehigh" et "arousalhigh" à partir des scores de valence et d'excitation.

Enfin, il exporte le dataframe résultant dans un fichier CSV nommé "normalized.csv".

In [None]:
def_all = feat_extract_EEG_All(raw)
df_features = def_all
df_participant_affective = participant_affective(raw)

df_participant_affective["valence"] = (df_participant_affective
                                       ["valence"].astype(int))
df_participant_affective["arousal"] = (df_participant_affective
                                       ["arousal"].astype(int))
df_participant_affective["dominance"] = (df_participant_affective
                                         ["dominance"].astype(int))
dfn = pd.concat([df_features, df_participant_affective], axis=1)
data1 = dfn.loc[(df['target_emotion'] == 'anger') |
                (df['target_emotion'] == 'fear') |
                (df['target_emotion'] == 'calmness') |
                (df['target_emotion'] == 'surprise') |
                (df['target_emotion'] == 'excitement') |
                (df['target_emotion'] == 'amusement') |
                (df['target_emotion'] == 'happiness') |
                (df['target_emotion'] == 'sadness') |
                (df['target_emotion'] == 'disgust')].copy()

d={'surprise': 0, 'excitement': 0, 'amusement': 0, 'happiness': 0, 'fear': 1, 'anger': 1, 'sadness':2, 'disgust':2 ,'calmness': 3}
data1['class'] = data1.target_emotion.map(d)

e={0: 0, 1: 0, 2: 0, 3: 1, 4: 1, 5: 1 }
data1['valencehigh'] = data1.valence.map(e)

f={0: 0, 1: 0, 2: 0, 3: 1, 4: 1, 5: 1 }
data1['arousalhigh'] = data1.arousal.map(e)

data1.to_csv('normalized.csv')

In [None]:
data1 = pd.read_csv("normalized.csv")  
data_1 = data1.head()
data_1 

Dans ce code, on commence par extraire deux colonnes du DataFrame data2 qui sont nommées psdalpha_12 et psdalpha_3. Les valeurs de ces deux colonnes sont ensuite converties en tableaux numpy de type float. Ensuite, ces deux tableaux sont concaténés en un seul tableau numpy df3.

Ensuite, on réinitialise l'index de data2, puis on extrait toutes les colonnes à partir de la 2ème colonne jusqu'à la 36ème colonne à l'aide de la méthode iloc. Les colonnes extraites sont stockées dans le DataFrame dfz.

In [None]:

df1=data2.reset_index()['psdalpha_12']
df2=data2.reset_index()['psdalpha_3']
df1 = np.array(df1, dtype = float)
df2 = np.array(df2, dtype = float)
df3 = np.concatenate((df1, df2), axis=0)
data2=data2.reset_index()
dfz=data2.iloc[:,1:37]


In [None]:
dfz
dfy=pd.DataFrame(data={ 'alpha12':df1,'alpha3':df2})
dfz = np.array(dfz, dtype = float)
dfz

In [None]:
target=data2['valencehigh']
target

La première ligne importe la fonction train_test_split du module model_selection de la bibliothèque sklearn (Scikit-Learn), qui permet de diviser les données en ensemble d'entraînement et ensemble de test pour l'évaluation d'un modèle de machine learning.

Les trois lignes suivantes créent des tableaux numpy contenant les données d'entrée (features) de l'ensemble d'entraînement et de test (df1, df2) ainsi que la variable cible (target). La méthode dtype = float est utilisée pour s'assurer que les données sont stockées sous forme de nombres flottants.

Enfin, la dernière ligne applique la fonction train_test_split pour diviser les données en deux ensembles, un pour l'entraînement (x_Train et y_Train) et l'autre pour le test (x_Test et y_Test), avec une proportion de 35% pour l'ensemble de test et une graine aléatoire de 6 pour la reproductibilité.

In [None]:
from sklearn.model_selection import train_test_split
df1 = np.array(df1, dtype = float)
df2 = np.array(df2, dtype = float)
target = np.array(target, dtype = float)


x_Train, x_Test, y_Train, y_Test = train_test_split(dfz, target, test_size = 0.35, random_state = 6)

In [None]:
y_Train

Ces trois lignes de code préparent les données pour l'entrainement d'un modèle de réseau de neurones en redimensionnant les données d'entraînement et de test pour qu'elles aient la forme (n_samples, n_channels, n_features). Ici, il y a 1 canal car il n'y a qu'un seul type de données (PSD) utilisé, et 36 caractéristiques par canal (caractéristiques extraites du PSD). La fonction reshape de numpy est utilisée pour redimensionner les tableaux en conséquence.

In [None]:
x_Train = x_Train.reshape(x_Train.shape[0],1,36)
x_Test = x_Test.reshape(x_Test.shape[0],1,36)
x_Test.shape

Ce code crée un modèle de réseau de neurones pour la classification binaire à partir des données d'entraînement et de test préparées précédemment. Le modèle est créé en utilisant les couches d'entrée, de LSTM et de sortie de Keras, et utilise une fonction d'activation relu pour la couche LSTM et une fonction d'activation sigmoid pour la couche de sortie. L'optimiseur Adamax est utilisé pour minimiser la perte binaire_crossentropy et maximiser la précision du modèle. Le modèle est entraîné sur 1000 epochs avec une taille de lot de 20 et un poids de classe spécifique est donné pour la classification des classes minoritaires. Les résultats de la formation sont stockés dans l'objet d'historique.

In [None]:
import tensorflow.keras.layers as KL
inputs = KL.Input(shape=(1,36))
x = KL.LSTM(units = 8, activation = 'relu')(inputs)
outputs = KL.Dense(units=1, activation= 'sigmoid')(x)

model = tf.keras.models.Model(inputs, outputs)

opt = tf.keras.optimizers.Adamax()
model.compile( optimizer=opt , loss='binary_crossentropy', metrics = ['accuracy'])
history = model.fit(x_Train, y_Train, batch_size=20, epochs=1000, class_weight={0:1, 1:2},validation_data =(x_Test,y_Test))



In [None]:
y_pred = model.predict(x_Test)
print(y_pred)

In [None]:
model.summary()

In [None]:
 plt.plot(history.history['loss'])
 plt.title('Loss for LSTM-binary model')
 plt.xlabel('epochs')
 plt.ylabel('loss')
 plt.show()
 plt.plot(history.history['accuracy'])
 plt.title('Accuracy for LSTM-binary model')
 plt.xlabel('epochs')
 plt.ylabel('accuracy')
 plt.show()

Ce code construit un modèle de réseau de neurones à longue mémoire récurrent (LSTM) pour prédire la valence émotionnelle. Les données d'entraînement et de test sont préparées en transformant les tableaux 2D en tableaux 3D avec une dimension supplémentaire pour le temps. Le modèle est construit en utilisant la classe Sequential de Keras, qui permet de définir une pile linéaire de couches de neurones. Le modèle contient deux couches LSTM avec des unités de 128 et 64, respectivement. Des couches Dropout sont également ajoutées pour éviter le surapprentissage. Enfin, deux couches de neurones Denses avec des fonctions d'activation ReLU et Sigmoid sont ajoutées pour produire les prédictions. Le modèle est compilé avec l'optimiseur Adam, la fonction de perte 'sparse_categorical_crossentropy' et la métrique 'accuracy'. Le modèle est ensuite entraîné avec les données d'entraînement et les prédictions sont générées à partir des données de test.

In [None]:
x_Train = x_Train.reshape(x_Train.shape[0],1,36)
x_Test = x_Test.reshape(x_Test.shape[0],1,36)

model = Sequential()
model.add(LSTM(128,activation = 'relu', return_sequences=True))
model.add(Dropout(0.2))
model.add(LSTM(64,activation ='relu'))
model.add(Dropout(0.2))

model.add(Dense(32,activation ='relu'))
model.add(Dense(2,activation ='sigmoid'))
y_pred = model.predict(x_Test)
opt = tf.keras.optimizers.Adam(learning_rate=1e-3)
model.compile(loss='sparse_categorical_crossentropy',optimizer =opt,metrics=['accuracy'])
history = model.fit(x_Train,y_Train,batch_size=16,epochs=200, validation_data =(x_Test,y_Test))

In [None]:
print(y_pred)

In [None]:
model.summary()

In [None]:
 plt.plot(history.history['loss'])
 plt.title("Loss for LSTM model - Valence")
 plt.xlabel("Epochs")
 plt.ylabel("Loss")
 plt.show()
 plt.plot(history.history['accuracy'])
 plt.title("Accuracy for LSTM model - Valence")
 plt.xlabel("Epochs")
 plt.ylabel("Accuracy")
 plt.show()


Le deuxième modèle est un modèle CNN (Convolutional Neural Network). Le modèle a deux couches Dense avec une fonction d'activation ReLU, une couche Flatten qui convertit les données en un vecteur, et une couche Dense avec une fonction d'activation softmax qui produit la sortie en 10 catégories. Le modèle utilise également une fonction de perte "sparse_categorical_crossentropy" et l'optimiseur Adam pour la classification en deux catégories. Le modèle est entraîné sur les mêmes données d'apprentissage que le premier modèle, mais pendant 200 époques.

In [None]:
model = Sequential()
model.add(tf.keras.layers.Flatten())
model.add(Dense(128,activation =tf.nn.relu))
model.add(Dropout(0.2))
model.add(Dense(128,activation =tf.nn.relu))
model.add(Dense(9,activation =tf.nn.softmax))


opt = tf.keras.optimizers.Adam(learning_rate=1e-3)

model.compile(loss='sparse_categorical_crossentropy',optimizer =opt ,metrics=['accuracy'])
history_CNN = model.fit(x_Train,y_Train,epochs=200, validation_data =(x_Test,y_Test))

In [None]:
model.summary()

In [None]:
 plt.plot(history_CNN.history['loss'])
 plt.title("Loss for CNN model - Valence")
 plt.xlabel("Epochs")
 plt.ylabel("Loss")
 plt.show()
 plt.plot(history_CNN.history['accuracy'])
 plt.title("Accuracy for CNN model - Valence")
 plt.xlabel("Epochs")
 plt.ylabel("Accuracy")
 plt.show()


Dans ce code, les données sont préparées pour la prédiction de l'AROUSAL. Les valeurs de psdalpha_1 et psdbeta_1 sont extraites et divisées entre elles, puis les valeurs de psdalpha_12 et psdbeta_12 sont également extraites et divisées. Les deux nouveaux ensembles de données sont ensuite concaténés pour former df8.

In [None]:
#DATA PREP FOR AROUSAL
df4=data2.reset_index()['psdalpha_1']
df5=data2.reset_index()['psdbeta_1']

df45 = df5/df4

df6=data2.reset_index()['psdalpha_12']
df7=data2.reset_index()['psdbeta_12']

df67  = df6/df7

df8 = df45.append(df67)
df8

In [None]:
target2 =data2['arousalhigh']
target2

In [None]:
from sklearn.model_selection import train_test_split
target2 = np.array(target2, dtype = float)
x_Train2, x_Test2, y_Train2, y_Test2 = train_test_split(dfz, target2, test_size = 0.35, random_state = 5)

In [None]:
x_Train2 = x_Train2.reshape(x_Train2.shape[0],1,36)
x_Test2 = x_Test2.reshape(x_Test2.shape[0],1,36)
x_Test2.shape

Ce code crée un modèle de réseaux de neurones LSTM pour la classification de l'arousal. Il utilise la fonction d'activation 'relu' pour les couches LSTM et une fonction d'activation 'softmax' pour la couche de sortie Dense. Le modèle est compilé avec l'optimiseur Adam et une fonction de perte de 'sparse_categorical_crossentropy'. Il est ensuite entraîné sur les données d'entraînement et évalué sur les données de validation pour 1000 époques.

In [None]:
model = Sequential()
model.add(LSTM(128,activation = 'relu', return_sequences=True))
model.add(Dropout(0.2))
model.add(LSTM(128,activation ='relu'))
model.add(Dense(32,activation ='relu'))
model.add(Dense(10,activation ='softmax'))

opt = tf.keras.optimizers.Adam(learning_rate=1e-3)
model.compile(loss='sparse_categorical_crossentropy',optimizer =opt,metrics=['accuracy'])
history = model.fit(x_Train2,y_Train2,epochs=1000, validation_data =(x_Test2,y_Test2))

In [None]:
 plt.plot(history.history['loss'])
 plt.title("Loss for LSTM model - Arousal")
 plt.xlabel("Epochs")
 plt.ylabel("Loss")
 plt.show()
 plt.plot(history.history['accuracy'])
 plt.title("Accuracy for LSTM model - Arousal")
 plt.xlabel("Epochs")
 plt.ylabel("Accuracy")
 plt.show()


#ECG SIGNALS

La fonction "participant_affective(raw)" prend en entrée un objet "raw". Elle crée une matrice "a" de dimensions (23,18,9) remplit de zéros et de type objet. Cette matrice sera remplie de données issues de l'objet "raw" pour chaque participant et chaque vidéo. Elle crée ensuite un DataFrame "b" de dimensions (23*18, 9) avec les données de la matrice "a" et des noms de colonnes spécifiques. Enfin, elle renvoie le DataFrame "b".

In [None]:
def participant_affective(raw):
    a = np.zeros((23, 18, 9), dtype=object)
    for participant in range(0, 23):
        for video in range(0, 18):
            a[participant, video, 0] = (raw["DREAMER"][0, 0]["Data"]
                                        [0, participant]["Age"][0][0][0])
            
            a[participant, video, 1] = (raw["DREAMER"][0, 0]["Data"]
                                        [0, participant]["Gender"][0][0][0])
            
            a[participant, video, 2] = int(participant+1)
            
            a[participant, video, 3] = int(video+1)
            
            a[participant, video, 4] = ["Searching for Bobby Fischer",
                                        "D.O.A.", "The Hangover", "The Ring",
                                        "300", "National Lampoon\'s VanWilder",
                                        "Wall-E", "Crash", "My Girl",
                                        "The Fly", "Pride and Prejudice",
                                        "Modern Times", "Remember the Titans",
                                        "Gentlemans Agreement", "Psycho",
                                        "The Bourne Identitiy",
                                        "The Shawshank Redemption",
                                        "The Departed"][video]
            
            a[participant, video, 5] = ["calmness", "surprise", "amusement",
                                        "fear", "excitement", "disgust",
                                        "happiness", "anger", "sadness",
                                        "disgust", "calmness", "amusement",
                                        "happiness", "anger", "fear",
                                        "excitement", "sadness",
                                        "surprise"][video]
            a[participant, video, 6] = int(raw["DREAMER"][0, 0]["Data"]
                                           [0, participant]["ScoreValence"]
                                           [0, 0][video, 0])
            a[participant, video, 7] = int(raw["DREAMER"][0, 0]["Data"]
                                           [0, participant]["ScoreArousal"]
                                           [0, 0][video, 0])
            a[participant, video, 8] = int(raw["DREAMER"][0, 0]["Data"]
                                           [0, participant]["ScoreDominance"]
                                           [0, 0][video, 0])
    b = pd.DataFrame(a.reshape((23*18, a.shape[2])),
                     columns=["age", "gender", "participant",
                              "video", "video_name", "target_emotion",
                              "valence", "arousal", "dominance"])
    return b

Cette fonction extrait les données d'ECG (électrocardiogramme) pour les stimuli et les périodes de base des 23 participants de l'étude DREAMER. Les signaux ECG pour chaque participant et vidéo sont stockés dans des matrices ECG_tmp_stim et ECG_tmp_basl. Les signaux ECG pour chaque participant et vidéo sont stockés dans des matrices ECG_tmp_stim et ECG_tmp_basl. Ensuite, ces données sont normalisées à l'aide de la méthode MinMaxScaler du module preprocessing de sklearn et stockées dans les dataframes data_ECG_stim et data_ECG_basl respectivement. Ces dataframes sont ensuite renvoyés par la fonction.

In [None]:
def feat_extract_ECG_scaled(raw):
    
    ECG_tmp_stim = np.zeros((23, 18, 31232))
    ECG_tmp_basl = np.zeros((23, 18, 31232))
    for participant in range(0, 23):
        for video in range(0, 18):
          column=0
          for signal in range(0,2):
            stim = (raw["DREAMER"][0, 0]["Data"][0, participant]["ECG"][0, 0]["stimuli"][0, 0][video, 0][:, signal])
            basl = (raw["DREAMER"][0, 0]["Data"][0, participant]["ECG"][0, 0]["baseline"][0, 0][video, 0][:, signal])
            for i in range(0,15616):
              ECG_tmp_stim[participant,video,column]= stim[i]
              ECG_tmp_basl[participant,video,column]= basl[i]
              column+=1
                                

    col_stim = []
    col_basl = []

    for i in range(0, 15616):
      col_stim.append("RA"+str(i + 1))
    for i in range(0, 15616):
      col_stim.append("LA"+str(i + 1))
    for i in range(0, 15616):
      col_basl.append("RA"+str(i + 1))
    for i in range(0, 15616):
      col_basl.append("LA"+str(i + 1))



    ECG_tmp_stim = ECG_tmp_stim.reshape(-1,ECG_tmp_stim.shape[2])
    data_ECG_stim = pd.DataFrame(ECG_tmp_stim, columns=col_stim)

    ECG_tmp_basl = ECG_tmp_basl.reshape(-1,ECG_tmp_basl.shape[2])
    data_ECG_basl = pd.DataFrame(ECG_tmp_basl, columns=col_basl)

    from sklearn import preprocessing

    x_stim = data_ECG_stim.values
    min_max_scaler = preprocessing.MinMaxScaler()
    x_scaled_stim = min_max_scaler.fit_transform(x_stim)
    data_ECG_stim = pd.DataFrame(x_scaled_stim)
    data_ECG_stim.columns = col_stim

    x_basl = data_ECG_basl.values
    min_max_scaler = preprocessing.MinMaxScaler()
    x_scaled_basl = min_max_scaler.fit_transform(x_basl)
    data_ECG_basl = pd.DataFrame(x_scaled_basl)
    data_ECG_basl.columns = col_basl
    return data_ECG_stim,data_ECG_basl

In [None]:
ecg_stim_data,ecg_basl_data = feat_extract_ECG_scaled(raw)

In [None]:
ecg_stim_data

Cette fonction extrait les données d'ECG (électrocardiogramme) pour les stimuli et les périodes de base des 23 participants de l'étude DREAMER. Les signaux ECG pour chaque participant et vidéo sont stockés dans des matrices ECG_tmp_stim et ECG_tmp_basl. Les signaux ECG pour chaque participant et vidéo sont stockés dans des matrices ECG_tmp_stim et ECG_tmp_basl. Ces dataframes sont ensuite renvoyés par la fonction.

In [None]:
def feat_extract_ECG(raw):
    
    ECG_tmp_stim = np.zeros((23, 18, 31232))
    ECG_tmp_basl = np.zeros((23, 18, 31232))
    for participant in range(0, 23):
        for video in range(0, 18):
          column=0
          for signal in range(0,2):
            stim = (raw["DREAMER"][0, 0]["Data"][0, participant]["ECG"][0, 0]["stimuli"][0, 0][video, 0][:, signal])
            basl = (raw["DREAMER"][0, 0]["Data"][0, participant]["ECG"][0, 0]["baseline"][0, 0][video, 0][:, signal])
            for i in range(0,15616):
              ECG_tmp_stim[participant,video,column]= stim[i]
              ECG_tmp_basl[participant,video,column]= basl[i]
              column+=1
                                

    col_stim = []
    col_basl = []

    for i in range(0, 15616):
      col_stim.append("RA"+str(i + 1))
    for i in range(0, 15616):
      col_stim.append("LA"+str(i + 1))
    for i in range(0, 15616):
      col_basl.append("RA"+str(i + 1))
    for i in range(0, 15616):
      col_basl.append("LA"+str(i + 1))



    ECG_tmp_stim = ECG_tmp_stim.reshape(-1,ECG_tmp_stim.shape[2])
    data_ECG_stim = pd.DataFrame(ECG_tmp_stim, columns=col_stim)

    ECG_tmp_basl = ECG_tmp_basl.reshape(-1,ECG_tmp_basl.shape[2])
    data_ECG_basl = pd.DataFrame(ECG_tmp_basl, columns=col_basl)

    return data_ECG_stim,data_ECG_basl

Ce code utilise la bibliothèque Plotly pour tracer un graphique de signaux ECG pour un participant donné. Les signaux d'ECG sont extraits des données de stimulus et de ligne de base pour les deux canaux RA et LA . Les signaux sont tracés en fonction du temps avec une couleur différente pour chaque signal. Le code définit également un titre pour le graphique ainsi que des étiquettes pour les axes x et y. Le graphique est créé en utilisant les données tracées et la mise en page définie, puis est affiché à l'aide de la fonction "fig.show()".

In [None]:
import plotly.graph_objs as go
import scipy.signal as signal

df_subset_stim_RA = ecg_stim_data.iloc[2,:15616]
df_subset_stim_LA = ecg_stim_data.iloc[2,15617:]
df_subset_basl_RA = ecg_basl_data.iloc[2, :15616]
df_subset_basl_LA = ecg_basl_data.iloc[2, 15617:]

trace1 = go.Scatter(x=df_subset_stim_RA.index, y=df_subset_stim_RA.values, mode='lines', name='Stimulus RA', line=dict(color='blue'))
trace2 = go.Scatter(x=df_subset_stim_LA.index, y=df_subset_stim_LA.values, mode='lines', name='Stimulus LA', line=dict(color='green'))
trace3 = go.Scatter(x=df_subset_basl_RA.index, y=df_subset_basl_RA.values, mode='lines', name='Baseline RA', line=dict(color='red'))
trace4 = go.Scatter(x=df_subset_basl_LA.index, y=df_subset_basl_LA.values, mode='lines', name='Baseline LA', line=dict(color='purple'))

peaks_stim_RA, _ = signal.find_peaks(df_subset_stim_RA.values, distance=150)
trace_peaks_stim_RA = go.Scatter(x=df_subset_stim_RA.index[peaks_stim_RA], y=df_subset_stim_RA.values[peaks_stim_RA], mode='markers', 
                         name='Pics Stim RA', marker=dict(color='pink', symbol='triangle-down', size=7, line=dict(width=1, color='pink')))

peaks_stim_LA, _ = signal.find_peaks(df_subset_stim_LA.values, distance=150)
trace_peaks_stim_LA = go.Scatter(x=df_subset_stim_LA.index[peaks_stim_LA], y=df_subset_stim_LA.values[peaks_stim_LA], mode='markers', 
                         name='Pics Stim LA', marker=dict(color='orange', symbol='triangle-down', size=7, line=dict(width=1, color='orange')))

peaks_basl_RA, _ = signal.find_peaks(df_subset_basl_RA.values, distance=150)
trace_peaks_basl_RA = go.Scatter(x=df_subset_basl_RA.index[peaks_basl_RA], y=df_subset_basl_RA.values[peaks_basl_RA], mode='markers', 
                         name='Pics Basl RA', marker=dict(color='green', symbol='triangle-down', size=7, line=dict(width=1, color='green')))

peaks_basl_LA, _ = signal.find_peaks(df_subset_basl_LA.values, distance=150)
trace_peaks_basl_LA = go.Scatter(x=df_subset_basl_LA.index[peaks_basl_LA], y=df_subset_basl_LA.values[peaks_basl_LA], mode='markers', 
                         name='Pics Basl LA', marker=dict(color='purple', symbol='triangle-down', size=7, line=dict(width=1, color='purple')))

layout = go.Layout(title='Figure des signaux ECG', xaxis=dict(title='temps'), yaxis=dict(title='Signal'))

fig = go.Figure(data=[trace1, trace2, trace3, trace4, trace_peaks_stim_RA, trace_peaks_stim_LA, trace_peaks_basl_RA, trace_peaks_basl_LA], layout=layout)

fig.show()


In [None]:
import plotly.graph_objs as go
import scipy.signal as signal

df_subset_stim_RA = ecg_stim_data.iloc[2,:15616]
df_subset_stim_LA = ecg_stim_data.iloc[2,15617:]

trace1 = go.Scatter(x=df_subset_stim_RA.index, y=df_subset_stim_RA.values, mode='lines', name='Stimulus RA', line=dict(color='blue'))
trace2 = go.Scatter(x=df_subset_stim_LA.index, y=df_subset_stim_LA.values, mode='lines', name='Stimulus LA', line=dict(color='blue'))


layout = go.Layout(title='Figure des signaux ECG', xaxis=dict(title='temps'), yaxis=dict(title='Signal'))

fig = go.Figure(data=[trace1, trace2], layout=layout)

fig.show()


In [None]:
df_participant_affective = participant_affective(raw)

df_participant_affective["valence"] = (df_participant_affective
                                       ["valence"].astype(int))
df_participant_affective["arousal"] = (df_participant_affective
                                       ["arousal"].astype(int))
df_participant_affective["dominance"] = (df_participant_affective
                                         ["dominance"].astype(int))

df = pd.concat([ecg_stim_data, df_participant_affective], axis=1)

In [None]:
df.head()

Ce code crée un nouveau DataFrame ecg_data à partir du DataFrame df contenant les données d'émotion et d'ECG. Le nouveau DataFrame contient uniquement les données pour les émotions sélectionnées ('anger', 'fear', 'calmness', 'surprise', 'excitement', 'amusement', 'happiness', 'sadness' et 'disgust').

Ensuite, le code ajoute deux nouvelles colonnes class et valencehigh à ecg_data. La colonne class affecte un entier à chaque émotion, où 0 représente les émotions positives (surprise, excitement, amusement, happiness), 1 représente les émotions négatives (fear, anger, sadness, disgust), et 2 représente la neutralité (calmness).

La colonne valencehigh affecte un 1 ou un 0 en fonction de la valence de l'émotion. Les valeurs élevées de valence sont affectées à 1 et les valeurs faibles de valence sont affectées à 0.

In [None]:
ecg_data = df.loc[(df['target_emotion'] == 'anger') |
                (df['target_emotion'] == 'fear') |
                (df['target_emotion'] == 'calmness') |
                (df['target_emotion'] == 'surprise') |
                (df['target_emotion'] == 'excitement') |
                (df['target_emotion'] == 'amusement') |
                (df['target_emotion'] == 'happiness') |
                (df['target_emotion'] == 'sadness') |
                (df['target_emotion'] == 'disgust')].copy()

d={'surprise': 0, 'excitement': 0, 'amusement': 0, 'happiness': 0, 'fear': 1, 'anger': 1, 'sadness':2, 'disgust':2 ,'calmness': 3}
ecg_data['class'] = ecg_data.target_emotion.map(d)

e={0: 0, 1: 0, 2: 0, 3: 1, 4: 1, 5: 1 }
ecg_data['valencehigh'] = ecg_data.valence.map(e)

f={0: 0, 1: 0, 2: 0, 3: 1, 4: 1, 5: 1 }
ecg_data['arousalhigh'] = ecg_data.arousal.map(e)

In [None]:
target = ecg_data['valencehigh']
target

In [None]:
from sklearn.model_selection import train_test_split
ecg_stim_data = np.array(ecg_stim_data, dtype = float)
target = np.array(target, dtype = float)

In [None]:
x_Train, x_Test, y_Train, y_Test = train_test_split(ecg_stim_data, target, test_size = 0.2, random_state = 6)

x_Train = x_Train.reshape(x_Train.shape[0],1,31232)
x_Test = x_Test.reshape(x_Test.shape[0],1,31232)

Ce code définit un modèle de réseau de neurones récurrents (RNN) pour la classification de séquences. Le modèle utilise deux couches LSTM (Long Short-Term Memory) avec une fonction d'activation ReLU et une couche dense avec une fonction d'activation sigmoïde. Le modèle est compilé avec l'optimiseur Adam et une fonction de perte de "sparse_categorical_crossentropy". Le modèle est ensuite entraîné avec les données d'entraînement et validé avec les données de test. Le nombre d'époques est fixé à 200 et la taille de lot est de 16. Les prédictions du modèle pour les données de test sont stockées dans la variable "y_pred".

In [None]:
model = Sequential()
model.add(LSTM(128,activation = 'relu', return_sequences=True))
model.add(Dropout(0.2))
model.add(LSTM(64,activation ='relu'))
model.add(Dropout(0.2))
model.add(Dense(32,activation ='relu'))
model.add(Dense(2,activation ='sigmoid'))
y_pred = model.predict(x_Test)
opt = tf.keras.optimizers.Adam(learning_rate=1e-3)
model.compile(loss='sparse_categorical_crossentropy',optimizer =opt,metrics=['accuracy'])
history = model.fit(x_Train,y_Train,batch_size=16,epochs=200, validation_data =(x_Test,y_Test))

In [None]:
model.summary()

In [None]:
 plt.plot(history.history['loss'])
 plt.title("Loss for LSTM model - Valence")
 plt.xlabel("Epochs")
 plt.ylabel("Loss")
 plt.show()
 plt.plot(history.history['accuracy'])
 plt.title("Accuracy for LSTM model - Valence")
 plt.xlabel("Epochs")
 plt.ylabel("Accuracy")
 plt.show()

Ce code crée un modèle de réseau de neurones à convolution (CNN) pour une classification multiclasse. Le modèle est créé à l'aide de la classe Sequential de Keras, qui permet de créer un modèle linéaire de couches. Le modèle comprend une couche d'aplatissement (flatten) qui transforme l'entrée en un vecteur 1D, deux couches denses (fully connected) avec une fonction d'activation relu, une couche de dropout pour éviter le surapprentissage et une couche dense de sortie avec une fonction d'activation softmax pour prédire les probabilités pour chaque classe

In [None]:
model = Sequential()
model.add(tf.keras.layers.Flatten())
model.add(Dense(128,activation =tf.nn.relu))
model.add(Dropout(0.2))
model.add(Dense(128,activation =tf.nn.relu))
model.add(Dense(5,activation =tf.nn.softmax))

opt = tf.keras.optimizers.Adam(learning_rate=1e-3)

model.compile(loss='sparse_categorical_crossentropy',optimizer =opt ,metrics=['accuracy'])
history_CNN = model.fit(x_Train,y_Train,epochs=200, validation_data =(x_Test,y_Test))

In [None]:
model.summary()

In [None]:
 plt.plot(history_CNN.history['loss'])
 plt.title("Loss for LSTM model - Valence")
 plt.xlabel("Epochs")
 plt.ylabel("Loss")
 plt.show()
 plt.plot(history_CNN.history['accuracy'])
 plt.title("Accuracy for LSTM model - Valence")
 plt.xlabel("Epochs")
 plt.ylabel("Accuracy")
 plt.show()

In [None]:
target=ecg_data['arousalhigh']

In [None]:
from sklearn.model_selection import train_test_split
ecg_stim_data = np.array(ecg_stim_data, dtype = float)
target = np.array(target, dtype = float)

In [None]:
x_Train, x_Test, y_Train, y_Test = train_test_split(ecg_stim_data, target, test_size = 0.2, random_state = 6)

x_Train = x_Train.reshape(x_Train.shape[0],1,31232)
x_Test = x_Test.reshape(x_Test.shape[0],1,31232)

Fonction de rappel (callback) qui arrête l'entraînement une fois que l'exactitude (accuracy) atteint 80% et que l'exactitude de validation atteint 65%

In [None]:
class CustomCallback(tf.keras.callbacks.Callback):
    def on_epoch_end(self, epoch, logs={}):
        if logs.get('accuracy') > 0.8 and logs.get('val_accuracy') > 0.65:
            self.model.stop_training = True

Dans ce code, un modèle de réseaux de neurones LSTM est créé en utilisant la bibliothèque Keras. Le modèle est composé de deux couches LSTM, chacune suivie d'une couche de régularisation Dropout pour réduire le surapprentissage. Ensuite, deux couches denses avec les fonctions d'activation relu et sigmoid sont ajoutées. Le modèle est compilé avec la fonction de perte "sparse_categorical_crossentropy" et l'optimiseur Adam avec un taux d'apprentissage de 1e-3.

Le modèle est entraîné avec le jeu de données d'apprentissage "x_Train" et "y_Train" pour un maximum de 200 époques, avec une taille de lot de 16. Le jeu de données de validation "x_Test" et "y_Test" est également fourni pour évaluer les performances du modèle après chaque époque. En outre, un objet CustomCallback() est ajouté en tant que callback pour interrompre l'entraînement une fois que la précision de l'apprentissage dépasse 80% et que la précision de validation dépasse 65%.

In [None]:
model = Sequential()
model.add(LSTM(128,activation = 'relu', return_sequences=True))
model.add(Dropout(0.2))
model.add(LSTM(64,activation ='relu'))
model.add(Dropout(0.2))
model.add(Dense(32,activation ='relu'))
model.add(Dense(2,activation ='sigmoid'))
y_pred = model.predict(x_Test)
opt = tf.keras.optimizers.Adam(learning_rate=1e-3)
model.compile(loss='sparse_categorical_crossentropy',optimizer =opt,metrics=['accuracy'])
history = model.fit(x_Train,y_Train,batch_size=16,epochs=200, validation_data =(x_Test,y_Test),callbacks=[CustomCallback()])

In [None]:
model.summary()

In [None]:
 plt.plot(history.history['loss'])
 plt.title("Loss for LSTM model - Arousal")
 plt.xlabel("Epochs")
 plt.ylabel("Loss")
 plt.show()
 plt.plot(history.history['accuracy'])
 plt.title("Accuracy for LSTM model - Arousal")
 plt.xlabel("Epochs")
 plt.ylabel("Accuracy")
 plt.show()