In [10]:
!python social_distancing_analyser.py

The processing is over, you may open Output.mp4


OpenCV: FFMPEG: tag 0x47504a4d/'MJPG' is not supported with codec id 7 and format 'mp4 / MP4 (MPEG-4 Part 14)'
OpenCV: FFMPEG: fallback to use tag 0x7634706d/'mp4v'


In [None]:
#Importing the required libraries
import time
import cv2
import numpy as np    

In [None]:
# Input video that the processing loop reads frame by frame
vid_path = "Test1.mp4"

In [None]:
# Detection thresholds, both consumed by the processing loop:
#   confid - minimum class score for a detection to be kept; also passed to
#            cv2.dnn.NMSBoxes as its score threshold
#   thresh - passed to cv2.dnn.NMSBoxes as its NMS (overlap) threshold
confid = 0.5
thresh = 0.5

# Load the COCO class labels our YOLO model was trained on.
# The context manager closes the file as soon as the labels are read.
labelsPath = "./coco.names"
with open(labelsPath) as labels_file:
    LABELS = labels_file.read().strip().split("\n")

np.random.seed(42)

# Derive the paths to the YOLO weights and model configuration
weightsPath = "./yolov3.weights"
configPath = "./yolov3.cfg"

# Load our YOLO object detector trained on COCO dataset (80 classes)
net = cv2.dnn.readNetFromDarknet(configPath, weightsPath)

# Determine only the *output* layer names that we need from YOLO.
# getUnconnectedOutLayers() yields 1-based layer ids; depending on the OpenCV
# build they arrive either as an Nx1 array or as a flat array, so flatten
# first instead of indexing each entry with [0].
layer_names = net.getLayerNames()
ln = [layer_names[layer_id - 1]
      for layer_id in np.array(net.getUnconnectedOutLayers()).flatten()]

In [None]:
def calibrated_dist(p1, p2):
    """Return a weighted distance between two points.

    The squared horizontal difference is taken as is, while the squared
    vertical difference is multiplied by ``550 / mean_y`` (``mean_y`` being
    the average of the two y coordinates) before the square root is taken.
    NOTE(review): the weighting presumably compensates for camera
    perspective, and 550 looks like a scene-specific calibration constant -
    confirm.

    Args:
        p1: indexable with x at index 0 and y at index 1.
        p2: same layout as ``p1``.

    Returns:
        The weighted distance as a float.

    Raises:
        ZeroDivisionError: when the two y coordinates sum to zero.
    """
    dx = p1[0] - p2[0]
    dy = p1[1] - p2[1]
    mean_y = (p1[1] + p2[1]) / 2
    vertical_weight = 550 / mean_y
    squared = dx ** 2 + vertical_weight * dy ** 2
    return squared ** 0.5


def isclose(p1, p2):
    """Grade the proximity of two points as 1, 2 or 0.

    The distance from ``calibrated_dist`` is compared against fractions of
    the mean y coordinate of the two points.

    Returns:
        1 when the distance is positive and below 15% of the mean y,
        2 when it is positive and below 20% of the mean y,
        0 otherwise (including a distance of exactly zero, e.g. a point
        compared with itself).
    """
    separation = calibrated_dist(p1, p2)
    if not separation > 0:
        return 0

    mean_y = (p1[1] + p2[1]) / 2
    if separation < 0.15 * mean_y:
        return 1
    if separation < 0.2 * mean_y:
        return 2
    return 0

In [None]:
# Frame geometry applied to every frame before detection.
FRAME_SIZE = (1280, 720)  # (width, height) handed to cv2.resize
CROP_LEFT = 200           # number of leftmost columns discarded after resizing

cap = cv2.VideoCapture(vid_path)  # Read the input video
writer = None
(W, H) = (None, None)

