In [None]:
!pip install -q ultralytics opencv-python numpy torch

[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m914.6/914.6 kB[0m [31m9.1 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m363.4/363.4 MB[0m [31m4.3 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m13.8/13.8 MB[0m [31m54.2 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m24.6/24.6 MB[0m [31m23.9 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m883.7/883.7 kB[0m [31m39.5 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m664.8/664.8 MB[0m [31m1.2 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m211.5/211.5 MB[0m [31m6.0 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m56.3/56.3 MB[0m [31m10.7 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━

In [None]:
import cv2
import numpy as np
import math
import time
from ultralytics import YOLO
from collections import deque
from concurrent.futures import ThreadPoolExecutor

class LaneDetector:
    def __init__(self):
        self.prev_left_line = None
        self.prev_right_line = None
        # Store last N frames of lane positions for smoothing
        self.left_lines_history = deque(maxlen=5)
        self.right_lines_history = deque(maxlen=5)

    def preprocess_image(self, image):
        # Convert to HSV for better lane marking detection
        hsv = cv2.cvtColor(image, cv2.COLOR_BGR2HSV)

        # Create mask for white and yellow lane markings
        lower_white = np.array([0, 0, 200])
        upper_white = np.array([180, 30, 255])
        white_mask = cv2.inRange(hsv, lower_white, upper_white)

        lower_yellow = np.array([20, 100, 100])
        upper_yellow = np.array([30, 255, 255])
        yellow_mask = cv2.inRange(hsv, lower_yellow, upper_yellow)

        # Combine masks
        combined_mask = cv2.bitwise_or(white_mask, yellow_mask)

        # Apply slight Gaussian blur to reduce noise
        blurred = cv2.GaussianBlur(combined_mask, (5, 5), 0)

        return blurred

    def detect_lane(self, image):
        height, width = image.shape[:2]

        # Preprocess image
        processed = self.preprocess_image(image)

        # Apply Canny edge detection
        edges = cv2.Canny(processed, 50, 150)

        # Define ROI - focus on bottom half but consider wider area for cyclist view
        roi_vertices = np.array([
            [(0, height),
             (width * 0.35, height * 0.5),
             (width * 0.65, height * 0.5),
             (width, height)]
        ], dtype=np.int32)

        # Create and apply ROI mask
        mask = np.zeros_like(edges)
        cv2.fillPoly(mask, roi_vertices, 255)
        masked_edges = cv2.bitwise_and(edges, mask)

        # Detect lines using probabilistic Hough transform
        lines = cv2.HoughLinesP(
            masked_edges,
            rho=1,
            theta=np.pi/180,
            threshold=20,
            minLineLength=30,
            maxLineGap=50
        )

        left_lines, right_lines = [], []
        if lines is not None:
            for line in lines:
                x1, y1, x2, y2 = line[0]
                if x2 - x1 == 0:
                    continue

                slope = (y2 - y1) / (x2 - x1)
                length = np.sqrt((x2 - x1)**2 + (y2 - y1)**2)

                # Filter lines based on slope and length
                if abs(slope) < 0.3 or abs(slope) > 2.0:
                    continue

                if slope < 0:
                    left_lines.append(line[0])
                else:
                    right_lines.append(line[0])

        # Process detected lines
        left_line = self._process_lines(left_lines, height, True)
        right_line = self._process_lines(right_lines, height, False)

        # Update line history
        if left_line is not None:
            self.left_lines_history.append(left_line)
        if right_line is not None:
            self.right_lines_history.append(right_line)

        # Get smoothed lines
        left_line = self._get_smoothed_line(self.left_lines_history)
        right_line = self._get_smoothed_line(self.right_lines_history)

        # Calculate lane center and width
        lane_center = width // 2
        lane_width = width // 2  # Default value

        if left_line is not None and right_line is not None:
            # Calculate at bottom of frame
            lane_center = (left_line[0] + right_line[0]) // 2
            lane_width = right_line[0] - left_line[0]

        return lane_center, lane_width, left_line, right_line

    def _process_lines(self, lines, height, is_left):
        if not lines:
            return self.prev_left_line if is_left else self.prev_right_line

        # Convert lines to points for polynomial fitting
        points = np.array([(x1, y1) for x1, y1, x2, y2 in lines] +
                         [(x2, y2) for x1, y1, x2, y2 in lines])

        # Fit polynomial
        coeffs = np.polyfit(points[:, 1], points[:, 0], deg=1)

        # Generate line points
        y_bottom = height
        y_top = int(height * 0.5)
        x_bottom = int(coeffs[0] * y_bottom + coeffs[1])
        x_top = int(coeffs[0] * y_top + coeffs[1])

        line = [x_bottom, y_bottom, x_top, y_top]

        # Update previous line
        if is_left:
            self.prev_left_line = line
        else:
            self.prev_right_line = line

        return line

    def _get_smoothed_line(self, line_history):
        if not line_history:
            return None
        return np.mean(list(line_history), axis=0, dtype=np.int32)

class RiskAssessor:
    def __init__(self):
        self.vehicle_history = {}  # Store vehicle tracking history
        self.risk_threshold_close = 50  # pixels
        self.risk_threshold_speed = 15  # pixels per frame

    def calculate_risk(self, vehicle_data, lane_center, lane_width, frame_time):
        vehicle_id = vehicle_data['id']
        vehicle_x = vehicle_data['center_x']
        vehicle_y = vehicle_data['center_y']

        # Get vehicle history
        if vehicle_id in self.vehicle_history:
            prev_data = self.vehicle_history[vehicle_id]
            dx = vehicle_x - prev_data['x']
            dy = vehicle_y - prev_data['y']
            dt = frame_time - prev_data['time']

            # Calculate metrics
            speed = np.sqrt(dx**2 + dy**2) / dt if dt > 0 else 0
            lane_distance = abs(vehicle_x - lane_center)
            in_cyclist_lane = abs(vehicle_x - lane_center) < (lane_width * 0.4)

            # Risk factors
            risk_factors = {
                'speed': min(1.0, speed / self.risk_threshold_speed),
                'proximity': min(1.0, (lane_width - lane_distance) / self.risk_threshold_close),
                'lane_invasion': 1.0 if in_cyclist_lane else 0.0
            }

            # Calculate overall risk
            risk_score = (
                risk_factors['speed'] * 0.4 +
                risk_factors['proximity'] * 0.3 +
                risk_factors['lane_invasion'] * 0.3
            )

            risk_level = "SAFE"
            if risk_score > 0.7:
                risk_level = "DANGER"
            elif risk_score > 0.4:
                risk_level = "WARNING"

            # Additional danger checks
            if in_cyclist_lane and speed > self.risk_threshold_speed * 0.8:
                risk_level = "DANGER"

        else:
            risk_level = "SAFE"
            speed = 0

        # Update history
        self.vehicle_history[vehicle_id] = {
            'x': vehicle_x,
            'y': vehicle_y,
            'time': frame_time
        }

        return risk_level, speed

def process_video(input_path, output_path):
    # Initialize components
    model = YOLO('yolov8n.pt')
    lane_detector = LaneDetector()
    risk_assessor = RiskAssessor()
    target_classes = [2, 3, 5, 7, 1]  # car, motorcycle, bus, truck, bicycle

    cap = cv2.VideoCapture(input_path)
    width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    fps = int(cap.get(cv2.CAP_PROP_FPS))

    fourcc = cv2.VideoWriter_fourcc(*'XVID')
    out = cv2.VideoWriter(output_path, fourcc, fps, (1280, 720))

    frame_count = 0

    with ThreadPoolExecutor(max_workers=2) as executor:
        while cap.isOpened():
            ret, frame = cap.read()
            if not ret:
                break

            frame_count += 1
            frame_time = time.time()

            # Resize frame
            resized_frame = cv2.resize(frame, (1280, 720))

            # Process lane detection and YOLO detection concurrently
            lane_future = executor.submit(lane_detector.detect_lane, resized_frame)
            yolo_future = executor.submit(model, resized_frame)

            # Get results
            lane_center, lane_width, left_line, right_line = lane_future.result()
            results = yolo_future.result()

            # Draw lane overlay
            if left_line is not None and right_line is not None:
                overlay = resized_frame.copy()
                cv2.fillPoly(overlay, [np.array([
                    [left_line[0], left_line[1]],
                    [left_line[2], left_line[3]],
                    [right_line[2], right_line[3]],
                    [right_line[0], right_line[1]]
                ])], (0, 255, 0, 128))
                cv2.addWeighted(overlay, 0.35, resized_frame, 0.65, 0, resized_frame)

            # Process detections
            for result in results:
                boxes = result.boxes
                for box in boxes:
                    cls = int(box.cls[0])
                    conf = float(box.conf[0])

                    if cls in target_classes and conf >= 0.5:
                        x1, y1, x2, y2 = map(int, box.xyxy[0])

                        vehicle_data = {
                            'id': f"{cls}_{x1}_{y1}",
                            'center_x': (x1 + x2) // 2,
                            'center_y': (y1 + y2) // 2
                        }

                        risk_level, speed = risk_assessor.calculate_risk(
                            vehicle_data, lane_center, lane_width, frame_time
                        )

                        color = {
                            "SAFE": (0, 255, 0),
                            "WARNING": (0, 255, 255),
                            "DANGER": (0, 0, 255)
                        }[risk_level]

                        class_name = result.names[cls]
                        label = f'{class_name} {conf:.2f} | Risk: {risk_level} | Speed: {speed:.1f}'

                        cv2.rectangle(resized_frame, (x1, y1), (x2, y2), color, 2)
                        cv2.putText(resized_frame, label, (x1, y1 - 10),
                                  cv2.FONT_HERSHEY_SIMPLEX, 0.5, color, 2)

            # Write frame
            out.write(resized_frame)

            if frame_count % 30 == 0:
                fps_calc = 30 / (time.time() - frame_time)
                print(f"FPS: {fps_calc:.2f}")

    cap.release()
    out.release()
    cv2.destroyAllWindows()

if __name__ == "__main__":
    input_path = '/content/lane.mp4'
    output_path = 'exp3.mp4'
    process_video(input_path, output_path)

[1;30;43mStreaming output truncated to the last 5000 lines.[0m
Speed: 3.4ms preprocess, 140.0ms inference, 1.1ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 3 cars, 2 trucks, 228.1ms
Speed: 3.3ms preprocess, 228.1ms inference, 1.5ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 3 cars, 2 trucks, 228.2ms
Speed: 3.1ms preprocess, 228.2ms inference, 1.6ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 5 cars, 2 trucks, 224.9ms
Speed: 3.4ms preprocess, 224.9ms inference, 1.5ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 3 cars, 2 trucks, 226.6ms
Speed: 3.1ms preprocess, 226.6ms inference, 1.6ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 4 cars, 2 trucks, 239.3ms
Speed: 5.7ms preprocess, 239.3ms inference, 1.2ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 3 cars, 2 trucks, 224.2ms
Speed: 3.2ms preprocess, 224.2ms inference, 1.4ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 5 cars, 2

In [None]:
import cv2
import numpy as np
import torch
from ultralytics import YOLO
from collections import deque
from concurrent.futures import ThreadPoolExecutor
from scipy.spatial import distance
import time

class SpeedDetector:
    def __init__(self, fps, scale_factor=0.05, smoothing_factor=0.4):
        self.fps = fps
        self.scale_factor = scale_factor  # pixels to meters
        self.alpha = smoothing_factor
        self.prev_gray = None
        self.smoothed_speeds = {}
        self.prev_centers = {}

    def calculate_speed(self, frame, detection_data):
        if self.prev_gray is None:
            self.prev_gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
            return 0

        current_gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
        x1, y1, x2, y2 = detection_data['bbox']
        obj_id = detection_data['id']

        # Crop regions for optical flow
        prev_gray_crop = self.prev_gray[y1:y2, x1:x2]
        current_gray_crop = current_gray[y1:y2, x1:x2]

        if prev_gray_crop.shape[0] > 1 and prev_gray_crop.shape[1] > 1:
            # Calculate optical flow
            flow = cv2.calcOpticalFlowFarneback(
                prev_gray_crop,
                current_gray_crop,
                None,
                0.5, 3, 15, 3, 5, 1.2, 0
            )

            # Calculate magnitude
            mag = np.sqrt(flow[..., 0]**2 + flow[..., 1]**2)
            avg_motion = np.mean(mag)

            # Convert to km/h
            dt = 1 / self.fps
            speed_mps = (avg_motion * self.scale_factor) / dt
            speed_kmph = speed_mps * 3.6

            # Apply smoothing
            if obj_id not in self.smoothed_speeds:
                self.smoothed_speeds[obj_id] = speed_kmph
            else:
                self.smoothed_speeds[obj_id] = (
                    self.alpha * speed_kmph +
                    (1 - self.alpha) * self.smoothed_speeds[obj_id]
                )

        self.prev_gray = current_gray
        return self.smoothed_speeds.get(obj_id, 0)

class LaneDetector:
    def __init__(self):
        self.prev_left_line = None
        self.prev_right_line = None
        self.left_lines_history = deque(maxlen=5)
        self.right_lines_history = deque(maxlen=5)

    def detect_lane(self, image):
        height, width = image.shape[:2]

        # Convert to HSV and create lane marking mask
        hsv = cv2.cvtColor(image, cv2.COLOR_BGR2HSV)

        # Optimize thresholds for lane detection
        white_mask = cv2.inRange(
            hsv,
            np.array([0, 0, 200]),
            np.array([180, 30, 255])
        )
        yellow_mask = cv2.inRange(
            hsv,
            np.array([20, 100, 100]),
            np.array([30, 255, 255])
        )

        mask = cv2.bitwise_or(white_mask, yellow_mask)
        edges = cv2.Canny(mask, 50, 150)

        # Define ROI polygon
        roi_vertices = np.array([
            [(0, height),
             (width * 0.35, height * 0.5),
             (width * 0.65, height * 0.5),
             (width, height)]
        ], dtype=np.int32)

        # Apply ROI mask
        roi_mask = np.zeros_like(edges)
        cv2.fillPoly(roi_mask, roi_vertices, 255)
        masked_edges = cv2.bitwise_and(edges, roi_mask)

        # Detect lines
        lines = cv2.HoughLinesP(
            masked_edges,
            rho=1,
            theta=np.pi/180,
            threshold=20,
            minLineLength=30,
            maxLineGap=50
        )

        left_lines, right_lines = [], []
        if lines is not None:
            for line in lines:
                x1, y1, x2, y2 = line[0]
                if x2 - x1 == 0:
                    continue

                slope = (y2 - y1) / (x2 - x1)
                if abs(slope) < 0.3 or abs(slope) > 2.0:
                    continue

                if slope < 0:
                    left_lines.append(line[0])
                else:
                    right_lines.append(line[0])

        # Process lines
        left_line = self._process_lane_lines(left_lines, height, True)
        right_line = self._process_lane_lines(right_lines, height, False)

        # Update history
        if left_line is not None:
            self.left_lines_history.append(left_line)
        if right_line is not None:
            self.right_lines_history.append(right_line)

        # Get smoothed lines
        left_line = self._get_smoothed_line(self.left_lines_history)
        right_line = self._get_smoothed_line(self.right_lines_history)

        # Calculate lane info
        lane_center = width // 2
        lane_width = width // 2

        if left_line is not None and right_line is not None:
            lane_center = (left_line[0] + right_line[0]) // 2
            lane_width = right_line[0] - left_line[0]

        return lane_center, lane_width, left_line, right_line

    def _process_lane_lines(self, lines, height, is_left):
        if not lines:
            return self.prev_left_line if is_left else self.prev_right_line

        points = np.array([(x1, y1) for x1, y1, x2, y2 in lines] +
                         [(x2, y2) for x1, y1, x2, y2 in lines])

        coeffs = np.polyfit(points[:, 1], points[:, 0], deg=1)

        y_bottom = height
        y_top = int(height * 0.5)
        x_bottom = int(coeffs[0] * y_bottom + coeffs[1])
        x_top = int(coeffs[0] * y_top + coeffs[1])

        line = [x_bottom, y_bottom, x_top, y_top]

        if is_left:
            self.prev_left_line = line
        else:
            self.prev_right_line = line

        return line

    def _get_smoothed_line(self, line_history):
        if not line_history:
            return None
        return np.mean(list(line_history), axis=0, dtype=np.int32)

class RiskAssessor:
    def __init__(self):
        self.vehicle_history = {}
        self.risk_threshold_close = 50
        self.risk_threshold_speed = 15

    def calculate_risk(self, vehicle_data, lane_center, lane_width, frame_time):
        vehicle_id = vehicle_data['id']
        vehicle_x = vehicle_data['center_x']
        vehicle_y = vehicle_data['center_y']
        vehicle_speed = vehicle_data.get('speed', 0)

        if vehicle_id in self.vehicle_history:
            prev_data = self.vehicle_history[vehicle_id]
            dx = vehicle_x - prev_data['x']
            dy = vehicle_y - prev_data['y']
            dt = frame_time - prev_data['time']

            # Calculate risk factors
            speed_factor = min(1.0, vehicle_speed / 50.0)  # Normalized speed
            proximity_factor = min(1.0, (lane_width - abs(vehicle_x - lane_center)) / self.risk_threshold_close)
            lane_invasion = abs(vehicle_x - lane_center) < (lane_width * 0.4)

            # Calculate risk score
            risk_score = (
                speed_factor * 0.4 +
                proximity_factor * 0.3 +
                float(lane_invasion) * 0.3
            )

            risk_level = "SAFE"
            if risk_score > 0.7:
                risk_level = "DANGER"
            elif risk_score > 0.4:
                risk_level = "WARNING"

        else:
            risk_level = "SAFE"
            risk_score = 0

        # Update history
        self.vehicle_history[vehicle_id] = {
            'x': vehicle_x,
            'y': vehicle_y,
            'time': frame_time,
            'speed': vehicle_speed
        }

        return risk_level, risk_score

def process_frame(frame, model, lane_detector, speed_detector, risk_assessor, frame_time):
    # Resize frame
    frame = cv2.resize(frame, (1280, 720))
    with ThreadPoolExecutor(max_workers=2) as executor:
        lane_future = executor.submit(lane_detector.detect_lane, frame)
        yolo_future = executor.submit(model, frame, verbose=False)

        # Get results
        lane_center, lane_width, left_line, right_line = lane_future.result()
        results = yolo_future.result()

    # Draw lane overlay
    if left_line is not None and right_line is not None:
        overlay = frame.copy()
        cv2.fillPoly(overlay, [np.array([
            [left_line[0], left_line[1]],
            [left_line[2], left_line[3]],
            [right_line[2], right_line[3]],
            [right_line[0], right_line[1]]
        ])], (0, 255, 0, 128))
        cv2.addWeighted(overlay, 0.35, frame, 0.65, 0, frame)

    # Process detections
    for result in results:
        for box in result.boxes:
            cls = int(box.cls[0])
            conf = float(box.conf[0])

            if conf >= 0.5:
                x1, y1, x2, y2 = map(int, box.xyxy[0])

                # Calculate speed using optical flow
                detection_data = {
                    'id': f"{cls}_{x1}_{y1}",
                    'bbox': (x1, y1, x2, y2),
                    'center_x': (x1 + x2) // 2,
                    'center_y': (y1 + y2) // 2
                }

                speed = speed_detector.calculate_speed(frame, detection_data)
                detection_data['speed'] = speed

                # Calculate risk
                risk_level, risk_score = risk_assessor.calculate_risk(
                    detection_data, lane_center, lane_width, frame_time
                )

                # Draw bounding box and information
                color = {
                    "SAFE": (0, 255, 0),
                    "WARNING": (0, 255, 255),
                    "DANGER": (0, 0, 255)
                }[risk_level]

                cv2.rectangle(frame, (x1, y1), (x2, y2), color, 2)

                # Display information
                info_text = [
                    f"ID: {detection_data['id']}",
                    f"Speed: {speed:.1f} km/h",
                    f"Risk: {risk_level}",
                    f"Score: {risk_score:.2f}"
                ]

                for i, text in enumerate(info_text):
                    y_offset = y1 - 10 - (i * 15)
                    cv2.putText(frame, text, (x1, y_offset),
                              cv2.FONT_HERSHEY_SIMPLEX, 0.4, color, 1)

    return frame

def process_video(input_path, output_path):
    # Initialize components
    model = YOLO('yolov8n.pt')
    if torch.cuda.is_available():
        model.to('cuda')

    cap = cv2.VideoCapture(input_path)
    fps = int(cap.get(cv2.CAP_PROP_FPS))

    # Initialize components
    lane_detector = LaneDetector()
    speed_detector = SpeedDetector(fps)
    risk_assessor = RiskAssessor()

    # Setup video writer
    fourcc = cv2.VideoWriter_fourcc(*'XVID')
    out = cv2.VideoWriter(output_path, fourcc, fps, (1280, 720))

    frame_count = 0
    start_time = time.time()

    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            break

        frame_count += 1
        current_time = time.time()

        # Process frame
        processed_frame = process_frame(
            frame,
            model,
            lane_detector,
            speed_detector,
            risk_assessor,
            current_time
        )
        out.write(processed_frame)
        if frame_count % 30 == 0:
            fps_calc = 30 / (time.time() - start_time)
            print(f"FPS: {fps_calc:.2f}")
            start_time = time.time()

    cap.release()
    out.release()
    cv2.destroyAllWindows()

if __name__ == "__main__":
    input_path = '/content/lane.mp4'
    output_path = 'exp4.mp4'
    process_video(input_path, output_path)

FPS: 4.07
FPS: 5.10
FPS: 3.97
FPS: 5.31
FPS: 4.15
FPS: 4.69
FPS: 3.56
FPS: 4.68
FPS: 4.40
FPS: 5.42
FPS: 4.42
FPS: 5.20
FPS: 4.23
FPS: 4.64
FPS: 3.71
FPS: 4.35
FPS: 4.06
FPS: 5.62
FPS: 4.50
FPS: 5.61
FPS: 4.48
FPS: 5.48
FPS: 4.82
FPS: 4.88
FPS: 4.92
FPS: 4.35
FPS: 4.41
FPS: 4.30
FPS: 4.23
FPS: 4.41
FPS: 4.10
FPS: 4.80
FPS: 4.31
FPS: 4.99
FPS: 5.01
FPS: 4.75
FPS: 5.55
FPS: 4.45
FPS: 5.39
FPS: 4.44
FPS: 5.45
FPS: 4.36
FPS: 5.32
FPS: 4.34
FPS: 5.41
FPS: 4.53
FPS: 5.14
FPS: 4.60
FPS: 4.58
FPS: 4.50
FPS: 4.28
FPS: 4.29
FPS: 4.82
FPS: 4.83
FPS: 4.62
FPS: 4.74
FPS: 4.05
FPS: 4.66
FPS: 4.30
FPS: 3.96
FPS: 4.94
FPS: 4.74
FPS: 4.74
FPS: 4.71
FPS: 4.31
FPS: 4.48
FPS: 4.33
FPS: 4.28
FPS: 4.31
FPS: 3.83
FPS: 4.58
FPS: 3.83
FPS: 4.64
FPS: 4.01
FPS: 5.15
FPS: 4.45
FPS: 5.82
FPS: 5.10
FPS: 5.12
FPS: 5.65


In [None]:
import cv2
import numpy as np
import torch
from ultralytics import YOLO
from collections import deque
from concurrent.futures import ThreadPoolExecutor
from scipy.spatial import distance
import time

class accDetector:
    def __init__(self, fps, scale_factor=0.05, smoothing_factor=0.4):
        self.fps = fps
        self.scale_factor = scale_factor  # pixels to meters
        self.alpha = smoothing_factor
        self.prev_gray = None
        self.smoothed_accs = {}
        self.prev_centers = {}

    def calculate_acc(self, frame, detection_data):
        if self.prev_gray is None:
            self.prev_gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
            return 0

        current_gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
        x1, y1, x2, y2 = detection_data['bbox']
        obj_id = detection_data['id']

        # Crop regions for optical flow
        prev_gray_crop = self.prev_gray[y1:y2, x1:x2]
        current_gray_crop = current_gray[y1:y2, x1:x2]

        if prev_gray_crop.shape[0] > 1 and prev_gray_crop.shape[1] > 1:
            # Calculate optical flow
            flow = cv2.calcOpticalFlowFarneback(
                prev_gray_crop,
                current_gray_crop,
                None,
                0.5, 3, 15, 3, 5, 1.2, 0
            )

            # Calculate magnitude
            mag = np.sqrt(flow[..., 0]**2 + flow[..., 1]**2)
            avg_motion = np.mean(mag)

            # Convert to km/h
            dt = 1 / self.fps
            acc_mps = (avg_motion * self.scale_factor) / dt
            acc_kmph = acc_mps * 3.6

            # Apply smoothing
            if obj_id not in self.smoothed_accs:
                self.smoothed_accs[obj_id] = acc_kmph
            else:
                self.smoothed_accs[obj_id] = (
                    self.alpha * acc_kmph +
                    (1 - self.alpha) * self.smoothed_accs[obj_id]
                )

        self.prev_gray = current_gray
        return self.smoothed_accs.get(obj_id, 0)

class LaneDetector:
    def __init__(self):
        self.prev_left_line = None
        self.prev_right_line = None
        self.left_lines_history = deque(maxlen=5)
        self.right_lines_history = deque(maxlen=5)

    def detect_lane(self, image):
        height, width = image.shape[:2]

        # Convert to HSV and create lane marking mask
        hsv = cv2.cvtColor(image, cv2.COLOR_BGR2HSV)

        # Optimize thresholds for lane detection
        white_mask = cv2.inRange(
            hsv,
            np.array([0, 0, 200]),
            np.array([180, 30, 255])
        )
        yellow_mask = cv2.inRange(
            hsv,
            np.array([20, 100, 100]),
            np.array([30, 255, 255])
        )

        mask = cv2.bitwise_or(white_mask, yellow_mask)
        edges = cv2.Canny(mask, 50, 150)

        # Define ROI polygon
        roi_vertices = np.array([
            [(0, height),
             (width * 0.35, height * 0.5),
             (width * 0.65, height * 0.5),
             (width, height)]
        ], dtype=np.int32)

        # Apply ROI mask
        roi_mask = np.zeros_like(edges)
        cv2.fillPoly(roi_mask, roi_vertices, 255)
        masked_edges = cv2.bitwise_and(edges, roi_mask)

        # Detect lines
        lines = cv2.HoughLinesP(
            masked_edges,
            rho=1,
            theta=np.pi/180,
            threshold=20,
            minLineLength=30,
            maxLineGap=50
        )

        left_lines, right_lines = [], []
        if lines is not None:
            for line in lines:
                x1, y1, x2, y2 = line[0]
                if x2 - x1 == 0:
                    continue

                slope = (y2 - y1) / (x2 - x1)
                if abs(slope) < 0.3 or abs(slope) > 2.0:
                    continue

                if slope < 0:
                    left_lines.append(line[0])
                else:
                    right_lines.append(line[0])

        # Process lines
        left_line = self._process_lane_lines(left_lines, height, True)
        right_line = self._process_lane_lines(right_lines, height, False)

        # Update history
        if left_line is not None:
            self.left_lines_history.append(left_line)
        if right_line is not None:
            self.right_lines_history.append(right_line)

        # Get smoothed lines
        left_line = self._get_smoothed_line(self.left_lines_history)
        right_line = self._get_smoothed_line(self.right_lines_history)

        # Calculate lane info
        lane_center = width // 2
        lane_width = width // 2

        if left_line is not None and right_line is not None:
            lane_center = (left_line[0] + right_line[0]) // 2
            lane_width = right_line[0] - left_line[0]

        return lane_center, lane_width, left_line, right_line

    def _process_lane_lines(self, lines, height, is_left):
        if not lines:
            return self.prev_left_line if is_left else self.prev_right_line

        points = np.array([(x1, y1) for x1, y1, x2, y2 in lines] +
                         [(x2, y2) for x1, y1, x2, y2 in lines])

        coeffs = np.polyfit(points[:, 1], points[:, 0], deg=1)

        y_bottom = height
        y_top = int(height * 0.5)
        x_bottom = int(coeffs[0] * y_bottom + coeffs[1])
        x_top = int(coeffs[0] * y_top + coeffs[1])

        line = [x_bottom, y_bottom, x_top, y_top]

        if is_left:
            self.prev_left_line = line
        else:
            self.prev_right_line = line

        return line

    def _get_smoothed_line(self, line_history):
        if not line_history:
            return None
        return np.mean(list(line_history), axis=0, dtype=np.int32)

class RiskAssessor:
    def __init__(self):
        self.vehicle_history = {}
        self.risk_threshold_close = 50
        self.risk_threshold_acc = 15

    def calculate_risk(self, vehicle_data, lane_center, lane_width, frame_time):
        vehicle_id = vehicle_data['id']
        vehicle_x = vehicle_data['center_x']
        vehicle_y = vehicle_data['center_y']
        vehicle_acc = vehicle_data.get('acc', 0)

        if vehicle_id in self.vehicle_history:
            prev_data = self.vehicle_history[vehicle_id]
            dx = vehicle_x - prev_data['x']
            dy = vehicle_y - prev_data['y']
            dt = frame_time - prev_data['time']

            # Calculate risk factors
            acc_factor = min(1.0, vehicle_acc / 50.0)  # Normalized acc
            proximity_factor = min(1.0, (lane_width - abs(vehicle_x - lane_center)) / self.risk_threshold_close)
            lane_invasion = abs(vehicle_x - lane_center) < (lane_width * 0.4)

            # Calculate risk score
            risk_score = (
                acc_factor * 0.5 +
                proximity_factor * 0.4 +
                float(lane_invasion) * 0.1
            )

            risk_level = "SAFE"
            if risk_score > 0.7:
                risk_level = "DANGER"
            elif risk_score > 0.4:
                risk_level = "WARNING"

        else:
            risk_level = "SAFE"
            risk_score = 0

        # Update history
        self.vehicle_history[vehicle_id] = {
            'x': vehicle_x,
            'y': vehicle_y,
            'time': frame_time,
            'acc': vehicle_acc
        }

        return risk_level, risk_score

def process_frame(frame, model, lane_detector, acc_detector, risk_assessor, frame_time):
    # Resize frame
    frame = cv2.resize(frame, (1280, 720))
    with ThreadPoolExecutor(max_workers=2) as executor:
        lane_future = executor.submit(lane_detector.detect_lane, frame)
        yolo_future = executor.submit(model, frame, verbose=False)

        # Get results
        lane_center, lane_width, left_line, right_line = lane_future.result()
        results = yolo_future.result()

    # Draw lane overlay
    if left_line is not None and right_line is not None:
        overlay = frame.copy()
        cv2.fillPoly(overlay, [np.array([
            [left_line[0], left_line[1]],
            [left_line[2], left_line[3]],
            [right_line[2], right_line[3]],
            [right_line[0], right_line[1]]
        ])], (0, 255, 0, 128))
        cv2.addWeighted(overlay, 0.35, frame, 0.65, 0, frame)

    # Process detections
    for result in results:
        for box in result.boxes:
            cls = int(box.cls[0])
            conf = float(box.conf[0])

            if conf >= 0.5:
                x1, y1, x2, y2 = map(int, box.xyxy[0])

                # Calculate acc using optical flow
                detection_data = {
                    'id': f"{cls}_{x1}_{y1}",
                    'bbox': (x1, y1, x2, y2),
                    'center_x': (x1 + x2) // 2,
                    'center_y': (y1 + y2) // 2
                }

                acc = acc_detector.calculate_acc(frame, detection_data)
                detection_data['acc'] = acc

                # Calculate risk
                risk_level, risk_score = risk_assessor.calculate_risk(
                    detection_data, lane_center, lane_width, frame_time
                )

                # Draw bounding box and information
                color = {
                    "SAFE": (0, 255, 0),
                    "WARNING": (0, 255, 255),
                    "DANGER": (0, 0, 255)
                }[risk_level]

                cv2.rectangle(frame, (x1, y1), (x2, y2), color, 2)

                # Display information
                info_text = [
                    f"ID: {detection_data['id']}",
                    f"acc: {acc:.1f} km/h",
                    f"Risk: {risk_level}",
                    f"Score: {risk_score:.2f}"
                ]

                for i, text in enumerate(info_text):
                    y_offset = y1 - 10 - (i * 15)
                    cv2.putText(frame, text, (x1, y_offset),
                              cv2.FONT_HERSHEY_SIMPLEX, 0.4, color, 1)

    return frame

def process_video(input_path, output_path):
    # Initialize components
    model = YOLO('yolov8n.pt')
    if torch.cuda.is_available():
        model.to('cuda')

    cap = cv2.VideoCapture(input_path)
    fps = int(cap.get(cv2.CAP_PROP_FPS))

    # Initialize components
    lane_detector = LaneDetector()
    acc_detector = accDetector(fps)
    risk_assessor = RiskAssessor()

    # Setup video writer
    fourcc = cv2.VideoWriter_fourcc(*'XVID')
    out = cv2.VideoWriter(output_path, fourcc, fps, (1280, 720))

    frame_count = 0
    start_time = time.time()

    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            break

        frame_count += 1
        current_time = time.time()

        # Process frame
        processed_frame = process_frame(
            frame,
            model,
            lane_detector,
            acc_detector,
            risk_assessor,
            current_time
        )
        out.write(processed_frame)
        if frame_count % 30 == 0:
            fps_calc = 30 / (time.time() - start_time)
            print(f"FPS: {fps_calc:.2f}")
            start_time = time.time()

    cap.release()
    out.release()
    cv2.destroyAllWindows()

if __name__ == "__main__":
    input_path = '/content/youtube_video.mp4'
    output_path = 'youtube_output.mp4'
    process_video(input_path, output_path)

FPS: 5.41
FPS: 4.34
FPS: 4.16
FPS: 2.33
FPS: 3.38
FPS: 2.65
FPS: 4.10
FPS: 4.20
FPS: 5.20
FPS: 4.85
FPS: 6.11
FPS: 5.38
FPS: 5.30
FPS: 4.65
FPS: 3.57
FPS: 3.52
FPS: 3.87
FPS: 3.37


In [None]:
!pip install -q yt_dlp

[?25l     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/172.0 kB[0m [31m?[0m eta [36m-:--:--[0m[2K     [91m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m[90m╺[0m[90m━[0m [32m163.8/172.0 kB[0m [31m4.7 MB/s[0m eta [36m0:00:01[0m[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m172.0/172.0 kB[0m [31m3.4 MB/s[0m eta [36m0:00:00[0m
[?25h[?25l   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/3.2 MB[0m [31m?[0m eta [36m-:--:--[0m[2K   [91m━━━━━━━━━━━━━━━━━━━[0m[91m╸[0m[90m━━━━━━━━━━━━━━━━━━━━[0m [32m1.6/3.2 MB[0m [31m46.7 MB/s[0m eta [36m0:00:01[0m[2K   [91m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m[91m╸[0m [32m3.2/3.2 MB[0m [31m67.4 MB/s[0m eta [36m0:00:01[0m[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m3.2/3.2 MB[0m [31m39.7 MB/s[0m eta [36m0:00:00[0m
[?25h

In [None]:
import yt_dlp

# YouTube Shorts URL
video_url = "https://www.youtube.com/shorts/R-sw3lKb974"

# Download settings
ydl_opts = {
    "format": "mp4",
    "outtmpl": "youtube_video.mp4",  # Output filename
}

# Download video
with yt_dlp.YoutubeDL(ydl_opts) as ydl:
    ydl.download([video_url])

print("Download complete")

[youtube] Extracting URL: https://www.youtube.com/shorts/R-sw3lKb974
[youtube] R-sw3lKb974: Downloading webpage
[youtube] R-sw3lKb974: Downloading tv client config
[youtube] R-sw3lKb974: Downloading player 9c6dfc4a
[youtube] R-sw3lKb974: Downloading tv player API JSON
[youtube] R-sw3lKb974: Downloading ios player API JSON
[youtube] R-sw3lKb974: Downloading m3u8 information
[info] R-sw3lKb974: Downloading 1 format(s): 18
[download] Destination: youtube_video.mp4
[download] 100% of    1.09MiB in 00:00:00 at 2.50MiB/s   
Download complete
