In [5]:
!pip -q install opencv-python-headless facenet-pytorch 

In [6]:
import os, random, math, cv2, glob, shutil, json
import numpy as np
import torch
from facenet_pytorch import MTCNN
from pathlib import Path

SEED = 42
random.seed(SEED); np.random.seed(SEED); torch.manual_seed(SEED)

device = "cuda" if torch.cuda.is_available() else "cpu"
print("Device:", device)

CELEB_ROOT = "/kaggle/input/celeb-df-v2"
FFPP_ROOT  = "/kaggle/input/ff-c23/FaceForensics++_C23"

OUT_ROOT = "/kaggle/working/data"
FFPP_OUT = f"{OUT_ROOT}/FFpp_c23_train"  
CELEB_OUT= f"{OUT_ROOT}/CelebDF_test"     

MAX_VIDEOS_PER_CLASS = 1000     
FRAMES_PER_VIDEO     = 1        
IMAGE_SIZE           = 224      
MTCNN_MARGIN         = 20       

for p in [FFPP_OUT, CELEB_OUT]:
    Path(p).mkdir(parents=True, exist_ok=True)
    Path(f"{p}/real").mkdir(parents=True, exist_ok=True)
    Path(f"{p}/fake").mkdir(parents=True, exist_ok=True)

mtcnn = MTCNN(
    image_size=IMAGE_SIZE,
    margin=MTCNN_MARGIN,
    select_largest=True,
    post_process=True,
    device=device,
    keep_all=False
)
print("MTCNN ready.")

def list_videos_celebdf(root: str) -> dict:
    real_dirs = [f"{root}/Celeb-real", f"{root}/YouTube-real"]
    fake_dirs = [f"{root}/Celeb-synthesis"]
    real = []
    for d in real_dirs:
        if os.path.isdir(d):
            real += sorted([str(p) for p in Path(d).glob("*.mp4")])
    fake = []
    for d in fake_dirs:
        if os.path.isdir(d):
            fake += sorted([str(p) for p in Path(d).glob("*.mp4")])
    return {"real": real, "fake": fake}


def list_videos_ffpp(root: str) -> dict:
    real_dir = f"{root}/original"
    fake_dirs = [
        f"{root}/DeepFakeDetection",
        f"{root}/Deepfakes",
        f"{root}/Face2Face",
        f"{root}/FaceShifter",
        f"{root}/FaceSwap",
        f"{root}/NeuralTextures",
    ]
    real = sorted([str(p) for p in Path(real_dir).glob("*.mp4")]) if os.path.isdir(real_dir) else []
    fake = []
    for d in fake_dirs:
        if os.path.isdir(d):
            fake += sorted([str(p) for p in Path(d).glob("*.mp4")])
    return {"real": real, "fake": fake}


def sample_subset(paths: list, k: int) -> list:
    if len(paths) <= k:
        return paths
    rng = random.Random(SEED)
    return rng.sample(paths, k)


def evenly_spaced_indices(n_frames: int, count: int) -> list:
    if n_frames <= 0 or count <= 0:
        return []
    count = min(count, n_frames)
    return sorted(set([int(round(x)) for x in np.linspace(0, n_frames - 1, num=count)]))


def save_face_crop_from_frame(bgr_frame: np.ndarray, save_path: str) -> dict | None:
    rgb = cv2.cvtColor(bgr_frame, cv2.COLOR_BGR2RGB)
    face_tensor = mtcnn(rgb, save_path=save_path)
    if face_tensor is None:
        return None
    boxes, probs = mtcnn.detect(rgb)
    if boxes is None or len(boxes) == 0:
        return None
    areas = [(x2-x1)*(y2-y1) for (x1,y1,x2,y2) in boxes]
    idx = int(np.argmax(areas))
    (x1, y1, x2, y2) = [float(v) for v in boxes[idx]]
    score = float(probs[idx]) if probs is not None else None
    return {"bbox": [x1, y1, x2, y2], "score": score}


def extract_faces_from_video(video_path: str, out_dir: str, label: str, frames_per_video: int) -> int:
    os.makedirs(f"{out_dir}/{label}", exist_ok=True)
    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        return 0

    frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    indices = evenly_spaced_indices(frame_count, frames_per_video)

    saved = 0
    video_stem = Path(video_path).stem
    for idx in indices:
        cap.set(cv2.CAP_PROP_POS_FRAMES, idx)
        ok, frame = cap.read()
        if not ok or frame is None:
            continue
        save_path = f"{out_dir}/{label}/{video_stem}_f{idx:06d}.png"
        info = save_face_crop_from_frame(frame, save_path)
        if info is not None:
            saved += 1
    cap.release()
    return saved


def process_dataset(video_dict: dict, out_root: str, frames_per_video: int, max_per_class: int) -> dict: 
    summary = {"real": {"videos": 0, "crops": 0}, "fake": {"videos": 0, "crops": 0}}
    for label in ["real", "fake"]:
        vids = sample_subset(video_dict.get(label, []), max_per_class)
        print(f"Found {len(video_dict.get(label, []))} {label} videos; using {len(vids)}.")
        for v in vids:
            crops = extract_faces_from_video(v, out_root, label, frames_per_video)
            summary[label]["videos"] += 1
            summary[label]["crops"]  += crops
    return summary


Device: cuda
MTCNN ready.


In [7]:
ffpp_vids = list_videos_ffpp(FFPP_ROOT)
ffpp_summary = process_dataset(
    video_dict=ffpp_vids,
    out_root=FFPP_OUT,
    frames_per_video=FRAMES_PER_VIDEO,
    max_per_class=MAX_VIDEOS_PER_CLASS
)
print("FF++ summary:", ffpp_summary)

celeb_vids = list_videos_celebdf(CELEB_ROOT)
celeb_summary = process_dataset(
    video_dict=celeb_vids,
    out_root=CELEB_OUT,
    frames_per_video=FRAMES_PER_VIDEO,
    max_per_class=MAX_VIDEOS_PER_CLASS
)
print("Celeb-DF summary:", celeb_summary)

print("\nOutput folders:")
for p in [f"{FFPP_OUT}/real", f"{FFPP_OUT}/fake", f"{CELEB_OUT}/real", f"{CELEB_OUT}/fake"]:
    n = len(list(Path(p).glob("*.png")))
    print(f"{p}: {n} images")


Found 1000 real videos; using 1000.
Found 6000 fake videos; using 1000.
FF++ summary: {'real': {'videos': 1000, 'crops': 1000}, 'fake': {'videos': 1000, 'crops': 993}}
Found 890 real videos; using 890.
Found 5639 fake videos; using 1000.
Celeb-DF summary: {'real': {'videos': 890, 'crops': 889}, 'fake': {'videos': 1000, 'crops': 1000}}

Output folders:
/kaggle/working/data/FFpp_c23_train/real: 1000 images
/kaggle/working/data/FFpp_c23_train/fake: 771 images
/kaggle/working/data/CelebDF_test/real: 889 images
/kaggle/working/data/CelebDF_test/fake: 1000 images


In [8]:
!pip -q install timm

In [9]:
import os, math
from pathlib import Path

import torch
from torch.utils.data import DataLoader
from torchvision import transforms
from torchvision.datasets import ImageFolder

IMAGE_SIZE = 224

device = "cuda" if torch.cuda.is_available() else "cpu"

data_tfms = transforms.Compose([
    transforms.Resize((IMAGE_SIZE, IMAGE_SIZE)),
    transforms.ToTensor(),
    transforms.Normalize([0.485, 0.456, 0.406],
                         [0.229, 0.224, 0.225]),
])

train_dir = FFPP_OUT            
test_dir  = CELEB_OUT            

train_ds = ImageFolder(train_dir, transform=data_tfms)
test_ds  = ImageFolder(test_dir,  transform=data_tfms)

BATCH_SIZE = 32
NUM_WORKERS = 4
PIN_MEMORY = True if device == "cuda" else False

