In [None]:
from google.colab import drive
# Mount the user's Google Drive at /content/drive inside this Colab runtime.
# NOTE(review): the pipelines below read '/content/challenge_video.mp4', which is
# not under the mount point — confirm whether this mount is actually required.
drive.mount('/content/drive')

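Goal: dynamically adjust the region of interest (ROI) based on the lanes detected in previous frames. Note that the cell below currently applies a fixed trapezoidal ROI to every frame; information from previous frames is used only to smooth the fitted left and right lane lines with an exponential moving average.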

In [None]:
import cv2
import numpy as np
from google.colab.patches import cv2_imshow

# Gaussian blur kernel size: side length, in pixels, of the square kernel that
# preprocess() passes to cv2.GaussianBlur as (BLUR_KERNEL_SIZE, BLUR_KERNEL_SIZE).
# OpenCV expects a positive, odd value here.
BLUR_KERNEL_SIZE = 5

# Preprocessing: isolate white/yellow lane paint, then blur to reduce noise
def preprocess(frame):
    """Return a blurred single-channel mask of the white and yellow pixels in a BGR frame.

    White pixels are those whose grayscale value exceeds 200; yellow pixels
    are those whose HSV value lies inside [20, 100, 100]..[30, 255, 255].
    The union of both masks is smoothed with a square Gaussian kernel of
    side BLUR_KERNEL_SIZE.
    """
    kernel = (BLUR_KERNEL_SIZE, BLUR_KERNEL_SIZE)
    yellow_range = (np.array([20, 100, 100]), np.array([30, 255, 255]))

    # White paint: plain brightness threshold on the grayscale image.
    _, white = cv2.threshold(
        cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY), 200, 255, cv2.THRESH_BINARY
    )

    # Yellow paint: hue/saturation/value window in HSV space.
    yellow = cv2.inRange(cv2.cvtColor(frame, cv2.COLOR_BGR2HSV), *yellow_range)

    # Union of both masks, softened by the Gaussian blur.
    return cv2.GaussianBlur(cv2.bitwise_or(white, yellow), kernel, 0)

# Mask an image down to a polygonal region of interest (ROI)
def region_of_interest(img, vertices):
    """Keep only the pixels of ``img`` that lie inside a polygon.

    Args:
        img: Image array, either single-channel (H, W) or multi-channel (H, W, C).
        vertices: Sequence of (x, y) polygon corners in pixel coordinates.

    Returns:
        A new array shaped like ``img`` in which every pixel outside the
        polygon is 0 and every pixel inside keeps its original value.
    """
    mask = np.zeros_like(img)
    # A bare 255 is interpreted by OpenCV as the colour (255, 0, 0, 0), which
    # would only open the first channel of a colour image. Give every channel
    # 255 so the mask works for grayscale and colour inputs alike.
    fill_color = 255 if mask.ndim == 2 else (255,) * mask.shape[2]
    cv2.fillPoly(mask, [np.array(vertices, np.int32)], fill_color)
    return cv2.bitwise_and(img, mask)

# Fit lines using weighted averages (exponential moving average)
def smooth_line_fit(history, new_fit, alpha=0.2):
    """Blend the newest line fit with the most recent smoothed fit.

    Args:
        history: List of previously smoothed fits, oldest first. Only the
            last entry is read; the list is not modified here.
        new_fit: (slope, intercept) for the current frame as an array, tuple
            or list, or None when no line was detected in this frame.
        alpha: Weight given to ``new_fit``; ``1 - alpha`` goes to the
            previous smoothed fit.

    Returns:
        None when there is neither a new fit nor any history; the last
        history entry (unchanged) when ``new_fit`` is None; otherwise a float
        ndarray holding the smoothed (slope, intercept).
    """
    if new_fit is None:
        return history[-1] if history else None

    # Coerce to a float array so plain tuples/lists work too: multiplying a
    # tuple by a float raises TypeError instead of scaling element-wise.
    new_fit = np.asarray(new_fit, dtype=float)
    if history:
        previous = np.asarray(history[-1], dtype=float)
        return alpha * new_fit + (1 - alpha) * previous
    return new_fit

# Draw lane lines with improved fitting
def draw_lane_lines(frame, edges, left_fit_history, right_fit_history, history_size=5):
    """Detect, smooth and draw one left and one right lane line on ``frame``.

    Hough segments found in ``edges`` are converted to (slope, intercept)
    pairs. Segments with slope below -0.5 count as the left lane, those with
    slope above 0.5 as the right lane; vertical and flatter segments are
    ignored. Each side's mean fit is blended with its history through
    smooth_line_fit, appended to that history (trimmed to ``history_size``
    entries), and drawn with extend_and_draw_line: left in blue, right in
    green (BGR colours).

    Both history lists are mutated in place. Returns the frame produced by
    the drawing helper.
    """
    segments = cv2.HoughLinesP(edges, rho=1, theta=np.pi / 180, threshold=50, minLineLength=50, maxLineGap=150)

    # Sort every usable segment into a left or right bucket by slope.
    left_candidates, right_candidates = [], []
    for segment in (segments if segments is not None else ()):
        xa, ya, xb, yb = segment[0]
        if xb == xa:
            continue  # vertical segment: slope undefined
        m = (yb - ya) / (xb - xa)
        b = ya - m * xa
        if m < -0.5:
            left_candidates.append((m, b))
        elif m > 0.5:
            right_candidates.append((m, b))

    histories = (left_fit_history, right_fit_history)

    # Per-frame mean fit for each side (None when nothing was detected),
    # blended with the previous smoothed fit.
    frame_means = [
        np.mean(candidates, axis=0) if candidates else None
        for candidates in (left_candidates, right_candidates)
    ]
    smoothed = [
        smooth_line_fit(history, mean_fit)
        for history, mean_fit in zip(histories, frame_means)
    ]

    # Record the smoothed fits, keeping at most history_size entries per side.
    for history, fit in zip(histories, smoothed):
        if fit is None:
            continue
        history.append(fit)
        if len(history) > history_size:
            history.pop(0)

    # Left lane in blue, right lane in green.
    for fit, bgr in zip(smoothed, ((255, 0, 0), (0, 255, 0))):
        if fit is not None:
            frame = extend_and_draw_line(frame, fit, color=bgr)

    return frame

# Helper function to extend and draw lines
def extend_and_draw_line(frame, line_fit, color=(0, 255, 0), thickness=4):
    """Draw the line ``y = slope * x + intercept`` over the lower part of ``frame``.

    The line is drawn from the bottom edge of the image up to 60% of the
    image height.

    Args:
        frame: Image array whose first dimension is the height; drawn on in place.
        line_fit: (slope, intercept) pair in pixel coordinates.
        color: Colour passed to cv2.line.
        thickness: Line thickness passed to cv2.line.

    Returns:
        ``frame`` (the same object). It is returned untouched when the fit
        cannot be drawn: a zero slope has no x for a given y, and NaN or
        infinite values cannot be converted to pixel coordinates.
    """
    slope, intercept = line_fit
    if slope == 0 or not (np.isfinite(slope) and np.isfinite(intercept)):
        return frame

    height = frame.shape[0]
    y1 = height  # Bottom of the image
    y2 = int(height * 0.6)  # A little below the middle of the image
    # Solve y = slope * x + intercept for x at both heights.
    x1 = int((y1 - intercept) / slope)
    x2 = int((y2 - intercept) / slope)
    cv2.line(frame, (x1, y1), (x2, y2), color, thickness)
    return frame

# Main function for processing video
def main(video_input_path='/content/challenge_video.mp4', video_output_path='output_video.avi', display_every=1):
    """Run the two-lane detection pipeline over a video and save the annotated result.

    Every frame is preprocessed, masked to a fixed trapezoidal region of
    interest, edge-detected with Canny and annotated by draw_lane_lines. The
    annotated frames are written to an XVID-encoded file and shown inline
    with cv2_imshow.

    Args:
        video_input_path: Path of the video to read.
        video_output_path: Path of the annotated video to write.
        display_every: Show every N-th annotated frame inline (1 = every
            frame, 0 or None = never). Larger values keep the notebook
            output small for long videos.

    Returns:
        None. Prints an error and returns early when the input cannot be
        opened or the output cannot be created.
    """
    video = cv2.VideoCapture(video_input_path)
    if not video.isOpened():
        print("Error: Unable to access video.")
        return

    out = None
    try:
        # Get video properties
        width = int(video.get(cv2.CAP_PROP_FRAME_WIDTH))
        height = int(video.get(cv2.CAP_PROP_FRAME_HEIGHT))
        fps = video.get(cv2.CAP_PROP_FPS)
        if not fps > 0:
            # Missing frame-rate metadata is reported as 0 (or NaN); fall back
            # to an assumed 30 fps rather than handing that to the writer.
            fps = 30.0

        # Define codec and create VideoWriter
        fourcc = cv2.VideoWriter_fourcc(*'XVID')
        out = cv2.VideoWriter(video_output_path, fourcc, fps, (width, height))
        if not out.isOpened():
            print("Error: Unable to open output video for writing.")
            return

        # The ROI depends only on the frame size, so build it once.
        roi_vertices = [
            (int(0.1 * width), height),
            (int(0.4 * width), int(height * 0.6)),
            (int(0.6 * width), int(height * 0.6)),
            (int(0.9 * width), height)
        ]

        # Initialize line fit histories
        left_fit_history = []
        right_fit_history = []

        frame_index = 0
        while True:
            ret, frame = video.read()
            if not ret:
                print("End of video stream.")
                break

            processed = preprocess(frame)
            roi = region_of_interest(processed, roi_vertices)
            edges = cv2.Canny(roi, 50, 150)
            output_frame = draw_lane_lines(frame, edges, left_fit_history, right_fit_history)

            out.write(output_frame)

            if display_every and frame_index % display_every == 0:
                cv2_imshow(output_frame)
            frame_index += 1

            # NOTE(review): no OpenCV window is opened here, so this 'q' check
            # presumably never fires in Colab — confirm before relying on it.
            if cv2.waitKey(1) & 0xFF == ord('q'):
                break
    finally:
        # Release resources even when a frame fails part-way through, so the
        # output file is finalised and the capture handle is not leaked.
        video.release()
        if out is not None:
            out.release()
        cv2.destroyAllWindows()

# Run the two-lane pipeline when this code is executed directly rather than imported.
if __name__ == "__main__":
    main()


To implement multi-lane detection, we can use a clustering algorithm such as DBSCAN to group the detected line segments into clusters, where each cluster corresponds to one lane line and segments that fit no cluster are treated as noise.

In [None]:
import cv2
import numpy as np
from sklearn.cluster import DBSCAN
from google.colab.patches import cv2_imshow  # Importing cv2_imshow for Colab

# Gaussian blur kernel size: side length, in pixels, of the square kernel that
# preprocess() passes to cv2.GaussianBlur. OpenCV expects a positive, odd value.
BLUR_KERNEL_SIZE = 5

def preprocess(frame):
    """Build a blurred single-channel mask of white and yellow pixels from a BGR frame.

    A pixel is kept when its grayscale value is above 200 (white) or when
    its HSV value falls within [20, 100, 100]..[30, 255, 255] (yellow). The
    combined mask is then smoothed with a Gaussian kernel whose side length
    is BLUR_KERNEL_SIZE.
    """
    grayscale = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
    hsv_image = cv2.cvtColor(frame, cv2.COLOR_BGR2HSV)

    # Bright pixels are taken to be white markings.
    _retval, white_pixels = cv2.threshold(grayscale, 200, 255, cv2.THRESH_BINARY)

    # Pixels inside this HSV box are taken to be yellow markings.
    yellow_low, yellow_high = np.array([20, 100, 100]), np.array([30, 255, 255])
    yellow_pixels = cv2.inRange(hsv_image, yellow_low, yellow_high)

    # Either colour counts as lane paint; blur the union to reduce noise.
    lane_pixels = cv2.bitwise_or(white_pixels, yellow_pixels)
    ksize = (BLUR_KERNEL_SIZE,) * 2
    return cv2.GaussianBlur(lane_pixels, ksize, 0)

def region_of_interest(img, vertices):
    """Zero out every pixel of ``img`` that lies outside a polygon.

    Args:
        img: Image array, either single-channel (H, W) or multi-channel (H, W, C).
        vertices: Sequence of (x, y) polygon corners in pixel coordinates.

    Returns:
        A new array shaped like ``img``; pixels inside the polygon keep their
        value, all others are 0.
    """
    mask = np.zeros_like(img)
    # A bare 255 is interpreted by OpenCV as the colour (255, 0, 0, 0), which
    # would only open the first channel of a colour image. Give every channel
    # 255 so the mask works for grayscale and colour inputs alike.
    fill_color = 255 if mask.ndim == 2 else (255,) * mask.shape[2]
    cv2.fillPoly(mask, [np.array(vertices, np.int32)], fill_color)
    return cv2.bitwise_and(img, mask)

# Extend and draw detected lanes
def extend_and_draw_line(frame, line_fit, color, thickness=4):
    """Draw the line ``y = slope * x + intercept`` over the lower part of ``frame``.

    The line is drawn from the bottom edge of the image up to 60% of the
    image height.

    Args:
        frame: Image array whose first dimension is the height; drawn on in place.
        line_fit: (slope, intercept) pair in pixel coordinates.
        color: Colour passed to cv2.line.
        thickness: Line thickness passed to cv2.line.

    Returns:
        ``frame`` (the same object), so the helper can be used both as
        ``extend_and_draw_line(...)`` and as ``frame = extend_and_draw_line(...)``.
        The frame is left untouched when the fit cannot be drawn (zero slope,
        or NaN/infinite slope or intercept).
    """
    slope, intercept = line_fit
    if slope == 0 or not (np.isfinite(slope) and np.isfinite(intercept)):
        return frame

    height = frame.shape[0]
    y1 = height  # Bottom of the image
    y2 = int(height * 0.6)  # A little below the middle of the image
    # Solve y = slope * x + intercept for x at both heights.
    x1 = int((y1 - intercept) / slope)
    x2 = int((y2 - intercept) / slope)
    cv2.line(frame, (x1, y1), (x2, y2), color, thickness)
    return frame

# Improved multi-lane detection
def _lane_cluster_features(line_features, frame_height, frame_width):
    """Map (slope, intercept) rows to scale-comparable features for clustering.

    Column 0 is the line angle in radians (arctan of the slope); column 1 is
    the x position where the line crosses the bottom edge of the frame,
    divided by the frame width.
    """
    slopes = line_features[:, 0]
    intercepts = line_features[:, 1]
    angles = np.arctan(slopes)
    x_at_bottom = (frame_height - intercepts) / slopes / frame_width
    return np.column_stack((angles, x_at_bottom))


def detect_and_draw_lanes(frame, edges, debug=False, eps=0.2, min_samples=2):
    """Find lane lines in ``edges``, group them with DBSCAN and draw one line per group.

    Hough segments are converted to (slope, intercept); vertical segments
    and segments with |slope| < 0.3 or |slope| > 5 are discarded. The
    remaining lines are clustered, and the mean (slope, intercept) of every
    cluster is drawn in green on ``frame``.

    Clustering runs on normalised features (line angle in radians and the
    bottom-edge crossing as a fraction of the frame width) rather than on
    raw (slope, intercept). With raw values the intercept is measured in
    pixels, so two segments of the same marking are almost never within
    ``eps`` of each other and nearly everything ends up labelled as noise.

    Args:
        frame: Image to draw on; modified in place.
        edges: Edge image handed to cv2.HoughLinesP.
        debug: When True, additionally draw every candidate line
            (yellow = DBSCAN noise, blue = cluster member; BGR colours).
        eps: DBSCAN neighbourhood radius in the normalised feature space.
        min_samples: DBSCAN ``min_samples``.

    Returns:
        ``frame``.
    """
    lines = cv2.HoughLinesP(edges, rho=1, theta=np.pi / 180, threshold=50, minLineLength=50, maxLineGap=150)
    if lines is None:
        return frame

    # Extract line parameters (slope, intercept)
    line_features = []
    for line in lines:
        x1, y1, x2, y2 = line[0]
        if x2 == x1:  # Skip vertical lines
            continue
        slope = (y2 - y1) / (x2 - x1)
        intercept = y1 - slope * x1
        # Filter out near-horizontal and near-vertical lines
        if abs(slope) < 0.3 or abs(slope) > 5:
            continue
        line_features.append([slope, intercept])

    if not line_features:
        return frame

    # Cluster lines using DBSCAN on the normalised features
    line_features = np.array(line_features)
    frame_height, frame_width = frame.shape[:2]
    cluster_features = _lane_cluster_features(line_features, frame_height, frame_width)
    labels = DBSCAN(eps=eps, min_samples=min_samples).fit(cluster_features).labels_

    # Draw one averaged lane line per cluster (label -1 marks noise)
    for label in set(labels):
        if label == -1:
            continue
        avg_slope, avg_intercept = np.mean(line_features[labels == label], axis=0)
        extend_and_draw_line(frame, (avg_slope, avg_intercept), color=(0, 255, 0))

    # Debug visualization (optional)
    if debug:
        for (slope, intercept), label in zip(line_features, labels):
            color = (0, 255, 255) if label == -1 else (255, 0, 0)  # Yellow for noise, Blue for clusters
            extend_and_draw_line(frame, (slope, intercept), color, thickness=2)

    return frame

# Main function for video processing
def main(video_input_path='/content/challenge_video.mp4', video_output_path='output_video.avi', debug=True, display_every=1):
    """Run the multi-lane detection pipeline over a video and save the annotated result.

    Every frame is preprocessed, masked to a fixed trapezoidal region of
    interest, edge-detected with Canny and annotated by
    detect_and_draw_lanes. The annotated frames are written to an
    XVID-encoded file and shown inline with cv2_imshow.

    Args:
        video_input_path: Path of the video to read.
        video_output_path: Path of the annotated video to write.
        debug: Forwarded to detect_and_draw_lanes; when True the individual
            candidate lines are drawn too (and therefore end up in the
            saved video).
        display_every: Show every N-th annotated frame inline (1 = every
            frame, 0 or None = never).

    Returns:
        None. Prints an error and returns early when the input cannot be
        opened or the output cannot be created.
    """
    video = cv2.VideoCapture(video_input_path)
    if not video.isOpened():
        print("Error: Unable to access video.")
        return

    out = None
    try:
        # Get video properties
        width = int(video.get(cv2.CAP_PROP_FRAME_WIDTH))
        height = int(video.get(cv2.CAP_PROP_FRAME_HEIGHT))
        fps = video.get(cv2.CAP_PROP_FPS)
        if not fps > 0:
            # Missing frame-rate metadata is reported as 0 (or NaN); fall back
            # to an assumed 30 fps rather than handing that to the writer.
            fps = 30.0

        # Define codec and create VideoWriter
        fourcc = cv2.VideoWriter_fourcc(*'XVID')
        out = cv2.VideoWriter(video_output_path, fourcc, fps, (width, height))
        if not out.isOpened():
            print("Error: Unable to open output video for writing.")
            return

        # The ROI depends only on the frame size, so build it once.
        roi_vertices = [
            (int(0.1 * width), height),
            (int(0.4 * width), int(height * 0.6)),
            (int(0.6 * width), int(height * 0.6)),
            (int(0.9 * width), height)
        ]

        frame_index = 0
        while True:
            ret, frame = video.read()
            if not ret:
                print("End of video stream.")
                break

            processed = preprocess(frame)
            roi = region_of_interest(processed, roi_vertices)
            edges = cv2.Canny(roi, 50, 150)
            output_frame = detect_and_draw_lanes(frame, edges, debug=debug)

            # Save and display the output frame
            out.write(output_frame)
            if display_every and frame_index % display_every == 0:
                cv2_imshow(output_frame)  # Use cv2_imshow for Colab
            frame_index += 1

            # NOTE(review): no OpenCV window is opened here, so this 'q' check
            # presumably never fires in Colab — confirm before relying on it.
            if cv2.waitKey(1) & 0xFF == ord('q'):
                break
    finally:
        # Release resources even when a frame fails part-way through, so the
        # output file is finalised and the capture handle is not leaked.
        video.release()
        if out is not None:
            out.release()
        cv2.destroyAllWindows()

# Run the multi-lane pipeline when this code is executed directly rather than imported.
if __name__ == "__main__":
    main()
