In [2]:
import numpy as np
import pandas as pd
import scipy
import scipy.signal
import torch
from torch.autograd import Variable
import torch.nn as nn
import torch.optim  as optim
from sklearn.model_selection import train_test_split
from Utils.preprocess_util import *


In [3]:
X_train,X_valid,X_test,Y_train,Y_valid,Y_test = load_preprocess_eeg_data()


Training/Valid data shape: (2115, 22, 1000)
Test data shape: (443, 1000, 22)
Training/Valid target shape: (2115,)
Test target shape: (443,)
Person train/valid shape: (2115, 1)
Person test shape: (443, 1)

Training data shape: (1417, 22, 1000)
Valid data shape: (698, 22, 1000)
Training target shape: (1417,)
Valid target shape: (698,)


In [5]:
X_test = np.load("../Data/X_test.npy")[:,0:22,:]
y_test = np.load("../Data/y_test.npy")
person_train_valid = np.load("../Data/person_train_valid.npy")
X_train_valid = np.load("../Data/X_train_valid.npy")[:,0:22,:]
y_train_valid = np.load("../Data/y_train_valid.npy")
person_test = np.load("../Data/person_test.npy")

print ('Test data shape: {}'.format(X_test.shape))

Test data shape: (443, 22, 1000)


In [6]:
X_train, X_valid, y_train, y_valid = train_test_split(  X_train_valid, y_train_valid, test_size=0.33, random_state=42)


In [7]:
X_train_modified =[]
X_valid_modified =[]
X_test_modified =[]

for xi in X_train:
    X_train_modified.append(exponential_running_standardize(xi.T, eps=1e-4))

for xi in X_valid:
    X_valid_modified.append(exponential_running_standardize(xi.T, eps=1e-4))
    
for xi in X_test:
    X_test_modified.append(exponential_running_standardize(xi.T, eps=1e-4))
    

X_train = np.array(X_train_modified)
X_valid = np.array(X_valid_modified)
X_test = np.array(X_test_modified)

In [8]:
print ('Training/Valid data shape: {}'.format(X_train_valid.shape))
print ('Test data shape: {}'.format(X_test.shape))
print ('Training/Valid target shape: {}'.format(y_train_valid.shape))
print ('Test target shape: {}'.format(y_test.shape))
print ('Person train/valid shape: {}'.format(person_train_valid.shape))
print ('Person test shape: {}'.format(person_test.shape))
print()

X_train = np.transpose(X_train,[0,2,1])
X_valid = np.transpose(X_valid,[0,2,1])
X_test = np.transpose(X_test,[0,2,1])
print ('Training data shape: {}'.format(X_train.shape))
print ('Valid data shape: {}'.format(X_valid.shape))
print ('Training target shape: {}'.format(y_train.shape))
print ('Valid target shape: {}'.format(y_valid.shape))

Training/Valid data shape: (2115, 22, 1000)
Test data shape: (443, 1000, 22)
Training/Valid target shape: (2115,)
Test target shape: (443,)
Person train/valid shape: (2115, 1)
Person test shape: (443, 1)

Training data shape: (1417, 22, 1000)
Valid data shape: (698, 22, 1000)
Training target shape: (1417,)
Valid target shape: (698,)


In [9]:
Y_train = []
for y in y_train:
    value = np.abs(769-y)
    Y_train.append(value)
Y_train = np.array(Y_train)

Y_valid = []
for y in y_valid:
    value = np.abs(769-y)
    Y_valid.append(value)
Y_valid = np.array(Y_valid)

Y_test = []
for y in y_test:
    value = np.abs(769-y)
    Y_test.append(value)
Y_test = np.array(Y_test)

In [10]:
class Flatten(nn.Module):
    def forward(self, x):
        a= x.view(x.size(0), -1)
        return a
    
class threed_to_twod(nn.Module):
    def forward(self, x):
        #print(x.shape)
        a = x.reshape(x.shape[0],x.shape[3],x.shape[1])
        #print (a)
        return a

In [None]:
model = nn.Sequential()
model.add_module('conv_across_time',nn.Conv2d(1,40,kernel_size=(1,51),stride = 1))
model.add_module('conv_across_electrodes',nn.Conv2d(40,40,kernel_size=(22,1),stride = 1))
model.add_module('BatchNorm2d',nn.BatchNorm2d(40,momentum=0.1))
model.add_module('Nonlinearity', nn.ReLU())
model.add_module('correct_dimensions',threed_to_twod())
model.add_module('AvgPool2d',nn.AvgPool2d(kernel_size=(135,1),stride = (5,1)))
model.add_module('drop', nn.Dropout(p=0.5))
model.add_module('Flatten',Flatten())    
model.add_module('Fc_layer',nn.Linear(164*40,10))
torch.nn.init.xavier_uniform_(model.conv_across_time.weight, gain=1)
torch.nn.init.xavier_uniform_(model.conv_across_electrodes.weight, gain=1)


