## Classification of phonemes using neural signals

With a Bidirectional LSTM

In [None]:
##############################
#
# 09/03/18 setup
#
##############################
from __future__ import division
%matplotlib inline
%load_ext autoreload
%autoreload 2


import numpy as np
import matplotlib.pyplot as plt
import scipy.io
from numpy import array
import pandas as pd
import sklearn
import torch
import random
from sklearn.datasets import make_classification
#from skorch import NeuralNetClassifier
import torch.optim as optim
from functools import reduce
import torch.nn.functional as F

from sklearn.model_selection import StratifiedKFold
from sklearn.model_selection import GridSearchCV

# set random seed
# We need to set the cudnn to derministic mode so that 
#we can get consistent result
# Yet, this will slow down the training :( It's a trade-off
# see https://discuss.pytorch.org/t/how-to-
#confugure-pytorch-to-get-deterministic-results/11797
# see https://discuss.pytorch.org/t/network-forward-
#backward-calculation-precision-error/17716/2
random_state = 100
np.random.seed(random_state)
torch.manual_seed(random_state)
random.seed(random_state)

device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
print ('using device ' + str(device))

In [None]:
# parameters
data_root_dir = '/Users/janaki/Dropbox/bci/bill_grant_Fall2018'
f_name = 'data5_nn_zscored.csv'

# load the feature and labels
df = pd.read_csv('%s/%s' %(data_root_dir, f_name), header=-1)

# define label and feature; convert it to ndarray
# adjust the label from 0 to 9
# y does not have to be one-hot embedding, i.e., if 10 class, then dim is 1000x10, can be 1000,
df_label = df.iloc[:-1,0]
df_f = df.iloc[:-1,1:]
X1 = df_f.as_matrix()
y1 = df_label.as_matrix() 

print(y1.shape)
print(X1.shape)

In [None]:
#converting data into the form required by an LSTM (time, batch size, number of features)
X_raw = np.zeros((225,199,267))
for i in np.arange(267):
    X_raw[:,:,i] = np.reshape(X1[:,i],(199,225)).transpose()
    assert np.all(X_raw[:,0,i] == X1[0:225,i])
y = np.reshape(y1,(199,225)).transpose()
s = np.arange(X_raw.shape[1])
np.random.shuffle(s)
X_raw = X_raw[:,s,:]
X_raw = X_raw.astype(np.float32)
y = y[:,s]
print(X_raw.shape)
print(y.shape)
print(X_raw.size)
print(y.size)
#assert np.all(torch.tensor(y,dtype = torch.long).contiguous().view(225*123) == torch.tensor(y1,dtype = torch.long).contiguous().view(225*123))

In [None]:
# calculate class ratio for loss calculation because it is imbalanced

def get_weight_balance(y, amp_weight=1):
    """
    Calculate the class ratio.
    """

    y_labels = np.reshape(y,(y.size,1))
    counts_np = np.bincount(y_labels[:,0])
    
    #counts_np = np.vstack((temp2,temp1[temp2])).T
    max_val = np.max(np.abs(counts_np),axis=0)
    
    class_weight = np.max(np.abs(counts_np),axis=0)/counts_np
    class_weight = class_weight*amp_weight
    class_weight = class_weight.tolist()
    
    return class_weight
weight = get_weight_balance(y)
print(weight)

In [None]:
"""
Define the RNN model.
"""
from torch import nn
import torch.nn.functional as F

class RNN(nn.Module):
    def __init__(self, num_directions, num_layers, hidden_dim, vocab_size, num_chan, batch_size, prob):
        super(RNN, self).__init__()
        
        self.hidden_dim = hidden_dim
        self.vocab_size = vocab_size
        self.direction = num_directions
        self.batch_size = batch_size
        self.num_layers = num_layers
                
        #first layer lstm cell
        self.lstm = nn.LSTM(input_size = num_chan, hidden_size=hidden_dim, 
                            bidirectional = self.direction>1, num_layers = self.num_layers)  
        self.affine0 = nn.Linear(in_features = num_directions*hidden_dim, 
                                 out_features = vocab_size)
        self.softmax = nn.LogSoftmax(dim=2)
        self.dropout = nn.Dropout(p=prob)

    def forward(self, X, hc, **kwargs):
        """
            x: input to the model
                *  x[t] - input of shape (batch, input_size) at time t
                
            hc: hidden and cell states
                *  tuple of hidden and cell state
        """ 
        hc_1 = hc
        output_lstm, (hn, cn) = self.lstm(X, hc_1)
        output_seq = self.affine0(output_lstm)
        
        # return the output sequence
        #pdb.set_trace()  
        #view goes across each row and then traverses down the matrix
        return output_seq.view((X.shape[0]*self.batch_size, self.vocab_size))
    
    def initHidden(self,device = None):
        
        # initialize the hidden state and the cell state to zeros
        #pdb.set_trace()

        return (torch.zeros((self.direction*self.num_layers, self.batch_size, self.hidden_dim), 
                            dtype=torch.float32, device=device),
                torch.zeros((self.direction*self.num_layers, self.batch_size, self.hidden_dim), 
                            dtype=torch.float32, device=device)) 
    

