By Afifa Tariq

Import Libraries


In [0]:
import torch.nn as nn
import torch.nn.functional as F
import torch
from torchvision.models import vgg19
import math
import glob
import random
import os
from torch.utils.data import Dataset, DataLoader
from PIL import Image
import torchvision.transforms as transforms
import numpy as np
import itertools
import sys
from torchvision.utils import save_image, make_grid
from torch.autograd import Variable
import torchvision.datasets as dset
import zipfile
from google.colab import drive
import matplotlib.pyplot as plt


Define Generator, and Discriminator Classes

In [0]:

class ResidualBlock(nn.Module):
    def __init__(self, in_features):
        super(ResidualBlock, self).__init__()
        self.conv_block = nn.Sequential(
            nn.Conv2d(in_features, in_features, kernel_size=3, stride=1, padding=1),
            nn.BatchNorm2d(in_features, 0.8),
            nn.ReLU(),
            nn.Conv2d(in_features, in_features, kernel_size=3, stride=1, padding=1),
            nn.BatchNorm2d(in_features, 0.8),
            nn.ReLU(),
            nn.Upsample(scale_factor=2),
            nn.BatchNorm2d(in_features, 0.8),
            nn.ReLU(),
            nn.ConvTranspose2d(in_features, in_features, kernel_size=3, stride=1, padding=1)
        )

    def forward(self, x):
        return self.conv_block(x)


class GeneratorResNet(nn.Module):
    def __init__(self, in_channels=3, out_channels=3, n_residual_blocks=5):
        super(GeneratorResNet, self).__init__()

        # First layer
        self.conv1 = nn.Sequential(nn.Conv2d(in_channels, 128, kernel_size=4, stride=1, padding=4), nn.PReLU())
        

        # Residual blocks
        res_blocks = []
        for _ in range(n_residual_blocks):
            res_blocks.append(ResidualBlock(128))
        self.res_blocks = nn.Sequential(*res_blocks)

        # Second conv layer post residual blocks
        self.conv2 = nn.Sequential(nn.Conv2d(128, 128, kernel_size=3, stride=1, padding=1), nn.ReLU())

        self.conv3 = nn.Sequential(nn.Conv2d(128, 64, kernel_size=3, stride=1, padding=1), nn.ReLU())

        # Final output layer
        self.conv4 = nn.Sequential(nn.Conv2d(64, out_channels, kernel_size=3, stride=1, padding=4), nn.Sigmoid())

    def forward(self, x):
        out1 = self.conv1(x)
        out = self.res_blocks(out1)
        out = self.conv2(out)
        out = self.conv3(out)
        out = self.conv4(out)
        return out


class Discriminator(nn.Module):
    def __init__(self, input_shape):
        super(Discriminator, self).__init__()

        self.input_shape = input_shape
        in_channels, in_height, in_width = self.input_shape
        patch_h, patch_w = int(in_height / 2 ** 4), int(in_width / 2 ** 4)
        self.output_shape = (1, patch_h, patch_w)

        def discriminator_block(in_filters, out_filters, first_block=False):
            layers = []
            layers.append(nn.Conv2d(in_filters, out_filters, kernel_size=3, stride=2, padding=1))
            layers.append(nn.LeakyReLU(0.2, inplace=True))
            if not first_block:
                layers.append(nn.BatchNorm2d(out_filters))
            return layers

        layers = []
        in_filters = in_channels
        for i, out_filters in enumerate([64, 128, 256, 512]):
            layers.extend(discriminator_block(in_filters, out_filters, first_block=(i == 0)))
            in_filters = out_filters

        layers.append(nn.Conv2d(out_filters, in_filters, kernel_size=3, stride=1, padding=1))
        layers.append(nn.BatchNorm2d(in_filters))
        layers.append(nn.LeakyReLU(0.2, inplace=True))
        layers.append(nn.Conv2d(in_filters, in_filters, kernel_size=3, stride=1, padding=1))
        layers.append(nn.BatchNorm2d(in_filters))
        layers.append(nn.LeakyReLU(0.2, inplace=True))

        self.model = nn.Sequential(*layers)
        self.adv_layer = nn.Sequential(*layers, nn.Sigmoid())
        # The height and width of downsampled image
        #ds_size = 64 // 2 ** 4
        #self.adv_layer = nn.Sequential(nn.Linear(128 * ds_size ** 2, 1), nn.Sigmoid())

    def forward(self, img):
        #return self.model(img)

        out = self.model(img)
        out = out.view(out.shape[0], -1)
        validity = self.adv_layer(out)
        print(validity.size())
        return validity

