<a href="https://colab.research.google.com/github/Akilesh1989/tiger-object-detection/blob/main/Object_Detection_Tiger_Detector_with_TF2.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Install libraries

In [None]:
# Install the tf_slim package into the current runtime.
!pip install tf_slim

# Import libraries

In [None]:
import os
from google.colab import drive

# Constants

In [None]:
# Root of the mounted Google Drive and the folder that holds all projects.
my_gdrive_path = "/gdrive/My Drive"
project_folder = my_gdrive_path + "/Projects"

# Workspace of this project; every other path is derived from DIR_PATH.
WORKSPACE_NAME = "TIGER_DETECTOR_TF2"
DIR_PATH = "/".join((project_folder, WORKSPACE_NAME))

# Folder of the cloned models repository, its research sub-folder and the
# folder that receives the label CSVs, TFRecords and label map.
MODEL_PATH = DIR_PATH + "/models"
RESEARCH_PATH = MODEL_PATH + "/research"
ANNOTATIONS_DIR = DIR_PATH + "/annotations"

# Mount GDrive

In [None]:
# Mount Google Drive at /gdrive, the prefix that my_gdrive_path is built on.
drive.mount('/gdrive')

# Creating a workspace

In [None]:
# Create the workspace folder. exist_ok=True keeps the cell safe to re-run and
# avoids the check-then-create race of a separate os.path.exists() test.
os.makedirs(DIR_PATH, exist_ok=True)

# Display the content of the projects folder as a quick sanity check.
os.listdir(project_folder)

# Importing TensorFlow and checking its version

In [None]:
# Import TensorFlow and display the version available in this runtime.
import tensorflow as tf
tf.__version__

# Cloning Object Detection Models from Github

In [None]:
clone_command = "git clone https://github.com/tensorflow/models.git"

os.chdir(DIR_PATH)
!git clone https://github.com/tensorflow/models.git

## Compiling the protos and testing the cloned Object Detection API

