In [None]:
import cv2, time
import numpy as np
import logging
import pycuda.driver as drv

from ObjectTracker import BYTETracker
from taskConditions import TaskConditions, Logger
from ObjectDetector import YoloDetector, EfficientdetDetector
from ObjectDetector.utils import ObjectModelType, CollisionType
from ObjectDetector.distanceMeasure import SingleCamDistanceMeasure

from TrafficLaneDetector import UltrafastLaneDetector, UltrafastLaneDetectorV2
from TrafficLaneDetector.ufldDetector.perspectiveTransformation import PerspectiveTransformation
from TrafficLaneDetector.ufldDetector.utils import LaneModelType, OffsetType, CurvatureType

LOG = Logger(None, logging.INFO, logging.INFO)

VIDEO_SRC = "./TrafficLaneDetector/temp/demo-7.mp4"
LANE_CFG = {
    "model_path": "./TrafficLaneDetector/models/culane_res18_fp16.trt",
    "model_type": LaneModelType.UFLDV2_CULANE
}

OBJ_CFG = {
    "model_path": './ObjectDetector/models/yolov10n-coco_fp16.trt',
    "model_type": ObjectModelType.YOLOV10,
    "classes_path": './ObjectDetector/models/coco_label.txt',
    "box_score": 0.4,
    "box_nms_iou": 0.5
}

class Panel:
    COLLISION_MAP = {
        CollisionType.UNKNOWN: (0, 255, 255),
        CollisionType.NORMAL: (0, 255, 0),
        CollisionType.PROMPT: (0, 102, 255),
        CollisionType.WARNING: (0, 0, 255)
    }

    OFFSET_MAP = {
        OffsetType.UNKNOWN: (0, 255, 255),
        OffsetType.RIGHT: (0, 0, 255),
        OffsetType.LEFT: (0, 0, 255),
        OffsetType.CENTER: (0, 255, 0)
    }

    CURVATURE_MAP = {
        CurvatureType.UNKNOWN: (0, 255, 255),
        CurvatureType.STRAIGHT: (0, 255, 0),
        CurvatureType.EASY_LEFT: (0, 102, 255),
        CurvatureType.EASY_RIGHT: (0, 102, 255),
        CurvatureType.HARD_LEFT: (0, 0, 255),
        CurvatureType.HARD_RIGHT: (0, 0, 255)
    }

    def __init__(self):
        self.warning_img = cv2.resize(cv2.imread('./assets/FCWS-warning.png', cv2.IMREAD_UNCHANGED), (100, 100))
        self.prompt_img = cv2.resize(cv2.imread('./assets/FCWS-prompt.png', cv2.IMREAD_UNCHANGED), (100, 100))
        self.normal_img = cv2.resize(cv2.imread('./assets/FCWS-normal.png', cv2.IMREAD_UNCHANGED), (100, 100))
        self.left_img = cv2.resize(cv2.imread('./assets/left_turn.png', cv2.IMREAD_UNCHANGED), (200, 200))
        self.right_img = cv2.resize(cv2.imread('./assets/right_turn.png', cv2.IMREAD_UNCHANGED), (200, 200))
        self.straight_img = cv2.resize(cv2.imread('./assets/straight.png', cv2.IMREAD_UNCHANGED), (200, 200))
        self.unknown_img = cv2.resize(cv2.imread('./assets/warn.png', cv2.IMREAD_UNCHANGED), (200, 200))
        self.left_lane_img = cv2.resize(cv2.imread('./assets/LTA-left_lanes.png', cv2.IMREAD_UNCHANGED), (300, 200))
        self.right_lane_img = cv2.resize(cv2.imread('./assets/LTA-right_lanes.png', cv2.IMREAD_UNCHANGED), (300, 200))
        self.fps = 0
        self.count = 0
        self.start = time.time()

    def update_fps(self):
        self.count += 1
        if self.count >= 30:
            self.fps = self.count / (time.time() - self.start)
            self.count = 0
            self.start = time.time()

