## Our prototype model for our live detection model ##

This model is currently being tuned to detect hands (with or without gloves). Its detections will be pipelined into our classification model, which determines whether or not the detected subject is wearing gloves.

In [None]:
# Install the packages this notebook imports below (TensorFlow and Pillow).
!pip install tensorflow pillow

In [None]:
# Mount Google Drive at /content/drive so the files stored there can be
# accessed from this Colab runtime.
from google.colab import drive
drive.mount('/content/drive')

In [None]:
# Make the dataset folder on the mounted Drive the working directory (the
# relative paths used in this notebook are resolved against it), then list
# its contents.
%cd /content/drive/MyDrive/SCAI_Neural_Network_Project/yolo_dataset
%ls

In [None]:
# Import the required libraries
import os
import tensorflow as tf
from PIL import Image
import io
import numpy as np
from tensorflow.keras.saving import register_keras_serializable

In [None]:
# Image and label folders for each split, given relative to the current
# working directory.
train_images_directory, train_labels_directory = "images/combined_train", "labels/combined_train"
val_images_directory, val_labels_directory = "images/combined_valid", "labels/combined_valid"
test_images_directory, test_labels_directory = "images/combined_test", "labels/combined_test"

# Class id (as written in the label files) -> human-readable class name.
LABEL_MAP_DICT = {0: "no_gloves", 1: "has_gloves"}

# The two image file extensions (without the dot) the pipeline looks for.
IMAGE_FORMAT, IMAGE_FORMAT_2 = "jpg", "png"

## Encoding our dataset ##

In order for TensorFlow to read our data efficiently, we create a binary representation that encapsulates all of our features: the encoded images and the text labels, which contain the bounding box and class information. It is stored in the form of `.tfrecord` files. Once these files are saved, we can simply reload the train, validation, and test data without re-encoding, as long as no new data has been added.

In [None]:
def create_tf_example(image_directory, label_directory, format=None):
  """Build a `tf.train.Example` from one image file and its label file.

  Args:
    image_directory: Path to the image file (a file path, despite the name).
    label_directory: Path to the label text file. Every non-blank line must be
      `class_id x_center y_center width height`; the four box values are
      used as fractions of the image width/height.
    format: `IMAGE_FORMAT` or `IMAGE_FORMAT_2`. When omitted, it is inferred
      from the extension of `image_directory` ("jpeg" counts as
      `IMAGE_FORMAT`).

  Returns:
    A `tf.train.Example` holding the encoded image bytes, the image size and
    filename, and the corner coordinates, class names and class ids of every
    labelled box.

  Raises:
    ValueError: If the image format is not recognized, or a non-blank label
      line does not consist of exactly five numeric fields.
    KeyError: If a class id is not present in `LABEL_MAP_DICT`.
  """
  if format is None:
    # Callers may leave the format out; fall back to the file extension.
    format = os.path.splitext(image_directory)[1].lstrip('.').lower()
    if format == "jpeg":
      format = IMAGE_FORMAT
  if format not in (IMAGE_FORMAT, IMAGE_FORMAT_2):
    raise ValueError("Image format not recognized.")
  image_format = format.encode('utf8')

  with tf.io.gfile.GFile(image_directory, 'rb') as fid:
    encoded_image = fid.read()
  # The image is opened only to read its dimensions; the raw bytes are what
  # gets stored in the record.
  with Image.open(io.BytesIO(encoded_image)) as image:
    width, height = image.size
  filename = image_directory.split('/')[-1].encode('utf8')

  xmins = []
  xmaxs = []
  ymins = []
  ymaxs = []
  classes_text = []
  classes = []
  with open(label_directory, 'r') as label_file:
    for line in label_file:
      fields = line.split()
      if not fields:
        # Tolerate blank lines (e.g. a trailing newline at the end of the file).
        continue
      if len(fields) != 5:
        raise ValueError(
            f"Expected 5 fields per label line in {label_directory}, got: {line!r}")
      class_id, x_center, y_center, bound_width, bound_height = map(float, fields)

      # Convert centre/size boxes to corner form. The values stay expressed as
      # fractions of the image size, so no scaling by width/height is needed.
      xmins.append(x_center - bound_width / 2)
      xmaxs.append(x_center + bound_width / 2)
      ymins.append(y_center - bound_height / 2)
      ymaxs.append(y_center + bound_height / 2)

      class_id = int(class_id)
      classes_text.append(LABEL_MAP_DICT[class_id].encode('utf8'))
      # Stored ids are shifted by one relative to the label file
      # (presumably to keep 0 free for a background class -- TODO confirm).
      classes.append(class_id + 1)

  tf_example = tf.train.Example(features=tf.train.Features(feature={
        'image/height': tf.train.Feature(int64_list=tf.train.Int64List(value=[height])),
        'image/width': tf.train.Feature(int64_list=tf.train.Int64List(value=[width])),
        'image/filename': tf.train.Feature(bytes_list=tf.train.BytesList(value=[filename])),
        'image/source_id': tf.train.Feature(bytes_list=tf.train.BytesList(value=[filename])),
        'image/encoded': tf.train.Feature(bytes_list=tf.train.BytesList(value=[encoded_image])),
        'image/format': tf.train.Feature(bytes_list=tf.train.BytesList(value=[image_format])),
        'image/object/bbox/xmin': tf.train.Feature(float_list=tf.train.FloatList(value=xmins)),
        'image/object/bbox/xmax': tf.train.Feature(float_list=tf.train.FloatList(value=xmaxs)),
        'image/object/bbox/ymin': tf.train.Feature(float_list=tf.train.FloatList(value=ymins)),
        'image/object/bbox/ymax': tf.train.Feature(float_list=tf.train.FloatList(value=ymaxs)),
        'image/object/class/text': tf.train.Feature(bytes_list=tf.train.BytesList(value=classes_text)),
        'image/object/class/label': tf.train.Feature(int64_list=tf.train.Int64List(value=classes)),
        }))
  return tf_example




