In [7]:
import cv2
import time
import numpy as np
from ultralytics import YOLO
from collections import defaultdict, deque
import os
import json
import math

# --- CONFIGURATION ---
MODEL_PATH = "yolov8n.pt"
VIDEO_PATH = "cars.mp4"
CONFIDENCE_THRESHOLD = 0.5

SCALE = 0.5
FRAME_SKIP = 2  # Process every 2nd frame

REAL_WORLD_WIDTH_METERS = 15.0
REAL_WORLD_HEIGHT_METERS = 48.0

STATIONARY_PIXEL_THRESHOLD = 3  # pixel threshold for stationary detection
STATIONARY_RESET_THRESHOLD_FRAMES = 8
SPEED_BUFFER_MAXLEN = 5

SOURCE_POINTS_FILE = "perspective_points.json"

# --- INITIALIZATION ---
model = YOLO(MODEL_PATH)
kalman_filters = {}
track_history = defaultdict(list)
speed_estimates = {}
stationary_frames_count = defaultdict(int)
speed_buffer = defaultdict(lambda: deque(maxlen=SPEED_BUFFER_MAXLEN))
SOURCE_POINTS = None

# --- HELPER FUNCTIONS ---
def create_kalman_filter():
    kf = cv2.KalmanFilter(4, 2)
    kf.transitionMatrix = np.array([[1,0,1,0],[0,1,0,1],[0,0,1,0],[0,0,0,1]], np.float32)
    kf.measurementMatrix = np.array([[1,0,0,0],[0,1,0,0]], np.float32)
    kf.processNoiseCov = np.eye(4, dtype=np.float32) * 0.03
    kf.measurementNoiseCov = np.eye(2, dtype=np.float32) * 0.5
    kf.errorCovPost = np.eye(4, dtype=np.float32)
    return kf

def cleanup_stale_trackers(current_track_ids):
    known_track_ids = set(kalman_filters.keys())
    stale_ids = known_track_ids - set(current_track_ids)
    for track_id in stale_ids:
        del kalman_filters[track_id]
        del track_history[track_id]
        speed_estimates.pop(track_id, None)
        stationary_frames_count.pop(track_id, None)
        speed_buffer.pop(track_id, None)

def mouse_callback(event, x, y, flags, params):
    if event == cv2.EVENT_LBUTTONDOWN and len(params['points']) < 4:
        params['points'].append((x, y))
        print(f"Point {len(params['points'])} added: ({x}, {y})")

def get_perspective_transform_points(cap):
    global SOURCE_POINTS
    if os.path.exists(SOURCE_POINTS_FILE):
        with open(SOURCE_POINTS_FILE, 'r') as f:
            points = json.load(f)
            SOURCE_POINTS = np.array(points, dtype="float32")
            return

    ret, frame = cap.read()
    if not ret:
        raise SystemExit("Error: Could not read the first frame.")

    clone = frame.copy()
    params = {'points': []}
    cv2.namedWindow("Select 4 Points")
    cv2.setMouseCallback("Select 4 Points", mouse_callback, params)
    print("Select 4 points: Top-Left, Top-Right, Bottom-Right, Bottom-Left")

    while True:
        temp = clone.copy()
        for i, pt in enumerate(params['points']):
            cv2.circle(temp, pt, 5, (0,255,0), -1)
            cv2.putText(temp, str(i+1), (pt[0]+10, pt[1]), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0,0,255),2)
        cv2.imshow("Select 4 Points", temp)
        key = cv2.waitKey(1) & 0xFF
        if len(params['points']) == 4 and (key == 13 or key == 32):
            break
        elif key == ord('r'):
            params['points'] = []
            print("Points reset. Select again.")
        elif key == ord('q'):
            raise SystemExit("Calibration cancelled.")

    cv2.destroyAllWindows()
    SOURCE_POINTS = np.array(params['points'], dtype="float32")
    with open(SOURCE_POINTS_FILE, 'w') as f:
        json.dump(params['points'], f)
    cap.set(cv2.CAP_PROP_POS_FRAMES, 0)

def process_frame(frame, model, transform_matrix):
    h, w = frame.shape[:2]
    frame_small = cv2.resize(frame, (int(w*SCALE), int(h*SCALE)))
    results = model.track(frame_small, persist=True, classes=[2,3,5,7], conf=CONFIDENCE_THRESHOLD, verbose=False)
    detections = []

    if results[0].boxes is not None and results[0].boxes.id is not None:
        boxes = results[0].boxes.xyxy.cpu().numpy()
        track_ids = results[0].boxes.id.cpu().numpy().astype(int)
        for box, track_id in zip(boxes, track_ids):
            x1, y1, x2, y2 = (box/SCALE).astype(int)
            cx, cy = (x1+x2)//2, (y1+y2)//2
            pt = np.array([[[cx, cy]]], dtype="float32")
            transformed_pt = cv2.perspectiveTransform(pt, transform_matrix)[0][0]
            detections.append({"box": (x1,y1,x2,y2), "track_id": track_id, "transformed_point": transformed_pt})
    return detections

