In [None]:
# Imports 
import numpy as np 
import pandas as pd

# Torch Imports
import torch
from torch import nn
from torch import optim
import math
from torch.autograd import Variable

# Image Imports 
from PIL import Image
from os.path import join
from tqdm import tqdm
import os

import pandas as pd
import random

from torch.utils.data import DataLoader, Dataset
from torchvision.transforms import Compose, RandomCrop, ToTensor, ToPILImage, CenterCrop, Resize

from torchvision.models.vgg import vgg16

In [None]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
# File managing

TRAIN_CSV = "/content/drive/MyDrive/CovidNetImages/train.txt"
TRAIN_IMAGES = "/content/drive/MyDrive/CovidNetImages/train"



In [None]:
myData = pd.read_csv(TRAIN_CSV,delimiter=" ",header=None)


In [None]:
heads = ['id','name','class','type']
myData.columns = heads
negatives = myData[myData['class']=='negative']
positives = myData[myData['class'] == 'positive']

In [None]:
finalNegatives = negatives.iloc[3008:]
finalPositives = positives.iloc[3008:]
positiveImages = finalPositives['name'].tolist()
negativeImages = finalNegatives['name'].tolist()
trainImages = positiveImages+negativeImages
random.shuffle(trainImages)

In [None]:
torch.autograd.set_detect_anomaly(True)

<torch.autograd.anomaly_mode.set_detect_anomaly at 0x7ff3f6b73a30>

In [None]:
def is_image_file(filename):
    return any(filename.endswith(extension) for extension in ['.png', '.jpg', '.jpeg', '.PNG', '.JPG', '.JPEG'])


def calculate_valid_crop_size(crop_size, upscale_factor):
    return crop_size - (crop_size % upscale_factor)


def train_hr_transform(crop_size):
    return Compose([
        Resize((crop_size,crop_size)),
        ToTensor(),
    ])


