In [None]:
import numpy as np 
import pandas as pd 
import tensorflow as tf

import matplotlib
import matplotlib.pyplot as plt

import os
import random
import zipfile
import io
import scipy.misc
import numpy as np

import glob
import imageio
from six import BytesIO
from PIL import Image, ImageDraw, ImageFont
from IPython.display import display, Javascript
from IPython.display import Image as IPyImage


In [None]:
# Print the full path of every file found under the Kaggle input directory.
for root, _subdirs, files in os.walk('/kaggle/input'):
    for file_name in files:
        print(os.path.join(root, file_name))

In [None]:
# Every dataset artefact lives under the same Kaggle input directory.
_DATA_ROOT = '../input/car-object-detection/data'

BOUNDING_BOXES_PATH = _DATA_ROOT + '/train_solution_bounding_boxes (1).csv'
TRAINING_IMAGES_PATH = _DATA_ROOT + '/training_images'
TESTING_IMAGES_PATH = _DATA_ROOT + '/testing_images'

### Load Bounding boxes

In [None]:
# Image size (pixels) used to normalise box coordinates, and the number of
# object classes the detector is trained for.
im_height, im_width = 380, 676
num_classes = 1

In [None]:
# Read the annotation table, order it by image name and renumber the rows
# from zero; built as one chain so re-running the cell is idempotent.
bboxes_df = (
    pd.read_csv(BOUNDING_BOXES_PATH)
    .sort_values(by='image')
    .reset_index(drop=True)
)
bboxes_df.head()

In [None]:
# Number of distinct annotated images versus total number of annotated boxes.
print(f"{len(bboxes_df['image'].unique())} train images with {len(bboxes_df)} car bounding boxes.")

In [None]:
# For simplicity only the first annotated box of each image is kept.
# Each entry is a [1, 4] array [ymin, xmin, ymax, xmax], scaled by the image size.
gt_boxes_np = [
    np.array([[ymin / im_height, xmin / im_width, ymax / im_height, xmax / im_width]])
    for _name, xmin, ymin, xmax, ymax
    in bboxes_df.drop_duplicates(subset='image', keep='first').values
]

# The same boxes as a list of float32 tensors.
gt_boxes_tensors = [tf.convert_to_tensor(box, dtype=tf.float32) for box in gt_boxes_np]

### Install object detection API

In [None]:
# Shallow-clone (latest commit only) the TensorFlow models repository; the
# object_detection package imported later is installed from this checkout.
!git clone --depth 1 https://github.com/tensorflow/models/

