In [None]:
import os
import cv2
import tensorflow as tf

In [None]:
#uncomment and run if wget is not installed
#!pip install wget

In [None]:
import wget

In [None]:
CUSTOM_MODEL_NAME = "my_centernet_model"
PRETRAINED_MODEL_NAME = "centernet_resnet50_v1_fpn_512x512_coco17_tpu-8"
PRETRAINED_MODEL_URL = "http://download.tensorflow.org/models/object_detection/tf2/20200711/centernet_resnet50_v1_fpn_512x512_coco17_tpu-8.tar.gz"
LABELMAP_NAME = "person_label_map.pbtxt"

In [None]:
#setting the needed paths
paths = {
    'PROTOC_PATH': os.path.join('hackathon','protoc'),
    'SCRIPTS_PATH': os.path.join('hackathon','scripts'),
    'ANNOTATION_PATH': os.path.join('hackathon','workspace', 'annotation'),
    'MODEL_API_PATH': os.path.join('hackathon','models'),
    'MODEL_PATH': os.path.join('hackathon','workspace', 'model', CUSTOM_MODEL_NAME),
    'PRETRAINED_MODEL_PATH': os.path.join('hackathon','workspace', 'pre-trained_model'),
    'IMAGE_PATH': os.path.join('hackathon','workspace', 'images'),
    'TRAIN_IMAGE_PATH': os.path.join('hackathon', 'workspace', 'images', 'train'),
    'TEST_IMAGE_PATH': os.path.join('hackathon', 'workspace', 'images', 'test'),
    'PROTOC_PATH': os.path.join('hackathon', 'protoc')
}

In [None]:
for path in paths.values():
    if not os.path.exists(path):
        if os.name == 'nt':
            !mkdir {path}

In [None]:
#Do not run!
#!tar -czf {os.path.join(paths['IMAGE_PATH'], 'archive.tar.gz')} {paths['TRAIN_IMAGE_PATH']} {paths['TEST_IMAGE_PATH']}

In [None]:
#copy the archive.tar.gz file to the images folder before running this cell
ARCHIVE_FILES = os.path.join(paths['IMAGE_PATH'], 'archive.tar.gz')
if os.path.exists(ARCHIVE_FILES):
  !tar -zxvf {ARCHIVE_FILES}

In [None]:
#run the cells below to install tensorflow object detection api and protocol buffers

In [None]:
if not os.path.exists(os.path.join(paths['MODEL_API_PATH'], 'research', 'object_detection')):
    #this will clone the model api to our specified model path
    !git clone https://github.com/tensorflow/models {paths['MODEL_API_PATH']}

