In [1]:
#############################################################################################
# Project Name: BlackBeard
# Created by: Parbin Darji, Will Fortenberry, Andrew Palmertree, Imanol  Perales, Justin Romanowski
#
# Script name: Train_object_detection.ipynb
#
# The face recognition code is based on the creator's code: Evan Juras
# The script we edited was based on his google colab:
# https://colab.research.google.com/github/EdjeElectronics/TensorFlow-Lite-Object-Detection-on-Android-and-Raspberry-Pi/blob/master/Train_TFLite2_Object_Detction_Model.ipynb#scrollTo=fF8ysCfYKgTP
# His github: 
# https://github.com/EdjeElectronics/TensorFlow-Lite-Object-Detection-on-Android-and-Raspberry-Pi
#
# Editor of the script: Andrew Palmertree
#
# Date: March 9, 2024
# Version: 1.0.0
# 
# Requirments:
#     - python 3.9.18
#     - check the requirement.txt file for specific packages
#
# Description:
#     The Train_object_detection script is for the BlackBeard project to be used on the server side to
#     train an object detection model using custome data from roboflow.
#
#      Object list training: amazon logo, amazon delivery uniform, delivery car, fedex logo,
#                   fedex uniform, package, person, safety vest, ups logo, ups uniform
#                   usps logo, and usps delivery uniform
#
# Notes:
#  We used the ssd-mobilenet-v2 model
#  Linke to out roboflow data set:
#  https://universe.roboflow.com/project-gnup6/blackbeard-object-detection-ksu
#############################################################################################

In [None]:
#============================================================================================
# Import the necessary package libraries
#============================================================================================

In [None]:
import tensorflow as tf
import os
import pathlib
import matplotlib
import matplotlib.pyplot as plt
import random
import io
import imageio
import glob
import scipy.misc
import numpy as np
from six import BytesIO
from PIL import Image, ImageDraw, ImageFont
from IPython.display import display, Javascript
from IPython.display import Image as IPyImage
from object_detection.utils import label_map_util
from object_detection.utils import config_util
from object_detection.utils import visualization_utils as viz_utils
from object_detection.builders import model_builder
from roboflow import Roboflow
import requests
import tarfile
import re
import cv2
import sys
import importlib.util
from tensorflow.lite.python.interpreter import Interpreter
import urllib.request
import zipfile

%matplotlib inline

In [None]:
tf.__version__

In [None]:
# Use the GPU on server to train the model
os.environ["CUDA_VISIBLE_DEVICES"] = "0"

gpus = tf.config.list_physical_devices("GPU")
if gpus:
    for gpu in gpus:
        print("Found a GPU with the name:", gpu)
else:
    print("Failed to detect a GPU.")
    

In [None]:
%pwd

In [None]:
%cd "C:\Users\andre\CPE_4093\ob\main_content"

In [None]:
# Clone the tensorflow models repository if it doesn't already exist
if "models" in pathlib.Path.cwd().parts:
  while "models" in pathlib.Path.cwd().parts:
    os.chdir('..')
elif not pathlib.Path('models').exists():
  !git clone --depth 1 https://github.com/tensorflow/models

In [None]:
# %%bash
# protoc
#install the neccesary packages

In [None]:
%pwd

In [None]:
# %%bash

#install the neccesary packages 
# only needed for the first time running 

# cd models/research/
# protoc object_detection/protos/*.proto --python_out=.
# cp object_detection/packages/tf2/setup.py .
# python -m pip install --user .

In [None]:
# !pip install --user protobuf==3.20.3

In [None]:
!python --version

In [None]:
#Downloading data from Roboflow

# More package vest images
rf = Roboflow(api_key="Kcuk1uPgEUza7o2ZTKhu")
project = rf.workspace("blackbeard").project("blackbeard")
version = project.version(4).download("tfrecord")


In [None]:
dataset = r"C:/Users/andre/CPE_4093/ob/main_content/BlackBeard-4"

In [None]:
print(dataset)

