#Mount google drive.

In [None]:
from google.colab import drive
drive.mount('/content/drive')

#Imports.

In [None]:
import tensorflow as tf
assert tf.__version__.startswith('2')
from tensorflow import keras
from tensorflow.keras import layers

import numpy as np
import pandas as pd

import matplotlib.pyplot as plt

#from PIL import Image

import os

import time

# Change path in Colab.
!pwd
%cd "/content/drive/My Drive/Colab Notebooks/facial_landmark_pred_example/"
!ls -l

from Helper.My_Display import My_Display

tf.__version__

#Setup paths.

In [None]:
path = "/content/drive/My Drive/Colab Notebooks/facial_landmark_pred_example/"
#path = r"C:/Users/USER/Google Drive/Colab Notebooks/facial_landmark_pred_example"

data_path = path + 'tfrec_data/'
facenet_path = path + "facenet/model/"
checkpoint_path = path + 'chkpt/'
saved_model_path = path + "saved_model/"
saved_fig_dir = path + "saved_fig/saved_fig.png"

#Sizes, dimensions & hyperparameters.

In [None]:
test_size = 466     # Size of test set (20% of total dataset size).
IMAGE_SIZE = 160      # w, h of image.
IMG_SHAPE = (IMAGE_SIZE, IMAGE_SIZE, 3)      # Shape of image with 3 channels.
top_dropout_rate = 0.2      # Dropout rate for dropout layer.

# The output is a vector of size 388 float between 0 to 1.
num_output = 388      # 388 represents all x, y values of 194 coordinates.

BATCH_SIZE = 4      # For training.
epochs = 15     # For transfer learning.
epochs_fine_tune = 150     # For fine tuning.
batches = 46     # For checkpoint.

lr_fine_tune = 1e-5     # learning rate for fine tuning.
# Fine tune from this layer onwards
fine_tune_at = 0      # 0 means fine tune all layers.

#Read dataset.

In [None]:
raw_dataset = tf.data.TFRecordDataset([data_path + 'tfrec_data_1', 
                                       data_path + 'tfrec_data_2',
                                       data_path + 'tfrec_data_3', 
                                       data_path + 'tfrec_data_4', 
                                       data_path + 'tfrec_data_5', 
                                       ])

In [None]:
# Display attributes of raw_dataset.
print(tf.data.experimental.cardinality(raw_dataset).numpy())
print(len(list(raw_dataset)))

cardinality = tf.data.experimental.cardinality(raw_dataset)
print((cardinality == tf.data.experimental.INFINITE_CARDINALITY).numpy())
print((cardinality == tf.data.experimental.UNKNOWN_CARDINALITY).numpy())

raw_example = next(iter(raw_dataset))
parsed = tf.train.Example.FromString(raw_example.numpy())

##Function to parse data.

In [None]:
# Create a description of the features.
feature_description = {
    'img_name': tf.io.FixedLenFeature([], tf.string, default_value=''),
    'img': tf.io.FixedLenFeature([], tf.string, default_value=''),
    'coords': tf.io.FixedLenFeature([], tf.string, default_value=''),
}

In [None]:
def _parse_function(example_proto):
  # Parse the input `tf.train.Example` proto using the dictionary above.
  map_ds = tf.io.parse_single_example(example_proto, feature_description)    
  img_name = map_ds['img_name']
  img = tf.image.decode_jpeg(map_ds['img'])     
  coords = tf.io.decode_raw(map_ds['coords'], 'double')     # float64
  print(map_ds, img_name, img, coords)

  # Get img original size.
  w = tf.shape(img)[1]
  h = tf.shape(img)[0]

  # Resize img.
  img = tf.image.resize(img, [IMAGE_SIZE,IMAGE_SIZE])

  # Normalize img, not neccessary.
  #img = tf.math.divide(img, 255)

  # Get the scale factor.
  w_scale_factor = tf.math.divide(IMAGE_SIZE, w)
  h_scale_factor = tf.math.divide(IMAGE_SIZE, h)
  
  # Scale the coords.
  coords_dim = (194,2)
  coords = tf.reshape(coords, coords_dim)
  coord_vec_0 = tf.slice(coords, [0, 0], [194, 1])
  coord_vec_1 = tf.slice(coords, [0, 1], [194, 1])
  coord_vec_0_scaled = tf.math.multiply(coord_vec_0, w_scale_factor)
  coord_vec_1_scaled = tf.math.multiply(coord_vec_1, h_scale_factor)

  # Normalize coords
  coord_vec_0_scaled = tf.math.divide(coord_vec_0_scaled, IMAGE_SIZE)
  coord_vec_1_scaled = tf.math.divide(coord_vec_1_scaled, IMAGE_SIZE)
  coords = tf.concat([coord_vec_0_scaled, coord_vec_1_scaled], 1)
  coords_dim = (388,1)
  coords = tf.reshape(coords, coords_dim)

  return img, coords  

