In [None]:
!pip install catboost
!pip install lightgbm
!pip install shap
!pip install keras-tuner --upgrade

In [None]:
import numpy as np
import seaborn as sns
import os
import matplotlib.pyplot as plt
import tensorflow as tf
import six
import tensorflow_datasets as tfds
import pandas as pd
import keras
import keras_tuner as kt
from tensorflow.keras import Input, layers, Model
from sklearn.metrics import r2_score
from sklearn.metrics import roc_auc_score
from sklearn.metrics import mean_squared_error
from sklearn.metrics import mean_absolute_error
from sklearn.metrics import f1_score
from sklearn.model_selection import train_test_split
import lightgbm as lgb
from catboost import CatBoostRegressor
import shap
import scipy.stats as stats

In [None]:
%matplotlib inline

In [None]:
plt.rcParams['figure.dpi'] = 400

In [None]:
from google.colab import drive
drive.mount('/content/drive')

In [None]:
gpu_info = !nvidia-smi
gpu_info = '\n'.join(gpu_info)
if gpu_info.find('failed') >= 0:
  print('Not connected to a GPU')
else:
  print(gpu_info)

In [None]:
#helper functions
def adjusted_r2(y_test, y_pred, n_predictors):
    """Return the adjusted R^2 of ``y_pred`` against ``y_test``.

    Computes ``1 - (1 - R^2) * (n - 1) / (n - p - 1)`` where ``n`` is
    ``len(y_test)`` and ``p`` is ``n_predictors``.
    """
    n_samples = len(y_test)
    unexplained = 1 - r2_score(y_test, y_pred)
    return 1 - unexplained * (n_samples - 1) / (n_samples - n_predictors - 1)


def mean_absolute_percentage_error(y_true, y_pred):
    """Return the mean absolute percentage error (MAPE), in percent.

    Args:
        y_true: Ground-truth values (array-like); used as the denominator.
        y_pred: Predicted values (array-like) of the same shape.

    Returns:
        ``mean(|y_true - y_pred| / |y_true|) * 100``.

    The ground truth comes first, matching ``adjusted_r2`` and the
    scikit-learn metrics used in this notebook. The previous
    ``(y_pred, y_true)`` order meant positional calls such as
    ``mean_absolute_percentage_error(y_test, preds)`` divided by the
    predictions instead of the ground truth. Calls that pass both arguments
    by keyword are unaffected.

    Entries of ``y_true`` equal to zero produce ``inf``/``nan`` in the result.
    """
    y_true = np.asarray(y_true, dtype=float)
    y_pred = np.asarray(y_pred, dtype=float)
    return np.mean(np.abs((y_true - y_pred) / y_true)) * 100

In [None]:
# Create dataset from multiple .tfrecord files
def _celeba_shards(split, num_shards, root="/content/drive/MyDrive/Celeb_A"):
    """Return the TFRecord shard paths of one CelebA split.

    Shards are named ``celeb_a-<split>.tfrecord-<index>-of-<num_shards>``,
    with both numbers zero-padded to five digits.
    """
    return [
        f"{root}/celeb_a-{split}.tfrecord-{index:05d}-of-{num_shards:05d}"
        for index in range(num_shards)
    ]


# Generated instead of hand-copied so a shard cannot be mistyped or skipped.
train = _celeba_shards("train", 16)
val = _celeba_shards("validation", 2)
test = _celeba_shards("test", 2)


celeba_train = tf.data.TFRecordDataset(train)
celeba_val = tf.data.TFRecordDataset(val)
celeba_test = tf.data.TFRecordDataset(test)