In [None]:
# NOTE: Update these TFRecord names from "cells" and "cells_label_map" to your files!
test_record_fname = dataset + '/test/boxes.tfrecord'
train_record_fname = dataset + '/train/boxes.tfrecord'
label_map_pbtxt_fname = dataset + '/train/boxes_label_map.pbtxt'

In [None]:
# Set the model name, pretrained checkpoint and base file variables for ssd-mobilenet-v2-fpnlite-320 
model_name = 'ssd_mobilenet_v2_fpnlite_320x320_coco17_tpu-8'
pretrained_checkpoint = 'ssd_mobilenet_v2_fpnlite_320x320_coco17_tpu-8.tar.gz'
base_pipeline_file = 'ssd_mobilenet_v2_fpnlite_320x320_coco17_tpu-8.config'

In [None]:
# Set training parameters for the model
num_steps = 7200
num_eval_steps = 250
batch_size = 16

In [None]:
# specify the directory you want the model to be saved in
dir = "ssd-mobilenet-v2-fpnlite-320_with_vests8_1"

In [None]:
%pwd

In [None]:
# Define the directory path
dir_path = fr"C:\Users\andre\CPE_4093\ob\main_content\mymodel\{dir}"

# Create the directory if it doesn't exist
if not os.path.exists(dir_path):
    os.makedirs(dir_path)

In [None]:
%cd C:\Users\andre\CPE_4093\ob\main_content\mymodel\{dir}

In [None]:
# URL for the pretrained checkpoint
download_tar = 'http://download.tensorflow.org/models/object_detection/tf2/20200711/' + pretrained_checkpoint

# Download the file using requests
response = requests.get(download_tar)

# Check if the request was successful (status code 200)
if response.status_code == 200:
    # Save the downloaded file
    with open(pretrained_checkpoint, 'wb') as f:
        f.write(response.content)

    # Extract the downloaded file
    tar = tarfile.open(pretrained_checkpoint)
    tar.extractall()
    tar.close()
else:
    print("Failed to download the file")


In [None]:
#download base training configuration file
# URL for the base training configuration file
download_config = 'https://raw.githubusercontent.com/tensorflow/models/master/research/object_detection/configs/tf2/' + base_pipeline_file

# Send a GET request to download the configuration file
response = requests.get(download_config)

# Check if the request was successful (status code 200)
if response.status_code == 200:
    # Save the downloaded file
    with open(base_pipeline_file, 'wb') as f:
        f.write(response.content)
else:
    print("Failed to download the file")


In [None]:
#prepare the model base for ssd-mobilenet-v2-fpnlite-320
pipeline_fname = fr"C:\\Users\\andre\\CPE_4093\\ob\\main_content\\mymodel\\{dir}\\ssd_mobilenet_v2_fpnlite_320x320_coco17_tpu-8.config"
# fine_tune_checkpoint = fr'C:\\Users\\andre\\CPE_4093\\ob\\main_content\\mymodel\\{dir}\\ssd_mobilenet_v2_fpnlite_320x320_coco17_tpu-8\\checkpoint\\ckpt-0'
fine_tune_checkpoint = fr"C:\\Users\\andre\\CPE_4093\\ob\\main_content\\training\\ssd-mobilenet-v2-fpnlite-320_with_vests8_1\\ckpt-8"

def get_num_classes(pbtxt_fname):
    label_map = label_map_util.load_labelmap(pbtxt_fname)
    categories = label_map_util.convert_label_map_to_categories(
        label_map, max_num_classes=90, use_display_name=True)
    category_index = label_map_util.create_category_index(categories)
    return len(category_index.keys())
num_classes = get_num_classes(label_map_pbtxt_fname)
print(f"Number of classes used {num_classes}")

In [None]:
# verify the model name, pipline file, and checkpoint path
print("model name:")
print(model_name)
print("\npipline file: ")
print(pipeline_fname)
print("\ncheckpoint file: ")
print(fine_tune_checkpoint)

