# FaceForensics++ C23 – Full Pipeline with YOLO Face Crops

This notebook trains a binary classifier (REAL vs FAKE) on face crops
preprocessed by YOLO (from `processing_detect_n_crop.ipynb`).

**Assumptions**:
- Preprocessed faces are stored under `FFprocessed/` with subfolders:
  - `original/`          → REAL
  - `Deepfakes/`, `FaceSwap/`, `Face2Face/`, `NeuralTextures/`,
    `FaceShifter/`, `DeepFakeDetection/` → FAKE
- Each video corresponds to a folder inside these method folders,
  containing ~20 face images like `frame_000.png`, etc.

This notebook will:
1. Build stratified 70/15/15 train/val/test splits at the **video** level (all frames of a video stay in the same split).
2. Flatten them into `data_yolo_faces/{train,val,test}/{REAL,FAKE}`.
3. Create PyTorch `ImageFolder` datasets and loaders.
4. Fine-tune an ImageNet-pretrained ResNet18 classifier and save the weights with the best validation accuracy.
5. Evaluate on the held-out test set.

In [None]:
# 1. Configure dataset paths and environment
from pathlib import Path
import os
import random
import shutil

import numpy as np
import torch

DATA_ROOT = Path("FFprocessed")       # where YOLO face crops live
OUT_ROOT = Path("data_yolo_faces")   # where we will build REAL/FAKE splits

# parents=True keeps this working if OUT_ROOT is later changed to a nested path.
OUT_ROOT.mkdir(parents=True, exist_ok=True)

# Seed every RNG used in this notebook (Python, NumPy, PyTorch) with the same
# value so that shuffling and weight initialisation are repeatable.
RANDOM_STATE = 42
random.seed(RANDOM_STATE)
np.random.seed(RANDOM_STATE)
torch.manual_seed(RANDOM_STATE)

print("DATA_ROOT:", DATA_ROOT.resolve())
print("OUT_ROOT:", OUT_ROOT.resolve())

In [None]:
# 2. Build train/val/test splits at VIDEO level from FFprocessed

from collections import defaultdict

# Methods: original = REAL, others = FAKE
REAL_METHOD = "original"
FAKE_METHODS = [
    "Deepfakes",
    "FaceSwap",
    "Face2Face",
    "NeuralTextures",
    "FaceShifter",
    "DeepFakeDetection",
]

# One dict per non-empty video folder, with the keys
# "method", "video_id", "label", "path" and "num_frames".
video_entries = []

for method in [REAL_METHOD] + FAKE_METHODS:
    method_dir = DATA_ROOT / method
    if not method_dir.exists():
        print(f"Warning: method folder not found: {method_dir}")
        continue

    # The label depends only on the method folder, so compute it once per method.
    label = "REAL" if method == REAL_METHOD else "FAKE"

    # Sort the folders: iterdir() yields entries in a filesystem-dependent order,
    # and the split cell indexes into this list by position. Without a stable
    # order the same RANDOM_STATE could yield different splits across machines.
    for video_dir in sorted(method_dir.iterdir()):
        if not video_dir.is_dir():
            continue
        imgs = [p for p in video_dir.iterdir() if p.is_file()]
        if not imgs:
            # Folders without any file contribute nothing to training; skip them.
            continue
        video_entries.append({
            "method": method,
            "video_id": video_dir.name,
            "label": label,
            "path": str(video_dir),
            "num_frames": len(imgs),
        })

print("Total videos found:", len(video_entries))
labels = [e["label"] for e in video_entries]
print("Label counts:", {"REAL": labels.count("REAL"), "FAKE": labels.count("FAKE")})

import numpy as np
from sklearn.model_selection import train_test_split

# Fail early with an actionable message instead of an opaque error from
# train_test_split when the discovery step found nothing.
if not video_entries:
    raise ValueError(
        f"No video folders with frames were found under {DATA_ROOT}; cannot build splits."
    )

# Split positions into video_entries rather than the entries themselves, so the
# label array can be indexed consistently for stratification.
indices = np.arange(len(video_entries))
labels_np = np.array(labels)