In [None]:
# Names of the 40 attributes read from each CelebA record (as
# 'attributes/<name>'). The order of this list fixes the order of the stacked
# label vector and of the model's output units.
ATTRIBUTES = [
    '5_o_Clock_Shadow',
    'Arched_Eyebrows',
    'Attractive',
    'Bags_Under_Eyes',
    'Bald',
    'Bangs',
    'Big_Lips',
    'Big_Nose',
    'Black_Hair',
    'Blond_Hair',
    'Blurry',
    'Brown_Hair',
    'Bushy_Eyebrows',
    'Chubby',
    'Double_Chin',
    'Eyeglasses',
    'Goatee',
    'Gray_Hair',
    'Heavy_Makeup',
    'High_Cheekbones',
    'Male',
    'Mouth_Slightly_Open',
    'Mustache',
    'Narrow_Eyes',
    'No_Beard',
    'Oval_Face',
    'Pale_Skin',
    'Pointy_Nose',
    'Receding_Hairline',
    'Rosy_Cheeks',
    'Sideburns',
    'Smiling',
    'Straight_Hair',
    'Wavy_Hair',
    'Wearing_Earrings',
    'Wearing_Hat',
    'Wearing_Lipstick',
    'Wearing_Necklace',
    'Wearing_Necktie',
    'Young']

In [None]:
def _parse_function(example_proto):
    """Parse one serialized CelebA example.

    Args:
        example_proto: Serialized example, as yielded by the TFRecord datasets.

    Returns:
        A dict with two entries:
        ``'image'`` is the JPEG decoded into a 3-channel image tensor;
        ``'attributes'`` maps every name in ``ATTRIBUTES`` to its int64
        tensor of shape ``(1,)``.
    """
    # Derive the schema from ATTRIBUTES so the two cannot drift apart.
    feature_description = {
        f'attributes/{name}': tf.io.FixedLenFeature((1,), tf.int64)
        for name in ATTRIBUTES
    }
    feature_description['image'] = tf.io.FixedLenFeature((), tf.string)

    data = tf.io.parse_single_example(example_proto, feature_description)
    image = tf.io.decode_jpeg(data['image'], channels=3)

    attributes = {name: data[f'attributes/{name}'] for name in ATTRIBUTES}

    return {'image': image, 'attributes': attributes}

# Apply the parser to every split; only the training split requests parallel
# parsing.
# NOTE(review): each line rebinds its dataset name, so re-running this cell
# would feed already-parsed elements back into the parser. Re-create the
# TFRecordDatasets first.
celeba_train = celeba_train.map(_parse_function, num_parallel_calls=tf.data.AUTOTUNE)
celeba_val = celeba_val.map(_parse_function)
celeba_test = celeba_test.map(_parse_function)

In [None]:
# Peek at the first parsed training example: show the image and print its attributes.
for sample in celeba_train.take(1):
    plt.imshow(sample['image'].numpy())
    tf.print(sample['attributes'])

In [None]:
def process(e):
    """Turn a parsed CelebA example into an ``(image, labels)`` pair.

    The image is resized with padding to 128x128 and divided by 255. The
    labels are the attribute tensors cast to float32 and stacked in
    ``ATTRIBUTES`` order.
    """
    resized = tf.image.resize_with_pad(e['image'], 128, 128)
    image = tf.cast(resized, tf.float32) / 255.0

    atts = tf.stack(
        [tf.cast(e['attributes'][name], tf.float32) for name in ATTRIBUTES]
    )

    return image, atts

# Training: preprocess, shuffle with a 10000-element buffer, then batch by 64.
# Validation/test: preprocess and batch by 64 without shuffling.
# drop_remainder=True discards the final partial batch of every split, so up to
# 63 examples per split are never seen (including during test evaluation).
ds_train = celeba_train.map(process).shuffle(10000).batch(64, drop_remainder=True)
ds_val = celeba_val.map(process).batch(64, drop_remainder=True)
ds_test = celeba_test.map(process).batch(64, drop_remainder=True)

