In [None]:
import matplotlib.pyplot as plt
import matplotlib.image as mpimg
import numpy as np
import cv2
import os
import math
from moviepy import VideoFileClip
from IPython.display import HTML
%matplotlib inline

In [None]:
# Image Processing function
def grayscale(img):
    """Convert image to grayscale"""
    return cv2.cvtColor(img, cv2.COLOR_RGB2GRAY)

def canny(img, low_threshold, high_threshold):
    """Apply Canny edge detection"""
    return cv2.Canny(img, low_threshold, high_threshold)

def gaussian_blur(img, kernel_size):
    """Apply Gaussian blur"""
    return cv2.GaussianBlur(img, (kernel_size, kernel_size), 0)

def region_of_interest(img, vertices):
    """Mask everything outside the region of interest"""
    mask = np.zeros_like(img)
    if len(img.shape) > 2:
        channel_count = img.shape[2]
        ignore_mask_color = (255,) * channel_count
    else:
        ignore_mask_color = 255
    cv2.fillPoly(mask, [vertices], ignore_mask_color)
    return cv2.bitwise_and(img, mask)


In [None]:
# Lane Detection Function
def average_slope_intercept(lines, y_min, y_max):
    """Average multiple detected lines into two main lanes"""
    left_lines = []
    right_lines = []
    
    if lines is None:
        return None
    
    for line in lines:
        for x1, y1, x2, y2 in line:
            if abs(y2 - y1) < 1e-6:  # Skip near-horizontal lines
                continue
                
            slope = (x2 - x1) / (y2 - y1)
            intercept = x1 - slope * y1
            
            if slope < -0.3:  # Left lane
                left_lines.append((slope, intercept))
            elif slope > 0.3:  # Right lane
                right_lines.append((slope, intercept))
    
    left_avg = np.average(left_lines, axis=0) if left_lines else None
    right_avg = np.average(right_lines, axis=0) if right_lines else None
    
    lanes = []
    for avg in [left_avg, right_avg]:
        if avg is not None:
            slope, intercept = avg
            x_min = int(slope * y_min + intercept)
            x_max = int(slope * y_max + intercept)
            lanes.append([(x_min, y_min), (x_max, y_max)])
    
    return lanes

def draw_lanes(img, lanes, color=[255, 0, 0], thickness=10):
    """Draw the detected lanes on the image"""
    line_img = np.zeros_like(img)
    if lanes is not None:
        for lane in lanes:
            cv2.line(line_img, lane[0], lane[1], color, thickness)
    return line_img

def weighted_img(img, initial_img, α=0.8, β=1., γ=0.):
    """Blend two images together"""
    return cv2.addWeighted(initial_img, α, img, β, γ)

In [None]:
def process_image(image):
    """Complete image processing pipeline for lane detection"""
    height, width = image.shape[:2]
    
    gray = grayscale(image)
    blur = gaussian_blur(gray, 5)
    edges = canny(blur, 50, 150)
    
    roi_vertices = np.array([[
        (width * 0.1, height),
        (width * 0.45, height * 0.6),
        (width * 0.55, height * 0.6),
        (width * 0.9, height)
    ]], dtype=np.int32)
    masked_edges = region_of_interest(edges, roi_vertices)
    
    lines = cv2.HoughLinesP(
        masked_edges,
        rho=2,
        theta=np.pi/180,
        threshold=20,
        minLineLength=40,
        maxLineGap=150
    )
    
    lanes = average_slope_intercept(lines, y_min=int(height*0.6), y_max=height)
    lane_img = draw_lanes(image, lanes)
    
    result = weighted_img(lane_img, image)
    return result

In [None]:
def test_on_images():
    """Test the pipeline on all images in test_images directory"""
    os.makedirs('test_images_output', exist_ok=True)
    test_images = [f for f in os.listdir("test_images/") if f.endswith('.jpg')]
    
    for image_name in test_images:
        try:
            image = mpimg.imread(os.path.join('test_images', image_name))
            processed = process_image(image)
            
            output_path = os.path.join('test_images_output', image_name)
            plt.imsave(output_path, processed)
            
            f, (ax1, ax2) = plt.subplots(1, 2, figsize=(20,10))
            ax1.imshow(image)
            ax1.set_title('Original', fontsize=20)
            ax2.imshow(processed)
            ax2.set_title('Processed', fontsize=20)
            plt.show()
            
        except Exception as e:
            print(f"Error processing {image_name}: {str(e)}")

In [None]:
def process_video(input_path, output_path):
    """Process video file through the pipeline using frame-by-frame processing"""
    clip = VideoFileClip(input_path)
    
    fps = clip.fps
    duration = clip.duration
    
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    out = cv2.VideoWriter(output_path, fourcc, fps, (clip.w, clip.h))
    
    try:
        for t in np.arange(0, duration, 1.0/fps):
            frame = clip.get_frame(t)
            frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            processed_frame = process_image(frame_rgb)
            processed_frame_bgr = cv2.cvtColor(processed_frame, cv2.COLOR_RGB2BGR)
            out.write(processed_frame_bgr)
            
    except Exception as e:
        print(f"Error during video processing: {str(e)}")
    finally:
        out.release()
        clip.close()
    
    print(f"Video processing complete: {output_path}")

In [None]:
if __name__ == '__main__':
    print("Starting lane detection pipeline...")
    
    os.makedirs('test_images_output', exist_ok=True)
    os.makedirs('test_videos_output', exist_ok=True)
    
    print("\nProcessing test images...")
    test_on_images()
    
    print("\nProcessing test videos...")
    videos_to_process = {
        'solidWhiteRight.mp4': 'solidWhiteRight_output.mp4',
        'solidYellowLeft.mp4': 'solidYellowLeft_output.mp4',
        'challenge.mp4': 'challenge_output.mp4'
    }
    
    for input_file, output_file in videos_to_process.items():
        print(f"\nProcessing {input_file}...")
        input_path = os.path.join('test_videos', input_file)
        output_path = os.path.join('test_videos_output', output_file)
        
        process_video(input_path, output_path)
    
    print("\nProcessing complete! Check output directories for results.")

In [None]:
from IPython.display import HTML
from base64 import b64encode

def show_video(video_path):
    video_file = open(video_path, "rb").read()
    video_url = f"data:video/mp4;base64,{b64encode(video_file).decode()}"  # Fixed f-string
    return HTML(f"""<video width=500 controls><source src="{video_url}"></video>""")  # Added closing )

show_video("test_videos_output/challenge_output.mp4")