# NOTE(review): stratification is by REAL/FAKE label only. Videos in different
# method folders may derive from the same source clip and could land in
# different splits -- confirm whether a group-aware split is required.

# Step 1: 70% train, 30% held out.
train_idx, temp_idx = train_test_split(
    indices,
    test_size=0.30,
    stratify=labels_np,
    random_state=RANDOM_STATE,
)

# Step 2: halve the held-out part into validation and test (15% / 15% overall).
val_idx, test_idx = train_test_split(
    temp_idx,
    test_size=0.50,
    stratify=labels_np[temp_idx],
    random_state=RANDOM_STATE,
)

def subset(entries, idx):
    """Return a new list holding ``entries[i]`` for each ``i`` in ``idx``, in ``idx`` order."""
    return list(map(entries.__getitem__, idx))

# Turn each index array into the matching list of video entries.
train_videos, val_videos, test_videos = (
    subset(video_entries, split_idx) for split_idx in (train_idx, val_idx, test_idx)
)

def count_labels(entries):
    """Count how many entries carry each ``"label"`` value.

    Returns a plain dict mapping label -> count, with keys in order of first
    appearance; an empty iterable yields an empty dict.
    """
    tally = {}
    for entry in entries:
        key = entry["label"]
        tally[key] = tally.get(key, 0) + 1
    return tally

# Report the number of videos and the per-label breakdown of every split.
for caption, split_videos in (
    ("Train videos:", train_videos),
    ("Val videos:", val_videos),
    ("Test videos:", test_videos),
):
    print(caption, len(split_videos), count_labels(split_videos))

In [None]:
# 3. Flatten videos into data_yolo_faces/{train,val,test}/{REAL,FAKE}

def prepare_split(split_name, entries, out_root=None, clean=False):
    """Copy every frame of the given videos into ``<out_root>/<split_name>/<label>/``.

    Each copied file is renamed to ``<method>_<video_id>_<original name>`` so
    frames from different videos cannot collide in the flat class folder.
    Files that already exist at the destination are left untouched.

    Args:
        split_name: Name of the split sub-folder (e.g. ``"train"``).
        entries: Iterable of dicts providing the keys ``"path"``, ``"label"``,
            ``"method"`` and ``"video_id"``.
        out_root: Root folder for the splits. Defaults to the module-level
            ``OUT_ROOT`` when ``None``.
        clean: When true, delete an existing ``<out_root>/<split_name>`` folder
            before copying, so the split holds exactly the given videos.

    Prints the number of newly copied images, plus a warning when the class
    folders contain files that do not belong to ``entries``.
    """
    root = OUT_ROOT if out_root is None else Path(out_root)
    split_root = root / split_name

    if clean and split_root.exists():
        shutil.rmtree(split_root)

    class_dirs = [split_root / "REAL", split_root / "FAKE"]
    for class_dir in class_dirs:
        class_dir.mkdir(parents=True, exist_ok=True)

    expected = set()  # every destination path this call is responsible for
    num_copied = 0
    for e in entries:
        src_video_dir = Path(e["path"])
        dst_dir = split_root / e["label"]

        for img_path in sorted(src_video_dir.iterdir()):
            if not img_path.is_file():
                continue
            dst_path = dst_dir / f"{e['method']}_{e['video_id']}_{img_path.name}"
            expected.add(dst_path)
            # Skip existing files so that re-running the cell is cheap.
            if not dst_path.exists():
                shutil.copy2(img_path, dst_path)
                num_copied += 1
    print(f"Prepared split {split_name}: {num_copied} images")

    # Files left behind by an earlier run with a different split would silently
    # end up in this split too (and may belong to another split now), so at
    # least make them visible.
    stale = [
        p
        for class_dir in class_dirs
        for p in class_dir.iterdir()
        if p.is_file() and p not in expected
    ]
    if stale:
        print(
            f"Warning: split {split_name} contains {len(stale)} file(s) that are not part "
            f"of the current entries; re-run with clean=True to remove them."
        )

# Materialise the three splits on disk, in train -> val -> test order.
for split_label, split_entries in (
    ("train", train_videos),
    ("val", val_videos),
    ("test", test_videos),
):
    prepare_split(split_label, split_entries)

