In [None]:
import cv2

# Preview cell: draw the reference lines and dots on the first frame of the
# video so their pixel positions can be inspected visually.
video_path = r"C:\Users\Administrator\Desktop\Input_Video.mp4"

cap = cv2.VideoCapture(video_path)
if not cap.isOpened():
    # Raise instead of calling exit(): inside a notebook, exit() asks the
    # kernel itself to shut down rather than just aborting this cell.
    raise RuntimeError("Error: Could not open video.")

# Only the first frame is needed, so the capture is released right away.
ret, frame = cap.read()
cap.release()

if not ret:
    raise RuntimeError("Error: Could not read frame.")

# Not used by the drawing code below; presumably the resolution the pixel
# coordinates were picked for (x runs up to 1280) -- TODO confirm.
frame_width = 1280
frame_height = 720

# Each line is (x_start, y_start, x_end, y_end) in pixel coordinates.
line1 = (425, 0, 0, 190)
line2 = (655, 0, 1280, 190)
line3 = (0, 30, 1280, 30)
line4 = (120, 137, 566, 137)
line5 = (587, 137, 1150, 137)

# Marker points (x, y) in pixel coordinates.
dots = [
    (355, 30),
    (0, 190),
    (745, 30),
    (1280, 190)
]

# All five lines share colour and thickness, so draw them in one loop.
for x_start, y_start, x_end, y_end in (line1, line2, line3, line4, line5):
    cv2.line(frame, (x_start, y_start), (x_end, y_end), (0, 255, 0), 2)

# Filled circles (thickness -1) of radius 6 at every marker point.
for (x, y) in dots:
    cv2.circle(frame, (x, y), 6, (0, 0, 255), -1)

# Blocks until a key is pressed in the preview window.
cv2.imshow("Single Frame with Lines and Dots", frame)
cv2.waitKey(0)
cv2.destroyAllWindows()


In [3]:
import cv2
import numpy as np
from ultralytics import YOLO
from deep_sort_realtime.deepsort_tracker import DeepSort
from collections import deque

# ---------------------------------------------------------------------------
# Input video, detector weights and the detector classes that are tracked.
# ---------------------------------------------------------------------------
VIDEO_PATH = "C:/Users/Administrator/Desktop/Input_Video.mp4"
MODEL_PATH = "yolov8m.pt"
TARGET_CLASSES = ["car", "truck", "motorcycle"]

# ---------------------------------------------------------------------------
# Speed-estimation tuning.
# ---------------------------------------------------------------------------
MIN_SPEED_KMH = 2.0   # smoothed speeds below this are displayed as 0.0
SMOOTH_FRAMES = 5     # max length of each track's position history
ALPHA = 0.3           # weight of the newest window average in the blend
WINDOW_SIZE = 10      # number of raw speed samples averaged per track

# Counting lines as (x_start, y, x_end, y) in pixel coordinates; both are
# horizontal at y = 137.
line4 = (120, 137, 566, 137)
line5 = (587, 137, 1150, 137)

# Homography source points in image pixels. The first two lie outside the
# visible frame horizontally (presumably lane edges extended down to
# y = 720 -- TODO confirm).
src_pts = np.asarray(
    [
        [-1187.3, 720],
        [3022.0, 720],
        [355, 30],
        [745, 30],
    ],
    dtype=np.float32,
)

# Matching destination points: a 20 x 100 rectangle. The speed computation
# treats these units as metres (it multiplies units/second by 3.6 for km/h).
dst_pts = np.asarray(
    [
        [0, 100],
        [20, 100],
        [0, 0],
        [20, 0],
    ],
    dtype=np.float32,
)

# 3x3 matrix mapping image pixels onto the destination plane.
H, _ = cv2.findHomography(src_pts, dst_pts)

model = YOLO(MODEL_PATH)

tracker = DeepSort(max_age=5)

# Per-track state, keyed by track id:
#   track_history -> deque of (x, y, frame_no) on the destination plane
#   speed_cache   -> last smoothed speed in km/h
#   speed_window  -> deque of recent raw speed samples
#   last_center   -> previous bounding-box centre in pixels
track_history, speed_cache, speed_window, last_center = {}, {}, {}, {}

# Crossing counters and the track ids that each line has already counted.
forward_count = reverse_count = 0
already_counted_line4, already_counted_line5 = set(), set()

# Colours used for boxes and labels; a track keeps the same entry because
# the lookup is track id modulo the palette length.
COLOR_PALETTE = [
    (255, 0, 0),
    (0, 255, 0),
    (0, 0, 255),
    (255, 255, 0),
    (255, 0, 255),
    (0, 255, 255),
    (128, 0, 128),
    (255, 165, 0),
    (0, 128, 128),
    (128, 128, 0),
]

