# Proyecto de object detection: conducción autónoma


# 1. Utilizando un dataset de object detection: Importando el dataset de conducción autónoma.

In [None]:
# Install the headless OpenCV build, pinned to a fixed version.
!pip install opencv-python-headless==4.1.2.30

In [None]:
import os
import pathlib
import random
import cv2
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd

In [None]:
# Clone the TensorFlow models repository; object_detection is installed from models/research below.
!git clone https://github.com/tensorflow/models

In [None]:
%%bash
# Build and install the Object Detection API from the cloned repository.
cd models/research/
# Generate Python modules from the protobuf definitions.
protoc object_detection/protos/*.proto --python_out=.
# Use the TF2 packaging script as this directory's setup.py, then install the package.
cp object_detection/packages/tf2/setup.py .
python -m pip install .

In [None]:
# Pin TensorFlow to 2.7.0 (this runs after the object_detection install above).
!pip install tensorflow==2.7.0

In [None]:
from object_detection.utils import visualization_utils as viz_utils

In [None]:
import tensorflow as tf

In [None]:
# Mount Google Drive so the dataset archive can be read from /content/drive.
from google.colab import drive
drive.mount("/content/drive")

In [None]:
# Extract the dataset archive into the working directory.
# NOTE(review): the Drive path is specific to the author's account — adjust before running.
!unzip "/content/drive/MyDrive/Cursos/object_detection/data/archive.zip"

In [None]:
%matplotlib inline
# Show one sample frame; the image is converted from BGR to RGB before plotting.
plt.figure(figsize=(20,20))
img = cv2.imread("images/1479506176491553178.jpg")
img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
plt.imshow(img)

# 2. Utilizando un dataset de object detection: Visualización del dataset de conducción autónoma

In [None]:
# Load the training annotations and preview the first rows.
df = pd.read_csv("labels_train.csv")
df.head()

In [None]:
# Inspect the sample image's array shape.
print(img.shape)

In [None]:
# Group the class_id == 3 boxes by frame as float32 arrays of shape (k, 4),
# each row being [ymin, xmin, ymax, xmax] divided by 300 (y) and 480 (x).
# NOTE(review): 300 and 480 are presumably the image height and width — confirm
# against img.shape.
# Rows are collected in plain lists and converted once per frame, instead of
# re-allocating the array with np.append for every additional box.
pedestrian_rows = df[df['class_id'] == 3]
boxes_per_frame = {}
for frame, ymin, xmin, ymax, xmax in zip(
    pedestrian_rows['frame'],
    pedestrian_rows['ymin'],
    pedestrian_rows['xmin'],
    pedestrian_rows['ymax'],
    pedestrian_rows['xmax']):
  boxes_per_frame.setdefault(frame, []).append(
      [ymin/300, xmin/480, ymax/300, xmax/480])

# Keys keep the order in which frames first appear in the CSV.
gt_boxes = {
    frame: np.array(boxes, dtype=np.float32)
    for frame, boxes in boxes_per_frame.items()
}

In [None]:
# Load every annotated frame as an RGB array, in the same order as gt_boxes.
my_path = 'images/'
train_images_np = []

for image in gt_boxes.keys():
  im = cv2.imread(my_path + image)
  if im is None:
    # cv2.imread signals a missing/unreadable file by returning None; fail with
    # the offending path instead of an opaque cvtColor assertion.
    raise FileNotFoundError("Could not read image: " + my_path + image)
  im = cv2.cvtColor(im, cv2.COLOR_BGR2RGB)
  train_images_np.append(im)

In [None]:
# Single-class label map used when drawing the ground-truth boxes.
pedestrian_class_id = 1
# Misspelled alias kept because a later cell still reads this name.
pedestrain_class_id = pedestrian_class_id
num_classes = 1

# The index is keyed by class id (not by the number of classes, which only
# happened to have the same value).
category_index = {
    pedestrian_class_id: {
        'id' : pedestrian_class_id,
        'name': 'pedestrian'
    }
}

# 3. Utilizando un dataset de object detection: visualización final

In [None]:
def plot_detection(image_np, boxes, classes, scores, category_index):
  """Draw labelled boxes on a copy of `image_np` and show it on the current axes.

  Boxes are passed on as normalized coordinates and only those with a score
  of at least 0.5 are drawn. The input image is left untouched.
  """
  annotated = image_np.copy()
  draw_options = dict(use_normalized_coordinates=True, min_score_thresh=0.5)
  viz_utils.visualize_boxes_and_labels_on_image_array(
      annotated, boxes, classes, scores, category_index, **draw_options)
  plt.imshow(annotated)


# Show up to 20 training images with their ground-truth boxes on a 5x4 grid.
plt.figure(figsize = (30,30))
all_gt_boxes = list(gt_boxes.values())  # built once, not on every iteration
num_to_show = min(20, len(all_gt_boxes), len(train_images_np))
for idx in range(num_to_show):
  bboxes = all_gt_boxes[idx]
  # Ground truth has no confidence; a score of 1.0 keeps every box above the
  # plotting threshold.
  dummy_score = np.ones(shape=[bboxes.shape[0]], dtype=np.float32)
  plt.subplot(5, 4, idx+1)
  plot_detection(
      train_images_np[idx],
      bboxes,
      np.ones(shape=[bboxes.shape[0]], dtype=np.int32),
      dummy_score,
      category_index
  )

plt.show()

# 4. Aumento de datos con Albumentations

In [None]:
# Install the augmentation library, pinned to a fixed version.
!pip install albumentations==1.2.0

In [None]:
import albumentations as A

In [None]:
# Augmentation pipeline: horizontal flip and brightness/contrast jitter, each
# applied with probability 0.8. Boxes are supplied in the "albumentations" bbox format.
transforms = A.Compose([
    A.HorizontalFlip(p=0.8),
    A.RandomBrightnessContrast(p=0.8),
], bbox_params = A.BboxParams(format="albumentations"))

In [None]:
# Show the first training image before any augmentation.
plt.figure()
plt.imshow(train_images_np[0])

In [None]:
# First ground-truth box of the first frame, stored as [ymin, xmin, ymax, xmax].
bbox = list(gt_boxes.values())[0][0]

In [None]:
# Inspect the raw box values.
print(bbox)

In [None]:
# Swap each coordinate pair: [ymin, xmin, ymax, xmax] -> [xmin, ymin, xmax, ymax].
# NOTE: not idempotent — running this cell twice swaps the pairs back.
bbox = [bbox[1], bbox[0], bbox[3], bbox[2]]

In [None]:
# Append the class label as a fifth element; it is stripped again after the transform.
bbox.append("pedestrian")

In [None]:
# Wrap the single box in a list, the form passed as `bboxes=` below.
bbox = [bbox]

In [None]:
# First augmented sample of the image and its box.
transforms_image_1 = transforms(image=train_images_np[0], bboxes = bbox)

In [None]:
# Two more augmented samples drawn from the same image and box.
transforms_image_2 = transforms(image=train_images_np[0], bboxes = bbox)
transforms_image_3 = transforms(image=train_images_np[0], bboxes = bbox)

In [None]:
plt.figure()

# Original (un-augmented) image with its first ground-truth box. The score
# array is built here with one entry for the one box, instead of reusing the
# `dummy_score` left over from the grid-plot loop.
plot_detection(
    train_images_np[0],
    np.array([list(gt_boxes.values())[0][0]]),
    np.ones(shape=[1], dtype=np.int32),
    np.ones(shape=[1], dtype=np.float32),
    category_index
)

In [None]:
# Take the first transformed box and drop its trailing class label.
transformed_bbox_1 = (list(transforms_image_1['bboxes'][0])[:-1])

In [None]:
# Swap each coordinate pair back to the [ymin, xmin, ymax, xmax] order used for plotting.
# NOTE: not idempotent — running this cell twice swaps the pairs again.
transformed_bbox_1 = [transformed_bbox_1[1], transformed_bbox_1[0], transformed_bbox_1[3], transformed_bbox_1[2]]

In [None]:
# First augmented sample with its transformed box. The score array is built
# here (one entry for the one box) instead of reusing the leaked `dummy_score`.
plt.figure()
plot_detection(
    transforms_image_1["image"],
    np.array([transformed_bbox_1]),
    np.ones(shape=[1], dtype=np.int32),
    np.ones(shape=[1], dtype=np.float32),
    category_index
)

In [None]:
# Second augmented sample: drop the label, then swap to [ymin, xmin, ymax, xmax].
transformed_bbox_2 = (list(transforms_image_2['bboxes'][0])[:-1])
transformed_bbox_2 = [transformed_bbox_2[1], transformed_bbox_2[0], transformed_bbox_2[3], transformed_bbox_2[2]]
plt.figure()
# Draw this sample's own box (the original drew transformed_bbox_1 here), with
# a score array sized for the single box.
plot_detection(
    transforms_image_2["image"],
    np.array([transformed_bbox_2]),
    np.ones(shape=[1], dtype=np.int32),
    np.ones(shape=[1], dtype=np.float32),
    category_index
)

In [None]:
# Third augmented sample: drop the label, then swap to [ymin, xmin, ymax, xmax].
transformed_bbox_3 = (list(transforms_image_3['bboxes'][0])[:-1])
transformed_bbox_3 = [transformed_bbox_3[1], transformed_bbox_3[0], transformed_bbox_3[3], transformed_bbox_3[2]]
plt.figure()
# Draw this sample's own box (the original drew transformed_bbox_1 here), with
# a score array sized for the single box.
plot_detection(
    transforms_image_3["image"],
    np.array([transformed_bbox_3]),
    np.ones(shape=[1], dtype=np.int32),
    np.ones(shape=[1], dtype=np.float32),
    category_index
)

# 5. Modelo pre-entrenado

In [None]:
# Download the pre-trained SSD ResNet50 FPN 640x640 archive, unpack it, and move
# its checkpoint directory under the Object Detection API's test_data folder.
!wget  http://download.tensorflow.org/models/object_detection/tf2/20200711/ssd_resnet50_v1_fpn_640x640_coco17_tpu-8.tar.gz
!tar -xf ssd_resnet50_v1_fpn_640x640_coco17_tpu-8.tar.gz
!mv ssd_resnet50_v1_fpn_640x640_coco17_tpu-8/checkpoint models/research/object_detection/test_data/

In [None]:
from object_detection.utils import config_util
from object_detection.builders import model_builder

In [None]:
# Locations of the pipeline config and the downloaded checkpoint directory.
model_name = "ssd_resnet50_v1_fpn_640x640_coco17_tpu-8"
# Join directory and file name as separate components (the original passed one
# pre-concatenated string, which made os.path.join a no-op).
pipeline_config = os.path.join(
    "models/research/object_detection/configs/tf2", model_name + ".config")
model_dir = "models/research/object_detection/test_data/checkpoint/"

In [None]:
# Parse the pipeline file and build the detection model in inference mode.
configs = config_util.get_configs_from_pipeline_file(pipeline_config)
model_config = configs["model"] 
detection_model = model_builder.build(model_config = model_config, is_training=False)

In [None]:
# Restore the pre-trained weights; expect_partial() silences warnings about
# checkpoint values that are not matched to the model.
ckpt = tf.compat.v2.train.Checkpoint(model=detection_model)
ckpt.restore(os.path.join(model_dir, "ckpt-0")).expect_partial()

In [None]:
def get_model_detection_function(model):
  """Build a graph-compiled detection function bound to `model`.

  Args:
    model: detection model exposing `preprocess`, `predict` and `postprocess`.

  Returns:
    A `tf.function` that takes a batched image tensor and returns whatever
    `model.postprocess` produces for it.
  """

  @tf.function
  def detect_fn(image):
    # Preprocessing also returns the image shapes the later stages need.
    image, shapes = model.preprocess(image)
    prediction_dict = model.predict(image, shapes)
    return model.postprocess(prediction_dict, shapes)

  return detect_fn


# The factory has its own name, so it is no longer overwritten by the function
# it returns and stays callable after this cell.
detect_fn = get_model_detection_function(detection_model)

In [None]:
from object_detection.utils import label_map_util

# COCO label map shipped with the Object Detection API. (A value previously read
# from the eval input config was overwritten by this path straight away, so
# that dead assignment has been removed.)
label_map_path = "models/research/object_detection/data/mscoco_label_map.pbtxt"

label_map = label_map_util.load_labelmap(label_map_path)

categories = label_map_util.convert_label_map_to_categories(
    label_map,
    max_num_classes=label_map_util.get_max_label_map_index(label_map),
    use_display_name=True
)

# Lookup tables for drawing: class id -> category entry, and a dict built from
# the label map's display names.
category_index = label_map_util.create_category_index(categories)
label_map_dict = label_map_util.get_label_map_dict(label_map, use_display_name=True)

In [None]:
%matplotlib inline

# Load the sample frame and convert it from BGR to RGB.
img = cv2.imread('images/1479506176491553178.jpg')
image_np = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)

In [None]:
# Add a leading batch axis, convert to float32 and run the detection function.
input_tensor = tf.convert_to_tensor(
    np.expand_dims(image_np, 0),
    dtype=tf.float32
)

detection = detect_fn(input_tensor)

In [None]:
# Draw detections scoring at least 0.7 on a copy of the image.
# label_id_offset is added to the predicted class indices before they are
# looked up in category_index.
label_id_offset = 1
image_np_with_detections = image_np.copy()

viz_utils.visualize_boxes_and_labels_on_image_array(
    image_np_with_detections,
    detection["detection_boxes"][0].numpy(),
    (detection["detection_classes"][0].numpy() + label_id_offset).astype(int),
    detection["detection_scores"][0].numpy(),
    category_index,
    use_normalized_coordinates=True,
    min_score_thresh=0.7
)
plt.figure(figsize=(12,16))
plt.imshow(image_np_with_detections)
plt.show()

# 6. Fine-tuning de object detection

In [None]:
from object_detection.utils import config_util
from object_detection.builders import model_builder

In [None]:
# Fine-tuning setup: one target class, same pipeline config, and the
# pre-trained checkpoint prefix to restore from.
num_classes = 1
pipeline_config = 'models/research/object_detection/configs/tf2/ssd_resnet50_v1_fpn_640x640_coco17_tpu-8.config'
checkpoint_path = 'models/research/object_detection/test_data/checkpoint/ckpt-0'

In [None]:
# Re-read the pipeline file so the model section can be modified for fine-tuning.
configs = config_util.get_configs_from_pipeline_file(pipeline_config)
model_config = configs['model']

In [None]:
# Override the class count, freeze batch normalization, and rebuild the model
# in training mode (this replaces the inference model built earlier).
model_config.ssd.num_classes = num_classes
model_config.ssd.freeze_batchnorm = True
detection_model = model_builder.build(model_config=model_config, is_training=True)

In [None]:
# Checkpoint view over the box predictor that exposes only its base tower
# layers and its box prediction head.
fake_box_predictor = tf.compat.v2.train.Checkpoint(
    _base_tower_layers_for_heads = detection_model._box_predictor._base_tower_layers_for_heads,
    _box_prediction_head=detection_model._box_predictor._box_prediction_head
)

In [None]:
# Only the classification part is going to be trained: this checkpoint view
# covers the feature extractor and the partial box predictor above, so the
# class prediction head is not part of what gets restored.
fake_model = tf.compat.v2.train.Checkpoint(
    _feature_extractor=detection_model.feature_extractor,
    _box_predictor=fake_box_predictor
)

In [None]:
# Restore the selected parts from the pre-trained checkpoint; expect_partial()
# silences warnings about checkpoint values left unused.
ckpt = tf.compat.v2.train.Checkpoint(model=fake_model)
ckpt.restore(checkpoint_path).expect_partial()

In [None]:
# Preprocess a dummy all-zeros batch of one 640x640 RGB image.
image, shape = detection_model.preprocess(tf.zeros([1, 640, 640, 3]))

In [None]:
# Forward pass on the dummy batch (presumably so the model's variables get created — TODO confirm).
predicition_dict = detection_model.predict(image, shape)

In [None]:
# Post-process the dummy predictions; the result is discarded.
_ = detection_model.postprocess(predicition_dict, shape)

In [None]:
# Reload the annotations for the fine-tuning data preparation.
df = pd.read_csv('labels_train.csv')
df

In [None]:
# First pass over the annotations:
#   * the first class_id == 3 row of each frame contributes that frame's box;
#   * every other row puts its frame in `images_multi_bbox`, which later cells
#     use to discard the frame.
# NOTE(review): "every other row" includes rows of other classes, so a frame
# with one pedestrian plus any other annotated object is discarded too —
# confirm whether only repeated pedestrian rows were meant to be flagged.
gt_boxes = []
images_added = []
images_multi_bbox = set()

# Set mirror of `images_added`, so the membership test is O(1) instead of
# scanning the list on every row.
seen_frames = set()

for frame, class_id, ymin, xmin, ymax, xmax in zip(
    df['frame'], df['class_id'], df['ymin'], df['xmin'], df['ymax'], df['xmax']):
  if frame not in seen_frames and class_id == 3:
    seen_frames.add(frame)
    images_added.append(frame)
    # Normalized [ymin, xmin, ymax, xmax], shape (1, 4).
    bbox = np.array([[ymin/300, xmin/480, ymax/300, xmax/480]], dtype=np.float32)
    gt_boxes.append(bbox)
  else:
    images_multi_bbox.add(frame)


In [None]:
# Positions in images_added whose frame was flagged in images_multi_bbox.
id_x_to_delete = [
    position
    for position, frame_name in enumerate(images_added)
    if frame_name in images_multi_bbox
]

In [None]:
# Keep the boxes whose position was not flagged for deletion. A set makes each
# membership test O(1) instead of scanning the list of flagged positions.
positions_to_delete = set(id_x_to_delete)
new_gt_boxes = [
    gt_box for x, gt_box in enumerate(gt_boxes) if x not in positions_to_delete
]
# Filled by the next cell.
new_images_added = []
    
    

In [None]:
# Keep the frame names at the surviving positions. The list is rebuilt rather
# than appended to, so running this cell again cannot duplicate entries, and
# the set lookup avoids scanning the flagged positions for every frame.
positions_to_delete = set(id_x_to_delete)
new_images_added = [
    image_added
    for x, image_added in enumerate(images_added)
    if x not in positions_to_delete
]

In [None]:
# Replace the working lists with their filtered versions.
# NOTE: after this cell the unfiltered lists are no longer reachable under these names.
gt_boxes = new_gt_boxes
images_added = new_images_added

In [None]:
# Load the selected frames as RGB arrays, in the same order as gt_boxes.
my_path = 'images/'
train_images_np = []
print(len(images_added))
for image in images_added:
  im = cv2.imread(my_path+ image)
  if im is None:
    # cv2.imread signals a missing/unreadable file by returning None; fail with
    # the offending path instead of an opaque cvtColor assertion.
    raise FileNotFoundError("Could not read image: " + my_path + image)
  im = cv2.cvtColor(im, cv2.COLOR_BGR2RGB)
  train_images_np.append(im)

In [None]:
# Turn every (image, boxes) pair into the tensors used by the training step:
# a batched float32 image, float32 boxes, and one-hot class targets.
label_id_offset = 1
train_image_tensors, gt_classes_one_hot_tensors, gt_box_tensors = [], [], []
for train_image_np, gt_box_np in zip(train_images_np, gt_boxes):
  image_tensor = tf.convert_to_tensor(train_image_np, dtype=tf.float32)
  train_image_tensors.append(tf.expand_dims(image_tensor, axis=0))

  gt_box_tensors.append(tf.convert_to_tensor(gt_box_np, dtype=tf.float32))

  # Every box is class 1; subtracting the offset gives zero-indexed labels.
  box_count = gt_box_np.shape[0]
  zero_indexed_ground_truth_classes = tf.convert_to_tensor(
      np.ones([box_count], dtype=np.int32)-label_id_offset)
  one_hot_classes = tf.one_hot(zero_indexed_ground_truth_classes, num_classes)
  gt_classes_one_hot_tensors.append(one_hot_classes)

In [None]:
# Put Keras in training mode before running the fine-tuning steps.
# NOTE(review): this backend call is reported as deprecated in TF 2.x — confirm it is still needed.
tf.keras.backend.set_learning_phase(True)

In [None]:
# Training hyperparameters: images per step, SGD learning rate, number of steps.
batch_size = 10
learning_rate= 0.01
num_batches = 200

In [None]:
# Variable-name prefixes of the two prediction heads that will be updated.
prefixes_to_train = [
  'WeightSharedConvolutionalBoxPredictor/WeightSharedConvolutionalBoxHead',
  'WeightSharedConvolutionalBoxPredictor/WeightSharedConvolutionalClassHead'
]

# Keep only the trainable variables whose name starts with one of the prefixes.
trainable_variables = detection_model.trainable_variables
to_fine_tune = [
    variable
    for variable in trainable_variables
    if variable.name.startswith(tuple(prefixes_to_train))
]

In [None]:
def get_model_train_step_function(model, optimizer, vars_to_fine_tune):
  """Create a graph-compiled single training step for fine-tuning.

  Args:
    model: detection model providing `provide_groundtruth`, `preprocess`,
      `predict` and `loss`.
    optimizer: optimizer whose `apply_gradients` updates the variables.
    vars_to_fine_tune: variables to differentiate against and update.

  Returns:
    A `tf.function` `train_step_fn(image_tensors, groundtruth_boxes_list,
    groundtruth_classes_list)` that performs one update and returns the sum
    of the localization and classification losses for the batch.
  """

  @tf.function
  def train_step_fn(image_tensors,
                    groundtruth_boxes_list,
                    groundtruth_classes_list):
    # One [640, 640, 3] row per image, sized from the batch actually received
    # instead of the notebook-level `batch_size` global.
    # NOTE(review): 640x640 presumably matches the model's input size — confirm
    # against the pipeline config.
    shapes = tf.constant(len(image_tensors) * [[640, 640, 3]], dtype=tf.int32)
    model.provide_groundtruth(
        groundtruth_boxes_list=groundtruth_boxes_list,
        groundtruth_classes_list=groundtruth_classes_list
    )

    with tf.GradientTape() as tape:
      # Preprocess with the `model` argument (the original reached for the
      # global `detection_model` here) and stack along the batch axis.
      preprocessed_images = tf.concat(
          [model.preprocess(image_tensor)[0]
           for image_tensor in image_tensors], axis=0
      )
      prediction_dict = model.predict(preprocessed_images, shapes)
      losses_dict = model.loss(prediction_dict, shapes)
      total_loss = losses_dict['Loss/localization_loss'] + losses_dict['Loss/classification_loss']

    # Differentiate and update outside the tape context, so the tape does not
    # also record the gradient computation and the optimizer step.
    gradients = tape.gradient(total_loss, vars_to_fine_tune)
    optimizer.apply_gradients(zip(gradients, vars_to_fine_tune))

    return total_loss

  return train_step_fn


In [None]:
# SGD with momentum 0.9 at `learning_rate`; the step function only updates `to_fine_tune`.
optimizer = tf.keras.optimizers.SGD(learning_rate, momentum=0.9)
train_step_fn = get_model_train_step_function(detection_model, optimizer, to_fine_tune)

In [None]:
def process_data_augmentation(image_list, image_tensors, gt_boxes_list):
  """Augment a batch with the notebook-level albumentations pipeline `transforms`.

  Only the first box of each image is used.

  Args:
    image_list: images as numpy arrays, one per example.
    image_tensors: unused; kept so existing calls keep working.
    gt_boxes_list: per-image tensors whose first row is a box in
      [ymin, xmin, ymax, xmax] order. The list is not modified.

  Returns:
    Tuple `(new_train_image_tensors, new_gt_box_tensors)`: float32 image
    tensors with a leading batch axis, and float32 box tensors of shape
    [1, 4] in [ymin, xmin, ymax, xmax] order.
  """
  new_train_image_tensors = []
  new_gt_box_tensors = []
  for image, gt_box in zip(image_list, gt_boxes_list):
    # Read the tensor once, then reorder to the x-first layout handed to the
    # pipeline and append the label as a fifth element.
    ymin, xmin, ymax, xmax = gt_box.numpy()[0]
    bbox = [[xmin, ymin, xmax, ymax, "pedestrian"]]

    augmented = transforms(image = image, bboxes = bbox)

    # Drop the label and go back to the y-first layout.
    aug_xmin, aug_ymin, aug_xmax, aug_ymax = list(augmented["bboxes"][0])[:-1]
    transformed_bbox = np.array([[aug_ymin, aug_xmin, aug_ymax, aug_xmax]])

    new_train_image_tensors.append(
        tf.expand_dims(
            tf.convert_to_tensor(augmented["image"], dtype=tf.float32), axis = 0))
    new_gt_box_tensors.append(
        tf.convert_to_tensor(transformed_bbox, dtype=tf.float32))

  return new_train_image_tensors, new_gt_box_tensors

In [None]:
# Fine-tuning loop: each step draws a random batch, augments it, and runs one
# optimizer update.
num_examples = len(train_images_np)
for idx in range(num_batches):
  # Draw batch_size distinct indices (all of them if the dataset is smaller)
  # instead of reshuffling the full index list on every step.
  example_keys = random.sample(range(num_examples), min(batch_size, num_examples))

  gt_boxes_list = [gt_box_tensors[key] for key in example_keys]
  gt_classes_list = [gt_classes_one_hot_tensors[key] for key in example_keys]
  image_tensors = [train_image_tensors[key] for key in example_keys]
  image_list = [train_images_np[key] for key in example_keys]

  # Augment the batch; the returned tensors replace the originals.
  new_train_image_tensors , new_gt_box_tensors = process_data_augmentation(image_list, image_tensors, gt_boxes_list)

  # `images_tensors` is read by a later cell, so the name is kept.
  images_tensors = new_train_image_tensors
  gt_boxes_list = new_gt_box_tensors

  # One training step on the augmented batch.
  total_loss = train_step_fn(images_tensors, gt_boxes_list, gt_classes_list)

  if idx%10 == 0:
    # Log the scalar value rather than the tensor's repr.
    print("batch " + str(idx) + ' of ' + str(num_batches) +  ', loss= ' + str(total_loss.numpy()), flush=True)


In [None]:
def detect(input_tensor):
  """Run the notebook-level `detection_model` on `input_tensor`.

  Chains preprocess -> predict -> postprocess and returns the result of
  `postprocess`.
  """
  images, true_shapes = detection_model.preprocess(input_tensor)
  raw_predictions = detection_model.predict(images, true_shapes)
  detections = detection_model.postprocess(raw_predictions, true_shapes)
  return detections
  

In [None]:
# Detection scores for the first image of the last training batch.
detect(images_tensors[0])["detection_scores"]

In [None]:
# Single-class label map for drawing the fine-tuned model's detections.
pedestrian_class_id = 1
num_classes = 1

# Use the id defined in this cell (the original read a misspelled name left
# over from a much earlier cell) and key the index by class id rather than by
# the number of classes.
category_index = {
    pedestrian_class_id :{
        "id" : pedestrian_class_id,
        "name": "pedestrian"
    }
}

In [None]:
# Load the frame used to try the fine-tuned model and convert it from BGR to RGB.
img = cv2.imread('images/1478900584619750605.jpg')
image_np = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)

In [None]:
# Add a leading batch axis and convert to a float32 tensor.
input_tensor = tf.convert_to_tensor(
    np.expand_dims(image_np, 0),
    dtype = tf.float32
)

In [None]:
# Run the fine-tuned model on the image.
detections = detect(input_tensor)

In [None]:
# List the entries available in the detection result.
detections.keys()

In [None]:
# Inspect the detection scores.
detections["detection_scores"]

In [None]:
# Draw the fine-tuned model's detections scoring at least 0.7 on a copy of the image.
# label_id_offset is added to the predicted class indices before they are
# looked up in category_index.
label_id_offset = 1
image_np_with_detections = image_np.copy()

viz_utils.visualize_boxes_and_labels_on_image_array(
    image_np_with_detections,
    detections['detection_boxes'][0].numpy(),
    (detections['detection_classes'][0].numpy() + label_id_offset).astype(int),
    detections['detection_scores'][0].numpy(),
    category_index,
    use_normalized_coordinates=True,
    min_score_thresh=0.7
)

plt.figure(figsize=(12, 16))
plt.imshow(image_np_with_detections)
plt.show()