# **Algoritmo de Detección de Coches - Adrián Rodríguez Pérez y Víctor Díaz Puga**

In [1]:
!py -m pip install opencv-python
!py -m pip install matplotlib

"py" no se reconoce como un comando interno o externo,
programa o archivo por lotes ejecutable.
"py" no se reconoce como un comando interno o externo,
programa o archivo por lotes ejecutable.


In [2]:
import cv2
import numpy as np
import matplotlib.pyplot as plt
from collections import deque

-----

### **Visualización del Vídeo**

In [3]:
#1. Visualizamos el vídeo
import cv2

# Abrir el archivo de vídeo
cap = cv2.VideoCapture("trafico.mp4")

while True:
    ret, frame = cap.read()
    if not ret:
        break

    # Mostrar el fotograma en una ventana
    cv2.imshow("Reproducción", frame)

    # Espera 25 ms; si se pulsa 'q', sale del bucle
    if cv2.waitKey(25) & 0xFF == ord('q'):
        break

cap.release()
cv2.destroyAllWindows()


-----

### **Promediado de Imagen del Vídeo "trafico.mp4"**

Ejecutamos todo el vídeo a tiempo real para procesar cada frame

In [56]:
# Promediamos las imágenes
cap = cv2.VideoCapture("trafico.mp4")
ret, frame = cap.read()

# Variables para acumular frames
accumulated_frame = None
frame_count = 0

while True:
    ret, frame = cap.read()
    if not ret:
        break  # Fin del video

    # Convierte el frame a float32 para acumulación
    frame_float = frame.astype(np.float32)

    if accumulated_frame is None:
        accumulated_frame = frame_float
    else:
        accumulated_frame += frame_float
    frame_count += 1

# Calcula el promedio
background = accumulated_frame / frame_count

# Convierte de nuevo a uint8 para mostrar/guardar
background_uint8 = cv2.convertScaleAbs(background)

cv2.imwrite('carretera_promedio.jpg', background_uint8)
cv2.waitKey(0)
cv2.destroyAllWindows()

-----

### **Vídeo Procesado**

Detectamos cada coche del vídeo y obtenemos su velocidad y le proporcionamos un id a cada uno, el algoritmo se divide en:
 - **Rutas y Parámetros**: En esta sección guardamos el vídeo y la imagen promedio que usaremos para detectar el movimiento, además estableceremos todas las constantes que usaremos más tarde
 - **Carga y Preparación**: Obtendremos las variables que dependen del vídeo como el tamaño del mismo o su relación entre píxeles y metros, también definimos la máscara para eliminar el movimiento del temporizador
 - **Estado del Tracker**: Definimos todas las funciones que usaremos después en el bucle para obtener la posición, la velocidad y el seguimiento de los coches con sus ids asociados
 - **Bucle Principal**: Procesa cada frame del vídeo para analizar el movimiento, hace todos los cálculos necesarios para dibujar el contador de coches, dónde se encuentra cada uno, su respectivo id y a la velocidad a la que se desplaza en el mismo vídeo y mostrarlo




In [5]:
# Ejemplo de Uso con "trafico.mp4"
# ---------- RUTAS y PARÁMETROS ----------
VIDEO_PATH = "trafico.mp4"
FONDO_PATH = "carretera_promedio.jpg"

THRESH = 30  # umbral binarización
MIN_AREA_BASE = 400  # base
MIN_AREA_TOP = 1200  # altura
KERNEL_SIZE = (5, 5)

TS_Y1, TS_Y2 = -120, -64
TS_X1, TS_X2 = -311, -221

# Calibración simple px->m (reemplaza por medidas reales)
KNOWN_DISTANCE_M = 7
CAL_P1 = (1113, 843)
CAL_P2 = (1199, 762)

# Tracker params
MAX_DISAPPEAR = 15  # frames antes de borrar objeto
MAX_DISTANCE = 200  # distancia máxima para considerar que sigue siendo el mismo objeto
PERSPECTIVE_FACTOR = 2  # ajusta la variación de px/m con la altura
SPEED_AVG_WINDOW = 100  # usamos 100 frames para promediar velocidades
DIST_THRESHOLD = 90  # píxeles de distancia máxima para fusionar

# ---------- CARGA y PREPARACIÓN ----------
cap = cv2.VideoCapture(VIDEO_PATH)
if not cap.isOpened():
    raise SystemExit(f"No se pudo abrir el vídeo '{VIDEO_PATH}'")

frame_w = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))  # obtenemos la anchura del vídeo
frame_h = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))  # obtenemos la altura del vídeo
FPS = cap.get(cv2.CAP_PROP_FPS) if cap.get(cv2.CAP_PROP_FPS) > 0 else 25.0  # fps del vídeo

