In [10]:
pip install torch numpy opencv-python tqdm scipy

Note: you may need to restart the kernel to use updated packages.


In [5]:
import cv2
import numpy as np
from tqdm import tqdm
import logging
import time
from pathlib import Path

class LaneDetector:
    def __init__(self, roi_vertices=None):
        """
        Initialize the lane detector with region of interest vertices.
        If not provided, it will use default values for a typical dashcam video.
        """
        self.roi_vertices = roi_vertices

    def get_roi_vertices(self, frame_shape):
        """
        Get the region of interest vertices based on frame dimensions.
        """
        if self.roi_vertices is None:
            height, width = frame_shape[:2]
            roi_vertices = np.array([
                [(0, height),
                 (width * 0.45, height * 0.6),
                 (width * 0.55, height * 0.6),
                 (width, height)]
            ], dtype=np.int32)
            return roi_vertices
        return self.roi_vertices

    def apply_roi_mask(self, image, vertices):
        """
        Apply a mask to only process the region of interest.
        """
        mask = np.zeros_like(image)
        cv2.fillPoly(mask, vertices, 255)
        masked_image = cv2.bitwise_and(image, mask)
        return masked_image

    def detect_edges(self, frame):
        """
        Convert frame to grayscale and detect edges using Canny edge detection.
        """
        gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
        blurred = cv2.GaussianBlur(gray, (5, 5), 0)
        edges = cv2.Canny(blurred, 50, 150)
        return edges

    def detect_lines(self, edges):
        """
        Detect lines using Hough Transform.
        """
        lines = cv2.HoughLinesP(
            edges,
            rho=1,
            theta=np.pi/180,
            threshold=50,
            minLineLength=40,
            maxLineGap=100
        )
        return lines

    def separate_lines(self, lines):
        """
        Separate detected lines into left and right lane lines.
        """
        left_lines = []
        right_lines = []
        
        if lines is None:
            return left_lines, right_lines

        for line in lines:
            x1, y1, x2, y2 = line[0]
            slope = (y2 - y1) / (x2 - x1) if x2 != x1 else 0
            
            if abs(slope) < 0.5:
                continue
                
            if slope < 0:
                left_lines.append(line)
            else:
                right_lines.append(line)
                
        return left_lines, right_lines

    def average_lines(self, lines, frame_shape):
        """
        Average multiple lines into a single line with improved error handling.
        
        Args:
            lines (list): List of detected lines
            frame_shape (tuple): Shape of the video frame
            
        Returns:
            np.array: Averaged line coordinates or None if no valid lines
        """
        if not lines or len(lines) == 0:
            return None

        # Convert lines to a more manageable format
        line_points = []
        for line in lines:
            if line is not None and len(line) > 0:
                x1, y1, x2, y2 = line[0]
                line_points.append([x1, y1, x2, y2])

        if not line_points:
            return None

        # Convert to numpy array for calculations
        line_points = np.array(line_points)
        
        try:
            # Calculate averages
            x1_avg = np.mean(line_points[:, 0])
            y1_avg = np.mean(line_points[:, 1])
            x2_avg = np.mean(line_points[:, 2])
            y2_avg = np.mean(line_points[:, 3])
            
            # Handle potential division by zero
            if abs(x2_avg - x1_avg) < 1e-6:
                return None
                
            slope = (y2_avg - y1_avg) / (x2_avg - x1_avg)
            intercept = y1_avg - slope * x1_avg
            
            # Calculate line endpoints
            y1 = frame_shape[0]
            y2 = int(frame_shape[0] * 0.6)
            
            # Ensure we don't divide by zero
            if abs(slope) < 1e-6:
                return None
                
            x1 = int((y1 - intercept) / slope)
            x2 = int((y2 - intercept) / slope)
            
            # Validate coordinates are within frame bounds
            width = frame_shape[1]
            if not (0 <= x1 <= width and 0 <= x2 <= width):
                return None
                
            return np.array([[x1, y1, x2, y2]])
            
        except (ValueError, IndexError, ZeroDivisionError) as e:
            return None

    def draw_lines(self, frame, left_line, right_line, color=(0, 255, 0), thickness=5):
        """
        Draw detected lane lines on the frame.
        """
        line_image = np.zeros_like(frame)
        
        if left_line is not None:
            x1, y1, x2, y2 = left_line[0]
            cv2.line(line_image, (x1, y1), (x2, y2), color, thickness)
            
        if right_line is not None:
            x1, y1, x2, y2 = right_line[0]
            cv2.line(line_image, (x1, y1), (x2, y2), color, thickness)
            
        return cv2.addWeighted(frame, 0.8, line_image, 1.0, 0)

    def process_frame(self, frame):
        """
        Process a single frame to detect and draw lane lines.
        """
        roi_vertices = self.get_roi_vertices(frame.shape)
        edges = self.detect_edges(frame)
        masked_edges = self.apply_roi_mask(edges, roi_vertices)
        lines = self.detect_lines(masked_edges)
        left_lines, right_lines = self.separate_lines(lines)
        left_line = self.average_lines(left_lines, frame.shape)
        right_line = self.average_lines(right_lines, frame.shape)
        result = self.draw_lines(frame, left_line, right_line)
        
        return result

