# Tracking Human Walking - Tracking Module (YOLOv8, DeepSORT)

## 1. Install Libraries and Trained Detection Model

In [1]:
!pip install ultralytics -q
!pip install scikit-learn numpy opencv-python tensorflow spacy -q
!pip install gdown==4.6.0 -q

[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m722.9/722.9 kB[0m [31m12.8 MB/s[0m eta [36m0:00:00[0m
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m23.7/23.7 MB[0m [31m65.2 MB/s[0m eta [36m0:00:00[0m
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m823.6/823.6 kB[0m [31m65.3 MB/s[0m eta [36m0:00:00[0m
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m14.1/14.1 MB[0m [31m89.2 MB/s[0m eta [36m0:00:00[0m
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m731.7/731.7 MB[0m [31m1.5 MB/s[0m eta [36m0:00:00[0m
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m410.6/410.6 MB[0m [31m2.2 MB/s[0m eta [36m0:00:00[0m
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m121.6/121.6 MB[0m [31m14.0 MB/s[0m eta [36m0:00:00[0m
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m56.5/56.5 MB[0m [31m30.4 MB/s[0m eta [36m0:00:00[0m
[2K     [90m━━━━━━━━━━

In [2]:
# Download the trained YOLOv8 detection model from Google Drive by file ID.
# Shareable link for the same file:
# https://drive.google.com/file/d/1PZ1L8Lsm3xN2tpiy-4tZ_e2uJ6OBrC16/view?usp=sharing
# The file is saved as yolov8_mot_det.pt in the current working directory.
!gdown 1PZ1L8Lsm3xN2tpiy-4tZ_e2uJ6OBrC16

Downloading...
From: https://drive.google.com/uc?id=1PZ1L8Lsm3xN2tpiy-4tZ_e2uJ6OBrC16
To: /content/yolov8_mot_det.pt
100% 22.5M/22.5M [00:00<00:00, 47.4MB/s]


## 2. Install DeepSORT

Clone the DeepSORT source code from GitHub into the /content folder of Colab.

In [3]:
# Clone the DeepSORT fork into ./deep_sort. The tracker cells import
# deep_sort.deep_sort and deep_sort.tools from this checkout, so the
# clone must sit in the notebook's working directory.
!git clone https://github.com/wjnwjn59/deep_sort.git

Cloning into 'deep_sort'...
remote: Enumerating objects: 167, done.[K
remote: Counting objects: 100% (25/25), done.[K
remote: Compressing objects: 100% (25/25), done.[K
remote: Total 167 (delta 9), reused 0 (delta 0), pack-reused 142[K
Receiving objects: 100% (167/167), 77.63 KiB | 19.41 MiB/s, done.
Resolving deltas: 100% (92/92), done.


Download the CNN checkpoint used by DeepSORT.

In [4]:
# Download the shared Google Drive folder with the DeepSORT resources.
# NOTE(review): --no-check-certificate turns off TLS certificate verification
# for this download; confirm it is really required before keeping it.
# NOTE(review): the DeepSORT class defaults to
# 'resources/networks/mars-small128.pb', so this folder is presumably saved
# as ./resources - verify against the download log.
!gdown --no-check-certificate --folder https://drive.google.com/open?id=18fKzfqnqhqW3s9zwsCbnVJ5XF2JFeqMp

Retrieving folder list
Retrieving folder 1VVqtL0klSUvLnmBKS89il1EKC3IxUBVK detections
Retrieving folder 1qNWOpUtKG8GqEiL-LbBdXyvifUtcbOvc MOT16_POI_test
Processing file 1aEzvFHPK-N6hqLXMqhh3i9JJzn7WFUA3 MOT16-01.npy
Processing file 1h_ktJDBIEXaSBAA-RxKNYnL9e4fp2HPd MOT16-03.npy
Processing file 1ilOElwfYZLwQKH57HoYdXfuYhpPibfqF MOT16-06.npy
Processing file 1TajzH3GbumKmtYvKBvOtGERFGD0tStwG MOT16-07.npy
Processing file 1WB9Mi4RLVPHV4_20sVq7FdoeG5JYQ_J1 MOT16-08.npy
Processing file 1mksH9GWNT7zmcuq6rlRev8pevZz8Rfsm MOT16-12.npy
Processing file 1FVVhn_IpxQ_jkYhc0CUQHSQMm1SMTEBj MOT16-14.npy
Retrieving folder 1DcOcApOkxP3NdeIUXxVF1KNex6T6YDq3 MOT16_POI_train
Processing file 1Va__9NWU2ZCmaxIq4oIabi05NYWEOk1K MOT16-02.npy
Processing file 1EH7orgDPp7kqRY5OA0hEctcEtQnYq0Ea MOT16-04.npy
Processing file 1RCfHJx5ZoUecapbZCsgp0tCEiItvLsd8 MOT16-05.npy
Processing file 1VLOvn-mbpY0Q1rsMONQZhaEQIGEmyLQL MOT16-09.npy
Processing file 1SbMhOgYPvZ84xE8lRtXc7CLXJF86lwf4 MOT16-10.npy
Processing file 1a4w-Ho

## 3. Import Libraries

In [5]:
import os
import json
import cv2
import numpy as np
from ultralytics import YOLO

## 4. Define Detector
With the YOLO model that has just been trained, we will build a Detector class for convenience in use.

This class receives as input parameter the model file path, and has a detect() method to perform detection for any image.

In [6]:
class YOLOv8:
    """Small convenience wrapper around an Ultralytics YOLO detection model."""

    def __init__(self, model_path):
        """Load the YOLO weights stored at ``model_path``."""
        self.model = YOLO(model_path)

    def detect(self, source_img):
        """Run the detector on one image.

        Args:
            source_img: Image handed unchanged to ``YOLO.predict``.

        Returns:
            A ``(bboxes, scores, class_ids)`` tuple of NumPy arrays taken from
            the first prediction result. Each ``bboxes`` row is
            ``(x_min, y_min, width, height)``.
        """
        boxes = self.model.predict(source_img, verbose=False)[0].boxes

        # xywh rows are (centre x, centre y, width, height); moving the first
        # two columns back by half the size gives the top-left corner.
        top_left_wh = boxes.xywh.cpu().numpy()
        half_sizes = top_left_wh[:, 2:] / 2
        top_left_wh[:, :2] = top_left_wh[:, :2] - half_sizes

        confidences = boxes.conf.cpu().numpy()
        labels = boxes.cls.cpu().numpy()

        return top_left_wh, confidences, labels

## 5. Define Tracker

### Import the necessary tracking modules from the DeepSORT source code
Similar to the Detector, we will also build a class for Tracker for convenience in use.

In [7]:
from deep_sort.deep_sort import nn_matching
from deep_sort.deep_sort.detection import Detection
from deep_sort.deep_sort.tracker import Tracker
from deep_sort.tools import generate_detections as gdet

### Define DeepSORT class.

This class takes as input arguments the path to the DeepSORT checkpoint file and the necessary model settings.

In addition, we will also build a tracking() method to perform tracking on the list of results from the Detector (DeepSORT receives bounding box results in the format (x_min, y_min, width, height)).

In [8]:
class DeepSORT:
    """Convenience wrapper that wires together the DeepSORT encoder, metric and tracker."""

    def __init__(
        self,
        model_path='resources/networks/mars-small128.pb',
        max_cosine_distance = 0.7,
        nn_budget = None,
        classes=['objects']
    ):
        """Build the appearance encoder, the cosine metric and the tracker.

        Args:
            model_path: Checkpoint passed to ``gdet.create_box_encoder``.
            max_cosine_distance: Threshold passed to the nearest-neighbour
                cosine metric.
            nn_budget: Budget passed to the metric (``None`` by default).
            classes: Class names; their positions become the class indices.
        """
        self.encoder = gdet.create_box_encoder(model_path, batch_size=1)
        self.metric = nn_matching.NearestNeighborDistanceMetric(
            'cosine', max_cosine_distance, nn_budget
        )
        self.tracker = Tracker(self.metric)

        # Parallel lists: key_list[i] is the index of the class named val_list[i].
        class_names = list(classes)
        self.key_list = list(range(len(class_names)))
        self.val_list = class_names

    def tracking(
        self,
        origin_frame,
        bboxes,
        scores,
        class_ids
    ):
        """Advance the tracker by one frame and report the active tracks.

        Args:
            origin_frame: Frame the detections were computed on; passed to
                the encoder together with ``bboxes``.
            bboxes: Detection boxes, one per detection.
            scores: Detection confidence scores, aligned with ``bboxes``.
            class_ids: Detection class ids, aligned with ``bboxes``.

        Returns:
            ``np.array`` with one row per reported track, laid out as the
            four ``track.to_tlbr()`` values followed by
            ``[class_id, conf_score, tracking_id]``. When no track is
            reported the array is empty.
        """
        appearance = self.encoder(origin_frame, bboxes)

        detections = []
        for box, conf, cls, feat in zip(bboxes, scores, class_ids, appearance):
            detections.append(Detection(box, conf, cls, feat))

        self.tracker.predict()
        self.tracker.update(detections)

        # Report only confirmed tracks whose time_since_update is at most 5.
        rows = [
            track.to_tlbr().tolist()
            + [track.get_class(), track.get_conf_score(), track.track_id]
            for track in self.tracker.tracks
            if track.is_confirmed() and not track.time_since_update > 5
        ]

        return np.array(rows)

## 6. Define Draw Tracking Result Function
From the detection results of YOLOv8 and the tracking results of DeepSORT, define a function that draws the predictions of the two models on the input image, in the form of bounding boxes with track IDs.

In [9]:
def draw_detection(img, bboxes, scores, class_ids, ids,
                   classes=['objects'], mask_alpha=0.3):
    """Draw boxes and "<label> <score>% ID: <id>" captions on a copy of ``img``.

    Args:
        img: Image array; only ``img.shape[:2]`` and ``img.copy()`` are used,
            so the input itself is never modified.
        bboxes: Iterable of 4-value boxes read as ``(x1, y1, x2, y2)``;
            values are truncated to integers.
        scores: Confidence per box; shown as an integer percentage.
        class_ids: Class index per box into ``classes`` (integral floats are
            accepted and converted with ``int``).
        ids: Identifier per box, printed after ``ID:``.
        classes: Class names; one colour is generated per entry.
        mask_alpha: Weight of the filled-box layer in the final blend.

    Returns:
        ``cv2.addWeighted(mask_img, mask_alpha, det_img, 1 - mask_alpha, 0)``,
        i.e. the outlined image blended with the filled-box image.
    """
    height, width = img.shape[:2]

    # A local generator with a fixed seed gives every class the same colour on
    # each call without reseeding NumPy's global RNG, which would silently
    # affect unrelated code that relies on np.random.
    rng = np.random.default_rng(3)
    colors = rng.uniform(0, 255, size=(len(classes), 3))

    mask_img = img.copy()
    det_img = img.copy()

    # Font scale and thickness grow with the shorter image side.
    short_side = min(height, width)
    size = short_side * 0.0006
    text_thickness = int(short_side * 0.001)

    # Draw bounding boxes and labels of detections
    for bbox, score, class_id, id_ in zip(bboxes, scores, class_ids, ids):
        class_id = int(class_id)  # accept float / NumPy scalar class ids
        color = colors[class_id]

        # Plain Python ints; also lets callers pass lists instead of arrays.
        x1, y1, x2, y2 = np.asarray(bbox).astype(int).tolist()

        # Outline on the detection layer, filled box on the mask layer.
        cv2.rectangle(det_img, (x1, y1), (x2, y2), color, 2)
        cv2.rectangle(mask_img, (x1, y1), (x2, y2), color, -1)

        label = classes[class_id]
        caption = f"{label} {int(score * 100)}% ID: {id_}"
        (tw, th), _ = cv2.getTextSize(text=caption,
                                      fontFace=cv2.FONT_HERSHEY_SIMPLEX,
                                      fontScale=size,
                                      thickness=text_thickness)
        th = int(1.2 * th)

        # The caption (filled background + white text) is identical on both layers.
        for canvas in (det_img, mask_img):
            cv2.rectangle(canvas, (x1, y1), (x1 + tw, y1 - th), color, -1)
            cv2.putText(canvas, caption, (x1, y1 - 2),
                        cv2.FONT_HERSHEY_SIMPLEX,
                        size, (255, 255, 255),
                        text_thickness, cv2.LINE_AA)

    return cv2.addWeighted(mask_img, mask_alpha, det_img, 1 - mask_alpha, 0)

## 7. Define Video Tracking Function
Define a function that takes a video as input and produces a video annotated with the tracking results.

The function below will go through each frame in the video, then apply detection and tracking. The tracking results of that frame will be saved to a list.

At the same time, the tracking results are drawn on the frame. Combining these two steps yields a video annotated with the tracking results.

In [10]:
from inspect import findsource
def video_tracking(video_path, detector, tracker,
                   is_save_result=False, save_dir='tracking_results'):
    """Run detection and tracking on every frame of a video.

    Args:
        video_path: Source handed to ``cv2.VideoCapture``.
        detector: Object whose ``detect(frame)`` returns
            ``(bboxes, scores, class_ids)``.
        tracker: Object whose ``tracking(origin_frame=..., bboxes=...,
            scores=..., class_ids=...)`` returns a NumPy array. Non-empty
            results are read as rows of four box values followed by
            class id, confidence score and tracking id.
        is_save_result: When truthy, the annotated frames are written to
            ``<save_dir>/output_video.avi`` (MJPG codec).
        save_dir: Output directory; created if it does not exist.

    Returns:
        List with the tracker output of every frame read, in frame order.

    Raises:
        IOError: If the video source cannot be opened.
    """
    cap = cv2.VideoCapture(video_path)
    out = None
    all_tracking_results = []

    # try/finally so the capture and the writer are released even when
    # detection or tracking raises part-way through the video.
    try:
        if not cap.isOpened():
            raise IOError(f"Could not open video source: {video_path!r}")

        if is_save_result:
            os.makedirs(save_dir, exist_ok=True)

            # Get the video properties. FPS is kept as a float: truncating
            # e.g. 29.97 to 29 would change the duration of the output video.
            width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
            height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
            fps = cap.get(cv2.CAP_PROP_FPS)

            # Define the codec and create the video writer
            fourcc = cv2.VideoWriter_fourcc(*'MJPG')
            save_result_path = os.path.join(save_dir, 'output_video.avi')
            out = cv2.VideoWriter(save_result_path, fourcc, fps, (width, height))

        while True:
            ret, frame = cap.read()
            if not ret:
                break

            bboxes, scores, class_ids = detector.detect(frame)
            tracker_pred = tracker.tracking(
                origin_frame=frame,
                bboxes=bboxes,
                scores=scores,
                class_ids=class_ids
            )
            all_tracking_results.append(tracker_pred)

            # Annotated frames are only needed for the output video, and no
            # window is ever shown, so skip drawing when nothing is saved.
            if out is None:
                continue

            if tracker_pred.size > 0:
                result_img = draw_detection(
                    img=frame,
                    bboxes=tracker_pred[:, :4],
                    scores=tracker_pred[:, 5],
                    class_ids=tracker_pred[:, 4].astype(int),
                    ids=tracker_pred[:, 6].astype(int),
                )
            else:
                result_img = frame

            out.write(result_img)
    finally:
        cap.release()
        if out is not None:
            out.release()

    return all_tracking_results

## 8. Tracking

In [11]:
# Initialize detector and tracker object
# NOTE: YOLO is already imported in the "Import Libraries" cell; this
# re-import is redundant but harmless.
from ultralytics import YOLO
# Weights downloaded by the gdown cell in section 1; the relative path is
# resolved against the current working directory.
yolo_model_path = 'yolov8_mot_det.pt'

detector = YOLOv8(yolo_model_path)
# All defaults: checkpoint 'resources/networks/mars-small128.pb',
# max_cosine_distance=0.7, nn_budget=None, classes=['objects'].
tracker = DeepSORT()

In [12]:
# Download a pedestrian video sample from Google Drive by file ID.
# The file is saved as CityRoam.mp4 in the current working directory.
!gdown 1sCJAZn6Ug17HWn7PmwGH1XAqjgPhr8ZQ

Downloading...
From: https://drive.google.com/uc?id=1sCJAZn6Ug17HWn7PmwGH1XAqjgPhr8ZQ
To: /content/CityRoam.mp4
100% 5.43M/5.43M [00:00<00:00, 14.4MB/s]


In [13]:
# Relative path, like the model weights: gdown saved the sample video into the
# current working directory, so this also works outside Colab's /content.
video_path = 'CityRoam.mp4'

# Track every frame and also write the annotated video to
# tracking_results/output_video.avi (the default save_dir).
all_tracking_results = video_tracking(
    video_path,
    detector,
    tracker,
    is_save_result=True
)

In [14]:
from IPython.display import HTML
from base64 import b64encode
import os
import subprocess

# Input video path: the MJPG/AVI file written by video_tracking()
output_video_path = 'tracking_results/output_video.avi'

# Compressed video path: H.264/MP4 re-encode embedded by the display cell
compressed_path = 'tracking_results/result_compressed.mp4'

# -y overwrites an existing result so the cell can be re-run; without it
# ffmpeg refuses to overwrite and the previous (stale) file is kept.
# An argument list avoids shell quoting problems, and check=True raises if
# the encode fails instead of returning a status that is easy to ignore.
subprocess.run(
    ['ffmpeg', '-y', '-i', output_video_path,
     '-vcodec', 'libx264', compressed_path],
    check=True,
).returncode

0

In [15]:
# Show video
mp4 = open(compressed_path,'rb').read()
data_url = "data:video/mp4;base64," + b64encode(mp4).decode()
HTML("""
<video width=600 controls>
      <source src="%s" type="video/mp4">
</video>
""" % data_url)