In [3]:
"""
Plate Detection - Tail lights + license plate detection on full video
Works under perspective distortion (uses minAreaRect as robust fallback)

Pipeline per frame:
  1. detect_tail_lights()      -> finds the two rear lights and their extreme points
  2. detect_plate()            -> finds the license plate below the lights
  3. Optical flow tracking     -> propagates detected points frame-to-frame
  4. Periodic re-detection     -> corrects drift every N frames

Output: annotated video with lights (cyan), extreme points (blue),
        plate quadrilateral (green) drawn on every frame.

Usage:
    python plate_detection.py
"""

import os
import cv2
import numpy as np


# =========================
# CONFIGURATION
# =========================

# Detection / processing parameters shared by every stage of the pipeline.
CONFIG = dict(
    # Tail lights: minimum V-channel brightness for a pixel to count as "light"
    V_LOWER=210,
    # Tail lights: blobs starting below this fraction of the frame height are
    # discarded (avoids road reflections)
    MAX_Y_RATIO=0.80,
    # Tail lights: minimum height/width ratio of a blob (lights are taller than wide)
    MIN_VERTICAL_RATIO=1.2,
    # Tail lights: minimum blob area, expressed as a fraction of the frame area
    MIN_CONTOUR_AREA_RATIO=0.00015,
    # Tail lights: vertical tolerance (px) around mid-height when picking the
    # outermost edge point of each light
    Y_TOLERANCE=5,
    # Plate: V-channel brightness window for the white/reflective plate surface
    V_PLATE_LOW=150,
    V_PLATE_HIGH=240,
    # Tracking: run a full re-detection every this many frames to correct drift
    REDETECTION_INTERVAL=30,
    # Start-up: number of opening frames scanned before giving up on finding the car
    MAX_SEARCH_FRAMES=300,
)

# Input video and annotated output locations used by the entry point.
VIDEO_PATH = '/app/data/videos/input/1.mp4'
OUTPUT_PATH = '/app/data/videos/output/plate_detection_output.avi'


# =========================
# TAIL LIGHT DETECTION
# =========================

