# Physical Distancing Detector

Importing Libraries - Make sure opencv, tensorflow and scipy is installed

In [1]:
import os
import cv2 as cv
import numpy as np
from scipy.spatial.distance import cdist
import argparse
import itertools
import tensorflow as tf

## Person Detection

In [2]:
class Person_detection(object):
    
    def __init__(self, model_name, min_threshold=0.40, input_shape=(320,320)):
        
        #initialize threshold values, interpreters and tensors
        self.min_score_threshold = min_threshold
        self.model = os.path.join('models', model_name)
        self.interpreter = tf.lite.Interpreter(model_path=self.model)
        self.input_tensor = self.interpreter.get_input_details()
        self.output_tensor = self.interpreter.get_output_details()
        self.interpreter.allocate_tensors()

    def predict(self, frame):
        
        # return the predictions for each frame
        # prediction contains the bounding box coordinates, object classes and scores
        
        if self.input_tensor[0]['dtype'] == np.float32:
            dtype_model = tf.float32
        else:
            dtype_model = tf.uint8
        input_tensor = tf.convert_to_tensor(frame, dtype=dtype_model)
        input_tensor = input_tensor[tf.newaxis, ...]
        
        self.interpreter.set_tensor(self.input_tensor[0]['index'], input_tensor)
        self.interpreter.invoke()
        
        det_box = tf.convert_to_tensor(self.interpreter.get_tensor(self.output_tensor[0]['index']))
        det_class = tf.convert_to_tensor(self.interpreter.get_tensor(self.output_tensor[1]['index']))
        det_score = tf.convert_to_tensor(self.interpreter.get_tensor(self.output_tensor[2]['index']))
        
        # convert tensor object to numpy array
        det_class = tf.squeeze(det_class, axis=[0]).numpy().astype(np.int64) + 1
        det_box = tf.squeeze(det_box, axis=[0]).numpy()
        det_score = tf.squeeze(det_score, axis=[0]).numpy()
        
        return{
            'det_boxes': det_box,
            'det_classes': det_class,
            'det_scores': det_score
        }

Physical distancing detection function which uses euclidean distance between the centroids of each person.

In [3]:
def physical_distance_detection(prediction, dist_threshold, frame):
    
    detection = [False] * len(prediction['det_boxes'])
    centroids = []
    red_color = (0,0,255)
    
    # calculate centroid value of each bunding box / person
    for boxes in prediction['det_boxes']:
        centroids.append(((boxes[1] + boxes[3])/2, (boxes[0]+ boxes[2])/2))
        
    # calculate the euclidean distance between each centroid
    for ((x,x1),( y,y1)) in itertools.combinations(enumerate(centroids), 2):
        if detection[x] and detection[y]:
            continue

        if cdist([x1],[y1], 'euclidean')[0][0] < dist_threshold:
                detection[x] = True
                detection[y] = True
                frame = cv.arrowedLine(frame, (int(x1[0]),int(x1[1])), (int(y1[0]),int(y1[1])), red_color,6)

    return detection

## Post Processing

Clean Up code to remove unwanted detections - like low confidence scores, objects other than person and object with bounding box too large considering the frame and camera view

In [4]:
def cleanup(prediction, image_w, image_h):
    
    delete_ids = []

    for i in range(len(prediction['det_classes'])):
        
        #select only person object
        if prediction['det_classes'][i] != 1:
            delete_ids.append(i)
        
        # select only objects with scores greater than threshold
        if prediction['det_scores'][i] < 0.5:
            delete_ids.append(i)
        
        x_min, y_min = int(prediction['det_boxes'][i][1] * image_w), int(prediction['det_boxes'][i][0] * image_h)
        x_max, y_max = int(prediction['det_boxes'][i][3] * image_w), int(prediction['det_boxes'][i][2] * image_h)
        prediction['det_boxes'][i] =  [y_min, x_min, y_max, x_max]
        
        if (x_max - x_min > image_w/3) or (y_max - y_min > image_h /2):
            delete_ids.append(i)
            
    cleaned_list = list(dict.fromkeys(delete_ids))
    prediction['det_classes'] = np.delete(prediction['det_classes'], cleaned_list, axis=0)
    prediction['det_boxes'] = np.delete(prediction['det_boxes'], cleaned_list, axis=0)
    prediction['det_scores'] = np.delete(prediction['det_scores'], cleaned_list, axis=0)
    
    return prediction

Draw bounding box rectangles around detected persons using open cv functions

In [5]:
def draw_rect(image, box, image_w, image_h, detection=False):
    
    red_color = (0, 0 , 255)
    green_color = (0, 255, 0)
    
    y_min = int(max(1, box[0]))
    x_min = int(max(1, box[1]))
    y_max = int(min(image_h, box[2]))
    x_max = int(min(image_w, box[3]))
    
    # draw a rectangle on the image
    if detection:
        cv.rectangle(image, (x_min, y_min), (x_max, y_max), red_color, 2)
    else:
        cv.rectangle(image, (x_min, y_min), (x_max, y_max), green_color, 2)

## Main Function

Entry point
Can be executed as a script with 3 optional arguments
args [-d] - predefined threshold for physical distancing (in pixel, depending on the video or stream source and fov)
     [-i] - input video filename with path
     [-o] - output video filename

In [6]:
if __name__ == "__main__":
    
    parser = argparse.ArgumentParser()
    
    parser.add_argument('-d', type=int, required=False, dest='dist', default=150, \
                        help="physical distancing threshold distance in pixel")
    parser.add_argument('-i', type=str, required=False, dest='video', \
                        default='PDD_demo.avi', help="input video file name")
    parser.add_argument('-o', type=str, required=False, dest='op', \
                        default='PDD_ouput_demo.avi', help="output video file name")
    args, unknown = parser.parse_known_args()
    
    dist_threshold = args.dist
    input_video = args.video

    # load the input Video
    vid = cv.VideoCapture(input_video)
    
    # Calculate the height and width of the stream
    image_w = int(vid.get(cv.CAP_PROP_FRAME_WIDTH))
    image_h = int(vid.get(cv.CAP_PROP_FRAME_HEIGHT))
    
    # Storing the output to a avi video
    fourcc = cv.VideoWriter_fourcc(*'XVID')
    out = cv.VideoWriter(args.op, fourcc, 24, (image_w,image_h), True)
    
    # Load the detection model with threshold
    people_model = Person_detection('model.tflite',0.5)

    while(vid.isOpened()):
        
        ret_val, frame = vid.read()
        
        if frame is None or frame.size == 0:
            break
        else:
            # Resize the image to expected tensor shape for the loaded model
            prediction = people_model.predict(cv.resize(frame, (320,320)))
        
            # Cleanup non person and weak predictions
            person_prediction = cleanup(prediction, image_w, image_h)
        
            # Run the physical distance detector for each person
            # dist_threshold is the minimum distance between persons to consider breach
            detection = physical_distance_detection(person_prediction, dist_threshold, frame)

            # Draw the rectangle bounding boxes
            for i in range(len(person_prediction['det_boxes'])):
                draw_rect(frame, person_prediction['det_boxes'][i], image_w, image_h, detection[i])

            # Display the frame
            cv.imshow('PDD_DEMO', frame)
            out.write(frame)
            if cv.waitKey(1) == 27 or ret_val is False:
                break
        
    cv.destroyWindow('PDD_DEMO')
    if out is not None:
        out.release()