def update_tracker(detections, fps, pixels_per_meter):
    stationary_threshold_m = STATIONARY_PIXEL_THRESHOLD / pixels_per_meter['y']

    for det in detections:
        tid = det['track_id']
        point = det['transformed_point']

        if tid not in kalman_filters:
            kalman_filters[tid] = create_kalman_filter()
        kf = kalman_filters[tid]
        kf.predict()
        kf.correct(np.array(point, dtype=np.float32))
        track_history[tid].append(tuple(kf.statePost[:2].flatten()))

        speed_kmh = 0.0
        if len(track_history[tid]) > 1:
            prev, curr = track_history[tid][-2], track_history[tid][-1]
            dx = (curr[0] - prev[0]) / pixels_per_meter['x']
            dy = (curr[1] - prev[1]) / pixels_per_meter['y']
            meters_dist = np.sqrt(dx**2 + dy**2)
            time_elapsed = FRAME_SKIP / fps

            if meters_dist > stationary_threshold_m:
                stationary_frames_count[tid] = 0
                speed_kmh = meters_dist / time_elapsed * 3.6
            else:
                stationary_frames_count[tid] += 1
                speed_kmh = 0

        if stationary_frames_count[tid] >= STATIONARY_RESET_THRESHOLD_FRAMES:
            speed_estimates[tid] = 0
            speed_buffer[tid].clear()
            track_history[tid].clear()
            kf.statePost[2:] = 0
        else:
            speed_buffer[tid].append(speed_kmh)
            speed_estimates[tid] = int(sum(speed_buffer[tid])/len(speed_buffer[tid])) if speed_buffer[tid] else 0

def draw_annotations(frame, detections):
    for det in detections:
        x1,y1,x2,y2 = det['box']
        tid = det['track_id']
        speed = speed_estimates.get(tid, 0)
        cv2.rectangle(frame, (x1,y1), (x2,y2), (0,255,0), 2)
        label = f"ID {tid}" if speed==0 else f"ID {tid} | {speed} km/h"
        (w,h), _ = cv2.getTextSize(label, cv2.FONT_HERSHEY_SIMPLEX, 0.7, 2)
        cv2.rectangle(frame, (x1, y1-h-10), (x1+w, y1), (0,255,0), -1)
        cv2.putText(frame, label, (x1,y1-5), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0,0,0), 2)
    return frame

# --- MAIN EXECUTION ---
def main():
    cap = cv2.VideoCapture(VIDEO_PATH)
    if not cap.isOpened():
        raise SystemExit(f"Cannot open {VIDEO_PATH}")

    VIDEO_FPS = cap.get(cv2.CAP_PROP_FPS) or 30.0
    print(f"Video FPS: {VIDEO_FPS:.2f}")

    get_perspective_transform_points(cap)
    dest_w = 800
    dest_h = int(dest_w * (REAL_WORLD_HEIGHT_METERS/REAL_WORLD_WIDTH_METERS))
    DEST_POINTS = np.float32([[0,0],[dest_w-1,0],[dest_w-1,dest_h-1],[0,dest_h-1]])
    TRANSFORM_MATRIX = cv2.getPerspectiveTransform(SOURCE_POINTS, DEST_POINTS)
    pixels_per_meter = {'x': dest_w/REAL_WORLD_WIDTH_METERS, 'y': dest_h/REAL_WORLD_HEIGHT_METERS}

    frame_counter = 0
    last_detections = []

    try:
        while True:
            ret, frame = cap.read()
            if not ret: break
            frame_counter += 1

            if frame_counter % FRAME_SKIP == 0:
                detections = process_frame(frame, model, TRANSFORM_MATRIX)
                update_tracker(detections, VIDEO_FPS, pixels_per_meter)
                current_ids = [d['track_id'] for d in detections]
                cleanup_stale_trackers(current_ids)
                last_detections = detections

            annotated = draw_annotations(frame, last_detections)
            cv2.polylines(annotated, [np.int32(SOURCE_POINTS)], True, (0,0,255),2)
            cv2.imshow("Vehicle Speed Detection", annotated)
            if cv2.waitKey(1) & 0xFF == ord("q"): break
    finally:
        cap.release()
        cv2.destroyAllWindows()

if __name__ == "__main__":
    main()


Video FPS: 29.97
