Implementación del Sistema de Detección y Conteo de Vehículos
Para la realización de este trabajo se ha utilizado exclusivamente la librería OpenCV (cv2) junto con numpy para el manejo de matrices y operaciones matemáticas. El objetivo del código es procesar un flujo de video de tráfico, detectar vehículos en movimiento, realizar un seguimiento (tracking) de los mismos y contarlos según crucen líneas virtuales predefinidas en diferentes carriles y direcciones.

1. Preprocesamiento y Funciones Auxiliares
El primer paso fundamental es el acondicionamiento de la imagen. Dado que el video original puede tener una resolución elevada (1920x1080), se define la función rescale para redimensionar cada frame a una escala más manejable (0.6x). Esto no solo facilita la visualización en pantalla, sino que reduce significativamente la carga computacional, permitiendo un procesamiento más fluido en tiempo real.

In [4]:
def rescale(frame, scale=0.6):
    """Escala un frame a un tamaño más manejable."""
    w = int(frame.shape[1] * scale)
    h = int(frame.shape[0] * scale)
    return cv2.resize(frame, (w, h), interpolation=cv2.INTER_AREA)

Adicionalmente, se implementan funciones geométricas clave para la lógica de conteo: point_line_signed_distance y point_to_segment_proj_dist. Estas funciones permiten calcular la posición relativa de un punto (centroide del vehículo) respecto a un segmento de línea, determinando si ha ocurrido un cruce y la distancia exacta al segmento, lo cual es vital para evitar falsos positivos.

2. Detección de Movimiento y Sustracción de Fondo
El núcleo del sistema de detección recae en el algoritmo de sustracción de fondo MOG2 (createBackgroundSubtractorMOG2). Este método modela cada píxel como una mezcla de gaussianas, lo que le permite adaptarse a cambios de iluminación graduales y separar los objetos en movimiento del fondo estático.

In [5]:
object_detector = cv2.createBackgroundSubtractorMOG2(history=500, varThreshold=40, detectShadows=True)

Se establece un periodo de calibración inicial de 60 frames donde el sistema procesa el video sin realizar conteo. Esto permite que el modelo de fondo se estabilice y aprenda la escena estática, evitando detecciones erróneas al inicio de la ejecución.

3. Procesamiento Morfológico y Limpieza de Máscara
La máscara binaria obtenida del detector MOG2 suele contener ruido (movimiento de árboles, reflejos) y fragmentación (vehículos divididos en partes). Para solucionar esto, se aplica una cadena de procesamiento de imagen robusta:

Suavizado Gaussiano (GaussianBlur): Se aplica al frame en escala de grises antes de la detección para reducir el ruido de alta frecuencia del asfalto.

Umbralizado (threshold): Se aplica un corte estricto (250) a la máscara resultante para eliminar sombras grises (detectShadows=True las marca en gris), dejando solo los píxeles blancos correspondientes a objetos sólidos.

Operaciones Morfológicas:

Erosión (erode): Con un kernel de 3x3, se eliminan pequeños píxeles de ruido aislados.

Dilatación (dilate): Con un kernel de 5x5, se expanden las regiones blancas restantes para recuperar el volumen del vehículo.

Cierre (morphologyEx con MORPH_CLOSE): Con un kernel de 11x11, se fusionan componentes cercanos. Esto es crítico para detectar camiones o vehículos largos como un único objeto en lugar de múltiples fragmentos (cabina + remolque).

In [6]:
_, image_bin = cv2.threshold(image_mask, 250, 255, cv2.THRESH_BINARY)
image_bin = cv2.erode(image_bin, kernel_erode, iterations=1)
image_bin = cv2.dilate(image_bin, kernel_dilate, iterations=2)
image_bin = cv2.morphologyEx(image_bin, cv2.MORPH_CLOSE, kernel_close)

4. Detección de Contornos y Filtrado
Sobre la máscara binaria limpia, se utiliza cv2.findContours para identificar los objetos. Se aplica un filtrado riguroso basado en el área (MIN_AREA_BLOB = 250), dimensiones mínimas (MIN_W, MIN_H) y relación de aspecto (MIN_ASPECT_RATIO, MAX_ASPECT_RATIO). Además, se introduce un filtro de horizonte (if y < 150: continue), descartando cualquier movimiento en la parte superior de la imagen (cielo o vegetación lejana) que no corresponda a la carretera.