# máscara del timestamp (en píxeles absolutos)
def resolve_coord(v, limit): return v if v >= 0 else limit + v
ts_y1 = resolve_coord(TS_Y1, frame_h); ts_y2 = resolve_coord(TS_Y2, frame_h)
ts_x1 = resolve_coord(TS_X1, frame_w); ts_x2 = resolve_coord(TS_X2, frame_w)
mask_full = np.ones((frame_h, frame_w), dtype=np.uint8) * 255

# aplicar máscara timestamp
if True:  # la desactivaríamos para vídeos sin cronómetro
    mask_full[ts_y1:ts_y2, ts_x1:ts_x2] = 0  # pone a 0 la parte del cronómetro de la imagen binaria
ts_rect = (ts_x1, ts_y1, ts_x2 - ts_x1, ts_y2 - ts_y1)

kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, KERNEL_SIZE)

fondo = cv2.imread(FONDO_PATH, cv2.IMREAD_GRAYSCALE)  # escala de grises
fondo_masked = cv2.bitwise_and(fondo, fondo, mask=mask_full)  # aplicar máscara de tiempo al fondo

# escala px->m
cal_dist_px = np.hypot(CAL_P2[0]-CAL_P1[0], CAL_P2[1]-CAL_P1[1])
PIXELS_PER_METER = cal_dist_px / KNOWN_DISTANCE_M  # calculamos los píxeles por metro

def pixels_per_meter_at(y_center, frame_h=frame_h):
    """
    Devuelve px/m estimados en función de la altura del objeto en el vídeo
    Los objetos más bajos en la imagen están más cerca, más px por metro
    """
    # normalizar y en [0,1] (0 = arriba, 1 = abajo)
    yn = float(y_center) / float(frame_h)
    # factor de corrección centrado en 1.0
    corr = 1 + PERSPECTIVE_FACTOR * (yn - 0.5) * 2
    # limitar corr para evitar velocidades extremos
    corr = max(0.3, min(corr, 3))  # factor entre 0.3 y 3
    return PIXELS_PER_METER * corr

# ---------- ESTADO DEL TRACKER (persistente) ----------
next_object_id = 0
objects = {}  # id -> posición central del coche
disappeared = {}  # id -> número de frames consecutivos en los que ese id no se detectó
tracks = {}  # id -> historial de posiciones (frame_index, posición)
frame_index = 0  # contador global de frames

def centroid_from_box(box):
    x, y, w, h = box  # x e y son la esquina superior izquierda, w y h el ancho y alto
    return (int(x + w/2), int(y + h/2))  # centro del blob (suma para bajar y mover a la derecha)

def euclidean(a, b):
    return np.hypot(a[0]-b[0], a[1]-b[1])  # distancia entre 2 puntos

# registrar nuevo objeto en el tracker
def register_object(centroid):
    global next_object_id
    objects[next_object_id] = centroid
    disappeared[next_object_id] = 0
    tracks[next_object_id] = deque(maxlen=50)  # genera la tupla
    tracks[next_object_id].append((frame_index, centroid))
    next_object_id += 1

# eliminar objeto del tracker
def deregister_object(obj_id):  # le pasamos el id del objeto a eliminar
    objects.pop(obj_id, None)
    disappeared.pop(obj_id, None)
    tracks.pop(obj_id, None)

# actualizar el tracker con nuevas detecciones
def update_tracker(detections_centroids):  # le pasamos la lista de centroides detectados en el frame actual
    if len(detections_centroids) == 0:  # si no hay detecciones en el frame actual, aumentamos el contador de desaparecidos a todos
        for oid in list(disappeared.keys()):
            disappeared[oid] += 1
            if disappeared[oid] > MAX_DISAPPEAR:
                deregister_object(oid)  # eliminar objeto
        return

    if len(objects) == 0:
        for c in detections_centroids: register_object(c); return  # registrar todos los coches cuando no hay ninguno

    obj_ids = list(objects.keys())  # lista de ids de vehículos actuales
    obj_centroids = [objects[i] for i in obj_ids]  # lista de centroides actuales

    D = np.zeros((len(obj_centroids), len(detections_centroids)))  # matriz de distancias (objetos existente x detecciones nuevas)
    for i, oc in enumerate(obj_centroids):
        for j, dc in enumerate(detections_centroids):
            D[i, j] = euclidean(oc, dc)

    rows = D.min(axis=1).argsort()
    cols = D.argmin(axis=1)[rows]
    used_rows, used_cols = set(), set()

    for r, c in zip(rows, cols):
        if r in used_rows or c in used_cols: continue
        if D[r, c] > MAX_DISTANCE: continue  # si la distancia es muy grande, no es el mismo objeto
        oid = obj_ids[r]  # nueva detección para el mejor futuro cadidato del objeto existente
        objects[oid] = detections_centroids[c]
        disappeared[oid] = 0
        tracks[oid].append((frame_index, detections_centroids[c]))
        used_rows.add(r); used_cols.add(c)

    for j in range(len(detections_centroids)):
        if j not in used_cols: register_object(detections_centroids[j])  # registra nuevas detecciones que no coinciden con objetos existentes

    for i in range(len(obj_centroids)):  # si hay ids de objetos existentes que no se han actualizado, aumenta su contador de desaparecido
        if i not in used_rows:
            oid = obj_ids[i]
            disappeared[oid] += 1
            if disappeared[oid] > MAX_DISAPPEAR: deregister_object(oid)

