In [22]:
import os
from pathlib import Path
import random
import math
import time

import numpy as np
import pandas as pd

from PIL import Image

import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader, WeightedRandomSampler

import torchvision.transforms as T

from sklearn.metrics import (
    classification_report,
    confusion_matrix,
    f1_score,
    accuracy_score,
)

import timm  # make sure timm is installed

In [23]:
DATA_ROOT = Path("/home/ih2363/forensicbind/data_root")
PROJECT_ROOT = Path("/home/ih2363/forensicbind")
CHECKPOINT_DIR = PROJECT_ROOT / "checkpoints"
CHECKPOINT_DIR.mkdir(parents=True, exist_ok=True)

print("DATA_ROOT     :", DATA_ROOT)
print("CHECKPOINT_DIR:", CHECKPOINT_DIR)

DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print("Using device:", DEVICE)

# We now ONLY care about 3 classes
CLASS_NAMES = ["Real", "FaceSwap", "Face2Face"]
NUM_CLASSES = len(CLASS_NAMES)
print("CLASS_NAMES:", CLASS_NAMES)

# Original labels mapping (for reference):
#   0: Real
#   1: FaceSwap
#   2: Face2Face
#   3: NeuralTextures  <-- WILL BE DROPPED

# Hyperparams
BATCH_SIZE = 32
NUM_WORKERS = 4
NUM_EPOCHS = 20
BASE_LR = 5e-5       # lower LR that worked better for you
WEIGHT_DECAY = 3e-4  
LABEL_SMOOTHING = 0.05   # start with 0; you can try 0.05 later
VAL_EVERY = 1

# Reproducibility
SEED = 42
def set_seed(seed: int = 42):
    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)
    torch.backends.cudnn.deterministic = False
    torch.backends.cudnn.benchmark = True

set_seed(SEED)

DATA_ROOT     : /home/ih2363/forensicbind/data_root
CHECKPOINT_DIR: /home/ih2363/forensicbind/checkpoints
Using device: cuda
CLASS_NAMES: ['Real', 'FaceSwap', 'Face2Face']


In [24]:
SPLITS_DIR = DATA_ROOT / "splits"
train_csv = SPLITS_DIR / "train.csv"
val_csv   = SPLITS_DIR / "val.csv"
test_csv  = SPLITS_DIR / "test.csv"

print("Splits dir:", SPLITS_DIR)
print("  train.csv exists:", train_csv.exists())
print("  val.csv   exists:", val_csv.exists())
print("  test.csv  exists:", test_csv.exists())

assert train_csv.exists(), f"Missing {train_csv}"
assert val_csv.exists(),   f"Missing {val_csv}"
assert test_csv.exists(),  f"Missing {test_csv}"

Splits dir: /home/ih2363/forensicbind/data_root/splits
  train.csv exists: True
  val.csv   exists: True
  test.csv  exists: True


In [25]:
SPLITS_DIR = DATA_ROOT / "splits"
train_csv = SPLITS_DIR / "train.csv"
val_csv   = SPLITS_DIR / "val.csv"
test_csv  = SPLITS_DIR / "test.csv"

print("Splits dir:", SPLITS_DIR)
print("  train.csv exists:", train_csv.exists())
print("  val.csv   exists:", val_csv.exists())
print("  test.csv  exists:", test_csv.exists())

assert train_csv.exists(), f"Missing {train_csv}"
assert val_csv.exists(),   f"Missing {val_csv}"
assert test_csv.exists(),  f"Missing {test_csv}"

# -----------------------------
# Cell 4: Dataset + transforms (drop NeuralTextures)
# -----------------------------
IMAGENET_MEAN = [0.485, 0.456, 0.406]
IMAGENET_STD  = [0.229, 0.224, 0.225]

def get_train_transform():
    return T.Compose([
        T.Resize((256, 256)),
        T.RandomResizedCrop(224, scale=(0.7, 1.0), ratio=(0.9, 1.1)),
        T.RandomHorizontalFlip(p=0.5),
        T.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2, hue=0.1),
        T.RandomApply([T.GaussianBlur(kernel_size=3)], p=0.3),
        T.ToTensor(),
        T.Normalize(mean=IMAGENET_MEAN, std=IMAGENET_STD),
    ])

def get_eval_transform():
    return T.Compose([
        T.Resize((256, 256)),
        T.CenterCrop(224),
        T.ToTensor(),
        T.Normalize(mean=IMAGENET_MEAN, std=IMAGENET_STD),
    ])

