## Loading the Data :

In [None]:
 ! pip install -q kaggle

In [None]:
from google.colab import drive
drive.mount("/content/gdrive", force_remount=True)

Mounted at /content/gdrive


In [None]:
%cd /content/gdrive/MyDrive/Cyclegan/

/content/gdrive/MyDrive/Cyclegan


In [None]:
import os
os.environ['KAGGLE_CONFIG_DIR'] = "/content/gdrive/MyDrive/Cyclegan/"

In [None]:
! kaggle datasets download -d balraj98/horse2zebra-dataset

401 - Unauthorized


## Builiding the model :

In [None]:
import torch
from torchvision.utils import make_grid
import matplotlib.pyplot as plt
%matplotlib inline

In [None]:
def get_default_device():
    # Pick GPU if available, else CPU
    if torch.cuda.is_available():
        return torch.device('cuda')
    else:
        return torch.device('cpu')
    
def to_device(data, device):
    # Move tensor(s) to chosen device
    if isinstance(data, (list,tuple)):
        return [to_device(x, device) for x in data]
    return data.to(device, non_blocking=True)

class DeviceDataLoader():
    # Wrap a dataloader to move data to a device
    def __init__(self, dl, device):
        self.dl = dl
        self.device = device
        
    def __iter__(self):
        # Yield a batch of data after moving it to device
        for b in self.dl: 
            yield to_device(b, self.device)

    def __len__(self):
        # Number of batches
        return len(self.dl)

In [None]:
device = get_default_device()
device

device(type='cpu')

In [None]:
import torch.nn as nn
from tqdm.notebook import tqdm
import torch.nn.functional as F

In [None]:
discriminator = nn.Sequential(
    # in: 3 x 64 x 64

    nn.Conv2d(3, 64, kernel_size=4, stride=2, padding=1, bias=False),
    nn.LeakyReLU(0.2, inplace=True),
    # out: 64 x 32 x 32

    nn.Conv2d(64, 128, kernel_size=4, stride=2, padding=1, bias=False),
    nn.InstanceNorm2d(128),
    nn.LeakyReLU(0.2, inplace=True),
    # out: 128 x 16 x 16

    nn.Conv2d(128, 256, kernel_size=4, stride=2, padding=1, bias=False),
    nn.InstanceNorm2d(256),
    nn.LeakyReLU(0.2, inplace=True),
    # out: 256 x 8 x 8

    nn.Conv2d(256, 512, kernel_size=4, stride=2, padding=1, bias=False),
    nn.InstanceNorm2d(512),
    nn.LeakyReLU(0.2, inplace=True),
    # out: 512 x 4 x 4

    nn.Conv2d(512, 1, kernel_size=4, stride=1, padding=0, bias=False),
    # out: 1 x 1 x 1

    nn.Flatten(),
    nn.Sigmoid())  
        

In [None]:
class ResBlock(nn.Module):
    def __init__(self, f):
        super(ResBlock, self).__init__()
        self.conv = nn.Sequential(nn.Conv2d(f, f, 3, 1, 1), nn.InstanceNorm2d(f), nn.ReLU(),
                                  nn.Conv2d(f, f, 3, 1, 1))
        self.norm = nn.InstanceNorm2d(f)
    def forward(self, x):
        return nn.functional.relu(self.norm(self.conv(x)+x))

In [None]:
f = 64

In [None]:
gen_layers = [nn.ReflectionPad2d(3),
                  # 3 x 70 x 70                                           
                  nn.Conv2d( 3, f, kernel_size=7, stride=1, padding=0), nn.InstanceNorm2d(f), nn.ReLU(True),
                  # 64 x 64 x 64
                  nn.Conv2d( f, 2*f, kernel_size=3, stride=2, padding=1), nn.InstanceNorm2d(2*f), nn.ReLU(True),
                  # 128 x 32 x 32
                  nn.Conv2d(2*f, 4*f, kernel_size=3, stride=2, padding=1), nn.InstanceNorm2d(4*f), nn.ReLU(True)
                  # 256 x 16 x 16
                  ]
for i in range(int(6)):
    gen_layers.append(ResBlock(4*f))  # 256 x 16 x 16

gen_layers.extend([
                nn.ConvTranspose2d(4*f, 4*2*f, kernel_size=3, stride=1, padding=1), nn.PixelShuffle(2), nn.InstanceNorm2d(2*f), nn.ReLU(True),
                # 128 x 32 x 32
                nn.ConvTranspose2d(2*f, 4*f, kernel_size=3, stride=1, padding=1), nn.PixelShuffle(2), nn.InstanceNorm2d(  f), nn.ReLU(True),
                # 64 x 64 x 64
                nn.ReflectionPad2d(3), nn.Conv2d(f, 3, kernel_size = 7, stride=1, padding=0),
                # 3 x 64 x 64
                nn.Tanh()])

generator = nn.Sequential(*gen_layers)

