# DeepFake Detection

### 1 - Imports and Configurations

In [1]:
import os
import random
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Subset, random_split
from torchvision import datasets, models, transforms
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, f1_score, roc_auc_score, confusion_matrix
import seaborn as sns
import matplotlib.pyplot as plt
from tqdm.notebook import tqdm
import pandas as pd
import kaggle
import time
import copy
from IPython.display import clear_output

# Pick the compute device: CUDA first, then Apple MPS, otherwise fall back to the CPU.
if torch.cuda.is_available():
    DEVICE = torch.device("cuda")
elif torch.backends.mps.is_available():
    DEVICE = torch.device("mps")
else:
    DEVICE = torch.device("cpu")
print(f"Using device: {DEVICE}")

# Reproducibility (crucial for a fair comparison between models)
SEED = 42


def set_seed(seed):
    """Seed the Python, NumPy and PyTorch random number generators.

    Args:
        seed: Integer seed applied to every generator.

    When CUDA is available, cuDNN is additionally switched to deterministic
    kernels and its autotuner is disabled, because the autotuner may select
    different algorithms from one run to the next.
    """
    random.seed(seed)
    # NOTE: this value is only inherited by child processes started later;
    # hash randomisation of the already-running interpreter is not changed.
    os.environ['PYTHONHASHSEED'] = str(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    if torch.cuda.is_available():
        torch.cuda.manual_seed(seed)
        torch.cuda.manual_seed_all(seed)
        torch.backends.cudnn.deterministic = True
        torch.backends.cudnn.benchmark = False


set_seed(SEED)

# Global Config
# Dataset identifier handed to the Kaggle API download call.
DATASET_NAME = "ayushmandatta1/deepdetect-2025"
# Local folder the dataset is downloaded and unzipped into.
DOWNLOAD_ROOT = "./deepdetect_data"
# Batch size shared by the train, validation and test DataLoaders.
BATCH_SIZE = 32
# Number of DataLoader worker processes.
NUM_WORKERS = 2 # Adjust based on OS (0 for Windows if issues arise)

Using device: cuda


### 2 - Data Download & Path Setup

In [2]:
# 1. Download Dataset
# A folder that exists but is empty (e.g. left behind by an aborted download)
# must not be mistaken for a finished download, so require some content too.
# NOTE(review): a partially extracted archive would still pass this check.
if os.path.isdir(DOWNLOAD_ROOT) and os.listdir(DOWNLOAD_ROOT):
    print("Dataset already downloaded.")
else:
    print(f"Downloading {DATASET_NAME}...")
    kaggle.api.authenticate()
    kaggle.api.dataset_download_files(DATASET_NAME, path=DOWNLOAD_ROOT, unzip=True)
    print("Download complete.")

# 2. Locate Folders
def find_folder(root_dir, target_name):
    """Return the path of the first directory called `target_name` under `root_dir`.

    The tree is traversed with os.walk; the first parent directory that lists
    `target_name` among its sub-directories wins.

    Raises:
        FileNotFoundError: if no directory with that name exists below `root_dir`.
    """
    candidates = (
        os.path.join(parent, target_name)
        for parent, subdirs, _files in os.walk(root_dir)
        if target_name in subdirs
    )
    located = next(candidates, None)
    if located is None:
        raise FileNotFoundError(f"Could not find folder '{target_name}' inside {root_dir}")
    return located

# Resolve the train/test roots inside the downloaded dataset.
try:
    TRAIN_DIR_RAW = find_folder(DOWNLOAD_ROOT, 'train')
    TEST_DIR_RAW = find_folder(DOWNLOAD_ROOT, 'test') # We will strictly hold this out
except FileNotFoundError as e:
    print(f"Error: {e}")
    # Stop here: the data-loading code needs both paths, and continuing would
    # only surface later as an unrelated NameError.
    raise
else:
    print(f"Training Root: {TRAIN_DIR_RAW}")
    print(f"Test (Holdout) Root:  {TEST_DIR_RAW}")

Dataset already downloaded.
Training Root: ./deepdetect_data\ddata\train
Test (Holdout) Root:  ./deepdetect_data\ddata\test


### 3 - Transformations

In [3]:
# ImageNet channel statistics (the usual choice for transfer learning)
NORM_MEAN = [0.485, 0.456, 0.406]
NORM_STD = [0.229, 0.224, 0.225]


def get_transforms(img_size=224):
    """Build the augmented and the plain preprocessing pipelines.

    Args:
        img_size: Side length (pixels) images are resized to, as a square.

    Returns:
        Tuple ``(aug_transforms, basic_transforms)``. Both resize, convert to a
        tensor and normalise with NORM_MEAN / NORM_STD; the augmented pipeline
        inserts random flip, rotation, translation, colour jitter and blur
        between the resize and the tensor conversion.
    """
    target_size = (img_size, img_size)

    def _tensor_and_normalize():
        # Fresh instances per pipeline, mirroring two independent Compose lists.
        return [
            transforms.ToTensor(),
            transforms.Normalize(mean=NORM_MEAN, std=NORM_STD),
        ]

    # Random perturbations used only by the augmented pipeline.
    random_ops = [
        transforms.RandomHorizontalFlip(p=0.5),
        transforms.RandomRotation(degrees=15),
        transforms.RandomAffine(degrees=0, translate=(0.1, 0.1)),
        transforms.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2, hue=0.1),
        transforms.GaussianBlur(kernel_size=3, sigma=(0.1, 2.0)),
    ]

    aug_transforms = transforms.Compose(
        [transforms.Resize(target_size), *random_ops, *_tensor_and_normalize()]
    )
    basic_transforms = transforms.Compose(
        [transforms.Resize(target_size), *_tensor_and_normalize()]
    )
    return aug_transforms, basic_transforms

