In [None]:
%tensorflow_version 2.x
import numpy as np
import pandas as pd
from pathlib import Path

from tensorflow.keras.models import Sequential, Model
from tensorflow.keras.layers import Conv2D, MaxPooling2D, AveragePooling2D, Add, Dense, Input, LSTM
from tensorflow.keras.layers import Activation, Dropout, Flatten, Dense, Lambda, TimeDistributed, RepeatVector, BatchNormalization, Reshape
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras import applications
from tensorflow.keras import backend as K
from tensorflow.keras.callbacks import ModelCheckpoint,  CSVLogger, EarlyStopping

In [None]:
# Make channels-first (channels, height, width) the backend-wide default image
# layout; the generators and model builders in this notebook also pass
# data_format='channels_first' / a (3, H, W) input shape explicitly.
K.set_image_data_format('channels_first')
# Running count of trained models; the training loop appends it to each model
# name to build a per-run prefix for checkpoint and log files.
model_counter = 0

In [None]:
%%capture
# Delete old runs and download data
!rm *.hdf5
!wget -O dataset.zip https://dl.dropboxusercontent.com/s/lc9es5lce77bl5l/SCUT-FBP5500_v2.1.zip?dl=0
!wget -O contestants.zip https://dl.dropboxusercontent.com/s/dbw1tpt8f4mkass/contestants.zip?dl=0
!unzip dataset.zip
!unzip contestants.zip

In [None]:
# List the working directory (presumably a sanity check that the archives
# above were downloaded and extracted)
!ls

In [None]:
# Create a dataframe of the training/testing data
df = pd.read_excel('SCUT-FBP5500_v2/All_Ratings.xlsx')

# Keep the rows whose filename contains 'F' and average the ratings per image.
# 'Rating' is selected before aggregating so no other column is averaged.
ratings = df[df['Filename'].str.contains('F')].groupby('Filename')['Rating'].mean()

# Shuffle with a fixed seed instead of sorting by rating: ImageDataGenerator's
# validation_split takes the validation subset from the first rows of the
# dataframe, so a rating-sorted frame would validate only on the lowest-rated
# images and train only on the rest.
SHUFFLE_SEED = 42
train_df = ratings.sample(frac=1, random_state=SHUFFLE_SEED).reset_index()

# Turn bare file names into paths relative to the working directory
train_df['Filename'] = 'SCUT-FBP5500_v2/Images/' + train_df['Filename']
train_df.head()

In [None]:
# Create a dataframe with the contestant images

path = Path('contestants/')
# Sorted so the row order does not depend on the filesystem's listing order
images = sorted(path.iterdir())

contestants_df = pd.DataFrame({
    'Filename': [str(image) for image in images],
    # Contestant name = file name up to the first '_' (and before any '.'),
    # taken from Path.name so it is independent of the path separator/depth
    'name': [image.name.split('_')[0].split('.')[0] for image in images],
})
# Placeholder target: the test generator is configured with y_col='Rating'
contestants_df['Rating'] = 0
contestants_df.head()

In [None]:
# Get data generators for training, validation and "testing" sets
BATCH_SIZE = 32
IMAGE_SIZE = (299, 299)

# Generator shared by the training and validation subsets: pixel values are
# scaled by 1/255, a tenth of the rows is reserved for validation, and the
# remaining options randomly perturb each image that is served.
train_datagen = ImageDataGenerator(
    data_format='channels_first',
    rescale=1. / 255,
    validation_split=0.1,
    # Modify images for training
    horizontal_flip=True,
    rotation_range=20,
    shear_range=0.1,
    zoom_range=0.2,
    width_shift_range=0.2,
    height_shift_range=0.2,
)

def get_train_generator(df, subset):
    """Build a batch iterator over one subset of `df` using `train_datagen`.

    Args:
        df: DataFrame with a 'Filename' column (image paths, resolved relative
            to the current directory) and a 'Rating' column used as the target.
        subset: 'training' or 'validation'; selects which side of
            `train_datagen`'s `validation_split` is served.

    Returns:
        The iterator produced by `flow_from_dataframe`, configured for batches
        of `BATCH_SIZE` images resized to `IMAGE_SIZE`.
    """
    return train_datagen.flow_from_dataframe(
        df, '.', x_col='Filename', y_col='Rating',
        target_size=IMAGE_SIZE, batch_size=BATCH_SIZE,
        # 'raw' is the current name of the deprecated 'other' mode: the y_col
        # values are passed through as regression targets. The deprecated
        # `drop_duplicates` argument is no longer passed.
        class_mode='raw',
        subset=subset
    )

train_generator = get_train_generator(train_df, 'training')
# NOTE(review): the validation subset comes from the same augmenting generator
# as training, so validation images are randomly perturbed too — confirm this
# is intended.
validation_generator = get_train_generator(train_df, 'validation')

# Contestant images are only rescaled, never augmented
test_datagen = ImageDataGenerator(data_format='channels_first', rescale=1./255)

test_generator = test_datagen.flow_from_dataframe(
    contestants_df, None, x_col='Filename',
    target_size=IMAGE_SIZE, batch_size=BATCH_SIZE,
    y_col='Rating',
    # 'raw' is the current name of the deprecated 'other' mode
    class_mode='raw',
    # Keep dataframe order so predictions line up with contestants_df rows
    shuffle=False
)

# Fail now rather than after training: predictions are written back as a
# column of contestants_df, which requires one prediction per row.
assert test_generator.n == len(contestants_df), (
    'flow_from_dataframe kept %d of %d contestant files'
    % (test_generator.n, len(contestants_df))
)

