In [1]:
import numpy as np
import matplotlib.pyplot as plt
import torch  as th
import torch.autograd as autograd
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import pickle
import pandas as pd
import seaborn as sns
import random
import os
import glob
import numpy as np
import gzip as gz
import pickle
from tqdm import tqdm
from torch.utils.data import TensorDataset, DataLoader, random_split
from BESTRq_classes.BESTRq import BestRqFramework, RandomProjectionQuantizer
from compute_fft import compute_spectrogram, plot_spectrogram, mask_and_replace
from models.CNN_BiLSTM_Attention import ParallelModel

# GAN implementation : comparison between spectrum and time serie 
https://towardsdatascience.com/synthetic-time-series-data-a-gan-approach-869a984f2239

In [2]:
class Recovery(nn.Module):
    def __init__(self, hidden_dim, n_seq):
        super(Recovery, self).__init__()
        self.hidden_dim = hidden_dim
        self.n_seq = n_seq
        self.model = self.build()

    def build(self):
        layers = []
        layers.append(nn.Linear(self.hidden_dim, self.hidden_dim))
        layers.append(nn.ReLU())
        layers.append(nn.Linear(self.hidden_dim, self.n_seq))
        return nn.Sequential(*layers)

    def forward(self, x):
        return self.model(x)

class Embedder(nn.Module):
    def __init__(self, hidden_dim):
        super(Embedder, self).__init__()
        self.hidden_dim = hidden_dim
        self.model = self.build()

    def build(self):
        layers = []
        layers.append(nn.Linear(self.hidden_dim, self.hidden_dim))
        layers.append(nn.ReLU())
        layers.append(nn.Linear(self.hidden_dim, self.hidden_dim))
        return nn.Sequential(*layers)

    def forward(self, x):
        return self.model(x)

class Supervisor(nn.Module):
    def __init__(self, hidden_dim):
        super(Supervisor, self).__init__()
        self.hidden_dim = hidden_dim
        self.model = self.build()

    def build(self):
        layers = []
        layers.append(nn.Linear(self.hidden_dim, self.hidden_dim))
        layers.append(nn.ReLU())
        layers.append(nn.Linear(self.hidden_dim, self.hidden_dim))
        return nn.Sequential(*layers)

    def forward(self, x):
        return self.model(x)

In [3]:
data_path = '/Users/martinblot/Desktop/sleep-edf-prepared/5-cassette'  ## path towards the 5-cassette file
fp = gz.open(data_path+'/SC4001E0.npz.gz','rb')
data_test = np.load(fp,allow_pickle=True)
fnames = glob.glob(os.path.join(data_path, "*npz.gz"))
devpart = 10
xtrain , xvalid = None , None
ytrain , yvalid = None , None
measurement=data_test['ch_label'][2]

for fn in tqdm(fnames):
    fp = gz.open(fn,'rb')
    data = np.load(fp,allow_pickle=False) # for now, don't care about headers
    x = data['x'][:,:,2] # EEG and EOG
    y = data['y'] # Take the labels
    idx = np.arange(x.shape[0])
    np.random.shuffle(idx)
    devlim = x.shape[0]//devpart
    devpart = 10
    idx = np.arange(x.shape[0])
    np.random.shuffle(idx)
    devlim = x.shape[0]//devpart
    if xtrain is None:
        xtrain = np.zeros((1,x.shape[1]))    ##np.zeros((1,x.shape[1],2)) if we include EOG  
        xvalid = np.zeros((1,x.shape[1]))
        ytrain , yvalid = np.zeros(1) , np.zeros(1)
    xvalid = np.concatenate((xvalid,x[idx[:devlim]]), axis=0)
    yvalid = np.concatenate((yvalid,y[idx[:devlim]]), axis=0)
    xtrain = np.concatenate((xtrain,x[idx[devlim:]]), axis=0)
    ytrain = np.concatenate((ytrain,y[idx[devlim:]]), axis=0)
    del x,y

xtrain , xvalid = xtrain[1:] , xvalid[1:]
ytrain , yvalid = ytrain[1:] , yvalid[1:]
xtrain, xvalid = th.FloatTensor(xtrain), th.FloatTensor(xvalid)
ytrain, yvalid = th.IntTensor(ytrain), th.IntTensor(yvalid)

