Train an LSTM model to predict whether the detected pose is running

In [None]:
import sys
sys.path.append("..")

In [None]:
import os
import random
import pandas as pd
import numpy as np
from tqdm import tqdm
from pathlib import Path
from sklearn.model_selection import train_test_split

import matplotlib.pyplot as plt

import torch
from torch.utils.data import Dataset, DataLoader

from model_classes.RNN.v1 import RNNModel
from action_classifier import OUTPUT_LABELS

from typing import Literal

In [None]:
# Make CUDA kernel launches synchronous so that a failure such as
# "RuntimeError: CUDA error: device-side assert triggered" is reported at the
# call that caused it. This does not prevent the error; it only makes it
# easier to locate, at the cost of slower GPU execution.
os.environ["CUDA_LAUNCH_BLOCKING"] = "1"

# Load data

In [None]:
# Keypoint names of the pose datasets; every keypoint contributes one
# "<name>_x" and one "<name>_y" coordinate column, in this order.
_POSE_KEYPOINTS = (
    "nose",
    "left_eye", "right_eye",
    "left_ear", "right_ear",
    "left_shoulder", "right_shoulder",
    "left_elbow", "right_elbow",
    "left_wrist", "right_wrist",
    "left_hip", "right_hip",
    "left_knee", "right_knee",
    "left_ankle", "right_ankle",
)

# Names of all keypoint coordinate columns.
POSE_OUTPUT_LABELS = [
    f"{keypoint}_{axis}"
    for keypoint in _POSE_KEYPOINTS
    for axis in ("x", "y")
]

# Body keypoints fed to the model (face keypoints are left out); every keypoint
# contributes one "<name>_x" and one "<name>_y" column, in this order.
# NOTE(review): "left_shoulder" is not in this list although "right_shoulder"
# is - confirm whether that omission is intentional.
_MODEL_INPUT_KEYPOINTS = (
    "right_shoulder",
    "left_elbow", "right_elbow",
    "left_wrist", "right_wrist",
    "left_hip", "right_hip",
    "left_knee", "right_knee",
    "left_ankle", "right_ankle",
)

# Names of the coordinate columns used as model input features.
MODEL_INPUT_LABELS = [
    f"{keypoint}_{axis}"
    for keypoint in _MODEL_INPUT_KEYPOINTS
    for axis in ("x", "y")
]

In [None]:
# Pose estimator whose datasets are loaded; it is used as a directory name
# under TrainingData/ when the parquet files are located.
POSE_MODEL = "ultralytics"

In [None]:
# Map every output label name to its class index, i.e. its position in OUTPUT_LABELS.
LABELS = {name: class_index for class_index, name in enumerate(OUTPUT_LABELS)}

In [None]:
# Display the label name -> class index mapping.
LABELS

In [None]:
def _get_dataset_action(filename: str|Path):
    """Get the action label from the filename."""
    filename = str(filename)
    if "walking" in filename:
        return "walking"
    elif "running" in filename:
        return "running"
    else:
        raise ValueError(f"Unknown action label in filename: {filename}")

def _process_dataset(dataset: pd.DataFrame, action: Literal["walking", "running"]) -> pd.DataFrame:
    """Label every frame with the class index of `action` and drop frames without a detection.

    Args:
        dataset: Per-frame pose data containing the POSE_OUTPUT_LABELS columns.
        action: Action performed in the recording; looked up in LABELS.

    Returns:
        A new DataFrame with an added "action" column, restricted to the rows
        in which at least one keypoint coordinate is non-zero.
    """
    labelled = dataset.assign(action=LABELS[action])

    # A frame whose keypoint columns are all 0 has no detection and is dropped.
    # Relabelling such frames as "no_detection" was abandoned because that
    # class overwhelmed the dataset.
    has_detection = labelled[POSE_OUTPUT_LABELS].ne(0).any(axis="columns")
    return labelled.loc[has_detection]