def train_lr_transform(crop_size, upscale_factor):
    return Compose([
        # ToPILImage(),
        Resize((crop_size // upscale_factor,crop_size // upscale_factor), interpolation=Image.NEAREST),
        ToTensor()
    ])


def display_transform():
    return Compose([
        ToPILImage(),
        Resize((400,400)),
        CenterCrop((400,400)),
        ToTensor()
    ])

In [None]:
class TrainDatasetFromFolder(Dataset):
    def __init__(self, dataset_dir, crop_size, upscale_factor):
        super(TrainDatasetFromFolder, self).__init__()
        self.image_filenames = trainImages
        # print(len(self.image_filenames))
        crop_size = calculate_valid_crop_size(crop_size, upscale_factor)
        self.hr_transform = train_hr_transform(crop_size)
        self.lr_transform = train_lr_transform(crop_size, upscale_factor)

    def __getitem__(self, index):
        imagePath = os.path.join(TRAIN_IMAGES,self.image_filenames[index])
        im = Image.open(imagePath).convert('RGB')
        hr_image = self.hr_transform(im)
        lr_image = self.lr_transform(im)
        # if index == 0:
        #   print("HR: ",hr_image.size())
        #   print("LR: ",lr_image.size())
        # print("The self image is: ",self.image_filenames[index]
        return lr_image, hr_image

    def __len__(self):
        return len(self.image_filenames)

In [None]:
UPSCALE_FACTOR = 2
CROP_SIZE = 256

In [None]:
mean = np.array([0.485, 0.456, 0.406])
std = np.array([0.229, 0.224, 0.225])

In [None]:
train_set = TrainDatasetFromFolder('/content/drive/MyDrive/CovidNetImages/train', crop_size=CROP_SIZE, upscale_factor=UPSCALE_FACTOR)
# val_set = ValDatasetFromFolder('DIV2K_valid_HR', upscale_factor=UPSCALE_FACTOR)
train_loader = DataLoader(dataset=train_set, num_workers=2, batch_size=8, shuffle=True)
# val_loader = DataLoader(dataset=val_set, num_workers=4, batch_size=1, shuffle=False)



In [None]:
import matplotlib.pyplot as plt
%matplotlib inline

In [None]:
# class ConvBlock(nn.Module):
#   def __init__(self,in_channels,out_channels):
#     super().__init__()
#     self.conv1 = nn.Conv2d(in_channels,out_channels,kernel_size=3,padding=1)
#     self.activation = nn.PReLU()
#     self.bacthNorm = nn.BatchNorm2d(out_channels)
#     self.conv2 = nn.Conv2

#   def forward(self,x:torch.Tensor):
#     x = se

In [None]:
class ResidualBlock(nn.Module):
    def __init__(self, channels):
        super(ResidualBlock, self).__init__()
        self.conv1 = nn.Conv2d(channels, channels, kernel_size=3, padding=1)
        self.bn1 = nn.BatchNorm2d(channels)
        self.prelu = nn.PReLU()
        self.conv2 = nn.Conv2d(channels, channels, kernel_size=3, padding=1)
        self.bn2 = nn.BatchNorm2d(channels)

    def forward(self, x):
        residual = self.conv1(x)
        residual = self.bn1(residual)
        residual = self.prelu(residual)
        residual = self.conv2(residual)
        residual = self.bn2(residual)

        return x + residual

In [None]:
class UpsampleBLock(nn.Module):
    def __init__(self, in_channels, up_scale):
        super(UpsampleBLock, self).__init__()
        self.conv = nn.Conv2d(in_channels, in_channels * up_scale ** 2, kernel_size=3, padding=1)
        self.pixel_shuffle = nn.PixelShuffle(up_scale)
        self.prelu = nn.PReLU()

    def forward(self, x):
        x = self.conv(x)
        x = self.pixel_shuffle(x)
        x = self.prelu(x)
        return x

In [None]:
class Generator(nn.Module):
    def __init__(self, scale_factor):
        super().__init__()
        upsample_block_num = int(math.log(scale_factor, 2))

        self.firstConv = nn.Sequential(
            nn.Conv2d(3,64,kernel_size=9,padding=4),
            nn.PReLU()
        )
        self.residualBlocks = nn.ModuleList([
            ResidualBlock(64),
            ResidualBlock(64),
            ResidualBlock(64),
            ResidualBlock(64),
            ResidualBlock(64),
        ])
        self.middleConv = nn.Sequential(
            nn.Conv2d(64, 64, kernel_size=3, padding=1),
            nn.BatchNorm2d(64)
        ) 
        self.upscaleBlocks = nn.ModuleList([
            UpsampleBLock(64, 2),
            UpsampleBLock(64, 2),
        ])

        self.finalConv = nn.Conv2d(64,3,kernel_size=9,padding=4)

    def forward(self, x):
        block1 = self.firstConv(x)
        block2 = self.residualBlocks[0](block1)
        block3 = self.residualBlocks[1](block2)
        block4 = self.residualBlocks[2](block3)
        block5 = self.residualBlocks[3](block4)
        block6 = self.residualBlocks[4](block5)
        block7 = self.middleConv(block6)
        block8 = self.upscaleBlocks[0](block1 + block7)
        block9 = self.upscaleBlocks[1](block8)
        block10= self.finalConv(block9)
        return (torch.tanh(block10) + 1) / 2

In [None]:
class DiscConvBlocks(nn.Module):
  def __init__(self,in_channels,out_channels):
    super().__init__()
    self.conv1 = nn.Conv2d(in_channels,out_channels, kernel_size=3, stride=1, padding=1)
    self.batchNorm1 = nn.BatchNorm2d(out_channels)
    self.activation1 =  nn.LeakyReLU(0.2)
    self.conv2 = nn.Conv2d(out_channels, out_channels, kernel_size=3, padding=1,stride=2)
    self.batchNorm2 = nn.BatchNorm2d(out_channels)
    self.activation2 = nn.LeakyReLU(0.2)

  def forward(self,x:torch.Tensor):
    x = self.conv1(x)
    x = self.batchNorm1(x)
    x = self.activation1(x)
    x = self.conv2(x)
    x = self.batchNorm2(x)
    x = self.activation2(x)

    return x

In [None]:
class finalConvBlock(nn.Module):
  def __init__(self,in_channels,out_channels):
    super().__init__()

    self.conv1 = nn.Conv2d(in_channels, in_channels, kernel_size=3, stride=2, padding=1)
    self.batchNorm1 =        nn.BatchNorm2d(in_channels)
    self.activation1 =         nn.LeakyReLU(0.2)
    self.pool =         nn.AdaptiveAvgPool2d(1)
    self.conv2 =        nn.Conv2d(in_channels,out_channels, kernel_size=1)
    self.activation2 =        nn.LeakyReLU(0.2)
    self.conv3 =         nn.Conv2d(out_channels,3 , kernel_size=1)
  
  def forward(self,x:torch.Tensor):
    x = self.conv1(x)
    x = self.batchNorm1(x)
    x = self.activation1(x)
    x = self.pool(x)
    x = self.conv2(x)
    x = self.activation2(x)
    x = self.conv3(x)

    return x

In [None]:
class Discriminator(nn.Module):
    def __init__(self):
      super(Discriminator, self).__init__()
      # self.net = nn.Sequential(
      #     nn.Conv2d(3, 64, kernel_size=3, padding=1),
      #     nn.LeakyReLU(0.2), )
      self.conv1 = nn.Conv2d(3, 64, kernel_size=3, padding=1)
      self.activation1 =     nn.LeakyReLU(0.2)
      self.conv2 = nn.Conv2d(64,64,kernel_size=3,stride=2,padding=1)
      self.batchNorm = nn.BatchNorm2d(64)
      self.activation2 = nn.LeakyReLU(0.2)
      
      # These are for the final layers
      self.pooling = nn.AdaptiveAvgPool2d(1)
      self.flatten = nn.Conv2d(512,1024,kernel_size=1,)
      self.activation3 = nn.LeakyReLU(0.2) 
      self.flatten2 = nn.Conv2d(1024,1,kernel_size=1)

      self.convBlocks = nn.ModuleList([
          DiscConvBlocks(64,128),
          DiscConvBlocks(128,256),
          DiscConvBlocks(256,512),
      ])


    def forward(self, x):
      x = self.conv1(x)
      x = self.activation1(x)
      x = self.conv2(x)
      x = self.batchNorm(x)
      x = self.activation2(x)
      x = self.convBlocks[0](x)
      x = self.convBlocks[1](x)
      x = self.convBlocks[2](x)
      x = self.pooling(x)
      x = self.flatten(x)
      x = self.activation3(x)
      x = self.flatten2(x)
      batch_size = x.size(0)
      return torch.sigmoid(x.view(batch_size))

In [None]:
class GeneratorLoss(nn.Module):
    def __init__(self):
        super(GeneratorLoss, self).__init__()
        vgg = vgg16(pretrained=True)
        loss_network = nn.Sequential(*list(vgg.features)[:31]).eval()
        for param in loss_network.parameters():
            param.requires_grad = False
        self.loss_network = loss_network
        self.mse_loss = nn.MSELoss()
        self.tv_loss = TVLoss()

    def forward(self, out_labels, out_images, target_images):
        # Adversarial Loss
        adversarial_loss = torch.mean(1 - out_labels)
        # Perception Loss
        perception_loss = self.mse_loss(self.loss_network(out_images), self.loss_network(target_images))
        # Image Loss
        image_loss = self.mse_loss(out_images, target_images)
        # TV Loss
        tv_loss = self.tv_loss(out_images)
        return image_loss + 0.001 * adversarial_loss + 0.006 * perception_loss + 2e-8 * tv_loss

In [None]:
class TVLoss(nn.Module):
    def __init__(self, tv_loss_weight=1):
        super(TVLoss, self).__init__()
        self.tv_loss_weight = tv_loss_weight

    def forward(self, x):
        batch_size = x.size()[0]
        h_x = x.size()[2]
        w_x = x.size()[3]
        count_h = self.tensor_size(x[:, :, 1:, :])
        count_w = self.tensor_size(x[:, :, :, 1:])
        h_tv = torch.pow((x[:, :, 1:, :] - x[:, :, :h_x - 1, :]), 2).sum()
        w_tv = torch.pow((x[:, :, :, 1:] - x[:, :, :, :w_x - 1]), 2).sum()
        return self.tv_loss_weight * 2 * (h_tv / count_h + w_tv / count_w) / batch_size

    @staticmethod
    def tensor_size(t):
        return t.size()[1] * t.size()[2] * t.size()[3]

In [None]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
device

device(type='cuda')

In [None]:
netG = Generator(UPSCALE_FACTOR)
netD = Discriminator()
generator_criterion = GeneratorLoss()

Downloading: "https://download.pytorch.org/models/vgg16-397923af.pth" to /root/.cache/torch/hub/checkpoints/vgg16-397923af.pth


  0%|          | 0.00/528M [00:00<?, ?B/s]

In [None]:
# generator_criterion = generator_criterion.to('cpu')
generator_criterion = generator_criterion.to(device)
netG = netG.to(device)
netD = netD.to(device)

In [None]:
optimizerG = optim.Adam(netG.parameters(), lr=0.0002)
optimizerD = optim.Adam(netD.parameters(), lr=0.0002)

In [None]:
results = {'d_loss': [], 'g_loss': [], 'd_score': [], 'g_score': [], 'psnr': [], 'ssim': []}

In [None]:
N_EPOCHS = 10

In [None]:
# t = next(iter(train_loader))
# t[0].size()

In [None]:
# from torchsummary import summary
# summary(netG,(3,22,22))

In [None]:
# summary(netD,(3,88,88))

In [None]:
generatorLoss = []
discriminatorLoss = []
import sys

In [None]:

torch.cuda.empty_cache()

In [None]:
for epoch in range(1, N_EPOCHS + 1):
    train_bar = tqdm(train_loader)
    running_results = {'batch_sizes': 0, 'd_loss': 0, 'g_loss': 0, 'd_score': 0, 'g_score': 0}

    netG.train()
    netD.train()
    gL,dL = 0,0
    for data, target in train_bar:
        # print(data.size())
        # print(target.size())
        g_update_first = True
        batch_size = data.size(0)
        running_results['batch_sizes'] += batch_size
        
        real_img = Variable(target)
        z = Variable(data)
        if torch.cuda.is_available():
            real_img = real_img.cuda()
            z = z.cuda()
        
        fake_img = netG(z) # z-> 22 *22 fakeImg -> 88*88
        netD.zero_grad()
        real_out = netD(real_img) # RealImg ->org 88*88
        real_out = real_out.mean()
        fake_out = netD(fake_img).mean() # gen 88*88
        d_loss = 1 - real_out + fake_out
        d_loss.backward(retain_graph=True)
        optimizerD.step()

        fake_img = netG(z)
        fake_out = netD(fake_img).mean()
        netG.zero_grad()
        # fake_out = fake_out.to('cpu')
        # fake_img = fake_img.to('cpu')
        # real_img =  real_img.to('cpu')
        g_loss = generator_criterion(fake_out, fake_img, real_img)
        g_loss = g_loss.to(device)
        g_loss.backward()

        # fake_out = fake_out.to(device)
        # fake_img = fake_img.to(device)

        # fake_img = netG(z)
        # fake_out = netD(fake_img).mean()

        optimizerG.step()

        running_results['g_loss'] += g_loss.item() * batch_size
        running_results['d_loss'] += d_loss.item() * batch_size
        running_results['d_score'] += real_out.item() * batch_size
        running_results['g_score'] += fake_out.item() * batch_size
        gL += running_results['g_loss']
        dL += running_results['d_loss']
        train_bar.set_description(desc='[%d/%d] Loss_D: %.4f Loss_G: %.4f D(x): %.4f D(G(z)): %.4f' % (
            epoch, N_EPOCHS, running_results['d_loss'] / running_results['batch_sizes'],
            running_results['g_loss'] / running_results['batch_sizes'],
            running_results['d_score'] / running_results['batch_sizes'],
            running_results['g_score'] / running_results['batch_sizes']))
    torch.cuda.empty_cache()
    netG.eval()
    out_path = 'training_results/SRF_' + str(UPSCALE_FACTOR) + '/'
    torch.save(netG,"/content/drive/MyDrive/CovidNetImages/SavedSR/generator"+epoch+".pt")
    generatorLoss.append(gL)
    discriminatorLoss.append(dL)
    if not os.path.exists(out_path):
        os.makedirs(out_path)

  0%|          | 0/2997 [00:32<?, ?it/s]


OutOfMemoryError: ignored

In [None]:
torch.save(netG,"/content/drive/MyDrive/CovidNetImages/SavedSR/generatorV1.pt")
torch.save(netD,"/content/drive/MyDrive/CovidNetImages/SavedSR/discV1.pt")

In [None]:
newModel = Generator(UPSCALE_FACTOR)
newModel = torch.load("/content/drive/MyDrive/CovidNetImages/SavedSR/generatorV1.pt")
newModel =newModel.to('cpu')

In [None]:
train_bar = tqdm(train_loader)
running_results = {'batch_sizes': 0, 'd_loss': 0, 'g_loss': 0, 'd_score': 0, 'g_score': 0}

In [None]:
import cv2

In [None]:
from torchvision.utils import save_image

In [None]:
for data,path in train_loader:
  z = Variable(data)
  fake_img = newModel(z)
  for i in range(16):
    save_image(fake_img[i],"/content/drive/MyDrive/CovidNetImages/SR_Out/"+path[i])