In [1]:
import os

import numpy as np
import cv2


import torch
import torch.nn as nn
import torch.nn.functional as F

import torch.optim as optim

from torch.utils.data import Dataset
from torch.utils.data import DataLoader
from torch.utils.data.sampler import SubsetRandomSampler


import torchvision
from torchvision import transforms
from torchvision import datasets



# Loss Function: SSIM
# Activation Function: ReLU

# TODO:
# input: 640 x 480
# output: 1920 x 1440

In [2]:
# Number of samples per mini-batch; passed to the training DataLoader.
BATCH_SIZE = 100
# Optimizer hyperparameters.
# NOTE(review): not referenced by any code visible in this part of the
# notebook -- presumably consumed by a later optimizer setup; confirm.
LEARNING_RATE = 0.01
MOMENTUM = 0.9
# Number of worker processes the DataLoader uses to load samples.
NUM_WORKERS = 8
# Locations handed to get_data_paths() for the input frames and the
# ground-truth frames respectively.
INPUT_DIRECTORY = 'temp/input'
TRUTH_DIRECTORY = 'temp/truth'

# Data Generation

In [3]:
def get_data_paths(input_dir, ground_dir, num_frames):
    """Build the list of file paths for every training sample.

    Frames are expected to be stored as ``<index>.jpg`` inside each
    directory, with indices running from ``0`` to ``num_frames - 1``.
    One sample is produced for every frame that has both a predecessor and
    a successor, i.e. for indices ``1 .. num_frames - 2``.

    Args:
        input_dir: Directory holding the input frames.
        ground_dir: Directory holding the ground-truth frames.
        num_frames: Total number of frames available in each directory.

    Returns:
        A list of 4-element lists
        ``[previous_input, current_input, next_input, ground_truth]``.
        The list is empty when ``num_frames`` is smaller than 3.
    """

    def frame_path(directory, index):
        # os.path.join inserts the separator that plain string concatenation
        # dropped ('temp/input' + '0.jpg' used to yield 'temp/input0.jpg').
        return os.path.join(directory, f"{index}.jpg")

    return [
        [
            frame_path(input_dir, i - 1),
            frame_path(input_dir, i),
            frame_path(input_dir, i + 1),
            frame_path(ground_dir, i),
        ]
        for i in range(1, num_frames - 1)
    ]

In [4]:
def get_tensor(path):
    """Load an image file as a normalized YCrCb float tensor.

    Args:
        path: Path of the image file to read.

    Returns:
        A ``torch.float32`` tensor holding the image converted to YCrCb and
        divided by 255. The layout is whatever OpenCV returns (height, width,
        channel for a colour image); no axes are reordered here.

    Raises:
        FileNotFoundError: If OpenCV cannot read an image from ``path``.
    """
    # cv2.imread decodes to BGR channel order and signals failure by
    # returning None instead of raising, so check explicitly; otherwise the
    # failure only surfaces as an opaque assertion inside cvtColor.
    image_bgr = cv2.imread(path)
    if image_bgr is None:
        raise FileNotFoundError(f"Could not read image file: {path!r}")

    image_ycrcb = cv2.cvtColor(image_bgr, cv2.COLOR_BGR2YCrCb)
    image_tensor = torch.tensor(image_ycrcb, dtype=torch.float32)
    return image_tensor / 255.0

In [6]:
class VHSDataSet(Dataset):
    """Dataset of (three consecutive input frames, ground-truth frame) pairs.

    Args:
        data_paths: Sequence of records; each record holds at least four
            paths ``[previous, current, next, ground_truth]``.
        transform: Optional callable applied to the stacked input tensor
            before it is returned. The ground-truth tensor is returned
            untouched.
    """

    def __init__(self, data_paths, transform=None):
        self.data_paths = data_paths
        self.transform = transform

    def __len__(self):
        """Return the number of samples (one per path record)."""
        return len(self.data_paths)

    def __getitem__(self, idx):
        """Load sample ``idx``.

        Returns:
            A tuple ``(stacked_frames, ground_truth)`` where ``stacked_frames``
            is the three input frames concatenated along dimension 0.
        """
        record = self.data_paths[idx]
        frames = [get_tensor(record[i]) for i in range(4)]

        # NOTE(review): get_tensor appears to return height x width x channel
        # tensors, so concatenating on dim 0 yields (3*H, W, C). nn.Conv3d
        # expects channel-first (C, D, H, W) input -- confirm the intended
        # layout before feeding this to the model.
        stacked_frames_tensor = torch.cat(frames[:3], dim=0)

        # The transform was previously stored but never used.
        if self.transform is not None:
            stacked_frames_tensor = self.transform(stacked_frames_tensor)

        ground_truth_tensor = frames[3]
        return stacked_frames_tensor, ground_truth_tensor

In [None]:
# Create a DataLoader instance for training

# 2400 is the total frame count handed to get_data_paths as `num_frames`.
data_paths = get_data_paths(
    input_dir=INPUT_DIRECTORY,
    ground_dir=TRUTH_DIRECTORY,
    num_frames=2400,
)
dataset = VHSDataSet(data_paths=data_paths)
# Reshuffle every epoch and load batches with NUM_WORKERS worker processes.
dataloader = DataLoader(
    dataset,
    batch_size=BATCH_SIZE,
    shuffle=True,
    num_workers=NUM_WORKERS,
)

