In [10]:
import os
import json
import torch
import torch.nn as nn
from torchvision import models, transforms
from PIL import Image

from torch.utils.data import Dataset, DataLoader
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score
import random

In [4]:
# 1. Image preprocessing
# Each frame goes through three steps, in this order: resize to a fixed
# 224x224 grid, convert the PIL image to a tensor, then normalize every
# channel with the mean/std constants below (the values torchvision documents
# for its ImageNet-pretrained models).
_resize_step = transforms.Resize((224, 224))
_to_tensor_step = transforms.ToTensor()
_normalize_step = transforms.Normalize(
    mean=[0.485, 0.456, 0.406],
    std=[0.229, 0.224, 0.225],
)
transform = transforms.Compose([_resize_step, _to_tensor_step, _normalize_step])

#2. Feature Extractor: ResNet
class ResNetFeatureExtractor(nn.Module):
    """Frozen ResNet-18 backbone that turns a batch of images into feature vectors.

    The last child module of torchvision's ResNet-18 (the ``fc`` layer) is
    dropped, so ``forward`` returns the activations that would normally feed it.
    The module is created in eval mode with all parameters frozen; it is meant
    to be used as a fixed feature extractor, not fine-tuned.

    Attributes:
        feature_extractor: ``nn.Sequential`` of every ResNet-18 child except
            the final ``fc`` layer.
        output_dim: Length of each feature vector (``resnet.fc.in_features``).
    """

    def __init__(self):
        super().__init__()
        # ``pretrained=True`` is deprecated in newer torchvision releases in
        # favour of the ``weights=`` enum. Use the enum when this torchvision
        # provides it (IMAGENET1K_V1 is what ``pretrained=True`` loaded) and
        # fall back to the legacy flag on older installs.
        weights_enum = getattr(models, "ResNet18_Weights", None)
        if weights_enum is not None:
            resnet = models.resnet18(weights=weights_enum.IMAGENET1K_V1)
        else:
            resnet = models.resnet18(pretrained=True)

        self.feature_extractor = nn.Sequential(*list(resnet.children())[:-1])
        self.output_dim = resnet.fc.in_features

        # forward() always runs under no_grad, so the weights are never
        # trained: make that explicit by freezing them.
        for param in self.parameters():
            param.requires_grad_(False)
        # nn.Module defaults to training mode, in which BatchNorm normalizes
        # with per-batch statistics and keeps updating its running stats even
        # under no_grad. Eval mode makes the extracted features deterministic.
        # Note that a later ``.train()`` call on this module undoes this.
        self.eval()

    def forward(self, x):
        """Return one flat feature vector per input image.

        Args:
            x: Batch of image tensors accepted by the ResNet-18 backbone.

        Returns:
            Tensor with the batch dimension kept and all remaining dimensions
            flattened; computed without gradient tracking.
        """
        with torch.no_grad():
            features = torch.flatten(self.feature_extractor(x), start_dim=1)
        return features

In [5]:
#3. LSTM Classifier
class VideoClassifier(nn.Module):
    """LSTM over a sequence of per-frame features with a sigmoid output head.

    Args:
        feature_dim: Size of each per-frame feature vector.
        hidden_dim: Hidden size of the LSTM.
        num_layers: Number of stacked LSTM layers.
    """

    def __init__(self, feature_dim, hidden_dim=128, num_layers=1):
        super().__init__()
        self.lstm = nn.LSTM(input_size=feature_dim, hidden_size=hidden_dim,
                            num_layers=num_layers, batch_first=True)
        self.classifier = nn.Sequential(
            nn.Linear(hidden_dim, 1),
            nn.Sigmoid()
        )

    def forward(self, x):
        """Score each sequence in the batch.

        Args:
            x: Tensor of shape ``(batch, seq_len, feature_dim)``
               (the LSTM is built with ``batch_first=True``).

        Returns:
            Tensor of shape ``(batch,)`` holding one sigmoid probability per
            sequence.
        """
        _, (hn, _) = self.lstm(x)
        # hn[-1] is the final hidden state of the last LSTM layer.
        out = self.classifier(hn[-1])
        # Drop only the trailing size-1 dimension. A bare squeeze() would also
        # collapse a batch of one to a 0-d tensor, which no longer matches a
        # (1,) label tensor and makes nn.BCELoss raise on the size mismatch.
        return out.squeeze(-1)

