# Model loading & training

In [3]:
import os
import random
import cv2
import wget
import tensorflow as tf
import numpy as np
from six import BytesIO
from PIL import Image
import matplotlib.pyplot as plt
import xml.etree.ElementTree as ET
from object_detection.utils import (
    label_map_util, 
    config_util, 
    visualization_utils as viz_utils,
)
from object_detection.builders import model_builder
%matplotlib inline

In [4]:
# Names of the fine-tuned model and of the pre-trained detector it starts from.
CUSTOM_MODEL_NAME = 'my_ssd_mobnet'
PRETRAINED_MODEL_NAME = 'ssd_mobilenet_v2_fpnlite_320x320_coco17_tpu-8'
PRETRAINED_MODEL_URL = 'http://download.tensorflow.org/models/object_detection/tf2/20200711/ssd_mobilenet_v2_fpnlite_320x320_coco17_tpu-8.tar.gz'

# TFRecord generator script.
TF_RECORD_SCRIPT_NAME = 'generate_tfrecord.py'
# The backslash is escaped explicitly: a bare '\g' is an invalid escape sequence
# that Python only tolerates with a warning. The resulting string is unchanged,
# so code that splits this path on '\\' keeps working.
TF_RECORD_SCRIPT_PATH = 'tf_record_generator\\generate_tfrecord.py'
LABEL_MAP_NAME = 'label_map.pbtxt'

# Working directories and files, all relative to the notebook's folder.
MODELS_PATH = './models/'
PROTOC_PATH = './protoc/'
MOBILE_NET_PATH = './mobile_net/'
IMAGES_PATH = './data/images/'
LABELMAP_PATH = './data/labelmap.pbtxt'
# Checkpoint *prefix* for the fine-tuned weights (a save suffix is appended to it).
TRAINED_MODEL_PATH = './mobile_net/trained_model/checkpoint'

In [5]:
if not os.path.exists(MODELS_PATH):
    !git clone https://github.com/tensorflow/models.git {MODELS_PATH}

In [6]:
# One-time setup: download the protoc 3.15.6 win64 release, unpack it into
# PROTOC_PATH, compile the Object Detection API protos and install the
# object_detection and slim packages.
# NOTE(review): `move` and `copy` are Windows shell commands, so this cell is
# Windows-only as written.
url="https://github.com/protocolbuffers/protobuf/releases/download/v3.15.6/protoc-3.15.6-win64.zip"