# Training

In [7]:
class ResNet(nn.Module):
    """ResNet-style network built from 3-D convolutions.

    Every convolution is an ``nn.Conv3d``, so the network consumes 5-D
    batches laid out as ``(N, 3, D, H, W)``. Normalization and pooling use
    the matching 3-D layers; the 2-D variants used previously reject the 5-D
    output of ``Conv3d``.

    Args:
        block: Residual block class, instantiated as
            ``block(in_channels, out_channels, stride)``. It may expose an
            integer ``expansion`` class attribute (output-channel
            multiplier); when absent, ``1`` is assumed.
        layers: Sequence of four integers giving the number of blocks in
            each of the four residual stages.
        num_classes: Size of the final fully connected output.
    """

    def __init__(self, block, layers, num_classes=1000):
        # Was `__init()`, which raises AttributeError before any layer exists.
        super().__init__()
        expansion = getattr(block, "expansion", 1)

        self.in_channels = 64
        # Stem: with kernel 8 / padding 3 the depth axis must be at least 2.
        self.conv1 = nn.Conv3d(3, 64, kernel_size=8, stride=2, padding=3, bias=False)
        self.bn1 = nn.BatchNorm3d(64)
        self.relu = nn.ReLU(inplace=True)
        self.maxpool = nn.MaxPool3d(kernel_size=3, stride=2, padding=1)

        # Four residual stages; every stage after the first halves the
        # resolution via stride 2.
        self.layer1 = self._make_layer(block, 64, layers[0])
        self.layer2 = self._make_layer(block, 128, layers[1], stride=2)
        self.layer3 = self._make_layer(block, 256, layers[2], stride=2)
        self.layer4 = self._make_layer(block, 512, layers[3], stride=2)

        # Collapse depth, height and width to a single value per channel.
        self.avgpool = nn.AdaptiveAvgPool3d((1, 1, 1))
        self.fc = nn.Linear(512 * expansion, num_classes)

    def _make_layer(self, block, out_channels, blocks, stride=1):
        """Build one residual stage of ``blocks`` consecutive blocks.

        Only the first block receives ``stride``; the remaining ones keep
        the resolution. ``self.in_channels`` is advanced so the next stage
        knows its input width.
        """
        expansion = getattr(block, "expansion", 1)

        stage = [block(self.in_channels, out_channels, stride)]
        self.in_channels = out_channels * expansion
        stage.extend(
            block(self.in_channels, out_channels) for _ in range(1, blocks)
        )
        return nn.Sequential(*stage)

    def forward(self, x):
        """Map a ``(N, 3, D, H, W)`` batch to ``(N, num_classes)`` outputs."""
        x = self.maxpool(self.relu(self.bn1(self.conv1(x))))

        x = self.layer1(x)
        x = self.layer2(x)
        x = self.layer3(x)
        x = self.layer4(x)

        x = self.avgpool(x)
        x = torch.flatten(x, 1)
        return self.fc(x)


In [8]:
class ResidualBlock(nn.Module):
    """Basic two-convolution residual block using 3-D convolutions.

    Computes ``relu(bn2(conv2(relu(bn1(conv1(x))))) + shortcut(x))`` on
    ``(N, C, D, H, W)`` input.

    Args:
        in_channels: Number of channels of the incoming tensor.
        out_channels: Number of channels produced by the block.
        stride: Stride of the first convolution (and of the shortcut
            projection, when one is needed).
    """

    # Output-channel multiplier read by ResNet (`block.expansion`); this
    # block does not widen its output.
    expansion = 1

    def __init__(self, in_channels, out_channels, stride=1):
        # Was `__init()`, which raises AttributeError on construction.
        super().__init__()
        self.conv1 = nn.Conv3d(in_channels, out_channels, kernel_size=3, stride=stride, padding=1, bias=False)
        # BatchNorm3d matches the 5-D output of Conv3d (BatchNorm2d rejects it).
        self.bn1 = nn.BatchNorm3d(out_channels)
        self.relu = nn.ReLU(inplace=True)
        self.conv2 = nn.Conv3d(out_channels, out_channels, kernel_size=3, stride=1, padding=1, bias=False)
        self.bn2 = nn.BatchNorm3d(out_channels)

        # When the main path changes resolution or channel count, the raw
        # input can no longer be added to it; project it with a strided
        # 1x1x1 convolution so both operands of the sum have the same shape.
        if stride != 1 or in_channels != out_channels:
            self.downsample = nn.Sequential(
                nn.Conv3d(in_channels, out_channels, kernel_size=1, stride=stride, bias=False),
                nn.BatchNorm3d(out_channels),
            )
        else:
            self.downsample = None

    def forward(self, x):
        """Apply the block to ``x`` and return the activated sum."""
        identity = x if self.downsample is None else self.downsample(x)

        out = self.relu(self.bn1(self.conv1(x)))
        out = self.bn2(self.conv2(out))

        out += identity
        return self.relu(out)