train_loader = DataLoader(train_ds, batch_size=BATCH_SIZE, shuffle=True,
                          num_workers=NUM_WORKERS, pin_memory=PIN_MEMORY, drop_last=False)
test_loader  = DataLoader(test_ds,  batch_size=BATCH_SIZE, shuffle=False,
                          num_workers=NUM_WORKERS, pin_memory=PIN_MEMORY, drop_last=False)

def _count_per_class(ds):
    counts = [0, 0]
    for _, y in ds.samples:
        counts[y] += 1
    return counts

tr_counts = _count_per_class(train_ds)
te_counts = _count_per_class(test_ds)

print(f"Train counts  → real: {tr_counts[0]} | fake: {tr_counts[1]} | total: {len(train_ds)}")
print(f"Test  counts  → real: {te_counts[0]} | fake: {te_counts[1]} | total: {len(test_ds)}")
print("Classes mapping:", train_ds.class_to_idx)  


Train counts  → real: 771 | fake: 1000 | total: 1771
Test  counts  → real: 1000 | fake: 889 | total: 1889
Classes mapping: {'fake': 0, 'real': 1}


In [None]:
import timm
import torch
import torch.nn as nn
import torch.nn.functional as F

class EfficientViT(nn.Module):
    def __init__(
        self,
        num_classes: int = 2,
        backbone_name: str = "efficientnet_b0",
        transformer_layers: int = 4,
        transformer_heads: int = 4,
        dropout: float = 0.1,
        grid_size: int = 7,    
    ):
        super().__init__()
        self.backbone = timm.create_model(backbone_name, pretrained=True, num_classes=0, global_pool="")
        self.feature_dim = getattr(self.backbone, "num_features", 1280)
        self.grid_size = grid_size

        self.cls_token = nn.Parameter(torch.zeros(1, 1, self.feature_dim))
        self.pos_embed = nn.Parameter(torch.zeros(1, 1 + grid_size * grid_size, self.feature_dim))
        nn.init.trunc_normal_(self.cls_token, std=0.02)
        nn.init.trunc_normal_(self.pos_embed, std=0.02)

        enc_layer = nn.TransformerEncoderLayer(
            d_model=self.feature_dim,
            nhead=transformer_heads,
            dim_feedforward=self.feature_dim * 4,
            dropout=dropout,
            activation="gelu",
            batch_first=True,
        )
        self.transformer = nn.TransformerEncoder(enc_layer, num_layers=transformer_layers)
        self.pre_ln = nn.LayerNorm(self.feature_dim)

        self.head = nn.Sequential(
            nn.LayerNorm(self.feature_dim),
            nn.Linear(self.feature_dim, 256),
            nn.GELU(),
            nn.Dropout(dropout),
            nn.Linear(256, num_classes),
        )

    @torch.no_grad()
    def _interpolate_pos_enc(self, tokens: torch.Tensor, H: int, W: int) -> torch.Tensor:
        B, N, C = tokens.shape  # N = 1 + H*W
        if H * W == self.grid_size * self.grid_size:
            return self.pos_embed[:, : N, :]
        cls_pe, grid_pe = self.pos_embed[:, :1, :], self.pos_embed[:, 1:, :]
        grid_pe = grid_pe.view(1, self.grid_size, self.grid_size, C).permute(0, 3, 1, 2)
        grid_pe = F.interpolate(grid_pe, size=(H, W), mode="bicubic", align_corners=False)
        grid_pe = grid_pe.permute(0, 2, 3, 1).reshape(1, H * W, C)
        return torch.cat([cls_pe, grid_pe], dim=1)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        feat_map = self.backbone.forward_features(x)
        B, C, H, W = feat_map.shape

        tokens = feat_map.flatten(2).transpose(1, 2)         
        cls = self.cls_token.expand(B, -1, -1)              
        tokens = torch.cat([cls, tokens], dim=1)              

        pe = self._interpolate_pos_enc(tokens, H, W)
        tokens = tokens + pe

        tokens = self.pre_ln(tokens)
        tokens = self.transformer(tokens)                     

        cls_out = tokens[:, 0, :]                             
        logits = self.head(cls_out)                            
        return logits


In [11]:
import torch

model = EfficientViT()
if torch.cuda.device_count() > 1:
    model = nn.DataParallel(model)
model = model.to(device)

xb, yb = next(iter(train_loader))
xb = xb.to(device)
with torch.no_grad():
    out = model(xb)
print("Batch:", xb.shape, "→ logits:", out.shape)

def count_params(m):
    return sum(p.numel() for p in m.parameters() if p.requires_grad)

print("Trainable parameters:", f"{count_params(model):,}")


model.safetensors:   0%|          | 0.00/21.4M [00:00<?, ?B/s]

Batch: torch.Size([32, 3, 224, 224]) → logits: torch.Size([32, 2])
Trainable parameters: 83,116,158


In [12]:
import math, time
import torch
import torch.nn as nn
from torch.cuda.amp import GradScaler, autocast

BASELINE_EPOCHS = 8      
LR                = 3e-4
WEIGHT_DECAY      = 1e-4
LABEL_SMOOTHING   = 0.05
MAX_NORM          = 1.0  
USE_AMP           = torch.cuda.is_available()

torch.backends.cudnn.benchmark = True

cls_counts = [0, 0]
for _, y in train_ds.samples:
    cls_counts[y] += 1
N = float(len(train_ds))
cls_weights = [N/(2.0*max(1, c)) for c in cls_counts]  
cls_weights_t = torch.tensor(cls_weights, device=device, dtype=torch.float32)

criterion = nn.CrossEntropyLoss(weight=cls_weights_t, label_smoothing=LABEL_SMOOTHING)

optimizer = torch.optim.AdamW(model.parameters(), lr=LR, weight_decay=WEIGHT_DECAY)
scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=BASELINE_EPOCHS)
scaler = GradScaler(enabled=USE_AMP)

def accuracy_from_logits(logits, targets):
    preds = logits.argmax(dim=1)
    return (preds == targets).float().mean().item()

def train_one_epoch(model, loader, optimizer, scaler):
    model.train()
    epoch_loss, epoch_acc, n = 0.0, 0.0, 0
    for x, y in loader:
        x = x.to(device, non_blocking=True)
        y = y.to(device, non_blocking=True)

        optimizer.zero_grad(set_to_none=True)
        with autocast(enabled=USE_AMP):
            logits = model(x)
            loss = criterion(logits, y)
        scaler.scale(loss).backward()
        nn.utils.clip_grad_norm_(model.parameters(), MAX_NORM)
        scaler.step(optimizer)
        scaler.update()

        bsz = x.size(0)
        epoch_loss += loss.item() * bsz
        epoch_acc  += accuracy_from_logits(logits.detach(), y) * bsz
        n += bsz
    return epoch_loss / n, epoch_acc / n

@torch.no_grad()
def evaluate(model, loader):
    model.eval()
    epoch_loss, epoch_acc, n = 0.0, 0.0, 0
    for x, y in loader:
        x = x.to(device, non_blocking=True)
        y = y.to(device, non_blocking=True)
        with autocast(enabled=USE_AMP):
            logits = model(x)
            loss = criterion(logits, y)
        bsz = x.size(0)
        epoch_loss += loss.item() * bsz
        epoch_acc  += accuracy_from_logits(logits, y) * bsz
        n += bsz
    return epoch_loss / n, epoch_acc / n


  scaler = GradScaler(enabled=USE_AMP)


In [13]:
start = time.time()
print(f"Starting baseline training for {BASELINE_EPOCHS} epochs on {device}.")
print(f"Class counts (fake=0, real=1): {cls_counts} | class weights: {cls_weights}")

history = {"train_loss": [], "train_acc": [], "val_loss": [], "val_acc": []}

