# EEG classifier

In [None]:
import os
import numpy as np
import time, datetime
import matplotlib.pyplot as plt
from collections import OrderedDict
import pandas as pd

import torch
import torch.nn as nn
from torch.utils.data import DataLoader
from torch.autograd import Variable

from sklearn.metrics import confusion_matrix

In [None]:
# Choose the compute device once: CUDA when PyTorch reports a usable GPU,
# otherwise the CPU. The choice is announced on stdout.
# NOTE(review): the visible cells never move the model or the batches to
# `device` - confirm whether that is intended.
is_cuda = torch.cuda.is_available()
device = torch.device("cuda" if is_cuda else "cpu")
print("GPU is available" if is_cuda else "GPU not available, CPU used")

In [None]:
# Root folder that is searched recursively for the dataset files.
data_path = "../dataset/"

# Walk the whole tree and remember the last file whose name contains "train"
# and the last one whose name contains "test". The two checks are independent,
# so a single file name may satisfy both.
for dirname, _, filenames in os.walk(data_path):
    for filename in filenames:
        candidate = os.path.join(dirname, filename)
        if "train" in filename:
            train_path = candidate
        if "test" in filename:
            test_path = candidate

In [None]:
def encode_target(target):
    """Binarize a raw label: ``-55`` becomes class 0, anything else class 1."""
    return 0 if target == -55 else 1

In [None]:
# Read each split and keep only the columns from positional index 3 onwards
# (the first three columns are discarded as unnamed columns).
train_df = pd.read_csv(train_path).iloc[:, 3:]
test_df = pd.read_csv(test_path).iloc[:, 3:]

# Replace the raw label values of both splits with the 0/1 encoding.
for frame in (train_df, test_df):
    frame.target = frame.target.apply(encode_target)

In [None]:
# Plot one training example: the row at positional index 2, using every column
# except the last one (the last column is used as the label elsewhere in this file).
plt.plot(train_df.iloc[2:3, :-1].to_numpy()[0])

In [None]:
# Distinct values found in the last column of the training data, i.e. the
# set of class labels after encoding.
np.unique(train_df.to_numpy()[:,-1])

In [None]:
def print_conf_matrix(y_true, y_pred):
    """Show the confusion matrix of ``y_true`` against ``y_pred`` as an annotated grid.

    The vertical axis is labelled 'Actuals' and the horizontal axis 'Predictions';
    every cell is annotated with its count. The figure is displayed with
    ``plt.show()`` and nothing is returned.
    """
    counts = confusion_matrix(y_true=y_true, y_pred=y_pred)

    fig, ax = plt.subplots(figsize=(7.5, 7.5))
    ax.matshow(counts, cmap=plt.cm.Blues, alpha=0.3)

    # Annotate each cell: the column index is the x coordinate, the row index the y coordinate.
    for (row, col), value in np.ndenumerate(counts):
        ax.text(x=col, y=row, s=value, va='center', ha='center', size='xx-large')

    plt.xlabel('Predictions', fontsize=18)
    plt.ylabel('Actuals', fontsize=18)
    plt.title('Confusion Matrix', fontsize=18)
    plt.show()

