# Downloading requirements

This project is realisation of pix2pix model on pytorch
The source: https://github.com/junyanz/pytorch-CycleGAN-and-pix2pix

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from torchvision import models
import torch.optim as optim
from time import time

import os
import pathlib
import time
import datetime

from matplotlib import pyplot as plt
from IPython import display
from PIL import Image

from pathlib import Path
import torch
import numpy as np
from torchvision import transforms
from torch.utils.data import Dataset, DataLoader
from tqdm import tqdm
from torchvision.transforms.functional import crop

import os

# Downloading dataset

Let's download the dataset of shoes to train our model (taken from https://colab.research.google.com/github/tensorflow/docs/blob/master/site/en/tutorials/generative/pix2pix.ipynb#scrollTo=2CbTEt448b4R)

In [None]:
!git clone https://github.com/junyanz/pytorch-CycleGAN-and-pix2pix

In [None]:
os.chdir('pytorch-CycleGAN-and-pix2pix/')

In [None]:
!pip install -r requirements.txt

In [None]:
!bash ./datasets/download_pix2pix_dataset.sh edges2shoes

In [None]:
!pip3 install Pillow

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [None]:
TRAIN_DIR=Path('/content/pytorch-CycleGAN-and-pix2pix/datasets/edges2shoes/train')
VAL_DIR=Path('/content/pytorch-CycleGAN-and-pix2pix/datasets/edges2shoes/train/10000_AB.jpg')
train_files=sorted(list(TRAIN_DIR.rglob('*.jpg')))
val_files=sorted(list(VAL_DIR.rglob('*.jpg')))

In [None]:
reshape_size=256

In [None]:
class P2PDataset(Dataset):

  def __init__(self, files):
    super().__init__()
    self.files=sorted(files)
    self.len_=len(self.files)

  def load_sample(self, file):
    image=Image.open(file)
    image.load()
    return image

  def __len__(self):
    return self.len_
  def __getitem__(self, index):
    transform = transforms.Compose([
            transforms.ToTensor(),
            transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225]) 
        ])
    x=self.load_sample(self.files[index])
    y=crop(x, top=0, left=0, height=256, width=256)
    x=crop(x, top=0, left=256, height=256, width=256)
    x=np.array(x, dtype='float32')
    y=np.array(x, dtype='float32')
    x=transform(x)
    y=transform(y)
    return x, y

In [None]:
train_dataset=P2PDataset(train_files)
val_dataset=P2PDataset(val_files)

#Building the model

In [None]:
class downsample(nn.Module):
  def __init__(self, in_channels, out_channels, kernel_size, stride, padding=1,use_batchnorm=True):
    super().__init__()
    if use_batchnorm:
      self.core=nn.Sequential(
          nn.Conv2d(in_channels=in_channels, out_channels=out_channels, kernel_size=kernel_size, stride=stride, padding=padding),
          nn.BatchNorm2d(out_channels),
          nn.LeakyReLU(inplace=True)
      )
    else:
      self.core=nn.Sequential(
        nn.Conv2d(in_channels=in_channels, out_channels=out_channels, kernel_size=kernel_size, stride=stride, padding=padding),
        nn.LeakyReLU(inplace=True)
    )
  def forward(self, x):
    return self.core(x)

In [None]:
class upsample(nn.Module):
  def __init__(self, in_channels, out_channels, kernel_size, stride, padding=1,use_dropout=False):
    super().__init__()
    if use_dropout:
      self.core=nn.Sequential(
          nn.ConvTranspose2d(in_channels=in_channels, out_channels=out_channels, kernel_size=kernel_size, stride=stride, padding=padding),
          nn.BatchNorm2d(out_channels),
          nn.Dropout(0.5),
          nn.ReLU(inplace=True)
      )
    else:
      self.core=nn.Sequential(
        nn.ConvTranspose2d(in_channels=in_channels, out_channels=out_channels, kernel_size=kernel_size, stride=stride, padding=padding),
        nn.BatchNorm2d(out_channels),
        nn.ReLU(inplace=True)
    )
  def forward(self, x):
    return self.core(x)

In [None]:
class generator(nn.Module):
  def __init__(self):
    super().__init__()

    self.encoders=[
        downsample(in_channels=3, out_channels=64, kernel_size=4, stride=2, use_batchnorm=False),
        downsample(in_channels=64, out_channels=128, kernel_size=4, stride=2),
        downsample(in_channels=128, out_channels=256, kernel_size=4, stride=2),
        downsample(in_channels=256, out_channels=512, kernel_size=4, stride=2),
        downsample(in_channels=512, out_channels=512, kernel_size=4, stride=2),
        downsample(in_channels=512, out_channels=512, kernel_size=4, stride=2),
        downsample(in_channels=512, out_channels=512, kernel_size=4, stride=2),
        downsample(in_channels=512, out_channels=1024, kernel_size=4, stride=2)
    ]
    self.decoders=[
        upsample(in_channels=1024, out_channels=512, kernel_size=4, stride=2,use_dropout=True),
        upsample(in_channels=512*2, out_channels=512, kernel_size=4, stride=2, use_dropout=True),
        upsample(in_channels=512*2, out_channels=512, kernel_size=4, stride=2, use_dropout=True),
        upsample(in_channels=512*2, out_channels=512, kernel_size=4, stride=2),
        upsample(in_channels=512*2, out_channels=256, kernel_size=4, stride=2),
        upsample(in_channels=256*2, out_channels=128, kernel_size=4, stride=2),
        upsample(in_channels=128*2, out_channels=64, kernel_size=4, stride=2),
    ]
    self.last=nn.ConvTranspose2d(in_channels=64*2, out_channels=3, kernel_size=4, stride=2, padding=1)
    
  def forward(self, input):
    x=input
    skips=[]
    
    for encoder in self.encoders:
      x=encoder(x)
      skips.append(x)
    
    skips=reversed(skips[:-1])
    step=0

    for decoder,skip in zip(self.decoders, skips):
      x=decoder(x)
      x=torch.cat((x,skip),1)
      step+=1
    x=self.last(x)
    return x