for epoch in range(1, BASELINE_EPOCHS + 1):
    tr_loss, tr_acc = train_one_epoch(model, train_loader, optimizer, scaler)
    va_loss, va_acc = evaluate(model, test_loader)
    scheduler.step()

    history["train_loss"].append(tr_loss)
    history["train_acc"].append(tr_acc)
    history["val_loss"].append(va_loss)
    history["val_acc"].append(va_acc)

    lr_now = scheduler.get_last_lr()[0]
    print(f"[Epoch {epoch:02d}/{BASELINE_EPOCHS}] "
          f"lr={lr_now:.2e} | train loss={tr_loss:.4f} acc={tr_acc:.4f} | "
          f"val loss={va_loss:.4f} acc={va_acc:.4f}")

import os
CKPT_PATH = "/kaggle/working/baseline_model.pth"
state = model.module.state_dict() if isinstance(model, nn.DataParallel) else model.state_dict()
os.makedirs(os.path.dirname(CKPT_PATH), exist_ok=True)
torch.save(state, CKPT_PATH)
print(f"\nBaseline model checkpoint saved to: {CKPT_PATH}")
print(f"Total training time: {(time.time()-start)/60:.1f} min")


Starting baseline training for 8 epochs on cuda.
Class counts (fake=0, real=1): [771, 1000] | class weights: [1.1485084306095978, 0.8855]


  with autocast(enabled=USE_AMP):
  with autocast(enabled=USE_AMP):


[Epoch 01/8] lr=2.89e-04 | train loss=0.6715 acc=0.6166 | val loss=0.6788 acc=0.6109
[Epoch 02/8] lr=2.56e-04 | train loss=0.4883 acc=0.7792 | val loss=0.7136 acc=0.6257
[Epoch 03/8] lr=2.07e-04 | train loss=0.3806 acc=0.8628 | val loss=0.8236 acc=0.5834
[Epoch 04/8] lr=1.50e-04 | train loss=0.2693 acc=0.9317 | val loss=0.9704 acc=0.6146
[Epoch 05/8] lr=9.26e-05 | train loss=0.2538 acc=0.9407 | val loss=0.9916 acc=0.6273
[Epoch 06/8] lr=4.39e-05 | train loss=0.2254 acc=0.9571 | val loss=0.9782 acc=0.6321
[Epoch 07/8] lr=1.14e-05 | train loss=0.1717 acc=0.9757 | val loss=1.3172 acc=0.5961
[Epoch 08/8] lr=0.00e+00 | train loss=0.1570 acc=0.9842 | val loss=1.1612 acc=0.6236

Baseline model checkpoint saved to: /kaggle/working/baseline_model.pth
Total training time: 14.7 min


In [14]:
import torch
import torch.nn as nn

_IMAGENET_MEAN = torch.tensor([0.485, 0.456, 0.406], dtype=torch.float32, device=device)
_IMAGENET_STD  = torch.tensor([0.229, 0.224, 0.225], dtype=torch.float32, device=device)

def _norm_bounds():
    x_min = (0.0 - _IMAGENET_MEAN) / _IMAGENET_STD
    x_max = (1.0 - _IMAGENET_MEAN) / _IMAGENET_STD
    return x_min.view(1,3,1,1), x_max.view(1,3,1,1)

def _scale_eps_alpha(epsilon: float, alpha: float):
    eps = (torch.ones(3, device=device) * epsilon) / _IMAGENET_STD
    alp = (torch.ones(3, device=device) * alpha) / _IMAGENET_STD
    return eps.view(1,3,1,1), alp.view(1,3,1,1)

@torch.no_grad()
def _clip_normed(x, x_min, x_max):
    return torch.max(torch.min(x, x_max), x_min)

def pgd_attack(
    model: nn.Module,
    images: torch.Tensor,
    labels: torch.Tensor,
    epsilon: float = 8/255,  
    alpha: float = 2/255,   
    iters: int = 10,
    random_start: bool = True,
) -> torch.Tensor:
   
    model.eval()  
    x = images.detach().clone()
    y = labels.detach().clone().to(device)

    x_min, x_max = _norm_bounds()
    eps_t, alpha_t = _scale_eps_alpha(epsilon, alpha)

    if random_start:
        x = x + torch.empty_like(x).uniform_(-1.0, 1.0) * eps_t
        x = _clip_normed(x, x_min, x_max)

    x_adv = x.clone().requires_grad_(True)
    ce = nn.CrossEntropyLoss()

    for _ in range(iters):
        logits = model(x_adv)
        loss = ce(logits, y)
        grad = torch.autograd.grad(loss, x_adv, retain_graph=False, create_graph=False)[0]

        x_adv = x_adv.detach() + alpha_t * grad.sign()
        delta = torch.clamp(x_adv - x, min=-eps_t, max=eps_t)
        x_adv = _clip_normed(x + delta, x_min, x_max).detach()
        x_adv.requires_grad_(True)

    return x_adv.detach()

def evaluate_accuracy(model: nn.Module, loader, attack: bool = False, **pgd_kwargs):
    model.eval()
    n_correct = 0
    n_total = 0

    for xb, yb in loader:
        xb = xb.to(device, non_blocking=True)
        yb = yb.to(device, non_blocking=True)

        if attack:
            # PGD needs gradients!
            with torch.enable_grad():
                xb = pgd_attack(model, xb, yb, **pgd_kwargs)

        # No gradients needed for accuracy eval
        with torch.no_grad():
            logits = model(xb)
            preds = logits.argmax(dim=1)

        n_correct += (preds == yb).sum().item()
        n_total += yb.numel()

    return n_correct / max(1, n_total)



In [15]:
import torch
import torch.nn as nn

torch.backends.cudnn.benchmark = False
torch.backends.cudnn.deterministic = True

CKPT_PATH = "/kaggle/working/baseline_model.pth"

def _strip_module(sd):
    return { (k[7:] if k.startswith("module.") else k): v for k, v in sd.items() }

eval_model = EfficientViT().to(device)

ckpt = torch.load(CKPT_PATH, map_location=device)
try:
    eval_model.load_state_dict(ckpt, strict=True)
except RuntimeError:
    eval_model.load_state_dict(_strip_module(ckpt), strict=True)

eval_model.eval()

PGD_CFG = dict(epsilon=8/255, alpha=2/255, iters=10, random_start=True)

clean_acc = evaluate_accuracy(eval_model, test_loader, attack=False)
pgd_acc   = evaluate_accuracy(eval_model, test_loader, attack=True, **PGD_CFG)
print(f"[Celeb-DF v2] Clean Acc: {clean_acc:.4f} | PGD Acc: {pgd_acc:.4f} (Δ={clean_acc-pgd_acc:.4f})")

clean_acc_ff = evaluate_accuracy(eval_model, train_loader, attack=False)
pgd_acc_ff   = evaluate_accuracy(eval_model, train_loader, attack=True, **PGD_CFG)
print(f"[FF++ c23]   Clean Acc: {clean_acc_ff:.4f} | PGD Acc: {pgd_acc_ff:.4f} (Δ={clean_acc_ff-pgd_acc_ff:.4f})")


[Celeb-DF v2] Clean Acc: 0.6241 | PGD Acc: 0.4807 (Δ=0.1435)
[FF++ c23]   Clean Acc: 0.9994 | PGD Acc: 0.4704 (Δ=0.5291)


In [16]:
xb, yb = next(iter(test_loader))
xb = xb.to(device); yb = yb.to(device)

with torch.no_grad():
    clean_pred = eval_model(xb).argmax(1)

xb_adv = pgd_attack(eval_model, xb, yb, **PGD_CFG)
with torch.no_grad():
    adv_pred = eval_model(xb_adv).argmax(1)

flip_rate = (clean_pred != adv_pred).float().mean().item()
print(f"Label-change rate on a single test batch after PGD: {flip_rate:.3f}")


Label-change rate on a single test batch after PGD: 0.500


In [17]:
import torch
import torch.nn as nn
import torch.nn.functional as F

