In [1]:
import numpy as np
import cv2 
import matplotlib.pyplot as plt



In [None]:
import cv2
import numpy as np
import threading
import queue

# Global variables for threading and state
frame_queue = queue.Queue(maxsize=10)
result_queue = queue.Queue(maxsize=10)
running = True
prev_left = None
prev_right = None
frame_count = 0
width = None
height = None
scale_factor = 0.5

def average_lines(lines):
    """Average detected lines to compute a single line."""
    if not lines:
        return None
    x1s, y1s, x2s, y2s = zip(*[line for line in lines])
    avg_x1, avg_y1 = int(np.mean(x1s)), int(np.mean(y1s))
    avg_x2, avg_y2 = int(np.mean(x2s)), int(np.mean(y2s))

    if avg_x2 != avg_x1:
        slope = (avg_y2 - avg_y1) / (avg_x2 - avg_x1)
        new_x1 = int(avg_x1 + (height - avg_y1) / slope)
        new_x2 = int(avg_x1 + (height // 2 - avg_y1) / slope)
        return [new_x1, height, new_x2, height // 2]
    return [avg_x1, avg_y1, avg_x2, avg_y2]

def detect_obstacles(frame, masked_edges, gray):
    """Detect obstacles by finding contours that don't follow lane-like slopes"""
    # Find all contours in the masked edges
    contours, _ = cv2.findContours(masked_edges, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
    
    # Also find contours in the binary thresholded image
    _, binary = cv2.threshold(gray, 100, 255, cv2.THRESH_BINARY + cv2.THRESH_OTSU)
    binary = cv2.bitwise_and(binary, masked_edges)
    contours += cv2.findContours(binary, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)[0]
    
    obstacle_bbox = None
    real_obstacle = False
    
    for contour in contours:
        # Skip small contours (noise)
        if cv2.contourArea(contour) < 50:
            continue
            
        # Get bounding box of contour
        x, y, w, h = cv2.boundingRect(contour)
        
        # Calculate the slope of the contour's major axis
        if len(contour) >= 5:  # Minimum points needed for ellipse fitting
            ellipse = cv2.fitEllipse(contour)
            (x_ell, y_ell), (MA, ma), angle = ellipse
            slope = np.tan(np.radians(angle))
            
            # Check if the slope is not lane-like (not within typical lane slope ranges)
            is_lane_like = (-2 < slope < -0.5) or (0.5 < slope < 2)
            
            # If it's not lane-like, consider it an obstacle
            if not is_lane_like:
                cv2.rectangle(frame, (x, y), (x + w, y + h), (0, 0, 255), 2)
                obstacle_bbox = (x, y, x + w, y + h)
                real_obstacle = True
        else:
            # For small contours that can't fit ellipse, use aspect ratio check
            aspect_ratio = float(w)/h
            if not (0.2 < aspect_ratio < 5.0):  # Typical lanes are long and narrow
                cv2.rectangle(frame, (x, y), (x + w, y + h), (0, 0, 255), 2)
                obstacle_bbox = (x, y, x + w, y + h)
                real_obstacle = True
                
    return real_obstacle, obstacle_bbox

def check_clear_path(avg_left, avg_right, obstacle_bbox):
    """Check for a clear path to overtake."""
    if not (avg_left and avg_right and obstacle_bbox):
        return None
    lane_left = min(avg_left[0], avg_left[2])
    lane_right = max(avg_right[0], avg_right[2])
    obs_x_center = (obstacle_bbox[0] + obstacle_bbox[2]) // 2
    lane_center = (lane_left + lane_right) // 2

    if obs_x_center < lane_center and lane_left > width // 10:
        return "left"
    elif obs_x_center > lane_center and lane_right < width * 0.9:
        return "right"
    return None

def create_roi_vertices():
    """Create ROI vertices based on previous lane lines or default trapezoid."""
    global prev_left, prev_right
    if not (prev_left and prev_right):
        return np.array([[
            (width // 8, height),
            (3 * width // 8, height // 2),
            (5 * width // 8, height // 2),
            (7 * width // 8, height)
        ]], dtype=np.int32)
    lx1, ly1, lx2, ly2 = prev_left
    rx1, ry1, rx2, ry2 = prev_right
    return np.array([[
        (max(0, lx1), min(height, ly1)),
        (max(0, lx2), min(height, ly2)),
        (min(width, rx2), min(height, ry2)),
        (min(width, rx1), min(height, ry1))
    ]], dtype=np.int32)

def capture_frames(video_path):
    """Capture video frames and put them in the queue."""
    global running
    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        print("Error: Could not open video file.")
        running = False
        return
    while running and cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            running = False
            break
        frame = cv2.resize(frame, (0, 0), fx=scale_factor, fy=scale_factor)
        try:
            frame_queue.put_nowait(frame)
        except queue.Full:
            pass
    cap.release()

def process_frames():
    """Process frames for lane and obstacle detection."""
    global running, frame_count, prev_left, prev_right, width, height
    while running:
        try:
            frame = frame_queue.get(timeout=1)
            frame_count += 1
            height, width = frame.shape[:2]

            # Preprocessing
            hsv = cv2.cvtColor(frame, cv2.COLOR_BGR2HSV)
            gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
            h_channel = hsv[:, :, 0]
            yellow_mask = cv2.inRange(h_channel, 18, 25)
            white_mask = cv2.inRange(h_channel, 220, 255)
            combined_mask = cv2.bitwise_or(yellow_mask, white_mask)
            edges = cv2.Canny(combined_mask, 100, 200)

            # ROI
            vertices = create_roi_vertices()
            roi_mask = np.zeros_like(edges)
            cv2.fillPoly(roi_mask, vertices, 255)
            masked_edges = cv2.bitwise_and(edges, roi_mask)

            # Lane detection
            lines = cv2.HoughLinesP(masked_edges, 1, np.pi/180, 30, minLineLength=30, maxLineGap=200)
            left_lines, right_lines = [], []
            if lines is not None:
                for line in lines:
                    x1, y1, x2, y2 = line[0]
                    if x2 == x1:
                        continue
                    slope = (y2 - y1) / (x2 - x1)
                    if -2 < slope < -0.5:
                        left_lines.append(line[0])
                    elif 0.5 < slope < 2:
                        right_lines.append(line[0])

            # Average lanes
            avg_left = average_lines(left_lines)
            avg_right = average_lines(right_lines)

            # Validate lanes
            if avg_left and avg_right:
                lane_distance = abs(avg_right[0] - avg_left[0])
                if lane_distance < width // 4 or lane_distance > width * 0.8:
                    avg_left, avg_right = prev_left, prev_right

            if avg_left:
                prev_left = avg_left
            if avg_right:
                prev_right = avg_right

            if frame_count % 30 == 0:
                prev_left, prev_right = None, None

            # Obstacle detection with new logic
            real_obstacle, obstacle_bbox = detect_obstacles(frame, masked_edges, gray)

            # Decision logic
            decision = "MOVE FORWARD"
            warning = ""
            if real_obstacle and obstacle_bbox and avg_left and avg_right:
                lane_center = (avg_left[0] + avg_right[0]) // 2
                obs_x_center = (obstacle_bbox[0] + obstacle_bbox[2]) // 2
                if abs(obs_x_center - lane_center) < width // 10:
                    clear_path = check_clear_path(avg_left, avg_right, obstacle_bbox)
                    if clear_path:
                        decision = f"OVERTAKE {clear_path.upper()}"
                    else:
                        decision = "STOP - No clear path"
                        warning = "Obstacle ahead, no safe path"
                else:
                    decision = "STEER " + ("LEFT" if obs_x_center > lane_center else "RIGHT")
            elif real_obstacle:
                decision = "STOP - Obstacle detected"
            elif not (avg_left and avg_right):
                decision = "CAUTION - Lane unclear"
                if prev_left and not prev_right:
                    decision = "STEER RIGHT"
                elif prev_right and not prev_left:
                    decision = "STEER LEFT"
             # Print decision with frame number
            print(f"Frame {frame_count}: Decision - {decision}")
            if warning:
                print(f"Frame {frame_count}: Warning - {warning}")
            # Visualization
            lane_image = np.zeros_like(frame)
            for line, color in [(avg_left, (0, 255, 0)), (avg_right, (0, 255, 0))]:
                if line:
                    x1, y1, x2, y2 = line
                    cv2.line(lane_image, (x1, y1), (x2, y2), color, 5)
            debug_frame = frame.copy()
            cv2.polylines(debug_frame, vertices, isClosed=True, color=(255, 0, 0), thickness=2)
            result = cv2.addWeighted(debug_frame, 0.8, lane_image, 1, 0)
            cv2.putText(result, decision, (50, 50), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)
            if warning:
                cv2.putText(result, warning, (50, 100), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 0, 255), 2)

            result_queue.put((result, masked_edges))
        except queue.Empty:
            continue

def display_frames():
    """Display processed frames."""
    global running
    while running:
        try:
            result, masked_edges = result_queue.get(timeout=1)
            cv2.imshow("Masked Edges", masked_edges)
            cv2.imshow("Lane Detection", result)
            if cv2.waitKey(1) & 0xFF == ord('q'):
                running = False
        except queue.Empty:
            continue

def main():
    video_path = r"C:\Users\User\Desktop\Dip_Dataset\PXL_20250325_045117252.TS.mp4"
    capture_thread = threading.Thread(target=capture_frames, args=(video_path,))
    process_thread = threading.Thread(target=process_frames)
    display_thread = threading.Thread(target=display_frames)

    capture_thread.start()
    process_thread.start()
    display_thread.start()

    capture_thread.join()
    process_thread.join()
    display_thread.join()

    cv2.destroyAllWindows()

if __name__ == "__main__":
    main()