# CycleGAN

### CycleGAN addresses the problem that image-to-image translation normally needs paired input/output examples to learn the distribution of the target domain. The idea is that, given two unpaired domains X and Y, we build networks that learn the characteristics of both domains and generate new data in each of them, so we build two generators and two discriminators.


![cycle gan](images/cycle-gan.png) 


### First we pass the X-domain data through a generator to get the output F(X). From here there are two steps: 1- we pass F(X) to the discriminator, which tries to distinguish real data from fake data; 2- we pass F(X) through the other generator to reconstruct the X-domain data. We then repeat the same process with the Y-domain data.



In [None]:
import tensorflow as tf 
import os 
from tensorflow.keras.layers import Dense , Conv2D ,concatenate,Conv2DTranspose , Flatten , Reshape  , BatchNormalization , Activation, Flatten ,LeakyReLU
from tensorflow.keras.models import Sequential 
from tensorflow_addons.layers import InstanceNormalization
from tensorflow.keras.optimizers import RMSprop
from tensorflow.keras.datasets import mnist 
import numpy as np 
import matplotlib.pyplot as plt 
import math 
import os 
import argparse
import tensorflow_datasets as tfds

# the generator 

### The generator is an autoencoder-style network (an encoder-decoder with skip connections) that follows this architecture:

![generator](images/generator.png)

In [None]:
class encoder_block(tf.keras.Model):
    """Pre-activation downsampling unit: optional instance norm -> activation -> Conv2D.

    Args:
        filters: number of output channels of the convolution.
        kernel_size: size of the convolution kernel.
        strides: stride of the convolution ('same' padding is always used).
        activation: 'relu' selects ReLU; any other value selects LeakyReLU(0.2).
        instance_norm: when True, the input is instance-normalized before
            the activation; when False, normalization is skipped.
    """

    def __init__(self, filters=16, kernel_size=3, strides=2,
                 activation='relu', instance_norm=True):
        super().__init__()
        self.instance_norm = instance_norm
        # The layer is always created; it is only applied when instance_norm is True.
        self.norm = InstanceNormalization()
        self.activation = (
            Activation('relu') if activation == 'relu' else LeakyReLU(.2)
        )
        self.conv = Conv2D(filters, kernel_size, strides=strides, padding='same')

    def call(self, X):
        """Apply (optional) normalization, the activation, then the convolution."""
        normalized = self.norm(X) if self.instance_norm else X
        return self.conv(self.activation(normalized))
    
    

In [None]:
class decoder_block(tf.keras.Model):
    """Upsampling unit with a skip connection.

    Applies optional instance norm -> activation -> Conv2DTranspose to the
    input, then concatenates the result with the skip-connection tensor
    (Keras `concatenate` default axis, i.e. the last one).

    Args:
        filters: number of output channels of the transposed convolution.
        kernel_size: size of the transposed-convolution kernel.
        strides: stride of the transposed convolution ('same' padding).
        activation: 'relu' selects ReLU; any other value selects LeakyReLU(0.2).
        instance_norm: when True (default), the input is instance-normalized
            before the activation. Previously this flag was accepted but
            ignored; it is now honored, mirroring `encoder_block`.
    """

    def __init__(self, filters=16, kernel_size=3, strides=2,
                 activation='relu', instance_norm=True):
        super().__init__()
        self.instance_norm = instance_norm
        self.norm = InstanceNormalization()
        if activation == 'relu':
            self.activation = Activation('relu')
        else:
            self.activation = LeakyReLU(.2)
        self.convT = Conv2DTranspose(filters, kernel_size,
                                     padding='same', strides=strides)

    def call(self, paired_inputs):
        """Upsample and merge with the skip connection.

        Args:
            paired_inputs: a pair `(X, skip_connection)`; `X` is upsampled and
                `skip_connection` is concatenated onto the result.

        Returns:
            The concatenation of the upsampled `X` and `skip_connection`.
        """
        X, skip_connection = paired_inputs
        if self.instance_norm:
            X = self.norm(X)
        X = self.activation(X)
        X = self.convT(X)
        return concatenate([X, skip_connection])

