In [1]:
from PIL import Image
import os
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from torchvision import transforms, datasets
import os
from torch.utils.data import DataLoader, Dataset
import math
from torchvision.transforms import ColorJitter, Normalize
from torch.utils.data import ConcatDataset
from torch.utils.data import Subset
from torch.utils.data.sampler import SubsetRandomSampler
from torch.utils.data import DataLoader, random_split
import numpy as np
from tqdm import tqdm

# torch_xla can be installed and imported for TPU usage
# import torch_xla
# import torch_xla.core.xla_model as xm


In [None]:
# Mount Google Drive into the Colab runtime at /content/drive.
# NOTE(review): the dataset folders used later are given as relative paths
# ("./drive/MyDrive/..."), so this presumably relies on the working directory
# being /content — confirm when running outside the default Colab setup.
from google.colab import drive
drive.mount('/content/drive')

In [None]:
# Folders with the image pairs: the low-resolution images are fed to the models
# as input, the high-resolution images are the targets they are compared against.
# Both paths are relative and therefore resolved against the current working directory.
low_res_folder = "./drive/MyDrive/Upscaling/mensch_low_res"
high_res_folder = "./drive/MyDrive/Upscaling/mensch"


# === creating dataset with all images ===
class CustomDataset(Dataset):
    """Dataset of (low-resolution, high-resolution) image pairs.

    The two directory listings are sorted and paired by position, so both
    folders must contain the same number of files, with names that sort into
    the same order.

    Args:
        low_res_folder: Directory containing the low-resolution images.
        high_res_folder: Directory containing the high-resolution images.
        transform: Optional callable applied to each image of a pair
            (the same callable is used for both images).

    Raises:
        ValueError: If the two folders hold a different number of files, since
            positional pairing would then be wrong or run out of range.
    """

    def __init__(self, low_res_folder, high_res_folder, transform=None):
        self.low_res_folder = low_res_folder
        self.high_res_folder = high_res_folder
        self.low_res_images = sorted(os.listdir(low_res_folder))
        self.high_res_images = sorted(os.listdir(high_res_folder))
        self.transform = transform

        # Fail early instead of mispairing images or hitting an IndexError
        # in the middle of an epoch.
        if len(self.low_res_images) != len(self.high_res_images):
            raise ValueError(
                f"Folder sizes differ: {len(self.low_res_images)} low-res files in "
                f"{low_res_folder!r} vs {len(self.high_res_images)} high-res files in "
                f"{high_res_folder!r}"
            )

    def __len__(self):
        """Return the number of image pairs."""
        return len(self.low_res_images)

    @staticmethod
    def _load_rgb(path):
        """Load the image at `path` as a 3-channel RGB image and close the file."""
        # Image.open is lazy; convert() reads the pixel data, and the context
        # manager then releases the file handle. Forcing RGB keeps grayscale
        # or RGBA files compatible with the 3-channel models.
        with Image.open(path) as image:
            return image.convert("RGB")

    def __getitem__(self, index):
        """Return the `(low_res_image, high_res_image)` pair at `index`.

        Both images are passed through `self.transform` when one is set.
        """
        low_res_image = self._load_rgb(
            os.path.join(self.low_res_folder, self.low_res_images[index])
        )
        high_res_image = self._load_rgb(
            os.path.join(self.high_res_folder, self.high_res_images[index])
        )

        if self.transform is not None:
            low_res_image = self.transform(low_res_image)
            high_res_image = self.transform(high_res_image)

        return low_res_image, high_res_image

# transform to tensor & normalize
# NOTE(review): `normalize` is created here but is not part of `base_transform`,
# so the dataset built below yields tensors that are NOT normalized with it.
normalize = Normalize(mean=[0.5, 0.5, 0.5], std=[0.5, 0.5, 0.5])

# The only transform applied to both images of a pair is the conversion to a tensor.
base_transform = transforms.Compose([
    transforms.ToTensor()
])

# original dataset
# Reads the two folder paths defined at the top of this cell.
dataset = CustomDataset(low_res_folder, high_res_folder, transform=base_transform)

