# Imports

In [None]:
import numpy as np
import pandas as pd
import json
import os
import random
import time
from glob import glob
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import TensorDataset, DataLoader
from torch.nn.utils.rnn import pad_sequence
from sklearn.model_selection import GroupShuffleSplit
from sklearn.metrics import classification_report, confusion_matrix, accuracy_score, precision_recall_curve
import matplotlib.pyplot as plt
import seaborn as sns
from tqdm import tqdm
print("All necessary libraries imported.")

In [None]:
SEED = 42
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
WINDOW_SIZE_FRAMES = 150
STEP_SIZE_FRAMES = 30
BATCH_SIZE = 128
INPUT_FEATURES = 73
HIDDEN_SIZE = 128
NUM_LAYERS = 2
NUM_CLASSES = 2
NUM_EPOCHS = 10
def set_seed(seed_value: int):
    random.seed(seed_value)
    np.random.seed(seed_value)
    torch.manual_seed(seed_value)
    if torch.cuda.is_available():
        torch.cuda.manual_seed_all(seed_value)
        torch.backends.cudnn.deterministic = True
        torch.backends.cudnn.benchmark = False
    print(f"Global seed set to {seed_value}")
def load_and_label_files(data_root: str) -> pd.DataFrame:
    all_csv_files = glob(os.path.join(data_root, '**', '*.csv'), recursive=True)
    labeled_data = [{'path': file_path, 'label': 1 if 'Potential_shoplifter' in file_path else 0} for file_path in all_csv_files]
    df_files = pd.DataFrame(labeled_data)
    print(f"Found {len(df_files)} total CSV files.")
    if not df_files.empty:
        print("\nValue counts for labels:")
        print(df_files['label'].value_counts())
    return df_files
def parse_poses_from_string(poses_str: str) -> np.ndarray:
    try:
        return np.array(json.loads(poses_str)).reshape(18, 2)
    except (json.JSONDecodeError, ValueError):
        return np.zeros((18, 2))
def extract_features(csv_path: str) -> np.ndarray | None:
    df = pd.read_csv(csv_path)
    if df.empty or 'POSES' not in df.columns:
        return None
    raw_poses = np.array(df['POSES'].apply(parse_poses_from_string).tolist())
    neck_positions = raw_poses[:, 1:2, :]
    normalized_poses = raw_poses - neck_positions
    velocities = np.diff(raw_poses, axis=0, prepend=raw_poses[0:1])
    neck_trajectory = raw_poses[:, 1, :]
    deltas = np.diff(neck_trajectory, axis=0, prepend=[neck_trajectory[0]])
    orientation_angles_deg = np.degrees(np.arctan2(deltas[:, 1], deltas[:, 0]))
    return np.concatenate([
        normalized_poses.reshape(raw_poses.shape[0], -1),
        velocities.reshape(raw_poses.shape[0], -1),
        orientation_angles_deg.reshape(-1, 1)
    ], axis=1)
set_seed(SEED)
print(f"Using device: {device}")
data_root = '/kaggle/input/skeleton-poses-normalvsshoplifter/csvs_Skeleton_poses_normal_potential_shoplifter/'
df_files = load_and_label_files(data_root)
df_files.head()

In [None]:
def get_group_id(file_path: str) -> str:
    return os.path.basename(file_path).replace('.csv', '')[-23:]
def create_sliding_windows(sequences: list, labels: list, window_size: int, step_size: int):
    windowed_sequences, windowed_labels = [], []
    for i, seq in enumerate(sequences):
        if seq.shape[0] >= window_size:
            for start in range(0, seq.shape[0] - window_size + 1, step_size):
                windowed_sequences.append(seq[start:start + window_size])
                windowed_labels.append(labels[i])
    return np.array(windowed_sequences), np.array(windowed_labels)
print("Processing all CSV files into feature sequences")
all_sequences, all_labels, all_groups, all_paths = [], [], [], []
for _, row in tqdm(df_files.iterrows(), total=len(df_files), desc="Extracting Features"):
    features = extract_features(row['path'])
    if features is not None and len(features) > 0:
        all_sequences.append(features)
        all_labels.append(row['label'])
        all_groups.append(get_group_id(row['path']))
        all_paths.append(row['path'])
print("\nSplitting data by group to prevent leakage")
gss_train_temp = GroupShuffleSplit(n_splits=1, test_size=0.2, random_state=SEED)
train_group_indices, temp_group_indices = next(gss_train_temp.split(all_sequences, all_labels, all_groups))
train_sequences = [all_sequences[i] for i in train_group_indices]
train_labels = [all_labels[i] for i in train_group_indices]
temp_sequences = [all_sequences[i] for i in temp_group_indices]
temp_labels = [all_labels[i] for i in temp_group_indices]
temp_groups = np.array(all_groups)[temp_group_indices]
temp_paths = [all_paths[i] for i in temp_group_indices]
print("\nCreating datasets: augmented for training, and both windowed & full-length for val/test")
X_train_windowed, y_train_windowed = create_sliding_windows(train_sequences, train_labels, WINDOW_SIZE_FRAMES, STEP_SIZE_FRAMES)
gss_val_test = GroupShuffleSplit(n_splits=1, test_size=0.5, random_state=SEED)
val_indices, test_indices = next(gss_val_test.split(temp_sequences, temp_labels, temp_groups))
X_val_long = [temp_sequences[i] for i in val_indices]
y_val_long = [temp_labels[i] for i in val_indices]
X_test_long = [temp_sequences[i] for i in test_indices]
y_test_long = [temp_labels[i] for i in test_indices]
test_paths_long = [temp_paths[i] for i in test_indices]
X_val_windowed, y_val_windowed = create_sliding_windows(X_val_long, y_val_long, WINDOW_SIZE_FRAMES, STEP_SIZE_FRAMES)
X_test_windowed, y_test_windowed = create_sliding_windows(X_test_long, y_test_long, WINDOW_SIZE_FRAMES, STEP_SIZE_FRAMES)
X_train_tensor = torch.from_numpy(X_train_windowed).float()
y_train_tensor = torch.from_numpy(y_train_windowed).long()
train_dataset = TensorDataset(X_train_tensor, y_train_tensor)
g = torch.Generator()
g.manual_seed(SEED)
train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True, generator=g)
print("\nData Preparation Complete")
print(f"Total original videos processed: {len(all_sequences)}")
print(f"Training on {len(X_train_tensor)} augmented windows (from {len(train_sequences)} videos).")
print(f"Validating on {len(X_val_long)} full-length videos.")
print(f"Testing on {len(X_test_long)} full-length videos.")

In [None]:
class RealTimeClassifier(nn.Module):
    def __init__(self, input_size, hidden_size, num_layers, num_classes, dropout_rate=0.5):
        super(RealTimeClassifier, self).__init__()
        self.lstm = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True, bidirectional=False)
        self.dropout = nn.Dropout(dropout_rate)
        self.attention = nn.Linear(hidden_size, 1)
        self.fc = nn.Linear(hidden_size, num_classes)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        out, _ = self.lstm(x)
        attention_scores = self.attention(out)
        attention_weights = torch.softmax(attention_scores, dim=1)
        context_vector = torch.sum(attention_weights * out, dim=1)
        context_vector_dropped = self.dropout(context_vector)
        return self.fc(context_vector_dropped)
class FocalLoss(nn.Module):
    def __init__(self, alpha: torch.Tensor, gamma: float = 2.0, reduction: str = 'mean'):
        super(FocalLoss, self).__init__()
        self.alpha = alpha
        self.gamma = gamma
        self.reduction = reduction

    def forward(self, inputs: torch.Tensor, targets: torch.Tensor) -> torch.Tensor:
        ce_loss = F.cross_entropy(inputs, targets, reduction='none')
        pt = torch.exp(-ce_loss)
        focal_loss = (self.alpha[targets] * (1 - pt)**self.gamma * ce_loss)
        return focal_loss.mean() if self.reduction == 'mean' else focal_loss.sum()
def evaluate_and_report_on_test_set(
    model: nn.Module,
    test_sequences: list[np.ndarray],
    test_labels: list[int],
    device: torch.device
):
    model.eval()
    all_window_preds, all_window_labels = [], []
    video_suspicion_scores = []

    with torch.no_grad():
        for i, long_seq_np in enumerate(tqdm(test_sequences, desc="Evaluating Test Set")):
            y_true_video = test_labels[i]
            seq_windows_tensors = []
            if long_seq_np.shape[0] >= WINDOW_SIZE_FRAMES:
                for start in range(0, long_seq_np.shape[0] - WINDOW_SIZE_FRAMES + 1, STEP_SIZE_FRAMES):
                    window = long_seq_np[start:start + WINDOW_SIZE_FRAMES]
                    seq_windows_tensors.append(torch.from_numpy(window))
            
            if not seq_windows_tensors:
                video_suspicion_scores.append(0.0)
                continue

            windows_batch = torch.stack(seq_windows_tensors).float().to(device)
            outputs = model(windows_batch)
            
            _, predicted_windows = torch.max(outputs, 1)
            all_window_preds.extend(predicted_windows.cpu().numpy())
            all_window_labels.extend([y_true_video] * len(predicted_windows))
            
            probabilities = torch.softmax(outputs, dim=1)[:, 1]
            avg_suspicion_score = probabilities.mean().item()
            video_suspicion_scores.append(avg_suspicion_score)

    plot_report_and_matrix(all_window_labels, all_window_preds, "Test Set Evaluation (Per-Window)")
    simple_accuracy = evaluate_realistically(model, test_sequences, test_labels, device, WINDOW_SIZE_FRAMES, STEP_SIZE_FRAMES)
    print(f"\n--- Test Set Evaluation (Simple Per-Video) ---")
    print(f"Accuracy (if any window is 1, predict 1): {simple_accuracy:.2f}%")

    y_true_video_labels = test_labels
    precisions, recalls, thresholds = precision_recall_curve(y_true_video_labels, video_suspicion_scores)
    f1_scores = np.nan_to_num(2 * (precisions * recalls) / (precisions + recalls))
    best_threshold = thresholds[np.argmax(f1_scores)]
    
    y_pred_thresholded = [1 if score >= best_threshold else 0 for score in video_suspicion_scores]
    plot_report_and_matrix(y_true_video_labels, y_pred_thresholded, f"Test Set Evaluation (Optimal Threshold: {best_threshold:.2f})")