### 4 - Data Splitting & Loaders

In [4]:
def get_data_loaders(model_img_size=224, use_augmentation=False):
    """Create the train / validation / test DataLoaders.

    Args:
        model_img_size: Square input resolution forwarded to get_transforms.
        use_augmentation: If True the training subset uses the augmented
            pipeline; otherwise every split uses the basic pipeline (baseline).

    Returns:
        Tuple ``(train_loader, val_loader, test_loader, classes)`` where
        ``classes`` is the class-name list of the training ImageFolder.

    The train folder is split 80/20 (stratified, seeded with SEED) into
    training and validation subsets; the test folder is used as-is.
    """
    aug_tf, basic_tf = get_transforms(model_img_size)

    # Determine which transform to use for training
    train_tf = aug_tf if use_augmentation else basic_tf

    # 1. Create Datasets
    # The train folder is loaded twice with different transforms so that
    # augmentation is applied ONLY to the training subset indices.
    full_train_dataset_aug = datasets.ImageFolder(root=TRAIN_DIR_RAW, transform=train_tf)
    full_train_dataset_clean = datasets.ImageFolder(root=TRAIN_DIR_RAW, transform=basic_tf)
    test_dataset = datasets.ImageFolder(root=TEST_DIR_RAW, transform=basic_tf)

    # 2. Create Split Indices (stratified to keep the class balance)
    train_targets = full_train_dataset_aug.targets
    train_idx, val_idx = train_test_split(
        np.arange(len(train_targets)),
        test_size=0.2,
        random_state=SEED,
        stratify=train_targets
    )

    # 3. Create Subsets
    # Train subset reads through the (possibly augmented) dataset object,
    # validation subset through the clean one, so augmentation never leaks.
    train_subset = Subset(full_train_dataset_aug, train_idx)
    val_subset = Subset(full_train_dataset_clean, val_idx)

    # 4. Create Loaders
    # A dedicated, freshly seeded generator makes the shuffle order (and the
    # base seed handed to worker processes) identical for every call, instead
    # of depending on how much of the global RNG earlier cells consumed.
    shuffle_generator = torch.Generator()
    shuffle_generator.manual_seed(SEED)
    train_loader = DataLoader(train_subset, batch_size=BATCH_SIZE, shuffle=True,
                              num_workers=NUM_WORKERS, generator=shuffle_generator)
    val_loader = DataLoader(val_subset, batch_size=BATCH_SIZE, shuffle=False, num_workers=NUM_WORKERS)
    test_loader = DataLoader(test_dataset, batch_size=BATCH_SIZE, shuffle=False, num_workers=NUM_WORKERS)

    print(f"Data Loaded for Size {model_img_size} | Augmented: {use_augmentation}")
    print(f"Train samples: {len(train_subset)} | Val samples: {len(val_subset)} | Test samples: {len(test_dataset)}")

    return train_loader, val_loader, test_loader, full_train_dataset_aug.classes