print("\ncurrent directory")
%pwd

In [None]:
#write custom configuration file by slotting our dataset, model checkpoint, and training parameters into the base pipeline file

%cd "C:\Users\andre\CPE_4093\ob\main_content\mymodel\{dir}" 

print('writing custom configuration file')
print(pipeline_fname)

with open(pipeline_fname) as f:
    s = f.read()
with open('pipeline_file2.config', 'w') as f:
    
    # fine_tune_checkpoint
    s = re.sub(r'fine_tune_checkpoint: ".*?"',
               'fine_tune_checkpoint: "{}"'.format(fine_tune_checkpoint.replace('\\', r'\\\\')), s)
    
    # tfrecord files train and test.
    s = re.sub(
        r'(input_path: ".*?)(PATH_TO_BE_CONFIGURED/train)(.*?")', 'input_path: "{}"'.format(train_record_fname.replace('\\', r'\\\\')), s)
    s = re.sub(
        r'(input_path: ".*?)(PATH_TO_BE_CONFIGURED/val)(.*?")', 'input_path: "{}"'.format(test_record_fname.replace('\\', r'\\\\')), s)

    # label_map_path
    s = re.sub(
        r'label_map_path: ".*?"', 'label_map_path: "{}"'.format(label_map_pbtxt_fname.replace('\\', r'\\\\')), s)

    # Set training batch_size.
    s = re.sub(r'batch_size: [0-9]+',
               'batch_size: {}'.format(batch_size), s)

    # Set training steps, num_steps
    s = re.sub(r'num_steps: [0-9]+',
               'num_steps: {}'.format(num_steps), s)
    
    # Set number of classes num_classes.
    s = re.sub(r'num_classes: [0-9]+',
               'num_classes: {}'.format(num_classes), s)
    
    # fine-tune checkpoint type
    s = re.sub(
        r'fine_tune_checkpoint_type: "classification"', 'fine_tune_checkpoint_type: "{}"'.format('detection'), s)
        
    f.write(s)

In [None]:
!type pipeline_file2.config

In [None]:
pipeline_file = fr"C:\Users\andre\CPE_4093\ob\main_content\mymodel\{dir}\pipeline_file2.config"
model_dir = fr"C:\Users\andre\CPE_4093\ob\main_content\training\{dir}"

In [None]:
print(pipeline_file)
print(model_dir)

In [None]:
os.environ['CUDA_DIR'] = 'C:\\Users\\andre\\anaconda3\\pkgs\\cudatoolkit-11.2.2-h933977f_10\\DLLs'

In [None]:
cuda_dir = os.getenv('CUDA_DIR')
print(cuda_dir)

In [None]:
## train the model to a checkpoint file
!python "C:\\Users\\andre\\CPE_4093\\ob\\content\\models\\research\\object_detection\\model_main_tf2.py" \
    --pipeline_config_path={pipeline_file} \
    --model_dir={model_dir} \
    --alsologtostderr \
    --num_train_steps={num_steps} \
    --sample_1_of_n_eval_examples=1 \
    --num_eval_steps={num_eval_steps}

In [None]:
## see where our model saved weights are located
print(os.listdir(fr"C:\Users\andre\CPE_4093\ob\main_content\training\{dir}"))

In [None]:
## create a directory to store the tensorflow and tflite model files
# Define the directory path
dir_path = fr"C:\Users\andre\CPE_4093\ob\main_content\training\{dir}\custom_model_{dir}"

# Create the directory if it doesn't exist
if not os.path.exists(dir_path):
    os.makedirs(dir_path)

In [None]:
## Export the saved checkpoint file to a tensorflow graph
output_directory = f'C:\\Users\\andre\\CPE_4093\\ob\\main_content\\training\\{dir}\\custom_model_{dir}'

last_model_path = f'C:\\Users\\andre\\CPE_4093\\ob\\main_content\\training\\{dir}\\'

