# setting up the training and testing data

In [1]:
import h5py
import numpy as np 
import torch
from torch.utils.data import Dataset, DataLoader
import matplotlib.pylab as plt
%matplotlib inline

path = []
path.append('Rat08-20130711_017.h5')
path.append('Part1SubjectHB10.h5')
path.append('Part2SubjectHB13.h5')
i = 0
f = h5py.File(path[1], 'r')  # read data with h5 format
fs = f.attrs['fs'][0]  # get sampling frequency of LFP signal (Hz)
states = []  # two states (NREM & WAKE) to be classified
for name, grp in f.items():
  states.append(name)
# Convert the recording in to numpy arrays
# Use a dictionary to store the LFP recordings of the two states
# each containing a list of numpy arrays of all segments
lfp = {key: [] for key in states}  
for key in states:
  group = f[key]  # h5 group of a state
  n = len(group)  # number of segments
  for i in range(n):
    lfp[key].append(group[str(i+1)][()].astype(float))  # convert data to numpy array and from int type to float type
    
f = h5py.File(path[2], 'r')  # read data with h5 format
fs = f.attrs['fs'][0]  # get sampling frequency of LFP signal (Hz)
states = []  # two states (NREM & WAKE) to be classified
for name, grp in f.items():
  states.append(name)
# Convert the recording in to numpy arrays
# Use a dictionary to store the LFP recordings of the two states
# each containing a list of numpy arrays of all segments
lfp2 = {key: [] for key in states}  
for key in states:
  group = f[key]  # h5 group of a state
  n = len(group)  # number of segments
  for i in range(n):
    lfp2[key].append(group[str(i+1)][()].astype(float))  # convert data to numpy array and from int type to float type 
    
f = h5py.File(path[0], 'r')  # read data with h5 format
fs = f.attrs['fs'][0]  # get sampling frequency of LFP signal (Hz)
states = []  # two states (NREM & WAKE) to be classified
for name, grp in f.items():
  states.append(name)
# Convert the recording in to numpy arrays
# Use a dictionary to store the LFP recordings of the two states
# each containing a list of numpy arrays of all segments
lfp3 = {key: [] for key in states}  
for key in states:
  group = f[key]  # h5 group of a state
  n = len(group)  # number of segments
  for i in range(n):
    lfp3[key].append(group[str(i+1)][()].astype(float))  # convert data to numpy array and from int type to float type      
    
# combines all the data into one lfp array
lfp['NREM'] = lfp['NREM'] + lfp2['NREM'] + lfp3['NREM']
lfp['WAKE'] = lfp['WAKE'] + lfp2['WAKE'] + lfp3['WAKE']

y = []
x = []

for i in range(len(lfp['NREM'])):
    y.append('NREM')
    x.append(lfp['NREM'][i])
    
for i in range(len(lfp['WAKE'])):
    y.append('WAKE')
    x.append(lfp['WAKE'][i])
    
#print(len(x[0][0:5000]))
#print(len(y))
#print(len(x))

sliced_x = []
sliced_y = []
for i in range(len(x)):
    temp_x = []
    temp_y = []
    for j in range(int((len(x[i]))/1000)):
        temp = 0
        new_x =x[i][0+temp:1000+temp]
        if (y[i] =='NREM'):
            new_y = 'NREM'
        if (y[i] =='WAKE'):
            new_y = 'WAKE'
        temp = temp + 5000
        temp_x.append(new_x)
        temp_y.append(new_y)
    sliced_x.append(temp_x)
    sliced_y.append(temp_y)


all_data_split_array_x = []
all_data_split_array_y = []
for i in range(len(sliced_x)):
    t_x = [0]
    t_y = [0]
    for j in range(len(sliced_x[i])):
        t_x = sliced_x[i][j]
        t_y = sliced_y[i][j]
        all_data_split_array_x.append(t_x)
        all_data_split_array_y.append(t_y)
        
        
for i in range(len(all_data_split_array_y)):
    if(all_data_split_array_y[i] == 'NREM'):
        all_data_split_array_y[i] = 0
    if(all_data_split_array_y[i] == 'WAKE'):
        all_data_split_array_y[i] = 1

final_labeled_data = np.column_stack((all_data_split_array_y,all_data_split_array_x))

# dataset definition
class LFP_data(Dataset):
    # load the dataset
    def __init__(self):       
        self.X = torch.tensor(all_data_split_array_x)
        self.y = torch.tensor(all_data_split_array_y)
        self.n_samples = len(all_data_split_array_x)
 
    def __len__(self):
        return self.n_samples
 
    # get a row at an index
    def __getitem__(self, index):
        return [self.X[index], self.y[index]]
 
dataset = LFP_data()
first_data = dataset[0]
features, labels = first_data
#print(features,labels)

train_size = int(0.8 * len(dataset))
test_size = len(dataset) - train_size
train_dataset, test_dataset = torch.utils.data.random_split(dataset, [train_size, test_size])

batch_size = 10

training_data = DataLoader(train_dataset, batch_size=batch_size,
                         shuffle=True, num_workers=0)

validation_data = DataLoader(test_dataset, batch_size=batch_size,
                        shuffle=False, num_workers=0)

# Defining the model

