In [1]:
# Ultralytics YOLO 🚀, GPL-3.0 license
import torch
from numpy import random
import math
import cv2
from deep_sort_pytorch.utils.parser import get_config
from deep_sort_pytorch.deep_sort import DeepSort
from collections import deque
import numpy as np
# Multipliers used to derive a stable colour for class labels that have no
# fixed colour assigned (see compute_color_for_labels).
palette = tuple((1 << bits) - 1 for bits in (11, 15, 20))


# Counting lines in pixel coordinates, each stored as [x1, y1, x2, y2].
limitsDown = [612, 1056, 1350, 1000]  # line used for the "down" count
limitsUp = [1440, 928, 1850, 876]  # line used for the "up" count



  from .autonotebook import tqdm as notebook_tqdm


In [2]:

# Dictionaries reserved for per-class tallies; the code visible in this
# notebook never writes to them.
object_counter = dict()
object_counter1 = dict()

# Track IDs already seen inside the "up" / "down" counting bands. The length
# of each list is the number shown on screen.
totalCountUp = list()
totalCountDown = list()

# Shared DeepSort tracker instance; assigned by init_tracker().
deepsort = None

def init_tracker():
    """Build the DeepSort tracker and store it in the module-level ``deepsort``.

    Settings are read from ``deep_sort_pytorch/configs/deep_sort.yaml``.
    The function returns nothing; callers use the ``deepsort`` global.
    """
    global deepsort

    config = get_config()
    config.merge_from_file("deep_sort_pytorch/configs/deep_sort.yaml")
    params = config.DEEPSORT

    deepsort = DeepSort(
        params.REID_CKPT,
        max_dist=params.MAX_DIST,
        min_confidence=params.MIN_CONFIDENCE,
        nms_max_overlap=params.NMS_MAX_OVERLAP,
        max_iou_distance=params.MAX_IOU_DISTANCE,
        max_age=params.MAX_AGE,
        n_init=params.N_INIT,
        nn_budget=params.NN_BUDGET,
        use_cuda=True,
    )
##########################################################################################

# Per-track state, keyed by tracker ID.
speed_line_queue = dict()  # id -> list of speed estimates collected for that track
data_deque = dict()  # id -> deque of recent bounding-box centres, newest first


# calculate the speed of the object
def estimate_speed(location1, location2, ppm=8.8, fps=12.5):
    """Estimate speed from the displacement of a point between two consecutive frames.

    Args:
        location1: ``(x, y)`` pixel position in the earlier frame.
        location2: ``(x, y)`` pixel position in the later frame.
        ppm: Pixels per metre of the camera view. The default (8.8) is the
            calibration that used to be hard-coded here.
        fps: Frame rate of the video the positions come from. The default
            (12.5) is the value that used to be hard-coded here.

    Returns:
        The speed in km/h, truncated to an ``int``.

    Raises:
        ValueError: If ``ppm`` is not positive.
    """
    if ppm <= 0:
        raise ValueError("ppm must be positive")

    # Euclidean distance travelled between the two frames, in pixels.
    d_pixels = math.hypot(location2[0] - location1[0], location2[1] - location1[1])
    d_meters = d_pixels / ppm
    # metres per frame * frames per second = m/s; * 3.6 converts to km/h.
    speed = d_meters * fps * 3.6
    return int(speed)

# convert corner coordinates (x1, y1, x2, y2) to center-based (x_c, y_c, w, h)
def xyxy_to_xywh(*xyxy):
    """Convert two opposite box corners into centre/size form.

    The four positional arguments are ``x1, y1, x2, y2``; each must expose an
    ``.item()`` method returning a plain number. The corners may be given in
    either order.

    Returns:
        ``(x_center, y_center, width, height)``.
    """
    xa, ya, xb, yb = (xyxy[k].item() for k in range(4))

    width = abs(xa - xb)
    height = abs(ya - yb)

    # Centre = top-left corner shifted by half the extent.
    center_x = min(xa, xb) + width / 2
    center_y = min(ya, yb) + height / 2
    return center_x, center_y, width, height