class VideoProcessor:
    def __init__(self, input_path, output_path, lane_detector):
        """
        Initialize video processor with input/output paths and lane detector.
        """
        self.input_path = Path(input_path)
        self.output_path = Path(output_path)
        self.lane_detector = lane_detector
        self.setup_logging()
        
    def setup_logging(self):
        """Configure logging for tracking processing progress and errors."""
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(levelname)s - %(message)s',
            handlers=[
                logging.FileHandler('lane_detection.log'),
                logging.StreamHandler()
            ]
        )
        self.logger = logging.getLogger(__name__)
    
    def validate_video(self):
        """
        Validate input video file and check if output directory exists.
        """
        if not self.input_path.exists():
            self.logger.error(f"Input video not found: {self.input_path}")
            return False
            
        if not self.output_path.parent.exists():
            self.logger.info(f"Creating output directory: {self.output_path.parent}")
            self.output_path.parent.mkdir(parents=True)
            
        return True
    
    def get_video_properties(self, cap):
        """
        Get video properties from capture object.
        """
        width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        fps = int(cap.get(cv2.CAP_PROP_FPS))
        frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        
        return width, height, fps, frame_count
    
    def create_video_writer(self, width, height, fps):
        """
        Create video writer object with specified properties.
        """
        fourcc = cv2.VideoWriter_fourcc(*'mp4v')
        return cv2.VideoWriter(
            str(self.output_path), 
            fourcc, 
            fps, 
            (width, height)
        )
    
    def add_performance_metrics(self, frame, frame_time, fps):
        """
        Add processing performance metrics to frame.
        """
        metrics_text = f"Frame Time: {frame_time:.2f}ms | FPS: {fps:.1f}"
        cv2.putText(
            frame,
            metrics_text,
            (10, 30),
            cv2.FONT_HERSHEY_SIMPLEX,
            1,
            (0, 255, 0),
            2
        )
        return frame
    
    def process_video(self, display_progress=True, show_metrics=True):
        """
        Process video file with lane detection and optional progress display.
        """
        if not self.validate_video():
            return False
            
        cap = cv2.VideoCapture(str(self.input_path))
        width, height, fps, frame_count = self.get_video_properties(cap)
        writer = self.create_video_writer(width, height, fps)
        
        self.logger.info(f"Processing video: {self.input_path}")
        self.logger.info(f"Resolution: {width}x{height} | FPS: {fps} | Frames: {frame_count}")
        
        try:
            frames = range(frame_count)
            if display_progress:
                frames = tqdm(frames, desc="Processing frames")
            
            processing_times = []
            for _ in frames:
                ret, frame = cap.read()
                if not ret:
                    break
                
                start_time = time.time()
                processed_frame = self.lane_detector.process_frame(frame)
                frame_time = (time.time() - start_time) * 1000
                processing_times.append(frame_time)
                
                if show_metrics:
                    current_fps = 1000 / np.mean(processing_times[-30:])
                    processed_frame = self.add_performance_metrics(
                        processed_frame, 
                        frame_time,
                        current_fps
                    )
                
                writer.write(processed_frame)
            
            avg_frame_time = np.mean(processing_times)
            avg_fps = 1000 / avg_frame_time
            self.logger.info(f"Processing complete!")
            self.logger.info(f"Average frame time: {avg_frame_time:.2f}ms")
            self.logger.info(f"Average FPS: {avg_fps:.1f}")
            
        except Exception as e:
            self.logger.error(f"Error processing video: {str(e)}")
            return False
            
        finally:
            cap.release()
            writer.release()
            
        return True

