# Training Resnet50 for Head's Position Detection
After preparing the `margined_cropped_images`, and `data.csv`, now we are able to feed them to a model to predict Hexbugs' heads' position.
The approach that worked for us was ***transfer learning***. We start from a pre-trained `Resnet50` model, whose weights were trained on the `ImageNet` dataset, which helps us extract better features. However, the top layers of the `resnet` model are designed for classification tasks. To adapt the model to our problem, we removed the last `Dense` layer and added two other `Dense` layers to build a regressor with only ***two*** outputs for `(x, y)`.
After modifying the model, we can train it over our dataset, and evaluate its performance on the validation set.
At last, we save the model for further use cases.

### Essential Imports

* `cv2` is required to read and write images.
* `json` is used to open `json` files.
* `random` is used to pick random samples for model evaluation.
* `logging` is used to ignore `warning` messages. (not mandatory)
* `numpy` is crucial to work with images.
* `pandas` is used to read the `data.csv` file.
* `tensorflow` is used to create the model. It's used as the backend for `Keras`.
* `sklearn` is used to split dataset to `train` and `test` sets.

In [1]:
import os
import cv2
import json
import random
import logging
import numpy as np
import pandas as pd
import tensorflow as tf
from sklearn.model_selection import train_test_split
from tensorflow.keras import layers, Model
from tensorflow.keras.models import Sequential
from tensorflow.keras.utils import load_img
from tensorflow.keras.applications.resnet50 import ResNet50
from tensorflow.keras.applications import ResNet101, DenseNet121, DenseNet169, DenseNet201
from tensorflow.keras.layers import Conv2D, MaxPooling2D, Flatten, Dense, Dropout, BatchNormalization

from IPython.display import clear_output

# Raise TensorFlow's logger threshold to ERROR so lower-severity messages are not shown.
tf_logger = tf.get_logger()
tf_logger.setLevel(logging.ERROR)
# Clear whatever has been printed to this cell's output so far.
clear_output()

### Reading data from `data.csv`

In [2]:
# Read the annotation table, remove the `Unnamed: 0` column and order the rows
# by the `ID` column (ascending).
data = pd.read_csv('data.csv')
data = data.drop(columns=['Unnamed: 0'])
data = data.sort_values(by='ID', ascending=True)
# Last expression of the cell: displays the resulting table.
data

Unnamed: 0,CroppedHexBugCoordinationX,CroppedHexBugCoordinationY,OriginalBoxCoordinationX1,OriginalBoxCoordinationY1,Path,ID
5011,48,36,218,18,cropped_bugs/training018/frame0.jpg,0
2805,125,132,481,381,cropped_bugs/training044/frame0.jpg,0
1100,269,119,905,708,cropped_bugs/training083/frame0.jpg,0
4911,22,269,536,745,cropped_bugs/training021/frame0.jpg,0
3710,222,206,760,1339,cropped_bugs/training050/frame0.jpg,0
...,...,...,...,...,...,...
3578,20,36,318,82,cropped_bugs/training022/frame100.jpg,100
1170,19,80,43,11,cropped_bugs/training041/frame100.jpg,100
3679,30,82,105,1466,cropped_bugs/training050/frame100.jpg,100
3871,31,73,520,400,cropped_bugs/training025/frame100.jpg,100


In [3]:
# Collect every image listed in `data` together with its (x, y) annotation.
images = []
y = []

# Each row of `data` provides the image path and the head coordinates stored in
# the `CroppedHexBugCoordinationX` / `CroppedHexBugCoordinationY` columns.
for row in data.itertuples(index=False):
    # Read the image as RGB and keep it as a numpy array.
    images.append(np.array(load_img(row.Path, color_mode='rgb')))
    # Store the annotation as an integer [x, y] pair.
    y.append([int(row.CroppedHexBugCoordinationX), int(row.CroppedHexBugCoordinationY)])

# Turn the per-sample lists into arrays.
X = np.array(images)
y = np.array(y)

print(X.shape)
print(y.shape)

(5012, 300, 300, 3)
(5012, 2)


