# Lab 3.2 — Full Lane-Keeping Pipeline (Perception → Geometry → Control → Overlay)

This lab integrates all components built across previous labs:

#### From Lab 1 (Perception)
- Deep-learning lane mask extraction (YOLO / PIDNet / TwinLite / BiSeNet)


#### From Lab 2 (Geometry)
- ROI filtering
- Morphological refinement
- Bird’s-Eye View (BEV) projection

#### From Lab 3.1 (Lane Geometry)
- center_x() to detect lane center
- heading_deg_at_ratio() from two vertical samples
- meters_per_pixel() to convert px → meters
- Multi-ratio sampling (r = 0.98, 0.92, 0.82, 0.72)
- Adaptive ratio selection based on stability


### In this Lab you will:
1. Implement a simplified steering controller.
2. Convert lane geometry → LEFT / RIGHT / STRAIGHT.
3. Draw real-time overlay on video frames.
4. Run the entire lane-keeping pipeline on a real video input.


### Environment Setup

This lab requires several computer vision and numerical computation libraries.
We will install and verify all dependencies before running the geometry and
control pipeline.

**Required Libraries**
- **numpy** — matrix operations  
- **opencv-python** — image/video processing  
- **matplotlib** — visualization  
- **torch**  — load deep learning lane models  
- **ultralytics** (optional) — YOLO-based lane/segmentation models  

Make sure the runtime has access to a GPU if you intend to run the deep-learning
backends. CPU-only mode is still acceptable for this lab, although inference will be slower.


In [58]:
!pip install numpy opencv-python matplotlib ultralytics --quiet
!pip install torch torchvision torchaudio --index-url https://download.pytorch.org/whl/cu121 --quiet

print("All dependencies installed successfully.")


All dependencies installed successfully.


### Import Libraries

We import all standard libraries used throughout the lab:

- numpy for numerical computation  
- cv2 for image processing  
- matplotlib for visualization  
- Optional: torch + ultralytics for segmentation backend  


In [59]:
import os
import numpy as np
import cv2
import matplotlib.pyplot as plt
import torch
from ultralytics import YOLO


### Version Check

It is important to confirm that:
- OpenCV is correctly installed  
- Torch detects CUDA (optional)  


In [60]:
print("OpenCV version:", cv2.__version__)
print("Torch version:", torch.__version__)
print("CUDA available:", torch.cuda.is_available())


OpenCV version: 4.12.0
Torch version: 2.7.1+cu118
CUDA available: True


### 1. Perception Module: YOLOv8 Backend

> **Note:** This module was fully implemented in **Lab 1**. In this lab, we treat it as a **"black box" sensor** that provides the binary lane mask.

We utilize the `YoloV8Backend` class to:
1.  Load the pre-trained weights (`best.pt`).
2.  Perform inference on each video frame.
3.  Return a binary segmentation mask (`0` for background, `1` for lane).

**Action:** Just **run the cell below** to initialize the backend class. No coding is required here.

In [61]:
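class YoloV8Backend:
    """
    Lightweight wrapper for YOLOv8 segmentation → binary lane mask.
    Matches real project structure but simplified for lab environment.
    """
    def __init__(self, weights, device=None, imgsz=640, conf=0.18):
        # Use CUDA only when it is actually available; otherwise fall back to CPU
        if device is None or not torch.cuda.is_available():
            device = "cuda" if torch.cuda.is_available() else "cpu"
        self.model = YOLO(weights)
        self.device = device
        self.imgsz = imgsz
        self.conf = conf
        self.fp16 = (torch.cuda.is_available() and device != "cpu")  # half precision on GPU only
        self.model.to(device)
        self.model.fuse()

    def infer_mask01(self, frame_bgr):
        """Return an (H, W) uint8 mask: 0 = background, 1 = lane."""
        H, W = frame_bgr.shape[:2]
        res = self.model.predict(frame_bgr, imgsz=self.imgsz, conf=self.conf,
                                 device=self.device, verbose=False, half=self.fp16)
        r0 = res[0]
        if r0.masks is None:
            return np.zeros((H, W), np.uint8)

        # One mask per detected instance, at model resolution
        mk = r0.masks.data.cpu().numpy().astype(np.uint8)
        merged = np.zeros((H, W), np.uint8)
        for k in range(mk.shape[0]):
            # Nearest-neighbor resizing keeps the mask strictly binary
            resized = cv2.resize(mk[k], (W, H), interpolation=cv2.INTER_NEAREST)
            merged = cv2.bitwise_or(merged, resized)

        return merged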


In [62]:
backend = YoloV8Backend(r"C:\Users\admin\ACE_Finalv4\AI\LaneDetection\Lane_weight\Yolo_v8\best.pt")
print("Backend loaded:", backend.name() if hasattr(backend, "name") else "YOLOv8")

YOLOv8n-seg summary (fused): 85 layers, 3,258,259 parameters, 0 gradients, 11.3 GFLOPs
Backend loaded: YOLOv8
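
The merge step inside `infer_mask01` can be illustrated without a model. The sketch below assumes three hypothetical per-instance masks that already share the frame resolution (the real backend resizes each one first) and collapses them into one binary mask with a logical OR.

```python
import numpy as np

# Hypothetical per-instance masks with shape (N, H, W) and values in {0, 1}
instance_masks = np.zeros((3, 4, 6), np.uint8)
instance_masks[0, 1, 0:2] = 1    # instance 0 covers two pixels on row 1
instance_masks[1, 1, 1:4] = 1    # instance 1 overlaps instance 0 at column 1
instance_masks[2, 3, 5] = 1      # instance 2 is a single pixel on row 3

# A pixel belongs to the lane if any instance marks it
merged = np.any(instance_masks > 0, axis=0).astype(np.uint8)

print(merged)
```

Overlapping pixels are counted once, so the merged mask stays in the 0/1 range that the later ROI and BEV steps expect.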


### 2. Preprocessing Module: ROI & BEV (Recap from Lab 2)

**Context:** In **Lab 2**, we built a preprocessing pipeline that cleans the segmentation mask and transforms it into a Bird's-Eye View (BEV). We reuse these utilities here.

This module includes:
1.  **apply_roi**: Keeps only the mask pixels inside the region-of-interest polygon (removes sky/background).
2.  **refine_mask01**: Applies morphological closing, then opening, to fill gaps and reduce noise.
3.  **BEVProjector**: A class that manages the homography matrix used to warp the image from *Perspective View* to *Top-Down View*.
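
Both the ROI polygon and the BEV source/destination points are stored as normalized coordinates and scaled by the frame size at run time. The sketch below shows that scaling for the ROI used later in this lab on a 320x240 frame; `to_pixels` is a hypothetical helper name used only for illustration.

```python
def to_pixels(norm_pts, width, height):
    """Scale normalized (x, y) pairs in [0, 1] to integer pixel coordinates."""
    return [(int(x * width), int(y * height)) for x, y in norm_pts]

# ROI polygon from the pipeline cell, applied to a 320x240 frame
roi_norm = [(0.03, 0.58), (0.97, 0.58), (0.97, 0.99), (0.03, 0.99)]
roi_px = to_pixels(roi_norm, width=320, height=240)

print(roi_px)
```

Keeping the polygon normalized means the same ROI definition works for any input resolution.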

**Action:** Run the cell below to define these helper functions.



In [63]:
def apply_roi(mask, poly):
    H, W = mask.shape
    pts = np.array([(int(x*W), int(y*H)) for x,y in poly], dtype=np.int32)
    roi = np.zeros_like(mask)
    cv2.fillPoly(roi, [pts], 1)
    return mask * roi

def refine_mask01(mask):
    k = np.ones((5,5), np.uint8)
    mask = cv2.morphologyEx(mask, cv2.MORPH_CLOSE, k)
    mask = cv2.morphologyEx(mask, cv2.MORPH_OPEN, k)
    return mask

class BEVProjector:
    def __init__(self):
        self.src = ((0.20, 0.58),(0.10, 0.90),(0.90, 0.90),(0.80, 0.58))
        self.dst = ((0.25, 0.00),(0.25, 1.00),(0.75, 1.00),(0.75, 0.00))
        self.M = None
        self.M_inv = None
        self._wh = None

    def _ensure(self, W, H):
        if self._wh == (W,H): return
        src = np.float32([(x*W,y*H) for x,y in self.src])
        dst = np.float32([(x*W,y*H) for x,y in self.dst])
        self.M = cv2.getPerspectiveTransform(src, dst)
        self.M_inv = cv2.getPerspectiveTransform(dst, src)
        self._wh = (W,H)

    def warp(self, mask):
        H,W = mask.shape
        self._ensure(W,H)
        # `flags` must be passed by keyword: the 4th positional argument is `dst`
        bev = cv2.warpPerspective(mask*255, self.M, (W,H), flags=cv2.INTER_NEAREST)
        return (bev>0).astype(np.uint8)
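
To make the role of `M` and `M_inv` concrete, here is a minimal NumPy sketch of how a 3x3 homography maps individual points, which is the same per-point operation that `cv2.perspectiveTransform` performs. The matrix values are hypothetical and chosen only so the arithmetic is easy to follow; `apply_homography` is an illustrative helper, not part of the lab code.

```python
import numpy as np

def apply_homography(M, pts):
    """Map N x 2 points through a 3x3 homography using homogeneous coordinates."""
    pts = np.asarray(pts, dtype=np.float64)
    homog = np.hstack([pts, np.ones((len(pts), 1))])   # (x, y) -> (x, y, 1)
    mapped = homog @ M.T
    return mapped[:, :2] / mapped[:, 2:3]              # perspective divide by w

# Hypothetical matrix: the last row makes w grow with y, as in a perspective view
M = np.array([[1.0, 0.0,  0.0],
              [0.0, 1.0,  0.0],
              [0.0, 0.01, 1.0]])
M_inv = np.linalg.inv(M)

cam_pts = [(10.0, 100.0), (10.0, 0.0)]
bev_pts = apply_homography(M, cam_pts)        # forward mapping
back = apply_homography(M_inv, bev_pts)       # inverse mapping recovers the input

print(bev_pts)
print(back)
```

The round trip through `M` and then `M_inv` is what later allows BEV lane centers to be drawn on the original camera frame.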


### Geometry Math (Recap from Lab 3.1)

In this section, we redefine all lane-geometry functions required by the full pipeline.  
Although these functions were implemented in Lab 3.1, each notebook runs in its own kernel, so they must be defined again in Lab 3.2.

This block provides:
- `center_x()`: extracts the lane center at a given BEV sampling ratio  
- `heading_deg_at_ratio()`: computes the heading angle from two vertical samples  
- `multi_ratio_measure()`: evaluates multiple ratios (0.98, 0.92, 0.82, 0.72)  
- `adaptive_select()`: selects the ratio with the most stable geometry estimate  

These functions convert the refined BEV mask into real-valued geometric quantities used by the controller.


In [64]:
def center_x(bev, r):
    H, W = bev.shape
    y = int(r * H)
    xs = np.where(bev[y] > 0)[0]
    if len(xs) < 2:
        return np.nan
    return 0.5 * (xs[0] + xs[-1])

def heading_deg_at_ratio(bev, r, dy=30):
    H, W = bev.shape
    y = int(r * H)          # near row
    y2 = max(0, y - dy)     # far row, up to dy pixels further ahead

    xs1 = np.where(bev[y] > 0)[0]
    xs2 = np.where(bev[y2] > 0)[0]

    if len(xs1)<2 or len(xs2)<2:
        return np.nan, np.nan

    cx1 = 0.5*(xs1[0]+xs1[-1])
    cx2 = 0.5*(xs2[0]+xs2[-1])
    dx = cx1 - cx2          # > 0 when the lane center moves left further ahead
    dy_rows = y - y2        # actual row gap (smaller than dy near the image top)

    ang = np.degrees(np.arctan2(dx, dy_rows + 1e-6))
    return ang, cx1

def multi_ratio_measure(bev, ratios, dy_px=30, lane_width_m=0.20):
    H, W = bev.shape
    # Meters per pixel: lane width in meters divided by a hard-coded 20 px
    mpp = lane_width_m / 20.0

    centers, pos_list, head_list = [], [], []
    for r in ratios:
        head, cx = heading_deg_at_ratio(bev, r, dy_px)
        # > 0 when the lane center is left of the image center; NaN if no lane was found
        pos_m = (W/2 - cx) * mpp
        centers.append(cx)
        pos_list.append(pos_m)
        head_list.append(head)

    return centers, pos_list, head_list
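
The geometry above can be checked by hand on a synthetic mask. The sketch below assumes a hypothetical 100x100 BEV mask whose lane edges are 20 px apart and drift left with distance, then repeats the center, heading, and offset arithmetic with the same 0.20 m / 20 px scale used in `multi_ratio_measure`. The helper `row_center` is an illustrative stand-in, not a replacement for `center_x`.

```python
import numpy as np

H, W = 100, 100
bev = np.zeros((H, W), np.uint8)
for y in range(H):
    left = 20 + y // 2               # lane shifts right as rows get closer to the vehicle
    bev[y, left:left + 21] = 1       # edges at left and left + 20 (20 px apart)

def row_center(mask, y):
    """Midpoint between the first and last lane pixel on row y."""
    xs = np.where(mask[y] > 0)[0]
    return 0.5 * (xs[0] + xs[-1])

y_near, y_far = 98, 68               # near row and a row 30 px further ahead
c_near = row_center(bev, y_near)
c_far = row_center(bev, y_far)

# Same sign convention as heading_deg_at_ratio: positive when the lane bends left
heading_deg = float(np.degrees(np.arctan2(c_near - c_far, y_near - y_far)))

mpp = 0.20 / 20.0                    # meters per pixel, as hard-coded in the lab code
pos_m = (W / 2 - c_near) * mpp       # negative: lane center is right of the image center

print(c_near, c_far, heading_deg, pos_m)
```

A negative offset combined with a positive heading is a useful test case, because the two controller terms then pull in opposite directions.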


In [65]:
def adaptive_select(centers, pos_list, head_list, ratios,
                    W_STAB=2.0, W_HEAD=0.1, W_LAT=0.05):
    best_i = 0
    best_score = -1e9

    for i,r in enumerate(ratios):
        cx = centers[i]
        if np.isnan(cx): continue   # no lane found at this ratio

        abs_pos = abs(pos_list[i])
        abs_head = abs(head_list[i])
        # Favor small offset and heading; mildly penalize rows far from the vehicle
        score = (W_STAB/(abs_pos+1e-6) - W_HEAD*abs_head - W_LAT*(1-r))

        if score > best_score:
            best_score = score
            best_i = i

    # If every ratio was invalid, index 0 is returned with NaN pos/head
    return {
        "best_ratio": ratios[best_i],
        "pos": pos_list[best_i],
        "head": head_list[best_i]
    }
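
To see how the score behaves, the sketch below evaluates the same expression on hypothetical measurements, including one ratio where no lane was found. The input values are made up for illustration; only the formula and default weights come from `adaptive_select`.

```python
import math

def score(pos_m, head_deg, r, w_stab=2.0, w_head=0.1, w_lat=0.05):
    # Same expression as adaptive_select: reward small |pos|, penalize |head| and far rows
    return w_stab / (abs(pos_m) + 1e-6) - w_head * abs(head_deg) - w_lat * (1 - r)

ratios = [0.98, 0.92, 0.82, 0.72]
pos = [0.020, 0.010, float("nan"), 0.050]    # hypothetical offsets in meters
head = [1.0, 2.0, float("nan"), 4.0]         # hypothetical headings in degrees

# Invalid ratios get no score and are skipped during selection
scores = [score(p, h, r) if math.isfinite(p) else None
          for p, h, r in zip(pos, head, ratios)]
best_i = max((i for i, s in enumerate(scores) if s is not None),
             key=lambda i: scores[i])

print(scores, ratios[best_i])
```

With these weights the `1/|pos|` term dominates, so the selection mostly follows whichever row has its lane center nearest the image center; the heading and distance terms act as tie-breakers.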


### Controller

The controller converts lane geometry into a steering command used to keep the vehicle centered in the lane.  
It combines two types of geometric information:

1. **Lateral offset** (pos_m):  
   How far the vehicle is from the lane center (in meters).  
   - If pos_m > 0 → vehicle is to the RIGHT → must steer LEFT  
   - If pos_m < 0 → vehicle is to the LEFT → must steer RIGHT  

2. **Heading angle** (head_deg):  
   Orientation of the lane relative to the vehicle’s forward direction.  
   - Positive angle → lane bends left  
   - Negative angle → lane bends right  

To correct the vehicle trajectory, we apply a linear control formula that blends these two signals:


#### **Control law**

$$
\text{steer} = k_{\text{pos}} \cdot \text{pos}_m \;+\; k_{\text{head}} \cdot \text{head}_{deg}
$$

#### Meaning of each term:

- $k_{\text{pos}} \cdot \text{pos}_m$:  
  Corrects how far the vehicle is off-center → **primary steering contribution**.

- $k_{\text{head}} \cdot \text{head}_{deg}$:  
  Anticipates lane curvature by adjusting steering earlier → **stabilizes turns**.

The gains:
- $k_{\text{pos}}$: how strongly the controller reacts to lateral offset  
- $k_{\text{head}}$: how strongly it reacts to lane curvature  

These values determine how “aggressive” or “smooth” steering is.

The final steering output is constrained to stay within hardware limits:

$$
\text{steer} \in [-50, 50]
$$
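
As a worked example, take the default gains used in this lab's controller cell ($k_{\text{pos}} = 40$, $k_{\text{head}} = 1.5$) and assume a small offset of $0.10$ m with a heading of $2^\circ$:

$$
\text{steer} = 40 \cdot 0.10 \;+\; 1.5 \cdot 2 = 4.0 + 3.0 = 7.0
$$

For a hypothetical large offset of $1.5$ m with the same heading, the raw value exceeds the limit and is clipped:

$$
\text{steer} = \operatorname{clip}(40 \cdot 1.5 + 1.5 \cdot 2,\; -50,\; 50) = \operatorname{clip}(63,\; -50,\; 50) = 50
$$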


#### Steering label (for visualization)

To help interpret the numeric steering output, we convert the value into one of three labels:

- **LEFT** → steer > threshold  
- **RIGHT** → steer < −threshold  
- **STRAIGHT** → the steering is small enough to treat as no significant turn  

This classification is for visualization only and does not affect control.


In [None]:
def controller(pos_m, head_deg, k_pos=40.0, k_head=1.5):
    steer = k_pos * pos_m + k_head * head_deg
    steer = np.clip(steer, -50, 50)
    return float(steer)

def steering_label(steer, thresh=3.0):
    if steer > thresh:
        return "LEFT"
    elif steer < -thresh:
        return "RIGHT"
    else:
        return "STRAIGHT"
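
When no lane pixels are found at the selected row, the geometry functions return NaN, and `np.clip` passes NaN through unchanged. One possible way to handle such frames is sketched below, assuming that holding the previous command is acceptable for your vehicle; `steer_or_hold` is a hypothetical helper and not part of the lab code.

```python
import math

def steer_or_hold(pos_m, head_deg, last_steer, k_pos=40.0, k_head=1.5, limit=50.0):
    """Hypothetical helper: reuse the previous command when geometry is missing."""
    if not (math.isfinite(pos_m) and math.isfinite(head_deg)):
        return last_steer
    steer = k_pos * pos_m + k_head * head_deg
    return max(-limit, min(limit, steer))      # same [-50, 50] limit as the controller

last = 0.0
history = []
frames = [(0.10, 2.0), (float("nan"), float("nan")), (-2.0, 0.0)]
for pos_m, head_deg in frames:
    last = steer_or_hold(pos_m, head_deg, last)
    history.append(last)

print(history)
```

Holding the last value is only one option; returning to zero or slowing the vehicle may be safer depending on the platform.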

    

### Draw Overlay

This function visualizes all lane-geometry outputs on top of the original camera frame.  
It provides an intuitive way to understand what the pipeline is doing internally at each step.

The overlay includes:

#### **1. BEV Mask Projected Back to Camera View**
The binary BEV mask (top-down view) is inverse-warped using the inverse homography $M^{-1}$  
so the detected drivable lane region appears in the camera image.  
This helps verify whether the BEV transformation and mask refinement are correct.

#### **2. Multi-Ratio Lane Centers**
For each sampling ratio (e.g., 0.98 → 0.92 → 0.82 → 0.72):

- The center point detected in BEV coordinates is mapped back into camera coordinates.
- All ratio points are drawn:
  - **Primary ratio** (ratio with best stability score) → RED dot  
  - **Other valid ratios** → YELLOW dots  

This makes it easy to inspect:
- whether lane center detection is stable across ratios  
- whether adaptive ratio selection is working correctly  

#### **3. On-Screen Debug Information**
A small info panel is drawn in the top-left corner showing:

- Lateral offset in meters  
- Heading angle in degrees  
- Steering direction label (“LEFT”, “RIGHT”, “STRAIGHT”)  

These values help correlate the visual overlay with the numeric geometry and controller output.

#### **Purpose**
This visualization step is essential for debugging:
- segmentation errors  
- BEV misalignment  
- geometry instability  
- ratio selection  
- controller behavior  

The function returns an annotated frame that can be displayed or saved.


In [67]:
def draw_overlay(frame_bgr, bev01, M_inv, ratios, centers_px, primary_idx, pos_m, head_deg, dir_label):
    draw = frame_bgr.copy()
    H, W = draw.shape[:2]
    # ============================
    # (1) Warp BEV mask back to camera view
    # ============================
    if bev01 is not None and M_inv is not None:
        bev_warp = cv2.warpPerspective(
            (bev01 * 255).astype(np.uint8),
            M_inv, (W, H),
            flags=cv2.INTER_NEAREST
        )
        green = np.zeros_like(draw)
        green[bev_warp > 0] = (0, 180, 0)
        draw = cv2.addWeighted(draw, 1.0, green, 0.8, 0)

    # ============================
    # (2) Transform BEV points → camera
    # ============================
    pts_bev = []
    for r, cx in zip(ratios, centers_px):
        if np.isnan(cx):
            pts_bev.append([np.nan, np.nan])
        else:
            y_bev = r * bev01.shape[0]
            pts_bev.append([cx, y_bev])

    valid_pts = np.array(
        [[px, py] for px, py in pts_bev if not np.isnan(px)],
        dtype=np.float32
    ).reshape(-1, 1, 2)

    if valid_pts.shape[0] > 0:
        pts_cam = cv2.perspectiveTransform(valid_pts, M_inv)
    else:
        pts_cam = []

    # ============================
    # (3) Draw the ratio points
    # ============================
    cam_i = 0
    for i, (r, cx) in enumerate(zip(ratios, centers_px)):
        if np.isnan(cx):
            continue
        px, py = pts_cam[cam_i][0]
        cam_i += 1
        if i == primary_idx:
            cv2.circle(draw, (int(px), int(py)), 5, (0,0,255), -1)   # red: primary ratio
        else:
            cv2.circle(draw, (int(px), int(py)), 3, (0,255,255), -1) # yellow: other valid ratios

    # ============================
    # (4) Info panel: angle, direction
    # ============================

    font = cv2.FONT_HERSHEY_SIMPLEX
    cv2.putText(draw, f"Pos: {pos_m:+.3f} m", (15,20), font, 0.45, (0,255,255), 2)
    cv2.putText(draw, f"Head: {head_deg:+.2f} deg", (15,40), font, 0.55, (0,255,255), 2)
    color_dir = (0,255,0) if dir_label=="STRAIGHT" else (0,0,255)
    cv2.putText(draw, f"DIR: {dir_label}", (15,60), font, 0.65, color_dir, 2)
    return draw
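
The green lane tint in part (1) comes from `cv2.addWeighted`, which computes a saturated weighted sum of two images. The NumPy sketch below reproduces that arithmetic on a tiny hypothetical frame, using the same weights (1.0 and 0.8) and tint color as `draw_overlay`.

```python
import numpy as np

frame = np.full((2, 2, 3), 100, np.uint8)        # uniform gray stand-in for a camera frame
lane = np.array([[1, 0], [0, 1]], np.uint8)      # binary lane mask already in camera view

green = np.zeros_like(frame)
green[lane > 0] = (0, 180, 0)                    # BGR tint on lane pixels only

# Weighted sum in float, then clamp to the uint8 range (saturation)
mixed = frame.astype(np.float32) * 1.0 + green.astype(np.float32) * 0.8
blended = np.rint(np.clip(mixed, 0, 255)).astype(np.uint8)

print(blended[0, 0], blended[0, 1])
```

Because the frame keeps a weight of 1.0, non-lane pixels are unchanged and lane pixels only gain brightness in the green channel.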


### Full Pipeline Execution (Video Processing)

In this final task, you will run the **entire lane-keeping pipeline** on a recorded driving video.
This integrates every module developed across the previous labs:

- **Perception (Lab 1):** lane mask extraction  
- **Geometry (Lab 2):** ROI → refinement → BEV projection  
- **Lane estimation (Lab 3.1):** multi-ratio sampling + adaptive selection  
- **Control (Lab 3.2):** lateral position + heading → steering command  
- **Visualization:** overlaying BEV, sampling points, and steering information  

The loop below performs all of these operations frame-by-frame:

1. Load a video from disk  
2. Apply segmentation and ROI cleaning  
3. Project the mask to BEV  
4. Compute geometry at multiple ratios  
5. Select the most stable ratio  
6. Generate a steering command  
7. Draw an overlay for debugging  
8. Save the processed frames into an output video


### Step 1 — Load Input Video

In this step, we specify the path to the driving video that will be processed by the full lane-keeping pipeline.  
The cell includes:

- a user-editable `video_path`
- automatic file existence checking
- a fallback input prompt if the file is missing
- initialization of the OpenCV video capture object

This ensures that the pipeline always starts with a valid input video and provides clear feedback to the learner.


In [68]:
video_path = r"C:\Users\admin\ACE_images\esp32_capture.mp4"
# Optional fallback if file is missing
if not os.path.exists(video_path):
    print(f"[INFO] The path '{video_path}' does not exist.")
    print("Please enter a valid video file path:")
    video_path = input("Video path: ").strip()

cap = cv2.VideoCapture(video_path)
if not cap.isOpened():
    raise FileNotFoundError(f"Could not open video: {video_path}")

print(f"[OK] Loaded input video: {video_path}")


[OK] Loaded input video: C:\Users\admin\ACE_images\esp32_capture.mp4


### Step 2 — Configure Output Video Writer

We now configure the video writer that will save the processed frames produced by the pipeline.

This cell performs:
- extraction of resolution and FPS from the input video  
- creation of an MP4 writer using OpenCV  
- confirmation messages showing output location and video parameters  

The processed visualization frames will be written to `lab3_output.mp4`.


In [69]:
frame_width  = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
fps_in       = cap.get(cv2.CAP_PROP_FPS) or 30

output_path = "lab3_output.mp4"

fourcc = cv2.VideoWriter_fourcc(*"mp4v")
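out = cv2.VideoWriter(
    output_path,
    fourcc,
    fps_in,
    (frame_width, frame_height)
)
# Fail early if the codec or path is unusable instead of silently writing nothing
if not out.isOpened():
    raise RuntimeError(f"Could not open video writer: {output_path}")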

print(f"[OK] Output will be saved to: {output_path}")
print(f"[INFO] Resolution: {frame_width}x{frame_height}, FPS: {fps_in}")


[OK] Output will be saved to: lab3_output.mp4
[INFO] Resolution: 320x240, FPS: 25.0


### Step 3 — Run the Full Lane-Keeping Pipeline

This cell integrates all components developed across the previous labs and runs them frame-by-frame on the input video.

The pipeline consists of:

1. ***Segmentation (Lab 1)*** 
   Generate a lane mask from each input frame and refine it using ROI + morphology.

2. ***BEV Projection (Lab 2)***
   Convert the lane mask to a bird’s-eye-view representation for easier geometry extraction.

3. ***Multi-Ratio Geometry Sampling (Lab 3.1)***
   Estimate lane center and heading at multiple vertical sampling ratios.

4. ***Adaptive Ratio Selection (Lab 3.1)*** 
   Automatically choose the most stable geometry among the sampled ratios.

5. ***Controller Computation (Lab 3.2)***
   Convert lane geometry into a steering command and direction label.

6. ***Visualization Overlay*** 
   Render the BEV projection, ratio points, and steering debug information onto the frame.

7. ***Video Export***
   Write the processed frame to the output video.

When the loop finishes, the final processed video is saved as `lab3_output.mp4`.


In [70]:
# ======================================================
# Lane-Keeping Full Pipeline (End-to-End)
# ======================================================

proj = BEVProjector()
ratios = [0.98, 0.92, 0.82, 0.72]
ROI = np.float32([[0.03,0.58],[0.97,0.58],[0.97,0.99],[0.03,0.99]])

while cap.isOpened():
    ret, frame = cap.read()
    if not ret:
        break

    # --------------------------------------------------
    # (1) Segmentation + ROI refinement
    # --------------------------------------------------
    mask = backend.infer_mask01(frame)
    mask = apply_roi(mask, ROI)
    mask = refine_mask01(mask)

    # --------------------------------------------------
    # (2) BEV projection
    # --------------------------------------------------
    bev = proj.warp(mask)

    # --------------------------------------------------
    # (3) Multi-ratio geometry sampling
    # --------------------------------------------------
    centers, pos_list, head_list = multi_ratio_measure(bev, ratios)

    # --------------------------------------------------
    # (4) Adaptive ratio selection
    # --------------------------------------------------
    sel = adaptive_select(centers, pos_list, head_list, ratios)
    pos    = sel["pos"]
    head   = sel["head"]
    r_star = sel["best_ratio"]

    # --------------------------------------------------
    # (5) Steering computation
    # --------------------------------------------------
    steer = controller(pos, head)
    label = steering_label(steer)

    # --------------------------------------------------
    # (6) Overlay visualization
    # --------------------------------------------------
    overlay = draw_overlay(
        frame_bgr   = frame,
        bev01       = bev,
        M_inv       = proj.M_inv,
        ratios      = ratios,
        centers_px  = centers,
        primary_idx = ratios.index(r_star),
        pos_m       = pos,
        head_deg    = head,
        dir_label   = label
    )

    out.write(overlay)

cap.release()
out.release()
print("DONE — saved to lab3_output.mp4")


DONE — saved to lab3_output.mp4
