In [1]:
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.optim as optim
import random
import os
import copy

from sklearn.model_selection import train_test_split

# For demonstration, if you need plotting or additional utilities:
#import matplotlib.pyplot as plt

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print("Using device:", device)



Using device: cuda


In [2]:
import os
import sys
import time
import json
import datetime
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader

import pyzed.sl as sl
import cv2
from PIL import Image
from scipy.spatial.distance import euclidean
from scipy.signal import savgol_filter
from sklearn.model_selection import train_test_split
import matplotlib.pyplot as plt

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print("Using device:", device)


Using device: cuda


In [4]:
def interpolate_missing(df):
    for col in df.columns:
        if col in ['timestamp','gesture','frame_number']:
            continue
        df[col] = df[col].replace(0, np.nan).interpolate(method='linear').ffill().bfill()
    return df

def smooth_data(df, window=5):
    for col in df.columns:
        if col in ['timestamp', 'gesture', 'frame_number']:
            continue
        w = min(window, len(df)) if len(df)>=3 else 3
        poly = 3
        if w <= poly:
            poly = w-1  
            if poly < 1:
                continue  
        if w % 2 == 0:
            w -= 1
        if w < 3:
            continue
        df[col] = savgol_filter(df[col], w, poly)
    return df


def normalize_time(df):
    df = df.copy()
    df['normalized_time'] = df.groupby('gesture')['timestamp'].transform(
        lambda x: (x - x.iloc[0]) / (x.iloc[-1] - x.iloc[0] + 1e-6))
    return df

def calculate_angle(a,b,c):
    try:
        ba = a - b
        bc = c - b
        if np.all(ba==0) or np.all(bc==0):
            return 0
        cos_angle = np.dot(ba, bc)/(np.linalg.norm(ba)*np.linalg.norm(bc))
        cos_angle = np.clip(cos_angle, -1.0,1.0)
        return np.degrees(np.arccos(cos_angle))
    except:
        return 0

def calculate_angular_velocity(angles, timestamps):
    return np.gradient(angles, timestamps)

def extract_features(df):
    SHOULDER, ELBOW, WRIST = 13, 15, 17
    timestamps = df['timestamp'].values/1000.0  # to seconds?

    features = pd.DataFrame(index=df.index)

    # relative positions
    for joint in [SHOULDER, ELBOW, WRIST]:
        for axis in ['x', 'y', 'z']:
            features[f'rel_{joint}_{axis}'] = df[f'kp{joint}_{axis}'] - df[f'kp{SHOULDER}_{axis}']

    # angle at elbow
    posS = df[[f'kp{SHOULDER}_x', f'kp{SHOULDER}_y', f'kp{SHOULDER}_z']].values
    posE = df[[f'kp{ELBOW}_x', f'kp{ELBOW}_y', f'kp{ELBOW}_z']].values
    posW = df[[f'kp{WRIST}_x', f'kp{WRIST}_y', f'kp{WRIST}_z']].values
    angles = []
    for i in range(len(df)):
        angles.append(calculate_angle(posS[i], posE[i], posW[i]))
    features['angle_elbow'] = angles
    features['angular_velocity_elbow'] = calculate_angular_velocity(features['angle_elbow'].values, timestamps)

    # velocity, acceleration, jerk
    for joint in [SHOULDER, ELBOW, WRIST]:
        p = df[[f'kp{joint}_x', f'kp{joint}_y', f'kp{joint}_z']].values
        vel = np.zeros_like(p)
        if len(p) > 2:
            vel[1:-1] = (p[2:] - p[:-2]) / ((timestamps[2:] - timestamps[:-2])[:, None])
            vel[0] = vel[1]
            vel[-1] = vel[-2]
        else:
            vel = np.gradient(p, axis=0)
        acc = np.gradient(vel, axis=0)
        jerk = np.gradient(acc, axis=0)

        for idx, axis in enumerate(['x', 'y', 'z']):
            features[f'vel_{joint}_{axis}'] = vel[:, idx]
            features[f'acc_{joint}_{axis}'] = acc[:, idx]
            features[f'jerk_{joint}_{axis}'] = jerk[:, idx]

    # speed wrist
    features['speed_15'] = np.sqrt(features['vel_15_x']**2 + features['vel_15_y']**2 + features['vel_15_z']**2)
    features['speed_17'] = np.sqrt(features['vel_17_x']**2 + features['vel_17_y']**2 + features['vel_17_z']**2)

    # acc magnitude
    features['acc_magnitude_15'] = np.sqrt(features['acc_15_x']**2 + features['acc_15_y']**2 + features['acc_15_z']**2)
    features['acc_magnitude_17'] = np.sqrt(features['acc_17_x']**2 + features['acc_17_y']**2 + features['acc_17_z']**2)

    # path length wrist
    path_length = np.zeros(len(df))
    for i in range(1, len(df)):
        path_length[i] = path_length[i-1] + euclidean(posW[i], posW[i-1])
    features['path_length_17'] = path_length

    features = features.replace([np.nan, np.inf, -np.inf], 0.0)
    return features

def extract_additional_features(features):
    # example: we do that "straightness, planarity, etc." logic
    wpos = np.array([features['rel_17_x'].values,
                     features['rel_17_y'].values,
                     features['rel_17_z'].values])
    vel = np.array([np.gradient(wpos[0]),
                    np.gradient(wpos[1]),
                    np.gradient(wpos[2])])
    disp = float(np.linalg.norm(wpos[:, -1] - wpos[:, 0]))
    pl = float(np.sum(np.sqrt(np.sum(np.diff(wpos,axis=1)**2,axis=0))))
    straightness = disp/pl if pl>0 else 0

    cov = np.cov(wpos)
    ev = np.linalg.eigvals(cov)
    if ev[0] != 0:
        planarity = float(ev[1]/ev[0])
    else:
        planarity = 0.0

    speeds = np.sqrt(np.sum(vel**2, axis=0))
    peak_speed = float(np.max(speeds))
    avg_speed  = float(np.mean(speeds))
    speed_var  = float(np.std(speeds))

    direction = np.diff(np.arctan2(vel[1], vel[0]))
    direction_changes = int(np.sum(np.abs(direction) > np.pi/4))

    vert_ext  = float(np.ptp(wpos[1]))
    horiz_ext = float(np.ptp(wpos[0]))
    vh_ratio  = vert_ext/horiz_ext if horiz_ext>0 else 0

    return {
        'straightness': straightness,
        'planarity': planarity,
        'peak_speed': peak_speed,
        'avg_speed': avg_speed,
        'speed_variability': speed_var,
        'direction_changes': direction_changes,
        'vertical_extent': vert_ext,
        'horizontal_extent': horiz_ext,
        'vertical_horizontal_ratio': vh_ratio,
        'total_displacement': disp,
        'path_length': pl
    }

def extract_directional_features(df):
    """
    Extract directional features that better distinguish between up/down and left/right swipes
    """
    NECK, TORSO, SHOULDER, ELBOW, WRIST = 0, 1, 13, 15, 17  # Key joint indices
    
    # Get the initial and final positions
    wrist_start = df.iloc[0][[f'kp{WRIST}_x', f'kp{WRIST}_y', f'kp{WRIST}_z']].values
    wrist_end = df.iloc[-1][[f'kp{WRIST}_x', f'kp{WRIST}_y', f'kp{WRIST}_z']].values
    torso_pos = df.iloc[-1][[f'kp{TORSO}_x', f'kp{TORSO}_y', f'kp{TORSO}_z']].values
    neck_pos = df.iloc[-1][[f'kp{NECK}_x', f'kp{NECK}_y', f'kp{NECK}_z']].values
    
    # Calculate features
    features = {}
    
    # 1. End position of wrist relative to torso (normalized)
    wrist_to_torso_vec = wrist_end - torso_pos
    body_height = np.linalg.norm(neck_pos - torso_pos) + 1e-6  # Avoid division by zero
    wrist_to_torso_normalized = wrist_to_torso_vec / body_height
    
    features['wrist_end_x_rel_torso'] = wrist_to_torso_normalized[0]
    features['wrist_end_y_rel_torso'] = wrist_to_torso_normalized[1]
    features['wrist_end_z_rel_torso'] = wrist_to_torso_normalized[2]
    
    # 2. Movement direction vector (from start to end)
    movement_vec = wrist_end - wrist_start
    movement_dist = np.linalg.norm(movement_vec) + 1e-6
    movement_dir = movement_vec / movement_dist
    
    features['movement_dir_x'] = movement_dir[0]
    features['movement_dir_y'] = movement_dir[1]
    features['movement_dir_z'] = movement_dir[2]
    
    # 3. Horizontal vs Vertical movement ratio
    horizontal_movement = abs(movement_vec[0]) + abs(movement_vec[2])
    vertical_movement = abs(movement_vec[1])
    features['horiz_vert_ratio'] = horizontal_movement / (vertical_movement + 1e-6)
    
    # 4. Dominant plane
    xy_movement = abs(movement_vec[0]) + abs(movement_vec[1])
    yz_movement = abs(movement_vec[1]) + abs(movement_vec[2])
    xz_movement = abs(movement_vec[0]) + abs(movement_vec[2])
    
    features['dominant_xy'] = xy_movement / (movement_dist + 1e-6)
    features['dominant_yz'] = yz_movement / (movement_dist + 1e-6)
    features['dominant_xz'] = xz_movement / (movement_dist + 1e-6)
    
    # 5. Quadrant of end position (relative to start)
    features['end_right'] = 1.0 if movement_vec[0] > 0 else 0.0
    features['end_up'] = 1.0 if movement_vec[1] > 0 else 0.0
    features['end_forward'] = 1.0 if movement_vec[2] > 0 else 0.0
    
    # 6. Measure of how "clean" the directional movement is
    # (high value = movement primarily in one direction)
    movement_abs = np.abs(movement_vec)
    primary_direction = np.max(movement_abs) / (np.sum(movement_abs) + 1e-6)
    features['directional_clarity'] = primary_direction
    
    # 7. Direction angles
    features['angle_from_horizontal'] = np.degrees(np.arctan2(movement_vec[1], 
                                               np.sqrt(movement_vec[0]**2 + movement_vec[2]**2)))
    features['angle_in_horizontal'] = np.degrees(np.arctan2(movement_vec[0], movement_vec[2]))
    
    return features

In [5]:
import pyzed.sl as sl
import pandas as pd
import numpy as np
from scipy.spatial.distance import euclidean

