In [8]:
import numpy as np
from scipy.spatial.distance import cdist

class Tracker:
    """Centroid-based multi-object tracker.

    Every tracked object is identified by an integer ID and represented by
    the centroid and bounding box of the detection it was last matched to.
    On each update, existing objects are matched to the new detections by
    Euclidean distance between centroids.

    Attributes:
        next_unique_id: ID that will be given to the next registered object.
        trackers: maps object ID -> last known centroid ``(cx, cy)``.
        disappear_trackers: maps object ID -> number of consecutive updates
            in which the object was not matched to any detection.
        tracked_bboxes: maps object ID -> last matched bounding box.
    """

    # An object is dropped once it has gone unmatched for more than this
    # many consecutive updates.
    MAX_DISAPPEAR_LIMIT = 5

    def __init__(self):
        self.next_unique_id = 0
        self.trackers = {}
        self.disappear_trackers = {}
        self.tracked_bboxes = {}

    def init_object(self, centroid, boxes):
        """Register a new object under the next free ID.

        Args:
            centroid: ``(cx, cy)`` centre of the detection.
            boxes: bounding box of the detection, stored as given.
        """
        track_id = self.next_unique_id
        self.trackers[track_id] = centroid
        self.tracked_bboxes[track_id] = boxes
        self.disappear_trackers[track_id] = 0
        self.next_unique_id += 1

    def del_object(self, track_id):
        """Forget the object with the given ID.

        Raises:
            KeyError: if ``track_id`` is not currently tracked.
        """
        del self.trackers[track_id]
        del self.tracked_bboxes[track_id]
        del self.disappear_trackers[track_id]

    def _mark_missing(self, track_id):
        """Count one more unmatched update for an object; drop it when expired."""
        self.disappear_trackers[track_id] += 1
        if self.disappear_trackers[track_id] > self.MAX_DISAPPEAR_LIMIT:
            self.del_object(track_id)

    def update_object(self, bboxes):
        """Update the tracker with the detections of one frame.

        Args:
            bboxes: sequence of boxes, each indexable as ``(x, y, w, h)``;
                the centroid is computed as ``(x + w / 2, y + h / 2)``.

        Returns:
            The tracker's own ``tracked_bboxes`` dict (ID -> bounding box).
            It also contains the last known box of objects that are
            currently unmatched but have not expired yet.
        """
        # ``len`` rather than truthiness so that numpy arrays are accepted.
        if len(bboxes) == 0:
            # Nothing detected: every tracked object is missing this frame.
            # Iterate over a snapshot because expired objects are deleted.
            for track_id in list(self.disappear_trackers):
                self._mark_missing(track_id)
            return self.tracked_bboxes

        input_centroids = np.zeros((len(bboxes), 2))
        for i, box in enumerate(bboxes):
            x, y, w, h = box[0], box[1], box[2], box[3]
            input_centroids[i] = (x + w / 2, y + h / 2)

        if len(self.trackers) == 0:
            # Nothing tracked yet: every detection becomes a new object.
            for centroid, box in zip(input_centroids, bboxes):
                self.init_object(centroid, box)
            return self.tracked_bboxes

        # Snapshot IDs and centroids in the same (dict) order so that row r
        # of the distance matrix belongs to tracker_ids[r].
        tracker_ids = list(self.trackers.keys())
        tracker_centroids = np.array(list(self.trackers.values()))
        distance_matrix = cdist(tracker_centroids, input_centroids)

        # Visit trackers from the best (smallest) nearest-detection distance
        # to the worst; each tracker only competes for its nearest detection.
        rows = distance_matrix.min(axis=1).argsort()
        cols = distance_matrix.argmin(axis=1)[rows]

        used_rows = set()
        used_cols = set()
        for row, col in zip(rows, cols):
            row, col = int(row), int(col)
            if row in used_rows or col in used_cols:
                continue
            track_id = tracker_ids[row]
            self.trackers[track_id] = input_centroids[col]
            self.tracked_bboxes[track_id] = bboxes[col]
            self.disappear_trackers[track_id] = 0
            used_rows.add(row)
            used_cols.add(col)

        # Several trackers can share the same nearest detection, so both an
        # unmatched tracker and an unmatched detection may remain regardless
        # of which side is larger. Handle both sets unconditionally:
        # otherwise detections are silently dropped, or stale trackers are
        # never aged.
        n_trackers, n_detections = distance_matrix.shape
        for row in sorted(set(range(n_trackers)) - used_rows):
            self._mark_missing(tracker_ids[row])
        # Sorted so that new IDs are assigned in detection order.
        for col in sorted(set(range(n_detections)) - used_cols):
            self.init_object(input_centroids[col], bboxes[col])

        return self.tracked_bboxes

