# 0. Dependencies

In [1]:
%%capture
!pip install ultralytics

In [2]:
import torch
import numpy as np
from collections import deque
import cv2
import matplotlib.pyplot as plt
from ultralytics import YOLO

In [42]:
# --- Detector settings ---
MODEL_TYPE = 'yolov5m.pt'   # weights file handed to YOLO()
OBJ_CONFIDENCE = 0.4        # confidence threshold passed to the detector
IMAGE_SIZE = 640            # inference image size passed to the detector

# --- Tracker settings ---
MAX_AGE = 20                # passed to Tracker as max_age
MIN_HITS = 5                # passed to Tracker as min_hits

# Vehicle classes of interest: display name -> class id used by the detector.
CLASSES = {
    'Bicycle': 1,
    'Car': 2,
    'Motorcycle': 3,
    'Bus': 5,
    'Truck': 7,
}

# --- Compute device ---
if torch.cuda.is_available():
    DEVICE = 'cuda'
    print("GPU")
    torch.cuda.set_device(0)
else:
    DEVICE = 'cpu'

# --- Input ---
INPUT_VIDEO = "input.mp4"

GPU


# 1. Tracker (SORT: Simple Online and Realtime Tracking)

In [43]:
class KalmanFilter:
    """
    One tracked object: a constant-velocity Kalman filter on the box centre
    plus the bookkeeping used for tracking (class history, hit count, age, id).

    Detections ("labels") are sequences ``[class_name, x, y, w, h]`` where
    ``(x, y)`` is the top-left corner of the box and ``(w, h)`` its size.
    """
    count = 0          # number of ids handed out so far (shared by all tracks)
    min_hits = 0       # a track gets an id once its hits exceed this value
    base_classes = {}  # class table stored on the class; not read by this class

    def __init__(self, labels):
        """
        Start a track from a first detection.

        Args:
            labels: ``[class_name, x, y, w, h]`` for the detection.
        """
        # OpenCV filter with a 4-value state [cx, cy, vx, vy] and a
        # 2-value measurement [cx, cy].
        self.kf = cv2.KalmanFilter(4, 2)
        # Box kept next to the filter as a (4, 1) column: [cx, cy, w, h].
        self.state = np.zeros((4, 1), dtype=np.float32)
        self.meas = np.zeros((2, 1), dtype=np.float32)

        # Only the centre position is observed.
        self.kf.measurementMatrix = np.array([[1, 0, 0, 0],
                                              [0, 1, 0, 0]], np.float32)

        # Constant velocity: position += velocity on every step.
        self.kf.transitionMatrix = np.array([[1, 0, 1, 0],
                                             [0, 1, 0, 1],
                                             [0, 0, 1, 0],
                                             [0, 0, 0, 1]], np.float32)

        self.kf.processNoiseCov = np.eye(4, dtype=np.float32) * 0.03

        # Initial box. Index assignment is used instead of ndarray.itemset,
        # which no longer exists in NumPy 2.x.
        self.state[:, 0] = self._to_cxcywh(labels)

        # The filter starts at the detected centre with zero velocity.
        cx, cy = self.state[0, 0], self.state[1, 0]
        self.kf.statePre = np.array([[cx], [cy], [0], [0]], np.float32)
        self.kf.statePost = np.array([[cx], [cy], [0], [0]], np.float32)

        self.classes = deque(maxlen=30)  # class names of the last 30 detections
        self.classes.append(labels[0])
        self.hits = 0                    # number of matched detections (update calls)
        self.time_since_update = 0       # predict calls since the last update

        self.id = 0                      # 0 means "no id assigned yet"

    @staticmethod
    def _to_cxcywh(labels):
        """Convert ``[class_name, x, y, w, h]`` to ``(cx, cy, w, h)`` floats."""
        _, x, y, w, h = labels[:5]
        return float(x + w / 2.), float(y + h / 2.), float(w), float(h)

    def predict(self):
        """
        Advance the filter one step and move the box centre to the prediction.

        Width and height are left unchanged. ``hits`` is deliberately not
        touched here: it counts matched detections only, so a track cannot
        pass the ``min_hits`` gate merely by getting older.
        """
        pred = np.asarray(self.kf.predict()).reshape(-1)
        self.state[0, 0] = pred[0]
        self.state[1, 0] = pred[1]
        self.time_since_update += 1

    def update(self, labels):
        """
        Feed a matched detection into the track.

        The filter is corrected with the detection centre, while the stored
        box is set to the detection itself (centre and size). The class is
        appended to the history, the hit counter is incremented and the age
        is reset. The first time ``hits`` exceeds ``KalmanFilter.min_hits``
        the track receives the next free id.

        Args:
            labels: ``[class_name, x, y, w, h]`` for the detection.
        """
        cx, cy, w, h = self._to_cxcywh(labels)
        self.meas[0, 0] = cx
        self.meas[1, 0] = cy
        self.kf.correct(self.meas)
        self.state[:, 0] = (cx, cy, w, h)

        self.classes.append(labels[0])

        self.hits += 1
        self.time_since_update = 0

        if self.hits > KalmanFilter.min_hits and self.id == 0:
            KalmanFilter.count += 1
            self.id = KalmanFilter.count

    def get_bbox(self):
        """
        Returns the current box as ``[x1, y1, x2, y2]`` (corner coordinates).
        """
        cx, cy, w, h = self.state.flatten()[:4]
        return [cx - w / 2, cy - h / 2, cx + w / 2, cy + h / 2]

    def get_class_id(self):
        """
        Returns the class name seen most often in the recent history.

        Ties go to the class that appears first in the history window, so
        the result does not depend on set/hash ordering.
        """
        return max(dict.fromkeys(self.classes), key=self.classes.count)

