<a href="https://colab.research.google.com/github/abiralguni/abiralguni/blob/main/LaneViolation_Code.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# =========================================
# [1] INSTALL DEPENDENCIES  #@title Run once per runtime
# =========================================
import sys, subprocess, pkgutil

def _install(pkgs):
    missing = []
    for p in pkgs:
        base = p.split("==")[0]
        try:
            import importlib; importlib.import_module(base.replace("-", "_"))
        except Exception:
            missing.append(p)
    if missing:
        print("⏳ Installing:", ", ".join(missing))
        subprocess.check_call([sys.executable, "-m", "pip", "install", "-q", *missing])
        print("✅ Installed.")

# Third-party packages used by this notebook, each pinned to an exact version
# with `==`. _install() only invokes pip for the entries it considers missing.
_install([
    "ultralytics==8.3.39",
    "easyocr==1.7.1",
    "opencv-python==4.10.0.84",
    "numpy==1.26.4",
    "scipy==1.11.4",
])

print("Section [1] OK.")


⏳ Installing: ultralytics==8.3.39, easyocr==1.7.1, opencv-python==4.10.0.84
✅ Installed.
Section [1] OK.


In [None]:
# =========================================
# [2] IMPORTS & GLOBAL CONFIG (Nepal-ready)  #@title
# =========================================
import os, math, csv, time, json
from dataclasses import dataclass
from typing import List, Tuple, Dict, Optional

import numpy as np
import cv2
from ultralytics import YOLO
import easyocr
from scipy.spatial.distance import cdist

# --- Vehicles to track (COCO) ---
# Passed as `classes=` to the YOLO tracker in process_video.
CLASSES_VEHICLES = [2, 3, 5, 7]  # car, motorcycle, bus, truck

# --- Lane classification thresholds ---
# "coverage" is the fraction of positions along a candidate segment where the
# marking mask is on; "gaps" counts on->off transitions (see run_length_stats).
# A segment is 'solid' when coverage >= SOLID_COVERAGE_THR and gaps <= GAPS_THR,
# 'dashed' when coverage <= DASHED_COVERAGE_THR and gaps >= GAPS_THR.
SOLID_COVERAGE_THR = 0.65
DASHED_COVERAGE_THR = 0.45
GAPS_THR = 4
# Minimum segment length in pixels (Hough minLineLength and manual-editor lines).
MIN_SEG_LEN = 80
ANGLE_EXCLUDE_DEG = 10  # ignore near-horizontal
# Two roughly parallel solid lines whose mean point-to-line distance (pixels)
# falls within [MIN, MAX] are re-labelled 'double_solid'.
DOUBLE_SOLID_MIN_DIST = 8
DOUBLE_SOLID_MAX_DIST = 60
# Thickness used when lane lines are drawn onto frames.
LINE_RENDER_THICKNESS = 4

# --- Runtime/options ---
# Default for process_video(frame_stride=...); forwarded to the tracker as vid_stride.
FRAME_STRIDE = 1
# When non-zero, lanes are re-detected whenever frame_idx is a multiple of this value.
REDETECT_LANES_EVERY = 0  # 0 = first frame only
# Draw the bottom-centre reference point of each tracked box.
DRAW_TRACK_CENTERS = True

# --- OCR (Nepal) ---
# EasyOCR supports 'ne' (Nepali) + 'en'. Keep both; store raw & ASCII-normalized.
OCR_LANGS = ['ne', 'en']
# Accepted length range of the normalized plate text in ocr_plate_image.
PLATE_MIN_LEN, PLATE_MAX_LEN = 4, 12

# --- Plate detector (YOLOv8) ---
# If you have a license-plate model, upload it and set PLATE_MODEL_PATH to its filename.
# For many public YOLOv8 plate models, class 0 = 'license-plate'.
PLATE_MODEL_PATH = ""   # e.g., "lp_yolov8n.pt" (upload in Colab sidebar or via files.upload)
# With an empty or non-existent PLATE_MODEL_PATH, load_plate_model() returns
# None and the OCR-only fallback is used even when this flag is True.
USE_PLATE_DETECTOR = True  # set False to force OCR-only fallback

print("Section [2] OK.")


Creating new Ultralytics Settings v0.0.6 file ✅ 
View Ultralytics Settings with 'yolo settings' or at '/root/.config/Ultralytics/settings.json'
Update Settings with 'yolo settings key=value', i.e. 'yolo settings runs_dir=path/to/dir'. For help see https://docs.ultralytics.com/quickstart/#ultralytics-settings.
Section [2] OK.


In [None]:
# =========================================
# [3] GEOMETRY & UTILS  #@title
# =========================================
@dataclass
class Line:
    """A lane-marking segment with two endpoints and a marking type."""
    # Sequential index; re-assigned from 0 by detect_lane_lines and LineEditor.edit.
    id: int
    # Segment endpoints as (x, y) pairs, as used with cv2 drawing calls.
    p1: Tuple[int, int]
    p2: Tuple[int, int]
    kind: str  # 'solid' | 'double_solid' | 'dashed'

@dataclass
class Violation:
    """One recorded crossing of a restricted lane line by a tracked vehicle."""
    # Frame counter value at which the crossing was detected.
    frame_idx: int
    # Video time in milliseconds, derived from frame_idx and fps.
    ms: int
    # Tracker id of the offending vehicle.
    track_id: int
    # OCR text as read, and its normalized form from nepali_to_ascii
    # ("UNKNOWN" is stored when nothing was read).
    plate_raw: str
    plate_ascii: str
    vtype: str  # 'single_solid' | 'double_solid'
    # Vehicle box as (x1, y1, x2, y2), clamped to the frame.
    bbox: Tuple[int, int, int, int]

def ensure_dir(p: str):
    """Create directory ``p`` with any missing parents; do nothing if it exists."""
    if os.path.isdir(p):
        return
    os.makedirs(p, exist_ok=True)

def ts_hhmmss_ms(ms: int) -> str:
    """Format a millisecond count as ``HH:MM:SS.mmm``."""
    total_seconds = ms // 1000
    millis = ms % 1000
    total_minutes = total_seconds // 60
    seconds = total_seconds % 60
    hours = total_minutes // 60
    minutes = total_minutes % 60
    return f"{hours:02d}:{minutes:02d}:{seconds:02d}.{millis:03d}"

def safe_text(s: str) -> str:
    """Return ``s`` with every character removed except alphanumerics, '-' and '_'."""
    allowed_punct = ("-", "_")
    kept = [c for c in s if c in allowed_punct or c.isalnum()]
    return "".join(kept)

def line_angle_deg(p1, p2) -> float:
    """Return the direction of the vector p1 -> p2 in degrees, in [0, 360)."""
    dx = p2[0] - p1[0]
    dy = p2[1] - p1[1]
    signed_deg = math.degrees(math.atan2(dy, dx))
    return (signed_deg + 360) % 360

def segment_length(p1, p2) -> float:
    """Return the Euclidean distance between points ``p1`` and ``p2``."""
    dx = p2[0] - p1[0]
    dy = p2[1] - p1[1]
    return math.hypot(dx, dy)

def point_side_of_line(a, b, p) -> float:
    """Return the 2-D cross product (b - a) x (p - a).

    The sign tells which side of the line through ``a`` and ``b`` the point
    ``p`` lies on; the value is 0 when ``p`` is on that line.
    """
    ab_x = b[0] - a[0]
    ab_y = b[1] - a[1]
    ap_x = p[0] - a[0]
    ap_y = p[1] - a[1]
    return ab_x * ap_y - ab_y * ap_x

def draw_polyline(img, p1, p2, color, thickness=2):
    """Draw one anti-aliased straight line from ``p1`` to ``p2`` onto ``img`` in place."""
    cv2.line(img, p1, p2, color, thickness=thickness, lineType=cv2.LINE_AA)

def nepali_to_ascii(text: str) -> str:
    """Return an ASCII-only, upper-cased alphanumeric key for a plate string.

    Devanagari digits (U+0966..U+096F) are transliterated to '0'..'9'. ASCII
    letters and digits are kept (letters upper-cased). Everything else,
    including Devanagari letters, is dropped; callers keep the raw OCR text
    separately, so nothing is lost.

    ``str.isalnum()`` alone is not enough here: it is also true for
    Devanagari letters and digits, which would leak non-ASCII characters into
    the "ASCII" key used for logs, overlays and file names.
    """
    out = []
    for ch in text.upper():
        if ch.isascii() and ch.isalnum():
            out.append(ch)
        elif "\u0966" <= ch <= "\u096f":
            out.append(str(ord(ch) - 0x0966))
    return "".join(out)

def _roi_band_along_segment(img_bin: np.ndarray, p1, p2, half_width: int = 3) -> np.ndarray:
    x1,y1 = p1; x2,y2 = p2
    length = int(max(1, segment_length(p1,p2)))
    angle = math.degrees(math.atan2(y2 - y1, x2 - x1))
    w = length
    h = 2*half_width + 1
    M = cv2.getRotationMatrix2D((0,0), angle, 1.0)
    xs = np.arange(0, w, 1, dtype=np.float32)
    ys = np.arange(-half_width, half_width+1, 1, dtype=np.float32)
    grid_x, grid_y = np.meshgrid(xs, ys)
    pts = np.stack([grid_x.flatten(), grid_y.flatten(), np.ones_like(grid_x).flatten()], axis=1)
    rot = (pts @ M.T)
    rot[:,0] += x1
    rot[:,1] += y1
    xi = np.clip(np.round(rot[:,0]).astype(int), 0, img_bin.shape[1]-1)
    yi = np.clip(np.round(rot[:,1]).astype(int), 0, img_bin.shape[0]-1)
    band = img_bin[yi, xi].reshape(h, w)
    return band

def run_length_stats(binary_1d: np.ndarray) -> Tuple[float, int]:
    """Return ``(coverage, gaps)`` for a 1-D 0/1 profile.

    ``coverage`` is the mean of the profile; ``gaps`` is the number of
    positions where a 1 is immediately followed by a 0. An empty profile
    yields ``(0.0, 0)``.
    """
    bits = binary_1d.astype(np.uint8)
    if bits.size == 0:
        return 0.0, 0
    falling_edges = (bits[:-1] == 1) & (bits[1:] == 0)
    return float(bits.mean()), int(np.count_nonzero(falling_edges))

