<a href="https://colab.research.google.com/github/MalakAhmed2003/Cellula/blob/main/Cellula_Task_7.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# 1. Mount Google Drive at /content/drive so the dataset archive stored there
#    can be read by the extraction cell below.
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
import zipfile

# Where the archived dataset lives on Drive, and where it is unpacked locally.
zip_path = "/content/drive/MyDrive/Shop DataSet.zip"
extract_path = "/content/video_data"

# Unpack the whole archive; the context manager closes the file afterwards.
with zipfile.ZipFile(zip_path, mode='r') as archive:
    archive.extractall(path=extract_path)


In [None]:
import glob

# Recursively collect every .mp4 below the extraction directory.
video_files = glob.glob("/content/video_data/**/*.mp4", recursive=True)

# Report a count and a short sample instead of printing the full list, which
# floods the cell output with hundreds of paths.
print(f"Found {len(video_files)} video files. First {min(5, len(video_files))}:")
for sample_path in video_files[:5]:
    print("  ", sample_path)

Found video files: ['/content/video_data/Shop DataSet/non shop lifters/shop_lifter_n_73.mp4', '/content/video_data/Shop DataSet/non shop lifters/shop_lifter_n_214_1.mp4', '/content/video_data/Shop DataSet/non shop lifters/shop_lifter_n_83.mp4', '/content/video_data/Shop DataSet/non shop lifters/shop_lifter_n_199_1.mp4', '/content/video_data/Shop DataSet/non shop lifters/shop_lifter_n_179_1.mp4', '/content/video_data/Shop DataSet/non shop lifters/shop_lifter_n_182.mp4', '/content/video_data/Shop DataSet/non shop lifters/shop_lifter_n_95_1.mp4', '/content/video_data/Shop DataSet/non shop lifters/shop_lifter_n_175.mp4', '/content/video_data/Shop DataSet/non shop lifters/shop_lifter_n_71.mp4', '/content/video_data/Shop DataSet/non shop lifters/shop_lifter_n_127_1.mp4', '/content/video_data/Shop DataSet/non shop lifters/shop_lifter_n_3_1.mp4', '/content/video_data/Shop DataSet/non shop lifters/shop_lifter_n_197.mp4', '/content/video_data/Shop DataSet/non shop lifters/shop_lifter_n_50.mp4', 

In [None]:
import os
import torch
from torch.utils.data import Dataset
from torchvision.transforms import ToTensor, Resize, Compose
from PIL import Image
import cv2

class VideoDataset(Dataset):
    """Video-classification dataset that yields a fixed number of frames per clip.

    Every immediate sub-directory of ``root_dir`` is one class. Class folders
    are sorted by name and numbered ``0..N-1`` (plain files in ``root_dir`` are
    ignored and do not consume a label id); ``self.classes[label]`` maps a
    label back to its folder name. Video files are collected recursively
    inside each class folder in sorted order, so sample indices are stable
    between runs.

    Args:
        root_dir: Directory whose sub-directories are the class folders.
        frames_per_video: Number of frames returned for every video (>= 1).
        transform: Callable applied to each frame as an RGB ``PIL.Image``.
            Defaults to a 64x64 resize followed by ``ToTensor``.

    Raises:
        ValueError: If ``frames_per_video`` is smaller than 1.
    """

    # File extensions (lower-case) that are treated as videos.
    VIDEO_EXTENSIONS = ('.mp4', '.avi', '.mov', '.mkv')

    def __init__(self, root_dir, frames_per_video=16, transform=None):
        if frames_per_video < 1:
            raise ValueError(f"frames_per_video must be >= 1, got {frames_per_video}")

        self.root_dir = root_dir
        self.frames_per_video = frames_per_video
        self.transform = transform if transform else Compose([
            Resize((64, 64)),
            ToTensor()
        ])
        self.video_paths = []
        self.labels = []
        # Lazily built all-zero frame used when a video cannot be decoded.
        self._blank = None

        # Only directories are classes. Filtering before enumerate() keeps the
        # label ids contiguous even when root_dir also contains stray files.
        self.classes = sorted(
            entry for entry in os.listdir(root_dir)
            if os.path.isdir(os.path.join(root_dir, entry))
        )

        # Scan each class folder recursively, in a deterministic order.
        for label, class_folder in enumerate(self.classes):
            class_path = os.path.join(root_dir, class_folder)
            for root, dirs, files in os.walk(class_path):
                dirs.sort()  # in-place sort fixes the traversal order of os.walk
                for file in sorted(files):
                    if file.lower().endswith(self.VIDEO_EXTENSIONS):
                        self.video_paths.append(os.path.join(root, file))
                        self.labels.append(label)

        print(f"✅ Found {len(self.video_paths)} video files.")

        # A single populated class almost always means root_dir points one
        # level too high (or too low); training on it gives meaningless scores.
        populated_classes = len(set(self.labels))
        if populated_classes < 2:
            import warnings
            warnings.warn(
                f"VideoDataset found videos for {populated_classes} class(es) under "
                f"{root_dir!r}; check that root_dir is the folder that directly "
                f"contains the class folders."
            )

    def __len__(self):
        """Return the number of videos found."""
        return len(self.video_paths)

    def _blank_frame(self):
        """Return an all-zero frame shaped like the transform's output."""
        if self._blank is None:
            # Probe the transform with a black image so the fallback frame has
            # the same shape/dtype as real frames and batches still collate.
            probe = self.transform(Image.new("RGB", (64, 64)))
            self._blank = torch.zeros_like(probe)
        return self._blank.clone()

    def __getitem__(self, idx):
        """Load one video as a tensor of evenly spaced frames.

        Returns:
            tuple: ``(video_tensor, label)`` where ``video_tensor`` stacks
            ``frames_per_video`` transformed frames and is permuted from
            (T, C, H, W) to (C, T, H, W). Short videos are padded by repeating
            the last decoded frame; undecodable videos yield all-zero frames.
        """
        video_path = self.video_paths[idx]
        label = self.labels[idx]

        cap = cv2.VideoCapture(video_path)
        frames = []
        try:
            total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))

            if total_frames < self.frames_per_video:
                # Fewer frames than requested: take them all (empty if <= 0).
                frame_indices = list(range(total_frames))
            else:
                step = total_frames // self.frames_per_video
                frame_indices = [i * step for i in range(self.frames_per_video)]

            frame_indices_set = set(frame_indices)
            current_index = 0

            # Decode sequentially and keep only the selected frame positions.
            while len(frames) < len(frame_indices) and cap.isOpened():
                ret, frame = cap.read()
                if not ret:
                    break

                if current_index in frame_indices_set:
                    frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                    pil_image = Image.fromarray(frame)
                    frames.append(self.transform(pil_image))

                current_index += 1
        finally:
            # Always free the decoder handle, even if decoding/transform fails.
            cap.release()

        # If no valid frames, start from a zero frame of the right shape;
        # then repeat the last frame until the clip has the requested length.
        if len(frames) == 0:
            frames = [self._blank_frame()]
        while len(frames) < self.frames_per_video:
            frames.append(frames[-1].clone())

        video_tensor = torch.stack(frames)             # (T, C, H, W)
        video_tensor = video_tensor.permute(1, 0, 2, 3) # (C, T, H, W)

        return video_tensor, label


