<a href="https://colab.research.google.com/github/Htets-Corner/SYNTHBUSTER_RAISE-1k/blob/main/mobilevit_kfold.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
# ==========================
# Step 0: Data Preparation
# ==========================

import os
from google.colab import drive

# 1. Expose Google Drive under /content/drive (Colab runtime only)
drive.mount('/content/drive')

# 2. Dataset folders on the mounted Drive
drive_root = "/content/drive/MyDrive"
real_path = f"{drive_root}/RAISE/PNG"
ai_path   = f"{drive_root}/synthbuster"

print(f"Real images path: {real_path}")
print(f"AI images path: {ai_path}")


Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).
Real images path: /content/drive/MyDrive/RAISE/PNG
AI images path: /content/drive/MyDrive/synthbuster


In [3]:
# 3. Collect all file paths with labels
# NOTE: names are lower-cased before matching, so the '.PNG' entry is redundant
# with '.png'; it is kept only so the tuple's value stays unchanged.
valid_exts = ('.jpg', '.jpeg', '.png', '.PNG', '.bmp', '.tif', '.tiff', '.webp')


def _list_images(folder):
    """Return paths of the image files directly inside `folder`, sorted by name.

    A file counts as an image when its lower-cased name ends with one of
    `valid_exts`. `os.listdir` yields entries in arbitrary order, so the names
    are sorted to make the resulting row order -- and any seeded split computed
    from it -- the same on every run.
    """
    return [os.path.join(folder, name) for name in sorted(os.listdir(folder))
            if name.lower().endswith(valid_exts)]


# Real images
real_files = _list_images(real_path)

# AI images (inside multiple subfolders)
ai_files = []
for subfolder in sorted(os.listdir(ai_path)):
    sub_path = os.path.join(ai_path, subfolder)
    if os.path.isdir(sub_path):
        ai_files.extend(_list_images(sub_path))

print(f"Found {len(real_files)} real images")
print(f"Found {len(ai_files)} AI images")

# 4. Build dataframe of filepaths + labels
import pandas as pd
df_real = pd.DataFrame({"filepath": real_files, "label": "real"})
df_ai   = pd.DataFrame({"filepath": ai_files, "label": "ai"})
df = pd.concat([df_real, df_ai], ignore_index=True)

print("Total dataset size:", len(df))
print(df["label"].value_counts())

Found 999 real images
Found 3000 AI images
Total dataset size: 3999
label
ai      3000
real     999
Name: count, dtype: int64


In [5]:
# 5. Preprocessing pipeline: resize to a fixed square, convert to a tensor,
#    then normalise each channel as (x - 0.5) / 0.5.
# NOTE(review): the pretrained MobileViT weights may expect a different
# mean/std -- confirm against the model's timm data config.
import torch
from torchvision import transforms

image_size = 256
_channel_mean = [0.5, 0.5, 0.5]
_channel_std = [0.5, 0.5, 0.5]

_preprocessing_steps = [
    transforms.Resize((image_size, image_size)),
    transforms.ToTensor(),
    transforms.Normalize(mean=_channel_mean, std=_channel_std),
]
transform = transforms.Compose(_preprocessing_steps)

# 6. Define custom Dataset for dataframe
from PIL import Image
from torch.utils.data import Dataset

class ImageDataset(Dataset):
    """Map-style dataset that reads the images listed in a dataframe.

    Args:
        df: DataFrame with a "filepath" column and a "label" column whose
            values are keys of `label_map` ("real" or "ai").
        transform: Optional callable applied to the RGB PIL image.

    Each item is `(image, label)` where label is 0 for "real" and 1 for "ai".
    """

    def __init__(self, df, transform=None):
        # Re-index to 0..n-1 so positional indices from samplers/Subset map
        # directly onto index labels.
        self.df = df.reset_index(drop=True)
        self.transform = transform
        self.label_map = {"real": 0, "ai": 1}

    def __len__(self):
        return len(self.df)

    def __getitem__(self, idx):
        # `.at` is the scalar accessor: same lookup as `.loc`, less overhead.
        path = self.df.at[idx, "filepath"]
        label = self.label_map[self.df.at[idx, "label"]]
        # Close the underlying file deterministically; `convert` returns an
        # independent, fully loaded image, so it stays valid after the close.
        with Image.open(path) as source:
            image = source.convert("RGB")
        if self.transform:
            image = self.transform(image)
        return image, label

# Create full dataset
full_dataset = ImageDataset(df, transform=transform)

# 7. K-Fold Split with WeightedRandomSampler
from collections import Counter

import numpy as np
from sklearn.model_selection import StratifiedKFold
from torch.utils.data import DataLoader, Subset, WeightedRandomSampler

n_splits = 10   # Number of folds
batch_size = 32
skf = StratifiedKFold(n_splits=n_splits, shuffle=True, random_state=42)

# Integer targets (real -> 0, ai -> 1) used for stratification and weighting
labels = df["label"].map({"real": 0, "ai": 1}).values
folds = []

for fold, (train_idx, val_idx) in enumerate(skf.split(df, labels)):
    print(f"\n🔹 Fold {fold+1}/{n_splits}")
    print("Train size:", len(train_idx), "Val size:", len(val_idx))

    # Inverse-frequency weight per class, then one weight per training sample,
    # so the sampler draws both classes at roughly the same rate.
    train_labels = labels[train_idx]
    counts = Counter(train_labels)
    class_weights = {}
    for cls, count in counts.items():
        class_weights[cls] = 1.0/count
    sample_weights = np.array([class_weights[l] for l in train_labels])
    sampler = WeightedRandomSampler(sample_weights,
                                    num_samples=len(sample_weights),
                                    replacement=True)

    # Views onto the shared dataset for this fold
    train_subset = Subset(full_dataset, train_idx)
    val_subset   = Subset(full_dataset, val_idx)

    # Loaders: training draws through the sampler, validation keeps file order
    train_loader = DataLoader(train_subset, batch_size=batch_size, sampler=sampler,
                              num_workers=2, pin_memory=True)
    val_loader   = DataLoader(val_subset, batch_size=batch_size, shuffle=False,
                              num_workers=2, pin_memory=True)

    fold_record = {
        "fold": fold,
        "train_idx": train_idx,
        "val_idx": val_idx,
        "train_loader": train_loader,
        "val_loader": val_loader
    }
    folds.append(fold_record)

