# Setup


In [None]:
from google.colab import drive

# Mount Google Drive at /content/drive (Colab only).
drive.mount("/content/drive")

In [None]:
# Path of the repository checkout, relative to the Drive root (MyDrive).
working_directory = 'GitHub/dl-superresolution-ipynb'
# Make the checkout the current directory; the relative "input/..." and
# "outputs/..." paths used in later cells are resolved against it.
%cd /content/drive/MyDrive/$working_directory
# Show the state of the working tree.
!git status

In [None]:
# Install the dependencies listed in requirements.txt into the kernel's environment.
%pip install -r requirements.txt

## Git Management


In [None]:
# Set the global git identity used for commits made from this runtime.
# NOTE(review): the identity is hard-coded in the notebook — confirm this is
# intended before sharing it.
!git config --global user.email "e11909390@student.tuwien.ac.at"
!git config --global user.name "Lollobin"

In [None]:
# Fetch and integrate the latest changes from the remote.
!git pull

In [None]:
# Review pending changes before committing.
!git status

In [None]:
# Commit all modified tracked files (-a) with a fixed message; edit the
# message before re-running this cell for a different change.
!git commit -a -m "added connection for google colab"

In [None]:
# Publish local commits to the remote.
!git push

# Preparation


## Data Preprocessing

Generate high-/low-resolution patch pairs for training, and bicubically degraded low-resolution images for validation and testing.


### Patch Generation


In [None]:
from PIL import Image
from tqdm import tqdm

import matplotlib.pyplot as plt
import patchify
import numpy as np
import matplotlib.gridspec as gridspec
import glob as glob
import os
import cv2

# Settings read by create_patches() below.
SHOW_PATCHES = False  # When True, plot the patch grid of every processed image.
STRIDE = 114  # Step passed to patchify between neighbouring patches.
SIZE = 224  # Side length of the square patches.
SCALE = 4.0  # Upscale factor 2, 3 or 4


def show_patches(patches):
    """
    Plot every patch of a patchify result as one grid of images.

    Args:
        patches: array indexed as patches[row, col, 0, :, :, :], as returned
            by patchify.patchify for a 3-channel image.
    """
    n_rows, n_cols = patches.shape[0], patches.shape[1]
    # figsize is (width, height): one inch per column and per row.
    plt.figure(figsize=(n_cols, n_rows))
    gs = gridspec.GridSpec(n_rows, n_cols)
    gs.update(wspace=0.01, hspace=0.02)
    for cell_index in range(n_rows * n_cols):
        row, col = divmod(cell_index, n_cols)
        # Selecting the subplot makes it the current axes for imshow/axis.
        plt.subplot(gs[cell_index])
        plt.imshow(patches[row, col, 0, :, :, :])
        plt.axis("off")
    plt.show()


def create_patches(
    input_paths,
    out_hr_path,
    out_lr_path,
):
    """
    Cut every image found in `input_paths` into SIZE x SIZE patches and save
    each patch twice: unchanged (high-resolution target) and bicubically
    degraded (downscaled by SCALE, then upscaled back to the patch size).

    Args:
        input_paths: iterable of directories; every entry matching
            "<directory>/*" is opened as an image.
        out_hr_path: directory that receives the unchanged patches.
        out_lr_path: directory that receives the degraded patches.

    Both outputs are written as "<image name>_<counter>.png", with the counter
    starting at 1 for every image. Reads the module-level SIZE, STRIDE, SCALE
    and SHOW_PATCHES settings. Images smaller than SIZE in either dimension
    are skipped with a message.
    """
    os.makedirs(out_hr_path, exist_ok=True)
    os.makedirs(out_lr_path, exist_ok=True)
    all_paths = []
    for input_path in input_paths:
        all_paths.extend(glob.glob(f"{input_path}/*"))
    print(f"Creating patches for {len(all_paths)} images")
    for image_path in tqdm(all_paths, total=len(all_paths)):
        # Convert to RGB so grayscale/RGBA/palette files also match the
        # (SIZE, SIZE, 3) patch window; the context manager closes the file.
        with Image.open(image_path) as image:
            pixels = np.array(image.convert("RGB"))
        # Strip only the final extension so names containing dots stay unique.
        image_name = os.path.splitext(os.path.basename(image_path))[0]
        if pixels.shape[0] < SIZE or pixels.shape[1] < SIZE:
            print(f"Skipping {image_path}: smaller than {SIZE}x{SIZE}")
            continue
        # Create patches of width and height SIZE.
        patches = patchify.patchify(pixels, (SIZE, SIZE, 3), STRIDE)
        if SHOW_PATCHES:
            show_patches(patches)
        counter = 0
        for i in range(patches.shape[0]):
            for j in range(patches.shape[1]):
                counter += 1
                patch = patches[i, j, 0, :, :, :]
                # OpenCV writes BGR, the array loaded through PIL is RGB.
                patch = cv2.cvtColor(patch, cv2.COLOR_RGB2BGR)
                cv2.imwrite(f"{out_hr_path}/{image_name}_{counter}.png", patch)
                # Downscale with bicubic interpolation (same expression for
                # width and height).
                h, w, _ = patch.shape
                low_res_img = cv2.resize(
                    patch,
                    (int(w / SCALE), int(h / SCALE)),
                    interpolation=cv2.INTER_CUBIC,
                )
                # Now upscale back to the patch size using BICUBIC.
                high_res_upscale = cv2.resize(
                    low_res_img, (w, h), interpolation=cv2.INTER_CUBIC
                )
                cv2.imwrite(
                    f"{out_lr_path}/{image_name}_{counter}.png", high_res_upscale
                )

In [None]:
# Build the training patch pairs from input/PIRM with the settings defined above.
create_patches(["input/PIRM"], "input/PIRM_hr_patches_4x", "input/PIRM_lr_patches_4x")

### Bicubic Scaling for Validation


In [None]:
from PIL import Image
import glob as glob
import os

paths = ["input/Set14/original", "input/Set5/original"]
scale_factor = "4x"  # options 2x, 3x, 4x
images = []

for path in paths:
    images.extend(glob.glob(f"{path}/*.png"))
print(len(images))

# Each supported option maps to (downscale ratio, low-resolution output directory).
scale_options = {
    "2x": (1.0 / 2.0, "input/test_bicubic_rgb_2x"),
    # Exactly 1/3 (not 0.333), so a width of 300 shrinks to 100 rather than 99.
    "3x": (1.0 / 3.0, "input/test_bicubic_rgb_3x"),
    "4x": (1.0 / 4.0, "input/test_bicubic_rgb_4x"),
}
if scale_factor not in scale_options:
    # Fail here with a clear message instead of a NameError further down.
    raise ValueError(
        f"Unsupported scale_factor {scale_factor!r}; expected one of {sorted(scale_options)}"
    )
# From here on scale_factor is the numeric downscale ratio.
scale_factor, save_path_lr = scale_options[scale_factor]
save_path_hr = "input/test_hr"
os.makedirs(save_path_lr, exist_ok=True)
os.makedirs(save_path_hr, exist_ok=True)
print(f"Scaling factor: {scale_factor}")
print(f"Low resolution images save path: {save_path_lr}")
for image in images:
    image_name = os.path.basename(image)
    # The context manager closes the source file once both copies are written.
    with Image.open(image) as orig_img:
        w, h = orig_img.size
        print(f"Original image dimensions: {w}, {h}")
        # Unchanged copy that serves as the high-resolution reference.
        orig_img.save(f"{save_path_hr}/{image_name}")
        low_res_img = orig_img.resize(
            (int(w * scale_factor), int(h * scale_factor)), Image.BICUBIC
        )
    # Upscale back to the original size using BICUBIC.
    high_res_upscale = low_res_img.resize((w, h), Image.BICUBIC)
    high_res_upscale.save(f"{save_path_lr}/{image_name}")

## Utils

Define utility functions that are used later on.


In [None]:
import math
import numpy as np
import matplotlib.pyplot as plt
import torch
from torchvision.utils import save_image

plt.style.use("ggplot")


def psnr(label, outputs, max_val=1.0):
    """
    Peak Signal to Noise Ratio between two tensors (higher is better).

    PSNR = 20 * log10(max_val / RMSE), see
    https://en.wikipedia.org/wiki/Peak_signal-to-noise_ratio#Definition

    Args:
        label: reference tensor.
        outputs: tensor compared against `label`.
        max_val: largest possible pixel value; the default of 1.0 expects
            inputs normalized to [0, 1] rather than [0, 255].

    Returns:
        The PSNR in dB, or 100 when both tensors are identical (RMSE of 0).
    """
    target = label.cpu().detach().numpy()
    prediction = outputs.cpu().detach().numpy()
    mean_squared_error = np.mean(np.square(prediction - target))
    rmse = math.sqrt(mean_squared_error)
    if rmse != 0:
        return 20 * math.log10(max_val / rmse)
    # Identical inputs would divide by zero; report a fixed ceiling instead.
    return 100


def save_plot(train_loss, val_loss, train_psnr, val_psnr):
    """
    Save the training curves as two figures: outputs/loss.png (train vs.
    validation loss) and outputs/psnr.png (train vs. validation PSNR).

    Each argument is a sequence of values plotted against its index; the
    x-axis is labelled "Epochs".
    """
    # One entry per figure:
    # (status message, y-axis label, output file, ((values, colour, legend label), ...))
    figures = (
        (
            "Saving loss plots...",
            "Loss",
            "outputs/loss.png",
            (
                (train_loss, "orange", "train loss"),
                (val_loss, "red", "validation loss"),
            ),
        ),
        (
            "Saving PSNR plots...",
            "PSNR (dB)",
            "outputs/psnr.png",
            (
                (train_psnr, "green", "train PSNR dB"),
                (val_psnr, "blue", "validation PSNR dB"),
            ),
        ),
    )
    for message, y_label, out_file, curves in figures:
        print(message)
        plt.figure(figsize=(10, 7))
        for values, colour, legend_label in curves:
            plt.plot(values, color=colour, label=legend_label)
        plt.xlabel("Epochs")
        plt.ylabel(y_label)
        plt.legend()
        plt.savefig(out_file)
        # Close the figure so repeated calls do not accumulate open figures.
        plt.close()


def save_model_state(model):
    """Write the model's state_dict to outputs/model.pth."""
    print("Saving model state...")
    weights = model.state_dict()
    torch.save(weights, "outputs/model.pth")


def save_model(epochs, model, optimizer, criterion):
    """
    Save a training checkpoint to outputs/model_ckpt.pth, overwriting any
    checkpoint already stored there.

    The checkpoint is a dict with the keys "epoch" (`epochs` + 1),
    "model_state_dict", "optimizer_state_dict" and "loss" (the `criterion`
    object passed in).
    """
    checkpoint = dict(
        epoch=epochs + 1,
        model_state_dict=model.state_dict(),
        optimizer_state_dict=optimizer.state_dict(),
        loss=criterion,
    )
    torch.save(checkpoint, "outputs/model_ckpt.pth")


def save_validation_results(outputs, epoch, batch_iter):
    """
    Save the reconstructed validation images of one batch.

    The file is written to outputs/valid_results/ and named after the epoch
    and the batch index.
    """
    target_file = f"outputs/valid_results/val_sr_{epoch}_{batch_iter}.png"
    save_image(outputs, target_file)

# Data Loading


In [None]:
import glob as glob
from torch.utils.data import DataLoader, Dataset
from PIL import Image
from torchvision import transforms

# Batch sizes used by get_dataloaders() for the training and validation loaders.
TRAIN_BATCH_SIZE = 128
TEST_BATCH_SIZE = 1


# Applied to the network inputs: tensor conversion followed by per-channel
# normalization.
# NOTE(review): mean/std look like the ImageNet statistics expected by the
# pretrained torchvision encoders — confirm.
transform_image = transforms.Compose(
    [
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
    ]
)

# Applied to the targets: tensor conversion only, no normalization.
transform_label = transforms.Compose(
    [
        transforms.ToTensor(),
    ]
)


class ResNetSRCNNDataset(Dataset):
    """
    Paired dataset of network inputs and targets read from two directories.

    Files are paired by position after sorting both directory listings, so
    the two directories are expected to hold files with matching names.
    """

    def __init__(self, image_paths, label_paths):
        """
        Args:
            image_paths: directory containing the input images.
            label_paths: directory containing the target images.

        Raises:
            ValueError: if the two directories hold a different number of files.
        """
        # glob.glob returns entries in arbitrary order; sort both listings so
        # that index i refers to the same picture in both of them.
        self.all_image_paths = sorted(glob.glob(f"{image_paths}/*"))
        self.all_label_paths = sorted(glob.glob(f"{label_paths}/*"))
        if len(self.all_image_paths) != len(self.all_label_paths):
            raise ValueError(
                f"Found {len(self.all_image_paths)} images in {image_paths!r} but "
                f"{len(self.all_label_paths)} labels in {label_paths!r}"
            )

    def __len__(self):
        """Number of image/label pairs."""
        return len(self.all_image_paths)

    def __getitem__(self, index):
        """Return the (image, label) tensors of pair `index`."""
        # convert() loads the pixel data, so the files can be closed right away.
        with Image.open(self.all_image_paths[index]) as image_file:
            image = image_file.convert("RGB")
        with Image.open(self.all_label_paths[index]) as label_file:
            label = label_file.convert("RGB")

        image = transform_image(image)
        label = transform_label(label)

        return image, label


# Prepare the datasets.
def get_datasets(
    train_image_paths, train_label_paths, valid_image_path, valid_label_paths
):
    """
    Build the training and validation datasets.

    Each dataset is a ResNetSRCNNDataset created from an image directory and
    its label directory.

    Returns:
        (dataset_train, dataset_valid)
    """
    return (
        ResNetSRCNNDataset(train_image_paths, train_label_paths),
        ResNetSRCNNDataset(valid_image_path, valid_label_paths),
    )


# Prepare the data loaders
def get_dataloaders(dataset_train, dataset_valid):
    """
    Wrap both datasets in DataLoaders.

    The training loader shuffles and uses TRAIN_BATCH_SIZE; the validation
    loader keeps the dataset order and uses TEST_BATCH_SIZE. Both pin memory.

    Returns:
        (train_loader, valid_loader)
    """
    train_options = {"batch_size": TRAIN_BATCH_SIZE, "shuffle": True, "pin_memory": True}
    train_loader = DataLoader(dataset_train, **train_options)

    valid_options = {"batch_size": TEST_BATCH_SIZE, "shuffle": False, "pin_memory": True}
    valid_loader = DataLoader(dataset_valid, **valid_options)

    return train_loader, valid_loader

In [None]:
%matplotlib inline

In [None]:
def imshow(img):
    """
    Display a 3-D tensor as an image.

    The first axis is moved to the end before plotting (presumably
    channels-first to channels-last), and values are clipped to [0, 1].
    """
    channels_last = np.clip(img.numpy().transpose((1, 2, 0)), 0, 1)
    plt.imshow(channels_last)


# Directories of the training targets/inputs and validation targets/inputs.
# NOTE(review): "TRAN_IMAGE_PATHS" looks like a typo for "TRAIN_IMAGE_PATHS";
# the same name is used again in the training cell, so rename both together.
TRAIN_LABEL_PATHS = "input/PIRM_hr_patches_224"
TRAN_IMAGE_PATHS = "input/PIRM_lr_patches_224"
VALID_LABEL_PATHS = "input/test_hr"
VALID_IMAGE_PATHS = "input/test_bicubic_rgb_2x"

dataset_train, dataset_valid = get_datasets(
    TRAN_IMAGE_PATHS, TRAIN_LABEL_PATHS, VALID_IMAGE_PATHS, VALID_LABEL_PATHS
)
train_loader, valid_loader = get_dataloaders(dataset_train, dataset_valid)

# One batch from the loader: a pair (inputs, labels) as returned by
# ResNetSRCNNDataset.__getitem__, despite the name `images`.
images = next(iter(train_loader))
# images = next(iter(valid_loader))

# Index 1 selects the labels, index 0 the first sample of the batch.
imshow(images[1][0])

# Network Definition


## ResSR1

Contains 1 ResNet block in the encoder.

Trainable parameters (decoder): 23,523

In [None]:
import torch.nn as nn
from torchvision.models import resnet18, ResNet18_Weights

# Load a pretrained ResNet model
resnet = resnet18(weights=ResNet18_Weights.DEFAULT)


class ResSR1(nn.Module):
    """
    Super-resolution network with a pretrained ResNet front end (conv1, bn1,
    relu, layer1) as encoder and a small convolutional decoder.

    The encoder modules are taken from the module-level `resnet` object, so
    every instance built from the same `resnet` shares them.
    """

    def __init__(self):
        super().__init__()

        # Encoder: ResNet stem followed by the first residual stage; the
        # stem's max-pooling layer is intentionally left out.
        encoder_layers = [
            resnet.conv1,
            resnet.bn1,
            resnet.relu,
            resnet.layer1,
        ]
        self.encoder = nn.Sequential(*encoder_layers)

        # Decoder: one stride-2 transposed convolution, then two 3x3
        # convolutions that reduce the features to 3 output channels.
        decoder_layers = [
            nn.ConvTranspose2d(
                64, 32, kernel_size=3, stride=2, padding=1, output_padding=1
            ),
            nn.ReLU(inplace=True),
            nn.Conv2d(32, 16, kernel_size=3, padding=1),
            nn.ReLU(inplace=True),
            nn.Conv2d(16, 3, kernel_size=3, padding=1),
        ]
        self.decoder = nn.Sequential(*decoder_layers)

    def forward(self, x):
        """
        Run encoder and decoder on `x` and return a tensor cropped to the
        spatial size of `x`.

        The input is reflect-padded on the right and bottom so that height
        and width become multiples of 2.
        """
        spatial = tuple(x.shape[2:])
        height, width = spatial[0], spatial[1]
        multiple = 2
        extra_rows = -height % multiple
        extra_cols = -width % multiple

        # Pad order for the last two dimensions: (left, right, top, bottom).
        padded = nn.functional.pad(x, (0, extra_cols, 0, extra_rows), mode="reflect")

        reconstructed = self.decoder(self.encoder(padded))

        # Drop the rows/columns that exceed the original size.
        return reconstructed[:, :, :height, :width]

In [None]:
import torch
import torch.nn as nn
from torchvision.models import resnet18, ResNet18_Weights

# Load a pretrained ResNet model
resnet = resnet18(weights=ResNet18_Weights.DEFAULT)


class ResSR1_m(nn.Module):
    """
    Variant of ResSR1 that keeps the ResNet max-pooling layer in the encoder
    and uses two stride-2 transposed convolutions in the decoder.

    The encoder modules are taken from the module-level `resnet` object, so
    every instance built from the same `resnet` shares them.
    """

    def __init__(self):
        super().__init__()

        # Encoder: ResNet stem including max pooling, then the first
        # residual stage.
        encoder_layers = [
            resnet.conv1,
            resnet.bn1,
            resnet.relu,
            resnet.maxpool,
            resnet.layer1,
        ]
        self.encoder = nn.Sequential(*encoder_layers)

        # Decoder: two stride-2 transposed convolutions and a final 1x1
        # convolution to 3 channels. The first transposed convolution uses
        # padding=0, so for an encoder output of size n the decoder yields
        # 4 * n + 4; forward() crops the surplus.
        decoder_layers = [
            nn.ConvTranspose2d(
                64, 32, kernel_size=3, stride=2, padding=0, output_padding=1
            ),
            nn.ReLU(inplace=True),
            nn.ConvTranspose2d(
                32, 16, kernel_size=3, stride=2, padding=1, output_padding=1
            ),
            nn.ReLU(inplace=True),
            nn.Conv2d(16, 3, kernel_size=1, padding=0),
        ]
        self.decoder = nn.Sequential(*decoder_layers)

    def forward(self, x):
        """
        Run encoder and decoder on `x` and return a tensor cropped to the
        spatial size of `x`.

        The input is reflect-padded on the right and bottom so that height
        and width become multiples of 4.
        """
        spatial = tuple(x.shape[2:])
        height, width = spatial[0], spatial[1]
        multiple = 4
        extra_rows = -height % multiple
        extra_cols = -width % multiple

        # Pad order for the last two dimensions: (left, right, top, bottom).
        padded = nn.functional.pad(x, (0, extra_cols, 0, extra_rows), mode="reflect")

        reconstructed = self.decoder(self.encoder(padded))

        # Drop the rows/columns that exceed the original size.
        return reconstructed[:, :, :height, :width]

## ResSR2

Contains 2 ResNet blocks in the encoder.

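Trainable parameters (decoder): 97,315 (this is the count for the two-transposed-convolution decoder of `ResSR2_m`; the three-transposed-convolution decoder of `ResSR2` has 134,243)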

In [None]:
import torch.nn as nn
from torchvision.models import resnet18, ResNet18_Weights

# Load a pretrained ResNet model
resnet = resnet18(weights=ResNet18_Weights.DEFAULT)


class ResSR2(nn.Module):
    """
    Super-resolution network with a pretrained ResNet front end (stem with
    max pooling, layer1, layer2) as encoder and a decoder made of three
    stride-2 transposed convolutions plus two 3x3 convolutions.

    In torchvision's resnet18, conv1, maxpool and layer2 each halve the
    spatial size, so the encoder reduces it by a factor of 8. The decoder
    doubles it three times to restore the input resolution.

    The encoder modules are taken from the module-level `resnet` object, so
    every instance built from the same `resnet` shares them.
    """

    def __init__(self):
        super(ResSR2, self).__init__()

        # Pretrained feature extractor (reduces the resolution by 8).
        self.encoder = nn.Sequential(
            resnet.conv1,  # First convolutional layer
            resnet.bn1,  # Batch normalization
            resnet.relu,  # Activation
            resnet.maxpool,  # Max pooling
            resnet.layer1,  # First residual block (without downsampling)
            resnet.layer2,  # Second residual block (with downsampling)
        )

        # SRCNN-inspired layers for feature extraction and reconstruction.
        # Every transposed convolution uses kernel 3, stride 2, padding 1 and
        # output_padding 1, which exactly doubles the size. With
        # output_padding=0 on the first layer the decoder returned 8n - 4
        # pixels for an encoder output of n, i.e. fewer than the input.
        self.decoder = nn.Sequential(
            nn.ConvTranspose2d(
                128, 64, kernel_size=3, stride=2, padding=1, output_padding=1
            ),
            nn.ReLU(inplace=True),
            nn.ConvTranspose2d(
                64, 64, kernel_size=3, stride=2, padding=1, output_padding=1
            ),
            nn.ReLU(inplace=True),
            nn.ConvTranspose2d(
                64, 32, kernel_size=3, stride=2, padding=1, output_padding=1
            ),
            nn.ReLU(inplace=True),
            nn.Conv2d(32, 16, kernel_size=3, padding=1),
            nn.ReLU(inplace=True),
            nn.Conv2d(16, 3, kernel_size=3, padding=1),
        )

    def forward(self, x):
        """
        Run encoder and decoder on `x` and return a tensor with the same
        spatial size as `x`.

        The input is reflect-padded on the right and bottom to multiples of
        8 (the encoder's reduction factor); the padding is cropped away again
        after reconstruction.
        """
        # Pad the input
        original_size = x.size()[2:]
        scale_factor = 8
        pad_h = (scale_factor - original_size[0] % scale_factor) % scale_factor
        pad_w = (scale_factor - original_size[1] % scale_factor) % scale_factor
        padding = (0, pad_w, 0, pad_h)  # (left, right, top, bottom)
        x = nn.functional.pad(x, padding, mode="reflect")

        # Feature extraction
        features = self.encoder(x)

        # Reconstruction
        x = self.decoder(features)

        # Remove padding
        x = x[:, :, : original_size[0], : original_size[1]]

        return x

In [None]:
import torch.nn as nn
from torchvision.models import resnet18, ResNet18_Weights

# Load a pretrained ResNet model
resnet = resnet18(weights=ResNet18_Weights.DEFAULT)


class ResSR2_m(nn.Module):
    """
    Super-resolution network with a pretrained ResNet front end (stem with
    max pooling, layer1, layer2) as encoder and a decoder made of two
    stride-2 transposed convolutions plus two 3x3 convolutions.

    The encoder modules are taken from the module-level `resnet` object, so
    every instance built from the same `resnet` shares them.

    NOTE(review): the encoder appears to reduce the resolution by 8 (conv1,
    maxpool and layer2 — confirm against torchvision's resnet18) while the
    decoder only upsamples by 4, so the output looks to be half the input
    size. Confirm with the shape check in the "Testing the Model" cell.
    """

    def __init__(self):
        super().__init__()

        # Encoder: ResNet stem including max pooling, then the first two
        # residual stages.
        encoder_layers = [
            resnet.conv1,
            resnet.bn1,
            resnet.relu,
            resnet.maxpool,
            resnet.layer1,
            resnet.layer2,
        ]
        self.encoder = nn.Sequential(*encoder_layers)

        # Decoder: two stride-2 transposed convolutions, then two 3x3
        # convolutions that reduce the features to 3 output channels.
        decoder_layers = [
            nn.ConvTranspose2d(
                128, 64, kernel_size=3, stride=2, padding=1, output_padding=1
            ),
            nn.ReLU(inplace=True),
            nn.ConvTranspose2d(
                64, 32, kernel_size=3, stride=2, padding=1, output_padding=1
            ),
            nn.ReLU(inplace=True),
            nn.Conv2d(32, 16, kernel_size=3, padding=1),
            nn.ReLU(inplace=True),
            nn.Conv2d(16, 3, kernel_size=3, padding=1),
        ]
        self.decoder = nn.Sequential(*decoder_layers)

    def forward(self, x):
        """
        Run encoder and decoder on `x` and crop the result to at most the
        spatial size of `x`.

        The input is reflect-padded on the right and bottom so that height
        and width become multiples of 4.
        """
        spatial = tuple(x.shape[2:])
        height, width = spatial[0], spatial[1]
        multiple = 4
        extra_rows = -height % multiple
        extra_cols = -width % multiple

        # Pad order for the last two dimensions: (left, right, top, bottom).
        padded = nn.functional.pad(x, (0, extra_cols, 0, extra_rows), mode="reflect")

        reconstructed = self.decoder(self.encoder(padded))

        # Drop the rows/columns that exceed the original size.
        return reconstructed[:, :, :height, :width]

## ResSR3

Contains 3 ResNet blocks in the encoder.

Trainable parameters (decoder): 392,355

In [None]:
import torch
import torch.nn as nn
from torchvision.models import resnet18, ResNet18_Weights

# Load a pretrained ResNet model
resnet = resnet18(weights=ResNet18_Weights.DEFAULT)


class ResSR3(nn.Module):
    """
    Super-resolution network with a pretrained ResNet front end (conv1, bn1,
    relu, layer1, layer2, layer3) as encoder and a decoder made of three
    stride-2 transposed convolutions plus two 3x3 convolutions.

    The encoder modules are taken from the module-level `resnet` object, so
    every instance built from the same `resnet` shares them.
    """

    def __init__(self):
        super().__init__()

        # Encoder: ResNet stem followed by the first three residual stages;
        # the stem's max-pooling layer is intentionally left out.
        encoder_layers = [
            resnet.conv1,
            resnet.bn1,
            resnet.relu,
            resnet.layer1,
            resnet.layer2,
            resnet.layer3,
        ]
        self.encoder = nn.Sequential(*encoder_layers)

        # Decoder: three stride-2 transposed convolutions, then two 3x3
        # convolutions that reduce the features to 3 output channels.
        decoder_layers = [
            nn.ConvTranspose2d(
                256, 128, kernel_size=3, stride=2, padding=1, output_padding=1
            ),
            nn.ReLU(inplace=True),
            nn.ConvTranspose2d(
                128, 64, kernel_size=3, stride=2, padding=1, output_padding=1
            ),
            nn.ReLU(inplace=True),
            nn.ConvTranspose2d(
                64, 32, kernel_size=3, stride=2, padding=1, output_padding=1
            ),
            nn.ReLU(inplace=True),
            nn.Conv2d(32, 16, kernel_size=3, padding=1),
            nn.ReLU(inplace=True),
            nn.Conv2d(16, 3, kernel_size=3, padding=1),
        ]
        self.decoder = nn.Sequential(*decoder_layers)

    def forward(self, x):
        """
        Run encoder and decoder on `x` and return a tensor cropped to the
        spatial size of `x`.

        The input is reflect-padded on the right and bottom so that height
        and width become multiples of 8.
        """
        spatial = tuple(x.shape[2:])
        height, width = spatial[0], spatial[1]
        multiple = 8
        extra_rows = -height % multiple
        extra_cols = -width % multiple

        # Pad order for the last two dimensions: (left, right, top, bottom).
        padded = nn.functional.pad(x, (0, extra_cols, 0, extra_rows), mode="reflect")

        reconstructed = self.decoder(self.encoder(padded))

        # Drop the rows/columns that exceed the original size.
        return reconstructed[:, :, :height, :width]

## SRCNN

In [None]:
import torch.nn as nn
import torch.nn.functional as F


class SRCNN(nn.Module):
    """
    Three-layer SRCNN (9x9 -> 1x1 -> 5x5 convolutions) mapping a 3-channel
    image to a 3-channel image of the same spatial size.

    Each layer uses "same" padding (kernel_size // 2), so every layer
    preserves the resolution. Previously all three layers used padding 2:
    the 9x9 layer shrank the feature map by 4 pixels and the 1x1 layer
    padded it back with a ring of zeros, which discarded border information.
    Parameter shapes are unchanged, so existing state_dicts still load.
    """

    def __init__(self):
        super(SRCNN, self).__init__()

        self.conv1 = nn.Conv2d(3, 64, kernel_size=9, stride=(1, 1), padding=(4, 4))
        self.conv2 = nn.Conv2d(64, 32, kernel_size=1, stride=(1, 1), padding=(0, 0))
        self.conv3 = nn.Conv2d(32, 3, kernel_size=5, stride=(1, 1), padding=(2, 2))

    def forward(self, x):
        """Apply the three convolutions, with ReLU after the first two."""
        x = F.relu(self.conv1(x))
        x = F.relu(self.conv2(x))
        x = self.conv3(x)

        return x

## VGGSR

In [None]:
from torchvision.models import vgg16_bn, VGG16_BN_Weights
from torch import nn

vgg = vgg16_bn(weights=VGG16_BN_Weights.DEFAULT)


class VGGSR(nn.Module):
    """
    Super-resolution network with the first 13 layers of a pretrained
    VGG16-BN feature extractor as encoder and a small decoder.

    The encoder modules are taken from the module-level `vgg` object, so
    every instance built from the same `vgg` shares them.
    """

    def __init__(self):
        super().__init__()

        # Encoder: vgg.features[0] .. vgg.features[12], i.e. two
        # conv/batch-norm/ReLU groups, max pooling (index 6) and two more
        # conv/batch-norm/ReLU groups.
        self.encoder = nn.Sequential(*(vgg.features[index] for index in range(13)))

        # Decoder: one stride-2 transposed convolution followed by a 3x3
        # convolution to 3 output channels.
        decoder_layers = [
            nn.ConvTranspose2d(
                128, 64, kernel_size=3, stride=2, padding=1, output_padding=1
            ),
            nn.ReLU(inplace=True),
            nn.Conv2d(64, 3, kernel_size=3, stride=1, padding=1),
        ]
        self.decoder = nn.Sequential(*decoder_layers)

    def forward(self, x):
        """
        Run encoder and decoder on `x` and return a tensor cropped to the
        spatial size of `x`.

        The input is reflect-padded on the right and bottom so that height
        and width become multiples of 2.
        """
        spatial = tuple(x.shape[2:])
        height, width = spatial[0], spatial[1]
        multiple = 2
        extra_rows = -height % multiple
        extra_cols = -width % multiple

        # Pad order for the last two dimensions: (left, right, top, bottom).
        padded = nn.functional.pad(x, (0, extra_cols, 0, extra_rows), mode="reflect")

        reconstructed = self.decoder(self.encoder(padded))

        # Drop the rows/columns that exceed the original size.
        return reconstructed[:, :, :height, :width]

## VGGSR1p

In [None]:
from torchvision.models import vgg16_bn, VGG16_BN_Weights
from torch import nn

vgg = vgg16_bn(weights=VGG16_BN_Weights.DEFAULT)


class VGGSR1p(nn.Module):
    """
    VGGSR variant with one additional 64-channel 3x3 convolution in the
    decoder.

    The encoder consists of the first 13 layers of the module-level
    pretrained `vgg` object, so every instance built from the same `vgg`
    shares them.
    """

    def __init__(self):
        super().__init__()

        # Encoder: vgg.features[0] .. vgg.features[12], i.e. two
        # conv/batch-norm/ReLU groups, max pooling (index 6) and two more
        # conv/batch-norm/ReLU groups.
        self.encoder = nn.Sequential(*(vgg.features[index] for index in range(13)))

        # Decoder: one stride-2 transposed convolution followed by two 3x3
        # convolutions, the last one producing 3 output channels.
        decoder_layers = [
            nn.ConvTranspose2d(
                128, 64, kernel_size=3, stride=2, padding=1, output_padding=1
            ),
            nn.ReLU(inplace=True),
            nn.Conv2d(64, 64, kernel_size=3, stride=1, padding=1),
            nn.ReLU(inplace=True),
            nn.Conv2d(64, 3, kernel_size=3, stride=1, padding=1),
        ]
        self.decoder = nn.Sequential(*decoder_layers)

    def forward(self, x):
        """
        Run encoder and decoder on `x` and return a tensor cropped to the
        spatial size of `x`.

        The input is reflect-padded on the right and bottom so that height
        and width become multiples of 2.
        """
        spatial = tuple(x.shape[2:])
        height, width = spatial[0], spatial[1]
        multiple = 2
        extra_rows = -height % multiple
        extra_cols = -width % multiple

        # Pad order for the last two dimensions: (left, right, top, bottom).
        padded = nn.functional.pad(x, (0, extra_cols, 0, extra_rows), mode="reflect")

        reconstructed = self.decoder(self.encoder(padded))

        # Drop the rows/columns that exceed the original size.
        return reconstructed[:, :, :height, :width]

In [None]:
from torchvision.models import vgg16_bn, VGG16_BN_Weights
from torch import nn

vgg = vgg16_bn(weights=VGG16_BN_Weights.DEFAULT)


class VGGSR1p(nn.Module):
    """
    VGGSR variant with one additional 64-channel 3x3 convolution in the
    decoder.

    The encoder consists of the first 13 layers of the module-level
    pretrained `vgg` object, so every instance built from the same `vgg`
    shares them.

    NOTE(review): this class repeats the VGGSR1p definition from the
    previous cell and rebinds the name when this cell runs; one of the two
    cells can probably be removed.
    """

    def __init__(self):
        super().__init__()

        # Encoder: vgg.features[0] .. vgg.features[12], i.e. two
        # conv/batch-norm/ReLU groups, max pooling (index 6) and two more
        # conv/batch-norm/ReLU groups.
        self.encoder = nn.Sequential(*(vgg.features[index] for index in range(13)))

        # Decoder: one stride-2 transposed convolution followed by two 3x3
        # convolutions, the last one producing 3 output channels.
        decoder_layers = [
            nn.ConvTranspose2d(
                128, 64, kernel_size=3, stride=2, padding=1, output_padding=1
            ),
            nn.ReLU(inplace=True),
            nn.Conv2d(64, 64, kernel_size=3, stride=1, padding=1),
            nn.ReLU(inplace=True),
            nn.Conv2d(64, 3, kernel_size=3, stride=1, padding=1),
        ]
        self.decoder = nn.Sequential(*decoder_layers)

    def forward(self, x):
        """
        Run encoder and decoder on `x` and return a tensor cropped to the
        spatial size of `x`.

        The input is reflect-padded on the right and bottom so that height
        and width become multiples of 2.
        """
        spatial = tuple(x.shape[2:])
        height, width = spatial[0], spatial[1]
        multiple = 2
        extra_rows = -height % multiple
        extra_cols = -width % multiple

        # Pad order for the last two dimensions: (left, right, top, bottom).
        padded = nn.functional.pad(x, (0, extra_cols, 0, extra_rows), mode="reflect")

        reconstructed = self.decoder(self.encoder(padded))

        # Drop the rows/columns that exceed the original size.
        return reconstructed[:, :, :height, :width]

## VGGSR2

In [None]:
from torchvision.models import vgg16_bn, VGG16_BN_Weights
from torch import nn

vgg = vgg16_bn(weights=VGG16_BN_Weights.DEFAULT)
print(vgg)


class VGGSR2(nn.Module):
    """
    Super-resolution network with the first 17 layers of a pretrained
    VGG16-BN feature extractor as encoder and two stride-2 transposed
    convolutions as decoder.

    The encoder modules are taken from the module-level `vgg` object, so
    every instance built from the same `vgg` shares them.
    """

    def __init__(self):
        super().__init__()

        # Encoder: vgg.features[0] .. vgg.features[16]. This covers the
        # layers up to the second max pooling (index 13) plus the next
        # convolution and the two layers after it (indices 14-16).
        self.encoder = nn.Sequential(*(vgg.features[index] for index in range(17)))

        # Decoder: two stride-2 transposed convolutions; the second one
        # directly produces the 3 output channels.
        decoder_layers = [
            nn.ConvTranspose2d(
                256, 64, kernel_size=3, stride=2, padding=1, output_padding=1
            ),
            nn.ReLU(inplace=True),
            nn.ConvTranspose2d(
                64, 3, kernel_size=3, stride=2, padding=1, output_padding=1
            ),
        ]
        self.decoder = nn.Sequential(*decoder_layers)

    def forward(self, x):
        """
        Run encoder and decoder on `x` and return a tensor cropped to the
        spatial size of `x`.

        The input is reflect-padded on the right and bottom so that height
        and width become multiples of 4.
        """
        spatial = tuple(x.shape[2:])
        height, width = spatial[0], spatial[1]
        multiple = 4
        extra_rows = -height % multiple
        extra_cols = -width % multiple

        # Pad order for the last two dimensions: (left, right, top, bottom).
        padded = nn.functional.pad(x, (0, extra_cols, 0, extra_rows), mode="reflect")

        reconstructed = self.decoder(self.encoder(padded))

        # Drop the rows/columns that exceed the original size.
        return reconstructed[:, :, :height, :width]

In [None]:
from torchvision.models import vgg16_bn, VGG16_BN_Weights
from torch import nn

vgg = vgg16_bn(weights=VGG16_BN_Weights.DEFAULT)
print(vgg)


class VGGSR2p(nn.Module):
    """
    VGGSR2 variant whose decoder keeps 64 channels through both transposed
    convolutions and adds a final 3x3 convolution to 3 output channels.

    The encoder consists of the first 17 layers of the module-level
    pretrained `vgg` object, so every instance built from the same `vgg`
    shares them.
    """

    def __init__(self):
        super().__init__()

        # Encoder: vgg.features[0] .. vgg.features[16]. This covers the
        # layers up to the second max pooling (index 13) plus the next
        # convolution and the two layers after it (indices 14-16).
        self.encoder = nn.Sequential(*(vgg.features[index] for index in range(17)))

        # Decoder: two stride-2 transposed convolutions followed by a 3x3
        # convolution to 3 output channels.
        decoder_layers = [
            nn.ConvTranspose2d(
                256, 64, kernel_size=3, stride=2, padding=1, output_padding=1
            ),
            nn.ReLU(inplace=True),
            nn.ConvTranspose2d(
                64, 64, kernel_size=3, stride=2, padding=1, output_padding=1
            ),
            nn.ReLU(inplace=True),
            nn.Conv2d(64, 3, kernel_size=3, stride=1, padding=1),
        ]
        self.decoder = nn.Sequential(*decoder_layers)

    def forward(self, x):
        """
        Run encoder and decoder on `x` and return a tensor cropped to the
        spatial size of `x`.

        The input is reflect-padded on the right and bottom so that height
        and width become multiples of 4.
        """
        spatial = tuple(x.shape[2:])
        height, width = spatial[0], spatial[1]
        multiple = 4
        extra_rows = -height % multiple
        extra_cols = -width % multiple

        # Pad order for the last two dimensions: (left, right, top, bottom).
        padded = nn.functional.pad(x, (0, extra_cols, 0, extra_rows), mode="reflect")

        reconstructed = self.decoder(self.encoder(padded))

        # Drop the rows/columns that exceed the original size.
        return reconstructed[:, :, :height, :width]

## VGGSR0

In [None]:
from torchvision.models import vgg16_bn, VGG16_BN_Weights
from torch import nn

vgg = vgg16_bn(weights=VGG16_BN_Weights.DEFAULT)


class VGGSR0(nn.Module):
    """
    Smallest VGG-based variant: the first 10 layers of a pretrained VGG16-BN
    feature extractor as encoder and the same decoder as VGGSR.

    The encoder modules are taken from the module-level `vgg` object, so
    every instance built from the same `vgg` shares them.
    """

    def __init__(self):
        super().__init__()

        # Encoder: vgg.features[0] .. vgg.features[9], i.e. two
        # conv/batch-norm/ReLU groups, max pooling (index 6) and one more
        # conv/batch-norm/ReLU group.
        self.encoder = nn.Sequential(*(vgg.features[index] for index in range(10)))

        # Decoder: one stride-2 transposed convolution followed by a 3x3
        # convolution to 3 output channels.
        decoder_layers = [
            nn.ConvTranspose2d(
                128, 64, kernel_size=3, stride=2, padding=1, output_padding=1
            ),
            nn.ReLU(inplace=True),
            nn.Conv2d(64, 3, kernel_size=3, stride=1, padding=1),
        ]
        self.decoder = nn.Sequential(*decoder_layers)

    def forward(self, x):
        """
        Run encoder and decoder on `x` and return a tensor cropped to the
        spatial size of `x`.

        The input is reflect-padded on the right and bottom so that height
        and width become multiples of 2.
        """
        spatial = tuple(x.shape[2:])
        height, width = spatial[0], spatial[1]
        multiple = 2
        extra_rows = -height % multiple
        extra_cols = -width % multiple

        # Pad order for the last two dimensions: (left, right, top, bottom).
        padded = nn.functional.pad(x, (0, extra_cols, 0, extra_rows), mode="reflect")

        reconstructed = self.decoder(self.encoder(padded))

        # Drop the rows/columns that exceed the original size.
        return reconstructed[:, :, :height, :width]

## Testing the Model

In [None]:
import torch
from torchscan import summary


def count_parameters(model):
    """Return the total element count of the model's trainable parameters.

    Only parameters whose ``requires_grad`` flag is set are counted.
    """
    trainable_sizes = [
        param.numel() for param in model.parameters() if param.requires_grad
    ]
    return sum(trainable_sizes)


# Create the model
model_dummy = VGGSR2p()
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model_dummy = model_dummy.to(device)

# Freeze the encoder so that only the decoder counts as trainable below.
for param in model_dummy.encoder.parameters():
    param.requires_grad = False

# This cell only inspects shapes. Switch to evaluation mode so that a forward pass
# on random noise cannot update batch-norm running statistics (relevant if the
# encoder contains batch-norm layers, as the VGG16-BN based VGGSR0 above does).
model_dummy.eval()

# Example forward pass with a dummy input (odd width on purpose: 221).
dummy_input = torch.randn(1, 3, 224, 221).to(device)
with torch.no_grad():  # no gradients are needed for a shape check
    encoder_output = model_dummy.encoder(dummy_input)
    decoder_output = model_dummy.decoder(encoder_output)
    output = model_dummy(dummy_input)

# Final output shape should match the input shape
print(f"Input shape: {dummy_input.shape}")
print(f"Encoder output shape: {encoder_output.shape}")
print(f"Decoder output shape: {decoder_output.shape}")
print(f"Final output shape: {output.shape}")
print(
    f"Number of parameters (encoder + decoder ): {count_parameters(model_dummy.encoder)}+{count_parameters(model_dummy.decoder)} = {count_parameters(model_dummy)}"
)


summary(model_dummy, (3, 224, 221), receptive_field=True)

# Training


In [None]:
import torch
import time
import torch.nn as nn
from tqdm import tqdm
import copy

# NOTE(review): the result of this call is discarded (it is not the last
# expression of the cell, so nothing is displayed either).
torch.cuda.is_available()

# Constants
# Alternative training set (PIRM patches), currently disabled:
# TRAIN_LABEL_PATHS = 'input/PIRM_hr_patches_4x'
# TRAN_IMAGE_PATHS = 'input/PIRM_lr_patches_4x'
# NOTE(review): `TRAN_IMAGE_PATHS` looks like a typo of `TRAIN_IMAGE_PATHS`; the
# name is left untouched here because other cells may reference it.
TRAIN_LABEL_PATHS = "input/T91_hr_patches_32_x4"
TRAN_IMAGE_PATHS = "input/T91_lr_patches_32_x4"
VALID_LABEL_PATHS = "input/test_hr"
VALID_IMAGE_PATHS = "input/test_bicubic_rgb_4x"
SAVE_VALIDATION_RESULTS = True

# Argument order: train images, train labels, validation images, validation labels.
dataset_train, dataset_valid = get_datasets(
    TRAN_IMAGE_PATHS, TRAIN_LABEL_PATHS, VALID_IMAGE_PATHS, VALID_LABEL_PATHS
)
# `train_loader` and `valid_loader` are read as globals by `train_model`.
train_loader, valid_loader = get_dataloaders(dataset_train, dataset_valid)

print(f"Training samples: {len(dataset_train)}")
print(f"Validation samples: {len(dataset_valid)}")

In [None]:
def denormalize(tensor, mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]):
    """Undo a per-channel mean/std normalization and clamp the result to [0, 1].

    ``mean`` and ``std`` are three-element sequences that are broadcast over a
    (1, 3, 1, 1) shape, i.e. along the second axis of ``tensor``. The statistics
    are moved to ``tensor``'s device before use.
    """
    channel_shape = (1, 3, 1, 1)
    shift = torch.tensor(mean).view(*channel_shape).to(tensor.device)
    scale = torch.tensor(std).view(*channel_shape).to(tensor.device)
    restored = tensor * scale + shift
    return restored.clamp(0, 1)


# Generic function to train a model
def train_model(model, criterion, optimizer=None, scheduler=None, num_epochs=10):
    """Train ``model`` and return it loaded with its best-validation-PSNR weights.

    Every epoch runs a training pass over the global ``train_loader`` followed by a
    validation pass over the global ``valid_loader``. Model outputs go through
    ``denormalize`` before ``criterion`` and ``psnr`` are evaluated against the
    labels. Besides the loaders, the function relies on the globals ``device``,
    ``psnr``, ``save_model_state`` and ``save_plot``.

    Args:
        model: Network to train; updated in place.
        criterion: Loss callable invoked as ``criterion(outputs, labels)``.
        optimizer: Optimizer stepped once per training batch. When ``None`` no
            backward pass or parameter update is performed.
        scheduler: Optional learning-rate scheduler, stepped once per epoch.
            ``ReduceLROnPlateau`` receives the epoch's validation loss; any other
            scheduler is stepped without arguments.
        num_epochs: Number of epochs to run.

    Returns:
        ``model`` with the weights of the epoch that reached the highest
        validation PSNR.

    Side effects:
        Calls ``save_model_state`` and ``save_plot`` every 10 epochs and after the
        last epoch, and writes the PSNR / learning-rate history to
        ``outputs/psnr.txt``.
    """
    since = time.time()

    # Copy weights
    best_model_wts = copy.deepcopy(model.state_dict())
    best_psnr = 0.0

    train_loss, val_loss = [], []
    train_psnr, val_psnr = [], []

    # (epoch, lr) pairs; `epoch` is the 1-based epoch from which `lr` is in effect.
    lr_list = []
    if scheduler is not None:
        lr_list.append((1, scheduler.get_last_lr()[0]))

    for epoch in range(num_epochs):
        print("Epoch {}/{}".format(epoch, num_epochs - 1))
        print("-" * 10)

        # Each epoch has a training and validation phase
        for phase in ["train", "val"]:
            if phase == "train":
                model.train()  # Set model to training mode
                dataloader = train_loader
            else:
                model.eval()  # Set model to evaluate mode
                dataloader = valid_loader

            running_loss = 0.0
            running_psnr = 0.0

            # Iterate over data.
            for data in tqdm(dataloader, total=len(dataloader)):
                inputs = data[0].to(device)
                labels = data[1].to(device)

                # zero the parameter gradients
                if optimizer is not None:
                    optimizer.zero_grad()

                # forward
                # track history only if in train
                with torch.set_grad_enabled(phase == "train"):
                    outputs = model(inputs)
                    outputs = denormalize(outputs)

                    loss = criterion(outputs, labels)

                    # backward + optimize only if in training phase; guarded like
                    # zero_grad() above so that optimizer=None does not crash.
                    if phase == "train" and optimizer is not None:
                        loss.backward()
                        optimizer.step()

                # statistics
                running_loss += loss.item()
                running_psnr += psnr(labels, outputs)

            epoch_loss = running_loss / len(dataloader)
            epoch_psnr = running_psnr / len(dataloader)

            print("{} Loss: {:.4f} PSNR: {:.4f}".format(phase, epoch_loss, epoch_psnr))

            if phase == "train":
                train_loss.append(epoch_loss)
                train_psnr.append(epoch_psnr)
            else:
                val_loss.append(epoch_loss)
                val_psnr.append(epoch_psnr)

                # deep copy the model whenever validation PSNR improves
                if epoch_psnr > best_psnr:
                    best_psnr = epoch_psnr
                    best_model_wts = copy.deepcopy(model.state_dict())

        if scheduler is not None:
            last_lr = scheduler.get_last_lr()[0]
            if isinstance(scheduler, torch.optim.lr_scheduler.ReduceLROnPlateau):
                # Plateau detection monitors this epoch's validation loss.
                scheduler.step(val_loss[-1])
            else:
                # Other schedulers take no metric argument.
                scheduler.step()
            new_lr = scheduler.get_last_lr()[0]

            if last_lr != new_lr:
                print("LR changed from ", last_lr, " to ", new_lr)
                # The new rate applies from the next epoch: 0-based `epoch + 1`,
                # i.e. 1-based `epoch + 2`.
                lr_list.append((epoch + 2, new_lr))

        # save state and plots every 10 epochs
        if (epoch + 1) % 10 == 0 or epoch == num_epochs - 1:
            save_model_state(model)
            save_plot(train_loss, val_loss, train_psnr, val_psnr)
        print()

    time_elapsed = time.time() - since
    print(
        "Training complete in {:.0f}m {:.0f}s".format(
            time_elapsed // 60, time_elapsed % 60
        )
    )
    print("Best val PSNR: {:.4f}".format(best_psnr))

    # load best model weights
    model.load_state_dict(best_model_wts)

    # write PSNR values to text file for LaTeX (all epoch numbers are 1-based)
    with open("outputs/psnr.txt", "w") as f:
        f.write("Train PSNR values: \n")
        for epoch_number, psnr_value in enumerate(train_psnr, start=1):
            f.write(f"({epoch_number}, {psnr_value:.4f})")
        f.write("\n\n")
        f.write("Validation PSNR values: \n")
        for epoch_number, psnr_value in enumerate(val_psnr, start=1):
            f.write(f"({epoch_number}, {psnr_value:.4f})")

        f.write("\n\n")
        f.write("Learning rate values: \n")
        # lr_list already stores 1-based epochs, so they are written unchanged.
        for epoch_number, lr_value in lr_list:
            f.write(f"({epoch_number}, {lr_value})")
    return model

In [None]:
device = "cuda" if torch.cuda.is_available() else "cpu"
# get_device_name() raises when no CUDA device is present, so only query it on
# GPU and otherwise print the fallback device.
print(torch.cuda.get_device_name() if device == "cuda" else device)

model = VGGSR1p().to(device)
# model.load_state_dict(torch.load("outputs/experiments_final/VGGSR_e150_model.pth"))
criterion = nn.MSELoss()
epochs = 500
lr = 0.001

optimizer = torch.optim.Adam(model.parameters(), lr)
scheduler = None

# Alternative: exponential decay from `lr` down to `final_lr` over all epochs.
# final_lr = 0.00001
# gamma = (final_lr / lr) ** (1 / epochs)
# scheduler = torch.optim.lr_scheduler.ExponentialLR(optimizer, gamma=gamma)

# Cut the learning rate by 10x when the monitored loss stops improving for 25 epochs.
scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(
    optimizer=optimizer, mode="min", factor=0.1, patience=25
)


# Freeze the encoder layers
for param in model.encoder.parameters():
    param.requires_grad = False

model = train_model(model, criterion, optimizer, scheduler, num_epochs=epochs)

## Fine Tuning

In [None]:
device = "cuda" if torch.cuda.is_available() else "cpu"
# get_device_name() raises when no CUDA device is present, so only query it on
# GPU and otherwise print the fallback device.
print(torch.cuda.get_device_name() if device == "cuda" else device)

model = VGGSR1p().to(device)
# map_location lets a checkpoint that was saved on a GPU load on a CPU-only machine.
model.load_state_dict(
    torch.load("outputs/experiments_final/model.pth", map_location=device)
)
criterion = nn.MSELoss()
epochs = 50
lr = 0.0001  # lower learning rate than the initial training run

optimizer = torch.optim.Adam(model.parameters(), lr)
scheduler = None

# Unfreeze the encoder layers
for param in model.encoder.parameters():
    param.requires_grad = True

model_fine_tuned = train_model(
    model, criterion, optimizer, scheduler, num_epochs=epochs
)

# Testing


Test loading and upscaling a single image.


In [None]:
# Pick one (input, label) pair from the validation dataset.
index = 0
valid_images = valid_loader.dataset[index]

# The input is passed through denormalize() so it can be compared with the label
# and displayed (presumably the dataset stores it mean/std-normalized -- TODO confirm).
lr_image = denormalize(valid_images[0].unsqueeze(0)).to(device)
hr_image = valid_images[1].unsqueeze(0).to(device)

# Super-resolve the (still normalized) input; the output is denormalized the same
# way as in train_model.
with torch.no_grad():
    sr_image = denormalize(model(valid_images[0].unsqueeze(0).to(device)))

# PSNR of each image against the label; the last one compares the label with itself.
lr_psnr = psnr(hr_image, lr_image)
sr_psnr = psnr(hr_image, sr_image)
hr_psnr = psnr(hr_image, hr_image)

# Take the first batch element and move the channel axis last for imshow.
lr_image = lr_image.cpu()[0].numpy().transpose((1, 2, 0))
hr_image = hr_image.cpu()[0].numpy().transpose((1, 2, 0))
sr_image = sr_image.cpu()[0].numpy().transpose((1, 2, 0))

# Side-by-side comparison: input, model output, label.
plt.figure(figsize=(20, 10))
plt.subplot(1, 3, 1)
plt.title(f"Low Resolution (Bicubic Scaling), PSNR {lr_psnr:.2f}")
plt.imshow(lr_image)
plt.subplot(1, 3, 2)
plt.title(f"Super Resolution (CNN Scaling), PSNR {sr_psnr:.2f}")
plt.imshow(sr_image)
plt.subplot(1, 3, 3)
plt.title(f"High Resolution (Label), PSNR {hr_psnr:.2f}")
plt.imshow(hr_image)
plt.show()

In [None]:
# Pick one (input, label) pair from the validation dataset.
index = 12
valid_images = valid_loader.dataset[index]

# Denormalize the input before comparing it with the label, exactly as the
# index-0 cell does; without this the PSNR and the plot use normalized values.
lr_image = denormalize(valid_images[0].unsqueeze(0)).to(device)
hr_image = valid_images[1].unsqueeze(0).to(device)

# The model output is denormalized as well, matching train_model and validate.
with torch.no_grad():
    sr_image = denormalize(model(valid_images[0].unsqueeze(0).to(device)))

# PSNR of each image against the label; the last one compares the label with itself.
lr_psnr = psnr(hr_image, lr_image)
sr_psnr = psnr(hr_image, sr_image)
hr_psnr = psnr(hr_image, hr_image)

# Take the first batch element and move the channel axis last for imshow.
lr_image = lr_image.cpu()[0].numpy().transpose((1, 2, 0))
hr_image = hr_image.cpu()[0].numpy().transpose((1, 2, 0))
sr_image = sr_image.cpu()[0].numpy().transpose((1, 2, 0))

# Side-by-side comparison: input, model output, label.
plt.figure(figsize=(20, 10))
plt.subplot(1, 3, 1)
plt.title(f"Low Resolution (Bicubic Scaling), PSNR {lr_psnr:.2f}")
plt.imshow(lr_image)
plt.subplot(1, 3, 2)
plt.title(f"Super Resolution (CNN Scaling), PSNR {sr_psnr:.2f}")
plt.imshow(sr_image)
plt.subplot(1, 3, 3)
plt.title(f"High Resolution (Label), PSNR {hr_psnr:.2f}")
plt.imshow(hr_image)
plt.show()

Evaluate the model on complete test sets of images (Set5 and Set14).


In [None]:
import torch
import glob as glob
import numpy as np
import matplotlib.pyplot as plt

from tqdm import tqdm
from PIL import Image
from torch.utils.data import DataLoader, Dataset

# Factor by which TestDataset shrinks each image before resizing it back up
# (same value as the SCALE declared in the patch-generation cell).
SCALE = 4.0

# Use the GPU when one is available, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")


def validate(model, dataloader, device, criterion=None):
    """Evaluate ``model`` on ``dataloader`` and return ``(mean loss, mean PSNR)``.

    Model outputs are passed through ``denormalize`` before being compared with
    the labels. Relies on the globals ``denormalize``, ``psnr`` and ``tqdm``.

    Args:
        model: Network to evaluate; it is switched to evaluation mode.
        dataloader: Iterable of batches where ``batch[0]`` is the model input and
            ``batch[1]`` the label.
        device: Device the batches are moved to.
        criterion: Optional loss callable ``criterion(outputs, labels)``.
            Defaults to mean squared error, the loss used for training in this
            notebook.

    Returns:
        Tuple ``(final_loss, final_psnr)``, both averaged over the batches.
    """
    if criterion is None:
        criterion = torch.nn.functional.mse_loss

    model.eval()
    running_loss = 0.0
    running_psnr = 0.0
    with torch.no_grad():
        for data in tqdm(dataloader, total=len(dataloader)):
            image_data = data[0].to(device)
            label = data[1].to(device)
            outputs = denormalize(model(image_data))

            # Accumulate per-batch loss and PSNR; previously the loss was never
            # accumulated, so the returned value was always 0.0.
            running_loss += criterion(outputs, label).item()
            running_psnr += psnr(label, outputs)

    # Both running sums hold one value per batch, so both are averaged over batches.
    final_loss = running_loss / len(dataloader)
    final_psnr = running_psnr / len(dataloader)
    return final_loss, final_psnr


# Test dataset: builds (bicubic low-res input, high-res label) pairs on the fly.
class TestDataset(Dataset):
    """Dataset that derives a degraded input from every image in a directory.

    Each file matching ``{image_paths}/*`` is used as the high-resolution label.
    The input is produced by shrinking the label by the global ``SCALE`` factor
    and resizing it back to the original size, both with bicubic interpolation.
    """

    def __init__(self, image_paths):
        # glob returns paths in arbitrary order; sort them so that an index always
        # refers to the same file across runs and machines.
        self.all_image_paths = sorted(glob.glob(f"{image_paths}/*"))

    def __len__(self):
        return len(self.all_image_paths)

    def __getitem__(self, index):
        """Return ``(image, label)`` float tensors in channel-first layout.

        ``label`` is scaled to [0, 1]; ``image`` is additionally normalized with
        the per-channel mean/std constants below.
        """
        # The high resolution ground truth label.
        label = Image.open(self.all_image_paths[index]).convert("RGB")
        w, h = label.size[:]

        # Downscale by the SCALE factor with bicubic interpolation.
        low_res_img = label.resize(
            (int(w * (1.0 / SCALE)), int(h * (1.0 / SCALE))), Image.BICUBIC
        )
        # The low resolution input image, resized back to the label's size.
        image = low_res_img.resize((w, h), Image.BICUBIC)

        image = np.array(image, dtype=np.float32)
        label = np.array(label, dtype=np.float32)

        # Scale pixel values from [0, 255] to [0, 1].
        image /= 255.0
        label /= 255.0

        # normalize lr image (same constants as the defaults of `denormalize`)
        mean = np.array([0.485, 0.456, 0.406])
        std = np.array([0.229, 0.224, 0.225])
        image = (image - mean) / std

        # Channel-last (H, W, C) -> channel-first (C, H, W).
        image = image.transpose([2, 0, 1])
        label = label.transpose([2, 0, 1])

        return (
            torch.tensor(image, dtype=torch.float),
            torch.tensor(label, dtype=torch.float),
        )


# Prepare the datasets.
def get_test_datasets(image_paths):
    """Build a ``TestDataset`` for the images found under ``image_paths``."""
    return TestDataset(image_paths)


# Prepare the data loaders
def get_test_dataloaders(dataset_test):
    """Wrap ``dataset_test`` in an unshuffled ``DataLoader`` with batch size 1."""
    return DataLoader(dataset=dataset_test, batch_size=1, shuffle=False)

In [None]:
# Load the model.
device = "cuda" if torch.cuda.is_available() else "cpu"

model = VGGSR1p().to(device)
# map_location lets a checkpoint that was saved on a GPU load on a CPU-only machine.
model.load_state_dict(torch.load("outputs/model.pth", map_location=device))

# Each entry is [directory with the original images, display name of the test set].
data_paths = [["input/Set5/original", "Set5"], ["input/Set14/original", "Set14"]]
for data_path in data_paths:
    dataset_test = get_test_datasets(data_path[0])
    test_loader = get_test_dataloaders(dataset_test)
    # validate() switches the model to eval mode; only the PSNR is reported here.
    _, test_psnr = validate(model, test_loader, device)
    print(f"Test PSNR on {data_path[1]}: {test_psnr:.3f}")

In [None]:
# Release unoccupied GPU memory cached by PyTorch's allocator.
torch.cuda.empty_cache()