100%|██████████| 153/153 [00:55<00:00,  2.77it/s]


In [10]:
outf="/Users/martinblot/Desktop/sleep-edf-prepared/cassette-th-data-all.pck"
fp = open(outf,"wb")
pickle.dump((xtrain , xvalid , ytrain , yvalid), fp)
filepath = '/Users/martinblot/Desktop/sleep-edf-prepared/cassette-th-data-all.pck'
xtrain,xvalid, ytrain, yvalid = np.load(filepath, allow_pickle = True)
print(xtrain.shape)

torch.Size([175995, 600])


In [11]:
batch=100
xtrain_tensor,ytrain_tensor=th.tensor(xtrain[:1000]),th.tensor(ytrain[:1000])
xvalid_tensor,yvalid_tensor=th.tensor(xvalid[:1000]),th.tensor(yvalid[:1000])

dataset_t = TensorDataset(xtrain_tensor, ytrain_tensor)
train_loader = DataLoader(dataset_t, batch_size= batch, shuffle=True)
dataset_v = TensorDataset(xvalid_tensor, yvalid_tensor)
valid_loader = DataLoader(dataset_v, batch_size= batch, shuffle=True)

  xtrain_tensor,ytrain_tensor=th.tensor(xtrain[:1000]),th.tensor(ytrain[:1000])
  xvalid_tensor,yvalid_tensor=th.tensor(xvalid[:1000]),th.tensor(yvalid[:1000])


In [24]:
class Generator(nn.Module):
    def __init__(self, hidden_dim, data_dim,n_layers):
        super(Generator, self).__init__()
        self.hidden_dim = hidden_dim
        self.data_dim = data_dim
        self.n_layers=n_layers
        # Couches du générateur
        self.GRU=nn.GRU(hidden_dim,hidden_dim,batch_first=True,num_layers=n_layers)
        self.gen_block=nn.Linear(hidden_dim,hidden_dim)
        self.activation=nn.Sigmoid()
    def forward(self, x):
        x=self.GRU(x)
        x=x[0]
        x=self.gen_block(x)
        x=self.activation(x)   ##classification binaire utile au GAN
        return x
    

class Discriminator(nn.Module):
    def __init__(self,hidden_dim,data_dim,n_layers):
        super(Discriminator,self).__init__()
        self.hidden_dim = hidden_dim
        self.data_dim = data_dim
        self.n_layers=n_layers
        # Couches du générateur
        self.GRU=nn.GRU(data_dim,hidden_dim,batch_first=True,num_layers=n_layers)
        self.gen_block=nn.Linear(hidden_dim,1)
        self.activation=nn.Sigmoid()
    def forward(self, x):
        x=self.GRU(x)
        x=x[0]
        x=self.gen_block(x)
        x=self.activation(x)
        return x
    
class Supervisor(nn.Module):
    def __init__(self, hidden_dim, data_dim,n_layers):
        super(Supervisor, self).__init__()
        self.hidden_dim = hidden_dim
        self.data_dim = data_dim
        self.n_layers=n_layers
        # Couches du générateur
        self.GRU=nn.GRU(hidden_dim,hidden_dim,batch_first=True,num_layers=n_layers)
        self.gen_block=nn.Linear(hidden_dim,data_dim)
        self.activation=nn.Sigmoid()
    def forward(self, x):
        x=self.GRU(x)
        x=x[0]
        x=self.gen_block(x)
        x=self.activation(x)   ##classification binaire utile au GAN
        return x
    
    
class Embedder(nn.Module):
    def __init__(self, hidden_dim, data_dim,n_layers):
        super(Embedder, self).__init__()
        self.hidden_dim = hidden_dim
        self.data_dim = data_dim
        self.n_layers=n_layers
        # Couches du générateur
        self.GRU=nn.GRU(data_dim,hidden_dim,batch_first=True,num_layers=n_layers)
        self.gen_block=nn.Linear(hidden_dim,hidden_dim)
        self.activation=nn.Sigmoid()
    def forward(self, x):
        x=self.GRU(x)
        x=x[0]
        x=self.gen_block(x)
        x=self.activation(x)   ##classification binaire utile au GAN
        return x

