In [None]:
import numpy as np
import cv2
import torch
from ultralytics import YOLO
import math
import time

# Define vehicle classes from COCO dataset
VEHICLE_CLASSES = [2, 3, 5, 7]  # car, motorcycle, bus, truck
CLASS_NAMES = {
    2: 'Car',
    3: 'Motorcycle',
    5: 'Bus',
    7: 'Truck'
}

In [None]:
class MotionCompensator:
    def __init__(self):
        # Parameters for optical flow
        self.feature_params = dict(
            maxCorners=100,
            qualityLevel=0.3,
            minDistance=7,
            blockSize=7
        )
        self.lk_params = dict(
            winSize=(15, 15),
            maxLevel=2,
            criteria=(cv2.TERM_CRITERIA_EPS | cv2.TERM_CRITERIA_COUNT, 10, 0.03)
        )
        self.prev_gray = None
        self.prev_points = None
        
    def estimate_camera_motion(self, frame):
        gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
        
        if self.prev_gray is None:
            self.prev_gray = gray
            self.prev_points = cv2.goodFeaturesToTrack(gray, mask=None, **self.feature_params)
            return np.eye(3)  # Return identity matrix for first frame
        
        if self.prev_points is None or len(self.prev_points) < 10:
            self.prev_points = cv2.goodFeaturesToTrack(self.prev_gray, mask=None, **self.feature_params)
            if self.prev_points is None:
                self.prev_gray = gray
                return np.eye(3)
        
        # Calculate optical flow
        next_points, status, _ = cv2.calcOpticalFlowPyrLK(
            self.prev_gray, gray, self.prev_points, None, **self.lk_params
        )
        
        if next_points is None or len(next_points) < 4:
            self.prev_gray = gray
            return np.eye(3)
        
        # Filter good points
        good_old = self.prev_points[status == 1]
        good_new = next_points[status == 1]
        
        if len(good_old) < 4 or len(good_new) < 4:
            self.prev_gray = gray
            self.prev_points = next_points
            return np.eye(3)
        
        # Estimate rigid transform
        transform_matrix, _ = cv2.estimateAffinePartial2D(good_old, good_new)
        if transform_matrix is None:
            transform_matrix = np.eye(3)[:2]
        
        # Convert to 3x3 matrix
        full_transform = np.eye(3)
        full_transform[:2] = transform_matrix
        
        # Update previous frame and points
        self.prev_gray = gray
        self.prev_points = good_new.reshape(-1, 1, 2)
        
        return full_transform

In [None]:
class SpeedEstimator:
    def __init__(self, camera_fps=30, px_to_meter_ratio=10):
        self.fps = camera_fps
        self.px_to_meter_ratio = px_to_meter_ratio
        self.prev_positions = {}
        self.prev_times = {}
        self.speed_buffer = {}
        self.buffer_size = 5
        
    def calculate_speed(self, track_id, current_pos, current_time, camera_motion):
        if track_id not in self.prev_positions:
            self.prev_positions[track_id] = current_pos
            self.prev_times[track_id] = current_time
            self.speed_buffer[track_id] = []
            return None
        
        # Get previous position and time
        prev_pos = self.prev_positions[track_id]
        prev_time = self.prev_times[track_id]
        time_diff = current_time - prev_time
        
        if time_diff <= 0:
            return None
            
        # Compensate for camera motion
        adjusted_prev_pos = self.apply_transform(prev_pos, camera_motion)
        
        # Calculate displacement in pixels
        displacement = np.linalg.norm(current_pos - adjusted_prev_pos)
        
        # Convert to meters using ratio
        displacement_meters = displacement / self.px_to_meter_ratio
        
        # Calculate speed in m/s
        speed_ms = displacement_meters / time_diff
        
        # Convert to km/h
        speed_kmh = speed_ms * 3.6
        
        # Apply moving average filter
        self.speed_buffer[track_id].append(speed_kmh)
        if len(self.speed_buffer[track_id]) > self.buffer_size:
            self.speed_buffer[track_id].pop(0)
        
        filtered_speed = np.mean(self.speed_buffer[track_id])
        
        # Update previous position and time
        self.prev_positions[track_id] = current_pos
        self.prev_times[track_id] = current_time
        
        return filtered_speed
    
    def apply_transform(self, point, transform):
        # Convert point to homogeneous coordinates
        homog_point = np.array([point[0], point[1], 1])
        # Apply transform
        transformed = transform @ homog_point
        # Convert back to 2D
        return transformed[:2] / transformed[2]

