# Tracking de objetos con YoloV8 y Bytetrack

#Paquetes necesarios

In [1]:
# !pip install ultralytics==8.0.84
# !pip install Cython
# !pip install numpy
# !pip install lap


In [2]:
import os
# esto es para evitar un error en Windows: OMP: Error #15: Initializing libiomp5md.dll, but found libiomp5md.dll already initialized.
os.environ["KMP_DUPLICATE_LIB_OK"]="TRUE"
import cv2
import numpy as np
from ultralytics.nn.autobackend import AutoBackend
from ultralytics.yolo.utils.plotting import Annotator, colors
import torch
from bytetrack.byte_tracker import BYTETracker
from ultralytics.yolo.data.dataloaders.stream_loaders import LoadImages
from ultralytics.yolo.utils.ops import non_max_suppression, scale_boxes
import time
import cv2


Definimos el modelos de Yolo

In [3]:
conf_thres = 0.25
iou_thres = 0.45
classes = [0, 2, 3, 5, 7, 1]
agnostic_nms = False
max_det = 1000
line_thickness = 2
half = False
imgsz = (1280, 704)
vid_stride = 1

In [4]:
save_vid = True
video_file = "../../stream_bitrate2.mp4"

Selección de dispositivo a usar en el proceso

In [5]:
# Verificar si CUDA (GPU) está disponible y luego seleccionar el dispositivo
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print("Usando dispositivo:", device)

Usando dispositivo: cuda


Inicialización del modelo

In [6]:
model = AutoBackend("yolov8n.pt").to(device)
model.warmup()
stride, names, pt = model.stride, model.names, model.pt


YOLOv8n summary (fused): 168 layers, 3151904 parameters, 0 gradients, 8.7 GFLOPs


Inicialización del tracker

In [7]:
bytetracker = BYTETracker(
    track_thresh=0.6, match_thresh=0.8, track_buffer=120, frame_rate=30
)
tracker = bytetracker

Carga de imágenes del video para su procesamiento

In [8]:
dataset = LoadImages(
    video_file,
    imgsz=imgsz,
    stride=stride,
    auto=pt,
    transforms=None,
    vid_stride=vid_stride,
)


In [9]:
#zonas
zona_personas = (805, 483, 1101, 618)
zona_personas2 = (2, 336, 289, 435)
zona_coches = (620, 142, 1149, 428)
peaton = (107, 550, 886, 461)

#Almacenamiento de los tiempos
tiempo_en_zona_peaton = {}
tiempo_espera_personas = {}
tiempo_espera_vehiculos = {}

#Lista de vehículos
vehiculos = [2, 3, 5, 7] #2: 'car', 3: 'motorcycle', 5: 'bus', 7: 'truck'

def interseccion_zona(bbox, zona):
    """ Verifica si la caja delimitadora (bbox) se cruza con la zona definida. """
    x1, y1, x2, y2 = bbox
    zx1, zy1, zx2, zy2 = zona
    return not (x2 < zx1 or x1 > zx2 or y2 < zy1 or y1 > zy2)

Interfaz gráfica que simula el funcionamiento del semáforo

