In [None]:
# ==========================================================
# CIFAR-10: MixUp vs CutMix with AdamW
# ==========================================================

# ==========================================================
# Setup — configs, utils, model, MixUp/CutMix (no local imports)
# ==========================================================
import os, csv, random, numpy as np, torch
import torch.nn as nn
from torch.optim import AdamW
from torch.optim.lr_scheduler import SequentialLR, LinearLR, CosineAnnealingLR
import torchvision
import torchvision.transforms as T
from torchvision.models import resnet18

# ---------- configs ----------
def _merge(a, b):
    """Return a new dict holding ``a`` recursively overlaid with ``b``.

    Keys whose value is a dict in both inputs are merged key by key; any
    other key in ``b`` replaces the value from ``a``.

    The result is fully independent of both inputs: every value is deep
    copied, so mutating the merged config (e.g. ``cfg["optimizer"]["lr"]``)
    can never leak back into ``a`` or ``b``.

    Args:
        a: base mapping.
        b: mapping of overrides applied on top of ``a``.

    Returns:
        A new dict; neither ``a`` nor ``b`` is modified.
    """
    import copy  # local import: the file's import section does not provide copy

    out = copy.deepcopy(dict(a))
    for k, v in b.items():
        if isinstance(v, dict) and isinstance(out.get(k), dict):
            # Both sides are dicts: merge one level deeper.
            out[k] = _merge(out[k], v)
        else:
            # Copy so the result never aliases objects owned by ``b``.
            out[k] = copy.deepcopy(v)
    return out

# Baseline hyper-parameters; every experiment profile starts from these values.
BASE = dict(
    seed=42,
    epochs=100,
    batch_size=128,
    num_workers=2,  # keep at 2 on Colab/low-core machines to avoid the worker warning
    amp=True,
    label_smoothing=0.0,
    optimizer=dict(
        name="adamw",
        lr=5e-4,
        weight_decay=0.02,
        betas=(0.9, 0.999),
        eps=1e-8,
    ),
    scheduler=dict(warmup_epochs=5),
    model=dict(name="resnet18_cifar"),
    data=dict(
        root="./data",
        mean=(0.4914, 0.4822, 0.4465),
        std=(0.2470, 0.2435, 0.2616),
    ),
    augment_mode="off",  # one of: off | mixup | cutmix
    mixup_alpha=0.0,
    cutmix_alpha=0.0,
    save_dir="results/run",
)

# Per-profile overlays: only the keys that differ from BASE.
MIXUP = dict(augment_mode="mixup", mixup_alpha=0.4, save_dir="results/mixup")
CUTMIX = dict(augment_mode="cutmix", cutmix_alpha=1.0, save_dir="results/cutmix")

# Registry of selectable experiment profiles: the baseline itself plus BASE
# overlaid with each augmentation-specific set of overrides.
PROFILES = {
    "base": BASE,
    "mixup": _merge(BASE, MIXUP),
    "cutmix": _merge(BASE, CUTMIX),
}

def get_config(profile: str, overrides: dict | None = None):
    """Return an independent config dict for ``profile``.

    The profile is deep copied before use, so callers may freely mutate the
    returned config (including nested sections such as ``cfg["optimizer"]``)
    without altering the shared ``PROFILES`` registry.

    Args:
        profile: key into ``PROFILES``.
        overrides: optional (possibly nested) dict merged on top of the
            profile via ``_merge``.

    Returns:
        The resulting configuration dict.

    Raises:
        ValueError: if ``profile`` is not a key of ``PROFILES``.
    """
    import copy  # local import: the file's import section does not provide copy

    if profile not in PROFILES:
        raise ValueError(f"unknown profile: {profile}")
    # A shallow dict() copy would still share the nested dicts with PROFILES.
    cfg = copy.deepcopy(PROFILES[profile])
    if overrides:
        cfg = _merge(cfg, overrides)
    return cfg

# ---------- io + reproducibility ----------
def ensure_dir(d):
    """Create directory ``d`` (with any missing parents); existing dirs are fine."""
    os.makedirs(d, exist_ok=True)
def save_ckpt(state, path):
    """Write ``state`` to ``path`` using ``torch.save``."""
    torch.save(state, path)