In [None]:
class discriminator(nn.Module):
  def __init__(self):
    super().__init__()
    self.down0=downsample(in_channels=6, out_channels=64, kernel_size=4, stride=2, use_batchnorm=False)
    self.down1=downsample(in_channels=64, out_channels=128, kernel_size=4, stride=2)
    self.down2=downsample(in_channels=128, out_channels=256, kernel_size=4, stride=2)
    self.zero_pad0=nn.ZeroPad2d(1)
    self.conv=nn.Conv2d(in_channels=256,out_channels=512, kernel_size=4, stride=1)
    self.batchnorm=nn.BatchNorm2d(512)
    self.activation=nn.LeakyReLU()
    self.zero_pad1=nn.ZeroPad2d(1)
    self.last=nn.Conv2d(in_channels=512, out_channels=1, kernel_size=4, stride=1)
  def forward(self, input):
    e=self.down0(input)
    e=self.down1(e)
    e=self.down2(e)
    e=self.zero_pad0(e)
    e=self.conv(e)
    e=self.batchnorm(e)
    e=self.activation(e)
    e=self.zero_pad1(e)
    e=self.last(e)
    return e

#Loss functions

In [None]:
def gen_loss(disc_output, gen_output, target):
  Lambda=100 #by the authors of the model
  gan_loss=nn.BCEWithLogitsLoss()
  d_loss=gan_loss(torch.ones_like(gen_output), gen_output)
  l1_loss=torch.mean(torch.abs(target - gen_output))
  loss=d_loss-Lambda*l1_loss
  return loss, l1_loss, d_loss

In [None]:
def disc_loss(real_output, generated_output):
  loss=nn.BCEWithLogitsLoss()
  real_loss=loss(torch.ones_like(real_output),real_output)
  gen_loss=loss(torch.ones_like(generated_output), generated_output)
  return real_loss+gen_loss

#Train

In [None]:
device='cpu'
if torch.cuda.is_available():
  device='cuda'

In [None]:
def train(discriminator, generator, train_dataset, val_dataset, epochs, batch_size):
  train_loader=DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
  val_loader=DataLoader(val_dataset, batch_size=batch_size, shuffle=False)

  history = []
  log_template = "\nEpoch {ep:03d} train_loss: {t_loss:0.4f} \
  val_loss {v_loss:0.4f} train_acc {t_acc:0.4f} val_acc {v_acc:0.4f}"

  with tqdm(desc="epoch", total=epochs) as pbar_outer:
        gen_opt = torch.optim.Adam(generator.parameters())

        disc_opt= torch.optim.Adam(discriminator.parameters())
        used_data=0
        #train
        for epoch in range(epochs):
          running_loss=0.0
          running_corrects = 0
          processed_data = 0
          train_loss=0
          val_loss=0
          counter=0
          for X_batch, Y_batch in train_loader:
            X_batch = X_batch.to(device)
            Y_batch = Y_batch.to(device)
            gen_opt.zero_grad()
            disc_opt.zero_grad()

            gen_output=generator(X_batch)
            disc_real_output=discriminator(torch.cat((X_batch, Y_batch),1))
            disc_generated_output=discriminator(torch.cat((X_batch, gen_output),1))

            gen_total_loss, gen_gan_loss, gen_l1_loss=gen_loss(disc_generated_output, gen_output, Y_batch)
            discriminator_loss=disc_loss(disc_real_output, disc_generated_output)

            gen_total_loss.backward(retain_graph=True)
            discriminator_loss.backward()

            gen_opt.step()
            disc_opt.step()
            train_loss+=gen_total_loss/len(X_batch)
            used_data+=1
            print('Data passed:',used_data*batch_size, '/n', 'Data left:', len(train_dataset)-used_data*batch_size)
            counter+=batch_size
            if counter>1200:
              break
            #validation
          counter=0
          for X_batch, Y_batch in val_loader:
            X_batch = X_batch.to(device)
            Y_batch = Y_batch.to(device)
            with torch.no_grad():
              gen_output=generator(X_batch)
              disc_real_output=discriminator(torch.cat((X_batch, Y_batch),1))
              disc_generated_output=discriminator(torch.cat((X_batch, gen_output),1))

              gen_total_loss, gen_gan_loss, gen_l1_loss=gen_loss(disc_generated_output, gen_output, Y_batch)
              discriminator_loss=disc_loss(disc_real_output, disc_generated_output)
              val_loss+=gen_total_loss/len(X_batch)
            counter+=batch_size
            if counter>500:
              break
          history.append((train_loss, val_loss))
          pbar_outer.update(1)
          tqdm.write(log_template.format(ep=epoch+1, t_loss=train_loss,\
                                           v_loss=val_loss))

  return history

In [None]:
disc=discriminator()
gen=generator()
gen=gen.to(device)
disc=disc.to(device)
epochs=2
batch_size=32

In [None]:
history=train(disc, gen, train_dataset, val_dataset, epochs, batch_size)

#Predict

In [2]:
def predict(generator, dataset, model, device, batch_size):
  test_loader=DataLoader(dataset, batch_size=batch_size, shuffle=False)
  outputs=[]
  generator.eval()
  with torch.no_grad():
    for X in tqdm(test_loader):
      input=X.to(device)
      
      output=model(input).cpu()

      outputs.append(output)
  
  return outputs

In [None]:
: