In [8]:
import os
import cv2 as cv
import numpy as np
import pandas as pd
import torch
from sklearn.model_selection import train_test_split
from torch import nn, optim
from torchvision import models, transforms
from torch.utils.data import Dataset, DataLoader
from tqdm import tqdm
from collections import deque

# Paths
BASE_DIR = r"C:\Users\Keelan.Butler\Desktop\python_projects\Final Project"
VIDEO_DIR = os.path.join(BASE_DIR, "OneDrive_2025-01-30", "MSAD Dataset", "MSAD_blur")
OUTPUT_DIR = os.path.join(BASE_DIR, "Dataset")
ANOM_PATH = os.path.join(BASE_DIR, "OneDrive_2025-01-30", "MSAD Dataset", "anomaly_annotation.csv")
META_PATH = os.path.join(OUTPUT_DIR, "metadata.csv")

# Constants
FRAME_INTERVAL = 5   # Capture every 5th frame
CLIP_LENGTH = 16     # Number of frames per clip
FRAME_HEIGHT, FRAME_WIDTH = 112, 112  # ResNet3D expects 112x112
EPOCHS = 10
BATCH_SIZE = 8
LEARNING_RATE = 0.001

# Device setup
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Ensure directories exist
os.makedirs(OUTPUT_DIR, exist_ok=True)
os.makedirs(os.path.join(OUTPUT_DIR, "clips"), exist_ok=True)

# Define transforms
transform = transforms.Compose([
    transforms.ToPILImage(),
    transforms.Resize((FRAME_HEIGHT, FRAME_WIDTH)), 
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])

# Load ResNet3D (r3d_18) model
model = models.video.r3d_18(pretrained=True)
model.fc = nn.Linear(512, 1)  # Modify final layer for binary classification
model = model.to(device)

# Loss and optimizer
criterion = nn.BCEWithLogitsLoss()
optimizer = optim.Adam(model.parameters(), lr=LEARNING_RATE)

# Data Preparation
class VideoDataset(Dataset):
    def __init__(self, metadata_df, annotation_df, video_dir, transform=None):
        self.metadata_df = metadata_df
        self.annotation_df = annotation_df
        self.video_dir = video_dir
        self.transform = transform

    def __len__(self):
        return len(self.metadata_df)

    def __getitem__(self, idx):
        video_file = self.metadata_df.iloc[idx]["video_file"]
        clip_start = self.metadata_df.iloc[idx]["Start_of_Clip"]
        clip_end = self.metadata_df.iloc[idx]["End_of_Clip"]
        video_path = os.path.join(self.video_dir, video_file)

        # Read video frames
        cap = cv.VideoCapture(video_path)
        frame_idx = 0
        frames = []
        while len(frames) < CLIP_LENGTH:
            success, frame = cap.read()
            if not success:
                break
            if clip_start <= frame_idx <= clip_end:
                if self.transform:
                    frame = self.transform(frame)
                frames.append(frame)
            frame_idx += 1
        cap.release()

        # Pad if necessary
        while len(frames) < CLIP_LENGTH:
            frames.append(frames[-1])

        # Convert to tensor
        clip_tensor = torch.stack(frames, dim=1)  # Shape: (C, T, H, W)

        # Assign anomaly label
        video_name = video_file[:-4]
        anomaly_rows = self.annotation_df[self.annotation_df["name"] == video_name]
        anomaly_frames = set()
        for _, row in anomaly_rows.iterrows():
            anomaly_frames.update(range(row["starting frame of anomaly"], row["ending frame of anomaly"] + 1))

        # If any frame in the clip is in the anomaly range, label it as anomaly (1)
        clip_label = 1 if any(frame_idx in anomaly_frames for frame_idx in range(clip_start, clip_end)) else 0

        return clip_tensor, torch.tensor([clip_label], dtype=torch.float32)

# Load metadata and annotation files
metadata_df = pd.read_csv(META_PATH)
anomaly_df = pd.read_csv(ANOM_PATH)


metadata_df["name"] = metadata_df["video_file"].str.replace(".mp4", "", regex=True)

# Step 2: Calculate Start & End Frame for each Clip
FRAME_INTERVAL = 5   # Frames per clip step
CLIP_LENGTH = 16     # Number of frames per clip

metadata_df["Start_of_Clip"] = metadata_df["clip_index"] * FRAME_INTERVAL
metadata_df["End_of_Clip"] = metadata_df["Start_of_Clip"] + (CLIP_LENGTH * FRAME_INTERVAL)

