In [None]:
# from google.colab import drive
# drive.mount('/content/drive')

In [None]:
!git clone https://github.com/satojkovic/DeepLogo2.git

In [None]:
import os
os.chdir('DeepLogo2')

In [None]:
import os
import pathlib

# Clone the tensorflow models repository if it doesn't already exist
if "models" in pathlib.Path.cwd().parts:
  while "models" in pathlib.Path.cwd().parts:
    os.chdir('..')
elif not pathlib.Path('models').exists():
  !git clone --depth 1 https://github.com/tensorflow/models

In [None]:
# Install the Object Detection API
%%bash
cd models/research/
protoc object_detection/protos/*.proto --python_out=.
cp object_detection/packages/tf2/setup.py .
python -m pip install .

In [None]:
# Workaround for the following error: `DNN library is not found`
# https://github.com/tensorflow/models/issues/10590
!pip install --pre tensorflow==2.8.0
!apt install --allow-change-held-packages libcudnn8=8.1.0.77-1+cuda11.2

In [None]:
import matplotlib
import matplotlib.pyplot as plt

import os
import random
import io
import imageio
import glob
import scipy.misc
import numpy as np
from six import BytesIO
from PIL import Image, ImageDraw, ImageFont
from IPython.display import display, Javascript
from IPython.display import Image as IPyImage

import tensorflow as tf

from object_detection.utils import label_map_util
from object_detection.utils import config_util
from object_detection.utils import visualization_utils as viz_utils
from object_detection.utils import colab_utils
from object_detection.builders import model_builder

%matplotlib inline

In [None]:
from tqdm import tqdm

In [None]:
def load_image_into_numpy_array(path):
  """Load an image from file into a uint8 RGB numpy array.

  The image is always converted to RGB first, so grayscale, palette and
  RGBA files also yield three channels instead of failing on reshape.

  Args:
    path: a file path readable through `tf.io.gfile`.

  Returns:
    A writable uint8 numpy array with shape (img_height, img_width, 3).
  """
  # Use a context manager so the file handle is closed deterministically.
  with tf.io.gfile.GFile(path, 'rb') as fid:
    img_data = fid.read()
  image = Image.open(BytesIO(img_data)).convert('RGB')
  # np.array (not np.asarray) returns an independent, writable copy.
  return np.array(image, dtype=np.uint8)

def plot_detections(image_np,
                    boxes,
                    classes,
                    scores,
                    category_index,
                    figsize=(12, 16),
                    image_name=None):
  """Draw boxes and labels on a copy of an image, then save or show it.

  The input image is never modified; annotations are drawn on a copy.
  Boxes are passed to the visualization helper as normalized coordinates
  with a fixed score threshold of 0.8.

  Args:
    image_np: uint8 numpy array with shape (img_height, img_width, 3)
    boxes: a numpy array of shape [N, 4]
    classes: a numpy array of shape [N]; 1-based indices that match the
      keys of `category_index`.
    scores: a numpy array of shape [N] or None.  NOTE(review): the original
      note says None makes the helper treat boxes as groundtruth (black, no
      labels) -- that behaviour lives in `viz_utils`, confirm there.
    category_index: a dict of category dictionaries (`id`, `name`) keyed by
      category index.
    figsize: accepted for interface compatibility; not used by this function.
    image_name: if truthy, the annotated image is written to this file
      instead of being shown on the current matplotlib axes.
  """
  annotated = image_np.copy()
  viz_utils.visualize_boxes_and_labels_on_image_array(
      annotated, boxes, classes, scores, category_index,
      use_normalized_coordinates=True,
      min_score_thresh=0.8)
  if not image_name:
    plt.imshow(annotated)
    return
  plt.imsave(image_name, annotated)

In [None]:
def convert_csv_into_numpy_array(csv, im_width, im_height):
  """Turn one `xmin,ymin,xmax,ymax` record into a normalized box array.

  Args:
    csv: sequence of four integer-like fields (xmin, ymin, xmax, ymax).
    im_width: image width used to normalize the x coordinates.
    im_height: image height used to normalize the y coordinates.

  Returns:
    float32 numpy array of shape (1, 4) ordered [ymin, xmin, ymax, xmax].
  """
  left, top, right, bottom = (int(field) for field in csv)
  box = [top / im_height, left / im_width,
         bottom / im_height, right / im_width]
  return np.array([box], dtype=np.float32)

In [None]:
def convert_csv_into_numpy_array_rects_idxes(csv, im_width, im_height):
  """Split a flat annotation record into normalized boxes and class ids.

  The record layout is
  [xmin_0,ymin_0,xmax_0,ymax_0,cls_idx_0,...,xmin_N,ymin_N,xmax_N,ymax_N,cls_idx_N].

  Args:
    csv: sequence of integer-like fields, five per annotated box.
    im_width: image width used to normalize the x coordinates.
    im_height: image height used to normalize the y coordinates.

  Returns:
    A tuple `(rects, cls_idxes)`: a float32 array of shape (N, 4) ordered
    [ymin, xmin, ymax, xmax], and an int32 array of shape (N,).  An empty
    record yields shapes (0, 4) and (0,).

  Raises:
    ValueError: if a field is not an integer or the number of fields is not
      a multiple of five.
  """
  elems = [int(field) for field in csv]
  if len(elems) % 5:
    raise ValueError(
        'expected a multiple of 5 annotation fields, got %d' % len(elems))
  rects, cls_idxes = [], []
  for start in range(0, len(elems), 5):
    xmin, ymin, xmax, ymax, cls_idx = elems[start:start + 5]
    rects.append([ymin / im_height, xmin / im_width,
                  ymax / im_height, xmax / im_width])
    cls_idxes.append(cls_idx)
  # reshape keeps the box array two-dimensional even when there are no boxes.
  return (np.array(rects, dtype=np.float32).reshape(-1, 4),
          np.array(cls_idxes, dtype=np.int32))

In [None]:
def is_include(csv, target_idxes):
  """Tell whether a flat annotation record contains any wanted class id.

  Args:
    csv: sequence of integer-like fields, five per box
      (xmin, ymin, xmax, ymax, cls_idx).
    target_idxes: iterable of class ids to look for.

  Returns:
    True as soon as one box's class id is in `target_idxes`, else False.
  """
  wanted = set(target_idxes)
  values = [int(field) for field in csv]
  for start in range(0, len(values), 5):
    record = values[start : start + 5]
    _xmin, _ymin, _xmax, _ymax, cls_idx = record
    if cls_idx in wanted:
      return True
  return False

In [None]:
def plot_detections(image_np, boxes, classes, scores, category_index, figsize=(12, 16), image_name=None):
  """Draw boxes and labels on a copy of `image_np`, then save or show it.

  NOTE(review): this redefines the `plot_detections` declared earlier in the
  notebook with the same behaviour; this later definition is the one in
  effect for the cells below.

  Args:
    image_np: image array to annotate (left unmodified; a copy is drawn on).
    boxes: box coordinates, passed on as normalized coordinates.
    classes: class ids used as keys into `category_index`.
    scores: per-box scores; boxes under the fixed 0.8 threshold are dropped
      by the visualization helper.
    category_index: dict of category dictionaries keyed by class id.
    figsize: accepted for interface compatibility; not used here.
    image_name: if truthy, save to this path instead of showing the image.
  """
  canvas = image_np.copy()
  viz_utils.visualize_boxes_and_labels_on_image_array(
      canvas,
      boxes,
      classes,
      scores,
      category_index,
      use_normalized_coordinates=True,
      min_score_thresh=0.8)
  if not image_name:
    plt.imshow(canvas)
    return
  plt.imsave(image_name, canvas)

In [None]:
%%bash
wget http://image.ntua.gr/iva/datasets/flickr_logos/flickr_logos_27_dataset.tar.gz
#cp /content/drive/MyDrive/DeepLogo2/train_data/flickr_logos_27_dataset.tar.gz .
tar zxvf flickr_logos_27_dataset.tar.gz
cd flickr_logos_27_dataset
tar zxvf flickr_logos_27_dataset_images.tar.gz
cd ..

In [None]:
# %%bash
# python preproc_annot.py

## Load and visualize train images

In [None]:
def parse_csvs(annot_csv):
  """Read an annotation file into a list of comma-split rows.

  Args:
    annot_csv: path of a text file with one comma-separated record per line.

  Returns:
    A list with one entry per line; each entry is the list of string fields
    obtained after stripping trailing whitespace and splitting on ','.
  """
  with open(annot_csv, 'r') as annot_file:
    return [row.rstrip().split(',') for row in annot_file]

In [None]:
#train_annot_csv = 'flickr_logos_27_dataset/flickr_logos_27_dataset_training_set_annotation_cropped.txt'
# NOTE(review): the Drive paths below need Google Drive mounted, but the
# mount cell at the top of the notebook is commented out -- confirm before
# a fresh "Run all".
train_img_dir = 'flickr_logos_27_dataset/flickr_logos_27_dataset_images'
train_annot_csv = '/content/drive/MyDrive/DeepLogo2/data/flickr_logos_27_dataset_training_set_annotation_cropped.txt'
train_img_npy = '/content/drive/MyDrive/DeepLogo2/data/train_images_np.npy'
gt_boxes_npy = '/content/drive/MyDrive/DeepLogo2/data/gt_boxes.npy'
gt_class_ids_npy = '/content/drive/MyDrive/DeepLogo2/data/gt_class_ids.npy'

train_image_names = []

# Use the cached arrays only when all three files are present; a partial
# cache would otherwise mix numpy arrays (no .append) with Python lists.
if all(os.path.exists(p) for p in (train_img_npy, gt_boxes_npy, gt_class_ids_npy)):
  # NOTE(review): allow_pickle=True unpickles arbitrary objects; only load
  # cache files that you produced yourself.
  train_images_np = np.load(train_img_npy, allow_pickle=True)
  gt_boxes = np.load(gt_boxes_npy, allow_pickle=True)
  gt_class_ids = np.load(gt_class_ids_npy, allow_pickle=True)
else:
  train_images_np, gt_boxes, gt_class_ids = [], [], []

if len(train_images_np) == 0 or len(gt_boxes) == 0 or len(gt_class_ids) == 0:
  # Rebuild everything from the annotation file, starting from fresh lists
  # so the three collections always stay aligned.
  train_images_np, gt_boxes, gt_class_ids = [], [], []
  csvs = parse_csvs(train_annot_csv)
  for csv in tqdm(csvs):
    # Optional filter: skip records without a given class, e.g.
    #   if not is_include(csv[1:], target_idxes=[0]): continue
    img_fname = csv[0]
    train_image_names.append(img_fname)

    # Decode the image once and take its size from the array itself.
    image_np = load_image_into_numpy_array(os.path.join(train_img_dir, img_fname))
    height, width = image_np.shape[:2]

    train_images_np.append(image_np)
    rects, cls_idxes = convert_csv_into_numpy_array_rects_idxes(csv[1:], width, height)
    gt_boxes.append(rects)
    gt_class_ids.append(cls_idxes)

plt.rcParams['axes.grid'] = False
plt.rcParams['figure.figsize'] = [30, 15]

# Preview up to the first ten training images in a 2x5 grid.
for idx, train_image_np in enumerate(train_images_np[:10]):
  plt.subplot(2, 5, idx + 1)
  plt.imshow(train_image_np)
plt.show()

### Assign class category index

In [None]:
# Logo class names. Order matters: the cell that builds `category_index`
# assigns each name the id `position + 1` via enumerate().
class_names = [
  "Adidas", 
  "Apple", "BMW", "Citroen", "Cocacola",
  "DHL", "Fedex", "Ferrari", "Ford", "Google", 
  "HP", "Heineken", "Intel", "McDonalds", "Mini", 
  "Nbc", "Nike", "Pepsi", "Porsche", "Puma", 
  "RedBull", "Sprite", "Starbucks", "Texaco", "Unicef",
  "Vodafone", "Yahoo"
]

In [None]:
# Build the category index keyed by 1-based class id, e.g.
#   {1: {'id': 1, 'name': 'Adidas'}, 2: {'id': 2, 'name': 'Apple'}, ...}
category_index = {
    class_id: {'id': class_id, 'name': class_name}
    for class_id, class_name in enumerate(class_names, start=1)
}

# Specify the number of classes that the model will predict
num_classes = len(category_index)

### Data preprocessing for training

In [None]:
# Convert class labels to one-hot and convert everything to tensors.
# `label_id_offset` is only defined here; it is not applied in this loop.
# The class ids read from the annotations are fed to tf.one_hot as-is, and
# the ground-truth visualization cell adds the offset back when it looks
# names up in `category_index`.
label_id_offset = 1
train_image_tensors = []
gt_classes_one_hot_tensors = []
gt_box_tensors = []

for train_image_np, gt_box_np, gt_class_id_np in zip(
    train_images_np, gt_boxes, gt_class_ids):

  # image -> float32 tensor with a leading batch axis
  image_tensor = tf.convert_to_tensor(train_image_np, dtype=tf.float32)
  train_image_tensors.append(tf.expand_dims(image_tensor, axis=0))

  # boxes -> float32 tensor
  gt_box_tensors.append(tf.convert_to_tensor(gt_box_np, dtype=tf.float32))

  # per-box class ids (multi-class): a vector of ones, one entry per box,
  # multiplied element-wise by the class ids of this image
  num_boxes = gt_box_np.shape[0]
  zero_indexed_groundtruth_classes = tf.convert_to_tensor(
      np.ones(shape=[num_boxes], dtype=np.int32) * gt_class_id_np)

  # one-hot encode, e.g. ids [1, 0, 1] with 2 classes => [[0, 1], [1, 0], [0, 1]]
  gt_classes_one_hot_tensors.append(
      tf.one_hot(zero_indexed_groundtruth_classes, num_classes))
print('Done prepping data.')

In [None]:
gt_classes_one_hot_tensors[:10]

### Visualize ground truth bbox

In [None]:
# give boxes a score of 100% so none is dropped by the score threshold
dummy_scores = [np.ones(gt_box.shape[0], dtype=np.float32) for gt_box in gt_boxes]

# define the figure size
plt.figure(figsize=(30, 15))

# Draw the ground truth boxes with `plot_detections()`. Show at most ten
# images (2x5 grid) and never index past the end of a smaller dataset.
num_preview = min(10, len(train_images_np))
for idx in range(num_preview):
    plt.subplot(2, 5, idx+1)
    plot_detections(
      train_images_np[idx],
      gt_boxes[idx],
      # shift the class ids by label_id_offset to match the 1-based keys of category_index
      np.ones(shape=[gt_boxes[idx].shape[0]], dtype=np.int32) * gt_class_ids[idx] + label_id_offset,
      dummy_scores[idx], category_index)

plt.show()

### Download checkpoints

In [None]:
# Download the SSD ResNet152 V1 FPN 640x640 checkpoint
!wget http://download.tensorflow.org/models/object_detection/tf2/20200711/ssd_resnet152_v1_fpn_640x640_coco17_tpu-8.tar.gz
!tar -xf ssd_resnet152_v1_fpn_640x640_coco17_tpu-8.tar.gz
!mv ssd_resnet152_v1_fpn_640x640_coco17_tpu-8/checkpoint models/research/object_detection/test_data/
model_name = 'ssd_resnet152_v1_fpn_640x640_coco17_tpu-8'

### Configure and build the model

In [None]:
tf.keras.backend.clear_session()

print(f'Building {model_name} and restoring weights for fine-tuning...', flush=True)
pipeline_config = f'models/research/object_detection/configs/tf2/{model_name}.config'
checkpoint_path = 'models/research/object_detection/test_data/checkpoint/ckpt-0'

# Load the pipeline config and build a detection model.
#
# The class count from the pipeline file is overridden with `num_classes`
# (the number of logo classes in `category_index`), and batch norm layers
# are frozen for fine-tuning.
configs = config_util.get_configs_from_pipeline_file(pipeline_config)
model_config = configs['model']
model_config.ssd.num_classes = num_classes
model_config.ssd.freeze_batchnorm = True
detection_model = model_builder.build(model_config=model_config,
                                      is_training=True)

In [None]:
print(type(detection_model))

### Append data augmentation

In [None]:
train_config = configs['train_config']

In [None]:
train_config.data_augmentation_options

In [None]:
from google.protobuf import text_format

from object_detection.builders import preprocessor_builder
from object_detection.core import preprocessor
from object_detection.protos import preprocessor_pb2


In [None]:
# Random image scale
preprocessor_text_proto = """
random_image_scale {
  min_scale_ratio: 0.5
  max_scale_ratio: 2.2
}
"""
preprocessor_proto = preprocessor_pb2.PreprocessingStep()
text_format.Merge(preprocessor_text_proto, preprocessor_proto)

In [None]:
train_config.data_augmentation_options.append(preprocessor_proto)

In [None]:
# Random adjust brightness
preprocessor_text_proto = """
random_adjust_brightness {
}
"""
preprocessor_proto = preprocessor_pb2.PreprocessingStep()
text_format.Merge(preprocessor_text_proto, preprocessor_proto)

In [None]:
train_config.data_augmentation_options.append(preprocessor_proto)

In [None]:
# Random adjust saturation
preprocessor_text_proto = """
random_adjust_saturation {
}
"""
preprocessor_proto = preprocessor_pb2.PreprocessingStep()
text_format.Merge(preprocessor_text_proto, preprocessor_proto)

In [None]:
train_config.data_augmentation_options.append(preprocessor_proto)

In [None]:
# Random adjust hue
preprocessor_text_proto = """
random_adjust_hue {
}
"""
preprocessor_proto = preprocessor_pb2.PreprocessingStep()
text_format.Merge(preprocessor_text_proto, preprocessor_proto)

In [None]:
train_config.data_augmentation_options.append(preprocessor_proto)

In [None]:
# Random rgb to gray
preprocessor_text_proto = """
random_rgb_to_gray {
  probability: 0.3
}
"""
preprocessor_proto = preprocessor_pb2.PreprocessingStep()
text_format.Merge(preprocessor_text_proto, preprocessor_proto)

In [None]:
train_config.data_augmentation_options.append(preprocessor_proto)

In [None]:
print(train_config.data_augmentation_options)

In [None]:
# Random rotation90
preprocessor_text_proto = """
random_rotation90 {
  keypoint_rot_permutation: 3
  keypoint_rot_permutation: 0
  keypoint_rot_permutation: 1
  keypoint_rot_permutation: 2
  probability: 0.5
}
"""
preprocessor_proto = preprocessor_pb2.PreprocessingStep()
text_format.Merge(preprocessor_text_proto, preprocessor_proto)

In [None]:
train_config.data_augmentation_options.append(preprocessor_proto)

In [None]:
train_config.data_augmentation_options

### Restore checkpoint

In [None]:
# Set up object-based checkpoint restore --- RetinaNet has two prediction
# `heads` --- one for classification, the other for box regression.  We will
# restore the box regression head but initialize the classification head
# from scratch (we show the omission below by commenting out the line that
# we would add if we wanted to restore both heads)
fake_box_predictor = tf.compat.v2.train.Checkpoint(
    _base_tower_layers_for_heads=detection_model._box_predictor._base_tower_layers_for_heads,
    # _prediction_heads=detection_model._box_predictor._prediction_heads,
    #    (i.e., the classification head that we *will not* restore)
    _box_prediction_head=detection_model._box_predictor._box_prediction_head,
    )
fake_model = tf.compat.v2.train.Checkpoint(
          _feature_extractor=detection_model._feature_extractor,
          _box_predictor=fake_box_predictor)
ckpt = tf.compat.v2.train.Checkpoint(model=fake_model)
ckpt.restore(checkpoint_path).expect_partial()

In [None]:
# Run model through a dummy image so that variables are created
image, shapes = detection_model.preprocess(tf.zeros([1, 1024, 1024, 3]))
prediction_dict = detection_model.predict(image, shapes)
_ = detection_model.postprocess(prediction_dict, shapes)
print('Weights restored!')

In [None]:
shapes

### Choose the layers to fine-tune

In [None]:
# Select variables in top layers to fine-tune: keep only the trainable
# variables whose name starts with one of the prediction-head prefixes.
trainable_variables = detection_model.trainable_variables
prefixes_to_train = [
  'WeightSharedConvolutionalBoxPredictor/WeightSharedConvolutionalBoxHead',
  'WeightSharedConvolutionalBoxPredictor/WeightSharedConvolutionalClassHead']
to_fine_tune = [
  variable for variable in trainable_variables
  if variable.name.startswith(tuple(prefixes_to_train))
]

In [None]:
[ft.name for ft in to_fine_tune]

### Training

In [None]:
from object_detection.builders import optimizer_builder

In [None]:
tf.keras.backend.set_learning_phase(True)

# set the batch_size
batch_size = train_config.batch_size // 2

# set the number of epochs
epochs = 100
print(f'batch_size: {batch_size}')
print(f'num_train_samples: {len(train_images_np)}')

# Set the learning rate
# learning_rate = 0.01

# set the optimizer and pass in the learning_rate
# optimizer = tf.keras.optimizers.SGD(
#     learning_rate=learning_rate, momentum=0.9)

optimizer, (learning_rate,) = optimizer_builder.build(train_config.optimizer)

In [None]:
# decorate with @tf.function for faster training (remember, graph mode!)
@tf.function
def train_step_fn(image_list,
                groundtruth_boxes_list,
                groundtruth_classes_list,
                model,
                optimizer,
                vars_to_fine_tune):
    """A single training iteration.

    Args:
      image_list: A list of [1, height, width, 3] Tensor of type tf.float32.
        Height and width can vary across images; each image goes through
        `model.preprocess` before being batched.
      groundtruth_boxes_list: A list of Tensors of shape [N_i, 4] with type
        tf.float32 representing groundtruth boxes for each image in the batch.
      groundtruth_classes_list: A list of Tensors of shape [N_i, num_classes]
        with type tf.float32 representing one-hot groundtruth classes for
        each image in the batch.
      model: the detection model to train; must provide
        `provide_groundtruth`, `preprocess`, `predict` and `loss`.
      optimizer: optimizer whose `apply_gradients` updates the variables.
      vars_to_fine_tune: list of variables to differentiate and update.

    Returns:
      A scalar tensor representing the total loss for the input batch
      (localization loss + classification loss).
    """
    # Register the groundtruth with the model; `model.loss` reads it back.
    model.provide_groundtruth(
        groundtruth_boxes_list=groundtruth_boxes_list,
        groundtruth_classes_list=groundtruth_classes_list)

    with tf.GradientTape() as tape:
        # Preprocess every image separately, then batch along axis 0.
        preprocessed_image_list = []
        true_shape_list = []
        for img in image_list:
            processed_img, true_shape = model.preprocess(img)
            preprocessed_image_list.append(processed_img)
            true_shape_list.append(true_shape)

        preprocessed_image_tensor = tf.concat(preprocessed_image_list, axis=0)
        true_shape_tensor = tf.concat(true_shape_list, axis=0)

        # Make a prediction
        prediction_dict = model.predict(preprocessed_image_tensor, true_shape_tensor)

        # Calculate the total loss (sum of both losses)
        losses_dict = model.loss(prediction_dict, true_shape_tensor)
        total_loss = (losses_dict['Loss/localization_loss']
                      + losses_dict['Loss/classification_loss'])

    # Only the forward pass needs recording: compute the gradients and apply
    # the update after leaving the tape context.
    gradients = tape.gradient(total_loss, vars_to_fine_tune)
    optimizer.apply_gradients(zip(gradients, vars_to_fine_tune))

    return total_loss

In [None]:
print('Start fine-tuning!', flush=True)

# Whole batches only: a trailing partial batch is never visited.
num_batches = len(train_image_tensors) // batch_size

for epoch in range(epochs):
  for idx in range(num_batches):
      start, stop = idx * batch_size, (idx + 1) * batch_size

      # Ground truth and images for this batch
      gt_boxes_list = gt_box_tensors[start:stop]
      gt_classes_list = gt_classes_one_hot_tensors[start:stop]
      image_tensors = train_image_tensors[start:stop]

      # Training step (forward pass + backwards pass)
      total_loss = train_step_fn(image_tensors,
                                 gt_boxes_list,
                                 gt_classes_list,
                                 detection_model,
                                 optimizer,
                                 to_fine_tune)

      # Report the loss on every second batch
      if idx % 2 == 0:
          print(f'epoch {epoch}, batch {idx} of {num_batches}, loss={total_loss.numpy()!s}',
                flush=True)

print('Done fine-tuning!')

## Run Test

In [None]:
test_annot_csv = '/content/drive/MyDrive/DeepLogo2/data/flickr_logos_27_dataset_test_set_annotation_cropped.txt'
test_img_dir = 'flickr_logos_27_dataset/flickr_logos_27_dataset_images'

# One entry per annotation row: the image named in the first field, loaded
# as an array with a leading batch axis added.
csvs = parse_csvs(test_annot_csv)
test_images_np = [
  np.expand_dims(
      load_image_into_numpy_array(os.path.join(test_img_dir, row[0])),
      axis=0)
  for row in tqdm(csvs)
]

In [None]:
plt.figure(figsize=(30, 15))
for idx, test_image_np in enumerate(test_images_np[:5]):
  plt.subplot(2, 3, idx + 1)
  plt.imshow(test_image_np[0])
plt.show()

In [None]:
@tf.function
def detect(input_tensor):
    """Run the fine-tuned detection model on one batched image tensor.

    Chains the model's `preprocess`, `predict` and `postprocess` steps.

    Args:
    input_tensor: A [1, height, width, 3] Tensor of type tf.float32; it is
      handed to `detection_model.preprocess` before prediction.

    Returns:
    The detections returned by `detection_model.postprocess`; the cells
      below read its `detection_boxes`, `detection_classes` and
      `detection_scores` entries.
    """
    images, true_shapes = detection_model.preprocess(input_tensor)
    raw_predictions = detection_model.predict(images, true_shapes)
    return detection_model.postprocess(raw_predictions, true_shapes)

In [None]:
# Detected class ids are shifted by this offset to match the 1-based keys
# of `category_index`.
label_id_offset = 1
image_np_with_annotations = []

for test_idx in range(len(test_images_np)):
  input_tensor = tf.convert_to_tensor(test_images_np[test_idx], dtype=tf.float32)
  detections = detect(input_tensor)
  # Draw on a copy, as plot_detections() does: passing the image itself
  # would let the helper paint over the test input, so re-running this cell
  # would feed already-annotated images to the detector.
  image_np_with_annotation = viz_utils.visualize_boxes_and_labels_on_image_array(
    test_images_np[test_idx][0].copy(), # [b, h, w, c] -> [h, w, c]
    detections['detection_boxes'][0].numpy(),
    detections['detection_classes'][0].numpy().astype(np.uint32) + label_id_offset,
    detections['detection_scores'][0].numpy(), category_index,
    use_normalized_coordinates=True,
    min_score_thresh=0.6
  )
  image_np_with_annotations.append(image_np_with_annotation)

In [None]:
# Show up to five randomly chosen annotated test images side by side.
num_view_images = min(5, len(image_np_with_annotations))
plt.figure(figsize=(30, 15))
# Sample indices rather than the images themselves: np.random.choice needs
# a 1-D input, and a list of image arrays is not one.
view_idxes = np.random.choice(
    len(image_np_with_annotations), num_view_images, replace=False)
for idx, view_idx in enumerate(view_idxes):
  image_np_with_annotation = image_np_with_annotations[view_idx]
  plt.subplot(1, 5, idx + 1)
  plt.imshow(image_np_with_annotation)
plt.show()

### Save the model

In [None]:
# Save the fine-tuned model; the file prefix records the base model name,
# the number of epochs and the batch size used for training.
checkpoint = tf.train.Checkpoint(detection_model)
save_prefix = (
    f'/content/drive/MyDrive/DeepLogo2/model/DeepLogo2'
    f'-{model_name}-epoch{epochs}-batch{batch_size}'
)
checkpoint.save(save_prefix)