In [None]:
if os.name == 'nt':
    url="https://github.com/protocolbuffers/protobuf/releases/download/v3.15.6/protoc-3.15.6-win64.zip"
    wget.download(url)
    !move protoc-3.15.6-win64.zip {paths['PROTOC_PATH']}
    !cd {paths['PROTOC_PATH']} && tar -xf protoc-3.15.6-win64.zip
    os.environ['PATH'] += os.pathsep + os.path.abspath(os.path.join(paths['PROTOC_PATH'], 'bin'))   
    !cd Tensorflow/models/research && protoc object_detection/protos/*.proto --python_out=. 
    !copy object_detection\\packages\\tf2\\setup.py setup.py 
    !python setup.py build 
    !python setup.py install
    !cd Tensorflow/models/research/slim && pip install -e .

In [None]:
if os.name == 'nt':
    #downloading a pre-trained model to use
    wget.download(PRETRAINED_MODEL_URL)

    #move the downloaded model to our pretrained model folder
    !move {PRETRAINED_MODEL_NAME+'.tar.gz'} {paths['PRETRAINED_MODEL_PATH']}

    #change into the pretrained model folder and extract the file
    !cd {paths['PRETRAINED_MODEL_PATH']} && tar -zxvf {PRETRAINED_MODEL_NAME+'.tar.gz'}

In [None]:
#create a label map for the dataset

LABELMAP = os.path.join(paths['ANNOTATION_PATH'], LABELMAP_NAME)
labels = [{'name': 'Person', 'id': 1}]

with open(LABELMAP, 'w') as f:
    for label in labels:
        f.write('item { \n')
        f.write('\tname: \'{}\'\n'.format(label['name']))
        f.write('\tid: {}\n'.format(label['id']))
        f.write('}\n')

In [None]:
#!
PIPELINE_CONFIG = os.path.join('hackathon', 'workspace','model', CUSTOM_MODEL_NAME, 'pipeline.config')
TFRECORD_SCRIPT = os.path.join(paths['SCRIPTS_PATH'], 'generate_tfrecord.py')

In [None]:
if not os.path.exists(TFRECORD_SCRIPT):
    !git clone https://github.com/nicknochnack/GenerateTFRecord {paths['SCRIPTS_PATH']}

In [None]:
#creating a tensorflow record file for training and testing dataset

!python {TFRECORD_SCRIPT} -x {os.path.join(paths['IMAGE_PATH'], 'train')} -l {LABELMAP} -o {os.path.join(paths['ANNOTATION_PATH'], 'train.tfrecord')} 
!python {TFRECORD_SCRIPT} -x {os.path.join(paths['IMAGE_PATH'], 'test')} -l {LABELMAP} -o {os.path.join(paths['ANNOTATION_PATH'], 'test.tfrecord')}

In [None]:
# copy pretrained model pipeline configuration to our custom model

!copy {os.path.join(paths['PRETRAINED_MODEL_PATH'], PRETRAINED_MODEL_NAME, 'pipeline.config')} {os.path.join(paths['MODEL_PATH'])}

In [None]:
import object_detection
from object_detection.utils import config_util
from object_detection.protos import pipeline_pb2
from google.protobuf import text_format

## Custom model pipeline configuration

In [None]:
pipeline_config = pipeline_pb2.TrainEvalPipelineConfig()

with tf.io.gfile.GFile(PIPELINE_CONFIG, "r") as f:                                                                                                                                                                                                                     
    proto_str = f.read()                                                                                                                                                                                                                                          
    text_format.Merge(proto_str, pipeline_config)
    
pipeline_config.model.center_net.num_classes = 1
pipeline_config.train_config.batch_size = 2
pipeline_config.train_config.optimizer.adam_optimizer.learning_rate.cosine_decay_learning_rate.learning_rate_base = 0.01
pipeline_config.train_config.optimizer.adam_optimizer.learning_rate.cosine_decay_learning_rate.total_steps = 50000
pipeline_config.train_config.optimizer.adam_optimizer.learning_rate.cosine_decay_learning_rate.warmup_learning_rate = 0.005
pipeline_config.train_config.fine_tune_checkpoint = os.path.join(paths['PRETRAINED_MODEL_PATH'], PRETRAINED_MODEL_NAME, 'checkpoint', 'ckpt-0')
pipeline_config.train_config.fine_tune_checkpoint_type = "detection"
pipeline_config.train_input_reader.label_map_path = os.path.join(paths['ANNOTATION_PATH'], LABELMAP_NAME)
pipeline_config.train_input_reader.tf_record_input_reader.input_path[:] = [os.path.join(paths['ANNOTATION_PATH'], 'train.tfrecord')]
pipeline_config.eval_input_reader[0].label_map_path = os.path.join(paths['ANNOTATION_PATH'], LABELMAP_NAME)
pipeline_config.eval_input_reader[0].tf_record_input_reader.input_path[:] = [os.path.join(paths['ANNOTATION_PATH'], 'test.tfrecord')]

In [None]:
config_text = text_format.MessageToString(pipeline_config)                                                                                                                                                                                                        
with tf.io.gfile.GFile(PIPELINE_CONFIG, "wb") as f:                                                                                                                                                                                                                     
    f.write(config_text)

## Model Training

In [None]:
TRAINING_SCRIPT = os.path.join(paths['MODEL_API_PATH'], 'research', 'object_detection', 'model_main_tf2.py')

In [None]:
command = "python {} --model_dir={} --pipeline_config_path={} --num_train_steps=5000".format(TRAINING_SCRIPT, paths['MODEL_PATH'], PIPELINE_CONFIG)

In [None]:
!{command}

## Model Evaluation

In [None]:
command = "python {} --model_dir={} --pipeline_config_path={} --checkpoint_dir={}".format(TRAINING_SCRIPT, paths['MODEL_PATH'], PIPELINE_CONFIG, paths['MODEL_PATH'])

In [None]:
!{command}

## Trained Model Loading

In [None]:
from object_detection.utils import visualization_utils as vis_utils
from object_detection.builders import model_builder

In [None]:
# building the detection model
config = config_util.get_configs_from_pipeline_file(PIPELINE_CONFIG)
model = model_builder.build(model_config=config['model'], is_training=False)

In [None]:
# restore checkpoint from disk to use for detection
checkpoint= tf.compat.v2.train.Checkpoint(model)
checkpoint_path = os.path.join(paths['MODEL_PATH'], 'ckpt-6')
checkpoint.restore(checkpoint_path).expect_partial() # expect_partial() to avoid warning of unused checkpoint indexes

In [None]:
@tf.function #converts the code below to tensorflow graph function for better performance
def detection(image):
    image, shape = model.preprocess(image)
    prediction = model.predict(image, shape)
    detection = model.postprocess(prediction, shape)
    return detection

## Detecting from an image

In [None]:
import numpy as np
import matplotlib.pyplot as plt
from object_detection.utils import label_map_util
%matplotlib inline

In [None]:
#setting the appropriate parameter and converting the image/frame into required format(numpy arrays)
def detection_inference(media_type):
    converted_img = np.array(media_type)

    input_tensor = tf.convert_to_tensor(np.expand_dims(converted_img, 0), dtype=tf.float32)
    category_index = label_map_util.create_category_index_from_labelmap(LABELMAP)

    result_dict = detection(input_tensor)
    num_detections = int(result_dict.pop('num_detections'))

    result_dict = {key: value[0, :num_detections].numpy() for key, value in result_dict.items()}
    result_dict['num_detections'] = num_detections
    result_dict['detection_classes'] = result_dict['detection_classes'].astype(np.int64)
    
    vis_utils.visualize_boxes_and_labels_on_image_array(
        converted_img,
        result_dict['detection_boxes'],
        result_dict['detection_classes']+1,
        result_dict['detection_scores'],
        category_index,
        use_normalized_coordinates=True,
        max_boxes_to_draw=1,
        min_score_thresh=0.5,
        line_thickness=3
    )
    return converted_img


In [None]:
IMAGE_PATH = os.path.join(paths['IMAGE_PATH'], 'test', '12.avi_snapshot_07.25_[2021.06.21_14.35.07].jpg')

In [None]:
image = cv2.imread(IMAGE_PATH)
converted_img = detection_inference(image)

rect_image = cv2.cvtColor(converted_img, cv2.COLOR_BGR2RGB)
plt.imshow(rect_image)
plt.show()

In [None]:
help(vis_utils.visualize_boxes_and_labels_on_image_array)