def csv_logger(path):
    """Open ``path`` for writing and emit the metrics header row.

    Returns:
        ``(file_handle, dict_writer)``. The handle is left open; the caller
        is responsible for closing it.
    """
    columns = ["epoch", "train_loss", "val_loss", "val_acc"]
    handle = open(path, "w", newline="")
    writer = csv.DictWriter(handle, fieldnames=columns)
    writer.writeheader()
    return handle, writer

def set_seed(seed: int):
    """Seed Python, NumPy and torch (CPU + all CUDA devices) with ``seed``.

    Also sets cuDNN to deterministic mode and turns off its benchmark mode.
    """
    seeders = (random.seed, np.random.seed, torch.manual_seed, torch.cuda.manual_seed_all)
    for seed_fn in seeders:
        seed_fn(seed)

    cudnn = torch.backends.cudnn
    cudnn.deterministic = True
    cudnn.benchmark = False

# ---------- optimizer + scheduler ----------
def build_optimizer(model, cfg):
    """Build an AdamW optimizer over ``model.parameters()`` from ``cfg["optimizer"]``.

    ``lr`` and ``weight_decay`` are required keys; ``betas`` and ``eps`` fall
    back to ``(0.9, 0.999)`` and ``1e-8``.

    Raises:
        ValueError: if the configured optimizer name is not "adamw"
            (case-insensitive).
    """
    spec = cfg["optimizer"]
    if spec["name"].lower() != "adamw":
        raise ValueError("This setup uses AdamW for both MixUp and CutMix.")

    params = model.parameters()
    hyper = {
        "lr": spec["lr"],
        "weight_decay": spec["weight_decay"],
        "betas": spec.get("betas", (0.9, 0.999)),
        "eps": spec.get("eps", 1e-8),
    }
    return AdamW(params, **hyper)

def build_warmup_cosine(optimizer, steps_per_epoch: int, epochs: int, warmup_epochs: int):
    """Build a per-iteration schedule: linear warm-up followed by cosine annealing.

    All lengths are counted in optimizer steps, so the returned scheduler is
    meant to be stepped once per batch. Each phase is clamped to at least
    one step.

    Returns:
        A ``SequentialLR`` that hands over from ``LinearLR`` (factor 1e-3 -> 1.0)
        to ``CosineAnnealingLR`` once the warm-up steps have elapsed.
    """
    n_total = max(1, epochs * steps_per_epoch)
    n_warmup = max(1, warmup_epochs * steps_per_epoch)
    n_cosine = max(1, n_total - n_warmup)

    # Phases are constructed in execution order: warm-up first, then decay.
    phases = [
        LinearLR(optimizer, start_factor=1e-3, end_factor=1.0, total_iters=n_warmup),
        CosineAnnealingLR(optimizer, T_max=n_cosine),
    ]
    return SequentialLR(optimizer, phases, milestones=[n_warmup])

# ---------- model ----------
def build_model(name: str, num_classes=10):
    """Construct the ResNet-18 variant identified by ``name``.

    Only "resnet18_cifar" is supported: a randomly initialised torchvision
    ResNet-18 whose stem is swapped for a 3x3 stride-1 convolution and whose
    max-pool is replaced by an identity, so the stem does not downsample.

    Raises:
        ValueError: for any other ``name``.
    """
    if name != "resnet18_cifar":
        raise ValueError(f"unsupported model: {name}")

    net = resnet18(weights=None, num_classes=num_classes)
    stem = nn.Conv2d(3, 64, kernel_size=3, stride=1, padding=1, bias=False)
    net.conv1 = stem
    net.maxpool = nn.Identity()
    return net

# ---------- MixUp / CutMix ----------
def mixup_data(x, y, alpha: float):
    """Blend each sample of the batch with a randomly chosen partner.

    The mixing weight ``lam`` is drawn from ``Beta(alpha, alpha)``; when
    ``alpha`` is falsy or not positive, ``lam`` is 1.0 and the inputs are
    returned numerically unchanged.

    Returns:
        ``(mixed_x, y, y_partner, lam)`` where ``y_partner`` holds the labels
        of the samples mixed into each position.
    """
    if alpha and alpha > 0:
        lam = torch.distributions.Beta(alpha, alpha).sample().item()
    else:
        lam = 1.0

    perm = torch.randperm(x.size(0), device=x.device)
    mixed = lam * x + (1 - lam) * x[perm]
    return mixed, y, y[perm], lam

