Kel - Data import and dataloader

In [None]:
import kagglehub

# Download latest version
# `directory` holds whatever kagglehub.dataset_download returns; it is used
# below as the source of the copy into the local 'nids' folder.
directory = kagglehub.dataset_download("tarunpathak/natural-images-with-synthetic-noise")

# Show where the dataset files ended up.
print("Path to dataset files:", directory)

In [None]:
import shutil
import os

# Copy the downloaded Kaggle dataset into a local working directory, unless a
# previous run already created that directory.

new_directory = 'nids'

if os.path.exists(new_directory):
    print("The directory already exists.")
else:
    shutil.copytree(directory, new_directory)

In [None]:
"""
  Helper function for prefix removal
"""
def remove_noise_prefix(fn):
    """Return ``fn`` without its leading noise-type prefix.

    Recognised prefixes are ``gauss_``, ``poisson_``, ``salt and pepper_`` and
    ``speckle_``. A name that starts with none of them is returned unchanged.
    """
    known_prefixes = ("gauss_", "poisson_", "salt and pepper_", "speckle_")

    if not fn.startswith(known_prefixes):
        return fn

    # Each known prefix ends at the first underscore of the name, so the part
    # after that underscore is the name without its prefix.
    _, _, remainder = fn.partition("_")
    return remainder

"""
  Removes all prefixes for the NISN Dataset.
"""
def remove_nids_prefix(dir):
    """Rename every entry of ``dir`` so that it loses its noise-type prefix.

    Entries whose name is unchanged by ``remove_noise_prefix`` are left alone
    and reported on stdout.
    """
    for entry in os.listdir(dir):
        old_fn_dir = os.path.join(dir, entry)
        stripped_path = os.path.join(dir, remove_noise_prefix(entry))

        if old_fn_dir == stripped_path:
            print(f'Skipping file: {old_fn_dir} as it either the prefix has already been removed or it does not have a prefix.')
            continue

        # NOTE(review): if two entries map to the same stripped name, the
        # outcome of this rename is platform dependent - confirm the dataset
        # never contains such collisions.
        os.rename(old_fn_dir, stripped_path)

# Strip the noise-type prefixes from the noisy images of every split inside
# the local 'nids' copy. Presumably this makes each noisy file name equal to
# its ground-truth counterpart (the Dataset class below looks both up by the
# same name) - verify against the dataset layout.
remove_nids_prefix("nids/test/test/noisy images")
remove_nids_prefix("nids/train/train/noisy images")
remove_nids_prefix("nids/validate/validate/noisy images")

In [None]:
#  article dependencies
import torch
import torch.nn as nn
import torch.nn.functional as F
import torchvision
import torchvision.transforms as transforms
import torchvision.datasets as Datasets
from torch.utils.data import Dataset, DataLoader
import numpy as np
import matplotlib.pyplot as plt
import cv2
from tqdm.notebook import tqdm
from tqdm import tqdm as tqdm_regular
import seaborn as sns
from torchvision.utils import make_grid
import random
import os
from PIL import Image

In [None]:
#  configuring device
# Preference order: first CUDA GPU, then the MPS (Metal) backend, then CPU.
# The chosen backend is printed so the notebook output records it.
if torch.cuda.is_available():
  device = torch.device('cuda:0')
  print('Running on the GPU')
elif torch.backends.mps.is_available():
  device = torch.device('mps')
  print('Running on Metal')
else:
  device = torch.device('cpu')
  print('Running on the CPU')

