In [None]:
!pip install mne

import scipy.io as spio
import pandas as pd
import numpy as np
import os
import json
import matplotlib.pyplot as plt
import mne
from mne.io import RawArray
from mne import create_info
from mne.channels import make_standard_montage

# Dataset folder, relative to the mounted Google Drive root.
path_data = 'MyDrive/Colab Notebooks/preprocessedata'
# Mount point handed to drive.mount(); note that it already ends with a slash.
path_mount = '/content/drive/'

In [None]:
import torch
from torch.utils.data import Dataset, random_split, DataLoader, TensorDataset, DataLoader
import torchvision.models as models
import torchvision.transforms as transforms
from sklearn.metrics import f1_score
import torch.nn.functional as F
import torch.nn as nn
from torchvision.utils import make_grid
import torch.optim as optim

In [None]:
from google.colab import drive
# Mount Google Drive at path_mount so the files under path_data become readable.
drive.mount(path_mount)

# Data loading

In [None]:
subjects = ["S1", "S2", "S3", "S4", "S5"]
unicorn_channels = ["Fz", "C3", "Cz", "C4", "Pz", "PO7", "Oz", "PO8"]
subject = "S5"

# Build the path with os.path.join so the trailing slash of path_mount does
# not produce a doubled separator.
# NOTE(review): `subject` is "S5" but the file read here is hard-coded to S1 —
# confirm which recording is intended before switching to f'{subject}_eeg.fif'.
data = mne.io.read_raw_fif(os.path.join(path_mount, path_data, 'S1_eeg.fif'))

#sample_data_folder = mne.datasets.sample.data_path()
#sample_data_raw_file = os.path.join(,'S1_eeg.fif')
#raw = mne.io.read_raw_fif(sample_data_raw_file)


#file_path = os.path.join(path_mount, path_data, "json", subject)
#df = pd.read_json(file_path + ".json")
#trigger = np.array(df.trigger)

#eeg = df[unicorn_channels].to_numpy()
#chs = unicorn_channels
#fs = df['sampling_rate'].values[0]

In [None]:
# Display the channel names stored in the loaded recording.
data.ch_names

In [None]:
from mne import find_events


# Extract the events from the stimulus channel and give each event code a
# class name through the ev_ids dict.
evs = find_events(data, stim_channel='STI')
ev_ids = {'NT': 1, 'T': 3}

# Overlay the events on the signal plot for a quick visual check.
data.plot(
    events=evs,
    event_id=ev_ids,
    event_color={1: 'g', 3: 'r'},
    color='Gray',
    block=True,
    clipping=None,
    scalings=50e-6,
)

In [None]:
# Channels to pull out of the recording: the EEG electrodes followed by the
# stimulus channel.
obj = [
    "Fz", "C3", "Cz", "C4",
    "Pz", "PO7", "Oz", "PO8",
    "STI",
]
# Indexing the recording by channel names returns a sequence whose first
# element is kept here as `df`.
df = data[obj][0]

In [None]:
from mne import Epochs

# Cut a window from tmin to tmax around every event in `evs`, using the
# interval (-0.6, -0.1) as baseline.
eps = Epochs(
    data,
    evs,
    event_id=ev_ids,
    tmin=-0.6,
    tmax=0.8,
    baseline=(-0.6, -0.1),
)

In [None]:
epochs_target = eps
e = epochs_target.to_data_frame()

# Keep only rows whose 'time' lies in [0.2, 0.65] and encode the condition
# as an integer: 'NT' -> 1, 'T' -> 0.
e = e.loc[e['time'].between(.2, .65)].assign(
    condition=lambda frame: frame['condition'].map({'NT': 1, 'T': 0}).astype(int)
)

# Kept for any later cell that inspects the flat label vector / feature frame.
y = e['condition'].values
x = e.drop(columns=['condition', 'STI', 'time'])

# Every column that is neither metadata nor the stimulus channel is a signal.
signal_cols = [
    col for col in e.columns
    if col not in ('condition', 'STI', 'time', 'epoch')
]

# Build one sample per epoch by grouping on the 'epoch' column instead of
# assuming ids 0..1199 with exactly 113 rows each; this stays correct when
# epochs were dropped or the window length changes.
X = []
Y = []
for _epoch_id, epoch_df in e.groupby('epoch', sort=True):
    # Shape (1, n_signals, n_samples): a leading singleton axis is added so
    # each sample can be fed to Conv2d as a one-channel image.
    X.append([epoch_df[signal_cols].to_numpy().T])
    # The condition is constant within an epoch, so the first row suffices.
    Y.append(0 if epoch_df['condition'].iloc[0] == 0 else 1)

