In [1]:
#   CS22B1090
#   Shubh Khandelwal

In [None]:
#   Dataset Implementation

import cv2
import os
import pandas as pd
from PIL import Image
import random
from sklearn.model_selection import train_test_split
import torch
from torch.utils.data import DataLoader, Dataset
from torchvision import transforms

class CelebDataset(Dataset):

    def __init__(self, df, transform = None):
        super().__init__()
        self.df = df
        self.transform = transform
    
    def __len__(self):
        return len(self.df)
    
    def __getitem__(self, index, max_frames = 64):
        video_path = self.df.loc[index, "path"]
        label = self.df.loc[index, "label"]

        cap = cv2.VideoCapture(video_path)
        all_frames = []
        while cap.isOpened():
            ret, frame = cap.read()
            if not ret:
                break
            all_frames.append(frame)
        cap.release()
        if len(all_frames) > max_frames:
            sampled_indices = sorted(random.sample(range(len(all_frames)), max_frames))
        else:
            all_frames = all_frames + [all_frames[len(all_frames) - 1] for _ in range(max_frames - len(all_frames))]
            sampled_indices = list(range(len(all_frames)))
        frames = []
        for i in sampled_indices:
            frame = all_frames[i]
            frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            frame = Image.fromarray(frame)
            if self.transform:
                frame = self.transform(frame)
            frames.append(frame)
        video = torch.stack(frames)
        return video, label

root_directory = "./dataset"

label_map = {
    'Celeb-real': 0,
    'Youtube-real': 0,
    'Celeb-synthesis': 1
}

videos = []
for category, label in label_map.items():
    category_path = os.path.join(root_directory, category)
    if not os.path.isdir(category_path):
        continue
    for filename in os.listdir(category_path):
        if filename.endswith('.mp4'):
            video = []
            video.append(os.path.join(category_path, filename))
            video.append(label)
            videos.append(video)

df = pd.DataFrame(videos, columns=["path", "label"])

train_df, test_df = train_test_split(df, test_size = 0.2, random_state = 42)
train_df = train_df.reset_index().drop("index", axis = 1)
test_df = test_df.reset_index().drop("index", axis = 1)

transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize([0.5] * 3, [0.5] * 3)
])

train_dataset = CelebDataset(train_df, transform)
test_dataset = CelebDataset(test_df, transform)

train_loader = DataLoader(train_dataset, batch_size = 2, shuffle = True)
test_loader = DataLoader(test_dataset, batch_size = 2, shuffle = True)

In [3]:
device = "cuda" if torch.cuda.is_available() else "cpu"

In [None]:
# Model Architecture (ViT)

import math
import torch.nn as nn
import torch.optim as optim

