Cargamos las librerías, los modelos con las clases que vamos a usar y definimos las variables globales que nos ayudarán con el conteo de las clases

In [5]:
import csv
import math

import cv2
from ultralytics import YOLO

yolo_classes = {
    0: "PERSONA",
    2: "COCHE",
    3: "MOTO",
    5: "GUAGUA"
}
model = YOLO('yolo11n.pt')
model_plates = YOLO('best.pt')

class_counts = {key: 0 for key in yolo_classes.values()}
track_count = set()


Definimos funciones que nos ayudarán a sacar los datos de las cajas de detección, en este caso las coordenadas de la caja y el _tracking ID_ de la clase detectada.

In [6]:
def get_box_coordinates(box, parent=None):
    if parent is not None:
        dx, dy, _, _ = get_box_coordinates(parent)
    else:
        dx, dy = 0, 0
    x1, y1, x2, y2 = box.xyxy[0]
    x1, y1, x2, y2 = int(x1) + dx, int(y1) + dy, int(x2) + dx, int(y2) + dy
    return x1, y1, x2, y2


def get_box_id(box):
    return None if box.id is None else int(box.id[0].tolist())

Creamos la clase `Detection`. Esta clase nos facilitará acceder a datos comunes de las cajas de detección y a guardar los datos en el archivo CSV

In [7]:
class Detection:
    def __init__(self, box):
        self.x1, self.y1, self.x2, self.y2 = get_box_coordinates(box)
        self.box_class = int(box.cls[0])
        self.class_name = yolo_classes[self.box_class]
        self.track_id = get_box_id(box)
        self.confidence = math.ceil((box.conf[0] * 100)) / 100
        self.has_license_plate = False
        self.m_confidence = None
        self.my2 = None
        self.mx2 = None
        self.mx1 = None
        self.my1 = None

    def add_license_plate(self, boxes):
        if len(boxes) < 1: return
        box = boxes[0]
        self.has_license_plate = True
        self.m_confidence = math.ceil((box.conf[0] * 100)) / 100
        self.mx1, self.my1, self.mx2, self.my2 = get_box_coordinates(box)
        self.mx1 += self.x1
        self.mx2 += self.x1
        self.my2 += self.y1
        self.my2 += self.y1

    def write_into_csv(self, csv_writer, frame_count):
        csv_writer.writerow(
            [frame_count, self.class_name, self.confidence, self.track_id, self.x1, self.y1, self.x2, self.y2,
             self.has_license_plate, self.m_confidence, self.mx1, self.my1, self.mx2, self.my2]
        )

    def is_person(self):
        return self.box_class == 0

    def get_cropped_frame(self, frame):
        return frame[self.y1:self.y2, self.x1:self.x2]



Creamos otra función que nos facilitará dibujar los rectángulos, sobre todo si corresponden a los de dentro de un rectángulo (como es el caso de las matrículas dentro de las cajas del coche), ya que calcula las coordenadas teniendo en cuenta las coordenadas del padre.

In [8]:
def draw_rectangles(frame, box, text, parent=None, box_color=(0, 255, 0), text_color=(255, 0, 155)):
    x1, y1, x2, y2 = get_box_coordinates(box, parent)

    box_id = get_box_id(box)
    if box_id is not None:
        text = f"{text} - {str(box_id)}"

    cv2.rectangle(frame, (x1, y1), (x2, y2), box_color, 2)
    cv2.putText(frame, text, [x1, y1], cv2.FONT_HERSHEY_SIMPLEX, 1, text_color, 2)


Creamos otras dos funciones que nos permitirán llevar la cuenta de las clases y mostrarlas en el video.

In [9]:
def update_class_count(detection, frame):
    if detection.track_id not in track_count:
        track_count.add(detection.track_id)
        class_counts[detection.class_name] += 1

    text = generate_count_text()

    x = 0
    y = 50
    font_scale = 0.8
    thickness = 2

    text_w, text_h = cv2.getTextSize(text, cv2.FONT_HERSHEY_SIMPLEX, font_scale, thickness)[0]

    cv2.rectangle(frame, (text_w, text_h + 10), (x, y), (180, 180, 180), cv2.FILLED)
    cv2.putText(frame, generate_count_text(), (x, y), cv2.FONT_HERSHEY_SIMPLEX, font_scale, (0, 0, 0), thickness)