Create class for dataset which overrides the dataset class. This class will let me divide the CelebA dataset images into low resolution images and high resolution images.


In [0]:
# Normalization parameters for pre-trained PyTorch models
mean = np.array([0.485, 0.456, 0.406])
std = np.array([0.229, 0.224, 0.225])


class ImageDataset(Dataset):
    def __init__(self, root, hr_shape):
        hr_height, hr_width = hr_shape
        # Transforms for low resolution images and high resolution images
        self.lr_transform = transforms.Compose(
            [
                transforms.Resize((hr_height // 4, hr_height // 4), Image.BICUBIC),
                transforms.ToTensor(),
                transforms.Normalize(mean, std),
            ]
        )
        self.hr_transform = transforms.Compose(
            [
                transforms.Resize((hr_height, hr_height), Image.BICUBIC),
                transforms.ToTensor(),
                transforms.Normalize(mean, std),
            ]
        )

        self.files = sorted(glob.glob(root + "/*.*"))

    def __getitem__(self, index):
        img = Image.open(self.files[index % len(self.files)])
        img_lr = self.lr_transform(img)
        img_hr = self.hr_transform(img)

        return {"lr": img_lr, "hr": img_hr}

    def __len__(self):
        return len(self.files)

Make folders to save data and define variables


In [0]:
os.makedirs("images", exist_ok=True)
os.makedirs("saved_models", exist_ok=True)

epoch=0
n_epochs=20
root = 'data_faces/img_align_celeba'
dataset_name="img_align_celeba"
batch_size=4
lr=0.0002
b1=0.5
b2=0.999
decay_epoch=100
n_cpu=2
hr_height = 64
hr_width = 64
channels=3
sample_interval=100
checkpoint_interval=-1

Initialize Generator and Discriminator
Define Loss functions

In [0]:
cuda = torch.cuda.is_available()
hr_shape = (hr_height, hr_width)

# Initialize generator and discriminator
generator = GeneratorResNet()
discriminator = Discriminator(input_shape=(channels, *hr_shape))


# Losses
criterion_GAN = torch.nn.BCELoss()
criterion_content = torch.nn.L1Loss()

if cuda:
    generator = generator.cuda()
    discriminator = discriminator.cuda()
    criterion_GAN = criterion_GAN.cuda()
    criterion_content = criterion_content.cuda()

if epoch != 0:
    # Load pretrained models
    generator.load_state_dict(torch.load("saved_models/generator_%d.pth"))
    discriminator.load_state_dict(torch.load("saved_models/discriminator_%d.pth"))

# Optimizers
optimizer_G = torch.optim.Adam(generator.parameters(), lr=lr, betas=(b1, b2))
optimizer_D = torch.optim.Adam(discriminator.parameters(), lr=lr, betas=(b1, b2))

Tensor = torch.cuda.FloatTensor if cuda else torch.Tensor


Download CelebA dataset


In [0]:
!mkdir data_faces && wget https://s3-us-west-1.amazonaws.com/udacity-dlnfd/datasets/celeba.zip 

with zipfile.ZipFile("celeba.zip","r") as zip_ref:
  zip_ref.extractall("data_faces/")

--2019-12-14 19:27:39--  https://s3-us-west-1.amazonaws.com/udacity-dlnfd/datasets/celeba.zip
Resolving s3-us-west-1.amazonaws.com (s3-us-west-1.amazonaws.com)... 52.219.112.41
Connecting to s3-us-west-1.amazonaws.com (s3-us-west-1.amazonaws.com)|52.219.112.41|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 1443490838 (1.3G) [application/zip]
Saving to: ‘celeba.zip’


2019-12-14 19:28:52 (20.5 MB/s) - ‘celeba.zip’ saved [1443490838/1443490838]



I used this code to delete some of the image files from the dataset to make it smaller so that the training time would decrease

In [0]:


files = os.listdir(root)  # Get filenames in current folder
files = random.sample(files, 20000)  # Pick 500 random files
for file in files:  # Go over each file name to be deleted
    f = os.path.join(root, file)  # Create valid path to file
    os.remove(f)  # Remove the file

img_list = os.listdir(root)
print(len(img_list))

182599


In [0]:
imgdataset = ImageDataset(root, hr_shape=hr_shape)

# Create the dataloader
dataloader = DataLoader(
    imgdataset,
    batch_size=batch_size,
    shuffle=True,
    num_workers=n_cpu,
)

Mounting my drive to google colab so that I can save the output images and models to the drive

In [0]:
drive.mount('/content/gdrive')

Go to this URL in a browser: https://accounts.google.com/o/oauth2/auth?client_id=947318989803-6bn6qk8qdgf4n4g3pfee6491hc0brc4i.apps.googleusercontent.com&redirect_uri=urn%3aietf%3awg%3aoauth%3a2.0%3aoob&response_type=code&scope=email%20https%3a%2f%2fwww.googleapis.com%2fauth%2fdocs.test%20https%3a%2f%2fwww.googleapis.com%2fauth%2fdrive%20https%3a%2f%2fwww.googleapis.com%2fauth%2fdrive.photos.readonly%20https%3a%2f%2fwww.googleapis.com%2fauth%2fpeopleapi.readonly

Enter your authorization code:
··········
Mounted at /content/gdrive


Make folders in google drive to save the images and models in

In [0]:

!mkdir -p "/content/gdrive/My Drive/images"
!mkdir -p "/content/gdrive/My Drive/saved_models"


# Training 

In [0]:

G_losses = []
D_losses = []

# ----------
#  Training
# ----------

for epoch in range(epoch, n_epochs):
    for i, imgs in enumerate(dataloader):

        # Configure model input
        imgs_lr = Variable(imgs["lr"].type(Tensor))
        imgs_hr = Variable(imgs["hr"].type(Tensor))

        # Adversarial ground truths
        valid = Variable(Tensor(np.ones((imgs_lr.size(0), *discriminator.output_shape))), requires_grad=False)
        fake = Variable(Tensor(np.zeros((imgs_lr.size(0), *discriminator.output_shape))), requires_grad=False)

        # ------------------
        #  Train Generators
        # ------------------

        optimizer_G.zero_grad()

        # Generate a high resolution image from low resolution input
        gen_hr = generator(imgs_lr)
        print(gen_hr.size())

        # Adversarial loss
        loss_GAN = criterion_GAN(discriminator(gen_hr), valid)

        # Content loss
        #gen_features = feature_extractor(gen_hr)
        #real_features = feature_extractor(imgs_hr)
        loss_content = criterion_content(gen_hr, real_features.detach())

        # Total loss
        loss_G = loss_content + 1e-3 * loss_GAN

        loss_G.backward()
        optimizer_G.step()

        # ---------------------
        #  Train Discriminator
        # ---------------------

        optimizer_D.zero_grad()

        # Loss of real and fake images
        loss_real = criterion_GAN(discriminator(imgs_hr), valid)
        loss_fake = criterion_GAN(discriminator(gen_hr.detach()), fake)

        # Total loss
        loss_D = (loss_real + loss_fake) / 2

        loss_D.backward()
        optimizer_D.step()

        # Save Losses for plotting later
        G_losses.append(loss_G.item())
        D_losses.append(loss_D.item())

        # --------------
        #  Log Progress
        # --------------
        if i % 50 == 0:
          print(
              "[Epoch %d/%d] [Batch %d/%d] [D loss: %f] [G loss: %f]"
              % (epoch, n_epochs, i, len(dataloader), loss_D.item(), loss_G.item())
          )

        batches_done = epoch * len(dataloader) + i
        if batches_done % sample_interval == 0:
            # Save image grid with upsampled inputs and SRGAN outputs
            imgs_lr = nn.functional.interpolate(imgs_lr, scale_factor=4)
            gen_hr = make_grid(gen_hr, nrow=1, normalize=True)
            imgs_lr = make_grid(imgs_lr, nrow=1, normalize=True)
            img_grid = torch.cat((imgs_lr, gen_hr), -1)
            save_image(img_grid, "/content/gdrive/My Drive/images/%d.png" % batches_done, normalize=False)

    if checkpoint_interval != -1 and epoch % checkpoint_interval == 0:
        # Save model checkpoints
        torch.save(generator.state_dict(), "/content/gdrive/My Drive/saved_models/generator_%d.pth" % epoch)
        torch.save(discriminator.state_dict(), "/content/gdrive/My Drive/saved_models/discriminator_%d.pth" % epoch)

torch.Size([4, 3, 678, 678])


In [0]:
plt.figure(figsize=(10,5))
plt.title("Generator and Discriminator Loss During Training")
plt.plot(G_losses,label="G")
plt.plot(D_losses,label="D")
plt.xlabel("iterations")
plt.ylabel("Loss")
plt.legend()
plt.show()

In [0]:
from google.colab import files
uploaded = files.upload()



In [0]:
from skimage import io

img_file = "16x16-obama.jpg"
img_lr = io.imread(img_file)


input_img =torch.tensor(img_lr).float()
input_img = Variable(input_img)
input_img = np.transpose(input_img, (2, 0, 1))
input_img= input_img.unsqueeze(0)
# Generate a high resolution image from low resolution input
gen_hr = generator(input_img)