In [None]:
# 4. ImageFolder datasets and DataLoaders for data_yolo_faces

from torch.utils.data import DataLoader
from torchvision import datasets, transforms, models

# Run on the GPU when PyTorch can see one, otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")
print("Using device:", device)

# The flattened {train,val,test}/{REAL,FAKE} tree built earlier (data_yolo_faces).
faces_dir = OUT_ROOT

# Per-channel mean / std used to normalise inputs for the ImageNet-pretrained backbone.
imagenet_mean, imagenet_std = [0.485, 0.456, 0.406], [0.229, 0.224, 0.225]

def _to_normalized_tensor():
    """Return the tensor-conversion + ImageNet-normalisation steps shared by both pipelines."""
    return [
        transforms.ToTensor(),
        transforms.Normalize(mean=imagenet_mean, std=imagenet_std),
    ]


# Training pipeline: random crop/flip/colour/rotation augmentation, then normalise.
train_transform = transforms.Compose(
    [
        transforms.RandomResizedCrop(224, scale=(0.7, 1.0)),
        transforms.RandomHorizontalFlip(),
        transforms.ColorJitter(brightness=0.3, contrast=0.3, saturation=0.3, hue=0.08),
        transforms.RandomRotation(degrees=10),
    ]
    + _to_normalized_tensor()
)

# Evaluation pipeline: deterministic resize + centre crop, then normalise.
eval_transform = transforms.Compose(
    [
        transforms.Resize(256),
        transforms.CenterCrop(224),
    ]
    + _to_normalized_tensor()
)

# Only the training split is augmented; validation and test share the
# deterministic evaluation preprocessing.
_split_transforms = {
    "train": train_transform,
    "val": eval_transform,
    "test": eval_transform,
}
_split_datasets = {
    split: datasets.ImageFolder(root=str(faces_dir / split), transform=tfm)
    for split, tfm in _split_transforms.items()
}
train_dataset = _split_datasets["train"]
val_dataset = _split_datasets["val"]
test_dataset = _split_datasets["test"]

# The class list defines which integer index each label folder maps to.
print("Classes:", train_dataset.classes)
for caption, split_dataset in (
    ("Train images:", train_dataset),
    ("Val images:", val_dataset),
    ("Test images:", test_dataset),
):
    print(caption, len(split_dataset))

batch_size = 64

# Pinned (page-locked) host memory only helps host-to-GPU copies. Requesting it
# on a CPU-only machine brings no benefit, so tie the flag to CUDA availability.
pin_memory = torch.cuda.is_available()

# Only the training loader shuffles; validation/test keep a fixed order so
# their metrics and collected predictions are repeatable.
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, num_workers=2, pin_memory=pin_memory)
val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False, num_workers=2, pin_memory=pin_memory)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False, num_workers=2, pin_memory=pin_memory)

# Number of batches per split (displayed as the cell output).
len(train_loader), len(val_loader), len(test_loader)

In [None]:
# 5. Define ResNet18 model, loss, optimizer

import torch.nn as nn

def create_model(num_classes=2, use_pretrained=True):
    """Build a ResNet18 whose final layer is replaced by Dropout(0.3) + Linear.

    Args:
        num_classes: Number of output logits of the new linear layer.
        use_pretrained: Load the ``IMAGENET1K_V1`` weights when true; otherwise
            pass ``weights=None`` to ``resnet18``.

    Returns:
        The ResNet18 module with its ``fc`` attribute swapped for the new head.
    """
    weights = None
    if use_pretrained:
        weights = models.ResNet18_Weights.IMAGENET1K_V1
    backbone = models.resnet18(weights=weights)

    # Reuse the input width of the original classifier for the new head.
    head = nn.Sequential(
        nn.Dropout(p=0.3),
        nn.Linear(backbone.fc.in_features, num_classes),
    )
    backbone.fc = head
    return backbone

# Two-class classifier, moved to the selected device.
model = create_model(num_classes=2, use_pretrained=True)
model = model.to(device)

criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=1e-4, weight_decay=1e-4)
# Halve the learning rate once the monitored value (the validation loss passed
# to scheduler.step) has stopped decreasing for more than one epoch.
scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(
    optimizer,
    mode="min",
    factor=0.5,
    patience=1,
)

