In [17]:
import cv2
import numpy as np
from ultralytics import YOLO
import matplotlib.pyplot as plt
import os

In [18]:
def load_model(model_path='yolov8n.pt'):
    """Build a YOLO model from a weights file.

    Args:
        model_path: Path (or name) of the weights file handed to ``YOLO``.
            Defaults to ``'yolov8n.pt'``.

    Returns:
        The object produced by ``YOLO(model_path)``.
    """
    return YOLO(model_path)

In [19]:
# Weight files for the two detectors used by the notebook.
LANE_WEIGHTS = "../models/yolo8_lane_detection/train2/weights/best.pt"  # lane detection weights
OBJECT_WEIGHTS = "yolov8n.pt"  # general-purpose object detection weights (optional)

# Instantiate both models once so later cells can reuse them.
lane_model = YOLO(LANE_WEIGHTS)
object_model = YOLO(OBJECT_WEIGHTS)


In [20]:
# Print the object detector's class-id -> label mapping; the labels shown here
# are the strings the decision logic in process_frame compares against.
print(object_model.names)

{0: 'person', 1: 'bicycle', 2: 'car', 3: 'motorcycle', 4: 'airplane', 5: 'bus', 6: 'train', 7: 'truck', 8: 'boat', 9: 'traffic light', 10: 'fire hydrant', 11: 'stop sign', 12: 'parking meter', 13: 'bench', 14: 'bird', 15: 'cat', 16: 'dog', 17: 'horse', 18: 'sheep', 19: 'cow', 20: 'elephant', 21: 'bear', 22: 'zebra', 23: 'giraffe', 24: 'backpack', 25: 'umbrella', 26: 'handbag', 27: 'tie', 28: 'suitcase', 29: 'frisbee', 30: 'skis', 31: 'snowboard', 32: 'sports ball', 33: 'kite', 34: 'baseball bat', 35: 'baseball glove', 36: 'skateboard', 37: 'surfboard', 38: 'tennis racket', 39: 'bottle', 40: 'wine glass', 41: 'cup', 42: 'fork', 43: 'knife', 44: 'spoon', 45: 'bowl', 46: 'banana', 47: 'apple', 48: 'sandwich', 49: 'orange', 50: 'broccoli', 51: 'carrot', 52: 'hot dog', 53: 'pizza', 54: 'donut', 55: 'cake', 56: 'chair', 57: 'couch', 58: 'potted plant', 59: 'bed', 60: 'dining table', 61: 'toilet', 62: 'tv', 63: 'laptop', 64: 'mouse', 65: 'remote', 66: 'keyboard', 67: 'cell phone', 68: 'microw

In [21]:
# Height (in pixels) of the band above the bottom edge of the frame in which
# a detected person is treated as close to the vehicle.
_NEAR_BOTTOM_PX = 100

# Labels treated as a pedestrian. The object model's printed class mapping
# reports people as "person"; "pedestrian" is kept so detectors that use that
# label keep working.
_PEDESTRIAN_LABELS = ("person", "pedestrian")


def _elapsed_ms(start_tick, end_tick):
    """Convert a pair of cv2 tick counts into elapsed milliseconds."""
    return (end_tick - start_tick) / cv2.getTickFrequency() * 1000


def _object_feedback(detected_objects, frame_shape, speed_limit):
    """Return the most urgent warning for the detections, or None if none apply."""
    frame_height, frame_width = frame_shape[:2]
    best = None  # (rank, message); a lower rank is more urgent
    for label, (x1, _y1, _x2, y2) in detected_objects:
        if label == "stop sign":
            candidate = (0, "STOP! Stop sign detected")
        elif label in _PEDESTRIAN_LABELS and y2 > frame_height - _NEAR_BOTTOM_PX:
            candidate = (1, "Slow Down! Pedestrian ahead")
        elif label == "car" and x1 < frame_width // 2:  # left edge in the left half
            candidate = (2, "Turn Right to avoid car")
        elif label == "speed limit":
            candidate = (3, f"Speed Limit {speed_limit} km/h")
        else:
            continue
        # Keep the most urgent message instead of whichever detection came last.
        if best is None or candidate[0] < best[0]:
            best = candidate
    return None if best is None else best[1]


def _put_line(image, text, y, color):
    """Draw one line of overlay text at the left margin of ``image``."""
    cv2.putText(image, text, (10, y), cv2.FONT_HERSHEY_SIMPLEX, 0.7, color, 2)


def process_frame(frame, lane_model, object_model=None, current_speed=50, speed_limit=60):
    """Run lane (and optionally object) detection on a frame and overlay feedback.

    Args:
        frame: Image to analyse; its ``shape`` supplies the frame height and
            width used by the decision rules.
        lane_model: Model whose ``predict`` result is plotted to produce the
            annotated image.
        object_model: Optional detector. When ``None``, object detection and
            the object-based warnings are skipped.
        current_speed: Current speed, compared against ``speed_limit``.
        speed_limit: Speed limit used for the speed feedback and the
            "Speed Limit" message.

    Returns:
        The image returned by ``lane_results[0].plot()`` with timing,
        detected-object, feedback and action text drawn on it.
    """
    feedback = "Good Driving"
    action = ""

    # Time the lane detector.
    lane_start = cv2.getTickCount()
    lane_results = lane_model.predict(source=frame, show=False, conf=0.5)
    lane_time = _elapsed_ms(lane_start, cv2.getTickCount())

    # Time the object detector, if one was supplied.
    object_time = 0
    detected_objects = []
    if object_model is not None:
        object_start = cv2.getTickCount()
        object_results = object_model.predict(source=frame, show=False, conf=0.2)
        object_time = _elapsed_ms(object_start, cv2.getTickCount())
        # Store boxes as plain floats so the comparisons below yield Python bools.
        detected_objects = [
            (result.names[int(cls)], [float(v) for v in box])
            for result in object_results
            for box, cls in zip(result.boxes.xyxy, result.boxes.cls)
        ]

    # Start from the lane model's own rendering of its detections.
    annotated_frame = lane_results[0].plot()

    # Object-based warning, chosen by urgency rather than detection order.
    object_message = _object_feedback(detected_objects, frame.shape, speed_limit)
    if object_message is not None:
        feedback = object_message

    # Simulated lane drift detection.
    # Replace these placeholders with logic derived from the lane detections.
    drift_to_left = False
    drift_to_right = False
    if drift_to_left:
        action = "Turn Left"
        feedback = "Lane Departure: Adjust Left"
    elif drift_to_right:
        action = "Turn Right"
        feedback = "Lane Departure: Adjust Right"

    # Speed feedback (overrides earlier feedback when over the limit).
    if current_speed > speed_limit:
        action = "Slow Down"
        feedback = "Over Speed Limit!"
    elif current_speed < speed_limit - 10:
        action = "Speed Up"

    # Processing-time overlay.
    _put_line(annotated_frame, f"Lane Detection: {lane_time:.1f}ms", 30, (255, 255, 0))
    if object_model is not None:
        _put_line(annotated_frame, f"Object Detection: {object_time:.1f}ms", 60, (255, 255, 0))
    total_time = lane_time + object_time
    _put_line(annotated_frame, f"Total Processing: {total_time:.1f}ms", 90, (0, 255, 0))

    # Detected-object labels.
    if detected_objects:
        detected_text = ', '.join(label for label, _ in detected_objects)
        _put_line(annotated_frame, f"Objects: {detected_text}", 120, (0, 255, 255))

    # Feedback and suggested action.
    _put_line(annotated_frame, f"Feedback: {feedback}", 150, (0, 255, 255))
    _put_line(annotated_frame, f"Action: {action}", 180, (0, 255, 255))

    return annotated_frame


In [22]:
def driver_assistance(input_path, output_path, lane_model, object_model=None, show=True):
    """Run the frame pipeline over a video file and save the annotated result.

    Each frame read from ``input_path`` is passed to ``process_frame`` and the
    returned frame is written to ``output_path`` using the ``mp4v`` codec.

    Args:
        input_path: Path of the video to process (``.mp4``, ``.avi`` or ``.mov``).
        output_path: Destination video path; ``None`` falls back to
            ``"output_video.mp4"``. Missing parent directories are created.
        lane_model: Lane model forwarded to ``process_frame``.
        object_model: Optional object model forwarded to ``process_frame``.
        show: When true (the default), display each processed frame in a
            window and stop early if ``q`` is pressed. Pass ``False`` where
            no display is available.

    Returns:
        None. Progress and error messages are printed.
    """
    if output_path is None:
        output_path = "output_video.mp4"

    if not os.path.isfile(input_path):
        print("Invalid file path. Please check the input path.")
        return

    file_ext = os.path.splitext(input_path)[-1].lower()
    if file_ext not in ('.mp4', '.avi', '.mov'):
        print("Unsupported file format. Please provide a valid video format.")
        return

    # The writer does not create missing directories; without this the output
    # would silently never be written.
    output_dir = os.path.dirname(output_path)
    if output_dir:
        os.makedirs(output_dir, exist_ok=True)

    cap = cv2.VideoCapture(input_path)
    out = None
    try:
        if not cap.isOpened():
            print(f"Could not open video: {input_path}")
            return

        width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        fps = cap.get(cv2.CAP_PROP_FPS)
        if not fps > 0:  # also catches NaN; a non-positive rate is unusable
            fps = 30.0

        fourcc = cv2.VideoWriter_fourcc(*'mp4v')
        out = cv2.VideoWriter(output_path, fourcc, fps, (width, height))
        if not out.isOpened():
            print(f"Could not open output video for writing: {output_path}")
            return

        while cap.isOpened():
            ret, frame = cap.read()
            if not ret:
                break

            # Lane and object detection plus overlays for this frame.
            processed_frame = process_frame(frame, lane_model, object_model)

            if show:
                cv2.imshow("Driver Assistance", processed_frame)

            out.write(processed_frame)

            # Let the user stop early with 'q' while the preview is shown.
            if show and cv2.waitKey(1) & 0xFF == ord('q'):
                break
    finally:
        # Release handles even if processing raised part-way through.
        cap.release()
        if out is not None:
            out.release()
        if show:
            cv2.destroyAllWindows()

    # Reached only when the writer was opened and has now been released.
    print(f"Processed video saved at {output_path}")

In [23]:
# Source video to analyse and destination for the annotated copy.
# NOTE(review): the input lives under ../data while the output is written
# under ./data (both relative to the working directory) — confirm this is intended.
input_path="../data/Screenshot/data.mp4"
output_path = "data/Detect/output_video.mp4"

In [24]:
# Process the configured video with both the lane and the object detector.
driver_assistance(
    input_path,
    output_path,
    lane_model,
    object_model=object_model,
)


0: 384x640 (no detections), 72.5ms
Speed: 2.8ms preprocess, 72.5ms inference, 0.6ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 1 train, 60.5ms
Speed: 1.8ms preprocess, 60.5ms inference, 1.2ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 (no detections), 145.6ms
Speed: 4.1ms preprocess, 145.6ms inference, 0.5ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 1 train, 60.2ms
Speed: 2.0ms preprocess, 60.2ms inference, 1.1ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 (no detections), 172.1ms
Speed: 2.9ms preprocess, 172.1ms inference, 1.9ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 1 train, 50.5ms
Speed: 9.1ms preprocess, 50.5ms inference, 0.9ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 (no detections), 132.6ms
Speed: 3.2ms preprocess, 132.6ms inference, 0.5ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 1 train, 51.3ms
Speed: 1.7ms preprocess, 51.3ms inference, 1.3ms postproces