In [1]:
# Install required packages
!pip install wget
!pip install open3d
!pip install ultralytics
!pip install matplotlib
!pip install deep-sort-realtime
!pip install torchreid

Collecting wget
  Downloading wget-3.2.zip (10 kB)
  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: wget
  Building wheel for wget (setup.py) ... [?25l[?25hdone
  Created wheel for wget: filename=wget-3.2-py3-none-any.whl size=9656 sha256=875a1e058af639cb16b5bc099ccddddecb1040d564ade87de9194608662e4c59
  Stored in directory: /root/.cache/pip/wheels/40/b3/0f/a40dbd1c6861731779f62cc4babcb234387e11d697df70ee97
Successfully built wget
Installing collected packages: wget
Successfully installed wget-3.2
Collecting open3d
  Downloading open3d-0.19.0-cp311-cp311-manylinux_2_31_x86_64.whl.metadata (4.3 kB)
Collecting dash>=2.6.0 (from open3d)
  Downloading dash-3.0.4-py3-none-any.whl.metadata (10 kB)
Collecting configargparse (from open3d)
  Downloading ConfigArgParse-1.7-py3-none-any.whl.metadata (23 kB)
Collecting addict (from open3d)
  Downloading addict-2.4.0-py3-none-any.whl.metadata (1.0 kB)
Collecting pyquaternion (from open3d)
  Downloading 

In [2]:
import os
import sys
import shutil
import cv2
import numpy as np
import torch
from PIL import Image
import pandas as pd
import matplotlib.pyplot as plt
from transformers import pipeline
from ultralytics import YOLO
from deep_sort_realtime.deepsort_tracker import DeepSort
from scipy.stats import multivariate_normal  # Added the missing import

2025-04-30 10:09:40.471113: E external/local_xla/xla/stream_executor/cuda/cuda_fft.cc:477] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered
E0000 00:00:1746007780.666629      31 cuda_dnn.cc:8310] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered
E0000 00:00:1746007780.719226      31 cuda_blas.cc:1418] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered


Creating new Ultralytics Settings v0.0.6 file ✅ 
View Ultralytics Settings with 'yolo settings' or at '/root/.config/Ultralytics/settings.json'
Update Settings with 'yolo settings key=value', i.e. 'yolo settings runs_dir=path/to/dir'. For help see https://docs.ultralytics.com/quickstart/#ultralytics-settings.


In [3]:
# Define paths
input_video_path = "/kaggle/input/test-example/example.mp4"
output_video_path = "/kaggle/working/perspective_output.mp4"
mean_future_output_path = "/kaggle/working/mean_future_output.mp4"
dist_future_output_path = "/kaggle/working/dist_future_output.mp4"
bev_points_csv = "/kaggle/working/bev_points.csv"
ufld_model_path = "/kaggle/input/culane/pytorch/default/1/culane_18.pth"
repo_path = "/kaggle/working/Ultrafast-Lane-Detection-Inference-Pytorch-"
detector_path = os.path.join(repo_path, "ultrafastLaneDetector")

In [None]:
# Check GPU availability at the start
print(f"CUDA Available: {torch.cuda.is_available()}")
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
if torch.cuda.is_available():
    print(f"GPU Device: {torch.cuda.get_device_name(0)}")
    print(f"Device: {device}")
else:
    print("Falling back to CPU")

In [4]:

# Ensure the repository is cloned (as per your notebook)
if not os.path.exists(repo_path):
    os.system("git clone https://github.com/ibaiGorordo/Ultrafast-Lane-Detection-Inference-Pytorch-.git")
    print("✅ Cloned Ultrafast-Lane-Detection repository")
else:
    print("✅ Ultrafast-Lane-Detection repository already exists")

Cloning into 'Ultrafast-Lane-Detection-Inference-Pytorch-'...


✅ Cloned Ultrafast-Lane-Detection repository


In [5]:
# Define paths
detector_path = os.path.join(repo_path, "ultrafastLaneDetector")
model_path = os.path.join(detector_path, "model")
simple_file_path = os.path.join(detector_path, "ultrafastLaneDetector.py")
model_file_path = os.path.join(model_path, "model.py")

In [6]:

