In [None]:
# This Python 3 environment comes with many helpful analytics libraries installed
# It is defined by the kaggle/python Docker image: https://github.com/kaggle/docker-python
# For example, here's several helpful packages to load

import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)

# Input data files are available in the read-only "../input/" directory
# For example, running this (by clicking run or pressing Shift+Enter) will list all files under the input directory

import os
# for dirname, _, filenames in os.walk('/kaggle/input'):
#     for filename in filenames:
#         print(os.path.join(dirname, filename))

# You can write up to 20GB to the current directory (/kaggle/working/) that gets preserved as output when you create a version using "Save & Run All" 
# You can also write temporary files to /kaggle/temp/, but they won't be saved outside of the current session

In [None]:
DATA_DIR = "/kaggle/input/animefacedataset"
print(os.listdir(DATA_DIR))

In [None]:
print(os.listdir(DATA_DIR+'/images')[:10])

In [None]:
sample_dir = 'generated'
os.makedirs(sample_dir,exist_ok=True)

In [None]:
from torch.utils.data import DataLoader
from torchvision.datasets import ImageFolder
import torchvision.transforms as T


In [None]:
image_size = 64
batch_size = 128
stats = (0.5,0.5,0.5),(0.5,0.5,0.5)

In [None]:
train_ds = ImageFolder(DATA_DIR, transform=T.Compose([
    T.Resize(image_size),
    T.CenterCrop(image_size),
    T.ToTensor(),
    T.Normalize(*stats)
                                                       
]))

train_dl = DataLoader(train_ds,batch_size,shuffle=True, num_workers=3, pin_memory=True)

In [None]:
import torch
from torchvision.utils import make_grid
import matplotlib.pyplot as plt
%matplotlib inline

In [None]:
def denorm(img_tensors):
  return img_tensors*stats[1][0]+ stats[0][0]

In [None]:
def show_images(images, nmax=64):
  fig,ax = plt.subplots(figsize=(8,8))
  ax.set_xticks([]); ax.set_yticks([])
  ax.imshow(make_grid(denorm(images.detach()[:nmax]),nrow=8).permute(1,2,0))


def show_batch(dl,nmax=64):
  for images,_ in dl:
    show_images(images,nmax)
    break

In [None]:
show_batch(train_dl)

In [None]:
def get_defualt_deivce():
  """Pick GPU is availabe, else CPU"""

  if torch.cuda.is_available():
    return torch.device('cuda')
  else:
    return torch.device('cpu')

def to_device(data,device):
  """Move tensors to chosen device"""
  if isinstance(data,(list,tuple)):
    return [to_device(x,device) for x in data]
  return data.to(device, non_blocking=True)


class DeviceDataLoader():
  """Wrap a dataLoader to move data to a device"""

  def __init__(self,dl,device):
    self.dl = dl
    self.device = device


  def __iter__(self):
    """Yield a batch of data after moving it to device"""
    for b in self.dl:
      yield to_device(b,self.device)

  def __len__(self):
    """Number of batches"""
    return len(self.dl)

In [None]:
device = get_defualt_deivce()
device

In [None]:
train_dl = DeviceDataLoader(train_dl,device)

In [None]:
import torch.nn as nn

In [None]:
discriminator = nn.Sequential(
    # in:3x64x64

    nn.Conv2d(3,64,kernel_size=4,stride=2, padding=1, bias=False),
    nn.BatchNorm2d(64),
    nn.LeakyReLU(0.2,inplace=True),
    # out: 64x32x32

    nn.Conv2d(64,128,kernel_size=4,stride=2,padding=1,bias=False),
    nn.BatchNorm2d(128),
    nn.LeakyReLU(0.2, inplace=True),
    # out:128x16x16

    nn.Conv2d(128,256,kernel_size=4,stride=2,padding=1,bias=False),
    nn.BatchNorm2d(256),
    nn.LeakyReLU(0.2, inplace=True),
    # out: 256x8x8


    nn.Conv2d(256,512,kernel_size=4,stride=2,padding=1,bias=False),
    nn.BatchNorm2d(512),
    nn.LeakyReLU(0.2, inplace=True),
    # out 512x4x4

    nn.Conv2d(512,1,kernel_size=4,stride=1,padding=0,bias=False),
    # out : 1x1x1

    nn.Flatten(),
    nn.Sigmoid()


)

In [None]:
discriminator = to_device(discriminator,device)

In [None]:
latent_size = 128

In [None]:
generator = nn.Sequential(
    # in: latent_size x1x1

    nn.ConvTranspose2d(latent_size,512, kernel_size=4,stride=1,padding=0,bias=False),
    nn.BatchNorm2d(512),
    nn.ReLU(True),
    # out: 512x4x4

    nn.ConvTranspose2d(512,256, kernel_size=4,stride=2,padding=1,bias=False),
    nn.BatchNorm2d(256),
    nn.ReLU(True),
    # out: 256x8x8

    nn.ConvTranspose2d(256,128, kernel_size=4,stride=2,padding=1,bias=False),
    nn.BatchNorm2d(128),
    nn.ReLU(True),
    # out: 128x16x16

    nn.ConvTranspose2d(128,64, kernel_size=4,stride=2,padding=1,bias=False),
    nn.BatchNorm2d(64),
    nn.ReLU(True),
    # out: 64x32x32

    nn.ConvTranspose2d(64,3, kernel_size=4,stride=2,padding=1,bias=False),
    nn.Tanh()
    # out: 512x4x4            

)

In [None]:
xb = torch.randn(batch_size,latent_size, 1,1 )  # random latent tensors
fake_images = generator(xb)
print(fake_images.shape)
show_images(fake_images)

In [None]:
generator = to_device(generator,device)