In [None]:
class U_Net_Network(tf.keras.Model):
    """Generator: a 4-stage encoder / 3-stage decoder with skip connections.

    The encoder stages use LeakyReLU and 32/64/128/256 filters; the first
    stage has stride 1 and the remaining three have stride 2. Each decoder
    stage upsamples with stride 2 and concatenates the matching encoder
    output. A final stride-1 transposed convolution maps to 3 channels.

    The output activation is `tanh` so generated images lie in [-1, 1],
    the same range `data()` normalizes real images to. The previous
    `sigmoid` limited outputs to [0, 1], so the L1 cycle/identity losses
    could never be driven to zero for pixels below 0.
    """

    def __init__(self):
        super().__init__()
        self.encoder_block_1 = encoder_block(32, 3, 1, activation='leaky_relu')
        self.encoder_block_2 = encoder_block(64, 3, 2, activation='leaky_relu')
        self.encoder_block_3 = encoder_block(128, 3, 2, activation='leaky_relu')
        self.encoder_block_4 = encoder_block(256, 3, 2, activation='leaky_relu')

        self.decoder_block_1 = decoder_block(128, 3, 2)
        self.decoder_block_2 = decoder_block(64, 3, 2)
        self.decoder_block_3 = decoder_block(32, 3, 2)

        # tanh keeps the generated image in the [-1, 1] range of the inputs.
        self.outputs = Conv2DTranspose(3,
                                       kernel_size=3,
                                       strides=1,
                                       activation='tanh',
                                       padding='same')

    def call(self, X):
        """Translate a batch of images.

        NOTE(review): presumably height and width must be divisible by 8 so
        that the upsampled tensors match the skip tensors -- confirm.
        """
        # Encoder: keep every stage output for the skip connections.
        S_1 = self.encoder_block_1(X)
        S_2 = self.encoder_block_2(S_1)
        S_3 = self.encoder_block_3(S_2)
        S_4 = self.encoder_block_4(S_3)

        # Decoder: each stage upsamples and is merged with the encoder
        # output one level shallower.
        D_1 = self.decoder_block_1((S_4, S_3))
        D_2 = self.decoder_block_2((D_1, S_2))
        D_3 = self.decoder_block_3((D_2, S_1))

        return self.outputs(D_3)


## Discriminator 

### the Discriminator will follow this architecture

![Discriminator](images/discriminator.png)

In [None]:
class Discriminator(tf.keras.Model):
    """Real/fake critic: four encoder stages followed by a single linear unit.

    Every stage uses a 3x3 kernel, LeakyReLU, the default stride of
    `encoder_block`, and no instance normalization. The flattened features
    feed a `Dense(1)` with linear activation, so the output is a raw logit.
    """

    def __init__(self):
        super().__init__()
        stage_config = dict(kernel_size=3, activation='leaky_relu',
                            instance_norm=False)
        self.encoder_block_1 = encoder_block(32, **stage_config)
        self.encoder_block_2 = encoder_block(64, **stage_config)
        self.encoder_block_3 = encoder_block(128, **stage_config)
        self.encoder_block_4 = encoder_block(256, **stage_config)

        self.flatten = Flatten()
        self.dense = Dense(1, activation='linear')

    def call(self, X):
        """Return one logit per input image."""
        features = X
        for stage in (self.encoder_block_1, self.encoder_block_2,
                      self.encoder_block_3, self.encoder_block_4):
            features = stage(features)
        return self.dense(self.flatten(features))
        

## training 

### we will follow some steps to train the network 

- we will generate fake domain-Y data from the domain-X data 
- we will reconstruct the X data from this fake Y data 
- we will do the same thing with the Y-domain data: generate fake X data and reconstruct Y 

- pass the real and fake X and Y data to the discriminators to train them 

- calculate the losses 
- calculate the gradients 


In [None]:
# Binary cross-entropy applied to raw logits (from_logits=True); shared by
# generator_loss and discriminator_loss.
loss_obj = tf.keras.losses.BinaryCrossentropy(from_logits=True)


def cycle_loss(real_image, cycled_image, lambda_weight=10):
    """Weighted mean absolute error between an image and its reconstruction.

    Args:
        real_image: the original image tensor.
        cycled_image: the tensor obtained after translating `real_image` to
            the other domain and back.
        lambda_weight: multiplier applied to the L1 term. Defaults to 10,
            the value that was previously hard-coded.

    Returns:
        `lambda_weight * mean(|real_image - cycled_image|)`.
    """
    return lambda_weight * tf.reduce_mean(tf.abs(real_image - cycled_image))


In [None]:
def identity_loss(real_image, same_image, lambda_weight=10):
    """Weighted mean absolute error between an image and its identity mapping.

    Args:
        real_image: an image tensor from a generator's target domain.
        same_image: the output of that generator when fed `real_image`.
        lambda_weight: base multiplier; the identity term uses half of it.
            Defaults to 10, the value that was previously hard-coded.

    Returns:
        `lambda_weight * 0.5 * mean(|real_image - same_image|)`.
    """
    loss = tf.reduce_mean(tf.abs(real_image - same_image))
    return lambda_weight * 0.5 * loss

