###### Importaciones y configuración

In [7]:
import os
import cv2
import time
import pandas as pd
from pathlib import Path
from tqdm import tqdm

from ultralytics import YOLO

INPUT_VIDEO = "data/lemons_video.mp4"
OUTPUT_DIR = "outputs"
os.makedirs(OUTPUT_DIR, exist_ok=True)

# Línea virtual de conteo (x1, y1) -> (x2, y2)
LINE_P1 = (100, 360)
LINE_P2 = (1180, 360)

CONFIDENCE = 0.35      # confianza mínima
IOU = 0.5              # IoU para NMS
MODEL_NAME = "yolov8n.pt"

RESIZE_TO = None

###### Utilidades

In [8]:
def draw_line(frame, p1, p2, color=(0, 255, 255), thickness=2):
    cv2.line(frame, p1, p2, color, thickness)

def put_text(frame, text, org=(20, 40), color=(0, 255, 0), scale=1.0, thickness=2):
    cv2.putText(frame, text, org, cv2.FONT_HERSHEY_SIMPLEX, scale, color, thickness, cv2.LINE_AA)

def center_of_bbox(xyxy):
    x1, y1, x2, y2 = xyxy
    cx = int((x1 + x2) / 2)
    cy = int((y1 + y2) / 2)
    return cx, cy

def side_of_line(p, a, b):

    return (b[0]-a[0])*(p[1]-a[1]) - (b[1]-a[1])*(p[0]-a[0])

def ensure_dir(path):
    Path(path).parent.mkdir(parents=True, exist_ok=True)

def readable_time(seconds):
    return time.strftime('%H:%M:%S', time.gmtime(seconds))


###### Cargar modelo YOLO

In [9]:
model = YOLO(MODEL_NAME)

###### Procesamiento del video: detección, tracking y conteo

In [10]:
# Diccionarios para trackear posiciones previas por ID y conteo
prev_side_by_id = {}
count_total = 0

# Parámetros para evitar conteos dobles y jitter
COOLDOWN_FRAMES = 15
MIN_TRACK_AGE = 3
EPS_SIDE = 5.0

last_count_frame_by_id = {}   # ID -> último frame donde se contó (para cooldown)
track_age_by_id = {}          # ID -> cuántos frames ha llevado en escena

csv_path = os.path.join(OUTPUT_DIR, "results.csv")
ensure_dir(csv_path)
csv_rows = []
csv_cols = ["frame", "timestamp_sec", "count_frame", "count_total"]

crosses_csv_path = os.path.join(OUTPUT_DIR, "crosses.csv")
crosses_rows = []
crosses_cols = ["frame", "timestamp_sec", "track_id", "direction", "bbox", "confidence"]

cap = cv2.VideoCapture(INPUT_VIDEO)
assert cap.isOpened(), f"No se pudo abrir el video: {INPUT_VIDEO}"

fps_in = cap.get(cv2.CAP_PROP_FPS) or 30.0
w = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
h = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

if isinstance(RESIZE_TO, (tuple, list)) and len(RESIZE_TO) == 2:
    w, h = RESIZE_TO

fourcc = cv2.VideoWriter_fourcc(*"mp4v")
out_path = os.path.join(OUTPUT_DIR, "output_video.mp4")
out = cv2.VideoWriter(out_path, fourcc, fps_in, (w, h))

frame_idx = 0
pbar = tqdm(total=int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) or None, desc="Procesando")

results_gen = model.track(
    source=INPUT_VIDEO,
    conf=CONFIDENCE,
    iou=IOU,
    stream=True,
    persist=True,
    verbose=False
)

