**GAN USING DATASET**

# Execute this chapter if using Google colab

In [None]:
from google.colab import auth

**do not forget to CHANGE EXECUTE TO 'GPU'**

In [None]:
auth.authenticate_user()

In [None]:
project_id = 'le-wagon-337814'
!gcloud config set project {project_id}
!gsutil ls

In [None]:
! pip install --quiet git+https://github.com/christophelanson/icangetyoursmile

# Model parameters

In [59]:
model_name = 'GAN-test'
load_model_name = 'nan' # used only if and when loading a pretrained model
data_size = 3000 #(number of images to train on)
image_size = (64,64)

# tensorflow.data.Dataset

In [60]:
%load_ext autoreload
%autoreload 2

The autoreload extension is already loaded. To reload it, use:
  %reload_ext autoreload


In [61]:
import numpy as np
import os
from PIL import Image
import matplotlib.pyplot as plt
import tensorflow as tf
import random

## Train test split dataset

In [62]:
from sklearn.model_selection import train_test_split

path_to_mask_file = f'/home/christophelanson/code/christophelanson/icangetyoursmile/raw_data/{image_size[0]}x{image_size[1]}/Mask/with-mask-default-mask-seed'
path_to_no_mask_file = f'/home/christophelanson/code/christophelanson/icangetyoursmile/raw_data/{image_size[0]}x{image_size[1]}/No_mask/seed'

# File_list : list of all files
file_list = [(f'{path_to_mask_file}{str(i).zfill(4)}.png', f'{path_to_no_mask_file}{str(i).zfill(4)}.png') for i in np.arange(data_size)]

train,test= train_test_split(file_list,test_size=0.2,random_state=1,shuffle=True)
train[:2]

[('/home/christophelanson/code/christophelanson/icangetyoursmile/raw_data/64x64/Mask/with-mask-default-mask-seed0268.png',
  '/home/christophelanson/code/christophelanson/icangetyoursmile/raw_data/64x64/No_mask/seed0268.png'),
 ('/home/christophelanson/code/christophelanson/icangetyoursmile/raw_data/64x64/Mask/with-mask-default-mask-seed0032.png',
  '/home/christophelanson/code/christophelanson/icangetyoursmile/raw_data/64x64/No_mask/seed0032.png')]

## Read image function (X and y)

In [63]:
def X_y_images(filename):
    """ return X (mask) and y (no mask) from a file name"""
    #mask_im = np.asarray(Image.open(filename[0])).tolist()
    #no_mask_im = np.asarray(Image.open(filename[1])).tolist()
    
    #mask_image = tf.io.read_file(filename[0])
    #mask_image = tf.image.decode_png(mask_image)
    #mask_image = tf.image.convert_image_dtype(mask_image, tf.float32)
    
    no_mask_image = tf.io.read_file(filename[1])
    no_mask_image = tf.image.decode_png(no_mask_image)
    no_mask_image = tf.image.convert_image_dtype(no_mask_image, tf.float32)
    
    return no_mask_image # return mask_image, no_mask_image

## Image augmentation

TO BE DONE

## Create dataset

In [64]:
AUTOTUNE = tf.data.experimental.AUTOTUNE

In [67]:
# Create dataset as a sliced dataset
def create_dataset(file_list, batch_size=16):
    ds = tf.data.Dataset.from_tensor_slices(file_list)
    # Shuffle data (so that the ordering is different at each epoch)
    ds = ds.shuffle(buffer_size=len(file_list))
    # Map dataset to get images
    ds = ds.map(X_y_images, num_parallel_calls=AUTOTUNE)
    # Normalisation
#    ds = ds.map(lambda x: x/255)
    # Map for data augmentation
    #ds = ds.map(image_augmentation_function, num_parallel_calls=5)
    # Batching
    ds = ds.batch(batch_size)
    # Prefetch (use several resources at the same time)
    ds = ds.prefetch(buffer_size=AUTOTUNE)
    return ds

In [68]:
create_dataset(train)

<PrefetchDataset element_spec=TensorSpec(shape=(None, None, None, None), dtype=tf.float32, name=None)>