In [None]:
# Run protoc from the research folder on every object_detection .proto file;
# --python_out=. writes the generated Python modules relative to this folder.
os.chdir(RESEARCH_PATH)
!protoc object_detection/protos/*.proto --python_out=.

In [None]:
SLIM_PATH = f"{RESEARCH_PATH}/slim"

os.environ['PYTHONPATH'] += f':{MODEL_PATH}:{RESEARCH_PATH}:{SLIM_PATH}'
%cp object_detection/packages/tf2/setup.py .

In [None]:
# List the current folder, e.g. to check that setup.py was copied.
%ls

In [None]:
# Install the package described by the setup.py in the current folder in
# editable mode (-e); the non-editable variant is kept below for reference.
# !python -m pip install .
!pip install -e .

In [None]:
  # Run the TF2 model builder test script as a check of the installation.
  !python object_detection/builders/model_builder_tf2_test.py

# Creating the necessary folders

In [None]:
# Sub-folders of the workspace that are used by the steps further down.
folders_to_create = ['annotations', 'exported-models', 'pre-trained-models', 'models/my_mobilenet']
os.chdir(DIR_PATH)
for folder in folders_to_create:
  # exist_ok=True replaces the separate existence check and is safe on re-runs.
  os.makedirs(folder, exist_ok=True)

# Getting those images in

In [None]:
IMAGES_DIR = f"{DIR_PATH}/images"
os.makedirs(IMAGES_DIR, exist_ok=True)
# Upload all your images together with their annotation files into this folder.
# You don't have to split them into a training and a test set yourself; the
# following cells do that.

In [None]:
TRAIN_DIR = f"{IMAGES_DIR}/train"
TEST_DIR = f"{IMAGES_DIR}/test"

# Create both split folders; exist_ok=True makes the cell safe to re-run.
for split_dir in (TRAIN_DIR, TEST_DIR):
  os.makedirs(split_dir, exist_ok=True)

In [None]:
import shutil

# Undo a previous split: move everything from the train and test folders back
# into the main images folder so that the split below starts from the full set.
# A file whose name already exists in IMAGES_DIR is left where it is.
# The folder is listed once and tracked in a set instead of being re-listed
# for every single file.
names_in_images_dir = set(os.listdir(IMAGES_DIR))
for split_dir in (TRAIN_DIR, TEST_DIR):
  for file_name in os.listdir(split_dir):
    if file_name not in names_in_images_dir:
      shutil.move(os.path.join(split_dir, file_name), IMAGES_DIR)
      names_in_images_dir.add(file_name)

In [None]:
SPLIT_SIZE = 0.9  # fraction of the images that goes into the training set

extensions = (".jpg", ".png", ".jpeg")
all_files = [file for file in os.listdir(IMAGES_DIR) if file.endswith(extensions)]

# A folder listing never contains the same name twice, so comparing full file
# names cannot detect anything. What can clash is the name without extension:
# e.g. "a.jpg" and "a.png" would both be paired with the annotation "a.xml".
image_stems = [os.path.splitext(file)[0] for file in all_files]
if len(image_stems) != len(set(image_stems)):
  print("DUPLICATE FILE NAMES. FIX IT")

In [None]:
import random
import shutil

# Seed used for the train/test split; change it to draw a different split.
SPLIT_SEED = 42

num_train_files = int(len(all_files) * SPLIT_SIZE)
# Sample from a sorted list with a dedicated, seeded generator so that the same
# set of files always yields the same split (a folder listing has no guaranteed
# order and the global random state is unseeded).
train_list = random.Random(SPLIT_SEED).sample(sorted(all_files), num_train_files)
len(train_list)

# Create test and train data

## Move files to train dir

In [None]:
# Move every sampled image that has an annotation, together with that
# annotation, into the training folder. Images without an XML file stay behind.
# The folder is listed once up front instead of once per image.
remaining_names = set(os.listdir(IMAGES_DIR))
for file_name in train_list:
  # splitext strips only the real extension; a substring test plus str.replace
  # would also hit an extension-like fragment in the middle of a name.
  xml_file_name = os.path.splitext(file_name)[0] + '.xml'
  if xml_file_name in remaining_names:
    shutil.move(os.path.join(IMAGES_DIR, file_name), TRAIN_DIR)
    shutil.move(os.path.join(IMAGES_DIR, xml_file_name), TRAIN_DIR)
    remaining_names.discard(file_name)
    remaining_names.discard(xml_file_name)

print(len(os.listdir(TRAIN_DIR)))

## Move files to test dir

In [None]:
extensions = (".jpg", ".png", ".jpeg")

# Every regular file still left in the images folder (sub-folders are skipped)
# belongs to the test set.
all_test_files = list(
    filter(lambda name: os.path.isfile(os.path.join(IMAGES_DIR, name)),
           os.listdir(IMAGES_DIR))
)

for file_name in all_test_files:
  source_path = os.path.join(IMAGES_DIR, file_name)
  shutil.move(source_path, TEST_DIR)

# Converting XML Files to CSV Files

In [None]:
import os
import glob
import pandas as pd
import xml.etree.ElementTree as ET


def _xml_child(parent, tag, position):
    """Return the child of `parent` named `tag`, or the child at `position` when no such tag exists."""
    node = parent.find(tag)
    return node if node is not None else parent[position]


def xml_to_csv(path):
    """Collect the bounding boxes of all XML annotation files in a folder.

    Child elements are looked up by tag name (width/height inside <size>,
    name and bndbox inside <object>, xmin/ymin/xmax/ymax inside the box), so
    the result does not depend on the order in which the annotation tool wrote
    them. When a tag is absent, the position used by the previous
    implementation is taken as a fallback.

    Args:
        path: Folder that is searched (non-recursively) for ``*.xml`` files.

    Returns:
        pandas.DataFrame with one row per <object> element and the columns
        filename, width, height, class, xmin, ymin, xmax, ymax. The frame is
        empty (but has these columns) when nothing was found.
    """
    xml_list = []
    # sorted() makes the row order independent of the file system's listing order.
    for xml_file in sorted(glob.glob(path + '/*.xml')):
        root = ET.parse(xml_file).getroot()
        size = root.find('size')
        width = int(_xml_child(size, 'width', 0).text)
        height = int(_xml_child(size, 'height', 1).text)
        for member in root.findall('object'):
            box = _xml_child(member, 'bndbox', 4)
            xml_list.append((
                root.find('filename').text,
                width,
                height,
                _xml_child(member, 'name', 0).text,
                int(_xml_child(box, 'xmin', 0).text),
                int(_xml_child(box, 'ymin', 1).text),
                int(_xml_child(box, 'xmax', 2).text),
                int(_xml_child(box, 'ymax', 3).text),
            ))
    column_name = ['filename', 'width', 'height', 'class', 'xmin', 'ymin', 'xmax', 'ymax']
    xml_df = pd.DataFrame(xml_list, columns=column_name)
    return xml_df


def main():
  """Write train_labels.csv and test_labels.csv into ANNOTATIONS_DIR.

  Each CSV is produced by xml_to_csv from the XML files of the corresponding
  split folder (TRAIN_DIR / TEST_DIR) and is written without the index column.
  """
  # create annotations dir
  os.makedirs(ANNOTATIONS_DIR, exist_ok=True)

  # One loop for both splits instead of two copies of the same statements.
  # The split folders are passed as they are: joining them onto os.getcwd()
  # does not change which files are found.
  for images_dir, csv_name in ((TRAIN_DIR, 'train_labels.csv'),
                               (TEST_DIR, 'test_labels.csv')):
    xml_df = xml_to_csv(images_dir)
    xml_df.to_csv(f'{ANNOTATIONS_DIR}/{csv_name}', index=False)

main()

# Creating TF Record

In [None]:
# Switch back to the research folder and regenerate the Python modules for the
# object_detection .proto files (same command as in the setup section).
os.chdir(RESEARCH_PATH)
!protoc object_detection/protos/*.proto --python_out=.

In [None]:
# Run the build step of the setup.py located in the current folder.
!python setup.py build

In [None]:
# Add the slim, object_detection/utils and object_detection folders to
# PYTHONPATH. os.environ.get avoids a KeyError when the variable is unset, and
# entries that are already present are not appended again on a re-run.
python_path_entries = [entry for entry in os.environ.get('PYTHONPATH', '').split(':') if entry]
for extra_path in (f'{RESEARCH_PATH}/slim',
                   f'{RESEARCH_PATH}/object_detection/utils/',
                   f'{RESEARCH_PATH}/object_detection'):
  if extra_path not in python_path_entries:
    python_path_entries.append(extra_path)
os.environ['PYTHONPATH'] = ':'.join(python_path_entries)

In [None]:
# Run the model builder test script.
# NOTE(review): the setup section runs model_builder_tf2_test.py instead —
# confirm which of the two scripts is intended here.
!python object_detection/builders/model_builder_test.py

In [None]:
# Inputs and output of the TFRecord conversion for the training split.
train_csv_path = ANNOTATIONS_DIR + "/train_labels.csv"
train_output_path = ANNOTATIONS_DIR + "/train.tfrecord"
train_images_dir = DIR_PATH + "/images/train/"

# The same three paths for the test split.
test_csv_path = ANNOTATIONS_DIR + "/test_labels.csv"
test_output_path = ANNOTATIONS_DIR + "/test.tfrecord"
test_images_dir = DIR_PATH + "/images/test/"

In [None]:
# Class name as it appears in the annotations; class_text_to_int maps it to id 1.
LABEL = "Tiger"

In [None]:
from __future__ import division
from __future__ import print_function
from __future__ import absolute_import

import os
import io
import pandas as pd
import tensorflow as tf

from PIL import Image
from object_detection.utils import dataset_util
from collections import namedtuple, OrderedDict


# TO-DO replace this with label map
def class_text_to_int(row_label):
    """Map a class name from the label CSV to its integer class id.

    Args:
        row_label: Class name of one annotated object.

    Returns:
        1 when `row_label` equals LABEL, the only class of this notebook.

    Raises:
        ValueError: for any other class name. Previously such a label silently
            produced None, which only failed later while the integer feature
            list was built and without naming the offending label.
    """
    if row_label == LABEL:
        return 1
    raise ValueError(
        "Unknown class label {!r}; expected {!r}".format(row_label, LABEL))


def split(df, group):
    """Group the rows of `df` by the column `group`.

    Returns a list of `data(filename, object)` named tuples, one per distinct
    value of the column: `filename` is that value and `object` is the
    DataFrame holding the rows of the group.
    """
    data = namedtuple('data', ['filename', 'object'])
    grouped = df.groupby(group)
    records = []
    for key in grouped.groups:
        records.append(data(key, grouped.get_group(key)))
    return records


def create_tf_example(group, path):
    """Build a tf.train.Example for one image and all of its annotated boxes.

    Args:
        group: Record with the attributes `filename` (name of the image file)
            and `object` (DataFrame whose rows provide the columns xmin, ymin,
            xmax, ymax and class).
        path: Folder that contains the image file.

    Returns:
        tf.train.Example holding the encoded image bytes, the image size, file
        name and format, the box coordinates divided by the image width/height,
        and the class names with their integer ids.
    """
    with tf.io.gfile.GFile(os.path.join(path, '{}'.format(group.filename)), 'rb') as fid:
        encoded_image = fid.read()
    # Pillow is only used to read the size and the actual file format.
    image = Image.open(io.BytesIO(encoded_image))
    width, height = image.size

    filename = group.filename.encode('utf8')
    # The notebook accepts .png files as well, so record the format Pillow
    # detected instead of labelling every image as 'jpg'.
    image_format = b'png' if image.format == 'PNG' else b'jpg'
    xmins = []
    xmaxs = []
    ymins = []
    ymaxs = []
    classes_text = []
    classes = []

    for index, row in group.object.iterrows():
        # Pixel coordinates are stored relative to the image size.
        xmins.append(row['xmin'] / width)
        xmaxs.append(row['xmax'] / width)
        ymins.append(row['ymin'] / height)
        ymaxs.append(row['ymax'] / height)
        classes_text.append(row['class'].encode('utf8'))
        classes.append(class_text_to_int(row['class']))

    tf_example = tf.train.Example(features=tf.train.Features(feature={
        'image/height': dataset_util.int64_feature(height),
        'image/width': dataset_util.int64_feature(width),
        'image/filename': dataset_util.bytes_feature(filename),
        'image/source_id': dataset_util.bytes_feature(filename),
        'image/encoded': dataset_util.bytes_feature(encoded_image),
        'image/format': dataset_util.bytes_feature(image_format),
        'image/object/bbox/xmin': dataset_util.float_list_feature(xmins),
        'image/object/bbox/xmax': dataset_util.float_list_feature(xmaxs),
        'image/object/bbox/ymin': dataset_util.float_list_feature(ymins),
        'image/object/bbox/ymax': dataset_util.float_list_feature(ymaxs),
        'image/object/class/text': dataset_util.bytes_list_feature(classes_text),
        'image/object/class/label': dataset_util.int64_list_feature(classes),
    }))
    return tf_example


def convert_to_tfrecord(output_path, image_dir, csv_input):
    """Write one TFRecord file with an example per image listed in a label CSV.

    Args:
        output_path: Path of the TFRecord file to create.
        image_dir: Folder that contains the images named in the CSV.
        csv_input: Path of the label CSV (one row per annotated object).
    """
    examples = pd.read_csv(csv_input)
    grouped = split(examples, 'filename')
    # The context manager closes the writer even when building one of the
    # examples raises, so no half-open record file is left behind.
    with tf.io.TFRecordWriter(output_path) as writer:
        for group in grouped:
            tf_example = create_tf_example(group, image_dir)
            writer.write(tf_example.SerializeToString())

    print('Successfully created the TFRecords: {}'.format(output_path))


In [None]:
# Training split.
convert_to_tfrecord(
    output_path=train_output_path,
    image_dir=train_images_dir,
    csv_input=train_csv_path,
)
# Test split.
convert_to_tfrecord(
    output_path=test_output_path,
    image_dir=test_images_dir,
    csv_input=test_csv_path,
)

# Downloading Pre-trained Models

In [None]:
# Work inside the pre-trained-models folder; the download and the extraction
# below use paths relative to it.
os.chdir(f"{DIR_PATH}/pre-trained-models")

In [None]:
# Download the SSD MobileNet V2 FPNLite 320x320 archive into the current folder.
!curl "http://download.tensorflow.org/models/object_detection/tf2/20200711/ssd_mobilenet_v2_fpnlite_320x320_coco17_tpu-8.tar.gz" --output "ssd_mobilenet_v2_fpnlite_320x320_coco17_tpu-8.tar.gz"

In [None]:
import tarfile
model_name = 'ssd_mobilenet_v2_fpnlite_320x320_coco17_tpu-8'
model_file = model_name + '.tar.gz'
# The context manager closes the archive even when the extraction fails.
with tarfile.open(model_file) as tar:
  tar.extractall()
# The archive is no longer needed once its content has been extracted.
os.remove(model_file)

In [None]:
# Display the content of the workspace folder.
os.listdir(DIR_PATH)

In [None]:
# List the current working directory.
%ls

In [None]:
# Return to the workspace root and list the extracted model folder.
os.chdir(DIR_PATH)
%ls pre-trained-models/ssd_mobilenet_v2_fpnlite_320x320_coco17_tpu-8/

# Add labels

In [None]:
os.chdir(ANNOTATIONS_DIR)
!touch label_map.pbtxt
label_string = """
item {
  id: 1
  name: 'Tiger'
}
"""
with open(os.path.join(ANNOTATIONS_DIR, 'label_map.pbtxt'), 'w') as f:
  f.writelines(label_string)
f.close()

!cat label_map.pbtxt

# Modifying the Config File

In [None]:
# Config file of the downloaded model; read_config and write_config below
# read it from and write it back to this path.
pipeline_config_path = f"{DIR_PATH}/pre-trained-models/ssd_mobilenet_v2_fpnlite_320x320_coco17_tpu-8/pipeline.config"

In [None]:
import tensorflow as tf
from google.protobuf import text_format
from object_detection.protos import pipeline_pb2

def read_config():
    """Parse the file at pipeline_config_path into a TrainEvalPipelineConfig message and return it."""
    with tf.io.gfile.GFile(pipeline_config_path, "r") as config_file:
        config_text = config_file.read()
    pipeline_proto = pipeline_pb2.TrainEvalPipelineConfig()
    text_format.Merge(config_text, pipeline_proto)
    return pipeline_proto

def write_config(pipeline):
    """Serialise `pipeline` to text and overwrite the file at pipeline_config_path with it."""
    # Serialise first so that the file is only opened once the text exists.
    serialized = text_format.MessageToString(pipeline)
    with tf.io.gfile.GFile(pipeline_config_path, "wb") as config_file:
        config_file.write(serialized)

def modify_config(pipeline):
    """Adapt the pipeline config to this single-class project and return it.

    The object is changed in place: class count, fine-tuning checkpoint, batch
    size and number of steps are set, and the train and (first) eval input
    readers are pointed at the label map and TFRecord files of the
    annotations folder. All paths are relative.
    """
    pipeline.model.ssd.num_classes = 1

    train_config = pipeline.train_config
    train_config.fine_tune_checkpoint_type = 'detection'
    train_config.batch_size = 8
    train_config.fine_tune_checkpoint = "pre-trained-models/ssd_mobilenet_v2_fpnlite_320x320_coco17_tpu-8/checkpoint/ckpt-0"
    train_config.num_steps = 25000

    train_reader = pipeline.train_input_reader
    train_reader.label_map_path = "annotations/label_map.pbtxt"
    train_reader.tf_record_input_reader.input_path[0] = "annotations/train.tfrecord"

    eval_reader = pipeline.eval_input_reader[0]
    eval_reader.label_map_path = "annotations/label_map.pbtxt"
    eval_reader.tf_record_input_reader.input_path[0] = "annotations/test.tfrecord"

    return pipeline


def setup_pipeline():
    """Read the pipeline config, apply modify_config to it and write it back."""
    write_config(modify_config(read_config()))

setup_pipeline()

You can also modify the config file manually: open it from the file browser in the side bar by double-clicking on it.

# Launching TensorBoard

In [None]:
%load_ext tensorboard
%tensorboard --logdir 'models/my_mobilenet'

# Training

In [None]:
# The training command and the paths stored in the pipeline config are
# relative to the workspace root.
os.chdir(DIR_PATH)

In [None]:
# Copy the training script into the current folder so it can be run from here.
!cp 'models/research/object_detection/model_main_tf2.py' .

In [None]:
# Start the training run: --model_dir is the output folder of the run and
# --pipeline_config_path is the config file modified above.
!python model_main_tf2.py \
    --model_dir=models/my_mobilenet \
    --pipeline_config_path=pre-trained-models/ssd_mobilenet_v2_fpnlite_320x320_coco17_tpu-8/pipeline.config

# Evaluating the model

In [None]:
# Mount Google Drive again; force_remount=True remounts it even if it is
# already mounted.
from google.colab import drive
drive.mount("/gdrive", force_remount=True)

In [None]:
# Display the content of the pre-trained-models folder (relative to the
# current working directory).
os.listdir("pre-trained-models")

In [None]:
os.chdir(DIR_PATH)
!python model_main_tf2.py --model_dir=exported-models/checkpoint --pipeline_config_path=pre-trained-models/ssd_mobilenet_v2_fpnlite_320x320_coco17_tpu-8/pipeline.config --checkpoint_dir=models/my_mobilenet/checkpoint # The folder where the model has saved the checkpoints during training

# Exporting Trained Model

In [None]:
import numpy as np
import re

os.chdir(DIR_PATH)
output_directory = 'exported-models/'

# Go through the checkpoint files in the training dir and pick the one with
# the highest number. You could choose a specific one instead of the last.
lst = os.listdir("models/my_mobilenet/")
lst = [name for name in lst if 'ckpt-' in name and '.index' not in name]
if not lst:
  # Fail with a clear message instead of an opaque argmax error on an empty array.
  raise FileNotFoundError("No 'ckpt-' files found in models/my_mobilenet/")
# Raw string: '\d' inside a normal string literal is an invalid escape sequence.
steps = np.array([int(re.findall(r'\d+', name)[0]) for name in lst])
last_model = lst[steps.argmax()]
last_model_path = os.path.join('models/my_mobilenet', last_model)
print(last_model_path)

In [None]:
# Run the exporter script from the workspace root: it is given the pipeline
# config, the folder with the trained checkpoints and the output folder.
os.chdir(DIR_PATH)
!python models/research/object_detection/exporter_main_v2.py \
--input_type=image_tensor \
--pipeline_config_path=pre-trained-models/ssd_mobilenet_v2_fpnlite_320x320_coco17_tpu-8/pipeline.config \
--output_directory=exported-models \
--trained_checkpoint_dir=models/my_mobilenet

# Detection

In [None]:
# Display the files of the training split.
os.listdir(f"{DIR_PATH}/images/train")

In [None]:
# Copy the two utility modules into the research folder (the working
# directory) so that they can be imported below as top-level modules.
os.chdir(RESEARCH_PATH)
!cp object_detection/utils/visualization_utils.py .
!cp object_detection/utils/label_map_util.py .

import numpy as np
from PIL import Image
from google.colab.patches import cv2_imshow
import tensorflow as tf
import label_map_util as label_map_util
import visualization_utils as viz_utils


def load_image_into_numpy_array(path):
    """Load an image from file into a numpy array.

    Puts image into numpy array to feed into tensorflow graph.
    Note that by convention we put it into a numpy array with shape
    (height, width, channels), where channels=3 for RGB.

    Args:
      path: the file path to the image

    Returns:
      uint8 numpy array with shape (img_height, img_width, 3)
    """
    # convert('RGB') is what guarantees the documented 3-channel layout:
    # without it a grayscale, palette or RGBA file yields a different shape.
    # The with-block closes the file handle once the pixels have been copied.
    with Image.open(path) as image:
        return np.array(image.convert('RGB'))


# Load the exported SavedModel; the loaded object is called on image batches
# in the inference loop below.
SAVED_MODEL_PATH = f"{DIR_PATH}/exported-models/saved_model/"
detect_fn = tf.saved_model.load(SAVED_MODEL_PATH)


In [None]:
# Minimum score for a box to be drawn and counted as a tiger.
# Adjust this value to set the minimum probability boxes to be classified as True.
MIN_SCORE_THRESH = .99

# The label map is the same for every image, so it is loaded once up front.
category_index = label_map_util.create_category_index_from_labelmap(f"{ANNOTATIONS_DIR}/label_map.pbtxt")

for image in os.listdir(f"{DIR_PATH}/test_images"):
  image_path = f"{DIR_PATH}/test_images/{image}"
  print('Running inference for {}... '.format(image_path), end='')

  image_np = load_image_into_numpy_array(image_path)

  # The input needs to be a tensor, convert it using `tf.convert_to_tensor`.
  input_tensor = tf.convert_to_tensor(image_np)
  # The model expects a batch of images, so add an axis with `tf.newaxis`.
  input_tensor = input_tensor[tf.newaxis, ...]

  detections = detect_fn(input_tensor)
  # Keep only the first batch entry and the first num_detections rows of every
  # output, converted to numpy arrays.
  num_detections = int(detections.pop('num_detections'))
  detections = {key: value[0, :num_detections].numpy()
                for key, value in detections.items()}
  detections['num_detections'] = num_detections
  detections['detection_classes'] = detections['detection_classes'].astype(np.int64)

  # num_detections is the number of boxes the model returned, whatever their
  # score; report only the boxes that pass the same threshold used for drawing.
  num_tigers = int((detections['detection_scores'] > MIN_SCORE_THRESH).sum())
  print(f"NUMBER OF TIGERS: {num_tigers}")
  print(detections['detection_scores'])

  image_np_with_detections = image_np.copy()
  viz_utils.visualize_boxes_and_labels_on_image_array(
        image_np_with_detections,
        detections['detection_boxes'],
        detections['detection_classes'],
        detections['detection_scores'],
        category_index,
        use_normalized_coordinates=True,
        max_boxes_to_draw=200,
        min_score_thresh=MIN_SCORE_THRESH,
        agnostic_mode=False)

  cv2_imshow(image_np_with_detections)
  


# THE END.