class _DoubleConv(nn.Module):
    def __init__(self, in_ch, out_ch):
        super().__init__()
        self.block = nn.Sequential(
            nn.Conv2d(in_ch, out_ch, 3, padding=1, bias=False),
            nn.BatchNorm2d(out_ch),
            nn.GELU(),
            nn.Conv2d(out_ch, out_ch, 3, padding=1, bias=False),
            nn.BatchNorm2d(out_ch),
            nn.GELU(),
        )

    def forward(self, x): return self.block(x)

class _Down(nn.Module):
    def __init__(self, in_ch, out_ch):
        super().__init__()
        self.pool = nn.MaxPool2d(2)
        self.conv = _DoubleConv(in_ch, out_ch)

    def forward(self, x): return self.conv(self.pool(x))

class _Up(nn.Module):
    def __init__(self, in_ch, out_ch):
        super().__init__()
        self.up = nn.ConvTranspose2d(in_ch, out_ch, kernel_size=2, stride=2)
        self.conv = _DoubleConv(in_ch, out_ch) 

    def forward(self, x, skip):
        x = self.up(x)
        dh = skip.size(-2) - x.size(-2)
        dw = skip.size(-1) - x.size(-1)
        if dh != 0 or dw != 0:
            x = F.pad(x, (0, max(0, dw), 0, max(0, dh)))
            x = x[:, :, :skip.size(-2), :skip.size(-1)]
        x = torch.cat([x, skip], dim=1)
        return self.conv(x)

class TinyUNetAttacker(nn.Module):
   
    def __init__(self, in_ch=3, base_ch=16, depth=3, epsilon=8/255.0):
        super().__init__()
        assert depth == 3, "This tiny configuration is hard-coded for depth=3."

        self.epsilon = float(epsilon)  
        # Encoder
        self.inc   = _DoubleConv(in_ch, base_ch)       
        self.down1 = _Down(base_ch, base_ch * 2)       
        self.down2 = _Down(base_ch * 2, base_ch * 4)
        self.bot   = _DoubleConv(base_ch * 4, base_ch * 8)
        self.up2   = _Up(base_ch * 8, base_ch * 4)      
        self.up1   = _Up(base_ch * 4, base_ch * 2)      
        self.up0   = _Up(base_ch * 2, base_ch)      
        self.outc  = nn.Conv2d(base_ch, in_ch, kernel_size=3, padding=1)
        self.tanh  = nn.Tanh()

    def _eps_tensor(self, like: torch.Tensor):
        eps_t, _ = _scale_eps_alpha(self.epsilon, self.epsilon)  
        return eps_t.to(like.device, like.dtype)  

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        x1 = self.inc(x)        
        x2 = self.down1(x1)       
        x3 = self.down2(x2)       
        xb = self.bot(x3)         

        y2 = self.up2(xb, x3)    
        y1 = self.up1(y2, x2)     
        y0 = self.up0(y1, x1)     

        raw = self.outc(y0)      
        delta = self.tanh(raw) * self._eps_tensor(raw)
        return delta


In [18]:
attacker = TinyUNetAttacker(epsilon=8/255.0).to(device)
attacker.eval()  

xb, yb = next(iter(test_loader))
xb = xb.to(device)

with torch.no_grad():
    delta = attacker(xb)                
    x_adv = xb + delta                 
    x_min, x_max = _norm_bounds()
    x_adv = _clip_normed(x_adv, x_min, x_max)

print("Input   :", tuple(xb.shape))
print("Delta   :", tuple(delta.shape), 
      "| min/max (per-batch):", float(delta.min().item()), "/", float(delta.max().item()))
print("x_adv   :", tuple(x_adv.shape),
      "| within bounds:",
      bool((x_adv <= x_max + 1e-6).all().item() and (x_adv >= x_min - 1e-6).all().item()))

def _num_params(m): return sum(p.numel() for p in m.parameters() if p.requires_grad)
print("TinyUNetAttacker params:", f"{_num_params(attacker):,}")


Input   : (32, 3, 224, 224)
Delta   : (32, 3, 224, 224) | min/max (per-batch): -0.008672297932207584 / 0.010881473310291767
x_adv   : (32, 3, 224, 224) | within bounds: True
TinyUNetAttacker params: 483,155


In [19]:
@torch.no_grad()
def unet_attack(attacker: nn.Module, images: torch.Tensor, epsilon: float = 8/255.0) -> torch.Tensor:
    attacker.eval()
    if hasattr(attacker, "epsilon") and abs(attacker.epsilon - float(epsilon)) > 1e-12:
        attacker.epsilon = float(epsilon)
    delta = attacker(images)
    x_min, x_max = _norm_bounds()
    return _clip_normed(images + delta, x_min, x_max)


In [20]:
import torch
import torch.nn.functional as F

def total_variation_loss(delta: torch.Tensor, reduction: str = "mean") -> torch.Tensor:
    dh = delta[:, :, 1:, :] - delta[:, :, :-1, :]
    dw = delta[:, :, :, 1:] - delta[:, :, :, :-1]
    tv = dh.abs() + dw.abs()
    if reduction == "sum":
        return tv.sum()
    return tv.mean() 

_FREQ_MASK_CACHE: dict[tuple, torch.Tensor] = {}

def _low_freq_mask(h: int, w: int, radius_frac: float, device, dtype) -> torch.Tensor:
    key = (h, w, float(radius_frac), str(device), str(dtype))
    m = _FREQ_MASK_CACHE.get(key)
    if m is not None and m.device == device and m.dtype == dtype:
        return m

    yy, xx = torch.meshgrid(
        torch.arange(h, device=device, dtype=torch.float32),
        torch.arange(w, device=device, dtype=torch.float32),
        indexing="ij",
    )
    cy, cx = (h - 1) / 2.0, (w - 1) / 2.0
    dist = torch.sqrt((yy - cy) ** 2 + (xx - cx) ** 2)
    radius = max(1.0, radius_frac * float(min(h, w)))
    mask = (dist <= radius).to(dtype).unsqueeze(0).unsqueeze(0)  
    _FREQ_MASK_CACHE[key] = mask
    return mask

def frequency_loss(
    delta: torch.Tensor,
    radius_frac: float = 0.125,   
    power: int = 2,              
    norm: str = "ortho",
) -> torch.Tensor:
    B, C, H, W = delta.shape
    f = torch.fft.fft2(delta, norm=norm)
    f = torch.fft.fftshift(f, dim=(-2, -1))

    mag = f.abs()
    if power == 2:
        mag = mag * mag

    mask = _low_freq_mask(H, W, radius_frac, device=delta.device, dtype=delta.dtype)
    # Broadcast over (B,C,H,W)
    loss = (mag * mask).mean()
    return loss

def realism_loss(
    delta: torch.Tensor,
    lambda_tv: float = 1e-4,
    lambda_freq: float = 1e-4,
    radius_frac: float = 0.125,
) -> tuple[torch.Tensor, dict]:
    l_tv = total_variation_loss(delta)
    l_fq = frequency_loss(delta, radius_frac=radius_frac)
    total = lambda_tv * l_tv + lambda_freq * l_fq
    return total, {"tv": l_tv.detach(), "freq": l_fq.detach()}


In [21]:
def total_variation_loss(delta: torch.Tensor, reduction: str = "mean", beta: float = 1.0) -> torch.Tensor:
      dh = delta[:, :, 1:, :] - delta[:, :, :-1, :]
    dw = delta[:, :, :, 1:] - delta[:, :, :, :-1]

    if beta != 1.0:
        dh = dh.abs().pow(beta)
        dw = dw.abs().pow(beta)
    else:
        dh = dh.abs()
        dw = dw.abs()

    if reduction == "sum":
        return dh.sum() + dw.sum()
    return dh.mean() + dw.mean()


In [22]:
xb, yb = next(iter(test_loader))
xb = xb.to(device)
attacker.eval()
with torch.no_grad():
    delta_pred = attacker(xb)

tv_val  = total_variation_loss(delta_pred).item()
fq_val  = frequency_loss(delta_pred, radius_frac=0.125).item()
comb, parts = realism_loss(delta_pred, lambda_tv=1e-4, lambda_freq=1e-4, radius_frac=0.125)

