# Imports

In [1]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import DataLoader, random_split, Dataset
import torchvision.models as models
from torchvision import transforms
from tqdm.notebook import tqdm
import pandas as pd
import numpy as np
import cv2
import os
import time
import random
from sklearn.metrics import accuracy_score, precision_score, recall_score, classification_report
from PIL import Image
import scipy.stats as ss
import splitfolders

# utils.py

In [None]:
# Hyperparameters
HEIGHT = 128  # frame height in pixels after resizing
WIDTH = 128  # frame width in pixels after resizing
CHANNELS = 6  # 3 RGB channels + 3 one-hot intent channels appended per frame by the dataset
FPS = 10  # frames per second of the input clips
DURATION = 3  # clip length in seconds
SEQ_LEN = FPS * DURATION  # frames per clip (30)

# MVO Prediction Logic Mapping
# FRONT: 0, LEFT: 1, RIGHT: 2
def get_label_id(label_name):
    """
    Translate a direction name into its class id (case-insensitive).

    'front' -> 0, 'left' -> 1, 'right' -> 2. Any other name falls back to 0.
    """
    known_names = ('front', 'left', 'right')
    key = label_name.lower()
    if key in known_names:
        # The position in the tuple is the class id.
        return known_names.index(key)
    return 0

VIDEO_DIR = r''
LABEL_DIR = r''

# Contains both and only video and label directories
# Folder names are strictly "videos" and "labels"
DATA_DIR = r'D:\Thesis 2\Thesis 2\AIGD\split folder'

# Intent files
VAL_POSITIONS = ''
TEST_POSITIONS = ''

# Incomplete_EluSEEdate_Dataset
# Complete Final Training Datasets

# Warn early about missing directories so path problems surface before training starts
for _dir_name, _dir_path in (("VIDEO_DIR", VIDEO_DIR), ("LABEL_DIR", LABEL_DIR)):
    if os.path.exists(_dir_path):
        continue
    print(f"WARNING: Directory not found: {_dir_path}")
    print(f"Please update the {_dir_name} variable in this cell.")

Please update the VIDEO_DIR variable in this cell.
Please update the LABEL_DIR variable in this cell.


# conv_lstm_classifier.py