In [2]:
class NeuralNetwork(torch.nn.Module):
    def __init__(self):
        super(NeuralNetwork, self).__init__()
        self.activation = nn.Tanh()
        self.first_hidden = nn.Linear(1000, 500)
        self.second_hidden = nn.Linear(500, 50)
        self.output = nn.Linear(50, 2)
        
    def forward(self, x):
        x = self.activation(self.first_hidden(x))
        x = self.activation(self.second_hidden(x))
        x = self.output(x)
        return x

# Setting up some of the model parameters

In [8]:
import torch.optim as optim
import torch.nn as nn

epochs = 5
learning_rate = 0.01

device = "cuda" if torch.cuda.is_available() else "cpu"
print(f"Using {device} device")

model = NeuralNetwork().to(device)

loss_fn = nn.CrossEntropyLoss()

optimizer = torch.optim.SGD(model.parameters(), lr=learning_rate)

print(model)


Using cpu device
NeuralNetwork(
  (activation): Tanh()
  (first_hidden): Linear(in_features=1000, out_features=500, bias=True)
  (second_hidden): Linear(in_features=500, out_features=50, bias=True)
  (output): Linear(in_features=50, out_features=2, bias=True)
)


# Setting up the training function

In [4]:
def train(dataloader, model, loss_fn, optimizer):
    size = len(dataloader.dataset)
    model.train()
    for batch, (X, y) in enumerate(dataloader):
        X, y = X.to(device), y.to(device)

        # Compute prediction error
        pred = model(X.float())
        loss = loss_fn(pred, y)

        # Backpropagation
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        if batch % 100 == 0:
            loss, current = loss.item(), batch * len(X)
            print(f"loss: {loss:>7f}  [{current:>5d}/{size:>5d}]")

# Setting up the testing function

In [5]:
def test(dataloader, model, loss_fn):
    size = len(dataloader.dataset)
    num_batches = len(dataloader)
    model.eval()
    test_loss, correct = 0, 0
    with torch.no_grad():
        for X, y in dataloader:
            X, y = X.to(device), y.to(device)
            pred = model(X.float())
            test_loss += loss_fn(pred, y).item()
            correct += (pred.argmax(1) == y).type(torch.float).sum().item()
    test_loss /= num_batches
    correct /= size
    print(f"Test Error: \n Accuracy: {(100*correct):>0.1f}%, Avg loss: {test_loss:>8f} \n")

# Actually running the model

In [6]:
# Running the model
for t in range(epochs):
    print(f"Epoch {t+1}\n-------------------------------")
    train(training_data, model, loss_fn, optimizer)
    test(validation_data, model, loss_fn)
print("Done!")

Epoch 1
-------------------------------
loss: 0.668048  [    0/25873]
loss: 0.340853  [ 1000/25873]
loss: 0.175617  [ 2000/25873]
loss: 0.115584  [ 3000/25873]
loss: 0.072704  [ 4000/25873]
loss: 0.205569  [ 5000/25873]
loss: 0.066608  [ 6000/25873]
loss: 0.239737  [ 7000/25873]
loss: 0.057810  [ 8000/25873]
loss: 0.089075  [ 9000/25873]
loss: 0.034986  [10000/25873]
loss: 0.055723  [11000/25873]
loss: 0.006589  [12000/25873]
loss: 0.012218  [13000/25873]
loss: 0.048736  [14000/25873]
loss: 0.013992  [15000/25873]
loss: 0.023574  [16000/25873]
loss: 0.016225  [17000/25873]
loss: 0.015249  [18000/25873]
loss: 0.007429  [19000/25873]
loss: 0.018200  [20000/25873]
loss: 0.008183  [21000/25873]
loss: 0.008805  [22000/25873]
loss: 0.014207  [23000/25873]
loss: 0.006599  [24000/25873]
loss: 0.004545  [25000/25873]
Test Error: 
 Accuracy: 100.0%, Avg loss: 0.007579 

Epoch 2
-------------------------------
loss: 0.006949  [    0/25873]
loss: 0.003247  [ 1000/25873]
loss: 0.004819  [ 2000/2587

# Just checking the first 10 points in the test dataset to see if the model can actually predict correctly

In [7]:
classes = [
    "NREM",
    "WAKE",
]
model.eval()
for i in range(10):
    x, y = test_dataset[i][0], test_dataset[i][1]
    with torch.no_grad():
        pred = model(x.float())
        #print(pred.argmax(0),y)
        print('Predicted', {classes[pred.argmax(0)]},'Actual', {classes[y]})

Predicted {'WAKE'} Actual {'WAKE'}
Predicted {'NREM'} Actual {'NREM'}
Predicted {'WAKE'} Actual {'WAKE'}
Predicted {'WAKE'} Actual {'WAKE'}
Predicted {'WAKE'} Actual {'WAKE'}
Predicted {'WAKE'} Actual {'WAKE'}
Predicted {'WAKE'} Actual {'WAKE'}
Predicted {'WAKE'} Actual {'WAKE'}
Predicted {'NREM'} Actual {'NREM'}
Predicted {'NREM'} Actual {'NREM'}


# The hardest part of the homework was getting the accuracy of the model high enough. I tried serveral different network layouts and activation functions and found the TanH function to greatly improve the accuracy of this model