In [12]:
dtype = torch.FloatTensor
model.type(dtype)
loss_fn = nn.CrossEntropyLoss().type(dtype)


In [13]:
#N,C,H,W = 18,1,25,1000
#x = Variable(torch.tensor(X_train.reshape((18,1, 25, 1000))))
x = Variable(torch.tensor(X_train))
y = Variable(torch.tensor(Y_train),requires_grad = False)
dtype = torch.FloatTensor
x.type(dtype)
y.type(dtype)

tensor([3., 0., 1.,  ..., 2., 1., 0.])

In [None]:
'''
for t in range(3):
    y_pred = model( x.float())
    loss = loss_fn(y_pred,y.type(torch.LongTensor))
    print(loss.data)
    model.zero_grad()
    loss.backward()
loss = loss_fn(y_pred,y.type(torch.LongTensor))
'''

In [15]:
def get_accuracy(ouput, target, batch_size):
    ''' Obtain accuracy for training round '''
    
    
    classes_predicted = torch.max(ouput, 1)[1]
    corrects = (np.equal(classes_predicted.tolist(),target.tolist()).astype(int)).sum()

    #corrects = (max_values[1].view(target.size()).data == target.data).sum()
    accuracy = 100.0 * corrects/batch_size
    return accuracy.item()

def threeD_to_fourDTensor(X):
    return Variable(torch.tensor(X.reshape((X.shape[0],1,X.shape[1],X.shape[2],))))

In [None]:
num_train = X_train.shape[0]
num_valid = X_valid.shape[0]
batch_size = 50
num_epochs = 50
iterations_per_epoch = max(num_train // batch_size, 1)
num_iterations = num_epochs * iterations_per_epoch
epoch = 1
 
optimizer = torch.optim.Adam(model.parameters(), lr=0.00001, betas=(0.9, 0.999), eps=1e-08, weight_decay=0, amsgrad=False)   
for t in range(num_iterations):
    batch_mask = np.random.choice(num_train, batch_size)
    X_batch = X_train[batch_mask]
    y_batch = Y_train[batch_mask]
    X_batch_tensor = threeD_to_fourDTensor(X_batch)
    y_batch_tensor = Variable(torch.tensor(y_batch))
    
    y_pred = model( X_batch_tensor.float())
    
    loss = loss_fn(y_pred,y_batch_tensor.type(torch.LongTensor))
    
    model.zero_grad()
    loss.backward()
    optimizer.step()
    if(t%10 == 0):
        print('(Iteration %d / %d) loss: %f' % (
                       t + 1, num_iterations, loss.detach().numpy()))
    
    epoch_end = (t + 1) % iterations_per_epoch == 0
    
    if epoch_end:
                epoch += 1
    
    first_it = (t == 0)
    last_it = (t == num_iterations - 1)

    if first_it or last_it or epoch_end:
        X_train_tensor =threeD_to_fourDTensor(X_train[0:50,:,:])
        y_pred_train = model( X_train_tensor.float())
        train_acc = get_accuracy(y_pred_train, Y_train[0:50],
            batch_size=50)
        
        X_valid_tensor = threeD_to_fourDTensor(X_valid[0:50,:,:])
        y_pred_valid = model( X_valid_tensor.float())
        val_acc = get_accuracy(y_pred_valid, Y_valid[0:50],
            batch_size=50)
        print('(Epoch %d / %d) train acc: %f; val_acc: %f' % (
                           epoch, num_epochs, train_acc, val_acc))


In [None]:
X_valid_tensor = threeD_to_fourDTensor(X_valid)
y_pred_valid = model( X_valid_tensor.float())
val_acc = get_accuracy(y_pred_valid, Y_valid,
    batch_size=X_valid.shape[0])
print(val_acc)

In [None]:
X_test_tensor = threeD_to_fourDTensor(X_test)
y_pred_test = model( X_test_tensor.float())
test_acc = get_accuracy(y_pred_test, Y_test,
    batch_size=X_test.shape[0])
print(test_acc)