# Modified UFLD code from your notebook
enhanced_code = '''
# --- FCWS Safe Distance Parameters ---
brake_buffer_by_class = {
    "car": 10,
    "truck": 20,
    "bus": 18,
    "motorcycle": 8,
    "person": 8,
    "unknown": 12
}
reaction_time = 1.5  # in seconds

def compute_safe_distance(object_class, current_speed_kmph):
    speed_mps = current_speed_kmph / 3.6
    buffer = brake_buffer_by_class.get(object_class.lower(), brake_buffer_by_class["unknown"])
    return round((reaction_time * speed_mps) + buffer, 2)

import cv2
import torch
import time
import numpy as np
import torchvision.transforms as transforms
from PIL import Image
from enum import Enum
import scipy.special
from scipy.interpolate import UnivariateSpline
from collections import deque 
from ultrafastLaneDetector.model import parsingNet

lane_colors = [(0,0,255), (0,255,0), (255,0,0), (0,255,255)]

tusimple_row_anchor = [64, 68, 72, 76, 80, 84, 88, 92, 96, 100, 104, 108, 112,
            116, 120, 124, 128, 132, 136, 140, 144, 148, 152, 156, 160, 164,
            168, 172, 176, 180, 184, 188, 192, 196, 200, 204, 208, 212, 216,
            220, 224, 228, 232, 236, 240, 244, 248, 252, 256, 260, 264, 268,
            272, 276, 280, 284]
culane_row_anchor = [121, 131, 141, 150, 160, 170, 180, 189, 199, 209, 219, 228, 238, 248, 258, 267, 277, 287]

class ModelType(Enum):
    TUSIMPLE = 0
    CULANE = 1

class ModelConfig():
    def __init__(self, model_type):
        if model_type == ModelType.TUSIMPLE:
            self.init_tusimple_config()
        else:
            self.init_culane_config()

    def init_tusimple_config(self):
        self.img_w = 1280
        self.img_h = 720
        self.row_anchor = tusimple_row_anchor
        self.griding_num = 100
        self.cls_num_per_lane = 56

    def init_culane_config(self):
        self.img_w = 1640
        self.img_h = 590
        self.row_anchor = culane_row_anchor
        self.griding_num = 200
        self.cls_num_per_lane = 18

class UltrafastLaneDetector():
    def __init__(self, model_path, model_type=ModelType.TUSIMPLE, use_gpu=False):
        self.use_gpu = use_gpu
        self.cfg = ModelConfig(model_type)
        self.model = self.initialize_model(model_path, self.cfg, use_gpu)
        self.img_transform = self.initialize_image_transform()
        self.lane_history = deque(maxlen=15)
        self.load_ui_elements()
        self.detected_objects = []  # Store detected objects for FCWS

    def load_ui_elements(self):
        """ Load UI icons for Lane Keeping, Lane Change Assist, and Forward Collision Warning System. """
        self.ui_icons = {
            "right_turn": cv2.imread("/kaggle/input/warning-departure/right_turn.png", cv2.IMREAD_UNCHANGED),
            "left_turn": cv2.imread("/kaggle/input/warning-departure/left_turn.png", cv2.IMREAD_UNCHANGED),
            "straight": cv2.imread("/kaggle/input/warning-departure/straight.png", cv2.IMREAD_UNCHANGED),
            "fcws_warning": cv2.imread("/kaggle/input/warning-departure/FCWS-warning.png", cv2.IMREAD_UNCHANGED),
            "fcws_prompt": cv2.imread("/kaggle/input/warning-departure/FCWS-prompt.png", cv2.IMREAD_UNCHANGED),
            "fcws_normal": cv2.imread("/kaggle/input/warning-departure/FCWS-normal.png", cv2.IMREAD_UNCHANGED),
            "lta_left": cv2.imread("/kaggle/input/warning-departure/LTA-left_lanes.png", cv2.IMREAD_UNCHANGED),
            "lta_right": cv2.imread("/kaggle/input/warning-departure/LTA-right_lanes.png", cv2.IMREAD_UNCHANGED),
            "warning": cv2.imread("/kaggle/input/warning-departure/warn.png", cv2.IMREAD_UNCHANGED),
        }

    @staticmethod
    def initialize_model(model_path, cfg, use_gpu):
        net = parsingNet(pretrained=False, backbone='18', cls_dim=(cfg.griding_num+1, cfg.cls_num_per_lane, 4), use_aux=False)
        map_loc = 'cuda' if use_gpu and torch.cuda.is_available() else 'cpu'
        state_dict = torch.load(model_path, map_location=map_loc)['model']
        compatible_state_dict = {k[7:] if 'module.' in k else k: v for k, v in state_dict.items()}
        net.load_state_dict(compatible_state_dict, strict=False)
        net.eval()
        if use_gpu and not torch.backends.mps.is_built():
            net = net.cuda()
        return net
        
    @staticmethod    
    def initialize_image_transform():
        return transforms.Compose([
            transforms.Resize((288, 800)),
            transforms.ToTensor(),
            transforms.Normalize((0.485, 0.456, 0.406), (0.229, 0.224, 0.225)),
        ])

    def overlay_ui(self, frame, fps=0.0):
        """
        Improved overlay with correctly aligned text and icons.
        """
        h, w, _ = frame.shape
    
        # LKA Status
        offset = self.compute_lane_offset()
        if abs(offset) < 20:
            lka_icon = self.ui_icons["straight"]
            lka_text_lines = ["LDWS: Good Lane Keeping", "LKAS: Keep Straight Ahead"]
            lka_color = (0, 255, 0)
        elif offset > 20:
            lka_icon = self.ui_icons["right_turn"]
            lka_text_lines = ["⚠ Lane Departure Right"]
            lka_color = (0, 0, 255)
        else:
            lka_icon = self.ui_icons["left_turn"]
            lka_text_lines = ["⚠ Lane Departure Left"]
            lka_color = (0, 0, 255)

        # FCWS
        fcws_icon = self.ui_icons["fcws_normal"]
        fcws_text = "FCWS: Normal Risk"
        if self.detect_collision_risk(current_speed_kmph=60):  # Or any dynamic value if you estimate speed
            fcws_icon = self.ui_icons["fcws_warning"]
            fcws_text = "⚠ FCWS: High Risk"
    
        # LCA
        if self.lanes_detected[0]:
            lca_icon = self.ui_icons["lta_left"]
        elif self.lanes_detected[3]:
            lca_icon = self.ui_icons["lta_right"]
        else:
            lca_icon = self.ui_icons["warning"]
    
        # === TOP LEFT: LKA BOX ===
        cv2.rectangle(frame, (30, 30), (280, 170), (0, 0, 0), 2)
        frame = self.overlay_icon(frame, lka_icon, (100, 50))
        for i, line in enumerate(lka_text_lines):
            cv2.putText(frame, line, (50, 155 + i*30), cv2.FONT_HERSHEY_SIMPLEX, 0.5, lka_color, 2)
    
        # === TOP RIGHT: FCWS BOX ===
        cv2.rectangle(frame, (w - 250, 30), (w - 30, 240), (0, 0, 0), 2)
        frame = self.overlay_icon(frame, fcws_icon, (w - 225, 40))
        cv2.putText(frame, fcws_text, (w - 225, 225), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 255, 0), 2)
    
        # === LCA Icon (Below LKA) ===
        frame = self.overlay_icon(frame, lca_icon, (50, 180))
    
        # === Bottom Left: FPS ===
        cv2.putText(frame, f"FPS: {fps:.2f}", (60, h - 30), cv2.FONT_HERSHEY_SIMPLEX, 0.8, (255, 255, 255), 2)
    
        return frame

    def overlay_icon(self, frame, icon, position):
        """
        Overlay an icon with alpha transparency at the given position.
        """
        if icon is None:
            return frame  # Skip if image is not loaded
    
        h, w, _ = icon.shape
        x, y = position
    
        # Ensure it fits in the frame
        if y + h > frame.shape[0] or x + w > frame.shape[1]:
            return frame
    
        # Extract the alpha channel
        alpha_channel = icon[:, :, 3] / 255.0
        for c in range(3):  # Apply for each RGB channel
            frame[y:y+h, x:x+w, c] = (1 - alpha_channel) * frame[y:y+h, x:x+w, c] + alpha_channel * icon[:, :, c]
        
        return frame

    def prepare_input(self, img):
        img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
        img_pil = Image.fromarray(img)
        input_img = self.img_transform(img_pil)
        input_tensor = input_img[None, ...]
        if self.use_gpu and not torch.backends.mps.is_built():
            input_tensor = input_tensor.cuda()
        return input_tensor

    def inference(self, input_tensor):
        with torch.no_grad():
            return self.model(input_tensor)

    def process_output(self, output):
        processed = output[0].data.cpu().numpy()
        processed = processed[:, ::-1, :]
        prob = scipy.special.softmax(processed[:-1, :, :], axis=0)
        idx = np.arange(self.cfg.griding_num) + 1
        loc = np.sum(prob * idx.reshape(-1, 1, 1), axis=0)
        processed = np.argmax(processed, axis=0)
        loc[processed == self.cfg.griding_num] = 0
        processed = loc
    
        col_sample = np.linspace(0, 800 - 1, self.cfg.griding_num)
        col_sample_w = col_sample[1] - col_sample[0]
    
        lanes_points, lanes_detected = [], []
        for lane_num in range(processed.shape[1]):
            points = []
            valid_points = processed[:, lane_num] != 0
    
            # ✅ Stricter threshold to reduce false lanes
            if np.sum(valid_points) >= 6:  # You had >2 before — this improves robustness
                lanes_detected.append(True)
                for i in range(processed.shape[0]):
                    if processed[i, lane_num] > 0:
                        x = int(processed[i, lane_num] * col_sample_w * self.cfg.img_w / 800) - 1
                        y = int(self.cfg.img_h * (self.cfg.row_anchor[self.cfg.cls_num_per_lane - 1 - i] / 288)) - 1
                        points.append([x, y])
            else:
                lanes_detected.append(False)
            lanes_points.append(points)
    
        # ✅ Apply temporal smoothing
        self.lane_history.append(lanes_points)
        smoothed_lanes = self.smooth_lanes()
    
        return smoothed_lanes, lanes_detected

    def compute_lane_offset(self):
        """
        Calculate how far the vehicle is from the lane center.
        A positive value means deviating right, negative means deviating left.
        """
        if self.lanes_detected[1] and self.lanes_detected[2]:  # Middle lanes detected
            left_lane = np.array(self.lanes_points[1])
            right_lane = np.array(self.lanes_points[2])
    
            # Ensure equal length by interpolating shorter lane
            min_len = min(len(left_lane), len(right_lane))
            left_lane, right_lane = left_lane[:min_len], right_lane[:min_len]
    
            # Compute lane center
            lane_center_x = np.mean((left_lane[:, 0] + right_lane[:, 0]) / 2)
            car_position_x = self.cfg.img_w // 2  # Assume car is at image center
    
            # Compute offset (negative = left, positive = right)
            offset = car_position_x - lane_center_x
            return offset
        return 0  # Default, if lanes not detected

    def get_lka_status(self):
        """
        Determines if the vehicle is centered in the lane or deviating.
        """
        offset = self.compute_lane_offset()
    
        if abs(offset) < 20:  # Small deviation threshold
            return "LDWS: Good Lane Keeping", (0, 255, 0)  # Green safe zone
        elif offset > 20:
            return "⚠ Lane Departure Right", (0, 0, 255)  # Red warning
        else:
            return "⚠ Lane Departure Left", (0, 0, 255)

    def get_lca_status(self):
        """
        Determines if the driver can change lanes.
        """
        suggestions = []
        
        if self.lanes_detected[0]:  # Left lane detected
            suggestions.append("✅ Left Lane Available")
        if self.lanes_detected[3]:  # Right lane detected
            suggestions.append("✅ Right Lane Available")
    
        if not suggestions:
            suggestions.append("❌ No Safe Lane Change")
    
        return suggestions

    def detect_collision_risk(self, current_speed_kmph=60):
        """
        Smarter FCWS using class-aware safe distance.
        Only considers forward objects in ego lane.
        """
        ego_lane = self.get_ego_lane_index()
        for obj in self.detected_objects:
            if obj.get("lane_idx") == ego_lane:
                obj_type = obj["type"]
                dist = obj["distance"]
                safe_dist = compute_safe_distance(obj_type, current_speed_kmph)
                if dist < safe_dist:
                    return True
        return False

    def get_ego_lane_index(self):
        """
        Returns the index of the ego lane (center lane).
        """
        img_center = self.cfg.img_w // 2
        lane_centers = []
        for i, lane in enumerate(self.lanes_points[:4]):
            if len(lane) > 0:
                avg_x = np.mean([pt[0] for pt in lane])
                lane_centers.append((i, avg_x))
        sorted_lanes = sorted(lane_centers, key=lambda x: abs(x[1] - img_center))
        return sorted_lanes[0][0] if sorted_lanes else 1  # fallback to lane 1
    
    def detect_lanes(self, image, draw_points=True):
        """
        Detects lanes, applies lane smoothing, and overlays LKA/LCA and FCWS UI elements.
        Automatically resizes input to model resolution and restores it back.
        Also computes and displays real-time FPS.
        """
        # Initialize prev_time if not present
        if not hasattr(self, 'prev_time'):
            self.prev_time = time.time()
            fps = 0.0  # Skip FPS calculation for the first frame
        else:
            current_time = time.time()
            delta = current_time - self.prev_time
            fps = 1.0 / delta if delta > 0.001 else 0.0  # Prevent division by very small number
            self.prev_time = current_time
    
        # Resize image to model input size
        original_shape = image.shape[1], image.shape[0]  # (w, h)
        resized = cv2.resize(image, (self.cfg.img_w, self.cfg.img_h), interpolation=cv2.INTER_AREA)
    
        input_tensor = self.prepare_input(resized)
        output = self.inference(input_tensor)
        self.lanes_points, self.lanes_detected = self.process_output(output)
    
        # Draw lanes
        vis = self.draw_lanes(resized, self.lanes_points, self.lanes_detected, self.cfg, draw_points)
    
        # Overlay UI including FPS
        vis = self.overlay_ui(vis, fps)
    
        # Resize back to original size
        vis = cv2.resize(vis, original_shape, interpolation=cv2.INTER_LINEAR)
        return vis, fps
    
    def smooth_lanes(self, alpha=0.8):
        if len(self.lane_history) < 2:
            return self.lane_history[-1]
    
        smoothed_lanes = []
        for i in range(len(self.lane_history[-1])):
            weighted = np.array(self.lane_history[-1][i], dtype=np.float32)
            count = 1
    
            for past_frame in list(self.lane_history)[-2::-1]:
                if i >= len(past_frame):
                    continue
                prev = np.array(past_frame[i], dtype=np.float32)
                if len(prev) != len(weighted):
                    continue
                weighted = (alpha * weighted + (1 - alpha) * prev)
                count += 1
                if count >= 3:
                    break
    
            smoothed_lanes.append(weighted.astype(int).tolist())
        return smoothed_lanes

    def draw_lanes(self, input_img, lanes_points, lanes_detected, cfg, draw_points=True):
        vis = input_img.copy()
        lane_mask = vis.copy()
        
        detected_idxs = [i for i, d in enumerate(lanes_detected) if d]
        if len(detected_idxs) < 2:
            return vis
    
        # === 1. Compute average x for all detected lanes ===
        avg_xs = []
        for idx in detected_idxs:
            xs = [pt[0] for pt in lanes_points[idx]]
            avg_x = np.mean(xs) if xs else float('inf')
            avg_xs.append((idx, avg_x))
    
        # === 2. Sort lanes by avg x (from left to right) ===
        sorted_lanes = sorted(avg_xs, key=lambda x: x[1])
        sorted_idxs = [idx for idx, _ in sorted_lanes]
    
        # === 3. Target lane: Rightmost → largest avg x ===
        target_idx = sorted_idxs[-1]  # rightmost
        target_pos = sorted_idxs.index(target_idx)
    
        if target_pos == 0:
            target_pair = (sorted_idxs[0], sorted_idxs[1])
        else:
            target_pair = (sorted_idxs[target_pos - 1], sorted_idxs[target_pos])

        # === 4. Ego lane: Find the pair that surrounds image center ===
        img_center = vis.shape[1] // 2
        min_gap = float('inf')
        ego_pair = None
    
        for i in range(len(sorted_idxs) - 1):
            x1 = np.mean([pt[0] for pt in lanes_points[sorted_idxs[i]]])
            x2 = np.mean([pt[0] for pt in lanes_points[sorted_idxs[i + 1]]])
            if x1 < img_center < x2:
                gap = abs((x1 + x2) / 2 - img_center)
                if gap < min_gap:
                    min_gap = gap
                    ego_pair = (sorted_idxs[i], sorted_idxs[i + 1])
    
        # fallback: if nothing surrounds center, pick mid-pair
        if ego_pair is None:
            mid = len(sorted_idxs) // 2
            ego_pair = (sorted_idxs[mid - 1], sorted_idxs[mid])
    
        # === 5. Color assignment ===
        is_same = (set(ego_pair) == set(target_pair))
        
        # Ego lane (🟡 or 🟢)
        ego_color = (0, 255, 0) if is_same else (255, 191, 0)
        cv2.fillPoly(
            lane_mask,
            [np.vstack((lanes_points[ego_pair[0]], np.flipud(lanes_points[ego_pair[1]])))],
            color=ego_color
        )

        # Target lane (🔴 if not ego)
        if not is_same:
            cv2.fillPoly(
                lane_mask,
                [np.vstack((lanes_points[target_pair[0]], np.flipud(lanes_points[target_pair[1]])))],
                color=(0, 0, 255)
            )
    
        # === 6. Overlay blended mask ===
        vis = cv2.addWeighted(vis, 0.7, lane_mask, 0.3, 0)
    
        # === 7. Draw lane points ===
        if draw_points:
            for i, lane in enumerate(lanes_points):
                smoothed = self.fit_lane_curve(lane)
                for pt in smoothed:
                    cv2.circle(vis, tuple(pt), 3, lane_colors[i % len(lane_colors)], -1)
    
        return vis
            
    def fit_lane_curve(self, lane_points):
        """
        Smooth lane points using a Univariate Spline for better curve adaptation.
        """
        if len(lane_points) < 5:  # Avoid error if too few points
            return lane_points
        
        lane_points = np.array(lane_points)
        x, y = lane_points[:, 0], lane_points[:, 1]
    
        try:
            spline = UnivariateSpline(y, x, k=2, s=3)
            y_new = np.linspace(y.min(), y.max(), 100)
            x_new = spline(y_new)
            return list(zip(x_new.astype(int), y_new.astype(int)))
        except:
            return lane_points  # Return original points if spline fails

    def suggest_lane_change(self):
        """
        Suggest if the driver can safely change lanes.
        """
        suggestions = []
        lane_width_threshold = 3.5  # Example lane width in meters

        if self.lanes_detected[0]:
            suggestions.append("✅ Left lane available for a lane change.")
        if self.lanes_detected[3]:
            suggestions.append("✅ Right lane available for a lane change.")

        return suggestions

    def get_center_line(self):
        """
        Compute the centerline between left and right lanes.
        """
        if self.lanes_detected[1] and self.lanes_detected[2]:
            l, r = self.lanes_points[1], self.lanes_points[2]
            if len(l) == len(r):
                return ((np.array(l) + np.array(r)) // 2).tolist()
        return []
'''