In [3]:
class ConvLSTMCell(nn.Module):
    """
    The Single Memory Unit of the video.

    One convolution over the channel-wise concatenation of the current input
    and the previous hidden state yields the pre-activations of all four
    LSTM gates at once.
    """

    def __init__(self, input_dim, hidden_dim, kernel_size, bias):
        super().__init__()

        self.input_dim = input_dim
        self.hidden_dim = hidden_dim
        self.kernel_size = kernel_size
        self.bias = bias
        # Half-kernel padding keeps the spatial size unchanged for odd kernels.
        self.padding = (kernel_size[0] // 2, kernel_size[1] // 2)

        # 4 * hidden_dim output channels: one group each for the
        # input, forget, output and candidate gates.
        self.conv = nn.Conv2d(
            in_channels=input_dim + hidden_dim,
            out_channels=4 * hidden_dim,
            kernel_size=kernel_size,
            padding=self.padding,
            bias=bias,
        )

    def forward(self, input_tensor, cur_state):
        """
        Advance the cell by one time step.

        input_tensor: [Batch, input_dim, H, W]
        cur_state:    pair (h, c), each [Batch, hidden_dim, H, W]
        Returns the next (h, c) pair.
        """
        prev_hidden, prev_cell = cur_state

        stacked = torch.cat([input_tensor, prev_hidden], dim=1)
        gate_preacts = self.conv(stacked)
        in_pre, forget_pre, out_pre, cand_pre = torch.split(gate_preacts, self.hidden_dim, dim=1)

        in_gate = torch.sigmoid(in_pre)
        forget_gate = torch.sigmoid(forget_pre)
        out_gate = torch.sigmoid(out_pre)
        candidate = torch.tanh(cand_pre)

        c_next = forget_gate * prev_cell + in_gate * candidate
        h_next = out_gate * torch.tanh(c_next)

        return h_next, c_next

    def init_hidden(self, batch_size, image_size):
        """Return zero-filled (h, c) states on the same device as the conv weights."""
        height, width = image_size
        state_shape = (batch_size, self.hidden_dim, height, width)
        target_device = self.conv.weight.device
        return (torch.zeros(state_shape, device=target_device),
                torch.zeros(state_shape, device=target_device))

class ConvLSTM(nn.Module):
    """
    The Observer of the video: a stack of ConvLSTM layers unrolled over time.

    Parameters:
        input_dim:         number of channels of each input frame
        hidden_dim:        int, or list with one hidden channel count per layer
        kernel_size:       tuple, or list with one tuple per layer
        num_layers:        number of stacked ConvLSTM layers
        batch_first:       True if input is [Batch, Time, C, H, W],
                           False if input is [Time, Batch, C, H, W]
        bias:              whether the gate convolutions use a bias term
        return_all_layers: return outputs/states of every layer instead of
                           only the last one

    Raises:
        ValueError: if `kernel_size` has the wrong type, or if the per-layer
                    lists do not contain exactly `num_layers` entries.
    """

    def __init__(self, input_dim, hidden_dim, kernel_size, num_layers,
                 batch_first=True, bias=True, return_all_layers=False): # Changed default to batch_first=True
        super(ConvLSTM, self).__init__()

        self._check_kernel_size_consistency(kernel_size)
        kernel_size = self._extend_for_multilayer(kernel_size, num_layers)
        hidden_dim = self._extend_for_multilayer(hidden_dim, num_layers)

        # Without this check a too-short list fails later with a bare IndexError
        # and a too-long list is silently truncated.
        if not len(kernel_size) == len(hidden_dim) == num_layers:
            raise ValueError('Inconsistent list length.')

        self.input_dim = input_dim
        self.hidden_dim = hidden_dim
        self.kernel_size = kernel_size
        self.num_layers = num_layers
        self.batch_first = batch_first
        self.bias = bias
        self.return_all_layers = return_all_layers

        cell_list = []
        for i in range(0, self.num_layers):
            # Layer 0 reads the raw frames; deeper layers read the previous layer's hidden maps
            cur_input_dim = self.input_dim if i == 0 else self.hidden_dim[i - 1]
            cell_list.append(ConvLSTMCell(input_dim=cur_input_dim,
                                          hidden_dim=self.hidden_dim[i],
                                          kernel_size=self.kernel_size[i],
                                          bias=self.bias))

        self.cell_list = nn.ModuleList(cell_list)

    def forward(self, input_tensor, hidden_state=None):
        """
        Run the whole sequence through every layer.

        input_tensor: [Batch, Time, Channel, Height, Width] when batch_first,
                      otherwise [Time, Batch, Channel, Height, Width]
        hidden_state: optional list with one (h, c) pair per layer;
                      zero states are created when it is None

        Returns (layer_output_list, last_state_list). Each layer output is
        [Batch, Time, Hidden, H, W] - always batch-first, because the input is
        permuted to batch-first internally and never permuted back.
        """
        if not self.batch_first:
            # (Time, Batch, Channel, Height, Width) -> (Batch, Time, Channel, Height, Width)
            input_tensor = input_tensor.permute(1, 0, 2, 3, 4)

        # Distinct names for the spatial size so they are not shadowed by the hidden state below
        batch_size, seq_len, _, height, width = input_tensor.size()

        if hidden_state is None:
            hidden_state = self._init_hidden(batch_size=batch_size, image_size=(height, width))

        layer_output_list = []
        last_state_list = []

        cur_layer_input = input_tensor

        for layer_idx in range(self.num_layers):
            h, c = hidden_state[layer_idx]
            output_inner = []

            # Loop over TIME (seq_len), not Batch
            for t in range(seq_len):
                # Slice the time dimension: [Batch, Channel, H, W]
                # If layer_idx == 0, cur_layer_input is [B, T, C, H, W]
                # If layer_idx > 0, cur_layer_input is [B, T, Hidden, H, W] (from previous layer stack)
                input_t = cur_layer_input[:, t, :, :, :]

                h, c = self.cell_list[layer_idx](input_tensor=input_t, cur_state=[h, c])
                output_inner.append(h)

            # Stack along Time dimension (dim=1 because we enforce batch_first internally now)
            layer_output = torch.stack(output_inner, dim=1)
            cur_layer_input = layer_output

            layer_output_list.append(layer_output)
            last_state_list.append([h, c])

        if not self.return_all_layers:
            layer_output_list = layer_output_list[-1:]
            last_state_list = last_state_list[-1:]

        return layer_output_list, last_state_list

    def _init_hidden(self, batch_size, image_size):
        """Create zero (h, c) states for every layer."""
        return [cell.init_hidden(batch_size, image_size) for cell in self.cell_list]

    @staticmethod
    def _check_kernel_size_consistency(kernel_size):
        """Raise ValueError unless kernel_size is a tuple or a list of tuples."""
        is_tuple = isinstance(kernel_size, tuple)
        is_list_of_tuples = (isinstance(kernel_size, list)
                             and all(isinstance(elem, tuple) for elem in kernel_size))
        if not (is_tuple or is_list_of_tuples):
            raise ValueError('`kernel_size` must be tuple or list of tuples')

    @staticmethod
    def _extend_for_multilayer(param, num_layers):
        """Repeat a single (non-list) setting once per layer; lists are returned as-is."""
        if not isinstance(param, list):
            param = [param] * num_layers
        return param


class ConvLSTMModel(nn.Module):
    """
    The Judge of the video.

    Runs the clip through a ConvLSTM stack and classifies it from the last
    layer's hidden map at the final time step.

    Parameters:
        input_dim, hidden_dim, kernel_size, num_layers, batch_first, bias,
        return_all_layers: forwarded unchanged to ConvLSTM
        height, width:     spatial size of the input frames (sizes the linear layer)
        num_classes:       number of output classes
    """
    def __init__(self, input_dim, hidden_dim, kernel_size, num_layers, height, width,
                 batch_first=True, bias=True, return_all_layers=False, num_classes=3):
        super(ConvLSTMModel, self).__init__()

        # Ensure batch_first is passed correctly
        self.convlstm = ConvLSTM(input_dim, hidden_dim, kernel_size, num_layers,
                                 batch_first=batch_first, bias=bias,
                                 return_all_layers=return_all_layers)

        # ConvLSTM also accepts a single int for hidden_dim, so only index when it is a list
        last_hidden_dim = hidden_dim[-1] if isinstance(hidden_dim, list) else hidden_dim

        # Input to linear is: Hidden_Dim * H * W
        self.linear = nn.Linear(last_hidden_dim * height * width, num_classes)

    def forward(self, input_tensor, hidden_state=None):
        """
        Classify a batch of clips.

        input_tensor: clip batch in the layout expected by the ConvLSTM
        hidden_state: optional initial per-layer (h, c) states, forwarded to the ConvLSTM
        Returns raw class scores of shape [Batch, num_classes].
        """
        # Forward the caller-supplied initial state instead of silently dropping it
        layer_outputs, _ = self.convlstm(input_tensor, hidden_state)

        # Always read the LAST layer: with return_all_layers=True index 0 would be the
        # first layer, whose channel count does not match the linear layer.
        # Each layer output is [Batch, Time, Hidden, H, W]; keep only the last time step.
        last_time_step = layer_outputs[-1][:, -1, :, :, :]

        # Flatten starting from dimension 1 (Channels/Hidden)
        flattened = torch.flatten(last_time_step, start_dim=1)

        output = self.linear(flattened)
        return output

# dataset.py

In [4]:
class MVOVideoDataset(Dataset):
    """
    This takes a 3-second video and turns it into
    a 'data packet' for the AI to study.

    Each item is a pair (video_tensor, label):
        video_tensor: [SEQ_LEN, 6, HEIGHT, WIDTH] - 3 RGB channels plus
                      3 one-hot "intent" channels per frame
        label:        0 (front), 1 (left) or 2 (right), as a long tensor

    Parameters:
        video_folder: directory containing the .mp4 clips
        label_folder: directory containing one .csv per clip (same base name)
        transforms:   optional callable applied to each frame as a PIL image;
                      it must return a [3, HEIGHT, WIDTH] tensor
    """

    # Number of trailing CSV rows that decide the clip label (the last second of the CSV)
    LABEL_WINDOW = 24
    # Number of trailing CSV rows used to break a left/right tie
    TIE_WINDOW = 12

    def __init__(self, video_folder, label_folder, transforms=None):
        self.video_folder = video_folder
        self.label_folder = label_folder
        self.transforms = transforms

        valid_video_files = []
        valid_csv_files = []
        try:
            # Sorted so that index -> video is stable across runs and machines;
            # the saved intent positions are looked up by index.
            all_video_files = sorted(f for f in os.listdir(video_folder) if f.endswith('.mp4'))
            for video_name in all_video_files:
                video_path = os.path.join(video_folder, video_name)

                # Labels share the video's base name. (An older naming scheme used
                # '<name>_labels.csv' with a '<name>_labels_A.csv' fallback.)
                csv_name = video_name.replace('.mp4', '.csv')
                csv_path = os.path.join(label_folder, csv_name)

                if os.path.exists(video_path) and os.path.exists(csv_path):
                    valid_video_files.append(video_name)
                    valid_csv_files.append(csv_path)
                else:
                    if not os.path.exists(video_path):
                        print(f"WARNING: Video file not found: {video_path}. Skipping.")
                    if not os.path.exists(csv_path):
                        print(f"WARNING: Label file not found for video {video_name}. Skipping.")

            self.video_files = valid_video_files
            self.csv_files = valid_csv_files
            print(f"Initialized dataset with {len(self.video_files)} valid video-label pairs.")

        except FileNotFoundError:
            print(f"Error: Could not find folder {video_folder} or {label_folder}.")
            self.video_files = []
            self.csv_files = []

        self.split_type = ''
        self.positions = [] # If split_type == 'TRAIN', this would not be filled.

    def __len__(self):
        return len(self.video_files)

    def __getitem__(self, idx):
        """Load clip `idx` and return (video_tensor, label)."""
        video_name = self.video_files[idx]
        video_path = os.path.join(self.video_folder, video_name)

        # csv_files[idx] is the full label path that was validated for this video in __init__
        df = pd.read_csv(self.csv_files[idx])
        label = self.labeler(df)

        # Training samples a fresh intent position on every access;
        # validation/test reuse the fixed positions prepared by set_split_type().
        if self.split_type == 'TRAIN':
            intent_position = self.get_intent_position()
        else:
            intent_position = self.positions[idx]

        intent = self.get_intent(intent_position, df)

        frames = []
        cap = cv2.VideoCapture(video_path)
        try:
            for i in range(SEQ_LEN):
                ret, frame = cap.read()
                if not ret:
                    # If a video is shorter than 3s, pad with a black frame
                    # Important: Ensure padding is the same shape/type as transformed frames
                    frame_tensor = torch.zeros((3, HEIGHT, WIDTH))
                else:
                    frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                    if self.transforms:
                        frame = Image.fromarray(frame)
                        frame_tensor = self.transforms(frame)
                    else:
                        # Fallback if no transforms are provided: resize here so the frame
                        # matches the intent channels (cv2.resize takes (width, height))
                        frame = cv2.resize(frame, (WIDTH, HEIGHT))
                        frame_tensor = torch.from_numpy(frame).permute(2, 0, 1).float() / 255.0

                # Intent channels have the same spatial size as the frame; all zeros means "no intent"
                intent_torch = torch.zeros((3, HEIGHT, WIDTH))
                # If intent exists, show it for 1 second (FPS frames) starting at intent_position
                if intent_position != -1 and intent_position <= i < intent_position + FPS:
                    # One-hot encoding: fill the channel of the intended class with 1
                    intent_torch[intent, :, :] = 1

                # Append the intent channels to the video frame -> [6, HEIGHT, WIDTH]
                frames.append(torch.cat((frame_tensor, intent_torch), dim=0))
        finally:
            # Release the capture even if decoding or a transform raises
            cap.release()

        # Convert list to a 4D tensor [SEQ_LEN, 6, HEIGHT, WIDTH]
        video_tensor = torch.stack(frames, dim=0)

        return video_tensor, torch.tensor(label).long()

    def labeler(self, df):
        """
        Derive the clip label from the last LABEL_WINDOW rows of the label CSV.

        Uses the 'label_id_corrected' column when present, otherwise the last column.
        All rows front -> 0; more left than right -> 1; more right than left -> 2;
        on a left/right tie, the most frequent value of the last TIE_WINDOW rows.
        """
        col = 'label_id_corrected' if 'label_id_corrected' in df.columns else df.columns[-1]
        counts = df[col].tail(self.LABEL_WINDOW).value_counts()
        front_count, left_count, right_count = (counts.get(i, 0) for i in range(3))

        if front_count == self.LABEL_WINDOW:
            label = 0 # Front
        elif left_count > right_count:
            label = 1 # Left
        elif left_count < right_count:
            label = 2 # Right
        else:
            label = df[col].tail(self.TIE_WINDOW).mode()[0]

        # Plain Python int so callers never have to deal with numpy scalars
        return int(label)

    def get_intent_position(self):
        """
        Sample the frame at which the intent signal starts.

        Returns a frame index in [0, 20) for roughly 60% of calls and -1
        (no intent) otherwise.
        """
        # 60% of the samples have intent
        if random.random() < 0.6:
            # The time positions of the first 2 seconds (videos - 10 fps)
            start_frame = 0
            end_frame = 20
            median = (start_frame + end_frame)/2
            range_zero = np.arange(-median, median)

            # Obtain the probability of selecting a timestamp using the adjacent 0.5 areas
            smaller_range = range_zero - 0.5
            higher_range = range_zero + 0.5

            # Probability is the standard-normal mass of each unit-wide bin,
            # so positions near the middle frame are by far the most likely
            probability = ss.norm.cdf(higher_range) - ss.norm.cdf(smaller_range)

            # Normalize so the probabilities sum to 1
            probability /= probability.sum()

            # Select a timestamp based on the probabilities
            candidate_frames = np.arange(start_frame, end_frame)
            intent_position = int(np.random.choice(candidate_frames, p=probability))
        else:
            intent_position = -1

        return intent_position

    def get_intent(self, intent_position, df):
        """Return the intent class (the clip label), or -1 when the sample has no intent."""
        if intent_position != -1:
            intent = self.labeler(df)
        else:
            intent = -1
        return intent

    def class_counter(self):
        """Return ({class_id: number of clips}, total number of clips)."""
        label_counts = {0: 0, 1: 0, 2: 0}

        # csv_files already holds full paths (built in __init__); joining them with
        # label_folder again produced a doubled, non-existent path for relative folders.
        for csv_path in self.csv_files:
            df = pd.read_csv(csv_path)
            label = self.labeler(df)
            label_counts[label] += 1

        return label_counts, sum(label_counts.values())

    def _sample_positions(self, len_dataset):
        """Draw one intent position per sample."""
        return [self.get_intent_position() for _ in range(len_dataset)]

    def set_split_type(self, type, len_dataset):
        """
        Mark this dataset as 'TRAIN', 'VALIDATION' or 'TEST'.

        For VALIDATION/TEST the intent positions are fixed: they are loaded from the
        file named by VAL_POSITIONS / TEST_POSITIONS, or - when that global is empty -
        sampled once, saved as .npy in the working directory, and the global is
        updated to the saved file name. Always returns ''.
        """
        global VAL_POSITIONS
        global TEST_POSITIONS
        self.split_type = type

        if self.split_type == 'VALIDATION':
            if VAL_POSITIONS == '':
                # Rebuild from scratch so a repeated call cannot double the list
                self.positions = self._sample_positions(len_dataset)
                np.save('val_intent_positions.npy', np.array(self.positions))
                VAL_POSITIONS = 'val_intent_positions.npy'
            else:
                self.positions = list(np.load(VAL_POSITIONS))
        elif self.split_type == 'TEST':
            if TEST_POSITIONS == '':
                self.positions = self._sample_positions(len_dataset)
                np.save('test_intent_positions.npy', np.array(self.positions))
                TEST_POSITIONS = 'test_intent_positions.npy'
            else:
                self.positions = list(np.load(TEST_POSITIONS))

        return ''

# train.py

In [None]:
# Configurations
BATCH = 5  # clips per batch
NUM_EPOCHS = 1
LEARNING_RATE = 1e-4
SAVED_MODEL_PATH = "best_convlstm.pth"  # checkpoint written whenever validation accuracy improves
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Fixed random seeds for reproducibility of everything that draws from the
# torch / numpy / random generators (e.g. weight init, shuffling, intent sampling).
# Note: the train/val/test folder split gets its own seed in the splitfolders call in main().
SEED = 8
torch.manual_seed(SEED)
np.random.seed(SEED)
random.seed(SEED)

# Model Parameters
PARAMS = {
    'input_dim': 6,  # 3 RGB channels + 3 intent channels per frame
    'hidden_dim': [64, 32],  # hidden channels of each ConvLSTM layer
    'kernel_size': (3, 3),
    'num_layers': 2,
    'height': HEIGHT,
    'width': WIDTH,
    'num_classes': 3  # front / left / right
}

def train_one_epoch(model, loader, criterion, optimizer):
    """
    Train `model` for one pass over `loader`.

    Parameters:
        model:     network to train (switched to train mode here)
        loader:    iterable of (data, targets) batches
        criterion: loss function called as criterion(scores, targets)
        optimizer: optimizer stepping the model's parameters

    Returns:
        (mean loss per batch, accuracy in percent). Both are 0.0 when the
        loader yields no batches.
    """
    model.train()
    running_loss = 0.0
    correct = 0
    total = 0
    num_batches = 0

    loop = tqdm(loader, leave=True)
    for data, targets in loop:
        data = data.float().to(DEVICE)
        targets = targets.to(DEVICE)

        # Forward
        scores = model(data)
        loss = criterion(scores, targets)

        # Backward
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        # Stats
        running_loss += loss.item()
        _, predictions = scores.max(1)
        correct += (predictions == targets).sum().item()
        total += targets.size(0)
        num_batches += 1

        loop.set_description(f"Loss: {loss.item():.4f}")

    # An empty loader would otherwise end in a ZeroDivisionError
    if num_batches == 0 or total == 0:
        return 0.0, 0.0

    return running_loss / num_batches, 100 * correct / total

def _evaluate_accuracy(model, loader):
    """Return the accuracy (in percent) of `model` on `loader`; 0.0 if the loader is empty."""
    model.eval()
    num_correct = 0
    num_seen = 0
    with torch.no_grad():
        for x, y in loader:
            x = x.float().to(DEVICE)
            y = y.to(DEVICE)
            scores = model(x)
            _, preds = scores.max(1)
            num_correct += (preds == y).sum().item()
            num_seen += y.size(0)

    # Guard against an empty validation set instead of dividing by zero
    return 100 * num_correct / num_seen if num_seen else 0.0


def main():
    """
    Split the data, train the ConvLSTM classifier and keep the best checkpoint.

    The files under DATA_DIR are copied into output/{train,val,test}; the model is
    trained for NUM_EPOCHS epochs with a class-weighted cross-entropy loss and is
    saved to SAVED_MODEL_PATH whenever the validation accuracy improves.
    """
    # Data Setup
    transforms_train = transforms.Compose([
        transforms.Resize((HEIGHT, WIDTH)),
        transforms.ToTensor()
    ])

    # Split the files in the directory into three directories: training (60%), validation (20%), and testing (20%)
    splitfolders.ratio(DATA_DIR, output="output", seed=8, ratio=(.6, .2, .2), group="sibling",move=False, shuffle=True)

    train_dir = os.path.join("output", "train")
    val_dir = os.path.join("output", "val")
    test_dir = os.path.join("output", "test")

    train_dir_vid = os.path.join(train_dir, "videos")
    val_dir_vid = os.path.join(val_dir, "videos")
    test_dir_vid = os.path.join(test_dir, "videos")

    train_lbl_vid = os.path.join(train_dir, "labels")
    val_lbl_vid = os.path.join(val_dir, "labels")
    test_lbl_vid = os.path.join(test_dir, "labels")

    train_dataset = MVOVideoDataset(train_dir_vid, train_lbl_vid, transforms=transforms_train)
    val_dataset = MVOVideoDataset(val_dir_vid, val_lbl_vid, transforms=transforms_train)
    # Only used here to report its size; the test split is evaluated by the Tester
    test_dataset = MVOVideoDataset(test_dir_vid, test_lbl_vid, transforms=transforms_train)

    print(f"Data Split -> Train: {len(train_dataset)} | Val: {len(val_dataset)} | Test (Unused): {len(test_dataset)}")

    train_dataset.set_split_type('TRAIN', len(train_dataset))
    val_dataset.set_split_type('VALIDATION', len(val_dataset))

    train_loader = DataLoader(train_dataset, batch_size=BATCH, shuffle=True, num_workers=0)
    val_loader = DataLoader(val_dataset, batch_size=BATCH, shuffle=False, num_workers=0)

    # Calculate class instances for class weights
    label_counts, total_count = train_dataset.class_counter()

    # Inverse-frequency weights; add 1 to avoid division by zero
    front_weight = total_count / (label_counts[0] + 1)
    left_weight = total_count / (label_counts[1] + 1)
    right_weight = total_count / (label_counts[2] + 1)

    print(f"Front class instances: {label_counts[0]} -> Front weight: {front_weight}")
    print(f"Left class instances: {label_counts[1]} -> Left weight: {left_weight}")
    print(f"Right class instances: {label_counts[2]} -> Right weight: {right_weight}")

    # Model Setup
    model = ConvLSTMModel(
        input_dim=PARAMS['input_dim'],
        hidden_dim=PARAMS['hidden_dim'],
        kernel_size=PARAMS['kernel_size'],
        num_layers=PARAMS['num_layers'],
        height=PARAMS['height'],
        width=PARAMS['width'],
        num_classes=PARAMS['num_classes']
    ).to(DEVICE)

    # The class weights make CrossEntropyLoss compensate for class imbalance
    class_weights = torch.tensor([front_weight, left_weight, right_weight],
                                 dtype=torch.float32, device=DEVICE)
    criterion = nn.CrossEntropyLoss(weight=class_weights)
    optimizer = optim.Adam(model.parameters(), lr=LEARNING_RATE)

    # Training Loop
    best_acc = 0
    print(f"Training on {DEVICE} with {len(train_dataset)} videos.")

    for epoch in range(NUM_EPOCHS):
        print(f"\nEpoch {epoch+1}/{NUM_EPOCHS}")
        train_loss, train_acc = train_one_epoch(model, train_loader, criterion, optimizer)

        # Validation
        val_acc = _evaluate_accuracy(model, val_loader)
        print(f"Train Loss: {train_loss:.4f} | Train Acc: {train_acc:.2f}% | Val Acc: {val_acc:.2f}%")

        # Save Best Model
        if val_acc > best_acc:
            best_acc = val_acc
            torch.save(model.state_dict(), SAVED_MODEL_PATH)
            print(f"New best model saved! ({val_acc:.2f}%)")

# Start training only when this cell/script is executed directly, not when it is imported.
if __name__ == "__main__":
    main()

Copying files: 81 files [00:02, 40.37 files/s]

Initialized dataset with 5520 valid video-label pairs.
Data Split -> Train: 3312 | Val: 1104 | Test (Unused): 1104
Training on cpu with 3312 videos.

Epoch 1/1





  0%|          | 0/828 [00:00<?, ?it/s]

: 

# tester.py

In [None]:
class Tester:
    """
    It loads a trained model, feeds it unseen data,
    and records how accurately and how fast the model makes decisions.

    Parameters:
        model_path: path to the state dict saved during training
        device:     torch.device on which to run inference

    Raises:
        FileNotFoundError: if `model_path` does not exist.
    """
    def __init__(self, model_path, device):
        self.model_path = model_path
        self.device = device
        # Same order as in training (resize the PIL image, then convert to tensor)
        # so that evaluation sees identically preprocessed frames.
        self.transforms = transforms.Compose([
            transforms.Resize((HEIGHT, WIDTH)),
            transforms.ToTensor()
        ])

        # Load the Architecture (must match the configuration used for training)
        self.model = ConvLSTMModel(
            input_dim=6,
            hidden_dim=[64, 32],
            kernel_size=(3, 3),
            num_layers=2,
            height=HEIGHT,
            width=WIDTH,
            num_classes=3
        ).to(self.device)

        # Load the Weights
        self.load_weights()

    def load_weights(self):
        """Load the saved state dict into the model and switch it to evaluation mode."""
        if os.path.exists(self.model_path):
            print(f"Loading model from {self.model_path}...")
            # NOTE(security): torch.load unpickles the file - only load checkpoints you trust.
            self.model.load_state_dict(torch.load(self.model_path, map_location=self.device))
            self.model.eval()
        else:
            raise FileNotFoundError(f"Model file not found at {self.model_path}. Did you run train.py?")

    def test(self):
        """Evaluate the model on output/test, print the report and save per-clip results."""
        print("Preparing Test Data...")

        test_dir = os.path.join("output", "test")
        test_dir_vid = os.path.join(test_dir, "videos")
        test_lbl_vid = os.path.join(test_dir, "labels")
        test_dataset = MVOVideoDataset(test_dir_vid, test_lbl_vid, transforms=self.transforms)
        test_dataset.set_split_type('TEST', len(test_dataset))

        # batch_size=1 and shuffle=False: one latency sample per clip, in dataset order
        test_loader = DataLoader(test_dataset, batch_size=1, shuffle=False, num_workers=0)

        print(f"Evaluating {len(test_dataset)} videos and measuring latency...")

        all_preds = []
        all_labels = []
        latencies = []

        # Evaluation Loop
        with torch.no_grad():
            for i, (video_tensor, labels) in enumerate(tqdm(test_loader, leave=True)):
                video_tensor = video_tensor.float().to(self.device)
                labels = labels.to(self.device)

                # Latency Measurement Start
                if self.device.type == 'cuda':
                    torch.cuda.synchronize()

                start_time = time.perf_counter() # Timer

                outputs = self.model(video_tensor) # Forward pass (The Inference)

                if self.device.type == 'cuda':
                    torch.cuda.synchronize() # Wait for the GPU to finish the math

                end_time = time.perf_counter()
                # Latency Measurement End

                # The first 5 clips are treated as warm-up and excluded from the timing
                if i >= 5:
                    latencies.append(end_time - start_time)

                # Convert raw scores to the predicted class index (0, 1, or 2)
                _, predicted = torch.max(outputs, 1)

                all_preds.extend(predicted.cpu().numpy())
                all_labels.extend(labels.cpu().numpy())

        # Calculate Latency Stats (zero when there were not enough clips to time)
        mean_latency = np.mean(latencies) if len(latencies) > 0 else 0
        avg_latency_ms = mean_latency * 1000
        inf_fps = 1 / mean_latency if mean_latency > 0 else 0

        # Calculate and Print all results
        self.calculate_metrics(all_labels, all_preds, avg_latency_ms, inf_fps)

        # Save detailed logs to a CSV; the loader is unshuffled with batch size 1,
        # so predictions line up with the dataset's file list
        self.save_results(all_labels, all_preds, video_names=test_dataset.video_files)

    def calculate_metrics(self, y_true, y_pred, avg_latency_ms, inf_fps):
        """
        Print the final report: accuracy, weighted precision/recall,
        latency figures and the per-class classification report.
        """
        print("\n" + "-"*40)
        print("       FINAL PERFORMANCE REPORT       ")
        print("-"*40)

        # Accuracy
        acc = accuracy_score(y_true, y_pred)
        print(f"Overall Accuracy:   {acc*100:.2f}%")

        # Precision and Recall
        precision = precision_score(y_true, y_pred, average='weighted', zero_division=0)
        recall = recall_score(y_true, y_pred, average='weighted', zero_division=0)

        print(f"Precision:          {precision:.4f}")
        print(f"Recall:             {recall:.4f}")
        print("-" * 40)

        print(f"Avg Latency:        {avg_latency_ms:.2f} ms per video clip")
        print(f"Inference Speed:    {inf_fps:.2f} clips per second")

        print("-" * 40)
        print("Detailed Class Report:")
        # Generates a table for Front(0), Left(1), Right(2).
        # `labels` is given explicitly: without it the call raises when a class is
        # missing from the data, because the three target_names no longer match.
        print(classification_report(y_true, y_pred, labels=[0, 1, 2],
                                    target_names=['Front', 'Left', 'Right'], zero_division=0))

    def save_results(self, y_true, y_pred, video_names=None):
        """
        Write per-clip results to 'test_results.csv'.

        Parameters:
            y_true:      actual class ids
            y_pred:      predicted class ids
            video_names: optional file names, one per clip, stored in a leading
                         'Video' column so failed clips can be identified
        """
        columns = {}
        if video_names is not None:
            columns['Video'] = list(video_names)
        columns['Actual_Label'] = y_true
        columns['Predicted_Label'] = y_pred
        df = pd.DataFrame(columns)

        # Map numbers back to words for readability
        label_map = {0: 'Front', 1: 'Left', 2: 'Right'}
        df['Actual_Text'] = df['Actual_Label'].map(label_map)
        df['Predicted_Text'] = df['Predicted_Label'].map(label_map)

        # Check if correct
        df['Correct'] = df['Actual_Label'] == df['Predicted_Label']

        save_path = "test_results.csv"
        df.to_csv(save_path, index=False)
        print(f"\nDetailed predictions saved to '{save_path}'")

# Entry point: evaluate the saved checkpoint when this cell/script is executed directly.
if __name__ == "__main__":
    # Configuration
    # Same file name the training section uses for SAVED_MODEL_PATH
    MODEL_PATH = "best_convlstm.pth"
    # NOTE: this rebinds the module-level DEVICE that the training code also reads
    DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    # Run the Tester
    tester = Tester(MODEL_PATH, DEVICE)
    tester.test()