### 5 - Training

In [10]:
def _keep_frozen_batchnorm_in_eval(model):
    """Put BatchNorm layers whose own parameters are all frozen into eval mode."""
    bn_types = (nn.BatchNorm1d, nn.BatchNorm2d, nn.BatchNorm3d, nn.SyncBatchNorm)
    for module in model.modules():
        if isinstance(module, bn_types):
            own_params = list(module.parameters(recurse=False))
            # Layers without affine parameters are left alone: there is no
            # requires_grad flag that could tell us they were frozen.
            if own_params and not any(p.requires_grad for p in own_params):
                module.eval()


def _run_epoch(model, loader, criterion, device, optimizer=None):
    """Run one pass over `loader` and return ``(mean_loss, accuracy)`` as floats.

    With an optimizer the model is trained; without one it is only evaluated
    (eval mode, gradients disabled).
    """
    training = optimizer is not None
    if training:
        model.train()
        # model.train() would otherwise let frozen BatchNorm layers keep
        # updating their running statistics, silently changing a "frozen" backbone.
        _keep_frozen_batchnorm_in_eval(model)
    else:
        model.eval()

    loss_sum = 0.0
    correct = 0
    seen = 0
    with torch.set_grad_enabled(training):
        for inputs, labels in loader:
            inputs = inputs.to(device)
            labels = labels.to(device)

            if training:
                optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            if training:
                loss.backward()
                optimizer.step()

            batch_size = inputs.size(0)
            # criterion returns a batch mean, so weight it by the batch size.
            loss_sum += loss.item() * batch_size
            correct += (outputs.argmax(dim=1) == labels).sum().item()
            seen += batch_size

    if seen == 0:
        raise ValueError("DataLoader yielded no samples; cannot compute epoch metrics.")
    return loss_sum / seen, correct / seen


def _plot_history(history, model_name):
    """Redraw the live loss / accuracy curves for the epochs recorded so far."""
    clear_output(wait=True)
    fig, ax = plt.subplots(1, 2, figsize=(12, 4))
    panels = ((ax[0], 'loss', 'Loss', 'Loss'), (ax[1], 'acc', 'Acc', 'Accuracy'))
    for axis, key, short, title in panels:
        axis.plot(history[f'train_{key}'], label=f'Train {short}')
        axis.plot(history[f'val_{key}'], label=f'Val {short}')
        axis.set_title(f'{model_name} {title}')
        axis.legend()
        axis.grid(True)
    plt.tight_layout()
    plt.show()
    plt.close(fig)