print("\n✅ All folds prepared. Access them as folds[i]['train_loader'] and folds[i]['val_loader']")


🔹 Fold 1/10
Train size: 3599 Val size: 400

🔹 Fold 2/10
Train size: 3599 Val size: 400

🔹 Fold 3/10
Train size: 3599 Val size: 400

🔹 Fold 4/10
Train size: 3599 Val size: 400

🔹 Fold 5/10
Train size: 3599 Val size: 400

🔹 Fold 6/10
Train size: 3599 Val size: 400

🔹 Fold 7/10
Train size: 3599 Val size: 400

🔹 Fold 8/10
Train size: 3599 Val size: 400

🔹 Fold 9/10
Train size: 3599 Val size: 400

🔹 Fold 10/10
Train size: 3600 Val size: 399

✅ All folds prepared. Access them as folds[i]['train_loader'] and folds[i]['val_loader']


In [4]:
# ==========================
# Step 1: Model Setup
# ==========================

import timm
import torch
import torch.nn as nn

# 1. Choose model
model_name = "mobilevitv2_050"

# 2. Factory that builds a fresh model (call it once per fold)
def create_model(model_name="mobilevitv2_050", num_classes=2, device=None):
    """Build a pretrained timm model with a new `num_classes`-way head.

    Args:
        model_name: timm architecture name passed to `timm.create_model`.
        num_classes: number of outputs of the replacement classifier.
        device: target `torch.device`; when None, CUDA is used if available,
            otherwise the CPU.

    Returns:
        `(model, device)` -- the model already moved to `device`.
    """
    target_device = device
    if target_device is None:
        target_device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    # Pretrained backbone
    net = timm.create_model(model_name, pretrained=True)

    # Swap the classification head; build it by hand when the model does not
    # offer reset_classifier
    if not hasattr(net, "reset_classifier"):
        net.classifier = nn.Linear(net.classifier.in_features, num_classes)
    else:
        net.reset_classifier(num_classes=num_classes)

    net.to(target_device)

    print(f"✅ Loaded {model_name} with classifier reset to {num_classes} classes")
    return net, target_device

# 3. Resolve the compute device a single time (GPU when one is visible)
_has_gpu = torch.cuda.is_available()
device = torch.device("cuda" if _has_gpu else "cpu")
print("Device:", device)

# 4. Example: instantiate one model on that device
model, device = create_model(model_name, 2, device)


Device: cuda


The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


model.safetensors:   0%|          | 0.00/5.54M [00:00<?, ?B/s]

✅ Loaded mobilevitv2_050 with classifier reset to 2 classes


In [11]:
# @title
import os
import torch
import torch.nn as nn
import torch.optim as optim
from sklearn.metrics import precision_score, recall_score, f1_score
from tqdm import tqdm

# ============= CONFIG =============
# Hyper-parameters read by train_one_fold, plus the Drive folder that
# receives the per-fold checkpoints.
num_epochs, learning_rate = 10, 1e-4
checkpoint_dir = "/content/drive/MyDrive/checkpoints_kfold"
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
os.makedirs(checkpoint_dir, exist_ok=True)