In [None]:
def create_data_loader(train_df, test_df, batch_size = 100, val_split=0.3, n_features=178):
    """Build shuffled train / validation / test ``DataLoader`` objects from two dataframes.

    In both frames the last column is taken as the class label and all other
    columns as features.

    Args:
        train_df: Frame that is randomly split into training and validation sets.
        test_df: Frame used for the test set.
        batch_size: Number of samples per batch for all three loaders.
        val_split: Fraction of the ``train_df`` rows assigned to the validation set.
        n_features: Number of leading feature columns to keep (default 178,
            the value that used to be hard-coded).

    Returns:
        Tuple ``(train_loader, val_loader, test_loader)``.
    """
    # transform dataframe to numpy array
    train_data = train_df.to_numpy()
    test_data = test_df.to_numpy()

    # Scale every split by the SAME constant - the largest training feature
    # value. Scaling the test split by its own maximum (as was done before) puts
    # train and test inputs on different scales and lets test statistics leak
    # into preprocessing.
    scale = train_data[:, :-1].max()
    if scale == 0:
        # All-zero features: dividing would only produce NaNs.
        scale = 1

    # create x and y
    x_train = (train_data[:, :-1] / scale)[:, :n_features]
    x_test = (test_data[:, :-1] / scale)[:, :n_features]
    y_train = train_data[:, -1]
    y_test = test_data[:, -1]

    # create tensor dataset of x and y
    train_dataset = torch.utils.data.TensorDataset(
        torch.from_numpy(x_train).float(),
        torch.from_numpy(y_train).long(),
    )
    test_dataset = torch.utils.data.TensorDataset(
        torch.from_numpy(x_test).float(),
        torch.from_numpy(y_test).long(),
    )

    # sizes of the train / validation partitions
    val_len = int(train_data.shape[0] * val_split)
    train_len = train_data.shape[0] - val_len

    # randomly assign rows to the training and validation subsets
    train_dataset, val_dataset = torch.utils.data.random_split(train_dataset, [train_len, val_len])

    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
    val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=True)
    test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=True)

    return train_loader, val_loader, test_loader

In [None]:
# Number of samples per batch passed to the data loaders.
batch_size = 8
# Fraction of the training rows that is held out for validation.
val_split = 0.2

In [None]:
# Build the three loaders (train / validation / test) from the prepared
# dataframes, using the batch size and validation fraction set in this file.
train_loader, val_loader, test_loader = create_data_loader(train_df, test_df, batch_size = batch_size, val_split=val_split)

In [None]:
class RNNModel(nn.Module):
    """ReLU ``nn.RNN`` followed by a linear read-out of the last time step.

    Args:
        input_dim: Number of features per time step.
        hidden_dim: Size of the RNN hidden state.
        layer_dim: Number of stacked RNN layers.
        output_dim: Size of the output vector (one score per class).
    """

    def __init__(self, input_dim, hidden_dim, layer_dim, output_dim):
        super().__init__()

        # Kept as attributes because forward() needs them to shape the
        # initial hidden state.
        self.hidden_dim = hidden_dim
        self.layer_dim = layer_dim

        # batch_first=True: inputs are laid out as (batch, seq, feature).
        self.rnn = nn.RNN(input_dim, hidden_dim, layer_dim, batch_first=True, nonlinearity='relu')

        # Readout layer
        self.fc = nn.Linear(hidden_dim, output_dim)

    def forward(self, x):
        """Return the read-out scores for a batch ``x`` of shape (batch, seq, input_dim)."""
        # Zero initial hidden state allocated from `x`, so it always shares
        # the input's device and dtype (a plain torch.zeros lives on the CPU in
        # float32 and breaks as soon as the model runs elsewhere).
        h0 = x.new_zeros((self.layer_dim, x.size(0), self.hidden_dim))

        out, _ = self.rnn(x, h0)
        # Only the output of the final time step feeds the classifier.
        return self.fc(out[:, -1, :])

In [None]:
num_epochs = 10 # number of full passes over the training loader

In [None]:
input_dim = 178   # number of features fed to the RNN per time step
hidden_dim = 1000  # size of the RNN hidden state
layer_dim = 1     # number of stacked RNN layers
output_dim = 2   # number of output scores (one per class)

# Instantiate the RNN model.
# NOTE(review): the model is not moved to `device` here - confirm whether
# CPU-only execution is intended.
model = RNNModel(input_dim, hidden_dim, layer_dim, output_dim)

# Cross-entropy loss; it is applied to the raw model outputs in the training loop.
error = nn.CrossEntropyLoss()

# Plain SGD over all model parameters.
learning_rate = 0.05
optimizer = torch.optim.SGD(model.parameters(), lr=learning_rate)

