In [10]:
import torch
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms
from PIL import Image
import os

class VideoDataset(Dataset):
    def __init__(self, root_dir, num_frames=16, transform=None):
        """
        Args:
            root_dir (str): Path to split_data/train, split_data/val, or split_data/test
            num_frames (int): Number of frames per video
            transform: torchvision transforms to apply to each frame
        """
        self.root_dir = root_dir
        self.num_frames = num_frames
        self.transform = transform

        self.videos = []
        self.labels = []

        classes = {"no_theft": 0, "store_theft": 1}

        for cls_name, label in classes.items():
            cls_dir = os.path.join(root_dir, cls_name)
            if not os.path.exists(cls_dir):
                continue
            for video_name in os.listdir(cls_dir):
                video_path = os.path.join(cls_dir, video_name)
                if os.path.isdir(video_path):
                    self.videos.append(video_path)
                    self.labels.append(label)

    def __len__(self):
        return len(self.videos)

    def __getitem__(self, idx):
        video_path = self.videos[idx]
        label = self.labels[idx]

        frames = []
        frame_files = sorted(os.listdir(video_path))
        for f in frame_files[:self.num_frames]:  # ensure same number of frames
            frame_path = os.path.join(video_path, f)
            img = Image.open(frame_path).convert("RGB")
            if self.transform:
                img = self.transform(img)
            frames.append(img)

        # Stack frames: shape [num_frames, channels, height, width]
        video_tensor = torch.stack(frames)
        return video_tensor, torch.tensor(label, dtype=torch.long)


In [11]:
# Example transforms for each frame
frame_transforms = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),  # convert to [0,1]
    transforms.Normalize(mean=[0.485, 0.456, 0.406],
                         std=[0.229, 0.224, 0.225])
])

train_dataset = VideoDataset("split_data/train", num_frames=16, transform=frame_transforms)
val_dataset   = VideoDataset("split_data/val", num_frames=16, transform=frame_transforms)
test_dataset  = VideoDataset("split_data/test", num_frames=16, transform=frame_transforms)

train_loader = DataLoader(train_dataset, batch_size=4, shuffle=True)
val_loader   = DataLoader(val_dataset, batch_size=4)
test_loader  = DataLoader(test_dataset, batch_size=4)


## 3D CNN from Scratch

In [3]:
import torch
import torch.nn as nn
import torch.nn.functional as F

class Simple3DCNN(nn.Module):
    def __init__(self, num_classes=2):
        super(Simple3DCNN, self).__init__()
        self.conv1 = nn.Conv3d(in_channels=3, out_channels=16, kernel_size=(3,3,3), padding=1)
        self.pool1 = nn.MaxPool3d((1,2,2))  # pool spatially, keep temporal
        self.conv2 = nn.Conv3d(16, 32, kernel_size=(3,3,3), padding=1)
        self.pool2 = nn.MaxPool3d((2,2,2))
        self.conv3 = nn.Conv3d(32, 64, kernel_size=(3,3,3), padding=1)
        self.pool3 = nn.AdaptiveAvgPool3d((1,1,1))  # global avg pool
        self.fc = nn.Linear(64, num_classes)

    def forward(self, x):
        # x shape: [batch, channels, frames, height, width]
        x = F.relu(self.conv1(x))
        x = self.pool1(x)
        x = F.relu(self.conv2(x))
        x = self.pool2(x)
        x = F.relu(self.conv3(x))
        x = self.pool3(x)
        x = x.view(x.size(0), -1)  # flatten
        x = self.fc(x)
        return x


In [None]:
import torch.optim as optim

device = "cuda" if torch.cuda.is_available() else "cpu"
model = Simple3DCNN(num_classes=2).to(device)

criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=1e-4)

num_epochs = 10

for epoch in range(num_epochs):
    model.train()
    running_loss = 0.0
    correct = 0
    total = 0

    for videos, labels in train_loader:
        videos = videos.permute(0, 2, 1, 3, 4).float().to(device)
        labels = labels.to(device)

        optimizer.zero_grad()
        outputs = model(videos)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        running_loss += loss.item()
        _, predicted = torch.max(outputs, 1)
        total += labels.size(0)
        correct += (predicted == labels).sum().item()

    print(f"Epoch [{epoch+1}/{num_epochs}], "
          f"Loss: {running_loss/len(train_loader):.4f}, "
          f"Accuracy: {100*correct/total:.2f}%")