In [9]:
import cv2
import numpy as np

class YoloDetection():
    """Object detector built on an OpenCV ``cv2.dnn`` network.

    The network is loaded once in the constructor; ``process_frame`` then
    runs it on a single image and returns the detections that survive
    confidence filtering and non-maximum suppression.
    """

    def __init__(self, model_path: str, config: str, classes: str, width: int, height: int,
                 scale=0.00392, thr=0.5, nms=0.4, backend=0,
                 framework=3,
                 target=0, mean=(0, 0, 0)):
        """Load the network and the optional class-name list.

        Args:
            model_path: path to the model weights, passed to ``cv2.dnn.readNet``.
            config: path to the network configuration file.
            classes: path to a text file with one class name per line; a
                falsy value means no names are loaded.
            width: network input width used when building the input blob.
            height: network input height used when building the input blob.
            scale: pixel scale factor for the input blob (the default is
                approximately 1/255).
            thr: minimum class score for a detection to be kept.
            nms: threshold passed to non-maximum suppression.
            backend: index into the backend table below.
            framework: index into the framework table below (3 is 'darknet').
            target: index into the target table below.
            mean: per-channel mean passed to ``cv2.dnn.blobFromImage``. The
                default is an immutable tuple so it cannot be shared and
                mutated across instances.
        """
        super().__init__()
        choices = ['caffe', 'tensorflow', 'torch', 'darknet']
        backends = (
            cv2.dnn.DNN_BACKEND_DEFAULT, cv2.dnn.DNN_BACKEND_HALIDE, cv2.dnn.DNN_BACKEND_INFERENCE_ENGINE,
            cv2.dnn.DNN_BACKEND_OPENCV)
        targets = (
            cv2.dnn.DNN_TARGET_CPU, cv2.dnn.DNN_TARGET_OPENCL, cv2.dnn.DNN_TARGET_OPENCL_FP16, cv2.dnn.DNN_TARGET_MYRIAD)

        self.__confThreshold = thr
        self.__nmsThreshold = nms
        self.__mean = mean
        self.__scale = scale
        self.__width = width
        self.__height = height

        self.__net = cv2.dnn.readNet(model_path, config, choices[framework])
        self.__net.setPreferableBackend(backends[backend])
        self.__net.setPreferableTarget(targets[target])
        self.__classes = None

        if classes:
            with open(classes, 'rt') as f:
                self.__classes = f.read().rstrip('\n').split('\n')

    def get_output_layers_name(self, net):
        """Return the names of the network's unconnected output layers.

        ``getUnconnectedOutLayers`` reports 1-based layer indices. Depending
        on the OpenCV version they arrive either as a flat array or as an
        Nx1 array, so the result is flattened before indexing.
        """
        layer_names = net.getLayerNames()
        out_layer_ids = np.asarray(net.getUnconnectedOutLayers()).flatten()
        return [layer_names[int(layer_id) - 1] for layer_id in out_layer_ids]

    @staticmethod
    def _clip_box(left, top, width, height, frame_width, frame_height):
        """Clip a box to the frame and return integer ``(x, y, w, h)``.

        The right and bottom edges of the original box are preserved: when
        the left/top edge is moved to 0, the size shrinks by the same amount
        instead of the box growing past its real extent.
        """
        x0, y0 = int(left), int(top)
        x1, y1 = x0 + int(width), y0 + int(height)
        x = max(x0, 0)
        y = max(y0, 0)
        # Never report a negative size for a box lying outside the frame.
        clipped_w = max(min(x1, frame_width) - x, 0)
        clipped_h = max(min(y1, frame_height) - y, 0)
        return x, y, clipped_w, clipped_h

    def post_process_output(self, frame, outs):
        """Turn raw network outputs into candidate boxes and run NMS.

        Args:
            frame: the image the outputs belong to; only its height and
                width (``frame.shape[0]``, ``frame.shape[1]``) are used.
            outs: iterable of output arrays. In each row, elements 0-3 are
                read as box centre x, centre y, width and height relative to
                the frame size, and elements from index 5 on as per-class
                scores (index 4 is not used here; presumably objectness).

        Returns:
            ``(indices, boxes, confidences, class_ids)`` where ``boxes``
            holds ``[left, top, width, height]`` in pixels for every
            candidate above the confidence threshold and ``indices`` is the
            unmodified result of ``cv2.dnn.NMSBoxes``.
        """
        frame_height = frame.shape[0]
        frame_width = frame.shape[1]

        class_ids = []
        confidences = []
        boxes = []
        for out in outs:
            for detection in out:
                scores = detection[5:]
                class_id = np.argmax(scores)
                confidence = scores[class_id]
                if confidence > self.__confThreshold:
                    center_x = int(detection[0] * frame_width)
                    center_y = int(detection[1] * frame_height)
                    width = int(detection[2] * frame_width)
                    height = int(detection[3] * frame_height)
                    left = center_x - width / 2
                    top = center_y - height / 2
                    class_ids.append(class_id)
                    confidences.append(float(confidence))
                    boxes.append([left, top, width, height])

        indices = cv2.dnn.NMSBoxes(boxes, confidences, self.__confThreshold, self.__nmsThreshold)
        return (indices, boxes, confidences, class_ids)

    def process_frame(self, frame: np.ndarray):
        """Run detection on one frame.

        Returns:
            A list of ``[label, x, y, w, h, confidence]`` entries with the
            box clipped to the frame. ``label`` is the class name, or the
            integer class id when no class-name file was loaded.
        """
        frame_height = frame.shape[0]
        frame_width = frame.shape[1]

        # The fifth positional argument is swapRB=True.
        blob = cv2.dnn.blobFromImage(frame, self.__scale, (self.__width, self.__height), self.__mean, True, crop=False)

        self.__net.setInput(blob)
        outs = self.__net.forward(self.get_output_layers_name(self.__net))
        (indices, boxes, confidences, class_ids) = self.post_process_output(frame, outs)
        detected_objects = []

        # NMSBoxes yields an empty tuple, a flat array or an Nx1 array
        # depending on the OpenCV version; flattening covers all of them.
        for index in np.asarray(indices).flatten():
            index = int(index)
            x, y, nw, nh = self._clip_box(*boxes[index], frame_width, frame_height)
            class_id = class_ids[index]
            if self.__classes is not None:
                label = self.__classes[class_id]
            else:
                label = int(class_id)
            detected_objects.append([label, x, y, nw, nh, confidences[index]])
        return detected_objects