In [None]:
def train_discriminator_A(real_A, opt_d_A):

  # Clear discriminator gradients
  opt_d_A.zero_grad()

  # Training of Discriminator A

  real_preds = discriminator(real_A)
  real_targets = torch.ones(real_A.size(0), 1, device=device)
  real_loss = F.binary_cross_entropy(real_preds, real_targets)

  # Generate fake images
  latent_vector = torch.randn(batch_size, latent_size, 1, 1, device=device)    # Figure out to input fake A with latent vector
  fake = generator(latent_vector)

  # Training with fake images
  fake_preds = discriminator(fake)
  fake_targets = torch.zeros(fake.size(0), 1, device=device)
  fake_loss = F.binary_cross_entropy(fake_preds, fake_targets)

  # Update discriminator weights
  loss = real_loss + fake_loss
  loss.backward()
  opt_d_A.step()
  return loss.item() 

In [None]:
def train_discriminator_B(real_B, opt_d_B):

  # Clear discriminator gradients
  opt_d_B.zero_grad()

  # Training of Discriminator A

  real_preds = discriminator(real_B)
  real_targets = torch.ones(real_B.size(0), 1, device=device)
  real_loss = F.binary_cross_entropy(real_preds, real_targets)

  # Generate fake images
  latent_vector = torch.randn(batch_size, latent_size, 1, 1, device=device)    # Figure out to input fake B with latent vector
  fake = generator(latent_vector)

  # Training with fake images
  fake_preds = discriminator(fake)
  fake_targets = torch.zeros(fake.size(0), 1, device=device)
  fake_loss = F.binary_cross_entropy(fake_preds, fake_targets)

  # Update discriminator weights
  loss = real_loss + fake_loss
  loss.backward()
  opt_d_B.step()
  return loss.item() 

In [None]:
def train_generator_A(opt_g_A):
    # Clear generator gradients
    opt_g_A.zero_grad()
    
    # Generate fake images
    latent = torch.randn(batch_size, latent_size, 1, 1, device=device)        # Latent should be input with B
    fake_images = generator(latent)
    
    # Try to fool the discriminator
    preds = discriminator(fake_images)
    targets = torch.ones(batch_size, 1, device=device)
    loss = F.binary_cross_entropy(preds, targets)
    
    # Update generator weights
    loss.backward()
    opt_g_A.step()
    
    return loss.item()

In [None]:
def train_generator_B(opt_g_B):
    # Clear generator gradients
    opt_g_B.zero_grad()
    
    # Generate fake images
    latent = torch.randn(batch_size, latent_size, 1, 1, device=device)        # Latent should be input with A
    fake_images = generator(latent)
    
    # Try to fool the discriminator
    preds = discriminator(fake_images)
    targets = torch.ones(batch_size, 1, device=device)
    loss = F.binary_cross_entropy(preds, targets)
    
    # Update generator weights
    loss.backward()
    opt_g_B.step()
    
    return loss.item()

In [None]:
from torchvision.utils import save_image

In [None]:
sample_dir = 'generated'
os.makedirs(sample_dir, exist_ok=True)

In [None]:
def save_samples(index, latent_input, show=True):
    fake_images = generator(latent_input)
    fake_fname = 'generated-images-{0:0=4d}.png'.format(index)
    save_image(denorm(fake_images), os.path.join(sample_dir, fake_fname), nrow=8)
    print('Saving', fake_fname)
    if show:
        fig, ax = plt.subplots(figsize=(1, 2))
        ax.set_xticks([]); ax.set_yticks([])
        ax.imshow(make_grid(fake_images.cpu().detach(), nrow=1).permute(1, 2, 0))

In [None]:
def fit(epochs, lr, start_idx=1):
    torch.cuda.empty_cache()
    
    # Losses & scores
    losses_g_A = []
    losses_g_B = []
    losses_d_A = []
    losses_d_B = []
    
    # Create optimizers
    opt_d_A = torch.optim.Adam(discriminator.parameters(), lr=lr, betas=(0.5, 0.999))
    opt_d_B = opt_d_A
    opt_g_A = torch.optim.Adam(generator.parameters(), lr=lr, betas=(0.5, 0.999))
    opt_g_B = opt_g_A

    for epoch in range(epochs):
        for real_images_A, _ in tqdm(train_dl_A):
            # Train discriminator A
            loss_d_A = train_discriminator_A(real_images_A, opt_d_A)
            # Train generator A
            loss_g_A = train_generator_A(opt_g_A)
        for real_images_B, _ in tqdm(train_dl_B):  
            # Train discriminator B
            loss_d_B = train_discriminator_B(real_images_B, opt_d_B)
            # Train generator B
            loss_g_B = train_generator_B(opt_g_B)

        # Record losses & scores
        losses_g_A.append(loss_g_A)
        losses_d_A.append(loss_d_A)
        losses_g_B.append(loss_g_B)
        losses_d_B.append(loss_d_B)
        
        # Log losses & scores (last batch)
        print("Epoch [{}/{}], loss_g_A: {:.4f}, loss_d_A: {:.4f}, loss_g_B: {:.4f}, loss_d_B: {:.4f}".format(
            epoch+1, epochs, loss_g_A, loss_d_A, loss_g_B, loss_d_B))
    
        # Save generated images
        save_samples(epoch+start_idx, fixed_latent, show=False)
    
    return losses_g, losses_d, real_scores, fake_scores