5. Seguimiento (Tracking) y Lógica de Conteo
El sistema implementa un algoritmo de seguimiento basado en la distancia Euclidiana entre centroides. Para cada frame, se comparan los centroides detectados con los objetos ya registrados en tracking_objects. Si la distancia es menor a MAX_DISTANCE, se actualiza la posición del objeto existente; de lo contrario, se registra como un nuevo vehículo con un ID único.

El conteo se realiza mediante líneas virtuales definidas por coordenadas (P1, P2). Se evalúa si la trayectoria de un vehículo cruza estas líneas analizando el cambio de signo en la distancia con signo (point_line_signed_distance) entre su posición anterior y la actual.

Se han definido 5 líneas de conteo estratégicas para cubrir diferentes flujos de tráfico:

V1 y V2: Tráfico principal Norte y Sur.

V3 y V4: Incorporaciones laterales.

V5: Carril de desaceleración.

Cada vehículo cuenta con un registro de banderas (direction_counted) para asegurar que solo sea contabilizado una vez por cada línea que cruce, garantizando la precisión del sistema "Total" y de los contadores individuales por vía.

6. Visualización de Resultados
Finalmente, el sistema genera una visualización en tiempo real compuesta por tres capas de información:

Video Original: Referencia visual.

Máscara Procesada: Visualización de la "visión" interna del algoritmo, útil para depuración.

Monitor Final: Overlay con las cajas delimitadoras (bounding boxes) de los vehículos, sus IDs, las líneas de conteo y un panel de estadísticas actualizado frame a frame.

In [None]:
import cv2
import numpy as np
from collections import defaultdict

VIDEO_PATH = 'trafico.mp4'
SCALE = 0.6
MIN_AREA, MAX_DIST = 100, 80
PX_TO_METERS = 0.15 

KERNEL_ERODE = np.ones((3, 3), np.uint8)
KERNEL_DILATE = np.ones((5, 5), np.uint8)
KERNEL_CLOSE = np.ones((11, 11), np.uint8)

LINES_CFG = [
    {"id": "north", "label": "NORTE (P)", "pts": ((550, 450), (650, 400)), "col": (255, 0, 0),   "th": 25, "tol": (0, 1)},
    {"id": "south", "label": "SUR (P)",   "pts": ((750, 425), (850, 375)), "col": (0, 0, 255),   "th": 25, "tol": (0, 1)},
    {"id": "inc_n", "label": "INC NORTE", "pts": ((300, 550), (450, 500)), "col": (255, 255, 0), "th": 40, "tol": (0, 1)},
    {"id": "inc_s", "label": "INC SUR",   "pts": ((880, 290), (940, 250)), "col": (0, 255, 255), "th": 25, "tol": (0, 1)},
    {"id": "decel", "label": "DECEL",     "pts": ((150, 200), (120, 165)),  "col": (255, 0, 255), "th": 60, "tol": (-0.2, 1.2)} 
]

def get_crossing_info(pt, a, b):
    v_line = np.array(b) - np.array(a)
    v_pt = np.array(pt) - np.array(a)
    line_len_sq = v_line.dot(v_line)
    if line_len_sq == 0: return np.linalg.norm(v_pt), 0, 0
    t = v_pt.dot(v_line) / line_len_sq
    dist = np.linalg.norm(np.array(pt) - (np.array(a) + t * v_line))
    sign = (b[0]-a[0])*(pt[1]-a[1]) - (b[1]-a[1])*(pt[0]-a[0])
    return dist, t, sign

def rescale(frame, scale=SCALE):
    return cv2.resize(frame, (int(frame.shape[1]*scale), int(frame.shape[0]*scale)), interpolation=cv2.INTER_AREA)

def estimate_speed(hist, lookback=6):
    """Calcula km/h basándose en la distancia recorrida en los últimos 'lookback' frames."""
    if len(hist) < lookback: return 0.0
    p1, t1 = hist[-lookback][:2], hist[-lookback][2]
    p2, t2 = hist[-1][:2], hist[-1][2]
    dist_px = np.hypot(p2[0]-p1[0], p2[1]-p1[1])
    dist_m = dist_px * PX_TO_METERS
    time_s = (t2 - t1) / 1000.0
    if time_s <= 0: return 0.0
    speed_ms = dist_m / time_s
    return speed_ms * 3.6

cap = cv2.VideoCapture(VIDEO_PATH)
bg_subtractor = cv2.createBackgroundSubtractorMOG2(history=250, varThreshold=32, detectShadows=True)

# Estabilización inicial
for _ in range(150):
    ret, fr = cap.read()
    if ret: bg_subtractor.apply(cv2.cvtColor(rescale(fr), cv2.COLOR_BGR2GRAY))

