In [1]:
import csv

import numpy as np

import os
from torchvision import transforms
from torch.utils.data import Dataset, DataLoader, random_split
import torch
import torch.nn as nn
import torch.optim as optim
import pandas as pd
import torch.nn.functional as F

# from sklearn.model_selection import train_test_split
# RANDOM_SEED = 42

In [None]:
# TODO: add dataset
dataset = 'model/keypoint_classifier/keypoint.csv'
NUM_CLASSES = 4

In [3]:
class KeypointDataset(Dataset):
    def __init__(self, csv_file, transform=None):
        """
        Args:
            csv_file (str): Path to the CSV file.
            transform (callable, optional): Optional transform to be applied
                on a sample.
        """
        self.data_frame = pd.read_csv(csv_file)
        self.transform = transform

    def __len__(self):
        return len(self.data_frame)

    def __getitem__(self, idx):
        if torch.is_tensor(idx):
            idx = idx.tolist()

        # Extract the data row corresponding to the index
        row = self.data_frame.iloc[idx]
        # Assuming the first columns are features and the last column is the label
        sample = {
                    'label': torch.tensor(row[0], dtype=torch.long),
                    'features': torch.tensor(row[1:].values, dtype=torch.float32)
                }

        if self.transform:
            sample = self.transform(sample)

        return sample

In [4]:
csv_file_path = 'model/keypoint_classifier/keypoint.csv'
dataset = KeypointDataset(csv_file=csv_file_path)

In [5]:
# Split dataset into train and test sets
train_size = int(0.8 * len(dataset))
test_size = len(dataset) - train_size
train_data, test_data = random_split(dataset, [train_size, test_size])

In [6]:
train_dataloader = DataLoader(train_data, batch_size=64, shuffle=True)
test_dataloader = DataLoader(test_data, batch_size=64, shuffle=True)

In [7]:
device = (
    "cuda"
    if torch.cuda.is_available()
    else "mps"
    if torch.backends.mps.is_available()
    else "cpu"
)
print(f"Using {device} device")

Using cpu device


In [10]:
class KeypointClassifier(nn.Module):
    def __init__(self, input_size, num_classes):
        super().__init__()
        # super(KeypointClassifier, self).__init__()
        self.dropout1 = nn.Dropout(0.2)
        self.fc1 = nn.Linear(input_size, 20)
        self.dropout2 = nn.Dropout(0.4)
        self.fc2 = nn.Linear(20, 10)
        self.fc3 = nn.Linear(10, num_classes)

    def forward(self, x):
        x = self.dropout1(x)
        x = F.relu(self.fc1(x))
        x = self.dropout2(x)
        x = F.relu(self.fc2(x))
        x = F.softmax(self.fc3(x), dim=1)
        return x

class KeypointClassifierInference(nn.Module):
    def __init__(self, input_size, num_classes):
        super().__init__()
        # super(KeypointClassifier, self).__init__()
        self.fc1 = nn.Linear(input_size, 20)
        self.fc2 = nn.Linear(20, 10)
        self.fc3 = nn.Linear(10, num_classes)

    def forward(self, x):
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        x = F.softmax(self.fc3(x), dim=1)
        return x

# Example usage
INPUT_SIZE = 21 * 2  # Number of input features
NUM_CLASSES = 4      # Adjust based on your use case

# model = KeypointClassifier(input_size=INPUT_SIZE, num_classes=NUM_CLASSES).to(device)
# print(model)

In [11]:
learning_rate = 1e-3
batch_size = 64
epochs = 10

In [12]:
model = KeypointClassifierInference(INPUT_SIZE, NUM_CLASSES)
model.load_state_dict(torch.load('keypoint_classifier_torch.pth', weights_only=True))
model.eval()

KeypointClassifierInference(
  (fc1): Linear(in_features=42, out_features=20, bias=True)
  (fc2): Linear(in_features=20, out_features=10, bias=True)
  (fc3): Linear(in_features=10, out_features=4, bias=True)
)

In [13]:
loss_fn = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=learning_rate)