def generate_count_text():
    found_classes = []
    for k, v in class_counts.items():
        if v == 1:
            found_classes.append(f"{v} {k}")
        elif v > 1:
            found_classes.append(f"{v} {k}S")

    return " - ".join(found_classes)

Creamos la función que realizará las detecciones, dibujará los rectángulos de la mismas y actualizará el conteo de clases. Además, para las detecciones que no son personas, se buscará y dibujará el rectángulo de la matrícula.

In [10]:
def track_objects(result, frame):
    detections = []
    for box in result.boxes:
        detection = Detection(box)
        update_class_count(detection, frame)
        if detection.is_person():
            draw_rectangles(frame, box, detection.class_name)
        else:
            draw_rectangles(frame, box, detection.class_name)
            result_license_plates = model_plates(detection.get_cropped_frame(frame), verbose=False)
            detection.add_license_plate(result_license_plates[0].boxes)
            for box2 in result_license_plates[0].boxes:
                draw_rectangles(frame, box2, "MATRICULA", parent=box, box_color=(255, 0, 0),
                                text_color=(0, 0, 255))
        detections.append(detection)
    return detections

Por último, creamos la función principal que abrirá el video e irá guardando tanto el video procesado con los resultados como el csv con los datos recolectados.

In [11]:
def process_video(video, export_video, export_csv, show=False):
    vid = cv2.VideoCapture(video)

    total_frames = vid.get(cv2.CAP_PROP_FRAME_COUNT)
    frame_count = 0

    fps = int(vid.get(cv2.CAP_PROP_FPS))
    frame_width = int(vid.get(cv2.CAP_PROP_FRAME_WIDTH))
    frame_height = int(vid.get(cv2.CAP_PROP_FRAME_HEIGHT))
    output_video = cv2.VideoWriter(
        export_video,
        cv2.VideoWriter_fourcc(*'mp4v'),
        fps,
        (frame_width, frame_height)
    )
    csv_file = open(export_csv, mode='w', newline='')
    csv_writer = csv.writer(csv_file)
    csv_writer.writerow(["fotograma", "tipo_objeto", "confianza", "identificador_tracking", "x1", "y1", "x2", "y2",
                         "tiene_matricula", "m_confianza", "mx1", "my1", "mx2", "my2"])

    while vid.isOpened():
        ret, frame = vid.read()
        frame_count += 1
        if ret:
            percentage = math.floor(frame_count / total_frames * 100)
            print("\r", f"{percentage}% procesado...", end="")
            result = model.track(frame, persist=True, classes=list(yolo_classes.keys()), verbose=False)
            detections = track_objects(result[0], frame)
            for detection in detections:
                detection.write_into_csv(csv_writer, frame_count)
            output_video.write(frame)
            if show: cv2.imshow('frame', frame)
        else:
            break
        if cv2.waitKey(20) == 27:
            break

    print("\r100% procesado!")
    print(f"\nVideo guardado en el archivo {export_video}")
    print(f"Archivo csv guardado en el archivo {export_csv}")
    output_video.release()
    csv_file.close()


Ejecutamos la función con el vídeo de ejemplo. El vídeo procesado se guardará en el archivo `video_final.mp4`, y el csv en `datos.csv`. Para mostrar el procesamiento en tiempo real se puede poner el parámetro `show` a `True`


In [12]:
#tanto el video de prueba como el final no se pueden subir a GitHub por su gran tamaño. El link al video final se puede encontrar en el readme.

process_video("video_prueba.mp4", "video_final.mp4", "datos.csv", show=False)

100% procesado!...

Video guardado en el archivo video_final.mp4
Archivo csv guardado en el archivo datos.csv
