# SRCNN2024 - Super Resolution Convolutional Neural Network

## 1. Importing Libraries

We import the necessary libraries for image processing and deep learning. This includes libraries for data manipulation (pandas, numpy), visualization (matplotlib), deep learning (torch), and image processing (PIL). These libraries will be used throughout the project for tasks such as loading and processing images, building neural networks, and optimizing model parameters.

In [1]:
import pandas
import numpy as np
import matplotlib.pyplot as plt

import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import torchvision.transforms.functional as TF
from torch.utils.data import TensorDataset, DataLoader

import os
import sys
import glob

import cv2
import tifffile
from PIL import Image
from scipy.signal import convolve2d
from skimage.util import random_noise

## 2. Data

We begin by mounting Google Drive to access our project data stored there. This step allows seamless integration with Google Colab, facilitating data loading and manipulation directly from our Drive storage.

In [2]:
from google.colab import drive

# Set the path to the project directory in Google Drive
drive_path = '/content/gdrive/Shareddrives/Final Project/SRCNN2024/'

# Mount Google Drive
drive.mount('/content/gdrive')

Mounted at /content/gdrive


### Cropping images

To ensure uniformity across the dataset, we crop all images to a target size of $256\times256$. This step is crucial for maintaining consistency in input dimensions, which is essential for deep learning models that require fixed-size inputs.

In [3]:
def crop_images(input_folder, output_folder):
    # Create output folder if it doesn't exist
    if not os.path.exists(output_folder):
        os.makedirs(output_folder)

    # Iterate through all directories and files in the input folder
    for root, dirs, files in os.walk(input_folder):
        for filename in files:
            if filename.endswith('.tif'):
                # Open image
                image_path = os.path.join(root, filename)
                img = Image.open(image_path)

                # Crop image
                cropped_img = img.crop((0, 0, 256, 256))

                # Save cropped image
                output_path = os.path.join(output_folder, filename)
                cropped_img.save(output_path)

# input_folder = drive_path + "data/Images/"
# output_folder = drive_path + "data/HR_data/"
# crop_images(input_folder, output_folder)

By cropping the images to a consistent size, we ensure that all samples in the dataset have the same dimensions, simplifying the preprocessing pipeline and ensuring compatibility with downstream tasks such as model training and evaluation.

### Image downsampling

We define a function `downsample` to perform downsampling on an image. This function takes parameters such as kernel size, scale factor, and noise statistics to generate a downsampled low-resolution (LR) version of the input image. Additionally, we provide a utility function `create_LR_images` to apply the downsampling function to all images in a specified folder and save the resulting LR images to an output folder.

The downsampling operation involves convolving the image with a downsampling kernel followed by downsampling by a specified factor. Mathematically, given an input image $I_{\text{HR}}$, a downsampling kernel $K$, the downsampling operation with noise can be represented as:

\begin{align}
  I_{\text{LR}} = (I_{\text{HR}} * K) + N
\end{align}

where $I_{\text{LR}}$ is the downsampled image, $*$ denotes the convolution operation, and $N$ represents the noise added to the downsampled image.


In [4]:
def downsample(image, kernel_size=(5, 5), scale_factor=2, noise_mean=0, noise_std=1):
    # Define a downsampling kernel
    kernel = np.ones(kernel_size, dtype=np.float64) / (kernel_size[0] * kernel_size[1])

    # Initialize an empty array to store the downsampled images for each channel
    LR_images = []

    # Iterate over each channel
    for channel in range(image.shape[2]):
        # Convolve the channel with the downsampling kernel
        downsampled_channel = convolve2d(image[:, :, channel], kernel, mode='same')[::scale_factor, ::scale_factor]

        # Add noise to the downsampled channel
        noise = random_noise(np.zeros(downsampled_channel.shape), mean=noise_mean, var=noise_std**2)
        LR_channel = downsampled_channel + noise

        # Clip the values to be within [0, 255]
        LR_channel = np.clip(LR_channel, 0, 255)

        # Append the downsampled channel to the list
        LR_images.append(LR_channel)

    # Convert the list of downsampled channels to an array
    LR_image = np.stack(LR_images, axis=-1)

    return LR_image


