In [210]:
from ultralytics import YOLO
import supervision as sv
import cv2
import numpy as np
from collections import defaultdict, deque
import os

In [211]:
class speed_analyics():
    """Estimate per-vehicle speed from a video feed.

    Detections inside the ``source`` quadrilateral are tracked, projected
    onto the ``target`` rectangle with a perspective transform, and their
    displacement along the target y-axis over the recent frames is turned
    into a speed label drawn on each frame.
    """

    # initialising parameters
    def __init__(self, 
                 source: np.ndarray, 
                 target: np.ndarray, model):
        """Store the detector and pre-compute the perspective transform.

        Args:
            source: Four (x, y) corners of the region of interest in image
                coordinates.
            target: Four (x, y) corners that the source corners map onto.
                Labels are printed as km/h, which presumes target units are
                metres -- confirm against the scene calibration.
            model: Detector called as ``model(frame)[0]``; the result is fed
                to ``sv.Detections.from_ultralytics``.
        """
        self.model = model
        # cv2.getPerspectiveTransform needs float32 point arrays.
        self.source = source.astype(np.float32)
        self.target = target.astype(np.float32)
        self.transform_matrix = cv2.getPerspectiveTransform(self.source, self.target)

    # function for transforming perspective
    def transform_prespective(self, 
                              source_corr: np.ndarray):
        """Project image-space points into the target plane.

        Args:
            source_corr: (x, y) points in any shape that reshapes to (-1, 2).

        Returns:
            ``float32`` array of shape (N, 2) holding the transformed points.
            An empty input yields an empty (0, 2) array.
        """
        points = np.asarray(source_corr, dtype=np.float32)

        if points.size == 0:
            # Frames without any tracked vehicle produce no anchors, and
            # cv2.perspectiveTransform does not accept an empty array.
            return np.empty((0, 2), dtype=np.float32)

        transformed = cv2.perspectiveTransform(points.reshape(-1, 1, 2),
                                               self.transform_matrix)

        return transformed.reshape(-1, 2)

    @staticmethod
    def _class_names(detected_classes, detections):
        """Return one class name per detection as a plain list.

        Prefers ``detections.data["class_name"]``. Falls back to
        ``detected_classes``, accepted either as a flat sequence of names or
        as ``list(detections.data.values())`` (names in the first entry).
        """
        data = getattr(detections, "data", None) or {}
        names = data.get("class_name")

        if names is None and detected_classes is not None and len(detected_classes) > 0:
            first = detected_classes[0]
            names = detected_classes if isinstance(first, str) else first

        return [] if names is None else list(names)

    # function for dynamic labels
    def detection_labels(self, 
                         detected_classes, 
                         vehicle_coordinates, 
                         tracking, 
                         detections, 
                         video_info):
        """Build exactly one label per tracked detection.

        Args:
            detected_classes: Class names of the detections (see
                ``_class_names`` for the accepted layouts).
            vehicle_coordinates: Mapping ``tracker_id -> sequence`` of recent
                target-plane y positions, oldest first.
            tracking: Class names for which a speed should be reported.
            detections: Object exposing ``tracker_id`` and, optionally,
                ``data["class_name"]``.
            video_info: Object exposing ``fps``.

        Returns:
            List of strings aligned with ``detections.tracker_id``.
            A detection gets ``"#<id> <speed> km/h"`` when its class is in
            ``tracking`` and at least half a second of positions is stored;
            otherwise it gets ``"#<id>"``. Returns ``[]`` when there are no
            tracker ids.
        """
        tracker_ids = detections.tracker_id

        if tracker_ids is None or len(tracker_ids) == 0:
            return []

        class_names = self._class_names(detected_classes, detections)
        labels = []

        for index, tracker_id in enumerate(tracker_ids):
            class_name = class_names[index] if index < len(class_names) else None
            positions = vehicle_coordinates[tracker_id]

            # Too little history (under half a second) or an untracked class:
            # show the id only, but still emit a label so the list stays
            # aligned with the detections.
            if class_name not in tracking or len(positions) < video_info.fps / 2:
                labels.append(f"#{tracker_id}")
                continue

            distance = abs(positions[-1] - positions[0])
            # NOTE(review): N samples span N - 1 frame intervals, so this
            # slightly over-estimates the elapsed time.
            elapsed = len(positions) / video_info.fps
            # 3.6 converts units-per-second to km/h when units are metres.
            speed = distance / elapsed * 3.6

            labels.append(f"#{tracker_id} {int(speed)} km/h")

        return labels

    # function for detecting and localizing objects
    def object_detection(self, 
                         frame, 
                         video_info, 
                         tracking, 
                         detection_zone, 
                         byte_track, 
                         vehicle_coordinates, 
                         bounding_box_annotator, 
                         label_annotator, 
                         new_w, new_h):
        """Detect, track and annotate the vehicles in a single frame.

        Args:
            frame: Image passed to the model and drawn on.
            video_info: Object exposing ``fps``.
            tracking: Class names to keep; other detections are dropped.
            detection_zone: Zone whose ``trigger(detections)`` returns a mask
                of detections inside the region of interest.
            byte_track: Tracker providing ``update_with_detections``.
            vehicle_coordinates: Mapping ``tracker_id -> sequence`` that is
                extended in place with the newest target-plane y position.
            bounding_box_annotator: Annotator drawing the boxes.
            label_annotator: Annotator drawing the text labels.
            new_w: Width of the returned frame.
            new_h: Height of the returned frame.

        Returns:
            The annotated frame resized to ``(new_w, new_h)``.
        """
        results = self.model(frame)[0]

        detections = sv.Detections.from_ultralytics(results)
        detections = detections[detection_zone.trigger(detections)]

        # Keep only the requested classes before tracking so that every
        # tracked object is one we want a speed for.
        class_names = detections.data.get("class_name")
        if class_names is not None:
            detections = detections[np.isin(class_names, tracking)]

        detections = byte_track.update_with_detections(detections=detections)

        vehicle_points = detections.get_anchors_coordinates(anchor=sv.Position.BOTTOM_CENTER)
        vehicle_points = self.transform_prespective(vehicle_points).astype(int)

        # Only the y coordinate in the target plane is stored per tracker.
        for tracker_id, [_, y] in zip(detections.tracker_id, vehicle_points):
            vehicle_coordinates[tracker_id].append(y)

        labels = self.detection_labels(detected_classes=detections.data.get("class_name", []),
                                       vehicle_coordinates=vehicle_coordinates,
                                       tracking=tracking,
                                       detections=detections,
                                       video_info=video_info)

        frame = bounding_box_annotator.annotate(scene=frame,
                                                detections=detections)
        frame = label_annotator.annotate(scene=frame,
                                         detections=detections,
                                         labels=labels)

        frame = cv2.resize(frame, (new_w, new_h))

        return frame

    def run_video(self, Video_path, tracking):
        """Play the video with speed annotations until it ends or 'q' is pressed.

        Args:
            Video_path: Path of the video file to read.
            tracking: Class names to track and label.
        """
        video_info = sv.VideoInfo.from_video_path(Video_path)

        byte_track = sv.ByteTrack(frame_rate=video_info.fps)

        thickness = sv.calculate_optimal_line_thickness(resolution_wh=video_info.resolution_wh)
        text_scale = sv.calculate_optimal_text_scale(resolution_wh=video_info.resolution_wh)

        label_annotator = sv.LabelAnnotator(text_scale=text_scale,
                                            text_thickness=thickness)
        bounding_box_annotator = sv.BoxAnnotator(thickness=thickness)

        detection_zone = sv.PolygonZone(polygon=self.source)

        # Keep at most one second of positions per tracker.
        vehicle_coordinates = defaultdict(lambda: deque(maxlen=video_info.fps))

        # Display at a quarter of the native size; never collapse to 0 px.
        w, h = video_info.resolution_wh[0], video_info.resolution_wh[1]
        new_w, new_h = max(1, int(w / 4)), max(1, int(h / 4))

        cap = cv2.VideoCapture(Video_path)  # reading the video

        try:
            # running the feed
            while True:
                ret, frame = cap.read()

                if not ret:
                    break

                frame = self.object_detection(frame,
                                              video_info,
                                              tracking,
                                              detection_zone,
                                              byte_track,
                                              vehicle_coordinates,
                                              bounding_box_annotator,
                                              label_annotator,
                                              new_w, new_h)

                cv2.imshow('img', frame)  # showing the feed

                # press 'q' to quit
                if cv2.waitKey(1) & 0xFF == ord('q'):
                    break
        finally:
            # Release the capture and the window even if a frame fails.
            cap.release()
            cv2.destroyAllWindows()