# Everything below is skipped when PROTOC_PATH already exists -- including the
# PATH update and both package installs.
if not os.path.exists(PROTOC_PATH):
    wget.download(url)
    os.makedirs(PROTOC_PATH)
    # Move the downloaded archive into PROTOC_PATH and unpack it there.
    !move protoc-3.15.6-win64.zip {PROTOC_PATH}
    !cd {PROTOC_PATH} && tar -xf protoc-3.15.6-win64.zip
    # Make the unpacked `bin` folder visible to the shell commands that follow.
    os.environ['PATH'] += os.pathsep + os.path.abspath(os.path.join(PROTOC_PATH, 'bin'))   
    # Compile the .proto files, then build and install the TF2 object_detection package.
    # NOTE(review): `models/research` is hard-coded here instead of being derived
    # from MODELS_PATH -- keep the two in sync.
    !cd models/research && protoc object_detection/protos/*.proto --python_out=. && copy object_detection\\packages\\tf2\\setup.py setup.py && python setup.py build && python setup.py install
    # Editable install of the slim package from the same checkout.
    !cd models/research/slim && pip install -e . 

In [7]:
VERIFICATION_SCRIPT = os.path.join(MODELS_PATH, 'research', 'object_detection', 'builders', 'model_builder_tf2_test.py')
!python {VERIFICATION_SCRIPT}

Running tests under Python 3.9.18: d:\Edu\Python_Projects\Object_Detection\.conda\python.exe
[ RUN      ] ModelBuilderTF2Test.test_create_center_net_deepmac
2023-12-28 16:03:42.349537: I tensorflow/core/platform/cpu_feature_guard.cc:193] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  AVX AVX2
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.
2023-12-28 16:03:43.238677: I tensorflow/core/common_runtime/gpu/gpu_device.cc:1616] Created device /job:localhost/replica:0/task:0/device:GPU:0 with 1667 MB memory:  -> device: 0, name: NVIDIA GeForce RTX 3050 Laptop GPU, pci bus id: 0000:01:00.0, compute capability: 8.6
  logging.warn(('Building experimental DeepMAC meta-arch.'
W1228 16:03:43.823992 16452 model_builder.py:1112] Building experimental DeepMAC meta-arch. Some features may be omitted.
INFO:tensorflow:time(__main__.ModelBuilderTF2Te

In [8]:
if not os.path.exists(MOBILE_NET_PATH):
    wget.download(PRETRAINED_MODEL_URL)
    os.makedirs(MOBILE_NET_PATH)
    !move {PRETRAINED_MODEL_NAME+'.tar.gz'} {MOBILE_NET_PATH}
    !cd {MOBILE_NET_PATH} && tar -zxvf {PRETRAINED_MODEL_NAME+'.tar.gz'}

        1 file(s) moved.


x ssd_mobilenet_v2_fpnlite_320x320_coco17_tpu-8/
x ssd_mobilenet_v2_fpnlite_320x320_coco17_tpu-8/checkpoint/
x ssd_mobilenet_v2_fpnlite_320x320_coco17_tpu-8/checkpoint/ckpt-0.data-00000-of-00001
x ssd_mobilenet_v2_fpnlite_320x320_coco17_tpu-8/checkpoint/checkpoint
x ssd_mobilenet_v2_fpnlite_320x320_coco17_tpu-8/checkpoint/ckpt-0.index
x ssd_mobilenet_v2_fpnlite_320x320_coco17_tpu-8/pipeline.config
x ssd_mobilenet_v2_fpnlite_320x320_coco17_tpu-8/saved_model/
x ssd_mobilenet_v2_fpnlite_320x320_coco17_tpu-8/saved_model/saved_model.pb
x ssd_mobilenet_v2_fpnlite_320x320_coco17_tpu-8/saved_model/variables/
x ssd_mobilenet_v2_fpnlite_320x320_coco17_tpu-8/saved_model/variables/variables.data-00000-of-00001
x ssd_mobilenet_v2_fpnlite_320x320_coco17_tpu-8/saved_model/variables/variables.index


In [9]:
def image_to_numpy(image_path):
    """Load an image file and return its pixel data as a NumPy array.

    Args:
        image_path: Path of the image file to read.

    Returns:
        The array produced by ``np.array`` on the opened PIL image.
    """
    # The context manager closes the underlying file as soon as the pixels have
    # been copied into the array; previously the handle was left open for every
    # image loaded.
    with Image.open(image_path) as img:
        return np.array(img)

In [10]:
if not os.path.exists(TF_RECORD_SCRIPT_PATH):
    !git clone https://github.com/nicknochnack/GenerateTFRecord {TF_RECORD_SCRIPT_PATH.split('\\')[0]}

In [11]:
# Gesture classes and the ids written into the label map.
labels = [
    {'name': 'thumbs_up', 'id': 1},
    {'name': 'thumbs_down', 'id': 2},
    {'name': 'hello', 'id': 3},
    {'name': 'fist', 'id': 4},
]

# Create the label map only when it does not exist yet: one `item { ... }`
# entry per class, assembled in memory and written in a single call.
if not os.path.exists(LABELMAP_PATH):
    label_map_entries = [
        'item { \n'
        + '\tname:\'{}\'\n'.format(entry['name'])
        + '\tid:{}\n'.format(entry['id'])
        + '}\n'
        for entry in labels
    ]
    with open(LABELMAP_PATH, 'w') as f:
        f.write(''.join(label_map_entries))

In [12]:
if not os.path.exists(os.path.join('data', 'records')):
    os.makedirs(os.path.join('data', 'records'))
!python {TF_RECORD_SCRIPT_PATH} -x {os.path.join(IMAGES_PATH, 'train')} -l {LABELMAP_PATH} -o {os.path.join('data', 'records', 'train.record')} 
!python {TF_RECORD_SCRIPT_PATH} -x {os.path.join(IMAGES_PATH, 'test')} -l {LABELMAP_PATH} -o {os.path.join('data', 'records', 'test.record')} 

Successfully created the TFRecord file: data\records\train.record
Successfully created the TFRecord file: data\records\test.record


In [13]:
def get_dataset_sample(path: str):
    """Load one annotated sample from a ``<stem>.jpg`` / ``<stem>.xml`` pair.

    Args:
        path: Path of the image, of its XML annotation, or the stem the two
            files share (i.e. the path without extension).

    Returns:
        A dict with:
          * ``"image"``: float32 tensor of the image with a leading batch axis
            of size 1.
          * ``"box"``: float32 tensor of shape ``[1, 4]`` holding
            ``[ymin, xmin, ymax, xmax]``, each divided by the image height or
            width read from the annotation.
          * ``"label"``: one-hot float tensor of shape ``[1, num_classes]``.

    Raises:
        KeyError: If the annotation lacks one of the expected tags or names a
            class that is not in the class table.

    Note:
        Every XML element is flattened into a single tag -> text mapping, so
        when a tag occurs several times (e.g. several annotated objects) only
        its last occurrence is used.
    """
    classes = {
        "thumbs_up": 0,
        "thumbs_down": 1,
        "hello": 2,
        "fist": 3
    }
    # Remove a trailing ".xml"/".jpg" as a *suffix*. The previous
    # ``path.rstrip(".jpg")`` stripped any trailing run of the characters
    # '.', 'j', 'p', 'g', turning e.g. "thumbs_up.jpg" into "thumbs_u".
    stem, extension = os.path.splitext(path)
    if extension in (".xml", ".jpg"):
        path = stem
    image_path = path + ".jpg"
    xml_path = path + ".xml"

    root = ET.parse(xml_path).getroot()
    data = {}
    for elem in root.iter():
        data[elem.tag] = elem.text

    # Normalise the pixel coordinates by the image size stored in the annotation.
    width = int(data["width"])
    height = int(data["height"])
    xmin = int(data["xmin"]) / width
    ymin = int(data["ymin"]) / height
    xmax = int(data["xmax"]) / width
    ymax = int(data["ymax"]) / height
    label = classes[data["name"]]

    np_img = image_to_numpy(image_path)
    tf_img = tf.expand_dims(tf.convert_to_tensor(np_img, dtype=tf.float32), axis=0)
    return {
        "image": tf_img,
        "box": tf.convert_to_tensor([[ymin, xmin, ymax, xmax]], dtype=tf.float32),
        # The depth follows the class table instead of a separate hard-coded 4.
        "label": tf.one_hot([label], depth=len(classes))
    }

In [14]:
train_dataset_path = "./data/images/train/"
test_dataset_path = "./data/images/test/"
labels = ['thumbs_up', 'thumbs_down', 'hello', 'fist']


def _load_split(split_path, class_names):
    """Load every annotated image found under ``split_path/<class_name>/``.

    Args:
        split_path: Folder that contains one sub-folder per class.
        class_names: Names of the class sub-folders to read, in order.

    Returns:
        Three parallel lists ``(images, boxes, one_hot_labels)`` holding the
        ``"image"``, ``"box"`` and ``"label"`` entries returned by
        ``get_dataset_sample`` for each image.
    """
    images, boxes, one_hot_labels = [], [], []
    for class_name in class_names:
        class_dir = os.path.join(split_path, class_name)
        # Sorted so that the sample order does not depend on the file system.
        for file_name in sorted(os.listdir(class_dir)):
            # Match the extension as a suffix; a substring test would also
            # pick up names such as "x.jpg.bak".
            if not file_name.endswith(".jpg"):
                continue
            sample = get_dataset_sample(os.path.join(class_dir, file_name))
            images.append(sample["image"])
            boxes.append(sample["box"])
            one_hot_labels.append(sample["label"])
    return images, boxes, one_hot_labels


# Both splits go through the same loader instead of two copy-pasted loops.
train_images, train_boxes, train_labels = _load_split(train_dataset_path, labels)
test_images, test_boxes, test_labels = _load_split(test_dataset_path, labels)

In [15]:
tf.keras.backend.clear_session()

print('Building model and restoring weights for fine-tuning...', flush=True)
num_classes = 4
# Build both paths from the shared constants. The previous literals used bare
# backslashes ('\s', '\p', '\c'), which are invalid escape sequences and only
# resolve on Windows.
pipeline_config = os.path.join(MOBILE_NET_PATH, PRETRAINED_MODEL_NAME, 'pipeline.config')
checkpoint_path = os.path.join(MOBILE_NET_PATH, PRETRAINED_MODEL_NAME, 'checkpoint', 'ckpt-0')

# Load pipeline config and build a detection model.
#
# The pre-trained pipeline is configured for the COCO classes, so the
# `num_classes` field is overridden here with the number of gesture classes
# this notebook trains on.
configs = config_util.get_configs_from_pipeline_file(pipeline_config)
model_config = configs['model']
model_config.ssd.num_classes = num_classes
model_config.ssd.freeze_batchnorm = True
detection_model = model_builder.build(
      model_config=model_config, is_training=True)

# Set up object-based checkpoint restore. Only the feature extractor, the
# shared tower layers of the box predictor and its box regression head are
# restored; the class prediction heads are left out (see the commented-out
# line below) so that they start from fresh weights for the new classes.
fake_box_predictor = tf.compat.v2.train.Checkpoint(
    _base_tower_layers_for_heads=detection_model._box_predictor._base_tower_layers_for_heads,
    # _prediction_heads=detection_model._box_predictor._prediction_heads,
    #    (i.e., the classification head that we *will not* restore)
    _box_prediction_head=detection_model._box_predictor._box_prediction_head,
    )
fake_model = tf.compat.v2.train.Checkpoint(
          _feature_extractor=detection_model._feature_extractor,
          _box_predictor=fake_box_predictor)
ckpt = tf.compat.v2.train.Checkpoint(model=fake_model)
# expect_partial(): only part of the saved checkpoint is matched on purpose.
ckpt.restore(checkpoint_path).expect_partial()
with tf.device("cpu:0"):
    # Run model through a dummy image so that variables are created
    image, shapes = detection_model.preprocess(tf.zeros([1, 640, 640, 3]))
    prediction_dict = detection_model.predict(image, shapes)
    _ = detection_model.postprocess(prediction_dict, shapes)
print('Weights restored!')

Building model and restoring weights for fine-tuning...


Weights restored!


In [18]:
tf.keras.backend.set_learning_phase(True)

# Tunable hyper-parameters. `batch_size` is an upper bound: when the training
# set holds fewer images, every image is used in each step.
batch_size = 16
learning_rate = 0.05
num_batches = 2000

# Select variables in top layers to fine-tune.
trainable_variables = detection_model.trainable_variables
prefixes_to_train = [
  'WeightSharedConvolutionalBoxPredictor/WeightSharedConvolutionalBoxHead',
  'WeightSharedConvolutionalBoxPredictor/WeightSharedConvolutionalClassHead']
to_fine_tune = [
    var for var in trainable_variables
    if any(var.name.startswith(prefix) for prefix in prefixes_to_train)]


def _compute_detection_loss(model, image_tensors, groundtruth_boxes_list,
                            groundtruth_classes_list):
  """Forward pass that returns localization loss + classification loss.

  Args:
    model: Detection model exposing `provide_groundtruth`, `preprocess`,
      `predict` and `loss`.
    image_tensors: List of image tensors, each with a leading batch axis of 1.
    groundtruth_boxes_list: List of box tensors, one per image.
    groundtruth_classes_list: List of one-hot class tensors, one per image.

  Returns:
    Scalar tensor: `Loss/localization_loss` + `Loss/classification_loss`.
  """
  model.provide_groundtruth(
      groundtruth_boxes_list=groundtruth_boxes_list,
      groundtruth_classes_list=groundtruth_classes_list)
  # `preprocess` returns (images, true shapes). Keeping the shapes it reports
  # makes `shapes` always have one row per image; the previous hard-coded
  # `batch_size * [[640, 640, 3]]` had the wrong row count for the validation
  # set and for batches smaller than `batch_size`.
  preprocessed = [model.preprocess(image_tensor) for image_tensor in image_tensors]
  preprocessed_images = tf.concat([images for images, _ in preprocessed], axis=0)
  shapes = tf.concat([true_shapes for _, true_shapes in preprocessed], axis=0)
  prediction_dict = model.predict(preprocessed_images, shapes)
  losses_dict = model.loss(prediction_dict, shapes)
  return losses_dict['Loss/localization_loss'] + losses_dict['Loss/classification_loss']


def get_model_train_step_function(model, optimizer, vars_to_fine_tune):
  """Get a tf.function for training step.

  Args:
    model: Detection model to train.
    optimizer: Optimizer used to apply the gradients.
    vars_to_fine_tune: Variables that receive gradient updates.

  Returns:
    A `tf.function` taking `(image_tensors, groundtruth_boxes_list,
    groundtruth_classes_list)`; it runs one optimisation step and returns the
    total loss of that step.
  """

  @tf.function
  def train_step_fn(image_tensors,
                    groundtruth_boxes_list,
                    groundtruth_classes_list):
    with tf.GradientTape() as tape:
      # Uses the `model` argument throughout (the old code preprocessed with
      # the global `detection_model`).
      total_loss = _compute_detection_loss(
          model, image_tensors, groundtruth_boxes_list, groundtruth_classes_list)
    # The gradient is computed after leaving the tape context so that the tape
    # does not record the gradient computation itself.
    gradients = tape.gradient(total_loss, vars_to_fine_tune)
    optimizer.apply_gradients(zip(gradients, vars_to_fine_tune))
    return total_loss

  return train_step_fn

optimizer = tf.keras.optimizers.SGD(learning_rate=learning_rate)
train_step_fn = get_model_train_step_function(
    detection_model, optimizer, to_fine_tune)

if not train_images:
  raise ValueError('train_images is empty: load the training set before fine-tuning.')

print('Start fine-tuning!', flush=True)
for idx in range(num_batches):
  # Draw a random batch without replacement.
  example_keys = random.sample(
      range(len(train_images)), min(batch_size, len(train_images)))

  gt_boxes_list = [train_boxes[key] for key in example_keys]
  gt_classes_list = [train_labels[key] for key in example_keys]
  image_tensors = [train_images[key] for key in example_keys]

  with tf.device("cpu:0"):
    total_loss = train_step_fn(image_tensors, gt_boxes_list, gt_classes_list)

  if idx % 10 == 0:
    # Every 10th step: loss over the whole test set. No gradient tape is
    # active here, so no weights are updated.
    with tf.device("cpu:0"):
      val_total_loss = _compute_detection_loss(
          detection_model, test_images, test_boxes, test_labels)
    print('batch ' + str(idx) + ' of ' + str(num_batches)
    + ', train loss=' +  str(total_loss.numpy())
    + ", val loss=" + str(val_total_loss.numpy()), flush=True)

print('Done fine-tuning!')

Start fine-tuning!
batch 0 of 2000, train loss=0.6609279, val loss=0.7955948
batch 10 of 2000, train loss=0.62604797, val loss=0.792227
batch 20 of 2000, train loss=0.67103356, val loss=0.7863291
batch 30 of 2000, train loss=0.651078, val loss=0.7830785
batch 40 of 2000, train loss=0.6402905, val loss=0.77963513
batch 50 of 2000, train loss=0.60451573, val loss=0.7767215
batch 60 of 2000, train loss=0.64125085, val loss=0.77298677
batch 70 of 2000, train loss=0.5723854, val loss=0.7706064
batch 80 of 2000, train loss=0.5718476, val loss=0.76721156
batch 90 of 2000, train loss=0.65216583, val loss=0.76392543
batch 100 of 2000, train loss=0.6433079, val loss=0.76066977
batch 110 of 2000, train loss=0.6361129, val loss=0.75649154
batch 120 of 2000, train loss=0.564725, val loss=0.75603914
batch 130 of 2000, train loss=0.6060797, val loss=0.7535943
batch 140 of 2000, train loss=0.5862499, val loss=0.75059104
batch 150 of 2000, train loss=0.6063098, val loss=0.74675
batch 160 of 2000, train

KeyboardInterrupt: 

In [19]:
# Persist the fine-tuned model, but only when nothing exists at
# TRAINED_MODEL_PATH yet.
# NOTE(review): because of this guard a later training run does not replace an
# earlier save -- confirm that this is intended.
trained_checkpoint_present = os.path.exists(TRAINED_MODEL_PATH)
ckpt_trained = tf.train.Checkpoint(model=detection_model)
if not trained_checkpoint_present:
    ckpt_trained.save(TRAINED_MODEL_PATH)

In [20]:
@tf.function
def detect(input_tensor, model):
  """Run `model` end to end on `input_tensor`.

  Chains the model's `preprocess`, `predict` and `postprocess` steps and
  returns whatever `postprocess` produces.
  """
  images, true_shapes = model.preprocess(input_tensor)
  raw_predictions = model.predict(images, true_shapes)
  detections = model.postprocess(raw_predictions, true_shapes)
  return detections

In [21]:
# Maps each 1-based class id to its {"id", "name"} record for the visualisation helpers.
category_index = {
    class_id: {"id": class_id, "name": class_name}
    for class_id, class_name in enumerate(
        ("thumbs_up", "thumbs_down", "hello", "fist"), start=1)
}
# Added to the class indices returned by the model before looking them up in `category_index`.
label_id_offset = 1


def plot_img_with_detection(tf_image, model):
    """Detect on `tf_image` with `model` and show the image with the result drawn on it.

    Args:
        tf_image: Batched image tensor; only its first element is drawn.
        model: Detection model handed to `detect`.

    At most one box is drawn, and only if its score is at least 0.3.
    """
    canvas = np.array(tf_image[0], dtype=np.uint8)
    with tf.device("cpu:0"):
        detections = detect(tf_image, model)

    boxes = detections["detection_boxes"][0].numpy()
    # Shift the model's class indices by `label_id_offset` to match `category_index`.
    class_ids = np.array(detections["detection_classes"][0], dtype=np.int32) + label_id_offset
    scores = detections["detection_scores"][0].numpy()

    # Draws directly onto `canvas`.
    viz_utils.visualize_boxes_and_labels_on_image_array(
        canvas,
        boxes,
        class_ids,
        scores,
        category_index,
        use_normalized_coordinates=True,
        max_boxes_to_draw=1,
        min_score_thresh=0.3,
    )

    plt.imshow(canvas)
    plt.axis("off")
    plt.show()

In [22]:
# Drop the training-mode model if this kernel still holds one. The guard keeps
# the cell re-runnable: a bare `del` raises NameError on the second execution.
try:
    del detection_model
except NameError:
    pass

# Rebuild the same architecture in inference mode and load the fine-tuned
# weights. "-1" is the suffix of the checkpoint written by the save cell.
model = model_builder.build(model_config=model_config, is_training=False)
ckpt_trained = tf.train.Checkpoint(model=model)
ckpt_trained.restore(TRAINED_MODEL_PATH + "-1")

<tensorflow.python.checkpoint.checkpoint.CheckpointLoadStatus at 0x1cd1bf120a0>

In [23]:
# Push one random image through the full preprocess -> predict -> postprocess
# chain of the rebuilt model.
# NOTE(review): presumably this creates the model variables, like the dummy
# pass made after building the training model -- confirm.
with tf.device("cpu:0"):
    dummy_batch = tf.random.uniform((1, 640, 640, 3))
    image, shape = model.preprocess(dummy_batch)
    pred = model.predict(image, shape)
    output = model.postprocess(pred, shape)

In [27]:
# `save_pipeline_config` writes the proto it is given to `pipeline.config`, so it
# must receive a complete pipeline proto. Passing only the model sub-config
# produced a file without the top-level pipeline sections. `configs['model']`
# is the same object as `model_config`, so the rebuilt pipeline carries the
# overrides made above (num_classes, freeze_batchnorm).
trained_pipeline_proto = config_util.create_pipeline_proto_from_configs(configs)
# Save next to the fine-tuned checkpoint (the folder part of TRAINED_MODEL_PATH).
config_util.save_pipeline_config(trained_pipeline_proto, os.path.dirname(TRAINED_MODEL_PATH))

INFO:tensorflow:Writing pipeline config file to ./mobile_net/trained_model\pipeline.config


INFO:tensorflow:Writing pipeline config file to ./mobile_net/trained_model\pipeline.config


In [24]:
# Live demo: run the fine-tuned detector on webcam frames until 'q' is pressed
# or the camera stops delivering frames.
vid = cv2.VideoCapture(0)
width = int(vid.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(vid.get(cv2.CAP_PROP_FRAME_HEIGHT))
label_id_offset = 1

try:
    while vid.isOpened():
        ret, frame = vid.read()
        if not ret:
            # No frame was delivered (camera unplugged, busy, ...): stop
            # instead of failing on an empty frame.
            break

        # OpenCV delivers BGR frames, while the training images were loaded
        # through PIL; convert so the model sees the channel order it was
        # trained on.
        image_np = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)

        input_tensor = tf.convert_to_tensor(np.expand_dims(image_np, 0), dtype=tf.float32)
        with tf.device("cpu:0"):
            detections = detect(input_tensor, model)

        # Draw on a copy of the original BGR frame, which is what cv2.imshow expects.
        image_np_with_detections = frame.copy()

        viz_utils.visualize_boxes_and_labels_on_image_array(
                    image_np_with_detections,
                    detections["detection_boxes"][0].numpy(),
                    np.array(detections["detection_classes"][0], dtype=np.int32)+label_id_offset,
                    detections["detection_scores"][0].numpy(),
                    category_index,
                    use_normalized_coordinates=True,
                    max_boxes_to_draw=2,
                    min_score_thresh=.5,
                    agnostic_mode=False)

        cv2.imshow('object detection',  cv2.resize(image_np_with_detections, (800, 600)))

        if cv2.waitKey(10) & 0xFF == ord('q'):
            break
finally:
    # Always free the camera and close the window, including when the loop is
    # left through an exception or a kernel interrupt.
    vid.release()
    cv2.destroyAllWindows()