def plot_report_and_matrix(y_true, y_pred, title):
    print(f"\n--- {title} ---")
    print(classification_report(y_true, y_pred, target_names=['Normal', 'Potential Shoplifter'], zero_division=0))
    cm = confusion_matrix(y_true, y_pred)
    sns.heatmap(cm, annot=True, fmt='g', cmap='Blues', xticklabels=['Normal', 'Potential Shoplifter'], yticklabels=['Normal', 'Potential Shoplifter'])
    plt.xlabel('Predicted')
    plt.ylabel('True')
    plt.title(title)
    plt.show()

def evaluate_realistically(model, long_sequences, long_labels, device, window_size, step_size):
    model.eval()
    y_pred_final = []
    with torch.no_grad():
        for long_seq_np in long_sequences:
            seq_windows_tensors = []
            if long_seq_np.shape[0] >= window_size:
                for start in range(0, long_seq_np.shape[0] - window_size + 1, step_size):
                    seq_windows_tensors.append(torch.from_numpy(long_seq_np[start:start + window_size]))
            
            if not seq_windows_tensors:
                final_video_prediction = 0
            else:
                outputs = model(torch.stack(seq_windows_tensors).float().to(device))
                final_video_prediction = 1 if 1 in torch.max(outputs, 1)[1] else 0
            y_pred_final.append(final_video_prediction)
    return accuracy_score(long_labels, y_pred_final) * 100

print("Model architecture, loss functions, and evaluation helpers are defined.")

In [None]:
def run_online_training_trial(
    model: nn.Module,
    train_loader: DataLoader,
    val_windowed_loader: DataLoader,
    val_long_sequences: list[np.ndarray],
    val_long_labels: list[int],
    criterion: nn.Module,
    optimizer: optim.Optimizer,
    num_epochs: int,
    model_save_path_window: str,
    model_save_path_realistic: str,
    device: torch.device
):
    best_val_acc_window = 0.0
    best_val_acc_realistic = 0.0

    history = {'train_loss': [], 'val_acc_window': [], 'val_acc_realistic': []}

    for epoch in range(num_epochs):
        start_time = time.time()
        
        model.train()
        train_loss = 0.0
        train_pbar = tqdm(train_loader, desc=f"Epoch {epoch+1}/{num_epochs} | Training")
        for sequences, labels in train_pbar:
            sequences, labels = sequences.to(device), labels.to(device)
            outputs = model(sequences)
            loss = criterion(outputs, labels)
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            train_loss += loss.item()
            train_pbar.set_postfix(loss=f"{loss.item():.4f}")
        
        avg_train_loss = train_loss / len(train_loader)
        history['train_loss'].append(avg_train_loss)

        model.eval()
        correct_window, total_window = 0, 0
        with torch.no_grad():
            for sequences, labels in val_windowed_loader:
                sequences, labels = sequences.to(device), labels.to(device)
                outputs = model(sequences)
                _, predicted = torch.max(outputs.data, 1)
                total_window += labels.size(0)
                correct_window += (predicted == labels).sum().item()
        epoch_acc_val_window = 100 * correct_window / total_window
        history['val_acc_window'].append(epoch_acc_val_window)

        epoch_acc_val_realistic = evaluate_realistically(
            model, val_long_sequences, val_long_labels, device, WINDOW_SIZE_FRAMES, STEP_SIZE_FRAMES
        )
        history['val_acc_realistic'].append(epoch_acc_val_realistic)
        
        print(f"Epoch {epoch+1}/{num_epochs} | Train Loss: {avg_train_loss:.4f} | "
              f"Window Val Acc: {epoch_acc_val_window:.2f}% | "
              f"Realistic Val Acc: {epoch_acc_val_realistic:.2f}%")
        
        if epoch_acc_val_window > best_val_acc_window:
            best_val_acc_window = epoch_acc_val_window
            torch.save(model.state_dict(), model_save_path_window)
            print(f"---> New best WINDOW model saved (Val Acc: {best_val_acc_window:.2f}%)")
            
        if epoch_acc_val_realistic > best_val_acc_realistic:
            best_val_acc_realistic = epoch_acc_val_realistic
            torch.save(model.state_dict(), model_save_path_realistic)
            print(f"---> New best REALISTIC model saved (Val Acc: {best_val_acc_realistic:.2f}%)")
            
    print("\n--- Finished Training Trial ---")
    return history

print("Trial 1: Training with Standard CrossEntropy Loss")
set_seed(SEED)

ce_model = RealTimeClassifier(INPUT_FEATURES, HIDDEN_SIZE, NUM_LAYERS, NUM_CLASSES).to(device)

ce_criterion = nn.CrossEntropyLoss()
ce_optimizer = optim.Adam(ce_model.parameters(), lr=0.001, weight_decay=1e-4)

ce_model_path_window = 'best_ce_model_window.pth'
ce_model_path_realistic = 'best_ce_model_realistic.pth'

ce_history = run_online_training_trial(
    model=ce_model,
    train_loader=train_loader,
    val_windowed_loader=val_windowed_loader,
    val_long_sequences=X_val_long,
    val_long_labels=y_val_long,
    criterion=ce_criterion,
    optimizer=ce_optimizer,
    num_epochs=NUM_EPOCHS,
    model_save_path_window=ce_model_path_window,
    model_save_path_realistic=ce_model_path_realistic,
    device=device
)

In [None]:
def plot_history(history: dict, trial_name: str):
    fig, ax1 = plt.subplots(figsize=(12, 6))

    ax1.set_xlabel('Epochs')
    ax1.set_ylabel('Training Loss', color='tab:red')
    ax1.plot(history['train_loss'], 'r-', label='Training Loss')
    ax1.tick_params(axis='y', labelcolor='tab:red')
    ax1.grid(True)

    ax2 = ax1.twinx()
    ax2.set_ylabel('Validation Accuracy (%)', color='tab:blue')
    ax2.plot(history['val_acc_window'], 'b--', label='Window Val Acc')
    ax2.plot(history['val_acc_realistic'], 'g-.', label='Realistic Val Acc')
    ax2.tick_params(axis='y', labelcolor='tab:blue')

    fig.tight_layout()
    fig.suptitle(f'{trial_name} - Training History', y=1.03)
    fig.legend(loc='upper right', bbox_to_anchor=(0.9, 0.9))
    plt.show()

print("Plotting Training History for Trial 1 (CrossEntropy)")
plot_history(ce_history, "CrossEntropy Loss")

print("\nLoading best model for final evaluation on the Test Set")
set_seed(SEED)
final_ce_model = RealTimeClassifier(INPUT_FEATURES, HIDDEN_SIZE, NUM_LAYERS, NUM_CLASSES).to(device)
final_ce_model.load_state_dict(torch.load(ce_model_path_realistic))
print(f"Model loaded successfully from: {ce_model_path_realistic}")

evaluate_and_report_on_test_set(
    model=final_ce_model,
    test_sequences=X_test_long,
    test_labels=y_test_long,
    device=device
)

In [None]:
print("Loading best model optimized for perwindow performance")
set_seed(SEED)

window_model = RealTimeClassifier(INPUT_FEATURES, HIDDEN_SIZE, NUM_LAYERS, NUM_CLASSES).to(device)
window_model.load_state_dict(torch.load(ce_model_path_window)) 
window_model.eval()

all_window_preds = []
_, y_test_windowed_labels, test_window_source_indices = create_sliding_windows_with_indices(
    X_test_long, y_test_long, WINDOW_SIZE_FRAMES, STEP_SIZE_FRAMES
)

with torch.no_grad():
    for sequences, _ in tqdm(test_windowed_loader, desc="Getting Per-Window Predictions"):
        outputs = window_model(sequences.to(device))
        all_window_preds.extend(torch.max(outputs.data, 1)[1].cpu().numpy())

fn_video_indices = set()
fp_video_indices = set()

for i, (pred, true) in enumerate(zip(all_window_preds, y_test_windowed_labels)):
    if pred != true:
        source_idx = test_window_source_indices[i]
        if true == 1:
            fn_video_indices.add(source_idx)
        else:
            fp_video_indices.add(source_idx)

print(f"\nIdentified {len(fn_video_indices)} videos with Shoplifter misclassifications.")
print(f"Identified {len(fp_video_indices)} videos with Normal Pedestrian misclassifications.")

fig, axes = plt.subplots(2, 2, figsize=(15, 12))
fig.suptitle('Fig 9. Samples from failure cases', fontsize=18, y=0.95)

