### Imports & Paths (no data is downloaded here)

In [None]:
from pathlib import Path
from PIL import Image
from tqdm import tqdm
import torch

# GPU setup and verification
def check_gpu_setup():
    """Report whether CUDA is usable and describe the active GPU.

    Prints the number of visible GPUs together with the index, name and total
    memory (in GiB) of the current device, or a CPU-fallback warning when CUDA
    is not available.

    Returns:
        bool: True when CUDA is available, False otherwise.
    """
    if not torch.cuda.is_available():
        print("⚠ CUDA not available, falling back to CPU")
        return False

    device_index = torch.cuda.current_device()
    total_bytes = torch.cuda.get_device_properties(device_index).total_memory

    print(f"✓ CUDA available with {torch.cuda.device_count()} GPU(s)")
    print(f"✓ Using GPU {device_index}: {torch.cuda.get_device_name(device_index)}")
    print(f"✓ GPU Memory: {total_bytes / 1024**3:.1f} GB")
    return True

# Check GPU setup
CUDA_AVAILABLE = check_gpu_setup()

# The data directory sits one level above the notebook. pathlib accepts forward
# slashes on WSL, native Windows and Linux alike, so a single relative path
# replaces the former per-OS branches (which all pointed at the same location).
DATA_ROOT = Path("../data")

# Explicit check instead of `assert`: assertions are removed when Python runs
# with -O, which would let a missing directory go unnoticed until much later.
if not DATA_ROOT.exists():
    raise FileNotFoundError(f"Data root directory {DATA_ROOT} does not exist.")

# High-resolution inputs, one folder per split
HR_VALID = DATA_ROOT / "HR" / "valid"
HR_TRAIN = DATA_ROOT / "HR" / "train"

# Lower-case file suffixes that are treated as images
IMAGE_TYPES = [".png", ".jpg", ".jpeg"]

In [None]:
def lr_dir(scale: int, split: str) -> Path:
    """Return the directory that holds the bicubic LR images for one scale and split.

    The result is ``DATA_ROOT / "LR_bicubic" / "X<scale>" / <split>``. The
    directory is only named here; it is not created.
    """
    scale_folder = f"X{scale}"
    return DATA_ROOT / "LR_bicubic" / scale_folder / split

Sanity check: do we see the HR images?

In [None]:
def count_images(p: Path):
    """Count the entries directly inside `p` whose suffix is listed in IMAGE_TYPES.

    The suffix comparison is case-insensitive; sub-directories are not searched.
    """
    return sum(1 for entry in p.glob("*") if entry.suffix.lower() in IMAGE_TYPES)

hr_dirs = {"train": HR_TRAIN, "valid": HR_VALID}

# Absolute locations first, then the number of images found in each folder
for split_name, split_dir in hr_dirs.items():
    print(f"HR {split_name} dir:", split_dir.resolve())
for split_name, split_dir in hr_dirs.items():
    print(f"HR {split_name} count:", count_images(split_dir))

### Helper to Generate a LR Image

In [None]:
import torch
import torch.nn.functional as F
import torchvision.transforms.functional as TF

def ensure_dir(p: Path):
    """Create directory `p` (and any missing parents) unless the path already exists."""
    if p.exists():
        return
    p.mkdir(parents=True, exist_ok=True)