!python  "C:\\Users\\andre\\CPE_4093\\ob\\content\\models\\research\\object_detection\\export_tflite_graph_tf2.py" \
    --trained_checkpoint_dir {last_model_path} \
    --output_directory {output_directory} \
    --pipeline_config_path {pipeline_file}


In [None]:
## Convert exported graph file into TFLite model file

converter = tf.lite.TFLiteConverter.from_saved_model(fr"C:\Users\andre\CPE_4093\ob\main_content\training\{dir}\custom_model_{dir}\saved_model")
tflite_model = converter.convert()

with open(fr"C:\Users\andre\CPE_4093\ob\main_content\training\{dir}\custom_model_{dir}\detect.tflite", 'wb') as f:
  f.write(tflite_model)

In [None]:
## Create label file
# Define the file path
file_path = fr"C:\Users\andre\CPE_4093\ob\main_content\training\{dir}\custom_model_{dir}\labelmap.txt"

# Define the content
content = """amazon
amazon_uniform
delivery_car
fedex
fedex_uniform
package
person
safety_vest
ups
ups_uniform
usps
usps_uniform
"""

# Check if the file exists
if not os.path.exists(file_path):
    # If the file doesn't exist, create it and write the content
    with open(file_path, 'w') as f:
        f.write(content)


In [None]:
## Create label map file
# Define the file path
file_path_pbtxt = fr"C:\Users\andre\CPE_4093\ob\main_content\training\{dir}\custom_model_{dir}\label_map.pbtxt"

# Define the content
content = """item {
  id: 1
  name: 'amazon'
}

item {
  id: 2
  name: 'amazon uniform'
}

item {
  id: 3
  name: 'delivery car'
}

item {
  id: 4
  name: 'fedex'
}

item {
  id: 5
  name: 'fedex uniform'
}

item {
  id: 6
  name: 'package'
}

item {
  id: 7
  name: 'person'
}

item {
  id: 8
  name: 'safety vest'
}

item {
  id: 9
  name: 'ups'
}

item {
  id: 10
  name: 'ups uniform'
}

item {
  id: 11
  name: 'usps'
}

item {
  id: 12
  name: 'usps uniform'
}
"""

if not os.path.exists(file_path_pbtxt):
    # If the file doesn't exist, create it and write the content
    with open(file_path_pbtxt, 'w') as f:
        f.write(content)

In [None]:
## Download test images
# Define the directory path
dir_path = 'C:\\Users\\andre\\CPE_4093\\ob\\main_content\\test_images6'

# Create the directory if it doesn't exist
if not os.path.exists(dir_path):
    os.makedirs(dir_path)

# Change the current directory to the newly created directory
os.chdir(dir_path)

# Define the URL of the file to be downloaded
url = "https://app.roboflow.com/ds/lwd70Iqeq2?key=0DI02oVWZi"

# Define the local filename
filename = "roboflow.zip"
filename2 = "test_images"

# Download the file only if it doesn't already exist
if not os.path.exists(filename2):
    urllib.request.urlretrieve(url, filename)

    # Unzip the file
    with zipfile.ZipFile(filename, 'r') as zip_ref:
        zip_ref.extractall()

In [None]:
# Script to run custom TFLite model on test images to detect objects
# Source: https://github.com/EdjeElectronics/TensorFlow-Lite-Object-Detection-on-Android-and-Raspberry-Pi/blob/master/TFLite_detection_image.py

### Define function for inferencing with TFLite model and displaying results