def merge_collinear_segments(segments: List[Tuple[Tuple[int,int], Tuple[int,int]]],
                             angle_tol_deg=8, join_dist=25) -> List[Tuple[Tuple[int,int], Tuple[int,int]]]:
    """Greedily merge segments with similar orientation and nearby endpoints.

    Each not-yet-used segment seeds a group. Every other unused segment whose
    orientation differs by at most ``angle_tol_deg`` and that has an endpoint
    within ``join_dist`` pixels of any point already in the group is absorbed.
    Each group is replaced by the segment joining its two farthest points.

    Args:
        segments: list of ``((x1, y1), (x2, y2))`` endpoint pairs.
        angle_tol_deg: maximum orientation difference in degrees.
        join_dist: maximum endpoint-to-group distance in pixels.

    Returns:
        Merged segments as pairs of plain Python number tuples.
    """
    def _orientation(a, b):
        # Segments have no direction, so fold the angle into [0, 180).
        return math.degrees(math.atan2(b[1] - a[1], b[0] - a[0])) % 180.0

    used = [False] * len(segments)
    merged = []
    for i, (a1, a2) in enumerate(segments):
        if used[i]:
            continue
        used[i] = True
        pts = [a1, a2]
        ang = _orientation(a1, a2)
        for j, (b1, b2) in enumerate(segments):
            if used[j]:
                continue
            diff = abs(ang - _orientation(b1, b2))
            # Wrap-around: 179 deg and 1 deg are 2 deg apart. The previous
            # `180 - abs(diff)` on [0, 360) angles went negative for
            # differences above 180 and accepted any such pair.
            diff = min(diff, 180.0 - diff)
            if diff > angle_tol_deg:
                continue
            if cdist(np.array(pts), np.array([b1, b2])).min() <= join_dist:
                pts.extend([b1, b2])
                used[j] = True
        pts_arr = np.array(pts)
        dists = cdist(pts_arr, pts_arr)
        i1, i2 = np.unravel_index(dists.argmax(), dists.shape)
        # .tolist() yields plain Python numbers instead of numpy scalars.
        merged.append((tuple(pts_arr[i1].tolist()), tuple(pts_arr[i2].tolist())))
    return merged

print("Section [3] OK.")


Section [3] OK.


In [None]:
# =========================================
# [4] LANE DETECTION (solid vs dashed, double-solid)  #@title
# =========================================
def classify_segment_solid_or_dashed(img_bin: np.ndarray, p1, p2) -> str:
    """Label the marking under segment p1 -> p2 as 'solid' or 'dashed'.

    A 7-pixel-wide band along the segment is collapsed to a 1-D on/off
    profile (a position is "on" when its mean across the band exceeds 0.25).
    Coverage and gap count of that profile decide the label; ambiguous cases
    fall back to a 0.55 coverage cut.
    """
    strip = _roi_band_along_segment(img_bin, p1, p2, half_width=3)
    column_is_on = strip.mean(axis=0) > 0.25
    coverage, gaps = run_length_stats(column_is_on.astype(np.uint8))

    clearly_solid = coverage >= SOLID_COVERAGE_THR and gaps <= GAPS_THR
    if clearly_solid:
        return 'solid'
    clearly_dashed = coverage <= DASHED_COVERAGE_THR and gaps >= GAPS_THR
    if clearly_dashed:
        return 'dashed'
    if coverage >= 0.55:
        return 'solid'
    return 'dashed'

def detect_lane_lines(first_frame_bgr: np.ndarray) -> Tuple[List[Line], np.ndarray]:
    """Detect restricted (solid / double-solid) lane markings in one frame.

    Pipeline: bright-marking mask (HLS range OR thresholded top-hat) limited
    to the lower 65% of the frame -> Gaussian blur -> Canny -> probabilistic
    Hough -> length/orientation filter -> merge -> solid/dashed
    classification -> pairing of parallel solid lines into 'double_solid'.

    Args:
        first_frame_bgr: frame to analyse (converted with BGR colour codes).

    Returns:
        ``(restricted, white)``: the lines whose kind is 'solid' or
        'double_solid', re-numbered from 0, and the blurred marking mask the
        detection ran on. Dashed lines are dropped. When nothing is found the
        list is empty.
    """
    def _orientation_diff(a_deg, b_deg):
        # Lines are undirected, so compare angles modulo 180 degrees. The
        # raw [0, 360) angles previously let e.g. 355 deg pass as "not
        # horizontal" and made `180 - diff` negative for diffs above 180.
        d = abs(a_deg - b_deg) % 180.0
        return min(d, 180.0 - d)

    def _point_line_distance(p, a, b):
        den = segment_length(a, b)
        return abs(point_side_of_line(a, b, p)) / den if den > 0 else 1e9

    h, w = first_frame_bgr.shape[:2]
    img = first_frame_bgr.copy()
    mask = np.zeros((h, w), np.uint8)
    cv2.rectangle(mask, (0, int(h * 0.35)), (w, h), 255, -1)

    hls = cv2.cvtColor(img, cv2.COLOR_BGR2HLS)
    white = cv2.inRange(hls, (0, 200, 0), (255, 255, 80))
    kernel = cv2.getStructuringElement(cv2.MORPH_RECT, (9, 9))
    tophat = cv2.morphologyEx(cv2.cvtColor(img, cv2.COLOR_BGR2GRAY), cv2.MORPH_TOPHAT, kernel)
    white = cv2.bitwise_or(white, cv2.threshold(tophat, 30, 255, cv2.THRESH_BINARY)[1])
    white = cv2.bitwise_and(white, mask)
    white = cv2.GaussianBlur(white, (5, 5), 0)
    edges = cv2.Canny(white, 80, 160, L2gradient=True)

    linesP = cv2.HoughLinesP(edges, 1, np.pi / 180, threshold=80,
                             minLineLength=MIN_SEG_LEN, maxLineGap=20)
    if linesP is None:
        return [], white

    segments = []
    for raw in linesP[:, 0, :]:
        x1, y1, x2, y2 = map(int, raw)
        p1, p2 = (x1, y1), (x2, y2)
        if segment_length(p1, p2) < MIN_SEG_LEN:
            continue
        # Drop near-horizontal segments on either side of 0/180 degrees.
        if _orientation_diff(line_angle_deg(p1, p2), 0.0) < ANGLE_EXCLUDE_DEG:
            continue
        segments.append((p1, p2))

    segments = merge_collinear_segments(segments)
    if not segments:
        return [], white

    mask_unit = white / 255.0  # classification thresholds assume a 0..1 mask
    lines: List[Line] = [
        Line(id=idx, p1=p1, p2=p2,
             kind=classify_segment_solid_or_dashed(mask_unit, p1, p2))
        for idx, (p1, p2) in enumerate(segments)
    ]

    solid_ids = [i for i, l in enumerate(lines) if l.kind == 'solid']
    paired = set()
    for pos, i in enumerate(solid_ids):
        li = lines[i]
        for j in solid_ids[pos + 1:]:
            lj = lines[j]
            a1 = line_angle_deg(li.p1, li.p2)
            a2 = line_angle_deg(lj.p1, lj.p2)
            if _orientation_diff(a1, a2) > 6:  # not roughly parallel
                continue
            d = np.mean([
                _point_line_distance(li.p1, lj.p1, lj.p2),
                _point_line_distance(li.p2, lj.p1, lj.p2),
                _point_line_distance(lj.p1, li.p1, li.p2),
                _point_line_distance(lj.p2, li.p1, li.p2),
            ])
            if DOUBLE_SOLID_MIN_DIST <= d <= DOUBLE_SOLID_MAX_DIST:
                paired.add(i)
                paired.add(j)
    for idx in paired:
        lines[idx].kind = 'double_solid'

    restricted = [l for l in lines if l.kind in ('solid', 'double_solid')]
    for k, l in enumerate(restricted):
        l.id = k
    return restricted, white

print("Section [4] OK.")


Section [4] OK.


