In [None]:
# Label for this notebook/project; it is not referenced by any later visible cell.
project_name = "06b-anime-degan"

In [None]:
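# One-time dataset download, kept disabled. Uncomment these lines to fetch the
# Kaggle "animefacedataset" before running the cells below.
# import opendatasets as od

# dataset_url = "https://www.kaggle.com/splcher/animefacedataset"
# od.download(dataset_url)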

In [None]:
import os

# Root folder of the dataset; it is later handed to ImageFolder, which treats
# each sub-folder as one class.
DATA_DIR = "./animefacedataset"
print(os.listdir(DATA_DIR))  # sanity check: show the top-level entries

In [None]:
# Peek at the first ten entries of the "images" sub-folder.
print(os.listdir(DATA_DIR + "/images")[:10])

In [None]:
from torch.utils.data import DataLoader
from torchvision.datasets import ImageFolder
import torchvision.transforms as T

In [None]:
image_size = 64  # images are resized and center-cropped to image_size x image_size
batch_size = 128  # samples per DataLoader batch; also the size of the latent batches
# (per-channel means, per-channel stds) for T.Normalize: maps [0, 1] pixels to [-1, 1].
stats = (0.5, 0.5, 0.5), (0.5, 0.5, 0.5)

In [None]:
# Load every image below DATA_DIR and apply a fixed preprocessing pipeline:
# resize, center-crop to a square, convert to a tensor, normalize with `stats`.
train_ds = ImageFolder(DATA_DIR, transform=T.Compose([
    T.Resize(image_size),
    T.CenterCrop(image_size),
    T.ToTensor(),
    T.Normalize(*stats)]))  # deterministic preprocessing only; no random augmentation is applied.
# print(train_ds[0][0].shape)  # torch.Size([3, 64, 64])
# Shuffled batches, loaded by 3 worker processes into pinned host memory.
train_dl = DataLoader(train_ds, batch_size, shuffle=True, num_workers=3, pin_memory=True)

In [None]:
import torch
from torchvision.utils import make_grid
import matplotlib.pyplot as plt
%matplotlib inline

In [None]:
def denorm(img_tensors):
    """Undo the normalization applied by T.Normalize.

    Normalization computes (x - mean) / std, so the inverse is x * std + mean.
    Only the first channel's mean and std from `stats` are used, which is
    sufficient as long as every channel shares the same statistics.
    """
    mean, std = stats[0][0], stats[1][0]
    return img_tensors * std + mean

In [None]:
def show_images(images, nmax=64):
    """Display up to `nmax` images of a batch as a grid with 8 images per row.

    Args:
        images: batch of image tensors, presumably (N, 3, H, W) and normalized
            with `stats`; may live on any device and may require grad.
        nmax: maximum number of images taken from the start of the batch.
    """
    fig, ax = plt.subplots(figsize=(8, 8))
    ax.set_xticks([]); ax.set_yticks([])
    # detach() drops the autograd graph and cpu() makes the data readable by
    # matplotlib, which cannot consume CUDA tensors.
    batch = images.detach()[:nmax].cpu()
    # make_grid tiles the batch into one (C, H, W) image; imshow expects (H, W, C).
    grid = make_grid(denorm(batch), nrow=8)
    ax.imshow(grid.permute(1, 2, 0))

def show_batch(dl, nmax=64):
    """Show the first batch produced by `dl`; do nothing when `dl` yields no batch.

    Each batch is expected to be an (images, labels) pair; the labels are ignored.
    """
    first_batch = next(iter(dl), None)
    if first_batch is None:
        return
    images, _labels = first_batch
    show_images(images, nmax)

In [None]:
show_batch(train_dl)

In [None]:
def get_default_device():
    """Return the CUDA device when CUDA is available, otherwise the CPU device."""
    return torch.device('cuda' if torch.cuda.is_available() else 'cpu')

def to_device(data, device):
    """Move `data` to `device`.

    Lists and tuples are processed recursively and always come back as lists;
    anything else must provide `.to(device, non_blocking=True)`.
    """
    if not isinstance(data, (list, tuple)):
        return data.to(device, non_blocking=True)
    return [to_device(item, device) for item in data]

class DeviceDataLoader:
    """Wrap an iterable of batches so each batch is moved to `device` when yielded."""

    def __init__(self, dl, device):
        self.dl, self.device = dl, device

    def __len__(self):
        # Number of batches, as reported by the wrapped loader.
        return len(self.dl)

    def __iter__(self):
        # Batches are moved lazily, one at a time, as the caller iterates.
        for batch in self.dl:
            moved = to_device(batch, self.device)
            yield moved

In [None]:
# Choose the compute device once; the bare `device` expression shows it as the cell output.
device = get_default_device()
device

In [None]:

# Rebind train_dl to a wrapper that moves every batch to `device` while iterating.
# NOTE: re-running this cell wraps the wrapper again (redundant, though batches are
# still moved correctly); re-create train_dl first for a clean state.
train_dl = DeviceDataLoader(train_dl, device)

In [None]:
# Confirm that train_dl is now the device-moving wrapper rather than a plain DataLoader.
print(type(train_dl), "\n", train_dl)

In [None]:
import torch.nn as nn

In [None]:
# Convolutional discriminator: maps a 3 x 64 x 64 image to a single probability.
# Conv2d output size (dilation 1): out = floor((in + 2*padding - kernel_size) / stride) + 1,
# so kernel 4 / stride 2 / padding 1 halves the spatial size at each stage.
discriminator = nn.Sequential(  # input images are 3 x 64 x 64
    # in: 3 x 64 x 64
    nn.Conv2d(3, 64, kernel_size=4, stride=2, padding=1, bias=False),
    # Conv2d(in_channels, out_channels, kernel_size, stride, padding, ...)
    nn.BatchNorm2d(64),  # num_features matches the channel count of the preceding conv
    nn.LeakyReLU(0.2, inplace=True),  # like ReLU, but negative inputs are scaled by 0.2 instead of zeroed
    # out: 64 x 32 x 32

    nn.Conv2d(64, 128, kernel_size=4, stride=2, padding=1, bias=False),
    nn.BatchNorm2d(128),
    nn.LeakyReLU(0.2, inplace=True),
    # out: 128 x 16 x 16

    nn.Conv2d(128, 256, kernel_size=4, stride=2, padding=1, bias=False),
    nn.BatchNorm2d(256),
    nn.LeakyReLU(0.2, inplace=True),
    # out: 256 x 8 x 8

    nn.Conv2d(256, 512, kernel_size=4, stride=2, padding=1, bias=False),
    nn.BatchNorm2d(512),
    nn.LeakyReLU(0.2, inplace=True),
    # out: 512 x 4 x 4

    nn.Conv2d(512, 1, kernel_size=4, stride=1, padding=0, bias=False),
    # out: 1 x 1 x 1
    nn.Flatten(),  # N x 1 x 1 x 1 -> N x 1
    nn.Sigmoid())  # squashes the score into (0, 1), as required by binary cross-entropy

In [None]:
# Move the discriminator's parameters and buffers to the selected device.
discriminator = to_device(discriminator, device)

In [None]:
# Number of channels of the random latent tensor fed to the generator (latent_size x 1 x 1).
latent_size = 128

In [None]:
# Generator: upsamples a latent_size x 1 x 1 tensor to a 3 x 64 x 64 image.
# ConvTranspose2d output size (dilation 1, output_padding 0):
#   out = (in - 1) * stride - 2 * padding + kernel_size
# so kernel 4 / stride 2 / padding 1 doubles the spatial size at each stage.
generator = nn.Sequential(
    # in: latent_size x 1 x 1
    nn.ConvTranspose2d(latent_size, 512, kernel_size=4, stride=1, padding=0, bias=False),

    nn.BatchNorm2d(512),
    nn.ReLU(True),  # True -> inplace
    # out: 512 x 4 x 4

    nn.ConvTranspose2d(512, 256, kernel_size=4, stride=2, padding=1, bias=False),
    nn.BatchNorm2d(256),
    nn.ReLU(True),
    # out: 256 x 8 x 8

    nn.ConvTranspose2d(256, 128, kernel_size=4, stride=2, padding=1, bias=False),
    nn.BatchNorm2d(128),
    nn.ReLU(True),
    # out: 128 x 16 x 16

    nn.ConvTranspose2d(128, 64, kernel_size=4, stride=2, padding=1, bias=False),
    nn.BatchNorm2d(64),
    nn.ReLU(True),
    # out: 64 x 32 x 32

    # ConvTranspose2d reverses the shape change of a Conv2d built with the same parameters.
    nn.ConvTranspose2d(64, 3, kernel_size=4, stride=2, padding=1, bias=False),  # ConvTranspose2d(in_channels, out_channels, kernel_size, ...)
    nn.Tanh()  # outputs in [-1, 1], the same range as the normalized training images
    # out: 3 x 64 x 64
)
"""
ConvTranspose2d, 
    Applies a 2D transposed convolution operator over an input image composed of several input planes.
    This is set so that
    when a :class:`~torch.nn.Conv2d` and a :class:`~torch.nn.ConvTranspose2d`
    are initialized with same parameters, they are inverses of each other in
    regard to the input and output shapes.
"""

In [None]:
# Smoke test of the untrained generator. xb is created without a device argument,
# so this cell must run before the generator is moved to `device`.
# TODO: look into how torch.randn initializes a multi-dimensional tensor.
xb = torch.randn(batch_size, latent_size, 1, 1)  # random latent tensors
# print(">> xb is: ", xb)
print("xb size is :", xb.shape)
fake_images = generator(xb)
print(fake_images.shape)  # batch_size, channels, rows, cols; torch.Size([128, 3, 64, 64])
show_images(fake_images)  # untrained weights, so the grid is expected to look like noise

In [None]:
# Move the generator's parameters and buffers to the selected device.
generator = to_device(generator, device)

In [None]:
def train_discriminator(real_images, opt_d):
    """Run one discriminator update on a real batch plus a freshly generated fake batch.

    Args:
        real_images: batch of real images, already on `device`.
        opt_d: optimizer that holds the discriminator's parameters.

    Returns:
        (loss, real_score, fake_score) as plain Python floats: the summed binary
        cross-entropy loss, and the mean discriminator output on the real and on
        the fake batch.
    """
    # Clear discriminator gradients left over from the previous step
    opt_d.zero_grad()

    # Pass real images through the discriminator; their target label is 1
    real_preds = discriminator(real_images)
    real_targets = torch.ones(real_images.size(0), 1, device=device)
    real_loss = F.binary_cross_entropy(real_preds, real_targets)
    real_score = torch.mean(real_preds).item()

    # Generate fake images. They are detached because only the discriminator is
    # updated here: back-propagating into the generator would compute gradients
    # that are thrown away, wasting time and memory.
    latent = torch.randn(batch_size, latent_size, 1, 1, device=device)
    fake_images = generator(latent).detach()

    # Pass fake images through the discriminator; their target label is 0
    fake_targets = torch.zeros(fake_images.size(0), 1, device=device)
    fake_preds = discriminator(fake_images)  # one probability in (0, 1) per image
    fake_loss = F.binary_cross_entropy(fake_preds, fake_targets)
    fake_score = torch.mean(fake_preds).item()

    # Update discriminator weights
    loss = real_loss + fake_loss
    loss.backward()  # gradients w.r.t. the discriminator parameters only
    opt_d.step()  # apply one optimization step to the discriminator
    return loss.item(), real_score, fake_score

In [None]:
def train_generator(opt_g):
    """Run one generator update and return its loss as a Python float.

    The generator is rewarded when the discriminator labels its images as real,
    so the binary cross-entropy targets are all ones.
    """
    # Start from clean generator gradients
    opt_g.zero_grad()

    # Sample a latent batch and let the discriminator judge the generated images
    noise = torch.randn(batch_size, latent_size, 1, 1, device=device)
    preds = discriminator(generator(noise))

    # Target 1 ("real"): the loss shrinks as the discriminator is fooled
    real_labels = torch.ones(batch_size, 1, device=device)
    loss = F.binary_cross_entropy(preds, real_labels)

    # Back-propagate through discriminator and generator, then step the generator only
    loss.backward()
    opt_g.step()

    return loss.item()

In [None]:
from torchvision.utils import save_image

In [None]:
# Folder that receives the generated sample grids; created if it does not exist yet.
sample_dir = "generated"
os.makedirs(sample_dir, exist_ok=True)  # exist_ok=True: no error when the folder is already there

In [None]:
def save_samples(index, latent_tensors, show=True):
    """Generate images from `latent_tensors`, save them as one grid PNG and optionally show it.

    Args:
        index: integer used in the file name (zero-padded to four digits).
        latent_tensors: latent batch fed to the generator; reusing the same
            tensors across calls makes the saved grids comparable.
        show: when True, also display the grid inline.
    """
    # No gradients are needed for sampling, so skip building the autograd graph.
    with torch.no_grad():
        fake_images = generator(latent_tensors)
    # Map the generator output from [-1, 1] back to [0, 1] once, and use the
    # same de-normalized images for both the saved file and the preview.
    images = denorm(fake_images)
    fake_fname = "generated-images-{0:0=4d}.png".format(index)
    save_image(images, os.path.join(sample_dir, fake_fname), nrow=8)  # 8 images per grid row
    print("Saving", fake_fname)
    if show:
        fig, ax = plt.subplots(figsize=(8, 8))
        ax.set_xticks([]); ax.set_yticks([])
        # matplotlib needs CPU data in (H, W, C) order
        ax.imshow(make_grid(images.cpu(), nrow=8).permute(1, 2, 0))

In [None]:
# 64 latent tensors sampled once and reused for every saved sample grid.
fixed_latent = torch.randn(64, latent_size, 1, 1, device=device)  # latent_size = 128

In [None]:
# Index 0: output of the generator before any training step has been run.
save_samples(0, fixed_latent)

In [None]:
from tqdm.notebook import tqdm  # experimental IPython/Jupyter Notebook widget using tqdm!
import torch.nn.functional as F

In [None]:
def fit(epochs, lr, start_idx=1):
    """Train discriminator and generator alternately for `epochs` epochs.

    Args:
        epochs: number of passes over `train_dl`.
        lr: learning rate shared by both Adam optimizers.
        start_idx: index used for the sample grid saved after the first epoch.

    Returns:
        (losses_g, losses_d, real_scores, fake_scores): four lists with one entry
        per epoch, each holding the value from the last batch of that epoch.
    """
    torch.cuda.empty_cache()

    # Both networks use Adam with the same learning rate and betas.
    adam_options = dict(lr=lr, betas=(0.5, 0.999))
    opt_d = torch.optim.Adam(discriminator.parameters(), **adam_options)
    opt_g = torch.optim.Adam(generator.parameters(), **adam_options)

    losses_g, losses_d, real_scores, fake_scores = [], [], [], []
    report = "Epoch [{} / {}], loss_g: {:.4f}, loss_d: {:.4f}, real_score: {:.4f}, fake_score: {:.4f}"

    for epoch in range(epochs):
        # One discriminator step followed by one generator step per real batch.
        for real_images, _ in tqdm(train_dl):
            loss_d, real_score, fake_score = train_discriminator(real_images, opt_d)
            loss_g = train_generator(opt_g)

        # Record the statistics of the epoch's final batch.
        epoch_stats = (
            (losses_g, loss_g),
            (losses_d, loss_d),
            (real_scores, real_score),
            (fake_scores, fake_score),
        )
        for series, value in epoch_stats:
            series.append(value)

        print(report.format(epoch + 1, epochs, loss_g, loss_d, real_score, fake_score))
        # Save a grid generated from the fixed latent batch, without showing it inline.
        save_samples(epoch + start_idx, fixed_latent, show=False)

    return losses_g, losses_d, real_scores, fake_scores

In [None]:
lr = 0.0002  # learning rate passed to both Adam optimizers in fit()
epochs = 200  # full passes over the training loader; one sample grid is saved per epoch

In [None]:
# Long-running cell: trains both networks and returns the per-epoch metric lists.
history = fit(epochs, lr)

In [None]:
# Unpack in the order fit() returns: generator losses, discriminator losses, real scores, fake scores.
losses_g, losses_d, real_scores, fake_scores = history

In [None]:
from IPython.display import Image

In [None]:
# Sample grid saved with index 1 (after the first epoch when start_idx is 1).
Image('./generated/generated-images-0001.png')

In [None]:
# Sample grid saved with index 5.
Image('./generated/generated-images-0005.png')

In [None]:
# Sample grid saved with index 10.
Image('./generated/generated-images-0010.png')

In [None]:
# Sample grid saved with index 20.
Image('./generated/generated-images-0020.png')

In [None]:
# Sample grid saved with index 25.
Image('./generated/generated-images-0025.png')

In [None]:
import cv2
import os

# Stitch the saved sample grids into a video, one grid per frame at 1 frame per second.
vid_fname = 'gans_training.avi'

# The zero-padded index in each file name makes lexicographic order match training order.
files = sorted(os.path.join(sample_dir, f) for f in os.listdir(sample_dir) if 'generated' in f)

out = None
try:
    for fname in files:
        frame = cv2.imread(fname)
        if frame is None:
            # cv2.imread returns None for unreadable or non-image files; skip them.
            continue
        if out is None:
            # Take the frame size from the first image instead of hard-coding it, so a
            # different image size or grid layout does not produce frames the writer rejects.
            height, width = frame.shape[:2]
            out = cv2.VideoWriter(vid_fname, cv2.VideoWriter_fourcc(*'MP4V'), 1, (width, height))
        out.write(frame)
finally:
    # Always finalize the file, even when reading or writing a frame fails.
    if out is not None:
        out.release()

In [None]:
# Per-epoch losses of both networks (value from the last batch of each epoch).
for series in (losses_d, losses_g):
    plt.plot(series, '-')
ax = plt.gca()
ax.set_xlabel('epoch')
ax.set_ylabel('loss')
ax.set_title('Losses')
ax.legend(['Discriminator', 'Generator']);

In [None]:
# Per-epoch mean discriminator output on real and on fake images.
for series in (real_scores, fake_scores):
    plt.plot(series, '-')
ax = plt.gca()
ax.set_xlabel('epoch')
ax.set_ylabel('score')
ax.set_title('Scores')
ax.legend(['Real', 'Fake']);