In [None]:
"""
Creates Dataset class for Natural Image Data Set.
"""
class NaturalImageDataSet(Dataset):
    """Dataset of (noisy image, clean image) pairs.

    ``dir`` must contain a ``noisy images`` folder and a ``ground truth``
    folder. A noisy image is paired with the ground-truth file that has the
    same file name.
    """

    def __init__(self, dir, transform=None):
        """Index the noisy images found under ``dir``.

        Args:
            dir: Directory holding the ``noisy images`` and ``ground truth``
                sub-folders.
            transform: Optional callable applied to both images of a pair.
        """
        self.noisy_dir = os.path.join(dir, "noisy images")
        self.clean_dir = os.path.join(dir, "ground truth")
        self.transform = transform

        # os.listdir returns entries in arbitrary order; sorting makes the
        # index -> file mapping reproducible across runs and machines.
        self.images = sorted(os.listdir(self.noisy_dir))

    def __len__(self):
        """Return the number of noisy images that were indexed."""
        return len(self.images)

    def __getitem__(self, index):
        """Return the ``(noisy, clean)`` pair stored at ``index``.

        Both images are loaded as RGB. If a file of the pair cannot be found,
        a message is printed and ``(None, None)`` is returned.
        """
        imagefn = self.images[index]
        noisy_image_fp = os.path.join(self.noisy_dir, imagefn)
        clean_image_fp = os.path.join(self.clean_dir, imagefn)

        # Make sure both files exist; otherwise report the missing pair.
        try:
            noisy_image = Image.open(noisy_image_fp).convert("RGB")  # force 3-channel RGB
            clean_image = Image.open(clean_image_fp).convert("RGB")
        except FileNotFoundError:
            print (f'File Not Found: Unable to load matching ground truth image for the following image: {noisy_image_fp}')
            print (f'Missing ground truth image: {clean_image_fp}')
            # NOTE(review): a DataLoader using the default collate function is
            # not expected to accept None samples - confirm how callers want
            # missing pairs handled (skip at index time or raise).
            return None, None

        if self.transform is not None:
            # The transform is invoked separately for each image, so a random
            # augmentation would not be guaranteed to match between the two.
            noisy_image = self.transform(noisy_image)
            clean_image = self.transform(clean_image)

        return noisy_image, clean_image

In [None]:
# Preprocessing shared by all splits.
transform = transforms.Compose([
    # Resize to exactly 32x32. With a single int, Resize only matches the
    # shorter edge and keeps the aspect ratio, so non-square images would not
    # be 32x32 - but the Encoder reshapes its input to (C, 32, 32).
    transforms.Resize((32, 32)), # TODO: Placeholder to fit in the dataset. Will remove/change this.
    # Any other possible transformations to do here?
    transforms.ToTensor(),
    # (x - 0.5) / 0.5 on every channel.
    # NOTE(review): psnr_from_mse documents targets in [0, 1]; confirm the
    # reported PSNR is meant to be computed on these normalised values.
    transforms.Normalize(0.5, 0.5)
    # Interesting note: You can add other data transforms like flipLR, FlipUpDown, Stretches, Shears, crops, etc
    # For data augmentation
])


# Datasets for the three splits; they all share the same preprocessing.
training_data = NaturalImageDataSet("./nids/train/train", transform=transform)
test_data = NaturalImageDataSet("./nids/test/test", transform=transform)
validation_data = NaturalImageDataSet("./nids/validate/validate", transform=transform)

# Loaders for the three splits; only the training split is shuffled.
#May or may not need to be included, since the base CAE already has dataloaders.
training_NIDS_loader = DataLoader(dataset=training_data, batch_size=4, num_workers=4, shuffle=True)
test_NIDS_loader = DataLoader(dataset=test_data, batch_size=4, num_workers=4, shuffle=False)
validate_NIDS_loader = DataLoader(dataset=validation_data, batch_size=4, num_workers=4, shuffle=False)

In [None]:
# Notebook inspection: show the attribute dictionary of the dataset wrapped by
# the training loader.
vars(training_NIDS_loader.dataset)

In [None]:
# Sanity check: print the shapes of the first training batch only, then stop.
for noisy, clean in training_NIDS_loader:
  print(noisy.shape)
  print(clean.shape)
  break

Mine - Architecture initialization

In [None]:
#  defining encoder
class Encoder(nn.Module):
    """Convolutional encoder turning 32x32 images into a flat latent vector.

    Args:
        in_channels: Number of channels of the input image.
        out_channels: Width of the first convolution stage; the later stages
            use two and four times this value.
        latent_dim: Size of the produced latent vector.
        act_fn: Activation module placed after every layer. The same instance
            is reused at every position.
    """

    def __init__(self, in_channels=3, out_channels=16, latent_dim=1000, act_fn=nn.ReLU()):
        super().__init__()
        self.in_channels = in_channels

        narrow, medium, wide = out_channels, 2*out_channels, 4*out_channels

        # Two stride-2 convolutions halve the spatial size twice: 32 -> 16 -> 8.
        stages = [
            nn.Conv2d(in_channels, narrow, kernel_size=3, padding=1),  # (32, 32)
            act_fn,
            nn.Conv2d(narrow, narrow, kernel_size=3, padding=1),
            act_fn,
            nn.Conv2d(narrow, medium, kernel_size=3, stride=2, padding=1),  # (16, 16)
            act_fn,
            nn.Conv2d(medium, medium, kernel_size=3, padding=1),
            act_fn,
            nn.Conv2d(medium, wide, kernel_size=3, stride=2, padding=1),  # (8, 8)
            act_fn,
            nn.Conv2d(wide, wide, kernel_size=3, padding=1),
            act_fn,
            nn.Flatten(),
            nn.Linear(wide * 8 * 8, latent_dim),
            act_fn,
        ]
        self.net = nn.Sequential(*stages)

    def forward(self, x):
        """Reshape ``x`` to ``(-1, in_channels, 32, 32)`` and encode it."""
        images = x.view(-1, self.in_channels, 32, 32)
        return self.net(images)


