**Hard Braking Detection from Drone Videos**

This project detects hard braking events at high-speed signalized intersections using drone footage. The system employs a customized YOLO model for vehicle detection, ByteTrack for tracking, and a homography transformation to convert pixel-based movements into real-world speed estimates.

How It Works:

1. **Vehicle Detection & Tracking**:

   - YOLO identifies vehicles, while ByteTrack assigns unique IDs for consistent tracking across frames.

3. **Speed Estimation**:

   - A perspective transform converts pixel distances to real-world speeds (mph) using homography scaling.

5. **Turning Movement Associations**:

   - Vehicles are associated with their respective turning movements by detecting when their buffered bounding boxes cross predefined lane lines.

7. **Hard Braking Identification**:


   - A 20-frame Sliding Window Technique is used to track speed drops.

   - A 70%+ speed reduction within the window, from speeds over 15 mph, flags potential hard braking.

   - The next 100 frames are analyzed to confirm no sudden speed recovery, eliminating false positives.
  
   - The last 20 frames before braking ensure the vehicle was previously traveling at a stable speed.

8. **Output**:


   - A processed video visualizing detected vehicles, speeds, and lane tags.
     
   - An Excel file listing hard braking incidents with frame ranges, initial/final speeds, and percentage drop analysis.

This methodology enables data-driven intersection safety analysis, helping identify areas prone to abrupt braking events and potential collision risks.

In [None]:
# Install required libraries
!pip install -q opencv-python numpy pandas tqdm ultralytics supervision xlsxwriter

# Import essential libraries
import os
import cv2
import numpy as np
import pandas as pd
import supervision as sv
from tqdm import tqdm
from ultralytics import YOLO
from supervision.assets import VideoAssets, download_assets
from collections import defaultdict, deque

In [None]:

# File Paths: Define paths for input video, output video, and trajectory data storage
EXCEL_OUTPUT_PATH = "/Users/gideonowusu/Downloads/PhD 2028/Projects/UAV-TrafficVision/Trajectory Data.xlsx"
SOURCE_VIDEO_PATH = '/Users/gideonowusu/Downloads/PhD 2028/Projects/UAV-TrafficVision/Vehicle Speed Estimation from Drone Footage/High-Speed Intersection Video Sample.mp4'
TARGET_VIDEO_PATH = '/Users/gideonowusu/Downloads/PhD 2028/Projects/UAV-TrafficVision/Vehicle Speed Estimation from Drone Footage/High-Speed Intersection Video Sample Output.mp4'
FINAL_HARDBRAKING_DATA = "/content/gdrive/MyDrive/Hard Braking/Trial Hard Braking Data.xlsx"
# Real-World Measurements: Convert pixel values to meters using a known reference
REAL_WORLD_WIDTH_M = 18  
REAL_WORLD_HEIGHT_M = 120  
PIXEL_WIDTH_PX = 25        
PIXEL_HEIGHT_PX = 250      

# Compute the scaling ratios to convert pixel measurements to meters
width_ratio = REAL_WORLD_WIDTH_M / PIXEL_WIDTH_PX
height_ratio = REAL_WORLD_HEIGHT_M / PIXEL_HEIGHT_PX

# Perspective Transformation: Define the polygon for transformation (original → top-down view)
initial_polygon = np.array([[270, 350], [460, 320], [1900, 660], [1720, 850]])  
final_transformed = np.array([[0, 0], [24, 0], [24, 249], [0, 249]])  

# Function to create the smallest buffered bounding box
def create_smallest_buffered_box(bbox, num_steps):
    x1, y1, x2, y2 = bbox
    center_x = (x1 + x2) / 2
    center_y = (y1 + y2) / 2
    step_x = (center_x - x1) / num_steps
    step_y = (center_y - y1) / num_steps

    buffered_x1 = x1 + step_x
    buffered_y1 = y1 + step_y
    buffered_x2 = x2 - step_x
    buffered_y2 = y2 - step_y

    return (buffered_x1, buffered_y1, buffered_x2, buffered_y2)

# Function to check if line segments (p1, p2) and (q1, q2) intersect
def lines_intersect(p1, p2, q1, q2):
    def ccw(A, B, C):
        return (C[1] - A[1]) * (B[0] - A[0]) > (B[1] - A[1]) * (C[0] - A[0])

    return ccw(p1, q1, q2) != ccw(p2, q1, q2) and ccw(p1, p2, q1) != ccw(p1, p2, q2)

# Function to check if a line intersects a bounding box
def line_intersects_bbox(p1, p2, bbox):
    x1, y1, x2, y2 = bbox
    rect = [(x1, y1), (x2, y1), (x2, y2), (x1, y2)]

    for i in range(4):
        if lines_intersect(p1, p2, rect[i], rect[(i + 1) % 4]):
            return True

    return False