class DeepfakeFrameDataset(Dataset):
    """
    Frame-level dataset using your existing CSVs, but:
      - drops any rows where class_label == 3 (NeuralTextures)
      - keeps labels 0,1,2 as-is (Real, FaceSwap, Face2Face)
    CSV must contain: frame_path, class_label
    """
    def __init__(self, csv_path: Path, transform=None):
        super().__init__()
        self.csv_path = Path(csv_path)
        df = pd.read_csv(self.csv_path)

        assert "frame_path"  in df.columns, "CSV must contain 'frame_path'"
        assert "class_label" in df.columns, "CSV must contain 'class_label'"

        df["class_label"] = df["class_label"].astype(int)

        # DROP NeuralTextures (label == 3)
        before = len(df)
        df = df[df["class_label"] != 3].reset_index(drop=True)
        after = len(df)
        print(f"[{self.csv_path.name}] dropped {before - after} NeuralTextures frames, kept {after} samples.")

        self.df = df
        self.transform = transform

        print(self.df[["frame_path", "class_label"]].head())

    def __len__(self):
        return len(self.df)

    def _resolve_path(self, p: str) -> Path:
        p = Path(p)
        if p.is_absolute():
            return p
        return DATA_ROOT / p

    def __getitem__(self, idx: int):
        row = self.df.iloc[idx]
        img_path = self._resolve_path(row["frame_path"])
        label = int(row["class_label"])  # should be 0,1,2 only

        try:
            with Image.open(img_path) as img:
                img = img.convert("RGB")
        except Exception:
            # if corrupted/missing, fall back to another index
            new_idx = (idx + 1) % len(self.df)
            row = self.df.iloc[new_idx]
            img_path = self._resolve_path(row["frame_path"])
            label = int(row["class_label"])
            with Image.open(img_path) as img:
                img = img.convert("RGB")

        if self.transform is not None:
            img = self.transform(img)

        return img, label

# Instantiate datasets
train_transform = get_train_transform()
eval_transform  = get_eval_transform()

train_dataset = DeepfakeFrameDataset(train_csv, transform=train_transform)
val_dataset   = DeepfakeFrameDataset(val_csv,   transform=eval_transform)
test_dataset  = DeepfakeFrameDataset(test_csv,  transform=eval_transform)

Splits dir: /home/ih2363/forensicbind/data_root/splits
  train.csv exists: True
  val.csv   exists: True
  test.csv  exists: True
[train.csv] dropped 420 NeuralTextures frames, kept 1599 samples.
                                          frame_path  class_label
0  /home/ih2363/forensicbind/data_root/original_s...            0
1  /home/ih2363/forensicbind/data_root/original_s...            0
2  /home/ih2363/forensicbind/data_root/original_s...            0
3  /home/ih2363/forensicbind/data_root/original_s...            0
4  /home/ih2363/forensicbind/data_root/original_s...            0
[val.csv] dropped 120 NeuralTextures frames, kept 366 samples.
                                          frame_path  class_label
0  /home/ih2363/forensicbind/data_root/original_s...            0
1  /home/ih2363/forensicbind/data_root/original_s...            0
2  /home/ih2363/forensicbind/data_root/original_s...            0
3  /home/ih2363/forensicbind/data_root/original_s...            0
4  /home/ih2363

In [26]:
# -----------------------------
# Cell 5: Class counts, loaders (NO sampler)
# -----------------------------
train_labels = train_dataset.df["class_label"].values
class_counts = np.bincount(train_labels, minlength=NUM_CLASSES).astype(float)
print("Train frame counts per class (3-class):")
for idx, cnt in enumerate(class_counts):
    print(f"  {idx} ({CLASS_NAMES[idx]}): {int(cnt)}")

# Just to see distribution; we won't use it for sampler anymore
print("Class distribution (proportions):", class_counts / class_counts.sum())

# Dataloaders WITHOUT sampler
train_loader = DataLoader(
    train_dataset,
    batch_size=BATCH_SIZE,
    shuffle=True,         # <--- important now
    num_workers=NUM_WORKERS,
    pin_memory=True,
)

val_loader = DataLoader(
    val_dataset,
    batch_size=BATCH_SIZE,
    shuffle=False,
    num_workers=NUM_WORKERS,
    pin_memory=True,
)

