# Low-Light Image Denoising + 4x Super-Resolution (Kaggle-Ready)

In [9]:
# Environment check and GPU
import os, sys, platform, torch
# Report interpreter, framework and accelerator details for reproducibility.
print('Python:', sys.version.split()[0])
print('Torch:', torch.__version__)
_cuda_ok = torch.cuda.is_available()
print('CUDA available:', _cuda_ok)
if _cuda_ok:
    print('GPU count:', torch.cuda.device_count())
    print('GPU name:', torch.cuda.get_device_name(0))
print('Platform:', platform.platform())
# Workspace-local data roots: the script's folder when run as a file,
# otherwise (e.g. inside a notebook kernel) the current working directory.
_base_dir = os.path.dirname(__file__) if '__file__' in globals() else os.getcwd()
WORK_DIR = os.path.abspath(_base_dir)
WEEK12_DIR = os.path.dirname(WORK_DIR)  # parent folder of WORK_DIR
print('WORK_DIR:', WORK_DIR)
print('WEEK12_DIR:', WEEK12_DIR)

Python: 3.7.16
Torch: 1.1.0
CUDA available: False
Platform: Linux-6.8.0-60-generic-x86_64-with-debian-trixie-sid
WORK_DIR: /home/sachin/projects/DLP/deep-learning-practices/week-12
WEEK12_DIR: /home/sachin/projects/DLP/deep-learning-practices


In [10]:
# Installs (Kaggle usually has these; safe to re-run if missing)
%pip -q install --no-warn-script-location yacs natsort tqdm opencv-python Pillow

Note: you may need to restart the kernel to use updated packages.


## Utilities: PSNR and common helpers

In [11]:
import torch, numpy as np, cv2, random
import PIL
# Compatibility for older torchvision that imports PILLOW_VERSION
if not hasattr(PIL, 'PILLOW_VERSION'):
    PIL.PILLOW_VERSION = getattr(PIL, '__version__', '0.0.0')
from PIL import Image
from torch.utils.data import Dataset, DataLoader
import torchvision.transforms.functional as TF
from tqdm import tqdm

def set_seed(seed=1234):
    """Seed Python's `random`, NumPy and PyTorch (CPU and every CUDA device).

    Args:
        seed: integer seed handed unchanged to each generator.
    """
    seeders = (
        random.seed,
        np.random.seed,
        torch.manual_seed,
        torch.cuda.manual_seed_all,
    )
    for seed_fn in seeders:
        seed_fn(seed)

def torch_psnr(tar_img, prd_img):
    """Peak signal-to-noise ratio (dB) between two tensors, peak value 1.0.

    Both tensors are clamped to [0, 1] before comparison. The result is a
    0-dim tensor; identical inputs give an infinite PSNR.

    Args:
        tar_img: reference tensor.
        prd_img: predicted tensor (broadcast-compatible with `tar_img`).
    """
    target = torch.clamp(tar_img, 0, 1)
    pred = torch.clamp(prd_img, 0, 1)
    rmse = torch.sqrt(torch.mean((pred - target) ** 2))
    peak = torch.tensor(1.0, device=rmse.device)
    return 20 * torch.log10(peak / rmse)

def save_rgb(path, img_tensor):
    """Write an image tensor to `path` with OpenCV.

    The tensor is clamped to [0, 1], its first axis is moved last
    (assumes a (3, H, W) RGB layout -- TODO confirm with callers), scaled to
    8-bit with rounding and converted from RGB to OpenCV's BGR channel order.

    Args:
        path: destination file name; the format follows the extension.
        img_tensor: image tensor with values nominally in [0, 1].
    """
    hwc = torch.clamp(img_tensor, 0, 1).permute(1, 2, 0).cpu().numpy()
    rgb_u8 = (hwc * 255.0 + 0.5).astype(np.uint8)  # +0.5 rounds to nearest
    bgr_u8 = cv2.cvtColor(rgb_u8, cv2.COLOR_RGB2BGR)
    cv2.imwrite(path, bgr_u8)

def is_image_file(filename):
    """Return True when `filename` ends with a known image extension.

    The comparison is case-insensitive and looks only at the name's suffix.
    """
    image_exts = ('.jpeg', '.jpg', '.png', '.gif', '.bmp', '.tif', '.tiff')
    return filename.lower().endswith(image_exts)

In [12]:
# Hyperparameters for the MPRNet denoiser.
from types import SimpleNamespace
denoise_cfg = SimpleNamespace(
    batch_size=4,
    num_epochs=1,
    lr=2e-4,
    lr_min=1e-6,
    train_ps=128,
    val_ps=256,
    num_workers=2,
    seed=1234,
    session='mprnet_denoise',
)
# Hyperparameters for the 4x super-resolution network (not used in this run).
sr_cfg = SimpleNamespace(
    scale=4,
    batch_size=4,
    num_epochs=1,
    lr=2e-4,
    num_workers=2,
    seed=1234,
    session='edsr_small_x4',
)

In [13]:
# Local dataset locations, all rooted at <WEEK12_DIR>/week-12.
_WEEK12_DATA = os.path.join(WEEK12_DIR, 'week-12')
DENOISE_ARCHIVE_ROOT = os.path.join(_WEEK12_DATA, 'archive')
# Each split folder is handed to a dataset class as `root`; the train/val
# datasets then read inputs from <root>/train or <root>/val and targets from <root>/gt.
DENOISE_TRAIN_DIR = os.path.join(DENOISE_ARCHIVE_ROOT, 'train')
DENOISE_VAL_DIR   = os.path.join(DENOISE_ARCHIVE_ROOT, 'val')
DENOISE_TEST_DIR  = os.path.join(DENOISE_ARCHIVE_ROOT, 'test')
# Super-resolution placeholders (unused if the folders are not provided).
SR_DATA_ROOT      = os.path.join(_WEEK12_DATA, 'sr')
_sr_train_root    = os.path.join(SR_DATA_ROOT, 'train')
_sr_val_root      = os.path.join(SR_DATA_ROOT, 'val')
SR_TRAIN_LR_DIR   = os.path.join(_sr_train_root, 'lr')
SR_TRAIN_HR_DIR   = os.path.join(_sr_train_root, 'hr')
SR_VAL_LR_DIR     = os.path.join(_sr_val_root, 'lr')
SR_VAL_HR_DIR     = os.path.join(_sr_val_root, 'hr')
SR_TEST_DIR       = os.path.join(SR_DATA_ROOT, 'test')
# Output folder for the generated submission images (created on the spot).
SUBMISSION_DIR    = os.path.join(_WEEK12_DATA, 'submission')
os.makedirs(SUBMISSION_DIR, exist_ok=True)
for _label, _location in (
    ('DENOISE_TRAIN_DIR =', DENOISE_TRAIN_DIR),
    ('DENOISE_VAL_DIR   =', DENOISE_VAL_DIR),
    ('DENOISE_TEST_DIR  =', DENOISE_TEST_DIR),
    ('SUBMISSION_DIR    =', SUBMISSION_DIR),
):
    print(_label, _location)