##Train/test split.

In [None]:
test_dataset = raw_dataset.take(test_size) 
train_dataset = raw_dataset.skip(test_size)

##Parse train, test set.

In [None]:
parsed_dataset_train = train_dataset.map(_parse_function, num_parallel_calls=tf.data.experimental.AUTOTUNE)
parsed_dataset_test = test_dataset.map(_parse_function, num_parallel_calls=tf.data.experimental.AUTOTUNE)

print(parsed_dataset_train)
print(parsed_dataset_test)

In [None]:
parsed_dataset_train = parsed_dataset_train.cache()
parsed_dataset_train = parsed_dataset_train.shuffle(500, seed=1234, reshuffle_each_iteration=True)
parsed_dataset_train = parsed_dataset_train.batch(BATCH_SIZE, drop_remainder=True)
parsed_dataset_train = parsed_dataset_train.prefetch( tf.data.experimental.AUTOTUNE)
parsed_dataset_train

In [None]:
parsed_dataset_test = parsed_dataset_test.cache()
parsed_dataset_test = parsed_dataset_test.shuffle(500, seed=1234, reshuffle_each_iteration=True)
parsed_dataset_test = parsed_dataset_test.batch(BATCH_SIZE, drop_remainder=True)
parsed_dataset_test = parsed_dataset_test.prefetch( tf.data.experimental.AUTOTUNE)
parsed_dataset_test

#Build model for transfer learning.

In [None]:
# Load the pretrained keras facenet model.
facenet_model = keras.models.load_model(facenet_path + 'facenet_keras.h5')

base_model = facenet_model
base_model.trainable = False      # Fix the weights of the base facenet model.

# Replace clasification head from facenet model with one that does regression.
x = base_model.layers[-2].output    # Remove the last output layer.

# Add Dropout, BatchNorm & Dense layers. 
x = layers.Dropout(top_dropout_rate, name="top_dropout")(x)    
x = layers.BatchNormalization()(x)
outputs = layers.Dense(num_output, activation="sigmoid", name="pred")(x)

# Setup model.
model = tf.keras.Model(facenet_model.input, 
                      outputs,
                      name="facenet_model_mod")

optimizer = tf.keras.optimizers.Adam()      # Select optimizer.
# Compile model.
model.compile(optimizer=optimizer, 
              loss="mse", 
              metrics=["mse",],
)

print('Number of trainable variables = {}'.format(len(model.trainable_variables)))
model.summary()
#tf.keras.utils.plot_model(model)

##Setup chkpt.

In [None]:
# Include the epoch in the file name (uses `str.format`)

checkpoint_file = checkpoint_path + "/cp-{epoch:04d}.ckpt"
print(os.listdir(checkpoint_path))

# Create a callback that saves the model's weights every 5 epochs
cp_callback = tf.keras.callbacks.ModelCheckpoint(
    filepath=checkpoint_file, 
    verbose=1, 
    save_weights_only=True,
    save_freq=batches*BATCH_SIZE)

# Save the weights using the `checkpoint_path` format
#model.save_weights(checkpoint_file.format(epoch=0))