# calcular velocidad promedio para un track
def compute_speed_for_track(track_deque):
    pts = list(track_deque)[-SPEED_AVG_WINDOW:]
    if len(pts) < 2:
        return None

    # sumar desplazamientos segmentales en píxeles y convertir por segmento con su escala local
    total_m = 0.0
    total_secs = 0.0
    for k in range(1, len(pts)):
        f0, c0 = pts[k-1]
        f1, c1 = pts[k]
        df = f1 - f0
        if df <= 0:
            continue
        dp_px = euclidean(c0, c1)
        # usar y promedio del segmento para estimar px->m local
        y_avg = (c0[1] + c1[1]) / 2.0
        pxpm = pixels_per_meter_at(y_avg)
        dp_m = dp_px / pxpm
        secs = df / float(FPS)
        total_m += dp_m
        total_secs += secs

    if total_secs <= 0 or total_m <= 0:
        return None

    speed_kmh = (total_m / total_secs) * 3.6
    return speed_kmh

def rect_intersection_area(a, b):
    ax, ay, aw, ah = a; bx, by, bw, bh = b
    ix1, iy1 = max(ax, bx), max(ay, by)
    ix2, iy2 = min(ax+aw, bx+bw), min(ay+ah, by+bh)
    iw, ih = max(0, ix2-ix1), max(0, iy2-iy1)
    return iw*ih

def nms_boxes(boxes, iou_thresh):  # devuelve lista de cajas que no se superponen
    if not boxes: return []
    boxes = sorted(boxes, key=lambda b: b[2]*b[3], reverse=True)
    keep = []
    def iou(a,b):  # te dice si está muy superpuesto el objeto entre 0 y 1
        x1 = max(a[0], b[0]); y1 = max(a[1], b[1])
        x2 = min(a[0]+a[2], b[0]+b[2]); y2 = min(a[1]+a[3], b[1]+b[3])
        w = max(0, x2-x1); h = max(0, y2-y1)
        inter = w*h
        union = a[2]*a[3] + b[2]*b[3] - inter
        return inter/union if union>0 else 0
    for b in boxes:
        if all(iou(b, k) <= iou_thresh for k in keep):
            keep.append(b)
    return keep

def fuse_close_centroids(detections_centroids):
    fused_centroids = []
    used = set()

    for i, c1 in enumerate(detections_centroids):
        if i in used:
            continue
        grupo = [c1]
        for j, c2 in enumerate(detections_centroids):
            if j <= i or j in used:
                continue
            dx = c1[0] - c2[0]
            dy = c1[1] - c2[1]
            dist = (dx**2 + dy**2)**0.5
            if dist < DIST_THRESHOLD:
                grupo.append(c2)
                used.add(j)
        # Promediamos el grupo de centroides cercanos
        if len(grupo) > 1:
            avg_x = sum(c[0] for c in grupo) / len(grupo)
            avg_y = sum(c[1] for c in grupo) / len(grupo)
            fused_centroids.append((int(avg_x), int(avg_y)))
        else:
            fused_centroids.append(c1)
    return fused_centroids

