# Object Tracking

In [1]:
import os
os.chdir('../')
import gdown
import glob
import random
import cv2
from tqdm.auto import tqdm

%pip install ultralytics pafy youtube-dl -Uq
from ultralytics import YOLO
import pafy

In [2]:
# Cloning the DeepSort Repository
if not os.path.exists('deep_sort'):
    !git clone https://github.com/JohnPPinto/deep_sort.git

In [3]:
# Downloading the model data for the deep sort tracker
model_data_url = {'mars-small128.ckpt-68577': 'https://drive.google.com/uc?id=1hF6Cehn1SNZvh-M7FItSjEFojf_MVUba', # Model checkpoint data
                  'mars-small128.ckpt-68577.meta': 'https://drive.google.com/uc?id=1FkpWjshRY1YZC3dtQT9DNUbVZLu97uqi', # Model checkpoint meta data
                  'mars-small128.pb': 'https://drive.google.com/uc?id=1bB66hP9voDXuoBoaCcKYY7a8IYzMMs4P'} # Model data

model_dir = 'models/deep_sort'

if not os.path.exists(model_dir):
    os.makedirs(model_dir)

for k,v in model_data_url.items():
    filepath = os.path.join(model_dir, k)
    if not os.path.isfile(filepath):
        gdown.download(v, output=filepath)
        print(f'[INFO] File "{k}" has been download - "{filepath}"')

In [None]:
# Downloading a Youtube Video for testing
video_url = 'https://youtu.be/WvhYuDvH17I'
video = pafy.new(url=video_url)
stream = video.streams
for i in stream:
    print(i)

# Getting the best resolution
best_video = video.getbest()
print(best_video.resolution, best_video.extension)

# Downloading the best resolution of the video
best_video.download()

## Initializing Deep Sort Tracker

In [2]:
import numpy as np
from deep_sort.deep_sort.tracker import Tracker as DSTracker
from deep_sort.deep_sort import nn_matching
from deep_sort.deep_sort.detection import Detection
from deep_sort.tools import generate_detections as gdet



In [3]:
class DeepSortTracker:
    tracker = None
    encoder = None
    tracks = None
    
    def __init__(self):
        max_cosine_distance = 0.4
        nn_budget = None
        
        # Initializing the tracker
        model_filename = 'models/deep_sort/mars-small128.pb'
        self.encoder = gdet.create_box_encoder(model_filename=model_filename,
                                               batch_size=1)
        metric = nn_matching.NearestNeighborDistanceMetric(metric='cosine', 
                                                            matching_threshold=max_cosine_distance, 
                                                            budget=nn_budget)
        self.tracker = DSTracker(metric=metric, max_age=60)
    
    def update(self, frame, bbox_detection):
        # Getting the bounding box data
        bboxes = np.asarray([d[:-1] for d in bbox_detection])
        
        # Getting the width and height of the bounding box (bbox_detection needs to be in Pascal VOC format)
        bboxes[:, 2:] = bboxes[:, 2:] - bboxes[:, 0:2]
        
        # Getting the score of the bbox
        scores = [d[-1] for d in bbox_detection]
        
        # Generating deep sort feature for every object in the bbox
        features = self.encoder(frame, bboxes)
        detections = []
        for bbox_id, bbox in enumerate(bboxes):
            detections.append(Detection(tlwh=bbox, 
                                        confidence=scores[bbox_id], 
                                        feature=features[bbox_id]))
            
        # Performing deep sort detection and updating it for every step or frame
        self.tracker.predict()
        self.tracker.update(detections=detections)
        self.update_tracks()
        
    def update_tracks(self):
        # Updating tracker data
        tracks = []
        for track in self.tracker.tracks:
            if not track.is_confirmed() or track.time_since_update > 1:
                continue
            
            # Converting bbox data from COCO to Pascal format
            bbox = track.to_tlbr()
            
            # Getting the tracker ID
            id = track.track_id
            
            # Appending the tracker data 
            tracks.append(Track(id, bbox))
        self.tracks = tracks
        
class Track:
    track_id = None
    bbox = None
    
    def __init__(self, id, bbox):
        self.track_id = id
        self.bbox = bbox

## Implementing Deep Sort with YOLOv8

In [4]:
# Getting all the video file path
# video_path_list = glob.glob('raw_data/videos/*')
# video_filepath = random.sample(video_path_list, 1)[0]

# Using the youtube video for testing
video_filepath = 'Shopping, People, Commerce, Mall, Many, Crowd, Walking   Free Stock video footage   YouTube.mp4'

# Creating a filepath for saving the result
result_dir = 'results'
if not os.path.exists(result_dir):
    os.makedirs(result_dir)

result_filepath = os.path.join(result_dir, 'demo_result.mp4')

In [5]:
# Reading the video file 
video_reader = cv2.VideoCapture(video_filepath)
vid_wd = int(video_reader.get(cv2.CAP_PROP_FRAME_WIDTH))
vid_ht = int(video_reader.get(cv2.CAP_PROP_FRAME_HEIGHT))
total_frame_count = int(video_reader.get(cv2.CAP_PROP_FRAME_COUNT))

