In [1]:
import torch
import pickle
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import torch.utils.data as data
import numpy as np
import matplotlib.pyplot as plt
import os
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler

In [2]:
data_folder = "../data"
print("Using data folder:", data_folder)
print("Files in folder:", os.listdir(data_folder))
labels = [i.split(".")[0] for i in os.listdir(data_folder) if i.endswith(".pkl")]
print("Labels found:", labels)
print("Number of labels:", len(labels))

Using data folder: ../data
Files in folder: ['background_recording.pkl', 'action_left_recording.pkl', 'action_down_recording.pkl']
Labels found: ['background_recording', 'action_left_recording', 'action_down_recording']
Number of labels: 3


In [3]:
def open_pickle(file):
    with open(file, "rb") as f:
        return pickle.load(f)


background = open_pickle(os.path.join(data_folder, "background_recording.pkl"))
action_left = open_pickle(os.path.join(data_folder, "action_left_recording.pkl"))
action_down = open_pickle(os.path.join(data_folder, "action_down_recording.pkl"))

print(f"Number of samples in background: {len(background)}")
print(f"Number of samples in action_left: {len(action_left)}")
print(f"Number of samples in action_down: {len(action_down)}")

Number of samples in background: 47
Number of samples in action_left: 39
Number of samples in action_down: 51


In [4]:
# Crop each sample to have size [100, 8]
def crop_sample(data, size=(100, 8)):
    new_data = []
    for sample in data:
        if sample.shape[0] < size[0]:
            new_sample = np.zeros((size[0], size[1]))
            new_sample[: sample.shape[0], : sample.shape[1]] = sample
            new_data.append(new_sample)
        else:
            new_data.append(sample[: size[0], : size[1]])
    return np.array(new_data)


background = crop_sample(background)
action_left = crop_sample(action_left)
action_down = crop_sample(action_down)

print(f"Shape of background: {background.shape}")
print(f"Shape of action_left: {action_left.shape}")
print(f"Shape of action_down: {action_down.shape}")

Shape of background: (47, 100, 8)
Shape of action_left: (39, 100, 8)
Shape of action_down: (51, 100, 8)


In [5]:
background_labels = np.zeros((background.shape[0], 1))
action_left_labels = np.ones((action_left.shape[0], 1))
action_down_labels = np.full((action_down.shape[0], 1), 2)

# Combine all data and labels
data = np.concatenate((background, action_left, action_down), axis=0)
labels = np.concatenate(
    (background_labels, action_left_labels, action_down_labels), axis=0
)
print(f"Shape of data: {data.shape}")
print(f"Shape of labels: {labels.shape}")

# Remove first two columns from each sample (axis=2)
data = data[:, :, 2:]
print(f"Shape of data after removing first two columns: {data.shape}")

# Flatten the data so that each sample is a 1D array
data = data.reshape(data.shape[0], -1)
print(f"Shape of data after flattening: {data.shape}")

# Standardize the data
scaler = StandardScaler()
data = scaler.fit_transform(data)
print(f"Shape of data after standardization: {data.shape}")

X_train, X_test, y_train, y_test = train_test_split(
    data, labels, test_size=0.2, random_state=42
)
print(f"Shape of X_train: {X_train.shape}")
print(f"Shape of X_test: {X_test.shape}")
print(f"Shape of y_train: {y_train.shape}")
print(f"Shape of y_test: {y_test.shape}")

Shape of data: (137, 100, 8)
Shape of labels: (137, 1)
Shape of data after removing first two columns: (137, 100, 6)
Shape of data after flattening: (137, 600)
Shape of data after standardization: (137, 600)
Shape of X_train: (109, 600)
Shape of X_test: (28, 600)
Shape of y_train: (109, 1)
Shape of y_test: (28, 1)


In [10]:
# But data into tensor dataset from torch data and then dataloaders
trainset = torch.utils.data.TensorDataset(
    torch.tensor(X_train, dtype=torch.float32),
    torch.tensor(y_train, dtype=torch.float32),
)
testset = torch.utils.data.TensorDataset(
    torch.tensor(X_test, dtype=torch.float32),
    torch.tensor(y_test, dtype=torch.float32),
)
trainloader = torch.utils.data.DataLoader(trainset, batch_size=2, shuffle=True)
testloader = torch.utils.data.DataLoader(testset, batch_size=2, shuffle=False)