# Create model

## Data

In [69]:
train_ds=create_dataset(train)
test_ds=create_dataset(test)

## Model

In [85]:
from tensorflow.keras import layers, Sequential, Input, Model

In [86]:
model_size = 2**5 # Chollet model_size = 2**6

### Discriminator

In [87]:
input_shape = (image_size[0], image_size[1],3)
input_shape

(64, 64, 3)

In [88]:
discriminator = Sequential(
    [
    Input(shape=input_shape),
    layers.Conv2D(model_size, kernel_size=4, strides=2, padding="same"),
    layers.LeakyReLU(alpha=0.2),
    layers.Conv2D(model_size*2, kernel_size=4, strides=2, padding="same"),
    layers.LeakyReLU(alpha=0.2),
    layers.Conv2D(model_size*2, kernel_size=4, strides=2, padding="same"),
    layers.LeakyReLU(alpha=0.2),
    layers.Flatten(),
    #layers.Dropout(0.2),
    layers.Dense(1, activation="sigmoid"),
    ],
    name="discriminator",
)

In [89]:
discriminator.summary()

Model: "discriminator"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 conv2d_16 (Conv2D)          (None, 32, 32, 32)        1568      
                                                                 
 leaky_re_lu_24 (LeakyReLU)  (None, 32, 32, 32)        0         
                                                                 
 conv2d_17 (Conv2D)          (None, 16, 16, 64)        32832     
                                                                 
 leaky_re_lu_25 (LeakyReLU)  (None, 16, 16, 64)        0         
                                                                 
 conv2d_18 (Conv2D)          (None, 8, 8, 64)          65600     
                                                                 
 leaky_re_lu_26 (LeakyReLU)  (None, 8, 8, 64)          0         
                                                                 
 flatten_4 (Flatten)         (None, 4096)            

### Generator

In [90]:
latent_dim = model_size * 2
deeper_layer_size_param = image_size[0] * image_size[1] / 8 / 8

In [91]:
generator = Sequential(
    [
    Input(shape=(latent_dim,)),
    layers.Dense(deeper_layer_size_param * model_size*2),
    layers.Reshape((int(image_size[0] / 8), int(image_size[1] / 8), model_size*2)),
    layers.Conv2DTranspose(model_size*2, kernel_size=4, strides=2, padding="same"),
    layers.LeakyReLU(alpha=0.2),
    layers.Conv2DTranspose(model_size*4, kernel_size=4, strides=2, padding="same"),
    layers.LeakyReLU(alpha=0.2),
    layers.Conv2DTranspose(model_size*8, kernel_size=4, strides=2, padding="same"),
    layers.LeakyReLU(alpha=0.2),
    layers.Conv2D(3, kernel_size=5, padding="same", activation="sigmoid"),
    ],
    name="generator",
)

In [92]:
generator.summary()

Model: "generator"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 dense_9 (Dense)             (None, 4096)              266240    
                                                                 
 reshape_4 (Reshape)         (None, 8, 8, 64)          0         
                                                                 
 conv2d_transpose_12 (Conv2D  (None, 16, 16, 64)       65600     
 Transpose)                                                      
                                                                 
 leaky_re_lu_27 (LeakyReLU)  (None, 16, 16, 64)        0         
                                                                 
 conv2d_transpose_13 (Conv2D  (None, 32, 32, 128)      131200    
 Transpose)                                                      
                                                                 
 leaky_re_lu_28 (LeakyReLU)  (None, 32, 32, 128)       0 

### Gan model (class)

In [93]:
from tensorflow import shape as tf_shape
from tensorflow.random import normal as tf_random_normal
from tensorflow.random import uniform as tf_random_uniform
from tensorflow import concat as tf_concat
from tensorflow import ones as tf_ones
from tensorflow import zeros as tf_zeros
from tensorflow import GradientTape
from tensorflow.keras.metrics import Mean as keras_mean
from tensorflow.keras.optimizers import Adam

