# 0. Setup Paths

In [1]:
import os
DATA_DIR = 'dataset'
TENSORFLOW = 'Tensorflow'
PRE_TRAINED_MODELS = 'pre_trained_models'
MODELS = 'models'
COLORS = 'color_detection'
IMAGES = os.path.join(DATA_DIR,'images')
ANNOTATIONS = os.path.join(DATA_DIR,'annotations')
ANNOTATION = os.path.join(DATA_DIR,'annotation')
TRAIN = os.path.join(IMAGES,'Train')
TEST = os.path.join(IMAGES,'Test')
SCRIPTS = os.path.join(TENSORFLOW,'scripts')
API_MODELS = os.path.join(TENSORFLOW,MODELS)
CHECKPOINTS = os.path.join(MODELS,'ssd_mobnet')

# print(os.getcwd())

# 1. Train Test Split (Initial setup)

In [2]:
from math import floor
from random import shuffle
# List number of data (images) to be split into training and testing
all_files = os.listdir(IMAGES)
print ("Total number of images:",len(all_files))

# Shuffle all files for random data for training and testing
print("Random Shuffling all files...\n")
shuffle(all_files)

#Split images into 70% training and 30% testing
split = 0.7
split_index = (floor(len(all_files)*split))
training = all_files[:split_index]
testing = all_files[split_index:]
print("Splitting images into 70% for training and 30% for testing...")
print("Number of Training data:",len(training))
print("Number of test data:",len(testing))

Total number of images: 0
Random Shuffling all files...

Splitting images into 70% for training and 30% for testing...
Number of Training data: 0
Number of test data: 0


## 1.1. Moving images to respective folders

In [3]:
import shutil

# Moving splitted files into their respective directories
for file in training:
    path = os.path.join(IMAGES,file)
    dest = os.path.join(TRAIN,file)
    shutil.move(path,dest)
print("Files in Train folder:",len(os.listdir(TRAIN)))

for file in testing:
    path = os.path.join(IMAGES,file)
    dest = os.path.join(TEST,file)
    shutil.move(path,dest)
print("Files in Test folder:",len(os.listdir(TEST)))


Files in Train folder: 1195
Files in Test folder: 513


## 1.2. Moving annotations to respective folders

In [4]:
# Moving annotations for each data to respective directories
for file in training:
    pre, ext = os.path.splitext(file)
    annotation = pre + ".xml"
    path = os.path.join(ANNOTATIONS,annotation)
    dest = os.path.join(TRAIN_ANNOTATIONS,annotation)
    shutil.move(path,dest)
print("Files in Train annotations folder:",len(os.listdir(TRAIN_ANNOTATIONS)))

for file in testing:
    pre, ext = os.path.splitext(file)
    annotation = pre + ".xml"
    path = os.path.join(ANNOTATIONS,annotation)
    dest = os.path.join(TEST_ANNOTATIONS,annotation)
    shutil.move(path,dest)
print("Files in Test annotations folder:",len(os.listdir(TEST_ANNOTATIONS)))

Files in Train annotations folder: 599
Files in Test annotations folder: 258


# 2. Create Label Map (One time setup)

In [2]:
# 3 categories on mask detection is labeled with an id
labels = [{'name':'without_mask','id':1},{'name':'with_mask','id':2},{'name':'mask_weared_incorrect','id':3}]
labels

[{'name': 'without_mask', 'id': 1},
 {'name': 'with_mask', 'id': 2},
 {'name': 'mask_weared_incorrect', 'id': 3}]

In [3]:
# Create a label map for each annotation directory
# directories = [TRAIN_ANNOTATIONS,TEST_ANNOTATIONS]

# for directory in directories:
with open(ANNOTATION + '\label_map.pbtxt','w') as f:
    for label in labels:
        f.write('item{\n')
        f.write('\tname:\'{}\'\n'.format(label['name']))
        f.write('\tid:{}\n'.format(label['id']))
        f.write('}\n')