In [44]:
class Tracker:
    """
    Multi-object tracker that links per-frame detections to tracks by IoU.

    Each track is a ``KalmanFilter`` object. Detections are sequences
    ``[class_name, x, y, w, h]`` with ``(x, y)`` the top-left box corner.
    """

    def __init__(self, max_age, min_hits, classes):
        """
        Args:
            max_age: a track is dropped once its ``time_since_update``
                reaches this value.
            min_hits: a track is drawn only when its ``hits`` exceed this value.
            classes: mapping whose keys are the class names to colour.
        """
        self.trackers = []
        self.max_age = max_age
        self.min_hits = min_hits
        self.classes = classes
        self.color_map = self.generate_color_palette()

        # Shared settings read by the track objects; ids restart from zero.
        KalmanFilter.min_hits = min_hits
        KalmanFilter.base_classes = classes
        KalmanFilter.count = 0

    def generate_color_palette(self):
        """Return ``{class_name: colour}`` sampled evenly from the 'tab20' colormap."""
        cmap = plt.get_cmap('tab20')
        positions = np.linspace(0, 1, len(self.classes))
        return {cls: cmap(pos) for pos, cls in zip(positions, self.classes)}

    def update(self, detections, iou_threshold=0.3):
        """
        Advance all tracks by one frame and associate them with ``detections``.

        Steps: predict every track, match tracks to detections one-to-one by
        IoU, update matched tracks, start a new track for every unmatched
        detection, and drop tracks whose ``time_since_update`` has reached
        ``max_age``. Ageing is done by ``predict()`` only, so a matched track
        ends the frame with the age its ``update()`` set.

        Args:
            detections: list of ``[class_name, x, y, w, h]``.
            iou_threshold: a pair is matched only if its IoU exceeds this value.
        """
        for tracker in self.trackers:
            tracker.predict()

        track_boxes = [tracker.get_bbox() for tracker in self.trackers]
        detection_boxes = [
            (det[1], det[2], det[1] + det[3], det[2] + det[4])
            for det in detections
        ]

        matched_detections = set()
        for t_idx, d_idx in self._match_detections(track_boxes, detection_boxes, iou_threshold):
            self.trackers[t_idx].update(detections[d_idx])
            matched_detections.add(d_idx)

        # Add new trackers for the unassigned detections
        for d_idx, detection in enumerate(detections):
            if d_idx not in matched_detections:
                self.trackers.append(KalmanFilter(detection))

        # Filter the old trackers out
        self.trackers = [tracker for tracker in self.trackers
                         if tracker.time_since_update < self.max_age]

    @staticmethod
    def _match_detections(track_boxes, detection_boxes, iou_threshold):
        """
        Greedy one-to-one matching of tracks to detections by descending IoU.

        Args:
            track_boxes: list of ``[x1, y1, x2, y2]`` boxes, one per track.
            detection_boxes: list of ``[x1, y1, x2, y2]`` boxes, one per detection.
            iou_threshold: pairs with IoU not above this value are ignored.

        Returns:
            List of ``(track_index, detection_index)`` pairs; every index
            appears at most once.
        """
        candidates = []
        for t_idx, t_box in enumerate(track_boxes):
            for d_idx, d_box in enumerate(detection_boxes):
                iou = Tracker.compute_iou(t_box, d_box)
                if iou > iou_threshold:
                    candidates.append((iou, t_idx, d_idx))

        # Best overlaps first; the sort is stable, so ties keep scan order.
        candidates.sort(key=lambda item: item[0], reverse=True)

        used_tracks, used_detections, pairs = set(), set(), []
        for _, t_idx, d_idx in candidates:
            if t_idx in used_tracks or d_idx in used_detections:
                continue
            used_tracks.add(t_idx)
            used_detections.add(d_idx)
            pairs.append((t_idx, d_idx))
        return pairs

    @staticmethod
    def compute_iou(box1, box2):
        """
        Computes the Intersection over Union (IoU) between two bounding boxes.

        Both boxes are ``[x1, y1, x2, y2]``. A small constant is added to the
        denominator so that zero-area inputs do not divide by zero.
        """
        x11, y11, x12, y12 = box1
        x21, y21, x22, y22 = box2

        xi1 = max(x11, x21)
        yi1 = max(y11, y21)
        xi2 = min(x12, x22)
        yi2 = min(y12, y22)
        inter_area = max(0, xi2 - xi1) * max(0, yi2 - yi1)

        box1_area = (x12 - x11) * (y12 - y11)
        box2_area = (x22 - x21) * (y22 - y21)
        union_area = box1_area + box2_area - inter_area

        return inter_area / (union_area + 0.000001)

    def draw(self, img):
        """
        Draw the box and id of every track whose ``hits`` exceed ``min_hits``.

        ``img`` is modified in place and is expected to be an OpenCV (BGR)
        image, so the matplotlib RGB(A) palette colour is reordered to BGR.
        """
        for tracker in self.trackers:
            if tracker.hits <= self.min_hits:
                continue
            x1, y1, x2, y2 = (int(v) for v in tracker.get_bbox())
            red, green, blue = self.color_map[tracker.get_class_id()][:3]
            color = (int(255 * blue), int(255 * green), int(255 * red))
            cv2.rectangle(img, (x1, y1), (x2, y2), color, 1)
            cv2.putText(img, str(tracker.id), (x1, y1 - 10),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.9, color, 1)

