<a href="https://colab.research.google.com/github/iammuhammad41/Social-Distancing-in-COVID-19/blob/main/social_distancing_detector.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
import cv2
import numpy as np
import matplotlib.pyplot as plt

!pip install imutils
import imutils

from scipy.spatial import distance as dist
from collections import OrderedDict
import time

In [None]:
class CentroidTracker():
    def __init__(self, maxDisappeared=50):

        self.nextObjectID = 0
        self.objects = OrderedDict()
        self.disappeared = OrderedDict()


        self.maxDisappeared = maxDisappeared

    def register(self, centroid):

        self.objects[self.nextObjectID] = centroid
        self.disappeared[self.nextObjectID] = 0
        self.nextObjectID += 1

    def deregister(self, objectID):

        del self.objects[objectID]
        del self.disappeared[objectID]

    def update(self, rects):

        if len(rects) == 0:

            for objectID in list(self.disappeared.keys()):
                self.disappeared[objectID] += 1


                if self.disappeared[objectID] > self.maxDisappeared:
                    self.deregister(objectID)


            return self.objects

        inputCentroids = np.zeros((len(rects), 2), dtype="int")

        for (i, (startX, startY, endX, endY)) in enumerate(rects):
            cX = int(startX - (endX/2))
            cY = int(startY - (endY/2))
            inputCentroids[i] = (cX, cY)

        if len(self.objects) == 0:
            for i in range(0, len(inputCentroids)):
                self.register(inputCentroids[i])


        else:
            objectIDs = list(self.objects.keys())
            objectCentroids = list(self.objects.values())

            D = dist.cdist(np.array(objectCentroids), inputCentroids)


            rows = D.min(axis=1).argsort()


            cols = D.argmin(axis=1)[rows]


            usedRows = set()
            usedCols = set()


            for (row, col) in zip(rows, cols):

                if row in usedRows or col in usedCols:
                    continue


                objectID = objectIDs[row]
                self.objects[objectID] = inputCentroids[col]
                self.disappeared[objectID] = 0

                usedRows.add(row)
                usedCols.add(col)

            unusedRows = set(range(0, D.shape[0])).difference(usedRows)
            unusedCols = set(range(0, D.shape[1])).difference(usedCols)

            if D.shape[0] >= D.shape[1]:
                for row in unusedRows:

                    objectID = objectIDs[row]
                    self.disappeared[objectID] += 1


                    if self.disappeared[objectID] > self.maxDisappeared:
                        self.deregister(objectID)

            else:
                for col in unusedCols:
                    self.register(inputCentroids[col])

        return self.objects

In [None]:
class utills:


    def get_transformed_points(boxes, prespective_transform):

        rects = []
        bottom_points = []

        for box in boxes:
            pnts = np.array([[[int(box[0]+(box[2]*0.5)),int(box[1]+box[3])]]] , dtype="float32")
            bd_pnt = cv2.perspectiveTransform(pnts, prespective_transform)[0][0]
            pnt = [int(bd_pnt[0]), int(bd_pnt[1])]
            pnt_bird = [int(bd_pnt[0]), int(bd_pnt[1]), 0, 0]

            bottom_points.append(pnt)
            rects.append(np.array(pnt_bird))

        return bottom_points, rects

    def cal_dis(p1, p2, distance_w, distance_h):

        h = abs(p2[1]-p1[1])
        w = abs(p2[0]-p1[0])

        dis_w = float((w/distance_w)*180)
        dis_h = float((h/distance_h)*180)

        return int(np.sqrt(((dis_h)**2) + ((dis_w)**2)))


    def get_distances(boxes1, bottom_points, distance_w, distance_h):

        distance_mat = []
        bxs = []

        for i in range(len(bottom_points)):
            for j in range(len(bottom_points)):
                if i != j:
                    dist = utills.cal_dis(bottom_points[i], bottom_points[j], distance_w, distance_h)
                    if dist <= 150:
                        closeness = 0
                        distance_mat.append([bottom_points[i], bottom_points[j], closeness])
                        bxs.append([boxes1[i], boxes1[j], closeness])
                    # elif dist > 150 and dist <=180:
                    #     closeness = 1
                    #     distance_mat.append([bottom_points[i], bottom_points[j], closeness])
                    #     bxs.append([boxes1[i], boxes1[j], closeness])
                    else:
                        closeness = 2
                        distance_mat.append([bottom_points[i], bottom_points[j], closeness])
                        bxs.append([boxes1[i], boxes1[j], closeness])

        return distance_mat, bxs

    def get_scale(W, H):

        dis_w = 387
        dis_h = 580

        return float(dis_w/W),float(dis_h/H)

    def get_count(distances_mat):

        r = []
        g = []
        #y = []

        for i in range(len(distances_mat)):

            if distances_mat[i][2] == 0:
                if (distances_mat[i][0] not in r) and (distances_mat[i][0] not in g):
                    r.append(distances_mat[i][0])
                if (distances_mat[i][1] not in r) and (distances_mat[i][1] not in g):
                    r.append(distances_mat[i][1])

        # for i in range(len(distances_mat)):

        #     if distances_mat[i][2] == 1:
        #         if (distances_mat[i][0] not in r) and (distances_mat[i][0] not in g) and (distances_mat[i][0] not in y):
        #             y.append(distances_mat[i][0])
        #         if (distances_mat[i][1] not in r) and (distances_mat[i][1] not in g) and (distances_mat[i][1] not in y):
        #             y.append(distances_mat[i][1])

        for i in range(len(distances_mat)):

            if distances_mat[i][2] == 2:
                if (distances_mat[i][0] not in r) and (distances_mat[i][0] not in g):
                    g.append(distances_mat[i][0])
                if (distances_mat[i][1] not in r) and (distances_mat[i][1] not in g):
                    g.append(distances_mat[i][1])

        #return (len(r),len(y),len(g))
        return (len(r), len(g))