In [None]:
def train_discriminator(real_images, opt_d):
    # Clear discriminator gradients
    opt_d.zero_grad()

    # Pass real images through discriminator
    real_preds = discriminator(real_images)
    real_targets = torch.ones(real_images.size(0), 1, device=device)
    real_loss = F.binary_cross_entropy(real_preds, real_targets)
    real_score = torch.mean(real_preds).item()
    
    # Generate fake images
    latent = torch.randn(batch_size, latent_size, 1, 1, device=device)
    fake_images = generator(latent)

    # Pass fake images through discriminator
    fake_targets = torch.zeros(fake_images.size(0), 1, device=device)
    fake_preds = discriminator(fake_images)
    fake_loss = F.binary_cross_entropy(fake_preds, fake_targets)
    fake_score = torch.mean(fake_preds).item()

    # Update discriminator weights
    loss = real_loss + fake_loss
    loss.backward()
    opt_d.step()
    return loss.item(), real_score, fake_score

In [None]:
def train_generator(opt_g):
  # Clear generator gradients
  opt_g.zero_grad()

  # Generate fake image
  latent = torch.randn(batch_size,latent_size,1,1,device=device)
  fake_images = generator(latent)

  # Try to fool the discriminator
  preds = discriminator(fake_images)
  targets = torch.ones(batch_size,1,device=device)
  loss = F.binary_cross_entropy(preds,targets)

  # Update generator weights
  loss.backward()
  opt_g.step()

  return loss.item

In [None]:
from torchvision.utils import save_image

In [None]:
sample_dir = 'generated'
os.makedirs(sample_dir,exist_ok=True)

In [None]:
def save_samples(index,latent_tensors,show=True):
  fake_images = generator(latent_tensors)
  fake_fname = 'generated-images-{0:0=4d}.png'.format(index)
  save_image(denorm(fake_images),os.path.join(sample_dir,fake_fname),nrow=8)
  print("Saving",fake_fname)
  if show:
    fig,ax=plt.subplots(figsize=(8,8))
    ax.set_xticks([]);ax.set_yticks([])
    ax.imshow(make_grid(fake_images.cpu().detach(),nrow=8).permute(1,2,0))

In [None]:
fixed_latent = torch.randn(64,latent_size,1,1,device=device)

In [None]:
save_samples(0,fixed_latent)

In [None]:
from tqdm.notebook import tqdm
import torch.nn.functional as F

In [None]:
def fit(epochs, lr, start_idx=1):
    torch.cuda.empty_cache()
    
    # Losses & scores
    losses_g = []
    losses_d = []
    real_scores = []
    fake_scores = []
    
    # Create optimizers
    opt_d = torch.optim.Adam(discriminator.parameters(), lr=lr, betas=(0.5, 0.999))
    opt_g = torch.optim.Adam(generator.parameters(), lr=lr, betas=(0.5, 0.999))
    
    for epoch in range(epochs):
        for real_images, _ in tqdm(train_dl):
            # Train discriminator
            loss_d, real_score, fake_score = train_discriminator(real_images, opt_d)
            # Train generator
            loss_g = train_generator(opt_g)
            
        # Record losses & scores
        losses_g.append(loss_g)
        losses_d.append(loss_d)
        real_scores.append(real_score)
        fake_scores.append(fake_score)
        
        # Log losses & scores (last batch)
        print("Epoch [{}/{}], loss_g:{},loss_d:{}, real_scores:{},fake_scroes:{:.4f}".format(epoch+1,epochs,loss_g,loss_d,real_score,fake_score))
    
        # Save generated images
        save_samples(epoch+start_idx, fixed_latent, show=False)
    
    return losses_g, losses_d, real_scores, fake_scores

In [None]:
lr = 0.0002
epochs= 80

In [None]:
history = fit(epochs,lr)

In [None]:
losses_g, losses_d, real_scores, fake_scores =history

In [None]:
# Save the model checkpoints
torch.save(generator.state_dict(),'G.pth')
torch.save(discriminator.state_dict(),'D.pth')

In [None]:
from IPython.display import Image

In [None]:
Image('/kaggle/working/generated/generated-images-0071.png')

In [None]:
import cv2
import os

vid_fname = 'gans_training.avi'

files = [os.path.join(sample_dir,f) for f in os.listdir(sample_dir) if 'generated' in f]

files.sort()

out = cv2.VideoWriter(vid_fname,cv2.VideoWriter_fourcc(*'MP4V'),1,(530,530))
[out.write(cv2.imread(fname)) for fname in files]
out.release()

In [None]:
plt.plot(real_scores, '-')
plt.plot(fake_scores, '-')
plt.xlabel('epoch')
plt.ylabel('score')
plt.legend(['Real', 'Fake'])
plt.title('Scores');

In [None]:
!zip -r file.zip /kaggle/working/generated

#   adding: kaggle/working/ (stored 0%)
#   adding: kaggle/working/__notebook__.ipynb (deflated 42%)
#   adding: kaggle/working/plot.jpg (deflated 17%)



In [None]:
!ls

In [None]:
import zipfile
import os
from IPython.display import FileLink

In [None]:
def zip_dir(directory = os.curdir, file_name = 'file.zip'):
    """
    zip all the files in a directory
    
    Parameters
    _____
    directory: str
        directory needs to be zipped, defualt is current working directory
        
    file_name: str
        the name of the zipped file (including .zip), default is 'directory.zip'
        
    Returns
    _____
    Creates a hyperlink, which can be used to download the zip file)
    """
    os.chdir(directory)
    zip_ref = zipfile.ZipFile(file_name, mode='w')
    for folder, _, files in os.walk(directory):
        for file in files:
            if file_name in file:
                pass
            else:
                zip_ref.write(os.path.join(folder, file))

    return FileLink(file_name)

In [None]:
zip_dir()