Here is a realistic, educational Python project for a basic traffic monitoring system using classical computer vision techniques (no deep learning models like YOLO/SSD are used here).

The code combines several methods you mentioned earlier in the course:

Background subtraction (MOG2 + KNN)

Optical flow (sparse Lucas-Kanade + dense Farneback)

Object tracking (MeanShift + CamShift)

Simple motion estimation & counting logic

<hr>

# Traffic Monitoring System – Educational Classical CV Project

# Features:

  • Background subtraction (MOG2 and KNN)

  • Sparse optical flow (Lucas-Kanade on good features)

  • Dense optical flow (Farneback) – visualization only

  • Object tracking with MeanShift and CamShift

  • Basic vehicle counting line + direction estimation



In [1]:
import cv2

import numpy as np

import argparse

import time

from collections import deque

In [13]:
from IPython.display import Video

In [2]:
!pip install ultralytics

Collecting ultralytics
  Downloading ultralytics-8.4.14-py3-none-any.whl.metadata (39 kB)
Collecting ultralytics-thop>=2.0.18 (from ultralytics)
  Downloading ultralytics_thop-2.0.18-py3-none-any.whl.metadata (14 kB)
Downloading ultralytics-8.4.14-py3-none-any.whl (1.2 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.2/1.2 MB[0m [31m33.6 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading ultralytics_thop-2.0.18-py3-none-any.whl (28 kB)
Installing collected packages: ultralytics-thop, ultralytics
Successfully installed ultralytics-8.4.14 ultralytics-thop-2.0.18


In [3]:
from ultralytics import YOLO

Creating new Ultralytics Settings v0.0.6 file ✅ 
View Ultralytics Settings with 'yolo settings' or at '/root/.config/Ultralytics/settings.json'
Update Settings with 'yolo settings key=value', i.e. 'yolo settings runs_dir=path/to/dir'. For help see https://docs.ultralytics.com/quickstart/#ultralytics-settings.


In [23]:
# CONFIGURATION

VIDEO_PATH = "/content/traffic.mp4"      # Upload your video
MODEL_PATH = "/content/yolov8n.pt"


In [24]:
# COCO vehicle classes
VEHICLE_CLASSES = [2, 3, 5, 7]  # car, motorcycle, bus, truck

# Congestion parameters
LOW_SPEED_THRESHOLD = 10        # pixels per second
MIN_VEHICLES = 8
MIN_LOW_SPEED = 5

MAX_MISSING = 30                # frames before removing vehicle
RESIZE_WIDTH = 960
RESIZE_HEIGHT = 540

In [25]:
# Load Model
model = YOLO(MODEL_PATH)


In [26]:
# VIDEO CAPTURE

cap = cv2.VideoCapture(VIDEO_PATH)
fps = cap.get(cv2.CAP_PROP_FPS)
if fps == 0:
    fps = 30

print("FPS:", fps)

FPS: 30


In [27]:
# Output video writer
fourcc = cv2.VideoWriter_fourcc(*'mp4v')
out = cv2.VideoWriter('output.mp4', fourcc, fps, (RESIZE_WIDTH, RESIZE_HEIGHT))

In [28]:
# TRACKING MEMORY

prev_positions = {}
speed_memory = {}
last_seen = {}

In [35]:
# =========================================================
# Highway Traffic Monitoring System
# Vehicle Counting + Speed Estimation + Congestion Detection
# =========================================================

# pip install ultralytics opencv-python numpy

import cv2
import numpy as np
from ultralytics import YOLO

# -----------------------------
# CONFIGURATION
# -----------------------------
VIDEO_PATH = "/traffic_circle.mp4"
OUTPUT_PATH = "output_video.mp4"
MODEL_PATH = "yolov8n.pt"

# COCO vehicle classes
# car=2, motorcycle=3, bus=5, truck=7
VEHICLE_CLASSES = [2, 3, 5, 7]

# Speed & congestion parameters
LOW_SPEED_THRESHOLD = 10        # pixels per second
MIN_VEHICLES = 8
MIN_LOW_SPEED = 5
MAX_MISSING = 30                # remove vehicle after N missing frames

RESIZE_WIDTH = 960
RESIZE_HEIGHT = 540

# -----------------------------
# LOAD YOLO MODEL
# -----------------------------
model = YOLO(MODEL_PATH)

# -----------------------------
# OPEN VIDEO
# -----------------------------
cap = cv2.VideoCapture(VIDEO_PATH)

fps = cap.get(cv2.CAP_PROP_FPS)
if fps == 0:
    fps = 30

print("Video FPS:", fps)

fourcc = cv2.VideoWriter_fourcc(*'mp4v')
out = cv2.VideoWriter(
    OUTPUT_PATH,
    fourcc,
    fps,
    (RESIZE_WIDTH, RESIZE_HEIGHT)
)

# -----------------------------
# TRACKING MEMORY
# -----------------------------
prev_positions = {}
speed_memory = {}
last_seen = {}

# =========================================================
# MAIN PROCESSING LOOP
# =========================================================
while cap.isOpened():
    ret, frame = cap.read()
    if not ret:
        break

    frame = cv2.resize(frame, (RESIZE_WIDTH, RESIZE_HEIGHT))

    # Run YOLO tracking
    results = model.track(
        frame,
        persist=True,
        conf=0.3,
        iou=0.5,
        verbose=False
    )

    current_ids = set()

    if results[0].boxes.id is not None:

        boxes = results[0].boxes.xyxy.cpu().numpy()
        ids = results[0].boxes.id.cpu().numpy()
        classes = results[0].boxes.cls.cpu().numpy()

        for box, track_id, cls in zip(boxes, ids, classes):

            if int(cls) not in VEHICLE_CLASSES:
                continue

            x1, y1, x2, y2 = map(int, box)

            # Calculate center
            cx = int((x1 + x2) / 2)
            cy = int((y1 + y2) / 2)

            current_ids.add(track_id)
            last_seen[track_id] = 0

            # -----------------------------
            # SPEED CALCULATION
            # -----------------------------
            if track_id in prev_positions:
                px, py = prev_positions[track_id]

                distance = np.sqrt((cx - px) ** 2 + (cy - py) ** 2)
                speed = distance * fps   # pixels per second
            else:
                speed = 0

            prev_positions[track_id] = (cx, cy)
            speed_memory[track_id] = speed

            # -----------------------------
            # DRAW BOX
            # -----------------------------
            color = (0, 255, 0)  # green

            if speed < LOW_SPEED_THRESHOLD:
                color = (0, 0, 255)  # red if slow

            cv2.rectangle(frame, (x1, y1), (x2, y2), color, 2)

            cv2.putText(
                frame,
                f"ID:{int(track_id)}",
                (x1, y1 - 10),
                cv2.FONT_HERSHEY_SIMPLEX,
                0.5,
                color,
                2
            )

            cv2.putText(
                frame,
                f"Speed:{int(speed)}",
                (x1, y2 + 15),
                cv2.FONT_HERSHEY_SIMPLEX,
                0.5,
                (255, 0, 0),
                2
            )

    # -----------------------------
    # REMOVE MISSING VEHICLES
    # -----------------------------
    for tid in list(last_seen.keys()):
        if tid not in current_ids:
            last_seen[tid] += 1
            if last_seen[tid] > MAX_MISSING:
                last_seen.pop(tid, None)
                prev_positions.pop(tid, None)
                speed_memory.pop(tid, None)

    # -----------------------------
    # VEHICLE COUNTING
    # -----------------------------
    vehicle_count = len(speed_memory)

    slow_vehicle_count = sum(
        1 for s in speed_memory.values()
        if s < LOW_SPEED_THRESHOLD
    )

    # -----------------------------
    # CONGESTION DETECTION
    # -----------------------------
    if vehicle_count > MIN_VEHICLES and slow_vehicle_count > MIN_LOW_SPEED:
        cv2.putText(
            frame,
            "CONGESTION DETECTED",
            (50, 50),
            cv2.FONT_HERSHEY_SIMPLEX,
            1.2,
            (0, 0, 255),
            3
        )

    # Display counts
    cv2.putText(
        frame,
        f"Vehicles: {vehicle_count}",
        (50, 90),
        cv2.FONT_HERSHEY_SIMPLEX,
        0.8,
        (0, 255, 255),
        2
    )

    cv2.putText(
        frame,
        f"Slow Vehicles: {slow_vehicle_count}",
        (50, 120),
        cv2.FONT_HERSHEY_SIMPLEX,
        0.8,
        (0, 255, 255),
        2
    )

    out.write(frame)

# =========================================================
# CLEANUP
# =========================================================
cap.release()
out.release()
cv2.destroyAllWindows()

print("Processing Complete ✅")
print("Output saved as:", OUTPUT_PATH)


Video FPS: 29.97002997002997
Processing Complete ✅
Output saved as: output_video.mp4