def open_svo_and_extract_dataframe(svo_path, gesture_label="unknown"):
    """
    Updated approach that:
      1) Enables positional tracking.
      2) Uses a loop that doesn't skip the last frame prematurely.
      3) Appends a row for every frame (0..nframes-1),
         inserting dummy 0s if no skeleton was found.
    """
    zed = sl.Camera()

    init_params = sl.InitParameters()
    init_params.set_from_svo_file(svo_path)
    # If your old code used a different depth_mode, replicate it:
    init_params.depth_mode = sl.DEPTH_MODE.ULTRA  # or MEDIUM / QUALITY if needed
    init_params.coordinate_units = sl.UNIT.METER
    init_params.coordinate_system = sl.COORDINATE_SYSTEM.RIGHT_HANDED_Y_UP

    status = zed.open(init_params)
    if status != sl.ERROR_CODE.SUCCESS:
        print("Could not open SVO:", status)
        return pd.DataFrame()  # Return empty if can't open

    # Positional Tracking (often required for skeleton detection in older SVOs)
    pt_params = sl.PositionalTrackingParameters()
    err_pt = zed.enable_positional_tracking(pt_params)
    if err_pt != sl.ERROR_CODE.SUCCESS:
        print("Positional tracking not enabled:", err_pt)
        zed.close()
        return pd.DataFrame()

    # Body Tracking
    body_params = sl.BodyTrackingParameters()
    body_params.detection_model = sl.BODY_TRACKING_MODEL.HUMAN_BODY_FAST
    # if your old code used HUMAN_BODY_ACCURATE or MEDIUM, do so
    body_params.body_format = sl.BODY_FORMAT.BODY_38
    err_body = zed.enable_body_tracking(body_params)
    if err_body != sl.ERROR_CODE.SUCCESS:
        print("Body tracking not enabled:", err_body)
        zed.close()
        return pd.DataFrame()

    runtime_params = sl.RuntimeParameters()
    body_runtime_params = sl.BodyTrackingRuntimeParameters()

    nframes = zed.get_svo_number_of_frames()  # total frames in SVO
    frames_data = []

    frame_number = 0
    while True:
        err = zed.grab(runtime_params)
        if err != sl.ERROR_CODE.SUCCESS:
            # If we can't grab, we break (end of SVO or error)
            break

        # Retrieve body data
        bodies = sl.Bodies()
        zed.retrieve_bodies(bodies, body_runtime_params)

        ts_ns = zed.get_timestamp(sl.TIME_REFERENCE.IMAGE).get_nanoseconds()

        # We'll either fill real skeleton or dummy zeros
        main_body = None
        if bodies.body_list:
            # pick the body with the highest sum of confidence
            raw_list = []
            for b in bodies.body_list:
                conf_list = [1.0]*len(b.keypoint)  # placeholder if you don't have actual confidences
                raw_list.append({
                    "id": b.id,
                    "keypoints": [list(kp) for kp in b.keypoint],  # convert Sl.float3 -> list
                    "confidence": conf_list
                })
            # pick the main body with greatest sum of conf
            main_body = max(raw_list, key=lambda x: sum(x["confidence"]))

        # We'll create a row for this frame (dummy if no skeleton)
        row = {
            "frame_number": frame_number,
            "timestamp": ts_ns,
            "gesture": gesture_label
        }

        if main_body:
            # We have a skeleton, fill real data
            for jidx, kp in enumerate(main_body["keypoints"]):
                row[f'kp{jidx}_x'] = kp[0]
                row[f'kp{jidx}_y'] = kp[1]
                row[f'kp{jidx}_z'] = kp[2]
        else:
            # No skeleton, fill with 0. If you know BODY_38 => up to j=37
            # or you can detect how many keypoints you typically expect
            for jidx in range(38):  
                row[f'kp{jidx}_x'] = 0.0
                row[f'kp{jidx}_y'] = 0.0
                row[f'kp{jidx}_z'] = 0.0

        frames_data.append(row)

        frame_number += 1

        # Now we check if we've read the last frame
        if zed.get_svo_position() >= (nframes - 1):
            # we've handled the final frame, so break
            break

    zed.close()

    # Build final DataFrame
    if not frames_data:
        return pd.DataFrame()

    df = pd.DataFrame(frames_data)
    df = df.sort_values("frame_number").reset_index(drop=True)
    return df


In [6]:
import glob
import os

folders = {
    'RArm_SwipeRight': 'right_swipe',
    'RArm_SwipeLeft':  'left_swipe',
    'RArm_SwipeUp':    'up_swipe',
    'RArm_SwipeDown':  'down_swipe'
}

base_dataset = "D:\PLENG_temp\ZED_Gesture_Detection\Part2\Part3\Part4\dataset_2"

all_features = []
all_labels   = []

for folder_name, gesture_label in folders.items():
    folder_path = os.path.join(base_dataset, folder_name)
    svo_paths   = glob.glob(os.path.join(folder_path, '*.svo2'))

    print(f"=== Found {len(svo_paths)} SVO files in {folder_name} ({gesture_label}) ===")

    for svo_file in svo_paths:
        print(f"Processing: {svo_file} as gesture: {gesture_label}")
        # 1) Read SVO and build DataFrame
        df = open_svo_and_extract_dataframe(svo_file, gesture_label=gesture_label)
        if df.empty:
            print("  -> No data extracted; skipping.")
            continue

        # 2) Interpolate, smooth, normalize time
        df = interpolate_missing(df)
        df = smooth_data(df, window=3)
        df = normalize_time(df)

        # 3) Compute features
        feats = extract_features(df)
        extra_dict = extract_additional_features(feats)
        directional_dict = extract_directional_features(df)
        for k,v in extra_dict.items():
            feats[k] = v
        for k,v in directional_dict.items(): 
            feats[k] = v

        # Expect exactly 7 frames per .svo2
        if len(feats) != 7:
            print(f"  -> Skipping {svo_file}, frames != 8: {len(feats)}")
            continue

        # Convert to numpy array
        feats_np = feats.values.astype(np.float32)  # shape: (7, num_features)

        all_features.append(feats_np)
        all_labels.append(gesture_label)

# Convert to final numpy arrays
X = np.array(all_features)         # shape: (num_samples,7,num_features)
labels = np.array(all_labels)      # shape: (num_samples,)

print("=== Dataset Summary ===")
print("X shape:", X.shape)
print("Labels distribution:\n", pd.Series(labels).value_counts())


=== Found 161 SVO files in RArm_SwipeRight (right_swipe) ===
Processing: D:\PLENG_temp\ZED_Gesture_Detection\Part2\Part3\Part4\dataset_2\RArm_SwipeRight\SB_RArm_SwipeRight_000000001.svo2 as gesture: right_swipe
Processing: D:\PLENG_temp\ZED_Gesture_Detection\Part2\Part3\Part4\dataset_2\RArm_SwipeRight\SB_RArm_SwipeRight_000000002.svo2 as gesture: right_swipe
Processing: D:\PLENG_temp\ZED_Gesture_Detection\Part2\Part3\Part4\dataset_2\RArm_SwipeRight\SB_RArm_SwipeRight_000000003.svo2 as gesture: right_swipe
Processing: D:\PLENG_temp\ZED_Gesture_Detection\Part2\Part3\Part4\dataset_2\RArm_SwipeRight\SB_RArm_SwipeRight_000000004.svo2 as gesture: right_swipe
Processing: D:\PLENG_temp\ZED_Gesture_Detection\Part2\Part3\Part4\dataset_2\RArm_SwipeRight\SB_RArm_SwipeRight_000000005.svo2 as gesture: right_swipe
Processing: D:\PLENG_temp\ZED_Gesture_Detection\Part2\Part3\Part4\dataset_2\RArm_SwipeRight\SB_RArm_SwipeRight_000000006.svo2 as gesture: right_swipe
Processing: D:\PLENG_temp\ZED_Gesture_D

In [7]:
unique_labels = sorted(list(set(labels)))  # e.g. ["SwipeDown","SwipeLeft","SwipeRight","SwipeUp"]
label_to_idx = {lab: i for i, lab in enumerate(unique_labels)}
y = np.array([label_to_idx[l] for l in labels])

X_train, X_val, y_train, y_val = train_test_split(X, y, test_size=0.2, 
                                                  random_state=42, 
                                                  stratify=y)

class Gesture7Dataset(Dataset):
    def __init__(self, X, y):
        self.X = X
        self.y = y
    def __len__(self):
        return len(self.X)
    def __getitem__(self, idx):
        x_t = torch.tensor(self.X[idx], dtype=torch.float32)  # shape (7, F)
        y_t = torch.tensor(self.y[idx], dtype=torch.long)
        return x_t, y_t

train_ds = Gesture7Dataset(X_train, y_train)
val_ds   = Gesture7Dataset(X_val,   y_val)

batch_size = 16
train_loader = DataLoader(train_ds, batch_size=batch_size, shuffle=True)
val_loader   = DataLoader(val_ds,   batch_size=batch_size, shuffle=False)

print("Train size:", len(train_ds), "Val size:", len(val_ds))
print("Number of features:", X.shape[2], "Number of classes:", len(unique_labels))


Train size: 512 Val size: 128
Number of features: 70 Number of classes: 4


In [8]:
unique_labels = sorted(list(set(labels)))
label_to_idx = {lab: i for i, lab in enumerate(unique_labels)}
y = np.array([label_to_idx[lbl] for lbl in labels])

In [9]:
%%capture

import time
import os
import json
import numpy as np
import tensorflow as tf
from tensorflow.keras.models import Sequential, Model
from tensorflow.keras.layers import Dense, LSTM, Dropout, Input, LayerNormalization
from tensorflow.keras.layers import MultiHeadAttention, GlobalAveragePooling1D, Concatenate
from tensorflow.keras.utils import to_categorical
from tensorflow.keras.callbacks import EarlyStopping, ReduceLROnPlateau
from tqdm import tqdm

################################################################################
# If not already defined, set the global variable:
# WINDOW_SIZE = 7  # or whatever your sequence length is.
################################################################################