def detect_tail_lights(frame, config):
    """
    Detect the two rear tail lights and return their centers and outermost points.

    Strategy:
      - Threshold the V (brightness) channel to isolate bright blobs
      - Filter by size, aspect ratio, and vertical position
      - Split candidates into left/right clusters by x-coordinate
      - Compute area-weighted centers for each cluster
      - Find the outermost horizontal edge point at mid-height for each light

    Args:
        frame:  BGR image (numpy array)
        config: detection parameters dict; reads 'V_LOWER', 'MAX_Y_RATIO',
                'MIN_VERTICAL_RATIO', 'MIN_CONTOUR_AREA_RATIO', 'Y_TOLERANCE'

    Returns:
        fari:                 [(cx_L, cy_L), (cx_R, cy_R)] sorted left to right
        extremes:             {'SX': (x,y), 'DX': (x,y)} outermost edge points,
                              as plain Python ints
        contours_per_cluster: [[contours_L], [contours_R]]
                              passed to detect_plate to avoid re-running the mask
        or (None, None, None) if fewer than two candidate blobs survive filtering
    """
    height, width = frame.shape[:2]

    # --- Brightness mask ---
    hsv = cv2.cvtColor(frame, cv2.COLOR_BGR2HSV)
    V   = hsv[:, :, 2]

    mask = cv2.inRange(V, config['V_LOWER'], 255)

    # Close small gaps within blobs, remove isolated noise.
    # The 3x9 kernel is taller than wide, matching the expected light shape.
    kernel = cv2.getStructuringElement(cv2.MORPH_RECT, (3, 9))
    mask = cv2.morphologyEx(mask, cv2.MORPH_CLOSE, kernel)
    mask = cv2.morphologyEx(mask, cv2.MORPH_OPEN,  kernel)

    contours, _ = cv2.findContours(mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)

    min_area = config['MIN_CONTOUR_AREA_RATIO'] * width * height
    max_y    = int(height * config['MAX_Y_RATIO'])

    # --- Filter candidates ---
    candidates = []
    for c in contours:
        area = cv2.contourArea(c)
        if area < min_area:
            continue

        x, y, w, h = cv2.boundingRect(c)

        if y > max_y:               # too low in frame (likely road reflection)
            continue
        if w == 0 or h == 0:
            continue
        if w / h > 1.0:             # wider than tall -> not a rear light
            continue
        if h / w < config['MIN_VERTICAL_RATIO']:
            continue

        M = cv2.moments(c)
        if M['m00'] == 0:
            continue

        cx = int(M['m10'] / M['m00'])
        cy = int(M['m01'] / M['m00'])
        candidates.append((c, cx, cy, area))

    if len(candidates) < 2:
        return None, None, None

    # --- Split into left / right clusters by x-coordinate ---
    # With at least two candidates both halves are guaranteed non-empty.
    candidates.sort(key=lambda p: p[1])  # sort by cx
    mid       = len(candidates) // 2
    cluster_L = candidates[:mid]
    cluster_R = candidates[mid:]

    # --- Area-weighted centers ---
    fari = []
    for cluster in (cluster_L, cluster_R):
        total_area = sum(a for _, _, _, a in cluster)
        cx_w = sum(cx * a for _, cx, _, a in cluster) / total_area
        cy_w = sum(cy * a for _, _, cy, a in cluster) / total_area
        fari.append((int(cx_w), int(cy_w)))

    # --- Outermost edge points at mid-height ---
    # For the left light we want the leftmost point; for the right, the rightmost.
    extremes  = {}
    tolerance = config['Y_TOLERANCE']

    for name, cluster, want_rightmost in (('SX', cluster_L, False),
                                          ('DX', cluster_R, True)):
        y_mid   = int(np.mean([cy for _, _, cy, _ in cluster]))
        all_pts = np.vstack([c[:, 0, :] for c, _, _, _ in cluster])

        # Contour points lying within the tolerance band around mid-height;
        # if the band is empty, fall back to every point of the cluster.
        band = all_pts[np.abs(all_pts[:, 1] - y_mid) <= tolerance]
        pool = band if len(band) else all_pts

        idx = pool[:, 0].argmax() if want_rightmost else pool[:, 0].argmin()
        # Store plain Python ints so the point can be handed to OpenCV drawing
        # calls and used for slicing without carrying NumPy scalar types around.
        extremes[name] = (int(pool[idx][0]), int(pool[idx][1]))

    # Pass raw contours to detect_plate so it can compute bottom-y accurately
    contours_per_cluster = [
        [c for c, _, _, _ in cluster_L],
        [c for c, _, _, _ in cluster_R],
    ]

    return fari, extremes, contours_per_cluster


# =========================
# PLATE DETECTION
# =========================

def detect_plate(frame, extremes, contours_per_cluster, config):
    """
    Detect the license plate in the region below the tail lights.

    Strategy:
      - Define ROI: horizontally between the two outer edge points,
        vertically just below the bottom of the light blobs
        (bottom-y is read from contours_per_cluster, not re-computed)
      - Threshold ROI brightness to isolate the plate (bright white/silver)
      - Crop around the largest bright blob (with padding)
      - Apply morphological gradient + Otsu threshold to find edges
      - Fit minAreaRect to the largest edge contour and order its 4 corners

    Args:
        frame:                BGR image
        extremes:             {'SX': (x,y), 'DX': (x,y)}
        contours_per_cluster: [[contours_L], [contours_R]] from detect_tail_lights
        config:               detection parameters dict; reads 'V_PLATE_LOW'
                              and 'V_PLATE_HIGH'

    Returns:
        plate_corners: {'TL': (x,y), 'TR': (x,y), 'BR': (x,y), 'BL': (x,y)}
                       in global image coordinates (plain Python ints),
                       or None if detection fails at any stage
    """
    height, width = frame.shape[:2]

    # --- Bottom edge of the lights, taken from the contours already found ---
    light_bottoms = []
    for cluster_contours in contours_per_cluster:
        for c in cluster_contours:
            _, y, _, h = cv2.boundingRect(c)
            light_bottoms.append(y + h)

    if not light_bottoms:
        return None

    y_base = max(light_bottoms)

    # ROI horizontal bounds: between the two outermost edge points
    x1 = max(0, int(extremes['SX'][0]))
    x2 = min(width - 1, int(extremes['DX'][0]))

    if x2 <= x1:
        return None

    # ROI vertical: start 20 px below the lights, span ~18% of frame height
    y1 = min(y_base + 20, height - 1)
    y2 = min(y1 + int(0.18 * height), height)

    roi = frame[y1:y2, x1:x2]
    if roi.size == 0:
        return None

    # --- Brightness mask to isolate the plate blob ---
    hsv_roi    = cv2.cvtColor(roi, cv2.COLOR_BGR2HSV)
    V_roi      = hsv_roi[:, :, 2]
    mask_plate = cv2.inRange(V_roi, config['V_PLATE_LOW'], config['V_PLATE_HIGH'])

    # Wide kernel bridges gaps between characters along the plate's long axis
    kernel = cv2.getStructuringElement(cv2.MORPH_RECT, (7, 3))
    mask_plate = cv2.morphologyEx(mask_plate, cv2.MORPH_CLOSE, kernel)

    contours, _ = cv2.findContours(mask_plate, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
    if not contours:
        return None

    # Keep only the largest blob (assumed to be the plate) and take its
    # axis-aligned extent; x_max / y_max are inclusive pixel indices.
    largest = max(contours, key=cv2.contourArea)
    bx, by, bw, bh = cv2.boundingRect(largest)
    x_min, x_max = bx, bx + bw - 1
    y_min, y_max = by, by + bh - 1

    # Padding so the crop also contains the plate's outer edges
    pad_x = int(0.25 * (x_max - x_min))
    pad_y = int(0.40 * (y_max - y_min))

    # Slice ends are exclusive, so add 1 to keep the blob's last column/row.
    roi_x_min = max(0, x_min - pad_x)
    roi_x_max = min(roi.shape[1], x_max + pad_x + 1)
    roi_y_min = max(0, y_min - pad_y)
    roi_y_max = min(roi.shape[0], y_max + pad_y + 1)

    roi_plate = roi[roi_y_min:roi_y_max, roi_x_min:roi_x_max]
    if roi_plate.size == 0:
        return None

    # --- Edge detection inside the plate crop ---
    # CLAHE boosts local contrast before the gradient is taken
    gray_plate = cv2.cvtColor(roi_plate, cv2.COLOR_BGR2GRAY)
    clahe      = cv2.createCLAHE(clipLimit=2.0, tileGridSize=(8, 8))
    gray_plate = clahe.apply(gray_plate)
    gray_plate = cv2.GaussianBlur(gray_plate, (7, 7), 0)

    kernel_g = cv2.getStructuringElement(cv2.MORPH_RECT, (3, 3))
    gradient = cv2.morphologyEx(gray_plate, cv2.MORPH_GRADIENT, kernel_g)

    # Otsu picks the binarisation threshold from the gradient histogram
    _, gradient_binary = cv2.threshold(
        gradient, 0, 255, cv2.THRESH_BINARY + cv2.THRESH_OTSU
    )

    contours_grad, _ = cv2.findContours(
        gradient_binary, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE
    )
    if not contours_grad:
        return None

    main_contour = max(contours_grad, key=cv2.contourArea)

    # --- minAreaRect: rotated rectangle around the main edge contour ---
    # An axis-aligned bounding rect gives wrong corners when the plate is tilted.
    rect = cv2.minAreaRect(main_contour)
    if min(rect[1]) < 1:
        # Zero-width/height rectangle (e.g. a line-like contour): not a plate.
        return None

    pts = np.int32(cv2.boxPoints(rect)).reshape(4, 2)

    # Sort the 4 box points into TL / TR / BR / BL
    # TL = smallest (x+y), BR = largest (x+y)
    sums   = pts.sum(axis=1)
    idx_tl = int(np.argmin(sums))
    idx_br = int(np.argmax(sums))
    if idx_tl == idx_br:
        # All corners collapsed onto one anti-diagonal after rounding.
        return None

    # Of the two remaining corners, the one further right is TR.
    rest = [i for i in range(4) if i not in (idx_tl, idx_br)]
    if pts[rest[0]][0] > pts[rest[1]][0]:
        idx_tr, idx_bl = rest
    else:
        idx_bl, idx_tr = rest

    # Convert from roi_plate local coords -> global image coords
    offset_x = int(roi_x_min) + int(x1)
    offset_y = int(roi_y_min) + int(y1)

    def to_global(idx):
        return (int(pts[idx][0]) + offset_x, int(pts[idx][1]) + offset_y)

    return {
        'TL': to_global(idx_tl),
        'TR': to_global(idx_tr),
        'BR': to_global(idx_br),
        'BL': to_global(idx_bl),
    }


# =========================
# FULL DETECTION (single frame)
# =========================

def detect_all(frame, config):
    """
    Single-frame detection: tail lights first, then the plate below them.

    Args:
        frame:  BGR image
        config: detection parameters dict, forwarded to both detectors

    Returns:
        dict with 'fari', 'extremes', 'plate_corners', 'contours_per_cluster'
        ('plate_corners' is whatever detect_plate returned, possibly None),
        or None if tail lights are not found
    """
    centers, edge_points, light_contours = detect_tail_lights(frame, config)

    # Without the lights there is no reference region to search for the plate.
    if centers is None:
        return None

    return dict(
        fari=centers,
        extremes=edge_points,
        plate_corners=detect_plate(frame, edge_points, light_contours, config),
        contours_per_cluster=light_contours,
    )


# =========================
# DRAW RESULTS ON FRAME
# =========================

def draw_detections(frame, fari, extremes, plate_corners, frame_idx, status):
    """
    Draw lights, extreme points and plate corners on a copy of the frame.

    Args:
        frame:         BGR image (left unmodified)
        fari:          [(cx_L, cy_L), (cx_R, cy_R)] or None
        extremes:      {'SX': (x,y), 'DX': (x,y)} or None
        plate_corners: {'TL','TR','BR','BL'} -> (x,y), or None; the outline is
                       drawn through the corners in the dict's iteration order
        frame_idx:     current frame number (shown in overlay)
        status:        string shown in the top-left corner, below the frame
                       counter; green when equal to 'TRACKING', orange otherwise

    Returns:
        annotated copy of the frame
    """
    vis = frame.copy()

    def as_point(pt):
        # OpenCV drawing calls want plain Python ints; coordinates may arrive
        # as NumPy scalars (e.g. from optical-flow output).
        return (int(pt[0]), int(pt[1]))

    # Light centers — cyan filled circles (OpenCV colours are BGR)
    if fari:
        for pt in fari:
            cv2.circle(vis, as_point(pt), 8, (255, 255, 0), -1)

    # Outermost edge points — blue filled circles
    if extremes:
        for pt in extremes.values():
            cv2.circle(vis, as_point(pt), 6, (255, 0, 0), -1)

    # Plate quadrilateral — green outline + corner labels
    if plate_corners:
        poly = np.array([as_point(pt) for pt in plate_corners.values()],
                        dtype=np.int32)
        cv2.polylines(vis, [poly], True, (0, 255, 0), 2)
        for name, pt in plate_corners.items():
            x, y = as_point(pt)
            cv2.putText(vis, name, (x + 4, y - 4),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.45, (0, 255, 0), 1)

    # Frame counter and tracking status
    cv2.putText(vis, f"Frame: {frame_idx}", (10, 30),
                cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 255, 255), 2)
    status_color = (0, 255, 0) if status == 'TRACKING' else (0, 200, 255)
    cv2.putText(vis, status, (10, 60),
                cv2.FONT_HERSHEY_SIMPLEX, 0.65, status_color, 2)

    return vis


