## Imports

In [None]:
#the following four imports have to be installed
import numpy as np
import tensorflow as tf
from tensorflow.keras.layers import Dense, Flatten, Conv2D, MaxPooling2D, Dropout, BatchNormalization, Reshape
#from tensorboard.plugins.hparams import api as hp
import pandas as pd

import pandas.util
import os
import shutil

## Hyperparameters

In [None]:
IMG_SIZE = (200,100)    # width and height of all images (resize, if required)
BATCH_SIZE = 32  # for training and prediction
EPOCHS = 50
METRIC_ACCURACY = 'accuracy'
LOG_DIR = "logs/"
MODELFNAME = "model_checkpoints/landmarks_final.h5"

TEST_PKL = "files/test_landmarks_df.pkl"
TRAIN_PKL = "files/train_landmarks_df.pkl"
VAL_PKL = "files/val_landmarks_df.pkl"

In [None]:
# If the path does not exist, create it. 
if not os.path.exists(LOG_DIR):
    os.mkdir(LOG_DIR)

# If the path does not exist, create it.    
if not os.path.exists(MODELFNAME):
    os.mkdir(MODELFNAME)

# However, if the path exists and has been used before, the content has to be deleted and the directory created again.
if os.path.exists(LOG_DIR):
    shutil.rmtree(LOG_DIR)
    os.mkdir(LOG_DIR)
    
#check if the TEST_PKL, TRAIN_PKL, VAL_PKL files exist. If not: raise an error.
if not os.path.exists(TEST_PKL):
    raise FileNotFoundError('The model preprocess (landmarks_model_preprocess.ipynb) has to be conducted first or the name of the TEST_PKL has to be adjusted.')
if not os.path.exists(TRAIN_PKL):
    raise FileNotFoundError('The model preprocess (landmarks_model_preprocess.ipynb) has to be conducted first or the name of the IMAGE_FOLDER has to be adjusted.')
if not os.path.exists(VAL_PKL):
    raise FileNotFoundError('The model preprocess (landmarks_model_preprocess.ipynb) has to be conducted first or the name of the IMAGE_FOLDER has to be adjusted.')

# get test, train, validation datasets

In [None]:
test_df = pd.read_pickle(TEST_PKL)
train_df = pd.read_pickle(TRAIN_PKL)
val_df = pd.read_pickle(VAL_PKL)

## Map a filename to an actual image tensor

In [None]:
def path_to_array(filename, landmarks):
    img = tf.io.read_file(filename)
    img = tf.image.decode_png(img, channels = 3)
    # now img is 3 dim array of numbers in {0,..., 255}
    img = tf.cast(img, dtype = tf.float32) / 255. 
    return img, landmarks

## Make a tf dataset of images from a pd data frame of file paths 

In [None]:
def make_dataset(df):
    # first, make dataset with just the relevant: path and landmarks
    ds_path = tf.data.Dataset.from_tensor_slices((df['image_path'], df['landmarks']))

    # convert to data set with actual images
    ds = ds_path.map(path_to_array)
    ds = ds.batch(BATCH_SIZE)
    return ds

test_ds  = make_dataset(test_df)
val_ds   = make_dataset(val_df)
train_ds = make_dataset(train_df)
train_ds = train_ds.repeat() # infinitely repeat

## determine architecture and parameters of the model

In [None]:
kernel_size = (3, 3)
pool_size   = (2, 2)
first_filters  = 32
second_filters = 64
third_filters  = 128
dropout_conv  = 0.3
dropout_dense = 0.3

model = tf.keras.models.Sequential() # sequential stack of layers

model.add( BatchNormalization(input_shape = (IMG_SIZE[1],IMG_SIZE[0], 3)))
model.add( Conv2D (first_filters, kernel_size, activation = 'relu')) #convolutional layer + activation layer
model.add( Conv2D (first_filters, kernel_size, activation = 'relu'))
model.add( Conv2D (first_filters, kernel_size, activation = 'relu'))
model.add( MaxPooling2D (pool_size = pool_size)) #Pooling layer
model.add( Dropout (dropout_conv)) # Dropout layer

model.add( Conv2D (second_filters, kernel_size, activation ='relu'))
model.add( Conv2D (second_filters, kernel_size, activation ='relu'))
model.add( Conv2D (second_filters, kernel_size, activation ='relu'))
model.add( MaxPooling2D (pool_size = pool_size))
model.add( Dropout (dropout_conv))