try:
    while True:
        (grabbed, frame) = cap.read()
        if not grabbed:
            break

        # Resize frame for prediction, then drop the left strip.
        frame = cv2.resize(frame, FRAME_SIZE)
        frame = frame[:, CROP_LEFT:]
        (H, W) = frame.shape[:2]

        # Detecting objects
        blob = cv2.dnn.blobFromImage(frame, 1 / 255.0, (416, 416),
                                     swapRB=True, crop=False)
        net.setInput(blob)
        layerOutputs = net.forward(ln)

        boxes = []
        confidences = []

        for output in layerOutputs:
            for detection in output:
                # Entries 0-3 are the box (centre x, centre y, width, height)
                # as fractions of the frame; entries from 5 on are class scores.
                scores = detection[5:]
                classID = np.argmax(scores)
                confidence = scores[classID]
                if LABELS[classID] != "person" or confidence <= confid:
                    continue

                # Scale the box to pixels and convert centre -> top-left corner.
                box = detection[0:4] * np.array([W, H, W, H])
                (centerX, centerY, width, height) = box.astype("int")
                x = int(centerX - (width / 2))
                y = int(centerY - (height / 2))

                boxes.append([x, y, int(width), int(height)])
                confidences.append(float(confidence))

        idxs = cv2.dnn.NMSBoxes(boxes, confidences, confid, thresh)

        if len(idxs) > 0:
            # np.array(...) accepts whichever container NMSBoxes returned.
            idf = np.array(idxs).flatten()

            # Centroid of every box that survived non-maximum suppression
            center = []
            for i in idf:
                (x, y, w, h) = boxes[i]
                center.append([int(x + w / 2), int(y + h / 2)])

            # status[k]: 1 = part of a grade-1 pair, 2 = only part of grade-2
            # pairs, 0 = neither. isclose() is symmetric and returns 0 for a
            # point compared with itself, so each unordered pair is checked once.
            status = [0] * len(center)
            for i in range(len(center)):
                for j in range(i + 1, len(center)):
                    g = isclose(center[i], center[j])
                    if g == 1:
                        status[i] = 1
                        status[j] = 1
                    elif g == 2:
                        if status[i] != 1:
                            status[i] = 2
                        if status[j] != 1:
                            status[j] = 2

            total_p = len(center)
            high_risk_p = status.count(1) + status.count(2)
            safe_p = status.count(0)

            # Dim the title banner once per frame; doing this inside the
            # per-box loop would darken it again for every detected person.
            sub_img = frame[10:80, 120:W - 120]
            black_rect = np.zeros(sub_img.shape, dtype=np.uint8)
            frame[10:80, 120:W - 120] = cv2.addWeighted(sub_img, 0.77,
                                                        black_rect, 0.23, 1.0)

            cv2.putText(frame, "Human Distance Detection (AIOTIZE TASK)", (200, 55),
                        cv2.FONT_HERSHEY_DUPLEX, 1, (255, 255, 255), 2)

            tot_str = "TOTAL COUNT: " + str(total_p)
            high_str = "MORE THAN THRESHOLD : " + str(high_risk_p)
            safe_str = "LESS THAN THRESHOLD : " + str(safe_p)

            # Draw some of the parameters
            cv2.putText(frame, tot_str, (10, H - 100),
                        cv2.FONT_HERSHEY_DUPLEX, 0.6, (255, 255, 255), 1)
            cv2.putText(frame, safe_str, (10, H - 75),
                        cv2.FONT_HERSHEY_DUPLEX, 0.6, (0, 255, 0), 1)
            cv2.putText(frame, high_str, (10, H - 50),
                        cv2.FONT_HERSHEY_DUPLEX, 0.6, (0, 0, 150), 1)

            # Boxes with status 1 get the (0, 0, 150) colour, all others green.
            for (i, box_status) in zip(idf, status):
                (x, y, w, h) = boxes[i]
                color = (0, 0, 150) if box_status == 1 else (0, 255, 0)
                cv2.rectangle(frame, (x, y), (x + w, y + h), color, 2)

        # Show every frame (not only those with detections) so the preview
        # keeps advancing and the 'q' key is always polled.
        cv2.imshow('Human Distance Detection', frame)
        if cv2.waitKey(25) & 0xFF == ord('q'):
            break

        # The writer is created lazily because the output size is only known
        # once the first frame has been cropped.
        if writer is None:
            # OpenCV's FFMPEG backend reports that the 'MJPG' tag is not
            # supported for the mp4 container and falls back to 'mp4v', so
            # request 'mp4v' directly.
            fourcc = cv2.VideoWriter_fourcc(*"mp4v")
            writer = cv2.VideoWriter("Output.mp4", fourcc, 30,
                                     (frame.shape[1], frame.shape[0]), True)

        writer.write(frame)
finally:
    # Release resources even when processing stops on an error; the writer
    # does not exist if no frame was ever read.
    cap.release()
    if writer is not None:
        writer.release()
    cv2.destroyAllWindows()

if writer is not None:
    print("The processing is over, you may open Output.mp4")
else:
    print("No frames could be read from " + str(vid_path) + ", no output was written")