In [10]:
def dibujar_persona(im0, centro, color):
    radio_cabeza = 5
    longitud_cuerpo = 8
    longitud_brazos = 7
    longitud_piernas = 7

    centro_cabeza = (centro[0], centro[1] - radio_cabeza - 3)
    inicio_cuerpo = (centro[0], centro[1])
    fin_cuerpo = (centro[0], centro[1] + longitud_cuerpo)

    inicio_brazo_izquierdo = (centro[0], centro[1] + 2)
    fin_brazo_izquierdo = (centro[0] - longitud_brazos, centro[1] + 2)
    inicio_brazo_derecho = (centro[0], centro[1] + 2)
    fin_brazo_derecho = (centro[0] + longitud_brazos, centro[1] + 2)

    inicio_pierna_izquierda = (centro[0], centro[1] + longitud_cuerpo)
    fin_pierna_izquierda = (centro[0] - longitud_piernas // 2, centro[1] + longitud_cuerpo + longitud_piernas)
    inicio_pierna_derecha = (centro[0], centro[1] + longitud_cuerpo)
    fin_pierna_derecha = (centro[0] + longitud_piernas // 2, centro[1] + longitud_cuerpo + longitud_piernas)

    # Dibuja la cabeza
    cv2.circle(im0, centro_cabeza, radio_cabeza, color, -1)

    # Dibuja el cuerpo
    cv2.line(im0, inicio_cuerpo, fin_cuerpo, color, 2)

    # Dibuja los brazos
    cv2.line(im0, inicio_brazo_izquierdo, fin_brazo_izquierdo, color, 2)
    cv2.line(im0, inicio_brazo_derecho, fin_brazo_derecho, color, 2)

    # Dibuja las piernas
    cv2.line(im0, inicio_pierna_izquierda, fin_pierna_izquierda, color, 2)
    cv2.line(im0, inicio_pierna_derecha, fin_pierna_derecha, color, 2)


def dibujar_semaforo(im0, estado_semaforo, estado_semaforo_personas):
    alto, ancho = im0.shape[:2]
    radio = 15
    distancia_entre_luces = 40

    # Semáforo para vehículos
    x1, y1 = ancho - 120, 20  # Posición del semáforo para vehículos
    cv2.rectangle(im0, (x1 - 20, y1 - 20), (x1 + 20, y1 + 3 * distancia_entre_luces), (0, 0, 0), -1)  # Fondo negro
    color_rojo = (0, 0, 255) if estado_semaforo == "Rojo" else (0, 0, 0)
    color_amarillo = (0, 255, 255) if estado_semaforo == "Amarillo" else (0, 0, 0)
    color_verde = (0, 255, 0) if estado_semaforo == "Verde" else (0, 0, 0)
    cv2.circle(im0, (x1, y1), radio, color_rojo, -1)
    cv2.circle(im0, (x1, y1 + distancia_entre_luces), radio, color_amarillo, -1)
    cv2.circle(im0, (x1, y1 + 2 * distancia_entre_luces), radio, color_verde, -1)

    # Semáforo para personas
    x2, y2 = ancho - 60, 20  # Posición del semáforo para personas
    cv2.rectangle(im0, (x2 - 20, y2 - 20), (x2 + 20, y2 + 2 * distancia_entre_luces), (0, 0, 0), -1)  # Fondo negro
    color_rojo_personas = (0, 0, 255)
    color_verde_personas = (0, 255, 0)
    centro_rojo = (x2, y2)
    centro_verde = (x2, y2 + distancia_entre_luces)

    # Dibuja las luces y las personas
    cv2.circle(im0, centro_rojo, radio, color_rojo_personas if estado_semaforo_personas == "Rojo" else (0, 0, 0), -1)
    cv2.circle(im0, centro_verde, radio, color_verde_personas if estado_semaforo_personas == "Verde" else (0, 0, 0), -1)
    
    # Si la luz está activa, dibuja la persona
    if estado_semaforo_personas == "Rojo":
        dibujar_persona(im0, centro_rojo, (255, 255, 255))  # Dibuja una persona blanca en la luz roja
    if estado_semaforo_personas == "Verde":
        dibujar_persona(im0, centro_verde, (255, 255, 255))  # Dibuja una persona blanca en la luz verde



Clase que se encarga de establecer el estado del semáforo

In [11]:
class ContadorSemaforo:
    def __init__(self):
        self.contador_personas = 0
        self.contador_vehiculos = 0
        self.tiempo_espera_personas = {}
        self.tiempo_espera_vehiculos = {}
        self.tiempo_en_zona_peaton = {}
        self.estado_semaforo = "Verde"
        self.estado_semaforo_personas = "Rojo"
        self.umbral_espera_maximo = 60

    def asignar_estado_semaforo(self):
        tiempo_total_personas = sum(self.tiempo_espera_personas.values())
        tiempo_total_vehiculos = sum(self.tiempo_espera_vehiculos.values())
        print("Tiempo de espera personas", tiempo_total_personas )
        print("Tiempo de espera vehiculos", tiempo_total_vehiculos )

        if (
            (self.contador_vehiculos > self.contador_personas and tiempo_total_vehiculos > tiempo_total_personas) or
            (self.contador_vehiculos < self.contador_personas and tiempo_total_vehiculos > tiempo_total_personas) or
            any(tiempo > self.umbral_espera_maximo for tiempo in self.tiempo_espera_vehiculos.values())
        ):
            self.estado_semaforo = "Verde"
            self.estado_semaforo_personas = "Rojo"
        elif (
            (self.contador_personas > self.contador_vehiculos and tiempo_total_personas > tiempo_total_vehiculos) or
            (self.contador_personas < self.contador_vehiculos and tiempo_total_personas > tiempo_total_vehiculos) or
            any(tiempo > self.umbral_espera_maximo for tiempo in self.tiempo_espera_personas.values())
        ):
            self.estado_semaforo = "Rojo"
            self.estado_semaforo_personas = "Verde"
        self.tiempo_espera_personas.clear()
        self.tiempo_espera_vehiculos.clear()
        return self.estado_semaforo, self.estado_semaforo_personas

Proyecto final

In [12]:
import datetime
# Crear una instancia de la clase ContadorSemaforo
contador_semaforo = ContadorSemaforo()

estado_semaforo = "Verde"
estado_semaforo_personas = "Rojo"
for frame_idx, batch in enumerate(dataset):    
    
    tiempo_actual = time.time()
    path, im, im0s, vid_cap, s = batch
    detections = np.empty((0, 5))
    # Asegurarse de que los datos de entrada (imágenes) también estén en la GPU
    im = torch.from_numpy(im).to(device)
    im = im.half() if half else im.float()  # uint8 to fp16/32
    im /= 255.0  # 0 - 255 to 0.0 - 1.0
    im = torch.unsqueeze(im, 0)

    result = model(im)

    p = non_max_suppression(
        result, conf_thres, iou_thres, classes, agnostic_nms, max_det=max_det
    )
    contador_personas = 0  # Inicializar el contador de personas en cada frame
    contador_vehiculos = 0

    for i, det in enumerate(p):
        p, im0, _ = path, im0s.copy(), getattr(dataset, "frame", 0)

        if det is not None and len(det):
            det[:, :4] = scale_boxes(
                im.shape[2:], det[:, :4], im0.shape
            ).round()  # rescale boxes to im0 size

        track_result = tracker.update(det.cpu(), im0)

        annotator = Annotator(im0, line_width=line_thickness, example=str(names))

        # draw boxes for visualization
        if len(track_result) > 0:
            for j, (output) in enumerate(track_result):
                bbox = output[0:4]
                id = output[4]
                cls = output[5]
                conf = output[6]

                c = int(cls)  # integer class
                id = int(id)  # integer id
                
                if c == 0 and (interseccion_zona(bbox, zona_personas) or interseccion_zona(bbox, zona_personas2)):
                    contador_personas +=1
                    contador_semaforo.contador_personas = contador_personas
                     # Verificar si el objeto persona está dentro de la zona personas
                    if id not in tiempo_espera_personas:
                        tiempo_espera_personas[id] = tiempo_actual

                    tiempo_deteccion = tiempo_actual - tiempo_espera_personas[id]
                    
                    contador_semaforo.tiempo_espera_personas[id] = tiempo_deteccion
                    
                    color_especial = (0, 100, 0) # Color verde fuerte
                    
                elif c == 0 and interseccion_zona(bbox, peaton):
                    #Si entra en la zona peatonal se resetea el tiempo de espera
                    if id in tiempo_espera_personas:
                        del tiempo_espera_personas[id]
                    #Registramos el tiempo
                    if id not in tiempo_en_zona_peaton:
                        tiempo_en_zona_peaton[id] = tiempo_actual
                    tiempo_deteccion = tiempo_actual - tiempo_en_zona_peaton[id]

                    contador_semaforo.tiempo_en_zona_peaton[id] = tiempo_deteccion
                        
                    color_especial = (0, 100, 100) 

                elif (c in vehiculos) and interseccion_zona(bbox, zona_coches):
                    contador_vehiculos +=1
                    contador_semaforo.contador_vehiculos = contador_vehiculos

                    if id not in tiempo_espera_vehiculos:
                        tiempo_espera_vehiculos[id] = tiempo_actual

                    tiempo_deteccion = tiempo_actual - tiempo_espera_vehiculos[id]
                    contador_semaforo.tiempo_espera_vehiculos[id] = tiempo_deteccion
                    
                    color_especial = (0, 0, 255)  
                    
                else:
                    # Si no está dentro de ninguna zona específica, usa el color predeterminado y quitamos el tiempo
                    if id in tiempo_espera_personas:
                        del tiempo_espera_personas[id]
                    if id in tiempo_espera_vehiculos:
                        del tiempo_espera_vehiculos[id]
                    color_especial = colors(c, True)
                    tiempo_deteccion = 0
                label = f"{id} {names[c]} {tiempo_deteccion:.2f}s" 
                annotator.box_label(bbox, label, color=color_especial)
                
               
    # Visualizar la cantidad de personas y vehículos en la imagen
    cv2.putText(im0, f"Personas: {contador_personas}", (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 1, (255,0, 255), 2)
    cv2.putText(im0, f"Vehiculos: {contador_vehiculos}", (10, 70), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 0, 255), 2)
    
    #Descomentar en caso de querer ver las zonas
    """ 
    cv2.rectangle(im0, zona_personas[:2], zona_personas[2:], (250, 151, 0), 2)
    cv2.rectangle(im0, zona_personas2[:2], zona_personas2[2:], (250, 151, 0), 2)
    cv2.rectangle(im0, zona_coches[:2], zona_coches[2:], (195, 98, 255) , 2)
    cv2.rectangle(im0, peaton[:2], peaton[2:], (0, 255, 255) , 2)
    """
    
    dibujar_semaforo(im0, estado_semaforo, estado_semaforo_personas)
             
    # Stream results
    im0 = annotator.result()
    cv2.imshow(str(p), im0)
    key = cv2.waitKey(20)

    if key == 27:  # Presiona 'Esc' para salir
        break
    elif key == 114 or key == 82:  # Presiona 'R' o 'r' para realizar una acción
        estado_semaforo = "Rojo"
        estado_semaforo_personas = "Verde"
        pass
    elif key == 118 or key == 86:  # Presiona 'V' o 'v' para realizar otra acción
        estado_semaforo = "Verde"
        estado_semaforo_personas = "Rojo"
        pass
    elif key == 99 or key == 67:  # Presiona 'C' o 'c' para capturar imagen
        nombre_archivo = datetime.datetime.now().strftime("%Y%m%d_%H%M%S") + ".jpg"
        cv2.imwrite(nombre_archivo, im0)
        print(f"Imagen guardada como {nombre_archivo}")
    elif key == 113 or key == 81:  # Presiona 'Q' o 'q' para mostrar y cambiar el estado del semáforo
        estado_semaforo, estado_semaforo_personas = contador_semaforo.asignar_estado_semaforo()
        if len(contador_semaforo.tiempo_en_zona_peaton) > 0:
            tiempo_medio_personas_peaton = sum(tiempo for tiempo in contador_semaforo.tiempo_en_zona_peaton.values()) / len(contador_semaforo.tiempo_en_zona_peaton)
            print(f'Tiempo medio de personas en la zona peatonal: {tiempo_medio_personas_peaton:.2f} segundos y han cruzado: {len(contador_semaforo.tiempo_en_zona_peaton)}')
        else:
            print('No han cruzado.')

       
        
cv2.destroyAllWindows()

Guardar el video de la ejecución 

In [14]:

save_vid = True
video_file = "../../stream_bitrate2.mp4"

fourcc = cv2.VideoWriter_fourcc(*'mp4v')  # codec
fps = 24  # Asumiendo 30 fps, ajusta según tu video
save_path = '../../output.mp4'
vid_writer = cv2.VideoWriter(save_path, fourcc, fps, (1280, 704))  # Ajusta la resolución según sea necesario


# Crear una instancia de la clase ContadorSemaforo
contador_semaforo = ContadorSemaforo()

estado_semaforo = "Verde"
estado_semaforo_personas = "Rojo"
for frame_idx, batch in enumerate(dataset):    
    
    tiempo_actual = time.time()
    path, im, im0s, vid_cap, s = batch
    detections = np.empty((0, 5))
    # Asegurarse de que los datos de entrada (imágenes) también estén en la GPU
    im = torch.from_numpy(im).to(device)
    im = im.half() if half else im.float()  # uint8 to fp16/32
    im /= 255.0  # 0 - 255 to 0.0 - 1.0
    im = torch.unsqueeze(im, 0)

    result = model(im)

    p = non_max_suppression(
        result, conf_thres, iou_thres, classes, agnostic_nms, max_det=max_det
    )
    contador_personas = 0  # Inicializar el contador de personas en cada frame
    contador_vehiculos = 0

    for i, det in enumerate(p):
        p, im0, _ = path, im0s.copy(), getattr(dataset, "frame", 0)

        if det is not None and len(det):
            det[:, :4] = scale_boxes(
                im.shape[2:], det[:, :4], im0.shape
            ).round()  # rescale boxes to im0 size

        track_result = tracker.update(det.cpu(), im0)

        annotator = Annotator(im0, line_width=line_thickness, example=str(names))

        # draw boxes for visualization
        if len(track_result) > 0:
            for j, (output) in enumerate(track_result):
                bbox = output[0:4]
                id = output[4]
                cls = output[5]
                conf = output[6]

                c = int(cls)  # integer class
                id = int(id)  # integer id
                
                if c == 0 and (interseccion_zona(bbox, zona_personas) or interseccion_zona(bbox, zona_personas2)):
                    contador_personas +=1
                    contador_semaforo.contador_personas = contador_personas
                     # Verificar si el objeto persona está dentro de la zona personas
                    if id not in tiempo_espera_personas:
                        tiempo_espera_personas[id] = tiempo_actual

                    tiempo_deteccion = tiempo_actual - tiempo_espera_personas[id]
                    
                    contador_semaforo.tiempo_espera_personas[id] = tiempo_deteccion
                    
                    color_especial = (0, 100, 0) # Color verde fuerte
                    
                elif c == 0 and interseccion_zona(bbox, peaton):
                    #Si entra en la zona peatonal se resetea el tiempo de espera
                    if id in tiempo_espera_personas:
                        del tiempo_espera_personas[id]
                    #Registramos el tiempo
                    if id not in tiempo_en_zona_peaton:
                        tiempo_en_zona_peaton[id] = tiempo_actual
                    tiempo_deteccion = tiempo_actual - tiempo_en_zona_peaton[id]

                    contador_semaforo.tiempo_en_zona_peaton[id] = tiempo_deteccion
                        
                    color_especial = (0, 100, 100) 

                elif (c in vehiculos) and interseccion_zona(bbox, zona_coches):
                    contador_vehiculos +=1
                    contador_semaforo.contador_vehiculos = contador_vehiculos

                    if id not in tiempo_espera_vehiculos:
                        tiempo_espera_vehiculos[id] = tiempo_actual

                    tiempo_deteccion = tiempo_actual - tiempo_espera_vehiculos[id]
                    contador_semaforo.tiempo_espera_vehiculos[id] = tiempo_deteccion
                    
                    color_especial = (0, 0, 255)  
                    
                else:
                    # Si no está dentro de ninguna zona específica, usa el color predeterminado y quitamos el tiempo
                    if id in tiempo_espera_personas:
                        del tiempo_espera_personas[id]
                    if id in tiempo_espera_vehiculos:
                        del tiempo_espera_vehiculos[id]
                    color_especial = colors(c, True)
                    tiempo_deteccion = 0
                label = f"{id} {names[c]} {tiempo_deteccion:.2f}s" 
                annotator.box_label(bbox, label, color=color_especial)
                
               
    # Visualizar la cantidad de personas y vehículos en la imagen
    cv2.putText(im0, f"Personas: {contador_personas}", (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 1, (255,0, 255), 2)
    cv2.putText(im0, f"Vehiculos: {contador_vehiculos}", (10, 70), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 0, 255), 2)
                
    #cv2.rectangle(im0, zona_personas[:2], zona_personas[2:], (250, 151, 0), 2)
    #cv2.rectangle(im0, zona_personas2[:2], zona_personas2[2:], (250, 151, 0), 2)
    #cv2.rectangle(im0, zona_coches[:2], zona_coches[2:], (195, 98, 255) , 2)
    #cv2.rectangle(im0, peaton[:2], peaton[2:], (0, 255, 255) , 2)
    dibujar_semaforo(im0, estado_semaforo, estado_semaforo_personas)
    
    # Save results
    im0 = annotator.result()
    vid_writer.write(im0)



vid_writer.release()
cv2.destroyAllWindows()