#  defining decoder
class Decoder(nn.Module):
    """Decoder mapping a latent vector to a 32x32 image.

    Args:
        in_channels: Number of channels of the reconstructed image.
        out_channels: Base width; the stages use four, two and one times this
            value, in that order.
        latent_dim: Size of the incoming latent vector.
        act_fn: Activation module placed after every layer except the last
            transposed convolution. The same instance is reused everywhere.
    """

    def __init__(self, in_channels=3, out_channels=16, latent_dim=1000, act_fn=nn.ReLU()):
        super().__init__()

        self.out_channels = out_channels
        narrow, medium, wide = out_channels, 2*out_channels, 4*out_channels

        # Expand the latent vector to the size of a (wide, 8, 8) feature map.
        self.linear = nn.Sequential(
            nn.Linear(latent_dim, wide * 8 * 8),
            act_fn,
        )

        # Two stride-2 transposed convolutions double the spatial size twice:
        # 8 -> 16 -> 32.
        upsampling = [
            nn.ConvTranspose2d(wide, wide, kernel_size=3, padding=1),  # (8, 8)
            act_fn,
            nn.ConvTranspose2d(wide, medium, kernel_size=3, stride=2,
                               padding=1, output_padding=1),  # (16, 16)
            act_fn,
            nn.ConvTranspose2d(medium, medium, kernel_size=3, padding=1),
            act_fn,
            nn.ConvTranspose2d(medium, narrow, kernel_size=3, stride=2,
                               padding=1, output_padding=1),  # (32, 32)
            act_fn,
            nn.ConvTranspose2d(narrow, narrow, kernel_size=3, padding=1),
            act_fn,
            nn.ConvTranspose2d(narrow, in_channels, kernel_size=3, padding=1),
        ]
        self.conv = nn.Sequential(*upsampling)

    def forward(self, x):
        """Decode latent vectors ``x`` into images."""
        feature_map = self.linear(x).view(-1, 4*self.out_channels, 8, 8)
        return self.conv(feature_map)


#  defining autoencoder
class Autoencoder(nn.Module):
    """Chains an encoder and a decoder into one module.

    Both sub-modules are moved to the module-level ``device`` when the
    autoencoder is constructed.
    """

    def __init__(self, encoder, decoder):
        super().__init__()
        # nn.Module.to returns the module itself, so register and move in one go.
        self.encoder = encoder.to(device)
        self.decoder = decoder.to(device)

    def forward(self, x):
        """Encode ``x`` and decode the result."""
        return self.decoder(self.encoder(x))

Yousef - Hyperparams and training loop

In [None]:
from __future__ import annotations
import math
import os
from dataclasses import dataclass, asdict
from typing import Optional, Tuple, Dict, Any

import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import DataLoader
from torch.optim import Optimizer
from torch.optim.lr_scheduler import _LRScheduler, CosineAnnealingLR, SequentialLR, LinearLR

Util:

In [None]:
def seed_all(seed: int = 42):
    """Seed Python's ``random``, NumPy and PyTorch (CPU and CUDA) with ``seed``."""
    import random
    import numpy as np

    seeders = (
        random.seed,
        np.random.seed,
        torch.manual_seed,
        torch.cuda.manual_seed_all,
    )
    for apply_seed in seeders:
        apply_seed(seed)


def psnr_from_mse(mse: torch.Tensor, max_val: float = 1.0) -> torch.Tensor:
    """Convert mean squared error to PSNR in dB.

    Computes ``10 * log10(max_val**2 / mse)``.

    Args:
        mse: Mean squared error value(s).
        max_val: Peak-to-peak range of the signal. The default of 1.0 assumes
            targets in [0, 1]; pass 2.0 for data normalised to [-1, 1].

    Returns:
        A tensor with the same shape as ``mse``.
    """
    # The small constant keeps the result finite when the error is exactly zero.
    return 10.0 * torch.log10((max_val * max_val) / (mse + 1e-12))