print(f"Predicted δ → TV: {tv_val:.6f} | Freq(low): {fq_val:.6f} | "
      f"Combined (λ_tv=λ_fq=1e-4): {comb.item():.6f}")

def _make_sine_delta(batch: int, ch: int, H: int, W: int, freq: int, amp: float = 0.01, device="cpu"):
    yy, xx = torch.meshgrid(
        torch.arange(H, device=device, dtype=torch.float32),
        torch.arange(W, device=device, dtype=torch.float32),
        indexing="ij"
    )
    wave = torch.sin(2.0 * torch.pi * freq * xx / float(W)).unsqueeze(0).unsqueeze(0)
    wave = wave.expand(batch, ch, H, W) * amp
    return wave

B, C, H, W = xb.shape
delta_low  = _make_sine_delta(B, C, H, W, freq=2,  amp=0.01, device=device)
delta_high = _make_sine_delta(B, C, H, W, freq=32, amp=0.01, device=device)

print("Low-freq sine  → TV:", float(total_variation_loss(delta_low)),
      "| Freq(low):", float(frequency_loss(delta_low)))
print("High-freq sine → TV:", float(total_variation_loss(delta_high)),
      "| Freq(low):", float(frequency_loss(delta_high)))


Predicted δ → TV: 0.000057 | Freq(low): 0.000055 | Combined (λ_tv=λ_fq=1e-4): 0.000000
Low-freq sine  → TV: 0.0003562300989869982 | Freq(low): 4.999998418497853e-05
High-freq sine → TV: 0.005560940597206354 | Freq(low): 3.718621545723168e-16


In [None]:
import os, time
import torch
import torch.nn as nn
from torch.cuda.amp import GradScaler, autocast

os.environ.setdefault("PROTOCOL_BUFFERS_PYTHON_IMPLEMENTATION", "python")
os.environ.setdefault("TF_CPP_MIN_LOG_LEVEL", "2")
os.environ.setdefault("TENSORBOARD_NO_TF", "1")
os.environ.setdefault("MPLCONFIGDIR", "/kaggle/working/mpl")
os.makedirs(os.environ["MPLCONFIGDIR"], exist_ok=True)

try:
    from torch.utils.tensorboard import SummaryWriter
    _TB_IMPL = "torch.utils.tensorboard"
except Exception as e:
    print("Native TensorBoard import failed, falling back to tensorboardX. Reason:", repr(e))
    %pip -q install tensorboardX
    from tensorboardX import SummaryWriter
    _TB_IMPL = "tensorboardX"

print("Using SummaryWriter from:", _TB_IMPL)

ROBUST_EPOCHS         = 8             
LR_DETECTOR           = 2e-4
LR_ATTACKER           = 1e-4
WD_DETECTOR           = 1e-4
WD_ATTACKER           = 1e-5
MAX_NORM_DET          = 1.0
MAX_NORM_ATT          = 1.0
LAMBDA_TV             = 1e-4           
LAMBDA_FREQ           = 1e-4
FREQ_RADIUS_FRAC      = 0.125           

PGD_KW = dict(epsilon=8/255, alpha=2/255, iters=10, random_start=True)
if 'PGD_CFG' in globals() and isinstance(PGD_CFG, dict):
    PGD_KW.update(PGD_CFG)

optimizer_det = torch.optim.AdamW(
    model.parameters(), lr=LR_DETECTOR, weight_decay=WD_DETECTOR
)
optimizer_att = torch.optim.AdamW(
    attacker.parameters(), lr=LR_ATTACKER, weight_decay=WD_ATTACKER
)

USE_AMP = torch.cuda.is_available() if 'USE_AMP' not in globals() else USE_AMP
det_scaler = GradScaler(enabled=USE_AMP)
att_scaler = GradScaler(enabled=USE_AMP)

TB_DIR = "/kaggle/working/tensorboard/phase3_2"
os.makedirs(TB_DIR, exist_ok=True)
writer = SummaryWriter(log_dir=TB_DIR)

print("Robust training config:",
      {"epochs": ROBUST_EPOCHS, "lr_det": LR_DETECTOR, "lr_att": LR_ATTACKER,
       "lambda_tv": LAMBDA_TV, "lambda_freq": LAMBDA_FREQ, "pgd": PGD_KW,
       "tb_impl": _TB_IMPL, "tb_dir": TB_DIR})


In [24]:
@torch.no_grad()
def _batch_acc(logits, y):
    return (logits.argmax(1) == y).float().mean().item()

def train_robust_epoch(epoch_idx: int):
    model.train()
    attacker.train()

    sums = {
        "loss_clean": 0.0,
        "loss_pgd": 0.0,
        "loss_unet_det": 0.0,
        "loss_att_total": 0.0,
        "loss_att_tv": 0.0,
        "loss_att_freq": 0.0,
        "acc_clean": 0.0,
        "acc_pgd": 0.0,
        "acc_unet": 0.0,
        "n": 0,
    }

    x_min, x_max = _norm_bounds()  

    for xb, yb in train_loader:
        xb = xb.to(device, non_blocking=True)
        yb = yb.to(device, non_blocking=True)
        bsz = xb.size(0)

        optimizer_det.zero_grad(set_to_none=True)
        with autocast(enabled=USE_AMP):
            logits_clean = model(xb)
            loss_clean   = criterion(logits_clean, yb)

        with torch.enable_grad():
            xb_pgd = pgd_attack(model, xb, yb, **PGD_KW)
        with autocast(enabled=USE_AMP):
            logits_pgd = model(xb_pgd)
            loss_pgd   = criterion(logits_pgd, yb)

        optimizer_att.zero_grad(set_to_none=True)

        for p in model.parameters():
            p.requires_grad_(False)

        with autocast(enabled=USE_AMP):
            delta = attacker(xb)                      
            xb_un = _clip_normed(xb + delta, x_min, x_max)
            logits_un_for_att = model(xb_un)         
            ce_for_att = criterion(logits_un_for_att, yb)
            l_tv  = total_variation_loss(delta)
            l_fq  = frequency_loss(delta, radius_frac=FREQ_RADIUS_FRAC)
            loss_att_total = (-ce_for_att) + LAMBDA_TV * l_tv + LAMBDA_FREQ * l_fq

        att_scaler.scale(loss_att_total).backward()
        nn.utils.clip_grad_norm_(attacker.parameters(), MAX_NORM_ATT)
        att_scaler.step(optimizer_att)
        att_scaler.update()

        # Unfreeze detector for its own update
        for p in model.parameters():
            p.requires_grad_(True)

        with autocast(enabled=USE_AMP):
            with torch.no_grad():
                delta_det = attacker(xb)
            xb_un = _clip_normed(xb + delta_det, x_min, x_max)
            logits_un_det = model(xb_un)
            loss_unet_det = criterion(logits_un_det, yb)

            loss_total_det = loss_clean + loss_pgd + loss_unet_det

        det_scaler.scale(loss_total_det).backward()
        nn.utils.clip_grad_norm_(model.parameters(), MAX_NORM_DET)
        det_scaler.step(optimizer_det)
        det_scaler.update()

        with torch.no_grad():
            sums["loss_clean"]     += loss_clean.item()     * bsz
            sums["loss_pgd"]       += loss_pgd.item()       * bsz
            sums["loss_unet_det"]  += loss_unet_det.item()  * bsz
            sums["loss_att_total"] += loss_att_total.item() * bsz
            sums["loss_att_tv"]    += l_tv.item()           * bsz
            sums["loss_att_freq"]  += l_fq.item()           * bsz
            sums["acc_clean"]      += _batch_acc(logits_clean, yb)   * bsz
            sums["acc_pgd"]        += _batch_acc(logits_pgd, yb)     * bsz
            sums["acc_unet"]       += _batch_acc(logits_un_det, yb)  * bsz
            sums["n"]              += bsz

    n = max(1, sums["n"])
    logs = {k: (v / n) for k, v in sums.items() if k != "n"}
    # TensorBoard
    writer.add_scalar("train/loss_clean",     logs["loss_clean"],    epoch_idx)
    writer.add_scalar("train/loss_pgd",       logs["loss_pgd"],      epoch_idx)
    writer.add_scalar("train/loss_unet_det",  logs["loss_unet_det"], epoch_idx)
    writer.add_scalar("train/loss_att_total", logs["loss_att_total"],epoch_idx)
    writer.add_scalar("train/att_tv",         logs["loss_att_tv"],   epoch_idx)
    writer.add_scalar("train/att_freq",       logs["loss_att_freq"], epoch_idx)
    writer.add_scalar("train/acc_clean",      logs["acc_clean"],     epoch_idx)
    writer.add_scalar("train/acc_pgd",        logs["acc_pgd"],       epoch_idx)
    writer.add_scalar("train/acc_unet",       logs["acc_unet"],      epoch_idx)

    return logs


