In [10]:
import cv2
import numpy as np
import time
import os
import pickle
from ultralytics import YOLO

In [11]:
# -----------------------
# PID controller
# -----------------------
class PIDController:
    """Discrete PID controller whose time step is measured between calls.

    Args:
        Kp: Proportional gain.
        Ki: Integral gain.
        Kd: Derivative gain.

    Attributes:
        prev_error: Error passed to the previous ``step`` call (0.0 before the first call).
        integral: Running sum of ``error * dt`` over all ``step`` calls.
        prev_time: Monotonic clock reading taken by the previous ``step`` call,
            or ``None`` before the first call.
    """

    # Time step assumed for the first sample, when there is no earlier timestamp
    # to measure against (0.033 s is roughly one frame at 30 FPS).
    DEFAULT_DT = 0.033
    # Lower bound on the time step so the derivative never divides by ~0.
    MIN_DT = 1e-3

    def __init__(self, Kp=0.6, Ki=0.01, Kd=0.1):
        self.Kp = Kp
        self.Ki = Ki
        self.Kd = Kd
        self.prev_error = 0.0
        self.integral = 0.0
        self.prev_time = None
        # False until one real error sample has been seen; see step().
        self._has_prev_error = False

    def step(self, error, dt=None):
        """Advance the controller by one sample and return the control output.

        Args:
            error: Current control error.
            dt: Optional time step in seconds. When omitted, the elapsed time
                since the previous call is measured with ``time.monotonic()``
                (``DEFAULT_DT`` on the first call). Values below ``MIN_DT``
                are clamped to ``MIN_DT``.

        Returns:
            ``Kp * error + Ki * integral + Kd * derivative``.
        """
        # A monotonic clock cannot go backwards, unlike wall-clock time.time(),
        # which may jump when the system clock is adjusted.
        now = time.monotonic()
        if dt is None:
            # Compare against None explicitly: a timestamp of 0.0 is still valid.
            dt = (now - self.prev_time) if self.prev_time is not None else self.DEFAULT_DT
        self.prev_time = now
        dt = max(self.MIN_DT, dt)

        self.integral += error * dt
        if self._has_prev_error:
            derivative = (error - self.prev_error) / dt
        else:
            # On the first sample there is no real previous error; differencing
            # against the initial 0.0 would produce a spurious derivative spike.
            derivative = 0.0
            self._has_prev_error = True

        output = self.Kp * error + self.Ki * self.integral + self.Kd * derivative
        self.prev_error = error
        return output

In [12]:

# -----------------------
# Lane detection
# -----------------------
def detect_and_annotate(frame, mtx=None, dist=None):
    """Find left/right lane lines in a BGR frame and draw them on a copy.

    Args:
        frame: BGR image (it is converted with ``cv2.COLOR_BGR2GRAY``).
        mtx: Optional camera matrix; used only together with ``dist``.
        dist: Optional distortion coefficients; used only together with ``mtx``.

    Returns:
        ``(offset, annotated)`` where ``offset`` is the lane centre's
        displacement from the image centre divided by half the image width
        (0.0 when either lane line is missing), and ``annotated`` is the
        (possibly undistorted) frame blended with the fitted lane lines plus a
        green vertical marker at the image centre.
    """
    have_calibration = not (mtx is None or dist is None)
    if have_calibration:
        frame = cv2.undistort(frame, mtx, dist, None, mtx)

    height, width = frame.shape[:2]
    horizon_y = int(height * 0.6)  # top edge of the region of interest
    mid_x = width // 2

    # Grayscale -> blur -> Canny edge map.
    edge_map = cv2.Canny(
        cv2.GaussianBlur(cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY), (5, 5), 0),
        50,
        150,
    )

    # Keep only edges inside the rectangle covering the bottom 40% of the image.
    roi_mask = np.zeros_like(edge_map)
    roi_corners = np.array(
        [[(0, height), (width, height), (width, horizon_y), (0, horizon_y)]],
        np.int32,
    )
    cv2.fillPoly(roi_mask, roi_corners, 255)
    masked_edges = cv2.bitwise_and(edge_map, roi_mask)

    segments = cv2.HoughLinesP(
        masked_edges, 1, np.pi / 180, 50, minLineLength=40, maxLineGap=120
    )

    # Bucket segment endpoints by slope sign; near-horizontal and vertical
    # segments are discarded.
    endpoints = {"left": [], "right": []}
    for segment in (segments if segments is not None else ()):
        xa, ya, xb, yb = segment[0]
        if xa == xb:
            continue
        gradient = (yb - ya) / (xb - xa)
        if gradient < -0.3:
            side = "left"
        elif gradient > 0.3:
            side = "right"
        else:
            continue
        endpoints[side].extend([(xa, ya), (xb, yb)])

    def extrapolate(points):
        """Fit x = a*y + b through the points and span it over the ROI height."""
        if len(points) < 2:
            return None
        xs, ys = zip(*points)
        a, b = np.polyfit(np.array(ys), np.array(xs), 1)
        return (int(a * height + b), height, int(a * horizon_y + b), horizon_y)

    left_lane = extrapolate(endpoints["left"])
    right_lane = extrapolate(endpoints["right"])

    # Draw whichever lane lines were found on a blank layer, then blend it in.
    lane_layer = np.zeros_like(frame)
    for lane, bgr in ((left_lane, (255, 0, 0)), (right_lane, (0, 0, 255))):
        if lane is not None:
            cv2.line(lane_layer, (lane[0], lane[1]), (lane[2], lane[3]), bgr, 10)

    annotated = cv2.addWeighted(frame, 0.8, lane_layer, 1.0, 0.0)

    if left_lane is None or right_lane is None:
        offset = 0.0
    else:
        left_mid = (left_lane[0] + left_lane[2]) / 2.0
        right_mid = (right_lane[0] + right_lane[2]) / 2.0
        half_width = width / 2.0
        offset = ((left_mid + right_mid) / 2.0 - half_width) / half_width

    # Reference marker at the horizontal centre of the image.
    cv2.line(annotated, (mid_x, height), (mid_x, horizon_y), (0, 255, 0), 2)
    return offset, annotated

