In [1]:
import pandas as pd
import numpy as np
import cv2
import matplotlib.pyplot as plt
from glob import glob
import IPython.display as ipd
from tqdm import tqdm
import subprocess
plt.style.use('ggplot')
import os
import shutil
import yaml
import json
import math
import torchvision
import scipy
import torch
import torch.nn as nn

In [None]:
!pip install GPUtil

import torch
from GPUtil import showUtilization as gpu_usage
from numba import cuda
def free_gpu_cache():
    print("Initial GPU Usage")
    gpu_usage()                             

    torch.cuda.empty_cache()

    cuda.select_device(0)
    cuda.close()
    cuda.select_device(0)
    print("GPU Usage after emptying the cache")
    gpu_usage()

free_gpu_cache() 

In [2]:
!pip install lap 

Collecting lap
  Downloading lap-0.4.0.tar.gz (1.5 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.5/1.5 MB[0m [31m6.3 MB/s[0m eta [36m0:00:00[0ma [36m0:00:01[0m0m
[?25h  Preparing metadata (setup.py) ... [?25ldone
[?25hBuilding wheels for collected packages: lap
  Building wheel for lap (setup.py) ... [?25ldone
[?25h  Created wheel for lap: filename=lap-0.4.0-cp310-cp310-linux_x86_64.whl size=1483137 sha256=822ac1d295f8e487644f15360272b749ca4cdf80a64a06fa808e1a359fabdf09
  Stored in directory: /root/.cache/pip/wheels/00/42/2e/9dfe19270eea279d79e84767ff0d7b8082c3bf776cad00e83d
Successfully built lap
Installing collected packages: lap
Successfully installed lap-0.4.0


In [3]:
try:
    import lap  # for linear_assignment

    assert lap.__version__  # verify package is not directory
except (ImportError, AssertionError, AttributeError):
    import lap


In [4]:
os.makedirs('bdd100k',exist_ok=True)
os.makedirs('bdd100k/train',exist_ok=True)
os.makedirs('bdd100k/val',exist_ok=True)
os.makedirs('bdd100k/test',exist_ok=True)
os.makedirs('bdd100k/train/images',exist_ok=True)
os.makedirs('bdd100k/train/labels',exist_ok=True)
os.makedirs('bdd100k/val/images',exist_ok=True)
os.makedirs('bdd100k/val/labels',exist_ok=True)
os.makedirs('bdd100k/test/images',exist_ok=True)
os.makedirs('bdd100k/test/labels',exist_ok=True)


In [5]:
class_labels=['pedestrian','rider','car','truck','bus','train','motorcycle','bicycle','traffic light','traffic sign']
output_dir='bdd100k'
os.makedirs(output_dir, exist_ok=True)
data_yaml_path=os.path.join(output_dir,'bdd100_data.yml')
data_yaml={
    'path':'/kaggle/working/bdd100k',
    'train':'/kaggle/working/bdd100k/train/images',
    'val':'/kaggle/working/bdd100k/val/images',
    'nc':len(class_labels),
    'names':class_labels
    
    
}
with open(data_yaml_path,'w') as f:
    yaml.dump(data_yaml,f,default_flow_style=False)

In [6]:
!pip install ultralytics -q
import ultralytics
ultralytics.checks()

Ultralytics YOLOv8.2.79 🚀 Python-3.10.13 torch-2.1.2 CUDA:0 (Tesla T4, 15095MiB)
Setup complete ✅ (4 CPUs, 31.4 GB RAM, 5771.7/8062.4 GB disk)


In [7]:
from ultralytics import YOLO

In [8]:
device=torch.device('cuda' if torch.cuda.is_available() else 'cpu')
device

device(type='cuda')

In [9]:
torch.cuda.empty_cache()

# Util

In [10]:
def wh2xy(x):
    y=x.clone()
    #all elements in all dimensions except the last one, take all elements in the last dimension's 0th index
    y[...,0]=x[...,0]-x[...,2]/2
    y[...,1]=x[...,1]-x[...,3]/2
    y[...,2]=x[...,0]+x[...,2]/2
    y[...,3]=x[...,1]+x[...,3]/2
    return y

def make_anchors(x,strides,offset=0.5):
    """Generate anchors from features"""
    assert x is not None
    anchor_points,stride_tensor=[],[]
    for i , stride in enumerate(strides):
        _,_,h,w=x[i].shape
        sx=torch.arange(end=w,dtype=x[i].dtype,device=x[i].device)+offset
        sy=torch.arange(end=h,dtype=x[i].dtype,device=x[i].device)+offst
        sy,sx=torch.meshgrid(sy,sx)
        anchor_points.append(torch.stack((sx,sy),-1).view(-1,2))
        #return an anchor_points of size(N,2)
        stride_tensor.append(torch.full((h*w),1),stride,dtype=x[i].dtype,device=x[i].device)
    return torch.cat(anchor_points), torch.cat(stride_tensor)    

def scale(coords,shape1,shape2,ratio_pad=None):
    if ratio_pad is None: #calculate from img0_shape
        gain=min(shape1[0]/shape2[0],shape1[1]/shape2[1]) #gain=old/new
        pad=((shape1[1]-shape2[1]*gain)/2,(shape1[0]-shape2[0]*gain)/2)
    else:
        gain=ratio_pad[0][0]
        pad=ratio_pad[1]
    coords[:, [0, 2]] -= pad[0]  # x padding
    coords[:, [1, 3]] -= pad[1]  # y padding
    coords[:, :4] /= gain

    coords[:, 0] = np.clip(coords[:, 0], 0, shape1[1])  # x1
    coords[:, 1] = np.clip(coords[:, 1], 0, shape1[0])  # y1
    coords[:, 2] = np.clip(coords[:, 2], 0, shape1[1])  # x2
    coords[:, 3] = np.clip(coords[:, 3], 0, shape1[0])  # y2
    return coords
    

def resize(image,input_size):
    #Resize and pad image while meeting stride-multiple constraints
    shape=image.shape[:2] #current shape [h,w]
    #scale ratio(new/old)
    r=min(1.0,input_size/shape[0],input_size/shape[1])
    #Compute padding
    pad=int(round(shape[1] * r)), int(round(shape[0] * r))
    w = (input_size - pad[0]) / 2
    h = (input_size - pad[1]) / 2
    if shape[::-1] != pad:  # resize
        image = cv2.resize(image,
                           dsize=pad,
                           interpolation=cv2.INTER_LINEAR)
    top, bottom = int(round(h - 0.1)), int(round(h + 0.1))
    left, right = int(round(w - 0.1)), int(round(w + 0.1))
    image = cv2.copyMakeBorder(image, top, bottom, left, right, cv2.BORDER_CONSTANT)  # add border
    return image, (r, r), (w, h)
def non_max_suppression_adapted(results, class_labels, conf_threshold=0.25, iou_threshold=0.45):
    # Extract boxes, confidences, and class IDs from YOLOv8 results
    boxes = results.boxes.xyxy
    scores = results.boxes.conf
    class_ids = results.boxes.cls

    # Initialize list for detections after NMS
    output = []

    for i in range(len(boxes)):
        box = boxes[i]
        score = scores[i]
        class_id = class_ids[i]

        if score < conf_threshold:
            continue

        class_name = class_labels[int(class_id)]
        if class_name not in class_labels:
            continue

        # Append detection to output
        output.append([box[0].item(), box[1].item(), box[2].item(), box[3].item(), score.item(), class_id.item()])

    # Convert to tensor
    output = torch.tensor(output)

    if len(output) == 0:
        return []

    # Apply non-max suppression
    boxes, scores = output[:, :4], output[:, 4]
    indices = torchvision.ops.nms(boxes, scores, iou_threshold)

    # Gather final detections
    output = output[indices]

    return output
def merge_matches(m1, m2, shape):
    o, p, q = shape
    m1 = numpy.asarray(m1)
    m2 = numpy.asarray(m2)

    m1 = scipy.sparse.coo_matrix((numpy.ones(len(m1)), (m1[:, 0], m1[:, 1])), shape=(o, p))
    m2 = scipy.sparse.coo_matrix((numpy.ones(len(m2)), (m2[:, 0], m2[:, 1])), shape=(p, q))

    mask = m1 * m2
    match = mask.nonzero()
    match = list(zip(match[0], match[1]))
    unmatched_o = tuple(set(range(o)) - {i for i, j in match})
    unmatched_q = tuple(set(range(q)) - {j for i, j in match})

    return match, unmatched_o, unmatched_q

def linear_assignment(cost_matrix,thresh,use_lap=True):
    # Linear assignment implementations with scipy and lap.lapjv
    if cost_matrix.size == 0:
        matches = numpy.empty((0, 2), dtype=int)
        unmatched_a = tuple(range(cost_matrix.shape[0]))
        unmatched_b = tuple(range(cost_matrix.shape[1]))
        return matches, unmatched_a, unmatched_b

    if use_lap:
        _, x, y = lap.lapjv(cost_matrix, extend_cost=True, cost_limit=thresh)
        matches = [[ix, mx] for ix, mx in enumerate(x) if mx >= 0]
        unmatched_a = numpy.where(x < 0)[0]
        unmatched_b = numpy.where(y < 0)[0]
    else:
        # Scipy linear sum assignment is NOT working correctly, DO NOT USE
        y, x = scipy.optimize.linear_sum_assignment(cost_matrix)  # row y, col x
        matches = numpy.asarray([[i, x] for i, x in enumerate(x) if cost_matrix[i, x] <= thresh])
        unmatched = numpy.ones(cost_matrix.shape)
        for i, xi in matches:
            unmatched[i, xi] = 0.0
        unmatched_a = numpy.where(unmatched.all(1))[0]
        unmatched_b = numpy.where(unmatched.all(0))[0]

    return matches, unmatched_a, unmatched_b
    #unmatched_a: indices of 1 not in 2
    #unmatched_b: indices of 2 not in 1

def compute_iou(a_boxes, b_boxes):
    """
    Compute cost based on IoU
    :type a_boxes: list[tlbr] | np.ndarray
    :type b_boxes: list[tlbr] | np.ndarray

    :rtype iou | np.ndarray
    """
    iou = numpy.zeros((len(a_boxes), len(b_boxes)), dtype=numpy.float32)
    if iou.size == 0:
        return iou
    a_boxes = numpy.ascontiguousarray(a_boxes, dtype=numpy.float32)
    b_boxes = numpy.ascontiguousarray(b_boxes, dtype=numpy.float32)
    # Get the coordinates of bounding boxes
    b1_x1, b1_y1, b1_x2, b1_y2 = a_boxes.T
    b2_x1, b2_y1, b2_x2, b2_y2 = b_boxes.T

    # Intersection area
    inter_area = (numpy.minimum(b1_x2[:, None], b2_x2) - numpy.maximum(b1_x1[:, None], b2_x1)).clip(0) * \
                 (numpy.minimum(b1_y2[:, None], b2_y2) - numpy.maximum(b1_y1[:, None], b2_y1)).clip(0)

    # box2 area
    box1_area = (b1_x2 - b1_x1) * (b1_y2 - b1_y1)
    box2_area = (b2_x2 - b2_x1) * (b2_y2 - b2_y1)
    return inter_area / (box2_area + box1_area[:, None] - inter_area + 1E-7)
def iou_distance(a_tracks, b_tracks):
    """
    Compute cost based on IoU
    :type a_tracks: list[STrack]
    :type b_tracks: list[STrack]

    :rtype cost_matrix np.ndarray
    """

    if (len(a_tracks) > 0 and isinstance(a_tracks[0], numpy.ndarray)) \
            or (len(b_tracks) > 0 and isinstance(b_tracks[0], numpy.ndarray)):
        a_boxes = a_tracks
        b_boxes = b_tracks
    else:
        a_boxes = [track.tlbr for track in a_tracks]
        b_boxes = [track.tlbr for track in b_tracks]
    return 1 - compute_iou(a_boxes, b_boxes)  # cost matrix


def fuse_score(cost_matrix, detections):
    if cost_matrix.size == 0:
        return cost_matrix
    iou_sim = 1 - cost_matrix
    det_scores = numpy.array([det.score for det in detections])
    det_scores = numpy.expand_dims(det_scores, axis=0).repeat(cost_matrix.shape[0], axis=0)
    fuse_sim = iou_sim * det_scores
    return 1 - fuse_sim  # fuse_cost 

In [11]:
class KalmanFilterXYAH:
    """
    A Kalman filter for tracking bounding boxes in image space.

    The 8-dimensional state space

        x, y, a, h, vx, vy, va, vh

    contains the bounding box center position (x, y), aspect ratio a, height h,
    and their respective velocities.

    Object motion follows a constant velocity model. The bounding box location
    (x, y, a, h) is taken as direct observation of the state space (linear
    observation model).

    """
        
    def __init__(self):
        ndim, dt = 4, 1.
        # Create Kalman filter model matrices.
        self._motion_mat = np.eye(2 * ndim, 2 * ndim)
        for i in range(ndim):
            self._motion_mat[i, ndim + i] = dt
        self._update_mat = np.eye(ndim, 2 * ndim)
        self._std_weight_position = 1. / 20
        self._std_weight_velocity = 1. / 160
    def initiate(self, measurement):
        """
        Create track from unassociated measurement.

        Parameters
        ----------
        measurement : ndarray
            Bounding box coordinates (x, y, a, h) with center position (x, y),
            aspect ratio a, and height h.

        Returns
        -------
        (ndarray, ndarray)
            Returns the mean vector (8 dimensional) and covariance matrix (8x8
            dimensional) of the new track. Unobserved velocities are initialized
            to 0 mean.

        """
        mean_pos = measurement
        mean_vel = np.zeros_like(mean_pos)
        mean = np.r_[mean_pos, mean_vel]

        std = [2 * self._std_weight_position * measurement[3],
               2 * self._std_weight_position * measurement[3],
               1e-2,
               2 * self._std_weight_position * measurement[3],
               10 * self._std_weight_velocity * measurement[3],
               10 * self._std_weight_velocity * measurement[3],
               1e-5,
               10 * self._std_weight_velocity * measurement[3]]
        covariance = numpy.diag(np.square(std))
        return mean, covariance
    def predict(self, mean, covariance):
        """
        Run Kalman filter prediction step.

        Parameters
        ----------
        mean : ndarray
            The 8 dimensional mean vector of the object state at the previous
            time step.
        covariance : ndarray
            The 8x8 dimensional covariance matrix of the object state at the
            previous time step.

        Returns
        -------
        (ndarray, ndarray)
            Returns the mean vector and covariance matrix of the predicted
            state. Unobserved velocities are initialized to 0 mean.

        """
        std_pos = [self._std_weight_position * mean[3],
                   self._std_weight_position * mean[3],
                   1e-2,
                   self._std_weight_position * mean[3]]
        std_vel = [self._std_weight_velocity * mean[3],
                   self._std_weight_velocity * mean[3],
                   1e-5,
                   self._std_weight_velocity * mean[3]]
        motion_cov = np.diag(np.square(np.r_[std_pos, std_vel]))

        # mean = np.dot(self._motion_mat, mean)
        mean = np.dot(mean, self._motion_mat.T)
        covariance = np.linalg.multi_dot((self._motion_mat, covariance, self._motion_mat.T)) + motion_cov

        return mean, covariance
    def project(self, mean, covariance):
        """
        Project state distribution to measurement space.

        Parameters
        ----------
        mean : ndarray
            The state's mean vector (8 dimensional array).
        covariance : ndarray
            The state's covariance matrix (8x8 dimensional).

        Returns
        -------
        (ndarray, ndarray)
            Returns the projected mean and covariance matrix of the given state
            estimate.

        """
        std = [self._std_weight_position * mean[3],
               self._std_weight_position * mean[3],
               1e-1,
               self._std_weight_position * mean[3]]
        innovation_cov = numpy.diag(numpy.square(std))

        mean = numpy.dot(self._update_mat, mean)
        covariance = numpy.linalg.multi_dot((self._update_mat, covariance, self._update_mat.T))
        return mean, covariance + innovation_cov
    def multi_predict(self, mean, covariance):
        """
        Run Kalman filter prediction step (Vectorized version).
        Parameters
        ----------
        mean : ndarray
            The Nx8 dimensional mean matrix of the object states at the previous
            time step.
        covariance : ndarray
            The Nx8x8 dimensional covariance matrix of the object states at the
            previous time step.
        Returns
        -------
        (ndarray, ndarray)
            Returns the mean vector and covariance matrix of the predicted
            state. Unobserved velocities are initialized to 0 mean.
        """
        std_pos = [self._std_weight_position * mean[:, 3],
                   self._std_weight_position * mean[:, 3],
                   1e-2 * numpy.ones_like(mean[:, 3]),
                   self._std_weight_position * mean[:, 3]]
        std_vel = [self._std_weight_velocity * mean[:, 3],
                   self._std_weight_velocity * mean[:, 3],
                   1e-5 * numpy.ones_like(mean[:, 3]),
                   self._std_weight_velocity * mean[:, 3]]
        sqr = numpy.square(numpy.r_[std_pos, std_vel]).T

        motion_cov = [numpy.diag(sqr[i]) for i in range(len(mean))]
        motion_cov = numpy.asarray(motion_cov)

        mean = numpy.dot(mean, self._motion_mat.T)
        left = numpy.dot(self._motion_mat, covariance).transpose((1, 0, 2))
        covariance = numpy.dot(left, self._motion_mat.T) + motion_cov

        return mean, covariance
    def update(self, mean, covariance, measurement):
        """
        Run Kalman filter correction step.

        Parameters
        ----------
        mean : ndarray
            The predicted state's mean vector (8 dimensional).
        covariance : ndarray
            The state's covariance matrix (8x8 dimensional).
        measurement : ndarray
            The 4 dimensional measurement vector (x, y, a, h), where (x, y)
            is the center position, a the aspect ratio, and h the height of the
            bounding box.

        Returns
        -------
        (ndarray, ndarray)
            Returns the measurement-corrected state distribution.

        """
        projected_mean, projected_cov = self.project(mean, covariance)

        chol_factor, lower = scipy.linalg.cho_factor(projected_cov, lower=True, check_finite=False)
        kalman_gain = scipy.linalg.cho_solve((chol_factor, lower),
                                             numpy.dot(covariance, self._update_mat.T).T,
                                             check_finite=False).T
        innovation = measurement - projected_mean

        new_mean = mean + numpy.dot(innovation, kalman_gain.T)
        new_covariance = covariance - numpy.linalg.multi_dot((kalman_gain, projected_cov, kalman_gain.T))
        return new_mean, new_covariance
    def gating_distance(self, mean, covariance, measurements, only_position=False, metric='maha'):
        """
        Compute gating distance between state distribution and measurements.
        A suitable distance threshold can be obtained from `chi2inv95`. If
        `only_position` is False, the chi-square distribution has 4 degrees of
        freedom, otherwise 2.
        Parameters
        ----------
        mean : ndarray
            Mean vector over the state distribution (8 dimensional).
        covariance : ndarray
            Covariance of the state distribution (8x8 dimensional).
        measurements : ndarray
            An Nx4 dimensional matrix of N measurements, each in
            format (x, y, a, h) where (x, y) is the bounding box center
            position, a the aspect ratio, and h the height.
        only_position : Optional[bool]
            If True, distance computation is done with respect to the bounding
            box center position only.
        metric : str
            Distance metric.
        Returns
        -------
        ndarray
            Returns an array of length N, where the i-th element contains the
            squared Mahalanobis distance between (mean, covariance) and
            `measurements[i]`.
        """
        mean, covariance = self.project(mean, covariance)
        if only_position:
            mean, covariance = mean[:2], covariance[:2, :2]
            measurements = measurements[:, :2]

        d = measurements - mean
        if metric == 'gaussian':
            return numpy.sum(d * d, axis=1)
        elif metric == 'maha':
            factor = numpy.linalg.cholesky(covariance)
            z = scipy.linalg.solve_triangular(factor, d.T, lower=True, check_finite=False, overwrite_b=True)
            return numpy.sum(z * z, axis=0)  # square maha
        else:
            raise ValueError('invalid distance metric')
    

# ByteTrack

In [12]:
class State:
    New = 0
    Tracked = 1
    Lost = 2
    Removed = 3
class Track:
    count=0
    shared_kalman=KalmanFilterXYAH()
    def __init__(self,tlwh,score,cls):
        self._tlwh = numpy.asarray(self.tlbr_to_tlwh(tlwh[:-1]), dtype=numpy.float32)
        self.kalman_filter = None
        self.mean, self.covariance = None, None
        self.is_activated = False

        self.score = score
        self.tracklet_len = 0
        self.cls = cls
        self.idx = tlwh[-1]
    def predict(self):
        mean_state=self.mean.copy()
        if self.state!=State.Tracked:
            mean_state[7]=0
        self.mean,self.covariance=self.kalman_filter.predict(mean_state,self.covariance)
    @staticmethod
    def multi_predict(tracks):
        if len(tracks)<=0:
            return
        multi_mean=numpy.asarray([st.mean.copy() for st in tracks])
        multi_covariance=numpy.asarray([st.covariance for st in tracks])
        for i,st in enumerate(tracks):
            if st.state!=State.Tracked:
                multi_mean[i][7]=0
        multi_mean,multi_covariance=Track.shared_kalman.multi_predict(multi_mean, multi_covariance)
        for i,(mean,cov) in enumerate(zip(multi_mean,multi_covariance)):
            tracks[i].mean=mean
            tracks[i].covariance=cov
    def activate(self,kalman_filter,frame_id):
        self.kalman_filter=kalman_filter
        self.track_id=self.next_id()
        self.mean,self.covariance=self.kalman_filter.initiate(self.convert_coords(self._tlwh))
        self.tracklet_len = 0
        self.state = State.Tracked
        if frame_id == 1:
            self.is_activated = True
        self.frame_id = frame_id
        self.start_frame = frame_id
    def re_activate(self,new_track,frame_id,new_id=False):
        self.mean,self.covariance=self.kalman_filter.update(self.mean, self.covariance,self.convert_coords(new_track.tlwh))
        self.tracklet_len=0
        self.state=State.Tracked
        self.is_activated=True
        self.frame_id=frame_id
        if new_id:
            self.track_id=self.next_id()
        self.score=new_track.score
        self.cls=new_track.cls
        self.idx=new_track.idx
    def update(self,new_track,frame_id):
        """
        Update a matched track
        :type new_track: Track
        :type frame_id: int
        :return:
        """
        self.frame_id=frame_id
        self.tracklet_len+=1
        new_tlwh=new_track.tlwh
        self.mean,self.covariance=self.kalman_filter.update(self.mean, self.covariance,self.convert_coords(new_tlwh))
        self.state=State.Tracked
        self.is_activated=True
        self.score=new_track.score
        self.cls=new_track.cls
        self.idx=new_track.idx
    def convert_coords(self, tlwh):
        return self.tlwh_to_xyah(tlwh)

    def mark_lost(self):
        self.state = State.Lost

    def mark_removed(self):
        self.state = State.Removed
    @property
    def end_frame(self):
        return self.frame_id

    @staticmethod
    def next_id():
        Track.count += 1
        return Track.count

    @property
    def tlwh(self):
        """Get current position in bounding box format `(top left x, top left y,
        width, height)`.
        """
        if self.mean is None:
            return self._tlwh.copy()
        ret = self.mean[:4].copy()
        ret[2] *= ret[3]
        ret[:2] -= ret[2:] / 2
        return ret

    @property
    def tlbr(self):
        """Convert bounding box to format `(min x, min y, max x, max y)`, i.e.,
        `(top left, bottom right)`.
        """
        ret = self.tlwh.copy()
        ret[2:] += ret[:2]
        return ret

    @staticmethod
    def reset_id():
        Track.count = 0

    @staticmethod
    def tlwh_to_xyah(tlwh):
        """Convert bounding box to format `(center x, center y, aspect ratio,
        height)`, where the aspect ratio is `width / height`.
        """
        ret = numpy.asarray(tlwh).copy()
        ret[:2] += ret[2:] / 2
        ret[2] /= ret[3]
        return ret

    @staticmethod
    def tlbr_to_tlwh(tlbr):
        ret = numpy.asarray(tlbr).copy()
        ret[2:] -= ret[:2]
        return ret

    @staticmethod
    def tlwh_to_tlbr(tlwh):
        ret = numpy.asarray(tlwh).copy()
        ret[2:] += ret[:2]
        return ret

    def __repr__(self):
        return f'OT_{self.track_id}_({self.start_frame}-{self.end_frame})'
        
        
        
        
        
        
        

In [19]:
class BYTETracker:
    def __init__(self,frame_rate=30):
        self.tracked_tracks=[]
        self.lost_tracks=[]
        self.removed_tracks=[]
        self.frame_id = 0
        self.max_time_lost = 2*int(frame_rate)
        self.kalman_filter = KalmanFilterXYAH()
        self.reset_id()
    def update(self,boxes,scores,object_classes):
        self.frame_id+=1
        activated_tracks=[]
        re_find_tracks=[]
        lost_tracks=[]
        removed_tracks=[]
        #add index
        boxes = numpy.concatenate([boxes, numpy.arange(len(boxes)).reshape(-1, 1)], axis=-1)
        indices_low=scores>0.05
        indices_high=scores<0.2
        indices_remain=scores>0.2
        indices_second=numpy.logical_and(indices_low, indices_high)

        boxes_second = boxes[indices_second]
        boxes = boxes[indices_remain]
        scores_keep = scores[indices_remain]
        scores_second = scores[indices_second]
        cls_keep = object_classes[indices_remain]
        cls_second = object_classes[indices_second]

        detections = self.init_track(boxes, scores_keep, cls_keep)
        """ Add newly detected tracklets to tracked_stracks"""
        unconfirmed = []
        tracked_stracks = []
        for track in self.tracked_tracks:
            if not track.is_activated:
                unconfirmed.append(track)
            else:
                tracked_stracks.append(track)
        """ Step 2: First association, with high score detection boxes"""
        track_pool = self.joint_stracks(tracked_stracks, self.lost_tracks)
        # Predict the current location with KF
        self.multi_predict(track_pool)

        dists = self.get_dists(track_pool, detections)
        matches, u_track, u_detection = linear_assignment(dists, thresh=0.95)
        for tracked_i, box_i in matches:
            track = track_pool[tracked_i]
            det = detections[box_i]
            if track.state == State.Tracked:
                track.update(det, self.frame_id)
                activated_tracks.append(track)
            else:
                track.re_activate(det, self.frame_id, new_id=False)
                re_find_tracks.append(track)
        """ Step 3: Second association, with low score detection boxes"""
        # association the untrack to the low score detections
        detections_second = self.init_track(boxes_second, scores_second, cls_second)
        r_tracked_tracks = [track_pool[i] for i in u_track if track_pool[i].state == State.Tracked]
        dists = iou_distance(r_tracked_tracks, detections_second)
        matches, u_track, u_detection_second = linear_assignment(dists, thresh=0.85)
        for tracked_i, box_i in matches:
            track = r_tracked_tracks[tracked_i]
            det = detections_second[box_i]
            if track.state == State.Tracked:
                track.update(det, self.frame_id)
                activated_tracks.append(track)
            else:
                track.re_activate(det, self.frame_id, new_id=False)
                re_find_tracks.append(track)

        for it in u_track:
            track = r_tracked_tracks[it]
            if track.state != State.Lost:
                track.mark_lost()
                lost_tracks.append(track)
        """Deal with unconfirmed tracks, usually tracks with only one beginning frame"""
        detections = [detections[i] for i in u_detection]
        dists = self.get_dists(unconfirmed, detections)
        matches, u_unconfirmed, u_detection = linear_assignment(dists, thresh=0.9)
        for tracked_i, box_i in matches:
            unconfirmed[tracked_i].update(detections[box_i], self.frame_id)
            activated_tracks.append(unconfirmed[tracked_i])
        for it in u_unconfirmed:
            track = unconfirmed[it]
            track.mark_removed()
            removed_tracks.append(track)
        """ Step 4: Init new stracks"""
        for new_i in u_detection:
            track = detections[new_i]
            if track.score < 0.2:
                continue
            track.activate(self.kalman_filter, self.frame_id)
            activated_tracks.append(track)
        """ Step 5: Update state"""
        for track in self.lost_tracks:
            if self.frame_id - track.end_frame > self.max_time_lost:
                track.mark_removed()
                removed_tracks.append(track)

        self.tracked_tracks = [t for t in self.tracked_tracks if t.state == State.Tracked]
        self.tracked_tracks = self.joint_stracks(self.tracked_tracks, activated_tracks)
        self.tracked_tracks = self.joint_stracks(self.tracked_tracks, re_find_tracks)
        self.lost_tracks = self.sub_stracks(self.lost_tracks, self.tracked_tracks)
        self.lost_tracks.extend(lost_tracks)
        self.lost_tracks = self.sub_stracks(self.lost_tracks, self.removed_tracks)
        self.removed_tracks.extend(removed_tracks)
        self.tracked_tracks, self.lost_tracks = self.remove_duplicate_stracks(self.tracked_tracks, self.lost_tracks)
        output = [track.tlbr.tolist() + [track.track_id,
                                         track.score,
                                         track.cls,
                                         track.idx] for track in self.tracked_tracks if track.is_activated]
        return numpy.asarray(output, dtype=numpy.float32)

    @staticmethod
    def init_track(boxes, scores, cls):
        return [Track(box, s, c) for (box, s, c) in zip(boxes, scores, cls)] if len(boxes) else []  # detections

    @staticmethod
    def get_dists(tracks, detections):
        dists = iou_distance(tracks, detections)
        dists = fuse_score(dists, detections)
        return dists

    @staticmethod
    def multi_predict(tracks):
        Track.multi_predict(tracks)

    @staticmethod
    def reset_id():
        Track.reset_id()

    @staticmethod
    def joint_stracks(tlista, tlistb):
        exists = {}
        res = []
        for t in tlista:
            exists[t.track_id] = 1
            res.append(t)
        for t in tlistb:
            tid = t.track_id
            if not exists.get(tid, 0):
                exists[tid] = 1
                res.append(t)
        return res

    @staticmethod
    def sub_stracks(tlista, tlistb):
        stracks = {t.track_id: t for t in tlista}
        for t in tlistb:
            tid = t.track_id
            if stracks.get(tid, 0):
                del stracks[tid]
        return list(stracks.values())

    @staticmethod
    def remove_duplicate_stracks(stracksa, stracksb):
        pdist = iou_distance(stracksa, stracksb)
        pairs = numpy.where(pdist < 0.15)
        dupa, dupb = [], []
        for p, q in zip(*pairs):
            timep = stracksa[p].frame_id - stracksa[p].start_frame
            timeq = stracksb[q].frame_id - stracksb[q].start_frame
            if timep > timeq:
                dupb.append(q)
            else:
                dupa.append(p)
        resa = [t for i, t in enumerate(stracksa) if i not in dupa]
        resb = [t for i, t in enumerate(stracksb) if i not in dupb]
        return resa, resb
    

# Show results

In [14]:
#Function to check if the object has crossed line
def is_crossing_line(bbox,line):
    bx1,by1,bx2,by2=bbox
    x1,y1,x2,y2=line # the line has y1=y2
    if by1<=y1<=by2:
        return True
    else:
        return False

In [15]:
def draw_detection(img,bboxes,scores,class_ids,ids,classes=class_labels,mask_alpha=0.3):
    height,width=img.shape[:2]
    np.random.seed(0)
    rng=np.random.default_rng(3)
    colors=rng.uniform(0,255,size=(len(classes),3))
    mask_img=img.copy()
    det_img=img.copy()
    size=min([height,width])*0.0006
    text_thickness=int(min([height,width])*0.001)
    for bbox,score,class_id,id_ in zip(bboxes,scores,class_ids,ids):
        color=colors[class_id]
        x1,y1,x2,y2=bbox.astype(int)
        #Draw rectangle
        cv2.rectangle(det_img,(x1,y1),(x2,y2),color,2)
        cv2.rectangle(mask_img,(x1,y1),(x2,y2),color,-1)
        label=classes[class_id]
        caption=f'{label} {int(score*100)}% ID: {id_}'
        (tw,th),_=cv2.getTextSize(text=caption,
                                 fontFace=cv2.FONT_HERSHEY_SIMPLEX,
                                 fontScale=size,
                                 thickness=text_thickness)
        
        th=int(th*1.2)
        cv2.rectangle(det_img,(x1,y1),(x1+tw,y1-th),color,-1)
        cv2.rectangle(mask_img,(x1,y1),(x1+tw,y1-th),color,-1)
        cv2.putText(det_img,caption,(x1,y1),
                   cv2.FONT_HERSHEY_SIMPLEX,size,
                   (255,255,255),
                   text_thickness,cv2.LINE_AA)
        cv2.putText(mask_img,caption,(x1,y1),
                   cv2.FONT_HERSHEY_SIMPLEX,size,
                   (255,255,255),
                   text_thickness,cv2.LINE_AA)
    return cv2.addWeighted(mask_img,mask_alpha,det_img,1-mask_alpha,0)

In [26]:
import warnings
import time
import cv2
import numpy
import torch


warnings.filterwarnings("ignore")
size = 640
model = YOLO("/kaggle/input/dccbbccd/best (2).pt")
reader = cv2.VideoCapture('/kaggle/input/dccbbccd/vid4.mp4')
save_re=[]
if not reader.isOpened():
    print("Error opening video stream or file")

fps = int(reader.get(cv2.CAP_PROP_FPS))
orig_w = int(reader.get(cv2.CAP_PROP_FRAME_WIDTH))
orig_h = int(reader.get(cv2.CAP_PROP_FRAME_HEIGHT))
start_line=(0,140,orig_w,140)
end_line=(0,orig_h-50,orig_w,orig_h-50)
fourcc = cv2.VideoWriter_fourcc(*'mp4v')
out = cv2.VideoWriter('output.mp4', fourcc, fps, (orig_w, orig_h))

bytetrack = BYTETracker(fps)
tracked_ids = np.array([], dtype=np.int32)
object_states={}
frame_index = 0
while reader.isOpened():
    success, frame = reader.read()
    if not success:
        break

    # Prepare image for model
    image = frame.copy()
    shape = image.shape[:2]
    r = size / max(shape[0], shape[1])
    if r != 1:
                h, w = shape
                image = cv2.resize(image,
                                   dsize=(int(w * r), int(h * r)),
                                   interpolation=cv2.INTER_LINEAR)
    h, w = image.shape[:2]
    image, ratio, pad = resize(image, size)
    shapes = shape, ((h / shape[0], w / shape[1]), pad)
    sample = image.transpose((2, 0, 1))[::-1]
    sample = np.ascontiguousarray(sample)
    sample = torch.unsqueeze(torch.from_numpy(sample), dim=0).cuda().half() / 255

    # Model inference
    with torch.no_grad():
        results = model.predict(sample,conf=0.12)[0]

    # Process detections
    bboxes = results.boxes.xywh.cpu().numpy()
    scores = results.boxes.conf.cpu().numpy()
    class_ids = results.boxes.cls.cpu().numpy()
    class_labels = ['pedestrian', 'rider', 'car', 'truck', 'bus', 'train', 'motorcycle', 'bicycle', 'traffic light', 'traffic sign']

    detections = non_max_suppression_adapted(results, class_labels, 0.25, 0.7)
    boxes = []
    confidences = []
    object_classes = []
    for detection in detections:
        x1, y1, x2, y2, conf, cls_id = detection
        boxes.append([x1, y1, x2, y2])
        confidences.append(conf)
        object_classes.append(cls_id)

    # Update tracker
    outputs = bytetrack.update(np.array(boxes), np.array(confidences), np.array(object_classes))
    #print(len(outputs))
    if len(outputs) > 0:
        boxes = outputs[:, :4]
        tracking_ids = outputs[:, 4].astype(np.int32)
        new_ids = np.setdiff1d(tracking_ids, tracked_ids)
        tracked_ids = np.concatenate((tracked_ids, new_ids))
        class_ids = outputs[:, 6].astype(np.int32)
        conf_scores = outputs[:, 5].astype(np.float64)
        boxes = scale(boxes, (orig_h, orig_w), (size, size), ratio_pad=shapes[1])
        boxes = boxes.astype(np.int32)
        save_re.append(boxes)
        for i in range(len(outputs)):
            obj_id = tracking_ids[i]
            if obj_id not in object_states:
                object_states[obj_id] = {'start_cross': False, 'end_cross': False}
            if is_crossing_line(boxes[i], start_line):
                object_states[obj_id]['start_cross'] = True
            if is_crossing_line(boxes[i], end_line):
                object_states[obj_id]['end_cross'] = True

    result_img = draw_detection(img=frame,
                                bboxes=boxes,
                                scores=conf_scores,
                                class_ids=class_ids,
                                ids=tracking_ids)
    # Write the frame to the output video
    cv2.line(result_img, (start_line[0], start_line[1]), (start_line[2], start_line[3]), (0, 255, 0), 2)
    cv2.line(result_img, (end_line[0], end_line[1]), (end_line[2], end_line[3]), (0, 0, 255), 2)

    # Calculate the number of objects that crossed both lines while keeping their IDs
    successful_tracks = sum(1 for state in object_states.values() if state['start_cross'] and state['end_cross'])

    # Display the number of successful tracks on the frame
    cv2.putText(result_img, f"Successful Tracks: {successful_tracks}", (50, 50), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 2, cv2.LINE_AA)
    out.write(result_img)

    frame_index += 1

reader.release()
out.release()
print(successful_tracks)





0: 640x640 12 riders, 9 cars, 17 motorcycles, 4 bicycles, 3 traffic lights, 17.8ms
Speed: 0.1ms preprocess, 17.8ms inference, 2.3ms postprocess per image at shape (1, 3, 640, 640)

0: 640x640 17 riders, 7 cars, 14 motorcycles, 5 bicycles, 3 traffic lights, 16.3ms
Speed: 0.1ms preprocess, 16.3ms inference, 1.8ms postprocess per image at shape (1, 3, 640, 640)

0: 640x640 1 pedestrian, 14 riders, 8 cars, 1 truck, 13 motorcycles, 5 bicycles, 1 traffic light, 16.4ms
Speed: 0.1ms preprocess, 16.4ms inference, 1.7ms postprocess per image at shape (1, 3, 640, 640)

0: 640x640 1 pedestrian, 11 riders, 6 cars, 13 motorcycles, 4 bicycles, 3 traffic lights, 16.2ms
Speed: 0.1ms preprocess, 16.2ms inference, 1.9ms postprocess per image at shape (1, 3, 640, 640)

0: 640x640 2 pedestrians, 15 riders, 5 cars, 1 truck, 14 motorcycles, 5 bicycles, 4 traffic lights, 15.6ms
Speed: 0.1ms preprocess, 15.6ms inference, 1.7ms postprocess per image at shape (1, 3, 640, 640)

0: 640x640 3 pedestrians, 13 rider

# DeepSORT

In [28]:
import warnings
import time
import cv2
import numpy
import torch

#Initialize
warnings.filterwarnings("ignore")
size = 640
model = YOLO("/kaggle/input/dccbbccd/best (2).pt")
reader = cv2.VideoCapture('/kaggle/input/dccbbccd/vid4.mp4')
save_re=[]
frame_count = 0
previous_frame = None
kalman_filter = KalmanFilterXYAH()
object_tracks = {}  # To hold the Kalman filter states for each object
fps = int(reader.get(cv2.CAP_PROP_FPS))
orig_w = int(reader.get(cv2.CAP_PROP_FRAME_WIDTH))
orig_h = int(reader.get(cv2.CAP_PROP_FRAME_HEIGHT))
start_line=(0,140,orig_w,140)
end_line=(0,orig_h-50,orig_w,orig_h-50)
fourcc = cv2.VideoWriter_fourcc(*'mp4v')
out = cv2.VideoWriter('output.mp4', fourcc, fps, (orig_w, orig_h))
frame_index=0
frame_detections={}
bytetrack = BYTETracker(fps)
tracked_ids = np.array([], dtype=np.int32)
object_states={}


#Main
if not reader.isOpened():
    print("Error opening video stream or file")

while reader.isOpened():
    success, frame = reader.read()
    if not success:
        break

    # Prepare image for model
    image = frame.copy()
    shape = image.shape[:2]
    r = size / max(shape[0], shape[1])
    if r != 1:
        h, w = shape
        image = cv2.resize(image,
                           dsize=(int(w * r), int(h * r)),
                           interpolation=cv2.INTER_LINEAR)
    h, w = image.shape[:2]
    image, ratio, pad = resize(image, size)
    shapes = shape, ((h / shape[0], w / shape[1]), pad)
    sample = image.transpose((2, 0, 1))[::-1]
    sample = np.ascontiguousarray(sample)
    sample = torch.unsqueeze(torch.from_numpy(sample), dim=0).cuda().half() / 255
    frame_index += 1
    if previous_frame is not None:
        current_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
        frame_diff = cv2.absdiff(current_frame, previous_frame)
        _, thresh = cv2.threshold(frame_diff, 25, 255, cv2.THRESH_BINARY)
        non_zero_count = np.count_nonzero(thresh)
        frame_count += 1
    else:
        non_zero_count = 0
        frame_count = 0

    boxes = []
    confidences = []
    object_classes = []

    if non_zero_count > 500 or frame_count > 5:
        # Perform detection with YOLOv8
        with torch.no_grad():
            results = model.predict(sample, conf=0.25)[0]

        # Process detections
        bboxes = results.boxes.xywh.cpu().numpy()
        scores = results.boxes.conf.cpu().numpy()
        class_ids = results.boxes.cls.cpu().numpy()
        class_labels = ['pedestrian', 'rider', 'car', 'truck', 'bus', 'train', 'motorcycle', 'bicycle', 'traffic light', 'traffic sign']
        

        detections = non_max_suppression_adapted(results, class_labels, 0.25, 0.7)
        #print(len(detections))
        for detection in detections:
            x1, y1, x2, y2, conf, cls_id = detection
            boxes.append([x1, y1, x2, y2])
            confidences.append(conf)
            object_classes.append(cls_id)

        # Save detections for this frame
        frame_detections[frame_index] = {'boxes': boxes, 'confidences': confidences, 'class_ids': object_classes}

        # Update Kalman filter with new detections
        for i in range(len(boxes)):
            bbox = boxes[i]
            x, y, a, h = ((bbox[0] + bbox[2]) / 2, (bbox[1] + bbox[3]) / 2, bbox[2] - bbox[0], bbox[3] - bbox[1])
            measurement = np.array([x, y, a, h])
            obj_id = tracking_ids[i]  # Ensure you have tracking_ids corresponding to each detection

            if obj_id in object_tracks:
                kf_mean, kf_cov = object_tracks[obj_id]
                kf_mean, kf_cov = kalman_filter.update(kf_mean, kf_cov, measurement)
                object_tracks[obj_id] = (kf_mean, kf_cov)
            else:
                kf_mean, kf_cov = kalman_filter.initiate(measurement)
                object_tracks[obj_id] = (kf_mean, kf_cov)

            # Store class ID with object ID
            object_states[obj_id] = {'class_id': class_ids[i]}

    else:
        # Use Kalman filter for prediction
        previous_frame_index = frame_index - 1
        if previous_frame_index in frame_detections:
            prev_boxes = frame_detections[previous_frame_index]['boxes']
            prev_confidences = frame_detections[previous_frame_index]['confidences']
            prev_class_ids = frame_detections[previous_frame_index]['class_ids']

            for i, obj_id in enumerate(object_tracks.keys()):
                kf_mean, kf_cov = object_tracks[obj_id]
                kf_mean, kf_cov = kalman_filter.predict(kf_mean, kf_cov)
                object_tracks[obj_id] = (kf_mean, kf_cov)

                # Convert Kalman filter state back to bounding box format
                x, y, a, h = kf_mean[:4]
                w = a  # Assuming 'a' is the width of the bounding box; adjust as needed
                bbox = [x - w/2, y - h/2, x + w/2, y + h/2]
                boxes.append(bbox)
                confidences.append(prev_confidences[i])  # Use the confidence from the previous detection
                object_classes.append(prev_class_ids[i])  # Use the class ID from the previous detection

            # Save predictions for this frame
            frame_detections[frame_index] = {'boxes': boxes, 'confidences': confidences, 'class_ids': object_classes}

            # Now use `boxes`, `confidences`, and `object_classes` in BYTETrack update
            outputs = bytetrack.update(np.array(boxes), np.array(confidences), np.array(object_classes))
    
        

   
    print(len(outputs))
    if len(outputs) > 0:
        boxes = outputs[:, :4]
        tracking_ids = outputs[:, 4].astype(np.int32)
        new_ids = np.setdiff1d(tracking_ids, tracked_ids)
        tracked_ids = np.concatenate((tracked_ids, new_ids))
        class_ids = outputs[:, 6].astype(np.int32)
        conf_scores = outputs[:, 5].astype(np.float64)
        boxes = scale(boxes, (orig_h, orig_w), (size, size), ratio_pad=shapes[1])
        boxes = boxes.astype(np.int32)
        save_re.append(boxes)
        for i in range(len(outputs)):
            obj_id = tracking_ids[i]
            if obj_id not in object_states:
                object_states[obj_id] = {'start_cross': False, 'end_cross': False}
            if is_crossing_line(boxes[i], start_line):
                object_states[obj_id]['start_cross'] = True
            if is_crossing_line(boxes[i], end_line):
                object_states[obj_id]['end_cross'] = True

    result_img = draw_detection(img=frame,
                                bboxes=boxes,
                                scores=conf_scores,
                                class_ids=class_ids,
                                ids=tracking_ids)
    # Write the frame to the output video
    cv2.line(result_img, (start_line[0], start_line[1]), (start_line[2], start_line[3]), (0, 255, 0), 2)
    cv2.line(result_img, (end_line[0], end_line[1]), (end_line[2], end_line[3]), (0, 0, 255), 2)

    # Calculate the number of objects that crossed both lines while keeping their IDs
    successful_tracks = sum(1 for state in object_states.values() if state['start_cross'] and state['end_cross'])

    # Display the number of successful tracks on the frame
    cv2.putText(result_img, f"Successful Tracks: {successful_tracks}", (50, 50), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 2, cv2.LINE_AA)
    out.write(result_img)


reader.release()
out.release()
print(successful_tracks)




31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
31
3