fn_indices_list = list(fn_video_indices)
axes[0, 0].set_title('Shoplifter Failure Cases', fontsize=14)
for i, ax in enumerate(axes[:, 0]):
    if i < len(fn_indices_list):
        idx = fn_indices_list[i]
        plot_trajectory(test_paths_long[idx], ax)
    else:
        ax.set_visible(False)

fp_indices_list = list(fp_video_indices)
axes[0, 1].set_title('Normal Pedestrian Failure Cases', fontsize=14)
for i, ax in enumerate(axes[:, 1]):
    if i < len(fp_indices_list):
        idx = fp_indices_list[i]
        plot_trajectory(test_paths_long[idx], ax)
    else:
        ax.set_visible(False)

plt.tight_layout(rect=[0, 0.03, 1, 0.93])
plt.show()

In [None]:
print(" Trial 2: Training with Focal Loss ")
set_seed(SEED)

focal_model = RealTimeClassifier(INPUT_FEATURES, HIDDEN_SIZE, NUM_LAYERS, NUM_CLASSES).to(device)

alpha_class_0 = 64 / total_videos
alpha_class_1 = 88 / total_videos
alpha_tensor = torch.tensor([alpha_class_0, alpha_class_1]).to(device)

print(f"Calculated alpha weights for Focal Loss: {alpha_tensor.cpu().numpy()}")

focal_criterion = FocalLoss(alpha=alpha_tensor, gamma=2.0)
focal_optimizer = optim.Adam(focal_model.parameters(), lr=0.001, weight_decay=1e-4)

focal_model_path_window = 'best_focal_model_window.pth'
focal_model_path_realistic = 'best_focal_model_realistic.pth'

focal_history = run_online_training_trial(
    model=focal_model,
    train_loader=train_loader,
    val_windowed_loader=val_windowed_loader,
    val_long_sequences=X_val_long,
    val_long_labels=y_val_long,
    criterion=focal_criterion,
    optimizer=focal_optimizer,
    num_epochs=NUM_EPOCHS,
    model_save_path_window=focal_model_path_window,
    model_save_path_realistic=focal_model_path_realistic,
    device=device
)

In [None]:
print(" Plotting Training History for Trial 2 (Focal Loss) ")
plot_history(focal_history, "Focal Loss")

print("\n--- Loading best Focal Loss model for final evaluation on the Test Set ---")
set_seed(SEED)
final_focal_model = RealTimeClassifier(INPUT_FEATURES, HIDDEN_SIZE, NUM_LAYERS, NUM_CLASSES).to(device)
final_focal_model.load_state_dict(torch.load(focal_model_path_realistic))
print(f"Model loaded successfully from: {focal_model_path_realistic}")

evaluate_and_report_on_test_set(
    model=final_focal_model,
    test_sequences=X_test_long,
    test_labels=y_test_long,
    device=device
)
print("\nVisualizing Failure Cases for Trial 2 (Focal Loss)")
focal_test_predictions = get_model_predictions_for_test_set(final_focal_model, X_test_long, y_test_long, device)
focal_failure_indices = [i for i, (pred, true) in enumerate(zip(focal_test_predictions, y_test_long)) if pred != true]
fn_failures_focal = [i for i in focal_failure_indices if y_test_long[i] == 1]
fp_failures_focal = [i for i in focal_failure_indices if y_test_long[i] == 0]
print(f"Identified {len(focal_failure_indices)} total failure cases for the Focal Loss model.")
print(f" {len(fn_failures_focal)} False Negatives (Missed Shoplifters)")
print(f" {len(fp_failures_focal)} False Positives (Incorrectly Flagged Normals)")
fig, axes = plt.subplots(2, 2, figsize=(15, 12))
fig.suptitle('Fig 9. Samples from failure cases (Focal Loss Model)', fontsize=18, y=0.95)
fn_indices_list_focal = list(fn_failures_focal)
axes[0, 0].set_title('UroKyoro Failure Cases', fontsize=14)
for i, ax in enumerate(axes[:, 0]):
    if i < len(fn_indices_list_focal):
        idx = fn_indices_list_focal[i]
        plot_trajectory(test_paths_long[idx], ax)
    else:
        ax.set_visible(False)
fp_indices_list_focal = list(fp_failures_focal)
axes[0, 1].set_title('Normal Pedestrian Failure Cases', fontsize=14)
for i, ax in enumerate(axes[:, 1]):
    if i < len(fp_indices_list_focal):
        idx = fp_indices_list_focal[i]
        plot_trajectory(test_paths_long[idx], ax)
    else:
        ax.set_visible(False)
plt.tight_layout(rect=[0, 0.03, 1, 0.93])
plt.show()

# GCN

In [None]:
!pip install -q torch_geometric torch-scatter torch-sparse -f https://data.pyg.org/whl/torch-2.1.0+cu121.html

print("PyTorch Geometric and its dependencies have been installed.")

In [None]:
import numpy as np
import pandas as pd
import json
import os
import random
import time
from glob import glob
from tqdm import tqdm

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim

from torch_geometric.data import Data, Dataset
from torch_geometric.loader import DataLoader as PyGDataLoader

from sklearn.model_selection import GroupShuffleSplit
from sklearn.metrics import classification_report, confusion_matrix, accuracy_score

import matplotlib.pyplot as plt
import seaborn as sns

print("--- Imports for Graph Convolutional Network (GCN) Approach ---")

# --- Configuration & Seeding ---
SEED = 42
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Model and training constants
BATCH_SIZE = 16
NUM_EPOCHS = 20
LEARNING_RATE = 0.001
# We will define other model-specific constants later

def set_seed(seed_value: int):
    """Sets the seed for reproducibility for all relevant libraries."""
    random.seed(seed_value)
    np.random.seed(seed_value)
    torch.manual_seed(seed_value)
    if torch.cuda.is_available():
        torch.cuda.manual_seed_all(seed_value)
        torch.backends.cudnn.deterministic = True
        torch.backends.cudnn.benchmark = False
    print(f"Global seed set to {seed_value}")

# --- Execute ---
set_seed(SEED)
print(f"Using device: {device}")
print("All necessary libraries and configurations are set up.")

In [None]:
MAX_FRAMES = 300

edge_index = torch.tensor([
    # Torso
    [1, 2], [1, 5], [2, 8], [5, 11], [8, 11], [1, 0],
    # Left Arm
    [2, 3], [3, 4],
    # Right Arm
    [5, 6], [6, 7],
    # Left Leg
    [8, 9], [9, 10],
    # Right Leg
    [11, 12], [12, 13]
], dtype=torch.long).t().contiguous()

def load_and_label_files(data_root: str) -> pd.DataFrame:
    """Finds all CSVs, labels them based on the folder, and returns a DataFrame."""
    all_csv_files = glob(os.path.join(data_root, '**', '*.csv'), recursive=True)
    labeled_data = [{'path': file_path, 'label': 1 if 'Potential_shoplifter' in file_path else 0} for file_path in all_csv_files]
    df_files = pd.DataFrame(labeled_data)
    print(f"Found {len(df_files)} total CSV files.")
    return df_files

def parse_poses_from_string(poses_str: str) -> np.ndarray:
    """Helper to parse the 'POSES' string into a numpy array."""
    try:
        return np.array(json.loads(poses_str)).reshape(18, 2)
    except (json.JSONDecodeError, ValueError):
        return np.zeros((18, 2))


class SkeletonDataset(Dataset):
    """
    Custom PyTorch Geometric Dataset for loading skeleton action recognition data.
    Each video is represented as a single graph.
    - Nodes: The 18 skeleton joints.
    - Node Features: The flattened (x, y) coordinates of each joint across all frames.
    - Edges: The predefined bone connections.
    """
    def __init__(self, df: pd.DataFrame, max_frames: int):
        super().__init__()
        self.df = df
        self.max_frames = max_frames

    def __len__(self):
        return len(self.df)

    def __getitem__(self, idx):
        row = self.df.iloc[idx]
        csv_path = row['path']
        label = row['label']

        # Load and parse pose data from the CSV
        df_poses = pd.read_csv(csv_path)
        raw_poses = np.array(df_poses['POSES'].apply(parse_poses_from_string).tolist()) # Shape: (n_frames, 18, 2)

        # --- Temporal Padding/Truncating ---
        n_frames = raw_poses.shape[0]
        if n_frames < self.max_frames:
            # Pad with zeros if the sequence is too short
            padding = np.zeros((self.max_frames - n_frames, 18, 2))
            processed_poses = np.concatenate([raw_poses, padding], axis=0)
        else:
            # Truncate if the sequence is too long
            processed_poses = raw_poses[:self.max_frames, :, :]

        # --- Feature Engineering ---
        # The feature for each node (joint) is its entire trajectory.
        # Reshape to (num_nodes, features_per_node) where features are (max_frames * 2)
        node_features = torch.tensor(processed_poses.transpose(1, 0, 2).reshape(18, -1), dtype=torch.float)

        # Create the graph data object
        data = Data(
            x=node_features,         # Node features: Shape [18, max_frames * 2]
            edge_index=edge_index,   # Graph connectivity
            y=torch.tensor([label], dtype=torch.long) # Label
        )
        return data

data_root = '/kaggle/input/skeleton-poses-normalvsshoplifter/csvs_Skeleton_poses_normal_potential_shoplifter/'
df_files = load_and_label_files(data_root)
full_dataset = SkeletonDataset(df=df_files, max_frames=MAX_FRAMES)