# Write the modified ultrafastLaneDetector.py file
ufld_file_path = os.path.join(detector_path, "ultrafastLaneDetector.py")
try:
    os.makedirs(detector_path, exist_ok=True)
    with open(ufld_file_path, "w") as f:
        f.write(enhanced_code)
    print(f"✅ ultrafastLaneDetector.py has been updated with enhancements at {ufld_file_path}")
except Exception as e:
    print(f"❌ Failed to write ultrafastLaneDetector.py: {e}")
    sys.exit(1)

# Clear module cache to ensure the updated module is loaded
for module in list(sys.modules.keys()):
    if "ultrafastLaneDetector" in module or "model" in module:
        del sys.modules[module]
print("✅ Cleared ultrafastLaneDetector and model module cache")


✅ ultrafastLaneDetector.py has been updated with enhancements at /kaggle/working/Ultrafast-Lane-Detection-Inference-Pytorch-/ultrafastLaneDetector/ultrafastLaneDetector.py
✅ Cleared ultrafastLaneDetector and model module cache


In [7]:

# Add repo path to sys.path
sys.path.append(repo_path)
print("sys.path:", sys.path)

# Import UFLD components
try:
    from ultrafastLaneDetector.ultrafastLaneDetector import UltrafastLaneDetector, ModelType
    print("✅ Successfully imported UltrafastLaneDetector")
