In [1]:
from google.colab import drive
drive.mount('/gdrive')

Go to this URL in a browser: https://accounts.google.com/o/oauth2/auth?client_id=947318989803-6bn6qk8qdgf4n4g3pfee6491hc0brc4i.apps.googleusercontent.com&redirect_uri=urn%3Aietf%3Awg%3Aoauth%3A2.0%3Aoob&scope=email%20https%3A%2F%2Fwww.googleapis.com%2Fauth%2Fdocs.test%20https%3A%2F%2Fwww.googleapis.com%2Fauth%2Fdrive%20https%3A%2F%2Fwww.googleapis.com%2Fauth%2Fdrive.photos.readonly%20https%3A%2F%2Fwww.googleapis.com%2Fauth%2Fpeopleapi.readonly&response_type=code

Enter your authorization code:
··········
Mounted at /gdrive


In [0]:
%cd '/gdrive/My Drive'

/gdrive/My Drive


In [0]:
!git clone https://github.com/aitorzip/PyTorch-CycleGAN.git

In [0]:
%cd '/gdrive/My Drive/PyTorch-CycleGAN'

In [0]:
%%sh
sh ./download_dataset summer2winter_yosemite

In [0]:
!mv datasets/summer2winter_yosemite /gdrive/My\ Drive/dl-pytorch/datasets/

In [0]:
!ls /gdrive/My\ Drive/dl-pytorch/datasets/

In [0]:
import torch.nn as nn
import torch.nn.functional as F

class ResidualBlock(nn.Module):
  def __init__(self, in_features):
    super(ResidualBlock, self).__init__()
    
    conv_block = [ nn.ReflectionPad2d(1), #mejor padding
                   nn.Conv2d(in_features, in_features, 3),
                   nn.InstanceNorm2d(in_features), #BN para GANS
                   nn.ReLU(True),
                   nn.ReflectionPad2d(1), #mejor para consrvar distribucion
                   nn.Conv2d(in_features, in_features, 3),
                   nn.InstanceNorm2d(in_features)
                 ]
    
    self.conv_block = nn.Sequential(*conv_block)
  def forward(self, x):
    return self.conv_block(x) + x #una idea poderosa

In [0]:
class Generator(nn.Module):
  def __init__(self, input_nc, output_nc, n_residual_blocks=9):
    super(Generator,self).__init__()
    
    # Bloqueconvolucional
    model = [ nn.ReflectionPad2d(3),
            nn.Conv2d(input_nc, 64, 7), # I - 7 + 6 /1 +1 = I
            nn.InstanceNorm2d(64),
             nn.ReLU(True)
            ]
    
    in_features = 64
    out_features = in_features * 2
    
    #Encoding
    for _ in range(2):
      model += [ nn.Conv2d(in_features, out_features, 3, stride=2, padding=1), #I/2
                 nn.InstanceNorm2d(out_features),
                 nn.ReLU(True)
               ]
      in_features = out_features
      out_features = in_features*2
    #transformaciones residuales
    
    for _ in range(n_residual_blocks):
      model += [ResidualBlock(in_features)]
    
    #decoding
    
    out_features = in_features// 2
    for _ in range(2):
      model += [ nn.ConvTranspose2d(in_features, out_features, 3, stride=2, padding=1, output_padding=1), #2I
                 nn.InstanceNorm2d(out_features),
                 nn.ReLU(inplace=True)
               ]
      in_features = out_features
      out_features = in_features //2
  
    #salida
    model += [ nn.ReflectionPad2d(3),
               nn.Conv2d(64, output_nc, 7), #I
               nn.Tanh()
             ]
      
    self.model = nn.Sequential(*model)
      
  def forward(self,x):
    return self.model(x)

In [0]:
class Discriminator(nn.Module):
  "PatchGAN: discrimina estilo o textura"
  def __init__(self, input_nc):
    super(Discriminator, self).__init__()
    
    model = [ nn.Conv2d(input_nc, 64, 4, stride=2, padding=1), #I/2
              nn.LeakyReLU(0.2, inplace=True)
            ]
    
    model += [ nn.Conv2d(64, 128, 4, stride=2, padding=1), #I/2
               nn.InstanceNorm2d(128),
              nn.LeakyReLU(0.2, inplace=True)
             ]
    
    model += [ nn.Conv2d(128, 256, 4, stride=2, padding=1), #I/2
               nn.InstanceNorm2d(256),
              nn.LeakyReLU(0.2, inplace=True)
             ]
    
    model += [ nn.Conv2d(256, 512, 4, padding=1), #I-1
               nn.InstanceNorm2d(512),
              nn.LeakyReLU(0.2, inplace=True)
             ]
    
    # Flatten
    model += [nn.Conv2d(512, 1, 4, padding=1)] #I-1
    
    self.model = nn.Sequential(*model)
    
  def forward(self, x):
    x = self.model(x)
    return F.avg_pool2d(x, x.size()[2:]).view(x.size()[0], -1)

