In [1]:
# Imports
import os
import cv2 as cv
import numpy as np
from tqdm import tqdm

import random
from patchify import patchify, unpatchify

import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.optim.lr_scheduler import MultiStepLR

In [2]:
# class Data_Upload
class dataUpload:
    
    def __init__(self, label):
        self.label = label
    
    def create_dataset(self):
        data = []
        for f in tqdm(os.listdir(self.label)):
            path = os.path.join(self.label, f)
            img = cv.imread(path)
            data.append(img)
        data = np.array(data)
        return data

In [3]:
# data preprocess to create training and test sets
def gausBlurImg(img):
    img = cv.GaussianBlur(img, (5, 5), 0)
    return img

def downSample(img, scale_factor):
    img_dim = img.shape
    orig_height, orig_width = img_dim[0], img_dim[1]
    new_height, new_width = int(orig_height / scale_factor), int(orig_width / scale_factor)
    new_img = cv.resize(img, (new_width, new_height))
    return new_img

def changeChannels(img):
    img = cv.cvtColor(img, cv.COLOR_BGR2YUV)
    return img

def preProcess(img, scale_factor, change_channel = False, gausBlurImg = gausBlurImg, downSample = downSample, changeChannels = changeChannels):
    img = gausBlurImg(img)
    img = downSample(img, scale_factor)
    if change_channel:
        img = changeChannels(img)
    return img

In [4]:
def convert_test_images(img, patch_size):
    height, width = img.shape[0], img.shape[1]
    
    while height % patch_size != 0:
        height -= 1
    while width % patch_size != 0:
        width -= 1
    
    img = cv.resize(img, (width, height))
    return img

In [5]:
# def patchify
def img_to_patches(img, patch_size, stride):
    patches = patchify(img, (patch_size, patch_size, 3), stride)
    original_shape = patches.shape
    patches = np.reshape(patches, (-1, patch_size, patch_size, 3))
    return patches, original_shape

In [6]:
# def unpatchify
def patches_to_img(patches_to_merge, merged_image_size):
    reconstructed_img = unpatchify(patches_to_merge, merged_image_size)
    return reconstructed_img

In [7]:
# def data_generator
def data_generator(images_array, batch_size, scale_factor, patch_size, stride, shuffle = True, change_channel = False, test = False, preProcess = preProcess, convert_test_images = convert_test_images):
    num_images = len(images_array)
    indices = [*range(num_images)]
    index = 0
    image_patches_X = []
    image_patches_Y = []
    LR_img_patches = []
    HR_img_patches = []
    
    if shuffle:
        random.shuffle(indices)
    
    while True:
        epoch_counter = 0
        if index >= num_images:
            index = 0
            epoch_counter = 1
            if shuffle:
                random.shuffle(indices)
        
        HR_img = images_array[indices[index]]
        index += 1
        
        if test:
            HR_img = convert_test_images(HR_img, patch_size * scale_factor)
            LR_img = preProcess(HR_img, scale_factor)
            LR_img_shape = LR_img.shape
            LR_img_patches, original_shape = img_to_patches(LR_img, patch_size, stride)
            HR_shape = (original_shape[0], original_shape[1], original_shape[2], original_shape[3] * scale_factor, original_shape[4] * scale_factor, original_shape[5])
        else:
            LR_img = preProcess(HR_img, scale_factor)
            LR_img_patches, _ = img_to_patches(LR_img, patch_size, stride)
            HR_img_patches, _ = img_to_patches(HR_img, patch_size * scale_factor, stride * scale_factor)
        
        if test:
            X = torch.tensor(np.array(LR_img_patches))
            X = X / 255.0
            if index >= num_images - 1:
                yield X, HR_shape, LR_img_shape, 1
            else:
                yield X, HR_shape, LR_img_shape, 0
            continue
        
        for i in range(len(LR_img_patches)):
            image_patches_X.append(LR_img_patches[i])
            image_patches_Y.append(HR_img_patches[i])
            
            if len(image_patches_X) == batch_size:
                X = torch.Tensor(np.array(image_patches_X))
                Y = torch.Tensor(np.array(image_patches_Y))
                X = X / 255.0
                Y = Y / 255.0
                yield X, Y, epoch_counter
                image_patches_X = []
                image_patches_Y = []
        
        image_patches_X = []
        image_patches_Y = []
        LR_img_patches = []
        HR_img_patches = []