except ImportError as e:
    print(f"❌ Failed to import UltrafastLaneDetector: {e}")
    sys.exit(1)

# Load UFLD model
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
if not os.path.exists(ufld_model_path):
    raise FileNotFoundError(f"❌ culane_18.pth not found at {ufld_model_path}")
try:
    lane_detector = UltrafastLaneDetector(ufld_model_path, model_type=ModelType.CULANE, use_gpu=(device.type == "cuda"))
    lane_detector.model.to(device)
    print("✅ UFLD Model loaded")
except Exception as e:
    print(f"❌ Failed to load UFLD model: {e}")
    lane_detector = None
    sys.exit(1)

# Load YOLO model
try:
    yolo_model = YOLO("yolov8n.pt")
    print("✅ YOLO Model loaded (yolov8n.pt)")
except Exception as e:
    print(f"❌ Failed to load YOLO model: {e}")
    yolo_model = None
    sys.exit(1)

# Initialize DeepSort
deep_sort = DeepSort(embedder="mobilenet", max_age=30, nms_max_overlap=0.5)
print("✅ DeepSort initialized")

# Initialize video capture
cap = cv2.VideoCapture(input_video_path)
if not cap.isOpened():
    print(f"❌ Cannot open video: {input_video_path}")
    sys.exit(1)

frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
fps = int(cap.get(cv2.CAP_PROP_FPS)) or 30
print(f"Video properties: {frame_width}x{frame_height} at {fps} FPS")


sys.path: ['/kaggle/working', '/kaggle/lib/kagglegym', '/kaggle/lib', '/usr/lib/python311.zip', '/usr/lib/python3.11', '/usr/lib/python3.11/lib-dynload', '', '/usr/local/lib/python3.11/dist-packages', '/usr/lib/python3/dist-packages', '/usr/local/lib/python3.11/dist-packages/IPython/extensions', '/usr/local/lib/python3.11/dist-packages/setuptools/_vendor', '/root/.ipython', '/tmp/tmpnzfoxdk5', '/kaggle/working/Ultrafast-Lane-Detection-Inference-Pytorch-']
✅ Successfully imported UltrafastLaneDetector




✅ UFLD Model loaded
Downloading https://github.com/ultralytics/assets/releases/download/v8.3.0/yolov8n.pt to 'yolov8n.pt'...


100%|██████████| 6.25M/6.25M [00:00<00:00, 87.6MB/s]


✅ YOLO Model loaded (yolov8n.pt)
✅ DeepSort initialized
Video properties: 1280x720 at 25 FPS


In [8]:

# Setup video writers
fourcc = cv2.VideoWriter_fourcc(*'mp4v')
out = cv2.VideoWriter(output_video_path, fourcc, fps, (frame_width, frame_height))
mean_out = cv2.VideoWriter(mean_future_output_path, fourcc, fps, (1000, 1000))
dist_out = cv2.VideoWriter(dist_future_output_path, fourcc, fps, (1000, 1000))

# Class colors and names for YOLO
CLASS_COLORS = {
    0: (0, 0, 255),      # person
    1: (0, 255, 0),      # bicycle
    2: (0, 255, 255),    # car (cyan, matching your example)
    3: (255, 255, 0),    # motorcycle
    5: (255, 0, 255),    # bus
    7: (255, 255, 0),    # truck (yellow, matching your example)
    9: (0, 255, 0)       # traffic light
}
CLASS_NAMES = yolo_model.names
SHOW_INFO_CLASSES = {0, 1, 2, 3, 5, 7}

# Camera parameters (placeholders)
focal_length = 1000
cx = frame_width // 2
cy = frame_height // 2
offset_front_left = -10
offset_front_right = 10
fov_simulated = 60

# Colors for different views (used in perspective view)
VIEW_COLORS = {
    "front": (255, 255, 255),
    "front_left": (255, 255, 0),
    "front_right": (0, 255, 255)
}

# Distance Estimator
def estimate_distance(bbox, focal_length=720, default_height=1.6):
    x, y, w, h = bbox
    if h <= 0:
        return 999
    return round((default_height * focal_length) / h, 1)

