# Practice : CycleGAN

[CycleGAN official homepage (from authors)] : https://junyanz.github.io/CycleGAN/ <br>
[CycleGAN original paper] : https://arxiv.org/abs/1703.10593

### Unpaired Image-to-Image Translation

> In many cases, no paired images exist between the two distributions, so paired image-to-image translation algorithms such as pix2pix cannot be applied directly. <br>
CycleGAN handles this case by combining two GANs (2 generators, 2 discriminators) with a cycle consistency loss.

Install TensorFlow 2.8.3 <br>
(this avoids a bug that makes the data augmentation extremely slow)

Make sure to install the version of TensorFlow below; otherwise an error might occur at the restoration step.

In [1]:
!pip install tensorflow==2.8.3

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [2]:
!pip install tensorflow-addons

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [3]:
!python --version

Python 3.9.16


In [4]:
!nvidia-smi

Mon Apr 10 15:55:25 2023       
+-----------------------------------------------------------------------------+
| NVIDIA-SMI 525.85.12    Driver Version: 525.85.12    CUDA Version: 12.0     |
|-------------------------------+----------------------+----------------------+
| GPU  Name        Persistence-M| Bus-Id        Disp.A | Volatile Uncorr. ECC |
| Fan  Temp  Perf  Pwr:Usage/Cap|         Memory-Usage | GPU-Util  Compute M. |
|                               |                      |               MIG M. |
|   0  Tesla T4            Off  | 00000000:00:04.0 Off |                    0 |
| N/A   55C    P8    10W /  70W |      0MiB / 15360MiB |      0%      Default |
|                               |                      |                  N/A |
+-------------------------------+----------------------+----------------------+
                                                                               
+-----------------------------------------------------------------------------+
| Proces

### Import useful libraries

In [5]:
import os
import glob
import numpy as np
import tensorflow as tf
import tensorflow_addons as tfa
from tqdm.auto import tqdm
import matplotlib.pyplot as plt


TensorFlow Addons (TFA) has ended development and introduction of new features.
TFA has entered a minimal maintenance and release mode until a planned end of life in May 2024.
Please modify downstream libraries to take dependencies from other repositories in our TensorFlow community (e.g. Keras, Keras-CV, and Keras-NLP). 

For more information see: https://github.com/tensorflow/addons/issues/2807 

 The versions of TensorFlow you are currently using is 2.8.3 and is not supported. 
Some things might work, some things might not.
If you were to encounter a bug, do not file an issue.
If you want to make sure you're using a tested and supported configuration, either change the TensorFlow version or the TensorFlow Addons's version. 
You can find the compatibility matrix in TensorFlow Addon's readme:
https://github.com/tensorflow/addons


In [6]:
print(f'tf_version : {tf.__version__}')

tf_version : 2.8.3


### Connect to Google Drive (where the images are stored)

In [7]:
from google.colab import drive
drive.mount('/tmp/drive')

# base directory
base_path = "/tmp/drive/MyDrive/practice/CycleGAN/"
# change the current working directory to base_path
os.chdir(base_path)
# images directory
image_path = os.path.join(base_path, "images/")

Drive already mounted at /tmp/drive; to attempt to forcibly remount, call drive.mount("/tmp/drive", force_remount=True).


In [8]:
# define category name (string)
name_A = 'henesys'
name_B = 'ellinia'

# image directory for each distribution
image_directory_A = os.path.join(image_path, name_A)
image_directory_B = os.path.join(image_path, name_B)

# just to make sure
images_A = glob.glob(image_directory_A + '/*.*')
images_B = glob.glob(image_directory_B + '/*.*')

num_of_examples_A = len(images_A)
num_of_examples_B = len(images_B)

# print it
print(f'image_directory_A : {image_directory_A}')
print(f'image_directory_B : {image_directory_B}')
print(f'number of images of category A : {num_of_examples_A}')
print(f'number of images of category B : {num_of_examples_B}')

image_directory_A : /tmp/drive/MyDrive/practice/CycleGAN/images/henesys
image_directory_B : /tmp/drive/MyDrive/practice/CycleGAN/images/ellinia
number of images of category A : 173
number of images of category B : 135


### Make our datasets

Note: `Dataset.cache` stores the data from the first epoch and replays it in order. So, using the `cache` method disables any shuffles earlier in the pipeline. Below, `Dataset.shuffle` is added back in after `Dataset.cache`.

from https://colab.research.google.com/github/tensorflow/docs/blob/master/site/en/tutorials/load_data/csv.ipynb

In [9]:
BATCH_SIZE = 1
AUTOTUNE = tf.data.AUTOTUNE

def map_training_images(file):
    '''
    convert the images to floats and preprocess them
    '''

    img = tf.io.decode_png(tf.io.read_file(file), channels=3) # decode it as RGB images (3 channels), not RGBA
    img = tf.cast(img, tf.float32)
    img = img / 127.5 - 1                                     # image tensor should lie in [-1, 1]
    img = tf.clip_by_value(img, -1, 1)
    
    return img


def generate_dataset(directory, num_of_examples):
    '''
    return a tf.data.Dataset instance containing the images in the given directory
    '''

    dataset = tf.data.Dataset.list_files(os.path.join(directory, "*.*"))
    dataset = dataset.map(map_training_images).cache().shuffle(num_of_examples).batch(BATCH_SIZE).prefetch(AUTOTUNE)

    return dataset


training_dataset_A = generate_dataset(image_directory_A, num_of_examples_A)
training_dataset_B = generate_dataset(image_directory_B, num_of_examples_B)
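
To make the scaling in `map_training_images` concrete, the following plain-Python sketch applies the same arithmetic to single pixel values and then inverts it the way the image-saving code later in this notebook does. The helper names `to_model_range` and `to_uint8` are hypothetical and are not part of the notebook.

```python
def to_model_range(pixel):
    """Map an 8-bit pixel value in [0, 255] to [-1, 1], clipping out-of-range values."""
    value = pixel / 127.5 - 1
    return max(-1.0, min(1.0, value))


def to_uint8(value):
    """Map a value in [-1, 1] back to an integer in [0, 255] by truncation."""
    return int((value + 1) * 127.5)


for pixel in (0, 64, 128, 255):
    scaled = to_model_range(pixel)
    print(pixel, "->", round(scaled, 4), "->", to_uint8(scaled))
```

Because the inverse truncates instead of rounding, an intermediate value may come back one level lower than it started after floating-point rounding; the end points 0 and 255 map back exactly.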

### Visualize a few samples as a sanity check

In [10]:
def plot_image(image_tensor, category_name):
    '''
    plot the given image tensor
    '''
    plt.figure(figsize=(10,10))
    plt.grid(False)
    plt.title(category_name)

    image_tensor = np.squeeze(image_tensor)
    # imshow expects RGB floats in [0, 1] (or integers in [0, 255]) and clips anything outside that range,
    # so convert the tensor from [-1, 1] to [0, 1]
    image_tensor = (image_tensor + 1) / 2.0
    
    plt.imshow(image_tensor)
    plt.show()


# # uncomment the lines below to preview a few samples
# for image_batch in training_dataset_A.take(2):
#     print(f'image_batch_shape : {image_batch.shape}')
#     plot_image(image_batch, name_A)

# for image_batch in training_dataset_B.take(2):
#     print(f'image_batch_shape : {image_batch.shape}')
#     plot_image(image_batch, name_B)

## Define our model

(following the architecture of the original paper)

As the original authors noted, CycleGAN is quite memory-intensive because it trains 4 models (2 generators, 2 discriminators). <br>
<br>
So, instead of using the full original image, we train on a random crop of it: <br>
1. crop the image to size (IMAGE_SIZE, IMAGE_SIZE, 3)
2. randomly flip it horizontally

In [11]:
# side length of the cropped image
IMAGE_SIZE = 480    # must be a multiple of 4: the ResNet-based generator downsamples and then upsamples by 4x,
                    # and the cycle consistency loss requires the input & output sizes to match

# data augmentation
data_augmentation = tf.keras.Sequential([
    tf.keras.layers.RandomCrop(IMAGE_SIZE, IMAGE_SIZE),
    tf.keras.layers.RandomFlip(mode='horizontal'),
])

The augmented images look like this:

In [12]:
# # uncomment the lines below to preview a few augmented samples
# for image_batch in training_dataset_A.take(2):
#     augmented_batch = data_augmentation(image_batch)
#     print(f'augmented_batch_shape : {augmented_batch.shape}')
#     plot_image(augmented_batch, name_A)

# for image_batch in training_dataset_B.take(2):
#     augmented_batch = data_augmentation(image_batch)
#     print(f'augmented_batch_shape : {augmented_batch.shape}')
#     plot_image(augmented_batch, name_B)

### Generator

First, define our residual block.

In [13]:
class Residual_Block(tf.keras.Model):
    '''
    Residual Block class:
        consists of Conv2D - InstanceNorm - ReLU - Conv2D - InstanceNorm - Add (residual connection)
        reflection padding is used to reduce artifacts
    '''

    def __init__(self, input_channels):
        super(Residual_Block, self).__init__()

        self.conv_1 = tf.keras.layers.Conv2D(filters=input_channels, kernel_size=3, padding='valid', use_bias=False, 
                                             kernel_initializer=tf.keras.initializers.RandomNormal(stddev=0.02))
        self.instance_norm_1 = tfa.layers.InstanceNormalization()

        self.conv_2 = tf.keras.layers.Conv2D(filters=input_channels, kernel_size=3, padding='valid', use_bias=False, 
                                             kernel_initializer=tf.keras.initializers.RandomNormal(stddev=0.02))
        self.instance_norm_2 = tfa.layers.InstanceNormalization()

        self.activation = tf.keras.layers.ReLU()

    def reflection_pad(self, input, pad_size):
        return tf.pad(input, [[0, 0], [pad_size, pad_size], [pad_size, pad_size], [0, 0]], mode='REFLECT')

    def call(self, inputs):
        x = self.reflection_pad(inputs, 1)
        x = self.conv_1(x)
        x = self.instance_norm_1(x)
        x = self.activation(x)

        x = self.reflection_pad(x, 1)
        x = self.conv_2(x)
        x = self.instance_norm_2(x)

        return x + inputs
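
`tf.pad` with `mode='REFLECT'` mirrors the tensor around its border without repeating the border value itself. A minimal 1-D sketch in plain Python shows the pattern that the block above applies along height and width. The helper `reflect_pad_1d` is hypothetical and written only for illustration.

```python
def reflect_pad_1d(values, pad_size):
    """Reflect-pad a sequence on both sides without repeating the edge element."""
    values = list(values)
    if pad_size >= len(values):
        raise ValueError("pad_size must be smaller than the sequence length")
    left = values[pad_size:0:-1]          # mirror of the elements just after the first one
    right = values[-2:-2 - pad_size:-1]   # mirror of the elements just before the last one
    return left + values + right


row = [1, 2, 3, 4]
print(reflect_pad_1d(row, 1))  # [2, 1, 2, 3, 4, 3]
print(reflect_pad_1d(row, 2))  # [3, 2, 1, 2, 3, 4, 3, 2]
```

Padding by 1 on each side adds 2 elements, which is exactly what a 3x3 convolution with `padding='valid'` removes, so the residual block keeps the spatial size and `x + inputs` is well defined.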

Next, define the CycleGAN generator, composed of a contracting block, 9 residual blocks, and an expanding block.

In [14]:
class CycleGAN_Generator(tf.keras.Model):
    '''
    CycleGAN Generator class
    contracting block + 9 residual blocks + expanding block
    '''

    def __init__(self, input_channels, output_channels, hidden_channels=64, name=""):
        super(CycleGAN_Generator, self).__init__()

        if name:
            self._name = name

        # layer names follow the notation of the paper
        # for c7s1-64
        self.c7s1_64_conv = tf.keras.layers.Conv2D(filters=hidden_channels, kernel_size=7, padding='valid', use_bias=False, 
                                                   kernel_initializer=tf.keras.initializers.RandomNormal(stddev=0.02))
        self.c7s1_64_instance_norm = tfa.layers.InstanceNormalization()

        # for d128
        self.d128_conv = tf.keras.layers.Conv2D(filters=2*hidden_channels, kernel_size=3, strides=2, padding='valid', use_bias=False, 
                                                kernel_initializer=tf.keras.initializers.RandomNormal(stddev=0.02))
        self.d128_instance_norm = tfa.layers.InstanceNormalization()

        # for d256
        self.d256_conv = tf.keras.layers.Conv2D(filters=4*hidden_channels, kernel_size=3, strides=2, padding='valid', use_bias=False, 
                                                kernel_initializer=tf.keras.initializers.RandomNormal(stddev=0.02))
        self.d256_instance_norm = tfa.layers.InstanceNormalization()

        # for residual blocks
        R_channels = 4*hidden_channels
        self.R256_1 = Residual_Block(R_channels)
        self.R256_2 = Residual_Block(R_channels)
        self.R256_3 = Residual_Block(R_channels)
        self.R256_4 = Residual_Block(R_channels)
        self.R256_5 = Residual_Block(R_channels)
        self.R256_6 = Residual_Block(R_channels)
        self.R256_7 = Residual_Block(R_channels)
        self.R256_8 = Residual_Block(R_channels)
        self.R256_9 = Residual_Block(R_channels)

        # for u128
        self.u128_conv_transpose = tf.keras.layers.Conv2DTranspose(filters=2*hidden_channels, kernel_size=3, strides=2, padding='same', use_bias=False, 
                                                                   kernel_initializer=tf.keras.initializers.RandomNormal(stddev=0.02))
        self.u128_instance_norm = tfa.layers.InstanceNormalization()

        # for u256
        self.u256_conv_transpose = tf.keras.layers.Conv2DTranspose(filters=hidden_channels, kernel_size=3, strides=2, padding='same', use_bias=False, 
                                                                   kernel_initializer=tf.keras.initializers.RandomNormal(stddev=0.02))
        self.u256_instance_norm = tfa.layers.InstanceNormalization()

        # for c7s1-3
        self.c7s1_3_conv = tf.keras.layers.Conv2D(filters=output_channels, kernel_size=7, padding='valid', 
                                                  kernel_initializer=tf.keras.initializers.RandomNormal(stddev=0.02))

        # activation
        self.relu = tf.keras.layers.ReLU()


    def reflection_pad(self, input, pad_size):
        return tf.pad(input, [[0, 0], [pad_size, pad_size], [pad_size, pad_size], [0, 0]], mode='REFLECT')


    def call(self, inputs):
        # c7s1-64
        x = self.reflection_pad(inputs, 3)
        x = self.c7s1_64_conv(x)
        x = self.c7s1_64_instance_norm(x)
        x = self.relu(x)

        # d128
        x = self.reflection_pad(x, 1)
        x = self.d128_conv(x)
        x = self.d128_instance_norm(x)
        x = self.relu(x)

        # d256
        x = self.reflection_pad(x, 1)
        x = self.d256_conv(x)
        x = self.d256_instance_norm(x)
        x = self.relu(x)

        # R256_1~9
        x = self.R256_1(x)
        x = self.R256_2(x)
        x = self.R256_3(x)
        x = self.R256_4(x)
        x = self.R256_5(x)
        x = self.R256_6(x)
        x = self.R256_7(x)
        x = self.R256_8(x)
        x = self.R256_9(x)

        # u128
        x = self.u128_conv_transpose(x)
        x = self.u128_instance_norm(x)
        x = self.relu(x)

        # u256
        x = self.u256_conv_transpose(x)
        x = self.u256_instance_norm(x)
        x = self.relu(x)

        # c7s1-3
        x = self.reflection_pad(x, 3)
        x = self.c7s1_3_conv(x)

        return tf.keras.activations.tanh(x)
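
The size constraint on the input can be checked with a little arithmetic. The sketch below traces one spatial dimension through the layers defined above, assuming the standard output-size formulas: floor((n - k) / s) + 1 for a `'valid'` convolution, and 2n for a stride-2 `'same'` transposed convolution. The function names are hypothetical.

```python
def valid_conv_size(size, kernel_size, stride=1, pad=0):
    """Output length of a 'valid' convolution applied after symmetric padding."""
    return (size + 2 * pad - kernel_size) // stride + 1


def generator_output_size(size):
    """Trace one spatial dimension through the generator layers."""
    size = valid_conv_size(size, kernel_size=7, pad=3)            # c7s1-64
    size = valid_conv_size(size, kernel_size=3, stride=2, pad=1)  # d128
    size = valid_conv_size(size, kernel_size=3, stride=2, pad=1)  # d256
    # the 9 residual blocks keep the size (pad 1, then a 3x3 'valid' conv)
    size = size * 2                                               # u128
    size = size * 2                                               # u256
    size = valid_conv_size(size, kernel_size=7, pad=3)            # c7s1-3
    return size


for n in (480, 481, 482, 483, 484):
    print(n, "->", generator_output_size(n))
```

Under these assumptions only multiples of 4 come back at the same size; other sizes are rounded up to the next multiple of 4, so the output would no longer match the input in the cycle consistency loss.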


In [15]:
# temp_generator = CycleGAN_Generator(3, 3)

In [16]:
# for image_batch in training_dataset_A.take(1):
#     augmented_batch = data_augmentation(image_batch)
#     temp_generated = temp_generator(augmented_batch)
#     print(temp_generated.shape)

In [17]:
# temp_generator.summary()

### Discriminator

PatchGAN discriminator with a 70x70 receptive field <br>
- C64(w/o instance_norm) 
- C128 
- C256 
- C512 
- (Conv to output 1 channel prediction map)
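
The value 70 can be recovered from the layer configuration used in the class below: kernel size 4 throughout, stride 2 for C64, C128 and C256, and stride 1 for C512 and the output convolution. The sketch applies the usual backward recurrence r_in = (r_out - 1) * stride + kernel_size, starting from a single output unit. The function name is hypothetical.

```python
def receptive_field(layers):
    """Receptive field of one output unit, given (kernel_size, stride) pairs in forward order."""
    field = 1
    for kernel_size, stride in reversed(layers):
        field = (field - 1) * stride + kernel_size
    return field


patchgan_layers = [
    (4, 2),  # C64
    (4, 2),  # C128
    (4, 2),  # C256
    (4, 1),  # C512
    (4, 1),  # output convolution
]

print(receptive_field(patchgan_layers))  # 70
```

Each value in the discriminator's output map therefore scores one 70x70 patch of the input rather than the whole image.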

In [18]:
class CycleGAN_discriminator(tf.keras.Model):
    '''
    PatchGAN discriminator with receptive field : 70
    '''

    def __init__(self, input_channels, hidden_channels=64, name=""):
        super(CycleGAN_discriminator, self).__init__()

        if name:
            self._name = name

        # for C64
        self.C64_conv = tf.keras.layers.Conv2D(filters=hidden_channels, kernel_size=4, strides=2, padding='valid', use_bias=True, 
                                               kernel_initializer=tf.keras.initializers.RandomNormal(stddev=0.02))
        
        # for C128
        self.C128_conv = tf.keras.layers.Conv2D(filters=2*hidden_channels, kernel_size=4, strides=2, padding='valid', use_bias=False, 
                                                kernel_initializer=tf.keras.initializers.RandomNormal(stddev=0.02))
        self.C128_instance_norm = tfa.layers.InstanceNormalization()

        # for C256
        self.C256_conv = tf.keras.layers.Conv2D(filters=4*hidden_channels, kernel_size=4, strides=2, padding='valid', use_bias=False, 
                                                kernel_initializer=tf.keras.initializers.RandomNormal(stddev=0.02))
        self.C256_instance_norm = tfa.layers.InstanceNormalization()

        # for C512
        self.C512_conv = tf.keras.layers.Conv2D(filters=8*hidden_channels, kernel_size=4, strides=1, padding='valid', use_bias=False, 
                                                kernel_initializer=tf.keras.initializers.RandomNormal(stddev=0.02))
        self.C512_instance_norm = tfa.layers.InstanceNormalization()

        # for output
        self.output_conv = tf.keras.layers.Conv2D(filters=1, kernel_size=4, strides=1, padding='valid', use_bias=True, 
                                                  kernel_initializer=tf.keras.initializers.RandomNormal(stddev=0.02))

        # activation
        self.LeakyReLU = tf.keras.layers.LeakyReLU(alpha=0.2)


    def reflection_pad(self, input, pad_size):
        return tf.pad(input, [[0, 0], [pad_size, pad_size], [pad_size, pad_size], [0, 0]], mode='REFLECT')

    
    def call(self, inputs):
        # C64
        x = self.reflection_pad(inputs, 1)
        x = self.C64_conv(x)
        # the authors did not use instance norm in the first C64 block of the discriminator
        x = self.LeakyReLU(x)

        # C128
        x = self.reflection_pad(x, 1)
        x = self.C128_conv(x)
        x = self.C128_instance_norm(x)
        x = self.LeakyReLU(x)

        # C256
        x = self.reflection_pad(x, 1)
        x = self.C256_conv(x)
        x = self.C256_instance_norm(x)
        x = self.LeakyReLU(x)

        # C512
        x = self.reflection_pad(x, 1)
        x = self.C512_conv(x)
        x = self.C512_instance_norm(x)
        x = self.LeakyReLU(x)

        # output
        x = self.reflection_pad(x, 1)
        x = self.output_conv(x)
        return x


In [19]:
# temp_discriminator = CycleGAN_discriminator(3)
# temp_result = temp_discriminator(temp_generated)
# print(temp_result.shape)

In [20]:
# temp_discriminator.summary()

## Define our loss

### Discriminator Loss

- Adversarial Loss (least square loss from LSGAN) <br>
https://arxiv.org/abs/1611.04076

In [21]:
def discriminator_loss(real_D_out, fake_D_out):
    '''
    CycleGAN discriminator loss (LSGAN loss)

    <params>
        real_D_out : discriminator's output given real images from certain distribution
        fake_D_out : discriminator's output given fake images generated from the other distribution
                     (by putting the image of other distribution to corresponding generator)
    '''

    # following the authors, divide the loss by 2, which slows down the rate at which D learns
    return 0.5 * (tf.math.reduce_mean(tf.math.squared_difference(real_D_out, tf.ones_like(real_D_out))) + 
                  tf.math.reduce_mean(tf.math.squared_difference(fake_D_out, tf.zeros_like(fake_D_out))))
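
As a small numeric check of the formula above, the following plain-Python sketch evaluates the same expression on a few made-up discriminator outputs. The values and the function name `lsgan_discriminator_loss` are illustrative assumptions, not results from this notebook.

```python
def mean(values):
    return sum(values) / len(values)


def lsgan_discriminator_loss(real_out, fake_out):
    """Least-squares discriminator loss: real outputs are pushed to 1, fake outputs to 0."""
    real_term = mean([(out - 1.0) ** 2 for out in real_out])
    fake_term = mean([out ** 2 for out in fake_out])
    return 0.5 * (real_term + fake_term)


# a discriminator that is fully confident and correct has zero loss
print(lsgan_discriminator_loss([1.0, 1.0], [0.0, 0.0]))  # 0.0

# a discriminator that outputs 0.5 everywhere cannot tell real from fake
print(lsgan_discriminator_loss([0.5, 0.5], [0.5, 0.5]))  # 0.25
```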

### Generator Loss

- Adversarial Loss (least square loss from LSGAN) <br>
https://arxiv.org/abs/1611.04076
- Cycle Consistency Loss
- (optional) Identity Loss
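
Written out for the generator that maps A to B, with the optional identity term left out, the terms in the list above combine as follows. The symbols $G_{A \to B}$, $G_{B \to A}$, $D_B$ and $\lambda$ are notation chosen here for the two generators, the discriminator on B, and the weight of the cycle consistency loss.

$$
\mathcal{L}_{G_{A \to B}} = \mathbb{E}_{a}\big[(D_B(G_{A \to B}(a)) - 1)^2\big] + \lambda \Big( \mathbb{E}_{a}\big[\lVert G_{B \to A}(G_{A \to B}(a)) - a \rVert_1\big] + \mathbb{E}_{b}\big[\lVert G_{A \to B}(G_{B \to A}(b)) - b \rVert_1\big] \Big)
$$

The loss for the generator that maps B to A is the same with the roles of A and B swapped in the adversarial term. In the code, both the squared error and the $L_1$ term are averaged over all elements with `tf.math.reduce_mean`.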

Adversarial Loss

In [22]:
def generator_adversarial_loss(fake_D_out):
    '''
    adversarial (LSGAN) term of the CycleGAN generator loss

    <params>
        fake_D_out : discriminator's output given fake images generated from the other distribution
    '''

    return tf.math.reduce_mean(tf.math.squared_difference(fake_D_out, tf.ones_like(fake_D_out)))

Cycle Consistency Loss

In [23]:
# weight for cycle consistency loss
LAMBDA = 10

In [24]:
def generator_cycle_consistency_loss(real_images, cycled_images):
    '''
    cycle consistency term of the CycleGAN generator loss

    <params>
        real_images : real images from one distribution
        cycled_images : images obtained by passing real_images through
                        both generators in turn (A -> B -> A or B -> A -> B)
    '''

    return tf.math.reduce_mean(tf.math.abs(real_images - cycled_images))
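
For intuition about the scale of this term, here is a plain-Python sketch of the same mean absolute error on a few made-up pixel values, weighted by `LAMBDA` as in the generator loss. The numbers and the helper name `mean_absolute_error` are assumptions for illustration only.

```python
LAMBDA = 10  # same weight as in the notebook


def mean_absolute_error(real, cycled):
    """Mean of |real - cycled| over all elements of two equally sized flat lists."""
    return sum(abs(r - c) for r, c in zip(real, cycled)) / len(real)


real_pixels = [-1.0, 0.0, 0.5, 1.0]    # made-up values in [-1, 1]
cycled_pixels = [-1.0, 0.5, 0.5, 0.0]  # made-up reconstruction of the same pixels

cycle_loss = mean_absolute_error(real_pixels, cycled_pixels)
print(cycle_loss)           # 0.375
print(LAMBDA * cycle_loss)  # 3.75
```

Since pixel values lie in [-1, 1], the unweighted term is at most 2, and a perfect reconstruction gives 0.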

The identity loss is omitted here.

## Initialize our Models & Optimizers & Checkpoints

In [25]:
####### initialize our models #######
# generators
generator_A_to_B = CycleGAN_Generator(input_channels=3, output_channels=3, name=f'{name_A}2{name_B}_generator')
generator_B_to_A = CycleGAN_Generator(input_channels=3, output_channels=3, name=f'{name_B}2{name_A}_generator')

# discriminators
discriminator_A = CycleGAN_discriminator(input_channels=3, name=f'{name_A}_discriminator')
discriminator_B = CycleGAN_discriminator(input_channels=3, name=f'{name_B}_discriminator')

In [26]:
####### corresponding optimizers #######
learning_rate = 2e-4  # 0.0002

generator_A_to_B_optimizer = tf.keras.optimizers.Adam(learning_rate, beta_1=0.5, beta_2=0.999)
generator_B_to_A_optimizer = tf.keras.optimizers.Adam(learning_rate, beta_1=0.5, beta_2=0.999)

discriminator_A_optimizer = tf.keras.optimizers.Adam(learning_rate, beta_1=0.5, beta_2=0.999)
discriminator_B_optimizer = tf.keras.optimizers.Adam(learning_rate, beta_1=0.5, beta_2=0.999)

Checkpoints

In [27]:
# current model name (A2B)
cur_model_checkpoint_directory = f"checkpoints/{name_A}2{name_B}/"

# checkpoint_path
checkpoint_path = os.path.join(base_path, cur_model_checkpoint_directory)

# checkpoint
ckpt = tf.train.Checkpoint(generator_A_to_B=generator_A_to_B,
                           generator_B_to_A=generator_B_to_A,
                           discriminator_A=discriminator_A,
                           discriminator_B=discriminator_B,
                           generator_A_to_B_optimizer=generator_A_to_B_optimizer,
                           generator_B_to_A_optimizer=generator_B_to_A_optimizer,
                           discriminator_A_optimizer=discriminator_A_optimizer,
                           discriminator_B_optimizer=discriminator_B_optimizer)

# checkpoint manager
ckpt_manager = tf.train.CheckpointManager(ckpt, checkpoint_path, max_to_keep=5)

# if checkpoint exists, restore the latest checkpoint
if ckpt_manager.latest_checkpoint:
    ckpt.restore(ckpt_manager.latest_checkpoint)
    print('Latest checkpoint restored!!')

Latest checkpoint restored!!


## Training

In [28]:
@tf.function
def train_step(real_A, real_B, LAMBDA):
    '''
    run one step of the training process

    <params>
        real_A : batch of real images in A
        real_B : batch of real images in B
        LAMBDA : weight of the cycle consistency loss
    '''

    with tf.GradientTape(persistent=True) as tape:
        # fake_A & fake_B
        fake_A = generator_B_to_A(real_B)
        fake_B = generator_A_to_B(real_A)

        # cycled_A & cycled_B
        cycled_A = generator_B_to_A(fake_B)
        cycled_B = generator_A_to_B(fake_A)

        # discriminator's outputs for real & fake images
        disc_real_A = discriminator_A(real_A)
        disc_real_B = discriminator_B(real_B)

        disc_fake_A = discriminator_A(fake_A)
        disc_fake_B = discriminator_B(fake_B)

        # calculate the loss
        # cycled_A & cycled_B each pass through both generators, so the total cycle
        # consistency loss is computed once and shared by both generator losses
        total_cycle_consistency_loss = generator_cycle_consistency_loss(real_A, cycled_A) + \
                                       generator_cycle_consistency_loss(real_B, cycled_B)

        # generator loss
        generator_A_to_B_loss = generator_adversarial_loss(disc_fake_B) + \
                                LAMBDA * total_cycle_consistency_loss
        generator_B_to_A_loss = generator_adversarial_loss(disc_fake_A) + \
                                LAMBDA * total_cycle_consistency_loss
        
        # discriminator loss
        discriminator_A_loss = discriminator_loss(disc_real_A, disc_fake_A)
        discriminator_B_loss = discriminator_loss(disc_real_B, disc_fake_B)

    # compute the gradients by backpropagation
    # the tape was created with persistent=True because tape.gradient is called
    # more than once (a non-persistent tape is released after the first call)
    generator_A_to_B_gradients = tape.gradient(generator_A_to_B_loss, 
                                               generator_A_to_B.trainable_variables)
    generator_B_to_A_gradients = tape.gradient(generator_B_to_A_loss,
                                               generator_B_to_A.trainable_variables)
    discriminator_A_gradients = tape.gradient(discriminator_A_loss,
                                              discriminator_A.trainable_variables)
    discriminator_B_gradients = tape.gradient(discriminator_B_loss,
                                              discriminator_B.trainable_variables)

    # update the weights in the models using optimizer
    generator_A_to_B_optimizer.apply_gradients(zip(generator_A_to_B_gradients,
                                                   generator_A_to_B.trainable_variables))
    generator_B_to_A_optimizer.apply_gradients(zip(generator_B_to_A_gradients,
                                                   generator_B_to_A.trainable_variables))
    discriminator_A_optimizer.apply_gradients(zip(discriminator_A_gradients,
                                                  discriminator_A.trainable_variables))
    discriminator_B_optimizer.apply_gradients(zip(discriminator_B_gradients,
                                                  discriminator_B.trainable_variables))

    # release the resources held by the persistent tape
    del tape

    return generator_A_to_B_loss, generator_B_to_A_loss, discriminator_A_loss, discriminator_B_loss

In [29]:
def plot_images_during_training(real_A, real_B, name_A, name_B):
    '''
    plot the images during training
    to see how the translation progresses
    '''
    fake_A = generator_B_to_A(real_B)
    fake_B = generator_A_to_B(real_A)

    plt.figure(figsize=(10, 10))

    display_list = [np.squeeze(real_A), np.squeeze(fake_B), np.squeeze(real_B), np.squeeze(fake_A)]
    title = [f'real {name_A}', f'fake {name_B}', f'real {name_B}', f'fake {name_A}']

    for i in range(4):
        plt.subplot(2, 2, i+1)
        plt.title(title[i])
        # from [-1, 1] to [0, 1]
        plt.imshow((display_list[i] + 1) / 2.0)
        plt.axis('off')

    plt.show()

Now we can finally train our models.

In [30]:
# when resuming training from a restored checkpoint,
# set the epoch offset to the number of epochs already trained
epoch_offset = 2000
num_of_epochs = 0

display_frequency = 1     # in epochs
save_frequency = 25       # in epochs

# running sums of the four losses over the current epoch
sum_of_generator_A_to_B_loss = 0
sum_of_generator_B_to_A_loss = 0
sum_of_discriminator_A_loss = 0
sum_of_discriminator_B_loss = 0

# the two datasets have different sizes and Dataset.zip stops at the shorter one,
# so the smaller size is the number of steps per epoch
steps_per_epoch = min(num_of_examples_A, num_of_examples_B)

# training process
for epoch in range(epoch_offset, epoch_offset + num_of_epochs):

    print(f'================= Epoch {epoch + 1} begins =================')
    print()

    # from epoch 1000 to 2000, linearly decay the learning rate to 0
    if epoch >= 1000:
        # compute the learning rate of the current epoch
        cur_learning_rate = learning_rate - (learning_rate/1000)*(epoch - 1000)

        # update the learning rate of all 4 optimizers
        tf.keras.backend.set_value(generator_A_to_B_optimizer.learning_rate, cur_learning_rate)
        tf.keras.backend.set_value(generator_B_to_A_optimizer.learning_rate, cur_learning_rate)
        tf.keras.backend.set_value(discriminator_A_optimizer.learning_rate, cur_learning_rate)
        tf.keras.backend.set_value(discriminator_B_optimizer.learning_rate, cur_learning_rate)

        print(f'Linearly decay the learning rate => current learning rate : {cur_learning_rate}')
    
    for real_A_raw, real_B_raw in tqdm(tf.data.Dataset.zip((training_dataset_A, training_dataset_B))):
        # apply the random crop & flip
        real_A = data_augmentation(real_A_raw)
        real_B = data_augmentation(real_B_raw)

        # train_step
        generator_A_to_B_loss, generator_B_to_A_loss, discriminator_A_loss, discriminator_B_loss = train_step(real_A, real_B, LAMBDA)

        # accumulate the losses
        # (they are divided by steps_per_epoch below to get the epoch mean)
        sum_of_generator_A_to_B_loss += generator_A_to_B_loss
        sum_of_generator_B_to_A_loss += generator_B_to_A_loss
        sum_of_discriminator_A_loss += discriminator_A_loss
        sum_of_discriminator_B_loss += discriminator_B_loss

    # print the losses
    print(f'mean_generator_A_to_B_loss : {sum_of_generator_A_to_B_loss / steps_per_epoch}')
    print(f'mean_generator_B_to_A_loss : {sum_of_generator_B_to_A_loss / steps_per_epoch}')
    print(f'mean_discriminator_A_loss : {sum_of_discriminator_A_loss / steps_per_epoch}')
    print(f'mean_discriminator_B_loss : {sum_of_discriminator_B_loss / steps_per_epoch}')

    # reset the sums for the next epoch
    sum_of_generator_A_to_B_loss = 0
    sum_of_generator_B_to_A_loss = 0
    sum_of_discriminator_A_loss = 0
    sum_of_discriminator_B_loss = 0

    # display sample translations every display_frequency epochs
    if (epoch + 1) % display_frequency == 0:
        plot_images_during_training(real_A, real_B, name_A, name_B)

    # save a checkpoint every save_frequency epochs
    if (epoch + 1) % save_frequency == 0:
        ckpt_save_path = ckpt_manager.save()
        print('Saving checkpoint for epoch {} at {}'.format(epoch+1, ckpt_save_path))
    
    print()
    print(f'================= Epoch {epoch + 1} ends ===================')
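
The learning-rate schedule used in the loop above can be written as a standalone function, which makes it easy to inspect the rate at any epoch. This is a sketch under the same constants as the notebook (base rate 2e-4, decay starting at epoch 1000 and lasting 1000 epochs). The function name and the clamp at 0 are assumptions added here and are not part of the original loop.

```python
def decayed_learning_rate(epoch, base_lr=2e-4, decay_start=1000, decay_epochs=1000):
    """Constant learning rate until decay_start, then linear decay over decay_epochs."""
    if epoch < decay_start:
        return base_lr
    rate = base_lr - (base_lr / decay_epochs) * (epoch - decay_start)
    # assumption: clamp at 0 so the rate never turns negative after the decay ends
    return max(rate, 0.0)


for epoch in (0, 999, 1000, 1500, 1999, 2000, 2500):
    print(epoch, decayed_learning_rate(epoch))
```

Without the clamp, the formula in the loop would produce a rate of zero or below from epoch 2000 onward.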

    

## Application

- Since the network is fully convolutional, once training is done we can apply it to images of arbitrary size (even the original, uncropped ones)<br>
- If the width or height is not a multiple of 4, the output image may not have the same size as the input
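
One way to avoid the size mismatch is to crop the image to the nearest smaller multiple of 4 before passing it to the generator. A sketch of that arithmetic in plain Python follows; the function name and the example sizes are made up for illustration.

```python
def crop_to_multiple(height, width, multiple=4):
    """Largest (height, width) not exceeding the input that is divisible by `multiple`."""
    return (height // multiple) * multiple, (width // multiple) * multiple


print(crop_to_multiple(480, 640))   # (480, 640)
print(crop_to_multiple(721, 1283))  # (720, 1280)
```

At most 3 rows and 3 columns are discarded, so the crop is barely visible in the result.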

### To original image

- Simply skip the augmentation step

In [31]:
from PIL import Image

# # the number of the original images to print
# num_of_original_images_to_print = 10

# counter = 1

# # take the original images
# for real_A, real_B in tqdm(tf.data.Dataset.zip((training_dataset_A, training_dataset_B)).take(num_of_original_images_to_print)):

#     # crop the images so that width & height are multiples of 4 (otherwise the concatenation step raises an error)
#     real_A = real_A[:, :(real_A.shape[1] // 4) * 4, :(real_A.shape[2] // 4) * 4, :]
#     real_B = real_B[:, :(real_B.shape[1] // 4) * 4, :(real_B.shape[2] // 4) * 4, :]

#     # same steps as in the function plot_images_during_training
#     fake_A = generator_B_to_A(real_B)
#     fake_B = generator_A_to_B(real_A)

#     plt.figure(figsize=(18, 18))

#     squeezed_real_A = np.squeeze(real_A)
#     squeezed_fake_B = np.squeeze(fake_B)
#     squeezed_real_B = np.squeeze(real_B)
#     squeezed_fake_A = np.squeeze(fake_A)

#     display_list = [squeezed_real_A, squeezed_fake_B, squeezed_real_B, squeezed_fake_A]
#     title = [f'real {name_A}', f'fake {name_B}', f'real {name_B}', f'fake {name_A}']

#     for i in range(4):
#         plt.subplot(2, 2, i+1)
#         plt.title(title[i])
#         # from [-1, 1] to [0, 1]
#         plt.imshow((display_list[i] + 1) / 2.0)
#         plt.axis('off')

#     plt.show()

#     # concatenate them & save the result
#     concatenated1 = np.concatenate((squeezed_real_A, squeezed_fake_B), axis=np.argmin(squeezed_real_A.shape[:2]))
#     concatenated2 = np.concatenate((squeezed_real_B, squeezed_fake_A), axis=np.argmin(squeezed_real_B.shape[:2]))

#     # from [-1, 1] to [0, 255]
#     concatenated1 = ((concatenated1 + 1) * 127.5).astype(np.uint8)
#     concatenated2 = ((concatenated2 + 1) * 127.5).astype(np.uint8)

#     im1 = Image.fromarray(concatenated1, mode="RGB")
#     im2 = Image.fromarray(concatenated2, mode="RGB")

#     # the current working directory is base_path (on Google Drive),
#     # so the results are saved there
#     im1.save(f"{name_A}2{name_B} result{counter}.jpeg")
#     im2.save(f"{name_B}2{name_A} result{counter}.jpeg")

#     counter += 1

### To test image


- test on images that were not used in training
- use the same scale (in pixels) as in the game (not necessary, but recommended)
- this keeps the test distribution as close as possible to the training one

In [32]:
# test images directory
test_image_path = os.path.join(base_path, "test_images/")

# test image directory for each distribution
test_image_directory_A = os.path.join(test_image_path, name_A)
test_image_directory_B = os.path.join(test_image_path, name_B)

# just to make sure
test_images_A = glob.glob(test_image_directory_A + '/*.*')
test_images_B = glob.glob(test_image_directory_B + '/*.*')

num_of_test_examples_A = len(test_images_A)
num_of_test_examples_B = len(test_images_B)

# print it
print(f'test_image_directory_A : {test_image_directory_A}')
print(f'test_image_directory_B : {test_image_directory_B}')
print(f'number of test images of category A : {num_of_test_examples_A}')
print(f'number of test images of category B : {num_of_test_examples_B}')

test_dataset_A = generate_dataset(test_image_directory_A, num_of_test_examples_A)
test_dataset_B = generate_dataset(test_image_directory_B, num_of_test_examples_B)

test_image_directory_A : /tmp/drive/MyDrive/practice/CycleGAN/test_images/henesys
test_image_directory_B : /tmp/drive/MyDrive/practice/CycleGAN/test_images/ellinia
number of test images of category A : 6
number of test images of category B : 5


In [33]:
# # A to B

# counter = 1

# for real_A in tqdm(test_dataset_A):
#     real_A = real_A[:, :(real_A.shape[1] // 4) * 4, :(real_A.shape[2] // 4) * 4, :]

#     # generate the fake image
#     fake_B = generator_A_to_B(real_A)

#     plt.figure(figsize=(18, 18))

#     # squeeze the batch dimension
#     squeezed_real_A = np.squeeze(real_A)
#     squeezed_fake_B = np.squeeze(fake_B)

#     display_list = [squeezed_real_A, squeezed_fake_B]
#     title = [f'real {name_A}', f'fake {name_B}']

#     for i in range(2):
#         plt.subplot(1, 2, i+1)
#         plt.title(title[i])
#         # from [-1, 1] to [0, 1]
#         plt.imshow((display_list[i] + 1) / 2.0)
#         plt.axis('off')

#     plt.show()

#     # concatenate them & download it
#     concatenated = np.concatenate((squeezed_real_A, squeezed_fake_B), axis=np.argmin(squeezed_real_A.shape[:2]))

#     # from [-1, 1] to [0, 255]
#     concatenated = ((concatenated + 1) * 127.5).astype(np.uint8)

#     im = Image.fromarray(concatenated, mode="RGB")
#     image_name = f"{name_A}2{name_B} result{counter}.jpeg"
#     im.save(image_name)

#     counter += 1

In [34]:
# # B to A

# counter = 1

# for real_B in tqdm(test_dataset_B):
#     real_B = real_B[:, :(real_B.shape[1] // 4) * 4, :(real_B.shape[2] // 4) * 4, :]

#     # generate the fake image
#     fake_A = generator_B_to_A(real_B)

#     plt.figure(figsize=(18, 18))

#     # squeeze the batch dimension
#     squeezed_real_B = np.squeeze(real_B)
#     squeezed_fake_A = np.squeeze(fake_A)

#     display_list = [squeezed_real_B, squeezed_fake_A]
#     title = [f'real {name_B}', f'fake {name_A}']

#     for i in range(2):
#         plt.subplot(1, 2, i+1)
#         plt.title(title[i])
#         # from [-1, 1] to [0, 1]
#         plt.imshow((display_list[i] + 1) / 2.0)
#         plt.axis('off')

#     plt.show()

#     # concatenate them & download it
#     concatenated = np.concatenate((squeezed_real_B, squeezed_fake_A), axis=np.argmin(squeezed_real_B.shape[:2]))

#     # from [-1, 1] to [0, 255]
#     concatenated = ((concatenated + 1) * 127.5).astype(np.uint8)

#     im = Image.fromarray(concatenated, mode="RGB")
#     image_name = f"{name_B}2{name_A} result{counter}.jpeg"
#     im.save(image_name)

#     counter += 1
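
The two test cells above repeat the same three steps: crop height and width to a multiple of 4 (presumably to match the generator's downsampling), map the values from [-1, 1] to [0, 255], and join the real and fake images along the shorter side. A minimal sketch of those steps as reusable helpers follows. The function names and the random stand-in data are assumptions for illustration, and the `np.clip` call is an extra safeguard that the cells above do not use.

```python
import numpy as np


def crop_to_multiple(batch, multiple=4):
    """Crop height and width of an NHWC batch down to a multiple of `multiple`."""
    height = (batch.shape[1] // multiple) * multiple
    width = (batch.shape[2] // multiple) * multiple
    return batch[:, :height, :width, :]


def to_uint8(image):
    """Map an image from [-1, 1] to [0, 255] (clipped, then cast to uint8)."""
    return np.clip((image + 1.0) * 127.5, 0, 255).astype(np.uint8)


def side_by_side(real, fake):
    """Concatenate two HWC images along their shorter spatial axis."""
    axis = int(np.argmin(real.shape[:2]))
    return np.concatenate((real, fake), axis=axis)


# hypothetical stand-in for one test image batch in [-1, 1]
real = np.random.uniform(-1.0, 1.0, size=(1, 10, 15, 3)).astype(np.float32)
cropped = crop_to_multiple(real)
fake = -cropped  # stand-in for the generator output

pair = to_uint8(side_by_side(np.squeeze(cropped, axis=0), np.squeeze(fake, axis=0)))
print(cropped.shape, pair.shape, pair.dtype)
```

With helpers like these, the A to B and B to A loops would differ only in the generator and the file name they use.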

### To video (frame by frame)

https://www.tensorflow.org/tutorials/load_data/video
<br>
- processing will likely run into an out-of-memory error
- if so, consider reducing the number of frames,
- freeing variables that are no longer needed,
- or restarting the runtime and proceeding again
- the generated GIF file will likely be large => try compressing it
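
One way to lower peak memory, sketched here under assumptions, is to keep the frames as `uint8` (1 byte per value instead of 4 for `float32`) and convert only one small chunk at a time to `float32` before passing it to the generator. `translate_in_chunks` is a hypothetical helper, and the identity lambda stands in for a real generator such as `generator_B_to_A`.

```python
import numpy as np


def translate_in_chunks(frames_uint8, generator, chunk_size=8):
    """Apply `generator` chunk by chunk and return the result as uint8 frames."""
    outputs = []
    for start in range(0, len(frames_uint8), chunk_size):
        # only this chunk is held in float32, scaled from [0, 255] to [-1, 1]
        chunk = frames_uint8[start:start + chunk_size].astype(np.float32) / 127.5 - 1.0
        fake = np.asarray(generator(chunk))
        # convert back to [0, 255] right away so float32 outputs do not pile up
        outputs.append(np.clip(np.rint((fake + 1.0) * 127.5), 0, 255).astype(np.uint8))
    return np.concatenate(outputs, axis=0)


# hypothetical stand-ins: random uint8 frames and an identity "generator"
frames = np.random.randint(0, 256, size=(20, 8, 12, 3), dtype=np.uint8)
result = translate_in_chunks(frames[::2], lambda batch: batch, chunk_size=4)
print(result.shape, result.dtype)
```

Taking every second frame with `frames[::2]` mirrors the frame subsampling idea from the list above. A larger step reduces memory use further at the cost of a choppier result.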

In [35]:
# !pip install scikit-video

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [36]:
# import skvideo.io  

# def preprocess_frames(frames):
#     '''
#     preprocess the video frames: cast to float32 and scale from [0, 255] to [-1, 1]

#     <params>
#         frames : video frames (shape : [number_of_frames, height, width, channels])
#     '''

#     # the frames can be treated as a batch of images
#     frames = frames.astype(np.float32)
#     frames = frames / 127.5 - 1
#     frames = np.clip(frames, -1, 1)

#     return frames

# # set the video_name & the generator_model that we want to apply
# video_name = "where the forest sings.mp4"
# generator_model = generator_B_to_A

# video_directory = os.path.join(base_path, 'videos/')
# video_path = os.path.join(video_directory, video_name)
# print(f'video_path : {video_path}')

# # extract frames
# video_frames = skvideo.io.vread(video_path)[::8, :, :, :]
# video_frames = preprocess_frames(video_frames)
# print()
# print(f'type of video_frames : {type(video_frames)}')
# print(f'shape of video_frames : {video_frames.shape}')

video_path : /tmp/drive/MyDrive/practice/CycleGAN/videos/where the forest sings.mp4

type of video_frames : <class 'numpy.ndarray'>
shape of video_frames : (52, 776, 1376, 3)


In [37]:
# # generate the fake videos
# generated_frames = generator_model.predict(video_frames, batch_size=8)

In [38]:
# # concatenate them vertically
# concatenated_frames = tf.concat([video_frames, generated_frames], axis=1)
# print(f'concatenated shape : {concatenated_frames.shape}')

concatenated shape : (52, 1552, 1376, 3)


In [39]:
# !pip install -q git+https://github.com/tensorflow/docs

  Preparing metadata (setup.py) ... done


In [40]:
# # https://www.tensorflow.org/tutorials/load_data/video#create_frames_from_each_video_file
# import imageio
# from tensorflow_docs.vis import embed

# def to_gif(images):
#     converted_images = np.clip((images + 1) * 127.5, 0, 255).astype(np.uint8)
#     imageio.mimsave('./animation.gif', converted_images, fps=5)
#     return embed.embed_file('./animation.gif')

# to_gif(concatenated_frames)

https://chacha95.github.io/2019-10-24-Movipy/

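If the learning rate is changed manually and the checkpoint is then saved, the changed value will probably be saved too => just in case, decide later whether
to comment that part out or to always set it by hand.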

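Before testing that after epoch 1000, save the epoch-1000 checkpoint separately, both locally and on Google Drive => once everything is finished, compare it with the final model by applying both to the same images.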

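To do
1. Build the models & optimizers
2. Set up the checkpoint, following that tutorial
3. Write a custom training loop
4. If training takes too long, reduce the image size
5. According to Section 7.1 (Training details) of the paper, the objective is divided by 2 when training D, so that D learns more slowly relative to G
6. Remember tf.GradientTape(persistent=True), and after using the persistent tape -> release it with del
7. After sufficient training -> apply linear decay and fine-tune
8. Save requirements.txt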
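
As an illustration of item 5, here is a minimal NumPy sketch of a discriminator loss scaled by 0.5. It assumes the least-squares GAN loss described in the CycleGAN paper; the loss actually used in this notebook may differ, and the function names and score values are made up for the example.

```python
import numpy as np


def discriminator_loss(real_scores, fake_scores):
    """Least-squares discriminator loss, divided by 2 to slow D down relative to G."""
    real_loss = np.mean((real_scores - 1.0) ** 2)  # real images should score 1
    fake_loss = np.mean(fake_scores ** 2)          # fake images should score 0
    return 0.5 * (real_loss + fake_loss)


def generator_adversarial_loss(fake_scores):
    """Least-squares generator loss: fake images should be scored as real (1)."""
    return np.mean((fake_scores - 1.0) ** 2)


# hypothetical discriminator outputs
real_scores = np.array([0.9, 1.1])
fake_scores = np.array([0.1, -0.1])

d_loss = discriminator_loss(real_scores, fake_scores)
g_loss = generator_adversarial_loss(fake_scores)
print(d_loss, g_loss)
```

Only the discriminator objective carries the 0.5 factor. The generator loss is left unscaled, which is what makes D learn more slowly relative to G.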



https://stackoverflow.com/questions/59737875/keras-change-learning-rate
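
A linear decay schedule can be computed by hand and then written into the optimizer once per epoch. The sketch below uses the defaults reported in the CycleGAN paper (learning rate 0.0002, constant for 100 epochs, then decayed linearly to zero over the next 100). The function name and parameter names are assumptions, and the epoch counts would need to be adapted to this notebook's training length.

```python
def linear_decay_lr(epoch, base_lr=2e-4, constant_epochs=100, decay_epochs=100):
    """Keep `base_lr` for `constant_epochs`, then decay linearly to zero."""
    if epoch < constant_epochs:
        return base_lr
    progress = (epoch - constant_epochs) / decay_epochs
    return base_lr * max(0.0, 1.0 - progress)


for epoch in (0, 100, 150, 200):
    print(epoch, linear_decay_lr(epoch))
```

With a Keras optimizer, one option is to apply the value at the start of each epoch with `tf.keras.backend.set_value(optimizer.learning_rate, linear_decay_lr(epoch))`. This assumes the optimizer was created with a plain float learning rate rather than a schedule object.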

It might also be nice to make a version that changes continuously, with fade-in & fade-out
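
A rough sketch of the fade idea: blend each original frame with its generated counterpart, using a weight that rises linearly from 0 to 1 over the clip. `crossfade` is a hypothetical helper, and the constant-valued arrays stand in for `video_frames` and `generated_frames` in the [-1, 1] range.

```python
import numpy as np


def crossfade(source_frames, target_frames):
    """Blend from source to target with a weight that rises linearly over time."""
    num_frames = len(source_frames)
    alphas = np.linspace(0.0, 1.0, num_frames, dtype=np.float32)
    alphas = alphas.reshape(-1, 1, 1, 1)  # broadcast over height, width, channels
    return (1.0 - alphas) * source_frames + alphas * target_frames


# hypothetical stand-ins: 5 frames of constant value -1 (source) and +1 (target)
source = np.full((5, 2, 2, 3), -1.0, dtype=np.float32)
target = np.full((5, 2, 2, 3), 1.0, dtype=np.float32)

blended = crossfade(source, target)
print(blended[:, 0, 0, 0])
```

Swapping the two arguments gives the fade in the opposite direction, so a fade-in followed by a fade-out could be built by concatenating the two results.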

To keep the original scale unchanged, the images were extracted only with WzComparerR (위컴알)

The original images are large, so zooming in is recommended to see the details