test_loader = DataLoader(
    test_dataset,
    batch_size=BATCH_SIZE,
    shuffle=False,
    num_workers=NUM_WORKERS,
    pin_memory=True,
)

print("Dataloaders ready (3-class, no sampler).")


Train frame counts per class (3-class):
  0 (Real): 566
  1 (FaceSwap): 640
  2 (Face2Face): 393
Class distribution (proportions): [0.35397123 0.40025016 0.24577861]
Dataloaders ready (3-class, no sampler).


In [27]:
BACKBONE_NAME = "efficientnet_b0"

class EfficientNetDeepfake(nn.Module):
    def __init__(self, num_classes: int = NUM_CLASSES, backbone_name: str = BACKBONE_NAME):
        super().__init__()
        self.backbone = timm.create_model(
            backbone_name,
            pretrained=True,
            num_classes=0,   # feature extractor
            in_chans=3,
        )

        if hasattr(self.backbone, "num_features"):
            feat_dim = self.backbone.num_features
        elif hasattr(self.backbone, "classifier") and hasattr(self.backbone.classifier, "in_features"):
            feat_dim = self.backbone.classifier.in_features
        else:
            feat_dim = 1280  # fallback

        self.dropout = nn.Dropout(p=0.3)
        self.classifier = nn.Linear(feat_dim, num_classes)

    def forward(self, x):
        feats = self.backbone(x)
        feats = self.dropout(feats)
        logits = self.classifier(feats)
        return logits

model = EfficientNetDeepfake(num_classes=NUM_CLASSES).to(DEVICE)
print(model)

total_params = sum(p.numel() for p in model.parameters())
trainable_params = sum(p.numel() for p in model.parameters() if p.requires_grad)
print(f"\nTotal parameters:     {total_params:,}")
print(f"Trainable parameters: {trainable_params:,}")

