In [1]:
import os

# Working directory at the time this cell runs; the repository clones, weights
# and video paths used later in the notebook are all built from it.
HOME_DIR = os.getcwd()

# Installing YOLOv7

In [2]:
if not (os.path.exists(f'{HOME_DIR}/yolov7')):
    !git clone https://github.com/WongKinYiu/yolov7.git
    %cd yolov7
    !pip install -r requirements.txt
else:
    print("Directory Already Exists!")

Directory Already Exists!


# Installing ByteTrack

In [3]:
if not (os.path.exists(f'{HOME_DIR}/ByteTrack')):
    !git clone https://github.com/ifzhang/ByteTrack.git
    os.chdir(f"{HOME_DIR}/ByteTrack")
    !pip3 install -r requirements.txt
    !python setup.py develop
    !pip3 install cython
    !pip install libpython
    !pip install -e git+https://github.com/samson-wang/cython_bbox.git#egg=cython-bbox
else:
    print("Directory Already Exists!")

Directory Already Exists!


In [4]:
import sys

# Make both cloned repositories importable before the first cell that imports
# from them:
#  - ByteTrack root provides the `yolox` package used by the tracker.
#  - yolov7 root is needed because YOLOv7's modules import each other through
#    top-level `utils.*` / `models.*` names (the run cell further down adds
#    './yolov7' to sys.path for the same reason, but only after the imports).
# The membership test keeps re-running this cell from adding duplicates.
for repo_dir in (f"{HOME_DIR}/ByteTrack", f"{HOME_DIR}/yolov7"):
    if repo_dir not in sys.path:
        sys.path.append(repo_dir)

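ByteTrack's stock `byte_tracker.py` does not accept our detector's output directly, so we modify it below: the tracker now takes YOLOv7 detections as rows of `[x1, y1, x2, y2, confidence, class]` and carries the class label through to every track it returns.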

In [5]:
# %load ByteTrack/yolox/tracker/byte_tracker.py
import numpy as np
from collections import deque
import os
import os.path as osp
import copy
import torch
import torch.nn.functional as F

from yolov7.utils.general import xywh2xyxy, xyxy2xywh
from yolox.tracker.kalman_filter import KalmanFilter
from yolox.tracker import matching
from ByteTrack.yolox.tracker.basetrack import BaseTrack, TrackState

class STrack(BaseTrack):
    """One tracked object: a Kalman-filtered bounding box with a score and class.

    A freshly built STrack only stores the raw detection (`_tlwh`, `score`,
    `cls`); `mean` and `covariance` stay None until `activate` starts the
    Kalman filter. From then on the box is read back from the filter state
    through the `tlwh` / `tlbr` properties.
    """

    # Filter instance shared by all tracks; used for the batched prediction in
    # `multi_predict`.
    shared_kalman = KalmanFilter()

    def __init__(self, tlwh, score, cls):
        """Store a detection as a not-yet-activated track.

        :param tlwh: box as 4 numbers, interpreted as (x, y, width, height)
        :param score: detection confidence
        :param cls: class label carried along with the track
        """
        # `np.float` was removed in NumPy 1.24; it was only an alias of the
        # builtin `float`, which is the drop-in replacement.
        self._tlwh = np.asarray(tlwh, dtype=float)
        self.kalman_filter = None
        self.mean, self.covariance = None, None
        self.is_activated = False
        self.score = score
        self.tracklet_len = 0
        self.cls = cls

    def predict(self):
        """Advance this track's Kalman state by one step."""
        mean_state = self.mean.copy()
        if self.state != TrackState.Tracked:
            # State component 7 is zeroed for tracks that are not currently
            # tracked (presumably the height velocity - confirm against
            # KalmanFilter).
            mean_state[7] = 0
        self.mean, self.covariance = self.kalman_filter.predict(mean_state, self.covariance)

    @staticmethod
    def multi_predict(stracks):
        """Batched version of `predict`: updates every track in `stracks` in place."""
        if len(stracks) > 0:
            multi_mean = np.asarray([st.mean.copy() for st in stracks])
            multi_covariance = np.asarray([st.covariance for st in stracks])
            for i, st in enumerate(stracks):
                if st.state != TrackState.Tracked:
                    multi_mean[i][7] = 0
            multi_mean, multi_covariance = STrack.shared_kalman.multi_predict(multi_mean, multi_covariance)
            for i, (mean, cov) in enumerate(zip(multi_mean, multi_covariance)):
                stracks[i].mean = mean
                stracks[i].covariance = cov

    def activate(self, kalman_filter, frame_id):
        """Start a new tracklet: assign an id and initialise the Kalman state."""
        self.kalman_filter = kalman_filter
        self.track_id = self.next_id()
        self.mean, self.covariance = self.kalman_filter.initiate(self.tlwh_to_xyah(self._tlwh))

        self.tracklet_len = 0
        self.state = TrackState.Tracked
        # Only tracks born on the very first frame are confirmed immediately;
        # later ones stay unconfirmed (is_activated False) until `update` or
        # `re_activate` matches them again.
        if frame_id == 1:
            self.is_activated = True
        self.frame_id = frame_id
        self.start_frame = frame_id

    def re_activate(self, new_track, frame_id, new_id=False):
        """Resume this track from a matching detection.

        :param new_track: STrack holding the matched detection
        :param frame_id: index of the current frame
        :param new_id: when True, a fresh track id is assigned
        """
        self.mean, self.covariance = self.kalman_filter.update(
            self.mean, self.covariance, self.tlwh_to_xyah(new_track.tlwh)
        )
        self.tracklet_len = 0
        self.state = TrackState.Tracked
        self.is_activated = True
        self.frame_id = frame_id
        if new_id:
            self.track_id = self.next_id()
        self.score = new_track.score

    def update(self, new_track, frame_id):
        """
        Update a matched track with its detection for the current frame.

        :type new_track: STrack
        :type frame_id: int
        :return: None (the track is modified in place)
        """
        self.frame_id = frame_id
        self.tracklet_len += 1

        new_tlwh = new_track.tlwh
        self.mean, self.covariance = self.kalman_filter.update(
            self.mean, self.covariance, self.tlwh_to_xyah(new_tlwh))
        self.state = TrackState.Tracked
        self.is_activated = True

        self.score = new_track.score

    @property
    def tlwh(self):
        """Get current position in bounding box format `(top left x, top left y,
                width, height)`.

        Before activation this is a copy of the stored detection; afterwards it
        is rebuilt from the first four components of the Kalman mean.
        """
        if self.mean is None:
            return self._tlwh.copy()
        ret = self.mean[:4].copy()
        # mean[:4] is (center x, center y, aspect ratio, height):
        # aspect * height gives the width, then shift the center to the corner.
        ret[2] *= ret[3]
        ret[:2] -= ret[2:] / 2
        return ret

    @property
    def tlbr(self):
        """Convert bounding box to format `(min x, min y, max x, max y)`, i.e.,
        `(top left, bottom right)`.
        """
        ret = self.tlwh.copy()
        ret[2:] += ret[:2]
        return ret

    @staticmethod
    def tlwh_to_xyah(tlwh):
        """Convert bounding box to format `(center x, center y, aspect ratio,
        height)`, where the aspect ratio is `width / height`.
        """
        ret = np.asarray(tlwh).copy()
        ret[:2] += ret[2:] / 2
        ret[2] /= ret[3]
        return ret

    def to_xyah(self):
        """Current box in `(center x, center y, aspect ratio, height)` format."""
        return self.tlwh_to_xyah(self.tlwh)

    @staticmethod
    def tlbr_to_tlwh(tlbr):
        """Convert `(min x, min y, max x, max y)` to `(x, y, width, height)`."""
        ret = np.asarray(tlbr).copy()
        ret[2:] -= ret[:2]
        return ret

    @staticmethod
    def tlwh_to_tlbr(tlwh):
        """Convert `(x, y, width, height)` to `(min x, min y, max x, max y)`."""
        ret = np.asarray(tlwh).copy()
        ret[2:] += ret[:2]
        return ret

    def __repr__(self):
        return 'OT_{}_({}-{})'.format(self.track_id, self.start_frame, self.end_frame)


class BYTETracker(object):
    """Multi-object tracker that associates detections to tracks in two passes.

    High-confidence detections are matched first; detections with a confidence
    between 0.1 and `args.track_thresh` get a second chance against the tracks
    that are still unmatched. Every track keeps the class label of the
    detection that created it.

    NOTE(review): boxes are converted with `xyxy2xywh` on the way in and with
    `xywh2xyxy` on the way out (both from yolov7.utils.general), so whatever
    box convention that pair uses is applied symmetrically - confirm it matches
    what STrack expects as `tlwh`.
    """

    def __init__(self, args, frame_rate=30):
        """
        :param args: object exposing `track_thresh`, `track_buffer` and
            `match_thresh` attributes
        :param frame_rate: video frame rate; scales how long lost tracks are kept
        """
        self.tracked_stracks = []  # type: list[STrack]
        self.lost_stracks = []  # type: list[STrack]
        self.removed_stracks = []  # type: list[STrack]

        self.frame_id = 0
        self.args = args
        # A detection must score a little above the association threshold
        # before it is allowed to start a brand-new track.
        self.det_thresh = args.track_thresh + 0.1
        self.buffer_size = int(frame_rate / 30.0 * args.track_buffer)
        self.max_time_lost = self.buffer_size
        self.kalman_filter = KalmanFilter()

    def update(self, dets):
        """Feed one frame of detections to the tracker.

        :param dets: 2-D array, one row per detection; columns 0-3 are the box
            (passed to `xyxy2xywh`), column 4 the confidence, column 5 the class
        :return: np.ndarray with one row per active track:
            `[box (4 values from xywh2xyxy), track_id, cls, score]`;
            an empty array when nothing is being tracked
        """
        self.frame_id += 1
        activated_stracks = []
        refind_stracks = []
        lost_stracks = []
        removed_stracks = []

        xyxys = dets[:, 0:4]
        xywh = xyxy2xywh(xyxys)
        confs = dets[:, 4]
        classes = dets[:, 5]

        # Split detections: high confidence for the first association,
        # 0.1 < confidence < track_thresh for the second one.
        remain_inds = confs > self.args.track_thresh
        inds_low = confs > 0.1
        inds_high = confs < self.args.track_thresh

        inds_second = np.logical_and(inds_low, inds_high)

        dets_second = xywh[inds_second]
        dets = xywh[remain_inds]

        scores_keep = confs[remain_inds]
        scores_second = confs[inds_second]

        clss_keep = classes[remain_inds]
        # Must use the low-confidence mask: indexing with `remain_inds` here
        # paired second-pass boxes with the wrong labels and, because zip()
        # stops at the shortest input, silently dropped second-pass detections.
        clss_second = classes[inds_second]

        if len(dets) > 0:
            detections = [STrack(box, s, c) for (box, s, c) in zip(dets, scores_keep, clss_keep)]
        else:
            detections = []

        # Step 1: separate confirmed tracks from unconfirmed ones.
        unconfirmed = []
        tracked_stracks = []
        for track in self.tracked_stracks:
            if not track.is_activated:
                unconfirmed.append(track)
            else:
                tracked_stracks.append(track)

        # Step 2: first association, confirmed + lost tracks vs. high-score detections.
        strack_pool = joint_stracks(tracked_stracks, self.lost_stracks)
        STrack.multi_predict(strack_pool)
        dists = matching.iou_distance(strack_pool, detections)
        dists = matching.fuse_score(dists, detections)
        matches, u_track, u_detection = matching.linear_assignment(dists, thresh=self.args.match_thresh)

        for itracked, idet in matches:
            track = strack_pool[itracked]
            det = detections[idet]
            if track.state == TrackState.Tracked:
                track.update(detections[idet], self.frame_id)
                activated_stracks.append(track)
            else:
                track.re_activate(det, self.frame_id, new_id=False)
                refind_stracks.append(track)

        # Step 3: second association, still-unmatched tracked tracks vs.
        # low-score detections.
        if len(dets_second) > 0:
            detections_second = [STrack(box, s, c) for (box, s, c) in zip(dets_second, scores_second, clss_second)]
        else:
            detections_second = []

        r_tracked_stracks = [strack_pool[i] for i in u_track if strack_pool[i].state == TrackState.Tracked]
        dists = matching.iou_distance(r_tracked_stracks, detections_second)
        matches, u_track, u_detection_second = matching.linear_assignment(dists, thresh=0.5)
        for itracked, idet in matches:
            track = r_tracked_stracks[itracked]
            det = detections_second[idet]
            if track.state == TrackState.Tracked:
                track.update(det, self.frame_id)
                activated_stracks.append(track)
            else:
                track.re_activate(det, self.frame_id, new_id=False)
                refind_stracks.append(track)

        # Tracks that found no detection in either pass become lost.
        for it in u_track:
            track = r_tracked_stracks[it]
            if not track.state == TrackState.Lost:
                track.mark_lost()
                lost_stracks.append(track)

        # Unconfirmed tracks get one chance against the leftover high-score
        # detections; those that still do not match are removed.
        detections = [detections[i] for i in u_detection]
        dists = matching.iou_distance(unconfirmed, detections)
        dists = matching.fuse_score(dists, detections)
        matches, u_unconfirmed, u_detection = matching.linear_assignment(dists, thresh=0.7)

        for itracked, idet in matches:
            unconfirmed[itracked].update(detections[idet], self.frame_id)
            activated_stracks.append(unconfirmed[itracked])

        for it in u_unconfirmed:
            track = unconfirmed[it]
            track.mark_removed()
            removed_stracks.append(track)

        # Step 4: start new tracks from detections nobody claimed.
        for inew in u_detection:
            track = detections[inew]
            if track.score < self.det_thresh:
                continue
            track.activate(self.kalman_filter, self.frame_id)
            activated_stracks.append(track)

        # Step 5: drop tracks that have been lost for too long.
        for track in self.lost_stracks:
            if self.frame_id - track.end_frame > self.max_time_lost:
                track.mark_removed()
                removed_stracks.append(track)

        # Merge this frame's results into the persistent track lists.
        self.tracked_stracks = [t for t in self.tracked_stracks if t.state == TrackState.Tracked]
        self.tracked_stracks = joint_stracks(self.tracked_stracks, activated_stracks)
        self.tracked_stracks = joint_stracks(self.tracked_stracks, refind_stracks)
        self.lost_stracks = sub_stracks(self.lost_stracks, self.tracked_stracks)
        self.lost_stracks.extend(lost_stracks)
        self.lost_stracks = sub_stracks(self.lost_stracks, self.removed_stracks)
        self.removed_stracks.extend(removed_stracks)
        self.tracked_stracks, self.lost_stracks = remove_duplicate_stracks(self.tracked_stracks, self.lost_stracks)
        output_stracks = [track for track in self.tracked_stracks if track.is_activated]
        outputs = []

        # One output row per confirmed track: box, id, class, score.
        for t in output_stracks:
            output = []
            tlwh = t.tlwh
            tid = t.track_id
            # The conversion helper is called on a batch, so add and then
            # remove a leading axis around the single box.
            tlwh = np.expand_dims(tlwh, axis=0)
            xyxy = xywh2xyxy(tlwh)
            xyxy = np.squeeze(xyxy, axis=0)
            output.extend(xyxy)
            output.append(tid)
            output.append(t.cls)
            output.append(t.score)
            outputs.append(output)

        outputs = np.array(outputs)
        return outputs


def joint_stracks(tlista, tlistb):
    """Concatenate two track lists, skipping ids from `tlistb` already present.

    Every element of `tlista` is kept as is; an element of `tlistb` is appended
    only if its `track_id` has not been seen yet (in `tlista` or earlier in
    `tlistb`).
    """
    merged = list(tlista)
    seen_ids = {track.track_id for track in merged}
    for candidate in tlistb:
        if candidate.track_id in seen_ids:
            continue
        seen_ids.add(candidate.track_id)
        merged.append(candidate)
    return merged


def sub_stracks(tlista, tlistb):
    """Return the tracks of `tlista` whose `track_id` does not occur in `tlistb`.

    Tracks are keyed by id, so if `tlista` holds several tracks with the same
    id only the last one survives (at the position of the first).
    """
    remaining = {track.track_id: track for track in tlista}
    for other in tlistb:
        if remaining.get(other.track_id, 0):
            del remaining[other.track_id]
    return list(remaining.values())


def remove_duplicate_stracks(stracksa, stracksb):
    """Drop near-identical tracks that appear in both lists.

    For every pair whose IoU distance is below 0.15, the track that has
    existed for longer (`frame_id - start_frame`) is kept; on a tie the one
    from `stracksb` is kept.

    :return: `(stracksa, stracksb)` with the duplicates filtered out
    """
    pdist = matching.iou_distance(stracksa, stracksb)
    close_pairs = np.where(pdist < 0.15)
    drop_from_a, drop_from_b = set(), set()
    for idx_a, idx_b in zip(*close_pairs):
        age_a = stracksa[idx_a].frame_id - stracksa[idx_a].start_frame
        age_b = stracksb[idx_b].frame_id - stracksb[idx_b].start_frame
        if age_a > age_b:
            drop_from_b.add(idx_b)
        else:
            drop_from_a.add(idx_a)
    kept_a = [track for pos, track in enumerate(stracksa) if pos not in drop_from_a]
    kept_b = [track for pos, track in enumerate(stracksb) if pos not in drop_from_b]
    return kept_a, kept_b


  from .autonotebook import tqdm as notebook_tqdm


# Detection Methods

In [6]:
from collections import OrderedDict
from PIL import ImageColor

import json
import cv2

In [7]:
class Point:
    """A 2-D point built from an indexable `(x, y)` pair."""

    def __init__(self, raw_point):
        self.x = raw_point[0]
        self.y = raw_point[1]

    # to_string / to_dict used to be defined inside __init__, which made them
    # unreachable local functions; to_string also lacked a `return`.
    def to_string(self):
        """Return the point formatted as '(x, y)'."""
        return '(' + str(self.x) + ', ' + str(self.y) + ')'

    def to_dict(self):
        """Return the point as `{'x': ..., 'y': ...}`."""
        return {'x': self.x, 'y': self.y}


class Box:
    """A labelled detection box defined by its top-left and bottom-right corners."""

    def __init__(self, class_name, confidence, raw_corner_points, color, track_id=None):
        """
        :param class_name: label of the detected class
        :param confidence: detection confidence
        :param raw_corner_points: pair `(top_left, bottom_right)` of `(x, y)` pairs
        :param color: colour associated with the class
        :param track_id: tracker id, or None when tracking is off
        """
        top_left = Point(raw_corner_points[0])
        bottom_right = Point(raw_corner_points[1])

        self.class_name = class_name
        self.confidence = confidence
        self.raw_corner_points = raw_corner_points
        self.top_left_point = top_left
        self.bottom_right_point = bottom_right
        self.width = bottom_right.x - top_left.x
        self.height = bottom_right.y - top_left.y
        self.color = color
        self.track_id = track_id

    def to_dict(self):
        """Return the box as an OrderedDict; 'id' is added only for tracked boxes."""
        box = OrderedDict()
        box['class'] = self.class_name
        box['confidence'] = self.confidence
        box['x'] = self.top_left_point.x
        box['y'] = self.top_left_point.y
        box['width'] = self.width
        box['height'] = self.height
        box['color'] = self.color
        if self.track_id is not None:
            box['id'] = self.track_id
        return box


class Detections:
    """Turns raw detection rows into `Box` objects and serialisable dicts.

    Row layout: columns 0-3 are the two corner points. Without tracking,
    column 4 is the confidence and column 5 the class id. With tracking,
    column 4 is the track id, column 5 the class id and column 6 the
    confidence.
    """

    def __init__(self, raw_detection, classes, tracking=False):
        """
        :param raw_detection: iterable of rows in the layout described above
        :param classes: lookup from class id to a dict with 'name' and 'color'
        :param tracking: whether the rows carry a track id
        """
        self.__raw_detection = raw_detection
        self.__classes = classes
        self.__boxes = []
        self.__tracking = tracking
        self.__point1_index, self.__point2_index = 0, 1
        self.__point3_index, self.__point4_index = 2, 3
        self.__tracking_index = 4
        # The class id sits in column 5 in both layouts.
        self.__class_index = 5
        if tracking:
            self.__confidence_index = 6
        else:
            self.__confidence_index = 4
        self.__extract_boxes()

    def __extract_boxes(self):
        """Build one `Box` per raw row and append it to the internal list."""
        for row in self.__raw_detection:
            track_id = int(row[self.__tracking_index]) if self.__tracking else None
            class_id = int(row[self.__class_index])
            top_left = (int(row[self.__point1_index]), int(row[self.__point2_index]))
            bottom_right = (int(row[self.__point3_index]), int(row[self.__point4_index]))
            confidence = row[self.__confidence_index]
            class_info = self.__classes[class_id]
            self.__boxes.append(
                Box(class_info['name'], confidence, (top_left, bottom_right), class_info['color'],
                    track_id=track_id)
            )

    def get_boxes(self):
        """Return the list of `Box` objects."""
        return self.__boxes

    def to_dict(self):
        """Return every box converted with `Box.to_dict`, as a list."""
        return [box.to_dict() for box in self.__boxes]

    def to_json(self):
        """Return the boxes as a JSON string indented by 4 spaces."""
        return json.dumps(self.to_dict(), indent=4)


def plot_box(image, top_left_point, bottom_right_point, width, height, label, color=(210, 240, 0), padding=6,
             font_scale=0.35):
    """Draw a box outline and an upper-cased label tag on `image`, in place.

    `top_left_point` is modified by this function: after the outline is drawn
    its 'x' / 'y' are clamped so that the label tag stays inside the image.
    `width` and `height` are accepted but not used.

    :return: the same `image` object
    """
    label = label.upper()

    outline_start = (top_left_point['x'] - 1, top_left_point['y'])
    outline_end = (bottom_right_point['x'], bottom_right_point['y'])
    cv2.rectangle(image, outline_start, outline_end, color, thickness=2, lineType=cv2.LINE_AA)

    # Scale the font with the image resolution.
    font_face = cv2.FONT_HERSHEY_DUPLEX
    font_scale = font_scale * ((image.shape[0] + image.shape[1]) / 1600)
    text_size = cv2.getTextSize(label, font_face, fontScale=font_scale, thickness=1)[0]
    font_width = max(0, text_size[0])
    font_height = max(0, text_size[1])
    margin = padding * 2

    # Keep the label tag within the left, right and top image borders.
    if top_left_point['x'] - 1 < 0:
        top_left_point['x'] = 1
    if top_left_point['x'] + font_width + margin > image.shape[1]:
        top_left_point['x'] = image.shape[1] - font_width - margin
    if top_left_point['y'] - font_height - margin < 0:
        top_left_point['y'] = font_height + margin

    # Filled tag above the anchor point, then the text inside it.
    tag_start = (top_left_point['x'] - 2, top_left_point['y'])
    tag_end = top_left_point['x'] + font_width + margin, top_left_point['y'] - font_height - margin
    cv2.rectangle(image, tag_start, tag_end, color, -1, lineType=cv2.LINE_AA)
    text_origin = (top_left_point['x'] + padding, top_left_point['y'] - padding)
    cv2.putText(image, label, text_origin, font_face, font_scale, [0, 0, 0], thickness=1, lineType=cv2.LINE_AA)
    return image

def draw(image, detections):
    """Return a copy of `image` with every detection drawn on it.

    :param image: frame to annotate (left untouched)
    :param detections: iterable of dicts with 'class', 'confidence', 'x', 'y',
        'width', 'height', 'color' and optionally 'id' and 'text'
    """
    annotated = image.copy()
    for box in detections:
        class_name = box['class']
        conf = box['confidence']

        # Optional free text, cut to 50 characters for display.
        text = ''
        if 'text' in box:
            text = box['text']
            if len(text) > 50:
                text = text[:50] + ' ...'

        # Label: "<id>. <class> <confidence>% | <text>"; id and text are optional.
        id_prefix = str(box['id']) + '. ' if 'id' in box else ''
        has_text = 'text' in box and len(box['text']) > 0 and not box['text'].isspace()
        text_suffix = (' | ' + text) if has_text else ''
        label = id_prefix + class_name + ' ' + str(int(conf * 100)) + '%' + text_suffix

        width = box['width']
        height = box['height']
        color = box['color']

        if isinstance(color, str):
            # Colour strings are parsed to RGB, then reordered to BGR.
            rgb = ImageColor.getrgb(color)
            color = (rgb[2], rgb[1], rgb[0])

        top_left_point = {'x': box['x'], 'y': box['y']}
        bottom_right_point = {'x': box['x'] + width, 'y': box['y'] + height}
        annotated = plot_box(annotated, top_left_point, bottom_right_point, width, height, label, color=color)
    return annotated

# Object Detector

In [8]:
from yolov7.models.experimental import attempt_load
from yolov7.utils.datasets import letterbox
from yolov7.utils.general import check_img_size, non_max_suppression, scale_coords

from yolov7.utils.torch_utils import select_device
from detections import Detections
from ByteTrack.yolox.tracker.byte_tracker import BYTETracker

import numpy as np
import torch
import yaml

from dataclasses import dataclass


@dataclass(frozen=True)
class BYTETrackerArgs:
    """Immutable settings object handed to `BYTETracker`.

    The tracker code shown in this notebook reads `track_thresh`,
    `track_buffer` and `match_thresh`; the remaining fields are not referenced
    there.
    """
    # Confidence above which a detection takes part in the first association.
    track_thresh: float = 0.25
    # Scaled by frame_rate / 30 to give the number of frames a lost track is kept.
    track_buffer: int = 30
    # Threshold passed to the first linear assignment.
    match_thresh: float = 0.8
    # Not read by the tracker code in this notebook.
    aspect_ratio_thresh: float = 3.0
    # Not read by the tracker code in this notebook.
    min_box_area: float = 1.0
    # Not read by the tracker code in this notebook.
    mot20: bool = False


class YOLOv7:
    """Thin wrapper around a YOLOv7 model with optional ByteTrack tracking."""

    def __init__(self, conf_thres=0.25, iou_thres=0.45, img_size=640):
        """
        :param conf_thres: confidence threshold passed to non-max suppression
        :param iou_thres: IoU threshold passed to non-max suppression
        :param img_size: requested inference size (validated against the model
            stride in `load`)
        """
        self.settings = {
            'conf_thres': conf_thres,
            'iou_thres': iou_thres,
            'img_size': img_size,
        }
        # Pass an instance of the args dataclass rather than the class object.
        self.tracker = BYTETracker(BYTETrackerArgs())
        self.text_recognizer = None
        # Set by load(); defined up front so unload() is safe to call at any time.
        self.device = None
        self.model = None

    def load(self, weights_path, classes, device='cpu'):
        """Load the model weights and the class definitions.

        :param weights_path: path of the weights file given to `attempt_load`
        :param classes: path of a YAML file with a top-level 'classes' entry
        :param device: device string given to `select_device`
        """
        with torch.no_grad():
            self.device = select_device(device)
            self.model = attempt_load(weights_path, map_location=self.device)

            if device != 'cpu':
                self.model.half()
                self.model.to(self.device).eval()

            stride = int(self.model.stride.max())
            self.imgsz = check_img_size(self.settings['img_size'], s=stride)
            # Context manager so the file handle is closed after parsing.
            with open(file=classes, mode="r", encoding="utf8") as classes_file:
                self.classes = yaml.load(classes_file, Loader=yaml.SafeLoader)['classes']

    def unload(self):
        """Release cached GPU memory when a non-CPU device was used."""
        if self.device is not None and self.device.type != 'cpu':
            torch.cuda.empty_cache()

    def set(self, **config):
        """Override inference settings.

        :raises Exception: if a key is not one of the known settings
        """
        for key in config.keys():
            if key in self.settings.keys():
                self.settings[key] = config[key]
            else:
                raise Exception(f'{key} is not a valid inference setting')

    def __parse_image(self, img):
        """Return `(copy of the input image, model-ready tensor)`."""
        im0 = img.copy()
        img = letterbox(im0, self.imgsz, auto=self.imgsz != 1280)[0]
        # Reverse the channel axis and move it first (HWC -> CHW).
        img = img[:, :, ::-1].transpose(2, 0, 1)
        img = np.ascontiguousarray(img)
        img = torch.from_numpy(img).to(self.device)
        img = img.half() if self.device.type != 'cpu' else img.float()
        img /= 255.0

        if img.ndimension() == 3:
            img = img.unsqueeze(0)

        return im0, img

    def detect(self, img, track=False):
        """Run detection (and optionally tracking) on one image.

        :param img: image array as read by OpenCV
        :param track: when True, detections are passed through the tracker
        :return: the result of `Detections(...).to_dict()` for this image
        """
        with torch.no_grad():
            im0, img = self.__parse_image(img)
            pred = self.model(img)[0]
            pred = non_max_suppression(pred, self.settings['conf_thres'], self.settings['iou_thres'])

            # Collect rows in a list and build the array once, instead of
            # re-allocating it with np.concatenate for every detection.
            rows = []
            for det in pred:
                if len(det) > 0:
                    # Map boxes from the network input size back to the original image.
                    det[:, :4] = scale_coords(img.shape[2:], det[:, :4], im0.shape).round()
                    for *xyxy, conf, cls in reversed(det):
                        rows.append(
                            [int(xyxy[0]), int(xyxy[1]), int(xyxy[2]), int(xyxy[3]), round(float(conf), 2), int(cls)])
            raw_detection = np.array(rows, dtype=float) if rows else np.empty((0, 6), float)

            if track:
                raw_detection = self.tracker.update(raw_detection)

            detections = Detections(raw_detection, self.classes, tracking=track).to_dict()

            return detections


# Track Objects

To keep detection and tracking time manageable while testing the prototype, we run it on a trimmed clip of the VIRAT_S_000001 video instead of the full recording.

**Original Video:**

In [9]:
from IPython.display import Video

# Show the untrimmed source recording inline (path is relative to the working directory).
Video('VIRAT Ground Dataset/videos_original/VIRAT_S_000001.MP4')

**Trimmed Video:**

In [10]:
# Show the trimmed clip; it has the same file name that VIDEO_PATH points to below.
Video('VIRAT_S_000001_Trimmed.mp4')

Now that we have our trimmed video, we can proceed with testing our tracker.

In [11]:
# Input and output locations for the tracking run, all anchored at HOME_DIR.
WEIGHTS_PATH = f'{HOME_DIR}/yolov7/yolov7.pt'  # model weights, inside the yolov7 checkout
CLASSES_PATH = f'{HOME_DIR}/track_classes.yaml'  # YAML file read by YOLOv7.load
VIDEO_PATH = f'{HOME_DIR}/VIRAT_S_000001_Trimmed.mp4'  # clip to process
OUTPUT_PATH = f'{HOME_DIR}/VIRAT_S_000001_Trimmed_Tracked.mp4'  # annotated result

In [13]:
from tqdm import tqdm

os.environ['KMP_DUPLICATE_LIB_OK'] = 'True'

import warnings
warnings.filterwarnings("ignore")

# Absolute path, so the entry does not depend on the current working directory.
sys.path.insert(0, f'{HOME_DIR}/yolov7')

yolov7 = YOLOv7()
yolov7.load(weights_path=WEIGHTS_PATH, classes=CLASSES_PATH, device='cpu')

video = cv2.VideoCapture(VIDEO_PATH)
# Stop here if the input cannot be read, rather than printing a message and
# going on to write an empty output file.
if not video.isOpened():
    video.release()
    raise IOError(f'Error Opening Video! ({VIDEO_PATH})')

width = int(video.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(video.get(cv2.CAP_PROP_FRAME_HEIGHT))
# Keep the frame rate as reported: truncating e.g. 29.97 to 29 would change
# the playback speed of the output.
fps = video.get(cv2.CAP_PROP_FPS)
frames_count = int(video.get(cv2.CAP_PROP_FRAME_COUNT))
fourcc = cv2.VideoWriter_fourcc('m', 'p', '4', 'v')
output = cv2.VideoWriter(OUTPUT_PATH, fourcc, fps, (width, height))

print('Tracking Objects...\n')
pbar = tqdm(total=frames_count, unit=' frames', dynamic_ncols=True, position=0, leave=True)

try:
    while video.isOpened():
        ret, frame = video.read()
        if not ret:
            break
        detections = yolov7.detect(frame, track=True)
        detected_frame = draw(frame, detections)
        output.write(detected_frame)
        pbar.update(1)
except KeyboardInterrupt:
    # Interrupting the kernel stops the run early; frames written so far are kept.
    pass
finally:
    # Always release the capture, the writer and the model, even if a frame fails.
    pbar.close()
    video.release()
    output.release()
    yolov7.unload()

Fusing layers... 
RepConv.fuse_repvgg_block
RepConv.fuse_repvgg_block
RepConv.fuse_repvgg_block
Tracking Objects...



100%|███████████████████████████████████████████████████████████████████████| 7500/7500 [1:33:31<00:00,  1.34 frames/s]


In [20]:
# Show the annotated result; it has the same file name that OUTPUT_PATH points to.
Video('VIRAT_S_000001_Trimmed_Tracked.mp4')

As we can see, the tracking was a success.