In [94]:
class GAN(Model):
    def __init__(self, discriminator, generator, latent_dim):
        super().__init__()
        self.discriminator = discriminator
        self.generator = generator
        self.latent_dim = latent_dim
        self.d_loss_metric = keras_mean(name="d_loss")
        self.g_loss_metric = keras_mean(name="g_loss")
    
    def compile(self, d_optimizer, g_optimizer, loss_fn):
        super(GAN, self).compile()
        self.d_optimizer = d_optimizer
        self.g_optimizer = g_optimizer
        self.loss_fn = loss_fn
    
    @property
    def metrics(self):
        return [self.d_loss_metric, self.g_loss_metric]

    def train_step(self, real_images):
        batch_size = tf_shape(real_images)[0]
        random_latent_vectors = tf_random_normal(
            shape=(batch_size, self.latent_dim))
        generated_images = self.generator(random_latent_vectors)
        combined_images = tf_concat([generated_images, real_images], axis=0)
        labels = tf_concat(
            [tf_ones((batch_size, 1)), tf_zeros((batch_size, 1))],
            axis=0
        )
        labels += 0.05 * tf_random_uniform(tf_shape(labels))
        
        with GradientTape() as tape:
            predictions = self.discriminator(combined_images)
            d_loss = self.loss_fn(labels, predictions)
            grads = tape.gradient(d_loss, self.discriminator.trainable_weights)
            self.d_optimizer.apply_gradients(
                zip(grads, self.discriminator.trainable_weights)
            )
        
        random_latent_vectors = tf_random_normal(
            shape=(batch_size, self.latent_dim))
        
        misleading_labels = tf_zeros((batch_size, 1))
        
        with GradientTape() as tape:
            predictions = self.discriminator(
                self.generator(random_latent_vectors))
            g_loss = self.loss_fn(misleading_labels, predictions)
            grads = tape.gradient(g_loss, self.generator.trainable_weights)
            self.g_optimizer.apply_gradients(
                zip(grads, self.generator.trainable_weights))
        
        self.d_loss_metric.update_state(d_loss)
        self.g_loss_metric.update_state(g_loss)
        return {"d_loss": self.d_loss_metric.result(),
            "g_loss": self.g_loss_metric.result()}

### Custom Callback (GAN Monitor class)

In [95]:
from tensorflow.keras.callbacks import Callback
from tensorflow.keras.utils import array_to_img

In [96]:
class GANMonitor(Callback):
    def __init__(self, num_img=3, latent_dim=128):
        self.num_img = num_img
        self.latent_dim = latent_dim
        
    def on_epoch_end(self, epoch, logs=None):
        random_latent_vectors = tf_random_normal(
        shape=(self.num_img, self.latent_dim))
        generated_images = self.model.generator(random_latent_vectors)
     #   generated_images *= 255
        generated_images.numpy()
        for i in range(self.num_img):
            img = array_to_img(generated_images[i])
            img.save(f"GAN_gen_img/generated_img_{epoch:03d}_{i}.png")

### GAN Model

In [97]:
from tensorflow.keras.losses import BinaryCrossentropy

In [98]:
epochs = 50
gan = GAN(discriminator=discriminator, generator=generator,
            latent_dim=latent_dim)

gan.compile(
    d_optimizer=Adam(learning_rate=0.0001),
    g_optimizer=Adam(learning_rate=0.002),
    loss_fn=BinaryCrossentropy(),
)

gan.fit(
    train_ds, epochs=epochs,
    callbacks=[GANMonitor(num_img=3, latent_dim=latent_dim)]
)

Epoch 1/50
Epoch 2/50
Epoch 3/50
Epoch 4/50
Epoch 5/50
Epoch 6/50
Epoch 7/50
Epoch 8/50
Epoch 9/50
Epoch 10/50
Epoch 11/50
Epoch 12/50
Epoch 13/50
Epoch 14/50
Epoch 15/50
Epoch 16/50
Epoch 17/50
Epoch 18/50
Epoch 19/50
Epoch 20/50
Epoch 21/50
Epoch 22/50
Epoch 23/50
Epoch 24/50
Epoch 25/50
Epoch 26/50
Epoch 27/50
Epoch 28/50

KeyboardInterrupt: 