In [None]:
# =========================================
# [5] OPTIONAL MANUAL LINE EDITOR  #@title (skips in headless Colab)
# =========================================
class LineEditor:
    """Interactive OpenCV window for reviewing and adding restricted lines.

    Keys: ``e`` enables drawing (press, drag, release to add a 'solid' line
    at least MIN_SEG_LEN long), ``d`` deletes the last line, ``s`` or ``q``
    closes the editor. When no GUI can be opened the editor is skipped and
    the lines are returned unchanged (apart from id renumbering).
    """

    def __init__(self, frame, lines: List[Line]):
        self.frame = frame
        self.lines = lines
        self.cur_start = None   # press position of the line being drawn
        self.win = "Lane Editor"
        self.mode_edit = False  # mouse input is ignored until 'e' is pressed

    def _draw(self):
        """Return a copy of the frame with all current lines and labels drawn."""
        vis = self.frame.copy()
        for l in self.lines:
            color = (0,255,255) if l.kind=='solid' else (0,0,255)
            if l.kind=='double_solid':
                # Rendered as two parallel strokes offset along the normal.
                p1,p2 = np.array(l.p1), np.array(l.p2)
                v = p2-p1
                if np.linalg.norm(v) >= 1:
                    n = np.array([-v[1], v[0]], dtype=np.float32)
                    n = n / (np.linalg.norm(n)+1e-6)
                    offset = int(max(3, min(8, DOUBLE_SOLID_MIN_DIST//2)))
                    p1a = tuple((p1 + n*offset).astype(int)); p2a = tuple((p2 + n*offset).astype(int))
                    p1b = tuple((p1 - n*offset).astype(int)); p2b = tuple((p2 - n*offset).astype(int))
                    draw_polyline(vis, p1a, p2a, (0,0,255), LINE_RENDER_THICKNESS)
                    draw_polyline(vis, p1b, p2b, (0,0,255), LINE_RENDER_THICKNESS)
            else:
                draw_polyline(vis, l.p1, l.p2, color, LINE_RENDER_THICKNESS)
            cv2.putText(vis, f"id:{l.id} {l.kind}", l.p1, cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255,255,255), 1, cv2.LINE_AA)
        return vis

    def _mouse(self, event, x, y, flags, param):
        """Mouse callback: press/release pairs add a new 'solid' line in edit mode."""
        if not self.mode_edit: return
        if event == cv2.EVENT_LBUTTONDOWN:
            self.cur_start = (x,y)
        elif event == cv2.EVENT_LBUTTONUP and self.cur_start is not None:
            p1 = self.cur_start; p2 = (x,y)
            if segment_length(p1,p2) >= MIN_SEG_LEN:
                new_id = 0 if not self.lines else (max(l.id for l in self.lines)+1)
                self.lines.append(Line(id=new_id, p1=p1, p2=p2, kind='solid'))
            self.cur_start = None

    def edit(self) -> List[Line]:
        """Run the editor loop (if a GUI is available) and return the lines.

        Returns:
            The (possibly modified) list of lines with ids renumbered from 0.
        """
        window_open = False
        try:
            cv2.namedWindow(self.win, cv2.WINDOW_NORMAL)
            window_open = True
            cv2.resizeWindow(self.win, self.frame.shape[1], self.frame.shape[0])
            cv2.setMouseCallback(self.win, self._mouse)
            print("[Lane Editor] Keys: e=edit, s=save/accept, d=delete-last, q=quit")
            while True:
                vis = self._draw()
                if self.cur_start is not None and self.mode_edit:
                    cv2.circle(vis, self.cur_start, 4, (255,0,0), -1)
                cv2.imshow(self.win, vis)
                key = cv2.waitKey(20) & 0xFF
                if key == ord('e'):
                    self.mode_edit = True
                elif key == ord('d') and self.lines:
                    self.lines.pop()
                elif key in (ord('s'), ord('q')):
                    break
        except Exception as exc:
            # Headless environments (e.g. Colab) cannot open windows. Stay
            # best-effort, but say why the editor was skipped instead of
            # hiding the error.
            print(f"[Lane Editor] Skipped: {exc}")
        finally:
            if window_open:
                try:
                    cv2.destroyWindow(self.win)
                except Exception:
                    pass  # the window may already be gone; nothing to clean up
        for k,l in enumerate(self.lines):
            l.id = k
        return self.lines

print("Section [5] OK.")


Section [5] OK.


In [None]:
# =========================================
# [6] LICENSE-PLATE DETECTOR + OCR  #@title
# =========================================
# Module-level caches, filled on first use by load_plate_model() and
# get_ocr_reader() so the model/reader are constructed at most once.
_lp_model = None
_reader = None

def load_plate_model():
    """Return the cached YOLO plate detector, loading it on first use.

    Returns None when the detector is disabled, when PLATE_MODEL_PATH is
    empty or does not exist, or when loading the weights fails.
    """
    global _lp_model
    if _lp_model is not None:
        return _lp_model
    if not USE_PLATE_DETECTOR:
        return None

    weights_available = PLATE_MODEL_PATH and os.path.exists(PLATE_MODEL_PATH)
    if not weights_available:
        print("ℹ️ No plate model provided. Using OCR-only fallback.")
        return None

    try:
        _lp_model = YOLO(PLATE_MODEL_PATH)
    except Exception as e:
        print(f"⚠️ Could not load plate model: {e}. Falling back to OCR-only.")
        _lp_model = None
        return None
    return _lp_model

def get_ocr_reader():
    """Return the shared EasyOCR reader, creating it on first call."""
    global _reader
    if _reader is not None:
        return _reader
    # gpu=False for broader compatibility
    _reader = easyocr.Reader(OCR_LANGS, gpu=False)
    return _reader

def detect_plate_boxes_in_vehicle(veh_bgr: np.ndarray) -> List[Tuple[int,int,int,int,float]]:
    """Run the plate detector on a vehicle crop.

    Returns:
        ``(x1, y1, x2, y2, confidence)`` tuples in crop coordinates, highest
        confidence first. Empty when no detector is available or when
        prediction raises (the error is printed).
    """
    detector = load_plate_model()
    if detector is None:
        return []
    try:
        found = []
        for result in detector.predict(source=veh_bgr, stream=False, verbose=False):
            if result.boxes is None:
                continue
            for det in result.boxes:
                x1, y1, x2, y2 = det.xyxy[0].cpu().numpy().astype(int).tolist()
                if hasattr(det, 'conf'):
                    conf = float(det.conf[0].cpu().item())
                else:
                    conf = 0.0
                found.append((x1, y1, x2, y2, conf))
        return sorted(found, key=lambda box: box[-1], reverse=True)
    except Exception as e:
        print(f"⚠️ Plate detection error: {e}")
        return []

def ocr_plate_image(img_gray_or_bgr: np.ndarray) -> Tuple[str, float]:
    """OCR a plate crop and return the most plausible text.

    The image is converted to grayscale if needed, bilateral-filtered and
    adaptively thresholded before being passed to EasyOCR.

    Args:
        img_gray_or_bgr: 2-D grayscale or 3-D BGR image. ``None`` or an empty
            array yields ``("", 0.0)`` instead of an OpenCV error.

    Returns:
        ``(raw_text, confidence)``. Preference goes to the highest-confidence
        result whose normalized length is within
        [PLATE_MIN_LEN, PLATE_MAX_LEN]; if nothing has been accepted yet, a
        result with confidence above 0.5 is taken as a fallback.
    """
    if img_gray_or_bgr is None or img_gray_or_bgr.size == 0:
        return "", 0.0
    if img_gray_or_bgr.ndim == 3:
        gray = cv2.cvtColor(img_gray_or_bgr, cv2.COLOR_BGR2GRAY)
    else:
        gray = img_gray_or_bgr
    gray = cv2.bilateralFilter(gray, 7, 50, 50)
    gray = cv2.adaptiveThreshold(gray, 255, cv2.ADAPTIVE_THRESH_MEAN_C, cv2.THRESH_BINARY, 31, 5)
    res = get_ocr_reader().readtext(gray)
    best_text, best_conf = "", 0.0
    for item in res:
        # Text and confidence are the last two fields of each result; indexing
        # from the end handles both the 3- and 4-field shapes the previous
        # try/bare-except distinguished.
        text, conf = item[-2], item[-1]
        raw = text.strip()
        ascii_norm = nepali_to_ascii(raw)
        # prefer: reasonable length OR high confidence raw
        if (PLATE_MIN_LEN <= len(ascii_norm) <= PLATE_MAX_LEN) and conf > best_conf:
            best_text, best_conf = raw, float(conf)
        elif not best_text and conf > 0.5:
            best_text, best_conf = raw, float(conf)
    return best_text, best_conf

def read_plate_from_vehicle_crop(veh_bgr: np.ndarray) -> Tuple[str, str, float]:
    """Read a licence plate from a vehicle crop.

    The (up to three) most confident detector boxes are OCR'd first. If none
    yields text of at least PLATE_MIN_LEN normalized characters, a fixed
    region of the crop (55-95% of the height, 15-85% of the width) is OCR'd
    as a fallback.

    Args:
        veh_bgr: BGR crop of the vehicle. ``None`` or an empty array yields
            ``("", "", 0.0)``.

    Returns:
        ``(raw_text, normalized_text, ocr_confidence)``.
    """
    if veh_bgr is None or veh_bgr.size == 0:
        return "", "", 0.0
    img_h, img_w = veh_bgr.shape[:2]

    # Try detector first
    best_raw, best_ascii, best_conf = "", "", 0.0
    for (x1, y1, x2, y2, _conf_det) in detect_plate_boxes_in_vehicle(veh_bgr)[:3]:
        # Slice ends are exclusive, so clamp to the full width/height;
        # clamping to w-1/h-1 dropped the last column and row.
        x1 = max(0, x1); y1 = max(0, y1)
        x2 = min(img_w, x2); y2 = min(img_h, y2)
        if x2 <= x1 or y2 <= y1:
            continue  # degenerate box: nothing to OCR
        raw, conf_ocr = ocr_plate_image(veh_bgr[y1:y2, x1:x2])
        ascii_norm = nepali_to_ascii(raw)
        if conf_ocr > best_conf and len(ascii_norm) >= PLATE_MIN_LEN:
            best_raw, best_ascii, best_conf = raw, ascii_norm, conf_ocr

    # Fallback: ROI heuristic on vehicle if detector missing/weak
    if not best_raw:
        roi = veh_bgr[int(img_h * 0.55):int(img_h * 0.95),
                      int(img_w * 0.15):int(img_w * 0.85)]
        if roi.size:  # tiny crops can produce an empty ROI
            raw, conf_ocr = ocr_plate_image(roi)
            best_raw, best_ascii, best_conf = raw, nepali_to_ascii(raw), conf_ocr
    return best_raw, best_ascii, best_conf

print("Section [6] OK.")


Section [6] OK.


In [None]:
# =========================================
# [7] CORE VIOLATION ENGINE (refactored)  #@title
# =========================================
def render_restricted_lines(img, restricted_lines: List[Line]):
    """Draw the restricted lines onto ``img`` in place.

    'double_solid' lines are drawn as two red strokes offset along the line
    normal; every other line is drawn as a single yellow stroke.
    """
    red, yellow = (0,0,255), (0,255,255)
    for line in restricted_lines:
        if line.kind != 'double_solid':
            draw_polyline(img, line.p1, line.p2, yellow, LINE_RENDER_THICKNESS)
            continue
        start, end = np.array(line.p1), np.array(line.p2)
        direction = end - start
        normal = np.array([-direction[1], direction[0]], dtype=np.float32)
        normal = normal / (np.linalg.norm(normal)+1e-6)
        offset = int(max(3, min(8, DOUBLE_SOLID_MIN_DIST//2)))
        shift = normal*offset
        upper = (tuple((start + shift).astype(int)), tuple((end + shift).astype(int)))
        lower = (tuple((start - shift).astype(int)), tuple((end - shift).astype(int)))
        draw_polyline(img, upper[0], upper[1], red, LINE_RENDER_THICKNESS)
        draw_polyline(img, lower[0], lower[1], red, LINE_RENDER_THICKNESS)

def detect_violations_for_tracks(frame_idx: int,
                                 tracks_xyxy: List[Tuple[int,Tuple[int,int,int,int]]],
                                 restricted_lines: List[Line],
                                 prev_sides: Dict[int, Dict[int, float]],
                                 seen: Dict[Tuple[int,int], bool],
                                 fps: float,
                                 canvas: np.ndarray,
                                 plates_dir: str,
                                 known_plate: Dict[int, Tuple[str,str]]) -> List[Violation]:
    """Detect line crossings for the tracks of one frame and annotate ``canvas``.

    A crossing is reported when the bottom-centre point of a track's box
    changes side of a restricted line between two frames. Each
    (track, line) pair is reported at most once.

    Args:
        frame_idx: frame counter used for timestamps and file names.
        tracks_xyxy: ``(track_id, (x1, y1, x2, y2))`` per tracked vehicle.
        restricted_lines: lines that must not be crossed.
        prev_sides: per-track, per-line last non-zero side value; updated in place.
        seen: already-reported (track_id, line_id) pairs; updated in place.
        fps: video frame rate used to convert ``frame_idx`` to milliseconds.
        canvas: frame to crop vehicles from and draw on (modified in place).
        plates_dir: directory receiving the vehicle crop of each newly read plate.
        known_plate: per-track cache of ``(plate_raw, plate_ascii)``; updated in place.

    Returns:
        The violations found in this frame.
    """
    violations: List[Violation] = []
    h,w = canvas.shape[:2]
    for tid, (x1,y1,x2,y2) in tracks_xyxy:
        x1=max(0,x1); y1=max(0,y1); x2=min(w-1,x2); y2=min(h-1,y2)
        cx = (x1+x2)//2; cy = y2
        track_sides = prev_sides.setdefault(tid, {})
        for l in restricted_lines:
            side = point_side_of_line(l.p1, l.p2, (cx,cy))
            if side == 0:
                # Exactly on the line: keep the last definite side. Storing 0
                # made the next frame look like a crossing even when the
                # vehicle returned to the side it came from.
                continue
            prev = track_sides.get(l.id, None)
            track_sides[l.id] = side
            if prev is None or prev == 0 or np.sign(prev) == np.sign(side):
                continue
            key = (tid, l.id)
            if seen.get(key, False):  # avoid double counting per line
                continue
            seen[key] = True
            vtype = 'double_solid' if l.kind=='double_solid' else 'single_solid'
            if tid in known_plate:
                plate_raw, plate_ascii = known_plate[tid]
            else:
                # NOTE: the crop comes from `canvas`, which may already carry
                # lane lines and earlier boxes drawn in this frame.
                veh_crop = canvas[y1:y2, x1:x2]
                try:
                    raw, ascii_norm, _conf = read_plate_from_vehicle_crop(veh_crop)
                except Exception as exc:
                    # Best-effort: a failed read must not stop video processing.
                    print(f"Plate read failed for track {tid}: {exc}")
                    raw, ascii_norm = "", ""
                plate_raw, plate_ascii = raw or "UNKNOWN", ascii_norm or "UNKNOWN"
                known_plate[tid] = (plate_raw, plate_ascii)
                # save crop for audit (cv2.imwrite raises on an empty image)
                if veh_crop.size:
                    cv2.imwrite(os.path.join(plates_dir, f"{safe_text(plate_ascii) or 'UNKNOWN'}_track{tid}_f{frame_idx}.jpg"), veh_crop)
            ms = int((frame_idx / max(1.0, fps)) * 1000.0)
            violations.append(Violation(
                frame_idx=frame_idx, ms=ms, track_id=tid, plate_raw=plate_raw,
                plate_ascii=plate_ascii, vtype=vtype, bbox=(x1,y1,x2,y2)))
            # draw alerts
            cv2.rectangle(canvas, (x1,y1), (x2,y2), (0,0,255), 4)
            cv2.putText(canvas, f"VIOLATION: {vtype.upper()}", (x1, max(30, y1-10)),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0,0,255), 2, cv2.LINE_AA)
            if plate_ascii and plate_ascii!="UNKNOWN":
                cv2.putText(canvas, f"PLATE: {plate_ascii}", (x1, min(h-10, y2+20)),
                            cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0,0,255), 2, cv2.LINE_AA)
        # draw box + id
        cv2.rectangle(canvas, (x1,y1), (x2,y2), (0,255,0), 2)
        label = f"id:{tid}"
        if tid in known_plate and known_plate[tid][1] and known_plate[tid][1]!="UNKNOWN":
            label += f" | {known_plate[tid][1]}"
        cv2.putText(canvas, label, (x1, max(15, y1-5)), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255,255,255), 2, cv2.LINE_AA)
        if DRAW_TRACK_CENTERS:
            cv2.circle(canvas, (cx,cy), 3, (255,255,255), -1)
    return violations

def process_video(video_path: str, out_dir: str, frame_stride: int = FRAME_STRIDE):
    """Run lane-violation detection on a video and write all outputs.

    Writes into ``out_dir``: ``annotated.mp4``, ``lane_preview.jpg``,
    ``violations.csv`` and a ``plates/`` directory of vehicle crops.

    Args:
        video_path: path of the input video.
        out_dir: output directory (created if missing).
        frame_stride: process every N-th frame; values below 1 are treated as 1.

    Raises:
        FileNotFoundError: if ``video_path`` does not exist (previously an
            ``assert``, which is stripped under ``python -O``).
        RuntimeError: if the video cannot be opened, has no frames, or no
            video writer could be created.
    """
    if not os.path.exists(video_path):
        raise FileNotFoundError(f"Video not found: {video_path}")
    stride = max(1, int(frame_stride))
    ensure_dir(out_dir)
    plates_dir = os.path.join(out_dir, "plates"); ensure_dir(plates_dir)
    video_out = os.path.join(out_dir, "annotated.mp4")
    csv_path = os.path.join(out_dir, "violations.csv")

    # The capture is only needed for metadata and the first frame; the
    # tracker opens the video itself. Release it even when something fails.
    cap = cv2.VideoCapture(video_path)
    try:
        if not cap.isOpened():
            raise RuntimeError("Cannot open video.")
        fps = cap.get(cv2.CAP_PROP_FPS) or 30.0
        w = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)); h = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        ok, first = cap.read()
    finally:
        cap.release()
    if not ok:
        raise RuntimeError("Empty video.")

    # writer: try codecs in order and keep the first one that opens
    vw = None
    for code in ('avc1','H264','mp4v','MJPG','XVID'):
        candidate = cv2.VideoWriter(video_out, cv2.VideoWriter_fourcc(*code), fps/stride, (w,h))
        if candidate.isOpened():
            vw = candidate
            break
        candidate.release()
    if vw is None:
        raise RuntimeError("Failed to create VideoWriter.")

    try:
        restricted_lines, _ = detect_lane_lines(first)

        # Optional manual adjust (no-op on headless)
        restricted_lines = LineEditor(first, restricted_lines.copy()).edit()

        # Save preview
        preview = first.copy()
        render_restricted_lines(preview, restricted_lines)
        for l in restricted_lines:
            cv2.putText(preview, l.kind, l.p1, cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255,255,255), 2, cv2.LINE_AA)
        cv2.imwrite(os.path.join(out_dir, "lane_preview.jpg"), preview)

        # Prepare tracking
        yolo = YOLO('yolov8n.pt')
        track_stream = yolo.track(source=video_path, stream=True, classes=CLASSES_VEHICLES,
                                  persist=True, tracker='bytetrack.yaml', vid_stride=stride)

        with open(csv_path, "w", newline="", encoding="utf-8") as fcsv:
            writer_csv = csv.writer(fcsv)
            writer_csv.writerow(["video_time", "frame", "track_id", "plate_raw", "plate_ascii",
                                 "violation_type", "x1","y1","x2","y2"])

            prev_sides: Dict[int, Dict[int, float]] = {}
            seen: Dict[Tuple[int,int], bool] = {}
            known_plate: Dict[int, Tuple[str,str]] = {}

            frame_idx = -1
            for res in track_stream:
                frame_idx += stride
                frame = res.orig_img
                if frame is None:
                    continue

                if REDETECT_LANES_EVERY and frame_idx % REDETECT_LANES_EVERY == 0:
                    try:
                        restricted_lines, _ = detect_lane_lines(frame)
                        # Line ids are renumbered on every detection, so side
                        # values remembered for the old ids are meaningless.
                        prev_sides.clear()
                    except Exception as exc:
                        print(f"Lane re-detection failed at frame {frame_idx}: {exc}")

                boxes = res.boxes
                if boxes is None or boxes.id is None:
                    # No tracker ids this frame. Inventing ids from detection
                    # order would collide with real track ids and create
                    # false crossings, so treat the frame as having no tracks.
                    tracks = []
                else:
                    ids = boxes.id.cpu().numpy().astype(int)
                    xyxy = boxes.xyxy.cpu().numpy().astype(int)
                    tracks = [(int(tid), (int(x1),int(y1),int(x2),int(y2)))
                              for tid,(x1,y1,x2,y2) in zip(ids, xyxy)]

                canvas = frame.copy()
                render_restricted_lines(canvas, restricted_lines)
                vio = detect_violations_for_tracks(frame_idx, tracks, restricted_lines,
                                                   prev_sides, seen, fps, canvas, plates_dir, known_plate)
                for v in vio:
                    x1,y1,x2,y2 = v.bbox
                    writer_csv.writerow([ts_hhmmss_ms(v.ms), v.frame_idx, v.track_id, v.plate_raw, v.plate_ascii,
                                         v.vtype, x1,y1,x2,y2])
                vw.write(canvas)
    finally:
        vw.release()

    print(f"✅ Done. Outputs in: {out_dir}")
    print(f"- Annotated video: {os.path.join(out_dir, 'annotated.mp4')}")
    print(f"- CSV log:         {csv_path}")
    print(f"- Plate crops:     {plates_dir}")