In [None]:
class plot:

    def bird_eye_view(frame, distances_mat, bottom_points, scale_w, scale_h, risk_count, objects):
        h = frame.shape[0]
        w = frame.shape[1]

        red = (0, 0, 255)
        green = (0, 255, 0)
        #yellow = (0, 255, 255)
        white = (200, 200, 200)
        black = (0,0,0)



        blank_image = np.zeros((int(h * scale_h), int(w * scale_w), 3), np.uint8)
        blank_image[:] = white
        red_image = blank_image.copy()

        warped_pts = []
        r = []
        g = []

        for i in range(len(distances_mat)):
            if distances_mat[i][2] == 0:
                if (distances_mat[i][0] not in r) and (distances_mat[i][0] not in g):
                    r.append(distances_mat[i][0])
                if (distances_mat[i][1] not in r) and (distances_mat[i][1] not in g):
                    r.append(distances_mat[i][1])

                blank_image = cv2.line(blank_image, (int(distances_mat[i][0][0] * scale_w),
                                                     int(distances_mat[i][0][1] * scale_h)),
                                       (int(distances_mat[i][1][0] * scale_w),
                                        int(distances_mat[i][1][1]* scale_h)), red, 2)

        for i in range(len(distances_mat)):
            if distances_mat[i][2] == 2:
                if (distances_mat[i][0] not in r) and (distances_mat[i][0] not in g):
                    g.append(distances_mat[i][0])
                if (distances_mat[i][1] not in r) and (distances_mat[i][1] not in g):
                    g.append(distances_mat[i][1])

        for i in bottom_points:
            blank_image = cv2.circle(blank_image, (int(i[0]  * scale_w), int(i[1] * scale_h)), 5, green, 10)
        for i in r:
            blank_image = cv2.circle(blank_image, (int(i[0]  * scale_w), int(i[1] * scale_h)), 5, red, 10)
            red_image = cv2.circle(red_image, (int(i[0]  * scale_w), int(i[1] * scale_h)), 10, red, 10)


        for (objectID, centroid) in objects.items():
            text = "ID {}".format(objectID)
            cv2.putText(blank_image, text, (int(centroid[0] * scale_w) - 10, int(centroid[1] * scale_h) - 15),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 0, 255), 2)


        return blank_image, red_image


    def social_distancing_view(frame, distances_mat, boxes, risk_count, bird_view):

        red = (0, 0, 255)
        green = (0, 255, 0)
        #yellow = (0, 255, 255)

        for i in range(len(boxes)):
            x,y,w,h = boxes[i][:]
            frame = cv2.rectangle(frame,(x,y),(x+w,y+h),green,2)

        for i in range(len(distances_mat)):
            per1 = distances_mat[i][0]
            per2 = distances_mat[i][1]
            closeness = distances_mat[i][2]

            if closeness == 0:
                x,y,w,h = per1[:]
                frame = cv2.rectangle(frame,(x,y),(x+w,y+h),red,2)

                x1,y1,w1,h1 = per2[:]
                frame = cv2.rectangle(frame,(x1,y1),(x1+w1,y1+h1),red,2)

                frame = cv2.line(frame, (int(x+w/2), int(y+h/2)), (int(x1+w1/2), int(y1+h1/2)),red, 2)


            pad = np.full((100,frame.shape[1],3), [250, 250, 250], dtype=np.uint8)

            cv2.putText(pad, "Jumlah Orang Terdeteksi : " + str(risk_count[0] + risk_count[1]) + " Orang", (100, 40),cv2.FONT_HERSHEY_SIMPLEX, 0.5, (100, 100, 0), 2)
            cv2.putText(pad, "Jumlah Pelanggaran Social Distancing : " + str(risk_count[0]) + " Orang", (100, 60), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 0, 255), 1)

        frame = np.vstack((frame,pad))
        frame = np.hstack((frame, bird_view))

        return frame