# ============= TRAINING FUNCTION =============
def train_one_fold(model, train_loader, val_loader, fold, start_epoch=0):
    """Train and validate `model` on one fold, checkpointing after every epoch.

    Reads the module-level `num_epochs`, `learning_rate`, `checkpoint_dir`
    and `device` settings. The model is not moved here; batches are sent to
    `device`, so the caller must have placed the model there.

    Args:
        model: network to optimise.
        train_loader: iterable of `(images, labels)` training batches.
        val_loader: iterable of `(images, labels)` validation batches.
        fold: fold identifier, used in log lines and checkpoint file names.
        start_epoch: first epoch index to run; replaced by the stored epoch + 1
            when a checkpoint for this fold already exists.

    Returns:
        dict mapping "train_loss", "val_loss", "train_acc", "val_acc",
        "precision", "recall" and "f1" to per-epoch lists (accuracies are
        percentages).

    Files written under `checkpoint_dir`:
        mobilevit_fold{fold}.pth -- resumable state, rewritten every epoch.
        best_fold{fold}.pth      -- weights of the best validation accuracy.
    """
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=learning_rate)

    best_val_acc = 0.0
    history = {"train_loss": [], "val_loss": [], "train_acc": [], "val_acc": [],
               "precision": [], "recall": [], "f1": []}

    # Checkpoint path for this fold
    fold_ckpt = os.path.join(checkpoint_dir, f"mobilevit_fold{fold}.pth")

    # Resume training if checkpoint exists
    if os.path.exists(fold_ckpt):
        print(f"🔄 Resuming Fold {fold} from checkpoint...")
        checkpoint = torch.load(fold_ckpt, map_location=device)
        model.load_state_dict(checkpoint["model_state"])
        optimizer.load_state_dict(checkpoint["optimizer_state"])
        best_val_acc = checkpoint["best_val_acc"]
        history = checkpoint["history"]
        start_epoch = checkpoint["epoch"] + 1
        print(f"✅ Resumed from epoch {start_epoch}, best_val_acc={best_val_acc:.4f}")

    for epoch in range(start_epoch, num_epochs):
        print(f"\n📘 Fold {fold} | Epoch [{epoch+1}/{num_epochs}]")

        # ---------- TRAIN ----------
        model.train()
        train_loss, correct, total = 0, 0, 0
        for imgs, labels in tqdm(train_loader, desc="Training", leave=False):
            imgs, labels = imgs.to(device), labels.to(device)
            optimizer.zero_grad()
            outputs = model(imgs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

            # criterion returns the batch mean; weight by batch size so the
            # epoch figure is a true per-sample average
            train_loss += loss.item() * imgs.size(0)
            _, predicted = outputs.max(1)
            total += labels.size(0)
            correct += predicted.eq(labels).sum().item()

        train_acc = 100. * correct / total
        train_loss /= total

        # ---------- VALIDATION ----------
        model.eval()
        val_loss, correct, total = 0, 0, 0
        all_labels, all_preds = [], []
        with torch.no_grad():
            for imgs, labels in tqdm(val_loader, desc="Validating", leave=False):
                imgs, labels = imgs.to(device), labels.to(device)
                outputs = model(imgs)
                loss = criterion(outputs, labels)

                val_loss += loss.item() * imgs.size(0)
                _, predicted = outputs.max(1)
                total += labels.size(0)
                correct += predicted.eq(labels).sum().item()
                all_labels.extend(labels.cpu().numpy())
                all_preds.extend(predicted.cpu().numpy())

        val_acc = 100. * correct / total
        val_loss /= total

        # Compute metrics. Cast to built-in floats so the history stored in the
        # checkpoint holds only plain Python numbers; NumPy scalars can be
        # rejected by torch.load's restricted (weights_only) unpickler on resume.
        precision = float(precision_score(all_labels, all_preds, average="binary"))
        recall = float(recall_score(all_labels, all_preds, average="binary"))
        f1 = float(f1_score(all_labels, all_preds, average="binary"))

        # Save history
        history["train_loss"].append(train_loss)
        history["val_loss"].append(val_loss)
        history["train_acc"].append(train_acc)
        history["val_acc"].append(val_acc)
        history["precision"].append(precision)
        history["recall"].append(recall)
        history["f1"].append(f1)

        print(f"📊 Train Loss: {train_loss:.4f}, Train Acc: {train_acc:.2f}%")
        print(f"📊 Val Loss: {val_loss:.4f}, Val Acc: {val_acc:.2f}%, "
              f"Precision: {precision:.4f}, Recall: {recall:.4f}, F1: {f1:.4f}")

        # Save best model FIRST, so the resumable checkpoint written below
        # records the up-to-date best_val_acc. Writing the checkpoint before
        # this update made a resumed run start from a stale best and allowed
        # a worse epoch to overwrite best_fold{fold}.pth.
        if val_acc > best_val_acc:
            print("💾 Saving best model for this fold...")
            best_val_acc = val_acc
            torch.save(model.state_dict(), os.path.join(checkpoint_dir, f"best_fold{fold}.pth"))

        # Save checkpoint (every epoch)
        checkpoint = {
            "epoch": epoch,
            "model_state": model.state_dict(),
            "optimizer_state": optimizer.state_dict(),
            "best_val_acc": best_val_acc,
            "history": history,
        }
        torch.save(checkpoint, fold_ckpt)

    return history

# ============= K-FOLD TRAINING LOOP =============
def train_kfold(model_fn, train_loaders, val_loaders, k_folds):
    """Train one fresh model per fold and average the final-epoch metrics.

    Args:
        model_fn: zero-argument callable returning a new model.
        train_loaders: per-fold training loaders, indexed by fold number.
        val_loaders: per-fold validation loaders, indexed by fold number.
        k_folds: number of folds to run (folds 0 .. k_folds-1).

    Returns:
        `(fold_histories, avg_metrics)`: the history dict of every fold, and a
        dict holding, for each history key, the mean of the folds' last-epoch
        values.
    """
    fold_histories = []
    for fold in range(k_folds):
        print(f"\n🔹 Starting Fold {fold+1}/{k_folds}")

        # A new model each time, so no weights carry over between folds
        fresh_model = model_fn().to(device)
        fold_histories.append(
            train_one_fold(fresh_model, train_loaders[fold], val_loaders[fold], fold)
        )

    # Mean over folds of the value recorded in each fold's final epoch
    avg_metrics = {}
    for key in fold_histories[0].keys():
        final_values = [h[key][-1] for h in fold_histories]
        avg_metrics[key] = sum(final_values)/k_folds

    print("\n✅ K-Fold Training Completed!")
    print(f"📊 Avg Val Acc: {avg_metrics['val_acc']:.2f}%, "
          f"Avg Precision: {avg_metrics['precision']:.4f}, "
          f"Avg Recall: {avg_metrics['recall']:.4f}, "
          f"Avg F1: {avg_metrics['f1']:.4f}")

    return fold_histories, avg_metrics


In [5]:
# =========================================
# FULL PIPELINE: K-Fold MobileViT Training
# =========================================

# -------------------------------
# Step 0: Data Preparation (K-Fold)
# -------------------------------
import os
import glob
import torch
from torchvision import transforms
from torch.utils.data import DataLoader, Subset, WeightedRandomSampler, Dataset
from sklearn.model_selection import KFold
from PIL import Image

# Dataset folders on the mounted Drive
drive_root = "/content/drive/MyDrive"
real_path = f"{drive_root}/RAISE/PNG"
ai_path = f"{drive_root}/synthbuster"

# -------------------------------
# Custom Dataset
# -------------------------------
class CustomImageDataset(Dataset):
    """Dataset over a list of image files that all share one label.

    Args:
        files: sequence of image file paths.
        label: the label returned with every item.
        transform: optional callable applied to the RGB PIL image.
    """

    def __init__(self, files, label, transform=None):
        self.files = files
        self.label = label
        self.transform = transform

    def __len__(self):
        return len(self.files)

    def __getitem__(self, idx):
        # Close the file handle deterministically; `convert` returns an
        # independent, fully loaded image that remains valid afterwards.
        with Image.open(self.files[idx]) as source:
            img = source.convert("RGB")
        if self.transform:
            img = self.transform(img)
        return img, self.label

# -------------------------------
# Gather files and create dataset
# -------------------------------
from collections import Counter

image_size = 256
transform = transforms.Compose([
    transforms.Resize((image_size, image_size)),
    transforms.ToTensor(),
    transforms.Normalize([0.5,0.5,0.5], [0.5,0.5,0.5])
])

# Real images (label 1). glob/listdir give no ordering guarantee, so results
# are sorted to keep the seeded split below identical between sessions.
real_files = sorted(glob.glob(os.path.join(real_path, "*.png")))
real_dataset = CustomImageDataset(real_files, 1, transform)

# AI images (label 0), spread over multiple subfolders
ai_files = []
for sub in sorted(os.listdir(ai_path)):
    sub_path = os.path.join(ai_path, sub)
    if os.path.isdir(sub_path):
        ai_files += sorted(glob.glob(os.path.join(sub_path, "*.png")))
ai_dataset = CustomImageDataset(ai_files, 0, transform)

# Combine datasets (Dataset.__add__ concatenates: real items first, then AI)
full_dataset = real_dataset + ai_dataset

# Labels in full_dataset order, derived from the file lists. Reading them via
# full_dataset[i][1] would decode and transform every image just to recover a
# constant, once per fold.
all_targets = [real_dataset.label] * len(real_dataset) + [ai_dataset.label] * len(ai_dataset)

# -------------------------------
# K-Fold Split
# -------------------------------
# NOTE(review): plain KFold does not preserve the class ratio per fold;
# consider StratifiedKFold if the class imbalance matters for validation.
k_folds = 10
batch_size = 16
kf = KFold(n_splits=k_folds, shuffle=True, random_state=42)

folds = []
for fold, (train_idx, val_idx) in enumerate(kf.split(range(len(full_dataset)))):
    # Subsets
    train_subset = Subset(full_dataset, train_idx)
    val_subset = Subset(full_dataset, val_idx)

    # WeightedRandomSampler for class imbalance: inverse class frequency
    targets = [all_targets[i] for i in train_idx]
    counts = Counter(targets)
    class_weights = {cls: 1.0 / count for cls, count in counts.items()}
    sample_weights = [class_weights[t] for t in targets]
    sampler = WeightedRandomSampler(sample_weights, num_samples=len(sample_weights), replacement=True)

    # Dataloaders
    train_loader = DataLoader(train_subset, batch_size=batch_size, sampler=sampler, num_workers=2, pin_memory=True)
    val_loader = DataLoader(val_subset, batch_size=batch_size, shuffle=False, num_workers=2, pin_memory=True)

    # Must stay inside the loop: appending after it stored only the last fold.
    folds.append({
        "fold": fold,
        "train_idx": train_idx,
        "val_idx": val_idx,
        "train_loader": train_loader,
        "val_loader": val_loader
    })





KeyboardInterrupt: 

In [None]:
# Split the per-fold records into two parallel lists of loaders
def _column(records, key):
    """Return `record[key]` for every record, in order."""
    return [record[key] for record in records]

train_loaders = _column(folds, 'train_loader')
val_loaders   = _column(folds, 'val_loader')

# -------------------------------
# Step 1: MobileViT Model Setup
# -------------------------------
import timm
import torch.nn as nn

_has_gpu = torch.cuda.is_available()
device = torch.device("cuda" if _has_gpu else "cpu")
print("Device:", device)

def create_mobilevit():
    """Return a pretrained `mobilevitv2_050` with a fresh 2-class head.

    The model is left on its default device; callers move it where needed.
    """
    net = timm.create_model("mobilevitv2_050", pretrained=True)
    # Build the head by hand when the model does not offer reset_classifier
    if not hasattr(net, "reset_classifier"):
        net.classifier = nn.Linear(net.classifier.in_features, 2)
    else:
        net.reset_classifier(num_classes=2)
    return net

# -------------------------------
# Step 2: K-Fold Training with Early Stopping + Resume
# -------------------------------
import torch.optim as optim
from sklearn.metrics import precision_score, recall_score, f1_score
import json

checkpoint_dir = "/content/drive/MyDrive/kfold_checkpoints"
os.makedirs(checkpoint_dir, exist_ok=True)

# NOTE: two superseded drafts of train_one_fold / train_kfold used to be parked
# here inside bare triple-quoted strings. They were never executed (a string
# expression statement does nothing) and are shadowed by the active
# definitions further down in this cell, so they have been removed; earlier
# revisions remain available in version control.

import os, copy, sys
import torch
import torch.nn as nn
from tqdm import tqdm
from sklearn.metrics import f1_score, balanced_accuracy_score, confusion_matrix
import seaborn as sns
import matplotlib.pyplot as plt

# 🔹 Training function for one fold
def _evaluate_loader(model, loader, criterion, device):
    """Run `model` over `loader` in eval mode without tracking gradients.

    Returns:
        `(loss_sum, correct, total, preds, labels)` where `loss_sum` is the
        sample-weighted sum of batch losses, `correct`/`total` are sample
        counts, and `preds`/`labels` are flat lists with one entry per sample.
    """
    model.eval()
    loss_sum, correct, total = 0.0, 0, 0
    all_preds, all_labels = [], []
    with torch.no_grad():
        for images, labels in loader:
            images, labels = images.to(device), labels.to(device)
            outputs = model(images)
            loss_sum += criterion(outputs, labels).item() * images.size(0)
            preds = outputs.argmax(dim=1)
            correct += (preds == labels).sum().item()
            total += labels.size(0)
            all_preds.extend(preds.cpu().numpy())
            all_labels.extend(labels.cpu().numpy())
    return loss_sum, correct, total, all_preds, all_labels


def train_one_fold(fold, model_fn, train_loader, val_loader, num_epochs=10,
                   patience=3, best_model_dir="/content/drive/MyDrive/Dataset/kfold_results"):
    """Train a fresh model on one fold with early stopping on validation accuracy.

    Args:
        fold: fold identifier used in log lines, the checkpoint file name and
            the plot title.
        model_fn: zero-argument callable returning a new model.
        train_loader: iterable of `(images, labels)` training batches.
        val_loader: iterable of `(images, labels)` validation batches.
        num_epochs: maximum number of epochs; also the cosine schedule length.
        patience: consecutive epochs without a new best validation accuracy
            after which training stops.
        best_model_dir: directory (created if missing) receiving
            `mobilevitv2_fold{fold}_best.pth`.

    Returns:
        `(history, best_val_acc)`: per-epoch lists keyed by "epoch",
        "train_loss", "train_acc", "val_loss", "val_acc", "val_macro_f1" and
        "val_bal_acc", and the best validation accuracy in percent.

    After training, the best checkpoint written during this call is reloaded
    and a confusion matrix over the validation loader is displayed.
    """
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model = model_fn().to(device)
    optimizer = torch.optim.AdamW(model.parameters(), lr=2e-4, weight_decay=1e-4)
    criterion = nn.CrossEntropyLoss()
    scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=num_epochs)

    os.makedirs(best_model_dir, exist_ok=True)
    best_model_path = os.path.join(best_model_dir, f"mobilevitv2_fold{fold}_best.pth")

    best_val_acc = 0.0
    # Whether THIS call has written best_model_path. Without this flag a run
    # whose accuracy never exceeded 0.0 would either crash on the reload below
    # or silently load a stale file left by an earlier run.
    checkpoint_saved = False
    patience_counter = 0
    history = {
        "epoch": [], "train_loss": [], "train_acc": [],
        "val_loss": [], "val_acc": [], "val_macro_f1": [], "val_bal_acc": []
    }

    for epoch in range(num_epochs):
        # ---------------- TRAIN ----------------
        model.train()
        running_loss, correct, total = 0.0, 0, 0

        loop = tqdm(train_loader, desc=f"Fold {fold} Epoch {epoch+1}/{num_epochs} [Train]",
                    file=sys.stdout, dynamic_ncols=True)
        for images, labels in loop:
            images, labels = images.to(device), labels.to(device)
            optimizer.zero_grad()

            outputs = model(images)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

            running_loss += loss.item() * images.size(0)
            preds = outputs.argmax(dim=1)
            correct += (preds == labels).sum().item()
            total += labels.size(0)

            loop.set_postfix(loss=running_loss/total, acc=100*correct/total)

        # Average over the samples actually drawn this epoch; a sampler may
        # draw a different number than len(train_loader.dataset).
        train_loss = running_loss / total
        train_acc = 100. * correct / total

        # ---------------- VALIDATION ----------------
        val_loss_sum, val_correct, val_total, all_preds, all_labels = _evaluate_loader(
            model, val_loader, criterion, device)

        val_loss = val_loss_sum / val_total
        val_acc = 100. * val_correct / val_total
        val_macro_f1 = f1_score(all_labels, all_preds, average="macro")
        val_bal_acc = balanced_accuracy_score(all_labels, all_preds)

        # Save history
        history["epoch"].append(epoch+1)
        history["train_loss"].append(train_loss)
        history["train_acc"].append(train_acc)
        history["val_loss"].append(val_loss)
        history["val_acc"].append(val_acc)
        history["val_macro_f1"].append(val_macro_f1)
        history["val_bal_acc"].append(val_bal_acc)

        print(f"✅ Fold {fold} | Epoch {epoch+1}/{num_epochs} | "
              f"Train Acc: {train_acc:.2f}% | Val Acc: {val_acc:.2f}% | "
              f"Macro-F1: {val_macro_f1:.4f} | BalAcc: {val_bal_acc:.4f}",
              flush=True)

        # ---------------- EARLY STOPPING ----------------
        # The first completed epoch always counts as the best so far, so a
        # checkpoint exists even when accuracy is 0%.
        if val_acc > best_val_acc or not checkpoint_saved:
            best_val_acc = val_acc
            torch.save(model.state_dict(), best_model_path)
            checkpoint_saved = True
            patience_counter = 0
            print(f"💾 Best model saved for Fold {fold} at {best_model_path}", flush=True)
        else:
            patience_counter += 1
            if patience_counter >= patience:
                print(f"⏹️ Early stopping at epoch {epoch+1} for Fold {fold}", flush=True)
                break

        scheduler.step()

    # ---------------- CONFUSION MATRIX ----------------
    # Reload the best weights only if this call produced them (no epoch ran
    # when num_epochs == 0); otherwise evaluate the model as it stands.
    if checkpoint_saved:
        model.load_state_dict(torch.load(best_model_path, map_location=device))
    _, _, _, all_preds, all_labels = _evaluate_loader(model, val_loader, criterion, device)

    # Fix the class order so the matrix is always 2x2 and matches the tick
    # labels even when a class is absent from the validation data.
    # NOTE(review): the tick labels assume 0 = AI and 1 = Real. The dataframe
    # based ImageDataset earlier in this notebook maps real -> 0 and ai -> 1;
    # confirm which dataset produced the loaders before reading this plot.
    cm = confusion_matrix(all_labels, all_preds, labels=[0, 1])
    plt.figure(figsize=(5, 4))
    sns.heatmap(cm, annot=True, fmt="d", cmap="Blues",
                xticklabels=["AI", "Real"], yticklabels=["AI", "Real"])
    plt.title(f"Confusion Matrix - Fold {fold}")
    plt.xlabel("Predicted")
    plt.ylabel("True")
    plt.show()

    return history, best_val_acc