# ---------- BUCLE PRINCIPAL ----------
while True:
    ret, frame = cap.read()
    if not ret: break

    gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
    gray = cv2.GaussianBlur(gray, (5,5), 0)

    gray_masked = cv2.bitwise_and(gray, gray, mask=mask_full)
    diff = cv2.absdiff(gray_masked, fondo_masked)
    _, thresh = cv2.threshold(diff, THRESH, 255, cv2.THRESH_BINARY)

    opened = cv2.morphologyEx(thresh, cv2.MORPH_OPEN, kernel, iterations=1)
    closed = cv2.morphologyEx(opened, cv2.MORPH_CLOSE, kernel, iterations=2)
    cleaned = cv2.dilate(closed, kernel, iterations=2)

    contours, _ = cv2.findContours(cleaned, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
    detecciones = []

    for cnt in contours:
        area = cv2.contourArea(cnt)
        if area < MIN_AREA_BASE: continue
        x, y, w, h = cv2.boundingRect(cnt)

        # ignorar si intersecta fuertemente con timestamp
        inter = rect_intersection_area((x,y,w,h), ts_rect)
        if inter > 0 and (inter / float(w*h)) > 0.15: continue

        # área mínima dinámica según perspectiva
        y_center = y + h//2
        vertical_factor = y_center / frame_h
        min_area = MIN_AREA_BASE + (1 - vertical_factor) * (MIN_AREA_TOP - MIN_AREA_BASE)
        if area < min_area: continue

        # caso simple: cajas pequeñas -> añadir tal cual
        if area < 2 * min_area or w < 80:
            detecciones.append((x,y,w,h)); continue

        # para blobs grandes: intentar segmentar
        roi = cleaned[y:y+h, x:x+w].copy()
        roi = cv2.morphologyEx(roi, cv2.MORPH_CLOSE, kernel, iterations=2)

        num_labels, labels, stats, _ = cv2.connectedComponentsWithStats(roi, connectivity=8)
        roi_filtered = np.zeros_like(roi)
        for i in range(1, num_labels):
            a = stats[i, cv2.CC_STAT_AREA]
            if a >= max(80, 0.01 * area): roi_filtered[labels==i] = 255

        ccnts, _ = cv2.findContours(roi_filtered, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
        if len(ccnts) <= 1:
            detecciones.append((x,y,w,h)); continue

        dist = cv2.distanceTransform(roi_filtered, cv2.DIST_L2, 5)
        _, sure_fg = cv2.threshold(dist, 0.2 * dist.max(), 255, 0)
        sure_fg = np.uint8(sure_fg)
        unknown = cv2.subtract(roi_filtered, sure_fg)
        num_markers, markers = cv2.connectedComponents(sure_fg)
        markers = markers + 1
        markers[unknown==255] = 0
        roi_color = cv2.cvtColor(roi, cv2.COLOR_GRAY2BGR)
        cv2.watershed(roi_color, markers)

        for m in range(2, num_markers+1):
            mask_m = np.uint8(markers == m) * 255
            cnts_m, _ = cv2.findContours(mask_m, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
            for c_m in cnts_m:
                a_m = cv2.contourArea(c_m)
                if a_m < 80: continue
                x2, y2, w2, h2 = cv2.boundingRect(c_m)
                detecciones.append((x + x2, y + y2, w2, h2))

    # NMS y filtrado final
    final_boxes = nms_boxes(detecciones, iou_thresh=0)

    # tracker update
    detections_centroids = [centroid_from_box(b) for b in final_boxes]

    # Fusión de centroides muy cercanos
    detections_centroids = fuse_close_centroids(detections_centroids)
    
    update_tracker(detections_centroids)

    # DIBUJAR: sólo velocidad (si disponible) y bbox
    for (x, y, w, h) in final_boxes:
        aspect = w / float(h + 1e-6)
        if aspect < 0.25 or aspect > 4.5 or h < 12: continue

        c = centroid_from_box((x,y,w,h))
        assigned_id = None; min_d = 1e9
        for oid, cent in objects.items():
            d = euclidean(c, cent)
            if d < min_d and d < MAX_DISTANCE:
                min_d = d; assigned_id = oid

        color = (0,255,0)

        label = ""
        if assigned_id is not None:
            speed = compute_speed_for_track(tracks[assigned_id])
            if speed is not None:
                label = f"{int(round(speed))} km/h, ID:{assigned_id}"

        if speed is not None:
            if speed <= 120:
                cv2.rectangle(frame, (x,y), (x+w, y+h), color, 2)
            else:
                cv2.rectangle(frame, (x,y), (x+w, y+h), (0,0,255), 2)

        if label:
            (text_w, text_h), _ = cv2.getTextSize(label, cv2.FONT_HERSHEY_SIMPLEX, 0.6, 2)
            text_x = x + max(0, (w - text_w)//2)
            text_y = y - 8
            cv2.rectangle(frame, (text_x - 4, text_y - text_h - 4), (text_x + text_w + 4, text_y + 4), (0,0,0), -1)
            cv2.putText(frame, label, (text_x, text_y), cv2.FONT_HERSHEY_SIMPLEX, 0.6, color, 2)

    # info y calibración visual
    cv2.putText(frame, f"Scale: {PIXELS_PER_METER:.1f}px/m, FPS: {FPS:.0f}, Num Coches: {next_object_id}", (140,50), cv2.FONT_HERSHEY_SIMPLEX, 1, (117, 38, 40), 2)

    cv2.imshow("Frame", frame)
    frame_index += 1
    if cv2.waitKey(25) & 0xFF == ord("q"): break

cap.release()
cv2.destroyAllWindows()