def compute_color_for_labels(label):
    """Return the drawing colour for a class label as a 3-tuple.

    A few class ids have a fixed colour; every other label gets a colour
    derived deterministically from the module-level ``palette``.
    """
    fixed_colors = (
        (0, (85, 45, 255)),    # person
        (2, (222, 82, 175)),   # car
        (3, (0, 204, 255)),    # motorbike
        (5, (0, 149, 255)),    # bus
    )
    for class_id, preset in fixed_colors:
        if label == class_id:
            return tuple(preset)

    # No fixed colour: scale each palette entry by a label-dependent factor.
    factor = label ** 2 - label + 1
    return tuple([int((p * factor) % 255) for p in palette])


# draw the centre dot and label of one object and count it when it is inside a counting line's band
def UI_box(x, img, color=None, label=None, id=None, line_thickness=None):
    """Draw one object's centre dot and label on ``img`` and update the line counts.

    Args:
        x: Box corners as an indexable ``(x1, y1, x2, y2)``.
        img: Image array that is drawn on.
        color: Background colour of the label; a random colour is used when
            omitted.
        label: Text drawn above the box's top-left corner. When it is empty
            or ``None`` only the centre dot is drawn and no counting is done.
        id: Track identifier used to count each object at most once per line.
            ``None`` (the default) disables counting. The default used to be
            the builtin ``id`` function, which would have been recorded as if
            it were a track.
        line_thickness: Base thickness for text; derived from the image size
            when omitted.

    Returns:
        None. ``img``, ``totalCountUp`` and ``totalCountDown`` are modified.
    """
    tl = line_thickness or round(0.002 * (img.shape[0] + img.shape[1]) / 2) + 1  # line/font thickness
    color = color or [random.randint(0, 255) for _ in range(3)]

    top_left = (int(x[0]), int(x[1]))
    w, h = int(x[2] - x[0]), int(x[3] - x[1])
    cx, cy = int(x[0] + w / 2), int(x[1] + h / 2)

    # Only the centre is marked; the box outline itself is not drawn.
    cv2.circle(img, (cx, cy), 2, (0, 0, 255), cv2.FILLED)

    if not label:
        return

    tf = max(tl - 1, 1)  # font thickness
    t_size = cv2.getTextSize(label, 0, fontScale=tl / 3, thickness=tf)[0]

    # Filled rectangle behind the text so the label stays readable.
    img = cv2.rectangle(
        img,
        (top_left[0], top_left[1] - t_size[1] - 3),
        (top_left[0] + t_size[0], top_left[1] - 2),
        color,
        -1,
        cv2.LINE_AA,
    )
    cv2.putText(img, label, (top_left[0], top_left[1] - 2), 0, tl / 3, [225, 255, 255],
                thickness=tf, lineType=cv2.LINE_AA)

    if id is None:
        return

    _register_crossing(img, cx, cy, id, limitsUp, totalCountUp)
    _register_crossing(img, cx, cy, id, limitsDown, totalCountDown)


def _register_crossing(img, cx, cy, track_id, limits, seen_ids):
    """Record ``track_id`` once when ``(cx, cy)`` is inside the band around ``limits``.

    The band spans ``x1..x2`` horizontally and ``y1 - 50 .. y2 + 50``
    vertically (all bounds exclusive). On the first hit the line is redrawn
    in a highlight colour.
    """
    x1, y1, x2, y2 = limits
    inside_band = x1 < cx < x2 and y1 - 50 < cy < y2 + 50
    if inside_band and track_id not in seen_ids:
        seen_ids.append(track_id)
        cv2.line(img, (x1, y1), (x2, y2), (255, 255, 0), 5)

        