### bicubic

In [None]:
class bicubic(nn.Module):
    """Parameter-free baseline that upscales images by bicubic interpolation.

    Args:
        upscale_factor: Factor by which height and width are multiplied.
            Defaults to 4, the value that was previously hard-coded.
    """

    def __init__(self, upscale_factor=4):
        super().__init__()
        # align_corners=False is what Upsample uses when the argument is
        # omitted; stating it explicitly documents the choice.
        self.interpolation = nn.Upsample(
            scale_factor=upscale_factor, mode='bicubic', align_corners=False
        )

    def forward(self, x):
        """Return the 4-D batch `x` with its last two dimensions upscaled."""
        return self.interpolation(x)

# Training hardware
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# instance of the bicubic baseline (it has no learnable parameters)
model = bicubic().to(device)
model.eval()

# MSE is the evaluation loss; the PSNR reported below is derived from it
criterion = nn.MSELoss()

# Build the loader in this cell so that it runs on a fresh kernel; previously
# `val_loader` was only defined by a later cell. The baseline is not trained,
# so it is evaluated on the whole dataset.
val_loader = DataLoader(dataset, batch_size=16)

# Evaluate the baseline. The per-batch mean losses are weighted by batch size
# so that a smaller final batch does not skew the average.
val_loss = 0.0
number_samples = 0
with torch.no_grad():  # no gradients are needed for evaluation
    for input_data, desired_data in val_loader:
        # Move input and desired images to device
        input_data = input_data.to(device)
        desired_data = desired_data.to(device)

        # Forward pass
        output_images = model(input_data)

        # Accumulate the loss
        loss = criterion(output_images, desired_data)
        batch_len = input_data.size(0)
        val_loss += loss.item() * batch_len
        number_samples += batch_len

if number_samples == 0:
    raise ValueError("The dataset is empty; cannot compute a validation loss.")

val_loss_avg = val_loss / number_samples

# PSNR for a peak signal value of 1; a perfect reconstruction has infinite PSNR
psnr = 10 * math.log10(1 / val_loss_avg) if val_loss_avg > 0 else float("inf")

print(f"Loss (validation): {val_loss_avg:.4f}, PSNR (validation): {psnr}")


# saving the model (the state dict is empty because the baseline has no parameters)
torch.save(model.state_dict(), "bicubic.pth")


### SRCNN

In [None]:
# SRCNN model
class SRCNN(nn.Module):
    """SRCNN-style network: bicubic upscaling followed by three convolutions.

    The input is first enlarged by bicubic interpolation; the convolutions all
    use padding that keeps the spatial size, so the output has the size of the
    interpolated image. A ReLU follows every convolution, including the last
    one, so the output is non-negative.

    Args:
        upscale_factor: Factor by which height and width are multiplied.
            Defaults to 4, the value that was previously hard-coded.
    """

    def __init__(self, upscale_factor=4):
        super().__init__()
        # align_corners=False is what Upsample uses when the argument is
        # omitted; stating it explicitly documents the choice.
        self.interpolation = nn.Upsample(
            scale_factor=upscale_factor, mode='bicubic', align_corners=False
        )
        self.conv1 = nn.Conv2d(3, 64, kernel_size=9, stride=1, padding=4)
        self.relu1 = nn.ReLU()
        self.conv2 = nn.Conv2d(64, 32, kernel_size=1, stride=1, padding=0)
        self.relu2 = nn.ReLU()
        self.conv3 = nn.Conv2d(32, 3, kernel_size=5, stride=1, padding=2)
        self.relu3 = nn.ReLU()

    def forward(self, x):
        """Map a 3-channel batch `x` to its upscaled 3-channel reconstruction."""
        x = self.interpolation(x)
        x = self.relu1(self.conv1(x))
        x = self.relu2(self.conv2(x))
        x = self.relu3(self.conv3(x))
        return x

### FSRCNN