In [0]:
import sys
sys.path.append('/gdrive/My Drive/dl-pytorch/')

In [0]:
import glob
import random
import os
import itertools
from PIL import Image

import torch

from torch.utils.data import Dataset, DataLoader
import torchvision.transforms as transforms

from utils import ReplayBuffer

In [0]:
class ImageDataset(Dataset):
  def __init__(self, base_dir, transform=None, split='train'):
    self.transform = transforms.Compose(transform)
    self.files_A = sorted(glob.glob(os.path.join(base_dir, '{}/A/*.*'.format(split))))
    self.files_B = sorted(glob.glob(os.path.join(base_dir, '{}/B/*.*'.format(split))))
    
  def __len__(self):
    return max(len(self.files_A), len(self.files_B))
  
  def __getitem__(self,idx):
    image_A = self.transform(Image.open(self.files_A[idx]))
    image_B = self.transform(Image.open(self.files_B[random.randint(0,len(self.files_B)-1)]))
    return {'A': image_A, 'B': image_B}

In [0]:
epoch = 0
n_epochs = 200
batch_size = 4
lr = 0.0002
size = 256
input_nc = 3
output_nc = 3
decay_epoch = 100 #pending

cuda = True
n_cpu = 8

base_dir = '/gdrive/My Drive/dl-pytorch/datasets/summer2winter_yosemite/'

In [42]:
device = torch.device('cuda' if cuda else 'cpu')

def weights_init_normal(m):
  if isinstance(m, nn.Conv2d):
    torch.nn.init.normal(m.weight.data, 0.0, 0.02)
  elif isinstance(m, nn.BatchNorm2d):
    torch.nn.init.normal(m.weight.data, 1.0, 0.02)
    torch.nn.init.constant(m.bias, 0.0)
    
netG_A2B = Generator(input_nc, output_nc)
netG_B2A = Generator(input_nc, output_nc)
netD_A = Discriminator(input_nc)
netD_B = Discriminator(input_nc)

netG_A2B.apply(weights_init_normal)
netG_B2A.apply(weights_init_normal)
netD_A.apply(weights_init_normal)
netD_B.apply(weights_init_normal)

if cuda:
  netG_A2B.to(device)
  netG_B2A.to(device)
  netD_A.to(device)
  netD_B.to(device)
  
criterion_GAN = torch.nn.MSELoss()
criterion_cycle = torch.nn.L1Loss()
criterion_identity = torch.nn.L1Loss()

optimizer_G = torch.optim.Adam(itertools.chain(netG_A2B.parameters(), netG_B2A.parameters()),
                              lr=lr, betas=(0.5, 0.999))
optimizer_D_A = torch.optim.Adam(netD_A.parameters(), lr=lr, betas=(0.5,0.999))
optimizer_D_B = torch.optim.Adam(netD_B.parameters(), lr=lr, betas=(0.5,0.999))

#schedulers (actualizar el learning rate de forma dinamica durante el entrenamiento)

class LambdaLR():
  def __init__(self, n_epochs, offset, decay_start_epoch):
    assert ((n_epochs - decay_start_epoch) > 0)
    self.n_epochs = n_epochs
    self.offset = offset
    self.decay_start_epoch = decay_start_epoch
    
  def step(self, epoch):
    return 1 - max(0, epoch + self.offset - self.decay_start_epoch)/(self.n_epochs - self.decay_start_epoch)

