In [7]:
"""
Highway Congestion and Speed Analysis
-------------------------------------
A classical computer vision pipeline for tracking vehicle speeds,
detecting stopped vehicles, and analyzing highway congestion using
Background Subtraction (MOG2) and Perspective Transformation (Homography).

Author: Raj Antala
"""

import cv2
import numpy as np
import math

# Safe import: Allows code to run in Google Colab OR locally on a PC
try:
    from google.colab import files
    IN_COLAB = True
except ImportError:
    IN_COLAB = False

# ============================================================================
# CONFIGURATION
# ============================================================================
INPUT_VIDEO_PATH = "traffic_video.mp4"
OUTPUT_VIDEO_PATH = "high_accuracy_congestion.mp4"

cap = cv2.VideoCapture(INPUT_VIDEO_PATH)
if not cap.isOpened():
    print(f"Error: Cannot open video. Please check the file path.")
    exit()

FRAME_WIDTH, FRAME_HEIGHT = 800, 600
fps = int(cap.get(cv2.CAP_PROP_FPS))
if fps == 0: fps = 30

fourcc = cv2.VideoWriter_fourcc(*'mp4v')
out = cv2.VideoWriter(OUTPUT_VIDEO_PATH, fourcc, fps, (FRAME_WIDTH, FRAME_HEIGHT))

# ============================================================================
# 1. HOMOGRAPHY (BIRD'S EYE VIEW) SETUP
# ============================================================================
src_points = np.float32([[300, 250], [500, 250], [100, 550], [700, 550]])
dst_points = np.float32([[0, 0], [400, 0], [0, 800], [400, 800]])
perspective_matrix = cv2.getPerspectiveTransform(src_points, dst_points)

# Assuming the mapped 800-pixel long stretch represents 40 meters of real highway.
METERS_PER_PIXEL_WARPED = 40.0 / 800.0

# ============================================================================
# 2. CENTROID TRACKER CLASS
# ============================================================================
class CentroidTracker:
    def __init__(self, max_disappeared=15):
        self.next_id = 0
        self.objects = {}
        self.max_disappeared = max_disappeared

    def register(self, centroid):
        self.objects[self.next_id] = {'centroids': [centroid], 'speeds': [], 'disappeared': 0}
        self.next_id += 1

    def deregister(self, object_id):
        del self.objects[object_id]

    def update(self, rects):
        if len(rects) == 0:
            for obj_id in list(self.objects.keys()):
                self.objects[obj_id]['disappeared'] += 1
                if self.objects[obj_id]['disappeared'] > self.max_disappeared:
                    self.deregister(obj_id)
            return self.objects

        input_centroids = np.zeros((len(rects), 2), dtype="int")
        for i, (x, y, w, h) in enumerate(rects):
            input_centroids[i] = (int(x + w / 2.0), int(y + h / 2.0))

        if len(self.objects) == 0:
            for i in range(0, len(input_centroids)):
                self.register(input_centroids[i])
        else:
            object_ids = list(self.objects.keys())
            object_centroids = [self.objects[obj_id]['centroids'][-1] for obj_id in object_ids]

            D = np.linalg.norm(np.array(object_centroids)[:, np.newaxis] - input_centroids, axis=2)
            rows = D.min(axis=1).argsort()
            cols = D.argmin(axis=1)[rows]

            used_rows, used_cols = set(), set()

            for row, col in zip(rows, cols):
                if row in used_rows or col in used_cols: continue
                if D[row, col] > 80: continue

                obj_id = object_ids[row]
                self.objects[obj_id]['centroids'].append(input_centroids[col])
                self.objects[obj_id]['disappeared'] = 0

                # INCREASED MEMORY: Keep the last 20 frames for smoother trajectory calculation
                if len(self.objects[obj_id]['centroids']) > 20:
                    self.objects[obj_id]['centroids'].pop(0)

                used_rows.add(row)
                used_cols.add(col)

            unused_rows = set(range(0, D.shape[0])).difference(used_rows)
            unused_cols = set(range(0, D.shape[1])).difference(used_cols)

            for row in unused_rows:
                obj_id = object_ids[row]
                self.objects[obj_id]['disappeared'] += 1
                if self.objects[obj_id]['disappeared'] > self.max_disappeared:
                    self.deregister(obj_id)

            for col in unused_cols:
                self.register(input_centroids[col])

        return self.objects

# ============================================================================
# 3. MAIN PROCESSING PIPELINE
# ============================================================================
mog = cv2.createBackgroundSubtractorMOG2(history=500, varThreshold=50, detectShadows=True)
tracker = CentroidTracker()