# Compute the transformation matrix to correct the perspective
def get_perspective_transform(source, target):
    """Computes the transformation matrix from source to target perspective."""
    return cv2.getPerspectiveTransform(source.astype(np.float32), target.astype(np.float32))

# Function to apply the transformation to points
def warp_points(points, transform_matrix):
    """Applies a perspective warp transformation to a set of 2D points."""
    if not np.any(points):  
        return points
    float_points = np.expand_dims(points.astype(np.float32), axis=1)  
    warped = cv2.perspectiveTransform(float_points, transform_matrix)  
    return np.squeeze(warped, axis=1)  

# Load customized YOLO Model for Drone Vehicle Detection (Use model of your choice)
model = YOLO("/Users/gideonowusu/Downloads/PhD 2028/Projects/UAV-TrafficVision/Neuves.pt")  

# Get video metadata (frame rate, resolution, etc.)
video_info = sv.VideoInfo.from_video_path(video_path=SOURCE_VIDEO_PATH)

# Initialize ByteTrack for Object Tracking
byte_track = sv.ByteTrack(
    frame_rate=video_info.fps,  
    track_activation_threshold=0.25,  
    minimum_matching_threshold=0.8,  
    lost_track_buffer=30  
)

# Annotators for Drawing on Video Frames
bounding_box_annotator = sv.BoundingBoxAnnotator(thickness=2)  
label_annotator = sv.LabelAnnotator(text_scale=0.5, text_thickness=1, text_position=sv.Position.TOP_CENTER)  

# Define a Region of Interest (ROI) for Tracking
polygon_zone = sv.PolygonZone(polygon=initial_polygon)  

# Dictionary to store Needed Info
coordinates = defaultdict(lambda: deque(maxlen=video_info.fps))  
trajectory_data = []  
object_tags = {}
vehicles_intersected_lines = set()

# Compute the transformation matrix once for efficiency
perspective_matrix = get_perspective_transform(initial_polygon, final_transformed)

# Frame Processing Function (Called for Each Video Frame)
def callback(frame: np.ndarray, index: int) -> np.ndarray:
    """Processes each video frame: detects vehicles, tracks them, and estimates speed."""

    global trajectory_data  

    # Run YOLO detection on the frame
    results = model(frame, verbose=False)[0]
    detections = sv.Detections.from_ultralytics(results)

    # Separate detected utility poles (class_id = 1) from vehicles 
    pole_detections = detections[detections.class_id == 1]
    non_pole_detections = detections[detections.class_id != 1]

    # Only track vehicles inside the defined polygon (ignoring background detections)
    non_pole_detections = non_pole_detections[polygon_zone.trigger(non_pole_detections)]
    final_detections = byte_track.update_with_detections(detections=non_pole_detections)

    # Draw Turning Movement Lane Lines
    for line in initial_lines:
            frame = cv2.line(frame, line["start"], line["end"], (255, 0, 0), 2)

    # Compute vehicle positions
    for tracker_id, bbox in zip(final_detections.tracker_id, final_detections.xyxy):

        buffered_bbox = create_smallest_buffered_box(bbox, num_steps=2)

        if tracker_id not in object_tags and tracker_id is not None:
            for line in initial_lines:
                if line_intersects_bbox(line["start"], line["end"], buffered_bbox):
                            # Associate the vehicle with the first line it intersects
                    object_tags[tracker_id] = line["tag"]
                    vehicles_intersected_lines.add(tracker_id)
                    break

        if tracker_id in object_tags:
            vehicle_data.append({
                "Frame": index,
                "Center_X": (bbox[0] + bbox[2]) / 2,
                "Center_Y": (bbox[1] + bbox[3]) / 2,
                "Buffered_X1": buffered_bbox[0],
                "Buffered_Y1": buffered_bbox[1],
                "Buffered_X2": buffered_bbox[2],
                "Buffered_Y2": buffered_bbox[3],
                "Original_X1": bbox[0],
                "Original_Y1": bbox[1],
                "Original_X2": bbox[2],
                "Original_Y2": bbox[3],
                "Object ID": tracker_id,
                # "Class ID": class_id,
                "Line Tag": object_tags[tracker_id]
            })
            
    # Get the center coordinates of detected objects and apply perspective transformation
    points = final_detections.get_anchors_coordinates(anchor=sv.Position.CENTER)
    points_transformed = warp_points(points, perspective_matrix).astype(int)  

    # Speed Estimation
    labels = []
    for tracker_id, (x, y) in zip(final_detections.tracker_id, points_transformed):
        coordinates[tracker_id].append((x, y))  

        if len(coordinates[tracker_id]) < video_info.fps / 2:
            speed_mph = 0  
        else:
            (x1, y1) = coordinates[tracker_id][0]  
            (x2, y2) = coordinates[tracker_id][-1]  

            # Compute Euclidean distance (total displacement in pixels)
            distance_pixels = np.sqrt((x2 - x1) ** 2 + (y2 - y1) ** 2)

            # Convert pixel distance to real-world meters
            distance_meters = distance_pixels * np.mean([width_ratio, height_ratio])  

            # Compute speed (meters per second → mph)
            time = len(coordinates[tracker_id]) / video_info.fps  
            speed_m_per_s = distance_meters / time  
            speed_mph = speed_m_per_s * 2.23694  

        # Add label with tracking ID and speed
        labels.append(f"#{tracker_id} {int(speed_mph)} mph")

        # Store tracking data for output
        trajectory_data.append({
            "Frame": index,
            "Tracker ID": tracker_id,
            "Center X": x,
            "Center Y": y,
            "Speed (mph)": speed_mph,
            "Line Tag": object_tags.get(tracker_id, "No Tag")
        })

    # Draw ROI polygon on the frame
    cv2.polylines(frame, [initial_polygon.astype(np.int32)], isClosed=True, color=(0, 0, 255), thickness=2)

    # Annotate the video frame with bounding boxes and speed labels
    annotated_frame = frame.copy()
    annotated_frame = bounding_box_annotator.annotate(scene=annotated_frame, detections=final_detections)
    annotated_frame = label_annotator.annotate(scene=annotated_frame, detections=final_detections, labels=labels)

    return annotated_frame  