In [None]:
# CREATE RECORDS FOR TRAINING, VALIDATION AND TEST DATA
##########################################################

def _write_tfrecord(labels_directory, images_directory, output_path):
  """Encode every labelled image of one split into a single TFRecord file.

  For each `.txt` file in `labels_directory`, the image with the same base
  name is looked up in `images_directory` (first as `IMAGE_FORMAT`, then as
  `IMAGE_FORMAT_2`). Labels without a matching image are reported and skipped.

  Args:
    labels_directory: Folder containing the label `.txt` files.
    images_directory: Folder containing the corresponding images.
    output_path: Path of the TFRecord file to write.

  Returns:
    The number of examples written.
  """
  label_files = [name for name in os.listdir(labels_directory) if name.endswith(".txt")]
  num_files = len(label_files)
  written = 0

  # The context manager closes the writer even if encoding a file fails.
  with tf.io.TFRecordWriter(output_path) as writer:
    for label_file in label_files:
      stem = os.path.splitext(label_file)[0]
      candidates = [
          (os.path.join(images_directory, stem + "." + image_format), image_format)
          for image_format in (IMAGE_FORMAT, IMAGE_FORMAT_2)
      ]
      match = next(((path, fmt) for path, fmt in candidates if os.path.exists(path)), None)
      if match is None:
        print("Image file not found: " + " or ".join(path for path, _ in candidates))
        continue

      image_path, image_format = match
      label_path = os.path.join(labels_directory, label_file)
      tf_example = create_tf_example(image_path, label_path, image_format)
      writer.write(tf_example.SerializeToString())
      written += 1
      if written % 10 == 0:
        print(f"Processed {written}/{num_files} files")

  print(f"tf record saved to {output_path}")
  return written


_write_tfrecord(train_labels_directory, train_images_directory, "combined_train.tfrecord")
_write_tfrecord(val_labels_directory, val_images_directory, "combined_val.tfrecord")
_write_tfrecord(test_labels_directory, test_images_directory, "combined_test.tfrecord")

##########################################################