In [None]:
from torchvision import transforms

transform = transforms.Compose([
    transforms.Resize((32, 32)),
    transforms.ToTensor()
])

# The dataset root must be the folder that directly contains the class folders.
# The archive unpacks to /content/video_data/Shop DataSet/<class>/..., so using
# /content/video_data as the root makes "Shop DataSet" the only class and every
# video silently gets label 0.
DATA_ROOT = "/content/video_data/Shop DataSet"

dataset = VideoDataset(DATA_ROOT, frames_per_video=8, transform=transform)
print("Total videos loaded:", len(dataset))

# Fail fast on degenerate labels: a single-class dataset trains to a
# meaningless 100% accuracy.
label_counts = {lbl: dataset.labels.count(lbl) for lbl in sorted(set(dataset.labels))}
print("Videos per label:", label_counts)
assert len(label_counts) == 2, (
    f"Expected 2 classes under {DATA_ROOT!r}, found labels {label_counts}"
)

video, label = dataset[0]
print("Video shape:", video.shape)
print("Label:", label)


✅ Found 855 video files.
Total videos loaded: 855
Video shape: torch.Size([3, 8, 32, 32])
Label: 0


In [None]:
from torch.utils.data import random_split

# Fixed seed so the train/val/test membership is the same on every run;
# without it each kernel restart evaluates on a different test set.
SPLIT_SEED = 42

# 70% train, 15% validation, remainder (about 15%) test.
total_size = len(dataset)
train_size = int(0.7 * total_size)
val_size = int(0.15 * total_size)
test_size = total_size - train_size - val_size

split_generator = torch.Generator().manual_seed(SPLIT_SEED)
train_dataset, val_dataset, test_dataset = random_split(
    dataset, [train_size, val_size, test_size], generator=split_generator
)


In [None]:
from torch.utils.data import DataLoader