def create_LR_images(input_folder, output_folder):
    # Create output folder if it doesn't exist
    if not os.path.exists(output_folder):
        os.makedirs(output_folder)

    # Iterate through all images in the input folder
    for filename in os.listdir(input_folder):
        if filename.endswith('.tif'):
            # Read the image
            image_path = os.path.join(input_folder, filename)
            image = tifffile.imread(image_path)

            # Create the LR image
            LR_image = downsample(image)

            # Save the LR image to the output folder
            output_path = os.path.join(output_folder, filename)
            tifffile.imwrite(output_path, LR_image.astype(np.uint8))


input_folder = drive_path + "data/HR_data/"
output_folder = drive_path + "data/LR_data/"
# create_LR_images(input_folder, output_folder)

### Loading the data

To facilitate image loading and preprocessing, we define a function `load_images`. This function loads images from a specified directory, performs preprocessing steps such as removing the alpha channel and normalizing pixel values, and constructs a data array with all images. This array ensures uniform dimensions across all images, which is crucial for model training.

In [None]:
def load_images(path):
    images = []
    # Get sorted list of image file paths
    image_paths = sorted(glob.glob(path))

    for file in image_paths:
        img = np.asarray(Image.open(file))      # read the image as a numpy array
        img = img[:, :, :3]                     # remove the alpha channel if exists

        # Normalize the image to have a mean of 0 and standard deviation of 1
        img = (img / 255.0) - 0.5

        images.append(img)

    # Find the maximum dimensions among all images
    max_height = max(img.shape[0] for img in images)
    max_width = max(img.shape[1] for img in images)

    # Prepare data array with the maximum dimensions
    data = np.zeros((len(images), max_height, max_width, 3))

    # Populate data array with images
    for i, img in enumerate(images):
        h, w, _ = img.shape
        data[i, :h, :w] = img

    return data

# np.save(drive_path + "data/HR_data.npy", load_images(drive_path + "data/HR_data/*.tif"))
# np.save(drive_path + "data/LR_data.npy", load_images(drive_path + "data/LR_data/*.tif"))

HR_data = np.load(drive_path + "data/HR_data.npy")
LR_data = np.load(drive_path + "data/LR_data.npy")

In [None]:
print(HR_data.shape, LR_data.shape)

In [None]:
# Plotting HR and LR images side by side
fig, axes = plt.subplots(1, 2, figsize=(10, 5))

# Plot HR image
axes[0].imshow(HR_data[1000] + 0.5)
axes[0].set_title(f"HR Image\nDimensions: {HR_data.shape[1:3]}")

# Plot LR image
axes[1].imshow(LR_data[1000] + 0.5)
axes[1].set_title(f"LR Image\nDimensions: {LR_data.shape[1:3]}")

plt.show()

### Shuffle data, convert to tensor

In [None]:
# Generate shuffled indices
indices = np.arange(HR_data.shape[0])
np.random.shuffle(indices)

# Rearrange both arrays using shuffled indices
LR_data = LR_data[indices]
HR_data = HR_data[indices]

In [None]:
# Convert LR_data and HR_data to PyTorch tensor
LR_data = torch.tensor(LR_data).permute(0, 3, 1, 2).float()
HR_data = torch.tensor(HR_data).permute(0, 3, 1, 2).float()

print(HR_data.shape, LR_data.shape)

In [None]:
train_size = int(0.8 * HR_data.shape[0])
valid_size = train_size + int(0.1 * HR_data.shape[0])
test_size = valid_size + int(0.1 * HR_data.shape[0])

print("train size:", train_size, "valid size:", valid_size - train_size, "test size:", test_size - valid_size)

LR_train_data = LR_data[:train_size]
HR_train_data = HR_data[:train_size]

LR_valid_data = LR_data[train_size:valid_size]
HR_valid_data = HR_data[train_size:valid_size]

LR_test_data = LR_data[valid_size:]
HR_test_data = HR_data[valid_size:]