# 🔹 Full K-Fold Loop
def train_kfold(model_fn, folds, num_epochs=10, patience=3):
    """Run `train_one_fold` on every fold and report the mean best accuracy.

    Args:
        model_fn: zero-argument callable returning a new model.
        folds: sequence of dicts with "train_loader" and "val_loader" entries.
        num_epochs: forwarded to `train_one_fold`.
        patience: forwarded to `train_one_fold`.

    Returns:
        `(all_histories, fold_metrics, avg_acc)`: each fold's history, each
        fold's best validation accuracy, and the mean of those accuracies.
        Folds are numbered from 1 when passed to `train_one_fold`.
    """
    n_folds = len(folds)
    all_histories = []
    fold_metrics = []

    for fold_number, fold_data in enumerate(folds, start=1):
        print(f"\n🚀 Starting Fold {fold_number}/{n_folds}")

        history, best_val_acc = train_one_fold(
            fold=fold_number,
            model_fn=model_fn,
            train_loader=fold_data['train_loader'],
            val_loader=fold_data['val_loader'],
            num_epochs=num_epochs,
            patience=patience,
        )

        all_histories.append(history)
        fold_metrics.append(best_val_acc)

    avg_acc = sum(fold_metrics) / len(fold_metrics)
    print(f"\n📊 Average Val Accuracy across folds: {avg_acc:.2f}%")

    return all_histories, fold_metrics, avg_acc