In [None]:
# Train the model; every `eval_every` optimizer steps, measure loss and
# accuracy on the validation loader and record them for the plots below.
seq_dim = 178  # not read in this cell; kept in case other cells rely on it
loss_list = []
iteration_list = []
accuracy_list = []
y_true=[]
y_pred=[]
count = 0

# Evaluation period in optimizer steps. Clamped to at least 1 so the modulo
# below cannot divide by zero when test_df has fewer rows than one batch.
eval_every = max(1, test_df.shape[0] // batch_size)

for epoch in range(num_epochs):
    for signals, labels in train_loader:

        # (batch, features) -> (batch, 1, features): every sample is fed to
        # the batch_first RNN as a sequence of length one.
        train = signals.unsqueeze(1)
        labels = labels.long()

        # Clear gradients
        optimizer.zero_grad()

        # Forward propagation
        outputs = model(train)

        # Calculate softmax and cross entropy loss
        loss = error(outputs, labels)

        # Calculating gradients
        loss.backward()

        # Update parameters
        optimizer.step()

        count += 1

        if count % eval_every != 0:
            continue

        # ---- validation pass ----
        # Separate variable names keep the training batch untouched, and
        # no_grad() avoids building autograd graphs that are never used.
        correct = 0
        total = 0
        val_loss_sum = 0.0
        model.eval()
        with torch.no_grad():
            for val_signals, val_labels in val_loader:
                val_outputs = model(val_signals.unsqueeze(1))

                # Get predictions from the maximum value
                predicted = torch.max(val_outputs, 1)[1]

                n_samples = val_labels.size(0)
                total += n_samples
                correct += (predicted == val_labels).sum().item()
                # Weight each batch by its size so the mean is per sample.
                val_loss_sum += error(val_outputs, val_labels).item() * n_samples
        model.train()

        # Plain floats: they print cleanly and plot without tensor conversion.
        accuracy = 100 * correct / total if total else float("nan")
        val_loss = val_loss_sum / total if total else float("nan")

        # store loss and iteration
        loss_list.append(val_loss)
        iteration_list.append(count)
        accuracy_list.append(accuracy)
        if total:
            # NOTE(review): as before, only the final validation batch of this
            # round is appended to y_true / y_pred.
            y_true += val_labels
            y_pred += predicted

        # Print validation loss and accuracy
        print('iteration: {}  val_loss: {}  val_accuracy: {} %'.format(count, val_loss, accuracy))

In [None]:
# Test model
# Evaluate on every batch of test_loader. The counters and the
# label/prediction lists are reset first, so the reported accuracy and the
# confusion matrix drawn afterwards describe this evaluation only, not
# leftovers from earlier validation rounds.
correct = 0
total = 0
y_true = []
y_pred = []

model.eval()
with torch.no_grad():  # inference only: no autograd bookkeeping needed
    for signals, labels in test_loader:
        # (batch, features) -> (batch, 1, features), as during training
        signals = signals.unsqueeze(1)

        # Forward propagation
        outputs = model(signals)

        # Get predictions from the maximum value
        predicted = torch.max(outputs, 1)[1]

        # Total number of labels
        total += labels.size(0)
        correct += (predicted == labels).sum().item()

        # Record every batch (as plain ints), not just the last one.
        y_true += labels.tolist()
        y_pred += predicted.tolist()

accuracy = 100 * correct / total if total else float("nan")

# Print test accuracy, rounded to the nearest whole percent
print('test_accuracy: {} %'.format(np.round(accuracy)))

In [None]:
# Draw the confusion matrix for the labels and predictions currently held in
# y_true / y_pred.
print_conf_matrix(y_true, y_pred)

In [None]:
# Plot the recorded loss and accuracy histories against the iteration count,
# one figure per metric. Each entry: (values, y-axis label, title, extra
# keyword arguments for plt.plot).
history_plots = (
    (loss_list, "Loss", "RNN: Loss vs Number of iteration", {}),
    (accuracy_list, "Accuracy", "RNN: Accuracy vs Number of iteration", {"color": "red"}),
)

for series, y_label, title, plot_kwargs in history_plots:
    plt.plot(iteration_list, series, **plot_kwargs)
    plt.xlabel("Number of iteration")
    plt.ylabel(y_label)
    plt.title(title)
    plt.show()

### Test the model with experimental data

In [None]:
def test_data_loader(test_df, batch_size = 100):
    """Wrap a dataframe in a shuffled ``DataLoader`` for evaluation.

    The last column of ``test_df`` is taken as the label (cast to long); all
    other columns are the features, divided by their overall maximum and cast
    to float.

    Args:
        test_df: Frame holding the feature columns followed by the label column.
        batch_size: Number of samples per batch.

    Returns:
        A ``DataLoader`` over the resulting ``TensorDataset``.
    """
    values = test_df.to_numpy()

    # Split into features / labels, then scale the features by their global maximum.
    features, targets = values[:, :-1], values[:, -1]
    features = features / features.max()

    dataset = torch.utils.data.TensorDataset(
        torch.from_numpy(features).float(),
        torch.from_numpy(targets).long(),
    )

    return DataLoader(dataset, batch_size=batch_size, shuffle=True)

In [None]:
# Load the experimental data set that serves as an additional evaluation set.
df = pd.read_csv("../dataset/experimental_data.csv")

In [None]:
# Remove first column.
# .copy() gives final_test_df its own data: the frame is modified in place
# further down (label re-encoding, dropna(inplace=True)), and doing that on a
# bare slice of `df` triggers pandas' SettingWithCopy warning.
final_test_df = df.iloc[: , 1:].copy()

In [None]:
def get_target(target):
    """Map raw label 1 to class 0 and raw label 2 to class 1; anything else becomes NaN."""
    if target == 1:
        return 0
    if target == 2:
        return 1
    # Every other label is marked as missing.
    return np.nan

In [None]:
# Re-encode the label column: 1 -> 0, 2 -> 1, every other value -> NaN
# (the NaN rows are dropped further down).
final_test_df.y = final_test_df.y.apply(get_target)

In [None]:
# Count the rows per encoded label; dropna=False also reports the NaN rows.
final_test_df.y.value_counts(dropna=False)

In [None]:
# Drop, in place, every row whose label is NaN.
final_test_df.dropna(subset=["y"], inplace=True)

In [None]:
# Build a loader over the experimental data. This rebinds `test_loader`,
# replacing the loader created earlier from test_df.
test_loader = test_data_loader(final_test_df, batch_size = batch_size)

In [None]:
# Test model
# Evaluate on every batch of the current test_loader. The counters and the
# label/prediction lists are reset first, so the reported accuracy and the
# confusion matrix drawn afterwards describe this evaluation only, not
# leftovers from earlier evaluations.
correct = 0
total = 0
y_true = []
y_pred = []

model.eval()
with torch.no_grad():  # inference only: no autograd bookkeeping needed
    for signals, labels in test_loader:
        # (batch, features) -> (batch, 1, features), as during training
        signals = signals.unsqueeze(1)

        # Forward propagation
        outputs = model(signals)

        # Get predictions from the maximum value
        predicted = torch.max(outputs, 1)[1]

        # Total number of labels
        total += labels.size(0)
        correct += (predicted == labels).sum().item()

        # Record every batch (as plain ints), not just the last one.
        y_true += labels.tolist()
        y_pred += predicted.tolist()

accuracy = 100 * correct / total if total else float("nan")

# Print test accuracy, rounded to the nearest whole percent
print('test_accuracy: {} %'.format(np.round(accuracy)))

In [None]:
# Draw the confusion matrix for the labels and predictions currently held in
# y_true / y_pred.
print_conf_matrix(y_true, y_pred)