print("Section [7] OK.")


Section [7] OK.


In [None]:
# === RESCUE MEGA CELL (single cell: defines everything + runs) ===
# Works in fresh Colab even if previous cells were not run.

# 0) Deps
import sys, subprocess, pkgutil
def _install(pkgs):
    miss=[p for p in pkgs if not pkgutil.find_loader(p.split("==")[0])]
    if miss:
        subprocess.check_call([sys.executable,"-m","pip","install","-q",*miss])
# Same exact-version pins as the install list in section [1].
_install([
    "ultralytics==8.3.39","easyocr==1.7.1","opencv-python==4.10.0.84",
    "numpy==1.26.4","scipy==1.11.4"
])

# 1) Imports & config
import os, math, csv, glob
from dataclasses import dataclass
from typing import List, Tuple, Dict
import numpy as np, cv2
from ultralytics import YOLO
import easyocr
from scipy.spatial.distance import cdist

# Class ids passed to the tracker (section [2] lists them as car, motorcycle, bus, truck).
CLASSES_VEHICLES = [2,3,5,7]
# Solid/dashed decision thresholds used by classify_segment_solid_or_dashed.
SOLID_COVERAGE_THR = 0.65
DASHED_COVERAGE_THR = 0.45
GAPS_THR = 4
# Minimum accepted segment length in pixels.
MIN_SEG_LEN = 80
# Segments within this many degrees of horizontal are ignored.
ANGLE_EXCLUDE_DEG = 10
# Distance range (pixels) for pairing two parallel solid lines as 'double_solid'.
DOUBLE_SOLID_MIN_DIST = 8
DOUBLE_SOLID_MAX_DIST = 60
LINE_RENDER_THICKNESS = 4
FRAME_STRIDE = 1
# 0 means lanes are detected on the first frame only (see section [2]).
REDETECT_LANES_EVERY = 0
DRAW_TRACK_CENTERS = True
# EasyOCR language codes: Nepali + English.
OCR_LANGS = ['ne','en']
PLATE_MIN_LEN, PLATE_MAX_LEN = 4, 12
PLATE_MODEL_PATH = ""   # optionally upload a YOLOv8 plate model .pt and set this to its filename
# Derived from PLATE_MODEL_PATH here (section [2] sets it to True explicitly).
# NOTE(review): load_plate_model() in this cell does not read this flag; confirm
# whether later code in the cell does.
USE_PLATE_DETECTOR = bool(PLATE_MODEL_PATH)