# Lane Assignment
def assign_lane(x_center, lane_boundaries):
    for i in range(len(lane_boundaries) - 1):
        if lane_boundaries[i + 1] <= x_center <= lane_boundaries[i]:
            return str(i + 1)
    return "_"

# Function to convert 2D lane points to 3D with offset
def compute_lane_points_3d(lanes_points, depth_map, focal_length, cx, cy, orig_shape, model_shape, x_offset=0):
    orig_width, orig_height = orig_shape
    model_width, model_height = model_shape
    depth_map = cv2.resize(depth_map, (model_width, model_height), interpolation=cv2.INTER_LINEAR)
    lane_points_3d = []
    
    for lane in lanes_points:
        lane_3d = []
        for point in lane:
            u, v = int(point[0]), int(point[1])
            z = 10.0  # Fixed depth for simplicity
            x = (u - cx) * z / focal_length + x_offset
            y = (v - cy) * z / focal_length
            x = np.clip(x, -50, 50)
            y = np.clip(y, -50, 50)
            z = np.clip(z, 0, 100)
            lane_3d.append([x, y, z])
        lane_points_3d.append(lane_3d)
    
    return lane_points_3d

# Function to transform object positions based on simulated view
def transform_object_position(obj_center, view_type, x_offset=0, fov=60):
    x, y, z = obj_center
    if view_type == "front":
        return np.array([x, y, z])
    elif view_type == "front_left":
        angle = np.radians(fov / 2)
        x_new = x * np.cos(angle) - z * np.sin(angle) + x_offset
        z_new = x * np.sin(angle) + z * np.cos(angle)
        return np.array([x_new, y, z_new])
    elif view_type == "front_right":
        angle = np.radians(-fov / 2)
        x_new = x * np.cos(angle) - z * np.sin(angle) + x_offset
        z_new = x * np.sin(angle) + z * np.cos(angle)
        return np.array([x_new, y, z_new])
    return np.array([x, y, z])

# Simple trajectory prediction
def predict_trajectory(obj, time_horizon=3.0, dt=0.1):
    x, z = obj["center"][0], obj["center"][2]
    speed = 10.0  # Assume constant speed for simplicity
    vx, vz = 0.0, speed
    x_future = x + vx * time_horizon
    z_future = z + vz * time_horizon
    mean = [x_future, z_future]
    cov = [[2.0, 0.0], [0.0, 5.0]]
    future_dist = multivariate_normal(mean=mean, cov=cov)
    return np.array([x_future, z_future]), future_dist

# BEV visualization to match the example image
def create_birds_eye_view(detected_objects_all_views, lane_points_3d_all, views, grid_size=200, frame_idx=0):
    x_range = (-20, 80)  # Adjusted to match your example
    y_range = (-30, 30)  # Adjusted to match your example
    
    # Predict future positions for all objects
    mean_futures = []
    future_dists = []
    for obj in detected_objects_all_views:
        mean_future, future_dist = predict_trajectory(obj)
        mean_futures.append(mean_future)
        future_dists.append(future_dist)
        print(f"Object {obj['type']} (View: {obj['view']}) predicted future position: ({mean_future[0]:.2f}, {mean_future[1]:.2f})")
    
    # Mean Future Plot (BEV)
    plt.figure(figsize=(10, 10), dpi=100, facecolor='darkblue')  # Match background color
    plt.xlim(x_range[0], x_range[1])
    plt.ylim(y_range[0], y_range[1])
    plt.grid(True, color='white', linestyle='--', alpha=0.5)  # Add grid
    plt.xlabel("X (meters)")
    plt.ylabel("Y (meters)")
    plt.title("Bird's-Eye View", color='white')
    
    # Draw ego vehicle as a triangle
    ego_x, ego_y = 0, 0
    ego_triangle = np.array([[ego_x, ego_y + 2], [ego_x - 1.5, ego_y - 2], [ego_x + 1.5, ego_y - 2]])
    plt.fill(ego_triangle[:, 0], ego_triangle[:, 1], color='red', alpha=0.7)
    
    # Plot objects as points with labels
    for obj, mean_future in zip(detected_objects_all_views, mean_futures):
        x, z = mean_future
        color = [c/255 for c in obj["color"]]
        marker = 'o' if obj["type"] == "car" else '^'  # Circle for cars, triangle for trucks
        plt.scatter(x, z, color=color, marker=marker, s=100, edgecolors='black')
        plt.text(x + 2, z, f"{obj['type']} ({obj['confidence']*100:.1f}%)", color='white', fontsize=8, ha='left')
    
    plt.gcf().canvas.draw()
    mean_img = np.frombuffer(plt.gcf().canvas.tostring_rgb(), dtype=np.uint8)
    mean_img = mean_img.reshape(plt.gcf().canvas.get_width_height()[::-1] + (3,))
    print(f"Mean image shape: {mean_img.shape}")
    
    if frame_idx == 5:
        cv2.imwrite("/kaggle/working/mean_future_frame5.jpg", cv2.cvtColor(mean_img, cv2.COLOR_RGB2BGR))
        print("✅ Saved Mean Future plot for frame 5 to /kaggle/working/mean_future_frame5.jpg")
    
    plt.close()
    
    # Distribution of Futures Plot (BEV with Heatmap)
    plt.figure(figsize=(10, 10), dpi=100, facecolor='darkblue')
    plt.xlim(x_range[0], x_range[1])
    plt.ylim(y_range[0], y_range[1])
    plt.grid(True, color='white', linestyle='--', alpha=0.5)
    plt.xlabel("X (meters)")
    plt.ylabel("Y (meters)")
    plt.title("Bird's-Eye View", color='white')
    
    # Draw ego vehicle
    plt.fill(ego_triangle[:, 0], ego_triangle[:, 1], color='red', alpha=0.7)
    
    # Create heatmap
    x_grid, z_grid = np.meshgrid(
        np.linspace(x_range[0], x_range[1], 200),
        np.linspace(y_range[0], y_range[1], 200)
    )
    pos = np.dstack((x_grid, z_grid))
    heatmap = np.zeros_like(x_grid)
    for future_dist in future_dists:
        heatmap += future_dist.pdf(pos)
    
    if heatmap.max() > 0:
        heatmap = heatmap / heatmap.max()
    
    plt.imshow(heatmap, extent=(x_range[0], x_range[1], y_range[0], y_range[1]), 
               origin='lower', cmap='jet', alpha=0.6)  # Use 'jet' colormap to match your example
    
    # Plot objects on top of heatmap
    for obj, mean_future in zip(detected_objects_all_views, mean_futures):
        x, z = mean_future
        color = [c/255 for c in obj["color"]]
        marker = 'o' if obj["type"] == "car" else '^'
        plt.scatter(x, z, color=color, marker=marker, s=100, edgecolors='black')
        plt.text(x + 2, z, f"{obj['type']} ({obj['confidence']*100:.1f}%)", color='white', fontsize=8, ha='left')
    
    plt.gcf().canvas.draw()
    dist_img = np.frombuffer(plt.gcf().canvas.tostring_rgb(), dtype=np.uint8)
    dist_img = dist_img.reshape(plt.gcf().canvas.get_width_height()[::-1] + (3,))
    print(f"Distribution image shape: {dist_img.shape}")
    
    if frame_idx == 5:
        cv2.imwrite("/kaggle/working/dist_future_frame5.jpg", cv2.cvtColor(dist_img, cv2.COLOR_RGB2BGR))
        print("✅ Saved Distribution of Futures plot for frame 5 to /kaggle/working/dist_future_frame5.jpg")
    
    plt.close()
    
    return mean_img, dist_img