def train_model(model, train_loader, val_loader, criterion, optimizer,
                scheduler=None, num_epochs=10, patience=5, device=DEVICE,
                model_name="best_model", save_checkpoint=True):
    """Train `model` with early stopping on the validation loss.

    Args:
        model: Network to train (expected to already live on `device`).
        train_loader / val_loader: Loaders yielding ``(inputs, labels)`` batches.
        criterion: Loss taking ``(outputs, labels)``.
        optimizer: Optimizer stepped once per training batch.
        scheduler: Optional LR scheduler, stepped once per epoch;
            ReduceLROnPlateau receives the validation loss.
        num_epochs: Maximum number of epochs.
        patience: Epochs without validation-loss improvement before stopping.
        device: Device the batches are moved to.
        model_name: Used in log messages, plot titles and the checkpoint
            file name ``f"{model_name}.pth"``.
        save_checkpoint: Whether to write the best weights to disk.

    Returns:
        ``(model, history)``: the model with the best (lowest validation loss)
        weights loaded, and a dict with the per-epoch lists 'train_loss',
        'train_acc', 'val_loss' and 'val_acc'.

    Raises:
        ValueError: if a loader yields no samples.
    """
    since = time.time()

    # History tracking
    history = {
        'train_loss': [], 'train_acc': [],
        'val_loss': [], 'val_acc': []
    }

    best_model_wts = copy.deepcopy(model.state_dict())
    best_val_loss = float('inf')
    best_val_acc = 0.0
    patience_counter = 0

    print(f"Starting training for {model_name}...")

    for epoch in range(num_epochs):
        epoch_start = time.time()

        epoch_loss, epoch_acc = _run_epoch(model, train_loader, criterion, device, optimizer)
        val_loss, val_acc = _run_epoch(model, val_loader, criterion, device)

        history['train_loss'].append(epoch_loss)
        history['train_acc'].append(epoch_acc)
        history['val_loss'].append(val_loss)
        history['val_acc'].append(val_acc)

        # Timed before plotting so the figure rendering is not counted.
        epoch_duration = time.time() - epoch_start

        if scheduler is not None:
            if isinstance(scheduler, torch.optim.lr_scheduler.ReduceLROnPlateau):
                scheduler.step(val_loss)
            else:
                scheduler.step()

        _plot_history(history, model_name)

        print(f"Epoch {epoch+1}/{num_epochs} ({epoch_duration:.1f}s) - "
              f"Train Loss: {epoch_loss:.4f} Acc: {epoch_acc:.4f} | "
              f"Val Loss: {val_loss:.4f} Acc: {val_acc:.4f}")

        # Early stopping on validation loss
        if val_loss < best_val_loss:
            best_val_loss = val_loss
            best_val_acc = val_acc
            best_model_wts = copy.deepcopy(model.state_dict())
            patience_counter = 0

            if save_checkpoint:
                torch.save(model.state_dict(), f"{model_name}.pth")
                print(f"--> Best model saved! (Val Loss: {val_loss:.4f})")
            else:
                # Do not claim a file was written when checkpointing is off.
                print(f"--> New best model (checkpoint not saved) (Val Loss: {val_loss:.4f})")
        else:
            patience_counter += 1
            print(f"--> EarlyStopping counter: {patience_counter}/{patience}")
            if patience_counter >= patience:
                print("Early stopping triggered.")
                break

    time_elapsed = time.time() - since
    print(f'Training complete in {time_elapsed // 60:.0f}m {time_elapsed % 60:.0f}s')
    print(f'Best Val Acc: {best_val_acc:.4f}')

    # Hand back the best weights seen, not the last epoch's.
    model.load_state_dict(best_model_wts)
    return model, history

In [12]:
def train_with_gradual_unfreezing(model, train_loader, val_loader,
                                  model_name="Optimized_Model",
                                  warmup_epochs=3,
                                  total_epochs=10,
                                  patience=3,
                                  save_checkpoint=True):
    """Fine-tune `model` in two phases (gradual unfreezing).

    Phase A: freeze everything except the head (``model.classifier`` or
        ``model.fc``) and train it for `warmup_epochs` with Adam (lr=1e-3).
    Phase B: additionally unfreeze the last two backbone blocks and train for
        the remaining ``total_epochs - warmup_epochs`` epochs with Adam
        (lr=1e-4, weight decay) and ReduceLROnPlateau.

    Args:
        model: Network with a ``classifier`` or ``fc`` head.
        train_loader / val_loader: Loaders forwarded to train_model.
        model_name: Used for logging and as checkpoint name in phase B.
        warmup_epochs: Number of head-only epochs.
        total_epochs: Total epoch budget across both phases.
        patience: Early-stopping patience for phase B.
        save_checkpoint: Whether phase B writes the best weights to disk.

    Returns:
        ``(model, history)`` from phase B; the warm-up history is discarded.

    Raises:
        ValueError: if the model exposes neither ``classifier`` nor ``fc``.
    """
    print(f"\n=== PHASE A: WARMUP ({warmup_epochs} Epochs) - Backbone Frozen ===")

    # 1. Freeze Backbone / Unfreeze Head
    for param in model.parameters():
        param.requires_grad = False

    # The head is model.classifier or model.fc (standard in torchvision)
    if hasattr(model, 'classifier'):
        head = model.classifier
    elif hasattr(model, 'fc'):
        head = model.fc
    else:
        # Fail with an explicit message instead of the optimizer's
        # "empty parameter list" error further down.
        raise ValueError("Model has neither a 'classifier' nor an 'fc' head to train.")
    for param in head.parameters():
        param.requires_grad = True

    # 2. Setup Optimizer for Head Only (slightly higher LR for the fresh head)
    optimizer_warmup = optim.Adam(filter(lambda p: p.requires_grad, model.parameters()), lr=1e-3)
    criterion = nn.CrossEntropyLoss(label_smoothing=0.1)

    # 3. Train Warmup (patience high so the full warm-up always runs)
    model, _ = train_model(model, train_loader, val_loader, criterion, optimizer_warmup,
                           num_epochs=warmup_epochs, patience=99, model_name=f"{model_name}_Warmup",
                           save_checkpoint=False)

    finetune_epochs = max(total_epochs - warmup_epochs, 0)
    print(f"\n=== PHASE B: FINE-TUNING ({finetune_epochs} Epochs) - Last 2 Blocks Unfrozen ===")

    # 4. Unfreeze Last 2 Blocks
    # The architecture is recognised from the model's attributes rather than
    # from `model_name`: a name such as "Optimized_B0" contains neither
    # "efficientnet" nor "resnet", which would leave the backbone frozen.
    if hasattr(model, 'features'):
        # EfficientNet-style: last two entries of the `features` container
        blocks_to_unfreeze = [model.features[-2:]]
    elif hasattr(model, 'layer3') and hasattr(model, 'layer4'):
        # ResNet-style: the last two residual stages
        blocks_to_unfreeze = [model.layer3, model.layer4]
    else:
        blocks_to_unfreeze = []
        print(f"Warning: unknown architecture for '{model_name}'; only the head will be fine-tuned.")

    for block in blocks_to_unfreeze:
        for param in block.parameters():
            param.requires_grad = True

    # 5. Setup Optimizer for Fine-Tuning (lower LR to protect pretrained weights)
    optimizer_finetune = optim.Adam(filter(lambda p: p.requires_grad, model.parameters()),
                                    lr=1e-4, weight_decay=1e-4)

    scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer_finetune, mode='min', factor=0.1, patience=2)

    # 6. Train Remaining
    model, history = train_model(model, train_loader, val_loader, criterion, optimizer_finetune, scheduler,
                                 num_epochs=finetune_epochs, patience=patience,
                                 model_name=model_name, save_checkpoint=save_checkpoint)

    return model, history