In [10]:
import cv2
from google.colab.patches import cv2_imshow

# Parsed configuration; stays None until load_config() assigns it.
CONFIG_FILE = None
# Shared YoloDetection instance; stays None until load_model() assigns it.
model = None

def load_config(config_path):
    """Read the configuration file and store it in the global ``CONFIG_FILE``.

    The file is parsed as JSON first; if that fails it is parsed as a Python
    literal (e.g. a dict written with single quotes or tuples).

    NOTE: this replaces a previous ``eval`` of the file content. ``eval``
    executes arbitrary code found in the file and cannot parse real JSON
    literals (``true``/``false``/``null``). Config files that relied on
    evaluating arbitrary expressions are no longer accepted.

    Args:
        config_path: path of the configuration file.

    Raises:
        OSError: if the file cannot be opened or read.
        ValueError, SyntaxError: if the content is neither valid JSON nor a
            valid Python literal.
    """
    global CONFIG_FILE
    # Function-local imports keep this definition self-contained.
    import ast
    import json

    # The context manager closes the file even if reading fails.
    with open(config_path) as config_file:
        raw_text = config_file.read()

    try:
        CONFIG_FILE = json.loads(raw_text)
    except ValueError:
        # json.JSONDecodeError is a ValueError: fall back to literal syntax.
        CONFIG_FILE = ast.literal_eval(raw_text)