print(f"\nSuccessfully created a dataset with {len(full_dataset)} graphs.")
print("\nExample of a single graph data object from the dataset:")
print(full_dataset[0])

In [None]:
from torch.utils.data import Subset
def get_group_id(file_path: str) -> str:
    return os.path.basename(file_path).replace('.csv', '')[-23:]
groups = [get_group_id(path) for path in df_files['path']]
labels = df_files['label'].values
indices = np.arange(len(full_dataset))
gss_train_temp = GroupShuffleSplit(n_splits=1, test_size=0.2, random_state=SEED)
train_indices, temp_indices = next(gss_train_temp.split(indices, labels, groups))
gss_val_test = GroupShuffleSplit(n_splits=1, test_size=0.5, random_state=SEED)
val_indices, test_indices = next(gss_val_test.split(
    indices[temp_indices],
    labels[temp_indices],
    np.array(groups)[temp_indices]
))
final_val_indices = temp_indices[val_indices]
final_test_indices = temp_indices[test_indices]
train_dataset = Subset(full_dataset, train_indices)
val_dataset = Subset(full_dataset, final_val_indices)
test_dataset = Subset(full_dataset, final_test_indices)
train_loader = PyGDataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True)
val_loader = PyGDataLoader(val_dataset, batch_size=BATCH_SIZE, shuffle=False)
test_loader = PyGDataLoader(test_dataset, batch_size=BATCH_SIZE, shuffle=False)
print("--- Data Splitting and DataLoader Creation Complete ---")
print(f"Total graphs: {len(full_dataset)}")
print(f"Training graphs: {len(train_dataset)}")
print(f"Validation graphs: {len(val_dataset)}")
print(f"Testing graphs: {len(test_dataset)}")
print(f"\nCreated PyG DataLoaders with batch size {BATCH_SIZE}.")

In [None]:
from torch_geometric.nn import GCNConv, global_mean_pool
class SkeletonGCN(nn.Module):
    def __init__(self, in_channels: int, hidden_channels: int, out_channels: int):
        super().__init__()
        torch.manual_seed(SEED)
        self.conv1 = GCNConv(in_channels, hidden_channels)
        self.conv2 = GCNConv(hidden_channels, hidden_channels)
        self.conv3 = GCNConv(hidden_channels, hidden_channels)
        self.classifier = nn.Sequential(
            nn.Linear(hidden_channels, 128),
            nn.ReLU(),
            nn.Dropout(0.5),
            nn.Linear(128, out_channels)
        )

    def forward(self, data: Data) -> torch.Tensor:
        x, edge_index, batch = data.x, data.edge_index, data.batch
        x = self.conv1(x, edge_index)
        x = F.relu(x)
        x = self.conv2(x, edge_index)
        x = F.relu(x)
        x = self.conv3(x, edge_index)
        x = F.relu(x)
        x_pooled = global_mean_pool(x, batch)
        out = self.classifier(x_pooled)
        return out
model = SkeletonGCN(
    in_channels=MAX_FRAMES * 2,
    hidden_channels=128,
    out_channels=2
).to(device)
print("--- GCN Model Architecture ---")
print(model)

In [None]:
def train_one_epoch_gcn(
    model: nn.Module,
    loader: PyGDataLoader,
    optimizer: optim.Optimizer,
    criterion: nn.Module
) -> float:
    model.train()
    total_loss = 0
    for data in tqdm(loader, desc="Training"):
        data = data.to(device)
        optimizer.zero_grad()
        out = model(data)
        loss = criterion(out, data.y)
        loss.backward()
        optimizer.step()
        total_loss += loss.item() * data.num_graphs
    return total_loss / len(loader.dataset)
def evaluate_gcn(
    model: nn.Module,
    loader: PyGDataLoader
) -> tuple[float, list, list]:
    model.eval()
    all_preds = []
    all_labels = []
    with torch.no_grad():
        for data in tqdm(loader, desc="Evaluating"):
            data = data.to(device)
            out = model(data)
            pred = out.argmax(dim=1)
            all_preds.extend(pred.cpu().numpy())
            all_labels.extend(data.y.cpu().numpy())

    accuracy = accuracy_score(all_labels, all_preds) * 100
    return accuracy, all_preds, all_labels
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=LEARNING_RATE)
print("Training and evaluation functions are defined.")
print("Criterion: CrossEntropyLoss")
print("Optimizer: Adam")

In [None]:
best_val_accuracy = 0.0
model_save_path = 'best_gcn_model.pth'
history = {'train_loss': [], 'val_acc': []}
print("--- Starting GCN Model Training ---")
for epoch in range(1, NUM_EPOCHS + 1):
    start_time = time.time()
    train_loss = train_one_epoch_gcn(model, train_loader, optimizer, criterion)
    val_accuracy, _, _ = evaluate_gcn(model, val_loader)
    end_time = time.time()
    epoch_duration = end_time - start_time
    history['train_loss'].append(train_loss)
    history['val_acc'].append(val_accuracy)
    print(f"Epoch: {epoch:02d} | "
          f"Train Loss: {train_loss:.4f} | "
          f"Val Acc: {val_accuracy:.2f}% | "
          f"Duration: {epoch_duration:.2f}s")
    if val_accuracy > best_val_accuracy:
        best_val_accuracy = val_accuracy
        torch.save(model.state_dict(), model_save_path)
        print(f"==> New best model saved with Val Acc: {best_val_accuracy:.2f}%")
print("\n--- Finished Training ---")
print(f"The best model was saved to '{model_save_path}' with a validation accuracy of {best_val_accuracy:.2f}%.")

In [None]:
def plot_gcn_history(history: dict):
    fig, ax1 = plt.subplots(figsize=(12, 6))

    ax1.set_xlabel('Epochs')
    ax1.set_ylabel('Training Loss', color='tab:red')
    ax1.plot(history['train_loss'], 'r-', label='Training Loss')
    ax1.tick_params(axis='y', labelcolor='tab:red')
    ax1.grid(True)

    ax2 = ax1.twinx()
    ax2.set_ylabel('Validation Accuracy (%)', color='tab:blue')
    ax2.plot(history['val_acc'], 'b--', label='Validation Acc')
    ax2.tick_params(axis='y', labelcolor='tab:blue')
    ax2.set_ylim(0, 105)

    fig.tight_layout()
    fig.suptitle('GCN Model - Training History', y=1.03)
    fig.legend(loc='upper right', bbox_to_anchor=(0.9, 0.9))
    plt.show()
print("--- Plotting GCN Training History ---")
plot_gcn_history(history)
print("\n--- Loading best model for final evaluation on the Test Set ---")
final_model = SkeletonGCN(
    in_channels=MAX_FRAMES * 2,
    hidden_channels=128,
    out_channels=2
).to(device)
final_model.load_state_dict(torch.load(model_save_path))
print(f"Model loaded successfully from: {model_save_path}")
test_accuracy, test_preds, test_labels = evaluate_gcn(final_model, test_loader)
print(f"\nFinal Test Accuracy: {test_accuracy:.2f}%")
plot_report_and_matrix(
    y_true=test_labels,
    y_pred=test_preds,
    title="GCN Final Test Set Performance"
)

In [None]:
import numpy as np
import pandas as pd
import json
import os
import random
import time
from glob import glob

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import TensorDataset, DataLoader
from torch.nn.utils.rnn import pad_sequence

from sklearn.model_selection import GroupShuffleSplit
from sklearn.metrics import classification_report, confusion_matrix, accuracy_score, precision_recall_curve

import matplotlib.pyplot as plt
import seaborn as sns
from tqdm import tqdm

print("All necessary libraries imported.")

In [None]:
# --- Configuration & Seeding ---
SEED = 42
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Constants for the model and data processing
WINDOW_SIZE_FRAMES = 150
STEP_SIZE_FRAMES = 30
BATCH_SIZE = 128
INPUT_FEATURES = 73  # 36 (positions) + 36 (velocities) + 1 (orientation)
HIDDEN_SIZE = 128
NUM_LAYERS = 2
NUM_CLASSES = 2
NUM_EPOCHS = 10 # We'll use 10 epochs for a balance of training and speed

def set_seed(seed_value: int):
    """Sets the seed for reproducibility for all relevant libraries."""
    random.seed(seed_value)
    np.random.seed(seed_value)
    torch.manual_seed(seed_value)
    if torch.cuda.is_available():
        torch.cuda.manual_seed_all(seed_value)
        torch.backends.cudnn.deterministic = True
        torch.backends.cudnn.benchmark = False
    print(f"Global seed set to {seed_value}")

# --- Data Handling Functions ---
def load_and_label_files(data_root: str) -> pd.DataFrame:
    """Finds all CSVs, labels them, and returns a DataFrame."""
    all_csv_files = glob(os.path.join(data_root, '**', '*.csv'), recursive=True)
    labeled_data = [{'path': file_path, 'label': 1 if 'Potential_shoplifter' in file_path else 0} for file_path in all_csv_files]
    df_files = pd.DataFrame(labeled_data)
    print(f"Found {len(df_files)} total CSV files.")
    if not df_files.empty:
        print("\nValue counts for labels:")
        print(df_files['label'].value_counts())
    return df_files

def parse_poses_from_string(poses_str: str) -> np.ndarray:
    """Helper function to parse the 'POSES' string into a numpy array."""
    try:
        return np.array(json.loads(poses_str)).reshape(18, 2)
    except (json.JSONDecodeError, ValueError):
        return np.zeros((18, 2))