In [None]:
def parse_tfrecord(record_input):
  """Decode one serialized example into a model-ready image and its boxes.

  Args:
    record_input: A scalar string tensor holding a serialized `tf.train.Example`
      with the `image/encoded` and `image/object/bbox/*` features.

  Returns:
    A tuple `(image, bounding_boxes)`:
      image: float32 tensor of shape [256, 256, 3] with values scaled to [0, 1].
      bounding_boxes: float32 tensor of shape [num_boxes, 4] whose columns are
        (ymin, xmin, ymax, xmax), exactly as stored in the record.
  """
  feature = {
      'image/encoded': tf.io.FixedLenFeature([], tf.string),
      'image/object/bbox/xmin': tf.io.VarLenFeature(tf.float32),
      'image/object/bbox/xmax': tf.io.VarLenFeature(tf.float32),
      'image/object/bbox/ymin': tf.io.VarLenFeature(tf.float32),
      'image/object/bbox/ymax': tf.io.VarLenFeature(tf.float32)
        }
  parsed_record = tf.io.parse_single_example(record_input, feature)

  # The records may hold JPEG or PNG bytes, so let TensorFlow detect the
  # format. expand_animations=False guarantees a 3-D tensor, which
  # tf.image.resize needs.
  image = tf.io.decode_image(
      parsed_record['image/encoded'], channels=3, expand_animations=False)
  image = tf.image.resize(image, [256, 256]) / 255.0

  # The box features are variable-length (one value per box); densify them and
  # stack as columns so each row describes one box.
  bounding_boxes = tf.stack([
    tf.sparse.to_dense(parsed_record['image/object/bbox/ymin']),
    tf.sparse.to_dense(parsed_record['image/object/bbox/xmin']),
    tf.sparse.to_dense(parsed_record['image/object/bbox/ymax']),
    tf.sparse.to_dense(parsed_record['image/object/bbox/xmax'])
  ], axis=1)

  return image, bounding_boxes

In [None]:
def load_dataset(record_path, batch_size, shuffle = False):
  """Create a batched, prefetched `tf.data` pipeline from a TFRecord file.

  Args:
    record_path: Path of the TFRecord file to read.
    batch_size: Number of examples per batch.
    shuffle: When true, shuffle the parsed examples with a 512-element buffer
      before batching.

  Returns:
    A dataset yielding `(images, boxes)` batches. Images are padded to
    [256, 256, 3]; the box tensors are padded along the box axis so every
    example in a batch has the same number of rows.
  """
  examples = tf.data.TFRecordDataset(record_path).map(
      parse_tfrecord, num_parallel_calls=tf.data.AUTOTUNE)
  examples = examples.shuffle(512) if shuffle else examples
  batches = examples.padded_batch(
      batch_size, padded_shapes=([256, 256, 3], [None, 4]))
  return batches.prefetch(tf.data.AUTOTUNE)

In [None]:
# Build one input pipeline per split (batches of 16); only the training split
# is shuffled.
train_dataset = load_dataset("combined_train.tfrecord", batch_size=16, shuffle=True)
val_dataset = load_dataset("combined_val.tfrecord", batch_size=16)
test_dataset = load_dataset("combined_test.tfrecord", batch_size=16)

## Our Detection Model ##
This is the core part of our live detection model. It consists of 6 convolutional layers to extract features, 3 max-pooling layers to reduce the spatial dimensions, 2 dropout layers to discourage overfitting, and a global average pooling layer that feeds a dense head predicting the 4 bounding-box coordinates.

In [None]:
@register_keras_serializable()
class detector(tf.keras.Model):
  """Convolutional model that predicts a single bounding box per image.

  The backbone is six 2x2 convolutions (32 up to 1024 filters) with a max-pool
  after every second convolution, dropout after the second and third pools,
  and global average pooling at the end. A 4-unit sigmoid dense layer turns
  the pooled features into four values in [0, 1].
  """

  def __init__(self):
    super().__init__()

    def conv(filters):
      # All convolutions share kernel size 2, stride 1, ReLU and 'same' padding.
      return tf.keras.layers.Conv2D(
          filters, 2, strides=1, activation='relu', padding='same')

    layers = tf.keras.layers
    self.backbone = tf.keras.Sequential([
        conv(32),
        conv(64),
        layers.MaxPooling2D(),
        conv(128),
        conv(256),
        layers.MaxPooling2D(),
        layers.Dropout(0.5),
        conv(512),
        conv(1024),
        layers.MaxPooling2D(),
        layers.Dropout(0.5),
        layers.GlobalAveragePooling2D(),
    ])
    self.box_head = layers.Dense(4, activation='sigmoid')

  def call(self, x):
    """Run the backbone on `x` and return the four box values per image."""
    features = self.backbone(x)
    return self.box_head(features)

def loss_fn(bboxes_true, box_preds):
    """Mean squared error between the predictions and each image's first box.

    Only index 0 along the second axis of `bboxes_true` is compared with
    `box_preds`; any further boxes of an image do not contribute to the loss.
    """
    first_boxes = bboxes_true[:, 0]
    squared_error = tf.square(first_boxes - box_preds)
    return tf.reduce_mean(squared_error)


