# ConvNeXt-Tiny Blur Training on ImageNet-100
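This notebook trains a ConvNeXt-Tiny model on ImageNet-100 with a blur-first schedule: the first `blur_percent` (20%) of the epochs use Gaussian-blurred training images, and the remaining epochs use the standard augmentation pipeline. ConvNeXt-Tiny is a convolutional network that modernizes the ResNet design with ideas borrowed from Vision Transformers (ViT) while retaining its convolutional nature.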

## Dataset: ImageNet-100
- 100 classes subset of ImageNet

- 224x224 input images

- Standard data augmentation

## Model: ConvNeXt-Tiny
- Architecture based on modernizing a ResNet structure towards Vision Transformer design

- ~28.6M parameters (specific to ConvNeXt-Tiny)

- Features include: large kernel sizes, use of Layer Normalization, and GELU activation

- Conventional training (standard image classification setup)


In [None]:
%pip install torchsummary torchvision tqdm

In [None]:
import math, os, time, copy, random
import gc
from pathlib import Path
from typing import Tuple

import torch
import torch.nn as nn
import torchvision
from torchvision import transforms, datasets, models
from torch.utils.data import random_split, DataLoader
from tqdm.auto import tqdm
from contextlib import nullcontext

# Prefer the GPU when PyTorch can see one; otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")
print("Using device:", device)

# Seed Python's RNG and PyTorch's CPU / CUDA RNGs with the same value.
SEED = 42
random.seed(SEED)
torch.manual_seed(SEED)
torch.cuda.manual_seed_all(SEED)


In [None]:
# Experiment configuration read by the later cells.
CFG = {
    "num_epochs": 100,                           # keep it or change as you wish
    "batch_size": 128,                           # tune to your GPU memory
    "lr": 4e-3,                                  # a good starting point for ImageNet-size data
    "weight_decay": 0.05,
    "warmup_epochs": 5,
    "num_workers": 8,                            # ImageNet is stored on disk → use more workers
    "image_size": 224,
    "blur_percent": 0.2,                         # fraction of the epochs trained on blurred images
    "amp": True,
    "pretrained": False,
    "ckpt_dir": "./imagenet_blur_checkpoint",

    "dataset_path": "ImageNet100_224",
    "num_classes": 100,
}
# Make sure the checkpoint directory exists before anything tries to write to it.
Path(CFG["ckpt_dir"]).mkdir(exist_ok=True)

# Echo the configuration as the cell output.
CFG

In [None]:
def get_imagenet100_loaders(data_path, batch_size=128, num_workers=5, image_size=224):
    """Build the ImageNet-100 datasets and loaders (normal train, blurred train, val).

    Args:
        data_path: Dataset root; it must contain `train` and `val` sub-folders
            laid out for `torchvision.datasets.ImageFolder`.
        batch_size: Batch size used by all three loaders.
        num_workers: Number of worker processes used by each loader.
        image_size: Side length of the crops fed to the model. The validation
            resize is `image_size / 0.875`, i.e. 256 for the default 224.

    Returns:
        Tuple `(train_loader_normal, train_loader_blur, val_loader,
        train_dataset_normal, train_dataset_blur, val_dataset)`.
    """
    # ImageNet normalization
    normalize = transforms.Normalize(mean=[0.485, 0.456, 0.406],
                                     std=[0.229, 0.224, 0.225])

    def _train_transform(extra=()):
        # Both training pipelines share the same augmentation; `extra` is
        # spliced in right before the tensor conversion.
        return transforms.Compose([
            transforms.RandomResizedCrop(image_size),
            transforms.RandomHorizontalFlip(),
            transforms.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2, hue=0.1),
            *extra,
            transforms.ToTensor(),
            normalize,
        ])

    def _make_loader(dataset, train):
        # Training loaders shuffle and drop the last partial batch; the
        # validation loader keeps every sample in a fixed order.
        return DataLoader(
            dataset,
            batch_size=batch_size,
            shuffle=train,
            num_workers=num_workers,
            pin_memory=True,
            drop_last=train,
        )

    # Training transforms with augmentation
    transform_train = _train_transform()
    # Same augmentation plus a Gaussian blur (adjust strength as desired)
    blur_transform_train = _train_transform(
        [transforms.GaussianBlur(kernel_size=5, sigma=(0.1, 2.0))]
    )

    # Validation transforms (no augmentation): resize, then center-crop.
    resize_size = int(round(image_size / 0.875))
    transform_val = transforms.Compose([
        transforms.Resize(resize_size),
        transforms.CenterCrop(image_size),
        transforms.ToTensor(),
        normalize,
    ])

    # Load datasets; both training datasets read the same folder and differ
    # only in their transform.
    train_dir = Path(data_path) / 'train'
    val_dir = Path(data_path) / 'val'

    train_dataset_normal = datasets.ImageFolder(train_dir, transform=transform_train)
    train_dataset_blur = datasets.ImageFolder(train_dir, transform=blur_transform_train)
    val_dataset = datasets.ImageFolder(val_dir, transform=transform_val)

    # Create data loaders
    train_loader_normal = _make_loader(train_dataset_normal, train=True)
    train_loader_blur = _make_loader(train_dataset_blur, train=True)
    val_loader = _make_loader(val_dataset, train=False)

    return train_loader_normal, train_loader_blur, val_loader, train_dataset_normal, train_dataset_blur, val_dataset

# Build the three loaders (and their datasets) from the configured dataset root.
print("Loading ImageNet-100 dataset...")
(
    train_loader_normal,
    train_loader_blur,
    val_loader,
    train_dataset_normal,
    train_dataset_blur,
    val_dataset,
) = get_imagenet100_loaders(
    data_path=CFG['dataset_path'],
    batch_size=CFG['batch_size'],
    num_workers=CFG['num_workers'],
)

# Report dataset and loader sizes.
print(f"Training samples: {len(train_dataset_normal)}")
print(f"Validation samples: {len(val_dataset)}")
print(f"Number of classes: {len(train_dataset_normal.classes)}")
# Preview only the first ten class names when there are more than ten.
if len(train_dataset_normal.classes) > 10:
    print(f"Classes: {train_dataset_normal.classes[:10]}...")
else:
    print(f"Classes: {train_dataset_normal.classes}")
print(f"Training batches: {len(train_loader_normal)}")
print(f"Validation batches: {len(val_loader)}")


In [None]:
# Use the ImageNet-1k weights only when requested; otherwise train from scratch.
weights = models.ConvNeXt_Tiny_Weights.IMAGENET1K_V1 if CFG["pretrained"] else None
model = models.convnext_tiny(weights=weights)

# torchvision's ConvNeXt-Tiny head predicts the 1000 ImageNet classes; swap the
# final linear layer for one with CFG["num_classes"] outputs.
head_in_features = model.classifier[-1].in_features
model.classifier[-1] = nn.Linear(head_in_features, CFG["num_classes"])

# Move the model to the selected device (the result is shown as the cell output).
model.to(device)

In [None]:
import math
from torch.optim.lr_scheduler import LinearLR, CosineAnnealingLR, SequentialLR

# Classification loss, and AdamW over all model parameters with the configured
# learning rate and weight decay.
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.AdamW(
    params=model.parameters(),
    lr=CFG["lr"],
    weight_decay=CFG["weight_decay"],
)


def build_warmup_cosine_scheduler(optimizer, steps_per_epoch, num_epochs,
                                  warmup_epochs=1, eta_min=1e-5, accum_steps=1):
    """Build a per-step LR schedule: linear warmup followed by cosine annealing.

    The returned scheduler is meant to be stepped once per optimizer step.

    Args:
        optimizer: Optimizer whose learning rate is scheduled.
        steps_per_epoch: Number of batches per epoch.
        num_epochs: Total number of training epochs.
        warmup_epochs: Epochs of linear warmup; 0 disables warmup.
        eta_min: Final learning rate of the cosine phase.
        accum_steps: Gradient-accumulation factor; the number of scheduler
            steps per epoch is `ceil(steps_per_epoch / accum_steps)`.

    Returns:
        A `SequentialLR` (warmup then cosine) when there is a warmup phase,
        otherwise a plain `CosineAnnealingLR`.
    """
    steps_per_epoch = math.ceil(steps_per_epoch / max(1, accum_steps))
    total_steps = num_epochs * steps_per_epoch
    # int() keeps a fractional `warmup_epochs` usable as a step count.
    warmup_steps = int(warmup_epochs * steps_per_epoch)
    cosine_steps = max(1, total_steps - warmup_steps)

    if warmup_steps <= 0:
        # SequentialLR requires exactly one milestone fewer than schedulers,
        # so a single scheduler cannot be wrapped with a dummy milestone;
        # return the cosine schedule on its own.
        return CosineAnnealingLR(optimizer, T_max=cosine_steps, eta_min=eta_min)

    # start_factor cannot be 0 in some versions; use a tiny epsilon instead.
    warmup = LinearLR(optimizer, start_factor=1e-8, end_factor=1.0, total_iters=warmup_steps)
    cosine = CosineAnnealingLR(optimizer, T_max=cosine_steps, eta_min=eta_min)

    # Switch from warmup to cosine once `warmup_steps` steps have been taken.
    return SequentialLR(optimizer, schedulers=[warmup, cosine], milestones=[warmup_steps])

# LR schedule sized from the number of batches in the (normal) training loader.
scheduler = build_warmup_cosine_scheduler(
    optimizer=optimizer,
    num_epochs=CFG["num_epochs"],
    steps_per_epoch=len(train_loader_normal),
    warmup_epochs=CFG["warmup_epochs"],
)
# Gradient scaler for mixed-precision training; CFG["amp"] switches it on or off.
scaler = torch.cuda.amp.GradScaler(enabled=CFG["amp"])

In [None]:
def accuracy(preds, targets, topk=(1,)):
    """Return the top-k accuracies, in percent, for each k in `topk`.

    A sample counts as correct for a given k when its target is among the k
    highest-scoring classes. Each accuracy is the fraction of correct samples
    in the batch (not divided by k).

    Args:
        preds: Score tensor; the top-k is taken along dim 1.
        targets: Target class indices, one per sample.
        topk: Iterable of k values to evaluate.

    Returns:
        List of Python floats, one percentage per entry of `topk`.
    """
    with torch.no_grad():
        maxk = max(topk)
        _, pred = preds.topk(maxk, dim=1, largest=True, sorted=True)
        # hits[i, j] is True when the j-th best guess for sample i is the target.
        hits = pred.eq(targets.view(-1, 1))
        # A sample is a top-k hit if any of its first k guesses is right;
        # averaging over samples keeps the result in [0, 100] for every k.
        return [hits[:, :k].any(dim=1).float().mean().item() * 100. for k in topk]


def run_epoch(loader, model, optimizer=None, epoch:int=0, phase:str="train"):
    """Run one pass over `loader` and return `(mean_loss, mean_top1_accuracy)`.

    If `optimizer` is given → training mode, otherwise evaluation mode
    (gradients are disabled, so no graph is kept).

    Uses the module-level `criterion`, `scaler`, `scheduler`, `device` and
    `CFG`. In training mode the scheduler is stepped once per batch.

    Args:
        loader: Iterable of `(images, labels)` batches.
        model: Model to train or evaluate.
        optimizer: Optimizer for training; `None` selects evaluation.
        epoch: Epoch number, used only in the progress-bar label.
        phase: Phase name, used only in the progress-bar label.

    Returns:
        Tuple of per-sample average loss and top-1 accuracy (percent).
        Both are 0.0 when the loader yields no samples.
    """
    train = optimizer is not None
    model.train(train)

    # Sample-weighted sums, so a smaller last batch does not get the same
    # weight as a full one.
    loss_sum, acc_sum, seen = 0.0, 0.0, 0

    bar = tqdm(loader, desc=f"{phase.title():>5} | Epoch {epoch:02}", leave=False)

    # Choose the right context managers
    grad_ctx = nullcontext() if train else torch.no_grad()
    use_amp = CFG["amp"] and torch.cuda.is_available()

    with grad_ctx:
        for images, labels in bar:
            images = images.to(device, non_blocking=True)
            labels = labels.to(device, non_blocking=True)

            with torch.amp.autocast(device_type="cuda",
                                    dtype=torch.float16,
                                    enabled=use_amp):
                outputs = model(images)
                loss    = criterion(outputs, labels)

            if train:
                scaler.scale(loss).backward()
                scaler.step(optimizer); scaler.update()
                optimizer.zero_grad()
                scheduler.step()

            # Read the loss once; each .item() call synchronizes with the GPU.
            loss_value = loss.item()
            batch_size = labels.size(0)
            loss_sum += loss_value * batch_size
            acc_sum  += accuracy(outputs, labels)[0] * batch_size
            seen     += batch_size
            bar.set_postfix(loss=f"{loss_value:.4f}")

    torch.cuda.empty_cache()     # free any leftover cached blocks
    denom = max(seen, 1)         # avoid dividing by zero on an empty loader
    return loss_sum/denom, acc_sum/denom

In [None]:
# Run Python's garbage collector first, then ask PyTorch to release its cached
# CUDA memory.
gc.collect() # These commands help you when you face CUDA OOM error
torch.cuda.empty_cache()

In [None]:
import os
# Resolve the checkpoint directory from the config and report whether it exists.
checkpoint_dir = CFG['ckpt_dir']
print(checkpoint_dir)
print(Path(checkpoint_dir).exists())

# Create the directory (and any missing parents); an existing one is left as is.
Path(checkpoint_dir).mkdir(parents=True, exist_ok=True)

In [None]:
def save_model(model, optimizer, scheduler, metrics, epoch, path):
    """Write a training checkpoint to `path` with `torch.save`.

    The checkpoint is a dict holding the model and optimizer state dicts, the
    scheduler state dict (an empty string when `scheduler` is None), the given
    `metrics` object and the `epoch` number.
    """
    checkpoint = {
        'model_state_dict': model.state_dict(),
        'optimizer_state_dict': optimizer.state_dict(),
    }
    # An empty string is the placeholder for "no scheduler".
    checkpoint['scheduler_state_dict'] = '' if scheduler is None else scheduler.state_dict()
    checkpoint['metric'] = metrics
    checkpoint['epoch'] = epoch
    torch.save(checkpoint, path)


def load_model(model, optimizer=None, scheduler=None, path=f"{CFG['ckpt_dir']}/current_epoch.pth"):
    """Restore training state from a checkpoint written by `save_model`.

    Args:
        model: Model that receives the saved weights.
        optimizer: Optional optimizer to restore; skipped when None.
        scheduler: Optional scheduler to restore; skipped when None or when
            the checkpoint holds no scheduler state.
        path: Checkpoint file to read.

    Returns:
        Tuple `(model, optimizer, scheduler, epoch, metrics)`; `optimizer` and
        `scheduler` are the objects that were passed in (possibly None).
    """
    # NOTE(security): weights_only=False unpickles arbitrary Python objects;
    # only load checkpoint files that you created yourself.
    # map_location="cpu" lets a checkpoint saved on a GPU be read on a machine
    # without one; load_state_dict then copies the values into the existing
    # parameters.
    checkpoint = torch.load(path, map_location="cpu", weights_only=False)
    model.load_state_dict(checkpoint['model_state_dict'])
    if optimizer is not None:
        optimizer.load_state_dict(checkpoint['optimizer_state_dict'])
    # save_model stores '' when it was called without a scheduler; there is
    # nothing to restore in that case.
    scheduler_state = checkpoint.get('scheduler_state_dict')
    if scheduler is not None and scheduler_state:
        scheduler.load_state_dict(scheduler_state)
    epoch = checkpoint['epoch']
    metrics = checkpoint['metric']
    return model, optimizer, scheduler, epoch, metrics

In [None]:
# Early-stopping state: stop after `patience` epochs in a row without a
# validation accuracy at least as good as the best one seen so far.
best_val_acc = 0.0
patience = 16
epoches_no_improve = 0

# Per-epoch curves.
history = {"train_loss": [], "train_acc": [],
           "val_loss": [],   "val_acc": []}

# The first `blur_epochs` epochs train on blurred images, the rest on normal ones.
blur_epochs = int(CFG["num_epochs"] * CFG["blur_percent"])

for epoch in range(1, CFG["num_epochs"]+1):
    t0 = time.time()

    # pick which train loader to use this epoch
    use_blur = epoch <= blur_epochs
    loader_this_epoch = train_loader_blur if use_blur else train_loader_normal
    phase_name = f"train-{'blur' if use_blur else 'normal'}"

    tr_loss, tr_acc = run_epoch(loader_this_epoch, model, optimizer, epoch, phase_name)
    val_loss, val_acc= run_epoch(val_loader,   model, None,     epoch, "val")

    history["train_loss"].append(tr_loss); history["train_acc"].append(tr_acc)
    history["val_loss"].append(val_loss);   history["val_acc"].append(val_acc)

    # Built every epoch so that each checkpoint stores the metrics of the
    # epoch it was written in, not those of the last improving epoch.
    metrics = {
        "train_loss": tr_loss,
        "train_acc": tr_acc,
        "val_loss": val_loss,
        "val_acc": val_acc,
    }

    if val_acc >= best_val_acc:
        epoches_no_improve = 0
        best_val_acc = val_acc

        save_model(model, optimizer, scheduler, metrics, epoch, f"{CFG['ckpt_dir']}/best_convnext_tiny.pth")
        print("Saved best val acc model")

    else:
        epoches_no_improve += 1

    # Rotate the previous epoch's checkpoint to last_epoch.pth before writing
    # the new one. There is nothing to rotate until a current checkpoint exists.
    current_path = f"{CFG['ckpt_dir']}/current_epoch.pth"
    last_path = f"{CFG['ckpt_dir']}/last_epoch.pth"
    if os.path.exists(current_path):
        try:
            os.replace(current_path, last_path)
            print("Saved last epoch model")
        except OSError as e:
            print(f"An unexpected error occurred when creating last.pth: {e}")

    save_model(model, optimizer, scheduler, metrics, epoch, current_path)
    print(f"Saved epoch {epoch} model")

    print(f"Epoch {epoch:02}/{CFG['num_epochs']} "
          f"| train loss {tr_loss:.4f} acc {tr_acc:.2f}% "
          f"| val loss {val_loss:.4f} acc {val_acc:.2f}% "
          f"| lr {scheduler.get_last_lr()[0]:.2e} "
          f"| time {(time.time()-t0):.1f}s")

    if epoches_no_improve >= patience:
        print("Early stopping")
        break

# Continue training

In [None]:
# Restore model, optimizer and scheduler state from the best-validation checkpoint.
model, optimizer, scheduler, start_epoch, metrics = load_model(
    model,
    optimizer,
    scheduler,
    path=f"{CFG['ckpt_dir']}/best_convnext_tiny.pth",
)

In [None]:
# Show the optimizer, scheduler and metrics objects as the cell output.
optimizer, scheduler, metrics