def process_video_file(input_path, output_path):
    """
    Process a video file with lane detection.
    
    Args:
        input_path (str): Path to input video file
        output_path (str): Path to save processed video
        
    Returns:
        bool: True if processing successful, False otherwise
    """
    detector = LaneDetector()
    processor = VideoProcessor(input_path, output_path, detector)
    return processor.process_video()

# Example usage
if __name__ == "__main__":
    input_video = "test_video.mp4"
    output_video = "processed_video.mp4"
    
    success = process_video_file(input_video, output_video)
    
    if success:
        print("Video processing completed successfully!")
    else:
        print("Error processing video. Check logs for details.")

2025-02-17 12:30:58,062 - INFO - Processing video: test_video.mp4
2025-02-17 12:30:58,063 - INFO - Resolution: 1280x720 | FPS: 50 | Frames: 1295
Processing frames: 100%|███████████████████████████████████████████████████████████| 1295/1295 [00:14<00:00, 91.08it/s]
2025-02-17 12:31:12,284 - INFO - Processing complete!
2025-02-17 12:31:12,285 - INFO - Average frame time: 5.08ms
2025-02-17 12:31:12,285 - INFO - Average FPS: 196.9


Video processing completed successfully!


In [3]:
import cv2
import numpy as np
from tqdm import tqdm
import logging
import time
from pathlib import Path
from collections import deque
from scipy.signal import savgol_filter

