In [None]:
import cv2
import tensorflow as tf
import tensorflow_hub as hub
from PIL import Image
import numpy as np
from scipy.spatial import distance
# Report how many GPUs TensorFlow can see before loading the model.
gpus = tf.config.experimental.list_physical_devices('GPU')
print("Num GPUs Available: ", len(gpus))

Num GPUs Available:  1


In [None]:
#load video
def load_video(video_path, output_path=r'Kadıköy-Bahariye-Caddesi-15-01-2000-2005_out.avi'):
    """Open the input video and create an XVID writer with matching geometry.

    Args:
        video_path: Path of the video to read.
        output_path: Path of the annotated video to write. Defaults to the
            file name that was previously hard-coded here.

    Returns:
        (vid, out): the opened ``cv2.VideoCapture`` and a ``cv2.VideoWriter``
        configured with the input's frame width, height and frame rate.
        The caller owns both objects and should ``release()`` them when done.

    Raises:
        IOError: if the input video cannot be opened or the writer cannot be
            created.
    """
    vid = cv2.VideoCapture(video_path)
    if not vid.isOpened():
        # Fail here instead of creating a 0x0 writer and crashing later with
        # an unrelated message.
        vid.release()
        raise IOError(f"Error opening video! Could not open: {video_path}")

    width = int(vid.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(vid.get(cv2.CAP_PROP_FRAME_HEIGHT))
    # Keep the fractional frame rate (e.g. 29.97); truncating it to an int
    # changes the playback duration of the output.
    fps = vid.get(cv2.CAP_PROP_FPS)

    codec = cv2.VideoWriter_fourcc(*'XVID')
    out = cv2.VideoWriter(output_path, codec, fps, (width, height))
    if not out.isOpened():
        vid.release()
        out.release()
        raise IOError(f"Error opening video writer! Could not create: {output_path}")
    return vid, out

In [None]:
#get distance of each pair of boxes
def get_distance(midpos, boxes_num):
    """Return the pairwise Euclidean distances between box centre points.

    Args:
        midpos: Array-like with at least ``boxes_num`` rows, one point per row
            (the caller passes (x, y) box centres).
        boxes_num: Number of leading rows of ``midpos`` to use.

    Returns:
        A ``(boxes_num, boxes_num)`` float array where entry ``[i][j]`` is the
        distance between point ``i`` and point ``j``; the diagonal is 0.

    Raises:
        IndexError: if ``midpos`` has fewer than ``boxes_num`` rows.
    """
    if boxes_num == 0:
        return np.zeros((0, 0))
    points = np.asarray(midpos, dtype=float)[:boxes_num]
    if len(points) < boxes_num:
        raise IndexError("midpos has fewer rows than boxes_num")
    # One vectorised call replaces the per-pair Python loop; the per-pair
    # debug print was removed because it flooded the output on every frame.
    return distance.cdist(points, points, metric='euclidean')

In [None]:
#process frames one by one
def process_frames(vid, detector, out, score_threshold=0.30, person_class=1):
    """Run person detection on every frame and write an annotated video.

    Each frame is read from ``vid``, passed through ``detector``, annotated
    with one box per detected person, and written to ``out``. Pairs of people
    judged too close to each other get their boxes redrawn in a second colour.
    Every frame that is read is written, including frames with no detections,
    so the output keeps the input's length.

    Args:
        vid: Opened ``cv2.VideoCapture`` to read frames from.
        detector: Callable taking a uint8 batch of shape (1, 512, 512, 3) and
            returning a dict with 'detection_classes', 'detection_boxes' and
            'detection_scores'.
        out: ``cv2.VideoWriter`` receiving the annotated BGR frames.
        score_threshold: Minimum detection score for a person to be kept.
        person_class: Class id treated as "person".

    Raises:
        ValueError: if not a single frame could be read from ``vid``.
    """
    # Colours are BGR because drawing happens on the BGR frame that is written.
    bbox_color = (255, 0, 0)   # regular person box
    close_color = (0, 0, 255)  # box of a person too close to another one

    frame_num = 0
    while True:
        return_value, frame = vid.read()
        if not return_value:
            if frame_num > 0:
                print("Video processing complete")
                break
            raise ValueError("No image! Try with another video format")

        # The detector gets an RGB copy; `frame` itself stays BGR so that the
        # written video does not end up with swapped red/blue channels.
        rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        image_data = cv2.resize(rgb_frame, (512, 512))
        image_data = image_data[np.newaxis, ...].astype(np.ubyte)
        batch_data = tf.constant(image_data)

        #inference
        detections = detector(batch_data)

        classes = tf.reshape(detections['detection_classes'], [-1]).numpy()
        scores = tf.reshape(detections['detection_scores'], [-1]).numpy()
        boxes = tf.reshape(detections['detection_boxes'], [-1, 4]).numpy()

        #get person with score exceeding the threshold
        keep = (classes == person_class) & (scores > score_threshold)
        person_boxes = boxes[keep].astype(float)
        person_num = len(person_boxes)

        if person_num > 0:
            image_h, image_w, _ = frame.shape
            # Boxes are handled as [y1, x1, y2, x2] and scaled by the frame
            # size, i.e. they are treated as normalized coordinates
            # (NOTE(review): confirm against the model's documentation).
            person_boxes = person_boxes * np.array(
                [image_h, image_w, image_h, image_w], dtype=float)

            # Thickness must be at least 1, also for frames under 100 px.
            bbox_line = max(1, int(min(image_h, image_w) / 100))

            # cv2.rectangle needs integer pixel coordinates as (x, y) tuples.
            corners = [((int(x1), int(y1)), (int(x2), int(y2)))
                       for y1, x1, y2, x2 in person_boxes]

            #draw bounding boxes
            for c1, c2 in corners:
                cv2.rectangle(frame, c1, c2, bbox_color, bbox_line)

            #get middle points of box as (x, y)
            midpos = np.column_stack((
                (person_boxes[:, 1] + person_boxes[:, 3]) / 2,
                (person_boxes[:, 0] + person_boxes[:, 2]) / 2,
            ))
            dist = get_distance(midpos, person_num)

            widths = person_boxes[:, 3] - person_boxes[:, 1]
            bottoms = person_boxes[:, 2]
            for i in range(person_num):
                for j in range(i + 1, person_num):
                    # Flag a pair when the bottom edges are nearly level
                    # (within a fifth of the narrower box) and the centres are
                    # closer than the two box widths combined.
                    level = abs(bottoms[i] - bottoms[j]) < min(widths[i], widths[j]) / 5
                    near = dist[i][j] < widths[i] + widths[j]
                    if level and near:
                        cv2.rectangle(frame, corners[i][0], corners[i][1], close_color, bbox_line)
                        cv2.rectangle(frame, corners[j][0], corners[j][1], close_color, bbox_line)

        # Write every frame (also those without detections) so no frame is
        # dropped and frame_num counts every frame that was read.
        out.write(frame)
        frame_num = frame_num + 1

In [None]:
#load model
detector = hub.load("https://tfhub.dev/tensorflow/efficientdet/d2/1")
#load video
vid, out = load_video(r'/content/drive/MyDrive/Social Distancing Video/Kadıköy-Bahariye-Caddesi-15-01-2000-2005.avi')
try:
    process_frames(vid,detector,out)
finally:
    # Release both handles even if processing fails: the writer has to be
    # closed for the output file to be finalized, and in a notebook these
    # objects otherwise stay alive in the kernel namespace.
    vid.release()
    out.release()