# 2) Data + utils
@dataclass
class Line:
    # A lane-marking segment: sequential id, two (x, y) endpoints and a
    # marking kind ('solid' | 'double_solid' | 'dashed').
    id: int
    p1: Tuple[int, int]
    p2: Tuple[int, int]
    kind: str

@dataclass
class Violation:
    # One recorded crossing: where in the video it happened, which track,
    # the plate text (raw and normalized), the violation type and the
    # vehicle box as (x1, y1, x2, y2).
    frame_idx: int
    ms: int
    track_id: int
    plate_raw: str
    plate_ascii: str
    vtype: str
    bbox: Tuple[int, int, int, int]

def ensure_dir(p: str):
    """Make sure directory ``p`` exists, creating missing parents as needed."""
    if not os.path.isdir(p):
        os.makedirs(p, exist_ok=True)
def ts_hhmmss_ms(ms: int) -> str:
    """Render a millisecond offset as ``HH:MM:SS.mmm``."""
    whole_seconds, millis = ms // 1000, ms % 1000
    whole_minutes, seconds = whole_seconds // 60, whole_seconds % 60
    hours, minutes = whole_minutes // 60, whole_minutes % 60
    return f"{hours:02d}:{minutes:02d}:{seconds:02d}.{millis:03d}"
def safe_text(s: str) -> str:
    """Keep only alphanumeric characters, '-' and '_' from ``s``."""
    keep = {"-", "_"}
    return "".join(filter(lambda c: c.isalnum() or c in keep, s))
def line_angle_deg(p1, p2) -> float:
    """Angle of the vector p1 -> p2 in degrees, normalized to [0, 360)."""
    rise = p2[1] - p1[1]
    run = p2[0] - p1[0]
    return (math.degrees(math.atan2(rise, run)) + 360) % 360
def segment_length(p1, p2) -> float:
    """Euclidean length of the segment from ``p1`` to ``p2``."""
    delta_x = p2[0] - p1[0]
    delta_y = p2[1] - p1[1]
    return math.hypot(delta_x, delta_y)
def point_side_of_line(a, b, p) -> float:
    """2-D cross product (b - a) x (p - a); its sign is the side of line ab that p is on."""
    along_x, along_y = b[0] - a[0], b[1] - a[1]
    to_p_x, to_p_y = p[0] - a[0], p[1] - a[1]
    return along_x * to_p_y - along_y * to_p_x
def draw_polyline(img, p1, p2, color, thickness=2):
    """Draw a single anti-aliased line segment p1 -> p2 on ``img`` in place."""
    cv2.line(img, p1, p2, color, thickness=thickness, lineType=cv2.LINE_AA)
def nepali_to_ascii(text: str) -> str:
    """Return an ASCII-only, upper-cased alphanumeric key for a plate string.

    Devanagari digits (U+0966..U+096F) become '0'..'9'; ASCII letters and
    digits are kept; every other character is dropped. ``str.isalnum()``
    alone also accepts Devanagari letters and digits, so it cannot be used
    to produce an ASCII key.
    """
    out = []
    for ch in text.upper():
        if ch.isascii() and ch.isalnum():
            out.append(ch)
        elif "\u0966" <= ch <= "\u096f":
            out.append(str(ord(ch) - 0x0966))
    return "".join(out)

def _roi_band_along_segment(img_bin:np.ndarray,p1,p2,half_width:int=3)->np.ndarray:
    x1,y1=p1; x2,y2=p2; length=int(max(1,segment_length(p1,p2)))
    angle=math.degrees(math.atan2(y2-y1,x2-x1)); w=length; h=2*half_width+1
    M=cv2.getRotationMatrix2D((0,0),angle,1.0)
    xs=np.arange(0,w,1,dtype=np.float32); ys=np.arange(-half_width,half_width+1,1,dtype=np.float32)
    grid_x,grid_y=np.meshgrid(xs,ys)
    pts=np.stack([grid_x.flatten(),grid_y.flatten(),np.ones_like(grid_x).flatten()],axis=1)
    rot=(pts@M.T); rot[:,0]+=x1; rot[:,1]+=y1
    xi=np.clip(np.round(rot[:,0]).astype(int),0,img_bin.shape[1]-1)
    yi=np.clip(np.round(rot[:,1]).astype(int),0,img_bin.shape[0]-1)
    return img_bin[yi,xi].reshape(h,w)

def run_length_stats(binary_1d: np.ndarray) -> Tuple[float, int]:
    """Return ``(coverage, gaps)`` of a 1-D 0/1 profile.

    ``coverage`` is the profile mean (0.0 when empty); ``gaps`` counts the
    places where a 1 is directly followed by a 0.
    """
    profile = binary_1d.astype(np.uint8)
    if not profile.size:
        return 0.0, 0
    drops = np.logical_and(profile[:-1] == 1, profile[1:] == 0)
    gaps = int(drops.sum())
    return float(profile.mean()), gaps

def merge_collinear_segments(segments:List[Tuple[Tuple[int,int],Tuple[int,int]]],
                             angle_tol_deg=8, join_dist=25):
    """Greedily merge segments with similar orientation and nearby endpoints.

    Each unused segment seeds a group; other unused segments whose
    orientation differs by at most ``angle_tol_deg`` degrees and that have an
    endpoint within ``join_dist`` pixels of the group are absorbed. Each
    group is returned as the segment joining its two farthest points.
    """
    def _orientation(a, b):
        # Undirected angle in [0, 180).
        return math.degrees(math.atan2(b[1] - a[1], b[0] - a[0])) % 180.0

    used = [False] * len(segments)
    merged = []
    for i, (a1, a2) in enumerate(segments):
        if used[i]:
            continue
        used[i] = True
        pts = [a1, a2]
        ang = _orientation(a1, a2)
        for j, (b1, b2) in enumerate(segments):
            if used[j]:
                continue
            diff = abs(ang - _orientation(b1, b2))
            # Fold across the 0/180 wrap. `180 - abs(diff)` on [0, 360)
            # angles was negative for diffs above 180 and always passed.
            diff = min(diff, 180.0 - diff)
            if diff > angle_tol_deg:
                continue
            if cdist(np.array(pts), np.array([b1, b2])).min() <= join_dist:
                pts.extend([b1, b2])
                used[j] = True
        arr = np.array(pts)
        d = cdist(arr, arr)
        i1, i2 = np.unravel_index(d.argmax(), d.shape)
        # .tolist() gives plain Python numbers rather than numpy scalars.
        merged.append((tuple(arr[i1].tolist()), tuple(arr[i2].tolist())))
    return merged

# 3) Lane detection
def classify_segment_solid_or_dashed(img_bin: np.ndarray, p1, p2) -> str:
    """Return 'solid' or 'dashed' for the marking lying under segment p1 -> p2.

    The decision uses the coverage and gap count of a 1-D on/off profile
    taken from a 7-pixel-wide band along the segment.
    """
    band = _roi_band_along_segment(img_bin, p1, p2, half_width=3)
    on_off = (band.mean(axis=0) > 0.25).astype(np.uint8)
    coverage, gaps = run_length_stats(on_off)
    if coverage >= SOLID_COVERAGE_THR and gaps <= GAPS_THR:
        label = 'solid'
    elif coverage <= DASHED_COVERAGE_THR and gaps >= GAPS_THR:
        label = 'dashed'
    elif coverage >= 0.55:
        label = 'solid'
    else:
        label = 'dashed'
    return label

def detect_lane_lines(first_frame_bgr:np.ndarray)->Tuple[List[Line], np.ndarray]:
    """Detect restricted (solid / double-solid) lane markings in one frame.

    Builds a bright-marking mask restricted to the lower 65% of the frame,
    extracts Hough segments, merges and classifies them, and re-labels
    parallel solid pairs as 'double_solid'.

    Returns:
        ``(restricted, white)``: lines of kind 'solid' or 'double_solid'
        re-numbered from 0 (dashed lines are dropped), and the blurred mask
        used for detection.
    """
    def _orientation_diff(a_deg, b_deg):
        # Undirected comparison: fold the difference into [0, 90]. Raw
        # [0, 360) angles let ~355 deg lines escape the horizontal filter and
        # made `180 - diff` negative for diffs above 180.
        d = abs(a_deg - b_deg) % 180.0
        return min(d, 180.0 - d)

    def _point_line_distance(p, a, b):
        den = segment_length(a, b)
        # A zero-length reference line is treated as infinitely far away.
        return abs(point_side_of_line(a, b, p)) / den if den > 0 else 1e9

    h, w = first_frame_bgr.shape[:2]
    mask = np.zeros((h, w), np.uint8)
    cv2.rectangle(mask, (0, int(h * 0.35)), (w, h), 255, -1)
    hls = cv2.cvtColor(first_frame_bgr, cv2.COLOR_BGR2HLS)
    white = cv2.inRange(hls, (0, 200, 0), (255, 255, 80))
    kernel = cv2.getStructuringElement(cv2.MORPH_RECT, (9, 9))
    tophat = cv2.morphologyEx(cv2.cvtColor(first_frame_bgr, cv2.COLOR_BGR2GRAY), cv2.MORPH_TOPHAT, kernel)
    white = cv2.bitwise_or(white, cv2.threshold(tophat, 30, 255, cv2.THRESH_BINARY)[1])
    white = cv2.bitwise_and(white, mask)
    white = cv2.GaussianBlur(white, (5, 5), 0)
    edges = cv2.Canny(white, 80, 160, L2gradient=True)
    lp = cv2.HoughLinesP(edges, 1, np.pi / 180, threshold=80, minLineLength=MIN_SEG_LEN, maxLineGap=20)
    if lp is None:
        return [], white

    seg = []
    for x1, y1, x2, y2 in lp[:, 0, :]:
        p1 = (int(x1), int(y1)); p2 = (int(x2), int(y2))
        if segment_length(p1, p2) < MIN_SEG_LEN:
            continue
        if _orientation_diff(line_angle_deg(p1, p2), 0.0) < ANGLE_EXCLUDE_DEG:
            continue  # near-horizontal
        seg.append((p1, p2))
    seg = merge_collinear_segments(seg)
    if not seg:
        return [], white

    mask_unit = white / 255.0  # classification thresholds assume a 0..1 mask
    lines = [Line(id=nid, p1=p1, p2=p2, kind=classify_segment_solid_or_dashed(mask_unit, p1, p2))
             for nid, (p1, p2) in enumerate(seg)]

    solid_ids = [i for i, l in enumerate(lines) if l.kind == 'solid']
    paired = set()
    for pos, i in enumerate(solid_ids):
        li = lines[i]
        for j in solid_ids[pos + 1:]:
            lj = lines[j]
            a1 = line_angle_deg(li.p1, li.p2); a2 = line_angle_deg(lj.p1, lj.p2)
            if _orientation_diff(a1, a2) > 6:
                continue  # not roughly parallel
            d = np.mean([_point_line_distance(li.p1, lj.p1, lj.p2),
                         _point_line_distance(li.p2, lj.p1, lj.p2),
                         _point_line_distance(lj.p1, li.p1, li.p2),
                         _point_line_distance(lj.p2, li.p1, li.p2)])
            if DOUBLE_SOLID_MIN_DIST <= d <= DOUBLE_SOLID_MAX_DIST:
                paired.add(i); paired.add(j)
    for idx in paired:
        lines[idx].kind = 'double_solid'
    restricted = [l for l in lines if l.kind in ('solid', 'double_solid')]
    for k, l in enumerate(restricted):
        l.id = k
    return restricted, white

