### Dependencies

In [None]:
import supervision as sv
from ultralytics import YOLO
import cv2
import numpy as np
from shapely.geometry import box, Polygon

### Inference Testing

#### Initializing the Framework

In [None]:
# Loading the trained model
model = YOLO('TRAINING/traffic_detection/weights/best.pt')  # define the trained model directory

# Defining the polygon for the Detection Zone: one [x, y] vertex per row, in the
# pixel coordinates of the frames it is drawn on.
# The dtype is pinned to int32 because cv2.polylines expects 32-bit integer
# points, while NumPy's default integer is 64-bit on most platforms.
polygon = np.array([[0, 432], [380, 161], [506, 184], [310, 508]], dtype=np.int32)
zone_polygon = Polygon(polygon)

# Initiating tracking with ByteTrack
tracker = sv.ByteTrack(
    track_activation_threshold=0.15,
    lost_track_buffer=50,
    minimum_matching_threshold=0.8,
    frame_rate=8,
    minimum_consecutive_frames=2
)

# Defining annotators
box_annotator = sv.BoundingBoxAnnotator(thickness=2)
label_annotator = sv.LabelAnnotator(text_thickness=1, text_scale=0.5)

# Storing vehicle states, keyed by tracker id. Each value is a dict with the
# keys "position", "stopped_frames" and "is_stopped".
vehicle_states = {}
threshold = 5               # Centroid displacement between two frames below which a vehicle counts as stopped
area_threshold = 0.05       # Fraction of a bounding box that must overlap the detection zone
max_rows = 15               # Maximum number of rows drawn in the on-frame delay table

# Reading the frame rate that converts stopped-frame counts into seconds.
# NOTE(review): the FPS is probed from 'inference/test1.mp4', while the last cell
# processes 'inference/inference6.mp4' - confirm both clips share a frame rate.
cap = cv2.VideoCapture('inference/test1.mp4')   # define the sample video directory
fps = cap.get(cv2.CAP_PROP_FPS)
cap.release()  # the capture is only needed for the FPS probe; free the file handle

# A non-positive FPS (reported when the file cannot be opened or decoded) would
# only surface later as a division by zero while drawing delay times.
if fps <= 0:
    raise RuntimeError("Could not read a valid FPS from 'inference/test1.mp4'; check the video path.")

#### Function to identify vehicle motion status

In [None]:

def is_vehicle_stopped(last_pos, current_pos, threshold):
    """Return True when the move from ``last_pos`` to ``current_pos`` is shorter than ``threshold``.

    Both positions are coordinate sequences of equal length; the distance is
    the Euclidean norm of their difference. A displacement exactly equal to
    ``threshold`` counts as moving (the comparison is strict).
    """
    previous = np.array(last_pos)
    latest = np.array(current_pos)
    displacement = latest - previous
    travelled = np.linalg.norm(displacement)
    return travelled < threshold

#### Function to identify detections inside the zone

In [None]:
def is_detection_in_zone(bbox, zone_polygon, area_threshold):
    """Check if the share of a bounding box lying inside the zone exceeds the threshold.

    Args:
        bbox: Four values unpacked as ``(x_min, y_min, x_max, y_max)``.
        zone_polygon: Shapely geometry describing the detection zone.
        area_threshold: Minimum ratio ``intersection_area / bbox_area`` that
            must be strictly exceeded for the box to count as inside.

    Returns:
        True when the overlap ratio is greater than ``area_threshold``.
        False for a degenerate box with no area, which cannot overlap anything.
    """
    x_min, y_min, x_max, y_max = bbox
    bbox_polygon = box(x_min, y_min, x_max, y_max)                       # Create bounding box as Shapely box
    bbox_area = bbox_polygon.area                                        # Calculate bounding box area

    # A zero-width or zero-height box has no area; dividing by it would raise
    # ZeroDivisionError, so report it as outside the zone instead.
    if bbox_area <= 0:
        return False

    intersection_area = bbox_polygon.intersection(zone_polygon).area     # Calculate intersection area
    return (intersection_area / bbox_area) > area_threshold

#### Callback function to execute "Vehicle Count" and "Vehicle Stopped Delay Time" extraction

In [None]:

def _update_motion_state(tracker_id, centroid):
    """Update the stopped/moving bookkeeping of one tracked vehicle in ``vehicle_states``."""
    vehicle_info = vehicle_states.get(tracker_id)
    if vehicle_info is None:
        # First sighting: there is no earlier position to compare against.
        # Comparing the centroid with itself would always report "stopped",
        # so only record the starting state here.
        vehicle_states[tracker_id] = {"position": centroid, "stopped_frames": 0, "is_stopped": False}
        return

    if is_vehicle_stopped(vehicle_info["position"], centroid, threshold):
        vehicle_info["is_stopped"] = True
        vehicle_info["stopped_frames"] += 1
    else:
        vehicle_info["is_stopped"] = False
        vehicle_info["stopped_frames"] = 0

    # The reference position follows the vehicle frame by frame.
    vehicle_info["position"] = centroid


