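Starting with Stage 1: train a binary classifier that labels each clip as normal or crime, then export the clips it flags as crime.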

In [9]:
import json
import os
import re
import shutil

import numpy as np
import torch
from PIL import Image
from torch import nn
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms
from tqdm import tqdm

In [10]:
class UCFCrimeBinaryDataset(Dataset):
    """Clip-level binary dataset (normal vs. crime) over folders of PNG frames.

    Directory layout walked by ``_prepare_samples``::

        root_dir/<category>/<clip_folder>/<frame>.png

    Every clip folder is one sample. The label is 0 when the category folder
    is named "normalvideos" (case-insensitive) and 1 for every other category.

    Args:
        root_dir: Directory containing one sub-directory per category.
        clip_len: Number of frames sampled (uniformly) from each clip.
        transform: Callable applied to every RGB ``PIL.Image`` frame. It must
            return equally-shaped tensors, because the frames are combined
            with ``torch.stack``.

    Raises:
        ValueError: If ``clip_len`` is smaller than 1.
    """

    def __init__(self, root_dir, clip_len=16, transform=None):
        if clip_len < 1:
            raise ValueError(f"clip_len must be >= 1, got {clip_len}")

        self.root_dir = root_dir
        self.clip_len = clip_len
        self.transform = transform

        self.samples = []
        self._prepare_samples()

    @staticmethod
    def _natural_key(name):
        """Sort key that orders embedded numbers numerically.

        ``re.split`` with a capturing group always yields text at even
        positions and digit runs at odd positions, so two keys never compare
        an ``int`` against a ``str``. The raw name is appended as a
        tie-breaker (e.g. "01.png" vs "1.png") to keep the order total.
        """
        parts = re.split(r"(\d+)", name)
        return [int(p) if i % 2 else p for i, p in enumerate(parts)], name

    def _prepare_samples(self):
        """Collect ``(clip_path, label)`` pairs for every clip folder under ``root_dir``."""
        # os.listdir order is arbitrary; sort so the sample index -> clip
        # mapping is identical on every run and every platform.
        for category in sorted(os.listdir(self.root_dir)):
            category_path = os.path.join(self.root_dir, category)
            if not os.path.isdir(category_path):
                continue

            label = 0 if category.lower() == "normalvideos" else 1  # normal=0, crime=1
            for clip_name in sorted(os.listdir(category_path)):
                clip_path = os.path.join(category_path, clip_name)
                if os.path.isdir(clip_path):
                    self.samples.append((clip_path, label))

    def __len__(self):
        """Number of clip folders found."""
        return len(self.samples)

    def __getitem__(self, idx):
        """Load ``clip_len`` uniformly spaced frames of clip ``idx``.

        Returns:
            ``(clip_tensor, label, clip_path)`` where ``clip_tensor`` stacks
            the transformed frames along dim 1 and ``label`` is a scalar
            ``torch.long`` tensor.

        Raises:
            ValueError: If the clip folder contains no ``.png`` frames.
        """
        clip_path, label = self.samples[idx]
        # Natural sort keeps temporal order even when frame numbers are not
        # zero-padded ("f_2.png" before "f_10.png").
        frames = sorted(
            (f for f in os.listdir(clip_path) if f.lower().endswith(".png")),
            key=self._natural_key,
        )
        if not frames:
            # Deliberately not an IndexError: Python's legacy sequence
            # iteration treats IndexError from __getitem__ as "end of data",
            # which would silently truncate a plain `for x in dataset` loop.
            raise ValueError(f"No .png frames found in clip folder: {clip_path}")

        # Sample frames uniformly across the clip (indices repeat when the
        # clip has fewer than clip_len frames).
        idxs = np.linspace(0, len(frames) - 1, self.clip_len, dtype=int)
        imgs = []
        for i in idxs:
            # convert() returns a fully loaded copy, so the file can be closed
            # as soon as the with-block exits.
            with Image.open(os.path.join(clip_path, frames[i])) as raw:
                img = raw.convert("RGB")
            if self.transform is not None:
                img = self.transform(img)
            imgs.append(img)

        clip_tensor = torch.stack(imgs, dim=1)  # (C, T, H, W)
        return clip_tensor, torch.tensor(label, dtype=torch.long), clip_path


In [11]:
class BinaryCrimeDetector(nn.Module):
    """Compact 3D CNN producing two logits per clip.

    The feature extractor is two stages, each made of a spatial (1x3x3)
    convolution, a ReLU, a temporal (3x1x1) convolution, batch-norm, a ReLU
    and a max-pool. The classifier global-average-pools the 64 feature
    channels and maps them to 2 outputs with a linear layer.
    """

    @staticmethod
    def _stage(in_channels, out_channels, pool):
        """Return the six layers of one stage, in execution order."""
        return [
            nn.Conv3d(in_channels, out_channels, kernel_size=(1, 3, 3), padding=(0, 1, 1)),
            nn.ReLU(inplace=True),
            nn.Conv3d(out_channels, out_channels, kernel_size=(3, 1, 1), padding=(1, 0, 0)),
            nn.BatchNorm3d(out_channels),
            nn.ReLU(inplace=True),
            nn.MaxPool3d(kernel_size=pool),
        ]

    def __init__(self):
        super().__init__()

        # (in_channels, out_channels, pool kernel) per stage. Only the second
        # stage pools along the temporal axis.
        stage_specs = [
            (3, 32, (1, 2, 2)),
            (32, 64, (2, 2, 2)),
        ]
        layers = []
        for in_ch, out_ch, pool in stage_specs:
            layers.extend(self._stage(in_ch, out_ch, pool))

        # Layer positions inside the Sequential determine the state_dict keys
        # ("features.0.weight", ...), so the order must stay as it is.
        self.features = nn.Sequential(*layers)
        self.classifier = nn.Sequential(
            nn.AdaptiveAvgPool3d(output_size=1),
            nn.Flatten(),
            nn.Linear(in_features=64, out_features=2),
        )

    def forward(self, x):
        """Map a 5-D batch with 3 input channels to a (batch, 2) tensor of logits."""
        feature_maps = self.features(x)
        logits = self.classifier(feature_maps)
        return logits


In [12]:
# --- Stage 1 configuration and data loading ---------------------------------
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"
CLIP_LEN = 16
SEED = 42
TRAIN_BATCH_SIZE = 4
TEST_BATCH_SIZE = 2

# Seed before any loader or model is created so that batch shuffling and
# weight initialisation repeat across "Restart Kernel -> Run All".
torch.manual_seed(SEED)
np.random.seed(SEED)

# Every frame is resized to 112x112 and converted to a tensor.
transform = transforms.Compose([
    transforms.Resize((112, 112)),
    transforms.ToTensor()
])

# Dataset root. Set the UCF_CRIME_ROOT environment variable to run on another
# machine; the default is the original local path, so behaviour is unchanged
# when the variable is not set.
DATA_ROOT = os.environ.get("UCF_CRIME_ROOT", r"C:\Users\rayaa\Downloads\ucf_crime")
train_dir = os.path.join(DATA_ROOT, "Train")
test_dir = os.path.join(DATA_ROOT, "Test")

train_data = UCFCrimeBinaryDataset(train_dir, clip_len=CLIP_LEN, transform=transform)
test_data  = UCFCrimeBinaryDataset(test_dir, clip_len=CLIP_LEN, transform=transform)

# Only the training loader shuffles; the test loader keeps dataset order.
train_loader = DataLoader(train_data, batch_size=TRAIN_BATCH_SIZE, shuffle=True)
test_loader  = DataLoader(test_data, batch_size=TEST_BATCH_SIZE, shuffle=False)

print(f"Train clips: {len(train_data)}, Test clips: {len(test_data)}")


Train clips: 1610, Test clips: 290


In [13]:
# --- Stage 1 training --------------------------------------------------------
# Binary detector trained with cross-entropy and Adam.
model = BinaryCrimeDetector().to(DEVICE)
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=1e-4)

EPOCHS = 5
for epoch in range(EPOCHS):
    model.train()
    batch_losses = []
    progress = tqdm(train_loader, desc=f"Epoch {epoch+1}/{EPOCHS}")
    for clips, labels, _ in progress:
        clips = clips.to(DEVICE)
        labels = labels.to(DEVICE)

        optimizer.zero_grad()
        outputs = model(clips)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        batch_losses.append(loss.item())

    # Mean of the per-batch losses for this epoch.
    total_loss = sum(batch_losses)
    print(f"Epoch {epoch+1} Loss: {total_loss / len(train_loader):.4f}")

# Save the final weights once all epochs have finished.
checkpoint_dir = r"C:\Users\rayaa\Downloads\ucf_crime\crime_exist_checkpoints"
checkpoint_file = r"C:\Users\rayaa\Downloads\ucf_crime\crime_exist_checkpoints\binary_stage1.pt"
os.makedirs(checkpoint_dir, exist_ok=True)
torch.save(model.state_dict(), checkpoint_file)


Epoch 1/5:   0%|          | 0/403 [00:00<?, ?it/s]

Epoch 1/5: 100%|██████████| 403/403 [09:43<00:00,  1.45s/it]


Epoch 1 Loss: 0.6681


Epoch 2/5: 100%|██████████| 403/403 [03:52<00:00,  1.73it/s]


Epoch 2 Loss: 0.6361


Epoch 3/5: 100%|██████████| 403/403 [03:41<00:00,  1.82it/s]


Epoch 3 Loss: 0.6067


Epoch 4/5: 100%|██████████| 403/403 [03:32<00:00,  1.90it/s]


Epoch 4 Loss: 0.6061


Epoch 5/5: 100%|██████████| 403/403 [03:34<00:00,  1.88it/s]

Epoch 5 Loss: 0.5924





In [14]:
# --- Stage 1 inference --------------------------------------------------------
# Run the trained detector over the test loader, copy every clip predicted as
# 'crime' into anomaly_dir and write one JSON record per copied clip.
model.eval()
anomaly_dir = "./stage1_output/anomaly_clips"
os.makedirs(anomaly_dir, exist_ok=True)
anomaly_records = []

# Output folder name -> source clip it was created from. With
# dirs_exist_ok=True, two different source clips sharing a folder name would
# otherwise be merged silently into a single output folder.
copied_from = {}

with torch.no_grad():
    for clips, labels, paths in tqdm(test_loader, desc="Stage 1 Inference"):
        clips = clips.to(DEVICE)
        outputs = model(clips)
        probs = torch.softmax(outputs, dim=1)
        preds = probs.argmax(dim=1).cpu().numpy()
        crime_probs = probs[:, 1].cpu().numpy()  # prob of 'crime'

        for src, pred, crime_prob in zip(paths, preds, crime_probs):
            if pred != 1:
                continue

            src_norm = os.path.normpath(src)
            clip_name = os.path.basename(src_norm)
            if copied_from.get(clip_name, src) != src:
                # Name already used by a different clip: disambiguate with the
                # parent (category) folder name. Non-colliding clips keep the
                # plain folder name, so the usual output layout is unchanged.
                category = os.path.basename(os.path.dirname(src_norm))
                clip_name = f"{category}_{clip_name}"
            copied_from[clip_name] = src

            dst = os.path.join(anomaly_dir, clip_name)
            shutil.copytree(src, dst, dirs_exist_ok=True)
            anomaly_records.append({
                "clip_path": src,
                "saved_path": dst,  # where the copy lives, for later stages
                "confidence": float(crime_prob)
            })

with open("./stage1_output/anomalies.json", "w", encoding="utf-8") as f:
    json.dump(anomaly_records, f, indent=2)

print(f"Stage 1 complete — {len(anomaly_records)} crime clips saved to {anomaly_dir}")


Stage 1 Inference: 100%|██████████| 145/145 [10:22<00:00,  4.29s/it]

Stage 1 complete — 153 crime clips saved to ./stage1_output/anomaly_clips