#file = '/content/drive/My Drive/Colab Notebooks/models/digit_mnist_apml.h5'
#Check whether a saved model existed, if true we load it
if os.listdir(checkpoint_path):
    # Loads the weights
    #model.load_weights(checkpoint_path)
    #model.load_weights(checkpoint_path.format(epoch=0))
    latest = tf.train.latest_checkpoint(checkpoint_path)
    print(latest)
    #model.load_weights(latest)

## Train the model (transfer learning)

In [None]:
import time
start_time = time.time()

history = model.fit(parsed_dataset_train, 
                    epochs=epochs, 
                    validation_data=parsed_dataset_test, 
                    callbacks=[cp_callback],
                    verbose=1,
                    )

print("--- %s seconds ---" % (time.time() - start_time))

### Learning curves


In [None]:
loss = history.history['loss']
val_loss = history.history['val_loss']

plt.figure(figsize=(8, 8))
plt.subplot(2, 1, 1)
plt.plot(loss, label='Training loss')
plt.plot(val_loss, label='Validation loss')
plt.legend(loc='best')
plt.ylabel('loss')
plt.title('Training and Validation lossloss')

plt.show()

###Test transfer learning results.

In [None]:
n = 3      # 3 batches.
samples = parsed_dataset_test.take(n)     
pred = model.predict(samples)
results = model.evaluate(samples)
print(results)

## Fine tuning

In [None]:
base_model.trainable = True

# Let's take a look to see how many layers are in the base model
print("Number of layers in the base model: ", len(base_model.layers))

# Freeze all the layers before the `fine_tune_at` layer
for layer in base_model.layers[:fine_tune_at]:
  layer.trainable =  False

print('Number of trainable variables = {}'.format(len(base_model.trainable_variables)))  
#base_model.summary()

###Compile the model using a much lower training rate.

In [None]:
model.compile(optimizer=tf.keras.optimizers.Adam(lr_fine_tune),             
              loss='mse',
              metrics=[tf.keras.metrics.MeanSquaredError(),],
              )

print('Number of trainable variables = {}'.format(len(model.trainable_variables)))
model.summary()

### Continue to train the model.

In [None]:
import time
start_time = time.time()

history_fine = model.fit(parsed_dataset_train,                         
                         epochs=epochs_fine_tune, 
                         validation_data=parsed_dataset_test, 
                         callbacks=[cp_callback],
                         verbose=1,
                         )
print("--- %s seconds ---" % (time.time() - start_time))                        

In [None]:
loss = history_fine.history['loss']
val_loss = history_fine.history['val_loss']

plt.figure(figsize=(8, 8))
plt.subplot(2, 1, 1)
plt.plot(loss, label='Training loss')
plt.plot(val_loss, label='Validation loss')
plt.legend(loc='best')
plt.ylabel('loss')
plt.title('Training and Validation lossloss')

plt.show()

###Test fine tuning result.

In [None]:
n = 3      # 3 batches.
samples = parsed_dataset_test.take(n)     
pred = model.predict(samples)
results = model.evaluate(samples)
print(results)

#Save model.

In [None]:
#assert False      # Stop cells from running.

#tf.saved_model.save(model, saved_model_path)     # tf
model.save(saved_model_path + 'saved_model.h5')      # tf.keras

## Convert & save as TFLite model.

In [None]:
converter = tf.lite.TFLiteConverter.from_saved_model(saved_model_path)
tflite_model = converter.convert()

with open(saved_model_path + 'saved_model.tflite', 'wb') as f:
  f.write(tflite_model)

###Download the converted model and labels

In [None]:
#from google.colab import files

#files.download('model.tflite')

#Load saved model, test & display results.

In [None]:
# Load saved trained model.
saved_model = tf.keras.models.load_model(saved_model_path + 'saved_model.h5')     
saved_model.summary()

In [None]:
# 1 batch = 4 samples, 75 batches = 300 samples.
samples = parsed_dataset_test.take(75)      
#pred = saved_model.predict(samples)
results = saved_model.evaluate(samples)
print(results)

In [None]:
# Display fine tune results.
fig=plt.figure(figsize=(20, 150))
rows = 60
cols = 10
show = My_Display()
show.display_test_fine_tune(samples, saved_model, IMAGE_SIZE, 
                            fig, rows, cols, 
                            saved_fig_dir)