### CycleGAN

Paper: [Link](https://arxiv.org/abs/1703.10593)

We can build an application that transfers the style of an ordinary photo into an image that resembles Van Gogh's painting style.

This article is based on **vanhuyz**'s [CycleGAN](https://github.com/vanhuyz/CycleGAN-TensorFlow) repo.

#### 1. Introducing the Dataset

Test result:

<img src="images/styletransfer.jpeg" />

#### 2. Download Dataset

In [27]:
DATASET_URL='https://people.eecs.berkeley.edu/~taesung_park/CycleGAN/datasets/vangogh2photo.zip'

In [32]:
!rm -rf data
!mkdir data

In [34]:
TARGET_FILE='./data/vangogh2photo.zip'

In [39]:
!wget $DATASET_URL -O $TARGET_FILE

--2019-08-06 11:52:56--  https://people.eecs.berkeley.edu/~taesung_park/CycleGAN/datasets/vangogh2photo.zip
Resolving people.eecs.berkeley.edu (people.eecs.berkeley.edu)... 128.32.189.73
Connecting to people.eecs.berkeley.edu (people.eecs.berkeley.edu)|128.32.189.73|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 306590349 (292M) [application/zip]
Saving to: ‘./data/vangogh2photo.zip’


2019-08-06 11:54:36 (2.95 MB/s) - ‘./data/vangogh2photo.zip’ saved [306590349/306590349]



In [None]:
TARGET_DIR='./data/vangogh2photo/'
!unzip $TARGET_FILE -d ./data/
!rm $TARGET_FILE
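
Outside a notebook, the three shell steps above (download, unzip, remove the archive) could be sketched with the Python standard library. This is only an illustration: `download_and_extract` and its `keep_archive` flag are hypothetical names that do not appear in the original notebook.

```python
import os
import urllib.request
import zipfile


def download_and_extract(url, data_dir, keep_archive=False):
    """Download a zip archive from `url` and unpack it into `data_dir`.

    Hypothetical helper, not part of the original notebook.
    Returns the sorted names of the top-level entries in the archive.
    """
    os.makedirs(data_dir, exist_ok=True)
    archive_path = os.path.join(data_dir, os.path.basename(url))

    # Same role as `wget $DATASET_URL -O $TARGET_FILE`
    urllib.request.urlretrieve(url, archive_path)

    # Same role as `unzip $TARGET_FILE -d ./data/`
    with zipfile.ZipFile(archive_path) as archive:
        archive.extractall(data_dir)
        top_level = sorted({name.split('/')[0] for name in archive.namelist()})

    # Same role as `rm $TARGET_FILE`
    if not keep_archive:
        os.remove(archive_path)
    return top_level
```

Calling `download_and_extract(DATASET_URL, './data')` would then play the part of the shell cells; the archive is about 292 MB, so the call takes a while.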

Directory tree of the stored data
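
To inspect the layout after unzipping, a short helper such as the one below could print the folder tree. It is a sketch under assumptions: `format_tree` and `max_files` are hypothetical names, and the listing is capped per folder because the image folders can hold thousands of files.

```python
import os


def format_tree(root, max_files=3):
    """Return an indented listing of `root`, with at most `max_files` files per folder."""
    lines = []
    root = os.path.normpath(root)
    for current, dirs, files in os.walk(root):
        dirs.sort()  # visit sub-folders in a stable, alphabetical order
        if current == root:
            depth = 0
        else:
            depth = os.path.relpath(current, root).count(os.sep) + 1
        lines.append('    ' * depth + os.path.basename(current) + '/')
        for name in sorted(files)[:max_files]:
            lines.append('    ' * (depth + 1) + name)
        if len(files) > max_files:
            lines.append('    ' * (depth + 1) + '... ({} more)'.format(len(files) - max_files))
    return '\n'.join(lines)


# Only prints something once the dataset has been unpacked
if os.path.isdir('./data/vangogh2photo'):
    print(format_tree('./data/vangogh2photo'))
```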

#### 3. Load the required libraries


In [27]:
import tensorflow as tf
import random
import os
try:
    from os import scandir
except ImportError:
    from scandir import scandir

#### 4. Read the data and write it to TFRecords

In [28]:
# Use scandir to collect the paths of all images in a directory
# Read more about this library here: https://pypi.org/project/scandir/

def read_data(target_dir):
    """
    Read images in target dir
    Args:
        target_dir: string, path to the target directory
    Returns:
        images: list, shuffled list of image paths
    """

    images = []
    for image in scandir(target_dir):
        if image.name.endswith('.jpg') and image.is_file():
            images.append(image.path)
    # Shuffle the image indexes.
    # Why random.seed()?
    # Seeding a pseudo-random number generator gives it its first "previous" value.
    # Each seed value corresponds to one sequence of generated values for a given generator.
    # That is, if you provide the same seed twice, you get the same sequence of numbers twice.
    random.seed(12345)
    all_indexes = list(range(len(images)))
    random.shuffle(all_indexes)

    # Reorder the image paths according to the shuffled indexes
    shuffled_images = []
    for i in all_indexes:
        # i is already a shuffled index, so it is used directly
        shuffled_images.append(images[i])
    return shuffled_images
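
The comment on `random.seed(12345)` can be checked in isolation. The sketch below is not the notebook's code: it swaps the global `random.seed` call for a private `random.Random` instance (`shuffled_copy` is a hypothetical helper) and uses made-up file names, but it shows the same property: an identical seed reproduces an identical order.

```python
import random


def shuffled_copy(items, seed):
    """Return a shuffled copy of `items`, using a private generator seeded with `seed`."""
    rng = random.Random(seed)  # private generator; the global random state is untouched
    result = list(items)
    rng.shuffle(result)
    return result


paths = ['img_{:02d}.jpg'.format(i) for i in range(10)]  # made-up file names

first = shuffled_copy(paths, seed=12345)
second = shuffled_copy(paths, seed=12345)

print(first == second)                 # True: same seed, same order
print(sorted(first) == sorted(paths))  # True: shuffling only reorders the items
```

A fixed seed matters here because the shuffled order is written into the TFRecord file; rerunning the conversion on the same list of paths then gives the same record order.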


Storing the data as tf.Example records makes reading them faster (caching is used while the data is processed). See more [here](https://colab.research.google.com/github/tensorflow/docs/blob/master/site/en/tutorials/load_data/tf_records.ipynb#scrollTo=3pkUd_9IZCFO)

In essence, tf.Example is a mapping {"string": tf.train.Feature}

Some helper functions that convert a value into a tf.train.Feature:

In [29]:
# The following functions can be used to convert a value to a type compatible
# with tf.Example.
"""
print(_bytes_feature(b'test_string'))

bytes_list {
  value: "test_string"
}

"""

def _bytes_feature(value):
  """Returns a bytes_list from a string / byte.
  """
  return tf.train.Feature(bytes_list=tf.train.BytesList(value=[value]))

def _float_feature(value):
  """Returns a float_list from a float / double."""
  return tf.train.Feature(float_list=tf.train.FloatList(value=[value]))

def _int64_feature(value):
  """Returns an int64_list from a bool / enum / int / uint."""
  return tf.train.Feature(int64_list=tf.train.Int64List(value=[value]))

In [30]:
def convert_data_to_tf_record(target_dir, tfrecord_file):
    """
    Convert Data to TFRecord
    Args:
        target_dir: string, directory that contains the .jpg images
        tfrecord_file: string, path of the TFRecord file to write
    """
    # Create the tfrecords folder if it does not exist
    tf_records_dir = os.path.dirname(tfrecord_file)

    if not os.path.exists(tf_records_dir):
        try:
            print('Creating tfrecords folder')
            os.makedirs(tf_records_dir)
        except OSError:
            print('Failed to create tfrecords folder')

    images = read_data(target_dir)

    writer = tf.python_io.TFRecordWriter(tfrecord_file)

    for image_path in images:
        # Store the encoded JPEG bytes as they are on disk
        with tf.gfile.GFile(image_path, 'rb') as f:
            image_data = f.read()

        # create example
        feature = {
            'image_name': _bytes_feature(tf.compat.as_bytes(os.path.basename(image_path))),
            'image_data': _bytes_feature(image_data)
        }

        example = tf.train.Example(features=tf.train.Features(feature=feature))
        writer.write(example.SerializeToString())

    # Close the writer first so that all records are flushed to disk
    writer.close()
    print('Finished.')
        
        

In [31]:
x_dir = './data/vangogh2photo/trainA'
y_dir = './data/vangogh2photo/trainB'
x_tfrecord_file = './data/vangogh2photo/tfrecords/vangogh.tfrecords'
y_tfrecord_file = './data/vangogh2photo/tfrecords/photo.tfrecords'
# Writing tf records for the Van Gogh paintings (trainA)
print('Converting X data to tf record')
convert_data_to_tf_record(x_dir, x_tfrecord_file)
# Writing tf records for the normal photos (trainB)
print('Converting Y data to tf record')
convert_data_to_tf_record(y_dir, y_tfrecord_file)

Converting X data to tf record
Finished.
Converting Y data to tf record
Finished.


#### 5. Define the layers

##### Ops


In [32]:
def batch_norm(input_data, name="batch_norm"):
    """
    Batch Normalization
    """
    return tf.contrib.layers.batch_norm(input_data, decay=0.9, updates_collections=None, epsilon=1e-5, scale=True, scope=name)

Read more about the difference between Batch Norm and Instance Norm [here](https://stackoverflow.com/questions/45463778/instance-normalisation-vs-batch-normalisation)

Read more about [Leaky ReLU](http://cs231n.github.io/neural-networks-1/)

In [33]:
def instance_norm(input_data):
    """
    Instance Normalization
    """
    
    with tf.variable_scope("instance_norm"):
        epsilon = 1e-5
        depth = input_data.get_shape()[-1]
        # Mean and variance per image and per channel, computed over height and width
        mean, var = tf.nn.moments(input_data, [1, 2], keep_dims=True)
        scale = tf.get_variable('scale', depth, initializer=tf.truncated_normal_initializer(mean=1.0, stddev=0.02))
        offset = tf.get_variable('offset', depth, initializer=tf.constant_initializer(0.0))
        normalized_input_data = scale*tf.div(input_data-mean, tf.sqrt(var+epsilon)) + offset
        return normalized_input_data
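
To make the axes concrete, here is a NumPy sketch (not the TensorFlow code above) of the two normalizations without the learnable `scale` and `offset`. The function names and the toy input are assumptions made for illustration. Instance norm takes its statistics over axes `[1, 2]` of each image, as `tf.nn.moments` does above, while batch norm in training mode pools them over the whole batch.

```python
import numpy as np


def instance_norm_np(x, epsilon=1e-5):
    """Normalize each (sample, channel) slice of an NHWC array over its H and W axes."""
    mean = x.mean(axis=(1, 2), keepdims=True)  # shape (N, 1, 1, C)
    var = x.var(axis=(1, 2), keepdims=True)    # shape (N, 1, 1, C)
    return (x - mean) / np.sqrt(var + epsilon)


def batch_norm_np(x, epsilon=1e-5):
    """Normalize each channel over the N, H and W axes (training-mode statistics only)."""
    mean = x.mean(axis=(0, 1, 2), keepdims=True)  # shape (1, 1, 1, C)
    var = x.var(axis=(0, 1, 2), keepdims=True)
    return (x - mean) / np.sqrt(var + epsilon)


rng = np.random.RandomState(0)
# Two toy "images" with very different brightness: 4x4 pixels, 3 channels
x = np.stack([rng.rand(4, 4, 3), 10.0 + rng.rand(4, 4, 3)])

inst = instance_norm_np(x)
batch = batch_norm_np(x)

# Instance norm: every image is centred on its own, channel by channel
print(np.allclose(inst.mean(axis=(1, 2)), 0.0, atol=1e-6))      # True
# Batch norm: only the average over the whole batch is centred
print(np.allclose(batch.mean(axis=(0, 1, 2)), 0.0, atol=1e-6))  # True
print(np.allclose(batch.mean(axis=(1, 2)), 0.0, atol=1e-6))     # False
```

Because each image is normalized with its own statistics, the result for one image does not depend on which other images share its batch.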

Combine the two norm functions:

In [41]:
def norm(input_data, type="batch_norm"):
    if type == "batch_norm":
        return batch_norm(input_data)
    elif type == "instance_norm":
        return instance_norm(input_data)
    else:
        return input_data

$$ f(x) = \mathbb{1}(x < 0)(\alpha x) + \mathbb{1}(x \geq 0)(x) $$

In [42]:
def lrelu(x, leak=0.3):
    return tf.maximum(x, x*leak)
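
`lrelu` implements the formula with a single `tf.maximum` call. The plain-Python check below (the function names are hypothetical, no TensorFlow involved) illustrates why that works: for a leak between 0 and 1, `max(x, alpha * x)` picks `x` when `x >= 0` and `alpha * x` when `x < 0`.

```python
def leaky_relu_piecewise(x, alpha=0.3):
    """Literal translation of the formula: alpha * x for x < 0, x otherwise."""
    return alpha * x if x < 0 else x


def leaky_relu_max(x, alpha=0.3):
    """The max-based form used by lrelu; matches the formula when 0 <= alpha <= 1."""
    return max(x, alpha * x)


for value in [-2.0, -0.5, 0.0, 0.5, 2.0]:
    # Both forms agree on negative, zero and positive inputs
    assert leaky_relu_piecewise(value) == leaky_relu_max(value)
    print(value, '->', leaky_relu_max(value))
```

The equivalence breaks for a leak larger than 1: with `alpha=1.5` and `x=-2.0` the formula gives `-3.0`, while the max-based form returns `-2.0`.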

Combine the two ReLU functions:

In [49]:
def applyRelu(x, type="relu", leak=0.3):
    if type == "relu":
        return tf.nn.relu(x, "relu")
    elif type == "lrelu":
        return lrelu(x, leak)
    else:
        return x

[Cycle GAN model Architecture](https://hardikbansal.github.io/CycleGANBlog/)

In [43]:
# Standard Deviation
stddev = 0.02

In [71]:
def conv2d(inputs, num_outputs, kernel_size, stride, stddev=0.02, name="conv2d", padding="VALID", activation_fn=None, norm_type="instance_norm", isApplyRelu=True, reluType="lrelu"):
    # norm_type must not be named `norm`, otherwise it would shadow the norm() function
    # reluType takes the values understood by applyRelu: "relu" or "lrelu"
    with tf.variable_scope(name):
        conv = tf.contrib.layers.conv2d(inputs, num_outputs, kernel_size, stride, padding, activation_fn=None, weights_initializer=tf.truncated_normal_initializer(stddev=stddev), biases_initializer=tf.constant_initializer(0.0))
        conv = norm(conv, norm_type)
        if isApplyRelu:
            conv = applyRelu(conv, type=reluType)
        return conv
        

In [72]:
def deconv2d(inputs, num_outputs, kernel_size, stride, stddev=0.02, name="deconv2d", padding="VALID", activation_fn=None, norm_type="instance_norm", reluType="lrelu"):
    # norm_type must not be named `norm`, otherwise it would shadow the norm() function
    # reluType takes the values understood by applyRelu: "relu" or "lrelu"
    with tf.variable_scope(name):
        conv = tf.contrib.layers.conv2d_transpose(inputs, num_outputs, kernel_size, stride, padding, activation_fn=None, weights_initializer=tf.truncated_normal_initializer(stddev=stddev), biases_initializer=tf.constant_initializer(0.0))
        conv = norm(conv, norm_type)
        conv = applyRelu(conv, type=reluType)
        return conv
    
    

<img src="./images/Resnet.jpg" width=500/>

In [73]:
def resnet_block(input_data, name, norm="instance_norm"):
    """
    Resnet Block contains two convolutional layers
    Returns:
        Output has the same dimension with the input
    """
    with tf.variable_scope(name):
        
        input_data_shape = input_data.get_shape()

        conv1_padded_input = tf.pad(input_data, [[0,0], [1,1], [1,1], [0,0]], 'REFLECT')
        conv1 = conv2d(conv1_padded_input, input_data_shape[-1], [3, 3], [1, 1], stddev=0.02, name='{0}_conv1'.format(name), padding='VALID')
        conv2_padded_input = tf.pad(conv1, [[0,0], [1,1], [1,1], [0,0]], 'REFLECT')
        conv2 = conv2d(conv2_padded_input, input_data_shape[-1], [3, 3], [1,1], stddev=0.02, padding='VALID', name="{}_conv2".format(name))
    
    # Residual connection: add the block input back before the final ReLU
    output = tf.nn.relu(input_data + conv2)
    print(output.get_shape())
    return output

def n_resnet_blocks(input_data, n=6, norm="instance_norm"):
    for i in range(1, n+1):
        resnet_output = resnet_block(input_data, "resnet_{}_".format(i), norm)
        input_data = resnet_output
    return resnet_output
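
The claim that a ResNet block keeps its input dimensions follows from simple size arithmetic: each reflect padding adds one pixel per side, and each 3x3 'VALID' convolution with stride 1 removes two pixels. A minimal sketch of that arithmetic, in plain Python with hypothetical helper names:

```python
def valid_conv_size(size, kernel, stride=1):
    """Output length of a 'VALID' convolution along one spatial axis."""
    return (size - kernel) // stride + 1


def resnet_block_size(size, kernel=3, pad=1):
    """Spatial size after the two pad-then-convolve steps of resnet_block."""
    for _ in range(2):
        size = size + 2 * pad                 # tf.pad with [pad, pad] on this axis
        size = valid_conv_size(size, kernel)  # 3x3 'VALID' convolution, stride 1
    return size


print(resnet_block_size(64))  # 64
```

Since the spatial size and the channel count both stay the same, the element-wise sum `input_data + conv2` is well defined.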
    

##### Generator Layers

The Generator takes an image of shape (1, width, height, 3) and returns an image of the same size.

<img src="./images/Generator.jpg" />

In [79]:
class Generator():
    def __init__(self, name,image_size,ngf=64):
        self.name = name
        self.ngf = ngf
        self.image_size = image_size
    
    def __call__(self, input_data):
        """
        Args: 
            input_data: size = batch_size * input_data_width * input_data_height * channels
        Returns:
            return output which has the same size as input_data
        """
        with tf.variable_scope(self.name):
            # Encoding
            first_filter_size = 7
            first_padding_size = 3
            paddings = [[0, 0], [first_padding_size, first_padding_size], [first_padding_size, first_padding_size], [0, 0]]
            padded_input = tf.pad(input_data, paddings, 'REFLECT')
            
            print('padded_input', padded_input.get_shape())
            
            conv1 = conv2d(padded_input, self.ngf, [first_filter_size, first_filter_size], [1, 1], stddev=0.02, name="g_conv1")
            # Output shape: (?, w, h, ngf)
            conv2 = conv2d(conv1, self.ngf*2, [3, 3], [2,2], stddev=0.02, padding='SAME', name="g_conv2")
            # Output shape: (?, w/2, h/2, ngf*2)
            conv3 = conv2d(conv2, self.ngf*4, [3, 3], [2,2], stddev=0.02, padding='SAME', name="g_conv3")
            # Output shape: (?, w/4, h/4, ngf*4)
            
            # Transformation
            if self.image_size <= 128:
                resnet_output = n_resnet_blocks(conv3, 6) # (?, w/4, h/4, ngf*4)
            else:
                # Use 9 ResNet blocks for higher-resolution images
                resnet_output = n_resnet_blocks(conv3, 9) # (?, w/4, h/4, ngf*4)
            
            # Decoding
            
            deconv1 = deconv2d(resnet_output, self.ngf*2, [3, 3], [2, 2], stddev=0.02, padding='SAME', name='g_deconv1')
            # Output shape: (?, w/2, h/2, ngf*2)
            deconv2 = deconv2d(deconv1, self.ngf, [3, 3], [2, 2], stddev=0.02, padding='SAME', name='g_deconv2')
            # Output shape: (?, w, h, ngf)
            padded_deconv3_input = tf.pad(deconv2,[[0, 0], [3, 3], [3, 3], [0, 0]], "REFLECT")
            # Output shape: (?, w+6, h+6, ngf)
            deconv3 = conv2d(padded_deconv3_input, 3, [first_filter_size, first_filter_size], [1, 1],stddev=0.02,padding="VALID", name="g_deconv3",isApplyRelu=False)
            # Output shape: (?, w, h, 3)
            
            return deconv3
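
The statement that the Generator returns an image of the same size can be sanity-checked without building the graph. The sketch below traces one spatial axis through the layer names used above, assuming the usual TensorFlow size rules: 'SAME' convolutions give `ceil(size / stride)`, 'SAME' transposed convolutions give `size * stride`, and 'VALID' convolutions give `size - kernel + 1` at stride 1. The helper names are hypothetical.

```python
import math


def same_conv_size(size, stride):
    """Output length of a 'SAME' convolution: ceil(size / stride)."""
    return int(math.ceil(size / float(stride)))


def same_deconv_size(size, stride):
    """Output length of a 'SAME' transposed convolution: size * stride."""
    return size * stride


def generator_sizes(size):
    """Trace one spatial axis through the encoder, the ResNet blocks and the decoder."""
    trace = [('input', size)]
    size = (size + 2 * 3) - 7 + 1          # reflect pad 3, then 7x7 'VALID', stride 1
    trace.append(('g_conv1', size))
    size = same_conv_size(size, 2)
    trace.append(('g_conv2', size))
    size = same_conv_size(size, 2)
    trace.append(('g_conv3', size))
    trace.append(('resnet blocks', size))  # every block preserves the size
    size = same_deconv_size(size, 2)
    trace.append(('g_deconv1', size))
    size = same_deconv_size(size, 2)
    trace.append(('g_deconv2', size))
    size = (size + 2 * 3) - 7 + 1          # reflect pad 3, then 7x7 'VALID', stride 1
    trace.append(('g_deconv3', size))
    return trace


for layer, size in generator_sizes(128):
    print(layer, size)
```

For a 128-pixel side the trace ends at 128 again. Under these rules a side that is not a multiple of 4 does not round-trip (130 comes back as 132), so resizing inputs to a multiple of 4 is the safer choice.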
            
            
        