In [None]:
def discriminator_loss(real, generated):
    """Average of the discriminator's cross-entropy on real and generated inputs.

    Args:
        real: discriminator outputs for real images (scored against ones).
        generated: discriminator outputs for generated images (scored
            against zeros).

    Returns:
        Half the sum of the two `loss_obj` terms.
    """
    loss_on_real = loss_obj(tf.ones_like(real), real)
    loss_on_generated = loss_obj(tf.zeros_like(generated), generated)
    return (loss_on_real + loss_on_generated) * 0.5

In [None]:
def generator_loss(generated):
    """Adversarial loss for a generator.

    Scores the discriminator outputs for generated images against an
    all-ones target using `loss_obj`.
    """
    wanted_verdict = tf.ones_like(generated)
    return loss_obj(wanted_verdict, generated)

In [None]:
def data():
    """Build the horse and zebra training pipelines from `cycle_gan/horse2zebra`.

    Each image is resized to 286x286 (nearest neighbour), randomly cropped
    to 256x256x3, randomly mirrored left/right, and scaled to [-1, 1].
    Labels are dropped. The raw split is cached before augmentation, then
    shuffled with a buffer of 1000 and batched with batch size 1.

    Returns:
        A pair `(train_horses, train_zebras)` of `tf.data` datasets built
        from the 'trainA' and 'trainB' splits respectively.
    """
    shuffle_buffer = 1000
    batch_size = 1
    crop_height, crop_width = 256, 256

    splits = tfds.load('cycle_gan/horse2zebra',
                       as_supervised=True)

    def augment_and_scale(image, label):
        # The label is not needed for unpaired translation and is discarded.
        enlarged = tf.image.resize(
            image, [286, 286],
            method=tf.image.ResizeMethod.NEAREST_NEIGHBOR)
        cropped = tf.image.random_crop(
            enlarged, size=[crop_height, crop_width, 3])
        mirrored = tf.image.random_flip_left_right(cropped)
        # Map pixel values from [0, 255] to [-1, 1].
        return (tf.cast(mirrored, tf.float32) / 127.5) - 1

    def build_pipeline(split):
        # cache() comes first so the random augmentation is re-drawn each pass.
        return (split.cache()
                .map(augment_and_scale)
                .shuffle(shuffle_buffer)
                .batch(batch_size))

    horses = build_pipeline(splits['trainA'])
    zebras = build_pipeline(splits['trainB'])
    return horses, zebras

In [None]:
@tf.function
def train_step(real_x, real_y, models, optimizers, loss_functions):
    """Run one optimization step for both generators and both discriminators.

    Args:
        real_x: batch of real images from domain X.
        real_y: batch of real images from domain Y.
        models: tuple `(generator_x, generator_y, discriminator_x,
            discriminator_y)`. By the naming used here, `generator_x` maps
            X to Y and `generator_y` maps Y to X.
        optimizers: tuple of the four optimizers, in the same order as
            `models`.
        loss_functions: tuple `(identity_loss, cycle_loss, generator_loss,
            discriminator_loss)`.

    Returns:
        `(t_gen_x_loss, t_gen_y_loss, disc_x_loss, disc_y_loss)`: the total
        loss of each generator and the loss of each discriminator.
    """
    generator_x, generator_y, discriminator_x, discriminator_y = models
    (generator_x_optimizer, generator_y_optimizer,
     discriminator_x_optimizer, discriminator_y_optimizer) = optimizers
    identity_loss, cycle_loss, generator_loss, discriminator_loss = loss_functions

    # Only the forward pass and the losses are recorded. The tape is
    # persistent because four separate gradients are taken from it.
    with tf.GradientTape(persistent=True) as tape:
        fake_y = generator_x(real_x, training=True)
        cycled_x = generator_y(fake_y, training=True)

        fake_x = generator_y(real_y, training=True)
        cycled_y = generator_x(fake_x, training=True)

        # same_x and same_y are used for identity loss.
        same_x = generator_y(real_x, training=True)
        same_y = generator_x(real_y, training=True)

        disc_real_x = discriminator_x(real_x, training=True)
        disc_real_y = discriminator_y(real_y, training=True)

        disc_fake_x = discriminator_x(fake_x, training=True)
        disc_fake_y = discriminator_y(fake_y, training=True)

        gen_x_loss = generator_loss(disc_fake_y)
        gen_y_loss = generator_loss(disc_fake_x)

        t_cycle_loss = cycle_loss(real_x, cycled_x) + cycle_loss(real_y, cycled_y)

        t_gen_x_loss = gen_x_loss + t_cycle_loss + identity_loss(real_y, same_y)
        t_gen_y_loss = gen_y_loss + t_cycle_loss + identity_loss(real_x, same_x)

        disc_x_loss = discriminator_loss(disc_real_x, disc_fake_x)
        disc_y_loss = discriminator_loss(disc_real_y, disc_fake_y)

    # Gradients are taken OUTSIDE the tape context: calling gradient() on a
    # persistent tape inside its context records the gradient ops as well,
    # which costs extra memory and compute without being needed here.
    generator_x_gradients = tape.gradient(
        t_gen_x_loss, generator_x.trainable_variables)
    generator_y_gradients = tape.gradient(
        t_gen_y_loss, generator_y.trainable_variables)

    discriminator_x_gradients = tape.gradient(
        disc_x_loss, discriminator_x.trainable_variables)
    discriminator_y_gradients = tape.gradient(
        disc_y_loss, discriminator_y.trainable_variables)

    # Drop the reference so the persistent tape's resources can be released.
    del tape

    # All gradients are computed before any weights are updated.
    generator_x_optimizer.apply_gradients(
        zip(generator_x_gradients, generator_x.trainable_variables))
    generator_y_optimizer.apply_gradients(
        zip(generator_y_gradients, generator_y.trainable_variables))

    discriminator_x_optimizer.apply_gradients(
        zip(discriminator_x_gradients, discriminator_x.trainable_variables))
    discriminator_y_optimizer.apply_gradients(
        zip(discriminator_y_gradients, discriminator_y.trainable_variables))

    return t_gen_x_loss, t_gen_y_loss, disc_x_loss, disc_y_loss
        