# 3. Generate TF records
**Using Tensorflow Object Detection API**

In [19]:
#Initialize paths for scripts, files and destination folder
script = os.path.join(SCRIPTS,'generate_tfrecord.py')
label_map = os.path.join(ANNOTATION,'label_map.pbtxt')
train_tfrecord = os.path.join(ANNOTATION,'train.record')
test_tfrecord = os.path.join(ANNOTATION,'test.record')

# Generate TF records for Train images
!python {script} -x {TRAIN} -l {label_map} -o {train_tfrecord}

# Generate TF records for Test images
!python {script} -x {TEST} -l {label_map} -o {test_tfrecord}

Successfully created the TFRecord file: dataset\annotation\train.record
Successfully created the TFRecord file: dataset\annotation\test.record


# 4. Download TF Models Pretrained Models from Tensorflow Model Zoo

In [None]:
# !cd Tensorflow && git clone https://github.com/tensorflow/models.git

# 5. Copy Model-Config to Training Folder

In [2]:
#Pre-trained model path
PT_MODEL = os.path.join(PRE_TRAINED_MODELS,'ssd_mobilenet_v2_fpnlite_320x320_coco17_tpu-8')
#Pre-trained model config file
PT_MODEL_CONFIG = os.path.join(PT_MODEL,'pipeline.config')

#Create a new directory called 'ssd_mobnet'
MODEL_NAME = 'ssd_mobnet'
!mkdir {'models\\'+MODEL_NAME}

#Copy model config of pre-trained model to 'models/ssb_mobnet' directory
MODEL = os.path.join(MODELS,MODEL_NAME)
!cp {PT_MODEL_CONFIG} {MODEL}

A subdirectory or file models\ssd_mobnet already exists.


# 6. Update Config for Transfer Learning

In [2]:
import tensorflow as tf
from object_detection.utils import config_util
from object_detection.protos import pipeline_pb2
from google.protobuf import text_format

In [3]:
#get config file in models/ssb_mobnet to this notebook to make changes
MODEL_NAME = 'ssd_mobnet'
CONFIG_PATH = MODELS + '/' + MODEL_NAME + '/pipeline.config'
config = config_util.get_configs_from_pipeline_file(CONFIG_PATH)

In [30]:
config