DENOISE_TRAIN_DIR = /home/sachin/projects/DLP/deep-learning-practices/week-12/archive/train
DENOISE_VAL_DIR   = /home/sachin/projects/DLP/deep-learning-practices/week-12/archive/val
DENOISE_TEST_DIR  = /home/sachin/projects/DLP/deep-learning-practices/week-12/archive/test
SUBMISSION_DIR    = /home/sachin/projects/DLP/deep-learning-practices/week-12/submission


## Denoising model: MPRNet

In [14]:
import torch.nn as nn
import torch.nn.functional as F

def conv(in_channels, out_channels, kernel_size, bias=False, stride=1):
    """Build a 2-D convolution padded by `kernel_size // 2` on every side.

    With an odd kernel and stride 1 this keeps the spatial size unchanged.
    """
    half_kernel = kernel_size // 2
    return nn.Conv2d(
        in_channels,
        out_channels,
        kernel_size,
        stride=stride,
        padding=half_kernel,
        bias=bias,
    )

class CALayer(nn.Module):
    """Channel attention: rescale each channel by a learned gate in (0, 1).

    The gate is computed from the globally average-pooled input by a
    1x1 conv bottleneck (`channel // reduction` hidden channels), ReLU,
    a second 1x1 conv and a sigmoid.
    """

    def __init__(self, channel, reduction=16, bias=False):
        super().__init__()
        hidden = channel // reduction
        self.avg_pool = nn.AdaptiveAvgPool2d(1)
        gate_layers = [
            nn.Conv2d(channel, hidden, 1, bias=bias),
            nn.ReLU(inplace=True),
            nn.Conv2d(hidden, channel, 1, bias=bias),
            nn.Sigmoid(),
        ]
        self.conv_du = nn.Sequential(*gate_layers)

    def forward(self, x):
        """Return `x` multiplied channel-wise by its attention gate."""
        gate = self.conv_du(self.avg_pool(x))
        return x * gate

class CAB(nn.Module):
    """Channel attention block: conv -> act -> conv -> channel attention, plus a skip.

    `act` is used as passed in (not copied), so a module shared between
    several blocks also shares its parameters.
    """

    def __init__(self, n_feat, kernel_size, reduction, bias, act):
        super().__init__()
        conv_in = conv(n_feat, n_feat, kernel_size, bias=bias)
        conv_out = conv(n_feat, n_feat, kernel_size, bias=bias)
        self.body = nn.Sequential(conv_in, act, conv_out)
        self.CA = CALayer(n_feat, reduction, bias=bias)

    def forward(self, x):
        """Return the attended residual added back onto `x`."""
        return self.CA(self.body(x)) + x

class SAM(nn.Module):
    """Supervised attention module.

    Predicts a 3-channel image from the features (added to the given input
    image) and uses that prediction to gate the features handed onward.
    """

    def __init__(self, n_feat, kernel_size, bias):
        super().__init__()
        self.conv1 = conv(n_feat, n_feat, kernel_size, bias=bias)
        self.conv2 = conv(n_feat, 3, kernel_size, bias=bias)
        self.conv3 = conv(3, n_feat, kernel_size, bias=bias)

    def forward(self, x, x_img):
        """Return `(gated_features, predicted_image)`.

        Args:
            x: feature tensor with `n_feat` channels.
            x_img: 3-channel image the predicted residual is added to.
        """
        img = self.conv2(x) + x_img
        attention = torch.sigmoid(self.conv3(img))
        gated = self.conv1(x) * attention + x
        return gated, img

class DownSample(nn.Module):
    """Halve the spatial size (bilinear) and add `s_factor` channels via a 1x1 conv."""

    def __init__(self, in_channels, s_factor):
        super().__init__()
        shrink = nn.Upsample(scale_factor=0.5, mode='bilinear', align_corners=False)
        widen = nn.Conv2d(in_channels, in_channels + s_factor, 1, bias=False)
        self.down = nn.Sequential(shrink, widen)

    def forward(self, x):
        """Map `in_channels` features to `in_channels + s_factor` at half resolution."""
        return self.down(x)

class UpSample(nn.Module):
    """Double the spatial size (bilinear) and drop `s_factor` channels via a 1x1 conv."""

    def __init__(self, in_channels, s_factor):
        super().__init__()
        enlarge = nn.Upsample(scale_factor=2, mode='bilinear', align_corners=False)
        narrow = nn.Conv2d(in_channels + s_factor, in_channels, 1, bias=False)
        self.up = nn.Sequential(enlarge, narrow)

    def forward(self, x):
        """Map `in_channels + s_factor` features to `in_channels` at double resolution."""
        return self.up(x)

class SkipUpSample(nn.Module):
    """Upsample like `UpSample`, then add a skip tensor of the resulting shape."""

    def __init__(self, in_channels, s_factor):
        super().__init__()
        enlarge = nn.Upsample(scale_factor=2, mode='bilinear', align_corners=False)
        narrow = nn.Conv2d(in_channels + s_factor, in_channels, 1, bias=False)
        self.up = nn.Sequential(enlarge, narrow)

    def forward(self, x, y):
        """Return the upsampled `x` plus the skip connection `y`."""
        upsampled = self.up(x)
        return upsampled + y

class Encoder(nn.Module):
    """Three-level encoder with optional cross-stage feature fusion (CSFF).

    Each level is two CABs; channels grow by `scale_unetfeats` per level while
    the resolution halves. When `csff` is true and both feature lists are
    supplied to `forward`, 1x1-projected encoder/decoder features from a
    previous stage are added at every level.
    """

    def __init__(self, n_feat, kernel_size, reduction, act, bias, scale_unetfeats, csff):
        super().__init__()
        width1 = n_feat
        width2 = n_feat + scale_unetfeats
        width3 = n_feat + 2 * scale_unetfeats

        def cab_pair(channels):
            # Two stacked channel-attention blocks at a fixed width.
            return nn.Sequential(
                CAB(channels, kernel_size, reduction, bias=bias, act=act),
                CAB(channels, kernel_size, reduction, bias=bias, act=act),
            )

        self.encoder_level1 = cab_pair(width1)
        self.encoder_level2 = cab_pair(width2)
        self.encoder_level3 = cab_pair(width3)
        self.down12 = DownSample(width1, scale_unetfeats)
        self.down23 = DownSample(width2, scale_unetfeats)
        self.csff = csff
        if csff:
            self.csff_enc1 = nn.Conv2d(width1, width1, 1, bias=bias)
            self.csff_enc2 = nn.Conv2d(width2, width2, 1, bias=bias)
            self.csff_enc3 = nn.Conv2d(width3, width3, 1, bias=bias)
            self.csff_dec1 = nn.Conv2d(width1, width1, 1, bias=bias)
            self.csff_dec2 = nn.Conv2d(width2, width2, 1, bias=bias)
            self.csff_dec3 = nn.Conv2d(width3, width3, 1, bias=bias)

    def forward(self, x, encoder_outs=None, decoder_outs=None):
        """Return `[enc1, enc2, enc3]`, the features at full, 1/2 and 1/4 scale."""
        fuse = self.csff and (encoder_outs is not None) and (decoder_outs is not None)

        enc1 = self.encoder_level1(x)
        if fuse:
            enc1 = enc1 + self.csff_enc1(encoder_outs[0]) + self.csff_dec1(decoder_outs[0])

        enc2 = self.encoder_level2(self.down12(enc1))
        if fuse:
            enc2 = enc2 + self.csff_enc2(encoder_outs[1]) + self.csff_dec2(decoder_outs[1])

        enc3 = self.encoder_level3(self.down23(enc2))
        if fuse:
            enc3 = enc3 + self.csff_enc3(encoder_outs[2]) + self.csff_dec3(decoder_outs[2])

        return [enc1, enc2, enc3]