if __name__ == "__main__":
    cap = cv2.VideoCapture(VIDEO_SRC)
    if not cap.isOpened():
        raise Exception("Invalid video source")

    width, height = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)), int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    output = cv2.VideoWriter(VIDEO_SRC[:-4]+'_out.mp4', fourcc, 30.0, (width, height))
    cv2.namedWindow("ADAS Simulation", cv2.WINDOW_NORMAL)

    LOG.info("[Pycuda] Cuda Version: {}".format(drv.get_version()))
    LOG.info("[Driver] Cuda Version: {}".format(drv.get_driver_version()))
    LOG.info("-" * 40)

    lane_model = UltrafastLaneDetectorV2(logger=LOG) if "UFLDV2" in LANE_CFG["model_type"].name else UltrafastLaneDetector(logger=LOG)
    lane_transform = PerspectiveTransformation((width, height), logger=LOG)

    obj_detector = YoloDetector(logger=LOG) if OBJ_CFG["model_type"] == ObjectModelType.YOLOV10 else EfficientdetDetector(logger=LOG)
    obj_tracker = BYTETracker(names=obj_detector.colors_dict)
    distance_calc = SingleCamDistanceMeasure()
    panel = Panel()
    task = TaskConditions()

    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            break

        frame_copy = frame.copy()
        obj_detector.DetectFrame(frame)
        lane_model.DetectFrame(frame)

        distance_calc.updateDistance(obj_detector.object_info)
        vehicle_dist = distance_calc.calcCollisionPoint(lane_model.lane_info.area_points)

        if task.CheckStatus() and lane_model.lane_info.area_status:
            lane_transform.updateTransformParams(*lane_model.lane_info.lanes_points[1:3], task.transform_status)

        bird_view = lane_transform.transformToBirdView(frame_copy)
        bird_lanes = [lane_transform.transformToBirdViewPoints(lane) for lane in lane_model.lane_info.lanes_points]
        direction, curvature = lane_transform.calcCurveAndOffset(bird_view, *bird_lanes[1:3])

        task.UpdateCollisionStatus(vehicle_dist, lane_model.lane_info.area_status)
        task.UpdateOffsetStatus(lane_transform.calcCurveAndOffset(bird_view, *bird_lanes[1:3])[1])
        task.UpdateRouteStatus(direction, curvature)

        lane_transform.DrawDetectedOnBirdView(bird_view, bird_lanes, task.offset_msg)
        lane_model.DrawDetectedOnFrame(frame_copy, task.offset_msg)
        obj_detector.DrawDetectedOnFrame(frame_copy)
        obj_tracker.DrawTrackedOnFrame(frame_copy, False)
        distance_calc.DrawDetectedOnFrame(frame_copy)

        cv2.imshow("ADAS Simulation", frame_copy)
        output.write(frame_copy)
        if cv2.waitKey(1) == ord('q'):
            break

    output.release()
    cap.release()
    cv2.destroyAllWindows()


In [10]:
import threading
import time
import random

class SelfDrivingCar:
    def __init__(self):
        self.running = True

    def stop(self):
        self.running = False

class FCWS(threading.Thread):
    def __init__(self, car_system):
        super().__init__()
        self.car_system = car_system

    def run(self):
        while self.car_system.running:
            collision_risk = random.randint(1, 100)
            driver_reaction = random.randint(1, 100)

            if collision_risk > 90:
                print("⚠️  Obstacle detected ahead! Driver warned.")
                time.sleep(1.5)

                if driver_reaction > 30:
                    print("✅ Driver applied brakes. Obstacle avoided.")
                else:
                    print("❌ Driver failed to react!")
                    print("🛑 Autonomous Emergency Braking (AEB) activated!")
                    print("🚘 Car has stopped.")
                    self.car_system.stop()
            else:
                print("🚗 Moving forward safely.")

            time.sleep(1)

class LDWS_LKAS(threading.Thread):
    def __init__(self, car_system):
        super().__init__()
        self.car_system = car_system

    def run(self):
        while self.car_system.running:
            lane_drift = random.randint(1, 100)
            driver_reaction = random.randint(1, 100)

            if lane_drift > 85:
                print("⚠️  Warning! Car is drifting out of the lane!")
                time.sleep(1)

                if driver_reaction > 40:
                    print("✅ Driver corrected steering.")
                else:
                    print("❌ No correction! Activating Lane Keeping Assist (LKAS).")
                    print("🛣️ Car is steered back into the lane.")
            else:
                print("🚗 Car is within the lane.")

            time.sleep(1)

if __name__ == "__main__":
    print("🚗 Starting Self-Driving Safety System (FCWS + LDWS + LKAS)")

    car_system = SelfDrivingCar()
    fcws_thread = FCWS(car_system)
    lane_thread = LDWS_LKAS(car_system)

    fcws_thread.start()
    lane_thread.start()

    try:
        time.sleep(30)
    except KeyboardInterrupt:
        print("\n🛑 Simulation interrupted by user.")

    car_system.stop()
    fcws_thread.join()
    lane_thread.join()

    print("🏁 Simulation Ended.")


🚗 Starting Self-Driving Safety System (FCWS + LDWS + LKAS)
⚠️  Obstacle detected ahead! Driver warned.
🚗 Car is within the lane.
🚗 Car is within the lane.
✅ Driver applied brakes. Obstacle avoided.
🚗 Car is within the lane.
🚗 Moving forward safely.
🚗 Car is within the lane.
🚗 Moving forward safely.
🚗 Car is within the lane.
⚠️  Obstacle detected ahead! Driver warned.
🚗 Car is within the lane.
✅ Driver applied brakes. Obstacle avoided.
🚗 Car is within the lane.
⚠️  Obstacle detected ahead! Driver warned.
🚗 Car is within the lane.
🚗 Car is within the lane.
✅ Driver applied brakes. Obstacle avoided.
🚗 Car is within the lane.
🚗 Moving forward safely.
🚗 Car is within the lane.
🚗 Moving forward safely.
🚗 Car is within the lane.
🚗 Moving forward safely.
🚗 Car is within the lane.
🚗 Moving forward safely.
🚗 Moving forward safely.
✅ Driver corrected steering.
🚗 Moving forward safely.
🚗 Car is within the lane.
🚗 Moving forward safely.
🚗 Car is within the lane.
🚗 Moving forward safely.
🚗 Car is wi