### Pre-processing

In [None]:
import os
import pathlib
import torch
from torch.utils.data import DataLoader, Dataset, Subset
from torchvision import transforms, models
from torchvision.models import VGG16_Weights
from sklearn.model_selection import KFold
from sklearn.metrics import confusion_matrix, accuracy_score, precision_score, recall_score, f1_score
import torch.nn.functional as F
from skopt import gp_minimize
from skopt.space import Real
from torch import nn
from torch.optim import Adam
import cv2
import shutil
import numpy as np
from PIL import Image
import random
import json

In [None]:
# Fix the seed of every random number generator used in this notebook
# (NumPy, PyTorch and the standard library) so repeated runs draw the same
# random values.
np.random.seed(42)
torch.manual_seed(42)
random.seed(42)

In [None]:
# Root of the Kaggle DFD dataset and the two class folders read below.
dataset_dir = pathlib.Path('/kaggle/input/deep-fake-detection-dfd-entire-original-dataset')
original_videos = dataset_dir.joinpath("DFD_original sequences")
# The manipulated videos sit one level deeper, in a folder of the same name.
manipulated_videos = dataset_dir.joinpath(
    "DFD_manipulated_sequences",
    "DFD_manipulated_sequences",
)

In [None]:
# Count the .mp4 files directly inside each class folder and report them.
num_original_videos = sum(1 for _ in original_videos.glob("*.mp4"))
num_manipulated_videos = sum(1 for _ in manipulated_videos.glob("*.mp4"))
print(f"Original videos: {num_original_videos}")
print(f"Manipulated videos: {num_manipulated_videos}")

In [None]:
from torch.utils.tensorboard import SummaryWriter
# TensorBoard writer shared by the notebook; event files are written under
# runs/cross_validation.
writer = SummaryWriter(log_dir="runs/cross_validation")

## Helper functions

In [None]:
from PIL import Image
import numpy as np

def __getitem__(self, idx):
    """Return a randomly generated stand-in image together with its label.

    No real data is read: a 224x224x3 uint8 array is drawn from NumPy's
    global random generator on every call. When ``self.transform`` is truthy
    it is applied to the PIL version of that array.

    Args:
        idx: Index into ``self.labels``.

    Returns:
        Tuple ``(image, label)``.
    """
    target = self.labels[idx]
    # Mock image data: uniform random pixel values in [0, 256).
    sample = np.random.randint(0, 256, (224, 224, 3), dtype=np.uint8)

    if not self.transform:
        return sample, target

    # The transform is fed a PIL image rather than the raw array.
    return self.transform(Image.fromarray(sample)), target

In [None]:
# Per-frame preprocessing: resize to 224x224, convert to a tensor, then
# normalise each channel with the mean/std values commonly used for
# ImageNet-pretrained models (the network below loads ImageNet weights).
transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),  
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])