class AdvancedLaneDetector:
    def __init__(self, config=None):
        """
        Initialize advanced lane detector with configurable parameters.
        
        Args:
            config (dict): Configuration parameters for lane detection
        """
        self.config = {
            'roi_height_ratio': 0.6,
            'roi_width_ratio': 0.1,
            'canny_low': 50,
            'canny_high': 150,
            'hough_threshold': 50,
            'min_line_length': 40,
            'max_line_gap': 100,
            'min_line_angle': 20,
            'max_line_angle': 160,
            'smoothing_window': 10,
            'gradient_threshold': (20, 100),
            'color_threshold': ([90, 40, 40], [130, 255, 255]),
            'perspective_offset': 0.25
        }
        if config:
            self.config.update(config)
            
        self.left_line_history = deque(maxlen=self.config['smoothing_window'])
        self.right_line_history = deque(maxlen=self.config['smoothing_window'])
        self.camera_matrix = None
        self.dist_coeffs = None

    def calibrate_camera(self, calibration_images, checkerboard_size=(9, 6)):
        """
        Calibrate camera using checkerboard images.
        
        Args:
            calibration_images (list): List of calibration image paths
            checkerboard_size (tuple): Size of the checkerboard pattern
        """
        objpoints = []
        imgpoints = []
        objp = np.zeros((checkerboard_size[0] * checkerboard_size[1], 3), np.float32)
        objp[:, :2] = np.mgrid[0:checkerboard_size[0], 0:checkerboard_size[1]].T.reshape(-1, 2)

        for image_path in calibration_images:
            img = cv2.imread(image_path)
            gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
            ret, corners = cv2.findChessboardCorners(gray, checkerboard_size, None)
            
            if ret:
                objpoints.append(objp)
                imgpoints.append(corners)

        if imgpoints:
            ret, self.camera_matrix, self.dist_coeffs, _, _ = cv2.calibrateCamera(
                objpoints, imgpoints, gray.shape[::-1], None, None
            )

    def undistort_image(self, img):
        """
        Undistort image using camera calibration parameters.
        """
        if self.camera_matrix is not None and self.dist_coeffs is not None:
            return cv2.undistort(img, self.camera_matrix, self.dist_coeffs, None, self.camera_matrix)
        return img

    def get_perspective_transform(self, img_shape):
        """
        Get perspective transform matrix for bird's eye view.
        """
        offset = self.config['perspective_offset'] * img_shape[1]
        src = np.float32([
            [img_shape[1] * (0.5 - self.config['roi_width_ratio']), img_shape[0] * 0.65],
            [img_shape[1] * (0.5 + self.config['roi_width_ratio']), img_shape[0] * 0.65],
            [img_shape[1] * (1 - self.config['roi_width_ratio']), img_shape[0]],
            [img_shape[1] * self.config['roi_width_ratio'], img_shape[0]]
        ])
        dst = np.float32([
            [offset, 0],
            [img_shape[1] - offset, 0],
            [img_shape[1] - offset, img_shape[0]],
            [offset, img_shape[0]]
        ])
        return cv2.getPerspectiveTransform(src, dst)

    def apply_threshold(self, img):
        """
        Apply combined color and gradient thresholds.
        """
        # Convert to HLS color space
        hls = cv2.cvtColor(img, cv2.COLOR_BGR2HLS)
        
        # Color threshold
        lower = np.array(self.config['color_threshold'][0])
        upper = np.array(self.config['color_threshold'][1])
        color_mask = cv2.inRange(hls, lower, upper)
        
        # Gradient threshold
        gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
        sobelx = cv2.Sobel(gray, cv2.CV_64F, 1, 0, ksize=3)
        abs_sobelx = np.absolute(sobelx)
        scaled_sobel = np.uint8(255 * abs_sobelx / np.max(abs_sobelx))
        gradient_mask = cv2.inRange(
            scaled_sobel,
            self.config['gradient_threshold'][0],
            self.config['gradient_threshold'][1]
        )
        
        # Combine thresholds
        combined = cv2.bitwise_or(color_mask, gradient_mask)
        return combined

    def detect_lane_pixels(self, binary_warped):
        """
        Detect lane pixels using sliding window approach.
        """
        histogram = np.sum(binary_warped[binary_warped.shape[0]//2:, :], axis=0)
        midpoint = histogram.shape[0]//2
        leftx_base = np.argmax(histogram[:midpoint])
        rightx_base = np.argmax(histogram[midpoint:]) + midpoint

        nwindows = 9
        window_height = binary_warped.shape[0]//nwindows
        nonzero = binary_warped.nonzero()
        nonzeroy = np.array(nonzero[0])
        nonzerox = np.array(nonzero[1])
        
        leftx_current = leftx_base
        rightx_current = rightx_base
        margin = 100
        minpix = 50
        
        left_lane_inds = []
        right_lane_inds = []
        
        for window in range(nwindows):
            win_y_low = binary_warped.shape[0] - (window + 1) * window_height
            win_y_high = binary_warped.shape[0] - window * window_height
            
            win_xleft_low = leftx_current - margin
            win_xleft_high = leftx_current + margin
            win_xright_low = rightx_current - margin
            win_xright_high = rightx_current + margin
            
            good_left_inds = ((nonzeroy >= win_y_low) & (nonzeroy < win_y_high) & 
                             (nonzerox >= win_xleft_low) & (nonzerox < win_xleft_high)).nonzero()[0]
            good_right_inds = ((nonzeroy >= win_y_low) & (nonzeroy < win_y_high) & 
                              (nonzerox >= win_xright_low) & (nonzerox < win_xright_high)).nonzero()[0]
            
            left_lane_inds.append(good_left_inds)
            right_lane_inds.append(good_right_inds)
            
            if len(good_left_inds) > minpix:
                leftx_current = np.int32(np.mean(nonzerox[good_left_inds]))
            if len(good_right_inds) > minpix:
                rightx_current = np.int32(np.mean(nonzerox[good_right_inds]))
        
        left_lane_inds = np.concatenate(left_lane_inds)
        right_lane_inds = np.concatenate(right_lane_inds)
        
        leftx = nonzerox[left_lane_inds]
        lefty = nonzeroy[left_lane_inds]
        rightx = nonzerox[right_lane_inds]
        righty = nonzeroy[right_lane_inds]
        
        return leftx, lefty, rightx, righty

    def fit_polynomial(self, leftx, lefty, rightx, righty):
        """
        Fit polynomial to detected lane pixels.
        """
        left_fit = np.polyfit(lefty, leftx, 2)
        right_fit = np.polyfit(righty, rightx, 2)
        
        # Add to history for smoothing
        self.left_line_history.append(left_fit)
        self.right_line_history.append(right_fit)
        
        # Apply smoothing
        left_fit_smoothed = np.mean(self.left_line_history, axis=0)
        right_fit_smoothed = np.mean(self.right_line_history, axis=0)
        
        return left_fit_smoothed, right_fit_smoothed

    def calculate_curvature(self, leftx, lefty, rightx, righty, img_shape):
        """
        Calculate radius of curvature for both lanes.
        """
        ym_per_pix = 30/720  # meters per pixel in y dimension
        xm_per_pix = 3.7/700  # meters per pixel in x dimension
        
        left_fit_cr = np.polyfit(lefty * ym_per_pix, leftx * xm_per_pix, 2)
        right_fit_cr = np.polyfit(righty * ym_per_pix, rightx * xm_per_pix, 2)
        
        y_eval = img_shape[0] * ym_per_pix
        
        left_curverad = ((1 + (2*left_fit_cr[0]*y_eval + left_fit_cr[1])**2)**1.5) / np.absolute(2*left_fit_cr[0])
        right_curverad = ((1 + (2*right_fit_cr[0]*y_eval + right_fit_cr[1])**2)**1.5) / np.absolute(2*right_fit_cr[0])
        
        return left_curverad, right_curverad

    def draw_lanes(self, img, left_fit, right_fit, perspective_matrix):
        """
        Draw detected lanes on the original image.
        """
        ploty = np.linspace(0, img.shape[0]-1, img.shape[0])
        left_fitx = left_fit[0]*ploty**2 + left_fit[1]*ploty + left_fit[2]
        right_fitx = right_fit[0]*ploty**2 + right_fit[1]*ploty + right_fit[2]
        
        # Create an image to draw the lines on
        warp_zero = np.zeros_like(img[:, :, 0]).astype(np.uint8)
        color_warp = np.dstack((warp_zero, warp_zero, warp_zero))
        
        # Recast the x and y points into usable format for cv2.fillPoly()
        pts_left = np.array([np.transpose(np.vstack([left_fitx, ploty]))])
        pts_right = np.array([np.flipud(np.transpose(np.vstack([right_fitx, ploty])))])
        pts = np.hstack((pts_left, pts_right))
        
        # Draw the lane
        cv2.fillPoly(color_warp, np.int_([pts]), (0, 255, 0))
        cv2.polylines(color_warp, np.int_([pts_left]), False, (255, 0, 0), thickness=15)
        cv2.polylines(color_warp, np.int_([pts_right]), False, (0, 0, 255), thickness=15)
        
        # Warp back to original image space
        newwarp = cv2.warpPerspective(color_warp, np.linalg.inv(perspective_matrix), (img.shape[1], img.shape[0]))
        
        # Combine with original image
        result = cv2.addWeighted(img, 1, newwarp, 0.3, 0)
        return result

    def process_frame(self, frame):
        """
        Process a single frame with advanced lane detection.
        """
        # Undistort image if camera is calibrated
        undistorted = self.undistort_image(frame)
        
        # Get perspective transform matrix
        perspective_matrix = self.get_perspective_transform(frame.shape)
        
        # Apply thresholds and transform perspective
        binary = self.apply_threshold(undistorted)
        warped = cv2.warpPerspective(binary, perspective_matrix, (frame.shape[1], frame.shape[0]))
        
        # Detect lane pixels
        leftx, lefty, rightx, righty = self.detect_lane_pixels(warped)
        
        # Fit polynomials
        left_fit, right_fit = self.fit_polynomial(leftx, lefty, rightx, righty)
        
        # Calculate curvature
        left_curverad, right_curverad = self.calculate_curvature(leftx, lefty, rightx, righty, frame.shape)
        
        # Draw lanes
        result = self.draw_lanes(frame, left_fit, right_fit, perspective_matrix)
        
        # Add curvature information
        curvature_text = f'Curve Radius: {(left_curverad + right_curverad)/2:.0f}m'
        cv2.putText(result, curvature_text, (10, 90), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 2)
        
        return result

class EnhancedVideoProcessor:
    def __init__(self, input_path, output_path, lane_detector):
        self.input_path = Path(input_path)
        self.output_path = Path(output_path)
        self.lane_detector = lane_detector
        self.setup_logging()
        
    def setup_logging(self):
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(levelname)s - %(message)s',
            handlers=[
                logging.FileHandler('advanced_lane_detection.log'),
                logging.StreamHandler()
            ]
        )
        self.logger = logging.getLogger(__name__)
    
    def process_video(self, display_progress=True, show_metrics=True):
        """
        Process video with enhanced lane detection and diagnostics.
        """
        if not self.input_path.exists():
            self.logger.error(f"Input video not found: {self.input_path}")
            return False
            
        cap = cv2.VideoCapture(str(self.input_path))
        width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        fps = int(cap.get(cv2.CAP_PROP_FPS))
        frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        
        # Create video writer
        fourcc = cv2.VideoWriter_fourcc(*'mp4v')
        writer = cv2.VideoWriter(
            str(self.output_path),
            fourcc,
            fps,
            (width, height)
        )
        
        self.logger.info(f"Processing video: {self.input_path}")
        self.logger.info(f"Resolution: {width}x{height} | FPS: {fps} | Frames: {frame_count}")
        
        # Performance monitoring
        processing_times = []
        detection_metrics = {
            'lane_confidence': [],
            'curvature_values': [],
            'failed_frames': 0
        }
        
        try:
            frames = range(frame_count)
            if display_progress:
                frames = tqdm(frames, desc="Processing frames")
            
            for frame_idx in frames:
                ret, frame = cap.read()
                if not ret:
                    break
                
                try:
                    # Process frame and measure performance
                    start_time = time.time()
                    processed_frame = self.lane_detector.process_frame(frame)
                    frame_time = (time.time() - start_time) * 1000
                    processing_times.append(frame_time)
                    
                    # Add performance overlay if requested
                    if show_metrics:
                        current_fps = 1000 / np.mean(processing_times[-30:])
                        metrics_text = [
                            f"Frame Time: {frame_time:.1f}ms",
                            f"FPS: {current_fps:.1f}",
                            f"Frame: {frame_idx}/{frame_count}"
                        ]
                        
                        for i, text in enumerate(metrics_text):
                            cv2.putText(
                                processed_frame,
                                text,
                                (10, 30 + i * 30),
                                cv2.FONT_HERSHEY_SIMPLEX,
                                1,
                                (255, 255, 255),
                                2
                            )
                    
                    writer.write(processed_frame)
                    
                except Exception as e:
                    self.logger.warning(f"Failed to process frame {frame_idx}: {str(e)}")
                    detection_metrics['failed_frames'] += 1
                    writer.write(frame)  # Write original frame on failure
            
            # Calculate and log performance statistics
            avg_frame_time = np.mean(processing_times)
            avg_fps = 1000 / avg_frame_time
            
            self.logger.info("Processing complete!")
            self.logger.info(f"Average frame time: {avg_frame_time:.2f}ms")
            self.logger.info(f"Average FPS: {avg_fps:.1f}")
            self.logger.info(f"Failed frames: {detection_metrics['failed_frames']}")
            
            # Generate performance report
            self._generate_performance_report(
                detection_metrics,
                processing_times,
                frame_count
            )
            
        except Exception as e:
            self.logger.error(f"Error processing video: {str(e)}")
            return False
            
        finally:
            cap.release()
            writer.release()
            
        return True
    
    def _generate_performance_report(self, metrics, processing_times, total_frames):
        """
        Generate detailed performance report.
        """
        report_path = self.output_path.with_suffix('.report.txt')
        
        with open(report_path, 'w') as f:
            f.write("Lane Detection Performance Report\n")
            f.write("===============================\n\n")
            
            # Processing statistics
            f.write("Processing Statistics:\n")
            f.write(f"Total frames: {total_frames}\n")
            f.write(f"Successfully processed: {total_frames - metrics['failed_frames']}\n")
            f.write(f"Failed frames: {metrics['failed_frames']}\n")
            f.write(f"Success rate: {((total_frames - metrics['failed_frames']) / total_frames) * 100:.2f}%\n\n")
            
            # Performance metrics
            f.write("Performance Metrics:\n")
            f.write(f"Average processing time: {np.mean(processing_times):.2f}ms\n")
            f.write(f"Standard deviation: {np.std(processing_times):.2f}ms\n")
            f.write(f"Minimum processing time: {np.min(processing_times):.2f}ms\n")
            f.write(f"Maximum processing time: {np.max(processing_times):.2f}ms\n")
            f.write(f"Average FPS: {1000 / np.mean(processing_times):.2f}\n\n")
            
            # Processing distribution
            percentiles = [25, 50, 75, 90, 95, 99]
            f.write("Processing Time Distribution:\n")
            for p in percentiles:
                f.write(f"{p}th percentile: {np.percentile(processing_times, p):.2f}ms\n")

def process_video_file(input_path, output_path, config=None):
    """
    Process a video file with advanced lane detection.
    
    Args:
        input_path (str): Path to input video file
        output_path (str): Path to save processed video
        config (dict, optional): Configuration parameters for lane detection
        
    Returns:
        bool: True if processing successful, False otherwise
    """
    detector = AdvancedLaneDetector(config)
    processor = EnhancedVideoProcessor(input_path, output_path, detector)
    return processor.process_video()

In [4]:
# Basic usage
success = process_video_file("test_video.mp4", "output.mp4")

# With custom configuration
config = {
    'roi_height_ratio': 0.6,
    'roi_width_ratio': 0.1,
    'smoothing_window': 15,
    'gradient_threshold': (30, 100),
    'color_threshold': ([85, 35, 35], [135, 255, 255])
}
success = process_video_file("test_video.mp4", "output.mp4", config)

2025-02-17 13:01:45,893 - INFO - Processing video: test_video.mp4
2025-02-17 13:01:45,894 - INFO - Resolution: 1280x720 | FPS: 50 | Frames: 1295
Processing frames: 100%|███████████████████████████████████████████████████████████| 1295/1295 [00:50<00:00, 25.52it/s]
2025-02-17 13:02:36,650 - INFO - Processing complete!
2025-02-17 13:02:36,651 - INFO - Average frame time: 32.66ms
2025-02-17 13:02:36,652 - INFO - Average FPS: 30.6
2025-02-17 13:02:36,652 - INFO - Failed frames: 0
2025-02-17 13:02:36,714 - INFO - Processing video: test_video.mp4
2025-02-17 13:02:36,715 - INFO - Resolution: 1280x720 | FPS: 50 | Frames: 1295
  left_fit = np.polyfit(lefty, leftx, 2)
  left_fit_cr = np.polyfit(lefty * ym_per_pix, leftx * xm_per_pix, 2)
  left_fit = np.polyfit(lefty, leftx, 2)
  left_fit_cr = np.polyfit(lefty * ym_per_pix, leftx * xm_per_pix, 2)
  left_fit = np.polyfit(lefty, leftx, 2)
  left_fit_cr = np.polyfit(lefty * ym_per_pix, leftx * xm_per_pix, 2)
  left_fit = np.polyfit(lefty, leftx, 2)