# Estado Global
tracking_objs = {} 
counts = {cfg["id"]: 0 for cfg in LINES_CFG}
counts["total"] = 0
counted_ids = {cfg["id"]: set() for cfg in LINES_CFG} 
next_id = 0

while True:
    ret, frame = cap.read()
    if not ret: break
    
    frame = rescale(frame)
    curr_t = cap.get(cv2.CAP_PROP_POS_MSEC)
    
    # 1. Procesamiento de Imagen
    mask = bg_subtractor.apply(cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY), learningRate=0.003)
    mask[mask == 127] = 0 
    mask = cv2.medianBlur(mask, 5)
    
    # --- VISUALIZACIÓN 1: Máscara Cruda ---
    cv2.imshow("1. Mascara Cruda (BgSub)", mask)

    _, bin_img = cv2.threshold(mask, 200, 255, cv2.THRESH_BINARY)
    bin_img = cv2.morphologyEx(bin_img, cv2.MORPH_OPEN, KERNEL_ERODE)
    bin_img = cv2.morphologyEx(bin_img, cv2.MORPH_CLOSE, KERNEL_CLOSE)
    
    # --- VISUALIZACIÓN 2: Binaria Final ---
    cv2.imshow("2. Imagen Binarizada (Final)", bin_img)

    # 2. Tracking
    contours, _ = cv2.findContours(bin_img, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
    curr_objs = {}
    
    for cnt in contours:
        if cv2.contourArea(cnt) < MIN_AREA: continue
        x, y, w, h = cv2.boundingRect(cnt)
        if w < 10 or h < 10 or not (0.2 < w/h < 5.0): continue
        
        cx, cy = x + w//2, y + h//2
        
        c_id, min_d = -1, MAX_DIST
        for tid, (tx, ty, thist) in tracking_objs.items():
            d = np.hypot(cx - tx, cy - ty)
            if d < min_d: min_d, c_id = d, tid
            
        if c_id == -1: c_id, next_id = next_id, next_id + 1
        
        hist = tracking_objs[c_id][2] if c_id in tracking_objs else []
        hist.append((cx, cy, curr_t))
        if len(hist) > 20: hist = hist[-20:]
        curr_objs[c_id] = (cx, cy, hist)

        # Cálculo de velocidad
        kmh = estimate_speed(hist)

        # Lógica de Cruce
        if len(hist) >= 2:
            prev_pt, curr_pt = hist[-2][:2], hist[-1][:2]
            for line in LINES_CFG:
                lid = line["id"]
                if c_id in counted_ids[lid]: continue
                dist, t, sign_curr = get_crossing_info(curr_pt, line["pts"][0], line["pts"][1])
                _, _, sign_prev = get_crossing_info(prev_pt, line["pts"][0], line["pts"][1])
                
                if (sign_curr * sign_prev < 0) and (dist < line["th"]) and (line["tol"][0] <= t <= line["tol"][1]):
                    counts[lid] += 1
                    counts["total"] += 1
                    counted_ids[lid].add(c_id)

        # Dibujar
        is_counted = any(c_id in s for s in counted_ids.values())
        col = (0, 255, 0) if is_counted else (0, 255, 255)
        cv2.rectangle(frame, (x, y), (x+w, y+h), col, 2)
        label = f"ID:{c_id} {int(kmh)}km/h"
        cv2.putText(frame, label, (x, y-5), cv2.FONT_HERSHEY_SIMPLEX, 0.5, col, 1)

    tracking_objs = curr_objs

    # 3. Panel Informativo
    overlay = frame.copy()
    cv2.rectangle(overlay, (10, 10), (320, 60 + len(LINES_CFG)*30), (0, 0, 0), -1)
    frame = cv2.addWeighted(overlay, 0.4, frame, 0.6, 0)
    
    cv2.putText(frame, f"TOTAL: {counts['total']}", (20, 40), cv2.FONT_HERSHEY_SIMPLEX, 0.9, (0, 255, 0), 2)
    for i, line in enumerate(LINES_CFG):
        cv2.line(frame, line["pts"][0], line["pts"][1], line["col"], 2)
        txt = f"{line['label']}: {counts[line['id']]}"
        cv2.putText(frame, txt, (20, 80 + i*30), cv2.FONT_HERSHEY_SIMPLEX, 0.7, line["col"], 2)

    cv2.imshow("3. Traffic Monitor", frame)
    if cv2.waitKey(15) & 0xFF == ord('d'): break

cap.release()
cv2.destroyAllWindows()

### Código para el segundo vídeo de prueba con la misma estructura y esqueleto pero parámetros cambiados

In [1]:
import cv2
import numpy as np
from collections import defaultdict

def rescale(frame, scale=0.8): 
    w = int(frame.shape[1] * scale)
    h = int(frame.shape[0] * scale)
    return cv2.resize(frame, (w, h), interpolation=cv2.INTER_AREA)

def point_line_signed_distance(pt, a, b):
    return (b[0]-a[0])*(pt[1]-a[1]) - (b[1]-a[1])*(pt[0]-a[0])

def point_to_segment_proj_dist(pt, a, b):
    pax = pt[0] - a[0]; pay = pt[1] - a[1]
    bax = b[0] - a[0]; bay = b[1] - a[1]
    denom = bax*bax + bay*bay
    if denom == 0: return np.hypot(pax, pay), 0.0
    t = (pax*bax + pay*bay) / denom
    if t < 0.0: projx, projy = a
    elif t > 1.0: projx, projy = b
    else: projx, projy = a[0] + t * bax, a[1] + t * bay
    return np.hypot(pt[0]-projx, pt[1]-projy), t

cap = cv2.VideoCapture('trafico_2.mp4')
FPS = cap.get(cv2.CAP_PROP_FPS) or 25

object_detector = cv2.createBackgroundSubtractorMOG2(history=500, varThreshold=25, detectShadows=True)

tracking_objects = {} 
car_ids_counted = set()
vehicle_counts = {"Total": 0}
left_road_count = 0; right_road_count = 0  
direction_counted = defaultdict(set)
next_car_id = 0

MIN_AREA_BLOB = 250    
MAX_DISTANCE = 150     
MAX_DISAPPEARED = 10   

kernel_close = np.ones((9, 9), np.uint8)  
kernel_open = np.ones((3, 3), np.uint8)   
kernel_dilate = np.ones((3, 3), np.uint8) 

PX_TO_KMH_FACTOR = 2.6
LINE_V1_P1 = (20, 420); LINE_V1_P2 = (450, 420)
LINE_V2_P1 = (600, 420); LINE_V2_P2 = (1050, 420)
LINE_DIST_THRESH = 40

print("Calibrando...")
for _ in range(40):
    ret, frame = cap.read()
    if not ret: break
    frame_s = rescale(frame)
    gray = cv2.cvtColor(frame_s, cv2.COLOR_BGR2GRAY)
    gray_blur = cv2.GaussianBlur(gray, (5, 5), 0)
    object_detector.apply(gray_blur)

while True:
    ret, frame = cap.read()
    if not ret: break

    frame_s = rescale(frame)
    current_time = cap.get(cv2.CAP_PROP_POS_MSEC)
    
    cv2.imshow("1. Video Original", frame_s)

    gray = cv2.cvtColor(frame_s, cv2.COLOR_BGR2GRAY)
    gray_blur = cv2.GaussianBlur(gray, (5, 5), 0)

    image_mask_raw = object_detector.apply(gray_blur, learningRate=0.001)
    
    _, image_bin = cv2.threshold(image_mask_raw, 200, 255, cv2.THRESH_BINARY)
    
    image_bin = cv2.morphologyEx(image_bin, cv2.MORPH_CLOSE, kernel_close) 
    image_bin = cv2.morphologyEx(image_bin, cv2.MORPH_OPEN, kernel_open)
    image_bin = cv2.dilate(image_bin, kernel_dilate, iterations=1)         
    
    cv2.imshow("2. Mascara Procesada", image_bin)

    contours, _ = cv2.findContours(image_bin, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
    img_final = frame_s.copy()
    detections = []

    cv2.line(img_final, LINE_V1_P1, LINE_V1_P2, (255, 0, 0), 2)
    cv2.line(img_final, LINE_V2_P1, LINE_V2_P2, (0, 0, 255), 2)

    for cnt in contours:
        area = cv2.contourArea(cnt)
        if area < MIN_AREA_BLOB: continue
        
        x, y, w, h = cv2.boundingRect(cnt)
        
        if y < 220: continue 
        
        cx, cy = x + w//2, y + h//2
        detections.append((cx, cy, x, y, w, h))

    if len(tracking_objects) == 0:
        for det in detections:
            cx, cy, x, y, w, h = det
            tracking_objects[next_car_id] = {'center':(cx,cy), 'history':[(cx,cy)], 'disappeared':0, 'box':(x,y,w,h)}
            next_car_id += 1
    else:
        object_ids = list(tracking_objects.keys())
        current_centroids = [tracking_objects[obj_id]['center'] for obj_id in object_ids]
        
        used_rows = set(); used_cols = set()
        if len(detections) > 0:
            D = np.zeros((len(current_centroids), len(detections)))
            for i in range(len(current_centroids)):
                for j in range(len(detections)):
                    D[i, j] = np.hypot(current_centroids[i][0]-detections[j][0], current_centroids[i][1]-detections[j][1])

            rows = D.min(axis=1).argsort(); cols = D.argmin(axis=1)[rows]

            for (row, col) in zip(rows, cols):
                if row in used_rows or col in used_cols: continue
                if D[row, col] > MAX_DISTANCE: continue

                obj_id = object_ids[row]
                cx, cy, x, y, w, h = detections[col]
                
                tracking_objects[obj_id]['center'] = (cx, cy)
                tracking_objects[obj_id]['box'] = (x, y, w, h)
                tracking_objects[obj_id]['disappeared'] = 0
                tracking_objects[obj_id]['history'].append((cx, cy))
                if len(tracking_objects[obj_id]['history']) > 20: tracking_objects[obj_id]['history'].pop(0)

                used_rows.add(row); used_cols.add(col)

        for row in range(len(object_ids)):
            if row not in used_rows: tracking_objects[object_ids[row]]['disappeared'] += 1
        for col in range(len(detections)):
            if col not in used_cols:
                cx, cy, x, y, w, h = detections[col]
                tracking_objects[next_car_id] = {'center':(cx,cy), 'history':[(cx,cy)], 'disappeared':0, 'box':(x,y,w,h)}
                next_car_id += 1

    tracking_objects = {k: v for k, v in tracking_objects.items() if v['disappeared'] <= MAX_DISAPPEARED}

    for car_id, data in tracking_objects.items():
        if data['disappeared'] > 0: continue
        cx, cy = data['center']; x, y, w, h = data['box']; hist = data['history']

        def count_vehicle(direction_key, ref_counter):
            if direction_key not in direction_counted[car_id]:
                direction_counted[car_id].add(direction_key)
                if car_id not in car_ids_counted:
                    vehicle_counts["Total"] += 1
                    car_ids_counted.add(car_id)
                return True
            return False

        if len(hist) >= 3:
            (cx_prev, cy_prev) = hist[-3]
            (cx_curr, cy_curr) = hist[-1]
            if (point_line_signed_distance((cx_prev, cy_prev), LINE_V1_P1, LINE_V1_P2) * point_line_signed_distance((cx_curr, cy_curr), LINE_V1_P1, LINE_V1_P2) < 0):
                d, t = point_to_segment_proj_dist((cx_curr, cy_curr), LINE_V1_P1, LINE_V1_P2)
                if d < LINE_DIST_THRESH and 0 <= t <= 1: 
                    if count_vehicle("left_road", left_road_count): left_road_count += 1
            if (point_line_signed_distance((cx_prev, cy_prev), LINE_V2_P1, LINE_V2_P2) * point_line_signed_distance((cx_curr, cy_curr), LINE_V2_P1, LINE_V2_P2) < 0):
                d, t = point_to_segment_proj_dist((cx_curr, cy_curr), LINE_V2_P1, LINE_V2_P2)
                if d < LINE_DIST_THRESH and 0 <= t <= 1: 
                    if count_vehicle("right_road", right_road_count): right_road_count += 1

        color = (0, 255, 0) if car_id in car_ids_counted else (0, 255, 255)
        cv2.rectangle(img_final, (x, y), (x+w, y+h), color, 2)
        cv2.putText(img_final, f"ID:{car_id}", (x, y-5), cv2.FONT_HERSHEY_SIMPLEX, 0.6, color, 2)

    # Interfaz
    overlay = img_final.copy()
    cv2.rectangle(overlay, (0,0), (300,120), (0,0,0), -1)
    img_final = cv2.addWeighted(overlay, 0.6, img_final, 0.4, 0)
    cv2.putText(img_final, f"TOTAL: {vehicle_counts['Total']}", (10,35), cv2.FONT_HERSHEY_SIMPLEX, 0.9, (0,255,0), 2)
    cv2.putText(img_final, f"Ida: {left_road_count}", (10,70), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255,200,200), 1)
    cv2.putText(img_final, f"Vuelta: {right_road_count}", (10,100), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (200,200,255), 1)

    cv2.imshow("3. Monitor de Trafico (Final)", img_final)

    if cv2.waitKey(15) & 0xFF == ord('d'):
        break

cap.release()
cv2.destroyAllWindows()

Calibrando...