# 🚀 Run K-Fold Training
# The third result is the mean best validation accuracy (a single float); the
# name `avg_metrics` is kept because the visualization cell refers to it.
kfold_outputs = train_kfold(create_mobilevit, folds, num_epochs=10, patience=3)
fold_histories, fold_metrics, avg_metrics = kfold_outputs


In [None]:
# -------------------------------
# Step 3: Results Visualization
# -------------------------------
import matplotlib.pyplot as plt
import numpy as np

def plot_fold_curves(history, fold):
    """Show loss and accuracy curves for one fold as two side-by-side panels.

    Args:
        history: dict with equally long per-epoch lists under the keys
            "train_loss", "val_loss", "train_acc" and "val_acc".
        fold: fold identifier, used only in the panel titles.
    """
    xs = range(1, len(history["train_loss"]) + 1)

    # (subplot position, title, y-axis label, ((history key, legend label), ...))
    panels = (
        (1, f"Fold {fold} Loss", "Loss",
         (("train_loss", "Train Loss"), ("val_loss", "Val Loss"))),
        (2, f"Fold {fold} Accuracy", "Accuracy (%)",
         (("train_acc", "Train Acc"), ("val_acc", "Val Acc"))),
    )

    plt.figure(figsize=(12, 5))
    for position, panel_title, y_label, series in panels:
        plt.subplot(1, 2, position)
        for key, legend_label in series:
            plt.plot(xs, history[key], label=legend_label)
        plt.title(panel_title)
        plt.xlabel("Epoch")
        plt.ylabel(y_label)
        plt.legend()
    plt.show()

