In [2]:
import pickle
import numpy as np
import pandas as pd
from sklearn.decomposition import PCA

Import Data

In [3]:
def to_onehot(x):
    labels = np.unique(x)
    result = np.zeros(shape=(x.shape[0], labels.shape[0]))

    for i in range(len(x)):
        result[i][x[i]] = 1.0

    return result

In [8]:
from sklearn.model_selection import train_test_split

X = [] # Features per class
y = [] # Labels
N = 5250 # Amount of data we want to use max: 5250

# Import the features
with open("sample_data/training_data_onehot.pkl", "rb") as f:
	X_train, y_train = pickle.load(f)

with open("sample_data/test_data_onehot.pkl", "rb") as f:
	X_test, y_test = pickle.load(f)


Preprocess Data

In [123]:
# Calculates the number of components to consider when performing pca
def num_components(X, variance_tol = 0.8):
    # Standardize each feature of the matrix
    x_mean = np.mean(X, axis = 0)
    x_std = np.std(X, axis = 0)
    Z = (X - x_mean) / x_std

    # Calculate covariance matrix
    C = np.cov(Z, rowvar=False)
    # Calculate eigenvalues and eigenvectors and sort by size
    eigenvalues, eigenvectors = np.linalg.eig(C)
    index = eigenvalues.argsort()[:: -1]
    eigenvalues = eigenvalues[index]
    eigenvectors = eigenvectors[:, index]

    # Calculate explained variance matrix 
    explained_var = np.cumsum(eigenvalues) / np.sum(eigenvalues)

    # Select number of components responsible for variance_tol% of variance
    n_components = np.argmax(explained_var >= variance_tol) + 1
    return Z, x_mean, x_std, n_components

# Parameters are trained components, trained mean, trained standard deviation and the new inputs X
# Changes to the PCA basis
def convert_to_pca(components, mean, std, X):
    Z = (X - mean)/std
    return np.matmul(components, Z)

Z, mean, std, n_components = num_components(X_train)
# Initialize prinicipal component analysis
pca = PCA(n_components)
pca.fit(Z)
components = pca.components_
x_pca = pca.transform(Z)

Build Model

In [14]:
import torch as T
import torch.nn as nn


# Build Model
def build_model(n_inputs, n_outputs, h1_dims=512, h2_dims=256):
    model =  nn.Sequential(
        nn.Linear(n_inputs, h1_dims),
        nn.ReLU(),
        nn.Linear(h1_dims, h2_dims),
        nn.ReLU(),                              
        nn.Linear(h2_dims, n_outputs),
        nn.Softmax(dim=-1)
    )
    return model


class NeuralNetwork(nn.Module):
    def __init__(self, n_inputs, n_outputs, h1_dims=512, h2_dims=256, name="NeuralNetwork", save_dir="/trained_models"):
        super(NeuralNetwork, self).__init__()
        self.name = name
        self.save_dir = save_dir

        self.flatten = nn.Flatten()
        self.model = build_model(n_inputs, n_outputs, h1_dims, h2_dims)
    
    def forward(self, X):
        X = self.flatten(X)
        X = self.model(X)
        return X
    
    def save_model(self):
        T.save(self.state_dict, f"{self.save_dir}/{self.name}.pth")

    def load_model(self, name):
        self.load_state_dict(T.load(f"{self.save_dir}/{name}.pth"))



labels = np.unique(y_train) # Labels of our data (0 - 20)

n_inputs = len(np.reshape(X_train[0], -1))
h1_dims = 2 * n_inputs # Number of nodes for the 1st hidden layer
h2_dims = n_inputs # Number of nodes for the 2nd hidden layer
n_outputs = 10 # 21 labels

# Initialize the model
net = NeuralNetwork(n_inputs=n_inputs, n_outputs=n_outputs, h1_dims=h1_dims, h2_dims=h2_dims, name="NN-v1")


Train Model

In [20]:
import torch.optim as optim

# Global Variables
epochs = 3
learning_rate = 1e-5

loss_func = nn.MSELoss()
optimizer = optim.Adam(net.parameters(), lr=learning_rate)


def test_model(model, X_test, y_test, loss_func):
    size = len(y_test)

    model.eval()
    with T.no_grad():
        
        X = T.from_numpy(X_test).to(T.float32)
        y_tensor = T.Tensor(y_test).to(T.float)

        y_pred = model.forward(X)

        test_loss = loss_func(y_pred, y_tensor)

        correct = (y_pred.argmax(1) == y_tensor.argmax(1)).type(T.float).sum().item()
        
        test_loss /= size
        correct /= size
        print(f"Test Error: \n Accuracy: {(100 * correct):>0.1f}%, Avg loss: {test_loss:>8f} \n")


def train_model(model, X_train, y_train, loss_func, optimizer):
    size = len(X_train)
    batch_size = 50

    for i in range(size//batch_size):
        start = batch_size * i
        end = start + batch_size
        X = T.from_numpy(X_train[start:end]).to(T.float32)
        y_true = T.Tensor(y_train[start:end]).to(T.float)

        y_pred = model(X)
        loss = loss_func(y_pred, y_true)

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        if (i * batch_size) % 2500 == 0:
            loss, current = loss.item(), (i + 1) * batch_size
            print(f"loss: {loss:>7f}  [{current:>5d}/{size:>5d}]")


for i in range(epochs):
    print(f"Epoch {i+1}\n-------------------------------")

    train_model(net, X_train, y_train, loss_func, optimizer)
    print('Finished training')
    test_model(net, X_test, y_test, loss_func)




Epoch 1
-------------------------------
loss: 0.002534  [   50/35700]
loss: 0.006014  [ 2550/35700]
loss: 0.002144  [ 5050/35700]
loss: 0.005779  [ 7550/35700]
loss: 0.001126  [10050/35700]
loss: 0.004376  [12550/35700]
loss: 0.004941  [15050/35700]
loss: 0.000882  [17550/35700]
loss: 0.005417  [20050/35700]
loss: 0.000394  [22550/35700]
loss: 0.005584  [25050/35700]
loss: 0.009057  [27550/35700]
loss: 0.011634  [30050/35700]
loss: 0.001576  [32550/35700]
loss: 0.000797  [35050/35700]
Finished training
Test Error: 
 Accuracy: 97.0%, Avg loss: 0.000001 

Epoch 2
-------------------------------
loss: 0.001613  [   50/35700]
loss: 0.005542  [ 2550/35700]
loss: 0.001630  [ 5050/35700]
loss: 0.006163  [ 7550/35700]
loss: 0.000919  [10050/35700]
loss: 0.003915  [12550/35700]
loss: 0.004621  [15050/35700]
loss: 0.000731  [17550/35700]
loss: 0.005036  [20050/35700]
loss: 0.000341  [22550/35700]
loss: 0.005205  [25050/35700]
loss: 0.008866  [27550/35700]
loss: 0.011167  [30050/35700]
loss: 0.00