In [13]:
# -----------------------
# Traffic light color classification (HSV)
# -----------------------
def classify_traffic_light(crop):
    """Classify the dominant traffic-light colour in a BGR crop using HSV thresholds.

    Args:
        crop: BGR image region containing the traffic light.

    Returns:
        ``"Red"``, ``"Yellow"`` or ``"Green"`` for whichever range matches the
        most pixels (ties resolve in that order), or ``"Unknown"`` when the
        crop is ``None``/empty or no pixel falls in any of the ranges.
    """
    # Nothing to classify; OpenCV cannot convert an empty image.
    if crop is None or crop.size == 0:
        return "Unknown"

    hsv = cv2.cvtColor(crop, cv2.COLOR_BGR2HSV)
    masks = {
        # Red wraps around the hue axis, so it needs two ranges.
        "Red": cv2.inRange(hsv, (0, 100, 100), (10, 255, 255))
               | cv2.inRange(hsv, (160, 100, 100), (179, 255, 255)),
        "Yellow": cv2.inRange(hsv, (15, 100, 100), (35, 255, 255)),
        "Green": cv2.inRange(hsv, (40, 100, 100), (90, 255, 255)),
    }
    counts = {name: cv2.countNonZero(mask) for name, mask in masks.items()}
    best = max(counts, key=counts.get)
    # If nothing matched, every count is 0 and max() would return the first
    # key ("Red"); report that no colour was recognised instead.
    return best if counts[best] > 0 else "Unknown"

In [14]:
# -----------------------
# Camera calibration loader
# -----------------------
def load_calibration(pickle_path='calibration_pickle.p'):
    """Read the camera matrix and distortion coefficients from a pickle file.

    Args:
        pickle_path: Path of the pickle file to read.

    Returns:
        ``(mtx, dist)`` taken from the ``'mtx'`` and ``'dist'`` entries of the
        unpickled object (``None`` for a missing entry), or ``(None, None)``
        when the file does not exist.

    Note:
        Unpickling can execute arbitrary code; only point this at calibration
        files you trust.
    """
    if not os.path.exists(pickle_path):
        return None, None

    with open(pickle_path, 'rb') as handle:
        calibration = pickle.load(handle)

    camera_matrix = calibration.get('mtx')
    distortion = calibration.get('dist')
    return camera_matrix, distortion