class Decoder(nn.Module):
    """Three-level decoder mirroring `Encoder`, with CAB-filtered skip connections."""

    def __init__(self, n_feat, kernel_size, reduction, act, bias, scale_unetfeats):
        super().__init__()
        width1 = n_feat
        width2 = n_feat + scale_unetfeats
        width3 = n_feat + 2 * scale_unetfeats

        def cab(channels):
            return CAB(channels, kernel_size, reduction, bias=bias, act=act)

        self.decoder_level1 = nn.Sequential(cab(width1), cab(width1))
        self.decoder_level2 = nn.Sequential(cab(width2), cab(width2))
        self.decoder_level3 = nn.Sequential(cab(width3), cab(width3))
        self.skip_attn1 = cab(width1)
        self.skip_attn2 = cab(width2)
        self.up21 = SkipUpSample(width1, scale_unetfeats)
        self.up32 = SkipUpSample(width2, scale_unetfeats)

    def forward(self, outs):
        """Decode `[enc1, enc2, enc3]` and return `[dec1, dec2, dec3]`."""
        enc1, enc2, enc3 = outs
        dec3 = self.decoder_level3(enc3)
        dec2 = self.decoder_level2(self.up32(dec3, self.skip_attn2(enc2)))
        dec1 = self.decoder_level1(self.up21(dec2, self.skip_attn1(enc1)))
        return [dec1, dec2, dec3]

class ORB(nn.Module):
    """Original-resolution block: `num_cab` CABs, one closing conv, and a skip."""

    def __init__(self, n_feat, kernel_size, reduction, act, bias, num_cab):
        super().__init__()
        layers = []
        for _ in range(num_cab):
            layers.append(CAB(n_feat, kernel_size, reduction, bias=bias, act=act))
        # The closing conv uses conv()'s default bias setting, not `bias`.
        layers.append(conv(n_feat, n_feat, kernel_size))
        self.body = nn.Sequential(*layers)

    def forward(self, x):
        """Return `x` plus the block's residual."""
        return self.body(x) + x

class ORSNet(nn.Module):
    """Original-resolution subnetwork of the final stage.

    Runs three ORBs at full resolution; after each one it adds 1x1-projected
    encoder and decoder features from the previous stage, upsampling the
    coarser levels back to full resolution first.
    """

    def __init__(self, n_feat, scale_orsnetfeats, kernel_size, reduction, act, bias, scale_unetfeats, num_cab):
        super().__init__()
        wide = n_feat + scale_orsnetfeats
        mid = n_feat + scale_unetfeats

        self.orb1 = ORB(wide, kernel_size, reduction, act, bias, num_cab)
        self.orb2 = ORB(wide, kernel_size, reduction, act, bias, num_cab)
        self.orb3 = ORB(wide, kernel_size, reduction, act, bias, num_cab)

        # Level-2 features need one x2 upsample, level-3 features need two.
        self.up_enc1 = UpSample(n_feat, scale_unetfeats)
        self.up_dec1 = UpSample(n_feat, scale_unetfeats)
        self.up_enc2 = nn.Sequential(UpSample(mid, scale_unetfeats), UpSample(n_feat, scale_unetfeats))
        self.up_dec2 = nn.Sequential(UpSample(mid, scale_unetfeats), UpSample(n_feat, scale_unetfeats))

        self.conv_enc1 = nn.Conv2d(n_feat, wide, 1, bias=bias)
        self.conv_enc2 = nn.Conv2d(n_feat, wide, 1, bias=bias)
        self.conv_enc3 = nn.Conv2d(n_feat, wide, 1, bias=bias)
        self.conv_dec1 = nn.Conv2d(n_feat, wide, 1, bias=bias)
        self.conv_dec2 = nn.Conv2d(n_feat, wide, 1, bias=bias)
        self.conv_dec3 = nn.Conv2d(n_feat, wide, 1, bias=bias)

    def forward(self, x, encoder_outs, decoder_outs):
        """Refine `x` using the three-level feature lists of the previous stage."""
        x = self.orb1(x)
        enc_term = self.conv_enc1(encoder_outs[0])
        dec_term = self.conv_dec1(decoder_outs[0])
        x = x + enc_term + dec_term

        x = self.orb2(x)
        enc_term = self.conv_enc2(self.up_enc1(encoder_outs[1]))
        dec_term = self.conv_dec2(self.up_dec1(decoder_outs[1]))
        x = x + enc_term + dec_term

        x = self.orb3(x)
        enc_term = self.conv_enc3(self.up_enc2(encoder_outs[2]))
        dec_term = self.conv_dec3(self.up_dec2(decoder_outs[2]))
        x = x + enc_term + dec_term
        return x

