# COVID Compliance Violation Detection

1. **Distance Violation Detection**  
This code implements `YOLOv3` and `Deep SORT` in order to perform real-time people tracking. YOLOv3 is an algorithm that uses deep convolutional neural networks to perform object detection. The people it detects are fed into Deep SORT `(Simple Online and Realtime Tracking with a Deep Association Metric)` to build a real-time object tracker.

2. **Face Mask Detection**

## Setup

In [1]:
#python-dotenv
#tensorflow==2.0
#opencv-python
#matplotlib
#seaborn
#pillow
#tqdm

In [2]:
import os
import sys
import warnings
# Ignore FutureWarning and UserWarning messages for the rest of the session.
# NOTE(review): this also hides any UserWarning raised by the notebook's own code.
warnings.simplefilter(action='ignore', category=FutureWarning)
warnings.simplefilter(action='ignore', category=UserWarning)

In [3]:
from dotenv import load_dotenv, find_dotenv
# Load the .env file located by find_dotenv(); BASE_PATH is read from the
# environment in the next cell.
load_dotenv(find_dotenv())

True

In [4]:
# Project root directory, taken from the BASE_PATH environment variable
# (for example from a .env file).
BASE_PATH = os.getenv("BASE_PATH")
if not BASE_PATH:
    # Fail fast with a clear message: a missing value would otherwise surface
    # later as an opaque TypeError inside os.path.join.
    raise EnvironmentError(
        "BASE_PATH is not set; define it in the environment or in a .env file."
    )
# Make project-local packages (yolov3_tf2, deep_sort, utils) importable.
# The membership check keeps sys.path clean when the cell is re-run.
if BASE_PATH not in sys.path:
    sys.path.append(BASE_PATH)

In [5]:
# Imports
import time, random
import numpy as np
from absl import app, flags, logging
from absl.flags import FLAGS
import cv2
import matplotlib.pyplot as plt
import tensorflow as tf
from yolov3_tf2.models import (
    YoloV3, YoloV3Tiny
)
from yolov3_tf2.dataset import transform_images
from yolov3_tf2.utils import draw_outputs, convert_boxes

In [6]:
from deep_sort import preprocessing
from deep_sort import nn_matching
from deep_sort.detection import Detection
from deep_sort.tracker import Tracker

In [7]:
from utils import generate_detections as gdet
from PIL import Image
from scipy.spatial import distance
import pandas as pd
from tqdm import tqdm

In [8]:
# Set Data Paths
# All locations are derived from BASE_PATH: raw inputs, processed outputs and
# a scratch directory.
DATA_PATH = os.path.join(BASE_PATH, 'data/raw')
OUT_DATA_PATH = os.path.join(BASE_PATH, 'data/processed')
TEMP_DATA_PATH = os.path.join(BASE_PATH, "data/temp")
# Base name (without extension) shared by the input and output videos.
test_filename = "test"
test_video_file = os.path.join(DATA_PATH,'videos/{}.mp4'.format(test_filename))
output_video_file = os.path.join(OUT_DATA_PATH,'videos/{}.avi'.format(test_filename))
# The MP4 copy belongs in the same 'videos/' directory as the AVI output; the
# path separator after 'videos' is required for that.
output_mp4video_file = os.path.join(OUT_DATA_PATH,'videos/{}-c.mp4'.format(test_filename))

In [None]:
# Create the scratch directory if it does not exist yet (no error when it does).
os.makedirs(TEMP_DATA_PATH, exist_ok=True)

In [9]:
# Set Model Paths
# File with one class label per line (read into class_names below).
CLASSES_PATH = os.path.join(BASE_PATH, 'data/common/cocolabels/coco.names')
# Root directory holding the model artefacts.
MODEL_PATH = os.path.join(BASE_PATH, 'models')
# Weights passed to yolo.load_weights().
YOLO_WEIGHTS_PATH = os.path.join(MODEL_PATH, 'yolov3/weights/yolov3.tf')
# Model file passed to gdet.create_box_encoder() for the Deep SORT encoder.
deepsort_model_filename = os.path.join(MODEL_PATH,'deepsort/mars-small128.pb')

In [10]:
# Setup GPU system
# Enable memory growth on every visible GPU. TensorFlow requires the setting
# to be identical across GPUs, so configuring only the first device fails on
# multi-GPU machines.
physical_devices = tf.config.experimental.list_physical_devices('GPU')
try:
    for gpu in physical_devices:
        tf.config.experimental.set_memory_growth(gpu, True)
except RuntimeError as err:
    # Raised when the devices were already initialised, e.g. when this cell is
    # re-run in a live kernel; the existing configuration stays in effect.
    print("**ERR: Unable to set GPU memory growth: {}".format(err))