# One loader per split, all with batches of 8. Only the training split is
# shuffled; the validation and test splits keep a fixed order.
train_loader, val_loader, test_loader = (
    DataLoader(split, batch_size=8, shuffle=reshuffle)
    for split, reshuffle in (
        (train_dataset, True),
        (val_dataset, False),
        (test_dataset, False),
    )
)


In [None]:
# NOTE(review): no cell visible in this notebook imports pytorchvideo — confirm this install is still needed.
!pip install pytorchvideo



In [None]:
import torch
import torch.nn as nn
from torchvision.models.video import mc3_18, MC3_18_Weights

def get_pretrained_mc3_video_model(num_classes: int):
    """
    Build an mc3_18 video network with a task-specific classification head.

    The network is created with the KINETICS400_V1 weights, and its final
    fully connected layer is swapped for a new linear layer that keeps the
    same input width but emits ``num_classes`` outputs.

    Args:
        num_classes (int): Number of output classes for the new head.
    Returns:
        torch.nn.Module: The mc3_18 model with its ``fc`` layer replaced.
    """
    backbone = mc3_18(weights=MC3_18_Weights.KINETICS400_V1)

    # New head: same feature width as the original classifier, new class count.
    backbone.fc = nn.Linear(backbone.fc.in_features, num_classes)
    return backbone

# Example usage:
# model = get_pretrained_mc3_video_model(num_classes=2)
# model = model.to(device)

In [None]:
# Build the 2-class model and keep it on the CPU.
# NOTE(review): the training cell further down rebinds `model` to a freshly
# built network, so this instance is not the one that gets trained.
model = get_pretrained_mc3_video_model(num_classes=2).to("cpu")

In [None]:
pip install torch torchvision scikit-learn



In [None]:
import torch
import torch.nn as nn
from torch.utils.data import DataLoader
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score
import time

In [None]:
def train_one_epoch(model, dataloader, criterion, optimizer, device):
    """Run one optimisation pass over ``dataloader``.

    Args:
        model: Network to train; switched to training mode.
        dataloader: Iterable of ``(inputs, labels)`` tensor batches.
        criterion: Loss callable invoked as ``criterion(outputs, labels)``.
        optimizer: Optimizer whose ``zero_grad``/``step`` are called per batch.
        device: Device the batches are moved to.

    Returns:
        tuple: ``(epoch_loss, metrics)`` where ``epoch_loss`` is the mean
        per-sample loss over the samples actually processed and ``metrics``
        is the result of ``evaluate_metrics`` on the collected labels.

    Raises:
        ZeroDivisionError: If the dataloader yields no samples.
    """
    model.train()
    running_loss = 0.0
    samples_seen = 0
    true_labels = []
    pred_labels = []

    for inputs, labels in dataloader:
        inputs, labels = inputs.to(device), labels.to(device)

        optimizer.zero_grad()
        outputs = model(inputs)

        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        # Weight the batch loss by its size so a smaller final batch does not
        # skew the epoch average.
        batch_size = inputs.size(0)
        running_loss += loss.item() * batch_size
        samples_seen += batch_size

        _, preds = torch.max(outputs, 1)
        true_labels.extend(labels.cpu().numpy())
        pred_labels.extend(preds.cpu().numpy())

    # Average over the samples that were actually processed. Dividing by
    # len(dataloader.dataset) under-reports the loss whenever the loader skips
    # samples (drop_last=True, a sub-sampling sampler, ...).
    epoch_loss = running_loss / samples_seen
    return epoch_loss, evaluate_metrics(true_labels, pred_labels)


In [None]:
@torch.no_grad()
def evaluate(model, dataloader, criterion, device):
    model.eval()
    running_loss = 0.0
    true_labels = []
    pred_labels = []

    for inputs, labels in dataloader:
        inputs, labels = inputs.to(device), labels.to(device)
        outputs = model(inputs)

        loss = criterion(outputs, labels)
        running_loss += loss.item() * inputs.size(0)

        _, preds = torch.max(outputs, 1)
        true_labels.extend(labels.cpu().numpy())
        pred_labels.extend(preds.cpu().numpy())

    epoch_loss = running_loss / len(dataloader.dataset)
    return epoch_loss, evaluate_metrics(true_labels, pred_labels)


In [None]:
def evaluate_metrics(y_true, y_pred):
    """Summarise classification quality as a dict of four scores.

    Args:
        y_true: Ground-truth labels.
        y_pred: Predicted labels, aligned with ``y_true``.

    Returns:
        dict: Keys ``'accuracy'``, ``'precision'``, ``'recall'`` and
        ``'f1_score'``. The last three are computed with
        ``average='weighted'`` and ``zero_division=0``.
    """
    # Options shared by the three averaged scores.
    weighted = dict(average='weighted', zero_division=0)

    scores = {'accuracy': accuracy_score(y_true, y_pred)}
    scores['precision'] = precision_score(y_true, y_pred, **weighted)
    scores['recall'] = recall_score(y_true, y_pred, **weighted)
    scores['f1_score'] = f1_score(y_true, y_pred, **weighted)
    return scores