In [None]:
class FSRCNN(nn.Module):
    """FSRCNN-style network that upscales with a transposed convolution.

    Stages: feature extraction, shrinking, `m` mapping layers, expanding and a
    final transposed convolution that performs the upscaling.

    Args:
        d: Number of feature channels before shrinking and after expanding.
        s: Number of channels in the shrunken mapping stage.
        m: Number of 3x3 mapping layers.
        upscale_factor: Factor by which height and width are multiplied.
            Defaults to 4, matching the other models in this notebook. The
            deconvolution previously used stride 5 / output_padding 4, which
            produced a 5x output that did not match 4x targets.
    """

    def __init__(self, d=56, s=12, m=4, upscale_factor=4):
        super().__init__()
        # Feature Extraction
        self.conv1 = nn.Conv2d(3, d, kernel_size=5, padding=2)
        self.relu1 = nn.PReLU(d)
        # Shrinking
        self.conv2 = nn.Conv2d(d, s, kernel_size=1)
        self.relu2 = nn.PReLU(s)
        # Non-linear Mapping
        self.mapping = nn.Sequential(*[nn.Sequential(
            nn.Conv2d(s, s, kernel_size=3, padding=1),
            nn.PReLU(s)
        ) for _ in range(m)])
        # Expanding
        self.conv3 = nn.Conv2d(s, d, kernel_size=1)
        self.relu3 = nn.PReLU(d)
        # Deconvolution
        # Output size is (H - 1) * stride - 2 * padding + (kernel_size - 1)
        # + output_padding + 1. With kernel_size=9, padding=4 and
        # output_padding=stride - 1 this is exactly stride * H.
        self.deconv = nn.ConvTranspose2d(
            d, 3, kernel_size=9, stride=upscale_factor, padding=4,
            output_padding=upscale_factor - 1
        )

    def forward(self, x):
        """Map a 3-channel batch `x` to its upscaled 3-channel reconstruction."""
        x = self.relu1(self.conv1(x))
        x = self.relu2(self.conv2(x))
        x = self.mapping(x)
        x = self.relu3(self.conv3(x))
        x = self.deconv(x)
        return x

### ESPCN

In [None]:
class ESPCN(nn.Module):
    """Sub-pixel convolution network for image upscaling.

    A 5x5 convolution extracts 64 feature maps. A 3x3 convolution then
    produces `num_channels * upscale_factor ** 2` maps, which the pixel
    shuffle rearranges into an image enlarged by `upscale_factor`. A ReLU is
    applied after the first convolution and after the pixel shuffle.

    Args:
        upscale_factor: Factor by which height and width are multiplied.
        num_channels: Number of channels of the input and output images.
    """

    def __init__(self, upscale_factor=4, num_channels=3):
        super().__init__()
        self.upscale_factor = upscale_factor

        # Channels needed by the pixel shuffle to yield `num_channels` outputs.
        shuffle_channels = num_channels * upscale_factor ** 2

        # Feature extraction stage
        self.conv1 = nn.Conv2d(num_channels, 64, kernel_size=5, padding=2)
        self.relu1 = nn.ReLU()

        # Sub-pixel convolution stage
        self.conv2 = nn.Conv2d(64, shuffle_channels, kernel_size=3, padding=1)
        self.pixel_shuffle = nn.PixelShuffle(upscale_factor)
        self.relu2 = nn.ReLU()

    def forward(self, x):
        """Return the upscaled reconstruction of the batch `x`."""
        features = self.conv1(x)
        features = self.relu1(features)
        upscaled = self.pixel_shuffle(self.conv2(features))
        return self.relu2(upscaled)


### cross validation of bicubic interpolation

In [None]:
dataset_size = len(dataset)
indices = list(range(dataset_size))

# Cross-validation setup
num_folds = 5
# Integer division: samples beyond num_folds * fold_len are not part of any fold.
fold_len = dataset_size // num_folds

batch_size = 16
total_psnr = 0

# These objects do not depend on the fold, so they are created once.
# Training hardware
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# instance of the bicubic baseline (it has no learnable parameters)
model = bicubic().to(device)
model.eval()