# 4) Plate detector + OCR (Nepali + English)
# Lazily-filled caches for the plate detector and the OCR reader
# (set by load_plate_model() and get_ocr_reader() below).
_lp_model=None; _reader=None
def load_plate_model():
    """Return the cached plate detector, loading it from PLATE_MODEL_PATH on first use.

    Returns None when no model file is configured/present or when loading fails.
    """
    global _lp_model
    if _lp_model is None and PLATE_MODEL_PATH and os.path.exists(PLATE_MODEL_PATH):
        try:
            _lp_model = YOLO(PLATE_MODEL_PATH)
        except Exception as e:
            print("Plate model load error:", e)
    return _lp_model

def get_ocr_reader():
    """Return the shared EasyOCR reader, creating it with gpu=False on first call."""
    global _reader
    if _reader is not None:
        return _reader
    _reader=easyocr.Reader(OCR_LANGS, gpu=False)
    return _reader

def detect_plate_boxes_in_vehicle(veh_bgr:np.ndarray):
    """Run the plate detector on a vehicle crop.

    Returns a list of ``(x1, y1, x2, y2, conf)`` tuples in crop coordinates,
    highest confidence first, or an empty list when no plate model is loaded.
    """
    model=load_plate_model()
    if model is None:
        return []
    found=[]
    for result in model.predict(source=veh_bgr, stream=False, verbose=False):
        if result.boxes is None:
            continue
        for box in result.boxes:
            x1,y1,x2,y2=box.xyxy[0].cpu().numpy().astype(int).tolist()
            # Boxes without a confidence attribute are scored 0.0.
            score=float(box.conf[0].cpu().item()) if hasattr(box,'conf') else 0.0
            found.append((x1,y1,x2,y2,score))
    return sorted(found, key=lambda t:t[-1], reverse=True)

def ocr_plate_image(img):
    """OCR a plate crop and return ``(raw_text, confidence)``.

    The crop is converted to grayscale, denoised with a bilateral filter and
    binarised with an adaptive mean threshold before being handed to EasyOCR.
    The most confident text whose ASCII form has at least PLATE_MIN_LEN
    characters wins; if none qualifies, the most confident text of any length
    is returned instead.

    Returns ``("", 0.0)`` when ``img`` is None or empty, or when OCR finds
    nothing. The confidence is never negative.
    """
    # An empty slice (degenerate box, tiny vehicle crop) would make cv2 raise.
    if img is None or getattr(img,"size",0)==0:
        return "", 0.0
    gray=cv2.cvtColor(img,cv2.COLOR_BGR2GRAY) if img.ndim==3 else img
    gray=cv2.bilateralFilter(gray,7,50,50)
    gray=cv2.adaptiveThreshold(gray,255,cv2.ADAPTIVE_THRESH_MEAN_C,cv2.THRESH_BINARY,31,5)
    res=get_ocr_reader().readtext(gray)

    def _text_and_conf(item):
        # 4-field items carry text/confidence in the last two slots; anything
        # else is treated as a (bbox, text, confidence) triple.
        if len(item)==4:
            return item[2], item[3]
        return item[1], item[2]

    parsed=[(text.strip(),float(conf)) for text,conf in map(_text_and_conf,res)]
    best_raw,best_conf="",-1.0
    for raw,conf in parsed:
        if len(nepali_to_ascii(raw))>=PLATE_MIN_LEN and conf>best_conf:
            best_raw,best_conf=raw,conf
    if best_raw=="":  # fallback to any reasonably confident text
        for raw,conf in parsed:
            if conf>best_conf:
                best_raw,best_conf=raw,conf
    return best_raw, max(0.0,best_conf)

def read_plate_from_vehicle_crop(veh_bgr:np.ndarray):
    """Read a licence plate from a vehicle crop.

    The three most confident plate-detector boxes are OCR'd first. If none
    yields text of at least PLATE_MIN_LEN ASCII characters, a fixed band in
    the lower-middle of the crop (55-95% of the height, 15-85% of the width)
    is OCR'd as a fallback.

    Returns ``(raw_text, ascii_text, confidence)``; both texts are
    ``"UNKNOWN"`` when nothing could be read, including when ``veh_bgr`` is
    None or empty.
    """
    if veh_bgr is None or veh_bgr.size==0:
        return "UNKNOWN", "UNKNOWN", 0.0
    h,w=veh_bgr.shape[:2]
    boxes=detect_plate_boxes_in_vehicle(veh_bgr)
    best_raw,best_ascii,best_conf="","",0.0
    for (x1,y1,x2,y2,_) in boxes[:3]:
        # Slice ends are exclusive, so clamp to w/h (not w-1/h-1) to keep the last row/column.
        x1=max(0,x1); y1=max(0,y1); x2=min(w,x2); y2=min(h,y2)
        if x2<=x1 or y2<=y1:
            continue  # degenerate box: nothing to OCR
        raw,conf=ocr_plate_image(veh_bgr[y1:y2,x1:x2]); asc=nepali_to_ascii(raw)
        if conf>best_conf and len(asc)>=PLATE_MIN_LEN: best_raw,best_ascii,best_conf=raw,asc,conf
    if not best_raw:
        y1=int(h*0.55); y2=int(h*0.95); x1=int(w*0.15); x2=int(w*0.85)
        # Very small crops can collapse the fallback band to zero pixels.
        if y2>y1 and x2>x1:
            raw,conf=ocr_plate_image(veh_bgr[y1:y2,x1:x2])
            best_raw,best_ascii,best_conf=raw,nepali_to_ascii(raw),conf
    return best_raw or "UNKNOWN", best_ascii or "UNKNOWN", best_conf