### Model 1: Baseline EfficientNet B0

In [13]:
# --- MODEL 1: BASELINE EFFICIENTNET B0 ---

# 1. Data: the baseline deliberately trains without augmentation
#    (basic_transforms for every split) to mimic a naive setup.
train_loader_base, val_loader_base, _, classes = get_data_loaders(
    model_img_size=224,
    use_augmentation=False,
)

# 2. Model
print("Initializing Baseline EfficientNet B0...")
model_b0_base = models.efficientnet_b0(weights=models.EfficientNet_B0_Weights.IMAGENET1K_V1)

# Naive transfer learning: nothing is frozen, so backbone and head are trained
# together from the first step. On small datasets this tends to overfit because
# the ImageNet weights are overwritten quickly.
model_b0_base.requires_grad_(True)

# Simple head: only the final Linear layer is swapped for a 2-class output.
# NOTE(review): classifier[0] is left as shipped by torchvision (a Dropout
# layer in the stock model - confirm), so the head is not strictly dropout-free.
num_features = model_b0_base.classifier[1].in_features
model_b0_base.classifier[1] = nn.Linear(num_features, 2)
model_b0_base = model_b0_base.to(DEVICE)

# 3. Training config: plain cross-entropy and default Adam
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model_b0_base.parameters(), lr=0.001)

# 4. Run (no LR scheduler for the baseline)
model_b0_base, hist_b0_base = train_model(
    model=model_b0_base,
    train_loader=train_loader_base,
    val_loader=val_loader_base,
    criterion=criterion,
    optimizer=optimizer,
    scheduler=None,
    num_epochs=10,
    patience=3,
    model_name="Baseline_B0",
    save_checkpoint=True,
)

Data Loaded for Size 224 | Augmented: False
Train samples: 72327 | Val samples: 18082 | Test samples: 21776
Initializing Baseline EfficientNet B0...
Starting training for Baseline_B0...


KeyboardInterrupt: 

### Model 2: Optimized EfficientNet B0

In [None]:
# --- MODEL 2: OPTIMIZED EFFICIENTNET B0 ---

# 1. Data: 224x224 inputs, augmentation enabled for the training subset
train_loader_opt, val_loader_opt, _, _ = get_data_loaders(
    model_img_size=224,
    use_augmentation=True,
)

# 2. Model
print("Initializing Optimized EfficientNet B0...")
model_b0_opt = models.efficientnet_b0(weights=models.EfficientNet_B0_Weights.IMAGENET1K_V1)

# Replace the whole head: heavier dropout followed by a 2-class Linear layer
num_features = model_b0_opt.classifier[1].in_features
model_b0_opt.classifier = nn.Sequential(nn.Dropout(p=0.5), nn.Linear(num_features, 2))
model_b0_opt = model_b0_opt.to(DEVICE)

