In [None]:
# %pip install ultralytics

In [10]:
import cv2
from ultralytics import YOLO, solutions
from google.colab.patches import cv2_imshow
from time import time

import numpy as np

from ultralytics.solutions.solutions import BaseSolution
from ultralytics.utils.plotting import Annotator, colors

model = YOLO("../models/yolov8s.pt")
names = model.model.names

In [18]:
class MySpeedEstimator(solutions.SpeedEstimator):

    def estimate_speed_2(self, im0):
        self.annotator = Annotator(im0, line_width=self.line_width)  # Initialize annotator
        self.extract_tracks(im0)  # Extract tracks

        self.annotator.draw_region(
            reg_pts=self.region, color=(104, 0, 123), thickness=self.line_width * 2
        )  # Draw region

        for box, track_id, cls in zip(self.boxes, self.track_ids, self.clss):
            self.store_tracking_history(track_id, box)  # Store track history

            # Check if track_id is already in self.trk_pp or trk_pt initialize if not
            if track_id not in self.trk_pt:
                self.trk_pt[track_id] = 0
            if track_id not in self.trk_pp:
                self.trk_pp[track_id] = self.track_line[-1]

            speed_label = f"{int(self.spd[track_id])} km/h" if track_id in self.spd else self.names[int(cls)]
            self.annotator.box_label(box, label=speed_label, color=colors(track_id, True))  # Draw bounding box

            # Draw tracks of objects
            self.annotator.draw_centroid_and_tracks(
                self.track_line, color=colors(int(track_id), True), track_thickness=self.line_width
            )

            # Calculate object speed and direction based on region intersection
            if self.LineString([self.trk_pp[track_id], self.track_line[-1]]).intersects(self.r_s):
                direction = "known"
            else:
                direction = "unknown"

            # Perform speed calculation and tracking updates if direction is valid
            if direction == "known":
                self.trkd_ids.append(track_id)
                time_difference = time() - self.trk_pt[track_id]
                if time_difference > 0:
                    self.spd[track_id] = np.abs(self.track_line[-1][1] - self.trk_pp[track_id][1]) / time_difference

            self.trk_pt[track_id] = time()
            self.trk_pp[track_id] = self.track_line[-1]

        self.display_output(im0)  # display output with base class function

        return im0  # return output image for more usage

In [19]:
# cap = cv2.VideoCapture("../media/vehicles.mp4")
cap = cv2.VideoCapture("../media/vehicles.mp4")
assert cap.isOpened(), "Error reading video file"
w, h, fps = (int(cap.get(x)) for x in (cv2.CAP_PROP_FRAME_WIDTH, cv2.CAP_PROP_FRAME_HEIGHT, cv2.CAP_PROP_FPS))
video_writer = cv2.VideoWriter(
    "../media/vehicles_out.avi",
    cv2.VideoWriter_fourcc(*"mp4v"),
    fps, (w, h)
)

# Define speed region points
speed_region = [[1252, 787], [2298, 803], [5039, 2159], [-550, 2159]]
# speed_region = [[24, 360], [740, 360], [400, 60], [210, 60]]
# speed_obj = solutions.SpeedEstimator(show=True, region=speed_region, model='../models/yolo11n.pt')
speed_obj = MySpeedEstimator(show=True, region=speed_region, model='../models/yolo11n.pt')

i = 0
while cap.isOpened():
    success, frame = cap.read()
    if not success:        
        break
    # print(f"Frame shape: {frame.shape}")
    # frame = cv2.resize(frame, (960, 540))
    # results = model.track(frame, persist=True, show=False)


    out = speed_obj.estimate_speed_2(frame)
    # print(speed_obj.spd)

    video_writer.write(frame)
    # cv2_imshow(out)
    if cv2.waitKey(1) & 0xFF == ord("q"):
        break
    # if i > 50:
    #     break
    # i += 1

cap.release()
video_writer.release()
cv2.destroyAllWindows()

Ultralytics Solutions: ✅ {'region': [[1252, 787], [2298, 803], [5039, 2159], [-550, 2159]], 'show_in': True, 'show_out': True, 'colormap': None, 'up_angle': 145.0, 'down_angle': 90, 'kpts': [6, 8, 10], 'analytics_type': 'line', 'json_file': None, 'records': 5, 'show': True, 'model': '../models/yolo11n.pt'}


0: 384x640 3 cars, 1 truck, 14.6ms
Speed: 2.6ms preprocess, 14.6ms inference, 1.7ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 3 cars, 1 truck, 14.1ms
Speed: 2.6ms preprocess, 14.1ms inference, 1.8ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 2 cars, 1 truck, 14.3ms
Speed: 2.6ms preprocess, 14.3ms inference, 1.8ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 3 cars, 1 truck, 12.2ms
Speed: 2.2ms preprocess, 12.2ms inference, 1.5ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 3 cars, 1 truck, 12.4ms
Speed: 2.2ms preprocess, 12.4ms inference, 1.5ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 3 cars, 1 truc

In [1]:
import cv2
from google.colab.patches import cv2_imshow
import pandas as pd
import numpy as np
from ultralytics import YOLO
from collections import defaultdict
from scipy.spatial import distance
import matplotlib.pyplot as plt
import seaborn as sns
import copy