def get_color(track_id):
    """Return the palette colour assigned to a track.

    The id is converted with ``int()`` and wrapped around the length of
    ``COLOR_PALETTE``, so the same id always maps to the same colour.
    """
    return COLOR_PALETTE[int(track_id) % len(COLOR_PALETTE)]

def crosses_horizontal_line(p_prev, p_curr, x1, y_line, x2, tol=3):
    """Tell whether the move from ``p_prev`` to ``p_curr`` hits a horizontal segment.

    Args:
        p_prev: ``(x, y)`` of the previous position.
        p_curr: ``(x, y)`` of the current position.
        x1, x2: x extent of the segment (given in either order).
        y_line: y coordinate of the segment.
        tol: slack applied to both the x extent and the distance to the line.

    Returns:
        True when one of these holds, otherwise False:
        * the move is horizontal (|dy| < 1e-6), lies within ``tol`` of
          ``y_line`` and its x range, widened by ``tol``, overlaps the segment;
        * the endpoints are on opposite sides of the line (or one is on it)
          and the intersection x lies within the segment widened by ``tol``;
        * both endpoints are strictly on the same side, the nearer one is
          within ``tol`` of the line, and the x where the (extended) motion
          line meets ``y_line`` lies within the widened segment.
    """
    # Normalise the segment so that x1 is its left end.
    if not (x1 <= x2):
        x1, x2 = x2, x1

    prev_x, prev_y = p_prev
    curr_x, curr_y = p_curr
    dy = curr_y - prev_y
    prev_gap = prev_y - y_line
    curr_gap = curr_y - y_line

    # Horizontal motion: no intersection to solve for, compare x ranges.
    if abs(dy) < 1e-6:
        if not (abs(prev_gap) <= tol):
            return False
        lo = min(prev_x, curr_x) - tol
        hi = max(prev_x, curr_x) + tol
        return not (lo > x2 or hi < x1)

    if prev_gap * curr_gap > 0:
        # Same side of the line: only a near miss within tol can count.
        if not (min(abs(prev_gap), abs(curr_gap)) <= tol):
            return False
        frac = (y_line - prev_y) / dy if abs(dy) > 1e-6 else 0.0
    else:
        # Opposite sides, or an endpoint exactly on the line.
        frac = (y_line - prev_y) / dy

    x_at_line = prev_x + frac * (curr_x - prev_x)
    return (x1 - tol <= x_at_line) and (x_at_line <= x2 + tol)

# ---------------------------------------------------------------------------
# Open the input video and the annotated output writer.
# ---------------------------------------------------------------------------
cap = cv2.VideoCapture(VIDEO_PATH)
if not cap.isOpened():
    # Without this check the loop below would exit on the first read and the
    # cell would still report a saved (but empty) output video.
    raise RuntimeError(f"Could not open video: {VIDEO_PATH}")

FPS = cap.get(cv2.CAP_PROP_FPS)
if not FPS > 0:
    # Frame rate unavailable (0, negative or NaN): fall back to 30 so the
    # speed computation never divides by zero.
    FPS = 30

fourcc = cv2.VideoWriter_fourcc(*"mp4v")
frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
output_path = "Output_Video.mp4"
out = cv2.VideoWriter(output_path, fourcc, FPS, (frame_width, frame_height))
if not out.isOpened():
    # A writer that failed to open silently drops every frame.
    cap.release()
    raise RuntimeError(f"Could not open video writer for: {output_path}")