{'model': ssd {
   num_classes: 3
   image_resizer {
     fixed_shape_resizer {
       height: 320
       width: 320
     }
   }
   feature_extractor {
     type: "ssd_mobilenet_v2_fpn_keras"
     depth_multiplier: 1.0
     min_depth: 16
     conv_hyperparams {
       regularizer {
         l2_regularizer {
           weight: 4e-05
         }
       }
       initializer {
         random_normal_initializer {
           mean: 0.0
           stddev: 0.01
         }
       }
       activation: RELU_6
       batch_norm {
         decay: 0.997
         scale: true
         epsilon: 0.001
       }
     }
     use_depthwise: true
     override_base_feature_extractor_hyperparams: true
     fpn {
       min_level: 3
       max_level: 7
       additional_layer_depth: 128
     }
   }
   box_coder {
     faster_rcnn_box_coder {
       y_scale: 10.0
       x_scale: 10.0
       height_scale: 5.0
       width_scale: 5.0
     }
   }
   matcher {
     argmax_matcher {
       matched_threshold: 0.5
    

In [4]:
pipeline_config = pipeline_pb2.TrainEvalPipelineConfig()
with tf.io.gfile.GFile(CONFIG_PATH,'r') as f:
    proto = f.read()
    text_format.Merge(proto,pipeline_config)

In [41]:
#Number of class labeled for images to be predicted : without_mask,with_mask,mask_weared_incorrect 
pipeline_config.model.ssd.num_classes = 3
pipeline_config.train_config.batch_size = 50
#Configure places to start training from
pipeline_config.train_config.fine_tune_checkpoint= PRE_TRAINED_MODELS + '/ssd_mobilenet_v2_fpnlite_320x320_coco17_tpu-8/checkpoint/ckpt-0'
#Specifiy the type of model desired: detection model
pipeline_config.train_config.fine_tune_checkpoint_type = 'detection'
#specify label map path for training
pipeline_config.train_input_reader.label_map_path = ANNOTATION+'/label_map.pbtxt'
#Specify files path for training
pipeline_config.train_input_reader.tf_record_input_reader.input_path[:] = {ANNOTATION+'/train.record'}
#specify label map path for testing
pipeline_config.eval_input_reader[0].label_map_path = ANNOTATION+'/label_map.pbtxt'
#Specify files path for testing
pipeline_config.eval_input_reader[0].tf_record_input_reader.input_path[:] = {ANNOTATION+'/test.record'}

In [5]:
pipeline_config

model {
  ssd {
    num_classes: 3
    image_resizer {
      fixed_shape_resizer {
        height: 320
        width: 320
      }
    }
    feature_extractor {
      type: "ssd_mobilenet_v2_fpn_keras"
      depth_multiplier: 1.0
      min_depth: 16
      conv_hyperparams {
        regularizer {
          l2_regularizer {
            weight: 4e-05
          }
        }
        initializer {
          random_normal_initializer {
            mean: 0.0
            stddev: 0.01
          }
        }
        activation: RELU_6
        batch_norm {
          decay: 0.997
          scale: true
          epsilon: 0.001
        }
      }
      use_depthwise: true
      override_base_feature_extractor_hyperparams: true
      fpn {
        min_level: 3
        max_level: 7
        additional_layer_depth: 128
      }
    }
    box_coder {
      faster_rcnn_box_coder {
        y_scale: 10.0
        x_scale: 10.0
        height_scale: 5.0
        width_scale: 5.0
      }
    }
    matcher {
      arg

In [43]:
#Use google protobuf to convert protobuf message to string
#Write the contents into config file
config_content = text_format.MessageToString(pipeline_config)
with tf.io.gfile.GFile(CONFIG_PATH,'wb') as f:
    f.write(config_content)

In [9]:
pipeline_config

model {
  ssd {
    num_classes: 3
    image_resizer {
      fixed_shape_resizer {
        height: 320
        width: 320
      }
    }
    feature_extractor {
      type: "ssd_mobilenet_v2_fpn_keras"
      depth_multiplier: 1.0
      min_depth: 16
      conv_hyperparams {
        regularizer {
          l2_regularizer {
            weight: 4e-05
          }
        }
        initializer {
          random_normal_initializer {
            mean: 0.0
            stddev: 0.01
          }
        }
        activation: RELU_6
        batch_norm {
          decay: 0.997
          scale: true
          epsilon: 0.001
        }
      }
      use_depthwise: true
      override_base_feature_extractor_hyperparams: true
      fpn {
        min_level: 3
        max_level: 7
        additional_layer_depth: 128
      }
    }
    box_coder {
      faster_rcnn_box_coder {
        y_scale: 10.0
        x_scale: 10.0
        height_scale: 5.0
        width_scale: 5.0
      }
    }
    matcher {
      arg

# 7. Train the model

In [45]:
print("""python {}/research/object_detection/model_main_tf2.py --model_dir={}/{} --pipeline_config_path={}/{}/pipeline.config --num_train_steps=5000""".format(API_MODELS,MODELS,MODEL_NAME,MODELS,MODEL_NAME))

python Tensorflow\models/research/object_detection/model_main_tf2.py --model_dir=models/ssd_mobnet --pipeline_config_path=models/ssd_mobnet/pipeline.config --num_train_steps=5000


# 8. Load Train Model From Checkpoint

In [5]:
import os
from object_detection.utils import label_map_util
from object_detection.utils import visualization_utils as vis_utils
from object_detection.builders import model_builder

In [6]:
#Grab our pipeline
configs = config_util.get_configs_from_pipeline_file(CONFIG_PATH)
# Build the model using model config
detection_model = model_builder.build(model_config=configs['model'],is_training=False)

#restore checkpoint
checkpoint = tf.compat.v2.train.Checkpoint(model=detection_model)
#latest checkpoint is ckpt-6
checkpoint.restore(os.path.join(CHECKPOINTS,'ckpt-6')).expect_partial()

<tensorflow.python.training.tracking.util.CheckpointLoadStatus at 0x23070484a88>

In [7]:
#tensorflow function
@tf.function
def detect_fn(image):
    #preprocess image to 320x320
    image, shapes = detection_model.preprocess(image)
    #use detection model to make prediction
    prediction_dict = detection_model.predict(image,shapes)
    #Postprocess
    detection = detection_model.postprocess(prediction_dict,shapes)
    return detection

# 9. Detect in Real-Time

In [8]:
import cv2
import numpy as np

In [9]:
#Function to check compatibility with GUI
print(cv2.getBuildInformation())


  Version control:               4.5.2

  Extra modules:
    Location (extra):            C:/Users/runneradmin/AppData/Local/Temp/pip-req-build-inblc7p7/opencv_contrib/modules
    Version control (extra):     4.5.2

  Platform:
    Timestamp:                   2021-05-07T08:28:37Z
    Host:                        Windows 10.0.17763 AMD64
    CMake:                       3.18.4
    CMake generator:             Visual Studio 14 2015 Win64
    CMake build tool:            MSBuild.exe
    MSVC:                        1900
    Configuration:               Debug Release

  CPU/HW features:
    Baseline:                    SSE SSE2 SSE3
      requested:                 SSE3
    Dispatched code generation:  SSE4_1 SSE4_2 FP16 AVX AVX2
      requested:                 SSE4_1 SSE4_2 AVX FP16 AVX2 AVX512_SKX
      SSE4_1 (15 files):         + SSSE3 SSE4_1
      SSE4_2 (1 files):          + SSSE3 SSE4_1 POPCNT SSE4_2
      FP16 (0 files):            + SSSE3 SSE4_1 POPCNT SSE4_2 FP16 AVX
      AVX

In [9]:
#create Category index
category_index = label_map_util.create_category_index_from_labelmap(ANNOTATION+'/label_map.pbtxt')
category_index

{1: {'id': 1, 'name': 'without_mask'},
 2: {'id': 2, 'name': 'with_mask'},
 3: {'id': 3, 'name': 'mask_weared_incorrect'}}

In [41]:
#get your web cam
#number depends on what worked
capture = cv2.VideoCapture(0)
cap_width = int(capture.get(cv2.CAP_PROP_FRAME_WIDTH))
cap_height = int(capture.get(cv2.CAP_PROP_FRAME_HEIGHT))

In [42]:
while True:
    #get frame from video capture
    ret,frame = capture.read()
    #convert image to numpy array
    image_array = np.array(frame)
    
    # convert image into tensor
    input_tensor = tf.convert_to_tensor(np.expand_dims(image_array,0), dtype=tf.float32)
    #detection function
    detections = detect_fn(input_tensor)
    
    #get the detections
    num_detections = int(detections.pop("num_detections"))
    
    #Preprocess the detections
    detections = {key:value[0,:num_detections].numpy() for key, value in detections.items()}
    #adding detections and detection classes to the detection
    detections['num_detections'] = num_detections
    detections['detection_classes'] = detections['detection_classes'].astype(np.int64)
    
    #constant for labels(starts at 1)
    label_id_offset = 1
    
    #copy the image array to a new variable for visualization
    image_array_detection = image_array.copy()
    
    #Visualization image with labels
    vis_utils.visualize_boxes_and_labels_on_image_array(
        image_array_detection,
        detections['detection_boxes'],
        detections['detection_classes']+label_id_offset,
        detections['detection_scores'],
        category_index,
        use_normalized_coordinates = True,
        max_boxes_to_draw=1,
        min_score_thresh=0.3,
        agnostic_mode=False
    )
    
    #Get all the bounding boxes generate by the model
    boxes = detections['detection_boxes']
    #Get image size
    img_height,img_width,img_channel = image_array_detection.shape
    
    #Color detection======
    #Get number of boxes generated
    max_boxes_to_draw = boxes.shape[0]
    # get scores to get a threshold
    scores = detections['detection_scores']
    # Score threshold determining if the category significance
    min_score_thresh=.5
    # iterate over all objects found
    for i in range(min(max_boxes_to_draw, boxes.shape[0])):
        #If the bounding box has a score over 0.5
        if scores is None or scores[i] > min_score_thresh:
            # Draw the box
            detected_class = detections['detection_classes'][i]
            category =  category_index[detected_class+1]
            #Get the labeled class name
            class_name = category['name']
            #if image is labeled with with_mask
            if(class_name == 'with_mask'):
                #get coordinates of bounding box in the image
                #3 pixels is added to cut out the bounding border
                bottom = ((boxes[i][0]*img_height)+3).astype(int)
                left = ((boxes[i][1]*img_width)+3).astype(int)
                top = ((boxes[i][2]*img_height)-3).astype(int)
                right = ((boxes[i][3]*img_width)-3).astype(int)
                
                cropped_image = image_array_detection[bottom+50:bottom+145,left:right]
                cv2.imwrite('test.png',cropped_image)
#                 cv2.rectangle(image_array_detection,(left,top),(right,bottom),(255,0,0),-1)

                clt = detect_color(cropped_image)
                hist = centroid_histogram(clt)
                colors = zip(hist, clt.cluster_centers_)
                colors = sorted(colors, reverse=True)

                startX = img_width-330
                for (percent, color) in colors:
                    endX = startX + (percent * 300)
                    cv2.rectangle(image_array_detection, (int(startX), img_height-40), (int(endX), img_height-10),color.astype("uint8").tolist(), -1)
                    startX = endX
    
    #Show image with object detected
    cv2.imshow('object_detection',cv2.resize(image_array_detection,(800,600)))
    
    #Checks if the letter 'q' is pressed on the keyboard
    #Quit if pressed
    if cv2.waitKey(1) & 0xFF == ord('q'):
        capture.release()
        break
#Close all windows
cv2.destroyAllWindows()

# 10. Color detection with K-means

In [10]:
from sklearn.cluster import KMeans
import matplotlib.pyplot as plt
import argparse
import cv2
import numpy as np

In [11]:
def detect_color(image):
    image = image.reshape(image.shape[0] * image.shape[1], 3)
    
    clt = KMeans(n_clusters = 3)
    clt.fit(image)
    
    return clt

def centroid_histogram(clt):
    # grab the number of different clusters and create a histogram
    # based on the number of pixels assigned to each cluster
    numLabels = np.arange(0, len(np.unique(clt.labels_)) + 1)
    (hist, _) = np.histogram(clt.labels_, bins = numLabels)
    # normalize the histogram, such that it sums to one
    hist = hist.astype("float")
    hist /= hist.sum()
    # return the histogram
    return hist

# 11. Evaluate our model

In [10]:
#Similar to the model training command, but the number of trains is replaced with directory for checkpoint
#object detection api will take the latest checkpoint and evaluate the model from there
print("""python {}/research/object_detection/model_main_tf2.py --model_dir={}/{} --pipeline_config_path={}/{}/pipeline.config --checkpoint_dir={}/{}""".format(API_MODELS,MODELS,MODEL_NAME,MODELS,MODEL_NAME,MODELS,MODEL_NAME))

python Tensorflow\models/research/object_detection/model_main_tf2.py --model_dir=models/ssd_mobnet --pipeline_config_path=models/ssd_mobnet/pipeline.config --checkpoint_dir=models/ssd_mobnet