# Step 3: Merge metadata with anomaly annotations
df = metadata_df.merge(anomaly_df, on="name", how="left")

# Step 4: Assign Anomaly Labels
df["Anomaly"] = np.where(
    (df["starting frame of anomaly"] <= df["End_of_Clip"]) &  
    (df["ending frame of anomaly"] >= df["Start_of_Clip"]), 1, 0
)

df["Anomaly_Type"] = np.where(
    df["Anomaly"] == 1, df["name"].str.split("_").str[0], "Normal"
)

# Split dataset
train_df, test_df = train_test_split(df, test_size=0.2, stratify=df["Anomaly"], random_state=42)
train_df, val_df = train_test_split(train_df, test_size=0.2, stratify=train_df["Anomaly"], random_state=42)

# Create Datasets & Dataloaders
train_dataset = VideoDataset(train_df, anomaly_df, VIDEO_DIR, transform)
val_dataset = VideoDataset(val_df, anomaly_df, VIDEO_DIR, transform)
test_dataset = VideoDataset(test_df, anomaly_df, VIDEO_DIR, transform)

train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=BATCH_SIZE, shuffle=False)
test_loader = DataLoader(test_dataset, batch_size=BATCH_SIZE, shuffle=False)

# Training Loop
def train_model(model, train_loader, val_loader, epochs):
    for epoch in range(epochs):
        model.train()
        total_loss = 0

        for clips, labels in train_loader:
            clips, labels = clips.to(device), labels.to(device)

            optimizer.zero_grad()
            outputs = model(clips).squeeze()
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

            total_loss += loss.item()

        avg_train_loss = total_loss / len(train_loader)
        avg_val_loss = validate_model(model, val_loader)

        print(f"Epoch [{epoch+1}/{epochs}] | Train Loss: {avg_train_loss:.4f} | Val Loss: {avg_val_loss:.4f}")

# Validation
def validate_model(model, val_loader):
    model.eval()
    total_loss = 0
    with torch.no_grad():
        for clips, labels in val_loader:
            clips, labels = clips.to(device), labels.to(device)
            outputs = model(clips).squeeze()
            loss = criterion(outputs, labels)
            total_loss += loss.item()

    return total_loss / len(val_loader)

# Testing
def test_model(model, test_loader):
    model.eval()
    correct = 0
    total = 0
    with torch.no_grad():
        for clips, labels in test_loader:
            clips, labels = clips.to(device), labels.to(device)
            outputs = torch.sigmoid(model(clips).squeeze())  # Convert logits to probabilities
            predictions = (outputs > 0.5).float()
            correct += (predictions == labels).sum().item()
            total += labels.size(0)

    print(f"Test Accuracy: {correct / total:.4f}")

# Train and Evaluate
train_model(model, train_loader, val_loader, EPOCHS)
test_model(model, test_loader)




IndexError: list index out of range

In [9]:
import os
import cv2
import torch
import torchvision.transforms as transforms
from torch.utils.data import Dataset, DataLoader

class MSADDataset(Dataset):
    def __init__(self, video_paths, labels, clip_length=16, transform=None):
        self.video_paths = video_paths
        self.labels = labels
        self.clip_length = clip_length
        self.transform = transform

    def __len__(self):
        return len(self.video_paths)

    def __getitem__(self, idx):
        video_path = self.video_paths[idx]
        label = self.labels[idx]

        cap = cv2.VideoCapture(video_path)
        frames = []
        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        frame_interval = max(1, total_frames // self.clip_length)

        for i in range(self.clip_length):
            cap.set(cv2.CAP_PROP_POS_FRAMES, i * frame_interval)
            ret, frame = cap.read()
            if not ret:
                break
            frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            frame = cv2.resize(frame, (112, 112))  # Resize for ResNet
            frames.append(frame)

        cap.release()

        if len(frames) < self.clip_length:
            frames += [frames[-1]] * (self.clip_length - len(frames))

        frames = torch.tensor(frames, dtype=torch.float32).permute(3, 0, 1, 2) / 255.0  # (C, T, H, W)

        if self.transform:
            frames = self.transform(frames)

        return frames, label


In [10]:
train_loader = DataLoader(MSADDataset(train_videos, train_labels), batch_size=8, shuffle=True)
test_loader = DataLoader(MSADDataset(test_videos, test_labels), batch_size=8, shuffle=False)


ValueError: num_samples should be a positive integer value, but got num_samples=0