def plot_fold_metrics(history, fold):
    """Plot per-epoch precision, recall and F1 for one fold on a single axis.

    Args:
        history: dict with per-epoch lists under "precision", "recall" and
            "f1"; the x-axis length is taken from the "f1" list.
        fold: fold identifier, used only in the figure title.
    """
    xs = range(1, len(history["f1"]) + 1)
    curves = (
        ("precision", "Precision"),
        ("recall", "Recall"),
        ("f1", "F1 Score"),
    )

    plt.figure(figsize=(8, 5))
    for key, legend_label in curves:
        plt.plot(xs, history[key], label=legend_label)
    plt.title(f"Fold {fold} Metrics")
    plt.xlabel("Epoch")
    plt.ylabel("Score")
    plt.legend()
    plt.show()

def plot_cv_summary(all_histories, avg_metrics):
    """Draw a grouped bar chart of each fold's best validation metrics.

    For every fold the maximum of "val_acc", "precision", "recall" and "f1"
    over its epochs is taken independently, so the four bars of one fold may
    come from different epochs.

    Args:
        all_histories: sequence of per-fold history dicts; each must hold
            non-empty lists under "val_acc", "precision", "recall" and "f1".
        avg_metrics: accepted so existing calls keep working; the plot does
            not read it.
    """
    num_folds = len(all_histories)
    best_acc = [max(h["val_acc"]) for h in all_histories]
    best_prec = [max(h["precision"]) for h in all_histories]
    best_recall = [max(h["recall"]) for h in all_histories]
    best_f1 = [max(h["f1"]) for h in all_histories]
    # NOTE(review): all four series share one y-axis; confirm that val_acc is
    # recorded on the same scale as precision/recall/F1 by the training loop.

    x = np.arange(num_folds)
    width = 0.2
    plt.figure(figsize=(12, 6))
    plt.bar(x - 0.3, best_acc, width, label="Accuracy")
    plt.bar(x - 0.1, best_prec, width, label="Precision")
    plt.bar(x + 0.1, best_recall, width, label="Recall")
    plt.bar(x + 0.3, best_f1, width, label="F1")
    # Folds are numbered from 1 in the training logs and confusion-matrix
    # titles, so label the groups the same way instead of starting at 0.
    plt.xticks(x, [f"Fold {i}" for i in range(1, num_folds + 1)])
    plt.title("Cross-Validation Metrics per Fold")
    plt.ylabel("Score")
    plt.legend()
    plt.show()

# Example usage:
# Fold numbers start at 1 so the plot titles match the "Fold 1/K" numbering
# printed during training.
for fold_id, history in enumerate(fold_histories, start=1):
    plot_fold_curves(history, fold_id)
    plot_fold_metrics(history, fold_id)

plot_cv_summary(fold_histories, avg_metrics)

In [1]:
# Step 0: Setup & Imports
import os
import numpy as np
import matplotlib.pyplot as plt
from tqdm import tqdm
from sklearn.model_selection import KFold
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, confusion_matrix, ConfusionMatrixDisplay

import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, WeightedRandomSampler, ConcatDataset
from torchvision import transforms, datasets
import timm


In [9]:
# Step 1: Dataset & K-Fold Data Preparation (Fixed for your structure)
from torch.utils.data import Dataset
from PIL import Image
import glob

class CustomImageDataset(Dataset):
    """Binary real-vs-AI image dataset built from two directory trees.

    `*.png` files directly inside `real_path` get label 0 (real); `*.png`
    files at any depth under `ai_path` get label 1 (AI). The glob patterns
    are lowercase, so on a case-sensitive file system files ending in `.PNG`
    are not picked up.

    File lists are sorted because `glob` returns matches in arbitrary order;
    without sorting, the index -> sample mapping (and therefore any seeded,
    index-based split such as K-fold) could differ between runs.

    Attributes are documented inline in `__init__`.
    """

    def __init__(self, real_path, ai_path, transform=None):
        """Collect image paths and labels.

        Args:
            real_path: directory holding the real images.
            ai_path: root directory holding AI images, searched recursively.
            transform: optional callable applied to each loaded RGB image.
        """
        self.transform = transform
        self.images = []  # file paths: all real images first, then all AI images
        self.labels = []  # 0 for real, 1 for AI, aligned with self.images

        # Real images = class 0
        real_files = sorted(glob.glob(os.path.join(real_path, "*.png")))
        self.images.extend(real_files)
        self.labels.extend([0] * len(real_files))

        # AI images = class 1 (all subfolders)
        ai_files = sorted(glob.glob(os.path.join(ai_path, "**/*.png"), recursive=True))
        self.images.extend(ai_files)
        self.labels.extend([1] * len(ai_files))

    def __len__(self):
        """Return the number of collected images."""
        return len(self.images)

    def __getitem__(self, idx):
        """Load the image at `idx` as RGB, apply the transform, return `(img, label)`."""
        img_path = self.images[idx]
        label = self.labels[idx]

        # The context manager closes the underlying file as soon as the pixel
        # data has been converted, instead of leaving the handle open until
        # garbage collection.
        with Image.open(img_path) as raw:
            img = raw.convert("RGB")
        if self.transform:
            img = self.transform(img)

        return img, label