In [None]:
"""
Train the RNN model
"""
def train_rnn(rnn, input_tensor, target_tensor):
    hidden = rnn.initHidden(device = device)

    optimizer.zero_grad()
    
    output = rnn(input_tensor, hidden)

    loss = criterion(output, target_tensor)
    
    # calculate the gradients
    loss.backward()

    # update the parameters of the model
    optimizer.step()

    return output, loss.item()

In [None]:
def random_samples_without_zero(X, y, n):
    samples_without_zero = []
        samples_without_zero.append ([X[:,n,:],y[:,n]])
        #import pdb; pdb.set_trace()
    return  samples_without_zero
    

In [None]:
"""
Test the model
"""
def evaluate_rnn(input_tensor):
    rnn.batch_size = 1
    hidden = rnn.initHidden(device = device)
    output = rnn(input_tensor, hidden)
    return output

In [None]:
import time
t0 = time.time()

# define the input data
X_raw = X_raw.astype(np.float32) #225x199x178
y = y.astype(np.int64) #225x199
y_copy = np.copy(y)

#define output variables
out_proba_mat = np.zeros((225,y.shape[1],11))
val_mat = np.zeros_like(y)
y_new = np.zeros_like(y)
val_accuracy = np.zeros(y.shape[1])

#converting to torch variables
y_gpu = torch.tensor(y_copy, dtype=torch.long, device=device)
X_raw_gpu = torch.tensor(X_raw, device=device)

#defining parameters
n_hidden = 128

#variables for visualization
print_every = 30
plot_every = 1
val_every = 1
current_tr_loss = 0
current_val_loss = 0
val_loss = 0
cnt = 0 
all_tr_losses = []
all_val_losses = []
itr = 30

for test_index in np.arange(y.shape[1]):
    train_index = np.delete(np.arange(y.shape[1]),test_index)

    val_samples =  random_samples_without_zero(X_raw_gpu,y_gpu,test_index)
    
    #assigning weights to each class
    class_weight_samples = y_copy[:,train_index]
    class_weight = get_weight_balance(class_weight_samples, amp_weight=1)
    class_weight = torch.tensor(class_weight, device = device, dtype=torch.float32)
   
    rnn = RNN(num_layers = 2, num_directions = 2, hidden_dim = n_hidden, vocab_size = 11, num_chan = 171, batch_size = len(train_index), prob = 0)  
    rnn = rnn.to(device)
    criterion = nn.CrossEntropyLoss(weight = class_weight)
    optimizer = optim.Adam(rnn.parameters(), lr=0.001)
    scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size = 20, gamma=0.1, last_epoch=-1)

    #training step
    for iter in np.arange(itr):
        input_sample =  random_samples_without_zero(X_raw_gpu,y_gpu,train_index)
        # massaging the input data
        input_y = input_sample[0][1].view(225*len(train_index))
        input_X = input_sample[0][0]
        
        output, loss = train_rnn(rnn, input_X, input_y)
        current_tr_loss += loss
        cnt += 1

        # Print iter number, loss, name and guess
        if cnt % print_every == 0:
            out_proba_val = F.softmax(output, dim=-1)  
            out_proba_val = out_proba_val.cpu().detach().numpy()
            #print(np.argmax(out_proba_val,axis=1))
            #print(input_y)

        if iter % plot_every == 0:
            all_tr_losses.append(current_tr_loss)
            current_tr_loss = 0
            #print(all_tr_losses)

    # validation step
    val_X = val_samples[0][0].unsqueeze(1)
    val_y = val_samples[0][1] 

    output = evaluate_rnn(val_X)
    out_proba_val = F.softmax(output, dim=-1)  
    out_proba_val = out_proba_val.cpu().detach().numpy()
    out_proba_mat[:,test_index,:] = out_proba_val
    val_preds = np.argmax(out_proba_val,axis=1)
    val_mat[:,test_index] = val_preds
    y_new[:,test_index] = val_y.cpu().detach().numpy()
    plt.plot(all_tr_losses)
    plt.show()
    print(val_y)
    print(val_preds)
t1 = time.time()
total = t1-t0    
    

In [None]:
y_col = y_new.reshape(y_new.size,1)
val_col = val_mat.reshape(val_mat.size,1)

"""
Evaluate the performance
"""
from sklearn.metrics import confusion_matrix

# calculation the confusion matrix
# row: true label
# col: predicted value
cm = np.array(confusion_matrix(y_col,val_col))
#cm = np.transpose(cm)
print(df.iloc[:,0].value_counts())
freq = np.array ([37577, 2996, 1444, 2060, 2074, 1780, 399, 401, 124, 97, 101])
print('confusion matrix')
print(cm)
print(np.sum(cm.diagonal())/np.sum(freq))

In [None]:
val_accuracy = np.mean(val_mat == y_new, axis = 0)
print(val_accuracy)
print(np.sum(val_accuracy>0.90))

In [None]:
import seaborn as sn

# export to csv`
df_cm = pd.DataFrame(cm/freq[:,None])
sn.set(font_scale=1)
accuracies = sn.heatmap(df_cm, annot=True, annot_kws={"size":9})
#figure = accuracies.get_figure()
#figure.savefig('Subject3_rnn.png')

In [None]:
import scipy.io
scipy.io.savemat('/Users/janaki/Dropbox/bci/bill_grant_Fall2018/probabilities_data6_nn_zscored_256_35.mat', mdict={'out_proba_mat': out_proba_mat})