In [1]:
# -------------------------------------------------------------
# 1. IMPORT REQUIRED LIBRARIES
# -------------------------------------------------------------
# These libraries handle:
# - file reading (os, glob, Path)
# - numerical and data manipulation (numpy, pandas)
# - machine learning preprocessing and model building (scikit-learn)
# - model saving (pickle)
# -------------------------------------------------------------

import os, re, glob, pickle
import numpy as np
import pandas as pd

from pathlib import Path
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler, LabelEncoder
from sklearn.decomposition import PCA
from sklearn.ensemble import RandomForestClassifier
from sklearn.pipeline import Pipeline

# Confirmation message so a successful run of the import cell is visible in the output
print("Libraries imported successfully yay!")

Libraries imported successfully yay!


In [2]:
# -------------------------------------------------------------
# 2. LOAD AND VERIFY DATASET
# -------------------------------------------------------------
# The dataset comes from the CleanedKARDDataset ZIP you uploaded.
# Each CSV file represents one action clip containing 3D joint positions
# in real-world coordinates (x, y, z) for different body joints.
# -------------------------------------------------------------


# Folder holding one CSV per action clip; change this path if your data is stored elsewhere
DATA_DIR = Path("CleanedKARDDataset/RealWorldCoordinates")

# Fail early with a clear message if the path is missing or is not a folder
# (is_dir() also rejects the case where the path points at a regular file)
assert DATA_DIR.is_dir(), f"Data dir not found: {DATA_DIR}"


# Collect all CSV files in the folder; sorting keeps the file order reproducible
files = sorted(glob.glob(str(DATA_DIR / "*.csv")))

# An empty folder would otherwise only surface as a confusing error much later
assert files, f"No CSV files found in: {DATA_DIR}"
print("Found files:", len(files))

# Preview first few file names
print(files[:3])


Found files: 180
['CleanedKARDDataset\\RealWorldCoordinates\\a01_s01_e01_realworld.csv', 'CleanedKARDDataset\\RealWorldCoordinates\\a01_s02_e01_realworld.csv', 'CleanedKARDDataset\\RealWorldCoordinates\\a01_s03_e01_realworld.csv']


In [4]:
# -------------------------------------------------------------
# 3. PARSE SEQUENCES INTO ARRAYS
# -------------------------------------------------------------
# Each CSV lists joint coordinates for multiple frames.
# We reconstruct the frames so that each file becomes:
#     seq shape = (T, J, 3)
# where:
#   T = number of frames
#   J = number of joints
#   3 = x, y, z coordinates
# -------------------------------------------------------------

def parse_sequence(csv_path: str):
    """
    Convert a single CSV into a 3D numpy array [frames, joints, coords].

    The CSV is read as a flat stream of rows with the columns
    'Joint', 'x', 'y', 'z'. A row whose 'Joint' is 'Head' marks the start of
    a new frame; any rows that appear before the first 'Head' row form a
    frame of their own.

    Parameters:
        csv_path -> path of the CSV file to read

    Returns:
        (seq, joint_order) where
            seq         -> np.ndarray of shape (T, J, 3); entries are NaN
                           where a joint is missing from a frame
            joint_order -> list of unique joint names, in the order they first
                           appear in the first frame with >= 10 distinct joints
        None if the file has no frame with at least 10 distinct joints
        (callers must check for None before unpacking).

    Raises:
        KeyError if the 'Joint', 'x', 'y' or 'z' column is missing.
    """
    df = pd.read_csv(csv_path)
    if df.empty:
        return None

    # Every "Head" row starts a new frame: a running count of the Head rows
    # gives each row its frame id without a slow row-by-row Python loop.
    frame_ids = (df['Joint'] == 'Head').cumsum().to_numpy()
    frames = [fr for _, fr in df.groupby(frame_ids, sort=False)]

    # Determine the joint order from the first valid frame.
    # dict.fromkeys removes repeated names while keeping first-seen order, so
    # a frame with a duplicated joint row cannot inflate the joint dimension.
    joint_order = None
    for fr in frames:
        if fr['Joint'].nunique() >= 10:
            joint_order = list(dict.fromkeys(fr['Joint']))
            break
    if joint_order is None:
        return None

    # Create a 3D array: time × joints × coordinates (NaN = joint not observed)
    seq = np.full((len(frames), len(joint_order), 3), np.nan, dtype=float)
    joint_index = {name: j for j, name in enumerate(joint_order)}

    # Fill the array with x, y, z for each known joint in each frame.
    # If a joint is repeated inside one frame, the last row wins.
    for t, fr in enumerate(frames):
        for name, x, y, z in zip(fr['Joint'], fr['x'], fr['y'], fr['z']):
            j = joint_index.get(name)
            if j is not None:
                seq[t, j, :] = (x, y, z)
    return seq, joint_order