In [None]:
def train_model(model, train_loader, val_loader, criterion, optimizer, device, epochs=10):
    """Train for ``epochs`` epochs, validating and printing a report after each.

    Every epoch calls ``train_one_epoch`` on ``train_loader`` and then
    ``evaluate`` on ``val_loader``, and prints both losses, both metric dicts
    and the wall-clock time the epoch took. Nothing is returned.
    """
    for epoch_number in range(1, epochs + 1):
        started_at = time.time()

        train_loss, train_metrics = train_one_epoch(
            model, train_loader, criterion, optimizer, device
        )
        val_loss, val_metrics = evaluate(model, val_loader, criterion, device)

        report_lines = (
            f"\nEpoch {epoch_number}/{epochs}",
            f"Train Loss: {train_loss:.4f} | Val Loss: {val_loss:.4f}",
            f"Train Metrics: {train_metrics}",
            f"Val Metrics:   {val_metrics}",
        )
        for report_line in report_lines:
            print(report_line)
        print(f"Time: {time.time() - started_at:.2f}s")


In [None]:
@torch.no_grad()
def test_model(model, test_loader, device):
    model.eval()
    true_labels = []
    pred_labels = []

    for inputs, labels in test_loader:
        inputs, labels = inputs.to(device), labels.to(device)
        outputs = model(inputs)

        _, preds = torch.max(outputs, 1)
        true_labels.extend(labels.cpu().numpy())
        pred_labels.extend(preds.cpu().numpy())

    metrics = evaluate_metrics(true_labels, pred_labels)
    print("\n✅ Test Metrics:")
    print(metrics)


In [None]:
import torch
import torch.nn as nn
import torch.optim as optim

# Hyperparameters and seed gathered in one place so a run can be reproduced
# or tuned without hunting through the cell.
SEED = 42
NUM_CLASSES = 2
LEARNING_RATE = 1e-4
NUM_EPOCHS = 10

# Seed before building the model and iterating the loaders: the replacement
# classifier head is randomly initialised and the training loader shuffles.
torch.manual_seed(SEED)

# ✅ Device: use the GPU when one is available.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# ✅ Model, created fresh and moved straight to the selected device.
model = get_pretrained_mc3_video_model(num_classes=NUM_CLASSES).to(device)

# ✅ Loss function
criterion = nn.CrossEntropyLoss()

# ✅ Optimizer
optimizer = optim.Adam(model.parameters(), lr=LEARNING_RATE)

# ✅ Training and testing. train_loader, val_loader and test_loader must
# already be defined by the DataLoader cell above.
train_model(model, train_loader, val_loader, criterion, optimizer, device, epochs=NUM_EPOCHS)
test_model(model, test_loader, device)



Epoch 1/10
Train Loss: 0.2295 | Val Loss: 0.0177
Train Metrics: {'accuracy': 0.8695652173913043, 'precision': 1.0, 'recall': 0.8695652173913043, 'f1_score': 0.9302325581395348}
Val Metrics:   {'accuracy': 1.0, 'precision': 1.0, 'recall': 1.0, 'f1_score': 1.0}
Time: 504.72s

Epoch 2/10
Train Loss: 0.0080 | Val Loss: 0.0052
Train Metrics: {'accuracy': 1.0, 'precision': 1.0, 'recall': 1.0, 'f1_score': 1.0}
Val Metrics:   {'accuracy': 1.0, 'precision': 1.0, 'recall': 1.0, 'f1_score': 1.0}
Time: 496.73s

Epoch 3/10
Train Loss: 0.0037 | Val Loss: 0.0028
Train Metrics: {'accuracy': 1.0, 'precision': 1.0, 'recall': 1.0, 'f1_score': 1.0}
Val Metrics:   {'accuracy': 1.0, 'precision': 1.0, 'recall': 1.0, 'f1_score': 1.0}
Time: 496.88s

Epoch 4/10
Train Loss: 0.0026 | Val Loss: 0.0021
Train Metrics: {'accuracy': 1.0, 'precision': 1.0, 'recall': 1.0, 'f1_score': 1.0}
Val Metrics:   {'accuracy': 1.0, 'precision': 1.0, 'recall': 1.0, 'f1_score': 1.0}
Time: 513.44s

Epoch 5/10
Train Loss: 0.0019 | Va