def _draw_zone_and_count(frame, vehicle_count):
    """Draw the detection zone outline and the vehicle count at its centre (in place)."""
    # cv2.polylines expects 32-bit integer points; NumPy's default integer
    # dtype is 64-bit on most platforms, so cast explicitly.
    zone_points = np.asarray(polygon, dtype=np.int32)
    cv2.polylines(frame, [zone_points], isClosed=True, color=(0, 255, 0), thickness=2)

    # Plain Python ints keep the OpenCV point tuples free of NumPy scalar types.
    center_x, center_y = (int(value) for value in np.mean(polygon, axis=0))
    count_text = f"Count: {vehicle_count}"
    (text_width, text_height), _baseline = cv2.getTextSize(count_text, cv2.FONT_HERSHEY_SIMPLEX, 0.8, 2)

    # Filled background box so the count stays readable on top of the video.
    cv2.rectangle(frame,
                  (center_x - text_width // 2 - 5, center_y - text_height // 2 - 5),
                  (center_x + text_width // 2 + 5, center_y + text_height // 2 + 5),
                  (0, 255, 0), -1)
    cv2.putText(frame, count_text, (center_x - text_width // 2, center_y + text_height // 2),
                cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 0, 0), 2, cv2.LINE_AA)


def _draw_delay_table(frame, active_ids):
    """Draw one line per in-zone vehicle with its stopped time or "Moving" (in place)."""
    rows = []
    for tracker_id, vehicle_info in vehicle_states.items():
        if tracker_id not in active_ids:
            continue
        if len(rows) >= max_rows:
            break  # the table is capped at max_rows entries

        if vehicle_info["is_stopped"]:
            delay_time = vehicle_info["stopped_frames"] / fps  # frames -> seconds
            rows.append(f"Vehicle {tracker_id}: {delay_time:.2f}s")
        else:
            rows.append(f"Vehicle {tracker_id}: Moving")

    for row_index, text in enumerate(rows):
        cv2.putText(frame, text, (frame.shape[1] - 200, 50 + row_index * 30),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 0, 0), 1, cv2.LINE_AA)


def callback(frame: np.ndarray, _: int) -> np.ndarray:
    """Detect, track and annotate the vehicles inside the detection zone for one frame.

    Args:
        frame: Frame to run the model on and to draw the annotations over.
        _: Second positional argument of the callback (declared as int); unused.

    Returns:
        A copy of ``frame`` annotated with the in-zone boxes and labels, the zone
        outline, the in-zone vehicle count and the per-vehicle delay table.

    Side effects:
        Updates the module-level ``vehicle_states`` dict and the tracker state.
    """
    results = model(frame)[0]
    detections = sv.Detections.from_ultralytics(results)
    detections = tracker.update_with_detections(detections)

    in_zone = np.zeros(len(detections), dtype=bool)  # one flag per tracked detection
    zone_labels = []                                 # labels in the same order as the flagged rows
    active_ids = set()                               # tracker ids currently inside the Detection Zone

    for index in range(len(detections)):
        xyxy = detections.xyxy[index]

        # Check if detection is within the zone based on area threshold
        if not is_detection_in_zone(xyxy, zone_polygon, area_threshold):
            continue

        tracker_id = int(detections.tracker_id[index])
        class_name = results.names[int(detections.class_id[index])]
        centroid = ((xyxy[0] + xyxy[2]) / 2, (xyxy[1] + xyxy[3]) / 2)   # Bounding Box Centroid

        in_zone[index] = True
        active_ids.add(tracker_id)
        _update_motion_state(tracker_id, centroid)
        zone_labels.append(f"#{tracker_id} {class_name}")

    vehicle_count = int(in_zone.sum())
    annotated = frame.copy()

    # Only annotate if at least one detection lies inside the zone
    if vehicle_count:
        # Selecting rows with the boolean mask keeps xyxy, class_id, tracker_id
        # and confidence aligned with each other and with zone_labels. Rebuilding
        # the fields from an unordered set of ids would not guarantee that.
        zone_detections = detections[in_zone]
        annotated = box_annotator.annotate(scene=annotated, detections=zone_detections)
        annotated = label_annotator.annotate(scene=annotated, detections=zone_detections, labels=zone_labels)

    _draw_zone_and_count(annotated, vehicle_count)
    _draw_delay_table(annotated, active_ids)

    return annotated


#### Processing the output result

In [None]:
# Hand the source video and the per-frame callback to supervision and write the result.
# NOTE(review): delay times are computed with the FPS read from 'inference/test1.mp4'
# in the setup cell, not from this source video - confirm both share a frame rate.
sv.process_video(
    source_path='inference/inference6.mp4',
    target_path='final/inference6.mp4',     # define the path location to save the output video
    callback=callback,
)