In [29]:
import tensorflow as tf
from tensorflow.keras import layers
from tensorflow.keras.applications import resnet50
from tensorflow.keras.models import Model
from tensorflow.keras.optimizers import Adam
import tensorflow.keras.backend as K
from tensorflow.keras.callbacks import TensorBoard
from tensorflow import keras
import keras_tuner as kt

In [30]:
import numpy as np
import cv2
from pathlib import Path
import pandas as pd
import time
import os

### Paths

In [31]:
celeba_path = Path('../../../Dataset/img_celeba_cropped/')
celeba_identity_path = Path('../../Dataset/celeb_identity_processed.txt')

In [32]:
df_identity = pd.read_csv(
    celeba_identity_path, 
    sep = ",", 
    names=["image", "identity"], 
    header=0, 
    dtype=str
)

### Hyperparameters

In [33]:
triplets_count=2000
train_size=0.7
network_input_shape=(224, 224, 3)

batch_size=32

### Triplet DF

In [34]:
def create_n_triplets(df_identity, n):
    triplets = []
    
    for triplet in range(n):
        anchor_id = df_identity['identity'].sample(1, replace=True).to_string(index=False)
        
        keepGoing = True
        while(keepGoing):
            anchor, positive = df_identity.loc[df_identity['identity'] == anchor_id]['image'].sample(2, replace=True)
            keepGoing = (anchor == positive)
            
        negative = df_identity.loc[df_identity['identity'] != anchor_id]['image'].sample(1, replace=True).to_string(index=False)
        
        anchor = str(celeba_path) + '/' + anchor
        positive = str(celeba_path) + '/' + positive
        negative = str(celeba_path) + '/' + negative
        
        triplets.append([anchor, positive, negative])
        
    return pd.DataFrame(triplets, columns=['anchor', 'positive', 'negative'])

In [35]:
triplets = create_n_triplets(df_identity, triplets_count)

### TF Dataset

In [36]:
def preprocessing(anchor, positive, negative):
    return (
        convert_to_img(anchor), 
        convert_to_img(positive), 
        convert_to_img(negative)
    )


def convert_to_img(img_path):
    img = tf.io.read_file(img_path)
    img = tf.image.decode_jpeg(img)
    img = resnet50.preprocess_input(img)
    return img

In [37]:
features = tf.data.Dataset.from_tensor_slices((
    triplets['anchor'].values, 
    triplets['positive'].values, 
    triplets['negative'].values
))

features = features.map(preprocessing)
fake_labels = tf.data.Dataset.from_tensor_slices(np.zeros(triplets_count))
dataset = tf.data.Dataset.zip((features, fake_labels))

train_dataset = dataset.take(round(triplets_count * train_size))
val_dataset = dataset.skip(round(triplets_count * train_size))

train_dataset = train_dataset \
        .batch(batch_size, num_parallel_calls=tf.data.AUTOTUNE) \
        .prefetch(buffer_size=tf.data.AUTOTUNE) \
        .cache()

val_dataset = val_dataset \
        .batch(batch_size, num_parallel_calls=tf.data.AUTOTUNE) \
        .prefetch(buffer_size=tf.data.AUTOTUNE) \
        .cache()

### Bayesian opt

In [38]:
def resnet_embedding(trainable_layers, n_hidden, n_neurons, dropout_rate):
    input = layers.Input(shape=network_input_shape)
    
    base_cnn=resnet50.ResNet50(
        input_tensor=input,
        input_shape=network_input_shape,
        include_top=False,
        weights='imagenet'
    )

    for layer in base_cnn.layers[0:-trainable_layers]:
        layer.trainable = False
        
    model = layers.Flatten()(base_cnn.output)
    
    for _ in range(n_hidden):
        model = tf.keras.layers.Dropout(dropout_rate)(model)
        model = tf.keras.layers.Dense(n_neurons, activation='relu')(model)
    
    model = layers.Dense(256)(model)

    return Model(input, outputs=model, name='resnet50')

In [39]:
class TripletLoss(layers.Layer):

    def __init__(self, margin, **kwargs):
        self.margin = margin
        super().__init__(**kwargs)
        
    def euclidean_distance(self, x, y):
        sum_square = tf.reduce_sum(tf.square(x - y), axis=1, keepdims=True)
        return tf.sqrt(tf.maximum(sum_square, K.epsilon()))
    

    def call(self, anchor, positive, negative):
        positive_distance = self.euclidean_distance(anchor, positive)
        negative_distance = self.euclidean_distance(anchor, negative)
        
        loss = positive_distance - negative_distance
        
        return tf.maximum(loss + self.margin, 0.0)

In [40]:
def identity_loss(y_true, y_pred):
    return tf.reduce_mean(y_pred)

In [41]:
def build_model(hp):
    trainable_layers =  hp.Int('trainable_layers', min_value=1, max_value=3, step=1, default=1)
    n_hidden = hp.Int('n_hidden', min_value=0, max_value=5, default=1)
    n_neurons = hp.Int('n_neurons', min_value=32, max_value=256, step=32)
    dropout_rate = hp.Float('dropout_rate', min_value=0, max_value=0.8, step=0.1)
    learning_rate = hp.Float('learning_rate', min_value=1e-6, max_value=1e-2, sampling='log')
    
    anchor_input = layers.Input(shape=network_input_shape, name='anchor_input')
    positive_input = layers.Input(shape=network_input_shape, name='right_input')
    negative_input = layers.Input(shape=network_input_shape, name='negative_input')
    
    embedding = resnet_embedding(trainable_layers, n_hidden, n_neurons, dropout_rate)
    
    distances = TripletLoss(margin=1)(
        embedding(anchor_input), 
        embedding(positive_input), 
        embedding(negative_input)
    )
    
    model = Model(
        inputs=[anchor_input, positive_input, negative_input], 
        outputs=distances,
        name='resnet50',
    )
    
    model.compile(loss=identity_loss, 
                  optimizer=Adam(learning_rate=learning_rate))
    return model

In [42]:
bayesian_opt_tuner = kt.BayesianOptimization(
    build_model, 
    objective="val_loss", 
    seed=42,
    max_trials=50, 
    alpha=1e-3, #to learn (1e-4 default)
    beta=2.6,  #to learn
    overwrite=True, 
    directory="celeba_tuning", 
    project_name="bayesian_opt")

In [43]:
early_stopping = keras.callbacks.EarlyStopping(
    monitor="val_loss",
    patience=4,
    restore_best_weights=False,
    verbose=0,
)

lr_scheduler_reduce = keras.callbacks.ReduceLROnPlateau(
    factor=0.1, 
    patience=3
)

tensorboard = TensorBoard(os.path.join(bayesian_opt_tuner.project_dir, 'tensorboard'))

callbacks_list = [
    early_stopping,
]

In [44]:
bayesian_opt_tuner.search(
    train_dataset, 
    epochs=10,
    validation_data=val_dataset,
    callbacks=callbacks_list
)

Trial 50 Complete [00h 06m 50s]
val_loss: 0.8788732290267944

Best val_loss So Far: 0.8788732290267944
Total elapsed time: 05h 20m 42s
INFO:tensorflow:Oracle triggered exit


In [45]:
top_three_models = bayesian_opt_tuner.get_best_models(num_models=3)

In [49]:
top_three_models[0].values

AttributeError: 'Functional' object has no attribute 'values'

In [47]:
top_three_models[1]

<keras.engine.functional.Functional at 0x2d849b9d0>

In [48]:
top_three_models[2]

<keras.engine.functional.Functional at 0x2cf1106a0>