In [1]:
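# Download and unpack the PASCAL VOC 2007 trainval dataset (uncomment and run once).
# The archive is a plain, uncompressed tar, so it is extracted with -xf (no -z).
# !wget -N http://host.robots.ox.ac.uk/pascal/VOC/voc2007/VOCtrainval_06-Nov-2007.tar
# !tar -xf VOCtrainval_06-Nov-2007.tar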

In [None]:
import os
import random
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch import optim
from torch.utils.data import DataLoader, Dataset
from torchvision import transforms
from torchvision import models 
from PIL import Image
import cv2
import matplotlib.pyplot as plt
from tqdm import tqdm
import wandb
# Uncomment on first use if this machine is not yet logged in to Weights & Biases.
# wandb.login()
# Start the W&B run; the wandb.config / wandb.watch / wandb.log calls below report to it.
wandb.init(project="Y-Data-DL-Week4-Super-Resolution-Final")


In [None]:
class VOC2007Dataset(Dataset):
    """Multi-scale image dataset built on the PASCAL VOC 2007 JPEG images.

    Every sample is a single image resized to three resolutions and returned
    as a dict: X - 72x72x3, y_mid - 144x144x3, y_large - 288x288x3.
    """

    def __init__(self, image_set='trainval', root=None, transform=None, sample_slice=None):
        """
        Args:
            image_set (str): one of 'train', 'trainval', or 'val', default "trainval"
            root (str, optional): directory that contains ``VOCdevkit/VOC2007``;
                defaults to the current working directory.
            transform (callable, optional): Optional transform applied to each of
                the three resized images of a sample.
            sample_slice (list or tuple, optional): 2-value ``(start, stop)`` pair
                used to slice the list of file names of the split (regular Python
                slicing, so negative values count from the end). Defaults to the
                whole split.

        Raises:
            AssertionError: if ``image_set`` is not an allowed value or the stop
                index of ``sample_slice`` exceeds the number of files.
        """
        # Zero-argument super() so that the base-class initialiser is really invoked.
        super().__init__()
        self.transform = transform

        if root is None:
            root = os.path.abspath(os.path.curdir)
        self.root = root
        valid_sets = ['train', 'trainval', 'val']
        assert (image_set in valid_sets), f"{image_set} not among the allowed values. Allowed values are {', '.join(valid_sets)}"

        voc_root = os.path.join(self.root, 'VOCdevkit', 'VOC2007')
        image_dir = os.path.join(voc_root, 'JPEGImages')
        split_f = os.path.join(voc_root, 'ImageSets', 'Main', image_set + '.txt')

        with open(split_f, "r") as f:
            # Skip blank lines so they do not turn into a bogus "<dir>/.jpg" entry.
            file_names = [line.strip() for line in f if line.strip()]

        if sample_slice is None:
            sample_slice = (0, len(file_names))
        assert (sample_slice[-1] <= len(file_names)), f'sample_slice indices are out bounds. Maximum indices: {len(file_names)-1}'
        start, stop = sample_slice[0], sample_slice[-1]
        self.images = [os.path.join(image_dir, name + ".jpg") for name in file_names[start:stop]]

    def __getitem__(self, index):
        """
        Args:
            index (int): Index

        Returns:
            dict: ``{'y_large': 288x288, 'y_mid': 144x144, 'X': 72x72}`` versions of
            the same RGB image, each passed through ``transform`` when one was given.

        Raises:
            FileNotFoundError: if the image file cannot be read.
        """
        path = self.images[index]
        img = cv2.imread(path, cv2.IMREAD_COLOR)
        if img is None:
            # cv2.imread signals a missing/corrupt file by returning None instead of raising.
            raise FileNotFoundError(f"Could not read image file: {path}")
        # OpenCV decodes to BGR; everything downstream (plotting, PIL conversion,
        # ImageNet statistics) treats the channels as RGB.
        img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)

        img_large = cv2.resize(img, (288, 288))
        img_mid = cv2.resize(img, (144, 144))
        img_small = cv2.resize(img, (72, 72))

        if self.transform is not None:
            img_large = self.transform(img_large)
            img_mid = self.transform(img_mid)
            img_small = self.transform(img_small)

        # Key order is kept: plotting helpers iterate the dict in insertion order.
        return dict(y_large=img_large, y_mid=img_mid, X=img_small)

    def __len__(self):
        """Number of images selected by ``image_set`` and ``sample_slice``."""
        return len(self.images)


In [None]:
# Hyperparameters and run settings, stored on the W&B run config
wandb.config.update({
    'batch_size': 5,
    'epochs': 10,
    'lr': 0.01,
    'no_cuda': True,
    'seed': 42,
    'log_interval': 10,
})

In [None]:
# Run-wide settings derived from the W&B config
config = wandb.config

# CUDA is used only when it is not disabled in the config and is actually available
if config.no_cuda:
    use_cuda = False
else:
    use_cuda = torch.cuda.is_available()
device = torch.device("cuda") if use_cuda else torch.device("cpu")
# Extra DataLoader arguments that are only passed when running on the GPU
kwargs = dict(num_workers=2, pin_memory=True) if use_cuda else dict()

# Seed every random number generator used in the notebook
torch.manual_seed(config.seed)  # pytorch
np.random.seed(config.seed)     # numpy
random.seed(config.seed)        # python

In [None]:
# Only transform: convert each resized image array to a tensor
transform = transforms.Compose([transforms.ToTensor()])

# Training data: the first 1000 files of the trainval split
trainset = VOC2007Dataset(image_set='trainval', transform=transform, sample_slice=[0, 1000])
# Shuffle so that batch composition and order differ between epochs
# (still reproducible, the RNGs are seeded from config.seed).
trainloader = DataLoader(trainset, batch_size=config.batch_size, shuffle=True, **kwargs)

# Validation data: 100 files from the end of the same split; the slice [-101, -1]
# stops just before the very last file. It does not overlap the training slice as
# long as the split holds more than 1101 files.
testset = VOC2007Dataset(image_set='trainval', transform=transform, sample_slice=[-101, -1])
testloader = DataLoader(testset, batch_size=config.batch_size, shuffle=False, **kwargs)

In [None]:
def show_images(image_dict):
    """Display the images of a dict side by side, one subplot per entry.

    Works for samples of VOC2007Dataset as well as for model outputs: tensors are
    detached from the autograd graph and copied to the CPU before plotting.

    Args:
        image_dict (dict): maps a title to an image tensor laid out channel-first
            (C, H, W); entries are drawn left to right in insertion order.
    """
    fig = plt.figure(figsize=(16, 6))
    for position, (key, img) in enumerate(image_dict.items(), start=1):
        ax = fig.add_subplot(1, len(image_dict), position)
        # imshow wants a host-side H x W x C array
        ax.imshow(img.detach().cpu().permute(1, 2, 0).numpy())
        ax.title.set_text(key)

In [None]:
# Preview one training sample (index 50) at its three resolutions
show_images(trainset[50])

In [None]:
class convnet(nn.Module):
    """Small convolutional network that doubles the resolution of an RGB image.

    conv(3->64) -> conv(64->64) -> transposed conv (x2 upsampling) -> 1x1 conv(64->3),
    with a leaky ReLU in front of every layer except the first.
    """

    def __init__(self):
        super(convnet, self).__init__()
        self.conv1 = nn.Conv2d(3, 64, 3, padding=1)
        self.conv2 = nn.Conv2d(64, 64, 3, padding=1)
        self.upsample1 = nn.ConvTranspose2d(64, 64, 3, stride=2, padding=1)
        self.conv3 = nn.Conv2d(64, 3, 1)

    def forward(self, x):
        """Return the 2x upscaled image for a batch ``x`` of shape (N, 3, H, W)."""
        h, w = x.shape[-2:]
        x = self.conv1(x)
        x = self.conv2(F.leaky_relu(x))
        # A stride-2 transposed convolution can yield 2H-1 or 2H; passing output_size
        # pins it to exactly twice the input size (144x144 for the 72x72 inputs).
        x = self.upsample1(F.leaky_relu(x), output_size=(h * 2, w * 2))
        x = self.conv3(F.leaky_relu(x))
        return x

In [None]:
def train_model_y_mid(config, net, train_data, optimizer, epoch, loss_fn=None):
    """Train a single-output model for one epoch on the X -> y_mid task.

    Args:
        config: run configuration (not used here; kept for a uniform signature).
        net (nn.Module): model mapping ``batch['X']`` to a prediction of ``batch['y_mid']``.
        train_data (iterable): sized iterable of batches, each a dict with 'X' and 'y_mid'.
        optimizer: optimizer that updates the parameters of ``net``.
        epoch (int): epoch number, logged next to the loss.
        loss_fn (callable, optional): loss function, ``nn.MSELoss()`` by default.

    Logs the mean batch loss to W&B without committing the step, so that the
    evaluation logged afterwards ends up in the same step.
    """
    net.train()
    # Batches must live on the same device as the model (CPU for a parameter-less net).
    device = next(net.parameters(), torch.empty(0)).device
    if loss_fn is None:
        loss_fn = nn.MSELoss()
    n_batches = len(train_data)
    train_loss = 0
    for batch in tqdm(train_data, total=n_batches):
        X = batch['X'].to(device)
        y_mid = batch['y_mid'].to(device)
        optimizer.zero_grad()
        output = net(X)
        loss = loss_fn(output, y_mid)
        loss.backward()
        optimizer.step()
        train_loss += loss.item()
    # An empty loader has no meaningful average; report NaN instead of dividing by zero.
    mean_loss = train_loss / n_batches if n_batches else float('nan')
    wandb.log({'Train Loss - Model 1': mean_loss, 'Epoch': epoch}, commit=False)

def test_model_y_mid(config, net, test_data, epoch, loss_fn=None):
    """Evaluate a single-output model on the X -> y_mid task and log the results to W&B.

    Args:
        config: run configuration (not used here; kept for a uniform signature).
        net (nn.Module): model mapping ``batch['X']`` to a prediction of ``batch['y_mid']``.
        test_data (iterable): sized iterable of batches, each a dict with 'X' and 'y_mid'.
        epoch (int): epoch number, logged next to the metrics.
        loss_fn (callable, optional): loss function, ``nn.MSELoss()`` by default.
            The PSNR formula treats the loss value as an MSE with a peak signal of 1.

    Logs the mean loss, the mean PSNR and, for the first image of every batch,
    the reconstruction together with its target.
    """
    net.eval()
    # Batches must live on the same device as the model (CPU for a parameter-less net).
    device = next(net.parameters(), torch.empty(0)).device
    if loss_fn is None:
        loss_fn = nn.MSELoss()
    to_pil = transforms.ToPILImage(mode='RGB')
    n = len(test_data)
    test_loss = 0
    total_psnr = 0
    example_images = []
    with torch.no_grad():
        for batch in tqdm(test_data, total=n):
            X = batch['X'].to(device)
            y_mid = batch['y_mid'].to(device)
            output = net(X)
            loss = loss_fn(output, y_mid).item()
            test_loss += loss
            # A perfect reconstruction (loss 0) has infinite PSNR; avoid dividing by zero.
            total_psnr += (10 * np.log10(1 / loss)) if loss > 0 else float('inf')
            # The network output is unbounded; clamp to [0, 1] so the 8-bit conversion
            # saturates instead of wrapping around.
            example_images.append(wandb.Image(to_pil(output[0].clamp(0, 1)),
                                              caption="Output Reconstruction"))
            example_images.append(wandb.Image(to_pil(y_mid[0]),
                                              caption="Target"))
    # An empty loader has no meaningful averages; report NaN instead of dividing by zero.
    mean_loss = test_loss / n if n else float('nan')
    mean_psnr = total_psnr / n if n else float('nan')
    wandb.log({'Test Loss - Model 1': mean_loss, "Examples": example_images, 'Epoch': epoch,
               'Avg Peak Signal To Noise Ratio': mean_psnr})

In [None]:
def run_training(network):
    """Build ``network``, train it on the mid-resolution task and return the trained model.

    Uses the notebook-level ``device``, ``config``, ``trainloader`` and ``testloader``:
    the model is optimised with Adam (lr ``config.lr``) and an MSE loss for
    ``config.epochs`` epochs, with an evaluation pass after every training epoch.
    """
    model = network().to(device)
    wandb.watch(model, log="all")
    criterion = nn.MSELoss()
    adam = optim.Adam(model.parameters(), lr=config.lr)
    for epoch_index in range(config.epochs):
        epoch = epoch_index + 1  # epochs are reported 1-based
        train_model_y_mid(config, model, trainloader, adam, epoch, loss_fn=criterion)
        test_model_y_mid(config, model, testloader, epoch, loss_fn=criterion)
    return model

In [None]:
# Model 1: train the single-output baseline (72x72 -> 144x144)
model1 = run_training(convnet)

In [None]:
# Qualitative check for model 1: plot its prediction next to the input for the
# first image of one training batch.
dataiter = iter(trainloader)
test = next(dataiter)
model1.eval()
with torch.no_grad():  # pure inference, no autograd graph needed
    # Feed the batch from the model's device and bring the prediction back for plotting
    y_mid = model1(test['X'].to(next(model1.parameters()).device)).cpu()
img_dct = dict(y_mid=y_mid[0], X=test['X'][0])
show_images(img_dct)

In [None]:
class convnet2(convnet):
    """Two-output extension of ``convnet``: predicts a 2x and a 4x upscaled image.

    Reuses conv1, conv2 and upsample1 of the parent class and adds a second
    upsampling stage plus one 1x1 output convolution per scale. The parent's
    ``conv3`` is still created by the parent initialiser but is not used by
    this forward pass.
    """

    def __init__(self):
        super().__init__()
        self.upsample2 = nn.ConvTranspose2d(64, 64, 3, stride=2, padding=1)
        self.conv3_1 = nn.Conv2d(64, 3, 1)
        self.conv3_2 = nn.Conv2d(64, 3, 1)

    def forward(self, x):
        """Return ``(x_mid, x_large)``: the input upscaled by 2 and by 4."""
        in_h, in_w = x.shape[-2:]
        features = self.conv2(F.leaky_relu(self.conv1(x)))
        # First upsampling stage: exactly twice the input size
        features = self.upsample1(features, output_size=(2 * in_h, 2 * in_w))
        mid_h, mid_w = features.shape[-2:]
        features = F.leaky_relu(features)
        # Mid-resolution head
        x_mid = self.conv3_1(features)
        # Second upsampling stage followed by the large-resolution head
        x_large = self.upsample2(features, output_size=(2 * mid_h, 2 * mid_w))
        x_large = self.conv3_2(F.leaky_relu(x_large))
        return x_mid, x_large

In [None]:
def train_model_y_mid_large(config, net, train_data, optimizer, epoch, loss_fn=None, model_number=2):
    """Train a two-output model for one epoch on the X -> (y_mid, y_large) task.

    Args:
        config: run configuration (not used here; kept for a uniform signature).
        net (nn.Module): model returning ``(mid_output, large_output)`` for ``batch['X']``.
        train_data (iterable): sized iterable of batches, each a dict with
            'X', 'y_mid' and 'y_large'.
        optimizer: optimizer that updates the parameters of ``net``.
        epoch (int): epoch number, logged next to the loss.
        loss_fn (callable, optional): loss function, ``nn.MSELoss()`` by default.
        model_number (int): number used in the name of the logged metric.

    The optimised loss is the sum of the losses of both outputs. Its mean over
    the batches is logged to W&B without committing the step, so that the
    evaluation logged afterwards ends up in the same step.
    """
    net.train()
    # Batches must live on the same device as the model (CPU for a parameter-less net).
    device = next(net.parameters(), torch.empty(0)).device
    if loss_fn is None:
        loss_fn = nn.MSELoss()
    n_batches = len(train_data)
    train_loss = 0
    for batch in tqdm(train_data, total=n_batches):
        X = batch['X'].to(device)
        y_mid = batch['y_mid'].to(device)
        y_large = batch['y_large'].to(device)
        optimizer.zero_grad()
        mid_output, large_output = net(X)
        loss = loss_fn(mid_output, y_mid) + loss_fn(large_output, y_large)
        loss.backward()
        optimizer.step()
        train_loss += loss.item()
    # An empty loader has no meaningful average; report NaN instead of dividing by zero.
    mean_loss = train_loss / n_batches if n_batches else float('nan')
    wandb.log({f'Train Loss - Model {model_number}': mean_loss, 'Epoch': epoch}, commit=False)

def test_model_y_mid_large(config, net, test_data, epoch, loss_fn=None, model_number=2):
    """Evaluate a two-output model on the X -> (y_mid, y_large) task and log the results to W&B.

    Args:
        config: run configuration (not used here; kept for a uniform signature).
        net (nn.Module): model returning ``(mid_output, large_output)`` for ``batch['X']``.
        test_data (iterable): sized iterable of batches, each a dict with
            'X', 'y_mid' and 'y_large'.
        epoch (int): epoch number, logged next to the metrics.
        loss_fn (callable, optional): loss function, ``nn.MSELoss()`` by default.
            The PSNR formula treats each loss value as an MSE with a peak signal of 1.
        model_number (int): number used in the name of the logged loss.

    Logs the mean summed loss, the PSNR of each output (PSNR is only defined on a
    single MSE, not on the sum of two), their mean under the key shared with the
    single-output model, and example reconstructions for the first image of
    every batch.
    """
    def psnr(mse):
        # A perfect reconstruction (MSE 0) has infinite PSNR; avoid dividing by zero.
        return 10 * np.log10(1 / mse) if mse > 0 else float('inf')

    net.eval()
    # Batches must live on the same device as the model (CPU for a parameter-less net).
    device = next(net.parameters(), torch.empty(0)).device
    if loss_fn is None:
        loss_fn = nn.MSELoss()
    to_pil = transforms.ToPILImage(mode='RGB')
    n = len(test_data)
    test_loss = 0
    psnr_mid = 0
    psnr_large = 0
    example_images = []
    with torch.no_grad():
        for batch in tqdm(test_data, total=n):
            X = batch['X'].to(device)
            y_mid = batch['y_mid'].to(device)
            y_large = batch['y_large'].to(device)
            mid_output, large_output = net(X)
            mid_loss = loss_fn(mid_output, y_mid).item()
            large_loss = loss_fn(large_output, y_large).item()
            test_loss += mid_loss + large_loss
            psnr_mid += psnr(mid_loss)
            psnr_large += psnr(large_loss)
            # The network outputs are unbounded; clamp to [0, 1] so the 8-bit
            # conversion saturates instead of wrapping around.
            example_images.append(wandb.Image(to_pil(mid_output[0].clamp(0, 1)),
                                              caption="Mid Output Reconstruction"))
            example_images.append(wandb.Image(to_pil(large_output[0].clamp(0, 1)),
                                              caption="Large Output Reconstruction"))
            example_images.append(wandb.Image(to_pil(y_mid[0]),
                                              caption="Target"))
    # An empty loader has no meaningful averages; report NaN instead of dividing by zero.
    if n:
        mean_loss = test_loss / n
        mean_psnr_mid = psnr_mid / n
        mean_psnr_large = psnr_large / n
    else:
        mean_loss = mean_psnr_mid = mean_psnr_large = float('nan')
    wandb.log({f'Test Loss - Model {model_number}': mean_loss, "Examples": example_images, 'Epoch': epoch,
               'Avg Peak Signal To Noise Ratio': (mean_psnr_mid + mean_psnr_large) / 2,
               'Avg PSNR - Mid': mean_psnr_mid,
               'Avg PSNR - Large': mean_psnr_large})

In [None]:
def run_training2(network, model_number):
    """Build ``network``, train it on the two-scale task and return the trained model.

    Uses the notebook-level ``device``, ``config``, ``trainloader`` and ``testloader``:
    the model is optimised with Adam (lr ``config.lr``) and an MSE loss for
    ``config.epochs`` epochs, with an evaluation pass after every training epoch.

    Args:
        network (callable): zero-argument factory (e.g. the model class) returning
            a model whose forward pass yields ``(mid_output, large_output)``.
        model_number (int): number used in the names of the logged losses.
    """
    model = network().to(device)
    wandb.watch(model, log="all")
    optimizer = optim.Adam(model.parameters(), lr=config.lr)
    loss_fn = nn.MSELoss()
    for epoch in range(1, config.epochs + 1):
        train_model_y_mid_large(config, model, trainloader, optimizer, epoch, loss_fn=loss_fn, model_number=model_number)
        test_model_y_mid_large(config, model, testloader, epoch, loss_fn=loss_fn, model_number=model_number)
    return model

In [None]:
# Model 2: train the two-output convolutional network (mid and large predictions)
model2 = run_training2(convnet2, model_number=2)

In [None]:
# Qualitative check for model 2: plot both predictions next to the input for the
# first image of one training batch.
dataiter = iter(trainloader)
test = next(dataiter)
model2.eval()
with torch.no_grad():  # pure inference, no autograd graph needed
    # Feed the batch from the model's device and bring the predictions back for plotting
    y_mid, y_large = (out.cpu() for out in model2(test['X'].to(next(model2.parameters()).device)))
img_dct = dict(y_large=y_large[0], y_mid=y_mid[0], X=test['X'][0])
show_images(img_dct)

In [None]:
class resnet(nn.Module):
    """Two-scale super-resolution network built from residual blocks.

    A 1x1 convolution lifts the image to 32 channels; two residual blocks and a
    transposed convolution produce the 2x features, from which the mid output is
    read; one more residual block and a second transposed convolution produce
    the 4x features for the large output.
    """

    def __init__(self):
        super().__init__()
        width = 32

        def conv3x3():
            return nn.Conv2d(width, width, 3, padding=1)

        def double_size():
            return nn.ConvTranspose2d(width, width, 3, stride=2, padding=1)

        self.conv1 = nn.Conv2d(3, width, 1)
        self.res_block1_1 = conv3x3()
        self.res_block1_2 = conv3x3()
        self.res_block2_1 = conv3x3()
        self.res_block2_2 = conv3x3()
        self.upsample1 = double_size()
        self.res_block3_1 = conv3x3()
        self.res_block3_2 = conv3x3()
        self.upsample2 = double_size()
        self.conv3_1 = nn.Conv2d(width, 3, 1)
        self.conv3_2 = nn.Conv2d(width, 3, 1)

    @staticmethod
    def _residual(x, first, second):
        """Apply two convolutions back to back, add the skip connection, then activate."""
        return F.leaky_relu(x + second(first(x)))

    def forward(self, x):
        """Return ``(x_mid, x_large)``: the input upscaled by 2 and by 4."""
        in_h, in_w = x.shape[-2:]
        feat = self.conv1(x)
        feat = self._residual(feat, self.res_block1_1, self.res_block1_2)
        feat = self._residual(feat, self.res_block2_1, self.res_block2_2)
        # First upsampling stage: exactly twice the input size
        feat = self.upsample1(feat, output_size=(2 * in_h, 2 * in_w))
        mid_h, mid_w = feat.shape[-2:]
        # Mid-resolution head reads the activated 2x features
        x_mid = self.conv3_1(F.leaky_relu(feat))
        # The third residual block works on the (non-activated) 2x features
        feat = self._residual(feat, self.res_block3_1, self.res_block3_2)
        x_large = self.upsample2(feat, output_size=(2 * mid_h, 2 * mid_w))
        x_large = self.conv3_2(F.leaky_relu(x_large))
        return x_mid, x_large

In [None]:
# Model 3: train the residual network
model3 = run_training2(resnet, model_number=3)

In [None]:
# Qualitative check for model 3: plot both predictions next to the input for the
# first image of one training batch.
dataiter = iter(trainloader)
test = next(dataiter)
model3.eval()
with torch.no_grad():  # pure inference, no autograd graph needed
    # Feed the batch from the model's device and bring the predictions back for plotting
    y_mid, y_large = (out.cpu() for out in model3(test['X'].to(next(model3.parameters()).device)))
img_dct = dict(y_large=y_large[0], y_mid=y_mid[0], X=test['X'][0])
show_images(img_dct)

In [None]:
class dilation_net(nn.Module):
    """Two-scale super-resolution network built from multi-dilation blocks.

    Each block runs three parallel 3x3 convolutions with dilation 1, 2 and 4 on
    the same input, concatenates their outputs (96 channels), applies a leaky
    ReLU and fuses them back to 32 channels with a 3x3 convolution. Two blocks
    precede the first upsampling stage, a third one sits in front of the second.
    """

    def __init__(self):
        super().__init__()
        width = 32
        rates = (1, 2, 4)
        self.conv1 = nn.Conv2d(3, width, 1)
        for block in (1, 2, 3):
            # Parallel branches; padding equal to the dilation keeps the spatial size
            for branch, rate in enumerate(rates, start=1):
                setattr(self, f'dil_block{block}_{branch}',
                        nn.Conv2d(width, width, 3, padding=rate, dilation=rate))
            # Fusion convolution over the concatenated branches
            setattr(self, f'dil_block{block}_4',
                    nn.Conv2d(len(rates) * width, width, 3, padding=1))
        self.upsample1 = nn.ConvTranspose2d(width, width, 3, stride=2, padding=1)
        self.upsample2 = nn.ConvTranspose2d(width, width, 3, stride=2, padding=1)
        self.conv3_1 = nn.Conv2d(width, 3, 1)
        self.conv3_2 = nn.Conv2d(width, 3, 1)

    def _dilated_block(self, x, block):
        """Run the three dilated branches of block ``block`` on ``x`` and fuse them."""
        branches = [getattr(self, f'dil_block{block}_{branch}')(x) for branch in (1, 2, 3)]
        fuse = getattr(self, f'dil_block{block}_4')
        return fuse(F.leaky_relu(torch.cat(branches, dim=1)))

    def forward(self, x):
        """Return ``(x_mid, x_large)``: the input upscaled by 2 and by 4."""
        in_h, in_w = x.shape[-2:]
        feat = self.conv1(x)
        feat = self._dilated_block(feat, 1)
        feat = self._dilated_block(feat, 2)
        # First upsampling stage: exactly twice the input size
        feat = self.upsample1(feat, output_size=(2 * in_h, 2 * in_w))
        mid_h, mid_w = feat.shape[-2:]
        # Mid-resolution head reads the activated 2x features
        x_mid = self.conv3_1(F.leaky_relu(feat))
        feat = self._dilated_block(feat, 3)
        x_large = self.upsample2(feat, output_size=(2 * mid_h, 2 * mid_w))
        x_large = self.conv3_2(F.leaky_relu(x_large))
        return x_mid, x_large

In [None]:
# Model 4: train the dilated-convolution network
model4 = run_training2(dilation_net, model_number=4)

In [None]:
# Qualitative check for model 4: plot both predictions next to the input for the
# first image of one training batch.
dataiter = iter(trainloader)
test = next(dataiter)
model4.eval()
with torch.no_grad():  # pure inference, no autograd graph needed
    # Feed the batch from the model's device and bring the predictions back for plotting
    y_mid, y_large = (out.cpu() for out in model4(test['X'].to(next(model4.parameters()).device)))
img_dct = dict(y_large=y_large[0], y_mid=y_mid[0], X=test['X'][0])
show_images(img_dct)

In [None]:
class pretrained_net(nn.Module):
    """Two-scale super-resolution network with a frozen, pretrained VGG16 feature extractor.

    The input runs through two trainable 3x3 convolutions (64 channels) and, in
    parallel, through the first four layers of the VGG16 ``features`` stack. Both
    results are concatenated (128 channels), upsampled by 2 for the mid output,
    and refined and upsampled by 2 again for the large output.
    """

    def __init__(self):
        super(pretrained_net, self).__init__()
        self.conv1 = nn.Conv2d(3, 64, 3, padding=1)
        self.conv2 = nn.Conv2d(64, 64, 3, padding=1)
        self.upsample1 = nn.ConvTranspose2d(128, 128, 3, stride=2, padding=1)
        self.conv3 = nn.Conv2d(128, 3, 1)
        self.conv4 = nn.Conv2d(128, 128, 3, padding=1)
        self.upsample2 = nn.ConvTranspose2d(128, 128, 3, stride=2, padding=1)
        self.conv5 = nn.Conv2d(128, 3, 1)
        # First four layers of the pretrained VGG16 feature stack; they must yield
        # 64 channels for the 128-channel concatenation in forward().
        self.backbone = models.vgg16(pretrained=True).features[:4]
        for param in self.backbone.parameters():
            param.requires_grad = False  # the backbone stays frozen
        # Per-channel RGB statistics used to standardise the backbone input.
        # Registered as buffers so that they follow the module in .to(device)/.cuda().
        self.register_buffer('mean', torch.tensor([0.485, 0.456, 0.406]).reshape(1, 3, 1, 1))
        self.register_buffer('std', torch.tensor([0.229, 0.224, 0.225]).reshape(1, 3, 1, 1))

    def normalizer(self, x):
        """Standardise a batch of RGB images in [0, 1] with the stored per-channel mean/std."""
        return (x - self.mean) / self.std

    def forward(self, x):
        """Return ``(x_mid, x_large)``: the input upscaled by 2 and by 4."""
        h, w = x.shape[-2:]
        # Frozen branch: pretrained features of the standardised input
        out = self.backbone(self.normalizer(x))
        # Trainable branch on the raw input
        x = F.leaky_relu(self.conv1(x))
        x = F.leaky_relu(self.conv2(x))
        x = torch.cat([x, out], dim=1)
        # First upsampling stage: exactly twice the input size
        x = self.upsample1(x, output_size=(h*2, w*2))
        h, w = x.shape[-2:]
        x_mid = F.leaky_relu(self.conv3(x))
        x = F.leaky_relu(self.conv4(x))
        # Second upsampling stage: twice the mid resolution
        x_large = self.upsample2(x, output_size=(h*2, w*2))
        x_large = F.leaky_relu(self.conv5(x_large))
        return x_mid, x_large

In [None]:
# Model 5: train the network with the frozen pretrained VGG16 backbone
model5 = run_training2(pretrained_net, model_number=5)

In [None]:
# Qualitative check for model 5: plot both predictions next to the input for the
# first image of one training batch.
dataiter = iter(trainloader)
test = next(dataiter)
model5.eval()
with torch.no_grad():  # pure inference, no autograd graph needed
    # Feed the batch from the model's device and bring the predictions back for plotting
    y_mid, y_large = (out.cpu() for out in model5(test['X'].to(next(model5.parameters()).device)))
img_dct = dict(y_large=y_large[0], y_mid=y_mid[0], X=test['X'][0])
show_images(img_dct)

In [None]:
class pixel_shuffle_net(pretrained_net):
    """Variant of ``pretrained_net`` that upsamples with pixel shuffle instead of transposed convolutions.

    Channel bookkeeping (pixel shuffle with factor 2 divides the channels by 4):
    192 trainable + 64 backbone channels = 256 -> shuffle -> 64 channels at 2x
    (mid head); conv4 lifts them to 128 -> shuffle -> 32 channels at 4x (large head).

    The parent's ``conv2``, ``upsample1`` and ``upsample2`` are still created by
    the parent initialiser but are not used by this forward pass.
    """

    def __init__(self):
        super(pixel_shuffle_net, self).__init__()
        self.conv2_1 = nn.Conv2d(64, 128, 3, padding=1)
        self.conv2_2 = nn.Conv2d(128, 192, 3, padding=1)
        # conv3/conv4/conv5 replace the parent's layers of the same name
        self.conv3 = nn.Conv2d(64, 3, 1)
        self.conv4 = nn.Conv2d(64, 128, 3, padding=1)
        self.conv5 = nn.Conv2d(32, 3, 1)

    def forward(self, x):
        """Return ``(x_mid, x_large)``: the input upscaled by 2 and by 4."""
        # Frozen branch: standardise the input with the parent's mean/std. Written
        # inline so that it relies only on the ``mean``/``std`` attributes.
        out = self.backbone((x - self.mean) / self.std)
        # Trainable branch on the raw input
        x = F.leaky_relu(self.conv1(x))
        x = F.leaky_relu(self.conv2_1(x))
        x = F.leaky_relu(self.conv2_2(x))
        x = torch.cat([x, out], dim=1)
        # First upsampling stage: 256 channels -> 64 channels at twice the resolution
        x = F.pixel_shuffle(x, upscale_factor=2)
        x_mid = F.leaky_relu(self.conv3(x))
        x = F.leaky_relu(self.conv4(x))
        # Second upsampling stage: 128 channels -> 32 channels at four times the resolution
        x_large = F.pixel_shuffle(x, upscale_factor=2)
        x_large = F.leaky_relu(self.conv5(x_large))
        return x_mid, x_large

In [None]:
# Model 6: train the pixel-shuffle variant of the pretrained-backbone network
model6 = run_training2(pixel_shuffle_net, model_number=6)

In [None]:
# Qualitative check for model 6: plot both predictions next to the input for the
# first image of one training batch.
dataiter = iter(trainloader)
test = next(dataiter)
model6.eval()
with torch.no_grad():  # pure inference, no autograd graph needed
    # Feed the batch from the model's device and bring the predictions back for plotting
    y_mid, y_large = (out.cpu() for out in model6(test['X'].to(next(model6.parameters()).device)))
img_dct = dict(y_large=y_large[0], y_mid=y_mid[0], X=test['X'][0])
show_images(img_dct)