In [48]:
import cv2
import numpy as np
from ultralytics import YOLO
from scipy.optimize import linear_sum_assignment
import gc
import torch
# Report CUDA visibility up front; the printed flag is this cell's only output.
cuda_available = torch.cuda.is_available()
print(cuda_available)

# Load the detector weights. Move them to the GPU only when one is present so
# this cell does not fail on a CPU-only machine.
model = YOLO('yolo11l.pt')
if cuda_available:
    model.to('cuda')

# Class ids handed to the detector's `classes=` filter.
# NOTE(review): presumably COCO ids (2=car, 3=motorcycle, 5=bus, 7=truck) —
# confirm against the loaded model's class names.
vehicle_classes = [2, 5, 7, 3]
video_path = 'video.mp4'
cap = cv2.VideoCapture(video_path)

# Ask the capture backend for a one-frame buffer.
# NOTE(review): backends may ignore this property — confirm it has an effect here.
cap.set(cv2.CAP_PROP_BUFFERSIZE, 1)

# Mutable tracking state shared with the processing-loop cell. Re-run this cell
# before re-running the loop, otherwise the loop resumes with stale tracks and a
# capture that has already been released.
tracks = []          # live Track objects
next_id = 0          # id given to the next Track that is created
max_age = 30         # a track is dropped once its consecutive_invisible_count reaches this
min_hits = 3         # a track is drawn only once total_visible_count reaches this
frame_count = 0      # running frame counter maintained by the loop
process_every = 1    # the loop skips frames whose count is not a multiple of this

True


In [None]:
class KalmanTracker:
    """Thin wrapper around a 4-state / 2-measurement ``cv2.KalmanFilter``.

    The transition matrix adds state components 2 and 3 to components 0 and 1
    on every step (a constant-velocity model when the state is read as
    ``[x, y, vx, vy]``); the measurement matrix observes components 0 and 1.
    The underlying filter is exposed as ``self.kf``.
    """

    def __init__(self):
        """Create the filter and assign its model and noise matrices."""
        kalman = cv2.KalmanFilter(4, 2)

        # Per step: component 0 += component 2, component 1 += component 3;
        # components 2 and 3 carry over unchanged.
        kalman.transitionMatrix = np.array(
            [
                [1, 0, 1, 0],
                [0, 1, 0, 1],
                [0, 0, 1, 0],
                [0, 0, 0, 1],
            ],
            np.float32,
        )
        # Only the first two state components are measured.
        kalman.measurementMatrix = np.array(
            [
                [1, 0, 0, 0],
                [0, 1, 0, 0],
            ],
            np.float32,
        )

        # Diagonal noise covariances: 0.01 on the process, 5 on the measurement.
        kalman.processNoiseCov = np.eye(4, dtype=np.float32) * 0.01
        kalman.measurementNoiseCov = np.eye(2, dtype=np.float32) * 5

        self.kf = kalman

    def predict(self):
        """Advance the filter one step and return the first two predicted state components, flattened."""
        predicted_state = self.kf.predict()
        return predicted_state[:2].flatten()

    def update(self, measurement):
        """Correct the filter with ``measurement`` and return the first two corrected state components, flattened."""
        corrected_state = self.kf.correct(measurement)
        return corrected_state[:2].flatten()

class Track:
    """Book-keeping for one tracked object: id, Kalman filter, counters, colour, last position."""

    def __init__(self, detection, track_id):
        """Start a new track at ``detection``.

        Args:
            detection: indexable whose items 0 and 1 are the x and y position.
            track_id: identifier stored on the track as ``id``.
        """
        x, y = detection[0], detection[1]

        self.id = track_id
        self.tracker = KalmanTracker()
        # Seed the filter's posterior state at the detection, with the other
        # two state components set to 0.
        self.tracker.kf.statePost = np.array([[x], [y], [0], [0]], np.float32)

        self.age = 0
        self.total_visible_count = 1
        self.consecutive_invisible_count = 0

        # Three successive draws from [0, 255), one per colour channel.
        self.color = tuple(np.random.randint(0, 255) for _ in range(3))
        # Kept as the object passed in (not copied or converted).
        self.last_pos = detection