# Creating the instance for result file
video_writer = cv2.VideoWriter(result_filepath, 
                               cv2.VideoWriter_fourcc(*'MP4V'), 
                               video_reader.get(cv2.CAP_PROP_FPS),
                               (vid_wd, vid_ht))

In [7]:
# Initiating the model
model_path = 'models/exp1_yolov8m/best.pt'
model = YOLO(model_path)

# Initializing the tracker
tracker = DeepSortTracker()

In [None]:
# Reading all the frames
confidence_threshold = 0.5
progress_bar = iter(tqdm(range(total_frame_count)))
while video_reader.isOpened():
    success, frame = video_reader.read()
    if not success:
        break
    
    # Performing Yolo model prediction
    results = model(frame, verbose=False)
    
    # Getting the prediction data
    for result in results:
        det = []
        for r in result.boxes.data.tolist():
            x1, y1, x2, y2, score, class_id = r
            x1 = int(x1)
            y1 = int(y1)
            x2 = int(x2)
            y2 = int(y2)
            class_id = int(class_id)
            if score > confidence_threshold:
                det.append([x1, y1, x2, y2, score])
            
        # Performing deep sort tracking
        tracker.update(frame, det)
        for track in tracker.tracks:
            bbox = track.bbox
            x1, y1, x2, y2 = bbox.astype(int)
            track_id = track.track_id
            
            cv2.rectangle(frame, (x1, y1), (x2, y2), (255,0,0), 1)
            
            caption = 'id:' + str(track_id)
            (tw, th), _ = cv2.getTextSize(caption, cv2.FONT_HERSHEY_SIMPLEX, 1, 1)
            th = int(th * 1.2)
            cv2.rectangle(frame, (x1, y1), 
                          (x1 + tw, y1 - th if y1-10>0 else y1+10+th), (255,0,0), -1)
            cv2.putText(frame, caption, (x1, y1 if y1-10>0 else y1+15), 
                        cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 1)
    next(progress_bar)
    video_writer.write(frame)
video_reader.release()
video_writer.release()
print(f'[INFO] File is been saved to "{result_filepath}".')

## Creating Tracker File (Modular) - tracker_engine.py

In [11]:
%%writefile module/tracker_engine.py
import numpy as np
from deep_sort.deep_sort.tracker import Tracker as DSTracker
from deep_sort.deep_sort import nn_matching
from deep_sort.deep_sort.detection import Detection
from deep_sort.tools import generate_detections as gdet

class DeepSortTracker:
    """
    A Deep Sort tracker for tracking multiple objects.
    
    Parameters:
        max_age: A int for stating the tracking age, tracker to track the object for certain number of frames.
    """
    tracker = None
    encoder = None
    tracks = None
    
    def __init__(self, max_age=60):
        self.max_age = max_age
        max_cosine_distance = 0.4
        nn_budget = None
        
        # Initializing the tracker
        model_filename = 'models/deep_sort/mars-small128.pb'
        self.encoder = gdet.create_box_encoder(model_filename=model_filename,
                                               batch_size=1)
        metric = nn_matching.NearestNeighborDistanceMetric(metric='cosine', 
                                                            matching_threshold=max_cosine_distance, 
                                                            budget=nn_budget)
        self.tracker = DSTracker(metric=metric, max_age=self.max_age)
    
    def update(self, frame, bbox_detection):
        # Getting the bounding box data
        bboxes = np.asarray([d[:-1] for d in bbox_detection])
        
        # Getting the width and height of the bounding box (bbox_detection needs to be in Pascal VOC format)
        bboxes[:, 2:] = bboxes[:, 2:] - bboxes[:, 0:2]
        
        # Getting the score of the bbox
        scores = [d[-1] for d in bbox_detection]
        
        # Generating deep sort feature for every object in the bbox
        features = self.encoder(frame, bboxes)
        detections = []
        for bbox_id, bbox in enumerate(bboxes):
            detections.append(Detection(tlwh=bbox, 
                                        confidence=scores[bbox_id], 
                                        feature=features[bbox_id]))
            
        # Performing deep sort detection and updating it for every step or frame
        self.tracker.predict()
        self.tracker.update(detections=detections)
        self.update_tracks()
        
    def update_tracks(self):
        # Updating tracker data
        tracks = []
        for track in self.tracker.tracks:
            if not track.is_confirmed() or track.time_since_update > 1:
                continue
            
            # Converting bbox data from COCO to Pascal format
            bbox = track.to_tlbr()
            
            # Getting the tracker ID
            id = track.track_id
            
            # Appending the tracker data 
            tracks.append(Track(id, bbox))
        self.tracks = tracks
        
class Track:
    track_id = None
    bbox = None
    
    def __init__(self, id, bbox):
        self.track_id = id
        self.bbox = bbox

Overwriting module/tracker_engine.py