In [212]:
# Hard-coded absolute Windows paths to the input video and the YOLO weights.
Video_path = r'D:\speed_detection\data\vehicles.mp4'
model_path = r'D:\speed_detection\yolo11x.pt'

# Class names handed to run_video as `tracking`.
tracking = ['car', 'truck']

# Detector built from the weights file above.
model = YOLO(model_path)

# Quadrilateral passed as `source`: four (x, y) corners in image pixels.
# Some corners lie outside the frame (negative x).
source_corr = np.array([
    [1252, 787],
    [2298, 803],
    [5039, 2159],
    [-550, 2159],
])

# Size of the rectangle the quadrilateral is mapped onto.
# NOTE(review): presumably metres, since speeds are labelled km/h -- confirm.
target_w, target_h = 25, 250

# Rectangle passed as `target`, corners in the same order as source_corr.
target_corr = np.array([
    [0, 0],
    [target_w - 1, 0],
    [target_w - 1, target_h - 1],
    [0, target_h - 1],
])

In [213]:
# Build the analyser from the calibration corners and the detector, then
# play the annotated video (press 'q' in the window to stop).
analyics = speed_analyics(
    source=source_corr,
    target=target_corr,
    model=model,
)
analyics.run_video(
    Video_path=Video_path,
    tracking=tracking,
)


0: 384x640 4 cars, 1 truck, 52.4ms
Speed: 3.6ms preprocess, 52.4ms inference, 6.4ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 4 cars, 1 truck, 1 fire hydrant, 55.6ms
Speed: 6.9ms preprocess, 55.6ms inference, 6.9ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 4 cars, 2 trucks, 60.0ms
Speed: 0.0ms preprocess, 60.0ms inference, 0.0ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 4 cars, 1 truck, 55.6ms
Speed: 2.0ms preprocess, 55.6ms inference, 2.9ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 4 cars, 1 truck, 1 fire hydrant, 55.5ms
Speed: 7.0ms preprocess, 55.5ms inference, 0.0ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 4 cars, 1 truck, 56.9ms
Speed: 4.7ms preprocess, 56.9ms inference, 0.0ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 4 cars, 1 truck, 58.7ms
Speed: 3.3ms preprocess, 58.7ms inference, 0.5ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 4 cars, 1 truck, 55.5ms
