# Information
Title: Artificial Intelligence in Insurance Claims Management - Computer vision for car damage recognition 

Author: Roman Kastl

# Model Selection
## Module 1 - vF
- Use transfer learning to classify images of damaged vs. non-damaged car
- VGG16
- Xception
- Resnet50V2
- EfficientNetB0

# Build, train, and save models

In [None]:
!git clone https://github.com/djehuty94/MasterThesis_CarDamageRecognition
!pip install -U tensorflow-addons

## Preparation

### Import the required libraries

In [None]:
import matplotlib.pyplot as plt
import matplotlib as mpl
import numpy as np
import pandas as pd
import os
import pathlib
import PIL
import random
import json
import datetime

#Only if we use colab
#from google.colab import files
from tensorflow.python.client import device_lib
import torch

#Import tensorboard plugins
import tensorboard as tb
from scipy import stats
from tensorboard.plugins.hparams import api as hp
from packaging import version
import seaborn as sns

#Library for model scheme
import pydot
import graphviz
import pydotplus

#Require tensorflow >= 2.3
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
from tensorflow.keras import models
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.layers import Convolution2D, MaxPooling2D, ZeroPadding2D
import tensorflow_addons as tfa
print(tf.__version__)

In [None]:
# Name variables for output file
model_version = "vF"
step_name ="Model Selection"
module_name = "Module1"
save_path = ""

# Boolean flags selecting which outputs (model plot, training plot, model, training history) are saved to disk
save_shape_plot = True
save_training_plot = True 
save_model = True
save_history = True

### Set seed value

In [None]:
# Set the Random Seed
seed_value= 123

os.environ['PYTHONHASHSEED']=str(seed_value)
random.seed(seed_value)
np.random.seed(seed_value)
tf.random.set_seed(seed_value)

### *Colab*: Check that model will run on GPU 

In [None]:
gpu_info = !nvidia-smi
gpu_info = '\n'.join(gpu_info)
if gpu_info.find('failed') >= 0:
  print('Select the Runtime > "Change runtime type" menu to enable a GPU accelerator, ')
  print('and then re-execute this cell.')
else:
  print(gpu_info)

In [None]:
torch.cuda.is_available()
device_lib.list_local_devices()

## Part 1 - Data Preprocessing


### - Recover data

In [None]:
data_dir = pathlib.Path("MasterThesis_CarDamageRecognition/Data/Dataset_v1_bin_damage")

In [None]:
#Count the total number of images (files found two levels below data_dir)
#NOTE: since Python 3.11 a glob pattern ending with "/" only matches directories,
#which would report 0 images here, so files are selected explicitly instead
image_count = sum(1 for path in data_dir.glob('*/*') if path.is_file())
print(image_count)

### Pre-process the images and prepare the training, validation and test set
This part splits the dataset into a training set (80%) and a validation set (20%); a test set is then taken out of the validation set.

In [None]:
#Define the image and batch size
batch_size = 32
img_size = (224,224)

In [None]:
# Build the training and the validation dataset from the same directory; the test
# dataset is derived from the validation dataset in the next cell.
# Labels are one-hot encoded ('categorical') so that the code can easily be reused
# with more than two classes.
# Both calls receive the same split fraction, seed and shuffle setting.
shared_loader_args = dict(
    validation_split=0.2,
    seed=123,
    label_mode='categorical',
    image_size=img_size,
    shuffle=True,
    batch_size=batch_size,
)

train_ds = tf.keras.preprocessing.image_dataset_from_directory(
    data_dir, subset="training", **shared_loader_args)

val_ds = tf.keras.preprocessing.image_dataset_from_directory(
    data_dir, subset="validation", **shared_loader_args)

In [None]:
#A test dataset is built from the validation dataset (one fifth of its batches)
#The validation dataset is created with shuffle=True and tf.data reshuffles a shuffled
#dataset each time it is iterated over by default. Without freezing the order, take()
#and skip() would select different images on every pass and the test set would overlap
#with the validation set. Caching the dataset and reading it once completely fixes the
#order of the batches before the split.
val_ds = val_ds.cache()
for _ in val_ds:
    pass