def load_KTH_datasets(pose_model: Literal["mediapipe", "ultralytics"], status: Literal["raw", "normalized"]) -> list[pd.DataFrame]:
    """Load and label every KTH parquet dataset of one pose model.

    KTH dataset is recorded in 25 FPS, each frame contains the detected pose
    keypoint coordinates, all 0 if no person is detected.

    Args:
        pose_model: Pose estimator that produced the datasets (directory name).
        status: Which variant of the datasets to load (directory name).

    Returns:
        One labelled DataFrame per used parquet file, in sorted path order.

    Raises:
        ValueError: If a parquet file name contains no known action word.
    """
    # this notebook is within model_training/, need to go back one level
    dataset_dir = Path.cwd().parent / "TrainingData" / pose_model / status
    # Sort so the load order does not depend on the file system, and build the
    # list up front so the progress bar knows its total.
    parquet_files = sorted(path for path in dataset_dir.iterdir() if path.suffix == ".parquet")

    dataset_list = []
    for parquet_filename in tqdm(parquet_files, desc=f"Loading {pose_model} datasets"):
        action = _get_dataset_action(parquet_filename)
        if not any(action in label for label in OUTPUT_LABELS): # ignore parquet files that are not used in training
            continue
        dataset_list.append(_process_dataset(pd.read_parquet(parquet_filename), action))

    return dataset_list

In [None]:
dataset_list = load_KTH_datasets(POSE_MODEL, "normalized")
# Shuffle the per-video dataframes so videos of different actions are not next to each other.
# NOTE(review): the shuffle is unseeded, so the order (and therefore any
# order-based split of the concatenated data) differs between runs.
random.shuffle(dataset_list)

In [None]:
# Stack all per-video dataframes into one frame-level table, keeping list order.
dataset = pd.concat(dataset_list)

In [None]:
# Non-null count of every column per action class index, to inspect class balance.
dataset.groupby("action").count()

In [None]:
# Order-preserving 80/10/10 split into train/validation/test: the last 20% of
# the frames are held out and then halved into validation and test.
# no shuffle for time series data
# NOTE: the split is made on frames, not on videos, so a single video can
# straddle a split boundary.
X_train, X_test, y_train, y_test = train_test_split(dataset[MODEL_INPUT_LABELS], dataset["action"], test_size=0.2, shuffle=False)
X_val, X_test, y_val, y_test = train_test_split(X_test, y_test, test_size=0.5, shuffle=False)

# Define PyTorch dataset

In [None]:
class PoseActionDataset(Dataset):
    """Sliding-window view over a frame sequence, used as model input.

    Sample ``idx`` is the window ``X[idx : idx + window_length]`` paired with
    the label ``y[idx + window_length]``.

    NOTE(review): the label belongs to the frame right after the window, not to
    the window's last frame (``idx + window_length - 1``) - confirm that this
    is intended.

    Args:
        X: Per-frame features, indexed by frame along the first dimension.
        y: Per-frame labels, aligned with ``X``.
        window_length: Number of consecutive frames in one sample.
    """
    def __init__(self, X: torch.Tensor, y: torch.Tensor, window_length: int):
        self.X = X
        self.y = y
        self.window_length = window_length

    def __len__(self) -> int:
        # Clamp at 0: a sequence shorter than the window holds no sample, and
        # len() raises ValueError when __len__ returns a negative number.
        return max(0, len(self.X) - self.window_length)

    def __getitem__(self, idx):
        window_end = idx + self.window_length
        return self.X[idx:window_end], self.y[window_end]

# Prepare data and model for training

In [None]:
# Sliding-window length in frames.
WINDOW_LENGTH = 50 # classifying every 50 frames window (2s)
# Use the first GPU when CUDA is available, otherwise the CPU.
device = "cuda:0" if torch.cuda.is_available() else "cpu"
# Training hyperparameters.
batch_size = 2**6
n_epochs = 30
lr = 1e-2

# Model dimensions: one input per selected keypoint coordinate column and one
# output per action label.
input_dim = len(MODEL_INPUT_LABELS)
hidden_dim = 32
layer_dim = 1
output_dim = len(LABELS)

In [None]:
def _to_device_tensors(features: pd.DataFrame, labels: pd.Series) -> tuple[torch.Tensor, torch.Tensor]:
    """Convert one data split into a float feature tensor and a one-hot float label tensor on `device`."""
    feature_tensor = torch.from_numpy(features.values).float().to(device)
    # one_hot expects int64 class indices; cast explicitly so this does not
    # depend on the integer dtype the label column happens to have.
    class_indices = torch.from_numpy(labels.values).long().to(device)
    one_hot_labels = torch.nn.functional.one_hot(class_indices, num_classes=len(LABELS)).float()
    return feature_tensor, one_hot_labels