In [None]:
## Train the model

model = detector()
model.compile(optimizer = 'adam', loss = loss_fn)

# `steps_per_epoch` is deliberately not passed: train_dataset is finite and not
# repeated, so Keras runs each epoch until the dataset is exhausted and starts
# a fresh pass for the next one. Forcing a step count here would drain the
# iterator after roughly one epoch, and a hard-coded image count goes stale as
# soon as the dataset changes.
model.fit(train_dataset, validation_data=val_dataset, epochs=70)

In [None]:
# Save the model

# Export the trained model to the "new_model1" path in the working directory.
model.export("new_model1")

## Testing our model ##

In [None]:
# Evaluate the model we just trained on the held-out test split. The model was
# compiled with a loss only, so the printed result is the test loss.

# note: add more dropouts. looks like the model is starting to memorize the data
print("evaluating the dataset")
result = model.evaluate(test_dataset)
print(result)

In [None]:
# Intersection-over-union helper used to score predictions on the test dataset.

def compute_iou(box1, box2):
  """Return the intersection-over-union of two boxes.

  Args:
    box1: A `(ymin, xmin, ymax, xmax)` sequence.
    box2: A `(ymin, xmin, ymax, xmax)` sequence in the same units as `box1`.

  Returns:
    The overlap area divided by the union area, or 0.0 when the union area is
    not positive.
  """
  top1, left1, bottom1, right1 = box1
  top2, left2, bottom2, right2 = box2

  # The overlap extent along each axis is clamped at zero for disjoint boxes.
  overlap_width = max(0, min(right1, right2) - max(left1, left2))
  overlap_height = max(0, min(bottom1, bottom2) - max(top1, top2))
  intersection_area = overlap_width * overlap_height

  area1 = (right1 - left1) * (bottom1 - top1)
  area2 = (right2 - left2) * (bottom2 - top2)
  union_area = area1 + area2 - intersection_area

  if union_area > 0:
    return intersection_area / union_area
  return 0.0


In [None]:
# Test model with test images.
#
# For every test image that has a label file, predict one box with the model
# and compare it against each labelled box; the IOUs of all comparisons are
# averaged at the end.

computed_ious = 0.0
index = 0
for test_image in os.listdir(test_images_directory):
  if not test_image.endswith((".png", ".jpg", ".jpeg")):
    continue

  # The label file shares the image's base name, whatever the image extension.
  label_path = os.path.join(
      test_labels_directory, os.path.splitext(test_image)[0] + ".txt")
  if not os.path.exists(label_path):
    print(f"Label file not found: {label_path}")
    continue

  with Image.open(os.path.join(test_images_directory, test_image)) as raw_image:
    img = raw_image.convert("RGB").resize((256, 256))
  input_tensor = tf.convert_to_tensor(np.array(img) / 255.0, dtype=tf.float32)
  input_tensor = tf.expand_dims(input_tensor, axis=0)

  ymin, xmin, ymax, xmax = model(input_tensor)[0].numpy()

  # Both the prediction and the labels are fractions of the image size, so they
  # are scaled to pixels of the resized image before being compared.
  img_width, img_height = img.size
  predicted_bounds = (int(ymin * img_height), int(xmin * img_width),
                      int(ymax * img_height), int(xmax * img_width))

  with open(label_path, 'r') as label_file:
    for line in label_file:
      fields = line.split()
      if not fields:
        # Tolerate blank lines (e.g. a trailing newline at the end of the file).
        continue
      class_id, x_center, y_center, bound_width, bound_height = map(float, fields)
      xmin_actual = (x_center - bound_width / 2) * img_width
      xmax_actual = (x_center + bound_width / 2) * img_width
      ymin_actual = (y_center - bound_height / 2) * img_height
      ymax_actual = (y_center + bound_height / 2) * img_height
      actual_bounds = (int(ymin_actual), int(xmin_actual), int(ymax_actual), int(xmax_actual))
      computed_ious += compute_iou(predicted_bounds, actual_bounds)
      index += 1

if index:
  average_iou = computed_ious / index
  print(f"\nAverage IOU: {average_iou * 100:.2f}%")
else:
  # Nothing was scored (no images, no labels, or only empty label files).
  average_iou = 0.0
  print("\nNo labelled boxes found; average IOU not computed.")