# Simple Dense Model

In [11]:
class SimpleNN(nn.Module):
    def __init__(self, input_size):
        super(SimpleNN, self).__init__()
        self.fc1 = nn.Linear(input_size, 10)
        self.fc2 = nn.Linear(10, 3)

    def forward(self, x):
        x = F.relu(self.fc1(x))
        x = self.fc2(x)
        return x


# Create the model
input_size = X_train.shape[1]
model = SimpleNN(input_size)
# Define the loss function and optimizer
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.1)
epochs = 10

model

SimpleNN(
  (fc1): Linear(in_features=600, out_features=10, bias=True)
  (fc2): Linear(in_features=10, out_features=3, bias=True)
)

In [12]:
# train loop and test loop, calculate accuracy and loss
def test_input_sizes(trainloader, testloader):
    for inputs, labels in trainloader:
        print(f"Train input size: {inputs.size()}")
        print(f"Train label size: {labels.size()}")
        break
    for inputs, labels in testloader:
        print(f"Test input size: {inputs.size()}")
        print(f"Test label size: {labels.size()}")
        break


def one_hot_target(target_class, predicted_shape):
    # Ensure target_class is a torch tensor and flatten it
    target_class = target_class.flatten().long()
    num_classes = predicted_shape[-1]
    num_samples = target_class.shape[0]

    one_hot = torch.zeros((num_samples, num_classes), device=target_class.device)
    one_hot[torch.arange(num_samples), target_class] = 1

    return one_hot


def train(model, trainloader, criterion, optimizer, epochs):
    for epoch in range(epochs):
        model.train()
        epoch_loss = 0
        correct = 0
        total = 0
        for inputs, labels in trainloader:
            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, one_hot_target(labels, outputs.shape))
            loss.backward()
            optimizer.step()
            epoch_loss += loss.item()
            # Calculate accuracy for the epoch
            preds = outputs.argmax(dim=1)
            correct += (preds == labels.flatten()).sum().item()
            total += labels.size(0)
        epoch_accuracy = correct / total
        print(
            f"Epoch {epoch + 1}/{epochs}, Loss: {epoch_loss / len(trainloader)}, Accuracy: {epoch_accuracy * 100:.2f}%"
        )


def test(model, testloader):
    model.eval()
    total = 0
    correct = 0
    with torch.no_grad():
        for inputs, labels in testloader:
            outputs = model(inputs)
            loss = criterion(outputs, one_hot_target(labels, outputs.shape))
            # Calculate accuracy
            preds = outputs.argmax(dim=1)

            total += labels.size(0)
            correct += (preds == labels.flatten()).sum().item()

            # print(
            #     f"Batch accuracy: {(preds == labels.flatten()).sum().item() / labels.size(0) * 100:.2f}%"
            # )

    accuracy = correct / total
    print(f"Accuracy: {accuracy * 100:.2f}%")


# Check input sizes
test_input_sizes(trainloader, testloader)
# Train the model
train(model, trainloader, criterion, optimizer, epochs)
# Test the model
test(model, testloader)

Train input size: torch.Size([2, 600])
Train label size: torch.Size([2, 1])
Test input size: torch.Size([2, 600])
Test label size: torch.Size([2, 1])
Epoch 1/10, Loss: 2.354554326348594, Accuracy: 81.65%
Epoch 2/10, Loss: 1.4692316018502674, Accuracy: 96.33%
Epoch 3/10, Loss: 0.39677366042372036, Accuracy: 95.41%
Epoch 4/10, Loss: 0.18581341956874398, Accuracy: 96.33%
Epoch 5/10, Loss: 0.08371303511971306, Accuracy: 97.25%
Epoch 6/10, Loss: 0.08601478717543837, Accuracy: 97.25%
Epoch 7/10, Loss: 0.08387824402498956, Accuracy: 97.25%
Epoch 8/10, Loss: 0.0828456576168801, Accuracy: 97.25%
Epoch 9/10, Loss: 0.08340369000821216, Accuracy: 97.25%
Epoch 10/10, Loss: 0.08398327000147442, Accuracy: 97.25%
Accuracy: 100.00%


In [13]:
torch.save(model.state_dict(), "../models/simple_nn_model.pth")
with open("../models/scaler.pkl", "wb") as f:
    pickle.dump(scaler, f)