In [2]:
class VehicleTracker:
    def __init__(self, video_path, yolo_model='yolov8n.pt', conf_thresh=0.5):
        self.model = YOLO(yolo_model)
        self.conf_thresh = conf_thresh
        self.cap = cv2.VideoCapture(video_path)
        self.frame_width = int(self.cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        self.frame_height = int(self.cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        self.fps = self.cap.get(cv2.CAP_PROP_FPS)
        
        self.motion_compensator = MotionCompensator()
        self.speed_estimator = SpeedEstimator(camera_fps=self.fps)
        
        self.output_path = 'output_' + video_path.split('/')[-1]
        self.writer = cv2.VideoWriter(
            self.output_path,
            cv2.VideoWriter_fourcc(*'mp4v'),
            self.fps,
            (self.frame_width, self.frame_height)
        )
        
        self.tracks = {}
        self.next_id = 0
    
    def get_track_id(self, bbox, max_dist=50):
        center = np.array([(bbox[0] + bbox[2]) / 2, (bbox[1] + bbox[3]) / 2])
        
        for track_id, track_info in self.tracks.items():
            track_center = track_info['center']
            if np.linalg.norm(center - track_center) < max_dist:
                return track_id
                
        self.next_id += 1
        return self.next_id
    
    def process_video(self):
        while True:
            ret, frame = self.cap.read()
            if not ret:
                break
            
            # Estimate camera motion
            camera_motion = self.motion_compensator.estimate_camera_motion(frame)
            current_time = time.time()
            
            # Detect vehicles
            results = self.model(frame, conf=self.conf_thresh, classes=VEHICLE_CLASSES, verbose=False)[0]
            
            # Update active tracks
            active_tracks = set()
            
            for bbox, cls in zip(results.boxes.xyxy, results.boxes.cls):
                bbox = bbox.cpu().numpy()
                cls = int(cls)
                
                # Get center point
                center = np.array([(bbox[0] + bbox[2]) / 2, (bbox[1] + bbox[3]) / 2])
                
                # Get track ID
                track_id = self.get_track_id(bbox)
                active_tracks.add(track_id)
                
                # Update track info
                self.tracks[track_id] = {
                    'center': center,
                    'class': cls,
                    'bbox': bbox
                }
                
                # Calculate speed
                speed = self.speed_estimator.calculate_speed(
                    track_id, center, current_time, camera_motion
                )
                
                # Draw bounding box and information
                cv2.rectangle(frame, 
                    (int(bbox[0]), int(bbox[1])),
                    (int(bbox[2]), int(bbox[3])), 
                    (0, 255, 0), 2
                )
                
                if speed is not None:
                    label = f"{CLASS_NAMES.get(cls, 'Vehicle')} ID:{track_id} {speed:.1f}km/h"
                    cv2.putText(frame, label,
                        (int(bbox[0]), int(bbox[1] - 10)),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.9, (0, 255, 0), 2
                    )
            
            # Remove inactive tracks
            self.tracks = {k: v for k, v in self.tracks.items() if k in active_tracks}
            
            # Write frame
            self.writer.write(frame)
            
            # Display frame
            cv2.imshow('Vehicle Tracking and Speed Estimation', frame)
            if cv2.waitKey(1) & 0xFF == ord('q'):
                break
        
        self.cap.release()
        self.writer.release()
        cv2.destroyAllWindows()

def main():
    video_path = "image/rec.mp4"
    tracker = VehicleTracker(
        video_path=video_path,
        yolo_model='yolov8n.pt',
        conf_thresh=0.5
    )
    tracker.process_video()

if __name__ == "__main__":
    main()