class AverageMeter:
    """Accumulates a weighted sum and a count to report a running mean."""

    def __init__(self):
        self.reset()

    def reset(self):
        """Discard everything accumulated so far."""
        self.sum, self.count = 0.0, 0

    def update(self, val: float, n: int = 1):
        """Record ``val`` as the mean of ``n`` new observations."""
        weighted = float(val) * n
        self.sum += weighted
        self.count += n

    @property
    def avg(self) -> float:
        """Mean of all recorded observations (the sum itself while empty)."""
        # Never divide by less than one, so an empty meter reports 0.0.
        divisor = self.count if self.count > 1 else 1
        return self.sum / divisor

Hyperparameters:

In [None]:
@dataclass
class TrainConfig:
    """Hyperparameters and I/O settings consumed by the training utilities."""

    # Core
    epochs: int = 20
    batch_size: int = 128  # TODO: reduce once we modify architecture
    lr: float = 1e-3
    weight_decay: float = 0.0
    optimizer: str = "adamw"  # one of {"adam", "adamw", "sgd"}

    # Scheduler
    scheduler: str = "cosine"  # or "none"
    warmup_epochs: int = 0

    # Optimization niceties
    amp: bool = True  # automatic mixed precision
    grad_clip_norm: Optional[float] = 1.0  # 0 or None to disable
    accum_steps: int = 1  # gradient accumulation steps

    # Repro / IO
    seed: int = 42
    device: str = "cuda" if torch.cuda.is_available() else "cpu"
    out_dir: str = "./outputs"
    ckpt_last: str = "last.pt"
    ckpt_best: str = "best.pt"
    # Path of a checkpoint to resume from; None (or any falsy value) starts
    # fresh. fit() passes this value to os.path.isfile and load_checkpoint,
    # so it has to be a path rather than a flag.
    resume: Optional[str] = None

    # Loss
    loss: str = "mse"  # {"mse", "l1", "charbonnier"}
    charbonnier_eps: float = 1e-3



Factory Function:

In [None]:
def create_optimizer(model: nn.Module, cfg: TrainConfig) -> Optimizer:
    """Build the optimizer named by ``cfg.optimizer`` (case-insensitive).

    Only parameters with ``requires_grad`` set are optimised. Supported names
    are ``adam``, ``adamw`` and ``sgd`` (Nesterov momentum 0.9).

    Raises:
        ValueError: If ``cfg.optimizer`` is not one of the supported names.
    """
    trainable = [param for param in model.parameters() if param.requires_grad]
    name = cfg.optimizer.lower()

    # Factories are lazy so that only the selected optimizer is constructed.
    builders = {
        "adam": lambda: torch.optim.Adam(
            trainable, lr=cfg.lr, weight_decay=cfg.weight_decay),
        "adamw": lambda: torch.optim.AdamW(
            trainable, lr=cfg.lr, weight_decay=cfg.weight_decay),
        "sgd": lambda: torch.optim.SGD(
            trainable, lr=cfg.lr, weight_decay=cfg.weight_decay,
            momentum=0.9, nesterov=True),
    }

    build = builders.get(name)
    if build is None:
        raise ValueError("Unsupported optimizer: %s" % cfg.optimizer)
    return build()


def create_scheduler(opt: Optimizer, cfg: TrainConfig, steps_per_epoch: int) -> Optional[_LRScheduler]:
    """Build the learning-rate scheduler selected by ``cfg.scheduler``.

    ``"none"`` yields no scheduler. ``"cosine"`` yields cosine annealing over
    ``cfg.epochs * steps_per_epoch`` scheduler steps, optionally preceded by a
    linear warm-up of ``cfg.warmup_epochs * steps_per_epoch`` steps.

    Raises:
        ValueError: If ``cfg.scheduler`` is neither ``"none"`` nor ``"cosine"``.
    """
    if cfg.scheduler == "none":
        return None
    if cfg.scheduler != "cosine":
        raise ValueError("Unsupported scheduler: %s" % cfg.scheduler)

    if cfg.warmup_epochs <= 0:
        # Plain cosine decay over the whole run.
        horizon = max(1, cfg.epochs * steps_per_epoch)
        return CosineAnnealingLR(opt, T_max=horizon)

    # Linear ramp from 1% of the base LR, then cosine decay for the remainder.
    warmup_steps = cfg.warmup_epochs * steps_per_epoch
    decay_steps = max(1, (cfg.epochs - cfg.warmup_epochs) * steps_per_epoch)
    ramp = LinearLR(opt, start_factor=0.01, end_factor=1.0, total_iters=warmup_steps)
    decay = CosineAnnealingLR(opt, T_max=decay_steps)
    return SequentialLR(opt, schedulers=[ramp, decay], milestones=[warmup_steps])