In [11]:
# Runtime configuration shared by the cells below.
args = dict(
    tiny=False,               # True selects YoloV3Tiny, False selects YoloV3
    size=416,                 # size passed to transform_images() when resizing frames
    output_format='XVID',     # fourcc codec used by the VideoWriter
    num_classes=80,           # number of classes the YOLO model is built with
    max_cosine_distance=0.5,  # threshold given to the Deep SORT cosine metric
    nn_budget=None,           # budget given to the Deep SORT metric
    nms_max_overlap=1.0,      # not referenced by the visible cells
)

## Initialise the Models

In [12]:
# Deepsort Model
# Box encoder built from the Deep SORT model file; called per frame in
# yolo_predict() to produce one feature per converted box.
encoder = gdet.create_box_encoder(deepsort_model_filename, batch_size=1)
# "cosine" nearest-neighbour metric configured from args.
metric = nn_matching.NearestNeighborDistanceMetric("cosine", args['max_cosine_distance'], args['nn_budget'])
# Tracker driven by the metric above; predict()/update() are called once per
# frame in the main loop.
tracker = Tracker(metric)

In [13]:
# YOLOv3
# Build the network variant selected by args['tiny'] with the configured
# number of classes.
yolo = (YoloV3Tiny if args['tiny'] else YoloV3)(classes=args['num_classes'])

In [14]:
# Load the YOLO weights stored at YOLO_WEIGHTS_PATH into the model.
yolo.load_weights(YOLO_WEIGHTS_PATH)
logging.info('weights loaded')

In [15]:
# Read one class label per line. The context manager closes the file handle,
# which a bare open(...).readlines() would leave open.
with open(CLASSES_PATH) as names_file:
    class_names = [line.strip() for line in names_file]
logging.info('classes loaded')

In [16]:
# Open the test video, falling back to a public sample clip when the local
# file is missing or cannot be opened. cv2.VideoCapture does not raise on
# failure, so success is checked explicitly with isOpened() rather than with
# assert / bare except.
DEFAULT_VIDEO_URL = 'https://www.sample-videos.com/video/mp4/720/big_buck_bunny_720p_2mb.mp4'

vid = cv2.VideoCapture(test_video_file) if os.path.exists(test_video_file) else None
if vid is not None and vid.isOpened():
    print("Initialised the video capture.")
else:
    if vid is not None:
        # The file exists but could not be opened; free the handle first.
        vid.release()
    print("**ERR: Unable to load test video, using default URL")
    vid = cv2.VideoCapture(DEFAULT_VIDEO_URL)
    if not vid.isOpened():
        # Nothing to process: stop here instead of silently producing an
        # empty output video later on.
        raise RuntimeError(
            "Unable to open '{}' or the default URL".format(test_video_file)
        )
    print("Loaded default URL")

Initialised the video capture.