val_batches = tf.data.experimental.cardinality(val_ds)
test_ds = val_ds.take(val_batches // 5)
val_ds = val_ds.skip(val_batches // 5)

print('Number of validation batches: %d' % tf.data.experimental.cardinality(val_ds))
print('Number of test batches: %d' % tf.data.experimental.cardinality(test_ds))

In [None]:
#Keep the class names, a class name -> index lookup and the number of classes in separate variables
class_names = np.array(train_ds.class_names)
num_classes = len(class_names)
target_dict = {
    label: index for index, label in enumerate(np.unique(train_ds.class_names))
}
print(class_names)
print(num_classes)

### Display a few images of each class

In [None]:
plt.figure(figsize=(10, 10))
for images, labels in train_ds.take(1):
    for i in range(9):
        ax = plt.subplot(3, 3, i + 1)
        plt.imshow(images[i].numpy().astype("uint8"))
        plt.title(class_names[np.argmax(labels[i])])
        plt.axis("off")

In [None]:
#Print the format of the dataset
for image_batch, labels_batch in train_ds:
  print(image_batch.shape)
  print(labels_batch.shape)
  break
#The batch size is 32 and the images are 224x224 pixels with three colour channels: (32, 224, 224, 3)

### Configure dataset for performance

In [None]:
#Optimize the datasets for performances
#AUTOTUNE is passed as prefetch buffer size so that tf.data tunes it at runtime
AUTOTUNE = tf.data.experimental.AUTOTUNE
#Training set: cached, shuffled with a buffer of 1000 elements (elements are batches here), then prefetched
train_ds = train_ds.cache().shuffle(1000).prefetch(buffer_size=AUTOTUNE)
#Validation set: cached and prefetched, not shuffled again
val_ds = val_ds.cache().prefetch(buffer_size=AUTOTUNE)
#Test set: prefetched only
test_ds = test_ds.prefetch(buffer_size=AUTOTUNE)

## Part 2 - Build the models


### Data augmentation

In this code data augmentation is achieved through layers which are added to the model

In [None]:
#Create a layer for data augmentation
data_augmentation = keras.Sequential(
  [
    layers.experimental.preprocessing.RandomFlip("horizontal"),
    layers.experimental.preprocessing.RandomRotation(0.15),
    layers.experimental.preprocessing.RandomZoom(0.1),
  ]
)

In [None]:
#Display nine random augmentations of the first image of one training batch
plt.figure(figsize=(10, 10))
for images, _ in train_ds.take(1):
  #Only the first image is displayed, so only that image is augmented (kept as a batch of one)
  first_image = images[:1]
  for i in range(9):
    #training=True makes sure the random transformations are applied outside of fit()
    augmented_images = data_augmentation(first_image, training=True)
    ax = plt.subplot(3, 3, i + 1)
    plt.imshow(augmented_images[0].numpy().astype("uint8"))
    plt.axis("off")

### Build the Models
- Initialize
- Pass Data augmentation
- Rescale /255, /127.5 and 0;1 or -1;1 depending on the model transferred
- Add a GlobalMaxPooling2D, BatchNormalization, and Dropout on top of the transferred model
- Output layer - Softmax function with one unit per class (two classes here: damaged vs. non-damaged)

#### VGG16

In [None]:
from tensorflow.keras.applications import VGG16

#Make the VGG model
def make_model_VGG16(input_shape, num_classes):
    """Build a transfer-learning classifier on top of a frozen ImageNet VGG16 base.

    Args:
        input_shape: shape of one input image; callers in this file pass img_size+(3,).
        num_classes: number of units of the softmax output layer.

    Returns:
        An uncompiled keras.Model named "VGG16_model".
    """
    model_input = keras.Input(shape=input_shape)

    #Data augmentation first, then the VGG16-specific input preprocessing
    features = keras.applications.vgg16.preprocess_input(data_augmentation(model_input))

    #The convolutional base stays frozen, otherwise the advantages of using a pre-trained model would be lost
    frozen_base = VGG16(weights='imagenet', include_top=False, input_shape=input_shape)
    frozen_base.trainable = False
    frozen_base.summary()

    #The base is called in inference mode
    features = frozen_base(features, training=False)

    #New top: pooling turns the feature maps into vectors, followed by
    #batch normalization, dropout and the softmax prediction layer
    top_layers = (
        keras.layers.GlobalMaxPooling2D(name="top_GlobalMaxPooling2D"),
        layers.BatchNormalization(name="topBatchNorm"),
        layers.Dropout(0.5, name="top_dropout"),
        layers.Dense(num_classes, activation="softmax", name="pred"),
    )
    for top_layer in top_layers:
        features = top_layer(features)

    return keras.Model(model_input, features, name="VGG16_model")

#### Xception

In [None]:
from tensorflow.keras.applications import Xception

#Make the Xception model
def make_model_Xception(input_shape, num_classes):
    """Build a transfer-learning classifier on top of a frozen ImageNet Xception base.

    Args:
        input_shape: shape of one input image; callers in this file pass img_size+(3,).
        num_classes: number of units of the softmax output layer.

    Returns:
        An uncompiled keras.Model named "Xception".
    """
    image_in = keras.Input(shape=input_shape)

    #Augment, then apply the Xception-specific input preprocessing
    augmented = data_augmentation(image_in)
    preprocessed = keras.applications.xception.preprocess_input(augmented)

    #Pre-trained base, frozen so that its ImageNet weights are not modified by training
    backbone = Xception(weights='imagenet', include_top=False, input_shape=input_shape)
    backbone.trainable = False
    backbone.summary()

    #Run the base in inference mode
    feature_maps = backbone(preprocessed, training=False)

    #Rebuild the top: feature maps -> vector -> batch norm -> dropout -> softmax
    pooled = keras.layers.GlobalMaxPooling2D(name="top_GlobalMaxPooling2D")(feature_maps)
    normalized = layers.BatchNormalization(name="topBatchNorm")(pooled)
    regularized = layers.Dropout(0.5, name="top_dropout")(normalized)
    class_probabilities = layers.Dense(
        num_classes, activation="softmax", name="pred")(regularized)

    return keras.Model(image_in, class_probabilities, name="Xception")

#### Resnet50V2

In [None]:
from tensorflow.keras.applications import ResNet50V2

#Make the Resnet50V2 model
def make_model_Resnet50V2(input_shape, num_classes):
    """Build a transfer-learning classifier on top of a frozen ImageNet ResNet50V2 base.

    Args:
        input_shape: shape of one input image; callers in this file pass img_size+(3,).
        num_classes: number of units of the softmax output layer.

    Returns:
        An uncompiled keras.Model named "Resnet50V2".
    """
    net_input = keras.Input(shape=input_shape)

    #Augmentation followed by the ResNetV2-specific input preprocessing
    tensor = data_augmentation(net_input)
    tensor = keras.applications.resnet_v2.preprocess_input(tensor)

    #The pre-trained base is not trainable, so its ImageNet weights are kept as they are
    pretrained = ResNet50V2(weights='imagenet', include_top=False, input_shape=input_shape)
    pretrained.trainable = False
    pretrained.summary()

    #Inference mode for the base
    tensor = pretrained(tensor, training=False)

    #Rebuild the top, in this order: pooling to vectors, batch normalization,
    #dropout and the softmax prediction layer
    for block in (
        keras.layers.GlobalMaxPooling2D(name="top_GlobalMaxPooling2D"),
        layers.BatchNormalization(name="topBatchNorm"),
        layers.Dropout(0.5, name="top_dropout"),
        layers.Dense(num_classes, activation="softmax", name="pred"),
    ):
        tensor = block(tensor)

    return keras.Model(net_input, tensor, name="Resnet50V2")

#### EfficientNetB0

In [None]:
from tensorflow.keras.applications import EfficientNetB0

#Make the EfficientNetB0 model
def make_model_EfficientNetB0(input_shape, num_classes):
    """Build a transfer-learning classifier on top of a frozen ImageNet EfficientNetB0 base.

    Args:
        input_shape: shape of one input image; callers in this file pass img_size+(3,).
        num_classes: number of units of the softmax output layer.

    Returns:
        An uncompiled keras.Model named "EfficientNetB0".
    """
    inputs = keras.Input(shape=input_shape)
    x = data_augmentation(inputs)

    preprocess_input_EfficientNetB0 = keras.applications.efficientnet.preprocess_input
    x = preprocess_input_EfficientNetB0(x)

    #We do not want the base to be trainable, otherwise we would lose all the advantages of using a pre-trained model
    #The base is built as a standalone model from input_shape, like the other architectures
    #of this notebook. Building it with input_tensor=x and then calling it on x again would
    #embed the augmentation layers inside the base a second time.
    conv_base_EfficientNetB0 = EfficientNetB0(weights='imagenet',include_top=False,input_shape=input_shape)
    conv_base_EfficientNetB0.trainable = False
    conv_base_EfficientNetB0.summary()

    #The base is called in inference mode
    x = conv_base_EfficientNetB0(x, training=False)

    #Rebuild top
    #Convert the feature maps returned by the base to vectors
    x = keras.layers.GlobalMaxPooling2D(name="top_GlobalMaxPooling2D")(x)
    x = layers.BatchNormalization(name="topBatchNorm")(x)

    dropout_rate = 0.5
    x = layers.Dropout(dropout_rate, name="top_dropout")(x)

    #One softmax unit per class
    outputs = layers.Dense(num_classes, activation="softmax", name="pred")(x)

    return keras.Model(inputs, outputs, name="EfficientNetB0")

## Part 3 - Compile and Train the CNN - FUNCTION



### Compile the CNN

In [None]:
#Define a function for the compilation of the models 
#The metrics used vary between a classification with two classes and one with more than two classes

def compile_model(model_to_compile,model_arch_name):
    """Compile a model with Adam, categorical cross-entropy and the evaluation metrics.

    Reads the notebook globals num_classes, save_shape_plot and save_path.

    Args:
        model_to_compile: keras model to compile; the make_model_* functions of this
            notebook end with a softmax layer, i.e. they output probabilities.
        model_arch_name: architecture name, used as prefix of the saved model plot.

    Returns:
        The same model instance, compiled.
    """
    #The output layer already applies a softmax, so the loss receives probabilities, not logits
    loss_func = tf.keras.losses.CategoricalCrossentropy(from_logits=False)

    METRICS = [
        'accuracy',
        keras.metrics.Precision(name='precision'),
        keras.metrics.Recall(name='recall'),
        tfa.metrics.F1Score(name='f1-score',average='macro',num_classes=num_classes,threshold=0.5),
    ]
    #With more than two classes, the F1-score of each individual class is tracked as well
    if num_classes != 2:
        METRICS.append(
            tfa.metrics.F1Score(name='f1-score_perClass',num_classes=num_classes,threshold=0.5)
        )

    #Set the initial learning rate at 0.001 (`lr` is a deprecated alias of `learning_rate`)
    opt = Adam(learning_rate=1e-3)

    model_to_compile.compile(optimizer=opt,
              loss=loss_func,
              metrics=METRICS)

    #Save the model plot if required to
    if save_shape_plot:
        keras.utils.plot_model(model_to_compile,show_shapes=True,to_file=save_path+model_arch_name+"_plot.png")
    return model_to_compile

### Training the CNN on the Training set and evaluating it on the Validation set

Callback: 

- EarlyStopping: Stop training the model if the loss on the validation set has not improved over `patience` epochs, and restore the best weights at the end of training
- ModelCheckpoint: Save the best model based on the validation loss
- WIP - TensorBoard: Generate the data for a tensorboard 

In [None]:
epochs = 3
#Patience is set to the number of epochs: we do not want training to be stopped early because of the visual representations, but we still use early stopping in order to restore the best weights
patience = epochs
verbose = 1

In [None]:
#Define a function to build the callbacks
def build_callback(model_name):
    """Build the list of Keras callbacks used to train one model.

    Reads the notebook globals patience, verbose, save_model and save_path.

    Args:
        model_name: name of the model, used for the TensorBoard log directory
            ("logs/fit/<model_name>") and for the checkpoint file ("<model_name>.h5").

    Returns:
        A list with an EarlyStopping and a TensorBoard callback, plus a
        ModelCheckpoint callback when save_model is true.
    """
    callbacks_list = [
        #Stops when val_loss has not improved for `patience` epochs and restores the best weights
        keras.callbacks.EarlyStopping(
            monitor="val_loss",
            patience=patience,
            verbose=verbose,
            mode="auto",
            restore_best_weights=True,
        ),
        keras.callbacks.TensorBoard(
            log_dir="logs/fit/"+model_name,
            histogram_freq=1,
            embeddings_freq=1,
        ),
    ]
    #The checkpoint keeps only the model with the lowest validation loss
    if save_model:
        callbacks_list.append(
            keras.callbacks.ModelCheckpoint(
                filepath=save_path+model_name+".h5",
                monitor="val_loss",
                save_best_only=True,
            )
        )

    return callbacks_list

In [None]:
#Define a function to train the model 
def train_model(model_to_train,callbacks):
    """Train a compiled model on train_ds and validate it on val_ds.

    Reads the notebook globals train_ds, val_ds and epochs.

    Args:
        model_to_train: compiled keras model.
        callbacks: list of keras callbacks passed to fit().

    Returns:
        A tuple (model_to_train, history) where history is the object returned by fit().
    """
    history_model_to_train = model_to_train.fit(
        train_ds,
        validation_data=val_ds,
        epochs=epochs,
        callbacks=callbacks
    )
    return model_to_train, history_model_to_train

### Build chart to evaluate the model training process

In [None]:
colors = plt.rcParams['axes.prop_cycle'].by_key()['color']

def plot_metrics(history, model_name):
    """Plot the training and validation curves of the loss and of the accuracy.

    Reads the notebook globals colors, save_training_plot and save_path.

    Args:
        history: mapping from metric name to its per-epoch values; it must contain
            'loss', 'accuracy', 'val_loss' and 'val_accuracy'. Callers in this file
            pass the `.history` attribute of the object returned by fit().
        model_name: name of the model, used as prefix of the saved figure.
    """
    metrics = ['loss', 'accuracy']
    for n, metric in enumerate(metrics):
        name = metric.replace("_"," ").capitalize()
        plt.subplot(2,2,n+1)
        #Same colour for both curves, the validation curve is dashed
        plt.plot(history[metric], color=colors[0], label='Train')
        plt.plot(history['val_'+metric],
                 color=colors[0], linestyle="--", label='Val')
        plt.xlabel('Epoch')
        plt.ylabel(name)
        if metric == 'loss':
            #Start the loss axis at 0 and keep the automatic upper limit
            plt.ylim([0, plt.ylim()[1]])
        else:
            plt.ylim([0,1])

        plt.legend()

    #Save the figure once, after all the subplots have been drawn
    if save_training_plot:
        plt.gcf().savefig(save_path+model_name+"_training_plot.png", dpi=300)

### JSON export

In [None]:
def json_export(name, data):
    """Write a training history to a CSV file.

    Despite its name, this function writes a CSV file, not a JSON file. The file is
    named "<module_name>_<model_version>_history_df_<name>.csv" and is written under
    save_path (module_name, model_version and save_path are notebook globals).

    Args:
        name: name of the model, used as suffix of the file name.
        data: dictionary converted to a DataFrame with pd.DataFrame.from_dict.
    """
    history_frame = pd.DataFrame.from_dict(data)
    file_name = "_".join([module_name, model_version, "history", "df", name]) + ".csv"
    history_frame.to_csv(save_path + file_name, index=False)

## Part 4 - Build the desired model
Only one of the models below can be run per session: the session must be restarted between the compilation of two models. The goal is to keep the tests uniform.

### VGG16

In [None]:
keras.backend.clear_session()
model_VGG16 = make_model_VGG16(img_size+(3,),num_classes)
model_VGG16 = compile_model(model_VGG16,"VGG16")
model_VGG16.summary()

In [None]:
callbacks_VGG16 = build_callback("VGG16")
model_VGG16, history_model_VGG16 = train_model(model_VGG16, callbacks_VGG16)
# Save the training history as a CSV file (json_export writes a CSV despite its name)
if save_history == True:
  json_export("VGG16", history_model_VGG16.history)

In [None]:
mpl.rcParams['figure.figsize'] = (12, 10)
plot_metrics(history_model_VGG16.history,"VGG16")

### Xception

In [None]:
keras.backend.clear_session()
model_Xception = make_model_Xception(img_size+(3,),num_classes)
model_Xception = compile_model(model_Xception,"Xception")
model_Xception.summary()

In [None]:
callbacks_Xception = build_callback("Xception")
model_Xception, history_model_Xception = train_model(model_Xception, callbacks_Xception)
# Save the training history as a CSV file (json_export writes a CSV despite its name)
if save_history == True:
  json_export("Xception", history_model_Xception.history)

In [None]:
mpl.rcParams['figure.figsize'] = (12, 10)
plot_metrics(history_model_Xception.history,"Xception")

### Resnet50V2

In [None]:
keras.backend.clear_session()
model_Resnet50V2 = make_model_Resnet50V2(img_size+(3,),num_classes)
model_Resnet50V2 = compile_model(model_Resnet50V2,"Resnet50V2")
model_Resnet50V2.summary()

In [None]:
callbacks_Resnet50V2 = build_callback("Resnet50V2")
model_Resnet50V2, history_model_Resnet50V2 = train_model(model_Resnet50V2, callbacks_Resnet50V2)
# Save the training history as a CSV file (json_export writes a CSV despite its name)
if save_history == True:
  json_export("Resnet50V2", history_model_Resnet50V2.history)

In [None]:
mpl.rcParams['figure.figsize'] = (12, 10)
plot_metrics(history_model_Resnet50V2.history,"Resnet50V2")

### EfficientNetB0

In [None]:
keras.backend.clear_session()
model_EfficientNetB0 = make_model_EfficientNetB0(img_size+(3,),num_classes)
model_EfficientNetB0 = compile_model(model_EfficientNetB0,"EfficientNetB0")
model_EfficientNetB0.summary()

In [None]:
callbacks_EfficientNetB0 = build_callback("EfficientNetB0")
model_EfficientNetB0, history_model_EfficientNetB0 = train_model(model_EfficientNetB0, callbacks_EfficientNetB0)
# Save the training history as a CSV file (json_export writes a CSV despite its name)
if save_history == True:
  json_export("EfficientNetB0", history_model_EfficientNetB0.history)

In [None]:
mpl.rcParams['figure.figsize'] = (12, 10)
plot_metrics(history_model_EfficientNetB0.history,"EfficientNetB0")

# Tensorboard

Below is the code to upload the training logs to Tensorboard


In [None]:
#Update to Tensorboard
!tensorboard dev upload --logdir ./logs \
  --name "Module 1 - Model Selection - vF" \
  --description "GlobalMax Pooling, Batch Normalization, Dropout = 0.5" \
  --one_shot

## Access Tensorboard and build chart

In [None]:
experiment_id = "ID RETURNED ABOVE"
experiment = tb.data.experimental.ExperimentFromDev(experiment_id)
df = experiment.get_scalars()
print(df["run"].unique())
print(df["tag"].unique())

In [None]:
dfw = experiment.get_scalars(pivot=True) 
dfw

In [None]:
csv_path = save_path+module_name+"_"+model_version+"_experiment.csv"
dfw.to_csv(csv_path, index=False)
dfw_roundtrip = pd.read_csv(csv_path)
pd.testing.assert_frame_equal(dfw_roundtrip, dfw)

In [None]:
# Filter the DataFrame to only validation data, which is what the subsequent
# analyses and visualization will be focused on.
dfw_validation = dfw[dfw.run.str.endswith("/validation")]
# Use the part of each run name before the first comma as the grouping (hue) value of the plots.
optimizer_validation = dfw_validation.run.apply(lambda run: run.split(",")[0])

plt.figure(figsize=(16, 6))
plt.subplot(1, 2, 1)
sns.lineplot(data=dfw_validation, x="step", y="epoch_f1-score", 
             hue=optimizer_validation).set_title("f1-score")
plt.subplot(1, 2, 2)
sns.lineplot(data=dfw_validation, x="step", y="epoch_loss",
             hue=optimizer_validation).set_title("loss")

In [None]:
dfw_validation.groupby('run')['epoch_f1-score'].nlargest(1,)

### EfficientNet-B0

In [None]:
dfw_EfficientNetB0 = dfw[dfw.run.str.startswith("fit/EfficientNetB0")]

In [None]:
# Use the part of each EfficientNetB0 run name before the first comma as the grouping (hue) value of the plots.
optimizer_validation = dfw_EfficientNetB0.run.apply(lambda run: run.split(",")[0])

plt.figure(figsize=(16, 6))
plt.subplot(1, 3, 1)
sns.lineplot(data=dfw_EfficientNetB0, x="step", y="epoch_f1-score", 
             hue=optimizer_validation).set_title("f1-score")
plt.subplot(1, 3, 2)
sns.lineplot(data=dfw_EfficientNetB0, x="step", y="epoch_accuracy",
             hue=optimizer_validation).set_title("accuracy")
plt.subplot(1, 3, 3)
sns.lineplot(data=dfw_EfficientNetB0, x="step", y="epoch_loss",
             hue=optimizer_validation).set_title("loss")

# Load and evaluate models

## Part 1 - Load the model and evaluate its accuracy

In [None]:
#model = model_VGG16
#model_name = "VGG16"
#model = model_Xception
#model_name = "Xception"
#model = model_Resnet50V2
#model_name = "Resnet50V2"
model = model_EfficientNetB0
model_name = "EfficientNetB0"


In [None]:
def model_accuracy():
    """Evaluate the global `model` on `test_ds` and print its accuracy.

    Also prints the shape of the predictions made on `test_ds`.
    Reads the notebook globals model, model_name and test_ds.
    """
    #evaluate() returns the loss followed by the compiled metrics. The number of metrics
    #depends on the number of classes (see compile_model), so the accuracy is read by
    #position instead of unpacking a fixed number of values. 'accuracy' is the first
    #metric passed to compile() in this notebook.
    results = model.evaluate(test_ds)
    acc = results[1]
    print('Restored model '+model_name+', accuracy: {:5.2f}%'.format(100*acc))
    print(model.predict(test_ds).shape)

In [None]:
model_accuracy()

## Part 2 - Proceed to multiple predictions


In [None]:
def model_predict():
    """Predict one batch of the test set and display up to nine images with their labels.

    Reads the notebook globals model, test_ds and class_names.

    Returns:
        The predictions of the model for the whole batch.
    """
    #Retrieve a batch of images from the test set
    image_batch, label_batch = next(test_ds.as_numpy_iterator())
    predictions = model.predict_on_batch(image_batch)

    plt.figure(figsize=(10, 10))
    #The batch may contain fewer than nine images
    for i in range(min(9, len(image_batch))):
        ax = plt.subplot(3, 3, i + 1)
        plt.imshow(image_batch[i].astype("uint8"))
        #Labels are one-hot encoded, predictions are one score per class
        x = class_names[np.argmax(label_batch[i])]
        y = class_names[np.argmax(predictions[i])]
        plt.title("Actual:"+ x +"\nPredicted:"+ y +"")
        plt.axis("off")
    return predictions

In [None]:
#Retrieve a batch of images from the test set
predictions = model_predict()

In [None]:
#Check
np.sum(predictions, axis=1)

## Part 3 - Single Prediction

In [None]:
img_path = "IMG_PATH"

img = keras.preprocessing.image.load_img(
      img_path, target_size=img_size
  )

img_array = keras.preprocessing.image.img_to_array(img)
img_array = tf.expand_dims(img_array, 0)  # Create batch axis

prediction = model.predict(img_array)

print(f"The algorithm says this image is:\n {prediction[0,0]:.2%} {class_names[0]}\n and {prediction[0,1]:.2%} {class_names[1]}\n")
img