# Transform: resize every image to 224x224, then convert it to a tensor
transform = transforms.Compose(
    [transforms.Resize((224, 224)), transforms.ToTensor()]
)

# Load dataset (real and AI image folders on Google Drive)
dataset = CustomImageDataset(
    real_path="/content/drive/MyDrive/RAISE/PNG",
    ai_path="/content/drive/MyDrive/synthbuster",
    transform=transform,
)

real_count = sum(np.array(dataset.labels)==0)
ai_count = sum(np.array(dataset.labels)==1)
print(f"✅ Loaded dataset with {len(dataset)} images")
print(f"   Real: {real_count}, AI: {ai_count}")

# Labels as an array so they can be indexed with the fold index arrays
targets = np.array(dataset.labels)

# K-Fold split (shuffled, fixed seed)
k_folds = 5
kf = KFold(n_splits=k_folds, shuffle=True, random_state=42)

train_loaders = []
val_loaders = []

all_indices = np.arange(len(dataset))
for fold, (train_idx, val_idx) in enumerate(kf.split(all_indices)):
    train_subset = torch.utils.data.Subset(dataset, train_idx)
    val_subset = torch.utils.data.Subset(dataset, val_idx)

    # Inverse class frequency within this fold's training split, looked up
    # once per training sample to get its sampling weight.
    class_sample_counts = np.bincount(targets[train_idx])
    weights = 1. / class_sample_counts
    samples_weights = weights[targets[train_idx]]

    # Training batches are drawn with replacement according to those weights;
    # validation batches are read in order.
    sampler = WeightedRandomSampler(
        samples_weights,
        num_samples=len(samples_weights),
        replacement=True,
    )
    train_loader = DataLoader(train_subset, batch_size=32, sampler=sampler)
    val_loader = DataLoader(val_subset, batch_size=32, shuffle=False)

    train_loaders.append(train_loader)
    val_loaders.append(val_loader)

print(f"✅ Prepared {k_folds}-Fold DataLoaders")



✅ Loaded dataset with 3999 images
   Real: 999, AI: 3000
✅ Prepared 5-Fold DataLoaders


In [10]:
# Step 2: Model Setup

def create_mobilevit():
    """Build a pretrained `mobilevitv2_050` with a 2-class head on the best device.

    Returns:
        Tuple `(model, device)`: the timm model after it has been moved to
        `device`, which is CUDA when available and the CPU otherwise.
    """
    backbone = timm.create_model("mobilevitv2_050", pretrained=True)

    if not hasattr(backbone, "reset_classifier"):
        # No helper on the model: swap the classifier layer by hand.
        head_width = backbone.classifier.in_features
        backbone.classifier = nn.Linear(head_width, 2)
    else:
        backbone.reset_classifier(num_classes=2)

    if torch.cuda.is_available():
        target = torch.device("cuda")
    else:
        target = torch.device("cpu")
    backbone.to(target)
    return backbone, target

print("✅ Model setup function ready")


✅ Model setup function ready


In [11]:
# Step 3: Training Loop

def train_one_epoch(model, loader, criterion, optimizer, device):
    """Run one optimisation pass over `loader`.

    Args:
        model: network to train; switched to training mode here.
        loader: iterable yielding `(inputs, labels)` batches.
        criterion: loss callable taking `(outputs, labels)`.
        optimizer: optimizer stepped once per batch.
        device: device the batches are moved to before the forward pass.

    Returns:
        Tuple `(mean_loss, accuracy)` where `mean_loss` is the batch losses
        weighted by batch size and divided by the number of samples seen, and
        `accuracy` is the fraction of correct predictions (0-1).
    """
    model.train()
    loss_sum = 0.0
    n_correct = 0
    n_seen = 0

    for batch_x, batch_y in tqdm(loader, desc="Training", leave=False):
        batch_x = batch_x.to(device)
        batch_y = batch_y.to(device)

        optimizer.zero_grad()
        logits = model(batch_x)
        batch_loss = criterion(logits, batch_y)
        batch_loss.backward()
        optimizer.step()

        # Weight the batch loss by its size so the final value is per sample
        # (assumes the criterion returns a per-batch mean).
        loss_sum += batch_loss.item() * batch_x.size(0)
        predicted = torch.max(logits, 1)[1]
        n_correct += (predicted == batch_y).sum().item()
        n_seen += batch_y.size(0)

    mean_loss = loss_sum / n_seen
    accuracy = n_correct / n_seen
    return mean_loss, accuracy


def validate(model, loader, criterion, device):
    """Evaluate `model` on `loader` without tracking gradients.

    Args:
        model: network to evaluate; switched to eval mode here.
        loader: iterable yielding `(inputs, labels)` batches.
        criterion: loss callable taking `(outputs, labels)`.
        device: device the batches are moved to before the forward pass.

    Returns:
        Tuple `(mean_loss, acc, prec, rec, f1, all_labels, all_preds)`:
        size-weighted mean loss per sample, accuracy as a fraction (0-1),
        weighted-average precision / recall / F1, and the collected true and
        predicted labels in iteration order.
    """
    model.eval()
    loss_sum = 0.0
    n_correct = 0
    n_seen = 0
    y_true = []
    y_pred = []

    with torch.no_grad():
        for batch_x, batch_y in tqdm(loader, desc="Validation", leave=False):
            batch_x = batch_x.to(device)
            batch_y = batch_y.to(device)

            logits = model(batch_x)
            batch_loss = criterion(logits, batch_y)

            loss_sum += batch_loss.item() * batch_x.size(0)
            predicted = torch.max(logits, 1)[1]
            n_correct += (predicted == batch_y).sum().item()
            n_seen += batch_y.size(0)

            y_true.extend(batch_y.cpu().numpy())
            y_pred.extend(predicted.cpu().numpy())

    accuracy = n_correct / n_seen
    # Same weighted averaging for all three scores, computed in this order.
    precision, recall, f1 = [
        scorer(y_true, y_pred, average="weighted")
        for scorer in (precision_score, recall_score, f1_score)
    ]

    return loss_sum / n_seen, accuracy, precision, recall, f1, y_true, y_pred