In [4]:
# Splits the dataset into `train` and `test` sets with the given ratio.
# The samples are shuffled before splitting. `random_state` pins that shuffle so
# every run produces the same split: without it the held-out set changes on each
# run, validation losses of different runs cannot be compared, and a model
# restored from an earlier run may be scored on samples it was trained on.
# NOTE(review): this only helps if earlier checkpoints were produced with the
# same seed -- confirm before comparing against them.
X_train, X_test, y_train, y_test = train_test_split(
    X,
    y,
    test_size=0.33,
    shuffle=True,
    random_state=42,
)

In [5]:
# Report the shape of every split, one line per array.
split_report = [
    f'X train shape: {X_train.shape}',
    f'X test shape: {X_test.shape}',
    f'y train shape: {y_train.shape}',
    f'y test shape: {y_test.shape}',
]
print('\n'.join(split_report))

X train shape: (3358, 300, 300, 3)
X test shape: (1654, 300, 300, 3)
y train shape: (3358, 2)
y test shape: (1654, 2)


In [6]:
# Restore a previously saved model from disk instead of building a new one.
pretrained_checkpoint_dir = "resnet50_trained_model_data/v1.5/checkpoint_callback"
model = tf.keras.models.load_model(pretrained_checkpoint_dir)
# Clear the messages printed while loading.
clear_output()

In [7]:
# dense_model = DenseNet169(
#     include_top=False,
#     weights="imagenet",
#     input_shape=X_train[0].shape)

# model = Sequential(dense_model, name='DenseNet169')

# x = model.output
# x = Flatten()(x)
# x = Dense(16, activation='relu')(x)
# x = Dense(8, activation='relu')(x)

# # The last layer should be the layer that computes the (x, y) coordinations.
# output_layer = Dense(2, activation='linear')(x)

# # Create the model based on the input and output layer.
# model = Model(inputs=model.input, outputs=output_layer)

# clear_output()

In [8]:
# vgg_model = tf.keras.applications.VGG19(
#     include_top=False,
#     weights="imagenet",
#     input_shape=X_train[0].shape,
# )

# for layer in vgg_model.layers:
#     layer.trainable = True

# # Adds two dense layers for regression problem.
# # It basically uses the output of the `resnet50` model to feed the dense layers.
# x = vgg_model.output
# x = Flatten()(x)
# x = Dense(128, activation='relu')(x)
# x = Dense(64, activation='relu')(x)

# # The last layer should be the layer that computes the (x, y) coordinations.
# output_layer = Dense(2, activation='linear')(x)

# # Create the model based on the input and output layer.
# model = Model(inputs=vgg_model.input, outputs=output_layer)

In [9]:
# model = Sequential()
# model.add(Conv2D(64, (3, 3), activation='relu', input_shape=X_train[0].shape))
# # model.add(Dropout(0.25))
# # model.add(MaxPooling2D((2, 2)))
# model.add(Conv2D(128, (3, 3), activation='relu', kernel_regularizer='L2'))
# # model.add(Dropout(0.25))
# model.add(MaxPooling2D((2, 2)))

# model.add(Conv2D(256, (3, 3), activation='relu', kernel_regularizer='L2'))
# # model.add(Dropout(0.25))
# model.add(MaxPooling2D((2, 2)))

# model.add(Conv2D(512, (3, 3), activation='relu', kernel_regularizer='L2'))
# # model.add(Dropout(0.25))
# # model.add(MaxPooling2D((2, 2)))

# model.add(Conv2D(256, (3, 3), activation='relu', kernel_regularizer='L2'))
# # model.add(Dropout(0.25))
# model.add(MaxPooling2D((2, 2)))

# model.add(Conv2D(128, (3, 3), activation='relu', kernel_regularizer='L2'))
# # model.add(Dropout(0.25))
# model.add(MaxPooling2D((2, 2)))
# model.add(Conv2D(64, (3, 3), activation='relu', kernel_regularizer='L2'))
# # model.add(Dropout(0.25))
# model.add(MaxPooling2D((2, 2)))
# model.add(Conv2D(32, (3, 3), activation='relu', kernel_regularizer='L2'))
# # model.add(Dropout(0.25))
# model.add(MaxPooling2D((2, 2)))
# model.add(Flatten())
# model.add(Dense(16, activation='relu'))
# model.add(Dense(2))