In [9]:

# Video processing loop
frame_idx = 0
bev_points_data = []
print("✅ Starting video processing...")

while cap.isOpened():
    ret, frame = cap.read()
    if not ret:
        break
    
    frame_idx += 1
    print(f"Processing frame {frame_idx}")
    orig_shape = (frame.shape[1], frame.shape[0])
    
    if frame_idx < 5:
        print(f"Skipping frame {frame_idx}")
        continue
    
    if frame_idx == 5:
        cv2.imwrite("/kaggle/working/first_frame_front.jpg", frame)
        print("✅ Saved first frame for front to /kaggle/working/first_frame_front.jpg")
        print(f"Frame size: {frame.shape[1]}x{frame.shape[0]}")
    
    # Preprocess frame to improve YOLO detection
    frame_yolo = frame.copy()
    alpha = 1.5
    beta = 50
    frame_yolo = cv2.convertScaleAbs(frame_yolo, alpha=alpha, beta=beta)
    if frame_idx == 5:
        cv2.imwrite("/kaggle/working/preprocessed_frame5.jpg", frame_yolo)
        print("✅ Saved preprocessed frame 5 to /kaggle/working/preprocessed_frame5.jpg")
    
    # Detect lanes using UFLD
    with torch.no_grad():
        frame_with_lanes, fps = lane_detector.detect_lanes(frame)
    
    if frame_with_lanes is None:
        print(f"⚠ Frame {frame_idx}: Lane detection returned None, skipping frame")
        continue
    
    # Extract lane centers for lane assignment
    all_lanes = lane_detector.lanes_points[:4]
    valid_lanes = [lane for lane in all_lanes if len(lane) > 0]
    
    if len(valid_lanes) >= 2:
        lane_xs = sorted([np.median([pt[0] for pt in lane]) for lane in valid_lanes])[::-1]
    else:
        lane_xs = []
    
    # Use fixed depth for simplicity
    model_shape = (1280, 720)
    depth_map = np.ones((model_shape[1], model_shape[0]), dtype=np.float32) * 10.0
    
    # Define views
    views = [
        ("front", 0),
        ("front_left", offset_front_left),
        ("front_right", offset_front_right)
    ]
    
    # Compute 3D lane points for each view (needed for object transformations, but not plotted)
    all_lane_points_3d = []
    all_lanes_points = []
    for view_name, x_offset in views:
        lane_points_3d = compute_lane_points_3d(all_lanes, depth_map, focal_length, cx, cy, orig_shape, model_shape, x_offset)
        all_lane_points_3d.append(lane_points_3d)
        all_lanes_points.append((view_name, all_lanes))
        print(f"Lane points 3D ({view_name}): {[len(lane) for lane in lane_points_3d]}")
    
    # Overlay lanes on the frame (for perspective view)
    for view_name, lanes in all_lanes_points:
        color = VIEW_COLORS[view_name]
        for lane in lanes:
            if len(lane) > 0:
                points = np.array(lane, dtype=np.int32)
                cv2.polylines(frame_with_lanes, [points], False, color, 2)
                top_point = points[0]
                cv2.putText(frame_with_lanes, view_name, (top_point[0], top_point[1] - 10),
                            cv2.FONT_HERSHEY_SIMPLEX, 0.5, color, 2)
    
    # YOLO detection
    yolo_results = yolo_model.predict(frame_yolo, imgsz=416, conf=0.1, iou=0.5, device=device)[0]
    print(f"YOLO Frame {frame_idx}: {yolo_results.boxes}")
    detections = []
    tracked_yolo_boxes = []
    
    for det in yolo_results.boxes:
        x1, y1, x2, y2 = map(int, det.xyxy[0].tolist())
        conf = float(det.conf[0])
        cls_id = int(det.cls[0])
        if conf < 0.1 or cls_id not in CLASS_COLORS:
            continue
        w, h = x2 - x1, y2 - y1
        detections.append(([x1, y1, w, h], conf, cls_id))
        tracked_yolo_boxes.append({
            "bbox": [x1, y1, w, h],
            "cls_id": cls_id,
            "center": (x1 + w/2, y1 + h/2)
        })
    
    # DeepSort tracking
    tracks = deep_sort.update_tracks(detections, frame=frame)
    
    # Process detected objects
    detected_objects_front = []
    for track in tracks:
        if not track.is_confirmed():
            continue
        
        track_id = track.track_id
        x1, y1, x2, y2 = track.to_ltrb()
        w, h = x2 - x1, y2 - y1
        cx = int(x1 + w/2)
        
        # Match track to YOLO box
        matched = None
        for yolo_box in tracked_yolo_boxes:
            yolo_cx, yolo_cy = yolo_box["center"]
            if abs(cx - yolo_cx) < 30:
                matched = yolo_box
                break
        
        if not matched:
            continue
        
        x, y, w, h = matched["bbox"]
        cls_id = matched["cls_id"]
        if cls_id not in SHOW_INFO_CLASSES:
            continue
        
        distance = estimate_distance([x, y, w, h])
        lane_number = assign_lane(x + w/2, lane_xs)
        color = CLASS_COLORS[cls_id]
        
        # Compute 3D position
        bbox_area = w * h
        z_est = 20.0 * (10000 / (bbox_area + 1000))
        obj_center = np.array([x / 100.0, y / 100.0, z_est])
        
        detected_objects_front.append({
            "type": CLASS_NAMES[cls_id],
            "distance": distance,
            "center": obj_center,
            "color": color,
            "lane_idx": lane_number,
            "confidence": float(track.confidence) if hasattr(track, 'confidence') else 0.6,
            "view": "front"
        })
        
        # Draw bounding box and labels
        cv2.rectangle(frame_with_lanes, (x, y), (x + w, y + h), color, 2)
        cv2.putText(frame_with_lanes, f"{distance:.1f}m", (x + 5, y + h - 12),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.6, color, 2)
        lane_text = f"Lane {lane_number}"
        text_size = cv2.getTextSize(lane_text, cv2.FONT_HERSHEY_SIMPLEX, 0.6, 2)[0]
        cv2.putText(frame_with_lanes, lane_text, (x + w - text_size[0] - 5, y + h - 12),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.6, color, 2)
    
    # Transform objects for all views
    all_detected_objects = []
    for view_name, x_offset in views:
        for obj in detected_objects_front:
            obj_copy = obj.copy()
            obj_copy["center"] = transform_object_position(obj["center"], view_name, x_offset, fov=fov_simulated)
            obj_copy["view"] = view_name
            all_detected_objects.append(obj_copy)
    
    # Add dummy object if no detections
    if not all_detected_objects:
        print("⚠ No objects detected, adding a dummy object for testing")
        for view_name, x_offset in views:
            all_detected_objects.append({
                "type": "car",  # Use "car" to match your example
                "center": transform_object_position(np.array([0, 0, 10]), view_name, x_offset, fov=fov_simulated),
                "color": (0, 255, 255),  # Cyan for cars
                "lane_idx": "1",
                "confidence": 1.0,
                "distance": 10.0,
                "view": view_name
            })
    
    # Update lane_detector.detected_objects for FCWS
    if lane_detector:
        lane_detector.detected_objects = all_detected_objects
    
    # Generate BEV maps
    mean_img, dist_img = create_birds_eye_view(all_detected_objects, all_lane_points_3d, views, frame_idx=frame_idx)
    
    # Write to video outputs
    if frame_with_lanes.shape[:2] != (frame_height, frame_width):
        frame_with_lanes = cv2.resize(frame_with_lanes, (frame_width, frame_height))
    out.write(frame_with_lanes)
    mean_out.write(cv2.cvtColor(mean_img, cv2.COLOR_RGB2BGR))
    dist_out.write(cv2.cvtColor(dist_img, cv2.COLOR_RGB2BGR))
    
    # Save BEV points
    bev_points = []
    for obj in all_detected_objects:
        x, y = obj["center"][:2]
        bev_points.append({
            "frame": frame_idx,
            "x": x,
            "y": y,
            "source": "object",
            "lane_idx": obj["lane_idx"],
            "view": obj["view"]
        })
    bev_points_data.extend(bev_points)