In [None]:
# From models/research: compile the protobuf definitions to Python, copy the
# TF2 setup.py to the package root, then pip-install the package.
!cd models/research/ && protoc object_detection/protos/*.proto --python_out=. && cp object_detection/packages/tf2/setup.py . && python -m pip install .

In [None]:
from object_detection.utils import label_map_util
from object_detection.utils import config_util
from object_detection.utils import visualization_utils as viz_utils
from object_detection.builders import model_builder

### Load train images

In [None]:
%matplotlib inline

def load_image_in_array(path):
    """Load an image file into a uint8 numpy array of shape (height, width, 3).

    The image is converted to RGB before it is turned into an array, so
    grayscale, palette or RGBA files also yield exactly three channels
    instead of failing on a 3-channel reshape.

    Args:
        path: location of the image file; it is read through ``tf.io.gfile``.

    Returns:
        ``np.ndarray`` of dtype ``uint8`` and shape ``(height, width, 3)``.
    """
    # The context manager closes the file handle as soon as the bytes are read.
    with tf.io.gfile.GFile(path, 'rb') as image_file:
        img_data = image_file.read()
    image = Image.open(BytesIO(img_data)).convert('RGB')
    # Converting the PIL image directly avoids the slow per-pixel getdata() walk;
    # np.array makes a copy, so the returned array is writable.
    return np.array(image, dtype=np.uint8)

def plot_detections(image, bboxes, classes, scores, category_index, use_normalized_coordinates=True, min_score_thresh=0.8):
    """Display `image` with its boxes and labels overlaid.

    The overlay is rendered by ``viz_utils.visualize_boxes_and_labels_on_image_array``
    on a copy of `image`, which is then shown with ``plt.imshow``; the array
    passed by the caller is not handed to the drawing routine.
    """
    annotated = image.copy()
    overlay_options = dict(
        use_normalized_coordinates=use_normalized_coordinates,
        min_score_thresh=min_score_thresh,
    )
    viz_utils.visualize_boxes_and_labels_on_image_array(
        annotated, bboxes, classes, scores, category_index, **overlay_options)
    plt.imshow(annotated)
    

In [None]:
# Category dictionary: maps the class id to its id/name metadata.
car_class_id = 1

category_index = {car_class_id: dict(id=1, name='car')}
category_index[car_class_id]

In [None]:
# Plot the first 6 training images with their ground-truth bounding box.
# gt_boxes_np holds one entry per *unique* image (first box only), so the file
# names must come from the de-duplicated image list. Indexing the rows of
# bboxes_df directly pairs an image with the wrong box as soon as an earlier
# image has more than one annotated box.
unique_image_names = bboxes_df.image.unique()

plt.figure(figsize = (30,15))
for idx in range(min(6, len(unique_image_names))):
    plt.subplot(2,3,idx + 1)

    filename = unique_image_names[idx]
    filepath = TRAINING_IMAGES_PATH + '/' + filename
    image_np = load_image_in_array(filepath)
    boxes = gt_boxes_np[idx]

    # Ground truth: every box is class 1 with a score of 1.0 (one value per box).
    plot_detections(image_np,
                boxes,
                np.ones(shape=[boxes.shape[0]], dtype=np.int32),
                np.ones(shape=[boxes.shape[0]], dtype=np.float32),
                category_index)

In [None]:
# Show the first 6 rows of the annotation table.
bboxes_df.head(6)

In [None]:
def load_train_images(folder_path, names, num_of_images = 20):
    """Load the first `num_of_images` distinct training images as arrays.

    The file names are taken from the global `bboxes_df` (unique values of its
    `image` column, in order); the `names` argument is accepted but not used.
    One progress line is printed per image.

    Returns:
        list with one array per image, as produced by `load_image_in_array`.
    """
    selected_names = bboxes_df.image.unique()[:num_of_images]
    images_list = []
    for position, im_name in enumerate(selected_names):
        images_list.append(load_image_in_array(folder_path + '/' + im_name))
        print (position,'of', num_of_images, ':', im_name)
    return images_list

In [None]:
# Number of distinct images loaded for fine-tuning.
num_of_images_for_training = 352
train_images_np = load_train_images(
    folder_path=TRAINING_IMAGES_PATH,
    names=gt_boxes_np,
    num_of_images=num_of_images_for_training,
)

Download the pre-trained SSD ResNet50 V1 FPN 640x640 (COCO17) checkpoint and move it into `models/research/object_detection/test_data/`.

In [None]:
# Download the SSD ResNet50 V1 FPN 640x640 (COCO17) archive, unpack it and move
# its checkpoint folder into the Object Detection API's test_data directory.
!wget http://download.tensorflow.org/models/object_detection/tf2/20200711/ssd_resnet50_v1_fpn_640x640_coco17_tpu-8.tar.gz
!tar -xf ssd_resnet50_v1_fpn_640x640_coco17_tpu-8.tar.gz
!mv ssd_resnet50_v1_fpn_640x640_coco17_tpu-8/checkpoint models/research/object_detection/test_data/

In [None]:
# Pipeline config for the same SSD ResNet50 V1 FPN model, taken from the cloned repository.
CONFIG_FILE_PATH = './models/research/object_detection/configs/tf2/ssd_resnet50_v1_fpn_640x640_coco17_tpu-8.config'
# Checkpoint prefix (ckpt-0) inside the checkpoint folder placed under test_data.
CHECKPOINT_PATH = './models/research/object_detection/test_data/checkpoint/ckpt-0'

In [None]:
# Load the pipeline config file and keep its 'model' section, which is the
# part adjusted before the model is built.

config = config_util.get_configs_from_pipeline_file(CONFIG_FILE_PATH)
model_config = config['model']
# Display the model section as loaded.
model_config

In [None]:
# Replace the class count stored in the config with the notebook's num_classes.
model_config.ssd.num_classes = num_classes
# NOTE(review): presumably keeps the batch-norm layers fixed while fine-tuning —
# confirm against the SSD proto documentation.
model_config.ssd.freeze_batchnorm = True
# Display the adjusted config.
model_config

In [None]:
# Build the detection model from the adjusted config, in training mode.
car_detection_model = model_builder.build(model_config = model_config, is_training = True)

In [None]:
# RetinaNet has two prediction heads: one for classification, the other for box regression.
# Only the box regression head is restored; the classification head is left out of the
# restore and therefore keeps its fresh initialisation.

# Checkpoint view of the box predictor that exposes only the shared tower layers
# and the box prediction head (no class prediction head).
tmp_box_predictor = tf.train.Checkpoint(
    _base_tower_layers_for_heads=car_detection_model._box_predictor._base_tower_layers_for_heads,
    _box_prediction_head=car_detection_model._box_predictor._box_prediction_head,
    )
# Checkpoint view of the model: feature extractor plus the reduced box predictor.
tmp_model = tf.train.Checkpoint(
          _feature_extractor=car_detection_model._feature_extractor,
          _box_predictor=tmp_box_predictor)

# expect_partial(): only a subset of the checkpoint is matched on purpose.
ckpt = tf.train.Checkpoint(model=tmp_model)
ckpt.restore(CHECKPOINT_PATH).expect_partial()

In [None]:
# Run an all-zeros dummy image of the dataset size through preprocess, predict
# and postprocess so that the model's variables are created.
# NOTE(review): presumably this is the point where the restore requested earlier
# actually binds the checkpoint values to the new variables — confirm.
image, shapes = car_detection_model.preprocess(tf.zeros([1, im_height, im_width, 3]))
prediction_dict = car_detection_model.predict(image, shapes)
_ = car_detection_model.postprocess(prediction_dict, shapes)
print('Weights restored!')

### Define a custom training loop

In [None]:
@tf.function
def train_step_fn(model, optimizer, vars_to_fine_tune, image_tensors, gt_boxes_list, gt_classes_list):
    """Run one optimisation step on a batch and return its total loss.

    Args:
        model: detection model exposing ``provide_groundtruth``, ``preprocess``,
            ``predict`` and ``loss``.
        optimizer: optimizer whose ``apply_gradients`` updates the variables.
        vars_to_fine_tune: list of variables that receive gradient updates.
        image_tensors: list of image tensors, one per image, each already
            carrying a leading batch dimension (they are concatenated on axis 0).
        gt_boxes_list: list of ground-truth box tensors, one per image.
        gt_classes_list: list of one-hot ground-truth class tensors, one per image.

    Returns:
        Scalar tensor: localization loss plus classification loss of the batch.
    """
    model.provide_groundtruth(
        groundtruth_boxes_list=gt_boxes_list,
        groundtruth_classes_list=gt_classes_list)

    with tf.GradientTape() as tape:
        # preprocess returns (resized images, true image shapes). Using the returned
        # shapes keeps this step correct for any batch size and removes the
        # dependency on a module-level BATCH_SIZE and on hard-coded image dimensions.
        preprocessed = [model.preprocess(image_tensor) for image_tensor in image_tensors]
        preprocessed_images = tf.concat([images for images, _ in preprocessed], axis=0)
        shapes = tf.concat([true_shapes for _, true_shapes in preprocessed], axis=0)

        prediction_dict = model.predict(preprocessed_images, shapes)
        losses_dict = model.loss(prediction_dict, shapes)
        total_loss = losses_dict['Loss/localization_loss'] + losses_dict['Loss/classification_loss']

    # The gradient and the update are computed outside the tape context so the
    # tape does not record the backward pass itself.
    gradients = tape.gradient(total_loss, vars_to_fine_tune)
    optimizer.apply_gradients(zip(gradients, vars_to_fine_tune))

    return total_loss
    

In [None]:
# Fine-tune only the variables of the box- and class-prediction heads.
trainable_variables = car_detection_model.trainable_variables
prefixes_to_train = [
  'WeightSharedConvolutionalBoxPredictor/WeightSharedConvolutionalBoxHead',
  'WeightSharedConvolutionalBoxPredictor/WeightSharedConvolutionalClassHead']
# str.startswith accepts a tuple of candidate prefixes.
to_fine_tune = [
    variable for variable in trainable_variables
    if variable.name.startswith(tuple(prefixes_to_train))
]

In [None]:
# BUILD THE TENSORS USED FOR TRAINING
# ground-truth boxes of the images selected for training
gt_box_tensors = gt_boxes_tensors[:num_of_images_for_training]

# ground-truth classes: class ids shifted to be zero-indexed, then one-hot encoded
label_id_offset = 1
gt_classes_one_hot_tensors = [
    tf.one_hot(
        tf.convert_to_tensor(
            np.ones(shape=[boxes.shape[0]], dtype=np.int32) - label_id_offset),
        num_classes)
    for boxes in gt_boxes_np
]

# training images as float32 tensors with an added leading batch dimension
train_image_tensors = [
    tf.expand_dims(tf.convert_to_tensor(image_np, dtype=tf.float32), axis=0)
    for image_np in train_images_np
]

### Fine tune the model

In [None]:
# Fine-tuning hyper-parameters.
BATCH_SIZE = 16
epochs = 200
learning_rate = 0.01
num_of_batches = num_of_images_for_training // BATCH_SIZE
optimizer = tf.keras.optimizers.SGD(learning_rate=learning_rate, momentum=0.9)

In [None]:
import time

if num_of_images_for_training < BATCH_SIZE:
    raise ValueError('need at least BATCH_SIZE training images to form one batch')

print('start fine tuning')
for epoch in range(epochs):
    start_time = time.time()
    epoch_avg_loss = 0
    random_positions = list(range(num_of_images_for_training))
    random.shuffle(random_positions)
    # Consecutive, non-overlapping slices of exactly BATCH_SIZE shuffled indices
    # (the previous BATCH_SIZE-1 stride produced batches of 15). Every batch has
    # the same size, so the traced train step always sees one batch shape; a
    # trailing incomplete batch is dropped, and the per-epoch shuffle changes
    # which images that affects.
    last_full_start = len(random_positions) - BATCH_SIZE
    batches_list = [random_positions[i:i + BATCH_SIZE]
                    for i in range(0, last_full_start + 1, BATCH_SIZE)]

    for batch_pos in batches_list:
        gt_boxes_list = [gt_box_tensors[key] for key in batch_pos]
        gt_classes_list = [gt_classes_one_hot_tensors[key] for key in batch_pos]
        image_tensors = [train_image_tensors[key] for key in batch_pos]

        total_loss = train_step_fn(car_detection_model, optimizer, to_fine_tune, image_tensors, gt_boxes_list, gt_classes_list)
        epoch_avg_loss += total_loss
    # Average over the batches that were actually run.
    epoch_avg_loss /= len(batches_list)

    epoch_time = time.time() - start_time
    # This epoch is finished, so only (epochs - epoch - 1) epochs remain.
    remaining_mins = epoch_time * (epochs - epoch - 1) / 60
    print('epoch ' + str(epoch) + ' of ' + str(epochs) + ', loss=' +  str(epoch_avg_loss.numpy()),
          ', epoch train time=', np.round(epoch_time, 2),
          'sec, estimated remaining time=', np.round(remaining_mins, 2), 'mins',
          flush=True)

print('finish fine tuning')

In [None]:
@tf.function
def detect(input_tensor):
    """Run `car_detection_model` on a single image.

    Args:
        input_tensor: one image tensor without a batch dimension; a batch
            dimension is added at axis 0 before the model is called.

    Returns:
        The output of the model's ``postprocess`` step for this image.
    """
    batched_input = input_tensor[tf.newaxis, ...]

    preprocessed, true_shapes = car_detection_model.preprocess(batched_input)
    raw_predictions = car_detection_model.predict(preprocessed, true_shapes)
    return car_detection_model.postprocess(raw_predictions, true_shapes)


In [None]:
def get_IOU(gt_box, pred_box, inclusive_pixels=False):
    """Intersection over union (IoU) of two axis-aligned boxes.

    Both boxes are 4-element sequences that store the two corners in the same
    order, ``[min_a, min_b, max_a, max_b]`` (for example
    ``[ymin, xmin, ymax, xmax]``). The result does not depend on which box is
    passed first.

    Args:
        gt_box: ground-truth box.
        pred_box: predicted box.
        inclusive_pixels: ``False`` (default) treats the coordinates as
            continuous, e.g. normalised to [0, 1], so a side length is
            ``max - min``. ``True`` treats them as integer pixel indices whose
            max corner lies inside the box, so a side length is
            ``max - min + 1``. Applying the +1 to normalised coordinates makes
            every box look about one unit wide and gives disjoint boxes a
            positive IoU.

    Returns:
        IoU as a float in [0, 1]; 0.0 when the union has no area.
    """
    offset = 1. if inclusive_pixels else 0.

    # 1. side lengths of the intersection, clamped at zero for disjoint boxes
    inter_a = max(min(pred_box[2], gt_box[2]) - max(pred_box[0], gt_box[0]) + offset, 0.)
    inter_b = max(min(pred_box[3], gt_box[3]) - max(pred_box[1], gt_box[1]) + offset, 0.)

    # 2. area of the intersection
    inters = inter_a * inter_b

    # 3. area of the union = sum of both areas minus the shared part
    pred_area = (pred_box[2] - pred_box[0] + offset) * (pred_box[3] - pred_box[1] + offset)
    gt_area = (gt_box[2] - gt_box[0] + offset) * (gt_box[3] - gt_box[1] + offset)
    uni = pred_area + gt_area - inters

    # 4. overlap ratio; two degenerate (zero-area) boxes have no defined ratio
    if uni <= 0:
        return 0.0
    return float(inters / uni)


In [None]:
# Run the detector on one image of the test set and plot every detection whose
# score is at least 0.6. The image is picked by file name; test images are not
# looked up in the training annotation table.
filepath = TESTING_IMAGES_PATH + '/' + 'vid_5_26800.jpg'
test_image = load_image_in_array(filepath)

input_tensor = tf.convert_to_tensor(test_image, dtype=tf.float32)
detections = detect(input_tensor)

plt.figure(figsize = (30,15))
# The model's class ids are zero-indexed; label_id_offset maps them back to the
# ids used in category_index.
plot_detections(test_image,
                detections['detection_boxes'][0].numpy(),
                detections['detection_classes'][0].numpy().astype(np.uint32) + label_id_offset,
                detections['detection_scores'][0].numpy(),
                category_index,
                use_normalized_coordinates=True,
                min_score_thresh=0.6)

In [None]:
# Compute the IOU score for a training image whose bounding box is known.
train_im = 5
# One positional lookup serves both the file name and the box coordinates, so
# the image and its ground truth always come from the same annotation row.
gt_row = bboxes_df.iloc[train_im]
filename = gt_row['image']
filepath = TRAINING_IMAGES_PATH + '/' + filename
print('file:',filename)
train_image = load_image_in_array(filepath)

input_tensor = tf.convert_to_tensor(train_image, dtype=tf.float32)
detections = detect(input_tensor)

# plot detections
plt.figure(figsize = (30,15))
plot_detections(train_image,
                detections['detection_boxes'][0].numpy(),
                detections['detection_classes'][0].numpy().astype(np.uint32) + label_id_offset,
                detections['detection_scores'][0].numpy(),
                category_index,
                use_normalized_coordinates=True,
                min_score_thresh=0.6)

# calculate IOU between the ground truth and the first returned detection
# NOTE(review): presumably detections are ordered by score, making index 0 the
# most confident box — confirm.
detected_bb = detections['detection_boxes'][0].numpy()[0]
gt_bb = np.array([gt_row['ymin']/im_height,
                  gt_row['xmin']/im_width,
                  gt_row['ymax']/im_height,
                  gt_row['xmax']/im_width])
# Arguments follow the get_IOU(gt_box, pred_box) signature.
IOU = get_IOU(gt_bb, detected_bb)
print('IOU score: ', IOU)