def LSTMModel(training_data, save_dir='models_lstm', n_attempts=10):
    """
    Trains an LSTM model for up to 100 epochs, repeated n_attempts times,
    saving the best run based on validation accuracy. Expects:
      training_data['sequence'] = (X_train_seq, X_val_seq, X_test_seq)
      training_data['labels']   = (y_train, y_val, y_test)
    and a global WINDOW_SIZE for the length of each sequence.
    """

    print("=== Training LSTM Model ===")

    # Check window size
    (X_train_seq, X_val_seq, X_test_seq) = training_data['sequence']
    (y_train, y_val, y_test) = training_data['labels']

    assert X_train_seq.shape[1] == WINDOW_SIZE, \
        f"Sequence length mismatch: expected {WINDOW_SIZE}, got {X_train_seq.shape[1]}"

    n_classes = len(np.unique(y_train))

    # Convert labels to one-hot
    y_train_cat = to_categorical(y_train, n_classes)
    y_val_cat   = to_categorical(y_val,   n_classes)
    y_test_cat  = to_categorical(y_test,  n_classes)

    print(f"X_train shape: {X_train_seq.shape}")
    print(f"y_train shape: {y_train.shape}")
    print(f"One-hot y_train shape: {y_train_cat.shape}")
    print(f"Number of classes: {n_classes}")

    best_val_acc    = 0
    best_model      = None
    best_history    = None
    best_test_acc   = 0
    best_attempt    = 0
    feature_importance = None

    os.makedirs(save_dir, exist_ok=True)

    print(f"\nPerforming {n_attempts} training attempts...")
    attempts_pbar = tqdm(range(n_attempts), desc="LSTM Training", ncols=100)

    for attempt in attempts_pbar:
        # Shuffle training data each attempt
        indices = np.random.permutation(len(X_train_seq))
        X_train_shuffled = X_train_seq[indices]
        y_train_shuffled = y_train_cat[indices]

        with tf.device('/device:GPU:0'):
            model = Sequential([
                LSTM(128, input_shape=(X_train_seq.shape[1], X_train_seq.shape[2]), return_sequences=True),
                LayerNormalization(),
                Dropout(0.3),
                LSTM(256),
                LayerNormalization(),
                Dropout(0.3),
                Dense(128, activation='relu'),
                LayerNormalization(),
                Dropout(0.2),
                Dense(n_classes, activation='softmax')
            ])

        model.compile(
            optimizer=tf.keras.optimizers.Adam(learning_rate=0.001),
            loss='categorical_crossentropy',
            metrics=['accuracy']
        )

        early_stop = EarlyStopping(monitor='val_accuracy', patience=15, restore_best_weights=True)
        reduce_lr  = ReduceLROnPlateau(monitor='val_loss', factor=0.5, patience=5, min_lr=0.00001)

        class TrainingProgressCallback(tf.keras.callbacks.Callback):
            def on_epoch_end(self, epoch, logs=None):
                attempts_pbar.set_postfix({
                    "attempt": f"{attempt+1}/{n_attempts}",
                    "epoch": f"{epoch+1}/100",
                    "acc": f"{logs['accuracy']:.4f}",
                    "val_acc": f"{logs['val_accuracy']:.4f}"
                })

        start_time = time.time()
        history = model.fit(
            X_train_shuffled, y_train_shuffled,
            validation_data=(X_val_seq, y_val_cat),
            epochs=100,
            batch_size=32,
            callbacks=[early_stop, reduce_lr, TrainingProgressCallback()],
            verbose=0
        )
        training_time = time.time() - start_time

        val_acc = max(history.history['val_accuracy'])
        test_loss, test_acc = model.evaluate(X_test_seq, y_test_cat, verbose=0)

        attempts_pbar.set_postfix({"attempt": attempt+1, "best_val": f"{val_acc:.4f}", "test_acc": f"{test_acc:.4f}"})

        if val_acc > best_val_acc:
            best_val_acc  = val_acc
            best_model    = model
            best_history  = history
            best_test_acc = test_acc
            best_attempt  = attempt + 1

            # Save best model
            model_path = os.path.join(save_dir, 'lstm_model_best')
            best_model.save(model_path)

            # Optional feature importance via gradient
            if hasattr(training_data, 'feature_columns'):
                feature_names = training_data.get('feature_columns',
                                    [f"feature_{i}" for i in range(X_train_seq.shape[2])])
                # Gradients on a subset
                inputs = tf.convert_to_tensor(X_test_seq[:100])
                with tf.GradientTape() as tape:
                    tape.watch(inputs)
                    predictions = model(inputs)
                gradients = tape.gradient(predictions, inputs)
                feature_importance = np.mean(np.abs(gradients.numpy()), axis=(0, 1))

                feature_imp_dict = {
                    feature_names[i]: float(feature_importance[i])
                    for i in range(len(feature_names))
                }
                with open(os.path.join(save_dir, 'lstm_feature_importance.json'), 'w') as f:
                    json.dump(feature_imp_dict, f, indent=2)

            # Save history
            history_dict = {
                'accuracy':      [float(v) for v in best_history.history['accuracy']],
                'val_accuracy':  [float(v) for v in best_history.history['val_accuracy']],
                'loss':          [float(v) for v in best_history.history['loss']],
                'val_loss':      [float(v) for v in best_history.history['val_loss']],
                'best_val_accuracy': float(best_val_acc),
                'best_test_accuracy': float(best_test_acc),
                'best_attempt':  best_attempt,
                'training_time': training_time,
                'params_count':  best_model.count_params()
            }

            with open(os.path.join(save_dir, 'lstm_history_best.json'), 'w') as f:
                json.dump(history_dict, f, indent=2)

    print(f"\nBest model was from attempt {best_attempt}")
    print(f"Best validation accuracy: {best_val_acc:.4f}")
    print(f"Best model test accuracy: {best_test_acc:.4f}")

    return {
        'model': best_model,
        'history': best_history.history if best_history else None,
        'best_val_accuracy': best_val_acc,
        'best_test_accuracy': best_test_acc,
        'best_attempt': best_attempt,
        'feature_importance': feature_importance
    }


def positional_encoding(length, depth):
    positions = np.arange(length)[:, np.newaxis]
    depths = np.arange(depth)[np.newaxis, :] / depth
    angle_rates = 1 / (10000 ** depths)
    angle_rads = positions * angle_rates
    pos_encoding = np.concatenate([np.sin(angle_rads), np.cos(angle_rads)], axis=-1)
    pos_encoding = pos_encoding[..., :depth]
    return tf.cast(pos_encoding, dtype=tf.float32)


def TransformerModel(training_data, save_dir='models_tf', n_attempts=10):
    """
    Similar logic, but for a Transformer-based architecture.
    """

    print("=== Training Transformer Model ===")
    (X_train_seq, X_val_seq, X_test_seq) = training_data['sequence']
    (y_train, y_val, y_test) = training_data['labels']

    assert X_train_seq.shape[1] == WINDOW_SIZE, \
        f"Sequence length mismatch: expected {WINDOW_SIZE}, got {X_train_seq.shape[1]}"

    n_classes = len(np.unique(y_train))
    y_train_cat = to_categorical(y_train, n_classes)
    y_val_cat   = to_categorical(y_val,   n_classes)
    y_test_cat  = to_categorical(y_test,  n_classes)

    best_val_acc  = 0
    best_model    = None
    best_history  = None
    best_test_acc = 0
    best_attempt  = 0
    feature_importance = None

    os.makedirs(save_dir, exist_ok=True)

    seq_len     = X_train_seq.shape[1]
    feature_dim = X_train_seq.shape[2]
    d_model     = 64

    print(f"\nPerforming {n_attempts} training attempts...")
    attempts_pbar = tqdm(range(n_attempts), desc="Transformer Training", ncols=100)

    for attempt in attempts_pbar:
        indices = np.random.permutation(len(X_train_seq))
        X_train_shuffled = X_train_seq[indices]
        y_train_shuffled = y_train_cat[indices]

        with tf.device('/device:GPU:0'):
            inputs = Input(shape=(seq_len, feature_dim))
            x = Dense(d_model)(inputs)

            pos_encode = positional_encoding(seq_len, d_model)
            pos_encode = tf.expand_dims(pos_encode, axis=0)  # shape: (1,seq_len,d_model)
            x = x + pos_encode

            # First Transformer block
            attn_out = MultiHeadAttention(num_heads=8, key_dim=8, value_dim=8)(x, x, x)
            x = LayerNormalization(epsilon=1e-6)(x + attn_out)
            x = Dropout(0.1)(x)

            ff = Dense(128, activation='relu')(x)
            ff = Dropout(0.1)(ff)
            ff = Dense(d_model)(ff)
            x = LayerNormalization(epsilon=1e-6)(x + ff)

            # Second block
            attn_out = MultiHeadAttention(num_heads=8, key_dim=8, value_dim=8)(x, x, x)
            x = LayerNormalization(epsilon=1e-6)(x + attn_out)
            x = Dropout(0.1)(x)

            ff = Dense(128, activation='relu')(x)
            ff = Dropout(0.1)(ff)
            ff = Dense(d_model)(ff)
            x = LayerNormalization(epsilon=1e-6)(x + ff)

            # Pool
            x = GlobalAveragePooling1D()(x)

            # final dense
            x = Dense(128, activation='relu')(x)
            x = LayerNormalization()(x)
            x = Dropout(0.2)(x)
            outputs = Dense(n_classes, activation='softmax')(x)

            model = Model(inputs=inputs, outputs=outputs)

        model.compile(
            optimizer=tf.keras.optimizers.Adam(learning_rate=0.0005),
            loss='categorical_crossentropy',
            metrics=['accuracy']
        )

        early_stop = EarlyStopping(monitor='val_accuracy', patience=15, restore_best_weights=True)
        reduce_lr  = ReduceLROnPlateau(monitor='val_loss', factor=0.5, patience=5, min_lr=1e-5)

        class TrainingProgressCallback(tf.keras.callbacks.Callback):
            def on_epoch_end(self, epoch, logs=None):
                attempts_pbar.set_postfix({
                    "attempt": f"{attempt+1}/{n_attempts}",
                    "epoch": f"{epoch+1}/100",
                    "acc": f"{logs['accuracy']:.4f}",
                    "val_acc": f"{logs['val_accuracy']:.4f}"
                })

        start_time = time.time()
        history = model.fit(
            X_train_shuffled, y_train_shuffled,
            validation_data=(X_val_seq, y_val_cat),
            epochs=100,
            batch_size=32,
            callbacks=[early_stop, reduce_lr, TrainingProgressCallback()],
            verbose=0
        )
        training_time = time.time() - start_time

        val_acc = max(history.history['val_accuracy'])
        test_loss, test_acc = model.evaluate(X_test_seq, y_test_cat, verbose=0)

        attempts_pbar.set_postfix({"attempt": attempt+1, "best_val": f"{val_acc:.4f}", "test_acc": f"{test_acc:.4f}"})

        if val_acc > best_val_acc:
            best_val_acc  = val_acc
            best_model    = model
            best_history  = history
            best_test_acc = test_acc
            best_attempt  = attempt + 1

            model_path = os.path.join(save_dir, 'transformer_model_best')
            best_model.save(model_path)

            if hasattr(training_data, 'feature_columns'):
                feature_names = training_data.get('feature_columns',
                                    [f"feature_{i}" for i in range(X_train_seq.shape[2])])
                inputs = tf.convert_to_tensor(X_test_seq[:100])
                with tf.GradientTape() as tape:
                    tape.watch(inputs)
                    predictions = model(inputs)
                gradients = tape.gradient(predictions, inputs)
                feature_importance = np.mean(np.abs(gradients.numpy()), axis=(0, 1))

                feature_imp_dict = {
                    feature_names[i]: float(feature_importance[i]) for i in range(len(feature_names))
                }
                with open(os.path.join(save_dir, 'transformer_feature_importance.json'), 'w') as f:
                    json.dump(feature_imp_dict, f, indent=2)

            history_dict = {
                'accuracy':        [float(v) for v in best_history.history['accuracy']],
                'val_accuracy':    [float(v) for v in best_history.history['val_accuracy']],
                'loss':            [float(v) for v in best_history.history['loss']],
                'val_loss':        [float(v) for v in best_history.history['val_loss']],
                'best_val_accuracy': float(best_val_acc),
                'best_test_accuracy': float(best_test_acc),
                'best_attempt':    best_attempt,
                'training_time':   training_time,
                'params_count':    best_model.count_params()
            }

            with open(os.path.join(save_dir, 'transformer_history_best.json'), 'w') as f:
                json.dump(history_dict, f, indent=2)

    print(f"\nBest model was from attempt {best_attempt}")
    print(f"Best validation accuracy: {best_val_acc:.4f}")
    print(f"Best model test accuracy: {best_test_acc:.4f}")

    return {
        'model': best_model,
        'history': best_history.history if best_history else None,
        'best_val_accuracy': best_val_acc,
        'best_test_accuracy': best_test_acc,
        'best_attempt': best_attempt,
        'feature_importance': feature_importance
    }