In [8]:
# class ESPCN
class ESPCN(nn.Module):
    
    def __init__(self, r):
        super(ESPCN, self).__init__()
        self.conv1 = nn.Conv2d(3, 64, kernel_size = 5, stride = 1, padding = 2)
        self.conv2 = nn.Conv2d(64, 32, kernel_size = 3, stride = 1, padding = 1)
        self.conv3 = nn.Conv2d(32, 3 * r * r, kernel_size = 3, stride = 1, padding = 1)
        self.pixel_shuffle = nn.PixelShuffle(r)
    
    def forward(self, x):
        x = torch.tanh(self.conv1(x))
        x = torch.tanh(self.conv2(x))
        x = self.conv3(x)
        x = self.pixel_shuffle(x)
        x = torch.sigmoid(x)
        return x

In [9]:
def modelDefinition(scale_factor, learning_rate, model_class = ESPCN):
    model = model_class(scale_factor)
    loss_fn = nn.MSELoss()
    optimizer = torch.optim.Adam(model.parameters(), lr = learning_rate)
    scheduler = MultiStepLR(optimizer, milestones=[30, 80], gamma=0.1)
    return model, loss_fn, optimizer

In [10]:
# def training_loop
def trainingLoop(train_generator, scale_factor, num_epochs, patch_size, model):    
    for epoch in range(num_epochs):
        epoch_loss = 0
        num_patches = 0
        while True:
            num_patches += 32
            X, Y, count = next(train_generator)
            X = torch.reshape(X, (-1, 3, patch_size, patch_size))
            Y = torch.reshape(Y, (-1, 3, patch_size * scale_factor, patch_size * scale_factor))
            optimizer.zero_grad()
            
            output = model(X)
            loss = loss_fn(output, Y)
            epoch_loss += loss
            
            loss.backward()
            optimizer.step()
            #print(f"Loss after {num_patches} patches: {loss.item()}")
            
            if count == 1:
                break
        
        #print(f"Loss After {epoch + 1} Epochs: {epoch_loss}")

In [11]:
# def test_loop
def testLoop(test_generator, scale_factor, patch_size, model):
    SR_images = []
    with torch.no_grad():
        while True:
            X, original_shape, LR_img_shape, count = next(test_generator)
            X = torch.reshape(X, (-1, 3, patch_size, patch_size))
            output = model(X)
            output = np.array(output)
            output = np.reshape(output, original_shape)
            SR_img = patches_to_img(output, (LR_img_shape[0] * scale_factor, LR_img_shape[1] * scale_factor, 3))
            SR_images.append(SR_img)
            if count == 1:
                break
    
    SR_images = np.array(SR_images)
    return SR_images

In [12]:
def imgSave(test_generator, scale_factor, patch_size, model, testLoop = testLoop):
    SR_images = testLoop(test_generator, scale_factor, patch_size, model)
    for i, img in enumerate(SR_images):
        cv.imwrite(r"C:\Users\Gaurav\Datasets\SoC_Assignment\Test_Dataset\Test_Results\Image" + str(i + 1) + ".jpg", img * 255)

In [13]:
train_label = r"C:\Users\Gaurav\Datasets\SoC_Assignment\T91"
test_label = r"C:\Users\Gaurav\Datasets\SoC_Assignment\Test_Dataset"

train_obj = dataUpload(train_label)
test_obj = dataUpload(test_label)

train_data = train_obj.create_dataset()
test_data = test_obj.create_dataset()

100%|██████████| 91/91 [00:00<00:00, 708.96it/s]
  data = np.array(data)
100%|██████████| 10/10 [00:00<00:00, 291.33it/s]


In [14]:
num_epochs = 100
learning_rate = 0.001
batch_size = 32
scale_factor = 2
patch_size = 8
stride = 8

In [15]:
model, loss_fn, optimizer = modelDefinition(scale_factor, learning_rate, model_class = ESPCN)
train_generator = data_generator(train_data, batch_size, scale_factor, patch_size, stride)
test_generator = data_generator(test_data, batch_size, scale_factor, patch_size, stride, shuffle = False, change_channel = False, test = True, preProcess = preProcess)