In [51]:
# Tracking loop: run the detector on each processed frame, associate the
# detected box centres with the Kalman-predicted track positions using the
# Hungarian algorithm, then draw the id of every sufficiently confirmed track.
# Reads and mutates the state created in the setup cell (`cap`, `model`,
# `tracks`, `next_id`, `frame_count`, ...).

# Largest track-to-detection distance (in pixels) still accepted as a match.
MATCH_DISTANCE_THRESHOLD = 100
# Run inference on the GPU when available, otherwise fall back to the CPU.
inference_device = 'cuda' if torch.cuda.is_available() else 'cpu'

try:
    while cap.isOpened():
        success, frame = cap.read()
        if not success:
            break

        # Count every decoded frame exactly once so that `process_every`
        # skips the intended number of frames and the periodic GC below
        # really runs once per 30 frames.
        frame_count += 1
        if frame_count % process_every != 0:
            continue

        results = model(frame, classes=vehicle_classes, verbose=False,
                        device=inference_device, imgsz=640)

        # Copy all boxes to host memory once, then compute the integer
        # (truncated) centre of each box: [[x_center, y_center], ...].
        boxes_xyxy = results[0].boxes.xyxy.cpu().numpy()
        detections = ((boxes_xyxy[:, :2] + boxes_xyxy[:, 2:]) / 2).astype(int).tolist()

        # Advance every filter; unmatched tracks keep this prediction as last_pos.
        for track in tracks:
            track.last_pos = track.tracker.predict()

        matched_tracks, matched_detections = set(), set()
        if tracks and detections:
            predicted_positions = np.array([t.last_pos for t in tracks])
            detection_positions = np.array(detections)
            # cost_matrix[i, j] = Euclidean distance between track i and detection j.
            cost_matrix = np.linalg.norm(
                predicted_positions[:, None, :] - detection_positions[None, :, :],
                axis=2,
            )

            row_ind, col_ind = linear_sum_assignment(cost_matrix)
            for i, j in zip(row_ind, col_ind):
                # Reject assignments that are optimal overall but too far apart.
                if cost_matrix[i, j] < MATCH_DISTANCE_THRESHOLD:
                    measurement = np.array(
                        [[detections[j][0]], [detections[j][1]]], np.float32
                    )
                    tracks[i].last_pos = tracks[i].tracker.update(measurement)
                    tracks[i].age += 1
                    tracks[i].total_visible_count += 1
                    tracks[i].consecutive_invisible_count = 0
                    matched_tracks.add(i)
                    matched_detections.add(j)

        # Existing tracks without a detection this frame grow older and more
        # "invisible". This runs before new tracks are appended, so tracks
        # created below are not aged in the frame that creates them.
        for i, track in enumerate(tracks):
            if i not in matched_tracks:
                track.consecutive_invisible_count += 1
                track.age += 1

        # Every detection that was not matched starts a new track.
        for j, det in enumerate(detections):
            if j not in matched_detections:
                tracks.append(Track(det, next_id))
                next_id += 1

        # Drop tracks that have gone unseen for `max_age` processed frames.
        tracks = [t for t in tracks if t.consecutive_invisible_count < max_age]

        # Draw only tracks that have been observed at least `min_hits` times.
        for track in tracks:
            if track.total_visible_count >= min_hits:
                px, py = int(track.last_pos[0]), int(track.last_pos[1])
                cv2.circle(frame, (px, py), radius=5, color=track.color, thickness=-1)
                cv2.putText(frame, str(track.id), (px + 10, py - 10),
                            cv2.FONT_HERSHEY_SIMPLEX, 0.5, track.color, 2)

        cv2.imshow('Video', frame)

        # Press 'q' in the video window to stop.
        if (cv2.waitKey(1) & 0xFF) == ord('q'):
            break

        if frame_count % 30 == 0:
            gc.collect()
finally:
    # Release the capture and close the window even if the loop raised or the
    # kernel was interrupted.
    cap.release()
    cv2.destroyAllWindows()