In [25]:
def frequency_loss(
    delta: torch.Tensor,
    radius_frac: float = 0.125,
    power: int = 2,
    norm: str = "ortho",
) -> torch.Tensor:
    B, C, H, W = delta.shape
    x = delta.float() 

    f = torch.fft.fft2(x, norm=norm)
    f = torch.fft.fftshift(f, dim=(-2, -1))

    mag = f.abs()
    if power == 2:
        mag = mag * mag

    mask = _low_freq_mask(H, W, radius_frac, device=x.device, dtype=torch.float32)  
    loss = (mag * mask).mean()
    return loss


In [26]:
start = time.time()
print(f"Starting Phase 3.2 robust training for {ROBUST_EPOCHS} epochs on {device}.")

history_robust = []
for ep in range(1, ROBUST_EPOCHS + 1):
    logs = train_robust_epoch(ep)
    history_robust.append(logs)
    print(f"[Robust Ep {ep:02d}/{ROBUST_EPOCHS}] "
          f"clean loss {logs['loss_clean']:.4f} acc {logs['acc_clean']:.3f} | "
          f"pgd loss {logs['loss_pgd']:.4f} acc {logs['acc_pgd']:.3f} | "
          f"unet loss {logs['loss_unet_det']:.4f} acc {logs['acc_unet']:.3f} | "
          f"att(L): {logs['loss_att_total']:.5f} (tv {logs['loss_att_tv']:.5f}, fq {logs['loss_att_freq']:.5f})")

ROBUST_CKPT = "/kaggle/working/robust_model.pth"
state_dict = model.module.state_dict() if isinstance(model, nn.DataParallel) else model.state_dict()
torch.save(state_dict, ROBUST_CKPT)
writer.flush(); writer.close()

print(f"\nSaved robust checkpoint to: {ROBUST_CKPT}")
print(f"Phase 3.2 total time: {(time.time()-start)/60:.1f} min")
print(f"TensorBoard logs at: {TB_DIR}")

Starting Phase 3.2 robust training for 8 epochs on cuda.


  with autocast(enabled=USE_AMP):
  with autocast(enabled=USE_AMP):
  with autocast(enabled=USE_AMP):
  with autocast(enabled=USE_AMP):


[Robust Ep 01/8] clean loss 0.1203 acc 0.999 | pgd loss nan acc 0.460 | unet loss 0.1954 acc 0.968 | att(L): -0.20297 (tv 0.01482, fq 0.00127)
[Robust Ep 02/8] clean loss 0.1233 acc 0.998 | pgd loss nan acc 0.464 | unet loss 0.1239 acc 0.998 | att(L): -0.12411 (tv 0.00634, fq 0.00157)
[Robust Ep 03/8] clean loss 0.1239 acc 0.998 | pgd loss nan acc 0.453 | unet loss 0.1223 acc 0.999 | att(L): -0.12230 (tv 0.00521, fq 0.00152)
[Robust Ep 04/8] clean loss 0.1207 acc 0.999 | pgd loss nan acc 0.477 | unet loss 0.1206 acc 1.000 | att(L): -0.12056 (tv 0.00467, fq 0.00142)
[Robust Ep 05/8] clean loss 0.1227 acc 0.998 | pgd loss nan acc 0.441 | unet loss 0.1214 acc 0.999 | att(L): -0.12147 (tv 0.00440, fq 0.00137)
[Robust Ep 06/8] clean loss 0.1219 acc 0.999 | pgd loss nan acc 0.458 | unet loss 0.1222 acc 0.998 | att(L): -0.12209 (tv 0.00404, fq 0.00128)
[Robust Ep 07/8] clean loss 0.1219 acc 0.999 | pgd loss nan acc 0.469 | unet loss 0.1216 acc 0.998 | att(L): -0.12161 (tv 0.00375, fq 0.00119)

In [27]:
@torch.no_grad()
def evaluate_unet_acc(model, loader, attacker):
    model.eval(); attacker.eval()
    n_correct, n_total = 0, 0
    for xb, yb in loader:
        xb = xb.to(device, non_blocking=True)
        yb = yb.to(device, non_blocking=True)
        xb_un = unet_attack(attacker, xb) 
        logits = model(xb_un)
        n_correct += (logits.argmax(1) == yb).sum().item()
        n_total   += yb.numel()
    return n_correct / max(1, n_total)

clean_acc_celeb = evaluate_accuracy(model, test_loader, attack=False)
pgd_acc_celeb   = evaluate_accuracy(model, test_loader, attack=True, **PGD_KW)
unet_acc_celeb  = evaluate_unet_acc(model, test_loader, attacker)

print(f"[Celeb-DF v2]  Clean Acc: {clean_acc_celeb:.4f} | "
      f"PGD Acc: {pgd_acc_celeb:.4f} | U-Net Acc: {unet_acc_celeb:.4f}")


[Celeb-DF v2]  Clean Acc: 0.6167 | PGD Acc: 0.4711 | U-Net Acc: 0.6067


In [28]:
import os, uuid, json, shutil, tempfile
import numpy as np
import torch
import torch.nn.functional as F
import cv2

device = "cuda" if torch.cuda.is_available() else "cpu"

PGD_KW = dict(epsilon=8/255, alpha=2/255, iters=10, random_start=True) if 'PGD_KW' not in globals() else PGD_KW

_IMAGENET_MEAN = _IMAGENET_MEAN if '_IMAGENET_MEAN' in globals() else torch.tensor([0.485, 0.456, 0.406], device=device, dtype=torch.float32)
_IMAGENET_STD  = _IMAGENET_STD  if '_IMAGENET_STD'  in globals() else torch.tensor([0.229, 0.224, 0.225], device=device, dtype=torch.float32)

def _denorm(x: torch.Tensor) -> torch.Tensor:
    return (x * _IMAGENET_STD.view(1,3,1,1)) + _IMAGENET_MEAN.view(1,3,1,1)

def _renorm(x01: torch.Tensor) -> torch.Tensor:
    return (x01 - _IMAGENET_MEAN.view(1,3,1,1)) / _IMAGENET_STD.view(1,3,1,1)

def _to_uint8(x_norm: torch.Tensor) -> np.ndarray:
    x01 = _denorm(x_norm).clamp(0,1)
    return (x01.detach().cpu().permute(0,2,3,1).numpy() * 255.0).round().astype(np.uint8)

def _from_uint8(nhwc: np.ndarray) -> torch.Tensor:
    x = torch.tensor(nhwc, device=device, dtype=torch.float32) / 255.0
    x = x.permute(0,3,1,2)
    return _renorm(x)