n = len(X)

# Fail early with a clear message if the epochs cannot be stacked.
samples_per_epoch = {trial[0].shape[1] for trial in X}
if len(samples_per_epoch) > 1:
    raise ValueError(
        f'Epochs have differing numbers of samples: {sorted(samples_per_epoch)}'
    )

  

In [None]:
# Tabular view of the selected channels: one row per sample, one column per
# entry of `obj`.
dataframe = pd.DataFrame(data=df.T, columns=obj)
dataframe

In [None]:
# Display the epoched data as a DataFrame (same call used to build X and Y).
eps.to_data_frame()

# CNN

In [None]:
class Flatten(nn.Module):
    """Collapse every dimension after the first (batch) axis into one."""

    def forward(self, input):
        """Return ``input`` viewed as a 2-D ``(batch, features)`` tensor."""
        batch_size = input.size(0)
        return input.view(batch_size, -1)

class EEGNET(nn.Module):
    """VGG-style 2-D CNN producing class logits for single EEG epochs.

    The network expects input shaped ``(batch, 1, n_channels, n_samples)`` and
    returns ``(batch, num_classes)`` unnormalised logits (suitable for
    ``nn.CrossEntropyLoss``).

    Args:
        num_classes: Number of output logits. Defaults to 2 because the
            notebook trains on two labels; pass 10 to rebuild the previous
            10-logit head (e.g. to load an older checkpoint).
        n_channels: Height of the input (number of EEG channels).
        n_samples: Width of the input (time samples per epoch).

    The defaults (8 x 113) give the 3584 flattened features the original
    hard-coded first ``Linear`` layer required.

    Raises:
        ValueError: If the input is too small to survive the three 2x2
            max-pooling stages.
    """

    def __init__(self, num_classes=2, n_channels=8, n_samples=113):
        super().__init__()

        # Padding-1 3x3 convolutions keep the spatial size, so only the three
        # MaxPool2d(2, 2) stages shrink it: each floor-halves both axes.
        pooled_h = n_channels // 8
        pooled_w = n_samples // 8
        if pooled_h < 1 or pooled_w < 1:
            raise ValueError(
                'n_channels and n_samples must both be at least 8, got '
                f'{n_channels} and {n_samples}'
            )
        flat_features = 256 * pooled_h * pooled_w

        self.network = nn.Sequential(
            nn.Conv2d(1, 32, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.Conv2d(32, 64, kernel_size=3, stride=1, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(2, 2),  # output: 64 x (H // 2) x (W // 2)

            nn.Conv2d(64, 128, kernel_size=3, stride=1, padding=1),
            nn.ReLU(),
            nn.Conv2d(128, 128, kernel_size=3, stride=1, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(2, 2),  # output: 128 x (H // 4) x (W // 4)

            nn.Conv2d(128, 256, kernel_size=3, stride=1, padding=1),
            nn.ReLU(),
            nn.Conv2d(256, 256, kernel_size=3, stride=1, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(2, 2),  # output: 256 x (H // 8) x (W // 8)

            nn.Flatten(),
            nn.Linear(flat_features, 1024),
            nn.ReLU(),
            nn.Linear(1024, 512),
            nn.ReLU(),
            nn.Linear(512, num_classes),
        )

    def forward(self, xb):
        """Return the logits for a batch ``xb`` of shape (batch, 1, H, W)."""
        return self.network(xb)

### Training set and test set


In [None]:
# Seed PyTorch's global RNG so the random split and weight init are repeatable.
torch.manual_seed(10)

In [None]:
epochs = X
# Stack the per-epoch arrays into one float32 NumPy array first: building a
# tensor straight from a nested list of ndarrays converts element by element
# and is extremely slow.
iris = TensorDataset(
    torch.from_numpy(np.asarray(X, dtype=np.float32)),
    torch.as_tensor(Y, dtype=torch.long),
)

In [None]:
# Fraction of the epochs held out for testing; the rest is used for training.
val_pct = 0.2
n_total = len(epochs)
test_size = int(val_pct * n_total)
train_size = n_total - test_size

In [None]:
# Randomly partition the dataset into the two subsets sized above.
train_ds, test_ds = random_split(iris, lengths=[train_size, test_size])
(len(train_ds), len(test_ds))

In [None]:
batch_size = 60

# Shuffled training batches; one batch is pulled immediately as a sample.
train_loader = DataLoader(
    train_ds,
    batch_size=batch_size,
    shuffle=True,
    num_workers=2,
    pin_memory=True,
)
signal, labels = next(iter(train_loader))

# Test batches are twice as large and keep the dataset order.
test_loader = DataLoader(
    test_ds,
    batch_size=batch_size * 2,
    num_workers=2,
    pin_memory=True,
)
signal1, labels1 = next(iter(test_loader))

classes = ('0', '1')

# Using cuda

In [None]:
def get_default_device():
    """Return the CUDA device when one is available, otherwise the CPU."""
    device_name = 'cuda' if torch.cuda.is_available() else 'cpu'
    return torch.device(device_name)
    
def to_device(data, device):
    """Move a tensor, or a (nested) list/tuple of tensors, to ``device``.

    Lists and tuples are processed recursively and always come back as lists.
    """
    if not isinstance(data, (list, tuple)):
        return data.to(device, non_blocking=True)
    return [to_device(item, device) for item in data]

class DeviceDataLoader():
    """Iterable wrapper that moves every batch of a dataloader to a device."""

    def __init__(self, dl, device):
        self.dl, self.device = dl, device

    def __iter__(self):
        """Yield each batch of the wrapped loader after moving it to the device."""
        yield from (to_device(batch, self.device) for batch in self.dl)

    def __len__(self):
        """Return the number of batches in the wrapped loader."""
        wrapped = self.dl
        return len(wrapped)

In [None]:
device = get_default_device()
model = EEGNET()
# A bare expression is only rendered when it is the cell's last statement, so
# it is placed here to actually show which device was selected.
device

In [None]:
# Wrap both loaders so that every batch they yield is moved to `device`, then
# move the model there as well.
train_loader, test_loader = (
    DeviceDataLoader(loader, device) for loader in (train_loader, test_loader)
)
to_device(model, device);

In [None]:
# Device name as a plain string, used by the training/evaluation helpers.
torch_device = 'cuda' if torch.cuda.is_available() else 'cpu'

# Training phase

In [None]:
from tqdm.notebook import tqdm

In [None]:
salva = []  # not referenced elsewhere in the visible notebook; kept as-is


def train(epoch, net, optimizer, criterion, loader=None, device=None):
    """Train ``net`` for ``epoch`` passes over the training data.

    After every pass the mean per-batch loss and the training accuracy are
    printed.

    Args:
        epoch: Number of passes over the data.
        net: Model to optimise.
        optimizer: Optimiser already bound to ``net``'s parameters.
        criterion: Loss called as ``criterion(outputs, labels)``.
        loader: Iterable of ``(inputs, labels)`` batches. Defaults to the
            notebook-level ``train_loader``.
        device: Device batches are moved to. Defaults to the device holding
            ``net``'s parameters (CPU if the model has none).
    """
    if loader is None:
        loader = train_loader
    if device is None:
        first_param = next(net.parameters(), None)
        device = first_param.device if first_param is not None else torch.device('cpu')

    net.train()
    # A distinct loop name avoids shadowing the `epoch` argument.
    for epoch_idx in tqdm(range(epoch)):
        train_loss = 0.0
        correct_train = 0
        total_train = 0
        n_batches = 0
        for inputs, labels in loader:
            inputs = inputs.to(device)
            labels = labels.to(device)

            optimizer.zero_grad()
            outputs = net(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

            train_loss += loss.item()
            predicted_train = outputs.argmax(dim=1)
            total_train += labels.size(0)
            correct_train += (predicted_train == labels).sum().item()
            n_batches += 1

        # max(..., 1) keeps an empty loader from raising ZeroDivisionError.
        train_accuracy = 100 * correct_train / max(total_train, 1)
        print('Epoch %d, train loss: %.3f, train accuracy: %.2f%%' %
              (epoch_idx + 1, train_loss / max(n_batches, 1), train_accuracy))

    print('Finished Training')

In [None]:
def evaluate_accuracy(model, dataloader, device=None):
    """Return the percentage of correctly classified samples in ``dataloader``.

    The model is switched to evaluation mode for the duration of the call and
    restored to its previous mode afterwards.

    Args:
        model: Classifier returning ``(batch, n_classes)`` scores.
        dataloader: Iterable of ``(inputs, labels)`` batches.
        device: Device batches are moved to. Defaults to the device holding
            the model's parameters (CPU if the model has none).

    Returns:
        Accuracy in percent, or ``0.0`` when the loader yields no samples.
    """
    if device is None:
        first_param = next(model.parameters(), None)
        device = first_param.device if first_param is not None else torch.device('cpu')

    was_training = model.training
    model.eval()
    correct = 0
    total = 0
    try:
        with torch.no_grad():
            for images, labels in dataloader:
                labels = labels.to(device)
                predicted = model(images.to(device)).argmax(dim=1)
                total += labels.size(0)
                correct += (predicted == labels).sum().item()
    finally:
        # Leave the model in whatever mode the caller had it in.
        model.train(was_training)

    if total == 0:
        return 0.0
    return 100 * correct / total

def accuracy_classes(net, dataloader, classes, device=None):
    """Print the accuracy of ``net`` separately for every class.

    Args:
        net: Classifier returning ``(batch, n_classes)`` scores.
        dataloader: Iterable of ``(inputs, labels)`` batches; labels are
            integer indices into ``classes``.
        classes: Sequence of class names, indexed by label.
        device: Device batches are moved to. Defaults to the device holding
            the model's parameters (CPU if the model has none).

    A class with no samples in ``dataloader`` is reported as "n/a" instead of
    raising ZeroDivisionError.
    """
    if device is None:
        first_param = next(net.parameters(), None)
        device = first_param.device if first_param is not None else torch.device('cpu')

    correct_pred = {classname: 0 for classname in classes}
    total_pred = {classname: 0 for classname in classes}

    was_training = net.training
    net.eval()
    try:
        with torch.no_grad():
            for images, labels in dataloader:
                predictions = net(images.to(device)).argmax(dim=1)
                # Plain Python ints make the tuple/dict lookups unambiguous.
                for label, prediction in zip(labels.tolist(), predictions.tolist()):
                    classname = classes[label]
                    if label == prediction:
                        correct_pred[classname] += 1
                    total_pred[classname] += 1
    finally:
        net.train(was_training)

    for classname, correct_count in correct_pred.items():
        seen = total_pred[classname]
        if seen == 0:
            print(f'Accuracy for class: {classname:5s} is n/a (no samples)')
            continue
        accuracy = 100 * float(correct_count) / seen
        print(f'Accuracy for class: {classname:5s} is {accuracy:.1f} %')

In [None]:
# Cross-entropy summed (not averaged) over the samples of each batch.
criterion = nn.CrossEntropyLoss(reduction='sum')
#criterion=nn.MSELoss()
optimizer = optim.Adam(params=model.parameters(), lr=1e-3)
epoch = 100

In [None]:
pip install torch-summary

In [None]:
from torchsummary import summary

# Shape of ONE sample, without the batch axis: 1 image channel, 8 EEG
# channels, 113 time samples. That is the size matching the model's
# Linear(3584, ...) layer (256 * (8 // 8) * (113 // 8) = 3584); the previous
# (60, 1, 8, 351) included the batch size and a 351-sample width that does not.
summary(model, (1, 8, 113))

In [None]:
# Run the full training loop with the configuration defined above.
train(epoch=epoch, net=model, optimizer=optimizer, criterion=criterion)

In [None]:
# Overall accuracy (in percent) on the held-out test set.
evaluate_accuracy(model=model, dataloader=test_loader)

In [None]:
# Collect labels and predictions over the WHOLE test set, so the score
# computed from y_true / predictions is not limited to the first batch.
# no_grad() avoids building an autograd graph during inference.
true_batches = []
pred_batches = []
with torch.no_grad():
    for s1, l1 in test_loader:
        outputs = model(s1.to(torch_device))
        pred_batches.append(outputs.argmax(dim=1))
        true_batches.append(l1.to(torch_device))

y_true = torch.cat(true_batches)
predictions = torch.cat(pred_batches)

#outputs = to_device(labels1,'cpu')
#_, predicted = torch.max(outputs.data, 1) 
#pred=to_device(model(tree),'cpu')
#print(predictions==y_true)
#print(y_true)

In [None]:
# F1 score of the predictions against the true labels (computed on the CPU).
f1_score(y_true=y_true.cpu(), y_pred=predictions.cpu())