In [None]:
def train(epochs):
    """Build the CycleGAN models and train them for `epochs` epochs.

    Creates two `U_Net_Network` generators and two `Discriminator` models,
    one Adam optimizer (learning rate 2e-4, beta_1 0.5) per model, loads the
    datasets with `data()`, and calls `train_step` on every pair of batches.
    After each epoch the mean of every loss over that epoch is printed.

    Args:
        epochs: number of passes over the zipped datasets.

    Returns:
        The tuple `(generator_x, generator_y, discriminator_x,
        discriminator_y)` so the trained networks can be used after
        training (previously they were discarded when the function ended).
    """
    generator_x = U_Net_Network()
    generator_y = U_Net_Network()
    discriminator_x = Discriminator()
    discriminator_y = Discriminator()
    models = (generator_x, generator_y, discriminator_x, discriminator_y)

    generator_x_optimizer = tf.keras.optimizers.Adam(2e-4, beta_1=0.5)
    generator_y_optimizer = tf.keras.optimizers.Adam(2e-4, beta_1=0.5)
    discriminator_x_optimizer = tf.keras.optimizers.Adam(2e-4, beta_1=0.5)
    discriminator_y_optimizer = tf.keras.optimizers.Adam(2e-4, beta_1=0.5)
    optimizers = (generator_x_optimizer, generator_y_optimizer,
                  discriminator_x_optimizer, discriminator_y_optimizer)

    loss_functions = (identity_loss, cycle_loss, generator_loss, discriminator_loss)

    x_ds, y_ds = data()

    for epoch in range(epochs):
        gen_x_losses = []
        gen_y_losses = []
        disc_x_losses = []
        disc_y_losses = []
        # zip stops at the shorter dataset, so each epoch sees
        # min(len(x_ds), len(y_ds)) unpaired batches.
        for batch_x, batch_y in zip(x_ds, y_ds):
            total_gen_x_loss, total_gen_y_loss, disc_x_loss, disc_y_loss = train_step(
                batch_x, batch_y, models, optimizers, loss_functions)
            # Keep plain NumPy scalars instead of holding on to tensors.
            gen_x_losses.append(total_gen_x_loss.numpy())
            gen_y_losses.append(total_gen_y_loss.numpy())
            disc_x_losses.append(disc_x_loss.numpy())
            disc_y_losses.append(disc_y_loss.numpy())

        print(f'gen_x_loss : {np.mean(gen_x_losses)} , gen_y_loss : {np.mean(gen_y_losses)} , disc_x_losses :{np.mean(disc_x_losses)} , dics_y_losses :{np.mean(disc_y_losses)}')

    return models

In [None]:
# Run training for 5 epochs.
train(5)