def extract_features(csv_path: str) -> np.ndarray | None:
    """Reads a CSV and extracts a sequence of feature vectors."""
    df = pd.read_csv(csv_path)
    if df.empty or 'POSES' not in df.columns:
        return None

    raw_poses = np.array(df['POSES'].apply(parse_poses_from_string).tolist())
    
    # Normalized Positions
    neck_positions = raw_poses[:, 1:2, :]
    normalized_poses = raw_poses - neck_positions
    
    # Velocities
    velocities = np.diff(raw_poses, axis=0, prepend=raw_poses[0:1])
    
    # Orientation Angle
    neck_trajectory = raw_poses[:, 1, :]
    deltas = np.diff(neck_trajectory, axis=0, prepend=[neck_trajectory[0]])
    orientation_angles_deg = np.degrees(np.arctan2(deltas[:, 1], deltas[:, 0]))
    
    # Combine Features
    return np.concatenate([
        normalized_poses.reshape(raw_poses.shape[0], -1),
        velocities.reshape(raw_poses.shape[0], -1),
        orientation_angles_deg.reshape(-1, 1)
    ], axis=1)

# --- Execute ---
set_seed(SEED)
print(f"Using device: {device}")

# Define the Kaggle data root directory
data_root = '/kaggle/input/skeleton-poses-normalvsshoplifter/csvs_Skeleton_poses_normal_potential_shoplifter/'

# Load file paths and create labels
df_files = load_and_label_files(data_root)
df_files.head()

In [None]:
def get_group_id(file_path: str) -> str:
    """Extracts a unique group identifier from the file path."""
    return os.path.basename(file_path).replace('.csv', '')[-23:]

def create_sliding_windows(sequences: list, labels: list, window_size: int, step_size: int):
    """Creates overlapping windows from full sequences."""
    windowed_sequences, windowed_labels = [], []
    for i, seq in enumerate(sequences):
        if seq.shape[0] >= window_size:
            for start in range(0, seq.shape[0] - window_size + 1, step_size):
                windowed_sequences.append(seq[start:start + window_size])
                windowed_labels.append(labels[i])
    return np.array(windowed_sequences), np.array(windowed_labels)

# --- Step 1: Process all files into memory ---
print("--- Processing all CSV files into feature sequences ---")
all_sequences, all_labels, all_groups, all_paths = [], [], [], []
for _, row in tqdm(df_files.iterrows(), total=len(df_files), desc="Extracting Features"):
    features = extract_features(row['path'])
    if features is not None and len(features) > 0:
        all_sequences.append(features)
        all_labels.append(row['label'])
        all_groups.append(get_group_id(row['path']))
        all_paths.append(row['path']) # <-- ADDED: Keep track of paths

# --- Step 2: Split groups into Training and Temp (Val + Test) sets ---
print("\n--- Splitting data by group to prevent leakage ---")
gss_train_temp = GroupShuffleSplit(n_splits=1, test_size=0.2, random_state=SEED)
train_group_indices, temp_group_indices = next(gss_train_temp.split(all_sequences, all_labels, all_groups))

train_sequences = [all_sequences[i] for i in train_group_indices]
train_labels = [all_labels[i] for i in train_group_indices]

temp_sequences = [all_sequences[i] for i in temp_group_indices]
temp_labels = [all_labels[i] for i in temp_group_indices]
temp_groups = np.array(all_groups)[temp_group_indices]
temp_paths = [all_paths[i] for i in temp_group_indices] # <-- ADDED: Track paths for temp set

# --- Step 3: Create all training, validation, and test datasets ---
print("\n--- Creating datasets: augmented for training, and both windowed & full-length for val/test ---")
X_train_windowed, y_train_windowed = create_sliding_windows(
    train_sequences, train_labels, WINDOW_SIZE_FRAMES, STEP_SIZE_FRAMES
)

gss_val_test = GroupShuffleSplit(n_splits=1, test_size=0.5, random_state=SEED)
val_indices, test_indices = next(gss_val_test.split(temp_sequences, temp_labels, temp_groups))

# Create full-length validation and test sets
X_val_long = [temp_sequences[i] for i in val_indices]
y_val_long = [temp_labels[i] for i in val_indices]
X_test_long = [temp_sequences[i] for i in test_indices]
y_test_long = [temp_labels[i] for i in test_indices]
# Keep track of the final test paths
test_paths_long = [temp_paths[i] for i in test_indices] # <-- ADDED: Final test paths

# Create the windowed versions for validation and test
X_val_windowed, y_val_windowed = create_sliding_windows(X_val_long, y_val_long, WINDOW_SIZE_FRAMES, STEP_SIZE_FRAMES)
X_test_windowed, y_test_windowed = create_sliding_windows(X_test_long, y_test_long, WINDOW_SIZE_FRAMES, STEP_SIZE_FRAMES)

# --- Step 4: Create PyTorch DataLoaders ---
X_train_tensor = torch.from_numpy(X_train_windowed).float()
y_train_tensor = torch.from_numpy(y_train_windowed).long()
train_dataset = TensorDataset(X_train_tensor, y_train_tensor)
g = torch.Generator()
g.manual_seed(SEED)
train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True, generator=g)

# --- Final Summary ---
print("\n--- Data Preparation Complete ---")
print(f"Total original videos processed: {len(all_sequences)}")
print(f"Training on {len(X_train_tensor)} augmented windows (from {len(train_sequences)} videos).")
print(f"Validating on {len(X_val_long)} full-length videos.")
print(f"Testing on {len(X_test_long)} full-length videos.")