In [16]:
trainingLoop(train_generator, scale_factor, num_epochs, patch_size, model)
torch.save(model.state_dict(), r"C:\Users\Gaurav\Seasons_of_Code_Project_Work\Model_Weights\ESPCN_Weights")

In [17]:
imgSave(test_generator, scale_factor, patch_size, model)

  SR_images = np.array(SR_images)


In [18]:
def PSNR(SR_img, HR_img):
    HR_img = cv.resize(HR_img, (SR_img.shape[1], SR_img.shape[0]))
    SR_img = cv.cvtColor(SR_img, cv.COLOR_BGR2YUV)
    HR_img = cv.cvtColor(HR_img, cv.COLOR_BGR2YUV)
    SR_img, _, _ = cv.split(SR_img)
    HR_img, _, _ = cv.split(HR_img)
    
    peak_value = np.max(HR_img)
    mse = np.sum(np.sum(np.square(HR_img - SR_img), axis = 0)) / (HR_img.shape[0] * HR_img.shape[1])
    
    psnr = np.log10(peak_value / np.sqrt(mse))
    return psnr

In [19]:
SR_Images_Obj = dataUpload(r"C:\Users\Gaurav\Datasets\SoC_Assignment\Test_Results")
HR_Images_Obj = dataUpload(r"C:\Users\Gaurav\Datasets\SoC_Assignment\T91")

SR_Images = SR_Images_Obj.create_dataset()
HR_Images = HR_Images_Obj.create_dataset()

100%|██████████| 10/10 [00:00<00:00, 177.52it/s]
  data = np.array(data)
100%|██████████| 91/91 [00:00<00:00, 583.08it/s]


In [20]:
def psnr_calculator(SR_Images, HR_Images, PSNR = PSNR):
    psnr_total = 0
    
    for SR_img, HR_img in zip(SR_Images, HR_Images):
        psnr_total += PSNR(SR_img, HR_img)
    
    return psnr_total

In [23]:
class dataUploadVideo:
    
    def __init__(self, label):
        self.label = label
    
    def create_dataset(self):
        data = []
        for f in tqdm(os.listdir(self.label)):
            cur_video_frames = []
            path = os.path.join(self.label, f)
            
            if ".yuv" in path:
                capture = cv.VideoCapture(path)

                while True:
                    isTrue, frame = capture.read()
                    if not isTrue:
                        break
                    cur_video_frames.append(frame)

                data.append(np.array(cur_video_frames))
        
        return np.array(data)

In [39]:
label = r"C:\Users\Gaurav\Datasets\SoC_Final_Video"

video_obj = dataUploadVideo(label)

video_dataset = video_obj.create_dataset()

100%|██████████| 9/9 [00:00<00:00, 19.90it/s]


In [40]:
print(len(video_dataset))

3


In [41]:
video_dataset_BGR = []
video_dataset_LR = []

for video in video_dataset:
    video_BGR = []
    video_LR = []
    for frame in video:
        frame = cv.cvtColor(frame, cv.COLOR_YUV2BGR)
        LR_frame = gausBlurImg(frame)
        LR_frame = downSample(LR_frame, scale_factor)
        video_BGR.append(frame)
        video_LR.append(LR_frame)
    video_dataset_BGR.append(video)

video_dataset_BGR = np.array(video_dataset_BGR)
video_dataset_LR = np.array(video_dataset_LR)

In [43]:
dataset_to_be_passed = []

for video in video_dataset_LR:
    video_to_be_passed = []
    for frame in video:
        frame_patches, original_patch_shape = img_to_patches(frame, patch_size, stride)
        video_to_be_passed.append(frame_patches)
    dataset_to_be_passed.append(video_to_be_passed)

data_set_to_be_passed = np.array(dataset_to_be_passed)

In [44]:
with torch.no_grad():
    for video in dataset_to_be_passed:
        for frame in video:
            for patch in frame:
                patch = torch.Tensor(patch)
                patch = torch.reshape(patch, (-1, 3, patch_size, patch_size))
                output = model(patch)