<img src="https://github.com/RadimKozl/OpenCV_Project4/blob/832738cf629a0d6518297f860fd1952e3eaa3143/notebooks/img/OpenCV_logo.png" alt="OpenCV logo" style="width: 200px;"/>

# ***Video Face Mask Detection***
--------------------------------------

## ***Import libraries***

In [1]:
import numpy as np

from IPython.display import display, clear_output
import pickle, cv2, sys, os, csv

from sys import exit

from dataPath import MODEL_PATH, VIDEOS_PATH, PICKLE_PATH, DATA_PATH

from PIL import Image, ImageDraw, ImageFont
import onnxruntime as ort

import ipywidgets as widgets
from ipywidgets import interact, interactive, fixed, interact_manual, IntSlider, Output
from ipywidgets.widgets.interaction import interactive
from IPython.display import display

## ***Load models***

In [2]:
path_model_dir = os.path.join(MODEL_PATH, 'onnx')

In [3]:
def load_file(path_dir, suffix='.onnx'):
    list_files = os.listdir(path_dir)
    list_models_files = [file for file in list_files if os.path.isfile(os.path.join(path_dir, file)) and file.endswith(suffix)]
    return list_models_files

In [4]:
models_names = load_file(path_model_dir, suffix='.onnx')
models_names

['YOLOv10m_FMD_01.onnx',
 'YOLOv10m_FMD_02.onnx',
 'YOLOv5m_FMD.onnx',
 'YOLOv6m_FMD_01.onnx',
 'YOLOv6m_FMD_02.onnx',
 'YOLOv7_FMD_01.onnx',
 'YOLOv7_FMD_02.onnx',
 'YOLOv7_FMD_03.onnx',
 'YOLOv8m_FMD.onnx',
 'YOLOv9m_FMD.onnx']

In [5]:
if not os.path.exists(os.path.join(DATA_PATH, 'pickle')):
    os.mkdir(os.path.join(DATA_PATH, 'pickle')) 
    
PICKLE_PATH = os.path.join(DATA_PATH, 'pickle')

In [6]:
def sel_model(name):
    path = os.path.join(path_model_dir,name)
    data = (name, path)
    file_path = os.path.join(PICKLE_PATH, 'sel_video_model.pkl')
    with open(file_path, 'wb') as f:  
        pickle.dump(data, f)
    return data

In [7]:
path_model = interact(sel_model, name=models_names);