In [5]:
# -------------------------------------------------------------
# 4. PREPROCESS EACH SEQUENCE
# -------------------------------------------------------------
# To feed into ML, all sequences must have:
# - the same number of frames (resampling)
# - comparable scale (normalization)
# - dynamic info (velocities)
# -------------------------------------------------------------

def resample_sequence(seq: np.ndarray, target_T: int = 60) -> np.ndarray:
    """
    Bring a (T, J, C) sequence to exactly `target_T` frames.

    Each joint-coordinate track is first gap-filled (forward fill, then
    backward fill for leading gaps) and then linearly interpolated onto
    `target_T` evenly spaced positions between the first and last frame.
    Returns a float array of shape (target_T, J, C).
    """
    n_frames, n_joints, n_coords = seq.shape
    n_tracks = n_joints * n_coords

    source_times = np.arange(n_frames)
    target_times = np.linspace(0, n_frames - 1, target_T)

    # One column per (joint, coordinate) track; fill missing (NaN) samples
    # column-wise before interpolating.
    tracks = (
        pd.DataFrame(seq.reshape(n_frames, n_tracks))
        .ffill()
        .bfill()
        .to_numpy()
    )

    resampled = np.empty((target_T, n_tracks), dtype=float)
    for k in range(n_tracks):
        resampled[:, k] = np.interp(target_times, source_times, tracks[:, k])

    # Restore the (frames, joints, coords) layout
    return resampled.reshape(target_T, n_joints, n_coords)


def root_center_normalize(seq: np.ndarray, joint_names: list, root_name: str = 'Torso') -> np.ndarray:
    """
    Center coordinates around the root joint and scale by the median
    head-to-root distance.

    Parameters:
        seq         -> array of shape (T, J, C)
        joint_names -> joint names, one per index along the J axis
        root_name   -> joint used as the origin (default 'Torso')

    Returns:
        Array of the same shape as `seq`: every frame has the root joint
        subtracted and the whole sequence is divided by one scalar scale.

    Notes:
        - If `root_name` or 'Head' is not in `joint_names`, index 0 is used.
        - The scale is the median over frames whose head-to-root distance is
          not NaN, so a few frames with missing joints do not disable scaling.
        - If no usable distance exists, or the median is zero or non-finite,
          the scale falls back to 1.0 (centering only).

    Raises:
        ValueError if `seq` is not 3-dimensional.
    """
    if seq.ndim != 3:
        raise ValueError(f"seq must have shape (T, J, C), got {seq.shape}")

    # identify root and head joints (fallback to 0 if missing)
    root_idx = joint_names.index(root_name) if root_name in joint_names else 0
    head_idx = joint_names.index('Head') if 'Head' in joint_names else 0

    # Center by subtracting the root coordinates of each frame
    centered = seq - seq[:, [root_idx], :]

    # Per-frame head-to-root distance; ignore frames where it is NaN because
    # np.median would otherwise return NaN and silently force scale = 1.0
    d = np.linalg.norm(centered[:, head_idx, :] - centered[:, root_idx, :], axis=1)
    d = d[~np.isnan(d)]
    median_d = np.median(d) if d.size else np.nan
    scale = median_d if np.isfinite(median_d) and median_d != 0 else 1.0

    return centered / scale


def make_features(seq: np.ndarray, include_velocity: bool = True) -> np.ndarray:
    """
    Flatten a 3D sequence into a single 1D feature vector.

    The vector starts with every position value (all frames, all joints,
    all coordinates). When `include_velocity` is True it is followed by the
    flattened frame-to-frame differences along the time axis.
    """
    if include_velocity:
        # Velocity = difference between consecutive frames
        pieces = (seq, np.diff(seq, axis=0))
    else:
        pieces = (seq,)

    return np.concatenate([piece.reshape(-1) for piece in pieces])


In [None]:
# -------------------------------------------------------------
# 6. LABEL ENCODING AND DATA SPLIT
# -------------------------------------------------------------
# Machine learning models require numerical labels.
# LabelEncoder maps each motion type string (a01, a02, ...)
# into an integer ID. We then split data 80/20 for training/testing.
# -------------------------------------------------------------

# NOTE(review): X and y are not defined in any of the cells shown before this
# one — presumably a dataset-building step creates the feature matrix X and
# the label list y; confirm that cell exists and runs before this one.
le = LabelEncoder()
y_enc = le.fit_transform(y)

# Hold out 20% of the samples for testing. stratify=y_enc keeps the class
# proportions the same in both splits; random_state=42 makes the split reproducible.
X_train, X_test, y_train, y_test = train_test_split(
    X, y_enc, test_size=0.2, stratify=y_enc, random_state=42
)

# Report the resulting array shapes as a quick sanity check
print(f"Train set: {X_train.shape}, Test set: {X_test.shape}")