print("Processing clean, high-accuracy video...")
frame_count = 0

while cap.isOpened():
    ret, frame = cap.read()
    if not ret: break

    frame_count += 1
    if frame_count % 50 == 0: print(f"Processed {frame_count} frames...")

    frame = cv2.resize(frame, (FRAME_WIDTH, FRAME_HEIGHT))

    # ACCURACY UPGRADE 1: Strict Shadow Rejection
    fgmask = mog.apply(frame, learningRate=0.002)
    # Thresholding at 254 drops the gray shadows (127) and keeps only the solid white cars (255)
    _, fgmask = cv2.threshold(fgmask, 254, 255, cv2.THRESH_BINARY)

    kernel_open = cv2.getStructuringElement(cv2.MORPH_RECT, (3, 3))
    kernel_close = cv2.getStructuringElement(cv2.MORPH_RECT, (15, 15))
    fgmask = cv2.morphologyEx(fgmask, cv2.MORPH_OPEN, kernel_open)
    fgmask = cv2.morphologyEx(fgmask, cv2.MORPH_CLOSE, kernel_close)

    contours, _ = cv2.findContours(fgmask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)

    rects = []
    for cnt in contours:
        if cv2.contourArea(cnt) > 1200:
            x, y, w, h = cv2.boundingRect(cnt)
            rects.append((x, y, w, h))

    tracked_objects = tracker.update(rects)

    stopped_count = 0
    active_speeds = []

    for obj_id, data in tracked_objects.items():
        if data['disappeared'] > 0: continue

        centroids = data['centroids']
        current_x, current_y = centroids[-1]

        # ACCURACY UPGRADE 2: Calculate speed over a 15-frame gap to eliminate micro-jitter
        if len(centroids) >= 15:
            past_x, past_y = centroids[-15]

            pts = np.array([[[past_x, past_y], [current_x, current_y]]], dtype=np.float32)
            warped_pts = cv2.perspectiveTransform(pts, perspective_matrix)

            w_past_x, w_past_y = warped_pts[0][0]
            w_curr_x, w_curr_y = warped_pts[0][1]

            distance_pixels = math.sqrt((w_curr_x - w_past_x)**2 + (w_curr_y - w_past_y)**2)
            distance_meters = distance_pixels * METERS_PER_PIXEL_WARPED

            time_seconds = 15.0 / fps
            speed_mps = distance_meters / time_seconds
            speed_kmph = speed_mps * 3.6

            # Smooth the speed output using a larger moving average
            data['speeds'].append(speed_kmph)
            if len(data['speeds']) > 10: data['speeds'].pop(0)
            avg_smoothed_speed = np.mean(data['speeds'])

            if avg_smoothed_speed < 3.0:
                cv2.circle(frame, (current_x, current_y), 5, (0, 0, 255), -1)
                cv2.putText(frame, f"ID {obj_id}: STOPPED", (current_x - 20, current_y - 15),
                            cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 0, 255), 2)
                stopped_count += 1
            else:
                cv2.circle(frame, (current_x, current_y), 5, (0, 255, 0), -1)
                cv2.putText(frame, f"ID {obj_id}: {avg_smoothed_speed:.1f} km/h", (current_x - 20, current_y - 15),
                            cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 2)
                active_speeds.append(avg_smoothed_speed)

    # Dashboard overlay
    highway_avg_speed = np.mean(active_speeds) if len(active_speeds) > 0 else 0
    cv2.rectangle(frame, (10, 10), (380, 100), (0, 0, 0), -1)
    cv2.putText(frame, f"Avg Flow: {highway_avg_speed:.1f} km/h", (20, 40), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 255, 255), 2)
    cv2.putText(frame, f"Stopped Vehicles: {stopped_count}", (20, 80), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 0, 255) if stopped_count > 0 else (0, 255, 0), 2)

    out.write(frame)

cap.release()
out.release()

print(f"\nSuccess! Total frames processed: {frame_count}")
print(f"Video saved as: {OUTPUT_VIDEO_PATH}")

# Triggers download only if running inside Google Colab
if IN_COLAB:
    print("Initiating download to local machine...")
    files.download(OUTPUT_VIDEO_PATH)

Processing clean, high-accuracy video...
Processed 50 frames...
Processed 100 frames...
Processed 150 frames...
Processed 200 frames...
Processed 250 frames...
Processed 300 frames...
Processed 350 frames...
Processed 400 frames...
Processed 450 frames...
Processed 500 frames...

Success! Total frames processed: 538
Video saved as: high_accuracy_congestion.mp4
Initiating download to local machine...


<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>