def HybridModel(training_data, save_dir='models_hybrid', n_attempts=10):
    """
    A hybrid LSTM-Transformer model, also 100 epochs x n_attempts,
    saving best run based on val_acc.
    """

    print("=== Training Hybrid LSTM-Transformer Model ===")
    (X_train_seq, X_val_seq, X_test_seq) = training_data['sequence']
    (y_train, y_val, y_test) = training_data['labels']

    assert X_train_seq.shape[1] == WINDOW_SIZE, \
        f"Sequence length mismatch: expected {WINDOW_SIZE}, got {X_train_seq.shape[1]}"

    n_classes = len(np.unique(y_train))
    y_train_cat = to_categorical(y_train, n_classes)
    y_val_cat   = to_categorical(y_val,   n_classes)
    y_test_cat  = to_categorical(y_test,  n_classes)

    best_val_acc = 0
    best_model   = None
    best_history = None
    best_test_acc= 0
    best_attempt = 0
    feature_importance = None

    os.makedirs(save_dir, exist_ok=True)

    print(f"\nPerforming {n_attempts} training attempts...")

    seq_len     = X_train_seq.shape[1]
    feature_dim = X_train_seq.shape[2]
    d_model     = 64

    attempts_pbar = tqdm(range(n_attempts), desc="Hybrid Model Training", ncols=100)

    def positional_encoding(length, depth):
        positions = np.arange(length)[:, np.newaxis]
        depths = np.arange(depth)[np.newaxis, :] / depth
        angle_rates = 1 / (10000 ** depths)
        angle_rads = positions * angle_rates
        pos_encoding = np.concatenate([np.sin(angle_rads), np.cos(angle_rads)], axis=-1)
        pos_encoding = pos_encoding[..., :depth]
        return tf.cast(pos_encoding, dtype=tf.float32)

    for attempt in attempts_pbar:
        indices = np.random.permutation(len(X_train_seq))
        X_train_shuffled = X_train_seq[indices]
        y_train_shuffled = y_train_cat[indices]

        with tf.device('/device:GPU:0'):
            inputs = Input(shape=(seq_len, feature_dim))

            # LSTM branch
            x_lstm = LSTM(128, return_sequences=True)(inputs)
            x_lstm = LayerNormalization()(x_lstm)
            x_lstm = Dropout(0.3)(x_lstm)

            x_lstm2 = LSTM(128)(x_lstm)
            x_lstm2 = LayerNormalization()(x_lstm2)
            x_lstm2 = Dropout(0.3)(x_lstm2)

            # Transformer branch
            trans_x = Dense(d_model)(inputs)
            pos_encode = positional_encoding(seq_len, d_model)
            pos_encode = tf.expand_dims(pos_encode, axis=0)  
            trans_x = trans_x + pos_encode

            attn_out = MultiHeadAttention(num_heads=8, key_dim=8, value_dim=8)(trans_x, trans_x, trans_x)
            trans_x = LayerNormalization(epsilon=1e-6)(trans_x + attn_out)
            trans_x = Dropout(0.1)(trans_x)

            ff = Dense(128, activation='relu')(trans_x)
            ff = Dropout(0.1)(ff)
            ff = Dense(d_model)(ff)
            trans_x = LayerNormalization(epsilon=1e-6)(trans_x + ff)

            trans_out = GlobalAveragePooling1D()(trans_x)
            trans_out = Dropout(0.2)(trans_out)

            # Combine
            combined = Concatenate()([x_lstm2, trans_out])

            x = Dense(128, activation='relu')(combined)
            x = LayerNormalization()(x)
            x = Dropout(0.2)(x)
            outputs = Dense(n_classes, activation='softmax')(x)

            model = Model(inputs=inputs, outputs=outputs)

        model.compile(
            optimizer=tf.keras.optimizers.Adam(learning_rate=0.0005),
            loss='categorical_crossentropy',
            metrics=['accuracy']
        )

        early_stop = EarlyStopping(monitor='val_accuracy', patience=15, restore_best_weights=True)
        reduce_lr  = ReduceLROnPlateau(monitor='val_loss', factor=0.5, patience=5, min_lr=1e-5)

        class TrainingProgressCallback(tf.keras.callbacks.Callback):
            def on_epoch_end(self, epoch, logs=None):
                attempts_pbar.set_postfix({
                    "attempt": f"{attempt+1}/{n_attempts}",
                    "epoch": f"{epoch+1}/100",
                    "acc": f"{logs['accuracy']:.4f}",
                    "val_acc": f"{logs['val_accuracy']:.4f}"
                })

        start_time = time.time()
        history = model.fit(
            X_train_shuffled, y_train_shuffled,
            validation_data=(X_val_seq, y_val_cat),
            epochs=100,
            batch_size=32,
            callbacks=[early_stop, reduce_lr, TrainingProgressCallback()],
            verbose=0
        )
        training_time = time.time() - start_time

        val_acc = max(history.history['val_accuracy'])
        test_loss, test_acc = model.evaluate(X_test_seq, y_test_cat, verbose=0)

        attempts_pbar.set_postfix({"attempt": attempt+1, "best_val": f"{val_acc:.4f}", "test_acc": f"{test_acc:.4f}"})

        if val_acc > best_val_acc:
            best_val_acc  = val_acc
            best_model    = model
            best_history  = history
            best_test_acc = test_acc
            best_attempt  = attempt + 1

            model_path = os.path.join(save_dir, 'hybrid_model_best')
            best_model.save(model_path)

            if hasattr(training_data, 'feature_columns'):
                feature_names = training_data.get('feature_columns',
                                    [f"feature_{i}" for i in range(X_train_seq.shape[2])])
                inputs = tf.convert_to_tensor(X_test_seq[:100])
                with tf.GradientTape() as tape:
                    tape.watch(inputs)
                    predictions = model(inputs)
                gradients = tape.gradient(predictions, inputs)
                feature_importance = np.mean(np.abs(gradients.numpy()), axis=(0, 1))

                feature_imp_dict = {
                    feature_names[i]: float(feature_importance[i]) for i in range(len(feature_names))
                }
                with open(os.path.join(save_dir, 'hybrid_feature_importance.json'), 'w') as f:
                    json.dump(feature_imp_dict, f, indent=2)

            history_dict = {
                'accuracy':        [float(v) for v in best_history.history['accuracy']],
                'val_accuracy':    [float(v) for v in best_history.history['val_accuracy']],
                'loss':            [float(v) for v in best_history.history['loss']],
                'val_loss':        [float(v) for v in best_history.history['val_loss']],
                'best_val_accuracy': float(best_val_acc),
                'best_test_accuracy': float(best_test_acc),
                'best_attempt':    best_attempt,
                'training_time':   training_time,
                'params_count':    best_model.count_params()
            }

            with open(os.path.join(save_dir, 'hybrid_history_best.json'), 'w') as f:
                json.dump(history_dict, f, indent=2)

    print(f"\nBest model was from attempt {best_attempt}")
    print(f"Best validation accuracy: {best_val_acc:.4f}")
    print(f"Best model test accuracy: {best_test_acc:.4f}")

    return {
        'model': best_model,
        'history': best_history.history if best_history else None,
        'best_val_accuracy': best_val_acc,
        'best_test_accuracy': best_test_acc,
        'best_attempt': best_attempt,
        'feature_importance': feature_importance
    }


In [None]:
import os
import json
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.metrics import confusion_matrix, classification_report, f1_score

def create_comparison_visualizations(results, y_test, label_map, models_dir='models_04'):
    if not os.path.exists(models_dir):
        os.makedirs(models_dir, exist_ok=True)

    # Bar plot of accuracies / F1
    plt.figure(figsize=(12, 6))
    model_names = list(results.keys())
    accuracies = [results[m]['accuracy'] for m in model_names]
    f1_scores = [results[m]['f1_score'] for m in model_names]

    x = np.arange(len(model_names))
    width = 0.3
    plt.bar(x - width/2, accuracies, width, label='Accuracy', color='skyblue')
    plt.bar(x + width/2, f1_scores, width, label='F1 Score', color='lightgreen')
    plt.xticks(x, [m.upper() for m in model_names])
    plt.ylabel("Score")
    plt.ylim(0, 1.0)
    plt.title("Model Performance Comparison")
    plt.legend()
    plt.grid(True, linestyle='--', alpha=0.7)
    plt.tight_layout()
    plt.savefig(os.path.join(models_dir, 'performance_comparison.png'), dpi=300)
    plt.close()

    # Confusion matrices
    fig, axes = plt.subplots(1, len(model_names), figsize=(8*len(model_names), 6))
    if len(model_names) == 1:
        axes = [axes]  # make it iterable

    gesture_labels = [label_map[i] for i in sorted(np.unique(y_test))]

    for i, model_name in enumerate(model_names):
        conf_matrix = results[model_name]['confusion_matrix']
        conf_matrix_norm = conf_matrix.astype('float') / conf_matrix.sum(axis=1)[:, np.newaxis]
        
        sns.heatmap(conf_matrix, annot=True, fmt='d', cmap='Blues', ax=axes[i],
                    xticklabels=gesture_labels, yticklabels=gesture_labels)
        axes[i].set_title(f"{model_name.upper()} Confusion Matrix")
        axes[i].set_xlabel("Predicted")
        axes[i].set_ylabel("True")

    plt.tight_layout()
    plt.savefig(os.path.join(models_dir, 'confusion_matrices.png'), dpi=300)
    plt.close()
    
    # Normalized confusion matrices
    fig, axes = plt.subplots(1, len(model_names), figsize=(8*len(model_names), 6))
    if len(model_names) == 1:
        axes = [axes]  # make it iterable

    for i, model_name in enumerate(model_names):
        conf_matrix = results[model_name]['confusion_matrix']
        conf_matrix_norm = conf_matrix.astype('float') / conf_matrix.sum(axis=1)[:, np.newaxis]
        
        sns.heatmap(conf_matrix_norm, annot=True, fmt='.2f', cmap='Blues', ax=axes[i],
                    xticklabels=gesture_labels, yticklabels=gesture_labels)
        axes[i].set_title(f"{model_name.upper()} Normalized Confusion Matrix")
        axes[i].set_xlabel("Predicted")
        axes[i].set_ylabel("True")

    plt.tight_layout()
    plt.savefig(os.path.join(models_dir, 'confusion_matrices_normalized.png'), dpi=300)
    plt.close()

    # Prediction analysis CSV
    predictions_df = pd.DataFrame()
    
    for model_name in model_names:
        predictions_df[f'{model_name}_pred'] = results[model_name]['predictions']
        predictions_df[f'{model_name}_correct'] = (results[model_name]['predictions'] == y_test)
        
        if 'probabilities' in results[model_name]:
            # Get confidence for predicted class
            probs = results[model_name]['probabilities']
            pred_indices = results[model_name]['predictions']
            confidence = np.array([probs[i, pred_indices[i]] for i in range(len(pred_indices))])
            predictions_df[f'{model_name}_confidence'] = confidence
    
    # Add true labels
    predictions_df['true_label_idx'] = y_test
    predictions_df['true_label'] = [label_map[idx] for idx in y_test]
    
    # Convert prediction indices to names
    for model_name in model_names:
        predictions_df[f'{model_name}_pred_label'] = [label_map[idx] for idx in predictions_df[f'{model_name}_pred']]
    
    # Save predictions
    predictions_df.to_csv(os.path.join(models_dir, 'prediction_analysis.csv'), index=False)
    
    # Feature importance comparison
    try:
        feature_imp_data = {}
        feature_names = []
        
        for model_name in model_names:
            imp_file = os.path.join(models_dir, f'{model_name}_feature_importance.json')
            if os.path.exists(imp_file):
                with open(imp_file, 'r') as f:
                    feature_imp = json.load(f)
                    feature_imp_data[model_name] = feature_imp
                    if not feature_names:
                        feature_names = list(feature_imp.keys())
        
        if feature_imp_data:
            # Create DataFrame
            imp_df = pd.DataFrame()
            for model_name, imp_dict in feature_imp_data.items():
                imp_df[model_name] = [imp_dict.get(feat, 0) for feat in feature_names]
            
            imp_df['feature'] = feature_names
            
            # Normalize importance scores
            for model_name in feature_imp_data.keys():
                imp_df[model_name] = imp_df[model_name] / imp_df[model_name].sum()
            
            # Sort by average importance
            avg_col = 'avg_importance'
            imp_df[avg_col] = imp_df[[col for col in imp_df.columns if col != 'feature']].mean(axis=1)
            imp_df = imp_df.sort_values(by=avg_col, ascending=False).reset_index(drop=True)
            
            # Save feature importance table
            imp_df.to_csv(os.path.join(models_dir, 'feature_importance.csv'), index=False)
            
            # Plot top 20 features
            plt.figure(figsize=(14, 10))
            top_n = min(20, len(imp_df))
            
            for model_name in feature_imp_data.keys():
                plt.plot(imp_df['feature'].values[:top_n], imp_df[model_name].values[:top_n], 
                         marker='o', label=model_name)
            
            plt.xticks(rotation=90)
            plt.xlabel('Features')
            plt.ylabel('Normalized Importance')
            plt.title(f'Top {top_n} Features by Importance')
            plt.legend()
            plt.grid(True, linestyle='--', alpha=0.7)
            plt.tight_layout()
            plt.savefig(os.path.join(models_dir, 'feature_importance_top20.png'), dpi=300)
            plt.close()
    except Exception as e:
        print(f"Could not generate feature importance plots: {e}")
    
    # Training histories plot (accuracy)
    plt.figure(figsize=(12, 6))
    
    for model_name in model_names:
        hist = results[model_name]['history']
        if hist and 'accuracy' in hist:
            plt.plot(hist['accuracy'], label=f'{model_name} train acc')
        if hist and 'val_accuracy' in hist:
            plt.plot(hist['val_accuracy'], label=f'{model_name} val acc', linestyle='--')

    plt.title("Training Accuracy Histories")
    plt.xlabel("Epoch")
    plt.ylabel("Accuracy")
    plt.legend()
    plt.grid(True, linestyle='--', alpha=0.7)
    plt.tight_layout()
    plt.savefig(os.path.join(models_dir, 'training_accuracy_history.png'), dpi=300)
    plt.close()
    
    # Training histories plot (loss)
    plt.figure(figsize=(12, 6))
    
    for model_name in model_names:
        hist = results[model_name]['history']
        if hist and 'loss' in hist:
            plt.plot(hist['loss'], label=f'{model_name} train loss')
        if hist and 'val_loss' in hist:
            plt.plot(hist['val_loss'], label=f'{model_name} val loss', linestyle='--')

    plt.title("Training Loss Histories")
    plt.xlabel("Epoch")
    plt.ylabel("Loss")
    plt.legend()
    plt.grid(True, linestyle='--', alpha=0.7)
    plt.tight_layout()
    plt.savefig(os.path.join(models_dir, 'training_loss_history.png'), dpi=300)
    plt.close()