# 5) Violation engine
def render_restricted_lines(img, lines:List[Line]):
    """Draw the restricted lane lines onto ``img`` in place.

    Lines of kind 'double_solid' are drawn as two red strokes offset to either
    side of the stored segment; every other line is a single yellow stroke.
    """
    red,yellow=(0,0,255),(0,255,255)
    # Half-gap between the two strokes of a double line, kept within 3..8 px.
    off=int(max(3,min(8,DOUBLE_SOLID_MIN_DIST//2)))
    for line in lines:
        if line.kind!='double_solid':
            draw_polyline(img,line.p1,line.p2,yellow,LINE_RENDER_THICKNESS)
            continue
        start,end=np.array(line.p1),np.array(line.p2)
        along=end-start
        # Unit normal of the segment; the epsilon guards a zero-length line.
        normal=np.array([-along[1],along[0]],dtype=np.float32)
        normal=normal/(np.linalg.norm(normal)+1e-6)
        shift=normal*off
        draw_polyline(img,tuple((start+shift).astype(int)),tuple((end+shift).astype(int)),red,LINE_RENDER_THICKNESS)
        draw_polyline(img,tuple((start-shift).astype(int)),tuple((end-shift).astype(int)),red,LINE_RENDER_THICKNESS)

def detect_violations_for_tracks(frame_idx:int, tracks, lines:List[Line],
                                 prev_sides:Dict[int,Dict[int,float]], seen:Dict[Tuple[int,int],bool],
                                 fps:float, canvas, plates_dir:str, known_plate:Dict[int,Tuple[str,str]]):
    """Flag tracks whose bottom-centre point changed side of a restricted line.

    Args:
        frame_idx: index of the current frame (used for the timestamp and file names).
        tracks: iterable of ``(track_id, (x1, y1, x2, y2))`` boxes.
        lines: restricted lines to test against.
        prev_sides: per-track, per-line side value from earlier frames; updated in place.
        seen: ``(track_id, line_id)`` pairs already reported; updated in place so
            each pair is reported at most once.
        fps: video frame rate, used to convert ``frame_idx`` to milliseconds.
        canvas: BGR frame that is annotated in place and also used for the plate crop.
        plates_dir: directory receiving one vehicle crop per newly read plate.
        known_plate: cache of ``track_id -> (raw, ascii)`` plate text; updated in place.

    Returns:
        The list of Violation records raised on this frame.
    """
    vlist=[]
    H,W=canvas.shape[:2]
    for tid,(x1,y1,x2,y2) in tracks:
        x1=max(0,x1); y1=max(0,y1); x2=min(W-1,x2); y2=min(H-1,y2)
        cx=(x1+x2)//2; cy=y2  # bottom-centre of the box is the tested point
        sides=prev_sides.setdefault(tid,{})
        for l in lines:
            side=point_side_of_line(l.p1,l.p2,(cx,cy))
            # Exactly on the line: keep the last non-zero side. Storing 0 would make
            # the next frame look like a crossing even if the vehicle moves back.
            if side==0: continue
            prev=sides.get(l.id); sides[l.id]=side
            if prev is None or np.sign(prev)==np.sign(side): continue
            key=(tid,l.id)
            if seen.get(key,False): continue
            seen[key]=True
            vtype='double_solid' if l.kind=='double_solid' else 'single_solid'
            if tid in known_plate: raw,asc=known_plate[tid]
            else:
                # NOTE(review): the crop comes from the annotated canvas, so overlays
                # drawn earlier on this frame may be included - confirm this is intended.
                crop=canvas[y1:y2,x1:x2]
                if crop.size==0:
                    # Box collapsed after clamping: nothing to OCR or save.
                    raw,asc="UNKNOWN","UNKNOWN"
                else:
                    raw,asc,_=read_plate_from_vehicle_crop(crop)
                    cv2.imwrite(os.path.join(plates_dir,f"{safe_text(asc) or 'UNKNOWN'}_track{tid}_f{frame_idx}.jpg"),crop)
                known_plate[tid]=(raw,asc)
            ms=int((frame_idx/max(1.0,fps))*1000.0)
            v=Violation(frame_idx,ms,tid,raw,asc,vtype,(x1,y1,x2,y2)); vlist.append(v)
            cv2.rectangle(canvas,(x1,y1),(x2,y2),(0,0,255),4)
            cv2.putText(canvas,f"VIOLATION: {vtype.upper()}",(x1,max(30,y1-10)),cv2.FONT_HERSHEY_SIMPLEX,0.8,(0,0,255),2,cv2.LINE_AA)
            if asc!="UNKNOWN":
                cv2.putText(canvas,f"PLATE: {asc}",(x1,min(H-10,y2+20)),cv2.FONT_HERSHEY_SIMPLEX,0.8,(0,0,255),2,cv2.LINE_AA)
        cv2.rectangle(canvas,(x1,y1),(x2,y2),(0,255,0),2)
        label=f"id:{tid}"
        if tid in known_plate and known_plate[tid][1]!="UNKNOWN": label+=f" | {known_plate[tid][1]}"
        cv2.putText(canvas,label,(x1,max(15,y1-5)),cv2.FONT_HERSHEY_SIMPLEX,0.6,(255,255,255),2,cv2.LINE_AA)
        if DRAW_TRACK_CENTERS: cv2.circle(canvas,(cx,cy),3,(255,255,255),-1)
    return vlist

def process_video(video_path:str, out_dir:str, frame_stride:int=FRAME_STRIDE):
    """Annotate a road video with restricted-lane violations.

    Lane lines are detected on the first frame (and re-detected every
    REDETECT_LANES_EVERY frames when that setting is non-zero). Vehicles are
    tracked with YOLOv8 + ByteTrack and every restricted-line crossing is
    drawn on the frame and appended to a CSV.

    Writes into ``out_dir``: annotated.mp4, violations.csv, lane_preview.jpg
    and plates/ (vehicle crops).

    Raises:
        FileNotFoundError: ``video_path`` does not exist.
        RuntimeError: the video cannot be opened, has no frames, or no
            VideoWriter codec could be initialised.
    """
    if not os.path.exists(video_path):
        raise FileNotFoundError(f"Video not found: {video_path}")
    ensure_dir(out_dir); plates_dir=os.path.join(out_dir,"plates"); ensure_dir(plates_dir)

    # Probe the video once for its metadata and first frame; always release the handle.
    cap=cv2.VideoCapture(video_path)
    try:
        if not cap.isOpened(): raise RuntimeError("Cannot open video.")
        fps=cap.get(cv2.CAP_PROP_FPS) or 30.0
        W=int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)); H=int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        ok,first=cap.read()
        if not ok: raise RuntimeError("Empty video.")
    finally:
        cap.release()
    if W<=0 or H<=0:
        # Container metadata missing: fall back to the decoded frame size.
        H,W=first.shape[:2]

    # lanes + preview
    lines,_=detect_lane_lines(first)
    prev=first.copy(); render_restricted_lines(prev,lines)
    for l in lines: cv2.putText(prev,l.kind,l.p1,cv2.FONT_HERSHEY_SIMPLEX,0.6,(255,255,255),2,cv2.LINE_AA)
    cv2.imwrite(os.path.join(out_dir,"lane_preview.jpg"),prev)

    # writer: try codecs in order of preference, releasing each one that fails to open
    out_video=os.path.join(out_dir,"annotated.mp4")
    vw=None
    for code in ('avc1','H264','mp4v','MJPG','XVID'):
        cand=cv2.VideoWriter(out_video,cv2.VideoWriter_fourcc(*code),fps/max(1,frame_stride),(W,H))
        if cand.isOpened():
            vw=cand; break
        cand.release()
    if vw is None: raise RuntimeError("Failed to create VideoWriter.")

    try:
        # tracking + processing
        model=YOLO('yolov8n.pt')
        stream=model.track(source=video_path, stream=True, classes=CLASSES_VEHICLES,
                           persist=True, tracker='bytetrack.yaml', vid_stride=frame_stride)
        csv_path=os.path.join(out_dir,"violations.csv")
        with open(csv_path,"w",newline="",encoding="utf-8") as fcsv:
            writer=csv.writer(fcsv)
            writer.writerow(["video_time","frame","track_id","plate_raw","plate_ascii","violation_type","x1","y1","x2","y2"])
            prev_sides:Dict[int,Dict[int,float]]={}; seen:Dict[Tuple[int,int],bool]={}; known:Dict[int,Tuple[str,str]]={}
            frame_idx=-1
            for res in stream:
                frame_idx+=frame_stride
                frame=res.orig_img
                if frame is None: continue
                if REDETECT_LANES_EVERY and frame_idx%REDETECT_LANES_EVERY==0:
                    try: lines,_=detect_lane_lines(frame)
                    except Exception as e:
                        # Best effort: keep the previous lines, but say why.
                        print("Lane re-detection failed; keeping previous lines:",e)
                b=res.boxes
                if b is None or b.id is None:
                    # No tracker IDs on this frame. Positional indices must not stand in
                    # for IDs: they would collide with real track IDs and fake crossings.
                    tracks=[]
                else:
                    ids=b.id.cpu().numpy().astype(int)
                    xyxy=b.xyxy.cpu().numpy().astype(int)
                    tracks=[(int(tid),(int(x1),int(y1),int(x2),int(y2))) for tid,(x1,y1,x2,y2) in zip(ids,xyxy)]
                canvas=frame.copy(); render_restricted_lines(canvas,lines)
                vios=detect_violations_for_tracks(frame_idx,tracks,lines,prev_sides,seen,fps,canvas,plates_dir,known)
                for v in vios:
                    x1,y1,x2,y2=v.bbox
                    writer.writerow([ts_hhmmss_ms(v.ms),v.frame_idx,v.track_id,v.plate_raw,v.plate_ascii,v.vtype,x1,y1,x2,y2])
                vw.write(canvas)
    finally:
        # Finalise the container even if tracking or OCR raised part-way through.
        vw.release()
    print("✅ Done. Outputs in:", out_dir)
    print("- annotated.mp4")
    print("- violations.csv")
    print("- lane_preview.jpg")
    print("- plates/ (crops)")

# 6) Upload + run (no other cells needed)
# Offer an upload prompt when running inside Colab. Outside Colab the import
# fails and the step is skipped, so files already on disk are used.
try:
    from google.colab import files  # type: ignore
    print("📤 Upload your road video (choose your local input.mp4). If already uploaded, you can Cancel.")
    uploaded=files.upload()
    if uploaded: print("Uploaded:", list(uploaded.keys())[0])
except ImportError:
    pass  # not running in Colab
except Exception as e:
    # Still best effort, but report the failure instead of hiding it.
    print("Upload step failed; continuing with files already on disk:", e)

def _resolve(prefer="input.mp4"):
    """Return the path of the input video.

    ``prefer`` is returned when it exists. Otherwise the most recently
    modified file matching ``<stem>*<ext>`` is used.

    Raises:
        FileNotFoundError: neither ``prefer`` nor any such variant exists.
    """
    if not os.path.exists(prefer):
        stem,ext=os.path.splitext(prefer)
        cands=glob.glob(f"{stem}*{ext}")
        cands.sort(key=os.path.getmtime, reverse=True)
        if not cands:
            raise FileNotFoundError(f"Could not find {prefer}. Found: {os.listdir('.')[:15]}")
        newest=cands[0]
        print("Using detected file:", newest)
        return newest
    return prefer

# Locate the clip, make sure the output folder exists, then run the full pipeline.
video_path=_resolve("input.mp4")
out_dir="./outputs"
ensure_dir(out_dir)
print("▶️ Running on:", video_path, " → out:", out_dir)
process_video(video_path, out_dir, frame_stride=FRAME_STRIDE)


  miss=[p for p in pkgs if not pkgutil.find_loader(p.split("==")[0])]


📤 Upload your road video (choose your local input.mp4). If already uploaded, you can Cancel.


Saving input.mp4 to input.mp4
Uploaded: input.mp4
▶️ Running on: input.mp4  → out: ./outputs
Downloading https://github.com/ultralytics/assets/releases/download/v8.3.0/yolov8n.pt to 'yolov8n.pt'...


100%|██████████| 6.25M/6.25M [00:00<00:00, 403MB/s]


[31m[1mrequirements:[0m Ultralytics requirement ['lapx>=0.5.2'] not found, attempting AutoUpdate...
Collecting lapx>=0.5.2
  Downloading lapx-0.9.2-cp312-cp312-manylinux_2_24_x86_64.manylinux_2_28_x86_64.whl.metadata (18 kB)
Downloading lapx-0.9.2-cp312-cp312-manylinux_2_24_x86_64.manylinux_2_28_x86_64.whl (2.0 MB)
   ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 2.0/2.0 MB 115.5 MB/s eta 0:00:00
Installing collected packages: lapx
Successfully installed lapx-0.9.2

[31m[1mrequirements:[0m AutoUpdate success ✅ 3.5s, installed 1 package: ['lapx>=0.5.2']
[31m[1mrequirements:[0m ⚠️ [1mRestart runtime or rerun command for updates to take effect[0m


video 1/1 (frame 1/183) /content/input.mp4: 640x384 (no detections), 71.4ms
video 1/1 (frame 2/183) /content/input.mp4: 640x384 (no detections), 6.4ms
video 1/1 (frame 3/183) /content/input.mp4: 640x384 (no detections), 6.1ms
video 1/1 (frame 4/183) /content/input.mp4: 640x384 (no detections), 6.4ms
video 1/1 (frame 5/183) /content/input.

In [None]:
import os, glob

# List every file under ./outputs (recursively) with its size in MB.
base = "./outputs"
if os.path.exists(base):
    files = sorted(glob.glob(os.path.join(base, "**/*"), recursive=True))
    if len(files) == 0:
        print("outputs/ is empty.")
    for p in files:
        if not os.path.isfile(p):
            continue  # directories are not listed
        print(p, f"({os.path.getsize(p)/1_000_000:.2f} MB)")
else:
    print("outputs/ folder not found yet. Run your processing cell (block 9) first.")


./outputs/annotated.mp4 (4.20 MB)
./outputs/lane_preview.jpg (0.10 MB)
./outputs/violations.csv (0.00 MB)


In [None]:
# Embed the annotated clip in the notebook output.
# NOTE(review): the Video(...) expression presumably has to stay the last
# statement of the cell for the notebook to display it - confirm before adding lines below.
from IPython.display import Video
Video("./outputs/annotated.mp4", embed=True)



In [None]:
import pandas as pd, os

# Load the violations log and show its shape plus the first 20 rows.
csv_path = "./outputs/violations.csv"
if not os.path.exists(csv_path):
    print("violations.csv not found. Did block 9 finish?")
else:
    df = pd.read_csv(csv_path)
    print("Rows, Columns:", df.shape)
    # A bare expression inside an if/else branch is not auto-displayed by the
    # notebook, so the preview has to be printed explicitly.
    print(df.head(20).to_string())


Rows, Columns: (0, 10)


In [None]:
import os, glob, cv2, math
import matplotlib.pyplot as plt

# Show up to 12 saved vehicle/plate crops in a 4-column grid.
plate_dir = "./outputs/plates"
if not os.path.exists(plate_dir):
    print("No plates/ folder yet. Run block 9.")
else:
    paths = sorted(glob.glob(os.path.join(plate_dir, "*.jpg")))[:12]
    if paths:
        cols = 4
        rows = math.ceil(len(paths)/cols)
        plt.figure(figsize=(12, 3*rows))
        for i, p in enumerate(paths, 1):
            # OpenCV loads BGR; matplotlib expects RGB.
            img = cv2.cvtColor(cv2.imread(p), cv2.COLOR_BGR2RGB)
            ax = plt.subplot(rows, cols, i)
            ax.imshow(img)
            ax.set_title(os.path.basename(p), fontsize=8)
            ax.axis('off')
        plt.tight_layout()
        plt.show()
    else:
        print("No plate crops saved.")


No plate crops saved.


In [None]:
# Download the annotated video through google.colab's files helper.
from google.colab import files
files.download("./outputs/annotated.mp4")     # downloads the video
# files.download("./outputs/violations.csv")  # (optional) downloads the CSV


<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

In [None]:
# =========================================
# [10] TESTS + SYNTHETIC VIDEO GENERATOR  #@title Run to validate logic
# =========================================
# Generates a synthetic clip: lanes (solid+double+~dashed) + moving boxes. Validates crossings.

def synth_draw_line(img, p1, p2, kind='solid'):
    """Paint one white synthetic lane marking from ``p1`` to ``p2`` onto ``img``.

    'solid' draws a single stroke, 'double_solid' draws two parallel strokes
    offset 6 px either side of the segment, and any other kind draws 20
    dashes, each covering the first half of its slot.
    """
    white=(255,255,255)
    if kind=='solid':
        cv2.line(img, p1, p2, white, 6, cv2.LINE_AA)
        return
    if kind=='double_solid':
        start=np.array(p1); end=np.array(p2); along=end-start
        # Unit normal of the segment; the epsilon guards a zero-length line.
        normal=np.array([-along[1], along[0]], dtype=np.float32)
        normal=normal/(np.linalg.norm(normal)+1e-6)
        for sgn in (-1,1):
            shift=sgn*normal*6
            cv2.line(img, tuple((start+shift).astype(int)), tuple((end+shift).astype(int)), white, 6, cv2.LINE_AA)
        return
    # dashed: draw dashes
    N=20
    for i in range(N):
        t1=i/N; t2=min(1,(i+0.5)/N)
        dash_start=(int(p1[0]*(1-t1)+p2[0]*t1), int(p1[1]*(1-t1)+p2[1]*t1))
        dash_end=(int(p1[0]*(1-t2)+p2[0]*t2), int(p1[1]*(1-t2)+p2[1]*t2))
        cv2.line(img, dash_start, dash_end, white, 6, cv2.LINE_AA)

def synth_lane_endpoints(W, H):
    """Return the endpoints of the three synthetic lane markings, keyed by kind."""
    top=int(H*0.2); bottom=int(H*0.95)
    return {
        'solid': ((int(W*0.33), top), (int(W*0.4), bottom)),
        'dashed': ((int(W*0.5), top), (int(W*0.52), bottom)),
        'double_solid': ((int(W*0.65), top), (int(W*0.7), bottom)),
    }

def synth_vehicle_boxes(t, W, H):
    """Return the three synthetic vehicle boxes ``(x1, y1, x2, y2)`` at frame ``t``.

    Vehicle 0 targets the solid line, vehicle 1 the dashed line and vehicle 2
    the double-solid line. Each box is 70 px wide and its bottom-centre point
    reaches its target line at frame 60. The horizontal speed (0.14% of the
    width per frame) keeps a 120-frame trajectory from reaching either
    neighbouring line, so each vehicle crosses exactly one marking.
    """
    lanes=synth_lane_endpoints(W, H)
    base_y=int(H*0.75)
    step=W*0.0014
    boxes=[]
    for kind, rise, drop in (('solid',40,20), ('dashed',50,10), ('double_solid',55,5)):
        (xa,ya),(xb,yb)=lanes[kind]
        bottom=base_y+drop
        # x of the target line at the row of the box's bottom edge
        lane_x=xa+(xb-xa)*(bottom-ya)/(yb-ya)
        left=int(lane_x+(t-60)*step)-35
        boxes.append((left, base_y-rise, left+70, bottom))
    return boxes

def make_synth_video(path="./synthetic.mp4", W=960, H=540, T=120, fps=30):
    """Write a synthetic clip with three lane markings and three moving boxes.

    The geometry comes from synth_lane_endpoints and synth_vehicle_boxes, so
    the drawn rectangles match the boxes that run_synth_test feeds to the
    violation engine. Returns ``path``.

    Raises:
        RuntimeError: the mp4v VideoWriter could not be opened.
    """
    fourcc=cv2.VideoWriter_fourcc(*'mp4v')
    vw=cv2.VideoWriter(path, fourcc, fps, (W,H))
    if not vw.isOpened():
        raise RuntimeError("Failed to create synthetic VideoWriter.")
    lanes=synth_lane_endpoints(W, H)
    colors=((0,200,0), (200,200,0), (0,128,255))  # v1, v2, v3 fill colours (BGR)
    try:
        for t in range(T):
            img=np.zeros((H,W,3), np.uint8)
            for kind,(p1,p2) in lanes.items():
                synth_draw_line(img, p1, p2, kind=kind)
            # vehicles as colored boxes
            for (x1,y1,x2,y2),color in zip(synth_vehicle_boxes(t, W, H), colors):
                cv2.rectangle(img, (x1,y1), (x2,y2), color, -1)
            vw.write(img)
    finally:
        vw.release()
    return path

def run_synth_test():
    """Run the violation engine on the synthetic clip and check the outcome.

    Expected: v1 (id 101) raises a single-solid violation, v3 (id 103) raises
    a double-solid violation, and v2 (id 102), which only crosses the dashed
    line, raises none. YOLO is not used; the boxes are the ones the clip was
    drawn from. Annotated output goes to ./outputs_synth.
    """
    vid=make_synth_video()
    cap=cv2.VideoCapture(vid)
    vw=None
    got_v1=got_v2=got_v3=False
    try:
        ok, first=cap.read()
        assert ok, "Synthetic read failed."
        H,W=first.shape[:2]
        lines, _=detect_lane_lines(first)
        # Guarantee types present for test; if detector misses, force the drawn geometry.
        kinds=[l.kind for l in lines]
        lanes=synth_lane_endpoints(W, H)
        for kind in ('solid','double_solid'):
            if kind not in kinds:
                p1,p2=lanes[kind]
                lines.append(Line(id=len(lines), p1=p1, p2=p2, kind=kind))
        # dashed ignored; not required for violation list

        fps=30.0
        prev_sides: Dict[int, Dict[int, float]] = {}
        seen: Dict[Tuple[int,int], bool] = {}
        known_plate: Dict[int, Tuple[str,str]] = {}
        plates_dir="./outputs_synth/plates"; ensure_dir(plates_dir)
        out_dir="./outputs_synth"; ensure_dir(out_dir)
        fourcc=cv2.VideoWriter_fourcc(*'mp4v')
        vw=cv2.VideoWriter(os.path.join(out_dir,"annotated.mp4"), fourcc, fps, (W,H))
        v1_id, v2_id, v3_id = 101, 102, 103

        # Start with the frame already read so frame_idx matches the video frame.
        frame_idx=0; frame=first
        while ok:
            canvas=frame.copy()
            render_restricted_lines(canvas, lines)
            tracks=list(zip((v1_id, v2_id, v3_id), synth_vehicle_boxes(frame_idx, W, H)))
            vio=detect_violations_for_tracks(frame_idx, tracks, lines, prev_sides, seen, fps,
                                             canvas, plates_dir, known_plate)
            for v in vio:
                if v.track_id==v1_id and v.vtype=='single_solid': got_v1=True
                if v.track_id==v2_id: got_v2=True
                if v.track_id==v3_id and v.vtype=='double_solid': got_v3=True
            vw.write(canvas)
            frame_idx+=1
            ok, frame=cap.read()
    finally:
        if vw is not None: vw.release()
        cap.release()

    assert got_v1, "Expected solid-line violation not detected."
    assert not got_v2, "Dashed-line crossing was incorrectly flagged."
    assert got_v3, "Expected double-solid violation not detected."
    print("✅ Synthetic test passed: solid & double-solid violations detected; dashed ignored.")
    print("Outputs:", os.path.join(out_dir,"annotated.mp4"))

# Run the synthetic test; an AssertionError from it stops the cell before the
# section marker below is printed.
run_synth_test()
print("Section [10] OK.")




Progress: |██████████████████████████████████████████████████| 100.0% Complete



Progress: |██████████████████████████████████████████████████| 100.0% Complete

AssertionError: Dashed-line crossing was incorrectly flagged.

In [None]:
# =========================================
# [11] OPTIONAL: FIRST-FRAME PREVIEW  #@title
# =========================================
def preview_lanes(video_path: str, save_to: str = "./lane_preview_debug.jpg"):
    """Save a debug image of the restricted lanes detected on the first frame.

    The first frame of ``video_path`` is run through detect_lane_lines, the
    restricted lines are rendered and labelled with their kind, and the image
    is written to ``save_to``.

    Raises:
        RuntimeError: the first frame cannot be read.
    """
    cap = cv2.VideoCapture(video_path)
    ok, frame = cap.read()
    cap.release()
    if not ok:
        raise RuntimeError("Cannot read first frame.")
    lanes, _mask = detect_lane_lines(frame)
    vis = frame.copy()
    render_restricted_lines(vis, lanes)
    for lane in lanes:
        cv2.putText(vis, lane.kind, lane.p1, cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255,255,255), 2, cv2.LINE_AA)
    cv2.imwrite(save_to, vis)
    print("Saved:", save_to)

print("Section [11] OK.")