## 3. Readymade code for upsampling

**Pixel Shuffle dosen't work!**

In [None]:
# Define Transposed Convolution
def transposed_conv(input, k, s):
    return nn.ConvTranspose2d(in_channels=3, out_channels=3, kernel_size=k, stride=s, padding=1)(input)

# Define Pixel Shuffle Upscaling
def pixel_shuffle(input, uf):
    return nn.PixelShuffle(upscale_factor=uf)(input)

# Define Adaptive Group Upscaling
def adaptive_group(input, os):
    return nn.AdaptiveAvgPool2d(output_size=os)(input)

# Define Sub-pixel Upscaling
def sub_pixel(input, oc, uf):
    return nn.Sequential(nn.Conv2d(in_channels=3, out_channels=oc, kernel_size=3, stride=1, padding=1),
                         nn.PixelShuffle(upscale_factor=uf))(input)

# Define Bicubic Interpolation (Bicubic Upscaling)
def bicubic(input, os):
    return F.interpolate(input, size=os, mode='bicubic')

In [None]:
# Apply each function for upscaling by factor 2
transposed_conv_x2 = transposed_conv(LR_data, k=4, s=2)
print(transposed_conv_x2.size())
adaptive_group_x2 = adaptive_group(LR_data, os=(252, 252))
print(adaptive_group_x2.size())
sub_pixel_x2 = sub_pixel(LR_data, oc=12, uf=2)
print(sub_pixel_x2.size())
bicubic_x2 = bicubic(LR_data, os=(252, 252))
print(bicubic_x2.size())

transposed_conv_x3 = transposed_conv(LR_data, k=6, s=3)
print(transposed_conv_x3.size())
adaptive_group_x3 = adaptive_group(LR_data, os=(378, 378))
print(adaptive_group_x3.size())
sub_pixel_x3 = sub_pixel(LR_data, oc=27, uf=3)
print(sub_pixel_x3.size())
bicubic_x3 = bicubic(LR_data, os=(378, 378))
print(bicubic_x3.size())

transposed_conv_x4 = transposed_conv(LR_data, k=8, s=4)
print(transposed_conv_x4.size())
bicubic_x4 = bicubic(LR_data, os=(504, 504))
print(bicubic_x4.size())

# Plot images for visual comparison
fig, axes = plt.subplots(3, 4, figsize=(12, 9))

# Factor 2
axes[0, 0].imshow(transposed_conv_x2.permute(0, 2, 3, 1).detach().numpy()[0] + 0.5)
axes[0, 0].set_title("Transposed Convolution x2")
axes[0, 1].imshow(adaptive_group_x2.permute(0, 2, 3, 1).detach().numpy()[0] + 0.5)
axes[0, 1].set_title("Adaptive Group x2")
axes[0, 2].imshow(sub_pixel_x2.permute(0, 2, 3, 1).detach().numpy()[0] + 0.5)
axes[0, 2].set_title("Sub-pixel x2")
axes[0, 3].imshow(bicubic_x2.permute(0, 2, 3, 1).detach().numpy()[0] + 0.5)
axes[0, 3].set_title("Bicubic x2")

# Factor 3
axes[1, 0].imshow(transposed_conv_x3.permute(0, 2, 3, 1).detach().numpy()[0] + 0.5)
axes[1, 0].set_title("Transposed Convolution x3")
axes[1, 1].imshow(adaptive_group_x3.permute(0, 2, 3, 1).detach().numpy()[0] + 0.5)
axes[1, 1].set_title("Bicubic x3")
axes[1, 2].imshow(sub_pixel_x3.permute(0, 2, 3, 1).detach().numpy()[0] + 0.5)
axes[1, 3].imshow(bicubic_x3.permute(0, 2, 3, 1).detach().numpy()[0] + 0.5)

# Factor 4
axes[2, 0].imshow(transposed_conv_x4.permute(0, 2, 3, 1).detach().numpy()[0] + 0.5)
axes[2, 0].set_title("Transposed Convolution x4")
axes[2, 1].imshow(bicubic_x4.permute(0, 2, 3, 1).detach().numpy()[0] + 0.5)
axes[2, 1].set_title("Bicubic x4")
axes[2, 2].axis('off')  # No result for adaptive_group_x4
axes[2, 3].axis('off')  # No result for sub_pixel_x4

