<a href="https://colab.research.google.com/github/Victorious3/pix2pix/blob/master/pix2pix.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Import

In [None]:
import glob
import random
import os
import numpy as np
from PIL import Image
from torch.utils.data import Dataset
import torchvision.transforms as transforms
from torch.utils.data import DataLoader

# Dataset

In [None]:
_URL = 'bash ./datasets/download_pix2pix_dataset.sh facades'

class ImageDataset(Dataset):
    def __init__(self, args, root, transforms_=None, mode='train'):
        self.transform = transforms.Compose(transforms_)
        self.args = args
        self.files = sorted(glob.glob(os.path.join(root, mode) + '/*.*'))

    def __getitem__(self, index):

        img = Image.open(self.files[index])
        w, h = img.size

        if self.args.which_direction == 'AtoB':
            img_A = img.crop((0, 0, w/2, h))
            img_B = img.crop((w/2, 0, w, h))
        else:
            img_B = img.crop((0, 0, w/2, h))
            img_A = img.crop((w/2, 0, w, h))


        if np.random.random() < 0.5:
            img_A = Image.fromarray(np.array(img_A)[:, ::-1, :], 'RGB')
            img_B = Image.fromarray(np.array(img_B)[:, ::-1, :], 'RGB')

        img_A = self.transform(img_A)
        img_B = self.transform(img_B)

        return {'A': img_A, 'B': img_B}

    def __len__(self):
        return len(self.files)

# Loader

In [None]:
# Configure dataloaders
def Get_dataloader(args):
    transforms_ = [ transforms.Resize((args.img_height, args.img_width), Image.BICUBIC),
                transforms.ToTensor(),
                transforms.Normalize((0.5,0.5,0.5), (0.5,0.5,0.5)) ]

    train_dataloader = DataLoader(ImageDataset(args, "%s/%s" % (args.data_root,args.dataset_name),
                        transforms_=transforms_,mode='train'),
                        batch_size=args.batch_size, shuffle=True, num_workers=args.n_cpu, drop_last=True)

    test_dataloader = DataLoader(ImageDataset(args, "%s/%s" % (args.data_root,args.dataset_name),
                            transforms_=transforms_, mode='test'),
                            batch_size=10, shuffle=True, num_workers=1, drop_last=True)

    val_dataloader = DataLoader(ImageDataset(args, "%s/%s" % (args.data_root,args.dataset_name),
                            transforms_=transforms_, mode='val'),
                            batch_size=10, shuffle=True, num_workers=1, drop_last=True)

    return train_dataloader, test_dataloader, val_dataloader

# Randomize

# Optimizer

In [None]:
import torch
# Adam Optimizers
def Get_optimizer_func(args, generator, discriminator):
    ArgLr = args.lr
    ArgB1 = args.b1
    ArgB2 = args.b2

    optimizer_G = torch.optim.Adam(
                    generator.parameters(),
                    lr=ArgLr, betas=(ArgB1, ArgB2))
    optimizer_D = torch.optim.Adam(
                    discriminator.parameters(),
                    lr=ArgLr, betas=(ArgB1, ArgB2))
    
    return optimizer_G, optimizer_D

# Loss functions
def Get_loss_func(args):
    crit_GAN = torch.nn.BCELoss()
    crit_pixelwise = torch.nn.L1Loss()
    if torch.cuda.is_available():
        crit_GAN.cuda()
        crit_pixelwise.cuda()
    return crit_GAN, crit_pixelwise

# Discriminator

In [None]:
import torch.nn as nn
import torch

class Discriminator(nn.Module):
    def __init__(self, in_channels):
        super().__init__()
        self.layer1 = nn.Sequential(
            nn.Conv2d(in_channels * 2, 64, kernel_size=4, stride=2, padding=1),
            nn.LeakyReLU(0.2, inplace=True)
        )
        self.layer2 = nn.Sequential(
            nn.Conv2d(64, 128, kernel_size=4, stride=2, padding=1),
            nn.BatchNorm2d(128),
            nn.LeakyReLU(0.2, inplace=True)
        )
        self.layer3 = nn.Sequential(
            nn.Conv2d(128, 256, kernel_size=4, stride=2, padding=1),
            nn.BatchNorm2d(256),
            nn.LeakyReLU(0.2, inplace=True)
        )
        self.layer4 = nn.Sequential(
            nn.Conv2d(256, 512, kernel_size=4, stride=2, padding=1),
            nn.BatchNorm2d(512),
            nn.LeakyReLU(0.2, inplace=True)
        )
        self.layer5 = nn.Sequential(
            nn.Conv2d(512, 512, kernel_size=4, stride=1, padding=1),
            nn.BatchNorm2d(512),
            nn.LeakyReLU(0.2, inplace=True)
        )
        self.out = nn.Sequential(
            nn.Conv2d(512, 1, kernel_size=4, stride=1, padding=1),
            nn.LeakyReLU(0.2, inplace=True),
            nn.Sigmoid()
        )

    def forward(self, img_a, img_b):
        input_img = torch.cat((img_a, img_b), 1)
        x = self.layer1(input_img)
        x = self.layer2(x)
        x = self.layer3(x)
        x = self.layer4(x)
        x = self.layer5(x)
        x = self.out(x)
        return x
    