In [17]:
# Read the stream properties of the input video and create the output writer.
# by default VideoCapture returns float instead of int
width = int(vid.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(vid.get(cv2.CAP_PROP_FRAME_HEIGHT))
fps = int(vid.get(cv2.CAP_PROP_FPS))
total_f = int(vid.get(cv2.CAP_PROP_FRAME_COUNT))  # used as the progress-bar total
codec = cv2.VideoWriter_fourcc(*args['output_format'])

# VideoWriter does not create missing directories and fails silently, so make
# sure the destination exists and verify that the writer actually opened.
os.makedirs(os.path.dirname(output_video_file), exist_ok=True)
out = cv2.VideoWriter(output_video_file, codec, fps, (width, height))
if not out.isOpened():
    raise RuntimeError(
        "Unable to open video writer for '{}' (codec {})".format(
            output_video_file, args['output_format'])
    )
# Index of the last frame written; -1 means none yet.
frame_index = -1

In [18]:
def transform_image(img):
    """Prepare one OpenCV frame for the YOLO model.

    The frame is converted from BGR to RGB, given a leading axis with
    tf.expand_dims, and passed through transform_images() with the size
    configured in args['size'].

    Args:
        img: frame as read from cv2.VideoCapture.

    Returns:
        The value returned by transform_images().
    """
    rgb_frame = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
    batched = tf.expand_dims(rgb_frame, 0)
    return transform_images(batched, args['size'])

In [19]:
def yolo_predict(img_in, frame=None):
    """Run YOLO on a preprocessed frame and build Deep SORT detections for people.

    Args:
        img_in: model input as produced by transform_image().
        frame: the original frame that img_in was derived from; used to
            convert the boxes and to encode appearance features. When omitted,
            the notebook-global `img` (set by the main loop) is used, which
            matches the previous behaviour.

    Returns:
        (detections, person_boxes): a list of Detection objects for every
        prediction labelled 'person', and the converted box of each of those
        detections, in the same order.
    """
    if frame is None:
        # Backward-compatible fallback to the global frame of the main loop.
        frame = img

    boxes, scores, classes, _ = yolo.predict(img_in)
    # Only the first element of each output is used (presumably a batch of one).
    names = np.array([class_names[int(class_id)] for class_id in classes[0]])
    converted_boxes = convert_boxes(frame, boxes[0])
    features = encoder(frame, converted_boxes)

    detections = []
    person_boxes = []
    for bbox, score, class_name, feature in zip(converted_boxes, scores[0], names, features):
        if class_name == 'person':
            detections.append(Detection(bbox, score, class_name, feature))
            # Keep this detection's own box, not the full list of boxes.
            person_boxes.append(bbox)
    return detections, person_boxes
        


In [20]:
def mid_point(img, bbox):
    """Return the bottom-centre point of a bounding box.

    Args:
        img: unused; kept so existing callers keep working.
        bbox: indexable as (x1, y1, x2, y2).

    Returns:
        Tuple (x, y) of ints: the horizontal midpoint of the box and its
        bottom edge, both truncated with int().
    """
    left, right, bottom = bbox[0], bbox[2], bbox[3]
    return (int((left + right) / 2), int(bottom))

In [21]:
%%time
def compute_distance(traker_idx, mids, thresh):
    """Compute the Euclidean distance between every unordered pair of points.

    Args:
        traker_idx: identifiers, one per entry of `mids`.
        mids: points, in the same order as `traker_idx`.
        thresh: a pair is flagged when its distance is <= thresh.

    Returns:
        pandas.DataFrame with one row per pair (i < j) and the columns
        'pid1', 'pid2', 'distance' and 'dvd' (True when distance <= thresh).
    """
    total = len(traker_idx)
    records = []
    for first in range(total):
        for second in range(first + 1, total):
            gap = distance.euclidean(mids[first], mids[second])
            too_close = True if gap <= thresh else False
            records.append((traker_idx[first], traker_idx[second], gap, too_close))
    return pd.DataFrame(records, columns=['pid1', 'pid2', 'distance', 'dvd'])

CPU times: user 158 µs, sys: 0 ns, total: 158 µs
Wall time: 57.2 µs


In [22]:
%%time
def find_closest(dist,num,thresh):
    """List the index pairs whose distance is within a threshold.

    Args:
        dist: square structure indexed as dist[i][j].
        num: number of rows/columns to scan.
        thresh: a pair is kept when dist[i][j] <= thresh.

    Returns:
        Three parallel lists (p1, p2, d): first index, second index (always
        greater than the first) and the distance of each kept pair.
    """
    firsts, seconds, gaps = [], [], []
    for row in range(num):
        for col in range(row, num):
            value = dist[row][col]
            within = value <= thresh
            # The diagonal (a point against itself) is never reported.
            if row != col and within:
                firsts.append(row)
                seconds.append(col)
                gaps.append(value)
    return firsts, seconds, gaps

CPU times: user 14 µs, sys: 4 µs, total: 18 µs
Wall time: 34.8 µs


In [23]:
%%time
def check_dvd(tracker, threshold):
    """Build the pairwise distance table for the currently active tracks.

    A track is used when it is confirmed and its time_since_update is not
    greater than 1. For each such track the bottom-centre point of its box
    (track.to_tlbr()) is collected together with its track_id.

    Args:
        tracker: object exposing a `tracks` iterable.
        threshold: distance threshold forwarded to compute_distance().

    Returns:
        The DataFrame produced by compute_distance().
    """
    tracker_idx = []
    mids = []
    for track in tracker.tracks:
        if track.is_confirmed() and not track.time_since_update > 1:
            box = track.to_tlbr()
            # NOTE: `img` is read from the notebook's global scope here.
            mids.append(mid_point(img, box))
            tracker_idx.append(track.track_id)
    return compute_distance(tracker_idx, mids, threshold)
    

CPU times: user 12 µs, sys: 4 µs, total: 16 µs
Wall time: 28.8 µs


In [24]:
%%time
def check_violation(df, track_id):
    """Tell whether a track takes part in at least one distance violation.

    Args:
        df: DataFrame with the columns 'pid1', 'pid2' and 'dvd', as returned
            by compute_distance().
        track_id: identifier to look for in either 'pid1' or 'pid2'.

    Returns:
        True when some row involving `track_id` has dvd == True, else False
        (including for an empty frame).
    """
    # One boolean mask covers both sides of the pair, replacing the two
    # filtered copies and the unreachable trailing return.
    involved = (df.pid1 == track_id) | (df.pid2 == track_id)
    return bool((involved & (df.dvd == True)).any())

CPU times: user 23 µs, sys: 0 ns, total: 23 µs
Wall time: 43.9 µs


In [25]:
%%time

# Main processing loop: read frames, detect and track people, flag tracks that
# are within `threshold` of another track, annotate the frame and write it to
# the output video.
MAX_EMPTY_FRAMES = 3            # stop after this many consecutive failed reads
SAFE_COLOR = (0, 255, 0)        # BGR green: no violation
VIOLATION_COLOR = (0, 0, 255)   # BGR red: violation

count = 0                       # consecutive empty frames seen so far
# Containers for per-frame violation records; this loop does not fill them.
dvd_frame_idx = []
dvd_person_idx = []
dvd_mid_pt = []
dvd_shortest_distance = []
dvd_violation = []
threshold = 100                 # maximum allowed distance between bottom-centre points
violation_count = 0             # incremented once per violating track per frame
pbar = tqdm(total=total_f)
try:
    while True:
        result, img = vid.read()
        if not result or img is None:
            logging.warning("Empty Frame")
            time.sleep(0.1)
            count += 1
            if count < MAX_EMPTY_FRAMES:
                continue
            print("More than 3 empty frames found...")
            break
        # A good frame resets the counter so that only consecutive failures
        # (typically the end of the stream) stop the loop.
        count = 0

        # Else process the image found
        img_in = transform_image(img)

        # Predict the objects using yolo
        detections, converted_boxes = yolo_predict(img_in)

        # Call the tracker
        tracker.predict()
        tracker.update(detections)

        # Pairwise distances between the active tracks of this frame.
        df = check_dvd(tracker, threshold)
        for track in tracker.tracks:
            if not track.is_confirmed() or track.time_since_update > 1:
                continue
            bbox = track.to_tlbr()
            class_name = track.get_class()
            color = SAFE_COLOR
            if check_violation(df, track.track_id):
                violation_count += 1
                color = VIOLATION_COLOR

            # Bounding box, filled label background and label text.
            cv2.rectangle(img, (int(bbox[0]), int(bbox[1])), (int(bbox[2]), int(bbox[3])), color, 2)
            cv2.rectangle(img, (int(bbox[0]), int(bbox[1]-30)),
                          (int(bbox[0])+(len(class_name)+len(str(track.track_id)))*17,
                           int(bbox[1])), color, -1)
            cv2.putText(img, class_name + "-" + str(track.track_id),
                        (int(bbox[0]), int(bbox[1]-10)),0, 0.75, (255,255,255),2)

        out.write(img)
        frame_index = frame_index + 1
        pbar.update(1)
        # press q to quit
        if cv2.waitKey(1) == ord('q'):
            break
finally:
    # Always finalise the progress bar, even when a frame raises.
    pbar.close()
    



More than 3 empty frames found...
CPU times: user 38min 31s, sys: 5min 3s, total: 43min 34s
Wall time: 3min 11s


In [26]:
# Release the input video capture.
vid.release()

In [27]:
# Release the output writer and close any OpenCV windows.
out.release()
cv2.destroyAllWindows()

In [28]:
# Total counted by the main loop: one increment per violating track per frame.
print(violation_count)

1762


In [29]:
def convert_avi_to_mp4(avi_file_path, output_name):
    """Transcode a video to H.264/AAC MP4 with ffmpeg.

    Args:
        avi_file_path: path of the input video.
        output_name: destination path. ".mp4" is appended unless the name
            already ends with it, so both "out" and "out.mp4" produce
            "out.mp4".

    Returns:
        True when ffmpeg exits with status 0; False when ffmpeg cannot be
        started or reports a failure.

    Notes:
        The command is passed as an argument list (no shell), so paths with
        spaces or quotes are safe. The call waits for ffmpeg to finish.
        An existing output file is overwritten (-y).
    """
    import subprocess

    output_name = str(output_name)
    if output_name.lower().endswith('.mp4'):
        output_path = output_name
    else:
        output_path = output_name + '.mp4'

    command = [
        'ffmpeg', '-y',
        '-i', str(avi_file_path),
        '-ac', '2',
        '-b:v', '2000k',
        '-c:a', 'aac',
        '-c:v', 'libx264',
        '-b:a', '160k',
        '-vprofile', 'high',
        '-bf', '0',
        '-strict', 'experimental',
        '-f', 'mp4',
        output_path,
    ]
    try:
        completed = subprocess.run(
            command,
            stdin=subprocess.DEVNULL,   # never block on an interactive prompt
            stdout=subprocess.DEVNULL,
            stderr=subprocess.PIPE,
        )
    except OSError as err:
        # Typically: ffmpeg is not installed or not on PATH.
        print("**ERR: Unable to run ffmpeg: {}".format(err))
        return False

    if completed.returncode != 0:
        print("**ERR: ffmpeg failed with exit code {}".format(completed.returncode))
        # Show the end of ffmpeg's log, where the actual error is reported.
        print(completed.stderr.decode(errors='replace')[-500:])
        return False
    return True

In [30]:
# Convert the annotated AVI written by the main loop into an MP4 file.
convert_avi_to_mp4(output_video_file, output_mp4video_file)

True