# MSE is the evaluation loss; the PSNR reported below is derived from it
criterion = nn.MSELoss()

for fold in range(num_folds):

    # Calculate start and end indices for validation data
    val_start = fold * fold_len
    val_end = (fold + 1) * fold_len

    val_indices = indices[val_start:val_end]

    # The images are used as produced by `dataset`. Assigning a `transform`
    # attribute to a Subset has no effect (Subset only forwards indexing to
    # the wrapped dataset), so the earlier normalization attempt was removed.
    # The PSNR formula below assumes a peak value of 1.
    val_dataset = Subset(dataset, val_indices)

    # Create the validation data loader
    val_loader = DataLoader(val_dataset, batch_size=batch_size)

    # Evaluate on this fold. The per-batch mean losses are weighted by batch
    # size so that a smaller final batch does not skew the average.
    val_loss = 0.0
    number_samples = 0
    with torch.no_grad():  # no gradients are needed for evaluation
        for input_data, desired_data in val_loader:
            # Move input and desired images to device
            input_data = input_data.to(device)
            desired_data = desired_data.to(device)

            # Forward pass
            output_images = model(input_data)

            # Accumulate the loss
            loss = criterion(output_images, desired_data)
            batch_len = input_data.size(0)
            val_loss += loss.item() * batch_len
            number_samples += batch_len

    if number_samples == 0:
        raise ValueError(
            f"Fold {fold + 1} is empty: the dataset has {dataset_size} samples "
            f"for {num_folds} folds."
        )

    val_loss_avg = val_loss / number_samples

    # PSNR for a peak signal value of 1; a perfect reconstruction has infinite PSNR
    psnr = 10 * math.log10(1 / val_loss_avg) if val_loss_avg > 0 else float("inf")
    total_psnr += psnr

    print(f"Loss (validation): {val_loss_avg:.4f}, PSNR (validation): {psnr}")

# Mean PSNR over the folds
total_psnr = total_psnr/num_folds
print(f"Total PSNR:{total_psnr}")

### Cross-validation training for SRCNN and FSRCNN
##### (In the cell below, set the model to the architecture you want to train.)

In [None]:
dataset_size = len(dataset)
indices = list(range(dataset_size))

# Cross-validation setup
num_folds = 5
# Integer division: samples beyond num_folds * fold_len are never used for
# validation (they are part of every fold's training set).
fold_len = dataset_size // num_folds

batch_size = 16
total_psnr = 0

# Training hardware (defined before any model is moved onto it)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# class of the CNN model to train
model_class = FSRCNN ### CHANGE MODEL HERE ###

# one list of per-epoch validation losses for every fold
graph_values_folds = []