# =========================
# MAIN — PROCESS FULL VIDEO
# =========================

def _build_tracked_points(result):
    """Convert a detect_all() result into a dict of (1, 2) float32 point arrays."""
    pts = {
        'faro_L': np.array([result['fari'][0]], dtype=np.float32),
        'faro_R': np.array([result['fari'][1]], dtype=np.float32),
    }
    if result['plate_corners']:
        for key, corner in result['plate_corners'].items():
            pts[f'plate_{key}'] = np.array([corner], dtype=np.float32)
    return pts


def _propagate_points(prev_gray, curr_gray, tracked_points, lk_params):
    """Track every point into the next frame with one Lucas-Kanade call; drop failures."""
    if not tracked_points:
        return {}

    names    = list(tracked_points)
    prev_pts = np.concatenate(
        [tracked_points[n].reshape(1, 2) for n in names]
    ).reshape(-1, 1, 2)

    new_pts, st, _ = cv2.calcOpticalFlowPyrLK(
        prev_gray, curr_gray, prev_pts, None, **lk_params
    )
    if new_pts is None or st is None:
        return {}

    new_pts = new_pts.reshape(-1, 1, 2)
    found   = st.reshape(-1)
    # A point whose status flag is not 1 is simply dropped.
    return {name: new_pts[i] for i, name in enumerate(names) if found[i] == 1}