# 2. Tracking

In [46]:
def get_detections(model, img, img_size, confidence, classes=None):
    """
    Run the detector on one frame and return the boxes of the wanted classes.

    Args:
        model: callable detector invoked as
            ``model(img, imgsz=..., conf=..., verbose=False)``; each result
            must expose ``boxes.xyxy`` and ``boxes.cls``.
        img: the frame as read by OpenCV (BGR channel order).
        img_size: inference image size forwarded as ``imgsz``.
        confidence: confidence threshold forwarded as ``conf``.
        classes: optional ``{name: class_id}`` mapping selecting which
            detector classes to keep. Defaults to the five vehicle classes
            Bicycle, Car, Motorcycle, Bus and Truck.

    Returns:
        List of ``[class_name, x, y, w, h]`` with integer pixel values, where
        ``(x, y)`` is the top-left corner of the box.
    """
    if classes is None:
        classes = {'Bicycle': 1, 'Car': 2, 'Motorcycle': 3, 'Bus': 5, 'Truck': 7}
    names_by_id = {class_id: name for name, class_id in classes.items()}

    # The frame is passed through unchanged: Ultralytics documents NumPy/OpenCV
    # input as BGR and converts it itself, so a BGR->RGB conversion here would
    # leave the channels swapped when they reach the network.
    results = model(img, imgsz=img_size, conf=confidence, verbose=False)

    final_labels = []
    for result in results:
        boxes = result.boxes.xyxy  # x1, y1, x2, y2
        class_ids = result.boxes.cls

        for box, class_id in zip(boxes, class_ids):
            name = names_by_id.get(int(class_id))
            if name is None:
                continue  # not one of the requested classes
            x1, y1, x2, y2 = map(int, box)
            final_labels.append([name, x1, y1, x2 - x1, y2 - y1])

    return final_labels

In [None]:
# Build the detector and the tracker from the notebook-level configuration.
model = YOLO(MODEL_TYPE)
tracker = Tracker(MAX_AGE, MIN_HITS, CLASSES)

cap = cv2.VideoCapture(INPUT_VIDEO)
if not cap.isOpened():
    # Without this message a wrong path makes the cell finish silently.
    print(f"Could not open video: {INPUT_VIDEO}")

frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
size = (frame_width, frame_height)
# Optional: uncomment (together with result.write below) to save the output.
# result = cv2.VideoWriter('result.avi',
#                          cv2.VideoWriter_fourcc(*'MJPG'),
#                          10, size)

# try/finally guarantees that the capture and the preview window are released
# even if detection/tracking raises or the cell is interrupted.
try:
    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            break  # end of stream or read failure

        labels = get_detections(model, frame, IMAGE_SIZE, OBJ_CONFIDENCE)
        tracker.update(labels)
        tracker.draw(frame)
        #result.write(frame)
        cv2.imshow('frame', frame)

        # Press 'q' in the preview window to stop early.
        if cv2.waitKey(1) & 0xFF == ord('q'):
            break
finally:
    cap.release()
    cv2.destroyAllWindows()