In [None]:
class FaceAttributeModel(tf.keras.Model):
    """Multi-label face-attribute classifier on a frozen ImageNet backbone.

    The network is ``backbone -> dropout -> n_layers x (Dense + ReLU) ->
    Dense(len(ATTRIBUTES)) + sigmoid`` and expects 128x128x3 inputs.

    Args:
        dropout_rate: Rate of the dropout applied to the pooled backbone features.
        units: Width of each hidden dense layer.
        backbone: ``"vgg16"`` (VGG16) or ``"resnet50"`` (ResNet50V2).
        n_layers: Number of hidden Dense+ReLU layers.

    Raises:
        ValueError: If ``backbone`` is not one of the supported names.
    """

    def __init__(self, dropout_rate=0.0, units=32, backbone="vgg16", n_layers=1):
        super().__init__()

        self.backbone = backbone
        self.n_layers = n_layers
        self.units = units
        self.dropout_rate = dropout_rate

        # Both backbones are built with the same arguments.
        backbone_kwargs = dict(
            weights='imagenet',
            include_top=False,
            input_shape=(128, 128, 3),
            pooling='avg',
        )

        if self.backbone == "vgg16":
            self.vgg = tf.keras.applications.VGG16(**backbone_kwargs)
            feature_extractor = self.vgg
        elif self.backbone == "resnet50":
            self.resnet = tf.keras.applications.ResNet50V2(**backbone_kwargs)
            feature_extractor = self.resnet
        else:
            # An unknown name used to leave `self.base` undefined and only
            # failed later, inside call().
            raise ValueError(
                f"Unsupported backbone {backbone!r}; expected 'vgg16' or 'resnet50'."
            )

        # Only the dense head is trained.
        feature_extractor.trainable = False

        self.base = tf.keras.models.Sequential()
        self.base.add(feature_extractor)
        self.base.add(tf.keras.layers.Dropout(dropout_rate))

        #fully connected layer
        self.dnn = tf.keras.models.Sequential()

        for _ in range(n_layers):
            layer = tf.keras.models.Sequential()
            layer.add(tf.keras.layers.Dense(units))
            layer.add(tf.keras.layers.Activation('relu'))
            self.dnn.add(layer)

        # binary classification output layer
        self.dnn.add(tf.keras.layers.Dense(len(ATTRIBUTES)))
        self.dnn.add(tf.keras.layers.Activation('sigmoid'))

    def call(self, inputs, training=False):
        """Return one sigmoid score per attribute for a batch of images."""
        # Forward `training` so the Dropout layer inside `self.base` is active
        # during fit(). It used to be hard-coded to False, which made
        # `dropout_rate` a no-op. The backbone is frozen (trainable=False), so
        # its BatchNormalization layers still run in inference mode.
        features = self.base(inputs, training=training)
        return self.dnn(features, training=training)

    def model(self):
        """Wrap the network in a functional ``Model`` with a fixed 128x128x3 input."""
        img = Input(shape=(128, 128, 3))

        return Model(inputs=img, outputs=self.call(img))

# Hyperparameter Tuning

In [None]:
def build_model(hp):
  """Build and compile a FaceAttributeModel for one keras-tuner trial.

  Args:
    hp: The tuner's hyperparameter container. This function samples
      ``units``, ``dropout_rate``, ``backbone``, ``layers`` and
      ``learning_rate`` from it.

  Returns:
    The compiled model.
  """
  hp_units = hp.Choice('units', values=[32, 64])
  hp_dropout_rate = hp.Choice('dropout_rate', values = [0.0, 0.2, 0.3])
  hp_backbone = hp.Choice('backbone', values = ['vgg16', 'resnet50'])
  hp_layers = hp.Choice('layers', values = [1, 2])

  model = FaceAttributeModel(dropout_rate=hp_dropout_rate, units=hp_units, backbone=hp_backbone, n_layers=hp_layers)

  # Tune the learning rate for the optimizer
  hp_learning_rate = hp.Choice('learning_rate', values=[0.001, 0.0005, 0.0001, 0.00005])

  # Precision uses the same 0.5 cut-off as Recall. With the previous threshold
  # of 0 every sigmoid output counted as a positive prediction, so the metric
  # only reported the share of positive labels.
  model.compile(optimizer=tf.keras.optimizers.Adam(learning_rate=hp_learning_rate),
                loss=tf.keras.losses.BinaryCrossentropy(from_logits=False),
                metrics=[tf.keras.metrics.Recall(thresholds=0.5),
                         tf.keras.metrics.Precision(thresholds=0.5),
                         tf.keras.metrics.BinaryAccuracy()])

  return model