In [None]:
def calculate_social_distancing(vid_path, net, ln1, points):


    count = 0
    vs = cv2.VideoCapture(vid_path)

    height = int(vs.get(cv2.CAP_PROP_FRAME_HEIGHT))
    width = int(vs.get(cv2.CAP_PROP_FRAME_WIDTH))
    fps = int(vs.get(cv2.CAP_PROP_FPS))

    scale_w, scale_h = utills.get_scale(width, height)

    fourcc = cv2.VideoWriter_fourcc(*"MJPG")
    output_movie = cv2.VideoWriter("perspective_view.avi", fourcc, fps, (1464, 720), True)

    #points = []


    RED_IMAGE = []

    global image

    while True:

        (grabbed, frame) = vs.read()

        if not grabbed:
            print("[INFO] Processing done...")
            break

        (H, W) = frame.shape[:2]


        src = np.float32(np.array(points[:4]))
        dst = np.float32([[0, H], [W, H], [W, 0], [0, 0]])
        prespective_transform = cv2.getPerspectiveTransform(src, dst)

        inv_trans = np.linalg.pinv(prespective_transform)

        pts = np.float32(np.array([points[4:7]]))
        warped_pt = cv2.perspectiveTransform(pts, prespective_transform)[0]

        distance_w = np.sqrt((warped_pt[0][0] - warped_pt[1][0]) ** 2 + (warped_pt[0][1] - warped_pt[1][1]) ** 2)
        distance_h = np.sqrt((warped_pt[0][0] - warped_pt[2][0]) ** 2 + (warped_pt[0][1] - warped_pt[2][1]) ** 2)


        pnts = np.array(points[:4], np.int32)
        cv2.polylines(frame, [pnts], True, (70, 70, 70), thickness=2)

        blob = cv2.dnn.blobFromImage(frame, 1 / 255.0, (416, 416), swapRB=True, crop=False)
        net.setInput(blob)
        start = time.time()
        layerOutputs = net.forward(ln1)
        end = time.time()

        boxes = []
        confidences = []
        classIDs = []
        rects = []

        for output in layerOutputs:
            for detection in output:
                scores = detection[5:]
                classID = np.argmax(scores)
                confidence = scores[classID]

                if classID == 0:

                    if confidence > confid:

                        box = detection[0:4] * np.array([W, H, W, H])
                        (centerX, centerY, width, height) = box.astype("int")

                        x = int(centerX - (width / 2))
                        y = int(centerY - (height / 2))

                        boxes.append([x, y, int(width), int(height)])
                        confidences.append(float(confidence))
                        classIDs.append(classID)

        idxs = cv2.dnn.NMSBoxes(boxes, confidences, confid, thresh)
        font = cv2.FONT_HERSHEY_PLAIN
        boxes1 = []

        for i in range(len(boxes)):
            if i in idxs:
                boxes1.append(boxes[i])
                x,y,w,h = boxes[i]

        if len(boxes1) == 0:
            count = count + 1
            continue


        person_points, rects = utills.get_transformed_points(boxes1, prespective_transform)

        objects = ct.update(rects)


        frame1 = np.copy(frame)

        distances_mat, bxs_mat = utills.get_distances(boxes1, person_points, distance_w, distance_h)
        risk_count = utills.get_count(distances_mat)

        print("Jumlah R :", len(RED_IMAGE))
        bird_image, red_image = plot.bird_eye_view(frame, distances_mat, person_points, scale_w, scale_h,
                                        risk_count, objects)
        RED_IMAGE.append(red_image)

        for red_image in RED_IMAGE:
            bird_image = cv2.addWeighted(red_image, 0.4, bird_image, 1 - 0.4, 0)

        img = plot.social_distancing_view(frame1, bxs_mat, boxes1, risk_count, bird_image)


        img = imutils.resize(img, height=720)

        if count != 0:
            output_movie.write(img)
            #bird_movie.write(bird_image)

            #cv2.imshow('Bird Eye View', bird_image)
            #cv2.imwrite("frame%d.jpg" % count, img)
            cv2.imwrite("Bird%d.jpg" % count, bird_image)

        count = count + 1

In [2]:
ct = CentroidTracker()

confid = 0.5
thresh = 0.5


# pts = [(27.29, 559.83), (1408.92, 815.57), (1957.83, 30.41), (1182.81, 30.41),
#        (909.91, 642.47), (1057.27, 674.44), (1013.61, 559.83)]

pts = [(0, 236.2), (615.86, 346.92), (853.5, 7), (500.25, 7),
       (382.11, 271.98), (445.91, 285.82), (427, 236.2)]

weightsPath = "../input/yolo-coco-data/yolov3.weights"
configPath = "../input/yolo-coco-data/yolov3.cfg"

net_yl = cv2.dnn.readNetFromDarknet(configPath, weightsPath)
ln = net_yl.getLayerNames()
ln1 = [ln[i[0] - 1] for i in net_yl.getUnconnectedOutLayers()]

net_yl.setPreferableBackend(cv2.dnn.DNN_BACKEND_OPENCV)
net_yl.setPreferableTarget(cv2.dnn.DNN_TARGET_OPENCL_FP16)

video_path = "../input/social-distancing/Pedestrian 480p.mp4"

t1 = cv2.getTickCount()

calculate_social_distancing(video_path, net_yl, ln1=ln1, points=pts)

t2 = cv2.getTickCount()

t3 = (t2 - t1)/ cv2.getTickFrequency()

print("Waktu yang dibutuhkan :", t3)

In [None]:
R = []
if len(R) == 0:
    print("aaaa")