def tflite_detect_images(modelpath, imgpath, lblpath, min_conf=0.5, num_test_images=10, savepath=r"C:\Users\andre\CPE_4093\ob\main_content\results", txt_only=False):

  # Grab filenames of all images in test folder
  images = glob.glob(imgpath + '/*.jpg') + glob.glob(imgpath + '/*.JPG') + glob.glob(imgpath + '/*.png') + glob.glob(imgpath + '/*.bmp')

  # Load the label map into memory
  with open(lblpath, 'r') as f:
      labels = [line.strip() for line in f.readlines()]

  # Load the Tensorflow Lite model into memory
  interpreter = Interpreter(model_path=modelpath)
  interpreter.allocate_tensors()

  # Get output details
  output_details = interpreter.get_output_details()

  # Print output details
  print(output_details)

  # Get model details
  input_details = interpreter.get_input_details()
  output_details = interpreter.get_output_details()
  height = input_details[0]['shape'][1]
  width = input_details[0]['shape'][2]

  float_input = (input_details[0]['dtype'] == np.float32)

  input_mean = 127.5
  input_std = 127.5

  # Randomly select test images
  images_to_test = random.sample(images, num_test_images)

  # Loop over every image and perform detection
  for image_path in images_to_test:

      # Load image and resize to expected shape [1xHxWx3]
      image = cv2.imread(image_path)
      image_rgb = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
      imH, imW, _ = image.shape
      image_resized = cv2.resize(image_rgb, (width, height))
      input_data = np.expand_dims(image_resized, axis=0)

      # Normalize pixel values if using a floating model (i.e. if model is non-quantized)
      if float_input:
          input_data = (np.float32(input_data) - input_mean) / input_std

      # Perform the actual detection by running the model with the image as input
      interpreter.set_tensor(input_details[0]['index'],input_data)
      interpreter.invoke()

      # Retrieve detection results
      boxes = interpreter.get_tensor(output_details[1]['index'])[0] # Bounding box coordinates of detected objects
      classes = interpreter.get_tensor(output_details[3]['index'])[0] # Class index of detected objects
      scores = interpreter.get_tensor(output_details[0]['index'])[0] # Confidence of detected objects
      print(scores)
      detections = []

      # Loop over all detections and draw detection box if confidence is above minimum threshold
      for i in range(len(scores)):
          if ((scores[i] > min_conf) and (scores[i] <= 1.0)):

              # Get bounding box coordinates and draw box
              # Interpreter can return coordinates that are outside of image dimensions, need to force them to be within image using max() and min()
              ymin = int(max(1,(boxes[i][0] * imH)))
              xmin = int(max(1,(boxes[i][1] * imW)))
              ymax = int(min(imH,(boxes[i][2] * imH)))
              xmax = int(min(imW,(boxes[i][3] * imW)))

              cv2.rectangle(image, (xmin,ymin), (xmax,ymax), (10, 255, 0), 2)

              # Draw label
              object_name = labels[int(classes[i])] # Look up object name from "labels" array using class index
              label = '%s: %d%%' % (object_name, int(scores[i]*100)) # Example: 'person: 72%'
              labelSize, baseLine = cv2.getTextSize(label, cv2.FONT_HERSHEY_SIMPLEX, 0.7, 2) # Get font size
              label_ymin = max(ymin, labelSize[1] + 10) # Make sure not to draw label too close to top of window
              cv2.rectangle(image, (xmin, label_ymin-labelSize[1]-10), (xmin+labelSize[0], label_ymin+baseLine-10), (255, 255, 255), cv2.FILLED) # Draw white box to put label text in
              cv2.putText(image, label, (xmin, label_ymin-7), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 0, 0), 2) # Draw label text

              detections.append([object_name, scores[i], xmin, ymin, xmax, ymax])


      # All the results have been drawn on the image, now display the image
      if txt_only == False: # "text_only" controls whether we want to display the image results or just save them in .txt files
        image = cv2.cvtColor(image,cv2.COLOR_BGR2RGB)
        plt.figure(figsize=(12,16))
        plt.imshow(image)
        plt.show()

      # Save detection results in .txt files (for calculating mAP)
      elif txt_only == True:

        # Get filenames and paths
        image_fn = os.path.basename(image_path)
        base_fn, ext = os.path.splitext(image_fn)
        txt_result_fn = base_fn +'.txt'
        txt_savepath = os.path.join(savepath, txt_result_fn)

        # Write results to text file
        # (Using format defined by https://github.com/Cartucho/mAP, which will make it easy to calculate mAP)
        with open(txt_savepath,'w') as f:
            for detection in detections:
                f.write('%s %.4f %d %d %d %d\n' % (detection[0], detection[1], detection[2], detection[3], detection[4], detection[5]))

  return 