In [None]:
# Prepare a directory to store all the checkpoints.
tuner_dir = "/content/drive/MyDrive/tiktok_model/tuner_attributes"

# exist_ok=True creates the directory only when it is missing, without a
# separate (racy) existence check.
os.makedirs(tuner_dir, exist_ok=True)

# Hyperband search over build_model's hyperparameters, minimizing val_loss.
tuner = kt.Hyperband(build_model,
                     objective='val_loss',
                     max_epochs=80,
                     factor=3,
                     overwrite=True,
                     directory=tuner_dir,
                     max_consecutive_failed_trials=10,
                     project_name='celeba_binary_classification')

In [None]:
# Early stopping: end a run once val_loss has not improved for 3 epochs.
stop_early = tf.keras.callbacks.EarlyStopping(monitor='val_loss', patience=3)

In [None]:
# Perform the hyperparameter search on the training set, scoring on the validation set.
# NOTE(review): the tuner was created with max_epochs=80 while epochs=40 is
# passed here — confirm which of the two actually bounds a trial.
tuner.search(x=ds_train, validation_data=ds_val, epochs=40, callbacks=[stop_early])

# Train Model

In [None]:
# Instantiate the final model.
# NOTE(review): these values are hard-coded; presumably they are the best
# configuration found by the tuner search above — confirm.
model = FaceAttributeModel(dropout_rate=0.0, units=64, backbone="vgg16", n_layers=1)

In [None]:
# Plot the model structure: draws the functional wrapper returned by
# model.model(), with nested sub-models expanded and shapes, dtypes and
# activations shown.
tf.keras.utils.plot_model(
    model.model(),
    show_shapes=True,
    show_dtype=True,
    show_layer_names=False,
    rankdir="TB",
    expand_nested=True,
    dpi=200,
    layer_range=None,
    show_layer_activations=True,
)

In [None]:
#compile model
# Recall and Precision are named explicitly. Unnamed Keras metrics receive
# auto-numbered names (recall_1, precision_1, ...) once other instances exist
# in the session, e.g. after the tuner has built its trial models, and that
# would break the history['val_recall'] / history['val_precision'] lookups
# further down.
model.compile(optimizer=tf.keras.optimizers.Adam(0.0001),
              loss=tf.keras.losses.BinaryCrossentropy(from_logits=False),
              metrics=[tf.keras.metrics.Recall(name='recall'),
                       tf.keras.metrics.Precision(name='precision'),
                       tf.keras.metrics.BinaryAccuracy(name='binary_accuracy')])

In [None]:
# Prepare a directory to store all the checkpoints.
checkpoint_dir = "/content/drive/MyDrive/Celeb_A_vgg/ckpt"
os.makedirs(checkpoint_dir, exist_ok=True)

model_checkpoint_callback = tf.keras.callbacks.ModelCheckpoint(
        # A checkpoint is written only for epochs whose `val_loss` improved.
        # The epoch number is part of the file name, so every improving epoch
        # gets its own checkpoint instead of overwriting the previous one.
        # The path is built from `checkpoint_dir` so the two cannot diverge.
        filepath=os.path.join(checkpoint_dir, "CelebaModel_vgg{epoch}"),
        save_best_only=True,  # Only save a model if `val_loss` has improved.
        monitor="val_loss",
        verbose=1
    )

# Prepare a directory to store all the logs.
log_dir = "/content/drive/MyDrive/Celeb_A/logs"
os.makedirs(log_dir, exist_ok=True)

# Prepare a directory to store plots.
plot_dir = "/content/drive/MyDrive/Celeb_A/plots"
os.makedirs(plot_dir, exist_ok=True)

In [None]:
# Train for 60 epochs with validation on ds_val; the checkpoint callback saves
# the model whenever val_loss improves.
history = model.fit(ds_train, epochs=60, callbacks=[model_checkpoint_callback], validation_data=ds_val)

