In [1]:
import cv2
import tensorflow as tf
from ultralytics import YOLO
import numpy as np
import time
import pandas as pd


In [2]:
def load_yolo_model(model_path='yolov8n.pt'):
    """
    Load YOLO model from a specified path.
    """
    model = YOLO(model_path)  # Adjust model variant based on accuracy/speed tradeoff
    return model


In [3]:
def initialize_video_stream(source=0):
    """
    Initialize video capture from the specified source.
    """
    video_capture = cv2.VideoCapture(source)
    if not video_capture.isOpened():
        print("Error: Unable to open video source.")
    return video_capture


In [None]:
def preprocess_frame(frame, target_size=(640, 640)):
    """
    Preprocess the frame to fit the YOLO input requirements.
    Convert from BGR to RGB and resize.
    """
    frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)  # Convert to RGB
    frame_resized = cv2.resize(frame_rgb, target_size)
    return frame_resized  # No normalization


In [None]:
def run_object_detection(model, frame):
    """
    Run object detection on a frame using the YOLO model.
    """
    results = model(frame)
    detections = results[0].boxes  # Access the boxes directly from the first result
    return detections


In [None]:
def process_detections(detections, confidence_threshold=0.3):
    """
    Process and filter detections based on a confidence threshold.
    """
    filtered_detections = []
    
    for box in detections:
        conf = box.conf.item()  # Confidence score
        if conf >= confidence_threshold:
            # Extract bounding box coordinates
            x1, y1, x2, y2 = box.xyxy[0]  # Coordinates
            label = box.cls.item()  # Class label
            filtered_detections.append({
                'box': (int(x1), int(y1), int(x2), int(y2)),
                'confidence': conf,
                'label': int(label)
            })
    
    return filtered_detections


In [7]:
def annotate_frame(frame, detections):
    """
    Draw bounding boxes and labels on the frame for each detection.
    """
    for det in detections:
        x1, y1, x2, y2 = int(det['box'][0]), int(det['box'][1]), int(det['box'][2]), int(det['box'][3])
        label = f"{det['label']} {det['confidence']:.2f}"
        cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 255, 0), 2)
        cv2.putText(frame, label, (x1, y1 - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 2)
    return frame


In [8]:
def extract_detection_data(detections):
    """
    Extract metrics from detections and format for logging.
    """
    frame_data = []
    for det in detections:
        x1, y1, x2, y2 = det['box']
        confidence = det['confidence']
        label = det['label']
        
        # Calculate width and height of bounding box
        width = x2 - x1
        height = y2 - y1
        aspect_ratio = width / height if height > 0 else 0
        
        # Frame-level data dictionary
        data = {
            'BoundingBox_X1': x1,
            'BoundingBox_Y1': y1,
            'BoundingBox_X2': x2,
            'BoundingBox_Y2': y2,
            'Confidence': confidence,
            'Label': label,
            'Width': width,
            'Height': height,
            'AspectRatio': aspect_ratio
        }
        frame_data.append(data)
    return frame_data


In [9]:
def save_detection_data_to_csv(log_data, filename='detection_log.csv'):
    """
    Save detection data to a CSV file for analysis.
    """
    df = pd.DataFrame(log_data)
    df.to_csv(filename, index=False)
    print(f"Detection data saved to {filename}")


In [10]:
def calculate_average_confidence(df):
    """
    Calculate average confidence per class from detection data.
    """
    avg_conf = df.groupby('Label')['Confidence'].mean()
    print("Average confidence per class:\n", avg_conf)
    return avg_conf


In [None]:
def run_real_time_detection(model_path='yolov8n.pt', video_source=0, confidence_threshold=0.3, max_duration=120000):
    """
    Run real-time obstacle detection and log data for analysis.
    Stop after max_duration seconds or if 'q' is pressed.
    """
    # Load model and initialize video stream
    model = load_yolo_model(model_path)
    video_stream = initialize_video_stream(video_source)
    
    log_data = []  # List to store detection data for each frame
    start_time = time.time()  # Record the start time
    
    while True:
        # Check if max duration has passed
        elapsed_time = time.time() - start_time
        if elapsed_time > max_duration:
            print("Time limit reached. Ending detection.")
            break
        
        # Capture frame-by-frame
        ret, frame = video_stream.read()
        if not ret:
            print("Error: Frame capture failed.")
            break
        
        # Preprocess and run detection
        preprocessed_frame = preprocess_frame(frame)
        detections = run_object_detection(model, preprocessed_frame)
        
        # Process and annotate detections
        filtered_detections = process_detections(detections, confidence_threshold)
        annotated_frame = annotate_frame(frame, filtered_detections)
        
        # Log data for analysis
        frame_data = extract_detection_data(filtered_detections)
        log_data.extend(frame_data)  # Append data for each detected object
        
        # Display the resulting frame
        cv2.imshow('Real-Time Obstacle Detection', annotated_frame)
        
        # Break loop on 'q' key press
        if cv2.waitKey(1) & 0xFF == ord('q'):
            print("Detection ended by user.")
            break
    
    # Release video capture and close windows
    video_stream.release()
    cv2.destroyAllWindows()
    
    # Save logged data to a CSV file for later analysis
    save_detection_data_to_csv(log_data)


In [12]:
# run_real_time_detection()
run_real_time_detection(confidence_threshold=0.3)




0: 640x640 1 person, 1 cat, 336.8ms
Speed: 8.0ms preprocess, 336.8ms inference, 4.0ms postprocess per image at shape (1, 3, 640, 640)

0: 640x640 1 person, 426.8ms
Speed: 134.9ms preprocess, 426.8ms inference, 2.0ms postprocess per image at shape (1, 3, 640, 640)

0: 640x640 1 cat, 259.9ms
Speed: 8.0ms preprocess, 259.9ms inference, 3.0ms postprocess per image at shape (1, 3, 640, 640)

0: 640x640 1 person, 1 cat, 246.9ms
Speed: 7.0ms preprocess, 246.9ms inference, 1.0ms postprocess per image at shape (1, 3, 640, 640)

0: 640x640 1 person, 1 cat, 305.8ms
Speed: 9.0ms preprocess, 305.8ms inference, 2.0ms postprocess per image at shape (1, 3, 640, 640)

0: 640x640 1 person, 1 cat, 253.9ms
Speed: 9.0ms preprocess, 253.9ms inference, 2.0ms postprocess per image at shape (1, 3, 640, 640)

0: 640x640 1 person, 1 cat, 237.9ms
Speed: 9.0ms preprocess, 237.9ms inference, 1.0ms postprocess per image at shape (1, 3, 640, 640)

0: 640x640 1 person, 1 cat, 235.9ms
Speed: 7.0ms preprocess, 235.9ms 

In [13]:
df = pd.read_csv('detection_log.csv')
average_confidence_per_class = calculate_average_confidence(df)


Average confidence per class:
 Label
0     0.585530
15    0.521495
16    0.424588
21    0.375217
28    0.327772
39    0.365895
40    0.442594
41    0.363268
57    0.347409
59    0.374774
61    0.525016
72    0.401563
Name: Confidence, dtype: float64