def mixup_criterion(crit, pred, y_a, y_b, lam):
    """Return ``crit`` evaluated against both label sets, blended by ``lam``.

    ``lam`` weights the loss against ``y_a`` and ``1 - lam`` weights the loss
    against ``y_b``.
    """
    loss_a = crit(pred, y_a)
    loss_b = crit(pred, y_b)
    return lam * loss_a + (1 - lam) * loss_b

def _rand_bbox(W, H, lam):
    """Sample a random box for CutMix inside a ``W`` x ``H`` image.

    The box is ``sqrt(1 - lam)`` of the image per side before clipping, is
    centred on a uniformly random pixel (drawn with ``np.random``), and is
    clipped to the image bounds, so the final area may be smaller.

    Returns:
        ``(x1, y1, x2, y2)`` as plain Python ints, usable as slice bounds.
    """
    r = (1 - lam) ** 0.5
    cw, ch = int(W * r), int(H * r)
    cx, cy = np.random.randint(W), np.random.randint(H)
    # int(...) turns the NumPy scalars from np.clip into plain ints so that
    # nothing derived from them (e.g. the area ratio) is a NumPy scalar.
    x1, y1 = int(np.clip(cx - cw // 2, 0, W)), int(np.clip(cy - ch // 2, 0, H))
    x2, y2 = int(np.clip(cx + cw // 2, 0, W)), int(np.clip(cy + ch // 2, 0, H))
    return x1, y1, x2, y2


def cutmix_data(x, y, alpha: float):
    """Paste a random rectangle from a shuffled copy of the batch into each image.

    The target area is driven by ``lam ~ Beta(alpha, alpha)`` (``lam`` is 1.0,
    i.e. an empty box, when ``alpha`` is falsy or not positive). The same box
    is used for every sample in the batch.

    The input ``x`` is left untouched; the mix is written to a clone.

    Args:
        x: batch tensor indexed as ``(batch, channel, height, width)``.
        y: labels, indexable by a permutation of the batch.
        alpha: Beta concentration parameter.

    Returns:
        ``(mixed_x, y, y_partner, lam)`` where ``lam`` is a Python float equal
        to the exact fraction of each image that was *not* replaced.
    """
    lam = torch.distributions.Beta(alpha, alpha).sample().item() if alpha and alpha > 0 else 1.0
    bs, _, H, W = x.size()
    idx = torch.randperm(bs, device=x.device)
    x1, y1, x2, y2 = _rand_bbox(W, H, lam)

    # Work on a copy so the caller's tensor is never modified in place.
    mixed = x.clone()
    mixed[:, :, y1:y2, x1:x2] = x[idx, :, y1:y2, x1:x2]

    # Clipping can shrink the box, so recompute lam from the real pasted area.
    lam = 1.0 - ((x2 - x1) * (y2 - y1)) / (W * H)
    return mixed, y, y[idx], lam



In [None]:
# ==========================================================
# CIFAR-10 MixUp vs CutMix + AdamW (Notebook Safe)
# Wire everything together: data, model, optimizer/scheduler, augmentation, AMP, training loop, validation, and logging.
# Per-iteration stepping keeps LR in sync with updates; AMP speeds up on modern GPUs with low complexity.
# ==========================================================
import argparse
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import DataLoader
import torchvision
import torchvision.transforms as T
from tqdm import tqdm

# ------------------ args (notebook + CLI compatible) ------------------
def parse_args(argv=None):
    """Parse experiment options for both notebook and command-line use.

    Args:
        argv: explicit list of argument strings. When given, it is parsed
            strictly (unknown options are an error). When ``None``, the
            process arguments ``sys.argv[1:]`` are used and any unrecognised
            entries (such as the ``-f <connection file>`` pair a notebook
            kernel is launched with) are ignored, so the same call works in
            a notebook and as a script.

    Returns:
        ``argparse.Namespace``; every option except ``profile`` defaults to
        ``None``, meaning "keep the profile's value". ``profile`` defaults to
        the ``PROFILE`` environment variable, or "mixup" if that is unset.
    """
    import sys  # local import: the file's import section does not provide sys

    ap = argparse.ArgumentParser()
    ap.add_argument("--profile", choices=["base","mixup","cutmix"],
                    default=os.environ.get("PROFILE","mixup"),
                    help="experiment profile; default='mixup' (override with PROFILE env var)")
    ap.add_argument("--save_dir", type=str, default=None)
    ap.add_argument("--epochs", type=int, default=None)
    ap.add_argument("--batch_size", type=int, default=None)
    ap.add_argument("--lr", type=float, default=None)
    ap.add_argument("--weight_decay", type=float, default=None)
    ap.add_argument("--mixup_alpha", type=float, default=None)
    ap.add_argument("--cutmix_alpha", type=float, default=None)

    if argv is not None:
        return ap.parse_args(argv)

    # No explicit argv: honour real CLI flags but tolerate foreign arguments
    # injected by IPython/Colab instead of discarding the command line.
    args, _ignored = ap.parse_known_args(sys.argv[1:])
    return args

# ------------------ transforms ------------------
def build_transforms(mean, std):
    """Return ``(train_tfms, test_tfms)`` torchvision pipelines.

    Both pipelines convert to a tensor and normalise with ``mean``/``std``;
    the training pipeline additionally applies a padded 32-pixel random crop
    and a random horizontal flip beforehand.
    """
    def _tensorize():
        # Fresh transform instances per pipeline, as in a hand-written Compose.
        return [T.ToTensor(), T.Normalize(mean, std)]

    augment = [T.RandomCrop(32, padding=4), T.RandomHorizontalFlip()]
    train_tfms = T.Compose(augment + _tensorize())
    test_tfms = T.Compose(_tensorize())
    return train_tfms, test_tfms

# ------------------ eval ------------------
def evaluate(model, loader, device):
    """Compute mean cross-entropy loss and accuracy of ``model`` over ``loader``.

    The model is switched to eval mode and the whole pass runs with gradient
    tracking disabled. Per-batch mean losses are weighted by batch size so
    the result is a per-sample average.

    Returns:
        ``(mean_loss, accuracy)`` as Python floats.
    """
    model.eval()
    n_seen = 0
    n_correct = 0
    loss_total = 0.0
    with torch.no_grad():
        for inputs, targets in loader:
            inputs = inputs.to(device)
            targets = targets.to(device)
            logits = model(inputs)
            batch = targets.size(0)
            loss_total += F.cross_entropy(logits, targets).item() * batch
            n_correct += (logits.argmax(1) == targets).sum().item()
            n_seen += batch
    return loss_total / n_seen, n_correct / n_seen

# ------------------ train one epoch ------------------
def train_one_epoch(model, loader, optimizer, scaler, device, criterion,
                    scheduler, augment_mode, mixup_alpha, cutmix_alpha, use_amp):
    """Run one training pass over ``loader`` and return the mean per-sample loss.

    For each batch: optionally apply MixUp or CutMix (selected by
    ``augment_mode``), run the forward pass under CUDA autocast, then do a
    gradient-scaled backward pass and optimizer step. ``scheduler`` (if any)
    is stepped once per batch.
    """
    model.train()
    seen = 0
    running = 0.0
    for x, y in tqdm(loader, leave=False, desc="train"):
        x = x.to(device, non_blocking=True)
        y = y.to(device, non_blocking=True)

        # Blend the batch before the forward pass; remember both label sets
        # and the mixing weight so the loss can be blended the same way.
        mixed_targets = None
        if augment_mode == "mixup":
            x, ya, yb, lam = mixup_data(x, y, mixup_alpha)
            mixed_targets = (ya, yb, lam)
        elif augment_mode == "cutmix":
            x, ya, yb, lam = cutmix_data(x, y, cutmix_alpha)
            mixed_targets = (ya, yb, lam)

        optimizer.zero_grad(set_to_none=True)
        with torch.cuda.amp.autocast(enabled=use_amp):
            logits = model(x)
            if mixed_targets is None:
                loss = criterion(logits, y)
            else:
                loss = mixup_criterion(criterion, logits, *mixed_targets)

        scaler.scale(loss).backward()
        scaler.step(optimizer)
        scaler.update()
        if scheduler is not None:
            scheduler.step()

        n = y.size(0)
        seen += n
        running += loss.item() * n
    return running / seen

# ------------------ main ------------------
def main():
    """Train and evaluate one experiment profile end to end.

    Steps: resolve the config (profile + parsed-argument overrides), seed the
    RNGs, build the CIFAR-10 loaders, model, AdamW optimizer, loss, grad
    scaler and warm-up/cosine schedule, then train for ``cfg["epochs"]``
    epochs. After every epoch the test-set loss/accuracy is appended to
    ``<save_dir>/metrics.csv`` and the best model so far is written to
    ``<save_dir>/ckpts/best.pth``.

    The metrics file is flushed after each epoch and closed even if training
    is interrupted or fails, so completed epochs are never lost.
    """
    args = parse_args()

    # Only options that were actually supplied override the profile.
    overrides = {}
    if args.save_dir is not None:
        overrides["save_dir"] = args.save_dir
    if args.epochs is not None:
        overrides["epochs"] = args.epochs
    if args.batch_size is not None:
        overrides["batch_size"] = args.batch_size
    if args.lr is not None:
        overrides.setdefault("optimizer", {})["lr"] = args.lr
    if args.weight_decay is not None:
        overrides.setdefault("optimizer", {})["weight_decay"] = args.weight_decay
    if args.mixup_alpha is not None:
        overrides["mixup_alpha"] = args.mixup_alpha
    if args.cutmix_alpha is not None:
        overrides["cutmix_alpha"] = args.cutmix_alpha

    cfg = get_config(args.profile, overrides)
    set_seed(cfg["seed"])

    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    on_cuda = device.type == "cuda"
    # CUDA autocast / GradScaler and pinned memory only apply on a GPU;
    # requesting them on CPU merely produces warnings.
    use_amp = bool(cfg["amp"]) and on_cuda

    ensure_dir(cfg["save_dir"])
    ckpt_dir = os.path.join(cfg["save_dir"], "ckpts")
    ensure_dir(ckpt_dir)
    log_f, writer = csv_logger(os.path.join(cfg["save_dir"], "metrics.csv"))

    try:
        mean, std = cfg["data"]["mean"], cfg["data"]["std"]
        train_tfms, test_tfms = build_transforms(mean, std)

        # CIFAR-10 datasets
        train_set = torchvision.datasets.CIFAR10(root=cfg["data"]["root"], train=True,
                                                 download=True, transform=train_tfms)
        test_set = torchvision.datasets.CIFAR10(root=cfg["data"]["root"], train=False,
                                                download=True, transform=test_tfms)

        train_loader = DataLoader(train_set, batch_size=cfg["batch_size"], shuffle=True,
                                  num_workers=cfg["num_workers"], pin_memory=on_cuda, drop_last=True)
        test_loader = DataLoader(test_set, batch_size=cfg["batch_size"], shuffle=False,
                                 num_workers=cfg["num_workers"], pin_memory=on_cuda)

        # model / opt / loss / AMP / scheduler
        model = build_model(cfg["model"]["name"], num_classes=10).to(device)
        optimizer = build_optimizer(model, cfg)
        criterion = nn.CrossEntropyLoss(label_smoothing=cfg.get("label_smoothing", 0.0))
        scaler = torch.cuda.amp.GradScaler(enabled=use_amp)

        # The schedule is defined in optimizer steps, hence steps_per_epoch.
        steps_per_epoch = len(train_loader)
        scheduler = build_warmup_cosine(optimizer, steps_per_epoch, cfg["epochs"],
                                        cfg["scheduler"]["warmup_epochs"])

        augment_mode = cfg.get("augment_mode", "off")
        mixup_alpha = cfg.get("mixup_alpha", 0.0)
        cutmix_alpha = cfg.get("cutmix_alpha", 0.0)

        best_acc = 0.0
        for epoch in range(cfg["epochs"]):
            tr_loss = train_one_epoch(model, train_loader, optimizer, scaler, device, criterion, scheduler,
                                      augment_mode, mixup_alpha, cutmix_alpha, use_amp)
            va_loss, va_acc = evaluate(model, test_loader, device)
            writer.writerow({"epoch": epoch, "train_loss": tr_loss, "val_loss": va_loss, "val_acc": va_acc})
            # Flush so finished epochs survive an interrupted run.
            log_f.flush()
            if va_acc > best_acc:
                best_acc = va_acc
                save_ckpt({"epoch": epoch, "model": model.state_dict(), "best_acc": best_acc},
                          os.path.join(ckpt_dir, "best.pth"))
            print(f"epoch {epoch+1}/{cfg['epochs']} | train {tr_loss:.4f} | val {va_loss:.4f} | acc {va_acc*100:.2f}%")
    finally:
        # Always release the metrics file, including on errors/interrupts.
        log_f.close()

# Entry point: run the full training pipeline when this code is executed as
# the main module (no effect when it is imported).
if __name__ == "__main__":
    main()


In [None]:
# ==========================================================
# eval.py — Summarize MixUp vs CutMix results (Notebook Table Output)
# ==========================================================
import os, glob, pandas as pd

def load_runs(root):
    """Collect the final validation accuracy of every run found under ``root``.

    Searches ``root`` recursively for ``metrics.csv`` files (in sorted path
    order) and reads the last ``val_acc`` entry of each one. Files that cannot
    be read or parsed, contain no rows, lack a ``val_acc`` column, or end in
    a non-numeric value are skipped with a printed note instead of aborting
    the whole summary.

    Args:
        root: directory to search.

    Returns:
        ``pandas.DataFrame`` with columns ``run`` (directory containing the
        metrics file) and ``final_acc`` (float); empty if nothing usable was
        found.
    """
    pattern = os.path.join(root, "**", "metrics.csv")
    runs = []
    # Sorted so the row order does not depend on filesystem enumeration order.
    for csv_path in sorted(glob.glob(pattern, recursive=True)):
        try:
            df = pd.read_csv(csv_path)
        except (OSError, ValueError) as err:
            # pandas' EmptyDataError / ParserError and decoding errors are
            # all ValueError subclasses; OSError covers unreadable files.
            print(f"skipping {csv_path}: {err}")
            continue
        if df.empty:
            print(f"skipping {csv_path}: no rows")
            continue
        if "val_acc" not in df.columns:
            print(f"skipping {csv_path}: no val_acc column")
            continue
        final_acc = pd.to_numeric(df["val_acc"], errors="coerce").iloc[-1]
        if pd.isna(final_acc):
            print(f"skipping {csv_path}: final val_acc is not a number")
            continue
        runs.append({
            "run": os.path.dirname(csv_path),
            "final_acc": float(final_acc),
        })
    return pd.DataFrame(runs, columns=["run", "final_acc"])

def summarize(roots):
    """Aggregate final accuracies per experiment directory and show a table.

    Each entry of ``roots`` is scanned with ``load_runs`` and tagged with the
    last component of its path. The mean, standard deviation and count of
    ``final_acc`` are then computed per tag.

    The table is rendered with the notebook's ``display`` when that name is
    available (as a captioned, formatted Styler if pandas can build one) and
    printed as plain text otherwise, so the function also works outside a
    notebook.

    Args:
        roots: iterable of directory paths.

    Returns:
        The summary ``DataFrame`` (columns ``tag``, ``mean``, ``std``,
        ``count``), or ``None`` when no runs were found at all.
    """
    parts = []
    for r in roots:
        # normpath drops trailing separators on any platform.
        tag = os.path.basename(os.path.normpath(r))
        df = load_runs(r)
        if df.empty:
            print(f"no runs found under {r}")
            continue
        df["tag"] = tag
        parts.append(df)

    if not parts:
        print("no runs to summarize")
        return None

    out = pd.concat(parts, ignore_index=True)
    summary = out.groupby("tag")["final_acc"].agg(["mean", "std", "count"]).reset_index()

    caption = "MixUp vs CutMix Summary"
    try:
        show = display  # only defined when running under IPython/Jupyter
    except NameError:
        show = None

    if show is None:
        print(caption)
        print(summary.to_string(index=False, float_format="{:.4f}".format))
    else:
        try:
            styled = summary.style.set_caption(caption).format({"mean": "{:.4f}", "std": "{:.4f}"})
        except ImportError:
            # DataFrame.style needs an optional dependency; show the raw table.
            show(summary)
        else:
            show(styled)
    return summary


In [None]:
# Compare the MixUp and CutMix experiments using the metrics.csv files found
# under their respective results directories.
summarize(["results/mixup", "results/cutmix"])
