In [1]:
import pandas as pd
import numpy as np
from numpy import load
import torch
from torch import nn
import torch.nn.functional as F
from torch.utils.data import DataLoader
import matplotlib.pyplot as plt


In [2]:
### import data from csv (written in data_processing)
q_array = load('q_array.npy')
a_array = load('a_array.npy')

### find number of trials
data_trials = int(a_array.shape[0])

In [3]:
### organize data into training and test tensors
train_x = (q_array[0:int(0.8*data_trials),:,:]).astype(np.float32) # training data from csv/pandas (80% of existing data?)
x_train_tensor = torch.from_numpy(train_x)
test_x = (q_array[int(0.8*data_trials):data_trials,:,:]).astype(np.float32) # test data from csv/pandas (remaining ~20% of data)
test_x = np.expand_dims(test_x, 1) # add dimension for neural net
x_test_tensor = torch.from_numpy(test_x)

train_y = (a_array[0:int(0.8*data_trials),:]).astype(np.float32)
y_train_tensor = torch.from_numpy(train_y)
test_y = (a_array[int(0.8*data_trials):data_trials,:]).astype(np.float32)
y_test_tensor = torch.from_numpy(test_y)

test_data = [(x_test_tensor[i],y_test_tensor[i]) for i in range(x_test_tensor.shape[0])]


train_dataset = torch.utils.data.TensorDataset(x_train_tensor, y_train_tensor)
test_dataset = torch.utils.data.TensorDataset(x_test_tensor, y_test_tensor)

train_loader = torch.utils.data.DataLoader(train_dataset, batch_size = 1000, shuffle=True)
test_loader = torch.utils.data.DataLoader(test_dataset, batch_size = 1000, shuffle=True)


for batch, (x, y) in enumerate(train_loader):
    print("batch", batch)
    print("Shape of x: ", x.shape, x.dtype)
    print("Shape of y: ", y.shape, y.dtype)
    break

batch 0
Shape of x:  torch.Size([1000, 16, 100]) torch.float32
Shape of y:  torch.Size([1000, 6]) torch.float32


In [4]:
### define model
class Net(nn.Module):
    def __init__(self):
        super(Net, self).__init__()
        self.flatten = nn.Flatten()
            
        self.ls = nn.LSTM(16,32,2, batch_first = True)
        self.linear = nn.Linear(32,6)
            

    def forward(self, x):
        batch_size = x.shape[0] 
        x = self.flatten(x)
        x = torch.reshape(x, (batch_size, 100, 16)) 
        output,(h,c) = self.ls(x)
        output = self.linear(h[-1,...]) 
        return nn.Sigmoid()(output)

model = Net()
print(model)

Net(
  (flatten): Flatten(start_dim=1, end_dim=-1)
  (ls): LSTM(16, 32, num_layers=2, batch_first=True)
  (linear): Linear(in_features=32, out_features=6, bias=True)
)


In [5]:
### loss function and optimizer for training 
loss_fn = torch.nn.MSELoss()
optimizer = torch.optim.Adam(model.parameters(), lr=1e-2)

### Functions for training and testing the model
def train(dataloader, model, loss_fn, optimizer,loss_list):
    size = len(dataloader.dataset)
    for batch, (x,y) in enumerate(dataloader):

        # Compute prediction error
        pred = model(x)
        loss = loss_fn(pred, y)

        # Backpropagation
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        if batch % 10 == 0:
            loss, current = loss.item(), batch * len(x)
            loss_list.append(loss)
        if batch % 20 == 0:
            print(f"loss: {loss:>7f}  [{current:>5d}/{size:>5d}]")
    return loss_list

def test(dataloader, model,test_loss_list):
    size = len(dataloader.dataset)
    model.eval()
    test_loss, correct = 0, 0
    with torch.no_grad():
        for X, y in dataloader:
            pred = model(X)
            test_loss += loss_fn(pred, y)
    print(f"Test Error: Loss = {test_loss:>8f} \n")
    test_loss_list.append(test_loss)
    return test_loss_list

In [None]:
### Train the model
epochs = 300
loss_list=[]
test_loss_list=[]
for t in range(epochs):
    print(f"Epoch {t+1}\n-------------------------------")
    loss_list = train(train_loader, model, loss_fn, optimizer,loss_list)
    test_loss_list = test(test_loader, model, test_loss_list)

Epoch 1
-------------------------------
loss: 0.083004  [    0/16396]
Test Error: Loss = 0.391950 

Epoch 2
-------------------------------
loss: 0.076387  [    0/16396]
Test Error: Loss = 0.350568 

Epoch 3
-------------------------------
loss: 0.071345  [    0/16396]
Test Error: Loss = 0.348438 

Epoch 4
-------------------------------
loss: 0.070377  [    0/16396]
Test Error: Loss = 0.316444 

Epoch 5
-------------------------------
loss: 0.062566  [    0/16396]
Test Error: Loss = 0.281041 

Epoch 6
-------------------------------
loss: 0.056158  [    0/16396]
Test Error: Loss = 0.285617 

Epoch 7
-------------------------------
loss: 0.057530  [    0/16396]
Test Error: Loss = 0.279059 

Epoch 8
-------------------------------
loss: 0.055172  [    0/16396]
Test Error: Loss = 0.277142 

Epoch 9
-------------------------------
loss: 0.055700  [    0/16396]
Test Error: Loss = 0.277279 

Epoch 10
-------------------------------
loss: 0.054992  [    0/16396]
Test Error: Loss = 0.273554 


loss: 0.037141  [    0/16396]
Test Error: Loss = 0.185122 

Epoch 83
-------------------------------
loss: 0.035688  [    0/16396]
Test Error: Loss = 0.179819 

Epoch 84
-------------------------------
loss: 0.036052  [    0/16396]
Test Error: Loss = 0.185982 

Epoch 85
-------------------------------
loss: 0.037079  [    0/16396]
Test Error: Loss = 0.168357 

Epoch 86
-------------------------------
loss: 0.033171  [    0/16396]
Test Error: Loss = 0.163065 

Epoch 87
-------------------------------
loss: 0.030761  [    0/16396]
Test Error: Loss = 0.155590 

Epoch 88
-------------------------------
loss: 0.030970  [    0/16396]
Test Error: Loss = 0.163543 

Epoch 89
-------------------------------
loss: 0.031472  [    0/16396]
Test Error: Loss = 0.151928 

Epoch 90
-------------------------------
loss: 0.029834  [    0/16396]
Test Error: Loss = 0.159067 

Epoch 91
-------------------------------
loss: 0.030975  [    0/16396]
Test Error: Loss = 0.163383 

Epoch 92
----------------------

In [None]:
### Plot results
plt.figure()
plt.plot(loss_list)
axes = plt.gca()
plt.title('Training Loss')
plt.xlabel('time')
plt.ylabel('loss')
axes.set_ylim([0,0.4])
plt.show()

plt.figure()
plt.plot(test_loss_list)
axes = plt.gca()
plt.title('Test Loss')
plt.xlabel('data')
plt.ylabel('loss')
# axes.set_ylim([0.04,0.1])
plt.show()