### 1

In [12]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import DataLoader
from tqdm import tqdm

device = "cuda" if torch.cuda.is_available() else "cpu"
print("Using device:", device)


Using device: cpu


In [13]:
class Simple3DCNN(nn.Module):
    def __init__(self, num_classes=2):
        super(Simple3DCNN, self).__init__()
        self.conv1 = nn.Conv3d(3, 16, kernel_size=(3,3,3), padding=1)
        self.pool1 = nn.MaxPool3d((1,2,2))  # pool spatially only
        self.conv2 = nn.Conv3d(16, 32, kernel_size=(3,3,3), padding=1)
        self.pool2 = nn.MaxPool3d((2,2,2))
        self.conv3 = nn.Conv3d(32, 64, kernel_size=(3,3,3), padding=1)
        self.pool3 = nn.AdaptiveAvgPool3d((1,1,1))  # global avg pool
        self.fc = nn.Linear(64, num_classes)

    def forward(self, x):
        # x shape: [B, C, T, H, W]
        x = F.relu(self.conv1(x))
        x = self.pool1(x)
        x = F.relu(self.conv2(x))
        x = self.pool2(x)
        x = F.relu(self.conv3(x))
        x = self.pool3(x)
        x = x.view(x.size(0), -1)
        x = self.fc(x)
        return x

model = Simple3DCNN(num_classes=2).to(device)


In [17]:
class Improved3DCNN(nn.Module):
    def __init__(self, num_classes=2):
        super(Improved3DCNN, self).__init__()
        # Conv block 1
        self.conv1 = nn.Conv3d(3, 32, kernel_size=(3,3,3), padding=1)
        self.bn1 = nn.BatchNorm3d(32)
        self.pool1 = nn.MaxPool3d((1,2,2))
        
        # Conv block 2
        self.conv2 = nn.Conv3d(32, 64, kernel_size=(3,3,3), padding=1)
        self.bn2 = nn.BatchNorm3d(64)
        self.pool2 = nn.MaxPool3d((2,2,2))
        
        # Conv block 3
        self.conv3 = nn.Conv3d(64, 128, kernel_size=(3,3,3), padding=1)
        self.bn3 = nn.BatchNorm3d(128)
        self.pool3 = nn.AdaptiveAvgPool3d((1,1,1))  # global avg pool
        
        # Fully connected
        self.fc = nn.Linear(128, num_classes)

    def forward(self, x):
        x = F.relu(self.bn1(self.conv1(x)))
        x = self.pool1(x)
        x = F.relu(self.bn2(self.conv2(x)))
        x = self.pool2(x)
        x = F.relu(self.bn3(self.conv3(x)))
        x = self.pool3(x)
        x = x.view(x.size(0), -1)
        x = self.fc(x)
        return x

model = Improved3DCNN(num_classes=2).to(device)

In [18]:
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=1e-4)
num_epochs = 15


In [19]:
for epoch in range(num_epochs):
    model.train()
    running_loss = 0
    correct = 0
    total = 0

    for videos, labels in tqdm(train_loader, desc=f"Epoch {epoch+1}/{num_epochs}"):
        # Permute to [B, C, T, H, W]
        videos = videos.permute(0, 2, 1, 3, 4).float().to(device)
        labels = labels.to(device)

        optimizer.zero_grad()
        outputs = model(videos)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        running_loss += loss.item()
        _, predicted = torch.max(outputs, 1)
        total += labels.size(0)
        correct += (predicted == labels).sum().item()

    train_acc = 100 * correct / total

    # Validation
    model.eval()
    val_correct = 0
    val_total = 0
    with torch.no_grad():
        for videos, labels in val_loader:
            videos = videos.permute(0, 2, 1, 3, 4).float().to(device)
            labels = labels.to(device)
            outputs = model(videos)
            _, predicted = torch.max(outputs, 1)
            val_total += labels.size(0)
            val_correct += (predicted == labels).sum().item()
    val_acc = 100 * val_correct / val_total

    print(f"Epoch {epoch+1}/{num_epochs} | "
          f"Loss: {running_loss/len(train_loader):.4f} | "
          f"Train Acc: {train_acc:.2f}% | Val Acc: {val_acc:.2f}%")