In [None]:
# --- Model Architecture ---
class RealTimeClassifier(nn.Module):
    """LSTM-based classifier with an Attention mechanism."""
    def __init__(self, input_size, hidden_size, num_layers, num_classes, dropout_rate=0.5):
        super(RealTimeClassifier, self).__init__()
        self.lstm = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True, bidirectional=False)
        self.dropout = nn.Dropout(dropout_rate)
        self.attention = nn.Linear(hidden_size, 1)
        self.fc = nn.Linear(hidden_size, num_classes)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Forward pass of the model."""
        out, _ = self.lstm(x)
        attention_scores = self.attention(out)
        attention_weights = torch.softmax(attention_scores, dim=1)
        context_vector = torch.sum(attention_weights * out, dim=1)
        context_vector_dropped = self.dropout(context_vector)
        return self.fc(context_vector_dropped)

# --- Loss Function for Imbalanced Data ---
class FocalLoss(nn.Module):
    """Focal Loss function to address class imbalance."""
    def __init__(self, alpha: torch.Tensor, gamma: float = 2.0, reduction: str = 'mean'):
        super(FocalLoss, self).__init__()
        self.alpha = alpha
        self.gamma = gamma
        self.reduction = reduction

    def forward(self, inputs: torch.Tensor, targets: torch.Tensor) -> torch.Tensor:
        """Calculates the focal loss."""
        ce_loss = F.cross_entropy(inputs, targets, reduction='none')
        pt = torch.exp(-ce_loss)
        focal_loss = (self.alpha[targets] * (1 - pt)**self.gamma * ce_loss)
        return focal_loss.mean() if self.reduction == 'mean' else focal_loss.sum()

# --- Evaluation & Reporting Helper Functions ---
def evaluate_and_report_on_test_set(
    model: nn.Module,
    test_sequences: list[np.ndarray],
    test_labels: list[int],
    device: torch.device
):
    """
    Performs the final evaluation on the test set using three methods:
    1. Per-window classification.
    2. Per-video classification (simple aggregation).
    3. Per-video classification with an optimized suspicion threshold.
    """
    model.eval()
    all_window_preds, all_window_labels = [], []
    video_suspicion_scores = []

    with torch.no_grad():
        for i, long_seq_np in enumerate(tqdm(test_sequences, desc="Evaluating Test Set")):
            y_true_video = test_labels[i]
            seq_windows_tensors = []
            
            # Create windows for the current video
            if long_seq_np.shape[0] >= WINDOW_SIZE_FRAMES:
                for start in range(0, long_seq_np.shape[0] - WINDOW_SIZE_FRAMES + 1, STEP_SIZE_FRAMES):
                    window = long_seq_np[start:start + WINDOW_SIZE_FRAMES]
                    seq_windows_tensors.append(torch.from_numpy(window))
            
            if not seq_windows_tensors:
                video_suspicion_scores.append(0.0)
                continue

            windows_batch = torch.stack(seq_windows_tensors).float().to(device)
            outputs = model(windows_batch)
            
            # For per-window report
            _, predicted_windows = torch.max(outputs, 1)
            all_window_preds.extend(predicted_windows.cpu().numpy())
            all_window_labels.extend([y_true_video] * len(predicted_windows))
            
            # For suspicion score thresholding
            probabilities = torch.softmax(outputs, dim=1)[:, 1] # Probability of 'Shoplifter' class
            avg_suspicion_score = probabilities.mean().item()
            video_suspicion_scores.append(avg_suspicion_score)

    # --- Method 1: Per-Window Report ---
    plot_report_and_matrix(all_window_labels, all_window_preds, "Test Set Evaluation (Per-Window)")
    
    # --- Method 2: Simple Per-Video Report (Flag if any window is positive) ---
    # This logic is inside `evaluate_realistically`, so we call it here for consistency
    simple_accuracy = evaluate_realistically(model, test_sequences, test_labels, device, WINDOW_SIZE_FRAMES, STEP_SIZE_FRAMES)
    print(f"\n--- Test Set Evaluation (Simple Per-Video) ---")
    print(f"Accuracy (if any window is 1, predict 1): {simple_accuracy:.2f}%")

    # --- Method 3: Smart Thresholding Report ---
    y_true_video_labels = test_labels
    precisions, recalls, thresholds = precision_recall_curve(y_true_video_labels, video_suspicion_scores)
    # Use np.nan_to_num to avoid division by zero warnings
    f1_scores = np.nan_to_num(2 * (precisions * recalls) / (precisions + recalls))
    best_threshold = thresholds[np.argmax(f1_scores)]
    
    y_pred_thresholded = [1 if score >= best_threshold else 0 for score in video_suspicion_scores]
    plot_report_and_matrix(y_true_video_labels, y_pred_thresholded, f"Test Set Evaluation (Optimal Threshold: {best_threshold:.2f})")

def plot_report_and_matrix(y_true, y_pred, title):
    """Generates and prints a classification report and plots a confusion matrix."""
    print(f"\n--- {title} ---")
    print(classification_report(y_true, y_pred, target_names=['Normal', 'Potential Shoplifter'], zero_division=0))
    cm = confusion_matrix(y_true, y_pred)
    sns.heatmap(cm, annot=True, fmt='g', cmap='Blues', xticklabels=['Normal', 'Potential Shoplifter'], yticklabels=['Normal', 'Potential Shoplifter'])
    plt.xlabel('Predicted')
    plt.ylabel('True')
    plt.title(title)
    plt.show()

def evaluate_realistically(model, long_sequences, long_labels, device, window_size, step_size):
    """Evaluates accuracy on full videos using simple aggregation."""
    model.eval()
    y_pred_final = []
    with torch.no_grad():
        for long_seq_np in long_sequences:
            seq_windows_tensors = []
            if long_seq_np.shape[0] >= window_size:
                for start in range(0, long_seq_np.shape[0] - window_size + 1, step_size):
                    seq_windows_tensors.append(torch.from_numpy(long_seq_np[start:start + window_size]))
            
            if not seq_windows_tensors:
                final_video_prediction = 0
            else:
                outputs = model(torch.stack(seq_windows_tensors).float().to(device))
                final_video_prediction = 1 if 1 in torch.max(outputs, 1)[1] else 0
            y_pred_final.append(final_video_prediction)
    return accuracy_score(long_labels, y_pred_final) * 100

print("Model architecture, loss functions, and evaluation helpers are defined.")

In [None]:
def run_online_training_trial(
    model: nn.Module,
    train_loader: DataLoader,
    val_windowed_loader: DataLoader,
    val_long_sequences: list[np.ndarray],
    val_long_labels: list[int],
    criterion: nn.Module,
    optimizer: optim.Optimizer,
    num_epochs: int,
    model_save_path_window: str,
    model_save_path_realistic: str,
    device: torch.device
):
    """
    Executes a full training trial for the online approach.
    It validates using two methods and saves the best model for each.
    """
    best_val_acc_window = 0.0
    best_val_acc_realistic = 0.0

    # History for plotting
    history = {'train_loss': [], 'val_acc_window': [], 'val_acc_realistic': []}

    for epoch in range(num_epochs):
        start_time = time.time()
        
        # --- Training Phase ---
        model.train()
        train_loss = 0.0
        train_pbar = tqdm(train_loader, desc=f"Epoch {epoch+1}/{num_epochs} | Training")
        for sequences, labels in train_pbar:
            sequences, labels = sequences.to(device), labels.to(device)
            outputs = model(sequences)
            loss = criterion(outputs, labels)
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            train_loss += loss.item()
            train_pbar.set_postfix(loss=f"{loss.item():.4f}")
        
        avg_train_loss = train_loss / len(train_loader)
        history['train_loss'].append(avg_train_loss)

        # --- Validation Phase ---
        model.eval()
        # Method 1: Per-window accuracy
        correct_window, total_window = 0, 0
        with torch.no_grad():
            for sequences, labels in val_windowed_loader:
                sequences, labels = sequences.to(device), labels.to(device)
                outputs = model(sequences)
                _, predicted = torch.max(outputs.data, 1)
                total_window += labels.size(0)
                correct_window += (predicted == labels).sum().item()
        epoch_acc_val_window = 100 * correct_window / total_window
        history['val_acc_window'].append(epoch_acc_val_window)

        # Method 2: Realistic per-video accuracy
        epoch_acc_val_realistic = evaluate_realistically(
            model, val_long_sequences, val_long_labels, device, WINDOW_SIZE_FRAMES, STEP_SIZE_FRAMES
        )
        history['val_acc_realistic'].append(epoch_acc_val_realistic)
        
        # --- Epoch Summary ---
        print(f"Epoch {epoch+1}/{num_epochs} | Train Loss: {avg_train_loss:.4f} | "
              f"Window Val Acc: {epoch_acc_val_window:.2f}% | "
              f"Realistic Val Acc: {epoch_acc_val_realistic:.2f}%")
        
        # Save best model based on PER-WINDOW accuracy
        if epoch_acc_val_window > best_val_acc_window:
            best_val_acc_window = epoch_acc_val_window
            torch.save(model.state_dict(), model_save_path_window)
            print(f"---> New best WINDOW model saved (Val Acc: {best_val_acc_window:.2f}%)")
            
        # Save best model based on REALISTIC (per-video) accuracy
        if epoch_acc_val_realistic > best_val_acc_realistic:
            best_val_acc_realistic = epoch_acc_val_realistic
            torch.save(model.state_dict(), model_save_path_realistic)
            print(f"---> New best REALISTIC model saved (Val Acc: {best_val_acc_realistic:.2f}%)")
            
    print("\n--- Finished Training Trial ---")
    return history

# --- Main Script for this Cell ---
print("Trial 1: Training with Standard CrossEntropy Loss")
set_seed(SEED)

# Instantiate a fresh model
ce_model = RealTimeClassifier(INPUT_FEATURES, HIDDEN_SIZE, NUM_LAYERS, NUM_CLASSES).to(device)

# Define criterion and optimizer
ce_criterion = nn.CrossEntropyLoss()
ce_optimizer = optim.Adam(ce_model.parameters(), lr=0.001, weight_decay=1e-4)

# Define paths for the two best models we'll save
ce_model_path_window = 'best_ce_model_window.pth'
ce_model_path_realistic = 'best_ce_model_realistic.pth'

# Run the training
ce_history = run_online_training_trial(
    model=ce_model,
    train_loader=train_loader,
    val_windowed_loader=val_windowed_loader,
    val_long_sequences=X_val_long,
    val_long_labels=y_val_long,
    criterion=ce_criterion,
    optimizer=ce_optimizer,
    num_epochs=NUM_EPOCHS,
    model_save_path_window=ce_model_path_window,
    model_save_path_realistic=ce_model_path_realistic,
    device=device
)

In [None]:
def plot_history(history: dict, trial_name: str):
    fig, ax1 = plt.subplots(figsize=(12, 6))

    ax1.set_xlabel('Epochs')
    ax1.set_ylabel('Training Loss', color='tab:red')
    ax1.plot(history['train_loss'], 'r-', label='Training Loss')
    ax1.tick_params(axis='y', labelcolor='tab:red')
    ax1.grid(True)

    ax2 = ax1.twinx()
    ax2.set_ylabel('Validation Accuracy (%)', color='tab:blue')
    ax2.plot(history['val_acc_window'], 'b--', label='Window Val Acc')
    ax2.plot(history['val_acc_realistic'], 'g-.', label='Realistic Val Acc')
    ax2.tick_params(axis='y', labelcolor='tab:blue')

    fig.tight_layout()
    fig.suptitle(f'{trial_name} - Training History', y=1.03)
    fig.legend(loc='upper right', bbox_to_anchor=(0.9, 0.9))
    plt.show()

# --- Main Script for this Cell ---

# 1. Plot the training history from the Cross-Entropy trial
print("Plotting Training History for Trial 1 (CrossEntropy)")
plot_history(ce_history, "CrossEntropy Loss")

# 2. Load the best-performing model based on REALISTIC validation accuracy
print("\nLoading best model for final evaluation on the Test Set")
set_seed(SEED) # Reset seed for model instantiation consistency
final_ce_model = RealTimeClassifier(INPUT_FEATURES, HIDDEN_SIZE, NUM_LAYERS, NUM_CLASSES).to(device)
# Load the state dictionary saved from the validation loop
final_ce_model.load_state_dict(torch.load(ce_model_path_realistic))
print(f"Model loaded successfully from: {ce_model_path_realistic}")

# 3. Perform comprehensive evaluation on the TEST set
evaluate_and_report_on_test_set(
    model=final_ce_model,
    test_sequences=X_test_long,
    test_labels=y_test_long,
    device=device
)

In [None]:
# --- Main Script for this Cell ---
print("Loading best model optimized for perwindow performance")
set_seed(SEED)

# Instantiate a fresh model
window_model = RealTimeClassifier(INPUT_FEATURES, HIDDEN_SIZE, NUM_LAYERS, NUM_CLASSES).to(device)

# Load the state dictionary saved from the validation loop
window_model.load_state_dict(torch.load(ce_model_path_window)) 
window_model.eval()

# 2. Get all per-window predictions to identify videos with failures
all_window_preds = []
_, y_test_windowed_labels, test_window_source_indices = create_sliding_windows_with_indices(
    X_test_long, y_test_long, WINDOW_SIZE_FRAMES, STEP_SIZE_FRAMES
)

with torch.no_grad():
    for sequences, _ in tqdm(test_windowed_loader, desc="Getting Per-Window Predictions"):
        outputs = window_model(sequences.to(device))
        all_window_preds.extend(torch.max(outputs.data, 1)[1].cpu().numpy())

# 3. Identify the source video indices that contain failures
fn_video_indices = set() # False Negatives (True Shoplifter, Predicted Normal)
fp_video_indices = set() # False Positives (True Normal, Predicted Shoplifter)

for i, (pred, true) in enumerate(zip(all_window_preds, y_test_windowed_labels)):
    if pred != true:
        source_idx = test_window_source_indices[i]
        if true == 1:
            fn_video_indices.add(source_idx)
        else:
            fp_video_indices.add(source_idx)

print(f"\nIdentified {len(fn_video_indices)} videos with Shoplifter misclassifications.")
print(f"Identified {len(fp_video_indices)} videos with Normal Pedestrian misclassifications.")

# 4. Plot trajectories, mimicking the paper's layout
fig, axes = plt.subplots(2, 2, figsize=(15, 12))
fig.suptitle('Fig 9. Samples from failure cases', fontsize=18, y=0.95)

# Left Column: UroKyoro Failure Cases (False Negatives)
fn_indices_list = list(fn_video_indices)
axes[0, 0].set_title('Shoplifter Failure Cases', fontsize=14)
for i, ax in enumerate(axes[:, 0]):
    if i < len(fn_indices_list):
        idx = fn_indices_list[i]
        plot_trajectory(test_paths_long[idx], ax)
    else:
        ax.set_visible(False) # Hide unused subplots

# Right Column: Normal Pedestrian Failure Cases (False Positives)
fp_indices_list = list(fp_video_indices)
axes[0, 1].set_title('Normal Pedestrian Failure Cases', fontsize=14)
for i, ax in enumerate(axes[:, 1]):
    if i < len(fp_indices_list):
        idx = fp_indices_list[i]
        plot_trajectory(test_paths_long[idx], ax)
    else:
        ax.set_visible(False)

plt.tight_layout(rect=[0, 0.03, 1, 0.93])
plt.show()

In [None]:
# --- Main Script for this Cell ---
print(" Trial 2: Training with Focal Loss ")
set_seed(SEED)

# Instantiate a fresh model for this new trial
focal_model = RealTimeClassifier(INPUT_FEATURES, HIDDEN_SIZE, NUM_LAYERS, NUM_CLASSES).to(device)

# --- Set up Focal Loss ---
# Calculate alpha weights to counter class imbalance.
# The weights are the inverse of the class frequencies.
# Original counts: 88 Normal (class 0), 64 Shoplifter (class 1)
total_videos = 88 + 64
# Weight for class 0 = (fraction of class 1)
alpha_class_0 = 64 / total_videos
# Weight for class 1 = (fraction of class 0)
alpha_class_1 = 88 / total_videos
alpha_tensor = torch.tensor([alpha_class_0, alpha_class_1]).to(device)

print(f"Calculated alpha weights for Focal Loss: {alpha_tensor.cpu().numpy()}")

# Instantiate the Focal Loss function and a new optimizer
focal_criterion = FocalLoss(alpha=alpha_tensor, gamma=2.0)
focal_optimizer = optim.Adam(focal_model.parameters(), lr=0.001, weight_decay=1e-4)

# Define the paths for the saved models from this trial
focal_model_path_window = 'best_focal_model_window.pth'
focal_model_path_realistic = 'best_focal_model_realistic.pth'

# Run the training trial using the same helper function as before
focal_history = run_online_training_trial(
    model=focal_model,
    train_loader=train_loader,
    val_windowed_loader=val_windowed_loader,
    val_long_sequences=X_val_long,
    val_long_labels=y_val_long,
    criterion=focal_criterion,
    optimizer=focal_optimizer,
    num_epochs=NUM_EPOCHS,
    model_save_path_window=focal_model_path_window,
    model_save_path_realistic=focal_model_path_realistic,
    device=device
)

In [None]:
# --- Main Script for this Cell ---

# 1. Plot the training history from the Focal Loss trial
print(" Plotting Training History for Trial 2 (Focal Loss) ")
plot_history(focal_history, "Focal Loss")

# 2. Load the best-performing model based on REALISTIC validation accuracy
print("\n--- Loading best Focal Loss model for final evaluation on the Test Set ---")
set_seed(SEED) # Reset seed for model instantiation consistency
final_focal_model = RealTimeClassifier(INPUT_FEATURES, HIDDEN_SIZE, NUM_LAYERS, NUM_CLASSES).to(device)
final_focal_model.load_state_dict(torch.load(focal_model_path_realistic))
print(f"Model loaded successfully from: {focal_model_path_realistic}")

# 3. Perform comprehensive evaluation on the TEST set
evaluate_and_report_on_test_set(
    model=final_focal_model,
    test_sequences=X_test_long,
    test_labels=y_test_long,
    device=device
)
print("\nVisualizing Failure Cases for Trial 2 (Focal Loss)")
focal_test_predictions = get_model_predictions_for_test_set(final_focal_model, X_test_long, y_test_long, device)
focal_failure_indices = [i for i, (pred, true) in enumerate(zip(focal_test_predictions, y_test_long)) if pred != true]
fn_failures_focal = [i for i in focal_failure_indices if y_test_long[i] == 1]
fp_failures_focal = [i for i in focal_failure_indices if y_test_long[i] == 0]
print(f"Identified {len(focal_failure_indices)} total failure cases for the Focal Loss model.")
print(f" {len(fn_failures_focal)} False Negatives (Missed Shoplifters)")
print(f" {len(fp_failures_focal)} False Positives (Incorrectly Flagged Normals)")
fig, axes = plt.subplots(2, 2, figsize=(15, 12))
fig.suptitle('Fig 9. Samples from failure cases (Focal Loss Model)', fontsize=18, y=0.95)
fn_indices_list_focal = list(fn_failures_focal)
axes[0, 0].set_title('UroKyoro Failure Cases', fontsize=14)
for i, ax in enumerate(axes[:, 0]):
    if i < len(fn_indices_list_focal):
        idx = fn_indices_list_focal[i]
        plot_trajectory(test_paths_long[idx], ax)
    else:
        ax.set_visible(False)
fp_indices_list_focal = list(fp_failures_focal)
axes[0, 1].set_title('Normal Pedestrian Failure Cases', fontsize=14)
for i, ax in enumerate(axes[:, 1]):
    if i < len(fp_indices_list_focal):
        idx = fp_indices_list_focal[i]
        plot_trajectory(test_paths_long[idx], ax)
    else:
        ax.set_visible(False)
plt.tight_layout(rect=[0, 0.03, 1, 0.93])
plt.show()

# GCN

In [None]:
!pip install -q torch_geometric torch-scatter torch-sparse -f https://data.pyg.org/whl/torch-2.1.0+cu121.html

print("PyTorch Geometric and its dependencies have been installed.")

In [None]:
import numpy as np
import pandas as pd
import json
import os
import random
import time
from glob import glob
from tqdm import tqdm

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim

# Import PyTorch Geometric libraries
from torch_geometric.data import Data, Dataset
from torch_geometric.loader import DataLoader as PyGDataLoader

from sklearn.model_selection import GroupShuffleSplit
from sklearn.metrics import classification_report, confusion_matrix, accuracy_score

import matplotlib.pyplot as plt
import seaborn as sns

print("--- Imports for Graph Convolutional Network (GCN) Approach ---")

# --- Configuration & Seeding ---
SEED = 42
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Model and training constants
BATCH_SIZE = 16
NUM_EPOCHS = 20
LEARNING_RATE = 0.001
# We will define other model-specific constants later

def set_seed(seed_value: int):
    """Sets the seed for reproducibility for all relevant libraries."""
    random.seed(seed_value)
    np.random.seed(seed_value)
    torch.manual_seed(seed_value)
    if torch.cuda.is_available():
        torch.cuda.manual_seed_all(seed_value)
        torch.backends.cudnn.deterministic = True
        torch.backends.cudnn.benchmark = False
    print(f"Global seed set to {seed_value}")

# --- Execute ---
set_seed(SEED)
print(f"Using device: {device}")
print("All necessary libraries and configurations are set up.")

In [None]:
MAX_FRAMES = 300

edge_index = torch.tensor([
    # Torso
    [1, 2], [1, 5], [2, 8], [5, 11], [8, 11], [1, 0],
    # Left Arm
    [2, 3], [3, 4],
    # Right Arm
    [5, 6], [6, 7],
    # Left Leg
    [8, 9], [9, 10],
    # Right Leg
    [11, 12], [12, 13]
], dtype=torch.long).t().contiguous()

def load_and_label_files(data_root: str) -> pd.DataFrame:
    """Finds all CSVs, labels them based on the folder, and returns a DataFrame."""
    all_csv_files = glob(os.path.join(data_root, '**', '*.csv'), recursive=True)
    labeled_data = [{'path': file_path, 'label': 1 if 'Potential_shoplifter' in file_path else 0} for file_path in all_csv_files]
    df_files = pd.DataFrame(labeled_data)
    print(f"Found {len(df_files)} total CSV files.")
    return df_files

def parse_poses_from_string(poses_str: str) -> np.ndarray:
    """Helper to parse the 'POSES' string into a numpy array."""
    try:
        return np.array(json.loads(poses_str)).reshape(18, 2)
    except (json.JSONDecodeError, ValueError):
        return np.zeros((18, 2))


class SkeletonDataset(Dataset):
    """
    Custom PyTorch Geometric Dataset for loading skeleton action recognition data.
    Each video is represented as a single graph.
    - Nodes: The 18 skeleton joints.
    - Node Features: The flattened (x, y) coordinates of each joint across all frames.
    - Edges: The predefined bone connections.
    """
    def __init__(self, df: pd.DataFrame, max_frames: int):
        super().__init__()
        self.df = df
        self.max_frames = max_frames

    def __len__(self):
        return len(self.df)

    def __getitem__(self, idx):
        row = self.df.iloc[idx]
        csv_path = row['path']
        label = row['label']

        # Load and parse pose data from the CSV
        df_poses = pd.read_csv(csv_path)
        raw_poses = np.array(df_poses['POSES'].apply(parse_poses_from_string).tolist()) # Shape: (n_frames, 18, 2)

        # --- Temporal Padding/Truncating ---
        n_frames = raw_poses.shape[0]
        if n_frames < self.max_frames:
            # Pad with zeros if the sequence is too short
            padding = np.zeros((self.max_frames - n_frames, 18, 2))
            processed_poses = np.concatenate([raw_poses, padding], axis=0)
        else:
            # Truncate if the sequence is too long
            processed_poses = raw_poses[:self.max_frames, :, :]

        # --- Feature Engineering ---
        # The feature for each node (joint) is its entire trajectory.
        # Reshape to (num_nodes, features_per_node) where features are (max_frames * 2)
        node_features = torch.tensor(processed_poses.transpose(1, 0, 2).reshape(18, -1), dtype=torch.float)

        # Create the graph data object
        data = Data(
            x=node_features,         # Node features: Shape [18, max_frames * 2]
            edge_index=edge_index,   # Graph connectivity
            y=torch.tensor([label], dtype=torch.long) # Label
        )
        return data

data_root = '/kaggle/input/skeleton-poses-normalvsshoplifter/csvs_Skeleton_poses_normal_potential_shoplifter/'
df_files = load_and_label_files(data_root)
full_dataset = SkeletonDataset(df=df_files, max_frames=MAX_FRAMES)

print(f"\nSuccessfully created a dataset with {len(full_dataset)} graphs.")
print("\nExample of a single graph data object from the dataset:")
print(full_dataset[0])

In [None]:
from torch.utils.data import Subset
def get_group_id(file_path: str) -> str:
    return os.path.basename(file_path).replace('.csv', '')[-23:]
groups = [get_group_id(path) for path in df_files['path']]
labels = df_files['label'].values
indices = np.arange(len(full_dataset))
gss_train_temp = GroupShuffleSplit(n_splits=1, test_size=0.2, random_state=SEED)
train_indices, temp_indices = next(gss_train_temp.split(indices, labels, groups))
gss_val_test = GroupShuffleSplit(n_splits=1, test_size=0.5, random_state=SEED)
val_indices, test_indices = next(gss_val_test.split(
    indices[temp_indices],
    labels[temp_indices],
    np.array(groups)[temp_indices]
))
final_val_indices = temp_indices[val_indices]
final_test_indices = temp_indices[test_indices]
train_dataset = Subset(full_dataset, train_indices)
val_dataset = Subset(full_dataset, final_val_indices)
test_dataset = Subset(full_dataset, final_test_indices)
train_loader = PyGDataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True)
val_loader = PyGDataLoader(val_dataset, batch_size=BATCH_SIZE, shuffle=False)
test_loader = PyGDataLoader(test_dataset, batch_size=BATCH_SIZE, shuffle=False)
print("--- Data Splitting and DataLoader Creation Complete ---")
print(f"Total graphs: {len(full_dataset)}")
print(f"Training graphs: {len(train_dataset)}")
print(f"Validation graphs: {len(val_dataset)}")
print(f"Testing graphs: {len(test_dataset)}")
print(f"\nCreated PyG DataLoaders with batch size {BATCH_SIZE}.")

In [None]:
from torch_geometric.nn import GCNConv, global_mean_pool
class SkeletonGCN(nn.Module):
    def __init__(self, in_channels: int, hidden_channels: int, out_channels: int):
        super().__init__()
        torch.manual_seed(SEED)
        self.conv1 = GCNConv(in_channels, hidden_channels)
        self.conv2 = GCNConv(hidden_channels, hidden_channels)
        self.conv3 = GCNConv(hidden_channels, hidden_channels)
        self.classifier = nn.Sequential(
            nn.Linear(hidden_channels, 128),
            nn.ReLU(),
            nn.Dropout(0.5),
            nn.Linear(128, out_channels)
        )

    def forward(self, data: Data) -> torch.Tensor:
        x, edge_index, batch = data.x, data.edge_index, data.batch
        x = self.conv1(x, edge_index)
        x = F.relu(x)
        x = self.conv2(x, edge_index)
        x = F.relu(x)
        x = self.conv3(x, edge_index)
        x = F.relu(x)
        x_pooled = global_mean_pool(x, batch)
        out = self.classifier(x_pooled)
        return out
model = SkeletonGCN(
    in_channels=MAX_FRAMES * 2,
    hidden_channels=128,
    out_channels=2
).to(device)
print("--- GCN Model Architecture ---")
print(model)

In [None]:
def train_one_epoch_gcn(
    model: nn.Module,
    loader: PyGDataLoader,
    optimizer: optim.Optimizer,
    criterion: nn.Module
) -> float:
    model.train()
    total_loss = 0
    for data in tqdm(loader, desc="Training"):
        data = data.to(device)
        optimizer.zero_grad()
        out = model(data)
        loss = criterion(out, data.y)
        loss.backward()
        optimizer.step()
        total_loss += loss.item() * data.num_graphs
    return total_loss / len(loader.dataset)
def evaluate_gcn(
    model: nn.Module,
    loader: PyGDataLoader
) -> tuple[float, list, list]:
    model.eval()
    all_preds = []
    all_labels = []
    with torch.no_grad():
        for data in tqdm(loader, desc="Evaluating"):
            data = data.to(device)
            out = model(data)
            pred = out.argmax(dim=1)
            all_preds.extend(pred.cpu().numpy())
            all_labels.extend(data.y.cpu().numpy())

    accuracy = accuracy_score(all_labels, all_preds) * 100
    return accuracy, all_preds, all_labels
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=LEARNING_RATE)
print("Training and evaluation functions are defined.")
print("Criterion: CrossEntropyLoss")
print("Optimizer: Adam")

In [None]:
best_val_accuracy = 0.0
model_save_path = 'best_gcn_model.pth'
history = {'train_loss': [], 'val_acc': []}
print("--- Starting GCN Model Training ---")
for epoch in range(1, NUM_EPOCHS + 1):
    start_time = time.time()
    train_loss = train_one_epoch_gcn(model, train_loader, optimizer, criterion)
    val_accuracy, _, _ = evaluate_gcn(model, val_loader)
    end_time = time.time()
    epoch_duration = end_time - start_time
    history['train_loss'].append(train_loss)
    history['val_acc'].append(val_accuracy)
    print(f"Epoch: {epoch:02d} | "
          f"Train Loss: {train_loss:.4f} | "
          f"Val Acc: {val_accuracy:.2f}% | "
          f"Duration: {epoch_duration:.2f}s")
    if val_accuracy > best_val_accuracy:
        best_val_accuracy = val_accuracy
        torch.save(model.state_dict(), model_save_path)
        print(f"==> New best model saved with Val Acc: {best_val_accuracy:.2f}%")
print("\n--- Finished Training ---")
print(f"The best model was saved to '{model_save_path}' with a validation accuracy of {best_val_accuracy:.2f}%.")

In [None]:
def plot_gcn_history(history: dict):
    fig, ax1 = plt.subplots(figsize=(12, 6))

    ax1.set_xlabel('Epochs')
    ax1.set_ylabel('Training Loss', color='tab:red')
    ax1.plot(history['train_loss'], 'r-', label='Training Loss')
    ax1.tick_params(axis='y', labelcolor='tab:red')
    ax1.grid(True)

    ax2 = ax1.twinx()
    ax2.set_ylabel('Validation Accuracy (%)', color='tab:blue')
    ax2.plot(history['val_acc'], 'b--', label='Validation Acc')
    ax2.tick_params(axis='y', labelcolor='tab:blue')
    ax2.set_ylim(0, 105)

    fig.tight_layout()
    fig.suptitle('GCN Model - Training History', y=1.03)
    fig.legend(loc='upper right', bbox_to_anchor=(0.9, 0.9))
    plt.show()
print("--- Plotting GCN Training History ---")
plot_gcn_history(history)
print("\n--- Loading best model for final evaluation on the Test Set ---")
final_model = SkeletonGCN(
    in_channels=MAX_FRAMES * 2,
    hidden_channels=128,
    out_channels=2
).to(device)
final_model.load_state_dict(torch.load(model_save_path))
print(f"Model loaded successfully from: {model_save_path}")
test_accuracy, test_preds, test_labels = evaluate_gcn(final_model, test_loader)
print(f"\nFinal Test Accuracy: {test_accuracy:.2f}%")
plot_report_and_matrix(
    y_true=test_labels,
    y_pred=test_preds,
    title="GCN Final Test Set Performance"
)