def hr_to_lr(hr_path: Path, out_dir: Path, scale: int):
    """Create one bicubic low-resolution image from a high-resolution file.

    The HR image is cropped at the right/bottom so both sides are divisible by
    `scale`, downscaled by `scale` with antialiased bicubic interpolation (on
    the GPU when CUDA is available) and saved under the same file name in
    `out_dir`. Nothing is done when the output file already exists.

    Args:
        hr_path: Path of the HR source image.
        out_dir: Directory that receives the LR image; created if missing.
        scale: Integer downscaling factor.
    """
    out_path = out_dir / hr_path.name
    if out_path.exists():
        return  # Skip existing files

    # Use GPU if available, otherwise CPU
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

    # The context manager closes the file handle; convert() returns an
    # independent in-memory copy that stays valid afterwards.
    with Image.open(hr_path) as src:
        img = src.convert("RGB")
    w, h = img.size

    # Crop to make dimensions divisible by scale
    w2, h2 = w - (w % scale), h - (h % scale)
    if (w2, h2) != (w, h):
        img = img.crop((0, 0, w2, h2))

    # (3, H, W) float tensor in [0, 1] with a leading batch dimension
    img_tensor = TF.to_tensor(img).unsqueeze(0).to(device)

    # Pure inference: no autograd bookkeeping needed
    with torch.no_grad():
        lr_tensor = F.interpolate(
            img_tensor,
            size=(h2 // scale, w2 // scale),
            mode='bicubic',
            align_corners=False,
            antialias=True
        )

    # Bicubic kernels can overshoot below 0 or above 1 near sharp edges.
    # The float -> 8-bit conversion in to_pil_image does not clamp, so
    # out-of-range values would wrap around and show up as speckles.
    lr_tensor = lr_tensor.squeeze(0).clamp_(0.0, 1.0).cpu()
    lr_img = TF.to_pil_image(lr_tensor)

    # Save the result
    ensure_dir(out_dir)
    lr_img.save(out_path, quality=95)

In [None]:
from collections import defaultdict

def hr_to_lr_batch(hr_paths: list, out_dir: Path, scale: int, batch_size: int = 8):
    """Downscale many HR images in batches, grouping images of equal size.

    Images whose output already exists in `out_dir` are skipped. The rest are
    grouped by their size after cropping to a multiple of `scale`, because only
    same-sized tensors can be stacked into one batch. Each batch is resized
    with antialiased bicubic interpolation (on the GPU when CUDA is available)
    and every result is saved under the source file name.

    Args:
        hr_paths: Paths of the HR source images.
        out_dir: Directory that receives the LR images; created if missing.
        scale: Integer downscaling factor.
        batch_size: Maximum number of images resized in one call.
    """
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    ensure_dir(out_dir)

    # Group images by their cropped size. Only the size is needed here, so the
    # pixel data is not converted; the handle is closed right after reading it.
    shape_groups = defaultdict(list)
    for hr_path in hr_paths:
        if (out_dir / hr_path.name).exists():
            continue  # Skip existing files
        with Image.open(hr_path) as img:
            w, h = img.size
        shape_groups[(w - (w % scale), h - (h % scale))].append(hr_path)

    # Process each shape group separately
    for (w2, h2), paths in shape_groups.items():
        new_h, new_w = h2 // scale, w2 // scale

        for start in range(0, len(paths), batch_size):
            batch_paths = paths[start:start + batch_size]

            batch_tensors = []
            for hr_path in batch_paths:
                with Image.open(hr_path) as src:
                    img = src.convert("RGB").crop((0, 0, w2, h2))
                batch_tensors.append(TF.to_tensor(img))

            batch_tensor = torch.stack(batch_tensors).to(device)
            with torch.no_grad():
                lr_batch = F.interpolate(
                    batch_tensor,
                    size=(new_h, new_w),
                    mode='bicubic',
                    align_corners=False,
                    antialias=True
                )

            # Bicubic kernels can overshoot [0, 1]; clamp before the 8-bit
            # conversion so out-of-range values cannot wrap around.
            lr_batch_cpu = lr_batch.clamp_(0.0, 1.0).cpu()
            for hr_path, lr_tensor in zip(batch_paths, lr_batch_cpu):
                TF.to_pil_image(lr_tensor).save(out_dir / hr_path.name, quality=95)

            # Release the batch before the next one is loaded
            del batch_tensor, lr_batch
            if device.type == 'cuda':
                torch.cuda.empty_cache()


Tiny smoke test (valid split, X4, first 5 images)

In [None]:
SCALE, SPLIT = 4, "valid"
OUT_DIR = lr_dir(SCALE, SPLIT)

# Sorted listing keeps the selection stable between runs; only the first 5 are used
valid_imgs = sorted(p for p in HR_VALID.glob("*") if p.suffix.lower() in IMAGE_TYPES)
subset = valid_imgs[:5]

progress = tqdm(subset, desc=f"Generating X{SCALE} ({SPLIT})")
for hr_file in progress:
    hr_to_lr(hr_file, OUT_DIR, SCALE)

print("Wrote to:", OUT_DIR.resolve())

### Small Batch Function (Choose Split + Scale + Limit)

In [None]:
# Flexible wrapper to perform low-level image operations with GPU acceleration
def generate_lr_split(scale: int, split : str, limit: int | None = None, verbose: bool = True, use_batch: bool = True, batch_size: int = 16):
    """Generate the LR images of one split at one scale.

    Args:
        scale: Integer downscaling factor.
        split: "train" or "valid"; selects the HR source folder.
        limit: When given, only the first `limit` images (sorted by path) are processed.
        verbose: Print the output directory and its image count when finished.
        use_batch: Use the batched path when CUDA is available; otherwise images
            are processed one at a time with a progress bar.
        batch_size: Batch size for the batched path.

    Raises:
        ValueError: If `split` is neither "train" nor "valid". Without this
            check a typo would read the validation images and write them into
            a folder named after the typo.
    """
    hr_dirs = {"train": HR_TRAIN, "valid": HR_VALID}
    if split not in hr_dirs:
        raise ValueError(f"split must be 'train' or 'valid', got {split!r}")

    hr_dir = hr_dirs[split]
    out_dir = lr_dir(scale, split)
    imgs = sorted([p for p in hr_dir.glob("*") if p.suffix.lower() in IMAGE_TYPES])
    if limit is not None:
        imgs = imgs[:limit]

    if use_batch and torch.cuda.is_available():
        # Use batch processing for better GPU utilization
        hr_to_lr_batch(imgs, out_dir, scale, batch_size)
        if verbose:
            print(f"Done (GPU batch): {out_dir.resolve()} ({count_images(out_dir)} images)")
    else:
        # Fall back to sequential processing
        for p in tqdm(imgs, desc=f"X{scale} {split} ({len(imgs)} imgs)"):
            hr_to_lr(p, out_dir, scale)
        if verbose:
            print(f"Done: {out_dir.resolve()} ({count_images(out_dir)} images)")

In [None]:
def optimize_gpu_settings():
    """Set the cuDNN flags, release cached GPU memory and print memory statistics.

    When CUDA is unavailable only a warning is printed.
    """
    if not torch.cuda.is_available():
        print("⚠ No CUDA GPU available for optimization")
        return

    # Let cuDNN benchmark candidate algorithms and do not insist on deterministic ones
    cudnn = torch.backends.cudnn
    cudnn.benchmark = True
    cudnn.deterministic = False

    # Hand cached, unused blocks back to the driver
    torch.cuda.empty_cache()

    gib = 1024**3
    print("✓ GPU optimizations enabled")
    print(f"✓ Current GPU memory allocated: {torch.cuda.memory_allocated() / gib:.2f} GB")
    print(f"✓ Current GPU memory cached: {torch.cuda.memory_reserved() / gib:.2f} GB")

def get_optimal_batch_size(scale: int) -> int:
    """Pick a batch size from the current GPU's total memory and the scale factor.

    This is a rough heuristic: the batch size grows with GPU memory (in GiB) and
    is clamped to a per-scale range. It does not look at image resolution.

    Args:
        scale: Downscaling factor; 2, 4 and 8 have dedicated ranges.

    Returns:
        1 without CUDA; a value in [8, 32] for scale 2, [6, 24] for scale 4,
        [4, 16] for scale 8; and 8 for any other scale.
    """
    if not torch.cuda.is_available():
        return 1

    # Query the device that will actually run the work, not a hard-coded GPU 0
    device_index = torch.cuda.current_device()
    gpu_memory_gb = torch.cuda.get_device_properties(device_index).total_memory / 1024**3

    # Estimate memory usage per image (rough approximation)
    if scale == 2:
        return min(32, max(8, int(gpu_memory_gb * 2)))
    elif scale == 4:
        return min(24, max(6, int(gpu_memory_gb * 1.5)))
    elif scale == 8:
        return min(16, max(4, int(gpu_memory_gb)))
    else:
        return 8

# Initialize GPU optimizations: runs once when this cell executes, setting the
# cuDNN flags, clearing the CUDA cache and printing memory statistics
# (or only a warning when CUDA is unavailable)
optimize_gpu_settings()

Try a slightly larger test to confirm functionality

In [None]:
# Sequential (non-batched) run over the first 20 sorted training images at scale X2;
# images whose LR output already exists are skipped by hr_to_lr
generate_lr_split(scale=2, split="train", limit=20, use_batch=False)

### Generate Full LR Sets (x2, x4, x8)

In [None]:
# Run generation for all splits and scales with GPU optimization
SCALES = [2, 4, 8]

def generate_all_scales_for_split(split: str):
    """Generate the LR images of one split for every factor in SCALES.

    For each scale the batch size is chosen by get_optimal_batch_size (which
    returns 1 without CUDA) and the work is delegated to generate_lr_split,
    which uses the batched GPU path when CUDA is available and the sequential
    path otherwise.

    Args:
        split: "train" or "valid".

    Raises:
        ValueError: If `split` is not one of the two supported names. An
            explicit raise is used because `assert` is removed under -O.
    """
    if split not in {"train", "valid"}:
        raise ValueError(f"split must be 'train' or 'valid', got {split!r}")

    for s in SCALES:
        # Use optimal batch size for each scale
        optimal_batch_size = get_optimal_batch_size(s)
        print(f"Processing scale X{s} with batch size {optimal_batch_size}")

        # Single implementation of the per-scale work lives in generate_lr_split
        generate_lr_split(scale=s, split=split, use_batch=True, batch_size=optimal_batch_size)


In [None]:
def monitor_gpu_memory():
    """Print allocated, reserved and total memory (in GiB) of the current GPU.

    All three figures are read for the same device, so the utilisation
    percentage is meaningful on multi-GPU machines as well. When CUDA is
    unavailable only a notice is printed.
    """
    if not torch.cuda.is_available():
        print("No CUDA GPU available for monitoring")
        return

    device_index = torch.cuda.current_device()
    allocated = torch.cuda.memory_allocated(device_index) / 1024**3
    reserved = torch.cuda.memory_reserved(device_index) / 1024**3
    total = torch.cuda.get_device_properties(device_index).total_memory / 1024**3
    print(f"GPU Memory - Allocated: {allocated:.2f}GB | Reserved: {reserved:.2f}GB | Total: {total:.1f}GB")
    print(f"Memory utilization: {(allocated/total)*100:.1f}%")

In [None]:
# Monitor GPU memory before processing
print("=== Initial GPU Memory Status ===")
monitor_gpu_memory()

# Perform splits on all scales: validation first, then training,
# reporting GPU memory after each split
run_plan = (
    ("valid", "\n== VALID ==", "\n=== GPU Memory After Valid ==="),
    ("train", "\n== TRAIN ==", "\n=== Final GPU Memory Status ==="),
)
for split_name, banner, memory_banner in run_plan:
    print(banner)
    generate_all_scales_for_split(split_name)
    print(memory_banner)
    monitor_gpu_memory()

Integrity checks

In [None]:
# Checking integrity and validity of generated images
from PIL import Image

def image_list(p):
    """Return the entries directly inside `p` with an IMAGE_TYPES suffix, sorted by path."""
    matches = [entry for entry in p.glob("*") if entry.suffix.lower() in IMAGE_TYPES]
    matches.sort()
    return matches

def check_image_integrity(image_path: Path):
    """Check whether an image file can be opened and passes Pillow's verify().

    Args:
        image_path: Path of the image file to check.

    Returns:
        True when the file opens and verifies; False (after printing the
        error) when opening or verifying raises OSError or SyntaxError.
    """
    try:
        # The context manager releases the file handle even when verify() fails
        with Image.open(image_path) as img:
            img.verify()  # Verify the image is not corrupted
        return True
    except (OSError, SyntaxError) as e:
        print(f"Corrupted image {image_path}: {e}")
        return False

In [None]:
# Check the integrity of the dataset splits
def check_split(split="valid", max_check=None):
    """Check that every HR image of a split has a correctly sized LR image per scale.

    For each HR image and each factor in SCALES the LR file with the same name
    must exist, be readable, and measure (HR width // scale, HR height // scale).

    Args:
        split: "train" selects the training HR folder; any other value selects
            the validation HR folder.
        max_check: When given, only the first `max_check` HR images are checked.

    Returns:
        A list of (filename, problem, detail) tuples; empty when everything is consistent.
    """
    hr_dir  = HR_TRAIN if split == "train" else HR_VALID
    hr_imgs = image_list(hr_dir)
    if max_check is not None:
        hr_imgs = hr_imgs[:max_check]

    problems = []

    for hr_path in hr_imgs:
        # Only the size is needed, so the large HR image is not decoded
        with Image.open(hr_path) as hr:
            w, h = hr.size

        for s in SCALES:
            lr_path = lr_dir(s, split) / hr_path.name
            if not lr_path.exists():
                problems.append((hr_path.name, f"missing LR X{s}", str(lr_path)))
                continue

            # Decode the (small) LR image completely so truncated or corrupted
            # files are recorded as problems instead of aborting the whole check
            try:
                with Image.open(lr_path) as lr:
                    lr.load()
                    lr_size = lr.size
            except (OSError, SyntaxError) as exc:
                problems.append((hr_path.name, f"unreadable LR X{s}", f"{lr_path}: {exc}"))
                continue

            ew, eh = w // s, h // s  # Expected size
            if lr_size != (ew, eh):
                problems.append((hr_path.name, f"size mismatch X{s}", f"got={lr_size}, expected=({ew}, {eh})"))
    return problems

Check the valid split first

In [None]:
problems_valid = check_split(split="valid")

# An empty list means every HR image has a consistent LR image at each scale
if not problems_valid:
    print("[OK] VALID split looks consistent for all images.")
else:
    print(f"[FAIL] Found {len(problems_valid)} problems (showing first 10):")
    for issue in problems_valid[:10]:
        print(" -", issue)

Same check for train (optional)

In [None]:
problems_train = check_split(split="train")

# An empty list means every HR image has a consistent LR image at each scale
if not problems_train:
    print("[OK] TRAIN split looks consistent for all images.")
else:
    print(f"[FAIL] Found {len(problems_train)} problems (showing first 10):")
    for issue in problems_train[:10]:
        print(" -", issue)

(Optional) Visual spot-check of one triplet

In [None]:
import random
from IPython.display import display

split = "valid"
hr_dir = HR_TRAIN if split != "valid" else HR_VALID
hr_imgs = image_list(hr_dir)

# Fixed seed so the same file is picked on every run
random.seed(0)
hr_path = random.choice(hr_imgs)

# The HR image and its three LR counterparts share one file name
im_hr = Image.open(hr_path).convert("RGB")
im_x2, im_x4, im_x8 = (
    Image.open(lr_dir(factor, split) / hr_path.name).convert("RGB")
    for factor in (2, 4, 8)
)

print("Picked:", hr_path.name)
print("HR:", im_hr.size, " | X2:", im_x2.size, " | X4:", im_x4.size, " | X8:", im_x8.size)

hr_w, hr_h = im_hr.size
display(im_hr.resize((hr_w // 4, hr_h // 4)))  # shrink for notebook view
for lr_preview in (im_x2, im_x4, im_x8):
    display(lr_preview)


### Import PyTorch and Set Helpers

In [None]:
import torch
from torch.utils.data import Dataset, DataLoader
import torchvision.transforms.functional as TF
from PIL import Image
from pathlib import Path

def pil_to_tensor(img: Image.Image) -> torch.Tensor:
    """
    WHAT: Turn a PIL image (H x W x 3, uint8) into a float32 tensor in [0,1] shaped (3, H, W).
    WHY: PyTorch models consume CHW tensors rather than PIL images.
    """
    # to_tensor does both the scaling to [0,1] and the HWC -> CHW reordering
    return TF.to_tensor(img)


Minimal dataset for aligned LR/HR pairs

What this does:

You choose a split ("train" or "valid") and a scale (2, 4, or 8).

It finds all HR files, then loads the matching LR file with the same filename from LR_bicubic/X{scale}/{split}.

It returns a dict with lr, hr, filename, and scale.

In [None]:
# Define class
class DIV2KDataset(Dataset):
    """
    WHAT: Paired dataset that yields {"lr": tensor, "hr": tensor, "filename": str, "scale": int}
    WHY: Training needs perfectly aligned LR-HR pairs for stable learning.
    """
    # Define data paths
    def __init__(self, data_root: Path, split: str="train", scale: int=4, patch_size: int | None = None):
        # Validate inputs
        assert split in {"train", "valid"}, "split must be 'train' or 'valid'"
        assert scale in {2, 4, 8}, "scale must be one of 2, 4, or 8"

        # Store parameters
        self.data_root = Path(data_root)
        self.split = split
        self.scale = scale
        self.patch_size = patch_size # HR patch size

        # Directories
        self.hr_dir = (self.data_root / "HR" / split)
        self.lr_dir = (self.data_root / "LR_bicubic" / f"X{scale}" / split)

        # List all HR images and keep only those that have a matching LR
        exts = {".png", ".jpg", ".jpeg"}
        hr_paths = sorted([p for p in self.hr_dir.iterdir() if p.suffix.lower() in exts])

        # Keep only items with matching LR images
        self.items = []
        for hrp in hr_paths:
            lrp = self.lr_dir / hrp.name
            if lrp.exists():
                self.items.append((lrp, hrp))
            else:
                print(f"Warning: Missing LR image for {hrp.name}, skipping.")
                pass
        
        if len(self.items) == 0:
            raise RuntimeError(f"No paired items found for split={split}, scale=X{scale}.\
                               \nCheck folders: {self.lr_dir} and {self.hr_dir}")

    # Dataset length
    def __len__(self):
        return len(self.items)

    # Aligned random crop helper
    def _aligned_random_crop(self, lr_img: Image.Image, hr_img: Image.Image, ps: int):
        """
        Crop an HR patch of size (ps x ps) at a random location,
        and the matching LR patch at (ps/scale x ps/scale).
        """
        assert ps % self.scale == 0, f"patch_size ({ps}) must be divisible by scale ({self.scale})"
        lr_ps = ps // self.scale

        # Random top-left in HR space
        max_x = hr_img.width  - ps
        max_y = hr_img.height - ps
        if max_x < 0 or max_y < 0:
            raise ValueError(f"HR image smaller than patch_size={ps}: got {(hr_img.width, hr_img.height)}")

        # Choose coordinates (0 allowed so +1 only if max>0)
        x = 0 if max_x == 0 else torch.randint(0, max_x + 1, (1,)).item()
        y = 0 if max_y == 0 else torch.randint(0, max_y + 1, (1,)).item()

        # HR crop
        hr_crop = hr_img.crop((x, y, x + ps, y + ps))

        # Corresponding LR coords (scaled down)
        lr_x, lr_y = x // self.scale, y // self.scale
        lr_crop = lr_img.crop((lr_x, lr_y, lr_x + lr_ps, lr_y + lr_ps))

        return lr_crop, hr_crop

    # Convert images to tensors
    def __getitem__(self, idx: int):
        lrp, hrp = self.items[idx]

        # Load as PIL
        lr_img = Image.open(lrp).convert("RGB")
        hr_img = Image.open(hrp).convert("RGB")

        # Aligned crop
        if self.patch_size is not None:
            lr_img, hr_img = self._aligned_random_crop(lr_img, hr_img, self.patch_size)

        # To tensors
        lr = pil_to_tensor(lr_img)  # (3, h, w), [0,1]
        hr = pil_to_tensor(hr_img)  # (3, H, W), [0,1]

        return {
            "lr": lr,
            "hr": hr,
            "filename": hrp.name,
            "scale": self.scale,
        }
    



Quick smoke test to validate shapes, scale relationship, and filenames

In [None]:
# X4 sanity check
ds_valid_x4 = DIV2KDataset(DATA_ROOT, split="valid", scale=4)
print("Pairs in valid X4:", len(ds_valid_x4))

sample = ds_valid_x4[0]
for field in ("filename", "scale"):
    print(f"{field}:", sample[field])
print("LR shape:", tuple(sample["lr"].shape))   # (3, h, w)
print("HR shape:", tuple(sample["hr"].shape))   # (3, H, W)

# double-check the scale relationship numerically (H == h*scale, W == w*scale)
_, h, w = sample["lr"].shape
_, H, W = sample["hr"].shape
factor = sample["scale"]
print("H == h*scale ? ", H == h * factor, "(", H, "==", h, "*", factor, ")")
print("W == w*scale ? ", W == w * factor, "(", W, "==", w, "*", factor, ")")

In [None]:
# DataLoader smoke test
# num_workers=0 keeps it simple; we can tune later.
loader = DataLoader(ds_valid_x4, batch_size=2, num_workers=0, shuffle=False)

batch = next(iter(loader))
print("Batch keys:", list(batch))
for label, key in (("LR", "lr"), ("HR", "hr")):
    print(f"{label} batch:", tuple(batch[key].shape))  # (B, 3, h, w) for LR, (B, 3, H, W) for HR
print("filenames:", batch["filename"])
print("scales:", batch["scale"])

In [None]:
# Patch extraction test
ds = DIV2KDataset(data_root=DATA_ROOT, split="train", scale=4, patch_size=128)
s = ds[0]

# Expected: HR (3, 128, 128) and LR (3, 32, 32)
for label, key in (("HR", "hr"), ("LR", "lr")):
    print(f"{label} patch:", s[key].shape)