def _annotate_stream(cap, out, config, total_frames):
    """Run the search / track / re-detect loop; return False if the car was never found."""
    lk_params = dict(
        winSize  = (21, 21),
        maxLevel = 3,
        criteria = (cv2.TERM_CRITERIA_EPS | cv2.TERM_CRITERIA_COUNT, 30, 0.01),
    )

    # ---- PHASE 1: scan until the car is found ----
    print(f"Scanning first {config['MAX_SEARCH_FRAMES']} frames for the car...")

    start_frame  = None
    first_result = None
    first_frame  = None

    for i in range(config['MAX_SEARCH_FRAMES']):
        ret, frame = cap.read()
        if not ret:
            break

        result = detect_all(frame, config)
        if result is not None:
            start_frame  = i
            first_result = result
            first_frame  = frame.copy()
            print(f"Car found at frame {start_frame}")
            break

        # Write waiting frames to the output with only a text overlay.
        # ASCII only: Hershey fonts cannot render characters such as an em dash.
        cv2.putText(frame, f"Frame: {i} - waiting for car...", (10, 30),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.7, (200, 200, 200), 2)
        out.write(frame)

    if start_frame is None:
        print(f"Car not found in first {config['MAX_SEARCH_FRAMES']} frames. Exiting.")
        return False

    # ---- PHASE 2: seed the tracker from the first detection ----
    # Tracked: left center, right center and (if found) the 4 plate corners.
    tracked_points = _build_tracked_points(first_result)
    prev_gray      = cv2.cvtColor(first_frame, cv2.COLOR_BGR2GRAY)

    out.write(draw_detections(
        first_frame,
        first_result['fari'],
        first_result['extremes'],
        first_result['plate_corners'],
        start_frame,
        'DETECTED',
    ))

    # ---- PHASE 3: process remaining frames ----
    # Read until the stream ends instead of trusting the reported frame count,
    # which the container may report as 0 or inaccurately.
    plate_keys            = ('plate_TL', 'plate_TR', 'plate_BR', 'plate_BL')
    frames_since_redetect = 0
    frame_idx             = start_frame

    while True:
        ret, frame = cap.read()
        if not ret:
            break
        frame_idx += 1

        curr_gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
        status    = 'TRACKING'
        extremes  = None   # only available on frames with a fresh detection

        # --- Optical flow: propagate all tracked points ---
        tracked_points = _propagate_points(prev_gray, curr_gray,
                                           tracked_points, lk_params)

        # --- Reconstruct fari and plate_corners from tracked points ---
        fari = None
        if 'faro_L' in tracked_points and 'faro_R' in tracked_points:
            fari = [
                tuple(int(v) for v in tracked_points['faro_L'][0]),
                tuple(int(v) for v in tracked_points['faro_R'][0]),
            ]

        plate_corners = None
        if all(k in tracked_points for k in plate_keys):
            plate_corners = {
                k.replace('plate_', ''): tuple(int(v) for v in tracked_points[k][0])
                for k in plate_keys
            }

        # --- Periodic re-detection to correct drift ---
        frames_since_redetect += 1
        if frames_since_redetect >= config['REDETECTION_INTERVAL']:
            frames_since_redetect = 0
            fresh = detect_all(frame, config)
            if fresh is not None:
                # Re-detection succeeded: reset tracked points to fresh positions
                tracked_points = _build_tracked_points(fresh)
                fari           = fresh['fari']
                plate_corners  = fresh['plate_corners']
                extremes       = fresh['extremes']
                status         = 'RE-DETECTED'
            else:
                # Re-detection failed: keep using the optical flow result
                status = 'TRACKING (redetect failed)'

        out.write(draw_detections(frame, fari, extremes, plate_corners,
                                  frame_idx, status))

        prev_gray = curr_gray

        if frame_idx % 50 == 0:
            if total_frames > 0:
                pct = frame_idx / total_frames * 100
                print(f"  {frame_idx}/{total_frames} ({pct:.1f}%)")
            else:
                print(f"  {frame_idx} frames processed")

    return True


