# CNN Sketched Character Detection Model

<img src="https://user-images.githubusercontent.com/6675724/219223438-2c93f9cb-d4b5-45e9-a433-149ed76affa6.gif" alt="teaser" width="600"/>


Image credits: https://github.com/facebookresearch/AnimatedDrawings


## Dataset



We use the *Amateur Drawings Dataset*. The original dataset comprises over 178K images (~50 GB) of amateur drawings with associated annotations. Because of Google Colab's resource limits, the assiduous TAs have prepared a smaller sample.

<img src="https://github.com/shellywhen/CNN-drawing-segmentation/blob/main/images/dataset.png?raw=true" alt="dataset_overview" width="600"/>

The filtered dataset has 2K amateur drawing images (~500 MB), and a JSON file records the list of annotations. Each entry has a `src` field holding the relative path to the image and a `bbox` field defined as `[x, y, width, height]`, where `x` and `y` are the horizontal and vertical positions of the top-left corner.

```json
{
    "src": "images/1.png",
    "bbox": [
        32.0,
        56.888888888888886,
        303.36507936507934,
        428.69841269841265
    ]
}
```
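
As a quick illustration of this format, the sketch below parses one entry and converts its `[x, y, width, height]` box into corner coordinates. The helper name `bbox_to_corners` and the rounded numbers are assumptions made for this example and are not part of the assignment code.

```python
import json

# One annotation entry in the format shown above (values rounded for readability).
entry_text = '{"src": "images/1.png", "bbox": [32.0, 56.0, 300.0, 400.0]}'
entry = json.loads(entry_text)


def bbox_to_corners(bbox):
    """Convert [x, y, width, height] to (x_min, y_min, x_max, y_max)."""
    x, y, width, height = bbox
    return x, y, x + width, y + height


corners = bbox_to_corners(entry["bbox"])
print(entry["src"], corners)
```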

In [None]:
# Suppress warnings and logs
import warnings
import os
os.environ['TF_ENABLE_ONEDNN_OPTS'] = '0'
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2'
warnings.filterwarnings('ignore')

import tensorflow as tf

# Check whether a GPU environment is enabled
device_name = tf.test.gpu_device_name()
if len(device_name) > 0 and __name__ == '__main__':
    print("Found GPU at: {}".format(device_name))
elif __name__ == '__main__':
    device_name = "/device:CPU:0"
    print("No GPU, using {}.".format(device_name))


Found GPU at: /device:GPU:0


# Data Loading 



Here, we download the dataset, transform the data format, and split it into training and test sets.
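
The format transformation includes rescaling each bounding box relative to the image size, so that the targets do not depend on the image resolution. A minimal sketch of that normalization and its inverse is shown below. The function names `normalize_bbox` and `denormalize_bbox` and the 400x600 image size are hypothetical and used only for illustration.

```python
def normalize_bbox(bbox, image_width, image_height):
    """Scale a pixel-space [x, y, w, h] box into the [0, 1] range."""
    x, y, w, h = bbox
    return [x / image_width, y / image_height, w / image_width, h / image_height]


def denormalize_bbox(bbox, image_width, image_height):
    """Map a normalized [x, y, w, h] box back to pixel coordinates."""
    x, y, w, h = bbox
    return [x * image_width, y * image_height, w * image_width, h * image_height]


# Example with an assumed 400x600 image.
original = [40.0, 60.0, 200.0, 300.0]
normalized = normalize_bbox(original, 400, 600)
restored = denormalize_bbox(normalized, 400, 600)
print(normalized)
print(restored)
```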

In [None]:
# Download the dataset from Amazon S3; this can take about 1 minute.
!wget "https://comp2211-pa2.s3.ap-northeast-2.amazonaws.com/gt_bbox.json"
!wget "https://comp2211-pa2.s3.ap-northeast-2.amazonaws.com/images.zip"

!unzip -q images.zip

In [None]:
import numpy as np
import keras
from tensorflow.keras.applications.resnet50 import preprocess_input
from tensorflow.keras.preprocessing.image import load_img, img_to_array
from sklearn.model_selection import train_test_split


def preprocess_dataset(dataset, image_size=(224, 224)):
  """Preprocess the raw dataset
  dataset: the parsed JSON file as the dataset
  image_size: targeted image size for the model input

  return arrays of the image paths, the preprocessed images, and the ground truth bboxes normalized to [0, 1]
  """
  images = []
  bboxes = []
  urls = []

  for item in dataset:
    url = item['src']
    urls.append(url)
    original_image = load_img(item['src'], target_size=None)
    original_width = original_image.width
    original_height = original_image.height
    image = original_image.resize(image_size)
    image = img_to_array(image)
    image = preprocess_input(image)
    images.append(image)

    bbox = item['bbox']
    x, y, width, height = bbox

    # normalize by the original image size so the bbox does not depend on resolution
    new_x = x / original_width
    new_y = y / original_height
    new_width = width / original_width
    new_height = height / original_height

    new_bbox = [new_x, new_y, new_width, new_height]

    bboxes.append(new_bbox)

  urls = np.array(urls, dtype=str)
  images = np.array(images, dtype=np.float32)
  bboxes = np.array(bboxes, dtype=np.float32)
  return urls, images, bboxes

In [None]:
def split_dataset(urls, x, gt, test_split_ratio=0.2):
    """Split the dataset according to the test split ratio (no shuffling: the first items form the test set)
    urls: the input image paths
    x: the input image data (np.ndarray) to be fed into the model
    gt: the ground truth bounding boxes (np.ndarray)
    test_split_ratio: the fraction of the full dataset used as the test set

    return the train_url, train_x, train_y, and test_url, test_x, and test_y
    """
    n = x.shape[0]
    test_size = int(n * test_split_ratio)

    test_x = x[:test_size]
    test_y = gt[:test_size]
    test_url = urls[:test_size]
    train_x = x[test_size:]
    train_y = gt[test_size:]
    train_url = urls[test_size:]
    return train_url, train_x, train_y, test_url, test_x, test_y
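
Note that `split_dataset` takes the first `test_size` items as the test set without shuffling. If the annotation file happened to be ordered in some meaningful way, a shuffled split might be preferable. The sketch below shows one possible way to produce shuffled index lists with a fixed seed; `shuffled_split_indices` is a hypothetical helper, not part of the assignment.

```python
import random


def shuffled_split_indices(n, test_split_ratio=0.2, seed=0):
    """Return (train_indices, test_indices) after a seeded shuffle of range(n)."""
    indices = list(range(n))
    random.Random(seed).shuffle(indices)
    test_size = int(n * test_split_ratio)
    return indices[test_size:], indices[:test_size]


train_idx, test_idx = shuffled_split_indices(10, test_split_ratio=0.2, seed=42)
print(len(train_idx), len(test_idx))
```

The returned lists can index NumPy arrays directly, for example `x[train_idx]` and `gt[train_idx]`.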

In [None]:
import json

if __name__ == '__main__':
  N = 1000  # we only load N images for the assignment (max=2,000)
  IMAGE_SIZE = (224, 224) # ResNet standard image size
  DATASET_FILE = './gt_bbox.json'
  SPLIT_TEST_RATIO = 0.2

  with open(DATASET_FILE) as f:
      dataset = json.load(f)[:N]
  urls, images, bboxes = preprocess_dataset(dataset, image_size=IMAGE_SIZE)
  train_url, train_x, train_y, test_url, test_x, test_y = split_dataset(urls, images, bboxes, SPLIT_TEST_RATIO)

# Model Compilation & Training


When using ResNet50 for classification, the model expects a fixed-size input image and outputs a 1D array where each entry corresponds to the probability that the image belongs to a particular class. However, for our task, which is bounding box prediction, the standard ResNet50 architecture needs to be adapted. Our goal is not to classify an entire image into a single category, but rather to predict the coordinates of a box that encloses an object within the image. These coordinates are usually represented by four values: (x, y, width, height).

Therefore, we will remove its original output layer and add custom layers for our task. See the Keras [Functional API](https://www.tensorflow.org/guide/keras/functional_api) guide for how to connect layers this way.

In [None]:
from tensorflow.keras.applications import ResNet50
from tensorflow.keras.models import Model
from tensorflow.keras.layers import GlobalAveragePooling2D, Dense, Conv2D, Dropout

def create_model(input_shape=(224, 224, 3)):
  """Create a CNN model for predicting the bounding box for drawings in an image
  input_shape: the targeted image size

  return the model architecture
  """
  # load the pre-trained ResNet50 model without the top classification layer
  base_model = ResNet50(weights='imagenet', input_shape=input_shape, include_top=False)
  # freeze the base model layers
  base_model.trainable = False
  # add custom layers on top for bounding box prediction
  model_x = base_model.output

  # the layer infers its input shape from the backbone output, so no input_shape argument is needed
  model_x = Conv2D(filters=128, kernel_size=(3, 3), activation='relu', padding='same', strides=3)(model_x)

  model_x = GlobalAveragePooling2D()(model_x)  # use global average pooling to flatten the output
  model_x = Dropout(0.5)(model_x) # randomly zero out activations during training to reduce overfitting
  model_x = Dense(64, activation='relu', kernel_initializer='random_normal', name="check_layer_1")(model_x)  # add a fully connected layer
  model_x = Dense(32, activation='relu', kernel_initializer='random_normal', name="check_layer_2")(model_x)  # add a fully connected layer

  model_x = Dense(4, activation='sigmoid', kernel_initializer="glorot_normal", name="output_layer")(model_x)

  model = Model(inputs=base_model.input, outputs=model_x)
  return model
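
To see how the feature map shrinks through the custom head, the following sketch traces the tensor shapes by hand. It assumes that the ResNet50 backbone reduces a 224x224 input by a factor of 32 to a 7x7x2048 feature map, and that a convolution with `padding='same'` produces `ceil(n / stride)` outputs per spatial dimension. The helper `same_padding_output_size` is hypothetical.

```python
import math


def same_padding_output_size(n, stride):
    """Spatial output size of a convolution with padding='same'."""
    return math.ceil(n / stride)


# Assumed ResNet50 output for a 224x224x3 input with include_top=False.
backbone_out = (224 // 32, 224 // 32, 2048)

side = same_padding_output_size(backbone_out[0], stride=3)
conv_out = (side, side, 128)   # Conv2D(filters=128, kernel_size=3, strides=3)
pooled_out = (conv_out[2],)    # GlobalAveragePooling2D keeps only the channels

# Trainable parameters in the head (weights + biases per layer).
conv_params = 3 * 3 * backbone_out[2] * 128 + 128
dense_params = (128 * 64 + 64) + (64 * 32 + 32) + (32 * 4 + 4)

print(backbone_out, conv_out, pooled_out)
print(conv_params + dense_params)
```

Under these assumptions, almost all trainable parameters of the head sit in the convolution layer, while the dense layers are comparatively small.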

In [None]:
def se_func(x1_pred, y1_pred, w_pred, h_pred, x1_gt, y1_gt, w_gt, h_gt):
  """Compute the squared error (SE) between the top-left and bottom-right corners of the predicted bbox and ground truth, each given as (x, y, w, h) tensors

  return an (N, 1) tensor as the individual loss value
  """

  tl_x = tf.square(x1_pred - x1_gt)
  tl_y = tf.square(y1_pred - y1_gt)
  tl = tl_x + tl_y

  br_x = tf.square((x1_pred + w_pred) - (x1_gt + w_gt))
  br_y = tf.square((y1_pred + h_pred) - (y1_gt + h_gt))
  br = br_x + br_y

  return tl + br
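
For intuition, here is the same corner-based squared error computed for a single pair of boxes in plain Python. The function `corner_squared_error` is a hypothetical scalar counterpart to `se_func`, written only for illustration.

```python
def corner_squared_error(pred, gt):
    """Sum of squared distances between matching corners of two (x, y, w, h) boxes."""
    x_p, y_p, w_p, h_p = pred
    x_g, y_g, w_g, h_g = gt
    top_left = (x_p - x_g) ** 2 + (y_p - y_g) ** 2
    bottom_right = ((x_p + w_p) - (x_g + w_g)) ** 2 + ((y_p + h_p) - (y_g + h_g)) ** 2
    return top_left + bottom_right


# Shifting a 2x2 box by one unit in both directions moves each corner by (1, 1).
error = corner_squared_error((0.0, 0.0, 2.0, 2.0), (1.0, 1.0, 2.0, 2.0))
print(error)
```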

In [None]:
def giou_func(x1_pred, y1_pred, w_pred, h_pred, x1_gt, y1_gt, w_gt, h_gt):
  """Compute the GIoU loss based on the tensor of (x, y, w, h) for the predicted bbox and ground truth

  return an (N, 1) tensor as the loss value
  """
  c = 1e-7  # small constant to avoid division by zero

  I_x_diff = tf.maximum(( tf.minimum(x1_pred + w_pred, x1_gt + w_gt) - tf.maximum(x1_pred, x1_gt) ), 0)
  I_y_diff = tf.maximum(( tf.minimum(y1_pred + h_pred, y1_gt + h_gt) - tf.maximum(y1_pred, y1_gt) ), 0)
  area_I = tf.multiply(I_x_diff, I_y_diff)

  area_U = tf.multiply(w_pred, h_pred) + tf.multiply(w_gt, h_gt) - area_I

  IoU = area_I / (area_U + c)

  C_x_diff = tf.maximum(x1_pred + w_pred, x1_gt + w_gt) - tf.minimum(x1_pred, x1_gt)
  C_y_diff = tf.maximum(y1_pred + h_pred, y1_gt + h_gt) - tf.minimum(y1_pred, y1_gt)
  area_C = tf.multiply(C_x_diff, C_y_diff)

  GIoU = IoU - (area_C - area_U) / (area_C + c)

  GIoU_loss = 1 - GIoU
  return GIoU_loss
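
The steps of `giou_func` can be checked by hand on a few box pairs. The sketch below is a scalar, plain-Python version written for illustration; the name `giou_loss` is hypothetical, and it omits the small constant `c`, so it assumes both boxes have positive area.

```python
def giou_loss(pred, gt):
    """GIoU loss for two (x, y, w, h) boxes with positive area."""
    x_p, y_p, w_p, h_p = pred
    x_g, y_g, w_g, h_g = gt

    # Intersection: overlap along each axis, clamped at zero.
    inter_w = max(min(x_p + w_p, x_g + w_g) - max(x_p, x_g), 0.0)
    inter_h = max(min(y_p + h_p, y_g + h_g) - max(y_p, y_g), 0.0)
    area_i = inter_w * inter_h
    area_u = w_p * h_p + w_g * h_g - area_i

    # Smallest axis-aligned box enclosing both inputs.
    enclose_w = max(x_p + w_p, x_g + w_g) - min(x_p, x_g)
    enclose_h = max(y_p + h_p, y_g + h_g) - min(y_p, y_g)
    area_c = enclose_w * enclose_h

    giou = area_i / area_u - (area_c - area_u) / area_c
    return 1.0 - giou


identical = giou_loss((0.0, 0.0, 1.0, 1.0), (0.0, 0.0, 1.0, 1.0))
overlapping = giou_loss((0.0, 0.0, 2.0, 2.0), (1.0, 1.0, 2.0, 2.0))
disjoint = giou_loss((0.0, 0.0, 1.0, 1.0), (2.0, 0.0, 1.0, 1.0))
print(identical, overlapping, disjoint)
```

Identical boxes give a loss of 0. For the disjoint pair, the IoU is 0 but the enclosing-box term still contributes, so the loss exceeds 1 and grows as the boxes move further apart. This is what lets the GIoU loss provide a training signal even when the prediction does not overlap the ground truth.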

In [None]:
@keras.saving.register_keras_serializable(name="loss")
def loss_func(pred, gt):
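  """The loss function for model training.
  pred: an (N, 4) tensor of predicted values for x, y, w, h
  gt: an (N, 4) tensor of the ground truth values for x, y, w, h

  Note: Keras calls a loss as loss(y_true, y_pred), so during training `pred`
  receives the ground truth and `gt` the predictions. Both losses below are
  symmetric in their two boxes, so the result is unaffected.

  return a scalar value of the mean batch loss
  """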
  gt = tf.convert_to_tensor(gt, dtype=tf.float32)
  pred = tf.convert_to_tensor(pred, dtype=tf.float32)

  x1_gt, y1_gt, w_gt, h_gt = tf.split(gt, 4, axis=-1)
  x1_pred, y1_pred, w_pred, h_pred = tf.split(pred, 4, axis=-1)

  # you can also try the squared error loss instead
  # loss = se_func(x1_pred, y1_pred, w_pred, h_pred, x1_gt, y1_gt, w_gt, h_gt)
  loss = giou_func(x1_pred, y1_pred, w_pred, h_pred, x1_gt, y1_gt, w_gt, h_gt)
  return tf.reduce_mean(loss)

In [None]:
if __name__ == '__main__':
  model = create_model()
  model.compile(optimizer='adam', loss=loss_func)

You may run the following code to enable the TensorBoard callback. TensorBoard is a visualization suite for models: you can inspect the real-time training logs in the [scalar panel](https://www.tensorflow.org/tensorboard/scalars_and_keras) or gain a better understanding of the model architecture in the [model graph panel](https://www.tensorflow.org/tensorboard/graphs).

In [None]:
from datetime import datetime
if __name__ == '__main__':
  # set up the tensorboard callback for reviewing real-time progress
  log_dir = os.path.join("logs", datetime.now().strftime("%Y%m%d-%H%M%S"))
  os.makedirs(log_dir, exist_ok=True)

  tensorboard_callback = tf.keras.callbacks.TensorBoard(log_dir=log_dir, write_graph=True)

In [None]:
# load the tensorboard notebook extension
%load_ext tensorboard

In [None]:
# remember to click the refresh button to load updated logs while training.
%tensorboard --logdir logs

In [None]:
if __name__ == '__main__':
  # train the model; you may reduce the number of epochs to test the functionality first
  # as ResNet is a relatively large model, 20 epochs may take about 2 min on a GPU or 30 min on a CPU
  N_EPOCH = 20
  model.fit(train_x, train_y, epochs=N_EPOCH, batch_size=64, callbacks=[tensorboard_callback])

In [None]:
import pickle

def get_param_from_layer(model, layer_name):
    layer = model.get_layer(name=layer_name)
    weights = layer.get_weights()
    # Concatenate the weights into a single numpy array
    params = np.concatenate([w.flatten() for w in weights])
    return params

if __name__ == '__main__':
  param_1 = get_param_from_layer(model, 'check_layer_1').reshape((1, -1))
  param_2 = get_param_from_layer(model, 'check_layer_2').reshape((1, -1))
  proof = np.concatenate([param_1.T, param_2.T], axis=0)
  with open('proof.pkl', 'wb') as f:
    pickle.dump(proof, f, pickle.HIGHEST_PROTOCOL)

In [None]:
# You can run this to save the model to your local machine to avoid re-training
if __name__ == '__main__':
  filename = 'model.keras'
  model.save(filename)
  model = tf.keras.models.load_model(filename, safe_mode=False, custom_objects={'loss_func': loss_func})

# Performance Evaluation 

We will examine the model performance based on the test dataset.

In [None]:
if __name__ == '__main__':
  results = model.evaluate(test_x, test_y)
  print(f'The loss in the model: {results}')

In [None]:
import cv2
import matplotlib.pyplot as plt
import matplotlib.patches as patches


def visualize_bounding_box(image_path, predicted_bbox=None, groundtruth_bbox=None):
  """Plot the original image, the predicted bounding box, and groundtruth (if any)
  image_path: the path to the image
  predicted_bbox: the predicted bounding box for drawings in the image
  groundtruth_bbox: the ground truth bounding box

  return None
  """
  image = cv2.imread(image_path)
  image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
  fig, ax = plt.subplots(1, figsize=(5, 5))
  ax.imshow(image)

  def draw_boxes(bbox, color, label):
    x, y, w, h = bbox
    rect = patches.Rectangle((x, y), w, h, linewidth=2, edgecolor=color, facecolor='none')
    ax.add_patch(rect)
    ax.text(x, y-18, f'{label}', fontsize=12, verticalalignment='top', color=color)

  if predicted_bbox is not None:
      draw_boxes(predicted_bbox, 'red', 'Predicted')
  if groundtruth_bbox is not None:
      draw_boxes(groundtruth_bbox, 'green', 'Ground Truth')

  plt.axis('off')
  plt.subplots_adjust(left=0, right=1, top=1, bottom=0)
  plt.show()

In [None]:
def extract_failure_case(loss_function, pred, gt, k=5):
  """ Extract failure cases with the worst performance.
  loss_function: a loss function that calculates the scalar loss value for pairwise prediction (4,) and results (4,)
  pred: the predicted bounding boxes, an (N, 4) array
  gt: the ground truth bboxes in the test dataset, an (N, 4) array
  k: the number of top failures with the largest loss value

  return the indexes to the top k failure case
  """
  loss_values = [loss_function(pred[i], gt[i]) for i in range(pred.shape[0])]
  # sort ascending, reverse so the largest loss comes first, then keep the top k
  # (this form also works when k equals the number of samples)
  indexes_top_failure = np.argsort(loss_values)[::-1][:k]
  
  return indexes_top_failure
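
The ranking idea can be tried on a toy example without the model. The sketch below uses plain Python and a hand-written mean absolute error; `mean_abs_error` and `top_k_worst` are hypothetical helpers, and the boxes are made-up values.

```python
def mean_abs_error(pred, gt):
    """Mean absolute difference between two equal-length sequences."""
    return sum(abs(p - g) for p, g in zip(pred, gt)) / len(pred)


def top_k_worst(preds, gts, k):
    """Indexes of the k samples with the largest error, worst first."""
    losses = [mean_abs_error(p, g) for p, g in zip(preds, gts)]
    ranked = sorted(range(len(losses)), key=lambda i: losses[i], reverse=True)
    return ranked[:k]


# Sample 0 is a perfect prediction, sample 1 is far off, sample 2 is slightly off.
preds = [[0.1, 0.1, 0.5, 0.5], [0.0, 0.0, 1.0, 1.0], [0.2, 0.2, 0.4, 0.4]]
gts = [[0.1, 0.1, 0.5, 0.5], [0.5, 0.5, 0.5, 0.5], [0.2, 0.2, 0.5, 0.5]]
worst = top_k_worst(preds, gts, k=2)
print(worst)
```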

In [None]:
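def rescale_bbox(src, raw_pred, scaled_gt):
  """Map normalized bboxes back to pixel coordinates of the original image
  src: the path to the image
  raw_pred: the predicted bbox (x, y, w, h), normalized to [0, 1]
  scaled_gt: the ground truth bbox (x, y, w, h), normalized to [0, 1]

  return the predicted and ground truth bboxes in pixel coordinates
  """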
  original_image = load_img(src, target_size=None)
  original_width = original_image.width
  original_height = original_image.height
  def _rescale_bbox(bbox):
    x, y, w, h = bbox
    x = x * original_width
    y = y * original_height
    w = w * original_width
    h = h * original_height
    return np.array([x, y, w, h])
  return _rescale_bbox(raw_pred), _rescale_bbox(scaled_gt)

In [None]:
from sklearn.metrics import mean_absolute_error
# You may refer to https://scikit-learn.org/stable/modules/generated/sklearn.metrics.mean_absolute_error.html

if __name__ == '__main__':
  test_pred = model.predict(test_x)
  n_failures = 20
  worst_case_indexes = extract_failure_case(mean_absolute_error, test_pred, test_y, n_failures)
  # iterate over the returned indexes directly, which also works if fewer than n_failures exist
  for idx in worst_case_indexes:
    test_pred_instance, gt_instance = rescale_bbox(test_url[idx], test_pred[idx], test_y[idx])
    visualize_bounding_box(test_url[idx], test_pred_instance, gt_instance)

In [None]:
def predict_bounding_box(model, src):
  """Use the model to locate the drawing in an image
  model: a trained object detector model
  src: the path to the image file

  return [x, y, w, h] for the predicted bounding box in the original image
  """
  
  original_image = load_img(src, target_size=None)
  original_width = original_image.width
  original_height = original_image.height
  image = original_image.resize((224, 224))
  image = img_to_array(image)

  input_prediction = preprocess_input(image)
  input_prediction = np.expand_dims(input_prediction, axis=0)
  prediction = model.predict(input_prediction)

  x, y, w, h = prediction[0]

  ori_x = x * original_width
  ori_y = y * original_height
  ori_w = w * original_width
  ori_h = h * original_height

  return [ori_x, ori_y, ori_w, ori_h]

In [None]:
from random import randint

def inspect(model, dataset, k=None):
  """Visually inspect one instance with its predicted and ground truth bbox
  model: the trained model
  dataset: the full dataset with image source and ground truth
  k: the index of the image under inspection

  return void
  """
  if k is None:
    k = randint(0, len(dataset)-1)
  src = dataset[k]['src']
  ground_truth = dataset[k]['bbox']
  pred = predict_bounding_box(model, src)
  visualize_bounding_box(src, pred, ground_truth)

In [None]:
if __name__ == '__main__':
  inspect(model, dataset)

In [None]:
# YOU MUST COMMENT OUT THIS CELL WHEN HANDING IN THE CODE

# Let us take a look at how your model performs on the drawings of Desmond's kids XD!
!wget -O earnest.jpg "https://home.cse.ust.hk/~desmond/images/ernest/peacock.jpg" > /dev/null 2>&1
!wget -O elvis.jpg "https://home.cse.ust.hk/~desmond/images/elvis/elvis-pokemon.jpg" > /dev/null 2>&1

if __name__ == '__main__':
  earnest_pred = predict_bounding_box(model, "earnest.jpg")
  visualize_bounding_box("earnest.jpg", earnest_pred, None)
  elvis_pred = predict_bounding_box(model, "elvis.jpg")
  visualize_bounding_box("elvis.jpg", elvis_pred, None)

In [None]:
# YOU MUST COMMENT OUT THIS CELL WHEN HANDING IN THE CODE
!wget "https://comp2211-pa2.s3.ap-northeast-2.amazonaws.com/grading_paths.pkl"
!wget "https://comp2211-pa2.s3.ap-northeast-2.amazonaws.com/grading_images.zip"
!wget "https://comp2211-pa2.s3.ap-northeast-2.amazonaws.com/testing_paths.pkl"
!wget "https://comp2211-pa2.s3.ap-northeast-2.amazonaws.com/testing_images.zip"

!unzip -q grading_images.zip
!unzip -q testing_images.zip