for result in results_gen:
    frame = result.orig_img.copy()

    if isinstance(RESIZE_TO, (tuple, list)) and len(RESIZE_TO) == 2:+
        frame = cv2.resize(frame, RESIZE_TO)

    boxes = result.boxes.xyxy.cpu().numpy() if result.boxes is not None else []
    ids = result.boxes.id.cpu().numpy().astype(int) if (result.boxes is not None and result.boxes.id is not None) else []
    confs = result.boxes.conf.cpu().numpy() if (result.boxes is not None and result.boxes.conf is not None) else []
    count_frame = 0

    draw_line(frame, LINE_P1, LINE_P2, color=(0, 255, 255), thickness=2)

    # Actualizar edades de tracks: incrementa en 1 si aparece, resetea para los que ya no aparecen
    present_ids = set(int(x) for x in ids) if len(ids) > 0 else set()

    for tid in present_ids:
        track_age_by_id[tid] = track_age_by_id.get(tid, 0) + 1

    # Procesar cada detección con ID
    for i, box in enumerate(boxes):
        track_id = int(ids[i]) if i < len(ids) else None
        x1, y1, x2, y2 = map(int, box[:4])
        cx, cy = center_of_bbox((x1, y1, x2, y2))

        cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 140, 255), 2)
        put_text(frame, f"ID {track_id}", (x1, max(y1-8, 10)), color=(255, 255, 255), scale=0.5, thickness=1)
        cv2.circle(frame, (cx, cy), 3, (0, 255, 0), -1)

        # Determinar el lado actual respecto a la línea
        raw_side = side_of_line((cx, cy), LINE_P1, LINE_P2)
        side_now = 0 if abs(raw_side) < EPS_SIDE else raw_side
        prev_side = prev_side_by_id.get(track_id)

        age = track_age_by_id.get(track_id, 0)
        if age < MIN_TRACK_AGE:
            prev_side_by_id[track_id] = side_now if side_now != 0 else prev_side_by_id.get(track_id)
            continue

        if prev_side is not None and side_now != 0:
            if (prev_side > 0 and side_now < 0) or (prev_side < 0 and side_now > 0):
                # Verificar cooldown para evitar conteos repetidos por jitter o reassignment de ID
                last_frame = last_count_frame_by_id.get(track_id, -9999)
                if (frame_idx - last_frame) >= COOLDOWN_FRAMES:
                    count_total += 1
                    count_frame += 1
                    last_count_frame_by_id[track_id] = frame_idx

                    direction = "A->B" if (prev_side > 0 and side_now < 0) else "B->A"
                    conf = float(confs[i]) if i < len(confs) else None
                    bbox_str = f"[{x1},{y1},{x2},{y2}]"
                    tsec = round(frame_idx / fps_in, 3)
                    crosses_rows.append([frame_idx, tsec, track_id, direction, bbox_str, conf])

                    put_text(frame, f"CRUCE ID {track_id} {direction}", (x1, y2+18), (0, 200, 255), 0.5, 2)

        if side_now != 0:
            prev_side_by_id[track_id] = side_now

    put_text(frame, f"Frame: {frame_idx}", (20, 40), (0, 255, 0), 0.8, 2)
    put_text(frame, f"Contador frame: {count_frame}", (20, 70), (0, 255, 0), 0.8, 2)
    put_text(frame, f"Total: {count_total}", (20, 100), (0, 255, 0), 0.9, 2)

    out.write(frame)

    tsec = frame_idx / fps_in
    csv_rows.append([frame_idx, round(tsec, 3), count_frame, count_total])

    frame_idx += 1
    pbar.update(1)

cap.release()
out.release()
pbar.close()

df = pd.DataFrame(csv_rows, columns=csv_cols)
df.to_csv(csv_path, index=False)

df_cross = pd.DataFrame(crosses_rows, columns=crosses_cols)
df_cross.to_csv(crosses_csv_path, index=False)

print("Procesamiento finalizado.")
print("Video con overlay:", out_path)
print("Resultados CSV:", csv_path)
print("Registro de cruces:", crosses_csv_path)


Procesando:  98%|█████████▊| 150/153 [00:19<00:00,  7.60it/s]

Procesamiento finalizado.
Video con overlay: outputs/output_video.mp4
Resultados CSV: outputs/results.csv
Registro de cruces: outputs/crosses.csv