plt.tight_layout()
plt.show()

## CNN - U-Net based architecture

In [None]:
class DoubleConv(nn.Module):
    def __init__(self, in_channels, out_channels):
        super().__init__()
        self.conv_op = nn.Sequential(
            nn.Conv2d(in_channels, out_channels, kernel_size=3, padding=1),
            nn.ReLU(inplace=True),
            nn.Conv2d(out_channels, out_channels, kernel_size=3, padding=1),
            nn.ReLU(inplace=True)
        )

    def forward(self, x):
        return self.conv_op(x)

In [None]:
class DownSample(nn.Module):
    def __init__(self, in_channels, out_channels):
        super().__init__()
        self.conv = DoubleConv(in_channels, out_channels)
        self.pool = nn.MaxPool2d(kernel_size=2, stride=2)

    def forward(self, x):
        down = self.conv(x)
        p = self.pool(down)

        return down, p

In [None]:
class UpSample(nn.Module):
    def __init__(self, in_channels, out_channels):
        super().__init__()
        self.up = nn.ConvTranspose2d(in_channels, in_channels // 2, kernel_size=2, stride=2)
        self.conv = DoubleConv(in_channels, out_channels)

    def forward(self, x1, x2):
        x1 = self.up(x1)
        x = torch.cat([x1, x2], dim=1)
        return self.conv(x)

In [None]:
class UNet(nn.Module):
    def __init__(self, in_channels, out_channels):
        super().__init__()

        self.down_conv_1 = DownSample(in_channels, 64)
        self.down_conv_2 = DownSample(64, 128)
        self.down_conv_3 = DownSample(128, 256)
        self.down_conv_4 = DownSample(256, 512)

        self.bottle_neck = DoubleConv(512, 1024)

        self.up_conv_1 = UpSample(1024, 512)
        self.up_conv_2 = UpSample(512, 256)
        self.up_conv_3 = UpSample(256, 128)
        self.up_conv_4 = UpSample(128, 64)

        self.upsample = nn.Upsample(scale_factor=2, mode='bicubic')

        self.out = nn.Conv2d(64, out_channels, kernel_size=1)

    def forward(self, x):
        down_1, p1 = self.down_conv_1(x)
        down_2, p2 = self.down_conv_2(p1)
        down_3, p3 = self.down_conv_3(p2)
        down_4, p4 = self.down_conv_4(p3)

        b = self.bottle_neck(p4)

        up_1 = self.up_conv_1(b, down_4)
        up_2 = self.up_conv_2(up_1, down_3)
        up_3 = self.up_conv_3(up_2, down_2)
        up_4 = self.up_conv_4(up_3, down_1)

        upsample = self.upsample(up_4)

        out = self.out(upsample)

        return out

In [None]:
input_image = torch.rand((5, 3, 128, 128))
model = UNet(3, 3)
output = model(input_image)
print(output.shape)

## Train

In [None]:
LEARNING_RATE = 5e-4
BATCH_SIZE = 18
EPOCHS = 20

DEVICE = "cuda" if torch.cuda.is_available() else "cpu"

train_dataset = TensorDataset(LR_train_data, HR_train_data)
valid_dataset = TensorDataset(LR_valid_data, HR_valid_data)

train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True)
valid_loader = DataLoader(valid_dataset, batch_size=BATCH_SIZE, shuffle=True)

model = UNet(3, 3).to(DEVICE)
optimizer = optim.Adam(model.parameters(), lr=LEARNING_RATE)
criterion = nn.MSELoss()
scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=7, gamma=0.5)

train_loss, valid_loss = 0.0, 0.0

