# POSEIDON: Pose Estimation & Activity Recognition using GNNs

Team Members (Group 16): 
1. Chong Jun Rong Brian (A0290882U)
2. Parashara Ramesh (A0285647M)
3. Ng Wei Jie Brandon (A0184893L)

In [None]:
%load_ext autoreload
%autoreload 2

<h2><u> Table of contents </u></h2>

1. What is this project about?
<br> 1.1. Project Motivation
<br> 1.2. Project Description
<br> 1.3. Project Setup   
2. Understanding the Human 3.6M Dataset
3. Dataset preparation
4. Models
5. Baseline 1 - SimplePose (Simple ML model without using GNNs)
6. Baseline 2 - SimplePoseGNN (Simple ML model using GNNs) 
7. Improvement 1 - SemGCN model (Reimplementation of Semantic GCN)
8. Improvement 2 - PoseGCN model (Tweaks of SemGCN)
9. Evaluation & Analysis of models
10. Creating our own custom dataset
11. Evaluation on custom dataset
12. Conclusion
13. Video presentation & Resources


<h2><u>1. What is this project about?</u></h2>
<h3><u>1.1 Project Motivation</u></h3>

Accurately predicting 3D human poses from 2D keypoints is a critical task for many applications such as motion capture and activity recognition. Traditional methods that use direct regression or lifting techniques often struggle to fully capture the complex spatial relationships between body joints. By treating the 2D pose keypoints as graphs, we can leverage the underlying connectivity between joints to improve the 3D pose estimation. Additionally, recognizing and classifying human activities from these poses is an essential task in fields like surveillance and healthcare. Therefore, this project seeks to explore how GNNs can enhance 3D pose estimation and activity recognition.

<h3><u>1.2 Project Description</u></h3>

The primary objective of this project is to predict 3D human poses from 2D pose keypoints accurately using GNNs. 
* Firstly, we will develop two baseline models for 3D pose estimation: one using a standard Neural Network (NN) & Convolutional Neural Network (CNN), and one using a simple GNN-based model.
* Secondly, we will reimplement the SemGCN model, which treats the body joints of a 2D pose as nodes in a graph, with edges representing the connectivity between them. 
* Finally, we will design an improved version of the SemGCN model by exploring different GNN architectures and modifications to enhance its performance.

The secondary objective is to classify human activities based on 2D pose keypoints. We will use custom datasets to validate this task, allowing us to assess the generalization capabilities of GNN-based models for activity recognition.
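
To make the "joints as nodes, connectivity as edges" idea concrete, the sketch below runs a single graph-convolution step on a toy 4-joint chain using NumPy. The toy skeleton, the keypoint values, and the identity weight matrix are illustrative assumptions and are not taken from SemGCN or from the models in this notebook. It uses the common symmetric normalization $D^{-1/2}(A + I)D^{-1/2}$.

```python
import numpy as np

# Hypothetical toy skeleton: 4 joints in a chain 0-1-2-3 (not the H36M skeleton).
edges = [(0, 1), (1, 2), (2, 3)]
num_joints = 4

# Build a symmetric adjacency matrix with self-loops: A + I.
adj = np.eye(num_joints)
for src, dst in edges:
    adj[src, dst] = 1.0
    adj[dst, src] = 1.0

# Symmetric normalization: D^{-1/2} (A + I) D^{-1/2}.
deg_inv_sqrt = 1.0 / np.sqrt(adj.sum(axis=1))
adj_norm = adj * deg_inv_sqrt[:, None] * deg_inv_sqrt[None, :]

# Node features: one 2D keypoint (x, y) per joint (made-up values).
keypoints_2d = np.array([[0.0, 0.0], [0.0, 1.0], [0.0, 2.0], [1.0, 2.0]])

# One propagation step: each joint aggregates itself and its neighbours,
# then a linear map is applied to the aggregated features.
weight = np.eye(2)  # identity weights, so only the aggregation is visible
hidden = adj_norm @ keypoints_2d @ weight
print(hidden.shape)  # (4, 2)
```

In a trained model the identity `weight` would be replaced by a learned matrix, and several such layers would typically be stacked before regressing the 3D coordinates.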

<h3><u>1.3 Project Setup</u></h3>

1. Install the dependencies from requirements.txt (TODO.all to fix later)


In [1]:
import os
os.environ["DGLBACKEND"] = "pytorch" # DGL Settings
import torch
import torch.nn as nn
import torch.nn.functional as F
import dgl
import networkx as nx
import numpy as np
from tqdm import tqdm
from torch.utils.data import Dataset, DataLoader
from dgl.data import DGLDataset
from utils.visualization_utils import visualize_2d_pose, visualize_3d_pose

<h2><u>2. Understanding the Human 3.6M Dataset</u></h2>

TODO.brandon :
- Add a small brief about the human 3.6M dataset + how we plan to use it
- add h3 tags for each subsection + make changes in table of contents + only present the story points here, main code can go to appropriate folders

EDA points will also come here as individual cell blocks but will be called as one function directly in the dataset preparation

In [None]:
poses_2d = np.load("datasets/h36m/Processed/test_2d_poses.npy")
poses_3d = np.load("datasets/h36m/Processed/test_3d_poses.npy")
visualize_2d_pose(poses_2d[0])
visualize_3d_pose(poses_3d[0], elev=110, azim=90)

<h2><u>3. Dataset preparation </u></h2>

TODO.parash: create it once, and then mention that the code blocks under this section need not be run as they are present in this (drive/sharepoint folder)?

<h2><u>4. Models</u></h2>

TODO.all - write a convincing story on our approach + high level thoughts on why the following models are worth building and what we hope to gain from it

In [18]:
# Simple model based on "A simple yet effective baseline for 3d human pose estimation" (Martinez et al., 2017)
class LinearBaselineModel(nn.Module):
    
    def __init__(self, total_joints, total_actions):
        super().__init__()
        self.total_joints = total_joints
        self.total_actions = total_actions
        self.input_linear = nn.Linear(total_joints * 2, 1024) # 1d input shape is B x 16 x 2
        self.block1 = nn.Sequential(
            nn.Linear(1024, 1024),
            nn.BatchNorm1d(1024),
            nn.ReLU(),
            nn.Dropout(0.5),
            nn.Linear(1024, 1024),
            nn.BatchNorm1d(1024),
            nn.ReLU(),
            nn.Dropout(0.5),
        )
        self.block2 = nn.Sequential(
            nn.Linear(1024, 1024),
            nn.BatchNorm1d(1024),
            nn.ReLU(),
            nn.Dropout(0.5),
            nn.Linear(1024, 1024),
            nn.BatchNorm1d(1024),
            nn.ReLU(),
            nn.Dropout(0.5),
        )
        self.output_3d_pose_linear = nn.Linear(1024, total_joints * 3) # 3D output shape is B x 16 x 3
        self.output_label_linear = nn.Linear(1024, total_actions) # Predict Action Labels
        
    def forward(self, x):
        x = x.view(x.shape[0], -1)
        x = self.input_linear(x)
        x = self.block1(x) + x # First Residual connection
        x = self.block2(x) + x # Second Residual Connection
        three_dim_pose_predictions = self.output_3d_pose_linear(x)
        action_label_predictions = self.output_label_linear(x)
        joint_preds = three_dim_pose_predictions.view(x.shape[0], -1, 3)
        action_preds = action_label_predictions
        return joint_preds, action_preds   # 3D output shape is B x 16 x 3
        

<h2><u>5. Baseline 1 - SimplePose (Simple ML model without using GNNs)</u></h2>

TODO.brian

In [19]:
# Dataset Class
class Human36MDataset(Dataset):
    def __init__(self, two_dim_dataset_path, three_dim_dataset_path, label_dataset_path):
        self.two_dim_dataset_path = two_dim_dataset_path
        self.three_dim_dataset_path = three_dim_dataset_path
        self.label_dataset_path = label_dataset_path
        self.input_data = np.load(self.two_dim_dataset_path)
        self.output_data = np.load(self.three_dim_dataset_path)
        self.labels = np.load(self.label_dataset_path)
        unique_labels, tags = np.unique(self.labels, return_inverse=True)
        self.unique_labels = unique_labels
        self.labels = tags
        self.labels_map = dict(zip(range(len(unique_labels)),unique_labels))
        assert len(self.input_data) == len(self.labels) == len(self.output_data)
    
    def get_action_numbers(self):
        return len(self.unique_labels)
    
    def get_joint_numbers(self):
        return self.input_data[0].shape[0]
    
    def __len__(self):
        return len(self.input_data)
    
    def __getitem__(self, index):
        return self.input_data[index], self.output_data[index], self.labels[index]
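
The dataset class relies on `np.unique(..., return_inverse=True)` to turn action names into integer class indices. A small sketch of that step, using made-up action names rather than the actual contents of `train_actions.npy`:

```python
import numpy as np

# Made-up action names standing in for the contents of the label file.
raw_labels = np.array(["Walking", "Eating", "Walking", "Sitting"])

# unique_labels is sorted; tags[i] is the index of raw_labels[i] in unique_labels.
unique_labels, tags = np.unique(raw_labels, return_inverse=True)
labels_map = dict(zip(range(len(unique_labels)), unique_labels))

print(unique_labels)  # ['Eating' 'Sitting' 'Walking']
print(tags)           # [2 0 2 1]
print(labels_map[2])  # Walking
```

Because the mapping is built separately for each dataset instance, the train and test splits only share the same indices if both files contain the same set of action names.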

In [None]:

# Parameters
LEARNING_RATE = 1e-3
BATCH_SIZE = 64
NUM_EPOCHS = 200
DEVICE = 'cuda' if torch.cuda.is_available() else ('mps' if torch.backends.mps.is_available() else 'cpu') 

training_2d_dataset_path = os.path.join('datasets', 'h36m', 'Processed', 'train_2d_poses.npy')
training_3d_dataset_path = os.path.join('datasets', 'h36m', 'Processed', 'train_3d_poses.npy')
training_label_path  = os.path.join('datasets', 'h36m', 'Processed', 'train_actions.npy')
training_data = Human36MDataset(training_2d_dataset_path, training_3d_dataset_path, training_label_path)
train_dataloader = DataLoader(training_data, batch_size=BATCH_SIZE, shuffle=True)
testing_2d_dataset_path = os.path.join('datasets', 'h36m', 'Processed', 'test_2d_poses.npy')
testing_3d_dataset_path = os.path.join('datasets', 'h36m', 'Processed', 'test_3d_poses.npy')
testing_label_path  = os.path.join('datasets', 'h36m', 'Processed', 'test_actions.npy')
testing_data = Human36MDataset(testing_2d_dataset_path, testing_3d_dataset_path, testing_label_path)
test_dataloader = DataLoader(testing_data, batch_size=BATCH_SIZE, shuffle=True)

def kaiming_weights_init(m):
    if isinstance(m, nn.Linear):
        torch.nn.init.kaiming_normal_(m.weight)

TOTAL_JOINTS = training_data.get_joint_numbers()
TOTAL_ACTIONS = training_data.get_action_numbers()

# Declare Model
model = LinearBaselineModel(TOTAL_JOINTS, TOTAL_ACTIONS).to(DEVICE)
# Apply Kaiming Init on Linear Layers
model.apply(kaiming_weights_init)

print(f"Model Parameters: {sum(p.numel() for p in model.parameters() if p.requires_grad)}")

# Declare Optimizer
optimizer = torch.optim.Adam(params=model.parameters(), lr=LEARNING_RATE)

# Declare Scheduler
scheduler = torch.optim.lr_scheduler.ExponentialLR(optimizer, gamma=0.96) # Value used by the original authors

# Loss Function
three_dim_pose_estimation_loss_fn = nn.MSELoss()
action_label_loss_fn = nn.CrossEntropyLoss()

# Training Loop
global_total_losses = []
global_pose_losses = []
global_action_losses = []
for epoch in tqdm(range(NUM_EPOCHS)):
    predicted_labels = None
    true_labels = None
    total_losses = []
    pose_losses = []
    action_losses = []
    for data in tqdm(train_dataloader):
        # Prepare Data
        two_dim_input_data, three_dim_output_data, action_labels= data
        two_dim_input_data = two_dim_input_data.to(DEVICE)
        three_dim_output_data = three_dim_output_data.to(DEVICE)
        action_labels = action_labels.to(DEVICE)
        # Set Gradients to 0
        optimizer.zero_grad()
        # Train Model
        predicted_3d_pose_estimations, predicted_action_labels = model(two_dim_input_data)
        # Calculate Loss
        three_dim_pose_estimation_loss = three_dim_pose_estimation_loss_fn(predicted_3d_pose_estimations, three_dim_output_data)
        action_label_loss = action_label_loss_fn(predicted_action_labels, action_labels)
        loss = three_dim_pose_estimation_loss + action_label_loss
        if epoch % 10 == 0: # Every 10 epochs, report once
            # Store results as Python floats so the autograd graph is not kept alive
            total_losses.append(loss.item())
            pose_losses.append(three_dim_pose_estimation_loss.item())
            action_losses.append(action_label_loss.item())
            predicted_action_labels = torch.argmax(predicted_action_labels, axis=1)
            predicted_labels = predicted_action_labels if predicted_labels is None else torch.cat((predicted_labels, predicted_action_labels), axis=0)
            true_labels = action_labels if true_labels is None else torch.cat((true_labels, action_labels), axis=0)
        # Backpropagate and update the weights
        loss.backward()
        optimizer.step()
    # Decay the learning rate once per epoch rather than once per batch
    scheduler.step()

    # Every 10 epochs, report once
    if epoch % 10 == 0:
        correct_predictions = (predicted_labels == true_labels).sum().item()
        accuracy = correct_predictions / predicted_labels.size(0) * 100
        total_loss = sum(total_losses) / len(train_dataloader)
        pose_loss = sum(pose_losses) / len(train_dataloader)
        action_loss = sum(action_losses) / len(train_dataloader)
        global_total_losses.append(total_loss)
        global_action_losses.append(action_loss)
        global_pose_losses.append(pose_loss)
        print(f"Epoch: {epoch} | Total Training Loss: {total_loss} | Pose Training Loss: {pose_loss} | Action Training Loss: {action_loss} | Action Train Label Accuracy: {accuracy}")
    
    if epoch % 10 == 0: # Every 10 epochs, test once
        predicted_labels = None
        true_labels = None
        total_losses = []
        pose_losses = []
        action_losses = []
        model.eval() # Use running BatchNorm statistics and disable Dropout
        with torch.no_grad(): # No gradients are needed for evaluation
            for data in tqdm(test_dataloader):
                # Prepare Data
                two_dim_input_data, three_dim_output_data, action_labels = data
                two_dim_input_data = two_dim_input_data.to(DEVICE)
                three_dim_output_data = three_dim_output_data.to(DEVICE)
                action_labels = action_labels.to(DEVICE)
                # Predict with model
                predicted_3d_pose_estimations, predicted_action_labels = model(two_dim_input_data)
                # Calculate Loss
                three_dim_pose_estimation_loss = three_dim_pose_estimation_loss_fn(predicted_3d_pose_estimations, three_dim_output_data)
                action_label_loss = action_label_loss_fn(predicted_action_labels, action_labels)
                loss = three_dim_pose_estimation_loss + action_label_loss
                # Store Results
                total_losses.append(loss.item())
                pose_losses.append(three_dim_pose_estimation_loss.item())
                action_losses.append(action_label_loss.item())
                predicted_action_labels = torch.argmax(predicted_action_labels, axis=1)
                predicted_labels = predicted_action_labels if predicted_labels is None else torch.cat((predicted_labels, predicted_action_labels), axis=0)
                true_labels = action_labels if true_labels is None else torch.cat((true_labels, action_labels), axis=0)

        # Calculate Test Accuracy (average the losses over the test batches)
        correct_predictions = (predicted_labels == true_labels).sum().item()
        accuracy = correct_predictions / predicted_labels.size(0) * 100
        total_loss = sum(total_losses) / len(test_dataloader)
        pose_loss = sum(pose_losses) / len(test_dataloader)
        action_loss = sum(action_losses) / len(test_dataloader)
        print(f"Epoch: {epoch} | Total Testing Loss: {total_loss} | Pose Testing Loss: {pose_loss} | Action Testing Loss: {action_loss} | Action Test Label Accuracy: {accuracy}")
        model.train() # Switch back to training mode for the next epoch
    
# Save model
state_dict = {
    'optimizer': optimizer.state_dict(),
    'model': model.state_dict(),
    'scheduler': scheduler.state_dict(),
}
weight_save_path = os.path.join('weights', 'linear_baseline_model')
os.makedirs(weight_save_path, exist_ok=True)

torch.save(state_dict, os.path.join(weight_save_path, 'weights.pth'))
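
`ExponentialLR` multiplies the learning rate by `gamma` on every `scheduler.step()` call, so the effective schedule depends on how often `step()` runs. The plain-Python sketch below evaluates the closed form `lr_n = lr_0 * gamma**n` for a few step counts. The step counts are illustrative assumptions, not measurements from this training run.

```python
# Closed-form learning rate after num_steps calls to an exponential scheduler.
def exponential_lr(initial_lr: float, gamma: float, num_steps: int) -> float:
    return initial_lr * gamma ** num_steps

LEARNING_RATE = 1e-3  # same values as in the training cell above
GAMMA = 0.96

# Illustrative step counts: 1, 10, and 200 scheduler steps.
for num_steps in (1, 10, 200):
    print(num_steps, exponential_lr(LEARNING_RATE, GAMMA, num_steps))
```

With `gamma = 0.96` the rate falls by roughly a third after 10 steps and by more than three orders of magnitude after 200 steps, which is worth keeping in mind when choosing between stepping the scheduler per batch and per epoch.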

<h2><u>6. Baseline 2 - SimplePoseGNN (Simple ML model using GNNs)</u></h2>

TODO.brandon

In [48]:
# Convert the 2D pose dataset into a DGL graph dataset
# Tutorial: https://docs.dgl.ai/tutorials/blitz/6_load_data.html
# Source: https://arxiv.org/pdf/1904.03345 Appendix A

class Human36MGraphDataset(DGLDataset):
    def __init__(self):
        super().__init__(name="human_3.6m")
        
    def process(self):
        training_2d_dataset_path = os.path.join('datasets', 'h36m', 'Processed', 'train_2d_poses.npy')
        training_3d_dataset_path = os.path.join('datasets', 'h36m', 'Processed', 'train_3d_poses.npy')
        training_label_path  = os.path.join('datasets', 'h36m', 'Processed', 'train_actions.npy')
        # Datasets
        two_dim_dataset = np.load(training_2d_dataset_path)
        three_dim_dataset = np.load(training_3d_dataset_path)
        label_dataset = np.load(training_label_path)
        assert len(two_dim_dataset) == len(three_dim_dataset) == len(label_dataset)
        # Edge Connections [Source & Destination] <-- Human Body Structure
        human_pose_edge_src = torch.LongTensor([0, 0, 0, 1, 2, 4, 5, 7, 8, 8, 8, 10, 11, 8, 13, 14])
        human_pose_edge_dst = torch.LongTensor([1, 4, 7, 2, 3, 5, 6, 8, 10, 13, 9, 11, 12, 13, 14, 15])
        # # Edge Features
        # for index in range(len(two_dim_dataset)):
        #     two_dim_data, three_dim_data, label = two_dim_dataset[index], three_dim_dataset[index], label_dataset[index]
        #     print(two_dim_data.shape, three_dim_data.shape, label.shape)
            
        
        self.graph = dgl.graph(
            (human_pose_edge_src, human_pose_edge_dst)
        )
        
        self.graph.ndata["feat"] = torch.Tensor(two_dim_dataset[0])
        
    def __getitem__(self, idx):
        return self.graph
    
    def __len__(self):
        return 1
    
graph = Human36MGraphDataset()
graph[0]

Graph(num_nodes=16, num_edges=16,
      ndata_schemes={'feat': Scheme(shape=(2,), dtype=torch.float32)}
      edata_schemes={})
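
`dgl.graph((src, dst))` creates directed edges, so the graph above only connects each source joint to its destination joint, and the pair (8, 13) appears twice in the edge list. The plain-Python sketch below shows one way to deduplicate the pairs and add reverse edges before building the graph. Whether the final model should use an undirected skeleton is an assumption here, not something this notebook has decided.

```python
# Edge list copied from Human36MGraphDataset.process above.
edge_src = [0, 0, 0, 1, 2, 4, 5, 7, 8, 8, 8, 10, 11, 8, 13, 14]
edge_dst = [1, 4, 7, 2, 3, 5, 6, 8, 10, 13, 9, 11, 12, 13, 14, 15]

# Drop repeated (src, dst) pairs while keeping the original order.
unique_edges = list(dict.fromkeys(zip(edge_src, edge_dst)))

# Add the reverse of every edge so messages can flow in both directions.
bidirectional_edges = unique_edges + [(dst, src) for src, dst in unique_edges]

sym_src = [src for src, _ in bidirectional_edges]
sym_dst = [dst for _, dst in bidirectional_edges]
print(len(unique_edges), len(bidirectional_edges))  # 15 30
```

The resulting `sym_src` and `sym_dst` lists could then be wrapped in `torch.LongTensor` and passed to `dgl.graph` in place of the original tensors.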

In [None]:
import urllib.request
import pandas as pd

urllib.request.urlretrieve(
    "https://data.dgl.ai/tutorial/dataset/graph_edges.csv", "./graph_edges.csv"
)
urllib.request.urlretrieve(
    "https://data.dgl.ai/tutorial/dataset/graph_properties.csv",
    "./graph_properties.csv",
)
edges = pd.read_csv("./graph_edges.csv")
properties = pd.read_csv("./graph_properties.csv")

edges.head()

properties.head()


class SyntheticDataset(DGLDataset):
    def __init__(self):
        super().__init__(name="synthetic")

    def process(self):
        edges = pd.read_csv("./graph_edges.csv")
        properties = pd.read_csv("./graph_properties.csv")
        self.graphs = []
        self.labels = []

        # Create a graph for each graph ID from the edges table.
        # First process the properties table into two dictionaries with graph IDs as keys.
        # The label and number of nodes are values.
        label_dict = {}
        num_nodes_dict = {}
        for _, row in properties.iterrows():
            label_dict[row["graph_id"]] = row["label"]
            num_nodes_dict[row["graph_id"]] = row["num_nodes"]

        # For the edges, first group the table by graph IDs.
        edges_group = edges.groupby("graph_id")

        # For each graph ID...
        for graph_id in edges_group.groups:
            # Find the edges as well as the number of nodes and its label.
            edges_of_id = edges_group.get_group(graph_id)
            src = edges_of_id["src"].to_numpy()
            dst = edges_of_id["dst"].to_numpy()
            num_nodes = num_nodes_dict[graph_id]
            label = label_dict[graph_id]

            # Create a graph and add it to the list of graphs and labels.
            g = dgl.graph((src, dst), num_nodes=num_nodes)
            self.graphs.append(g)
            self.labels.append(label)

        # Convert the label list to tensor for saving.
        self.labels = torch.LongTensor(self.labels)

    def __getitem__(self, i):
        return self.graphs[i], self.labels[i]

    def __len__(self):
        return len(self.graphs)


dataset = SyntheticDataset()
graph, label = dataset[0]
print(graph, label)

<h2><u>7. Improvement 1 - SemGCN model (Reimplementation of Semantic GCN)</u></h2>

TODO.parash


<h2><u>8. Improvement 2 - PoseGCN model (Tweaks of SemGCN)</u></h2>

TODO.all

<h2><u>9. Evaluation & Analysis of models</u></h2>

<h2><u>10. Creating our own custom dataset</u></h2>

<h2><u>11. Evaluation on custom dataset</u></h2>

<h2><u>12. Conclusion</u></h2>

<h2><u>13. Video presentation & Resources</u></h2>