In [11]:
#4. Dataset Class
class VideoFrameDataset(Dataset):
    """Dataset yielding a fixed-length stack of frames and a binary label per video.

    Args:
        split_file: Text file with one video id per non-blank line.
        metadata_file: JSON file mapping each video id to a dict that has an
            ``"anomaly_class"`` key. ``"normal"`` means label 0; any other
            value means label 1.
        dota_dir: Root folder holding one sub-folder of ``.jpg`` frames per
            label-1 video.
        bdd_dir: Root folder holding one sub-folder of ``.jpg`` frames per
            label-0 video.
        sample_count: Number of frames returned for every video.

    Raises:
        ValueError: If ``sample_count`` is smaller than 1.
    """

    def __init__(self, split_file, metadata_file, dota_dir, bdd_dir, sample_count=64):
        if sample_count < 1:
            raise ValueError(f"sample_count must be >= 1, got {sample_count}")

        with open(split_file, 'r') as f:
            self.video_ids = [line.strip() for line in f if line.strip()]
        with open(metadata_file, 'r') as f:
            self.metadata = json.load(f)

        self.dota_dir = dota_dir
        self.bdd_dir = bdd_dir
        self.sample_count = sample_count

    def __len__(self):
        """Number of videos listed in the split file."""
        return len(self.video_ids)

    @staticmethod
    def _sample_indices(total, sample_count):
        """Return ``sample_count`` frame indices, each within ``[0, total - 1]``.

        Indices advance by ``total // sample_count`` (at least 1). When the
        video has fewer frames than ``sample_count`` the index is clamped, so
        the last frame is repeated instead of indexing past the end.
        """
        step = max(total // sample_count, 1)
        last = total - 1
        return [min(i * step, last) for i in range(sample_count)]

    def __getitem__(self, idx):
        """Load the sampled frames and label for the video at position ``idx``.

        Returns:
            Tuple ``(frames, label)``: ``frames`` stacks ``sample_count``
            transformed images along a new first dimension, ``label`` is a
            float32 scalar tensor (0.0 or 1.0).

        Raises:
            FileNotFoundError: If the video folder contains no ``.jpg`` files.
        """
        video_id = self.video_ids[idx]
        label = 0 if self.metadata[video_id]["anomaly_class"] == "normal" else 1
        folder = os.path.join(self.bdd_dir if label == 0 else self.dota_dir, video_id)

        # NOTE(review): plain lexicographic sort -- assumes frame file names
        # are zero-padded so that this matches temporal order; confirm.
        frame_paths = sorted(
            os.path.join(folder, name)
            for name in os.listdir(folder)
            if name.endswith('.jpg')
        )
        if not frame_paths:
            # Deliberately not IndexError: an IndexError escaping __getitem__
            # silently ends a plain ``for item in dataset`` loop.
            raise FileNotFoundError(
                f"No .jpg frames found for video '{video_id}' in {folder}"
            )

        images = []
        for frame_idx in self._sample_indices(len(frame_paths), self.sample_count):
            # The context manager closes the file handle deterministically.
            with Image.open(frame_paths[frame_idx]) as img:
                images.append(transform(img.convert('RGB')))

        return torch.stack(images), torch.tensor(label, dtype=torch.float32)

In [13]:
#5. Evaluation
def evaluate(model, loader, feature_extractor, device):
    """Score ``model`` on ``loader`` and print accuracy, precision, recall and F1.

    Args:
        model: Sequence classifier returning one probability per sequence.
        loader: Iterable of ``(frames, label)`` batches, where ``frames`` has
            shape ``(batch, num_frames, ...)``.
        feature_extractor: Module mapping a batch of frames to one feature
            vector per frame.
        device: Device the frames are moved to before the forward pass.

    Returns:
        The accuracy over all samples in ``loader``.

    Side effects:
        Both ``model`` and ``feature_extractor`` are left in eval mode.
    """
    model.eval()
    # Validation must use inference behaviour in the extractor too: in
    # training mode, layers such as BatchNorm normalize with per-batch
    # statistics and update their running stats even under no_grad.
    feature_extractor.eval()

    preds, targets = [], []
    with torch.no_grad():
        for frames, label in loader:
            frames = frames.to(device)
            batch_size, num_frames = frames.shape[:2]
            # Fold (batch, frames) into one axis so the extractor sees a plain
            # batch of images, then restore the sequence layout. This works for
            # any batch size, not only 1.
            features = feature_extractor(frames.flatten(0, 1))
            sequence = features.reshape(batch_size, num_frames, -1)
            # reshape(-1) also covers a model that returns a 0-d tensor for a
            # batch of one.
            output = model(sequence).reshape(-1)
            preds.extend((output > 0.5).long().tolist())
            targets.extend(label.reshape(-1).long().tolist())

    acc = accuracy_score(targets, preds)
    prec = precision_score(targets, preds)
    rec = recall_score(targets, preds)
    f1 = f1_score(targets, preds)
    print(f"Val — Acc: {acc:.4f}  Precision: {prec:.4f}  Recall: {rec:.4f}  F1: {f1:.4f}")
    return acc

In [14]:
#6. Training Loop
def train(model, train_loader, val_loader, feature_extractor, device, epochs):
    """Train ``model`` with Adam and BCE loss, validating after every epoch.

    Only ``model``'s parameters are optimized; ``feature_extractor`` is used as
    a fixed front end and is switched to eval mode.

    Args:
        model: Sequence classifier returning one probability per sequence.
        train_loader: Iterable of ``(frames, label)`` training batches, where
            ``frames`` has shape ``(batch, num_frames, ...)`` and ``label`` is
            a float tensor of 0/1 values.
        val_loader: Loader passed to ``evaluate`` after each epoch.
        feature_extractor: Module mapping a batch of frames to one feature
            vector per frame.
        device: Device used for the model and the data.
        epochs: Number of passes over ``train_loader``.
    """
    model.to(device)
    # The extractor is not being trained, so keep it in inference mode:
    # otherwise layers such as BatchNorm use per-batch statistics and keep
    # updating their running stats on every forward pass.
    feature_extractor.eval()
    optimizer = torch.optim.Adam(model.parameters(), lr=1e-4)
    criterion = nn.BCELoss()

    for epoch in range(epochs):
        model.train()
        running_loss = 0.0
        for frames, label in train_loader:
            frames = frames.to(device)
            label = label.to(device).reshape(-1)
            batch_size, num_frames = frames.shape[:2]

            # Fold (batch, frames) into one axis for the extractor, then
            # restore the sequence layout; works for any batch size.
            with torch.no_grad():
                features = feature_extractor(frames.flatten(0, 1))
            sequence = features.reshape(batch_size, num_frames, -1)

            optimizer.zero_grad()
            # BCELoss requires input and target to have the same shape, so
            # flatten the output to 1-D to match the (batch,) labels even if
            # the model returns a 0-d tensor for a batch of one.
            output = model(sequence).reshape(-1)
            loss = criterion(output, label)
            loss.backward()
            optimizer.step()

            running_loss += loss.item()

        # Guard against an empty loader so the epoch log cannot divide by zero.
        num_batches = max(len(train_loader), 1)
        print(f"Epoch {epoch+1}/{epochs} -- Train Loss: {running_loss / num_batches: .4f}")
        evaluate(model, val_loader, feature_extractor, device)

In [None]:
#7. Training entry point
if __name__ == '__main__':
    # Reproducibility: seed the RNGs behind DataLoader shuffling and weight
    # initialisation so a re-run gives the same result.
    seed = 42
    random.seed(seed)
    torch.manual_seed(seed)

    #Paths
    train_split_file = "dataset/train_split_merged.txt"
    # NOTE(review): no validation split path was defined in this notebook; this
    # name mirrors the training split file -- confirm the actual location.
    val_split_file = "dataset/val_split_merged.txt"
    # NOTE(review): train and validation share this one metadata file --
    # confirm that it contains the validation video ids as well.
    metadata_file = "dataset/metadata_train_merged.json"
    dota_dir = "DoTA/DoTA_Frames"
    bdd_dir = "BDD100K/BDD_Frames"

    # Device
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    # Data
    train_dataset = VideoFrameDataset(train_split_file, metadata_file, dota_dir, bdd_dir)
    val_dataset = VideoFrameDataset(val_split_file, metadata_file, dota_dir, bdd_dir)
    train_loader = DataLoader(train_dataset, batch_size=1, shuffle=True)
    val_loader = DataLoader(val_dataset, batch_size=1)

    # Models
    feature_extractor = ResNetFeatureExtractor().to(device)
    classifier = VideoClassifier(feature_dim=feature_extractor.output_dim)

    # Train
    train(classifier, train_loader, val_loader, feature_extractor, device, epochs=5)