for fold in range(num_folds):

    graph_values_epochs=[]
    # Calculate start and end indices for validation data
    val_start = fold * fold_len
    val_end = (fold + 1) * fold_len

    # Split indices into train and validation sets
    val_indices = indices[val_start:val_end]
    train_indices = indices[:val_start] + indices[val_end:]

    # Create train and validation datasets
    train_dataset = Subset(dataset, train_indices)
    val_dataset = Subset(dataset, val_indices)

    if len(train_dataset) == 0 or len(val_dataset) == 0:
        raise ValueError(
            f"Fold {fold + 1} has an empty split: the dataset has {dataset_size} "
            f"samples for {num_folds} folds."
        )

    # Create train and validation data loaders
    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
    val_loader = DataLoader(val_dataset, batch_size=batch_size)

    # A fresh model per fold: reusing one model across folds would validate on
    # images it was already trained on in earlier folds.
    model = model_class().to(device)

    # hyperparameters
    learning_rate = 0.001
    num_epochs = 50
    early_stopping_patience = 3
    best_val_loss = float('inf')
    epochs_without_improvement = 0

    # loss function and optimizer
    criterion = nn.MSELoss() # note: standard MSE is used, PSNR normally not used for training (just as metric at the end)
    optimizer = optim.Adam(model.parameters(), lr=learning_rate)

    # Training process
    print(f"Fold [{fold+1}/{num_folds}]")
    psnr_fold = 0
    epochs_run = 0
    for epoch in range(num_epochs):
        model.train()
        train_loss = 0.0
        train_samples = 0
        for input_data, desired_data in train_loader:
            # Move input and desired images to device
            input_data = input_data.to(device)
            desired_data = desired_data.to(device)

            # Forward pass
            output_images = model(input_data)

            # Calculate loss
            loss = criterion(output_images, desired_data)

            # Backward and optimize
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            # Weight the batch mean by batch size for the epoch average
            batch_len = input_data.size(0)
            train_loss += loss.item() * batch_len
            train_samples += batch_len

        # Print the training loss averaged over the epoch (not just the last batch)
        train_loss_avg = train_loss / train_samples
        psnr = 10 * math.log10(1 / train_loss_avg) if train_loss_avg > 0 else float("inf")
        print(f"Epoch [{epoch+1}/{num_epochs}], Loss: {train_loss_avg:.4f}, PSNR: {psnr}")

        # Validating the model (on validation data)
        model.eval()
        val_loss = 0.0
        number_samples = 0
        with torch.no_grad():  # no gradients are needed for validation
            for input_data, desired_data in val_loader:
                # Move input and desired images to device
                input_data = input_data.to(device)
                desired_data = desired_data.to(device)

                # Forward pass
                output_images = model(input_data)

                # Accumulate the loss
                loss = criterion(output_images, desired_data)
                batch_len = input_data.size(0)
                val_loss += loss.item() * batch_len
                number_samples += batch_len

        val_loss_avg = val_loss / number_samples

        # Validation PSNR for a peak signal value of 1
        psnr = 10 * math.log10(1 / val_loss_avg) if val_loss_avg > 0 else float("inf")
        psnr_fold += psnr
        epochs_run = epoch + 1

        print(f"Loss (validation): {val_loss_avg:.4f}, PSNR (validation): {psnr}")
        graph_values_epochs.append(val_loss_avg)

        # Check for early stopping
        if val_loss_avg < best_val_loss:
            best_val_loss = val_loss_avg
            epochs_without_improvement = 0
        else:
            epochs_without_improvement += 1
            if epochs_without_improvement == early_stopping_patience:
                print("Early stopping triggered. No improvement in validation loss.")
                break

    # Mean validation PSNR over the epochs that actually ran, added exactly
    # once per fold (it used to be added twice when early stopping triggered).
    total_psnr += psnr_fold / epochs_run
    graph_values_folds.append(graph_values_epochs)

# Mean over the folds
total_psnr = total_psnr/num_folds
print(f"Total PSNR:{total_psnr}")

### plot for PSNR over epochs for different folds

In [None]:
import plotly.graph_objects as go
import plotly.offline as pyo
import numpy as np

# One list of values per fold. The lists can have different lengths.
# NOTE(review): these are placeholder values — replace them with the recorded
# per-fold curves before using the plot.
data = [[1, 2, 3, 4], [2, 4, 6, 8, 10], [3, 6, 9]]

fig = go.Figure()

# Average over the epochs that every fold reached. The lists are truncated to
# the shortest length BEFORE averaging, because np.mean cannot build an array
# from lists of different lengths.
common_len = min(len(sublist) for sublist in data)
average_line = np.mean([sublist[:common_len] for sublist in data], axis=0)

# One line per fold
for fold_number, sublist in enumerate(data, start=1):
    fig.add_trace(go.Scatter(x=list(range(len(sublist))), y=sublist, mode='lines', name=f'Fold {fold_number}'))

fig.add_trace(go.Scatter(x=list(range(len(average_line))), y=average_line, mode='lines', name='Average'))

fig.update_xaxes(title_text='Number of epochs')
fig.update_yaxes(title_text='PSNR')

# Writes the figure to an HTML file
pyo.plot(fig, filename='PSNR_over_epochs_folds.html')