In [None]:
# Per-epoch training and validation loss recorded by fit().
loss = history.history['loss']
val_loss = history.history['val_loss']

In [None]:
# convert the history.history dict to a pandas DataFrame:     
hist_df = pd.DataFrame(history.history) 

# save to json:  
# NOTE(review): the file is named "history_resnet" although the model trained
# above uses the vgg16 backbone — confirm the intended file name.
hist_json_file = '/content/drive/MyDrive/Celeb_A/logs/history_resnet.json' 
with open(hist_json_file, mode='w') as f:
    hist_df.to_json(f)

In [None]:
#plot model loss
fig, ax = plt.subplots(figsize=(14, 5))
ax.plot(loss)
ax.plot(val_loss)
ax.set_ylabel('Loss')
ax.set_xlabel('Epoch')
ax.legend(['Training', 'Validation'], loc='upper right')
# Written to the current working directory.
fig.savefig("train_val_loss_vgg", dpi=400, bbox_inches="tight")

plt.show()

In [None]:
# Per-epoch validation metrics. The keys assume the compiled metrics are named
# 'recall', 'precision' and 'binary_accuracy'.
recall = history.history['val_recall']
precision = history.history['val_precision']
binary_accuracy = history.history['val_binary_accuracy']

In [None]:
# Validation recall, precision and binary accuracy per epoch.
fig, ax = plt.subplots(figsize=(14, 5))
for curve, curve_label in ((recall, 'Recall'),
                           (precision, 'Precision'),
                           (binary_accuracy, 'Binary Accuracy')):
    ax.plot(curve, label=curve_label)
ax.legend(loc='best')
ax.set_ylabel('Value')
ax.set_xlabel('Epoch')
plt.show()

In [None]:
# Alternatively, load a saved model from Drive instead of training it.
# NOTE(review): checkpoints are only written for epochs that improved val_loss,
# so 'CelebaModel_vgg59' exists only if that epoch was saved — confirm.
model = keras.models.load_model('/content/drive/MyDrive/Celeb_A_vgg/ckpt/CelebaModel_vgg59')

### Evaluation

In [None]:
#Evaluation on test set
label_batches = []
pred_batches = []
for image_batch, label_batch in ds_test:
    label_batches.append(label_batch.numpy())
    pred_batches.append(model.predict(image_batch, verbose=False))

# Join the per-batch arrays along the sample axis.
labels = np.concatenate(label_batches)
preds = np.concatenate(pred_batches)

In [None]:
#Compute Area Under the Receiver Operating Characteristic Curve (ROC AUC) from prediction scores
for idx, attribute in enumerate(ATTRIBUTES):
    auc = roc_auc_score(labels[:, idx], preds[:, idx])
    print(f'{attribute}: {auc:.5f}')

In [None]:
# Drop the trailing singleton axis of the stacked labels: (N, 40, 1) -> (N, 40).
# reshape is used instead of labels[:, :, 0] so the cell can be re-run: on an
# array that is already 2-D it is a no-op rather than an IndexError.
labels = labels.reshape(len(labels), len(ATTRIBUTES))

In [None]:
#F1-scores
# f1_score needs hard class labels and raises on continuous scores, so the
# sigmoid outputs are binarized at 0.5 first.
for i, a in enumerate(ATTRIBUTES):
    hard_preds = (preds[:, i] > 0.5).astype(int)
    print('%s: %.5f' % (a, f1_score(labels[:, i], hard_preds)))

# Predict attributes on our TikTok dataset