lr_scheduler_G = torch.optim.lr_scheduler.LambdaLR(optimizer_G, lr_lambda=LambdaLR(n_epochs,epoch,decay_epoch).step)
lr_scheduler_G = torch.optim.lr_scheduler.LambdaLR(optimizer_D_A, lr_lambda=LambdaLR(n_epochs,epoch,decay_epoch).step)
lr_scheduler_G = torch.optim.lr_scheduler.LambdaLR(optimizer_D_B, lr_lambda=LambdaLR(n_epochs,epoch,decay_epoch).step)

  """


In [0]:
#inputs y targets

Tensor = torch.cuda.FloatTensor if cuda else torch.Tensor
target_real = Tensor(batch_size).fill_(1.0)
target_fake = Tensor(batch_size).fill_(0.0)

fake_A_buffer = ReplayBuffer()
fake_B_buffer = ReplayBuffer()

#Dataloader

transform = [ transforms.Resize(int(size*1.12), Image.BICUBIC),
              transforms.RandomCrop(size),
             transforms.RandomHorizontalFlip(),
             transforms.ToTensor(),
             transforms.Normalize((0.5,0.5,0.5),(0.5,0.5,0.5))
            ]

dataloader = DataLoader(ImageDataset(base_dir,transform=transform),
                       batch_size=batch_size, shuffle=True, num_workers=n_cpu, drop_last=True)

def Gen_GAN_loss(G, D, real, loss, target_real):
  fake = G(real)
  pred_fake = D(fake)
  L = loss(pred_fake, target_real)
  return L, fake

def cycle_loss(G1, G2, real, loss):
  recovered = G2(G1(real))
  L = loss(recovered, real)
  return L

def identity_loss(G, real, loss):
  same = G(real)
  L = loss(same,real)
  return L



In [0]:
for epoch in range(epoch, n_epochs):
  for i, batch in enumerate(dataloader):
    real_A = batch['A'].to(device)
    real_B = batch['B'].to(device)
    
    # Generativas
    optimizer_G.zero_grad()
    
    loss_GAN_A2B, fake_B = Gen_GAN_loss(netG_A2B, netD_B, real_A, criterion_GAN, target_real)
    loss_GAN_B2A, fake_A = Gen_GAN_loss(netG_B2A, netD_A, real_B, criterion_GAN, target_real)
    
    loss_cycle_ABA = cycle_loss(netG_A2B, netG_B2A, real_A, criterion_cycle)
    loss_cycle_BAB = cycle_loss(netG_B2A, netG_A2B, real_B, criterion_cycle)
    
    loss_identity_A = identity_loss(netG_B2A, real_A, criterion_identity)
    loss_identity_B = identity_loss(netG_A2B, real_B, criterion_identity)
    
    loss_G = (loss_GAN_A2B + loss_GAN_B2A) + 10.0*(loss_cycle_ABA + loss_cycle_BAB) + 5.0 *(loss_identity_A + loss_identity_B)
    loss_G.backward()
    
    optimizer_G.step
    
    #Discriminativas
    optimizer_D_A.zero_grad()
    
    loss_D_A = Disc_GAN_loss(netD_A, fake_A, real_A, fake_A_buffer, criterion_GAN, target_real, target_fake)
    loss_D_A.backward()
    optimizer_D_A.step()
    
    optimizer_D_B.zero_grad()
    
    loss_D_B = Disc_GAN_loss(netD_B, fake_B, real_B, fake_B_buffer, criterion_GAN, target_real, target_fake)
    loss_D_B.backward()
    optimizer_D_B.step()
    
  lr_scheduler_G.step()
  lr_scheduler_D_A.step()
  lr_scheduler_D_B.step()
  

In [0]:
import sys
import os

import torchvision.transforms as transforms
from torchvision.utils import save_image
from torch.utils.data import DataLoader
from torch.autograd import Variable
import torch


from datasets import ImageDataset

generator_A2B='output/netG_A2B.pth'
generator_B2A='output/netG_B2A.pth'

###### Definition of variables ######
# Networks
netG_A2B = Generator(input_nc, output_nc)
netG_B2A = Generator(output_nc, input_nc)

if cuda:
    netG_A2B.cuda()
    netG_B2A.cuda()

# Load state dicts
netG_A2B.load_state_dict(torch.load(generator_A2B))
netG_B2A.load_state_dict(torch.load(generator_B2A))

# Set model's test mode
netG_A2B.eval()
netG_B2A.eval()

# Inputs & targets memory allocation
Tensor = torch.cuda.FloatTensor if cuda else torch.Tensor
input_A = Tensor(batchSize, input_nc, size, size)
input_B = Tensor(batchSize, output_nc, size, size)

# Dataset loader
transforms_ = [ transforms.ToTensor(),
                transforms.Normalize((0.5,0.5,0.5), (0.5,0.5,0.5)) ]
dataloader = DataLoader(ImageDataset(dataroot, transforms_=transforms_, mode='test'), 
                        batch_size=batchSize, shuffle=False, num_workers=n_cpu, drop_last=True)
###################################

###### Testing######

# Create output dirs if they don't exist
if not os.path.exists('output/A'):
    os.makedirs('output/A')
if not os.path.exists('output/B'):
    os.makedirs('output/B')

for i, batch in enumerate(dataloader):
    # Set model input
    real_A = Variable(input_A.copy_(batch['A']))
    real_B = Variable(input_B.copy_(batch['B']))

    # Generate output
    fake_B = 0.5*(netG_A2B(real_A).data + 1.0)
    fake_A = 0.5*(netG_B2A(real_B).data + 1.0)

    # Save image files
    save_image(fake_A, 'output/A/%04d.png' % (i+1))
    save_image(fake_B, 'output/B/%04d.png' % (i+1))

    sys.stdout.write('\rGenerated images %04d of %04d' % (i+1, len(dataloader)))

sys.stdout.write('\n')
###################################