def compare_models(training_data, models_dir='models_03'):
    # Load test data
    (X_train_seq, X_val_seq, X_test_seq) = training_data['sequence']
    (y_train, y_val, y_test) = training_data['labels']

    n_classes = len(np.unique(y_test))
    y_test_cat = to_categorical(y_test, n_classes)

    # Get label map
    label_map = {}
    if 'label_encoder' in training_data:
        for idx, lbl in enumerate(training_data['label_encoder'].classes_):
            label_map[idx] = lbl
    else:
        # fallback
        unique_y = sorted(np.unique(y_test))
        for i, val in enumerate(unique_y):
            label_map[i] = f"Class{i}"

    results = {}
    model_names = ["lstm", "transformer", "hybrid"]

    for model_name in model_names:
        model_path = os.path.join(models_dir, f"{model_name}_model_best")
        hist_path = os.path.join(models_dir, f"{model_name}_history_best.json")

        if not os.path.exists(model_path):
            print(f"Model not found: {model_path}. Skipping {model_name}.")
            continue

        # Load model
        model = tf.keras.models.load_model(model_path)

        # Load history if available
        history_data = None
        if os.path.exists(hist_path):
            with open(hist_path, 'r') as f:
                history_data = json.load(f)

        # Evaluate
        X_test = X_test_seq
        y_test_eval = y_test_cat

        test_loss, test_acc = model.evaluate(X_test, y_test_eval, verbose=0)
        y_pred_probs = model.predict(X_test, verbose=0)
        y_pred_classes = np.argmax(y_pred_probs, axis=1)

        f1 = f1_score(y_test, y_pred_classes, average='weighted')
        conf_mat = confusion_matrix(y_test, y_pred_classes)

        results[model_name] = {
            'accuracy': test_acc,
            'f1_score': f1,
            'confusion_matrix': conf_mat,
            'predictions': y_pred_classes,
            'probabilities': y_pred_probs,
            'history': history_data
        }

        print(f"\n=== {model_name.upper()} MODEL ===")
        print(f"Test Accuracy: {test_acc:.4f}")
        print(f"F1 Score: {f1:.4f}")
        print("Confusion Matrix:")
        print(conf_mat)
        print("Classification Report:")
        print(classification_report(y_test, y_pred_classes, target_names=[label_map[i] for i in range(n_classes)]))

    # Generate visuals
    if results:
        create_comparison_visualizations(results, y_test, label_map, models_dir=models_dir)
    else:
        print("\nNo models found to compare (did you train them?).")

    return results

In [11]:
X = np.array(all_features)
labels = np.array(all_labels)
unique_labels = sorted(np.unique(labels))
y = np.array([label_to_idx[lbl] for lbl in labels])
input_dim = X.shape[-1]
num_classes = len(unique_labels)


In [None]:
# Define global window size (required by the imported functions)
WINDOW_SIZE = 7  # Your sequences have 6 frames based on the output in the notebook

# Prepare the training data in the required format for the imported functions
X_train_np = X_train.astype(np.float32)
X_val_np = X_val.astype(np.float32)
X_test_np = X_val.astype(np.float32)

# Prepare training data dictionary
training_data = {
    'sequence': (X_train_np, X_val_np, X_test_np),
    'labels': (y_train, y_val, y_val),  # Use y_val as y_test
    'feature_columns': [f"feature_{i}" for i in range(X_train.shape[2])]
}

# Create output directories
os.makedirs('models_lstm', exist_ok=True)
os.makedirs('models_tf', exist_ok=True)
os.makedirs('models_hybrid', exist_ok=True)

# Using updated visualization functions from paste.txt

# Add this code right after creating the comparison_data dictionary:

# Print dataset statistics
print("\n=== Dataset Statistics ===")
print(f"Total dataset size: {len(X)} samples")
print(f"Train set: {len(X_train)} samples, Val/Test set: {len(X_val)} samples")
print(f"Feature dimension: {X_train.shape[-1]}")

# Count class distribution in each split
print("\n=== Class Distribution ===")
print("Class\t\tTotal\tTrain\tVal/Test")
print("-" * 40)

for idx, label in enumerate(unique_labels):
    total_count = np.sum(y == idx)
    train_count = np.sum(y_train == idx)
    val_count = np.sum(y_val == idx)
    
    # Calculate percentages
    train_pct = train_count / len(y_train) * 100
    val_pct = val_count / len(y_val) * 100
    
    print(f"{label:<15} {total_count:>5} ({np.sum(y == idx)/len(y)*100:>4.1f}%) {train_count:>5} ({train_pct:>4.1f}%) {val_count:>5} ({val_pct:>4.1f}%)")

print("\n=== Model Training Starting ===")

# Train LSTM model
print("Training LSTM model...")
lstm_results = LSTMModel(training_data, save_dir='models_lstm', n_attempts=10)

# Train Transformer model
print("\nTraining Transformer model...")
transformer_results = TransformerModel(training_data, save_dir='models_tf', n_attempts=10)

# Train Hybrid model
print("\nTraining Hybrid model...")
hybrid_results = HybridModel(training_data, save_dir='models_hybrid', n_attempts=10)