def create_loss(cfg: TrainConfig):
    """Return the loss module named by ``cfg.loss``.

    Supported names are ``"mse"``, ``"l1"`` and ``"charbonnier"``; the last
    one uses ``cfg.charbonnier_eps``.

    Raises:
        ValueError: If ``cfg.loss`` is not one of the supported names.
    """

    class Charbonnier(nn.Module):
        """Mean of ``sqrt(diff^2 + eps^2)``."""

        def __init__(self, eps: float = 1e-3):
            super().__init__()
            self.eps = eps

        def forward(self, pred, target):
            residual = pred - target
            smoothed = torch.sqrt(residual * residual + self.eps * self.eps)
            return smoothed.mean()

    # Factories are lazy so that only the selected loss is constructed.
    factories = {
        "mse": lambda: nn.MSELoss(reduction='mean'),
        "l1": lambda: nn.L1Loss(reduction='mean'),
        "charbonnier": lambda: Charbonnier(cfg.charbonnier_eps),
    }

    make_loss = factories.get(cfg.loss)
    if make_loss is None:
        raise ValueError("Unsupported loss: %s" % cfg.loss)
    return make_loss()

For every epoch:
    train
    validate

test

Train Loop:

In [None]:
def train_one_epoch(
        model: nn.Module,
        train_loader: DataLoader,
        optimizer: Optimizer,
        device: str,
        loss_fn: nn.Module,
        scaler: Optional[torch.cuda.amp.GradScaler],
        cfg: TrainConfig
) -> Dict[str, float]:
    """Run one optimisation pass over ``train_loader``.

    Every batch performs a full-precision forward pass, a backward pass and
    one optimizer step.

    Args:
        model: Network mapping noisy images to reconstructions.
        train_loader: Yields ``(noisy, clean)`` batches.
        optimizer: Optimizer updating ``model``'s parameters.
        device: Device the batches are moved to.
        loss_fn: Criterion comparing the reconstruction with the clean batch.
        scaler: Accepted for interface compatibility; not used.
        cfg: Training configuration. ``cfg.amp``, ``cfg.accum_steps`` and
            ``cfg.grad_clip_norm`` are not applied by this loop.

    Returns:
        ``{"loss": ..., "psnr": ...}``: sample-weighted means over the epoch.
    """
    model.train()

    # Training logs
    train_loss_meter = AverageMeter()
    train_psnr_meter = AverageMeter()

    for noisy, clean in train_loader:
        noisy = noisy.to(device, non_blocking=True)
        clean = clean.to(device, non_blocking=True)

        # Gradients must be cleared before every backward pass; otherwise
        # each step would be taken on the sum of all gradients seen so far.
        optimizer.zero_grad(set_to_none=True)

        # Forward pass
        recon = model(noisy)

        # Loss calculation
        loss = loss_fn(recon, clean)

        # Gradient calculation
        loss.backward()

        # Optimizer step
        optimizer.step()

        # Metrics
        with torch.no_grad():
            batch_size = clean.size(0)
            # Per-image MSE, so PSNR is averaged over images.
            batch_mse = F.mse_loss(recon, clean, reduction='none').view(batch_size, -1).mean(dim=1)
            batch_psnr = psnr_from_mse(batch_mse).mean().item()
            # The loss is not divided by accum_steps, so it is logged as is.
            train_loss_meter.update(loss.item(), n=batch_size)
            train_psnr_meter.update(batch_psnr, n=batch_size)

    return {
        "loss": train_loss_meter.avg,
        "psnr": train_psnr_meter.avg,
    }