def train_kfold(create_model_fn, train_loaders, val_loaders, k_folds=5, num_epochs=10, patience=3):
    """Train and validate one fresh model per fold, early-stopping on validation loss.

    For every fold a new model is built with `create_model_fn` and optimised
    with Adam (lr=1e-4) on a cross-entropy loss. Whenever the validation loss
    improves, the weights are written to `best_model_fold{n}.pth`; training
    of the fold stops once `patience` consecutive epochs bring no improvement.

    The metrics reported for a fold and the confusion matrix drawn for it are
    taken from the epoch with the lowest validation loss, so they describe the
    checkpoint that was saved rather than whichever epoch happened to run last.

    Args:
        create_model_fn: zero-argument callable returning `(model, device)`.
        train_loaders: per-fold training loaders, indexed by fold.
        val_loaders: per-fold validation loaders, indexed by fold.
        k_folds: number of folds to run (the first `k_folds` loader pairs).
        num_epochs: maximum number of epochs per fold; must be at least 1.
        patience: epochs without validation-loss improvement before stopping.

    Returns:
        Tuple `(histories, avg_metrics)`. `histories` holds one dict per fold
        with per-epoch lists under "train_loss", "train_acc", "val_loss",
        "val_acc", "precision", "recall" and "f1" (accuracies are fractions
        in 0-1). `avg_metrics` maps "acc", "prec", "rec" and "f1" to the mean
        of the per-fold best-epoch values.

    Raises:
        ValueError: if `num_epochs` is below 1 or `k_folds` exceeds the number
            of supplied loader pairs.
    """
    if num_epochs < 1:
        raise ValueError(f"num_epochs must be at least 1, got {num_epochs}")
    available_folds = min(len(train_loaders), len(val_loaders))
    if k_folds > available_folds:
        raise ValueError(
            f"k_folds={k_folds} exceeds the {available_folds} loader pairs supplied"
        )

    all_fold_metrics = []
    histories = []

    for fold in range(k_folds):
        print(f"\n🚀 Training Fold {fold+1}/{k_folds}")
        model, device = create_model_fn()

        criterion = nn.CrossEntropyLoss()
        optimizer = optim.Adam(model.parameters(), lr=1e-4)

        best_val_loss = np.inf
        patience_counter = 0
        # Includes the keys the notebook's plotting helpers read
        # (train_acc / precision / recall / f1) next to the original three.
        history = {
            "train_loss": [], "train_acc": [],
            "val_loss": [], "val_acc": [],
            "precision": [], "recall": [], "f1": [],
        }
        best_epoch = None  # snapshot of the epoch with the lowest validation loss

        for epoch in range(num_epochs):
            print(f"\n📌 Fold {fold+1}, Epoch {epoch+1}/{num_epochs}")

            train_loss, train_acc = train_one_epoch(model, train_loaders[fold], criterion, optimizer, device)
            val_loss, val_acc, val_prec, val_rec, val_f1, y_true, y_pred = validate(model, val_loaders[fold], criterion, device)

            history["train_loss"].append(train_loss)
            history["train_acc"].append(train_acc)
            history["val_loss"].append(val_loss)
            history["val_acc"].append(val_acc)
            history["precision"].append(val_prec)
            history["recall"].append(val_rec)
            history["f1"].append(val_f1)

            print(f"   Train Loss: {train_loss:.4f}, Acc: {train_acc:.4f}")
            print(f"   Val   Loss: {val_loss:.4f}, Acc: {val_acc:.4f}, Prec: {val_prec:.4f}, Rec: {val_rec:.4f}, F1: {val_f1:.4f}")

            epoch_snapshot = {
                "acc": val_acc, "prec": val_prec, "rec": val_rec, "f1": val_f1,
                "y_true": y_true, "y_pred": y_pred,
            }

            # Save best model per fold
            if val_loss < best_val_loss:
                best_val_loss = val_loss
                patience_counter = 0
                best_epoch = epoch_snapshot
                torch.save(model.state_dict(), f"best_model_fold{fold+1}.pth")
                print("   ✅ Best model saved")
            else:
                patience_counter += 1
                if patience_counter >= patience:
                    print("   ⏹️ Early stopping triggered")
                    break

        # If the loss never compared below infinity (e.g. NaN every epoch) no
        # checkpoint was written; report the last epoch in that case.
        report = best_epoch if best_epoch is not None else epoch_snapshot

        # Confusion matrix of the reported epoch. Fixing labels to [0, 1]
        # keeps it 2x2 (matching the two display labels) even when a class is
        # absent from the fold's labels and predictions.
        cm = confusion_matrix(report["y_true"], report["y_pred"], labels=[0, 1])
        disp = ConfusionMatrixDisplay(cm, display_labels=["Real", "AI"])
        disp.plot(cmap="Blues")
        plt.title(f"Confusion Matrix - Fold {fold+1}")
        plt.show()

        all_fold_metrics.append({key: report[key] for key in ("acc", "prec", "rec", "f1")})
        histories.append(history)

    avg_metrics = {
        key: np.mean([m[key] for m in all_fold_metrics])
        for key in ("acc", "prec", "rec", "f1")
    }

    return histories, avg_metrics


In [None]:
# Step 4: Run Training
# Runs train_kfold over 5 folds using the per-fold loaders prepared in Step 1,
# with at most 10 epochs per fold and a patience of 3 epochs without
# validation-loss improvement. It returns the per-fold histories and a dict
# of metrics averaged across folds, which is printed below.
fold_histories, avg_metrics = train_kfold(create_mobilevit, train_loaders, val_loaders, k_folds=5, num_epochs=10, patience=3)
print("\n✅ Training complete!")
print("Average Metrics:", avg_metrics)



🚀 Training Fold 1/5


The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.



📌 Fold 1, Epoch 1/10




   Train Loss: 0.6552, Acc: 0.6386
   Val   Loss: 0.6029, Acc: 0.7662, Prec: 0.7986, Rec: 0.7662, F1: 0.7761
   ✅ Best model saved

📌 Fold 1, Epoch 2/10




   Train Loss: 0.5014, Acc: 0.8024
   Val   Loss: 0.3821, Acc: 0.8738, Prec: 0.8762, Rec: 0.8738, F1: 0.8748
   ✅ Best model saved

📌 Fold 1, Epoch 3/10


Training:  14%|█▍        | 14/100 [04:43<29:14, 20.40s/it]