# Save BEV points to CSV
pd.DataFrame(bev_points_data).to_csv(bev_points_csv, index=False)
print(f"✅ BEV points saved to: {bev_points_csv}")

# Release resources
cap.release()
out.release()
mean_out.release()
dist_out.release()
cv2.destroyAllWindows()
print(f"✅ Processed videos saved to: {output_video_path}, {mean_future_output_path}, and {dist_future_output_path}")

# Print sample BEV points
print("Sample BEV Points from Last Frame:")
for pt_data in bev_points_data[-10:]:
    print(f"Frame {pt_data['frame']}: ({pt_data['x']:.2f}, {pt_data['y']:.2f}) from {pt_data['source']} (Lane {pt_data['lane_idx']}, View: {pt_data['view']})")

✅ Starting video processing...
Processing frame 1
Skipping frame 1
Processing frame 2
Skipping frame 2
Processing frame 3
Skipping frame 3
Processing frame 4
Skipping frame 4
Processing frame 5
✅ Saved first frame for front to /kaggle/working/first_frame_front.jpg
Frame size: 1280x720
✅ Saved preprocessed frame 5 to /kaggle/working/preprocessed_frame5.jpg
Lane points 3D (front): [0, 12, 13, 12]
Lane points 3D (front_left): [0, 12, 13, 12]
Lane points 3D (front_right): [0, 12, 13, 12]

0: 256x416 (no detections), 40.1ms
Speed: 1.9ms preprocess, 40.1ms inference, 98.9ms postprocess per image at shape (1, 3, 256, 416)
YOLO Frame 5: ultralytics.engine.results.Boxes object with attributes:

cls: tensor([], device='cuda:0')
conf: tensor([], device='cuda:0')
data: tensor([], device='cuda:0', size=(0, 6))
id: None
is_track: False
orig_shape: (720, 1280)
shape: torch.Size([0, 6])
xywh: tensor([], device='cuda:0', size=(0, 4))
xywhn: tensor([], device='cuda:0', size=(0, 4))
xyxy: tensor([], devi