# 3. Run the gradual-unfreezing schedule:
#    3 warm-up epochs (head only), then up to 12 fine-tuning epochs (15 total).
model_b0_opt, hist_b0_opt = train_with_gradual_unfreezing(
    model=model_b0_opt,
    train_loader=train_loader_opt,
    val_loader=val_loader_opt,
    model_name="Optimized_B0",
    warmup_epochs=3,
    total_epochs=15,
    patience=3,
    save_checkpoint=True,
)

Data Loaded for Size 224 | Augmented: True
Train samples: 72327 | Val samples: 18082 | Test samples: 21776
Initializing Optimized EfficientNet B0...

=== PHASE A: WARMUP (3 Epochs) - Backbone Frozen ===
Starting training for Optimized_B0_Warmup...


KeyboardInterrupt: 

### Model 3: Optimized EfficientNet B2

In [None]:
# --- MODEL 3: OPTIMIZED EFFICIENTNET B2 ---

# 1. Data: B2 is fed 260x260 inputs here; augmentation enabled for training
train_loader_b2, val_loader_b2, _, _ = get_data_loaders(
    model_img_size=260,
    use_augmentation=True,
)

# 2. Model
print("Initializing EfficientNet B2...")
model_b2 = models.efficientnet_b2(weights=models.EfficientNet_B2_Weights.IMAGENET1K_V1)

# Replace the whole head: heavier dropout followed by a 2-class Linear layer
num_features = model_b2.classifier[1].in_features
model_b2.classifier = nn.Sequential(nn.Dropout(p=0.5), nn.Linear(num_features, 2))
model_b2 = model_b2.to(DEVICE)

# 3. Run the gradual-unfreezing schedule (3 warm-up epochs, 15 epochs in total)
model_b2, hist_b2 = train_with_gradual_unfreezing(
    model=model_b2,
    train_loader=train_loader_b2,
    val_loader=val_loader_b2,
    model_name="Optimized_B2",
    warmup_epochs=3,
    total_epochs=15,
    patience=3,
    save_checkpoint=True,
)

Data Loaded for Size 260 | Augmented: True
Train samples: 72327 | Val samples: 18082 | Test samples: 21776
Initializing EfficientNet B2...

=== PHASE A: WARMUP (3 Epochs) - Backbone Frozen ===
Starting training for Optimized_B2_Warmup...


KeyboardInterrupt: 

### Model 4: Optimized ResNet50

In [14]:
# --- MODEL 4: OPTIMIZED RESNET50 ---

# 1. Data: 224x224 inputs, augmentation enabled for the training subset
train_loader_rn, val_loader_rn, _, _ = get_data_loaders(
    model_img_size=224,
    use_augmentation=True,
)

# 2. Model (IMAGENET1K_V2 pretrained weights)
print("Initializing ResNet50...")
model_rn = models.resnet50(weights=models.ResNet50_Weights.IMAGENET1K_V2)

# ResNet exposes its head as `model.fc` (not `model.classifier`);
# replace it with dropout followed by a 2-class Linear layer.
num_features = model_rn.fc.in_features
model_rn.fc = nn.Sequential(nn.Dropout(p=0.5), nn.Linear(num_features, 2))
model_rn = model_rn.to(DEVICE)

# 3. Run the gradual-unfreezing schedule; for this model the fine-tuning
#    phase unfreezes layer3 and layer4.
model_rn, hist_rn = train_with_gradual_unfreezing(
    model=model_rn,
    train_loader=train_loader_rn,
    val_loader=val_loader_rn,
    model_name="Optimized_ResNet50",
    warmup_epochs=3,
    total_epochs=15,
    patience=3,
    save_checkpoint=True,
)

Data Loaded for Size 224 | Augmented: True
Train samples: 72327 | Val samples: 18082 | Test samples: 21776
Initializing ResNet50...
Downloading: "https://download.pytorch.org/models/resnet50-11ad3fa6.pth" to C:\Users\andre/.cache\torch\hub\checkpoints\resnet50-11ad3fa6.pth


100%|██████████| 97.8M/97.8M [00:12<00:00, 8.34MB/s]



=== PHASE A: WARMUP (3 Epochs) - Backbone Frozen ===
Starting training for Optimized_ResNet50_Warmup...