for epoch in range(EPOCHS):
    model.train()
    train_running_loss = 0

    for idx, LR_HR in enumerate(train_loader):
        LR = LR_HR[0].float().to(DEVICE)
        HR = LR_HR[1].float().to(DEVICE)

        pred = model(LR)
        optimizer.zero_grad()

        loss = criterion(pred, HR)
        train_running_loss += loss.item()

        loss.backward()
        optimizer.step()

    train_loss = train_running_loss / (idx + 1)

    model.eval()
    valid_running_loss = 0
    with torch.no_grad():
        for idx, LR_HR in enumerate(valid_loader):
            LR = LR_HR[0].float().to(DEVICE)
            HR = LR_HR[1].float().to(DEVICE)

            pred = model(LR)
            loss = criterion(pred, HR)
            valid_running_loss += loss.item()

        valid_loss = train_running_loss / (idx + 1)

    scheduler.step()

    print("-"*30)
    print(f"Train loss EPOCH {epoch + 1}: {train_loss:.4f}")
    print(f"Valid loss EPOCH {epoch + 1}: {valid_loss:.4f}")
    print("-"*30)


## Save the model

In [None]:
torch.save(model.state_dict(), drive_path + "best_model.pk")

## Test

In [None]:
# Choose an LR image from the training set
lr_image = LR_train_data[30].to(DEVICE)  # Change the index as needed

# Bicubic interpolation
# bicubic_x2 = bicubic(LR_train_data, os=(256, 256))
bicubic_output = bicubic_x2.permute(0, 2, 3, 1).detach().numpy()[30] + 0.5

# Pass the LR image through the model to get the SR output
model.eval()
with torch.no_grad():
    sr_output = model(lr_image.unsqueeze(0)).cpu()  # Unsqueeze to add batch dimension and move to CPU

# Convert tensors to numpy arrays and remove batch dimension
lr_image = lr_image.permute(1, 2, 0).cpu().numpy() + 0.5
sr_output = sr_output.squeeze().permute(1, 2, 0).numpy() + 0.5


# Plot the LR image, SR output, HR image, and Bicubic interpolation
plt.figure(figsize=(20, 5))

plt.subplot(1, 4, 1)
plt.imshow(lr_image)
plt.title('LR Image')
plt.axis('off')

plt.subplot(1, 4, 2)
plt.imshow(bicubic_output)
plt.title('Bicubic Interpolation')
plt.axis('off')

plt.subplot(1, 4, 3)
plt.imshow(sr_output)
plt.title('SR output')
plt.axis('off')

plt.subplot(1, 4, 4)
plt.imshow(HR_train_data[30].permute(1, 2, 0).cpu().numpy() + 0.5)
plt.title('HR Image')
plt.axis('off')

plt.show()

In [None]:
# Choose an LR image from the training set
lr_image = LR_test_data[15].to(DEVICE)  # Change the index as needed

# Bicubic interpolation
bicubic_x2_test = bicubic(LR_test_data, os=(256, 256))
bicubic_output = bicubic_x2_test.permute(0, 2, 3, 1).detach().numpy()[15] + 0.5

# Pass the LR image through the model to get the SR output
model.eval()
with torch.no_grad():
    sr_output = model(lr_image.unsqueeze(0)).cpu()  # Unsqueeze to add batch dimension and move to CPU

# Convert tensors to numpy arrays and remove batch dimension
lr_image = lr_image.permute(1, 2, 0).cpu().numpy() + 0.5
sr_output = sr_output.squeeze().permute(1, 2, 0).numpy() + 0.5


# Plot the LR image, SR output, HR image, and Bicubic interpolation
plt.figure(figsize=(20, 5))

plt.subplot(1, 4, 1)
plt.imshow(lr_image)
plt.title('LR Image')
plt.axis('off')

plt.subplot(1, 4, 2)
plt.imshow(bicubic_output)
plt.title('Bicubic Interpolation')
plt.axis('off')

plt.subplot(1, 4, 3)
plt.imshow(sr_output)
plt.title('SR output')
plt.axis('off')

plt.subplot(1, 4, 4)
plt.imshow(HR_test_data[15].permute(1, 2, 0).cpu().numpy() + 0.5)
plt.title('HR Image')
plt.axis('off')

plt.show()