class PatchEmbedder(nn.Module):

    def __init__(self, in_channels = 3, image_size = 224, patch_size = 16, embed_dim = 768, dropout = 0.0):
        super().__init__()
        num_patches = (image_size // patch_size) ** 2
        self.conv = nn.Conv2d(in_channels, embed_dim, kernel_size = patch_size, stride = patch_size)
        self.cls_token = nn.Parameter(torch.randn(1, 1 ,embed_dim))
        self.positional_embeddings = nn.Parameter(torch.randn(1, 1 + num_patches, embed_dim))
        self.dropout = nn.Dropout(dropout)
    
    def forward(self, x):
        x = self.conv(x)
        x = x.flatten(2)
        x = x.transpose(1, 2)
        cls_tokens = self.cls_token.expand(x.shape[0], -1, -1)
        x = torch.cat((cls_tokens, x), dim = 1)
        x += self.positional_embeddings
        x = self.dropout(x)
        return x
    
class MultiHeadSelfAttention(nn.Module):

    def __init__(self, embed_dim = 768, num_heads = 12, dropout = 0.0):
        super().__init__()
        self.num_heads = num_heads
        self.q = nn.Linear(embed_dim, embed_dim)
        self.k = nn.Linear(embed_dim, embed_dim)
        self.v = nn.Linear(embed_dim, embed_dim)
        self.dropout = nn.Dropout(dropout)
        self.proj = nn.Linear(embed_dim, embed_dim)
    
    def forward(self, x):
        batch_size, num_tokens, embed_dim = x.shape
        query = self.q(x)
        key = self.k(x)
        value = self.v(x)
        head_dim = embed_dim // self.num_heads
        query = query.view(batch_size, num_tokens, self.num_heads, head_dim).transpose(1, 2)
        key   = key.view(batch_size, num_tokens, self.num_heads, head_dim).transpose(1, 2)
        value = value.view(batch_size, num_tokens, self.num_heads, head_dim).transpose(1, 2)
        attention_scores = (query @ key.transpose(-2, -1)) / (math.sqrt(head_dim))
        attention_weights = attention_scores.softmax(dim = -1)
        attention_weights = self.dropout(attention_weights)
        attention_output = attention_weights @ value
        attention_output = attention_output.transpose(1, 2).contiguous()
        attention_output = attention_output.view(batch_size, num_tokens, embed_dim)
        attention_output = self.proj(attention_output)
        return attention_output
    
class MultiLayerPerceptron(nn.Module):

    def __init__(self, embed_dim, mlp_dim, dropout = 0.0):
        super().__init__()
        self.layers = nn.Sequential(
            nn.Linear(embed_dim, mlp_dim),
            nn.GELU(),
            nn.Dropout(dropout),
            nn.Linear(mlp_dim, embed_dim),
            nn.Dropout(dropout)
        )
    
    def forward(self, x):
        x = self.layers(x)
        return x

class TransformerEncoder(nn.Module):

    def __init__(self, embed_dim = 768, num_heads = 8, mlp_dim = 3072, dropout = 0.0):
        super().__init__()
        self.norm1 = nn.LayerNorm(embed_dim)
        self.attention = MultiHeadSelfAttention(embed_dim, num_heads, dropout)
        self.norm2 = nn.LayerNorm(embed_dim)
        self.mlp = MultiLayerPerceptron(embed_dim, mlp_dim, dropout)
    
    def forward(self, x):
        x = x + self.attention(self.norm1(x))
        x = x + self.mlp(self.norm2(x))
        return x

class VisionTransformer(nn.Module):

    def __init__(self, in_channels = 3, image_size = 224, patch_size = 16, embed_dim = 768, num_heads = 8, mlp_dim = 3072, depth = 16, dropout = 0.0):
        super().__init__()
        self.patch_embedder = PatchEmbedder(in_channels, image_size, patch_size, embed_dim, dropout)
        self.blocks = nn.Sequential(*[
            TransformerEncoder(embed_dim, num_heads, mlp_dim, dropout)
            for _ in range(depth)
        ])
        self.norm = nn.LayerNorm(embed_dim)
        self.head = nn.Linear(embed_dim, 1)
    
    def forward(self, x):
        x = self.patch_embedder(x)
        x = self.blocks(x)
        x = self.norm(x)
        output = x[:, 0]
        output = self.head(output)
        return output

model = VisionTransformer().to(device)
optimizer = optim.AdamW(model.parameters(), lr = 3e-4, weight_decay = 0.05)
criterion = nn.BCEWithLogitsLoss()

In [None]:
# Training Loop

import numpy as np
from sklearn.metrics import roc_auc_score, precision_score, confusion_matrix

def train(model, train_loader, epochs = 10):
    print("Training Start.")
    for epoch in range(epochs):
        print(f"\nEpoch: [{epoch + 1}/{epochs}]")
        for batch, (videos, labels) in enumerate(train_loader):
            print(f"Batch: {batch + 1}")
            for video, label in zip(videos, labels):
                video = video.to(device)
                label = label.float().view(1, 1).to(device)
                output = model(video)
                output = output.mean(dim = 0, keepdim = True)

                loss = criterion(output, label)
                optimizer.zero_grad()
                loss.backward()
                optimizer.step()
            if batch % 100 == 0:
                torch.save(model.state_dict(), "model.pth")
    print("\nTraining Complete.")

train(model, train_loader, 1)
torch.save(model.state_dict(), "model.pth")

Training Start.

Epoch: [1/1]
Batch: 1
Batch: 2
Batch: 3
Batch: 4
Batch: 5
Batch: 6
Batch: 7
Batch: 8
Batch: 9
Batch: 10
Batch: 11
Batch: 12
Batch: 13
Batch: 14
Batch: 15
Batch: 16
Batch: 17
Batch: 18
Batch: 19
Batch: 20
Batch: 21
Batch: 22
Batch: 23
Batch: 24
Batch: 25
Batch: 26
Batch: 27
Batch: 28
Batch: 29
Batch: 30
Batch: 31
Batch: 32
Batch: 33
Batch: 34
Batch: 35
Batch: 36
Batch: 37
Batch: 38
Batch: 39
Batch: 40
Batch: 41
Batch: 42
Batch: 43
Batch: 44
Batch: 45
Batch: 46
Batch: 47
Batch: 48
Batch: 49
Batch: 50
Batch: 51
Batch: 52
Batch: 53
Batch: 54
Batch: 55
Batch: 56
Batch: 57
Batch: 58
Batch: 59
Batch: 60
Batch: 61
Batch: 62
Batch: 63
Batch: 64
Batch: 65
Batch: 66
Batch: 67
Batch: 68
Batch: 69
Batch: 70
Batch: 71
Batch: 72
Batch: 73
Batch: 74
Batch: 75
Batch: 76
Batch: 77
Batch: 78
Batch: 79
Batch: 80
Batch: 81
Batch: 82
Batch: 83
Batch: 84
Batch: 85
Batch: 86
Batch: 87
Batch: 88
Batch: 89
Batch: 90
Batch: 91
Batch: 92
Batch: 93
Batch: 94
Batch: 95
Batch: 96
Batch: 97
Batch: 98

In [6]:
# Testing Loop

def test(model, test_loader):
    model.eval()
    print("\nTesting Start.")
    all_preds = []
    all_labels = []
    all_probs = []

    with torch.no_grad():
        for batch, (videos, labels) in enumerate(test_loader):
            print(f"Batch: {batch + 1}")
            for video, label in zip(videos, labels):
                video = video.to(device)
                output = model(video)
                output = output.mean(dim=0)
                prob = torch.sigmoid(output)
                pred = (prob >= 0.5).long().item()
                true_label = label.item()

                all_preds.append(pred)
                all_labels.append(true_label)
                all_probs.append(prob.item())

    accuracy = np.mean(np.array(all_preds) == np.array(all_labels))
    auc = roc_auc_score(all_labels, all_probs)
    precision = precision_score(all_labels, all_preds)
    cm = confusion_matrix(all_labels, all_preds)
    tn, fp, fn, tp = cm.ravel()
    eer = fn / (fn + tp)

    print("Testing Complete.")
    print(f"\nAccuracy: {accuracy:.4f}")
    print(f"AUC: {auc:.4f}")
    print(f"Precision: {precision:.4f}")
    print(f"EER: {eer:.4f}")

model.load_state_dict(torch.load("/kaggle/working/model.pth"))
test(model, test_loader)

  model.load_state_dict(torch.load("/kaggle/working/model.pth"))



Testing Start.
Batch: 1
Batch: 2
Batch: 3
Batch: 4
Batch: 5
Batch: 6
Batch: 7
Batch: 8
Batch: 9
Batch: 10
Batch: 11
Batch: 12
Batch: 13
Batch: 14
Batch: 15
Batch: 16
Batch: 17
Batch: 18
Batch: 19
Batch: 20
Batch: 21
Batch: 22
Batch: 23
Batch: 24
Batch: 25
Batch: 26
Batch: 27
Batch: 28
Batch: 29
Batch: 30
Batch: 31
Batch: 32
Batch: 33
Batch: 34
Batch: 35
Batch: 36
Batch: 37
Batch: 38
Batch: 39
Batch: 40
Batch: 41
Batch: 42
Batch: 43
Batch: 44
Batch: 45
Batch: 46
Batch: 47
Batch: 48
Batch: 49
Batch: 50
Batch: 51
Batch: 52
Batch: 53
Batch: 54
Batch: 55
Batch: 56
Batch: 57
Batch: 58
Batch: 59
Batch: 60
Batch: 61
Batch: 62
Batch: 63
Batch: 64
Batch: 65
Batch: 66
Batch: 67
Batch: 68
Batch: 69
Batch: 70
Batch: 71
Batch: 72
Batch: 73
Batch: 74
Batch: 75
Batch: 76
Batch: 77
Batch: 78
Batch: 79
Batch: 80
Batch: 81
Batch: 82
Batch: 83
Batch: 84
Batch: 85
Batch: 86
Batch: 87
Batch: 88
Batch: 89
Batch: 90
Batch: 91
Batch: 92
Batch: 93
Batch: 94
Batch: 95
Batch: 96
Batch: 97
Batch: 98
Batch: 99
Bat