KeyboardInterrupt: 

### 6 - Testing & Evaluation

In [None]:
# --- EVALUATION ON TEST SET ---
def evaluate_model(model, test_loader, device=DEVICE, model_name="Model"):
    """Evaluate a 2-class model on `test_loader`.

    Args:
        model: Network producing one logit per class (two classes expected).
        test_loader: Loader yielding ``(inputs, labels)`` batches.
        device: Device the inputs are moved to.
        model_name: Label used in the progress bar and in the result dict.

    Returns:
        Dict with keys "Model", "Accuracy", "F1-Score", "AUC" and
        "Confusion Matrix" (always 2x2, rows = true class, columns = predicted).
        "AUC" is NaN when the labels contain a single class only.
    """
    model.eval()
    all_preds = []
    all_labels = []
    all_probs = []  # Positive-class scores for AUC

    print(f"Evaluating {model_name} on Test Set...")

    with torch.no_grad():
        for inputs, labels in tqdm(test_loader, desc=f"Testing {model_name}"):
            # Labels are only needed on the host, so just the inputs are moved.
            outputs = model(inputs.to(device))

            # Softmax probability of class index 1.
            # NOTE(review): ImageFolder numbers classes in sorted folder-name
            # order - confirm that index 1 really is the deepfake class.
            probs = torch.softmax(outputs, dim=1)[:, 1]
            preds = outputs.argmax(dim=1)

            all_preds.extend(preds.cpu().numpy())
            all_labels.extend(labels.cpu().numpy())
            all_probs.extend(probs.cpu().numpy())

    # Calculate Metrics
    acc = accuracy_score(all_labels, all_preds)
    f1 = f1_score(all_labels, all_preds)
    # roc_auc_score raises when only one class is present in the labels.
    if len(set(all_labels)) < 2:
        auc = float('nan')
    else:
        auc = roc_auc_score(all_labels, all_probs)
    # Fix the label set so the matrix stays 2x2 even if a class never occurs.
    cm = confusion_matrix(all_labels, all_preds, labels=[0, 1])

    return {
        "Model": model_name,
        "Accuracy": acc,
        "F1-Score": f1,
        "AUC": auc,
        "Confusion Matrix": cm
    }

In [None]:
# 1. Test loaders (basic transforms only), one per input resolution in use.
#    get_data_loaders returns (train, val, test, classes); only the test loader is kept.
test_loader_224 = get_data_loaders(model_img_size=224, use_augmentation=False)[2]
test_loader_260 = get_data_loaders(model_img_size=260, use_augmentation=False)[2]

results = []

# Each entry: (model object, display name, loader matching the model's input size)
models_to_test = [
    (model_b0_base, "Baseline_B0", test_loader_224),
    (model_b0_opt, "Optimized_B0", test_loader_224),
    (model_b2, "Optimized_B2", test_loader_260),
    (model_rn, "Optimized_ResNet50", test_loader_224),
]

# Collect one metrics dict per model, in the order listed above.
for model, name, loader in models_to_test:
    results.append(evaluate_model(model, loader, model_name=name))

In [None]:
# 1. Create DataFrame
df_results = pd.DataFrame(results)
# Drop the confusion matrices for the table view (they do not fit in a cell)
display_df = df_results.drop(columns=["Confusion Matrix"])

print("\n=== FINAL LEADERBOARD ===")
display(display_df.sort_values(by="AUC", ascending=False))

# 2. Visualize Confusion Matrices
# Tick labels come from the test dataset's own class list, so they follow the
# label indices used during evaluation instead of a hard-coded order.
class_names = list(test_loader_224.dataset.classes)

# Two plots per row; the number of rows grows with the number of models.
n_cols = 2
n_rows = max(1, (len(results) + n_cols - 1) // n_cols)

plt.figure(figsize=(15, 5 * n_rows))
for i, res in enumerate(results):
    plt.subplot(n_rows, n_cols, i + 1)
    sns.heatmap(res["Confusion Matrix"], annot=True, fmt='d', cmap='Blues',
                xticklabels=class_names, yticklabels=class_names)
    plt.title(f"{res['Model']}\nF1: {res['F1-Score']:.3f} | AUC: {res['AUC']:.3f}")
    plt.ylabel('True Label')
    plt.xlabel('Predicted Label')

plt.tight_layout()
plt.show()