# The whole split is moved to `device` once, so batches need no transfer later.
X_train_tensor, y_train_tensor = _to_device_tensors(X_train, y_train)
X_val_tensor, y_val_tensor = _to_device_tensors(X_val, y_val)
X_test_tensor, y_test_tensor = _to_device_tensors(X_test, y_test)

In [None]:
# Wrap every split in a sliding-window dataset of WINDOW_LENGTH frames.
train_dataset, val_dataset, test_dataset = (
    PoseActionDataset(features, targets, WINDOW_LENGTH)
    for features, targets in (
        (X_train_tensor, y_train_tensor),
        (X_val_tensor, y_val_tensor),
        (X_test_tensor, y_test_tensor),
    )
)

In [None]:
# Settings shared by all three loaders: no shuffle for time series data, and
# an incomplete final batch is dropped.
loader_options = {"batch_size": batch_size, "shuffle": False, "drop_last": True}

train_dataloader = DataLoader(train_dataset, **loader_options)
val_dataloader = DataLoader(val_dataset, **loader_options)
test_dataloader = DataLoader(test_dataset, **loader_options)

In [None]:
# Build the recurrent classifier and move it to the selected device.
model = RNNModel(input_dim=input_dim, hidden_dim=hidden_dim, layer_dim=layer_dim, output_dim=output_dim).to(device)
# The targets are one-hot float vectors; CrossEntropyLoss accepts floating-point
# targets as per-class probabilities.
criterion = torch.nn.CrossEntropyLoss()

In [None]:
optimizer = torch.optim.AdamW(model.parameters(), lr=lr)
# Multiply the learning rate by 0.1 after every 10 scheduler steps.
scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=10, gamma=0.1)

# Start Training

In [None]:
# Put the model into training mode before optimisation starts.
model.train()

In [None]:
# Average loss per epoch, kept for plotting and inspection.
training_losses = []
validation_losses = []
for epoch in range(n_epochs):
    # Re-enter training mode every epoch, because validation below switches
    # the model to evaluation mode.
    model.train()
    epoch_loss = 0
    for pose_seq, labels in train_dataloader:
        optimizer.zero_grad()

        # the data were already put to corresponding device
        outputs = model(pose_seq)
        loss = criterion(outputs, labels)
        loss.backward()

        optimizer.step()
        epoch_loss += loss.item()
    # The learning-rate schedule advances once per epoch.
    scheduler.step()

    # Validate in evaluation mode and without gradients, so mode-dependent
    # layers behave as they do at inference time.
    model.eval()
    val_loss = 0
    with torch.no_grad():
        for pose_seq, labels in val_dataloader:
            outputs = model(pose_seq)

            loss = criterion(outputs, labels)
            val_loss += loss.item()

    training_losses.append(epoch_loss / len(train_dataloader))
    validation_losses.append(val_loss / len(val_dataloader))
    print(f'Epoch [{epoch+1}/{n_epochs}], Training Loss: {epoch_loss / len(train_dataloader):.8f}, Validation Loss: {val_loss / len(val_dataloader):.8f}')

In [None]:
# Plot the average training loss of every epoch.
plt.plot(training_losses, label="Training Loss")
plt.title("Training Loss")
plt.xlabel("Epoch")
plt.ylabel("Loss")
plt.legend()
plt.show()

In [None]:
# Measure the average loss on the test split, in evaluation mode and without
# tracking gradients.
model.eval()
testing_loss = 0
with torch.no_grad():
    # the data were already put to corresponding device
    for pose_seq, labels in test_dataloader:
        testing_loss += criterion(model(pose_seq), labels).item()

print("Testing Loss: ", testing_loss / len(test_dataloader))


In [None]:
# The file name encodes the model dimensions, window length, epoch count and
# learning rate of this run.
model_save_path = Path.cwd().parent / "models" / "RNN" / f"{input_dim}-{hidden_dim}-{layer_dim}-{output_dim}_WIN-{WINDOW_LENGTH}_EPOCH-{n_epochs}_LR-{lr}.pt"

In [None]:
# Create the target directory first, so saving does not fail on a fresh
# checkout where models/RNN/ does not exist yet.
model_save_path.parent.mkdir(parents=True, exist_ok=True)
# Persist only the learned parameters (the state dict), not the module object.
torch.save(model.state_dict(), model_save_path)