In [None]:
# Set up variables for running user's model
PATH_TO_IMAGES=r"C:\Users\andre\CPE_4093\ob\main_content\test_images6\test"  # Path to test images folder
PATH_TO_MODEL=fr"C:\Users\andre\CPE_4093\ob\main_content\training\{dir}\custom_model_{dir}\detect.tflite"   # Path to .tflite model file
PATH_TO_LABELS=fr"C:\Users\andre\CPE_4093\ob\main_content\training\{dir}\custom_model_{dir}\labelmap.txt"   # Path to labelmap.txt file
min_conf_threshold=0.5   # Confidence threshold (try changing this to 0.01 if you don't see any detection results)
images_to_test = 100   # Number of images to run detection on

# Run inferencing function!
tflite_detect_images(PATH_TO_MODEL, PATH_TO_IMAGES, PATH_TO_LABELS, min_conf_threshold, images_to_test)

In [None]:
# Object recognition quantization

In [None]:
# Get list of all images in train directory

## This is for object classification
image_path = r"C:\Users\andre\CPE_4093\ob\main_content\test_images4\train"

jpg_file_list = glob.glob(image_path + '/*.jpg')
JPG_file_list = glob.glob(image_path + '/*.JPG')
png_file_list = glob.glob(image_path + '/*.png')
bmp_file_list = glob.glob(image_path + '/*.bmp')

quant_image_list = jpg_file_list + JPG_file_list + png_file_list + bmp_file_list

interpreter = Interpreter(model_path=PATH_TO_MODEL)
interpreter.allocate_tensors()
input_details = interpreter.get_input_details()
output_details = interpreter.get_output_details()
height = input_details[0]['shape'][1]
width = input_details[0]['shape'][2]


# A generator that provides a representative dataset
def representative_data_gen():
    dataset_list = quant_image_list
    quant_num = 300
    for i in range(quant_num):
        pick_me = random.choice(dataset_list)
        image = tf.io.read_file(pick_me)

        if pick_me.endswith('.jpg') or pick_me.endswith('.JPG'):
            image = tf.io.decode_jpeg(image, channels=3)
        elif pick_me.endswith('.png'):
            image = tf.io.decode_png(image, channels=3)
        elif pick_me.endswith('.bmp'):
            image = tf.io.decode_bmp(image, channels=3)

        image = tf.image.resize(image, [width, height])
        image = tf.cast(image / 255., tf.float32)
        image = tf.expand_dims(image, 0)
        yield [image]


In [None]:
# Initialize converter module

## Object Recognition model
converter = tf.lite.TFLiteConverter.from_saved_model(fr"C:\Users\andre\CPE_4093\ob\main_content\training\{dir}\custom_model_{dir}\saved_model")

## This enables quantization
converter.optimizations = [tf.lite.Optimize.DEFAULT]
# This sets the representative dataset for quantization
converter.representative_dataset = representative_data_gen
# This ensures that if any ops can't be quantized, the converter throws an error
converter.target_spec.supported_ops = [tf.lite.OpsSet.TFLITE_BUILTINS, tf.lite.OpsSet.TFLITE_BUILTINS_INT8]
# For full integer quantization, though supported types defaults to int8 only, we explicitly declare it for clarity.
converter.target_spec.supported_types = [tf.int8]
# These set the input tensors to uint8 and output tensors to float32
converter.inference_input_type = tf.uint8
converter.inference_output_type = tf.float32
tflite_model = converter.convert()

# Save the quantized model to a file
with open(fr"C:\Users\andre\CPE_4093\ob\main_content\training\{dir}\custom_model_{dir}\{dir}_quant.tflite", 'wb') as f:
    f.write(tflite_model)