def load_model():
    """Build the detector from the loaded configuration.

    Reads the weights, network-config and class-names paths from
    ``CONFIG_FILE["model-parameters"]`` and the two input dimensions from
    ``CONFIG_FILE["shape"]`` (element 0 is passed as width, element 1 as
    height), then stores the resulting ``YoloDetection`` in the global
    ``model``.
    """
    global model
    weights_path = CONFIG_FILE["model-parameters"]["model-weights"]
    network_config_path = CONFIG_FILE["model-parameters"]["model-config"]
    class_names_path = CONFIG_FILE["model-parameters"]["model-names"]
    input_width = CONFIG_FILE["shape"][0]
    input_height = CONFIG_FILE["shape"][1]
    model = YoloDetection(
        weights_path,
        network_config_path,
        class_names_path,
        input_width,
        input_height,
    )

def start_detection(media_path):
    """Detect and track objects in a video and write an annotated copy.

    Each frame is passed through the global ``model``; the resulting boxes
    are fed to a fresh ``Tracker``, and every tracked box is drawn on the
    frame together with its track ID. The annotated frames are written to
    ``tracked_output.mp4`` using the source video's FPS and frame size.

    Args:
        media_path: path of the input video, passed to ``cv2.VideoCapture``.
    """
    tracker = Tracker()
    cap = cv2.VideoCapture(media_path)
    out = None

    # try/finally so the capture and the writer are released even if
    # detection, tracking or drawing raises part-way through the video.
    try:
        fps = cap.get(cv2.CAP_PROP_FPS)
        width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

        fourcc = cv2.VideoWriter_fourcc(*'mp4v')
        out = cv2.VideoWriter('tracked_output.mp4', fourcc, fps, (width, height))

        while True:
            ret, frame = cap.read()
            if not ret:
                # No more frames, or the read failed.
                break

            detections = model.process_frame(frame)
            # Each detection is [label, x, y, w, h, confidence]; the tracker
            # only needs the box part.
            tracker_res = tracker.update_object([det[1:5] for det in detections])

            for track_id, box in tracker_res.items():
                x, y = (int(box[0]), int(box[1]))
                w, h = (int(box[2]), int(box[3]))
                cv2.rectangle(frame, (x, y), (x + w, y + h), thickness=2, color=(255, 0, 0))
                # The ID label is drawn 20 px above the box's top-left corner.
                cv2.putText(frame, str(track_id), (x, y - 20), cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 255, 0), 2)

            out.write(frame)
    finally:
        if out is not None:
            out.release()
        cap.release()

# Script entry point: load the configuration, build the detector, then run
# detection and tracking on a video. The steps depend on each other and must
# run in this order. All paths are hard-coded.
if __name__=="__main__":
    config_path = '/content/config.json'
    load_config(config_path)
    load_model()
    start_detection('/content/los_angeles.mp4')
    # The message is Russian for "Tracking saved to file tracked_output.mp4".
    print('Трекинг сохранен в файл tracked_output.mp4')

Трекинг сохранен в файл tracked_output.mp4