class Recovery(nn.Module):
    def __init__(self, hidden_dim, data_dim,n_layers):
        super(Recovery, self).__init__()
        self.hidden_dim = hidden_dim
        self.data_dim = data_dim
        self.n_layers=n_layers
        # Couches du générateur
        self.GRU=nn.GRU(hidden_dim,hidden_dim,batch_first=True,num_layers=n_layers)
        self.gen_block=nn.Linear(hidden_dim,hidden_dim)
        self.activation=nn.Sigmoid()
    def forward(self, x):
        x=self.GRU(x)
        x=x[0]
        x=self.gen_block(x)
        x=self.activation(x)   ##classification binaire utile au GAN
        return x

https://github.com/ydataai/ydata-synthetic/blob/dev/src/ydata_synthetic/synthesizers/timeseries/timegan/model.py

In [30]:
learning_rate=0.0001
hidden_dim = 128
num_epochs = 10
input_dim=600
n_layers=5

# Instanciation des modèles
generator = Generator(hidden_dim,input_dim,n_layers)
discriminator = Discriminator(hidden_dim,input_dim,n_layers)
supervisor=Supervisor(hidden_dim,input_dim,n_layers)

# Définition des optimiseurs
gen_optimizer = optim.Adam(generator.parameters(), lr=learning_rate)
dis_optimizer = optim.Adam(discriminator.parameters(), lr=learning_rate)
sup_optimizer = optim.Adam(supervisor.parameters(), lr=learning_rate)

gen_losses = []  
dis_losses=[]
sup_losses=[]

for epoch in range(num_epochs):
    gen_loss=0.0
    dis_loss=0.0
    sup_loss=0.0

    # Entraînement
    generator.train()  # Mettre le générateur en mode d'entraînement
    discriminator.train()  # Mettre le discriminateur en mode d'entraînement
    supervisor.train()
    for real_data,_ in train_loader:
        batch = len(real_data)
        # Échantillonne des codes au hasard dans l'espace latent
        z = th.randn((batch, hidden_dim))
        print(z.shape)
        fake_data = generator(z)
        print(fake_data.shape)
        supervised_data=supervisor(fake_data)
        print(supervised_data.shape)
        Y_fake = discriminator(fake_data)[:,0]
        Y_fake_e=discriminator(supervised_data)
        Y_real=discriminator(real_data)

        # Classe "faux" = 0
        fake_labels =  th.zeros(len(Y_fake))
        fake_labels_e= th.zeros(len(Y_fake_e))
        # Classe "vrai" = 1
        true_labels = th.ones(len(Y_real))

        # Entraîne le Superviseur
        sup_optimizer.zero_grad()
        sloss=nn.MSELoss(real_data,supervised_data)
        sloss.backward()
        sup_optimizer.step()
        sup_loss+=sloss

        # Entraîne le générateur
        gen_optimizer.zero_grad()
        gloss_unsupervised = nn.BCE(fake_labels,Y_fake)  # Utilisation de la negative cross entropy loss
        gloss_unsupervised_e= nn.BCE(fake_labels_e,Y_fake_e)  # Utilisation de la negative cross entropy loss
        gloss=gloss_unsupervised+gloss_unsupervised_e+sloss
        gloss.backward()
        gen_optimizer.step()
        gen_loss+=gloss

        # Entraîne le discriminateur
        dis_optimizer.zero_grad()
        dloss_real=nn.BCE(true_labels,Y_real) ##false negative
        dloss_fake=nn.BCE(fake_labels,Y_fake) ##false positive
        dloss_fake_e=nn.BCE(fake_labels_e,Y_fake_e)
        dloss=dloss_real+dloss_fake+dloss_fake_e
        dloss.backward()
        dis_optimizer.step()
        dis_loss+=dloss
    gen_losses.append(gen_loss.detach())
    dis_losses.append(dis_loss.detach())
    # Afficher les résultats de l'époque
    print(f'Epoch [{epoch+1}/{num_epochs}], Generator Loss: {gen_loss},Discriminator Loss: {dis_loss}')

    # Plot loss curves
plt.figure(figsize=(10, 5))
plt.plot(gen_losses, label='Generator Loss')
plt.plot(dis_losses, label='Discriminator Loss')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.title('Evolution de la perte pendant l\'entraînement')
plt.legend()
plt.show()


torch.Size([100, 128])
torch.Size([100, 128])
torch.Size([100, 600])


RuntimeError: input.size(-1) must be equal to input_size. Expected 600, got 128