# Define comparison visualization functions from paste.txt
def create_comparison_visualizations(results, y_test, label_map, models_dir='models_04'):
    if not os.path.exists(models_dir):
        os.makedirs(models_dir, exist_ok=True)

    # Bar plot of accuracies / F1
    plt.figure(figsize=(12, 6))
    model_names = list(results.keys())
    accuracies = [results[m]['accuracy'] for m in model_names]
    f1_scores = [results[m]['f1_score'] for m in model_names]

    x = np.arange(len(model_names))
    width = 0.3
    plt.bar(x - width/2, accuracies, width, label='Accuracy', color='skyblue')
    plt.bar(x + width/2, f1_scores, width, label='F1 Score', color='lightgreen')
    plt.xticks(x, [m.upper() for m in model_names])
    plt.ylabel("Score")
    plt.ylim(0, 1.0)
    plt.title("Model Performance Comparison")
    plt.legend()
    plt.grid(True, linestyle='--', alpha=0.7)
    plt.tight_layout()
    plt.savefig(os.path.join(models_dir, 'performance_comparison.png'), dpi=300)
    plt.close()

    # Confusion matrices
    fig, axes = plt.subplots(1, len(model_names), figsize=(8*len(model_names), 6))
    if len(model_names) == 1:
        axes = [axes]  # make it iterable

    gesture_labels = [label_map[i] for i in sorted(np.unique(y_test))]

    for i, model_name in enumerate(model_names):
        conf_matrix = results[model_name]['confusion_matrix']
        conf_matrix_norm = conf_matrix.astype('float') / conf_matrix.sum(axis=1)[:, np.newaxis]
        
        sns.heatmap(conf_matrix, annot=True, fmt='d', cmap='Blues', ax=axes[i],
                    xticklabels=gesture_labels, yticklabels=gesture_labels)
        axes[i].set_title(f"{model_name.upper()} Confusion Matrix")
        axes[i].set_xlabel("Predicted")
        axes[i].set_ylabel("True")

    plt.tight_layout()
    plt.savefig(os.path.join(models_dir, 'confusion_matrices.png'), dpi=300)
    plt.close()
    
    # Normalized confusion matrices
    fig, axes = plt.subplots(1, len(model_names), figsize=(8*len(model_names), 6))
    if len(model_names) == 1:
        axes = [axes]  # make it iterable

    for i, model_name in enumerate(model_names):
        conf_matrix = results[model_name]['confusion_matrix']
        conf_matrix_norm = conf_matrix.astype('float') / conf_matrix.sum(axis=1)[:, np.newaxis]
        
        sns.heatmap(conf_matrix_norm, annot=True, fmt='.2f', cmap='Blues', ax=axes[i],
                    xticklabels=gesture_labels, yticklabels=gesture_labels)
        axes[i].set_title(f"{model_name.upper()} Normalized Confusion Matrix")
        axes[i].set_xlabel("Predicted")
        axes[i].set_ylabel("True")

    plt.tight_layout()
    plt.savefig(os.path.join(models_dir, 'confusion_matrices_normalized.png'), dpi=300)
    plt.close()

    # Prediction analysis CSV
    predictions_df = pd.DataFrame()
    
    for model_name in model_names:
        predictions_df[f'{model_name}_pred'] = results[model_name]['predictions']
        predictions_df[f'{model_name}_correct'] = (results[model_name]['predictions'] == y_test)
        
        if 'probabilities' in results[model_name]:
            # Get confidence for predicted class
            probs = results[model_name]['probabilities']
            pred_indices = results[model_name]['predictions']
            confidence = np.array([probs[i, pred_indices[i]] for i in range(len(pred_indices))])
            predictions_df[f'{model_name}_confidence'] = confidence
    
    # Add true labels
    predictions_df['true_label_idx'] = y_test
    predictions_df['true_label'] = [label_map[idx] for idx in y_test]
    
    # Convert prediction indices to names
    for model_name in model_names:
        predictions_df[f'{model_name}_pred_label'] = [label_map[idx] for idx in predictions_df[f'{model_name}_pred']]
    
    # Save predictions
    predictions_df.to_csv(os.path.join(models_dir, 'prediction_analysis.csv'), index=False)
    
    # Feature importance comparison
    try:
        feature_imp_data = {}
        feature_names = []
        
        for model_name in model_names:
            imp_file = os.path.join(models_dir, f'{model_name}_feature_importance.json')
            if os.path.exists(imp_file):
                with open(imp_file, 'r') as f:
                    feature_imp = json.load(f)
                    feature_imp_data[model_name] = feature_imp
                    if not feature_names:
                        feature_names = list(feature_imp.keys())
        
        if feature_imp_data:
            # Create DataFrame
            imp_df = pd.DataFrame()
            for model_name, imp_dict in feature_imp_data.items():
                imp_df[model_name] = [imp_dict.get(feat, 0) for feat in feature_names]
            
            imp_df['feature'] = feature_names
            
            # Normalize importance scores
            for model_name in feature_imp_data.keys():
                imp_df[model_name] = imp_df[model_name] / imp_df[model_name].sum()
            
            # Sort by average importance
            avg_col = 'avg_importance'
            imp_df[avg_col] = imp_df[[col for col in imp_df.columns if col != 'feature']].mean(axis=1)
            imp_df = imp_df.sort_values(by=avg_col, ascending=False).reset_index(drop=True)
            
            # Save feature importance table
            imp_df.to_csv(os.path.join(models_dir, 'feature_importance.csv'), index=False)
            
            # Plot top 20 features
            plt.figure(figsize=(14, 10))
            top_n = min(20, len(imp_df))
            
            for model_name in feature_imp_data.keys():
                plt.plot(imp_df['feature'].values[:top_n], imp_df[model_name].values[:top_n], 
                         marker='o', label=model_name)
            
            plt.xticks(rotation=90)
            plt.xlabel('Features')
            plt.ylabel('Normalized Importance')
            plt.title(f'Top {top_n} Features by Importance')
            plt.legend()
            plt.grid(True, linestyle='--', alpha=0.7)
            plt.tight_layout()
            plt.savefig(os.path.join(models_dir, 'feature_importance_top20.png'), dpi=300)
            plt.close()
    except Exception as e:
        print(f"Could not generate feature importance plots: {e}")
    
    # Training histories plot (accuracy)
    plt.figure(figsize=(12, 6))
    
    for model_name in model_names:
        hist = results[model_name]['history']
        if hist and 'accuracy' in hist:
            plt.plot(hist['accuracy'], label=f'{model_name} train acc')
        if hist and 'val_accuracy' in hist:
            plt.plot(hist['val_accuracy'], label=f'{model_name} val acc', linestyle='--')

    plt.title("Training Accuracy Histories")
    plt.xlabel("Epoch")
    plt.ylabel("Accuracy")
    plt.legend()
    plt.grid(True, linestyle='--', alpha=0.7)
    plt.tight_layout()
    plt.savefig(os.path.join(models_dir, 'training_accuracy_history.png'), dpi=300)
    plt.close()
    
    # Training histories plot (loss)
    plt.figure(figsize=(12, 6))
    
    for model_name in model_names:
        hist = results[model_name]['history']
        if hist and 'loss' in hist:
            plt.plot(hist['loss'], label=f'{model_name} train loss')
        if hist and 'val_loss' in hist:
            plt.plot(hist['val_loss'], label=f'{model_name} val loss', linestyle='--')

    plt.title("Training Loss Histories")
    plt.xlabel("Epoch")
    plt.ylabel("Loss")
    plt.legend()
    plt.grid(True, linestyle='--', alpha=0.7)
    plt.tight_layout()
    plt.savefig(os.path.join(models_dir, 'training_loss_history.png'), dpi=300)
    plt.close()

def compare_models(training_data, models_dir='models_03'):
    # Load test data
    (X_train_seq, X_val_seq, X_test_seq) = training_data['sequence']
    (y_train, y_val, y_test) = training_data['labels']

    n_classes = len(np.unique(y_test))
    y_test_cat = to_categorical(y_val, n_classes)

    # Get label map
    label_map = {}
    if 'label_encoder' in training_data:
        for idx, lbl in enumerate(training_data['label_encoder'].classes_):
            label_map[idx] = lbl
    else:
        # fallback
        unique_y = sorted(np.unique(y_test))
        for i, val in enumerate(unique_y):
            label_map[i] = f"Class{i}"

    results = {}
    model_names = ["lstm", "transformer", "hybrid"]

    for model_name in model_names:
        model_path = os.path.join(models_dir, f"{model_name}_model_best")
        hist_path = os.path.join(models_dir, f"{model_name}_history_best.json")

        if not os.path.exists(model_path):
            print(f"Model not found: {model_path}. Skipping {model_name}.")
            continue

        # Load model
        model = tf.keras.models.load_model(model_path)

        # Load history if available
        history_data = None
        if os.path.exists(hist_path):
            with open(hist_path, 'r') as f:
                history_data = json.load(f)

        # Evaluate
        X_test = X_test_seq
        y_test_eval = y_test_cat

        test_loss, test_acc = model.evaluate(X_test, y_test_eval, verbose=0)
        y_pred_probs = model.predict(X_test, verbose=0)
        y_pred_classes = np.argmax(y_pred_probs, axis=1)

        f1 = f1_score(y_test, y_pred_classes, average='weighted')
        conf_mat = confusion_matrix(y_test, y_pred_classes)

        results[model_name] = {
            'accuracy': test_acc,
            'f1_score': f1,
            'confusion_matrix': conf_mat,
            'predictions': y_pred_classes,
            'probabilities': y_pred_probs,
            'history': history_data
        }

        print(f"\n=== {model_name.upper()} MODEL ===")
        print(f"Test Accuracy: {test_acc:.4f}")
        print(f"F1 Score: {f1:.4f}")
        print("Confusion Matrix:")
        print(conf_mat)
        print("Classification Report:")
        print(classification_report(y_test, y_pred_classes, target_names=[label_map[i] for i in range(n_classes)]))

    # Generate visuals
    if results:
        create_comparison_visualizations(results, y_test, label_map, models_dir=models_dir)
    else:
        print("\nNo models found to compare (did you train them?).")

    return results

# Create output directory for complete model comparison
os.makedirs('models_comparison_all', exist_ok=True)

# Label map for visualization
label_map = {idx: label for idx, label in enumerate(unique_labels)}

# Prepare dataset for comparison
comparison_data = {
    'sequence': (X_train_np, X_val_np, X_test_np),
    'labels': (y_train, y_val, y_val),  # Use y_val as y_test
    'feature_columns': [f"feature_{i}" for i in range(X_train.shape[2])]
}



# First, evaluate each model in its own directory to ensure visualizations are created
print("\nEvaluating individual models:")
model_dirs = {
    'lstm': 'models_lstm',
    'transformer': 'models_tf',
    'hybrid': 'models_hybrid'
}

for model_name, model_dir in model_dirs.items():
    print(f"\nEvaluating {model_name} from {model_dir}...")
    try:
        compare_models(comparison_data, models_dir=model_dir)
    except Exception as e:
        print(f"Error evaluating {model_name}: {e}")

# Now try to combine all models in a single visualization
print("\nCreating combined model comparison...")
all_results = {}

# Try to load all models



=== Dataset Statistics ===
Total dataset size: 640 samples
Train set: 512 samples, Val/Test set: 128 samples
Feature dimension: 70

=== Class Distribution ===
Class		Total	Train	Val/Test
----------------------------------------
down_swipe        150 (23.4%)   120 (23.4%)    30 (23.4%)
left_swipe        164 (25.6%)   131 (25.6%)    33 (25.8%)
right_swipe       161 (25.2%)   129 (25.2%)    32 (25.0%)
up_swipe          165 (25.8%)   132 (25.8%)    33 (25.8%)

=== Model Training Starting ===
Training LSTM model...
=== Training LSTM Model ===
X_train shape: (512, 7, 70)
y_train shape: (512,)
One-hot y_train shape: (512, 4)
Number of classes: 4

Performing 10 training attempts...


LSTM Training:   0%|            | 0/10 [00:09<?, ?it/s, attempt=1, best_val=0.9766, test_acc=0.9766]INFO:tensorflow:Assets written to: models_lstm\lstm_model_best\assets
LSTM Training:  30%|█▏  | 3/10 [00:41<01:10, 10.08s/it, attempt=4, best_val=0.9844, test_acc=0.9844]INFO:tensorflow:Assets written to: models_lstm\lstm_model_best\assets
LSTM Training: 100%|██| 10/10 [01:44<00:00, 10.45s/it, attempt=10, best_val=0.9844, test_acc=0.9844]



Best model was from attempt 4
Best validation accuracy: 0.9844
Best model test accuracy: 0.9844

Training Transformer model...
=== Training Transformer Model ===

Performing 10 training attempts...