class MPRNet(nn.Module):
    """Three-stage progressive restoration network.

    Stage 1 processes the four image quadrants, stage 2 the top and bottom
    halves, and stage 3 the full image. Each stage has its own shallow feature
    extractor; stages 1 and 2 use an encoder/decoder pair, stage 3 uses
    `ORSNet`. A `SAM` module sits between consecutive stages.

    `forward` returns a list `[stage3, stage2, stage1]` of restored images, so
    callers that want the final prediction must take element 0.
    """
    def __init__(self, in_c=3, out_c=3, n_feat=80, scale_unetfeats=48, scale_orsnetfeats=32, num_cab=8, kernel_size=3, reduction=4, bias=False):
        super().__init__()
        # One PReLU instance is handed to every block below, so its parameter is shared.
        act = nn.PReLU()
        # Per-stage shallow feature extractors (conv + one CAB).
        self.shallow_feat1 = nn.Sequential(conv(in_c, n_feat, kernel_size, bias=bias), CAB(n_feat, kernel_size, reduction, bias=bias, act=act))
        self.shallow_feat2 = nn.Sequential(conv(in_c, n_feat, kernel_size, bias=bias), CAB(n_feat, kernel_size, reduction, bias=bias, act=act))
        self.shallow_feat3 = nn.Sequential(conv(in_c, n_feat, kernel_size, bias=bias), CAB(n_feat, kernel_size, reduction, bias=bias, act=act))
        # Stage 1 has no earlier stage to fuse with (csff=False); stage 2 fuses stage-1 features.
        self.stage1_encoder = Encoder(n_feat, kernel_size, reduction, act, bias, scale_unetfeats, csff=False)
        self.stage1_decoder = Decoder(n_feat, kernel_size, reduction, act, bias, scale_unetfeats)
        self.stage2_encoder = Encoder(n_feat, kernel_size, reduction, act, bias, scale_unetfeats, csff=True)
        self.stage2_decoder = Decoder(n_feat, kernel_size, reduction, act, bias, scale_unetfeats)
        self.stage3_orsnet = ORSNet(n_feat, scale_orsnetfeats, kernel_size, reduction, act, bias, scale_unetfeats, num_cab)
        # Supervised attention between stages 1->2 and 2->3.
        self.sam12 = SAM(n_feat, kernel_size=1, bias=bias)
        self.sam23 = SAM(n_feat, kernel_size=1, bias=bias)
        # Merge a stage's shallow features with the SAM features of the previous stage.
        self.concat12 = conv(n_feat*2, n_feat, kernel_size, bias=bias)
        self.concat23 = conv(n_feat*2, n_feat+scale_orsnetfeats, kernel_size, bias=bias)
        self.tail = conv(n_feat+scale_orsnetfeats, out_c, kernel_size, bias=bias)
    def forward(self, x3_img):
        """Restore `x3_img` and return `[stage3_img + x3_img, stage2_img, stage1_img]`.

        The input is indexed as a 4-D tensor with height on dim 2 and width on
        dim 3. NOTE(review): the halves/quadrants are later concatenated and
        passed through x0.5/x2 resampling, so H and W presumably need to be
        divisible by a suitable power of two -- confirm the exact requirement.
        """
        H, W = x3_img.size(2), x3_img.size(3)
        # Stage-2 inputs: top and bottom halves of the image.
        x2top_img  = x3_img[:, :, 0:H//2, :]
        x2bot_img  = x3_img[:, :, H//2:H, :]
        # Stage-1 inputs: the four quadrants.
        x1ltop_img = x2top_img[:, :, :, 0:W//2]
        x1rtop_img = x2top_img[:, :, :, W//2:W]
        x1lbot_img = x2bot_img[:, :, :, 0:W//2]
        x1rbot_img = x2bot_img[:, :, :, W//2:W]
        # ---- Stage 1: shared extractor and encoder applied to each quadrant ----
        x1ltop = self.shallow_feat1(x1ltop_img)
        x1rtop = self.shallow_feat1(x1rtop_img)
        x1lbot = self.shallow_feat1(x1lbot_img)
        x1rbot = self.shallow_feat1(x1rbot_img)
        feat1_ltop = self.stage1_encoder(x1ltop)
        feat1_rtop = self.stage1_encoder(x1rtop)
        feat1_lbot = self.stage1_encoder(x1lbot)
        feat1_rbot = self.stage1_encoder(x1rbot)
        # Join left/right quadrant features along the width (dim 3) at every level.
        feat1_top = [torch.cat((k, v), 3) for k, v in zip(feat1_ltop, feat1_rtop)]
        feat1_bot = [torch.cat((k, v), 3) for k, v in zip(feat1_lbot, feat1_rbot)]
        res1_top = self.stage1_decoder(feat1_top)
        res1_bot = self.stage1_decoder(feat1_bot)
        # SAM yields features for stage 2 plus the stage-1 image of each half.
        x2top_samfeats, stage1_img_top = self.sam12(res1_top[0], x2top_img)
        x2bot_samfeats, stage1_img_bot = self.sam12(res1_bot[0], x2bot_img)
        # Stack the two halves along the height (dim 2) into the full stage-1 output.
        stage1_img = torch.cat([stage1_img_top, stage1_img_bot], 2)
        # ---- Stage 2: operates on the two halves, fusing stage-1 features ----
        x2top  = self.shallow_feat2(x2top_img)
        x2bot  = self.shallow_feat2(x2bot_img)
        x2top_cat = self.concat12(torch.cat([x2top, x2top_samfeats], 1))
        x2bot_cat = self.concat12(torch.cat([x2bot, x2bot_samfeats], 1))
        feat2_top = self.stage2_encoder(x2top_cat, feat1_top, res1_top)
        feat2_bot = self.stage2_encoder(x2bot_cat, feat1_bot, res1_bot)
        feat2 = [torch.cat((k, v), 2) for k, v in zip(feat2_top, feat2_bot)]
        res2 = self.stage2_decoder(feat2)
        x3_samfeats, stage2_img = self.sam23(res2[0], x3_img)
        # ---- Stage 3: full-resolution refinement of the whole image ----
        x3 = self.shallow_feat3(x3_img)
        x3_cat = self.concat23(torch.cat([x3, x3_samfeats], 1))
        x3_cat = self.stage3_orsnet(x3_cat, feat2, res2)
        stage3_img = self.tail(x3_cat)
        # The last stage predicts a residual that is added to the input image.
        return [stage3_img + x3_img, stage2_img, stage1_img]

class CharbonnierLoss(nn.Module):
    """Charbonnier loss: mean of `sqrt((x - y)^2 + eps^2)`, a smooth L1 variant."""

    def __init__(self, eps=1e-3):
        super().__init__()
        self.eps = eps

    def forward(self, x, y):
        """Return the scalar loss between prediction `x` and target `y`."""
        residual = x - y
        smoothed = torch.sqrt(residual * residual + self.eps * self.eps)
        return smoothed.mean()

## Datasets: Denoising and SR

In [15]:
import torch.nn.functional as F

class DenoiseTrainDataset(Dataset):
    """Paired noisy/clean training images with random square patch cropping.

    Expected layout: `<root>/train` holds the noisy inputs and `<root>/gt` the
    clean targets. Files are paired by position in the two sorted listings, so
    both folders must list corresponding images in the same order.

    Args:
        root: folder containing the `train` and `gt` sub-folders.
        patch_size: side of the square patch to return; a falsy value returns
            the full images.
    """

    def __init__(self, root, patch_size=128):
        super().__init__()
        self.inp_dir = os.path.join(root, 'train')
        self.tar_dir = os.path.join(root, 'gt')
        self.inp_files = sorted([f for f in os.listdir(self.inp_dir) if is_image_file(f)])
        self.tar_files = sorted([f for f in os.listdir(self.tar_dir) if is_image_file(f)])
        self.patch = patch_size

    def __len__(self):
        return min(len(self.inp_files), len(self.tar_files))

    @staticmethod
    def _replicate_pad(img_t, pad_h, pad_w):
        """Edge-replicate `img_t` (C, H, W) on its bottom and right sides."""
        # Non-constant padding is applied on a batched 4-D tensor because older
        # PyTorch releases reject a 4-value pad on an unbatched 3-D input.
        padded = F.pad(img_t.unsqueeze(0), (0, pad_w, 0, pad_h), mode='replicate')
        return padded.squeeze(0)

    def __getitem__(self, idx):
        """Return `(target, input, name)` where `name` is the target's file stem."""
        inp_path = os.path.join(self.inp_dir, self.inp_files[idx])
        tar_path = os.path.join(self.tar_dir, self.tar_files[idx])
        inp_t = TF.to_tensor(Image.open(inp_path).convert('RGB'))
        tar_t = TF.to_tensor(Image.open(tar_path).convert('RGB'))

        ps = int(self.patch) if self.patch else 0
        if ps:
            # Padding and crop offsets are derived from the target's size.
            _, H, W = tar_t.shape
            pad_h = max(0, ps - H)
            pad_w = max(0, ps - W)
            if pad_h or pad_w:
                inp_t = self._replicate_pad(inp_t, pad_h, pad_w)
                tar_t = self._replicate_pad(tar_t, pad_h, pad_w)
            _, H, W = tar_t.shape
            # After padding H, W >= ps, so randint's inclusive upper bound is
            # never negative; a side equal to ps simply yields offset 0.
            rr = random.randint(0, H - ps)
            cc = random.randint(0, W - ps)
            inp_t = inp_t[:, rr:rr + ps, cc:cc + ps]
            tar_t = tar_t[:, rr:rr + ps, cc:cc + ps]

        fname = os.path.splitext(os.path.basename(tar_path))[0]
        return tar_t, inp_t, fname

class DenoiseValDataset(Dataset):
    """Paired noisy/clean validation images with an optional centre crop.

    Expected layout: `<root>/val` holds the noisy inputs and `<root>/gt` the
    clean targets, paired by position in the two sorted listings.

    Args:
        root: folder containing the `val` and `gt` sub-folders.
        patch_size: side of the square centre crop; a falsy value returns the
            full images.
    """

    def __init__(self, root, patch_size=None):
        super().__init__()
        self.inp_dir = os.path.join(root, 'val')
        self.tar_dir = os.path.join(root, 'gt')
        self.inp_files = sorted([f for f in os.listdir(self.inp_dir) if is_image_file(f)])
        self.tar_files = sorted([f for f in os.listdir(self.tar_dir) if is_image_file(f)])
        self.ps = patch_size

    def __len__(self):
        return min(len(self.inp_files), len(self.tar_files))

    def __getitem__(self, idx):
        """Return `(target, input, name)` where `name` is the target's file stem."""
        inp_path = os.path.join(self.inp_dir, self.inp_files[idx])
        tar_path = os.path.join(self.tar_dir, self.tar_files[idx])
        inp_t = TF.to_tensor(Image.open(inp_path).convert('RGB'))
        tar_t = TF.to_tensor(Image.open(tar_path).convert('RGB'))

        if self.ps:
            ps = int(self.ps)
            _, H, W = tar_t.shape
            pad_h = max(0, ps - H)
            pad_w = max(0, ps - W)
            if pad_h or pad_w:
                # Pad on a batched 4-D view: older PyTorch releases reject
                # replicate padding with a 4-value pad on a 3-D tensor.
                pad = (0, pad_w, 0, pad_h)
                inp_t = F.pad(inp_t.unsqueeze(0), pad, mode='replicate').squeeze(0)
                tar_t = F.pad(tar_t.unsqueeze(0), pad, mode='replicate').squeeze(0)
            _, H, W = tar_t.shape
            # Centre crop by slicing so it does not depend on torchvision
            # accepting tensors in center_crop.
            top = int(round((H - ps) / 2.0))
            left = int(round((W - ps) / 2.0))
            inp_t = inp_t[:, top:top + ps, left:left + ps]
            tar_t = tar_t[:, top:top + ps, left:left + ps]

        fname = os.path.splitext(os.path.basename(tar_path))[0]
        return tar_t, inp_t, fname

class DenoiseTestDataset(Dataset):
    """Unpaired test images read directly from `root` (no sub-folders)."""

    def __init__(self, root):
        super().__init__()
        self.files = sorted(name for name in os.listdir(root) if is_image_file(name))
        self.root = root

    def __len__(self):
        return len(self.files)

    def __getitem__(self, idx):
        """Return `(image_tensor, file_stem)` for the idx-th image."""
        file_name = self.files[idx]
        full_path = os.path.join(self.root, file_name)
        stem, _ext = os.path.splitext(os.path.basename(full_path))
        rgb = Image.open(full_path).convert('RGB')
        return TF.to_tensor(rgb), stem

class SRDataset(Dataset):
    """LR/HR image pairs for super-resolution.

    Two modes:
      * paired: LR and HR images are read from `lr_dir` and `hr_dir` and
        matched by position in the two sorted listings;
      * synthesized: used when `synthesize_if_missing` is true or `lr_dir` is
        not an existing folder; the LR image is a bicubic downscale of the HR
        image by `scale`.

    Args:
        lr_dir: folder of low-resolution images (may be None).
        hr_dir: folder of high-resolution images.
        scale: integer upscaling factor between LR and HR.
        synthesize_if_missing: force the synthesized mode.
        split: stored on the instance; not otherwise used by this class.

    Raises:
        FileNotFoundError: in paired mode when `hr_dir` is not a folder.
    """

    def __init__(self, lr_dir=None, hr_dir=None, scale=4, synthesize_if_missing=False, split='train'):
        super().__init__()
        self.scale = scale
        self.lr_dir = lr_dir
        self.hr_dir = hr_dir
        self.synthesize = synthesize_if_missing or (not (lr_dir and os.path.isdir(lr_dir)))
        hr_available = bool(hr_dir) and os.path.isdir(hr_dir)
        if self.synthesize:
            # A missing HR folder yields an empty dataset rather than an error.
            self.hr_files = sorted([f for f in os.listdir(hr_dir) if is_image_file(f)]) if hr_available else []
        else:
            if not hr_available:
                # os.listdir(None) would silently list the working directory.
                raise FileNotFoundError(f'HR directory not found: {hr_dir!r}')
            self.lr_files = sorted([f for f in os.listdir(lr_dir) if is_image_file(f)])
            self.hr_files = sorted([f for f in os.listdir(hr_dir) if is_image_file(f)])
        self.split = split

    def __len__(self):
        return len(self.hr_files) if self.synthesize else min(len(self.lr_files), len(self.hr_files))

    def __getitem__(self, idx):
        """Return `(hr_tensor, lr_tensor, name)` where `name` is the HR file stem."""
        hr_name = self.hr_files[idx]
        hr = Image.open(os.path.join(self.hr_dir, hr_name)).convert('RGB')
        if self.synthesize:
            s = self.scale
            w, h = hr.size
            if w >= s and h >= s:
                # Trim HR to a multiple of the scale so that LR * scale matches
                # HR exactly and the pair stays pixel-aligned.
                hr = hr.crop((0, 0, w - w % s, h - h % s))
                w, h = hr.size
            lr = hr.resize((max(1, w // s), max(1, h // s)), Image.BICUBIC)
        else:
            lr = Image.open(os.path.join(self.lr_dir, self.lr_files[idx])).convert('RGB')
        lr_t = TF.to_tensor(lr)
        hr_t = TF.to_tensor(hr)
        name = os.path.splitext(hr_name)[0]
        return hr_t, lr_t, name

class SRTestDataset(Dataset):
    """Low-resolution test images read directly from `root`."""

    def __init__(self, root):
        super().__init__()
        self.files = sorted(name for name in os.listdir(root) if is_image_file(name))
        self.root = root

    def __len__(self):
        return len(self.files)

    def __getitem__(self, idx):
        """Return `(lr_tensor, file_stem)` for the idx-th image."""
        full_path = os.path.join(self.root, self.files[idx])
        stem, _ext = os.path.splitext(os.path.basename(full_path))
        lr_img = Image.open(full_path).convert('RGB')
        return TF.to_tensor(lr_img), stem

## Train denoiser (MPRNet)

In [18]:
# Training for denoiser (robust to variable image sizes)
import os, math, time
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import DataLoader
from torch.optim import Adam
from torch.optim.lr_scheduler import CosineAnnealingLR
from tqdm import tqdm

# Fallback loss if CharbonnierLoss isn't defined earlier
class _L1Like(nn.Module):
    def __init__(self):
        super().__init__()
        self.loss = nn.L1Loss()
    def forward(self, x, y):
        return self.loss(x, y)


def _get_device():
    d = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    try:
        _ = torch.tensor([0.0]).to(d)
        return d
    except Exception:
        return torch.device('cpu')


def _round_up(x, m):
    return int(math.ceil(x / m) * m)


def _prepare_tensor(x: torch.Tensor, min_hw: int = 64, factor: int = 32):
    """Resize a batch so H and W are multiples of `factor` and at least `min_hw`.

    Resizing is bilinear (the image is stretched, not padded). The input is
    returned untouched when it already satisfies both constraints.

    Args:
        x: 4-D tensor whose last two dims are height and width.
        min_hw: minimum allowed height and width.
        factor: required divisor of height and width.

    Returns:
        `(tensor, (H, W))` with `(H, W)` the original spatial size.

    Raises:
        RuntimeError: if either spatial dimension is zero.
    """
    _, _, H, W = x.shape
    if H == 0 or W == 0:
        raise RuntimeError(f'Encountered zero-sized tensor with shape {tuple(x.shape)}')
    original_hw = (H, W)
    safe_hw = (
        max(min_hw, _round_up(H, factor)),
        max(min_hw, _round_up(W, factor)),
    )
    if safe_hw == original_hw:
        return x, original_hw
    resized = F.interpolate(x, size=safe_hw, mode='bilinear', align_corners=False)
    return resized, original_hw


def _resize_to(x: torch.Tensor, size_hw: tuple):
    H, W = size_hw
    H = int(max(1, H)); W = int(max(1, W))
    if x.shape[-2:] == (H, W):
        return x
    return F.interpolate(x, size=(H, W), mode='bilinear', align_corners=False)


def train_denoiser(train_dir, val_dir, cfg):
    """Train MPRNet on paired noisy/clean images and keep the best checkpoint.

    MPRNet returns one restored image per stage; the training loss is the sum
    of the criterion over all stage outputs, while validation PSNR uses only
    the final-stage output (element 0).

    Args:
        train_dir: root handed to `DenoiseTrainDataset`.
        val_dir: root handed to `DenoiseValDataset`.
        cfg: namespace providing `lr` and `num_epochs`; `seed`, `train_ps` and
            `lr_min` are honoured when present.

    Returns:
        Path of the best checkpoint, `<WORK_DIR>/best_denoiser.pth`. The file
        stores `{'model': state_dict, 'val_psnr': float, 'epoch': int}`.
    """
    device = _get_device()
    set_seed(int(getattr(cfg, 'seed', 1234)))

    # Datasets defined earlier in the notebook
    train_ds = DenoiseTrainDataset(train_dir, patch_size=getattr(cfg, 'train_ps', 128))
    val_ds = DenoiseValDataset(val_dir, patch_size=None)  # full images, no forced crop

    # batch_size=1 avoids collating validation images of different sizes
    train_loader = DataLoader(train_ds, batch_size=1, shuffle=True, num_workers=0)
    val_loader = DataLoader(val_ds, batch_size=1, shuffle=False, num_workers=0)

    model = MPRNet().to(device)

    # Try to use CharbonnierLoss if available else L1
    criterion = CharbonnierLoss() if 'CharbonnierLoss' in globals() else _L1Like()

    optimizer = Adam(model.parameters(), lr=float(cfg.lr))
    # Without eta_min the cosine schedule decays the learning rate all the way to 0.
    scheduler = CosineAnnealingLR(
        optimizer,
        T_max=int(cfg.num_epochs),
        eta_min=float(getattr(cfg, 'lr_min', 0.0)),
    )

    best_psnr = -1.0
    os.makedirs(WORK_DIR, exist_ok=True)
    best_ckpt = os.path.join(WORK_DIR, 'best_denoiser.pth')

    for epoch in range(1, int(cfg.num_epochs) + 1):
        model.train()
        epoch_loss = 0.0
        n_steps = 0
        pbar = tqdm(train_loader, desc=f'Epoch {epoch}/{cfg.num_epochs} [denoise]')
        for tar, inp, _ in pbar:
            tar, inp = tar.to(device), inp.to(device)
            # numel() == 0 covers every zero-sized dimension
            if tar.numel() == 0 or inp.numel() == 0:
                continue
            # Upsample to safe sizes (>=64, multiples of 32) for MPRNet tiling
            inp_proc, _ = _prepare_tensor(inp, min_hw=64, factor=32)
            target_hw = (tar.shape[-2], tar.shape[-1])
            optimizer.zero_grad()
            # The model returns a list [stage3, stage2, stage1]; supervise
            # every stage after resizing it back to the target size.
            stage_outs = model(inp_proc)
            loss = sum(criterion(_resize_to(out, target_hw), tar) for out in stage_outs)
            loss.backward()
            optimizer.step()
            epoch_loss += loss.item()
            n_steps += 1
            pbar.set_postfix(loss=f"{loss.item():.4f}")
        scheduler.step()

        # Validation
        model.eval()
        with torch.no_grad():
            psnrs = []
            for tar, inp, _ in val_loader:
                tar, inp = tar.to(device), inp.to(device)
                if tar.numel() == 0 or inp.numel() == 0:
                    continue
                inp_proc, _ = _prepare_tensor(inp, min_hw=64, factor=32)
                pred = model(inp_proc)[0]  # final-stage restoration
                pred = _resize_to(pred, (tar.shape[-2], tar.shape[-1]))
                pred = torch.clamp(pred, 0.0, 1.0)
                mse = torch.mean((pred - tar) ** 2).item()
                psnr = 100.0 if mse == 0 else 10 * math.log10(1.0 / mse)
                psnrs.append(psnr)
            val_psnr = sum(psnrs) / max(1, len(psnrs))

        # Average over the batches actually trained on (skipped ones excluded)
        print(f"Epoch {epoch}: loss={epoch_loss/max(1,n_steps):.4f}, val_psnr={val_psnr:.2f}dB")

        if val_psnr > best_psnr:
            best_psnr = val_psnr
            torch.save({'model': model.state_dict(), 'val_psnr': val_psnr, 'epoch': epoch}, best_ckpt)
            print(f"Saved new best denoiser to {best_ckpt} (PSNR {val_psnr:.2f}dB)")

    return best_ckpt

# Train using local week-12 data. A RuntimeError raised while CUDA is in use
# (e.g. out of memory) triggers one retry on the CPU; if CUDA was never
# available the error is a genuine bug and is re-raised instead of retraining.
try:
    denoise_ckpt = train_denoiser(DENOISE_TRAIN_DIR, DENOISE_VAL_DIR, denoise_cfg)
except RuntimeError as e:
    if not torch.cuda.is_available():
        raise
    print('Encountered RuntimeError, retrying on CPU. Error was:', str(e))
    # Forces every later device lookup in this kernel session onto the CPU.
    torch.cuda.is_available = lambda: False
    denoise_ckpt = train_denoiser(DENOISE_TRAIN_DIR, DENOISE_VAL_DIR, denoise_cfg)
print('Saved best denoiser ckpt at:', denoise_ckpt)

Epoch 1/1 [denoise]:   0%|          | 0/1105 [00:00<?, ?it/s]

Epoch 1/1 [denoise]:   2%|▏         | 18/1105 [00:00<00:12, 86.06it/s]

: 

## 4x Super-Resolution model: Compact EDSR-like network

In [None]:
class ResBlock(nn.Module):
    """EDSR-style residual block: conv -> ReLU -> conv, scaled and added to the input.

    Args:
        n_feats: number of input and output channels of both convolutions.
        res_scale: factor applied to the residual branch before the addition.
    """

    def __init__(self, n_feats, res_scale=0.1):
        super().__init__()
        # Channel width follows n_feats so the block works for any network width.
        self.body = nn.Sequential(
            nn.Conv2d(n_feats, n_feats, 3, 1, 1),
            nn.ReLU(inplace=True),
            nn.Conv2d(n_feats, n_feats, 3, 1, 1)
        )
        self.res_scale = res_scale

    def forward(self, x):
        """Return `x + res_scale * body(x)`."""
        res = self.body(x).mul(self.res_scale)
        return x + res

class EDSRSmall(nn.Module):
    """Compact EDSR-like super-resolution network.

    Head conv -> `n_resblocks` residual blocks -> one conv + PixelShuffle(2) +
    ReLU stage per factor of two -> tail conv back to 3 channels.

    Args:
        scale: upscaling factor; must be a power of two (1, 2, 4, ...).
        n_resblocks: number of residual blocks in the body.
        n_feats: channel width of the network.

    Raises:
        ValueError: if `scale` is not a positive power of two.
    """

    def __init__(self, scale=4, n_resblocks=8, n_feats=64):
        super().__init__()
        scale_int = int(scale)
        if scale_int != scale or scale_int < 1 or scale_int & (scale_int - 1):
            # PixelShuffle(2) stages can only reach powers of two; truncating
            # log2 would silently build a network for a different factor.
            raise ValueError(f'scale must be a positive power of two, got {scale!r}')
        n_stages = scale_int.bit_length() - 1
        self.head = nn.Conv2d(3, n_feats, 3, 1, 1)
        self.body = nn.Sequential(*[ResBlock(n_feats) for _ in range(n_resblocks)])
        # Upsampler: one x2 PixelShuffle stage per factor of two
        up = []
        for _ in range(n_stages):
            up += [nn.Conv2d(n_feats, n_feats*4, 3, 1, 1), nn.PixelShuffle(2), nn.ReLU(inplace=True)]
        self.upsample = nn.Sequential(*up)
        self.tail = nn.Conv2d(n_feats, 3, 3, 1, 1)

    def forward(self, x):
        """Upscale a 3-channel batch by `scale` and return a 3-channel batch."""
        x = self.head(x)
        x = self.body(x)
        x = self.upsample(x)
        x = self.tail(x)
        return x

def train_sr(train_lr_dir, train_hr_dir, val_lr_dir, val_hr_dir, cfg):
    """Train `EDSRSmall` with an L1 loss and keep the best checkpoint.

    LR images are synthesized from HR ones whenever the corresponding LR
    folder is missing.

    Args:
        train_lr_dir, train_hr_dir: training LR/HR folders.
        val_lr_dir, val_hr_dir: validation LR/HR folders.
        cfg: namespace with `scale`, `lr`, `num_epochs`, `batch_size`,
            `num_workers`, `seed` and `session`.

    Returns:
        Path of the saved checkpoint, `./sr_<session>_best.pth`, holding
        `{'epoch': int, 'state_dict': state_dict}`.

    Raises:
        ValueError: if no training images are found.
    """
    device = 'cuda' if torch.cuda.is_available() else 'cpu'
    set_seed(cfg.seed)
    model = EDSRSmall(scale=cfg.scale).to(device)
    criterion = nn.L1Loss()
    # Reached through the `torch` package: no bare `optim` name is imported.
    optimizer = torch.optim.Adam(model.parameters(), lr=cfg.lr)
    scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=cfg.num_epochs, eta_min=1e-6)
    synth_train = not (train_lr_dir and os.path.isdir(train_lr_dir))
    synth_val = not (val_lr_dir and os.path.isdir(val_lr_dir))
    tr_ds = SRDataset(lr_dir=train_lr_dir, hr_dir=train_hr_dir, scale=cfg.scale, synthesize_if_missing=synth_train, split='train')
    va_ds = SRDataset(lr_dir=val_lr_dir, hr_dir=val_hr_dir, scale=cfg.scale, synthesize_if_missing=synth_val, split='val')
    if len(tr_ds) == 0:
        raise ValueError(f'No SR training images found (hr_dir={train_hr_dir!r}, lr_dir={train_lr_dir!r})')
    # NOTE(review): with batch_size > 1 the default collate presumably needs all
    # training images to share one size -- confirm against the dataset.
    pin = (device == 'cuda')  # pinned memory only helps host-to-GPU copies
    tr_loader = DataLoader(tr_ds, batch_size=cfg.batch_size, shuffle=True, num_workers=cfg.num_workers, pin_memory=pin)
    va_loader = DataLoader(va_ds, batch_size=1, shuffle=False, num_workers=cfg.num_workers, pin_memory=pin)
    best_psnr, best_epoch = -1.0, -1
    ckpt_best = os.path.join('.', f'sr_{cfg.session}_best.pth')
    for epoch in range(1, cfg.num_epochs+1):
        model.train()
        epoch_loss = 0.0
        for hr, lr, _ in tqdm(tr_loader, desc=f'Epoch {epoch}/{cfg.num_epochs} [sr]'):
            hr, lr = hr.to(device), lr.to(device)
            # Plain zero_grad(): the set_to_none keyword does not exist on older PyTorch.
            optimizer.zero_grad()
            sr = model(lr)
            # Compare on the common top-left region if the sizes differ slightly
            H = min(hr.shape[2], sr.shape[2])
            W = min(hr.shape[3], sr.shape[3])
            loss = criterion(sr[:, :, :H, :W], hr[:, :, :H, :W])
            loss.backward()
            optimizer.step()
            epoch_loss += loss.item()
        scheduler.step()
        # Validate
        model.eval()
        psnrs = []
        with torch.no_grad():
            for hr, lr, _ in va_loader:
                hr, lr = hr.to(device), lr.to(device)
                sr = model(lr)
                H = min(hr.shape[2], sr.shape[2])
                W = min(hr.shape[3], sr.shape[3])
                psnrs.append(torch_psnr(hr[:, :, :H, :W], sr[:, :, :H, :W]).item())
        has_val = bool(psnrs)
        mean_psnr = float(np.mean(psnrs)) if has_val else float('nan')
        print(f'Epoch {epoch}: loss={epoch_loss/max(1, len(tr_loader)):.4f} val_psnr={mean_psnr:.3f}')
        # NaN never compares greater, so without validation data the latest
        # weights are kept; otherwise the returned path would not exist.
        if (not has_val) or mean_psnr > best_psnr:
            if has_val:
                best_psnr = mean_psnr
            best_epoch = epoch
            torch.save({'epoch': epoch, 'state_dict': model.state_dict()}, ckpt_best)
    print(f'Best SR PSNR: {best_psnr:.3f} @ epoch {best_epoch}')
    return ckpt_best

# Report whether the optional SR training folders are present.
print('SR train HR exists:', os.path.isdir(SR_TRAIN_HR_DIR))
print('SR train LR exists:', os.path.isdir(SR_TRAIN_LR_DIR))
# To train now, uncomment:
# sr_ckpt = train_sr(SR_TRAIN_LR_DIR, SR_TRAIN_HR_DIR, SR_VAL_LR_DIR, SR_VAL_HR_DIR, sr_cfg)
# print('Saved best SR ckpt at:', sr_ckpt)

## Inference pipeline (Denoise -> 4x SR) and create submission.zip

In [None]:
import zipfile

def _extract_state_dict(ckpt):
    if isinstance(ckpt, dict):
        if 'state_dict' in ckpt:
            return ckpt['state_dict']
        if 'model' in ckpt:
            return ckpt['model']
    return ckpt

def load_denoiser(ckpt_path):
    """Build an MPRNet, load the weights stored at `ckpt_path`, return it in eval mode.

    The model is placed on CUDA when available, otherwise on the CPU.
    """
    device = 'cuda' if torch.cuda.is_available() else 'cpu'
    denoiser = MPRNet().to(device)
    checkpoint = torch.load(ckpt_path, map_location=device)
    denoiser.load_state_dict(_extract_state_dict(checkpoint))
    denoiser.eval()
    return denoiser

def load_sr(ckpt_path, scale=4):
    """Build an EDSRSmall for `scale`, load the weights at `ckpt_path`, return it in eval mode.

    The model is placed on CUDA when available, otherwise on the CPU.
    """
    device = 'cuda' if torch.cuda.is_available() else 'cpu'
    sr_model = EDSRSmall(scale=scale).to(device)
    checkpoint = torch.load(ckpt_path, map_location=device)
    sr_model.load_state_dict(_extract_state_dict(checkpoint))
    sr_model.eval()
    return sr_model

def run_pipeline_and_save(denoise_ckpt, sr_ckpt, test_denoise_dir, out_dir, scale=4):
    """Denoise then super-resolve every test image and zip the results.

    Each image is resized to an MPRNet-safe size exactly as in training,
    denoised, resized back to its original size, clamped to [0, 1] and passed
    to the SR network. Outputs are written to `out_dir` as `<name>.png`, and
    only the files produced by this call go into the archive.

    Args:
        denoise_ckpt: path of the MPRNet checkpoint.
        sr_ckpt: path of the EDSRSmall checkpoint.
        test_denoise_dir: folder of input test images.
        out_dir: folder receiving the PNG outputs (created if missing).
        scale: SR factor used to build the SR network.

    Returns:
        Path of the created archive, `<WORK_DIR>/submission.zip`.
    """
    device = 'cuda' if torch.cuda.is_available() else 'cpu'
    denoiser = load_denoiser(denoise_ckpt)
    srnet   = load_sr(sr_ckpt, scale=scale)
    ds = DenoiseTestDataset(test_denoise_dir)
    dl = DataLoader(ds, batch_size=1, shuffle=False, num_workers=1)
    os.makedirs(out_dir, exist_ok=True)
    written = []
    with torch.no_grad():
        for inp, name in tqdm(dl, desc='Infer test'):
            inp = inp.to(device)
            # Same size preparation as training: MPRNet splits and resamples
            # its input, so arbitrary image sizes are not safe to feed directly.
            inp_proc, orig_hw = _prepare_tensor(inp, min_hw=64, factor=32)
            den = _resize_to(denoiser(inp_proc)[0], orig_hw)  # [0] = final stage
            den = torch.clamp(den, 0.0, 1.0)  # keep the SR input in image range
            sr  = srnet(den)
            # Save
            out_name = f'{name[0]}.png'
            save_rgb(os.path.join(out_dir, out_name), sr.squeeze(0).cpu())
            written.append(out_name)
    # Zip for Kaggle submission (stale PNGs from earlier runs are left out)
    zip_path = os.path.join(WORK_DIR, 'submission.zip')
    with zipfile.ZipFile(zip_path, 'w', compression=zipfile.ZIP_DEFLATED) as zf:
        for fn in sorted(written):
            zf.write(os.path.join(out_dir, fn), arcname=fn)
    print('Submission zipped at:', zip_path)
    return zip_path

# Example usage after training (uncomment to run):
# denoise_ckpt = os.path.join(WORK_DIR, 'best_denoiser.pth')  # file written by train_denoiser
# sr_ckpt      = os.path.join('.', f'sr_{sr_cfg.session}_best.pth')
# submission_zip = run_pipeline_and_save(denoise_ckpt, sr_ckpt, DENOISE_TEST_DIR, SUBMISSION_DIR, scale=sr_cfg.scale)
# submission_zip

## Optional: Validation PSNR of the full pipeline (if HR val available)
If you have LR/HR validation pairs for SR and input/target pairs for denoising, you can compute PSNR of denoised+SR outputs against HR targets.

In [None]:
def validate_pipeline_psnr(denoise_ckpt, sr_ckpt, denoise_val_dir, sr_val_lr_dir, sr_val_hr_dir, max_samples=20):
    """Approximate PSNR of the denoise -> SR pipeline against HR targets.

    The i-th denoising validation input is run through both networks and
    compared with the i-th HR image of the SR validation set, over the common
    top-left region. The two sets are paired purely by index, hence "approx".

    Args:
        denoise_ckpt: path of the MPRNet checkpoint.
        sr_ckpt: path of the EDSRSmall checkpoint.
        denoise_val_dir: root handed to `DenoiseValDataset`.
        sr_val_lr_dir: SR validation LR folder (may be None or missing).
        sr_val_hr_dir: SR validation HR folder.
        max_samples: maximum number of pairs to evaluate.

    Returns:
        Mean PSNR in dB as a float, or NaN when nothing could be evaluated.
    """
    device = 'cuda' if torch.cuda.is_available() else 'cpu'
    denoiser = load_denoiser(denoise_ckpt)
    srnet   = load_sr(sr_ckpt, scale=sr_cfg.scale)
    # Denoise val inputs
    den_val_ds = DenoiseValDataset(denoise_val_dir, patch_size=None)
    # Guard against None before isdir(), matching train_sr
    lr_missing = not (sr_val_lr_dir and os.path.isdir(sr_val_lr_dir))
    sr_val_ds  = SRDataset(lr_dir=sr_val_lr_dir, hr_dir=sr_val_hr_dir, scale=sr_cfg.scale, synthesize_if_missing=lr_missing, split='val')
    n = min(len(den_val_ds), len(sr_val_ds), max_samples)
    psnrs = []
    with torch.no_grad():
        for i in range(n):
            _, inp_dn, _ = den_val_ds[i]
            hr, _, _ = sr_val_ds[i]
            inp = inp_dn.unsqueeze(0).to(device)
            # Same size preparation as training so any image size is accepted
            inp_proc, orig_hw = _prepare_tensor(inp, min_hw=64, factor=32)
            den = torch.clamp(_resize_to(denoiser(inp_proc)[0], orig_hw), 0.0, 1.0)
            sr  = srnet(den)
            H = min(hr.shape[1], sr.shape[2]); W = min(hr.shape[2], sr.shape[3])
            psnrs.append(torch_psnr(hr[:, :H, :W].unsqueeze(0).to(device), sr[:, :, :H, :W]).item())
    mean_psnr = float(np.mean(psnrs)) if psnrs else float('nan')
    print('Pipeline val PSNR (approx):', mean_psnr)
    return mean_psnr
# Example:
# validate_pipeline_psnr(denoise_ckpt, sr_ckpt, DENOISE_VAL_DIR, SR_VAL_LR_DIR, SR_VAL_HR_DIR)