Epoch 1/15: 100%|██████████| 112/112 [01:49<00:00,  1.02it/s]


Epoch 1/15 | Loss: 0.6870 | Train Acc: 53.93% | Val Acc: 58.51%


Epoch 2/15: 100%|██████████| 112/112 [01:41<00:00,  1.11it/s]


Epoch 2/15 | Loss: 0.6552 | Train Acc: 66.29% | Val Acc: 51.06%


Epoch 3/15: 100%|██████████| 112/112 [01:41<00:00,  1.10it/s]


Epoch 3/15 | Loss: 0.6323 | Train Acc: 67.64% | Val Acc: 51.06%


Epoch 4/15: 100%|██████████| 112/112 [01:45<00:00,  1.06it/s]


Epoch 4/15 | Loss: 0.6057 | Train Acc: 71.24% | Val Acc: 48.94%


Epoch 5/15: 100%|██████████| 112/112 [01:41<00:00,  1.11it/s]


Epoch 5/15 | Loss: 0.5865 | Train Acc: 73.93% | Val Acc: 48.94%


Epoch 6/15: 100%|██████████| 112/112 [01:41<00:00,  1.10it/s]


Epoch 6/15 | Loss: 0.5766 | Train Acc: 73.03% | Val Acc: 52.13%


Epoch 7/15: 100%|██████████| 112/112 [01:41<00:00,  1.10it/s]


Epoch 7/15 | Loss: 0.5366 | Train Acc: 80.90% | Val Acc: 51.06%


Epoch 8/15: 100%|██████████| 112/112 [01:41<00:00,  1.11it/s]


Epoch 8/15 | Loss: 0.4861 | Train Acc: 83.82% | Val Acc: 96.81%


Epoch 9/15: 100%|██████████| 112/112 [01:44<00:00,  1.07it/s]


Epoch 9/15 | Loss: 0.4687 | Train Acc: 84.94% | Val Acc: 97.87%


Epoch 10/15: 100%|██████████| 112/112 [01:41<00:00,  1.10it/s]


Epoch 10/15 | Loss: 0.4556 | Train Acc: 84.27% | Val Acc: 51.06%


Epoch 11/15: 100%|██████████| 112/112 [01:41<00:00,  1.10it/s]


Epoch 11/15 | Loss: 0.4531 | Train Acc: 84.72% | Val Acc: 51.06%


Epoch 12/15: 100%|██████████| 112/112 [01:41<00:00,  1.10it/s]


Epoch 12/15 | Loss: 0.4164 | Train Acc: 88.54% | Val Acc: 48.94%


Epoch 13/15: 100%|██████████| 112/112 [01:45<00:00,  1.06it/s]


Epoch 13/15 | Loss: 0.4372 | Train Acc: 85.17% | Val Acc: 63.83%


Epoch 14/15: 100%|██████████| 112/112 [01:42<00:00,  1.09it/s]


Epoch 14/15 | Loss: 0.4112 | Train Acc: 86.07% | Val Acc: 48.94%


Epoch 15/15: 100%|██████████| 112/112 [01:48<00:00,  1.03it/s]


Epoch 15/15 | Loss: 0.3709 | Train Acc: 90.11% | Val Acc: 95.74%


In [20]:
model.eval()
test_correct = 0
test_total = 0

with torch.no_grad():
    for videos, labels in test_loader:
        videos = videos.permute(0, 2, 1, 3, 4).float().to(device)
        labels = labels.to(device)
        outputs = model(videos)
        _, predicted = torch.max(outputs, 1)
        test_total += labels.size(0)
        test_correct += (predicted == labels).sum().item()

test_acc = 100 * test_correct / test_total
print(f"Test Accuracy: {test_acc:.2f}%")


Test Accuracy: 95.92%


In [21]:
torch.save(model.state_dict(), "model_scratch_weights.pth")