Transformer Training:   0%|     | 0/10 [00:03<?, ?it/s, attempt=1, best_val=0.9766, test_acc=0.9766]INFO:tensorflow:Assets written to: models_tf\transformer_model_best\assets
Transformer Training:  10%| | 1/10 [00:08<00:44,  4.94s/it, attempt=2, best_val=0.9844, test_acc=0.9INFO:tensorflow:Assets written to: models_tf\transformer_model_best\assets
Transformer Training: 100%|█| 10/10 [00:39<00:00,  3.93s/it, attempt=10, best_val=0.9688, test_acc=0



Best model was from attempt 2
Best validation accuracy: 0.9844
Best model test accuracy: 0.9844

Training Hybrid model...
=== Training Hybrid LSTM-Transformer Model ===

Performing 10 training attempts...


Hybrid Model Training:   0%|    | 0/10 [00:07<?, ?it/s, attempt=1, best_val=0.9844, test_acc=0.9844]INFO:tensorflow:Assets written to: models_hybrid\hybrid_model_best\assets
Hybrid Model Training: 100%|█| 10/10 [01:17<00:00,  7.79s/it, attempt=10, best_val=0.9844, test_acc=



Best model was from attempt 1
Best validation accuracy: 0.9844
Best model test accuracy: 0.9844

Evaluating individual models:

Evaluating lstm from models_lstm...

=== LSTM MODEL ===
Test Accuracy: 0.9844
F1 Score: 0.9844
Confusion Matrix:
[[30  0  0  0]
 [ 0 33  0  0]
 [ 0  0 32  0]
 [ 2  0  0 31]]
Classification Report:
              precision    recall  f1-score   support

      Class0       0.94      1.00      0.97        30
      Class1       1.00      1.00      1.00        33
      Class2       1.00      1.00      1.00        32
      Class3       1.00      0.94      0.97        33

    accuracy                           0.98       128
   macro avg       0.98      0.98      0.98       128
weighted avg       0.99      0.98      0.98       128

Model not found: models_lstm\transformer_model_best. Skipping transformer.
Model not found: models_lstm\hybrid_model_best. Skipping hybrid.

Evaluating transformer from models_tf...
Model not found: models_tf\lstm_model_best. Skipping lstm

In [18]:
y_test = y_val
for model_name, model_dir in model_dirs.items():
    model_path = os.path.join(model_dir, f"{model_name}_model_best")
    hist_path = os.path.join(model_dir, f"{model_name}_history_best.json")
    
    if not os.path.exists(model_path):
        print(f"Model not found: {model_path}. Skipping {model_name} in combined comparison.")
        continue
        
    try:
        # Load model
        model = tf.keras.models.load_model(model_path)
        
        # Load history
        history_data = None
        if os.path.exists(hist_path):
            with open(hist_path, 'r') as f:
                history_data = json.load(f)
        
        # Evaluate
        X_test = X_test_np
        y_test_eval = to_categorical(y_test, len(unique_labels))
        
        test_loss, test_acc = model.evaluate(X_test, y_test_eval, verbose=0)
        y_pred_probs = model.predict(X_test, verbose=0)
        y_pred_classes = np.argmax(y_pred_probs, axis=1)
        
        f1 = f1_score(y_test, y_pred_classes, average='weighted')
        conf_mat = confusion_matrix(y_test, y_pred_classes)
        
        all_results[model_name] = {
            'accuracy': test_acc,
            'f1_score': f1,
            'confusion_matrix': conf_mat,
            'predictions': y_pred_classes,
            'probabilities': y_pred_probs,
            'history': history_data
        }
        
        print(f"Successfully added {model_name} to combined comparison")
    except Exception as e:
        print(f"Error adding {model_name} to combined comparison: {e}")

# Create combined visualizations if we have any results
if all_results:
    create_comparison_visualizations(all_results, y_test, label_map, models_dir='models_comparison_all')
    print("Successfully created combined comparison visualizations in 'models_comparison_all' directory")
else:
    print("No models could be loaded for combined comparison")

print("\nTraining and evaluation complete!")

Successfully added lstm to combined comparison
Successfully added transformer to combined comparison
Successfully added hybrid to combined comparison
Successfully created combined comparison visualizations in 'models_comparison_all' directory

Training and evaluation complete!


In [19]:
# Add this after the dataset statistics and before model training

print("\n=== Visualizing Gesture Patterns ===")
os.makedirs('data_visualizations', exist_ok=True)

# 1. Visualize end positions of gestures by class
plt.figure(figsize=(12, 10))
markers = ['o', 's', '^', 'd']
colors = ['blue', 'green', 'red', 'purple']

# Create empty arrays to collect data points
end_positions_x = []
end_positions_y = []
end_positions_z = []
gesture_classes = []

# Extract end positions from original dataset
for i, sample in enumerate(X):
    # Last frame, wrist position relative to shoulder
    rel_x = sample[-1, X.shape[2]-54+X.shape[2]//3]  # Get rel_17_x from features
    rel_y = sample[-1, X.shape[2]-53+X.shape[2]//3]  # Get rel_17_y from features
    rel_z = sample[-1, X.shape[2]-52+X.shape[2]//3]  # Get rel_17_z from features
    
    end_positions_x.append(rel_x)
    end_positions_y.append(rel_y)
    end_positions_z.append(rel_z)
    gesture_classes.append(y[i])

# Convert to numpy arrays
end_positions_x = np.array(end_positions_x)
end_positions_y = np.array(end_positions_y)
end_positions_z = np.array(end_positions_z)
gesture_classes = np.array(gesture_classes)

# Plot 2D scatter plots with different projections
fig = plt.figure(figsize=(18, 6))

# X-Y Plane (Top view)
ax1 = fig.add_subplot(131)
for idx, label in enumerate(unique_labels):
    class_mask = gesture_classes == idx
    ax1.scatter(end_positions_x[class_mask], end_positions_y[class_mask], 
                c=colors[idx], marker=markers[idx], label=label, alpha=0.7)
ax1.set_title('End Positions - Top View (X-Y)')
ax1.set_xlabel('X (horizontal)')
ax1.set_ylabel('Y (vertical)')
ax1.grid(True, alpha=0.3)
ax1.legend()

# X-Z Plane (Side view)
ax2 = fig.add_subplot(132)
for idx, label in enumerate(unique_labels):
    class_mask = gesture_classes == idx
    ax2.scatter(end_positions_x[class_mask], end_positions_z[class_mask], 
                c=colors[idx], marker=markers[idx], label=label, alpha=0.7)
ax2.set_title('End Positions - Side View (X-Z)')
ax2.set_xlabel('X (horizontal)')
ax2.set_ylabel('Z (depth)')
ax2.grid(True, alpha=0.3)
ax2.legend()

# Y-Z Plane (Front view)
ax3 = fig.add_subplot(133)
for idx, label in enumerate(unique_labels):
    class_mask = gesture_classes == idx
    ax3.scatter(end_positions_z[class_mask], end_positions_y[class_mask], 
                c=colors[idx], marker=markers[idx], label=label, alpha=0.7)
ax3.set_title('End Positions - Front View (Z-Y)')
ax3.set_xlabel('Z (depth)')
ax3.set_ylabel('Y (vertical)')
ax3.grid(True, alpha=0.3)
ax3.legend()

plt.tight_layout()
plt.savefig('data_visualizations/end_positions.png', dpi=300)
plt.close()

# 2. Try a 3D visualization of end positions
fig = plt.figure(figsize=(12, 10))
ax = fig.add_subplot(111, projection='3d')

for idx, label in enumerate(unique_labels):
    class_mask = gesture_classes == idx
    ax.scatter(end_positions_x[class_mask], 
               end_positions_y[class_mask], 
               end_positions_z[class_mask],
               c=colors[idx], marker=markers[idx], label=label, alpha=0.7, s=50)

ax.set_title('3D End Positions of Gestures')
ax.set_xlabel('X (horizontal)')
ax.set_ylabel('Y (vertical)')
ax.set_zlabel('Z (depth)')
ax.legend()
plt.tight_layout()
plt.savefig('data_visualizations/end_positions_3d.png', dpi=300)
plt.close()

# 3. Feature importance visualization
if X.shape[2] > 2:
    # Use PCA to visualize data in lower dimension
    from sklearn.decomposition import PCA
    from sklearn.preprocessing import StandardScaler
    
    # Reshape data for PCA
    X_flat = X.reshape(X.shape[0], -1)  # Flatten the time and feature dimensions
    
    # Standardize the data
    scaler = StandardScaler()
    X_scaled = scaler.fit_transform(X_flat)
    
    # Apply PCA
    pca = PCA(n_components=2)
    X_pca = pca.fit_transform(X_scaled)
    
    # Plot PCA results
    plt.figure(figsize=(10, 8))
    for idx, label in enumerate(unique_labels):
        class_mask = y == idx
        plt.scatter(X_pca[class_mask, 0], X_pca[class_mask, 1], 
                    c=colors[idx], marker=markers[idx], label=label, alpha=0.7)
    
    plt.title('PCA Visualization of Gesture Data')
    plt.xlabel(f'PC1 ({pca.explained_variance_ratio_[0]:.2%} variance)')
    plt.ylabel(f'PC2 ({pca.explained_variance_ratio_[1]:.2%} variance)')
    plt.grid(True, alpha=0.3)
    plt.legend()
    plt.tight_layout()
    plt.savefig('data_visualizations/pca_visualization.png', dpi=300)
    plt.close()
    
    print(f"Total variance explained by first 2 PCs: {sum(pca.explained_variance_ratio_[:2]):.2%}")

print("Data visualizations completed. See 'data_visualizations' folder for results.")


=== Visualizing Gesture Patterns ===
Total variance explained by first 2 PCs: 27.62%
Data visualizations completed. See 'data_visualizations' folder for results.


<Figure size 1200x1000 with 0 Axes>

In [20]:
# Add this to your visualization section

print("\n=== Visualizing End Positions Relative to Torso ===")

# Create better visualizations showing end positions relative to torso
plt.figure(figsize=(16, 14))

# We'll create 4 visualizations:
# 1. A scatter plot for all gesture types
# 2-5. Individual plots for each gesture type

# Extract wrist to torso vectors at gesture endpoints
# We'll use wrist (joint 17) relative to torso (joint 1)
wrist_to_torso_x = []
wrist_to_torso_y = []
wrist_to_torso_z = []
gesture_types = []

for i, sample in enumerate(X):
    # Extract the final frame's relative positions
    # NOTE: If you implemented extract_directional_features function, use:
    # wx = sample[-1, sample.shape[1] + 0]  # wrist_end_x_rel_torso feature
    # wy = sample[-1, sample.shape[1] + 1]  # wrist_end_y_rel_torso feature
    # wz = sample[-1, sample.shape[1] + 2]  # wrist_end_z_rel_torso feature
    
    # If extract_directional_features not implemented, use direct calculation:
    # TORSO is joint 1, WRIST is joint 17
    wx = sample[-1, X.shape[2]-54]  # Using rel_17_x
    wy = sample[-1, X.shape[2]-53]  # Using rel_17_y
    wz = sample[-1, X.shape[2]-52]  # Using rel_17_z
    
    wrist_to_torso_x.append(wx)
    wrist_to_torso_y.append(wy)
    wrist_to_torso_z.append(wz)
    gesture_types.append(y[i])

# Convert to numpy arrays
wrist_to_torso_x = np.array(wrist_to_torso_x)
wrist_to_torso_y = np.array(wrist_to_torso_y)
wrist_to_torso_z = np.array(wrist_to_torso_z)
gesture_types = np.array(gesture_types)

# Create layout with 5 subplots: one main and 4 smaller ones
fig = plt.figure(figsize=(20, 16))
gs = fig.add_gridspec(2, 3, height_ratios=[2, 1])

# Main plot (all gestures together)
ax_main = fig.add_subplot(gs[0, :], projection='3d')
ax_main.set_title('All Gesture End Positions Relative to Torso', fontsize=14)

# Draw simplified torso reference
torso_size = 0.5
ax_main.plot([0, 0], [0, 0], [-torso_size, torso_size], 'k-', linewidth=2)  # Vertical line
ax_main.plot([-torso_size, torso_size], [0, 0], [0, 0], 'k-', linewidth=2)  # Horizontal line
ax_main.text(0, 0, 0, 'TORSO', fontsize=10, ha='center', va='center')

# Plot all gestures together
markers = ['o', 's', '^', 'd']
colors = ['blue', 'green', 'red', 'purple']

for idx, label_name in enumerate(unique_labels):
    mask = gesture_types == idx
    ax_main.scatter(
        wrist_to_torso_x[mask], 
        wrist_to_torso_z[mask],  # Putting Z as Y-axis for better visualization
        wrist_to_torso_y[mask],  # Putting Y as Z-axis
        c=colors[idx], marker=markers[idx], s=80, alpha=0.7, label=label_name
    )

ax_main.set_xlabel('X (Left/Right)', fontsize=12)
ax_main.set_ylabel('Z (Forward/Back)', fontsize=12)
ax_main.set_zlabel('Y (Up/Down)', fontsize=12)
ax_main.legend(fontsize=12)

# Individual plots for each gesture
for idx, label_name in enumerate(unique_labels):
    ax = fig.add_subplot(gs[1, idx % 3])
    mask = gesture_types == idx
    
    # 2D plot for this specific gesture (X-Y plane, top view)
    ax.scatter(wrist_to_torso_x[mask], wrist_to_torso_z[mask], 
              c=colors[idx], marker=markers[idx], s=60, alpha=0.7)
    
    # Draw reference point for torso
    ax.plot(0, 0, 'kx', markersize=10)
    ax.text(0, 0, 'TORSO', fontsize=10, ha='center', va='center')
    
    # Add direction arrows showing primary direction
    mean_x = np.mean(wrist_to_torso_x[mask])
    mean_z = np.mean(wrist_to_torso_z[mask])
    magnitude = np.sqrt(mean_x**2 + mean_z**2)
    if magnitude > 0:
        norm_x = mean_x / magnitude
        norm_z = mean_z / magnitude
        ax.arrow(0, 0, norm_x * 0.5, norm_z * 0.5, head_width=0.05, 
                head_length=0.1, fc=colors[idx], ec=colors[idx], linewidth=2)
    
    ax.set_title(f'{label_name} End Positions (Top View)', fontsize=12)
    ax.set_xlabel('X (Left/Right)')
    ax.set_ylabel('Z (Forward/Back)')
    ax.grid(True, alpha=0.3)
    
    # Make sure the aspect ratio is equal to see true spatial relationships
    ax.set_aspect('equal')
    
    # Set consistent axis limits across all plots
    max_range = max(np.max(np.abs(wrist_to_torso_x)), np.max(np.abs(wrist_to_torso_z))) * 1.2
    ax.set_xlim(-max_range, max_range)
    ax.set_ylim(-max_range, max_range)

plt.tight_layout()
plt.savefig('data_visualizations/end_positions_relative_to_torso.png', dpi=300)
plt.close()

# Create a side view (X-Y) plot to see vertical component
fig, ax = plt.subplots(1, 1, figsize=(12, 10))
ax.set_title('Gesture End Positions - Side View (X-Y)', fontsize=14)

# Draw reference torso
ax.plot(0, 0, 'kx', markersize=10)
ax.text(0, 0, 'TORSO', fontsize=10, ha='center', va='center')

# Plot all gestures
for idx, label_name in enumerate(unique_labels):
    mask = gesture_types == idx
    ax.scatter(wrist_to_torso_x[mask], wrist_to_torso_y[mask], 
              c=colors[idx], marker=markers[idx], s=60, alpha=0.7, label=label_name)
    
    # Add small ellipse to show distribution concentration
    if np.sum(mask) > 2:  # Need at least 3 points for an ellipse
        from matplotlib.patches import Ellipse
        import matplotlib.transforms as transforms
        
        mean_x = np.mean(wrist_to_torso_x[mask])
        mean_y = np.mean(wrist_to_torso_y[mask])
        
        # Calculate covariance and get eigen values/vectors for the ellipse
        cov = np.cov(wrist_to_torso_x[mask], wrist_to_torso_y[mask])
        if not np.any(np.isnan(cov)):
            lambda_, v = np.linalg.eig(cov)
            lambda_ = np.sqrt(lambda_)
            
            # Create ellipse at 2 standard deviations
            ellipse = Ellipse((0, 0), width=lambda_[0]*4, height=lambda_[1]*4,
                             angle=np.degrees(np.arctan2(v[1, 0], v[0, 0])),
                             facecolor=colors[idx], alpha=0.2)
            
            # Move ellipse to the right position
            transf = transforms.Affine2D().translate(mean_x, mean_y)
            ellipse.set_transform(transf + ax.transData)
            ax.add_patch(ellipse)

ax.set_xlabel('X (Left/Right)', fontsize=12)
ax.set_ylabel('Y (Up/Down)', fontsize=12)
ax.grid(True, alpha=0.3)
ax.legend(fontsize=12)
ax.set_aspect('equal')

# Set consistent axis limits
max_range = max(np.max(np.abs(wrist_to_torso_x)), np.max(np.abs(wrist_to_torso_y))) * 1.2
ax.set_xlim(-max_range, max_range)
ax.set_ylim(-max_range, max_range)

plt.tight_layout()
plt.savefig('data_visualizations/end_positions_side_view.png', dpi=300)
plt.close()

print("Torso-relative endpoint visualizations created.")


=== Visualizing End Positions Relative to Torso ===
Torso-relative endpoint visualizations created.


<Figure size 1600x1400 with 0 Axes>

In [21]:
# Add this to your visualization section

print("\n=== Creating Skeleton Visualizations ===")
os.makedirs('data_visualizations/skeletons', exist_ok=True)

# Define connections between keypoints in ZED's BODY_38 format
# This defines which joints are connected to form the skeleton
SKELETON_CONNECTIONS = [
    # Torso
    (0, 1),  # Neck to Torso 
    # Right arm
    (0, 13), (13, 15), (15, 17),  # Neck to R-Shoulder, R-Shoulder to R-Elbow, R-Elbow to R-Wrist
    # Left arm
    (0, 14), (14, 16), (16, 18),  # Neck to L-Shoulder, L-Shoulder to L-Elbow, L-Elbow to L-Wrist
    # Legs
    (1, 2), (2, 4), (4, 6),  # Torso to R-Hip, R-Hip to R-Knee, R-Knee to R-Ankle
    (1, 3), (3, 5), (5, 7),  # Torso to L-Hip, L-Hip to L-Knee, L-Knee to L-Ankle
]

def visualize_gesture_skeleton(sample_idx, gesture_name):
    """Visualize sample gesture showing skeleton at start, middle, and end frames"""
    
    sample = X[sample_idx]
    
    # Create figures for top view (X-Z) and side view (X-Y)
    fig, axes = plt.subplots(2, 3, figsize=(18, 12))
    fig.suptitle(f'Gesture: {gesture_name} (Sample {sample_idx})', fontsize=16)
    
    # Column titles
    axes[0, 0].set_title('Start Frame')
    axes[0, 1].set_title('Middle Frame')
    axes[0, 2].set_title('End Frame')
    
    # Row titles
    axes[0, 0].set_ylabel('Top View (X-Z)')
    axes[1, 0].set_ylabel('Side View (X-Y)')
    
    # Select frames to display
    frames = [0, sample.shape[0]//2, sample.shape[0]-1]  # Start, middle, end
    
    # Collect wrist positions across all frames for trajectory
    wrist_x = []
    wrist_y = []
    wrist_z = []
    
    # Extract skeleton joint positions for all frames
    all_joints = []
    for frame_idx in range(sample.shape[0]):
        # Reconstruct joint positions from features
        # This depends on your feature structure, may need adjustment
        joints = {}
        for joint in range(38):  # Assuming BODY_38 format
            if f'kp{joint}_x' in feats.columns:
                # If absolute positions are available
                x = sample[frame_idx, feats.columns.get_loc(f'kp{joint}_x')]
                y = sample[frame_idx, feats.columns.get_loc(f'kp{joint}_y')]
                z = sample[frame_idx, feats.columns.get_loc(f'kp{joint}_z')]
            else:
                # If only relative positions to shoulder are available
                # Assume shoulder (joint 13) is at origin
                if joint == 13:  # Right shoulder
                    x, y, z = 0, 0, 0
                else:
                    # Try to find relative position
                    x_col = f'rel_{joint}_x'
                    if x_col in feats.columns:
                        idx = feats.columns.get_loc(x_col)
                        x = sample[frame_idx, idx]
                        y = sample[frame_idx, feats.columns.get_loc(f'rel_{joint}_y')]
                        z = sample[frame_idx, feats.columns.get_loc(f'rel_{joint}_z')]
                    else:
                        # Fall back to approximate positions
                        x, y, z = 0, 0, 0
            
            joints[joint] = (x, y, z)
            
            # Store wrist trajectory
            if joint == 17:  # Right wrist
                wrist_x.append(x)
                wrist_y.append(y)
                wrist_z.append(z)
        
        all_joints.append(joints)
    
    # Convert to numpy arrays
    wrist_x = np.array(wrist_x)
    wrist_y = np.array(wrist_y)
    wrist_z = np.array(wrist_z)
    
    # Loop through frames to display
    for i, frame_idx in enumerate(frames):
        joints = all_joints[frame_idx]
        
        # Extract coordinates for plotting
        x_coords = [joints[j][0] for j in range(38) if j in joints]
        y_coords = [joints[j][1] for j in range(38) if j in joints]
        z_coords = [joints[j][2] for j in range(38) if j in joints]
        
        # Top view (X-Z)
        ax_top = axes[0, i]
        ax_top.scatter(x_coords, z_coords, c='blue', s=30)
        
        # Draw skeleton connections
        for start, end in SKELETON_CONNECTIONS:
            if start in joints and end in joints:
                ax_top.plot([joints[start][0], joints[end][0]], 
                           [joints[start][2], joints[end][2]], 'k-', alpha=0.7)
        
        # Draw wrist trajectory
        ax_top.plot(wrist_x[:frame_idx+1], wrist_z[:frame_idx+1], 'r-', alpha=0.7, linewidth=2)
        
        # Side view (X-Y)
        ax_side = axes[1, i]
        ax_side.scatter(x_coords, y_coords, c='blue', s=30)
        
        # Draw skeleton connections
        for start, end in SKELETON_CONNECTIONS:
            if start in joints and end in joints:
                ax_side.plot([joints[start][0], joints[end][0]], 
                            [joints[start][1], joints[end][1]], 'k-', alpha=0.7)
        
        # Draw wrist trajectory
        ax_side.plot(wrist_x[:frame_idx+1], wrist_y[:frame_idx+1], 'r-', alpha=0.7, linewidth=2)
        
        # Set equal aspect ratio
        ax_top.set_aspect('equal')
        ax_side.set_aspect('equal')
        
        # Add labels
        ax_top.set_xlabel('X')
        ax_top.set_ylabel('Z')
        ax_side.set_xlabel('X')
        ax_side.set_ylabel('Y')
    
    # Adjust layout and save
    plt.tight_layout()
    plt.savefig(f'data_visualizations/skeletons/{gesture_name}_sample_{sample_idx}.png', dpi=300)
    plt.close()

# Sample a few examples from each gesture class
np.random.seed(42)  # For reproducibility
samples_per_class = 2

for idx, gesture_name in enumerate(unique_labels):
    class_indices = np.where(y == idx)[0]
    
    # Take random samples if there are enough
    if len(class_indices) >= samples_per_class:
        sample_indices = np.random.choice(class_indices, samples_per_class, replace=False)
    else:
        sample_indices = class_indices
    
    for sample_idx in sample_indices:
        try:
            visualize_gesture_skeleton(sample_idx, gesture_name)
            print(f"Created skeleton visualization for {gesture_name} (sample {sample_idx})")
        except Exception as e:
            print(f"Failed to create skeleton for {gesture_name} (sample {sample_idx}): {e}")

print("Skeleton visualizations completed.")


=== Creating Skeleton Visualizations ===
Created skeleton visualization for down_swipe (sample 563)
Created skeleton visualization for down_swipe (sample 508)
Created skeleton visualization for left_swipe (sample 320)
Created skeleton visualization for left_swipe (sample 181)
Created skeleton visualization for right_swipe (sample 132)
Created skeleton visualization for right_swipe (sample 30)
Created skeleton visualization for up_swipe (sample 432)
Created skeleton visualization for up_swipe (sample 339)
Skeleton visualizations completed.