def jpeg_compress_batch(x_norm: torch.Tensor, quality: int = 50) -> torch.Tensor:
    x_uint8 = _to_uint8(x_norm)
    out = []
    for img in x_uint8:
        ok, buf = cv2.imencode(".jpg", cv2.cvtColor(img, cv2.COLOR_RGB2BGR),
                               [int(cv2.IMWRITE_JPEG_QUALITY), int(quality)])
        if not ok:
            out.append(img)
            continue
        dec = cv2.imdecode(buf, cv2.IMREAD_COLOR)
        dec = cv2.cvtColor(dec, cv2.COLOR_BGR2RGB)
        out.append(dec)
    return _from_uint8(np.stack(out, 0))

def h264_like_compress_batch(x_norm: torch.Tensor) -> torch.Tensor:
    x_uint8 = _to_uint8(x_norm)
    B, H, W, _ = x_uint8.shape
    out = []
    tmp = tempfile.mkdtemp(prefix="h264like_")
    try:
        for img in x_uint8:
            p = os.path.join(tmp, f"{uuid.uuid4().hex}.mp4")
            fourcc = cv2.VideoWriter_fourcc(*"mp4v")
            vw = cv2.VideoWriter(p, fourcc, 1.0, (W, H))
            if not vw.isOpened():
                p = p.replace(".mp4", ".avi")
                vw = cv2.VideoWriter(p, cv2.VideoWriter_fourcc(*"XVID"), 1.0, (W, H))
            vw.write(cv2.cvtColor(img, cv2.COLOR_RGB2BGR))
            vw.release()

            cap = cv2.VideoCapture(p)
            ok, frame = cap.read()
            cap.release()
            if ok and frame is not None:
                out.append(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
            else:
                out.append(img)
    finally:
        shutil.rmtree(tmp, ignore_errors=True)
    return _from_uint8(np.stack(out, 0))

CKPT_BASELINE = "/kaggle/working/baseline_model.pth"
CKPT_ROBUST   = "/kaggle/working/robust_model.pth"

def _strip_module(sd):
    return { (k[7:] if k.startswith("module.") else k): v for k, v in sd.items() }

def load_eval_model(ckpt_path: str) -> torch.nn.Module:
    m = EfficientViT().to(device) 
    sd = torch.load(ckpt_path, map_location=device)
    try:
        m.load_state_dict(sd, strict=True)
    except RuntimeError:
        m.load_state_dict(_strip_module(sd), strict=True)
    m.eval()
    return m

eval_baseline = load_eval_model(CKPT_BASELINE)
eval_robust   = load_eval_model(CKPT_ROBUST)
print("Loaded:", os.path.basename(CKPT_BASELINE), "and", os.path.basename(CKPT_ROBUST))

if 'attacker' not in globals() or not isinstance(attacker, torch.nn.Module):
    attacker = TinyUNetAttacker(epsilon=8/255.0).to(device)
attacker.eval()

def _fake_index_from_loader(loader) -> int:
    mapping = getattr(loader.dataset, "class_to_idx", {"fake": 1})
    return int(mapping.get("fake", 1))


Loaded: baseline_model.pth and robust_model.pth


In [29]:
import numpy as np

def roc_curve_np(labels: np.ndarray, scores: np.ndarray):

    labels = labels.astype(np.int32)
    scores = scores.astype(np.float64)

    # Sort by score descending (stable)
    order = np.argsort(-scores, kind="mergesort")
    y = labels[order]
    s = scores[order]

    distinct = np.where(np.diff(s))[0]
    thr_idx = np.r_[distinct, y.size - 1]

    tps = np.cumsum(y == 1)[thr_idx]
    fps = (1 + thr_idx) - tps

    P = max(1, int(labels.sum()))
    N = max(1, int(labels.size - labels.sum()))
    tpr = tps / P
    fpr = fps / N

    tpr = np.r_[0.0, tpr]
    fpr = np.r_[0.0, fpr]
    thresholds = np.r_[np.inf, s[thr_idx]]
    return fpr, tpr, thresholds

def roc_auc_score_np(labels: np.ndarray, scores: np.ndarray) -> float:
    fpr, tpr, _ = roc_curve_np(labels, scores)
    return float(np.trapz(tpr, fpr))

def tpr_at_fpr_np(labels: np.ndarray, scores: np.ndarray, target_fpr=0.01) -> float:
    fpr, tpr, _ = roc_curve_np(labels, scores)
    target_fpr = float(np.clip(target_fpr, 0.0, 1.0))
    return float(np.interp(target_fpr, fpr, tpr))

@torch.no_grad()
def _batch_argmax_acc(logits: torch.Tensor, y: torch.Tensor) -> float:
    return (logits.argmax(1) == y).float().mean().item()

def evaluate_scenario(
    model: torch.nn.Module,
    loader,
    attacker_module: torch.nn.Module | None = None,
    attack: str | None = None,        
    compression: str | None = None,    
    jpeg_quality: int = 50,
    pgd_kwargs: dict | None = None,
) -> dict:
    model.eval()
    if attacker_module is not None:
        attacker_module.eval()
    if pgd_kwargs is None:
        pgd_kwargs = PGD_KW

    labels_all, scores_all = [], []
    n_correct, n_total = 0, 0
    fake_idx = _fake_index_from_loader(loader)

    for xb, yb in loader:
        xb = xb.to(device, non_blocking=True)
        yb = yb.to(device, non_blocking=True)

        # 1) attack (normalized)
        if attack == "pgd":
            with torch.enable_grad():
                xb = pgd_attack(model, xb, yb, **pgd_kwargs)
        elif attack == "unet":
            if attacker_module is None:
                raise ValueError("U-Net attack requested but attacker_module is None.")
            with torch.no_grad():
                delta = attacker_module(xb)
            x_min, x_max = _norm_bounds()
            xb = _clip_normed(xb + delta, x_min, x_max)

        # 2) compression
        if compression == "jpeg":
            xb = jpeg_compress_batch(xb, quality=jpeg_quality)
        elif compression == "h264":
            xb = h264_like_compress_batch(xb)

        # 3) inference
        with torch.no_grad():
            logits = model(xb)
            probs = torch.softmax(logits, dim=1)[:, fake_idx]
        n_correct += (logits.argmax(1) == yb).sum().item()
        n_total   += yb.numel()

        labels_all.extend(yb.detach().cpu().numpy().tolist())
        scores_all.extend(probs.detach().cpu().numpy().tolist())

    labels_np = np.array(labels_all, dtype=np.int32)
    scores_np = np.array(scores_all, dtype=np.float32)

    # Make 'fake' the positive label (1)
    if fake_idx == 0:
        labels_np = 1 - labels_np

    roc_auc = roc_auc_score_np(labels_np, scores_np)
    tpr001  = tpr_at_fpr_np(labels_np, scores_np, target_fpr=0.01)
    acc     = n_correct / max(1, n_total)
    return {"acc": acc, "roc_auc": roc_auc, "tpr@1%fpr": tpr001}


In [30]:
import pandas as pd

SCENARIOS = [
    ("Clean",                 dict(attack=None,    compression=None)),
    ("JPEG(50)",              dict(attack=None,    compression="jpeg", jpeg_quality=50)),
    ("H264-like",             dict(attack=None,    compression="h264")),
    ("PGD",                   dict(attack="pgd",   compression=None,   pgd_kwargs=PGD_KW)),
    ("U-Net",                 dict(attack="unet",  compression=None)),
    ("U-Net + JPEG(50)",      dict(attack="unet",  compression="jpeg", jpeg_quality=50)),
    ("U-Net + H264-like",     dict(attack="unet",  compression="h264")),
]

def run_all(model, loader, loader_name: str, attacker_module=None):
    rows = []
    for name, kw in SCENARIOS:
        res = evaluate_scenario(model, loader, attacker_module=attacker_module, **kw)
        rows.append({"dataset": loader_name, "scenario": name,
                     "acc": res["acc"], "roc_auc": res["roc_auc"], "tpr@1%fpr": res["tpr@1%fpr"]})
        print(f"[{loader_name:12}] {name:16} | acc {res['acc']:.3f} | AUC {res['roc_auc']:.3f} | TPR@0.01 {res['tpr@1%fpr']:.3f}")
    return pd.DataFrame(rows)

print("\n=== Evaluating BASELINE model ===")
df_base_celeb = run_all(eval_baseline, test_loader,  "Celeb-DF v2", attacker_module=attacker)
df_base_ffpp  = run_all(eval_baseline, train_loader, "FF++ c23",   attacker_module=attacker)

print("\n=== Evaluating ROBUST model ===")
df_rob_celeb = run_all(eval_robust,   test_loader,  "Celeb-DF v2", attacker_module=attacker)
df_rob_ffpp  = run_all(eval_robust,   train_loader, "FF++ c23",    attacker_module=attacker)

# Save + pivot
df_all = pd.concat([
    df_base_celeb.assign(model="baseline"),
    df_base_ffpp.assign(model="baseline"),
    df_rob_celeb.assign(model="robust"),
    df_rob_ffpp.assign(model="robust"),
], ignore_index=True)

csv_path  = "/kaggle/working/phase4_1_eval_results.csv"
json_path = "/kaggle/working/phase4_1_eval_results.json"
df_all.to_csv(csv_path, index=False)
with open(json_path, "w") as f:
    json.dump(df_all.to_dict(orient="records"), f, indent=2)
print(f"\nSaved results to:\n- {csv_path}\n- {json_path}")

pivot = df_all.pivot_table(index=["dataset","scenario"], columns="model",
                           values=["acc","roc_auc","tpr@1%fpr"]).sort_index()
pivot



=== Evaluating BASELINE model ===
[Celeb-DF v2 ] Clean            | acc 0.624 | AUC 0.678 | TPR@0.01 0.014


  return float(np.trapz(tpr, fpr))


[Celeb-DF v2 ] JPEG(50)         | acc 0.625 | AUC 0.672 | TPR@0.01 0.023
[Celeb-DF v2 ] H264-like        | acc 0.636 | AUC 0.693 | TPR@0.01 0.022
[Celeb-DF v2 ] PGD              | acc 0.467 | AUC 0.460 | TPR@0.01 0.001
[Celeb-DF v2 ] U-Net            | acc 0.619 | AUC 0.675 | TPR@0.01 0.015
[Celeb-DF v2 ] U-Net + JPEG(50) | acc 0.625 | AUC 0.675 | TPR@0.01 0.020
[Celeb-DF v2 ] U-Net + H264-like | acc 0.644 | AUC 0.690 | TPR@0.01 0.025
[FF++ c23    ] Clean            | acc 0.999 | AUC 1.000 | TPR@0.01 1.000
[FF++ c23    ] JPEG(50)         | acc 0.851 | AUC 0.922 | TPR@0.01 0.344
[FF++ c23    ] H264-like        | acc 0.912 | AUC 0.965 | TPR@0.01 0.641
[FF++ c23    ] PGD              | acc 0.511 | AUC 0.465 | TPR@0.01 0.003
[FF++ c23    ] U-Net            | acc 0.999 | AUC 1.000 | TPR@0.01 1.000
[FF++ c23    ] U-Net + JPEG(50) | acc 0.859 | AUC 0.924 | TPR@0.01 0.340
[FF++ c23    ] U-Net + H264-like | acc 0.906 | AUC 0.966 | TPR@0.01 0.626

=== Evaluating ROBUST model ===
[Celeb-DF v2 ] C

Unnamed: 0_level_0,Unnamed: 1_level_0,acc,acc,roc_auc,roc_auc,tpr@1%fpr,tpr@1%fpr
Unnamed: 0_level_1,model,baseline,robust,baseline,robust,baseline,robust
dataset,scenario,Unnamed: 2_level_2,Unnamed: 3_level_2,Unnamed: 4_level_2,Unnamed: 5_level_2,Unnamed: 6_level_2,Unnamed: 7_level_2
Celeb-DF v2,Clean,0.62414,0.616728,0.678174,0.675695,0.014,0.012
Celeb-DF v2,H264-like,0.636316,0.661196,0.693363,0.702983,0.022,0.02389
Celeb-DF v2,JPEG(50),0.624669,0.640551,0.671669,0.681253,0.023,0.023
Celeb-DF v2,PGD,0.467443,0.476443,0.459703,0.483069,0.001,0.001
Celeb-DF v2,U-Net,0.618846,0.60667,0.675271,0.674021,0.015,0.015
Celeb-DF v2,U-Net + H264-like,0.643727,0.654314,0.690201,0.698939,0.025,0.03
Celeb-DF v2,U-Net + JPEG(50),0.625199,0.646903,0.674873,0.685051,0.02,0.029
FF++ c23,Clean,0.999435,0.998306,0.999997,0.999999,1.0,1.0
FF++ c23,H264-like,0.912479,0.911914,0.96548,0.96239,0.640726,0.670558
FF++ c23,JPEG(50),0.850932,0.835686,0.922041,0.91637,0.343709,0.386511


In [32]:
import os, shutil, zipfile, json, glob, sys
from pathlib import Path

BASE_DIR = "/kaggle/working"
ART_DIR  = f"{BASE_DIR}/phase4_export"
Path(ART_DIR).mkdir(parents=True, exist_ok=True)

to_copy = [
    ("/kaggle/working/baseline_model.pth", f"{ART_DIR}/baseline_model.pth"),
    ("/kaggle/working/robust_model.pth",   f"{ART_DIR}/robust_model.pth"),
    ("/kaggle/working/phase4_1_eval_results.csv", f"{ART_DIR}/phase4_1_eval_results.csv"),
    ("/kaggle/working/phase4_1_eval_results.json", f"{ART_DIR}/phase4_1_eval_results.json"),
]

ATTACKER_PTH = "/kaggle/working/attacker_unet.pth"
if os.path.exists(ATTACKER_PTH):
    to_copy.append((ATTACKER_PTH, f"{ART_DIR}/attacker_unet.pth"))

for src, dst in to_copy:
    if os.path.exists(src):
        os.makedirs(os.path.dirname(dst), exist_ok=True)
        shutil.copyfile(src, dst)
        print("✓", os.path.basename(src))
    else:
        print("⚠ missing:", src)

DATA_ROOT = "/kaggle/working/data"
datasets = ["FFpp_c23_train", "CelebDF_test"]
DATA_OUT = f"{ART_DIR}/data"
for d in datasets:
    src = f"{DATA_ROOT}/{d}"
    if os.path.isdir(src):
        shutil.copytree(src, f"{DATA_OUT}/{d}", dirs_exist_ok=True)
        print("✓ copied dataset:", d)
    else:
        print("⚠ dataset folder not found:", src)

def make_zip(zip_path, base_folder, include_data=False):
    with zipfile.ZipFile(zip_path, "w", compression=zipfile.ZIP_DEFLATED) as z:
        for p in Path(base_folder).rglob("*"):
            if p.is_dir(): 
                continue
            rel = p.relative_to(base_folder)
            if not include_data and str(rel).startswith("data/"):
                continue
            z.write(p, rel)
    print("→ wrote", zip_path)

make_zip(f"{BASE_DIR}/phase4_artifacts_light.zip", ART_DIR, include_data=False)
make_zip(f"{BASE_DIR}/phase4_artifacts_full.zip",  ART_DIR, include_data=True)

print("\nDownload from file browser:")
print("- /kaggle/working/phase4_artifacts_light.zip   (models + 4.1 results)")
print("- /kaggle/working/phase4_artifacts_full.zip    (+ cropped images)")

✓ baseline_model.pth
✓ robust_model.pth
✓ phase4_1_eval_results.csv
✓ phase4_1_eval_results.json
✓ copied dataset: FFpp_c23_train
✓ copied dataset: CelebDF_test
→ wrote /kaggle/working/phase4_artifacts_light.zip
→ wrote /kaggle/working/phase4_artifacts_full.zip

Download from file browser:
- /kaggle/working/phase4_artifacts_light.zip   (models + 4.1 results)
- /kaggle/working/phase4_artifacts_full.zip    (+ cropped images)