In [None]:
class VideoDataset(Dataset):
    """Dataset yielding a fixed-length stack of frames for each video.

    Every item is ``(frames, label)`` where ``frames`` stacks
    ``frames_per_video`` evenly spaced frames of one video.

    Args:
        video_paths: Sequence of video file paths.
        labels: Sequence of labels aligned with ``video_paths``.
        transform: Optional callable applied to each frame as a PIL image; it
            must return a tensor. When omitted, frames are converted to
            channels-first float tensors scaled to ``[0, 1]``.
        frames_per_video: Number of frames sampled from every video.
    """

    def __init__(self, video_paths, labels, transform=None, frames_per_video=16):
        self.video_paths = video_paths
        self.labels = labels
        self.transform = transform
        self.frames_per_video = frames_per_video

    def __len__(self):
        """Return the number of videos in the dataset."""
        return len(self.video_paths)

    def __getitem__(self, idx):
        """Return ``(frames, label)`` for the video at position ``idx``.

        Raises:
            RuntimeError: If no frame could be decoded from the video. This
                replaces the obscure failure that stacking an empty list
                would otherwise produce.
        """
        video_path = self.video_paths[idx]
        label = self.labels[idx]

        # Load multiple frames from the video
        frames = self.load_video_frames(video_path, self.frames_per_video)
        if not frames:
            raise RuntimeError(f"No frames could be read from video: {video_path}")

        if self.transform:
            frames = [self.transform(Image.fromarray(frame)) for frame in frames]
        else:
            # torch.stack cannot take numpy arrays, so convert the raw
            # HxWxC uint8 frames to CxHxW float tensors in [0, 1].
            frames = [
                torch.from_numpy(np.ascontiguousarray(frame))
                .permute(2, 0, 1)
                .float()
                .div(255.0)
                for frame in frames
            ]

        # Stack frames to form a sequence
        return torch.stack(frames), label

    def load_video_frames(self, video_path, num_frames):
        """Read ``num_frames`` evenly spaced RGB frames from ``video_path``.

        Args:
            video_path: Path of the video file.
            num_frames: Number of frames to sample.

        Returns:
            A list of ``num_frames`` RGB frame arrays. If decoding stops
            early, the last decoded frame is repeated to reach the requested
            length. An empty list is returned when the video cannot be
            opened, reports no frames, or no frame can be decoded.
        """
        if num_frames <= 0:
            return []

        cap = cv2.VideoCapture(str(video_path))
        frames = []
        try:
            if not cap.isOpened():
                print(f"Error opening video: {video_path}")
                return []  # Return empty if the video cannot be opened

            frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
            if frame_count <= 0:
                print(f"Error reading frames from video: {video_path}")
                return []

            frame_indices = np.linspace(0, frame_count - 1, num_frames, dtype=np.int32)
            for frame_idx in frame_indices:
                cap.set(cv2.CAP_PROP_POS_FRAMES, int(frame_idx))
                ret, frame = cap.read()
                if not ret:
                    break
                frames.append(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
        finally:
            # Release the capture handle on every path, including early returns.
            cap.release()

        if not frames:
            print(f"Error reading frames from video: {video_path}")
            return []

        # If we don't have enough frames, repeat the last frame
        if len(frames) < num_frames:
            frames += [frames[-1]] * (num_frames - len(frames))

        return frames


In [None]:
class CustomAdamOptimizer:
    """Minimal hand-written Adam optimizer.

    Keeps one first-moment and one second-moment buffer per parameter and
    applies the bias-corrected Adam update in ``step()``.

    Args:
        params: Iterable of parameters to optimise.
        lr: Learning rate.
        beta1: Decay rate of the first-moment (mean) estimate.
        beta2: Decay rate of the second-moment (uncentred variance) estimate.
        epsilon: Term added to the denominator for numerical stability.
    """

    def __init__(self, params, lr=0.0001, beta1=0.9, beta2=0.999, epsilon=1e-8):
        self.lr = lr
        self.beta1 = beta1
        self.beta2 = beta2
        self.epsilon = epsilon

        self.params = list(params)
        self.m = [torch.zeros_like(param) for param in self.params]  # First moment estimates
        self.v = [torch.zeros_like(param) for param in self.params]  # Second moment estimates
        self.t = 0  # Number of step() calls so far
        # Updates applied to each parameter. Bias correction uses this count
        # so a parameter that only receives gradients later is still
        # corrected from its own first update.
        self.steps = [0] * len(self.params)

    @torch.no_grad()
    def step(self):
        """Apply one Adam update to every parameter that has a gradient.

        Parameters that do not require gradients, or whose ``grad`` is still
        ``None`` (for example because they took no part in the last backward
        pass), are skipped.
        """
        self.t += 1
        for i, param in enumerate(self.params):
            grad = param.grad
            if not param.requires_grad or grad is None:
                continue

            self.steps[i] += 1
            step_count = self.steps[i]

            # Update biased first and second moment estimates in place.
            self.m[i].mul_(self.beta1).add_(grad, alpha=1 - self.beta1)
            self.v[i].mul_(self.beta2).addcmul_(grad, grad, value=1 - self.beta2)

            # Bias-corrected estimates.
            m_hat = self.m[i] / (1 - self.beta1 ** step_count)
            v_hat = self.v[i] / (1 - self.beta2 ** step_count)

            # param <- param - lr * m_hat / (sqrt(v_hat) + epsilon)
            param.addcdiv_(m_hat, torch.sqrt(v_hat) + self.epsilon, value=-self.lr)

    def zero_grad(self):
        """Reset the gradient of every parameter that has one to zero."""
        for param in self.params:
            if param.grad is not None:
                param.grad.zero_()

In [None]:
def balance_dataset(original_videos_dir, manipulated_videos_dir, output_dir, target_count=75):
    """Copy an equally sized random sample of each class into ``output_dir``.

    Creates ``output_dir/original`` and ``output_dir/manipulated`` and copies
    the sampled ``*.mp4`` files into them. Files already present in those
    folders are left untouched.

    Args:
        original_videos_dir: ``Path`` of the folder holding original videos.
        manipulated_videos_dir: ``Path`` of the folder holding manipulated videos.
        output_dir: ``Path`` under which the two class folders are created.
        target_count: Desired number of videos per class. If either class has
            fewer videos, both classes are reduced to the smaller available
            number so that they stay balanced.

    Returns:
        ``((original_paths, original_labels), (manipulated_paths,
        manipulated_labels))`` where the paths are the sampled source files
        and the labels are 0 for original and 1 for manipulated.

    Raises:
        ValueError: If ``target_count`` is positive but one of the classes
            contains no ``.mp4`` file, or if ``target_count`` is negative.
    """
    # Create output directories
    balanced_original_dir = output_dir / "original"
    balanced_manipulated_dir = output_dir / "manipulated"
    balanced_original_dir.mkdir(parents=True, exist_ok=True)
    balanced_manipulated_dir.mkdir(parents=True, exist_ok=True)

    # Sort the listings: directory order is arbitrary, and a seeded
    # random.sample only reproduces the same subset from a stable population.
    original_videos = sorted(original_videos_dir.glob("*.mp4"))
    manipulated_videos = sorted(manipulated_videos_dir.glob("*.mp4"))

    per_class = min(target_count, len(original_videos), len(manipulated_videos))
    if per_class < target_count:
        if per_class <= 0:
            raise ValueError(
                "Cannot build a balanced dataset: found "
                f"{len(original_videos)} original and "
                f"{len(manipulated_videos)} manipulated videos."
            )
        print(
            f"Requested {target_count} videos per class but only {per_class} "
            f"are available in the smaller class; using {per_class}."
        )

    sampled_original = random.sample(original_videos, per_class)
    sampled_manipulated = random.sample(manipulated_videos, per_class)

    # Copy sampled videos to output directories
    for file in sampled_original:
        shutil.copy(file, balanced_original_dir / file.name)

    for file in sampled_manipulated:
        shutil.copy(file, balanced_manipulated_dir / file.name)

    print(f"Balanced dataset created with {per_class} videos in each class.")
    return (sampled_original, [0] * len(sampled_original)), (sampled_manipulated, [1] * len(sampled_manipulated))

In [None]:
# Recount the raw videos per class (adjust the "*.mp4" pattern if the files
# use a different extension).
num_original_videos = sum(1 for _ in original_videos.glob("*.mp4"))
num_manipulated_videos = sum(1 for _ in manipulated_videos.glob("*.mp4"))

print(f"Original videos: {num_original_videos}")
print(f"Manipulated videos: {num_manipulated_videos}")

In [None]:
# Paths
# Sample the same number of videos from each class and copy them under
# ./balanced_dataset (default target_count of balance_dataset is used).
balanced_dir = pathlib.Path('./balanced_dataset')
# balanced_samples is ((original_paths, labels), (manipulated_paths, labels)).
balanced_samples = balance_dataset(original_videos, manipulated_videos, balanced_dir)

In [None]:
# Locations of the two class folders inside the balanced subset.
balanced_dir = pathlib.Path('./balanced_dataset')
balanced_original = balanced_dir.joinpath("original")
balanced_manipulated = balanced_dir.joinpath("manipulated")

# Create both folders (and any missing parents); existing folders are kept.
balanced_original.mkdir(exist_ok=True, parents=True)
balanced_manipulated.mkdir(exist_ok=True, parents=True)

In [None]:
# Update dataset paths to point to the new subset
balanced_video_paths = []
balanced_labels = []

# Label 0 for original, label 1 for manipulated. Each folder listing is
# sorted because directory iteration order is arbitrary: without a stable
# order the same index can refer to different videos between runs, which
# would make index-based splits of these lists non-reproducible.
for class_label, class_dir in enumerate((balanced_original, balanced_manipulated)):
    class_paths = sorted(class_dir.glob("*.mp4"))
    balanced_video_paths.extend(class_paths)
    balanced_labels.extend([class_label] * len(class_paths))

In [None]:
# Report the number of videos actually present in each class folder instead
# of a hard-coded figure that does not match the sampled subset.
print(
    "Balanced dataset contains "
    f"{len(list(balanced_original.glob('*.mp4')))} original and "
    f"{len(list(balanced_manipulated.glob('*.mp4')))} manipulated videos."
)
print(f"Original videos saved to: {balanced_original}")
print(f"Manipulated videos saved to: {balanced_manipulated}")

In [None]:
# Verify how many videos ended up in each class folder of the balanced subset.
balanced_original_dir = pathlib.Path('./balanced_dataset/original')
balanced_manipulated_dir = pathlib.Path('./balanced_dataset/manipulated')

original_count = sum(1 for _ in balanced_original_dir.glob("*.mp4"))
manipulated_count = sum(1 for _ in balanced_manipulated_dir.glob("*.mp4"))
print(f"Number of videos in 'original': {original_count}")
print(f"Number of videos in 'manipulated': {manipulated_count}")

In [None]:
# Dataset over the balanced subset; every sampled frame goes through the
# shared `transform` pipeline and the default frames_per_video is used.
balanced_dataset = VideoDataset(balanced_video_paths,balanced_labels, transform=transform)

In [None]:
# 3-Fold Cross Validation
# Shuffled splitter with a fixed seed; every entry of fold_splits is a
# (train_indices, val_indices) pair over positions in balanced_video_paths.
kf = KFold(n_splits=3, shuffle=True, random_state=42)
fold_splits = list(kf.split(balanced_video_paths))

### Training

In [None]:
import torch
import torch.nn as nn
from torchvision import models
from torchvision.models import VGG16_Weights

class VGG16LSTM(nn.Module):
    """Per-frame VGG16 encoder followed by an LSTM and a linear classifier.

    Each frame is passed through the convolutional part of an
    ImageNet-pretrained VGG16, pooled to 7x7, flattened and projected to a
    1024-dimensional vector. The resulting sequence is fed to an LSTM, and
    the output at the last time step is mapped to ``num_classes`` scores.

    Args:
        num_classes: Size of the output layer.
        lstm_hidden_size: Hidden size of the LSTM.
        lstm_num_layers: Number of stacked LSTM layers.
        freeze_feature_extractor: When true, the VGG16 convolutional weights
            are excluded from gradient computation.
    """

    def __init__(self, num_classes=2, lstm_hidden_size=256, lstm_num_layers=1, freeze_feature_extractor=True):
        super().__init__()

        # Convolutional layers of the pretrained VGG16, then the 7x7 pooling
        # and a projection of the flattened 512*7*7 map down to 1024 features.
        backbone = models.vgg16(weights=VGG16_Weights.IMAGENET1K_V1)
        self.feature_extractor = backbone.features
        self.avgpool = nn.AdaptiveAvgPool2d((7, 7))
        self.fc_features = nn.Linear(512 * 7 * 7, 1024)

        # Optionally keep the pretrained convolutional weights fixed.
        if freeze_feature_extractor:
            self.feature_extractor.requires_grad_(False)

        # Temporal model over the per-frame feature vectors.
        self.lstm = nn.LSTM(
            input_size=1024,
            hidden_size=lstm_hidden_size,
            num_layers=lstm_num_layers,
            batch_first=True,
        )

        # Classification head on top of the LSTM output.
        self.fc = nn.Linear(lstm_hidden_size, num_classes)

    def forward(self, x):
        """Map a 5-D batch ``(batch, seq_len, c, h, w)`` to class scores.

        Returns:
            Tensor of shape ``(batch, num_classes)``.
        """
        n_videos, n_frames, channels, height, width = x.size()

        # Fold the time axis into the batch axis so every frame is encoded
        # independently by the convolutional layers.
        flat_frames = x.view(n_videos * n_frames, channels, height, width)
        pooled = self.avgpool(self.feature_extractor(flat_frames))
        frame_vectors = self.fc_features(torch.flatten(pooled, start_dim=1))

        # Restore the (batch, time, features) layout expected by the LSTM.
        sequence = frame_vectors.view(n_videos, n_frames, -1)

        # Flatten LSTM weights (needed for CuDNN)
        self.lstm.flatten_parameters()
        hidden_states, _ = self.lstm(sequence)

        # Classify from the output at the final time step.
        return self.fc(hidden_states[:, -1, :])


In [None]:
from tqdm import tqdm  # Progress bar
# Run on the GPU when CUDA is available, otherwise on the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# Initialize model, loss function, and optimizer
# These module-level names are read directly by train_and_validate (device,
# criterion, optimizer) and evaluate_model (device).
model = VGG16LSTM(num_classes=2).to(device)
criterion = nn.CrossEntropyLoss()
optimizer = CustomAdamOptimizer(model.parameters(), lr=0.0001)

In [None]:
# Train and validate
def train_and_validate(model, train_loader, val_loader, epochs, *, opt=None, loss_fn=None, run_device=None):
    """Train ``model`` for ``epochs`` epochs, validating after each epoch.

    Args:
        model: Module mapping a batch of videos to per-class scores.
        train_loader: Sized iterable of ``(videos, labels)`` training batches.
        val_loader: Sized iterable of ``(videos, labels)`` validation batches.
        epochs: Number of passes over ``train_loader``.
        opt: Object exposing ``zero_grad()`` and ``step()``. Defaults to the
            module-level ``optimizer``.
        loss_fn: Callable ``(outputs, labels) -> loss``. Defaults to the
            module-level ``criterion``.
        run_device: Device the batches are moved to. Defaults to the
            module-level ``device``.

    Returns:
        A list with one dict per epoch holding ``train_loss``, ``val_loss``
        (both averaged per batch) and ``val_accuracy`` (in percent).
    """
    # Fall back to the notebook-level objects so existing four-argument calls
    # keep working unchanged.
    opt = optimizer if opt is None else opt
    loss_fn = criterion if loss_fn is None else loss_fn
    run_device = device if run_device is None else run_device

    history = []
    for epoch in range(epochs):
        model.train()  # Set model to training mode
        running_loss = 0.0

        # Training loop with progress bar
        print(f"Epoch {epoch + 1}/{epochs}")
        train_progress = tqdm(enumerate(train_loader), total=len(train_loader), desc="Training")

        for batch_idx, (videos, labels) in train_progress:
            videos, labels = videos.to(run_device), labels.to(run_device)

            # Forward pass
            opt.zero_grad()
            outputs = model(videos)
            loss = loss_fn(outputs, labels)

            # Backward pass and optimizer step
            loss.backward()
            opt.step()

            running_loss += loss.item()
            train_progress.set_postfix(loss=loss.item())

        # max(..., 1) keeps an empty loader from raising ZeroDivisionError.
        train_loss = running_loss / max(len(train_loader), 1)
        print(f"Epoch {epoch + 1} Training Loss: {train_loss:.4f}")

        # Validation loop
        model.eval()  # Set model to evaluation mode
        val_loss = 0.0
        correct = 0
        total = 0

        with torch.no_grad():
            val_progress = tqdm(enumerate(val_loader), total=len(val_loader), desc="Validating")
            for batch_idx, (videos, labels) in val_progress:
                videos, labels = videos.to(run_device), labels.to(run_device)

                # Forward pass
                outputs = model(videos)
                loss = loss_fn(outputs, labels)
                val_loss += loss.item()

                # Accuracy calculation: predicted class is the arg-max score
                _, preds = torch.max(outputs, 1)
                correct += (preds == labels).sum().item()
                total += labels.size(0)

        mean_val_loss = val_loss / max(len(val_loader), 1)
        # An empty validation set is reported as 0% rather than dividing by zero.
        val_accuracy = 100 * correct / total if total else 0.0
        print(f"Epoch {epoch + 1} Validation Loss: {mean_val_loss:.4f}")
        print(f"Epoch {epoch + 1} Validation Accuracy: {val_accuracy:.2f}%\n")

        history.append({
            "train_loss": train_loss,
            "val_loss": mean_val_loss,
            "val_accuracy": val_accuracy,
        })

    return history


In [None]:
from sklearn.metrics import  precision_score, recall_score, f1_score, accuracy_score
import matplotlib.pyplot as plt
import seaborn as sns


def evaluate_model(model, val_loader):
    """Run ``model`` over ``val_loader`` and compute classification metrics.

    Batches are moved to the module-level ``device``. The predicted class is
    the index of the largest output score, and label 1 is treated as the
    positive class for the confusion counts.

    Returns:
        ``(cm, acc, precision, recall, f1)`` where ``cm`` is the 2x2 array
        ``[[tp, fn], [fp, tn]]`` and precision, recall and F1 are
        support-weighted averages computed with ``zero_division=1``.
    """
    y_true = []
    y_pred = []

    # Collect predictions and true labels
    model.eval()
    with torch.no_grad():
        for videos, labels in val_loader:
            videos = videos.to(device)
            labels = labels.to(device)
            predicted = model(videos).max(dim=1).indices
            y_true.extend(labels.cpu().numpy())
            y_pred.extend(predicted.cpu().numpy())

    # Confusion counts derived from the collected labels and predictions.
    truth = np.asarray(y_true)
    guess = np.asarray(y_pred)
    tp = int(((guess == 1) & (truth == 1)).sum())
    tn = int(((guess == 0) & (truth == 0)).sum())
    fp = int(((guess == 1) & (truth == 0)).sum())
    fn = int(((guess == 0) & (truth == 1)).sum())

    # Compute metrics
    weighted = {"average": 'weighted', "zero_division": 1}
    acc = accuracy_score(y_true, y_pred)
    precision = precision_score(y_true, y_pred, **weighted)
    recall = recall_score(y_true, y_pred, **weighted)
    f1 = f1_score(y_true, y_pred, **weighted)
    cm = np.array([[tp, fn], [fp, tn]])

    return cm, acc, precision, recall, f1

In [None]:
# Per-metric lists intended to collect one value per evaluation run.
metrics = {
    "accuracy": [],
    "precision": [],
    "recall": [],
    "f1_score": [],
}

In [None]:
# Dataset split
# The dataset lists every original video before the manipulated ones, so
# contiguous index ranges would leave the validation subset with a single
# class. Shuffle the indices with a dedicated, fixed-seed generator (the
# global RNG state is left untouched) and keep an 80/20 split that adapts to
# the dataset size.
shuffled_idx = np.random.RandomState(42).permutation(len(balanced_dataset)).tolist()
split_point = (4 * len(balanced_dataset)) // 5
train_idx = shuffled_idx[:split_point]
val_idx = shuffled_idx[split_point:]
train_dataset = Subset(balanced_dataset, train_idx)
val_dataset = Subset(balanced_dataset, val_idx)
# Dataloaders
train_loader = DataLoader(train_dataset, batch_size=8, shuffle=True, num_workers=0)
val_loader = DataLoader(val_dataset, batch_size=8, shuffle=False, num_workers=0)

In [None]:
# Sanity check: number of raw videos per class, then the sizes of the
# training and validation subsets.
print(sum(1 for _ in original_videos.glob("*.mp4")))
print(sum(1 for _ in manipulated_videos.glob("*.mp4")))
print(len(train_dataset))
print(len(val_dataset))

In [None]:
def plot_confusion_matrix(cm, labels=("Positive", "Negative")):
    """Draw ``cm`` as an annotated heatmap, show it, and return the figure.

    Args:
        cm: 2-D array of integer counts laid out as ``[[TP, FN], [FP, TN]]``
            (the layout evaluate_model returns): rows are the actual classes
            and columns the predicted classes.
        labels: Tick labels used on both axes, in row/column order.

    Returns:
        The matplotlib figure, so callers can log or save it after it has
        been shown.
    """
    tick_labels = list(labels)
    fig, ax = plt.subplots(figsize=(8, 6))
    sns.heatmap(cm, annot=True, fmt='d', cmap='Blues',
                xticklabels=tick_labels,
                yticklabels=tick_labels,
                ax=ax)
    # Heatmap rows run along the y-axis and columns along the x-axis, so with
    # the layout above the y-axis is the actual class and the x-axis the
    # predicted class.
    ax.set_ylabel("Actual Values")
    ax.set_xlabel("Predicted Values")
    ax.set_title("Confusion Matrix")
    plt.show()
    return fig



In [None]:
# Cross-validation loop
for fold_idx, (train_idx, val_idx) in enumerate(fold_splits):
    print(f"\nFold {fold_idx + 1}:")

    # Prepare DataLoaders for the fold
    train_dataset = Subset(balanced_dataset, train_idx)
    val_dataset = Subset(balanced_dataset, val_idx)

    train_loader = DataLoader(train_dataset, batch_size=8, shuffle=True, num_workers=0)
    val_loader = DataLoader(val_dataset, batch_size=8, shuffle=False, num_workers=0)

    # Initialize model and optimizer for each fold so that no weights or
    # optimizer state carry over from the previous fold. train_and_validate
    # picks up `optimizer` and `criterion` from these module-level names.
    model = VGG16LSTM(num_classes=2).to(device)
    model.lstm.flatten_parameters()
    optimizer = CustomAdamOptimizer(model.parameters(), lr=0.0001)
    criterion = nn.CrossEntropyLoss()

    # Train and validate for the fold
    train_and_validate(model, train_loader, val_loader, epochs=10)

    # Evaluate final metrics for the fold
    cm, acc, precision, recall, f1 = evaluate_model(model, val_loader)
    plot_confusion_matrix(cm)

    print(f"Final Fold {fold_idx + 1} Metrics - Accuracy: {acc:.4f}, Precision: {precision:.4f}, Recall: {recall:.4f}, F1 Score: {f1:.4f}\n")
    # After fold evaluation
    writer.add_scalar(f"Fold_{fold_idx+1}/Accuracy", acc, fold_idx + 1)
    writer.add_scalar(f"Fold_{fold_idx+1}/Precision", precision, fold_idx + 1)
    writer.add_scalar(f"Fold_{fold_idx+1}/Recall", recall, fold_idx + 1)
    writer.add_scalar(f"Fold_{fold_idx+1}/F1_Score", f1, fold_idx + 1)

    # plot_confusion_matrix() has already called plt.show(). Under the
    # notebook's inline backend a shown figure is closed, so plt.gcf() would
    # hand TensorBoard a new, empty figure. Draw a dedicated figure for the
    # log instead; add_figure closes it afterwards by default.
    # cm is [[tp, fn], [fp, tn]]: rows are actual classes, columns predicted.
    cm_fig, cm_ax = plt.subplots(figsize=(8, 6))
    sns.heatmap(cm, annot=True, fmt='d', cmap='Blues',
                xticklabels=["Positive", "Negative"],
                yticklabels=["Positive", "Negative"],
                ax=cm_ax)
    cm_ax.set_ylabel("Actual Values")
    cm_ax.set_xlabel("Predicted Values")
    cm_ax.set_title("Confusion Matrix")
    writer.add_figure(f"Fold_{fold_idx+1}/Confusion_Matrix", cm_fig, fold_idx + 1)
    

In [None]:
# Save the weights of the current `model`.
# NOTE(review): when this runs after the cross-validation loop, `model` is the
# network trained in the last fold -- no fold is selected as "best" despite
# the file name. Confirm this is intended.
torch.save(model.state_dict(), '/kaggle/working/vgg16_best_model.pth')

In [None]:
# Close the TensorBoard writer now that all fold results have been logged.
writer.close()

In [None]:
# IPython magics: load the TensorBoard notebook extension and open it on the
# log directory the SummaryWriter above writes to.
%load_ext tensorboard
%tensorboard --logdir runs/cross_validation