In [None]:
def _parse_function(example_proto):
    """Parse one serialized TikTok example into ``(features, label)``.

    Note: this redefines (shadows) the CelebA parser of the same name defined
    earlier in the notebook.

    Returns:
        ``({'image': image, 'user': user}, label)`` where ``image`` is the
        decoded JPEG resized with padding to 128x128 and divided by 255,
        ``user`` is the raw string feature and ``label`` the float32 feature.
    """
    schema = {
        'image': tf.io.FixedLenFeature((), tf.string),
        'user': tf.io.FixedLenFeature((), tf.string),
        'label': tf.io.FixedLenFeature((), tf.float32),
    }
    record = tf.io.parse_single_example(example_proto, schema)

    pixels = tf.io.decode_jpeg(record['image'], channels=3)
    # Resize to the model's input size, then scale pixel values from [0, 255] to [0, 1].
    pixels = tf.image.resize_with_pad(pixels, 128, 128)
    pixels = tf.cast(pixels, tf.float32) / 255.0

    features = {'image': pixels, 'user': record['user']}
    return features, record['label']

In [None]:
# Dataset, 32 face per channel, random-shuffled.
tiktok_dataset = tf.data.Dataset.from_tensor_slices(['/content/drive/MyDrive/tiktok_model/tiktok_dataset.tfrecord'])
tiktok_dataset = tiktok_dataset.flat_map(lambda filename: tf.data.TFRecordDataset(filename))
tiktok_dataset = tiktok_dataset.map(_parse_function, num_parallel_calls=tf.data.AUTOTUNE)
# 1234 is the shuffle *buffer size*, not a seed. A fixed seed with
# reshuffle_each_iteration=False keeps the row order identical across passes
# and runs, so the seeded train/test splits further down stay reproducible.
# The final partial batch is kept: this dataset is only used for prediction,
# and dropping it would silently discard up to 63 samples.
tiktok_dataset = tiktok_dataset.shuffle(1234, seed=1234, reshuffle_each_iteration=False).batch(64)

In [None]:
#example
labels, preds, images = [], [], []

# Run the attribute model on a single batch and keep its images for display.
for batch, engagement in tiktok_dataset.take(1):
    labels.extend(engagement.numpy())
    preds.extend(model.predict(batch['image'], verbose=False))
    images.extend(batch['image'].numpy())
    

In [None]:
plt.imshow(images[1])
# `preds` holds NumPy rows returned by model.predict(). They have no .numpy()
# method, so the row is used directly.
z = dict(zip(ATTRIBUTES, preds[1]))
print(z)

In [None]:
#Predict TikTok dataset attributes
labels, preds, users = [], [], []

# One pass over the dataset so labels, users and predictions stay row-aligned.
for batch, engagement in tiktok_dataset:
    labels.extend(engagement.numpy())
    users.extend(batch['user'].numpy())
    preds.extend(model.predict(batch['image'], verbose=False))

In [None]:
#convert to array
labels = np.array(labels)
preds = np.array(preds)
# `preds_binary` was never created anywhere in the notebook, so converting it
# raised NameError on a fresh kernel. It is now derived from the scores with
# a 0.5 cut-off.
preds_binary = (preds > 0.5).astype(int)
users = np.array(users)

#create dataframes
df = pd.DataFrame(
    {'username': users,
     'engagement': labels
     }
     )


In [None]:
# Add one column per attribute holding the model's predicted score.
df[ATTRIBUTES] = pd.DataFrame(preds, index=df.index)

# Regression Decision Trees

In [None]:
# Features are the predicted attribute scores; the target is the engagement value.
X, y = df[ATTRIBUTES], df['engagement']

# Hold out 10% of the rows as the test set (fixed seed).
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.1, random_state=1)

In [None]:
# Split the training rows again: 80% for fitting, 20% as the evaluation set passed to the boosters.
X_train_split, X_eval_split, y_train_split, y_eval_split = train_test_split(X_train, y_train, test_size=0.2, random_state=22)

#### LightGBM

In [None]:
# LightGBM regressor with default hyperparameters.
# NOTE: this rebinds `model`, which until here referred to the Keras attribute model.
model = lgb.LGBMRegressor()

In [None]:
# The attribute columns hold continuous sigmoid scores, not integer category
# codes, so they are passed as ordinary numeric features instead of being
# declared categorical. Evaluation logging is silenced through a callback
# because the `verbose` argument of fit() was removed in LightGBM 4.
model.fit(X_train_split, y_train_split,
          eval_set=[(X_eval_split, y_eval_split)],
          callbacks=[lgb.log_evaluation(period=0)])