In [15]:
# -----------------------
# Main video processing pipeline
# -----------------------
def process_video(input_source=0, output_path=None,
                  model_path=r"D:\lane_detection_handover\yolov8n.pt",
                  speed_model_path=r"D:\lane_detection_handover\my_traffic_model.pt",
                  conf_threshold=0.3, device="cpu"):
    """Run lane, object and speed-sign detection on a video stream and display it.

    Each frame is undistorted (when calibration data is available), annotated
    with lane lines, detection boxes and the PID steering value, optionally
    written to ``output_path`` and shown in a window. Pressing ``q`` in the
    window stops the loop.

    Args:
        input_source: Camera index or video path passed to ``cv2.VideoCapture``.
        output_path: Optional path of the annotated output video (XVID codec).
        model_path: Weights for the general detector. The default is a
            machine-specific absolute path; pass your own location.
        speed_model_path: Weights for the speed-limit detector. The default is
            a machine-specific absolute path; pass your own location.
        conf_threshold: Confidence threshold handed to both detectors.
        device: Inference device handed to both detectors.

    Raises:
        RuntimeError: If the video source or the output writer cannot be opened.
    """
    mtx, dist = load_calibration()
    calibrated = mtx is not None and dist is not None

    cap = cv2.VideoCapture(input_source)
    if not cap.isOpened():
        cap.release()
        raise RuntimeError("Cannot open video")

    out = None
    try:
        fps = int(cap.get(cv2.CAP_PROP_FPS)) or 30
        pid = PIDController()

        # Load YOLO models
        model = YOLO(model_path)              # Main YOLO (traffic lights + objects)
        speed_model = YOLO(speed_model_path)  # Speed limit detection
        print("Loaded YOLO classes:", model.names)
        print("Loaded Speed classes:", speed_model.names)

        while True:
            ret, frame = cap.read()
            if not ret:
                break

            # Undistort once so the lane overlay and every detection box share
            # the same pixel coordinates.
            if calibrated:
                frame = cv2.undistort(frame, mtx, dist, None, mtx)

            # 1) Lane detection (frame is already undistorted, so no calibration is passed)
            offset, annotated = detect_and_annotate(frame)
            steer = pid.step(-offset)

            # 2) Main YOLO detection
            results = model(frame, conf=conf_threshold, device=device, verbose=False)
            for r in results:
                for box in r.boxes:
                    cls_id = int(box.cls[0])
                    label = model.names[cls_id]
                    score = float(box.conf[0])
                    x1, y1, x2, y2 = map(int, box.xyxy[0])

                    if label.lower() == "traffic light":
                        # Clamp to the image: negative indices would wrap
                        # around in NumPy slicing and select the wrong region.
                        crop = frame[max(0, y1):y2, max(0, x1):x2]
                        if crop.size > 0:
                            color = classify_traffic_light(crop)
                            _draw_labeled_box(annotated, (x1, y1), (x2, y2),
                                              f"{color}", (0, 255, 255), 0.8)
                    else:
                        _draw_labeled_box(annotated, (x1, y1), (x2, y2),
                                          f"{label} {score:.2f}", (0, 255, 0), 0.6)

            # 3) Speed detection YOLO
            speed_results = speed_model(frame, conf=conf_threshold, device=device, verbose=False)
            for r in speed_results:
                for box in r.boxes:
                    cls_id = int(box.cls[0])
                    label = speed_model.names[cls_id]
                    x1, y1, x2, y2 = map(int, box.xyxy[0])
                    _draw_labeled_box(annotated, (x1, y1), (x2, y2),
                                      f"Speed:{label}", (255, 0, 255), 0.8)

            # 4) Steering overlay
            cv2.putText(annotated, f"Steer:{steer:+.2f}", (20, 50),
                        cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)

            # Save/show frame. The writer is created from the first annotated
            # frame so its size always matches what is written; capture
            # properties can report 0 or a different size for some sources.
            if output_path and out is None:
                frame_h, frame_w = annotated.shape[:2]
                fourcc = cv2.VideoWriter_fourcc(*'XVID')
                out = cv2.VideoWriter(output_path, fourcc, fps, (frame_w, frame_h))
                if not out.isOpened():
                    raise RuntimeError("Cannot open video writer")
            if out is not None:
                out.write(annotated)

            cv2.imshow("Full Pipeline", annotated)
            if cv2.waitKey(1) & 0xFF == ord('q'):
                break
    finally:
        # Runs on normal exit, on errors and on KeyboardInterrupt, so the
        # camera, the output file and the preview window are always released.
        cap.release()
        if out is not None:
            out.release()
        cv2.destroyAllWindows()


def _draw_labeled_box(image, top_left, bottom_right, text, color, font_scale):
    """Draw a rectangle on ``image`` with ``text`` placed 10 px above its top-left corner."""
    cv2.rectangle(image, top_left, bottom_right, color, 2)
    cv2.putText(image, text, (top_left[0], top_left[1] - 10),
                cv2.FONT_HERSHEY_SIMPLEX, font_scale, color, 2)

# -----------------------
# Run pipeline
# -----------------------
if __name__ == "__main__":
    # Source 0 selects the webcam; pass a video file path to process a recording instead.
    process_video(input_source=0)

Loaded YOLO classes: {0: 'person', 1: 'bicycle', 2: 'car', 3: 'motorcycle', 4: 'airplane', 5: 'bus', 6: 'train', 7: 'truck', 8: 'boat', 9: 'traffic light', 10: 'fire hydrant', 11: 'stop sign', 12: 'parking meter', 13: 'bench', 14: 'bird', 15: 'cat', 16: 'dog', 17: 'horse', 18: 'sheep', 19: 'cow', 20: 'elephant', 21: 'bear', 22: 'zebra', 23: 'giraffe', 24: 'backpack', 25: 'umbrella', 26: 'handbag', 27: 'tie', 28: 'suitcase', 29: 'frisbee', 30: 'skis', 31: 'snowboard', 32: 'sports ball', 33: 'kite', 34: 'baseball bat', 35: 'baseball glove', 36: 'skateboard', 37: 'surfboard', 38: 'tennis racket', 39: 'bottle', 40: 'wine glass', 41: 'cup', 42: 'fork', 43: 'knife', 44: 'spoon', 45: 'bowl', 46: 'banana', 47: 'apple', 48: 'sandwich', 49: 'orange', 50: 'broccoli', 51: 'carrot', 52: 'hot dog', 53: 'pizza', 54: 'donut', 55: 'cake', 56: 'chair', 57: 'couch', 58: 'potted plant', 59: 'bed', 60: 'dining table', 61: 'toilet', 62: 'tv', 63: 'laptop', 64: 'mouse', 65: 'remote', 66: 'keyboard', 67: 'cel

KeyboardInterrupt: 