interactive(children=(Dropdown(description='name', options=('YOLOv10m_FMD_01.onnx', 'YOLOv10m_FMD_02.onnx', 'Y…

In [116]:
with open(os.path.join(PICKLE_PATH, 'sel_video_model.pkl'), 'rb') as f:
    sel_model = pickle.load(f)

## ***Create model***

In [117]:
model_session = ort.InferenceSession(sel_model[1])

In [118]:
# Model parameters
input_name = model_session.get_inputs()[0].name
output_names = [output.name for output in model_session.get_outputs()]
print(
    'Input name: ', input_name,
    '\nOutput names: ', output_names
)

Input name:  images 
Output names:  ['output0']


In [119]:
input_shape = model_session.get_inputs()[0].shape
output_shape = [output.shape for output in model_session.get_outputs()]
print(
    'Input shape: ', input_shape,
    '\nOutput shape: ', output_shape
)

Input shape:  [1, 3, 640, 640] 
Output shape:  [[1, 6, 8400]]


In [120]:
transpose_check_output = True

In [121]:
# check models
if len(output_shape[0]) != 3:
    print('Wrong type of model used!!!')
    assert len(output_shape[0]) == 3
    
else:
    if output_shape[0][1] > output_shape[0][2]:
        transpose_check_output = False
    else:
        transpose_check_output = False

print('Transpose is necessary: ', transpose_check_output)

Transpose is necessary:  False


## ***Load videos***

In [122]:
path_dir_videos = os.path.join(VIDEOS_PATH, 'test_videos')

In [123]:
videos_names = load_file(path_dir_videos, suffix='.mp4')
videos_names

['test-video1.mp4', 'test-video2.mp4']

In [124]:
def sel_video(name):
    path = os.path.join(path_dir_videos,name)
    data = (name, path)
    file_path = os.path.join(PICKLE_PATH, 'selected_video.pkl')
    with open(file_path, 'wb') as f:  
        pickle.dump(data, f)
    return data

In [125]:
path_video = interact(sel_video, name=videos_names);

interactive(children=(Dropdown(description='name', options=('test-video1.mp4', 'test-video2.mp4'), value='test…

In [148]:
with open(os.path.join(PICKLE_PATH, 'selected_video.pkl'), 'rb') as f:
    sel_video = pickle.load(f)

In [149]:
if not os.path.exists(os.path.join(VIDEOS_PATH,'results')):
    os.mkdir(os.path.join(VIDEOS_PATH,'results')) 
    
PATH_RESULTS = os.path.join(VIDEOS_PATH,'results')

In [150]:
# Load the video
cap = cv2.VideoCapture(sel_video[1])

In [151]:
# Set output for video
name_used_model = sel_model[0].split('.')[0]
name_result_video = 'onnxruntime_' + name_used_model + '_' + sel_video[0]
print(name_result_video)
output_path = os.path.join(PATH_RESULTS, name_result_video)

onnxruntime_YOLOv9m_FMD_test-video2.mp4


In [152]:
# set codec and videowriter
#fourcc = cv2.VideoWriter_fourcc(*'mp4v')
fourcc = cv2.VideoWriter_fourcc(*"XVID")
out = cv2.VideoWriter(output_path, fourcc, 20.0, (int(cap.get(3)), int(cap.get(4))))

In [153]:
# Parameters for image processing
input_width = 640
input_height = 640
conf_threshold = 0.1

## ***Video processing***

In [154]:
# Define YOLO classes, form YAML file of train process
yolo_classes = ['mask','no-mask']

In [155]:
# Define a color map for different classes
COLOR_MAP = {
    'mask': (0, 255, 0),  # Green for class 0 (mask)
    'no-mask': (0, 0, 255)  # Red for class 1 (no-mask)
    # Add more classes and colors as needed
}

# Set the font size
FONT_TYPE = cv2.FONT_HERSHEY_SIMPLEX
THICKNESS = 2
SCALE = 0.5

In [156]:
# Function to plot detections on an image
def draw_boxes(frame, boxes):
    for box in boxes:
        x1, y1, x2, y2, class_label, prob = box
        #print(x1, y1, x2, y2, class_label, prob)
        label = f"Class {class_label}: {prob:.2f}"
        color = COLOR_MAP[class_label]
        cv2.rectangle(frame, (int(x1), int(y1)), (int(x2), int(y2)), color, THICKNESS) 
        cv2.putText(frame, label, (int(x1), int(y1) - 10), FONT_TYPE, fontScale=SCALE, color=color, thickness=THICKNESS)

    return frame

In [157]:
# Calculate the area of ​​intersection
def intersection(box1,box2):
    box1_x1,box1_y1,box1_x2,box1_y2 = box1[:4]
    box2_x1,box2_y1,box2_x2,box2_y2 = box2[:4]
    x1 = max(box1_x1,box2_x1)
    y1 = max(box1_y1,box2_y1)
    x2 = min(box1_x2,box2_x2)
    y2 = min(box1_y2,box2_y2)
    return (x2-x1)*(y2-y1) 

In [158]:
# Calculate the connection area
def union(box1,box2):
    box1_x1,box1_y1,box1_x2,box1_y2 = box1[:4]
    box2_x1,box2_y1,box2_x2,box2_y2 = box2[:4]
    box1_area = (box1_x2-box1_x1)*(box1_y2-box1_y1)
    box2_area = (box2_x2-box2_x1)*(box2_y2-box2_y1)
    return box1_area + box2_area - intersection(box1,box2)

In [159]:
# Divide one by the other
def iou(box1,box2):
    return intersection(box1,box2)/union(box1,box2)

In [160]:
# preprocess image
def preprocess(image):
    img = cv2.resize(image, (input_width, input_height))
    img = img.transpose(2, 0, 1)  # Convert to format (channels, height, width)
    img = img[np.newaxis, :, :, :].astype(np.float32) / 255.0  # Normalize and add dimension
    return img

In [161]:
def parse_row(row, img_width, img_height, class_names):
    # slice out tx_center, y_center, width, height from the row
    xc,yc,w,h = row[:4]

    # calculate coordinates of bounding box corners
    x1 = (xc-w/2)/640*img_width
    y1 = (yc-h/2)/640*img_height
    x2 = (xc+w/2)/640*img_width
    y2 = (yc+h/2)/640*img_height

    # find the object with a maximum probability
    prob = row[4:].max()
    # id of class
    class_id = row[4:].argmax()

    # Check class annotation, note: YOLOv7 have different annotation 1,2 not 0,1!!!
    if (len(class_names)-1) < class_id:
        # get a class label by ID
        label = class_names[(class_id-1)]
    else:
        # get a class label by ID
        label = class_names[class_id]

    return [x1, y1, x2, y2, label, prob]

In [162]:
def postprocess(frame, outputs, class_labels, conf_threshold, iou_threshold):
    frame_height, frame_width = frame.shape[:2]
    #boxes, scores, class_labels = [], [], []
    
    # We expect output[0] to contain detections
    detections = outputs[0]


    # Check Output shape
    detections_shape = detections.shape
        
    # We turned it out to a matrix with 84 rows and, 8400 columns.
    if detections_shape[1] < detections_shape[2]:
        detections = detections.transpose(0, 2, 1)[0]
        #print('Turned output shape: ', output.shape)
    else:
        detections = detections[0]

    # parses and filter outs all rows from output
    boxes = [row for row in [parse_row(row, frame_width, frame_height, class_labels) for row in detections] if row[5] > conf_threshold]
    boxes.sort(key=lambda x: x[5], reverse=True)

    filtered_boxes = []
    while len(boxes)>0:
        filtered_boxes.append(boxes[0])
        boxes = [box for box in boxes if iou(box,boxes[0])<iou_threshold]
    
    return filtered_boxes



In [163]:
def check_conf_threshold(conf_threshold):
    file_path = os.path.join(PICKLE_PATH, 'conf_threshold_video.pkl')
    with open(file_path, 'wb') as f:  
        pickle.dump(round(conf_threshold, 2), f)
    return conf_threshold

In [164]:
interact(check_conf_threshold, conf_threshold=widgets.FloatSlider(min=0.1, max=1.0, step=0.01, value=0.50));

interactive(children=(FloatSlider(value=0.5, description='conf_threshold', max=1.0, min=0.1, step=0.01), Outpu…

In [165]:
with open(os.path.join(PICKLE_PATH, 'conf_threshold_video.pkl'), 'rb') as f:
    conf_threshold = pickle.load(f)

print(conf_threshold)

0.5


In [166]:
def check_iou_threshold(iou_threshold):
    file_path = os.path.join(PICKLE_PATH, 'iou_threshold_video.pkl')
    with open(file_path, 'wb') as f:  
        pickle.dump(round(iou_threshold, 2), f)
    return iou_threshold

In [167]:
interact(check_iou_threshold, iou_threshold=widgets.FloatSlider(min=0.1, max=1.0, step=0.01, value=0.70));

interactive(children=(FloatSlider(value=0.7, description='iou_threshold', max=1.0, min=0.1, step=0.01), Output…

In [168]:
with open(os.path.join(PICKLE_PATH, 'iou_threshold_video.pkl'), 'rb') as f:
    iou_threshold = pickle.load(f)

print(iou_threshold)

0.7


In [169]:
while cap.isOpened():
    ret, frame = cap.read()
    if not ret:
        break

    blob = preprocess(frame)

    # Run inference
    outputs = model_session.run(output_names, {input_name: blob})

    boxes = postprocess(frame, outputs, yolo_classes, conf_threshold, iou_threshold)
    
    # Display of detections on the image
    frame = draw_boxes(frame, boxes)
    
    out.write(frame)
    #cv2.imshow('Video', frame)

    if cv2.waitKey(1) & 0xFF == ord('q'):
        break

cap.release()
out.release()
cv2.destroyAllWindows()