model.add( Conv2D (third_filters, kernel_size, activation ='relu'))
model.add( Conv2D (third_filters, kernel_size, activation ='relu'))
model.add( Conv2D (third_filters, kernel_size, activation ='relu'))
model.add( MaxPooling2D (pool_size = pool_size))
model.add( Dropout (dropout_conv))

model.add( Flatten())
model.add( Dense (256, activation = "relu", kernel_regularizer = tf.keras.regularizers.l2(0.001))) #dense layer + activation layer
model.add( Dropout (dropout_dense)) #Dropout layer
model.add( Dense (20))  # Dense layer
model.add( Reshape ((10,2))) #Reshape to wanted output

model.summary()

In [None]:
# define the loss, optimization algorithm and prepare the model for gradient computation 
model.compile(optimizer = tf.keras.optimizers.Adam(learning_rate = 0.0005),
              loss = 'MSE', metrics = ['acc']) #loss = MSE

In [None]:
num_train = len(train_df)

tensorboard_callback = tf.keras.callbacks.TensorBoard(log_dir=LOG_DIR, histogram_freq=1, update_freq='batch')

# Function to store model to file, if validation loss has a new record
# Check always after having seen at least another save_freq examples.
checkpoint = tf.keras.callbacks.ModelCheckpoint(
    MODELFNAME, monitor = 'val_loss', mode = 'min', 
    save_best_only = True, verbose = 1)

# Function to decrease learning rate by 'factor'
# when there has been no significant improvement in the last 'patience' epochs.
reduce_lr = tf.keras.callbacks.ReduceLROnPlateau(
    monitor = 'val_loss', mode = 'min', factor = 0.75, patience = 4, verbose = 1)
                         
# fit_generator is like fit, but training set generation (image reading) is run in parallel to optimization
model.fit_generator(
    train_ds, epochs = EPOCHS, 
    steps_per_epoch = num_train / BATCH_SIZE, #would use each example once on average
    validation_data = val_ds, verbose = 1,
    callbacks = [checkpoint, tensorboard_callback])

In [None]:
# Load the parameters with the best validation accuracy during training.
# This works also if you interruped the training!
model.load_weights(MODELFNAME)

test_loss, test_acc = model.evaluate(test_ds, verbose = 0)
print("Loss on test set:", test_loss)

## Create a collage with predicted and original landmarks on the test images

In [None]:
from tqdm import tqdm
import math
from PIL import ImageDraw
from PIL import Image
import ntpath

all_lm = np.load('files/preprocessed_landmarks.npy', allow_pickle = True)[()].copy()
unique_test_files = list({key[:-len("_t0.jpg")] for key in test_df.filename.tolist()})

columns = 5
lm_radius = 1
n = len(unique_test_files)
rows = math.ceil(n / columns)
outsize = IMG_SIZE

collage = Image.new('RGB', (columns * outsize[0], rows * outsize[1]))

for i in tqdm(range(n), unit='images', desc='Drawing landmarks'):
    img_path = f"images/images_landmarks/{unique_test_files[i]}_t0.jpg"
    filename = ntpath.basename(img_path)
    
    img = tf.io.read_file(img_path)
    img = tf.image.decode_png(img, channels = 3)
    # now img is 3 dim array of numbers in {0,..., 255}
    img = tf.cast(img, dtype = tf.float32) / 255.
    
    landmarks = model(tf.expand_dims(img, 0))[0,...]
    
    x = (i % columns) * outsize[0]
    y = (i // columns) * outsize[1]
    
    lm = landmarks
    orig_lm = all_lm[filename]["landmarks"]
    img = Image.open(img_path)
    draw = ImageDraw.Draw(img)
    for i in range(lm.shape[0]):
        draw.ellipse((lm[i,0]-lm_radius,lm[i,1]-lm_radius, lm[i,0]+lm_radius,lm[i,1]+lm_radius),fill = 'red')
        draw.ellipse((orig_lm[i,0]-lm_radius,orig_lm[i,1]-lm_radius, orig_lm[i,0]+lm_radius,orig_lm[i,1]+lm_radius),fill = 'blue')
        
    collage.paste(img, (x, y))
    

collage.save('learned_collage_landmarks_test.jpg')