In [4]:
class VehicleTracker:
    def __init__(self, yolo_model_path, deepsort_config, video_path, output_path):
        self.yolo_model = YOLO(yolo_model_path)

        self.cap = cv2.VideoCapture(video_path)
        self.output_path = output_path
        self.fps = self.cap.get(cv2.CAP_PROP_FPS)
        self.frame_width = int(self.cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        self.frame_height = int(self.cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        self.out = cv2.VideoWriter(
            output_path,
            cv2.VideoWriter_fourcc(*"mp4v"),
            self.fps,
            (640, 368),
        )

        self.frame_count = 0
        self.vehicle_data = defaultdict(lambda: {"frames": 0, "track": [], "reids": 0, "last_seen": 0, "type" : "" , "frame_init": None, "frame_end": None, "time_init": None, "time_end": None })
        self.trajectories = {}
        self.vehicle_counts = defaultdict(int)
        self.density_per_frame = []
        self.heat_map = np.zeros((self.frame_height, self.frame_width), dtype=np.float32)
        self.last_frame_tracks = {}

    def process_frame_yolo(self, frame):
        results = self.yolo_model.track(frame, persist=True, verbose=False)
        current_frame_tracks = {}

        for result in results:
            if result is None or result.boxes is None or result.boxes.id is None:
                continue
            boxes = result.boxes.xyxy.cpu().numpy()
            track_ids = result.boxes.id.cpu().numpy()
            class_ids = result.boxes.cls.cpu().numpy()
            for box, track_id, class_id in zip(boxes, track_ids, class_ids):
                class_name = results[0].names[int(class_id)]

                reid = 0
                if track_id in self.last_frame_tracks and track_id != self.last_frame_tracks[track_id]:
                    reid = 1

                if track_id not in self.vehicle_data:
                    self.vehicle_data[track_id] = {"frames": 1, "track": [], "reids": 0, "last_seen": 0, "type" : class_name, "frame_init": self.frame_count, "frame_end": None, "time_init": self.frame_count/self.fps, "time_end": None}

                self.vehicle_data[track_id]["frames"] += 1
                if track_id not in self.vehicle_data:
                    self.vehicle_data[track_id] = {"frames": 1, "track": [], "reids": 0, "last_seen": 0, "type" : class_name}

                self.vehicle_data[track_id]["last_seen"] = self.frame_count
                self.vehicle_data[track_id]["type"] = class_name
                self.vehicle_data[track_id]["reids"] += reid
                self.vehicle_data[track_id]["frame_end"] = self.frame_count
                self.vehicle_data[track_id]["time_end"] = self.frame_count/self.fps

                x1, y1, x2, y2 = map(int, box)
                cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 255, 0), 2)
                cv2.putText(frame, f"{track_id}:{class_name}", (x1, y1 - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 0), 2)

                center_x = (x1 + x2) // 2
                center_y = (y1 + y2) // 2
                self.vehicle_data[track_id]["track"].append((center_x, center_y))
                self.trajectories[track_id] = self.trajectories.get(track_id, []) + [(center_x, center_y)] # Agrega la trayectoria a los registros
                current_frame_tracks[track_id] = track_id

        self.last_frame_tracks = current_frame_tracks # Actualizar los tracks de este frame

        # Estimación de densidad de tráfico
        self.density_per_frame.append(len(results))

        # Mapa de calor de trayectorias
        for track_id, data in self.trajectories.items():
            try:
                for x,y in data:
                    self.heat_map[y,x] += 1 # Incrementa en 1 la intensidad del pixel
            except:
                pass
        return frame

    def run(self):
        while True:
            ret, frame = self.cap.read()
            if not ret:
                break

            frame = cv2.resize(frame, (640, 368))

            self.frame_count += 1

            frame = self.process_frame_yolo(frame)

            counts = {}
            for track_id, data in self.vehicle_data.items():
                class_name = data["type"]
                if class_name in counts:
                    counts[class_name] += 1
                else:
                    counts[class_name] = 1

            # Dibujar conteo en el frame
            text_x, text_y = 10, 30  # Posición inicial del texto
            for class_name, count in counts.items():
                text = f"{class_name}: {count}"
                cv2.putText(frame, text, (text_x, text_y), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 0, 255), 2)
                text_y += 30

            # self.out.write(frame)
            cv2_imshow(frame)
            # cv2.imshow("frame", frame)
            # plt.imshow(frame)
            # plt.axis("off")
            # plt.pause(0.01)

        self.cap.release()
        self.out.release()
        cv2.destroyAllWindows()

    def get_vehicle_data(self):
        return self.vehicle_data

    def get_density_per_frame(self):
        return self.density_per_frame

    def get_heat_map(self):
        return self.heat_map

    def get_trajectories(self):
        return self.trajectories

    def get_frame_count(self):
        return self.frame_count

# Configuración del modelo YOLO
tracker1 = VehicleTracker(
    "../models/yolov8s.pt",
    {},
    "../media/vehicles.mp4",
    "../media/output_yolo1.mp4"

)
tracker1.run()

KeyboardInterrupt: 