In [None]:
def get_custom_model():
  """Build and compile the from-scratch CNN named "custom".

  The network takes a channels-first (3, *IMAGE_SIZE) image, applies six
  3x3 convolution + 2x2 max-pooling stages with 8, 16, 32, 64, 128 and 256
  filters, then a 32-unit dense layer with 50% dropout and a single-unit
  output.

  Returns:
      The Keras Model, compiled with mean-squared-error loss, the Adam
      optimizer and mean absolute error as a metric.
  """
  input_layer = Input(shape=(3, ) + IMAGE_SIZE, name='input_layer')

  # Convolutional feature extractor: the filter count doubles at every stage
  features = input_layer
  for n_filters in (8, 16, 32, 64, 128, 256):
    features = Conv2D(n_filters, (3, 3), data_format='channels_first', activation='relu')(features)
    features = MaxPooling2D(pool_size=(2, 2), data_format='channels_first')(features)

  # Regression head producing one value per image
  head = Flatten()(features)
  head = Dense(32, activation='relu')(head)
  head = Dropout(.5)(head)
  head = Dense(1, activation='relu')(head)

  model = Model(input_layer, head, name="custom")
  model.compile(optimizer='adam', loss='mean_squared_error', metrics=['mae'])

  return model

def get_inception_resnet():
  """Build and compile a rating regressor on the Inception-ResNet-v2 backbone.

  The backbone is loaded with ImageNet weights, without its classification
  top and with 'avg' pooling, for channels-first (3, *IMAGE_SIZE) inputs.
  A single-unit ReLU dense layer is stacked on its output.

  Returns:
      The Keras Model, carrying the backbone's name and compiled with
      mean-squared-error loss, the Adam optimizer and mean absolute error
      as a metric.
  """
  backbone = applications.inception_resnet_v2.InceptionResNetV2(
    weights='imagenet',
    include_top=False,
    pooling='avg',
    input_shape=(3, ) + IMAGE_SIZE,
  )
  rating = Dense(1, activation='relu')(backbone.output)

  regressor = Model(backbone.input, rating, name=backbone.name)
  regressor.compile(optimizer='adam', loss='mean_squared_error', metrics=['mae'])
  return regressor

def get_inception_v3():
  """Build and compile a rating regressor on the Inception V3 backbone.

  The backbone is loaded with ImageNet weights, without its classification
  top and with 'avg' pooling, for channels-first (3, *IMAGE_SIZE) inputs.
  A single-unit ReLU dense layer is stacked on its output.

  Returns:
      The Keras Model, carrying the backbone's name and compiled with
      mean-squared-error loss, the Adam optimizer and mean absolute error
      as a metric.
  """
  backbone = applications.inception_v3.InceptionV3(
    weights='imagenet',
    include_top=False,
    pooling='avg',
    input_shape=(3, ) + IMAGE_SIZE,
  )
  rating = Dense(1, activation='relu')(backbone.output)

  regressor = Model(backbone.input, rating, name=backbone.name)
  regressor.compile(optimizer='adam', loss='mean_squared_error', metrics=['mae'])
  return regressor

In [None]:
# Train each of the models in turn then use the best weights to rate the
# images of the contestants
for get_model in [get_custom_model, get_inception_resnet, get_inception_v3]:
  model = get_model()

  # Per-run prefix for this model's checkpoint and log files, e.g. "custom0"
  run_name = model.name + str(model_counter)
  model_counter += 1

  # {epoch} and {loss} are filled in by ModelCheckpoint each time it saves;
  # with save_best_only a file is written only when val_loss improves.
  filepath = "%s-{epoch:02d}-{loss:.2f}.hdf5" % run_name
  checkpoint = ModelCheckpoint(filepath, monitor='val_loss', verbose=1, save_best_only=True, mode='min')
  csv_logger = CSVLogger('%s_training_log.csv' % run_name, append=True, separator=',')
  early_stopping = EarlyStopping(monitor='val_loss', mode='min', patience=20)

  callbacks_list = [checkpoint, csv_logger, early_stopping]

  model.fit(
    train_generator,
    epochs=200,
    steps_per_epoch = train_generator.n // BATCH_SIZE,

    validation_data=validation_generator,
    validation_steps=validation_generator.n // BATCH_SIZE,

    callbacks=callbacks_list
  )

  # Load back the best weights. Every checkpoint written is an improvement, so
  # the best one is the most recently written file. It is picked by
  # modification time: sorting the names would place epoch 100+ before epoch
  # 99 because the epoch field is only zero-padded to two digits. The '-' in
  # the pattern keeps e.g. "custom1" from matching files of run "custom10".
  checkpoints = list(Path('.').glob('%s-*.hdf5' % run_name))
  if not checkpoints:
    raise FileNotFoundError('No checkpoint was written for run %r' % run_name)
  best_checkpoint = max(checkpoints, key=lambda ckpt: ckpt.stat().st_mtime)
  model.load_weights(str(best_checkpoint))

  # Run the model on the contestants' images. Model.predict accepts the
  # generator directly (predict_generator is deprecated); the (n, 1) output is
  # flattened to one score per dataframe row.
  contestants_df[model.name] = model.predict(
    test_generator,
    verbose=0,
    steps=int(np.ceil(test_generator.n / test_generator.batch_size))
  ).ravel()

In [None]:
# Get the average score for each person and model, and average them out for a final Rating
# The prediction columns are selected before aggregating: taking the mean of
# the whole frame would also try to average the string 'Filename' column,
# which raises a TypeError on pandas >= 2.0.
model_columns = ['inception_v3', 'inception_resnet_v2', 'custom']
ratings_df = contestants_df.groupby('name')[model_columns].mean()
ratings_df['Rating'] = ratings_df.mean(axis=1)
ratings_df.sort_values('Rating', ascending=False)