In [10]:
# # Creates a pre-trained model instance from class `ResNet50`.
# # This model doesn't include the last classification dense layer.
# # The given `input_shape` should be the same as our dataset images.
# resnet50 = ResNet50(weights='imagenet', include_top=False, input_shape=(X_train[0].shape))

# # Iterates over each layer in the model and makes them to be trainable.
# for layer in resnet50.layers:
#     layer.trainable = True

# # Adds two dense layers for regression problem.
# # It basically uses the output of the `resnet50` model to feed the dense layers.
# x = resnet50.output
# x = Flatten()(x)
# x = Dense(32, activation='relu')(x)
# x = Dense(16, activation='relu')(x)

# # The last layer should be the layer that computes the (x, y) coordinations.
# output_layer = Dense(2, activation='linear')(x)

# # Create the model based on the input and output layer.
# model = Model(inputs=resnet50.input, outputs=output_layer)

# clear_output()

### Visualization
For better visualization, we use [tensorboard](https://www.tensorflow.org/tensorboard/get_started). It helps us to plot the figures real-time and analyze the model performance.

In [11]:
# Load the TensorBoard notebook extension
%load_ext tensorboard

# `datetime` provides the timestamp used to name this run's log directory.
import datetime

# Clear any logs from previous runs (deletes the whole ./logs/ directory).
!rm -rf ./logs/

# One sub-directory per run, named after the current date and time.
log_dir = "logs/fit/" + datetime.datetime.now().strftime("%Y%m%d-%H%M%S")

# Creates the TensorBoard callback that is later passed to `model.fit`; it writes
# its logs to `log_dir` (`histogram_freq=1`: histograms are computed every epoch).
tensorboard_callback = tf.keras.callbacks.TensorBoard(log_dir=log_dir, histogram_freq=1)

/usr/bin/zsh: /home/farzam/miniconda3/envs/tf/lib/libtinfo.so.6: no version information available (required by /usr/bin/zsh)


### Saving the best model
During the training process, we need to track the model's performance and make sure that, at the end of training, we keep the best model.
To do so, we use Keras' `ModelCheckpoint` callback, which monitors `val_loss` after every epoch. Whenever `val_loss` improves on the best value seen so far, it saves the model to the provided directory, from which it can be loaded and evaluated later.
This callback is then called by the model during the training process.

In [12]:
# Directory the checkpoint is written to.
checkpoint_filepath = 'checkpoint_callback'

# Checkpoint callback: watches `val_loss` and keeps only the best model.
model_checkpoint_callback = tf.keras.callbacks.ModelCheckpoint(
    filepath=checkpoint_filepath,
    # Quantity to track, and the direction that counts as an improvement.
    monitor='val_loss',
    mode='min',
    # Only overwrite the checkpoint when the monitored value improves.
    save_best_only=True,
    # Save the full model, not just its weights.
    save_weights_only=False,
)

In [13]:
# Regression problem: mean squared error (`mse`) is the loss,
# and `Adam` is the optimizer.
model.compile(optimizer='adam', loss='mse')

In [14]:
# Prints a layer-by-layer summary of the compiled model
# (layer names, output shapes and parameter counts).
model.summary()

Model: "model"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 densenet169_input (InputLay  [(None, 300, 300, 3)]    0         
 er)                                                             
                                                                 
 densenet169 (Functional)    (None, 9, 9, 1664)        12642880  
                                                                 
 flatten (Flatten)           (None, 134784)            0         
                                                                 
 dense (Dense)               (None, 16)                2156560   
                                                                 
 dense_1 (Dense)             (None, 8)                 136       
                                                                 
 dense_2 (Dense)             (None, 2)                 18        
                                                             

### Training the model
Now that we have everything ready, we can train the model. We pass the `validation_data` containing `X_test` and `y_test` to the model as well.
Also, for the above-mentioned checkpoints, we need to pass those callbacks to the model.

In [None]:
# Callbacks invoked during training: TensorBoard logging and best-model checkpointing.
training_callbacks = [tensorboard_callback, model_checkpoint_callback]

# Train for 20 epochs with mini-batches of 8, validating on the held-out split.
history = model.fit(
    X_train,
    y_train,
    epochs=20,
    batch_size=8,
    validation_data=(X_test, y_test),
    callbacks=training_callbacks,
)

In [None]:
# Opens TensorBoard inside the Jupyter notebook, reading the runs stored under `logs/fit`.
%tensorboard --logdir logs/fit

In [None]:
def compute_error(y, y_pred):
    """
    Computes the Euclidean distance between a ground-truth point and a predicted point.
    The competition scores predictions by Euclidean distance, so this is the error we report.

    @param: list y. an (x, y) pair holding the original coordinates.
    @param: list y_pred. an (x, y) pair holding the predicted coordinates.

    @return: float error. The Euclidean distance between the two given points.
    """
    # Per-axis offsets between truth and prediction.
    delta_x = np.abs(y[0] - y_pred[0])
    delta_y = np.abs(y[1] - y_pred[1])

    # Pythagoras: the distance is the root of the summed squared offsets.
    squared_distance = delta_x ** 2 + delta_y ** 2
    return np.sqrt(squared_distance)

In [None]:
def compute_original_coordination(predicted_coordination, original_box_coordination):
    """
    Translates a head position predicted inside a cropped image back into the
    coordinate system of the original image by adding the bounding box coordinates.

    @param: list predicted_coordination. an (x, y) pair predicted on the cropped image.
    @param: list original_box_coordination. an (x, y) pair of the bounding box in the original image.

    @return: tuple. The (x, y) position of the predicted head in the original image.
    """
    # Shift each axis by the corresponding bounding box coordinate.
    original_x = predicted_coordination[0] + original_box_coordination[0]
    original_y = predicted_coordination[1] + original_box_coordination[1]
    return (original_x, original_y)

### Loading the best model

In [None]:
# Load the model stored in the checkpoint directory.
# Alternative checkpoint: "resnet50_trained_model_data/v1.5/checkpoint_callback"
best_model_dir = "checkpoint_callback"
reconstructed_model = tf.keras.models.load_model(best_model_dir)
# Clear the messages printed while loading.
clear_output()

### Sample Evaluation
Now that we have successfully trained the model, let's see its performance on unseen data.

In [None]:
num_samples = 10

# Chooses random indices (never more than the number of available test images,
# otherwise `random.sample` raises a ValueError).
random_samples_indices = random.sample(range(len(X_test)), min(num_samples, len(X_test)))

# Configs for drawing the circles on the image.
radius = 5
thickness = 5
# Colors are given in BGR order, which is what OpenCV expects.
color = (0, 255, 0)       # ground truth
color_pred = (0, 0, 255)  # prediction

# Iterates over each index and retrieves the image and annotation
for sample_index in random_samples_indices:
    sample_img = X_test[sample_index]
    center = y_test[sample_index]

    # Since we predict only one image at a time, we need to expand its dimension
    # to fit the input layer of our model.
    expanded_img = np.expand_dims(sample_img, axis=0)

    # Predicts the coordinates (truncated to integer pixel positions)
    predicted = list(map(int, reconstructed_model.predict(expanded_img)[0]))

    # Computes the error
    error = round(compute_error(center, predicted), 3)

    print(f'y_predicted: {predicted}, y: {center}, Error: {error}')

    # The images were loaded as RGB while OpenCV displays BGR, so convert first.
    # The conversion returns a new array, so the circles below are drawn on a
    # copy and the image stored in `X_test` is left untouched.
    canvas = cv2.cvtColor(sample_img, cv2.COLOR_RGB2BGR)

    # OpenCV wants the circle center as a tuple of plain Python ints.
    center_point = tuple(int(value) for value in center)
    predicted_point = tuple(predicted)

    # Draws a circle centered at the correct coordinate.
    cv2.circle(canvas, center_point, radius, color, thickness)
    # Draws a circle centered at the predicted coordinate.
    cv2.circle(canvas, predicted_point, radius, color_pred, thickness)

    # Shows the output image and waits for a key press before moving on.
    cv2.imshow(f'Sample No. {sample_index}, error: {error}', canvas)
    cv2.waitKey(0)
    cv2.destroyAllWindows()

### Sample for coordinate conversion

In [None]:
num_samples = 10

# Chooses random row positions (never more than the number of available rows,
# otherwise `random.sample` raises a ValueError).
random_samples_indices = random.sample(range(len(data)), min(num_samples, len(data)))

# Configs for drawing the circle; a negative thickness draws a filled circle.
radius = 10
color = (0, 255, 0)
thickness = -1

# Make sure the output directory exists once, before the loop.
output_dir = 'annotated_images'
os.makedirs(output_dir, exist_ok=True)

for sample_id in random_samples_indices:
    # Fetch the row once instead of indexing `data` for every field.
    row = data.iloc[sample_id]
    sample_img_path = row.Path

    # Constructs the path to the original frame from the 2nd and 3rd path components.
    path_parts = sample_img_path.split('/')
    original_image_path = f"Videos/{path_parts[1]}/{path_parts[2]}"

    sample_img = np.array(load_img(sample_img_path))

    # The model expects a batch, so add a leading batch dimension.
    expanded_img = np.expand_dims(sample_img, axis=0)

    # Predicts the coordinates (truncated to integer pixel positions)
    predicted = list(map(int, reconstructed_model.predict(expanded_img)[0]))

    # Computes the coordinates of the Hexbug's head in the original image.
    # Plain Python ints are used so OpenCV accepts the result as a point.
    box_origin = (int(row.OriginalBoxCoordinationX1), int(row.OriginalBoxCoordinationY1))
    scaled_coordinations = compute_original_coordination(predicted, box_origin)

    # Reads the original image (RGB) and converts it to BGR, the channel order
    # OpenCV assumes when showing and writing images.
    original_sample_img = cv2.cvtColor(np.array(load_img(original_image_path)), cv2.COLOR_RGB2BGR)

    cv2.circle(original_sample_img, scaled_coordinations, radius, color, thickness)

    center = [int(row.CroppedHexBugCoordinationX), int(row.CroppedHexBugCoordinationY)]

    error = round(compute_error(center, predicted), 3)

    print(f'Sample ID: {sample_id}\nTruth: {center}, Predicted: {predicted}, Error: {error}')
    print('------------------------------------------------------------------------')

    # File-system friendly name: no newline, colons, brackets or spaces.
    file_name = (
        f'sample_{sample_id}'
        f'_truth_{center[0]}_{center[1]}'
        f'_pred_{predicted[0]}_{predicted[1]}'
        f'_error_{error}'
    )
    output_path = f'{output_dir}/{file_name}.jpg'

    cv2.imshow(f'Sample ID. {sample_id}', original_sample_img)
    # `cv2.imwrite` reports failure through its return value instead of raising.
    if not cv2.imwrite(output_path, original_sample_img):
        print(f'Could not write {output_path}')
    cv2.waitKey(0)
    cv2.destroyAllWindows()

In [None]:
# reconstructed_model.summary()

In [None]:
# model = Sequential()
# model.add(Conv2D(64, (3, 3), activation='relu', input_shape=X_train[0].shape))
# # model.add(Dropout(0.25))
# model.add(MaxPooling2D((2, 2)))
# model.add(Conv2D(128, (3, 3), activation='relu', kernel_regularizer='L2'))
# # model.add(Dropout(0.25))
# model.add(MaxPooling2D((2, 2)))
# model.add(Conv2D(512, (3, 3), activation='relu', kernel_regularizer='L2'))
# # model.add(Dropout(0.25))
# model.add(MaxPooling2D((2, 2)))
# model.add(Conv2D(1028, (3, 3), activation='relu', kernel_regularizer='L2'))
# # model.add(Dropout(0.25))
# model.add(MaxPooling2D((2, 2)))
# model.add(Conv2D(256, (3, 3), activation='relu', kernel_regularizer='L2'))
# # model.add(Dropout(0.25))
# model.add(MaxPooling2D((2, 2)))
# model.add(Flatten())
# model.add(Dense(128, activation='relu'))
# model.add(Dense(2))

In [None]:
# model.summary()

In [None]:
# reconstructed_model.compile(loss='mse', optimizer='adam')

In [None]:
# history = reconstructed_model.fit(X_train, y_train, epochs=100, batch_size=16, validation_data=(X_test, y_test), callbacks=[tensorboard_callback, model_checkpoint_callback])