In [16]:
for epoch in range(epochs):
    model.train()
    running_loss = 0.0
    running_accuracy = 0.0

    for batch in train_dataloader:
        features = batch['features']
        labels = batch['label']

        # Zero the parameter gradients
        optimizer.zero_grad()

        # Forward pass
        outputs = model(features)

        # Compute loss
        loss = loss_fn(outputs, labels)

        # Backward pass and optimization
        loss.backward()
        optimizer.step()

        # Update running metrics
        running_loss += loss.item() * features.size(0)
        _, predicted = torch.max(outputs, 1)
        running_accuracy += (predicted == labels).sum().item()

    epoch_loss = running_loss / len(train_dataloader.dataset)
    epoch_accuracy = running_accuracy / len(train_dataloader.dataset)
    print(f"Epoch {epoch + 1}/{epochs}, Loss: {epoch_loss:.4f}, Accuracy: {epoch_accuracy:.4f}")

  'label': torch.tensor(row[0], dtype=torch.long),


Epoch 1/10, Loss: 1.3667, Accuracy: 0.3271
Epoch 2/10, Loss: 1.3277, Accuracy: 0.3273
Epoch 3/10, Loss: 1.3044, Accuracy: 0.3318
Epoch 4/10, Loss: 1.2700, Accuracy: 0.4412
Epoch 5/10, Loss: 1.2221, Accuracy: 0.5230
Epoch 6/10, Loss: 1.1833, Accuracy: 0.5812
Epoch 7/10, Loss: 1.1461, Accuracy: 0.6223
Epoch 8/10, Loss: 1.1227, Accuracy: 0.6330
Epoch 9/10, Loss: 1.1115, Accuracy: 0.6471
Epoch 10/10, Loss: 1.0953, Accuracy: 0.6667


In [14]:
model.eval()

test_loss = 0.0
test_accuracy = 0.0

with torch.no_grad():
    for batch in test_dataloader:
        features = batch['features']
        labels = batch['label']

        # Forward pass
        outputs = model(features)

        # Compute loss
        loss = loss_fn(outputs, labels)

        # Update test metrics
        test_loss += loss.item() * features.size(0)
        _, predicted = torch.max(outputs, 1)
        test_accuracy += (predicted == labels).sum().item()

test_loss /= len(test_dataloader.dataset)
test_accuracy /= len(test_dataloader.dataset)
print(f"Test Loss: {test_loss:.4f}, Test Accuracy: {test_accuracy:.4f}")


  'label': torch.tensor(row[0], dtype=torch.long),


Test Loss: 1.3860, Test Accuracy: 0.3455


In [None]:
model_save_path = 'keypoint_classifier_torch.pth'
torch.save(model.state_dict(), model_save_path)
print(f"Model saved to {model_save_path}")

In [21]:
model.eval()
with torch.no_grad():
    samples = next(iter(test_dataloader))
    features, labels = samples['features'], samples['label']
    outputs = model(features)

  'label': torch.tensor(row[0], dtype=torch.long),


In [None]:
model.eval()
with torch.no_grad():
    sample = next(iter(test_data))
    features, label = sample['features'], sample['label']
    features = features.unsqueeze(0)
    outputs = model(features)
_, predicted = torch.max(outputs, 1)
predicted.item()

  'label': torch.tensor(row[0], dtype=torch.long),


In [15]:
from torchconverter import TracedModule

In [16]:
sample = next(iter(test_data))
features, label = sample['features'], sample['label']
features = features.unsqueeze(0)

  'label': torch.tensor(row[0], dtype=torch.long),


In [None]:
m = TracedModule(model)
output = m.forward(torch.ones_like(features))
m.print_graph()
output
m.convert()


placeholder: x
call module: fc1 fc1
call function: relu <function relu at 0x10a59bc70> (fc1,)
call module: fc2 fc2
call function: relu_1 <function relu at 0x10a59bc70> (fc2,)
call module: fc3 fc3
call function: softmax <function softmax at 0x10a5a44c0> (fc3,)
output: output torch.Size([1, 4]) torch.float32
opcode         name     target                             args        kwargs
-------------  -------  ---------------------------------  ----------  -------------------------------------------
placeholder    x        x                                  ()          {}
call_module    fc1      fc1                                (x,)        {}
call_function  relu     <function relu at 0x10a59bc70>     (fc1,)      {'inplace': False}
call_module    fc2      fc2                                (relu,)     {}
call_function  relu_1   <function relu at 0x10a59bc70>     (fc2,)      {'inplace': False}
call_module    fc3      fc3                                (relu_1,)   {}
call_function  softmax 

tensor([[0.2134, 0.3199, 0.2371, 0.2295]], grad_fn=<SoftmaxBackward0>)