# draw the counting lines, per-object labels with speed, motion trails and the running counts
def draw_boxes(img, bbox, names, object_id, identities=None, offset=(0, 0)):
    """Annotate a frame with counting lines, labelled tracks, trails and counts.

    Args:
        img: Frame to draw on (height, width, channels).
        bbox: Iterable of boxes, each ``(x1, y1, x2, y2)``.
        names: Lookup from class id to class name.
        object_id: Class id of each box, parallel to ``bbox``.
        identities: Track id of each box, parallel to ``bbox``. When ``None``
            every box is treated as track ``0``.
        offset: ``(dx, dy)`` added to every box before drawing.

    Returns:
        The annotated ``img``.

    Side effects:
        Updates the module-level ``data_deque`` and ``speed_line_queue`` and,
        through ``UI_box``, the ``totalCountUp`` / ``totalCountDown`` lists.
    """
    # Static overlay: both counting lines with their captions.
    cv2.line(img, (limitsUp[0], limitsUp[1]), (limitsUp[2], limitsUp[3]), (0, 255, 0), 5)
    cv2.putText(img, "up", (limitsUp[0], limitsUp[1] - 15), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)
    cv2.line(img, (limitsDown[0], limitsDown[1]), (limitsDown[2], limitsDown[3]), (0, 0, 255), 5)
    cv2.putText(img, "Down", (limitsDown[0], limitsDown[1] - 15), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 2)

    width = img.shape[1]

    # Drop the state of tracks that are not present in this frame. Without
    # identities every box maps to track 0, so that is the only live track.
    active_ids = {0} if identities is None else {int(v) for v in identities}
    for stale_id in [key for key in data_deque if key not in active_ids]:
        del data_deque[stale_id]
        # Keep the speed history in step with the trail so it cannot grow forever.
        speed_line_queue.pop(stale_id, None)

    for idx, box in enumerate(bbox):
        x1, y1, x2, y2 = (int(v) for v in box)
        x1 += offset[0]
        x2 += offset[0]
        y1 += offset[1]
        y2 += offset[1]

        # bounding box center
        center = int(x1 + ((x2 - x1) / 2)), int(y1 + (y2 - y1) / 2)

        track_id = int(identities[idx]) if identities is not None else 0

        # create new buffers for a track seen for the first time
        if track_id not in data_deque:
            data_deque[track_id] = deque(maxlen=64)
            speed_line_queue[track_id] = []
        trail = data_deque[track_id]
        speeds = speed_line_queue.setdefault(track_id, [])

        class_id = object_id[idx]
        color = compute_color_for_labels(class_id)
        label = f"{track_id:d}:{names[class_id]}"

        # Newest centre goes first; with two or more points a speed sample
        # can be taken from the two most recent ones.
        trail.appendleft(center)
        if len(trail) >= 2:
            speeds.append(estimate_speed(trail[1], trail[0]))

        # Show the running average speed once at least one sample exists.
        if speeds:
            label = label + " " + str(sum(speeds) // len(speeds)) + "km/h"

        # Pass the offset-adjusted corners so the label matches the trail.
        UI_box((x1, y1, x2, y2), img, label=label, color=color, id=track_id, line_thickness=2)

        # draw trail
        for step in range(1, len(trail)):
            if trail[step - 1] is None or trail[step] is None:
                continue
            # older segments are drawn thinner
            thickness = int(np.sqrt(64 / float(step + step)) * 1.5)
            cv2.line(img, trail[step - 1], trail[step], color, thickness)

    # Counters are drawn once, after all boxes, so they stay on top and show
    # the counts including this frame's crossings.
    cv2.circle(img, (150, 100), 50, (0, 0, 255), -1)
    cv2.circle(img, (width - 150, 100), 50, (0, 255, 0), -1)
    cv2.putText(img, str(len(totalCountDown)), (130, 120), cv2.FONT_HERSHEY_SIMPLEX, 2, (0, 0, 0), 5)
    cv2.putText(img, str(len(totalCountUp)), (width - 170, 120), cv2.FONT_HERSHEY_SIMPLEX, 2, (0, 0, 0), 5)

    cv2.putText(img, "Total Down count: ", (20, 30), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 3)
    cv2.putText(img, "Total Up count: ", (width - 280, 30), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 3)

    return img

In [None]:
import yolov7
import numpy as np
import supervision as sv
import cv2
init_tracker()

# load yolov7 model
# pip install yolov7detect
model = yolov7.load("yolov7x.pt", trace=False)  # optionally pass device="cuda:0"
print("model loaded")
print("Names: ", model.names)

# Class ids kept for tracking: car, motorcycle, bus and (presumably, per the
# COCO label map -- check the printed model.names) truck.
VEHICLE_CLASS_IDS = [2, 3, 5, 7]

colors = sv.ColorPalette.default()
# initiate polygon zone
polygons = [
    np.array([[480, 472], [600, 996], [1300, 924], [712, 448]]),
    np.array([[820, 464], [1364, 864], [1800, 820], [1000, 448]]),
]

# extract video frame
# istanbul live stream surveillance camera url
cap = cv2.VideoCapture("https://601a55c88ec6d.streamlock.net/hls/185.stream/chunklist.m3u8")
w = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
h = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
fps = cap.get(cv2.CAP_PROP_FPS)
print("fps: ", fps)

zones = [
    sv.PolygonZone(polygon=polygon, frame_resolution_wh=(w, h))
    for polygon in polygons
]
zone_annotators = [
    sv.PolygonZoneAnnotator(
        zone=zone,
        color=colors.by_idx(index),
        thickness=4,
        text_thickness=8,
        text_scale=4,
    )
    for index, zone in enumerate(zones)
]
box_annotators = [
    sv.BoxAnnotator(
        color=colors.by_idx(index),
        thickness=4,
        text_thickness=4,
        text_scale=2,
    )
    for index in range(len(polygons))
]

cv2.namedWindow("test", cv2.WINDOW_NORMAL)
cv2.resizeWindow("test", 1200, 650)

# video write
#fourcc = cv2.VideoWriter_fourcc(*'XVID')
#out = cv2.VideoWriter('output.avi',fourcc, fps, (w,h))

try:
    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            # stream ended or a frame could not be read
            break

        # detect
        results = model(frame, size=640)
        detections = sv.Detections.from_yolov5(results)
        # Keep every detection whose class is one of the vehicle classes.
        # (Chaining filtered Detections with `or` would keep only the first
        # non-empty class and silently drop the others.)
        detections = detections[np.isin(detections.class_id, VEHICLE_CLASS_IDS)]

        # Convert detections into the centre/size boxes DeepSort expects.
        xywh_bboxs = []
        confs = []
        oids = []
        outputs = []
        for xyxy, conf, cls in zip(detections.xyxy, detections.confidence, detections.class_id):
            x_c, y_c, bbox_w, bbox_h = xyxy_to_xywh(*xyxy)
            xywh_bboxs.append([x_c, y_c, bbox_w, bbox_h])
            confs.append([conf.item()])
            oids.append(int(cls))
        xywhs = torch.Tensor(xywh_bboxs)
        confss = torch.Tensor(confs)

        # Only update the tracker when there is at least one detection.
        if xywhs.nelement() > 0:
            outputs = deepsort.update(xywhs, confss, oids, frame)
        if len(outputs) > 0:
            # Tracker rows: first four columns are the box, the last two are
            # the track id and the class id.
            bbox_xyxy = outputs[:, :4]
            identities = outputs[:, -2]
            object_id = outputs[:, -1]

            frame = draw_boxes(frame, bbox_xyxy, model.names, object_id, identities)

        for zone, zone_annotator, box_annotator in zip(zones, zone_annotators, box_annotators):
            # annotate only the detections that fall inside this zone
            mask = zone.trigger(detections=detections)
            detections_filtered = detections[mask]

            frame = box_annotator.annotate(scene=frame, detections=detections_filtered, skip_label=False)
            frame = zone_annotator.annotate(scene=frame)

        # write video
        #out.write(frame)
        cv2.imshow("test", frame)
        if cv2.waitKey(30) & 0xFF == ord('q'):
            break
finally:
    # Always free the capture and close the window, even if a frame fails.
    cap.release()
    cv2.destroyAllWindows()