def process_video(video_path, output_path, config):
    """
    Process an entire video:
      - scan opening frames to find the car
      - track lights + plate with optical flow
      - re-detect every REDETECTION_INTERVAL frames to correct drift
      - write annotated output video

    Progress and failures (input cannot be opened, output cannot be created,
    car never found) are reported with print(); nothing is returned.

    Args:
        video_path:  path to input video
        output_path: path for the annotated output video (MJPG-encoded);
                     its parent directory is created if missing
        config:      detection + processing parameters dict; reads
                     'MAX_SEARCH_FRAMES' and 'REDETECTION_INTERVAL' here and
                     is forwarded to detect_all
    """
    cap      = cv2.VideoCapture(video_path)
    out      = None
    finished = False

    try:
        if not cap.isOpened():
            print(f"Cannot open video: {video_path}")
            return

        fps          = cap.get(cv2.CAP_PROP_FPS)
        width        = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        height       = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))

        print(f"Video: {width}x{height} @ {fps:.1f} fps, {total_frames} frames")

        # os.makedirs('') raises, so only create a directory when the output
        # path actually has a directory component.
        out_dir = os.path.dirname(output_path)
        if out_dir:
            os.makedirs(out_dir, exist_ok=True)

        out = cv2.VideoWriter(output_path,
                              cv2.VideoWriter_fourcc(*'MJPG'),
                              fps, (width, height))
        if not out.isOpened():
            print(f"Cannot open output video for writing: {output_path}")
            return

        finished = _annotate_stream(cap, out, config, total_frames)
    finally:
        # Always release the capture and the writer, even if a frame fails.
        cap.release()
        if out is not None:
            out.release()

    if finished:
        print(f"Done. Output saved to {output_path}")


# =========================
# ENTRY POINT
# =========================

if __name__ == '__main__':
    # Run the full pipeline on the configured input video when this file is
    # executed as a script (not when it is imported).
    process_video(VIDEO_PATH, OUTPUT_PATH, CONFIG)

Video: 1908x1080 @ 30.0 fps, 220 frames
Scanning first 300 frames for the car...
Car found at frame 68
  100/220 (45.5%)
  150/220 (68.2%)
  200/220 (90.9%)
Done. Output saved to /app/data/videos/output/plate_detection_output.avi


In [None]:
1