# Generator
- UNet
- Generator
- Initialize

In [None]:
class UNetDown(nn.Module):
    def __init__(self, in_size, out_size, normalize=True, dropout=0.0):
        super(UNetDown, self).__init__()
        layers = [nn.Conv2d(in_size, out_size, 4, 2, 1, bias=False)]
        if normalize:
            layers.append(nn.InstanceNorm2d(out_size))
        layers.append(nn.LeakyReLU(0.2))
        if dropout:
            layers.append(nn.Dropout(dropout))
        self.model = nn.Sequential(*layers)

    def forward(self, x):
        return self.model(x)

class UNetUp(nn.Module):
    def __init__(self, in_size, out_size, dropout=0.0):
        super(UNetUp, self).__init__()
        layers = [  nn.ConvTranspose2d(in_size, out_size, 4, 2, 1, bias=False),
                    nn.InstanceNorm2d(out_size),
                    nn.ReLU(inplace=True)]
        if dropout:
            layers.append(nn.Dropout(dropout))

        self.model = nn.Sequential(*layers)

    def forward(self, x, skip_input):
        x = self.model(x)
        x = torch.cat((x, skip_input), 1)

        return x

class GeneratorUNet(nn.Module):
    def __init__(self, in_channels=3, out_channels=3):
        super(GeneratorUNet, self).__init__()

        self.down1 = UNetDown(in_channels, 64, normalize=False)
        self.down2 = UNetDown(64, 128)
        self.down3 = UNetDown(128, 256)
        self.down4 = UNetDown(256, 512, dropout=0.5)
        self.down5 = UNetDown(512, 512, dropout=0.5)
        self.down6 = UNetDown(512, 512, dropout=0.5)
        self.down7 = UNetDown(512, 512, dropout=0.5)
        self.down8 = UNetDown(512, 512, normalize=False, dropout=0.5)

        self.up1 = UNetUp(512, 512, dropout=0.5)
        self.up2 = UNetUp(1024, 512, dropout=0.5)
        self.up3 = UNetUp(1024, 512, dropout=0.5)
        self.up4 = UNetUp(1024, 512, dropout=0.5)
        self.up5 = UNetUp(1024, 256)
        self.up6 = UNetUp(512, 128)
        self.up7 = UNetUp(256, 64)

        self.up8 = nn.Sequential(
            nn.ConvTranspose2d(128, out_channels, 4, 2, 1),
            nn.Tanh()
        )
        
    def forward(self, x):
        # U-Net generator with skip connections from encoder to decoder
        d1 = self.down1(x)
        d2 = self.down2(d1)
        d3 = self.down3(d2)
        d4 = self.down4(d3)
        d5 = self.down5(d4)
        d6 = self.down6(d5)
        d7 = self.down7(d6)
        d8 = self.down8(d7)
        u1 = self.up1(d8, d7)
        u2 = self.up2(u1, d6)
        u3 = self.up3(u2, d5)
        u4 = self.up4(u3, d4)
        u5 = self.up5(u4, d3)
        u6 = self.up6(u5, d2)
        u7 = self.up7(u6, d1)

        return self.up8(u7)


def weights_init_normal(m):
    classname = m.__class__.__name__
    if classname.find('Conv') != -1:
        torch.nn.init.normal(m.weight.data, 0.0, 0.02)
    elif classname.find('BatchNorm2d') != -1:
        torch.nn.init.normal(m.weight.data, 1.0, 0.02)
        torch.nn.init.constant(m.bias.data, 0.0)

def Create_nets(args):
    generator = GeneratorUNet(args.in_channels, args.out_channels)
    discriminator = Discriminator(args.out_channels)

    if torch.cuda.is_available():
        generator = generator.cuda()
        discriminator = discriminator.cuda()

    # Initialize weights
    generator.apply(weights_init_normal)
    discriminator.apply(weights_init_normal)

    return generator, discriminator        

# Training

# Testing