# ---------------------------------------------------------------------------
# Per-frame pipeline: detect -> track -> estimate speed -> count crossings ->
# annotate -> write. The try/finally guarantees the capture, the writer and
# the preview window are released even if a frame raises.
# ---------------------------------------------------------------------------
try:
    while True:
        ret, frame = cap.read()
        if not ret:
            break

        # Draw the two counting lines.
        cv2.line(frame, (line4[0], line4[1]), (line4[2], line4[3]), (255, 255, 255), 2)
        cv2.line(frame, (line5[0], line5[1]), (line5[2], line5[3]), (255, 255, 255), 2)

        results = model.predict(frame, imgsz=640, verbose=False)[0]

        # Convert detections of the target classes into the
        # ([left, top, width, height], confidence, class) tuples handed to
        # the tracker.
        detections = []
        for box in results.boxes:
            cls_id = int(box.cls[0])
            cls_name = model.names[cls_id]
            if cls_name not in TARGET_CLASSES:
                continue
            x1b, y1b, x2b, y2b = map(int, box.xyxy[0])
            conf = float(box.conf[0])
            detections.append(([x1b, y1b, x2b - x1b, y2b - y1b], conf, cls_name))

        tracks = tracker.update_tracks(detections, frame=frame)
        # Frame position reported by the capture; used as the time base for
        # speed estimation.
        current_frame_no = int(cap.get(cv2.CAP_PROP_POS_FRAMES))

        for track in tracks:
            if not track.is_confirmed():
                continue

            track_id = track.track_id
            ltrb = track.to_ltrb()
            if ltrb is None:
                continue

            x1b, y1b, x2b, y2b = map(int, ltrb)
            cls_name = track.get_det_class() or "object"

            # Bounding-box centre in pixels: the reference point for both
            # speed estimation and line crossing.
            cx = int((x1b + x2b) / 2)
            cy = int((y1b + y2b) / 2)
            cv2.circle(frame, (cx, cy), 4, (0, 0, 255), -1)

            prev_center = last_center.get(track_id)

            # Project the centre onto the destination plane of the homography.
            point = np.array([[[cx, cy]]], dtype=np.float32)
            transformed_point = cv2.perspectiveTransform(point, H)[0][0]
            x_m, y_m = transformed_point

            if track_id not in track_history:
                track_history[track_id] = deque(maxlen=SMOOTH_FRAMES)
            track_history[track_id].append((x_m, y_m, current_frame_no))

            # Raw speed: displacement between the oldest retained position
            # and the current one, divided by the elapsed time, in km/h.
            if len(track_history[track_id]) >= 2:
                old_x, old_y, old_frame = track_history[track_id][0]
                dist_m = np.linalg.norm([x_m - old_x, y_m - old_y])
                time_s = (current_frame_no - old_frame) / FPS
                raw_speed = (dist_m / time_s) * 3.6 if time_s > 0 else 0.0
            else:
                raw_speed = 0.0

            if track_id not in speed_window:
                speed_window[track_id] = deque(maxlen=WINDOW_SIZE)
            speed_window[track_id].append(raw_speed)

            # Once the window is full, blend its mean into the cached speed.
            if len(speed_window[track_id]) == WINDOW_SIZE:
                avg_speed = sum(speed_window[track_id]) / WINDOW_SIZE
                # Seed the blend with the first window average. Seeding with
                # 0.0 made the first smoothed value only ALPHA * avg_speed,
                # so displayed speeds ramped up from zero.
                prev_speed = speed_cache.get(track_id, avg_speed)
                speed_kmh = ALPHA * avg_speed + (1 - ALPHA) * prev_speed
                speed_cache[track_id] = speed_kmh
            else:
                speed_kmh = speed_cache.get(track_id, 0.0)

            # Show slow speeds as zero; the cached value is left untouched.
            if speed_kmh < MIN_SPEED_KMH:
                speed_kmh = 0.0

            # Count each track at most once per line.
            if prev_center is not None:
                if (track_id not in already_counted_line4) and \
                   crosses_horizontal_line(prev_center, (cx, cy), line4[0], line4[1], line4[2]):
                    forward_count += 1
                    already_counted_line4.add(track_id)

                if (track_id not in already_counted_line5) and \
                   crosses_horizontal_line(prev_center, (cx, cy), line5[0], line5[1], line5[2]):
                    reverse_count += 1
                    already_counted_line5.add(track_id)

            last_center[track_id] = (cx, cy)

            box_color = get_color(track_id)
            label = f"{cls_name} {speed_kmh:.1f} km/h"
            cv2.rectangle(frame, (x1b, y1b), (x2b, y2b), box_color, 2)
            cv2.putText(frame, f"ID {track_id} {label}",
                        (x1b, y1b - 5), cv2.FONT_HERSHEY_SIMPLEX, 0.6, box_color, 2)

        cv2.putText(frame, f"Counter 1: {forward_count}", (20, 30),
                    cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 2)
        cv2.putText(frame, f"Counter 2: {reverse_count}", (20, 70),
                    cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 2)

        out.write(frame)
        cv2.imshow("YOLO + Smoothed Speed + Counters", frame)
        # Press "q" in the preview window to stop early.
        if cv2.waitKey(1) & 0xFF == ord("q"):
            break
finally:
    cap.release()
    out.release()
    cv2.destroyAllWindows()

print(f"Output video saved to: {output_path}")

Output video saved to: C:/Users/Administrator/Desktop/Output_Video.mp4