In [None]:
# Predict the engagement of the held-out test rows with the LightGBM model.
preds = model.predict(X_test)

In [None]:
print("MAE: ", mean_absolute_error(y_test, preds))
# Keyword arguments so the ground truth is always the denominator, whatever
# the positional order of the helper's parameters.
print("MAPE: ", mean_absolute_percentage_error(y_true=y_test, y_pred=preds))
mse = mean_squared_error(y_test, preds)
print("MSE: ", mse)
# sqrt(MSE) instead of `squared=False`, which current scikit-learn releases no
# longer accept.
print("RMSE: ", np.sqrt(mse))

#calculate Pearson correlation
print("Pearson :", np.corrcoef(preds, y_test)[0][1])

#Spearman correlation
res, p_value = stats.spearmanr(preds, y_test)
print("Spearman :", res, p_value)


#adjusted r2
# Number of predictors = number of feature columns (the 40 attributes).
n_predictors = X_test.shape[1]
print("Adj. R2: ",adjusted_r2(y_test, preds, n_predictors))

In [None]:
# Scatter of predictions against ground truth, with marginal distributions.
grid = sns.jointplot(x=y_test, y=preds, alpha=0.3, s=3)
grid.set_axis_labels('Ground Truth [%]', 'Predictions [%]')

#### CatBoost

In [None]:
# CatBoost regressor with default hyperparameters.
catboostmodel = CatBoostRegressor()

In [None]:
# The attribute columns are continuous sigmoid scores (floats). CatBoost does
# not accept float columns as categorical features, so they are left as
# ordinary numeric features.
catboostmodel.fit(X_train_split, y_train_split,
                  eval_set=(X_eval_split, y_eval_split),
                  use_best_model=True,
                  verbose=False)

In [None]:
# Predict the engagement of the held-out test rows with the CatBoost model.
preds = catboostmodel.predict(X_test)

In [None]:
print("MAE: ", mean_absolute_error(y_test, preds))
# Keyword arguments so the ground truth is always the denominator, whatever
# the positional order of the helper's parameters.
print("MAPE: ", mean_absolute_percentage_error(y_true=y_test, y_pred=preds))
mse = mean_squared_error(y_test, preds)
print("MSE: ", mse)
# sqrt(MSE) instead of `squared=False`, which current scikit-learn releases no
# longer accept.
print("RMSE: ", np.sqrt(mse))

#calculate pearson correlation
print("Pearson :", np.corrcoef(preds, y_test)[0][1])

#Spearman correlation
res, p_value = stats.spearmanr(preds, y_test)
print("Spearman :", res, p_value)


#adjusted r2
# Number of predictors = number of feature columns (the 40 attributes).
n_predictors = X_test.shape[1]
print("Adj. R2: ",adjusted_r2(y_test, preds, n_predictors))

In [None]:
# Scatter of predictions against ground truth, with marginal distributions.
grid = sns.jointplot(x=y_test, y=preds, alpha=0.3, s=3)
grid.set_axis_labels('Ground Truth [%]', 'Predictions [%]')

##### SHAP analysis

In [None]:
# Explain the CatBoost model's predictions on the held-out test rows.
explainer = shap.Explainer(catboostmodel)
# X_test is already a DataFrame (a row subset of df[ATTRIBUTES]).
shap_values = explainer(X_test)

In [None]:
# Bar plot of the SHAP values, limited to 11 rows.
shap.plots.bar(shap_values, max_display=11)

In [None]:
# Beeswarm plot over all 40 attributes.
# `newCmap` is not defined anywhere in this notebook, so passing it raised
# NameError on a fresh kernel; the plot now uses SHAP's default colors.
# NOTE(review): if a custom colormap is wanted, define it explicitly and pass
# it via `color=`.
shap.plots.beeswarm(shap_values, max_display=40)