EfficientNetDeepfake(
  (backbone): EfficientNet(
    (conv_stem): Conv2d(3, 32, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1), bias=False)
    (bn1): BatchNormAct2d(
      32, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True
      (drop): Identity()
      (act): SiLU(inplace=True)
    )
    (blocks): Sequential(
      (0): Sequential(
        (0): DepthwiseSeparableConv(
          (conv_dw): Conv2d(32, 32, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), groups=32, bias=False)
          (bn1): BatchNormAct2d(
            32, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True
            (drop): Identity()
            (act): SiLU(inplace=True)
          )
          (aa): Identity()
          (se): SqueezeExcite(
            (conv_reduce): Conv2d(32, 8, kernel_size=(1, 1), stride=(1, 1))
            (act1): SiLU(inplace=True)
            (conv_expand): Conv2d(8, 32, kernel_size=(1, 1), stride=(1, 1))
            (gate): Sigmoid()
          )
          (co

In [28]:
# -----------------------------
# Cell 7: Loss, optimizer, scheduler (no class weights)
# -----------------------------
if LABEL_SMOOTHING > 0.0:
    criterion = nn.CrossEntropyLoss(label_smoothing=LABEL_SMOOTHING)
else:
    criterion = nn.CrossEntropyLoss()

optimizer = optim.AdamW(
    model.parameters(),
    lr=BASE_LR,
    weight_decay=WEIGHT_DECAY,
)

scheduler = optim.lr_scheduler.CosineAnnealingLR(
    optimizer,
    T_max=NUM_EPOCHS,
)

scaler = torch.cuda.amp.GradScaler(enabled=torch.cuda.is_available())

  scaler = torch.cuda.amp.GradScaler(enabled=torch.cuda.is_available())


In [29]:
# -----------------------------
# Cell 8: Epoch loop helper
# -----------------------------
def run_one_epoch(
    model,
    loader,
    optimizer=None,
    scaler=None,
    device=DEVICE,
    train: bool = True,
):
    if train:
        model.train()
    else:
        model.eval()

    epoch_loss = 0.0
    all_labels = []
    all_preds = []

    for images, labels in loader:
        images = images.to(device, non_blocking=True)
        labels = labels.to(device, non_blocking=True)

        if train:
            optimizer.zero_grad(set_to_none=True)

            with torch.amp.autocast("cuda", enabled=torch.cuda.is_available()):
                logits = model(images)
                loss = criterion(logits, labels)

            scaler.scale(loss).backward()
            scaler.step(optimizer)
            scaler.update()
        else:
            with torch.no_grad(), torch.amp.autocast("cuda", enabled=torch.cuda.is_available()):
                logits = model(images)
                loss = criterion(logits, labels)

        epoch_loss += loss.item() * images.size(0)

        preds = torch.argmax(logits, dim=1)
        all_labels.append(labels.detach().cpu().numpy())
        all_preds.append(preds.detach().cpu().numpy())

    all_labels = np.concatenate(all_labels)
    all_preds = np.concatenate(all_preds)

    avg_loss = epoch_loss / len(loader.dataset)
    acc = accuracy_score(all_labels, all_preds)
    macro_f1 = f1_score(all_labels, all_preds, average="macro")

    return avg_loss, acc, macro_f1, all_labels, all_preds

In [30]:

# -----------------------------
# Cell 9: Training loop (3-class)
# -----------------------------
best_val_f1 = 0.0
best_epoch = -1
best_ckpt_path = CHECKPOINT_DIR / "efficientnet_3class_best.pth"

print("\nStarting training (3 classes: Real, FaceSwap, Face2Face)...")
print(f"  epochs   : {NUM_EPOCHS}")
print(f"  base lr  : {BASE_LR}")
print(f"  batch_sz : {BATCH_SIZE}")

for epoch in range(1, NUM_EPOCHS + 1):
    start_time = time.time()

    train_loss, train_acc, train_f1, _, _ = run_one_epoch(
        model,
        train_loader,
        optimizer=optimizer,
        scaler=scaler,
        device=DEVICE,
        train=True,
    )

    if (epoch % VAL_EVERY) == 0:
        val_loss, val_acc, val_f1, _, _ = run_one_epoch(
            model,
            val_loader,
            optimizer=None,
            scaler=scaler,
            device=DEVICE,
            train=False,
        )
    else:
        val_loss = val_acc = val_f1 = float("nan")

    scheduler.step()
    elapsed = time.time() - start_time

    print(
        f"Epoch {epoch:03d} | "
        f"train_loss={train_loss:.4f} | train_acc={train_acc:.4f} | train_f1={train_f1:.4f} || "
        f"val_loss={val_loss:.4f} | val_acc={val_acc:.4f} | val_f1={val_f1:.4f} | "
        f"time={elapsed:.1f}s"
    )

    if not math.isnan(val_f1) and val_f1 > best_val_f1:
        best_val_f1 = val_f1
        best_epoch = epoch
        torch.save(
            {
                "epoch": epoch,
                "model_state": model.state_dict(),
                "optimizer_state": optimizer.state_dict(),
                "scheduler_state": scheduler.state_dict(),
                "val_f1": best_val_f1,
                "class_names": CLASS_NAMES,
            },
            best_ckpt_path,
        )
        print(f"  üî• New best model at epoch {epoch}, val_f1={best_val_f1:.4f}")
        print(f"    Saved to: {best_ckpt_path}")

print("\nTraining complete (3-class).")
print(f"Best val F1: {best_val_f1:.4f} at epoch {best_epoch}")
print(f"Best checkpoint: {best_ckpt_path}")



Starting training (3 classes: Real, FaceSwap, Face2Face)...
  epochs   : 20
  base lr  : 5e-05
  batch_sz : 32
Epoch 001 | train_loss=0.8985 | train_acc=0.7036 | train_f1=0.6926 || val_loss=1.0801 | val_acc=0.4016 | val_f1=0.3286 | time=6.3s
  üî• New best model at epoch 1, val_f1=0.3286
    Saved to: /home/ih2363/forensicbind/checkpoints/efficientnet_3class_best.pth
Epoch 002 | train_loss=0.4700 | train_acc=0.9681 | train_f1=0.9675 || val_loss=0.9955 | val_acc=0.5328 | val_f1=0.4777 | time=6.8s
  üî• New best model at epoch 2, val_f1=0.4777
    Saved to: /home/ih2363/forensicbind/checkpoints/efficientnet_3class_best.pth
Epoch 003 | train_loss=0.2633 | train_acc=0.9881 | train_f1=0.9871 || val_loss=0.9891 | val_acc=0.5874 | val_f1=0.5641 | time=6.3s
  üî• New best model at epoch 3, val_f1=0.5641
    Saved to: /home/ih2363/forensicbind/checkpoints/efficientnet_3class_best.pth
Epoch 004 | train_loss=0.2129 | train_acc=0.9937 | train_f1=0.9934 || val_loss=0.9744 | val_acc=0.6148 | val

In [31]:
# -----------------------------
# Cell 10: Load best and TEST evaluation (3-class)
# -----------------------------
print("\nLoading best checkpoint for TEST evaluation (3-class)...")
ckpt = torch.load(best_ckpt_path, map_location=DEVICE)
model.load_state_dict(ckpt["model_state"])
model.to(DEVICE)
model.eval()

test_loss, test_acc, test_f1, test_labels, test_preds = run_one_epoch(
    model,
    test_loader,
    optimizer=None,
    scaler=scaler,
    device=DEVICE,
    train=False,
)

print("\n==== EfficientNet ‚Äì TEST performance (3-class) ====")
print(f"TEST loss    : {test_loss:.4f}")
print(f"TEST accuracy: {test_acc:.4f}")
print(f"TEST macro F1: {test_f1:.4f}")

cm = confusion_matrix(test_labels, test_preds, labels=list(range(NUM_CLASSES)))
print("Confusion matrix (rows=true, cols=pred):\n", cm)

print("\nClassification report (test, 3-class):")
print(classification_report(test_labels, test_preds, target_names=CLASS_NAMES, digits=3))


Loading best checkpoint for TEST evaluation (3-class)...

==== EfficientNet ‚Äì TEST performance (3-class) ====
TEST loss    : 0.7983
TEST accuracy: 0.6288
TEST macro F1: 0.6007
Confusion matrix (rows=true, cols=pred):
 [[41 32 47]
 [ 6 92  0]
 [ 7  6 33]]

Classification report (test, 3-class):
              precision    recall  f1-score   support

        Real      0.759     0.342     0.471       120
    FaceSwap      0.708     0.939     0.807        98
   Face2Face      0.412     0.717     0.524        46

    accuracy                          0.629       264
   macro avg      0.626     0.666     0.601       264
weighted avg      0.680     0.629     0.605       264



In [32]:
# ============================================================
# FINAL CELL ‚Äî SAVE WEIGHTS, CONFIG, METRICS
# ============================================================
import json
from pathlib import Path

# 1) Export directory
EXPORT_DIR = PROJECT_ROOT / "export"
EXPORT_DIR.mkdir(parents=True, exist_ok=True)
print("Export dir:", EXPORT_DIR)

# ------------------------------------------------------------
# 2) Save the model weights (state_dict only)
# ------------------------------------------------------------
weights_path = EXPORT_DIR / "efficientnet_3class.pth"
torch.save(model.state_dict(), weights_path)
print("Saved model weights to:", weights_path)

# ------------------------------------------------------------
# 3) Save model configuration (for anyone loading the model)
# ------------------------------------------------------------
model_config = {
    "backbone_name": BACKBONE_NAME,        # e.g. "efficientnet_b0"
    "num_classes": int(NUM_CLASSES),       # should be 3
    "class_names": CLASS_NAMES,            # ["Real", "FaceSwap", "Face2Face"]
    "input_size": [3, 224, 224],           # channels, height, width
    "imagenet_mean": IMAGENET_MEAN,
    "imagenet_std": IMAGENET_STD,
    "dropout": 0.3,
    "checkpoint": "efficientnet_3class.pth"
}

config_path = EXPORT_DIR / "model_config.json"
with open(config_path, "w") as f:
    json.dump(model_config, f, indent=2)
print("Saved model config to:", config_path)

# ------------------------------------------------------------
# 4) Save test metrics (if test was already run)
# ------------------------------------------------------------
try:
    test_metrics = {
        "test_loss": float(test_loss),
        "test_accuracy": float(test_acc),
        "test_macro_f1": float(test_f1),
    }
    metrics_path = EXPORT_DIR / "test_metrics.json"
    with open(metrics_path, "w") as f:
        json.dump(test_metrics, f, indent=2)
    print("Saved test metrics to:", metrics_path)

except NameError:
    print("‚ö†Ô∏è  test_loss/test_acc/test_f1 not defined ‚Äî "
          "run the test evaluation cell first if you want metrics exported.")


Export dir: /home/ih2363/forensicbind/export
Saved model weights to: /home/ih2363/forensicbind/export/efficientnet_3class.pth
Saved model config to: /home/ih2363/forensicbind/export/model_config.json
Saved test metrics to: /home/ih2363/forensicbind/export/test_metrics.json