print(model.fc)

In [None]:
# 6. Training and validation loop

import time

def train_one_epoch(model, loader, optimizer, criterion, device):
    """Run one optimisation pass over ``loader`` and report mean loss and accuracy.

    Args:
        model: Module to train; switched to training mode.
        loader: Iterable yielding ``(inputs, labels)`` batches.
        optimizer: Optimizer stepped once per batch.
        criterion: Loss callable taking ``(outputs, labels)``.
        device: Device the batches are moved to before the forward pass.

    Returns:
        ``(epoch_loss, epoch_acc)``: the sample-weighted mean loss and the
        fraction of correct predictions. ``(0.0, 0.0)`` when ``loader`` yields
        no samples, matching ``evaluate_simple``'s empty-loader result.
    """
    model.train()
    running_loss = 0.0
    running_corrects = 0
    total = 0

    for inputs, labels in loader:
        inputs = inputs.to(device)
        labels = labels.to(device)

        optimizer.zero_grad()

        outputs = model(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        preds = outputs.max(dim=1).indices
        batch_count = inputs.size(0)
        # Weight the batch loss by the batch size so that a smaller final batch
        # does not skew the epoch average.
        running_loss += loss.item() * batch_count
        running_corrects += (preds == labels).sum().item()
        total += batch_count

    if total == 0:
        # Empty loader: avoid ZeroDivisionError.
        return 0.0, 0.0

    return running_loss / total, running_corrects / total

@torch.no_grad()
def evaluate(model, loader, criterion, device):
    """Compute mean loss and accuracy of ``model`` over ``loader`` without gradients.

    Args:
        model: Module to evaluate; switched to evaluation mode.
        loader: Iterable yielding ``(inputs, labels)`` batches.
        criterion: Loss callable taking ``(outputs, labels)``.
        device: Device the batches are moved to before the forward pass.

    Returns:
        ``(epoch_loss, epoch_acc)``: the sample-weighted mean loss and the
        fraction of correct predictions. ``(0.0, 0.0)`` when ``loader`` yields
        no samples, matching ``evaluate_simple``'s empty-loader result.
    """
    model.eval()
    running_loss = 0.0
    running_corrects = 0
    total = 0

    for inputs, labels in loader:
        inputs = inputs.to(device)
        labels = labels.to(device)

        outputs = model(inputs)
        loss = criterion(outputs, labels)

        preds = outputs.max(dim=1).indices
        batch_count = inputs.size(0)
        # Weight by batch size so the result is a true per-sample average.
        running_loss += loss.item() * batch_count
        running_corrects += (preds == labels).sum().item()
        total += batch_count

    if total == 0:
        # Empty loader: avoid ZeroDivisionError.
        return 0.0, 0.0

    return running_loss / total, running_corrects / total

num_epochs = 20
best_val_acc = 0.0
best_state_dict = None  # snapshot of the weights with the best validation accuracy

# Per-epoch metrics, used for the learning curves plotted later.
history = {"train_loss": [], "train_acc": [], "val_loss": [], "val_acc": []}

for epoch in range(num_epochs):
    start_time = time.time()
    train_loss, train_acc = train_one_epoch(model, train_loader, optimizer, criterion, device)
    val_loss, val_acc = evaluate(model, val_loader, criterion, device)
    elapsed = time.time() - start_time

    history["train_loss"].append(train_loss)
    history["train_acc"].append(train_acc)
    history["val_loss"].append(val_loss)
    history["val_acc"].append(val_acc)

    # The plateau scheduler monitors the validation loss (mode="min").
    scheduler.step(val_loss)

    print(
        f"Epoch {epoch+1}/{num_epochs} | ",
        f"train_loss={train_loss:.4f}, train_acc={train_acc:.4f}, ",
        f"val_loss={val_loss:.4f}, val_acc={val_acc:.4f}, ",
        f"time={elapsed:.1f}s"
    )

    if val_acc > best_val_acc:
        best_val_acc = val_acc
        # state_dict() hands back references to the live parameter/buffer
        # tensors, which later epochs keep updating in place. Clone them so the
        # snapshot really holds this epoch's weights rather than whatever the
        # model contains when training ends.
        best_state_dict = {
            name: tensor.detach().clone()
            for name, tensor in model.state_dict().items()
        }

print("Best val acc:", best_val_acc)

if best_state_dict is not None:
    # Restore the best weights so the later test evaluation uses them, then persist.
    model.load_state_dict(best_state_dict)
    torch.save(model.state_dict(), "best_resnet_ffpp_yolo_faces.pth")
    print("Saved best model to best_resnet_ffpp_yolo_faces.pth")

In [None]:
# 7. Test evaluation and plots

import matplotlib.pyplot as plt
from sklearn.metrics import confusion_matrix, classification_report
import numpy as np

@torch.no_grad()
def evaluate_simple(model, loader, device):
    """Return the fraction of samples in ``loader`` that ``model`` classifies correctly.

    The model is put in evaluation mode and each batch is moved to ``device``.
    Returns ``0.0`` when the loader yields no samples.
    """
    model.eval()
    hits, seen = 0, 0
    for batch_inputs, batch_labels in loader:
        batch_inputs = batch_inputs.to(device)
        batch_labels = batch_labels.to(device)
        # The predicted class is the index of the largest logit in each row.
        predicted = torch.max(model(batch_inputs), 1).indices
        hits += torch.sum(predicted == batch_labels).item()
        seen += batch_inputs.size(0)
    if seen > 0:
        return hits / seen
    return 0.0

# Accuracy on the held-out test split.
test_acc = evaluate_simple(model, test_loader, device)
print("Test accuracy (YOLO faces):", test_acc)

# Learning curves: loss in the left panel, accuracy in the right one.
# Each tuple is (history key suffix, legend suffix, y-axis label / title prefix).
_panels = (
    ("loss", "Loss", "Loss"),
    ("acc", "Acc", "Accuracy"),
)

plt.figure(figsize=(10, 4))
for _position, (_metric, _legend, _axis_name) in enumerate(_panels, start=1):
    plt.subplot(1, 2, _position)
    plt.plot(history[f"train_{_metric}"], label=f"Train {_legend}")
    plt.plot(history[f"val_{_metric}"], label=f"Val {_legend}")
    plt.xlabel("Epoch")
    plt.ylabel(_axis_name)
    plt.title(f"{_axis_name} over epochs (YOLO faces)")
    plt.legend()

plt.tight_layout()
plt.show()

@torch.no_grad()
def get_predictions_and_labels(model, loader, device):
    """Collect predicted class indices and true labels for every sample in ``loader``.

    Args:
        model: Module to run; switched to evaluation mode.
        loader: Iterable yielding ``(inputs, labels)`` batches.
        device: Device the inputs are moved to for the forward pass.

    Returns:
        ``(all_preds, all_labels)``: two 1-D NumPy arrays in loader order.
        Both are empty ``int64`` arrays when the loader yields no batches.
    """
    model.eval()
    pred_chunks = []
    label_chunks = []
    for inputs, labels in loader:
        outputs = model(inputs.to(device))
        preds = outputs.max(dim=1).indices
        pred_chunks.append(preds.cpu().numpy())
        # Labels are only collected, never compared on the device, so they are
        # not transferred there first.
        label_chunks.append(labels.cpu().numpy())

    if not pred_chunks:
        # np.concatenate raises on an empty list; return empty arrays instead.
        return np.empty(0, dtype=np.int64), np.empty(0, dtype=np.int64)

    return np.concatenate(pred_chunks), np.concatenate(label_chunks)

y_pred, y_true = get_predictions_and_labels(model, test_loader, device)

# Pass the full set of class indices explicitly. Otherwise the label set is
# inferred from the values that occur in y_true / y_pred, so a class that never
# appears would shrink the matrix and no longer match target_names.
class_names = train_dataset.classes
class_ids = list(range(len(class_names)))

cm = confusion_matrix(y_true, y_pred, labels=class_ids)
print("Confusion Matrix (rows=true, cols=pred):")
print(cm)

print("\nClassification Report:")
print(
    classification_report(
        y_true,
        y_pred,
        labels=class_ids,
        target_names=class_names,
        digits=4,
    )
)