# Process the video and save output
sv.process_video(source_path=SOURCE_VIDEO_PATH, target_path=TARGET_VIDEO_PATH, callback=callback)

# Save speed data to an Excel file
trajectory_df = pd.DataFrame(trajectory_data)
trajectory_df.to_excel(EXCEL_OUTPUT_PATH, index=False)


In [None]:
def detect_hard_braking(input_file, output_file):
    """
    Detects hard braking incidents from an Excel file and saves the results in a new Excel file,
    including the associated Line Tag.
    """
    # Load the dataset
    df = pd.read_excel(input_file)

    hard_braking_events = []

    # Get unique vehicle IDs
    unique_vehicles = df["Tracker ID"].unique()

    for vehicle_id in unique_vehicles:
        vehicle_data = df[df["Tracker ID"] == vehicle_id].sort_values(by="Frame").reset_index(drop=True)

        for i in range(len(vehicle_data) - 19):  # Ensuring a full 20-frame window
            # Define the sliding window
            window = vehicle_data.iloc[i : i + 20]

            # Get initial and final speed
            initial_speed = window.iloc[0]["Speed (mph)"]
            final_speed = window.iloc[-1]["Speed (mph)"]

            # Get the associated Line Tag (choosing the most frequent tag in the window)
            line_tag = window["Line Tag"].mode().iloc[0] if not window["Line Tag"].mode().empty else "Unknown"

            # Compute percentage drop in speed
            if initial_speed > 0:  
                percentage_drop = ((initial_speed - final_speed) / initial_speed) * 100

                if percentage_drop > 70 and initial_speed > 15:
                    # Check the next 100 frames for speed fluctuations
                    post_window_data = vehicle_data.iloc[i + 20 : i + 120] if i + 120 < len(vehicle_data) else vehicle_data.iloc[i + 20 :]
                    if not post_window_data.empty:
                        max_post_speed = post_window_data["Speed (mph)"].max()

                        if max_post_speed <= final_speed + 5:  # Allow slight fluctuation
                            # Check speed trend in previous 20 frames
                            pre_window_data = vehicle_data.iloc[max(i - 20, 0) : i]

                            if not pre_window_data.empty:
                                min_pre_speed = pre_window_data["Speed (mph)"].min()
                                max_pre_speed = pre_window_data["Speed (mph)"].max()

                                if min_pre_speed >= initial_speed - 5 and max_pre_speed <= initial_speed + 5:
                                    # Log hard braking event
                                    hard_braking_events.append({
                                        "Tracker ID": vehicle_id,
                                        "Frame Start": window.iloc[0]["Frame"],
                                        "Frame End": window.iloc[-1]["Frame"],
                                        "Initial Speed (mph)": initial_speed,
                                        "Final Speed (mph)": final_speed,
                                        "Percentage Drop": percentage_drop,
                                        "Max Speed in Next 100 Frames": max_post_speed,
                                        "Min Speed in Previous 20 Frames": min_pre_speed,
                                        "Max Speed in Previous 20 Frames": max_pre_speed,
                                        "Line Tag": line_tag  
                                    })

    # Convert results into a DataFrame
    hard_braking_df = pd.DataFrame(hard_braking_events)

    # Save results to an Excel file
    hard_braking_df.to_excel(output_file, index=False)

    print(f"Hard braking incidents saved to {output_file}")

detect_hard_braking(EXCEL_OUTPUT_PATH, FINAL_HARDBRAKING_DATA)