def evaluate(model: nn.Module,
             test_loader: DataLoader,
             device: str,
             loss_fn: nn.Module) -> Dict[str, float]:
    """Measure loss and PSNR of ``model`` on ``test_loader`` without training.

    The model is switched to eval mode and gradients are disabled.

    Returns:
        ``{"loss": ..., "psnr": ...}``: sample-weighted means over the loader.
    """
    model.eval()
    running_loss = AverageMeter()
    running_psnr = AverageMeter()

    with torch.no_grad():
        for noisy_batch, clean_batch in test_loader:
            noisy_batch = noisy_batch.to(device, non_blocking=True)
            clean_batch = clean_batch.to(device, non_blocking=True)
            n_samples = clean_batch.size(0)

            prediction = model(noisy_batch)
            batch_loss = loss_fn(prediction, clean_batch)

            # Per-image MSE, so PSNR is averaged over images.
            squared_error = F.mse_loss(prediction, clean_batch, reduction='none')
            per_image_mse = squared_error.view(n_samples, -1).mean(dim=1)
            mean_psnr = psnr_from_mse(per_image_mse).mean().item()

            running_loss.update(batch_loss.item(), n=n_samples)
            running_psnr.update(mean_psnr, n=n_samples)

    return {"loss": running_loss.avg, "psnr": running_psnr.avg}

Checkpoints:

In [None]:
def save_checkpoint(
    path: str,
    model: nn.Module,
    scheduler: Optional[_LRScheduler],
    scaler: Optional[torch.cuda.amp.GradScaler],
    epoch: int,
    cfg: TrainConfig,
    best_val: float
):
    """Write a training checkpoint to ``path``.

    The checkpoint stores the model, scheduler and scaler state dicts (``None``
    for an absent scheduler/scaler), the epoch number, the configuration as a
    plain dict and the best validation value. Optimizer state is not stored.

    Args:
        path: Destination file. Missing parent directories are created.
        model: Model whose ``state_dict`` is saved.
        scheduler: Optional LR scheduler.
        scaler: Optional gradient scaler.
        epoch: Epoch number recorded in the checkpoint.
        cfg: Dataclass configuration, serialised with ``asdict``.
        best_val: Best validation value recorded in the checkpoint.
    """
    # A bare file name has an empty dirname, which os.makedirs rejects.
    parent = os.path.dirname(path)
    if parent:
        os.makedirs(parent, exist_ok=True)
    torch.save({
        "model": model.state_dict(),
        "scheduler": None if scheduler is None else scheduler.state_dict(),
        "scaler": None if scaler is None else scaler.state_dict(),
        "epoch": epoch,
        "cfg": asdict(cfg),
        "best_val": best_val,
    }, path)


def load_checkpoint(
    path: str,
    model: nn.Module,
    scheduler: Optional[_LRScheduler] = None,
    scaler: Optional[torch.cuda.amp.GradScaler] = None,
    map_location: Optional[str] = None
) -> Tuple[int, float, TrainConfig]:
    """Restore training state from the checkpoint at ``path``.

    The model weights are always loaded. Scheduler and scaler state is loaded
    only when the corresponding object is given and the checkpoint holds a
    state for it. Optimizer state is not restored.

    Returns:
        ``(start_epoch, best_val, cfg)``, where ``start_epoch`` is the stored
        epoch plus one, ``best_val`` defaults to infinity and ``cfg`` is
        rebuilt from the stored dict (defaults when none was stored).
    """
    state = torch.load(path, map_location=map_location)
    model.load_state_dict(state["model"])

    for component, key in ((scheduler, "scheduler"), (scaler, "scaler")):
        saved_state = state.get(key)
        if component is not None and saved_state is not None:
            component.load_state_dict(saved_state)

    # Training continues with the epoch after the one that was saved.
    next_epoch = int(state.get("epoch", 0)) + 1
    best_so_far = float(state.get("best_val", float("inf")))
    restored_cfg = TrainConfig(**(state.get("cfg", {}) or {}))
    return next_epoch, best_so_far, restored_cfg

Fit Loop:

In [None]:
def fit(model: nn.Module,
        train_loader: DataLoader,
        val_loader: DataLoader,
        cfg: TrainConfig) -> Tuple[Dict[str, float], Dict[int, Dict[str, float]]]:
    """Train ``model`` for ``cfg.epochs`` epochs, validating after each one.

    After every epoch the latest state is written to ``cfg.ckpt_last``; the
    state with the lowest validation loss so far is written to
    ``cfg.ckpt_best`` (both inside ``cfg.out_dir``).

    Args:
        model: Network to train; it is moved to ``cfg.device``.
        train_loader: Loader of ``(noisy, clean)`` training batches.
        val_loader: Loader of ``(noisy, clean)`` validation batches.
        cfg: Training configuration.

    Returns:
        ``(summary, log_dict)``. ``summary`` is ``{"best_val_loss": ...}``.
        ``log_dict`` maps each epoch number to its metrics, with keys
        ``train_loss``, ``train_psnr``, ``val_loss`` and ``val_psnr``. The
        unprefixed ``loss`` and ``psnr`` keys hold the validation values, as
        they did before the prefixed keys existed.
    """
    seed_all(cfg.seed)
    device = cfg.device
    model.to(device)

    optimizer = create_optimizer(model, cfg)
    # The scheduler is stepped once per epoch below, so its horizon has to be
    # expressed in epochs, i.e. one scheduler step per epoch.
    scheduler = create_scheduler(optimizer, cfg, steps_per_epoch=1)
    loss_fn = create_loss(cfg)

    scaler = torch.cuda.amp.GradScaler(enabled=cfg.amp)

    os.makedirs(cfg.out_dir, exist_ok=True)
    last_path = os.path.join(cfg.out_dir, cfg.ckpt_last)
    best_path = os.path.join(cfg.out_dir, cfg.ckpt_best)

    start_epoch = 1
    best_val_loss = float("inf")

    # Only a path can be resumed from; flags such as True/False are ignored.
    resume_path = cfg.resume if isinstance(cfg.resume, (str, os.PathLike)) else None
    if resume_path:
        if os.path.isfile(resume_path):
            start_epoch, best_val_loss, _ = load_checkpoint(resume_path, model, scheduler, scaler, map_location=device)
            print(f"[Resume] Starting from epoch {start_epoch}, best_val_loss={best_val_loss:.6f}")
        else:
            print(f"[Resume] Checkpoint not found at {resume_path}; starting from scratch.")

    log_dict: Dict[int, Dict[str, float]] = {}

    for epoch in range(start_epoch, cfg.epochs + 1):
        # Train
        train_stats = train_one_epoch(model, train_loader, optimizer, device, loss_fn, scaler, cfg)
        if scheduler is not None:
            scheduler.step()

        # Validate
        val_stats = evaluate(model, val_loader, device, loss_fn)

        # Logging
        print(f"Epoch {epoch:03d}/{cfg.epochs} | "
              f"Train loss {train_stats['loss']:.6f} PSNR {train_stats['psnr']:.2f} dB | "
              f"Val loss {val_stats['loss']:.6f} PSNR {val_stats['psnr']:.2f} dB | "
              f"LR {optimizer.param_groups[0]['lr']:.6g}")
        # Train and validation stats share the keys "loss"/"psnr", so they
        # are stored under prefixed keys to keep both.
        log_dict[epoch] = {
            **val_stats,
            "train_loss": train_stats["loss"],
            "train_psnr": train_stats["psnr"],
            "val_loss": val_stats["loss"],
            "val_psnr": val_stats["psnr"],
        }

        # Checkpoints: update the best value first so that the "last"
        # checkpoint never records a stale best_val.
        improved = val_stats["loss"] < best_val_loss
        if improved:
            best_val_loss = val_stats["loss"]
        save_checkpoint(last_path, model, scheduler, scaler, epoch, cfg, best_val_loss)
        if improved:
            save_checkpoint(best_path, model, scheduler, scaler, epoch, cfg, best_val_loss)
            print(f"[Best] Val loss improved to {best_val_loss:.6f} -> saved {best_path}")

    print("Training complete.")
    return {"best_val_loss": best_val_loss}, log_dict


Driver Code

In [None]:
# Build the autoencoder for 3-channel images; the remaining Encoder/Decoder
# arguments keep their defaults.
model = Autoencoder(Encoder(in_channels=3), Decoder(in_channels=3))
# Short CPU run. The optimizer name is lower-cased by create_optimizer, so the
# capitalised "Adam" is accepted.
cfg = TrainConfig(
    epochs=5,
    batch_size=4,
    optimizer="Adam",
    device="cpu",
    out_dir="./outputs",
)

In [None]:
# Train on the training loader and validate on the validation loader.
# fit returns a summary dict (best validation loss) and the per-epoch log.
best_loss, log_dict = fit(model, training_NIDS_loader, validate_NIDS_loader, cfg)

Krishna - Metrics log and test code