In [None]:
!pip install roboflow opencv-python requests tqdm roboflow supervision ultralytics inference-gpu

In [None]:
api_key = "" # API Key del proyecto
model_name = "traffic-control-managment"

In [None]:
import roboflow

rf = roboflow.Roboflow(api_key=api_key) 
project = rf.workspace().project(model_name)
model = project.version(1).model

In [None]:
test_image_path = "Frames\frame_00000.jpg"
prediction = model.predict(test_image_path).json()

# Ver los objetos detectados
for pred in prediction['predictions']:
    print(f"{pred['class']} - {pred['confidence']:.2f} - {pred['x']}, {pred['y']}")

print(f"Model classes {model.classes}")
print([[pred['class'], pred['class_id']] for pred in prediction['predictions']])

In [None]:
import cv2
import supervision as sv
from roboflow import Roboflow
import numpy as np
from google.colab.patches import cv2_imshow
import time
from dataclasses import dataclass

# === Settings ===
VIDEO_PATH = "/content/traffic_video_output_video_1_minute.mp4"
OUTPUT_PATH = "/content/output_video.mp4"

TARGET_FPS = 5
MODEL_CONFIDENCE = 0.4  # Higher more precision
MODEL_OVERLAP = 30
MIN_GREEN_TIME = 10  # Seconds
VEHICLE_THRESHOLD = 5  # para cambiar a verde

# === Video Info ===
video_info = sv.VideoInfo.from_video_path(VIDEO_PATH)
frame_generator = sv.get_video_frames_generator(VIDEO_PATH)

# === Classes ===
CLASS_NAMES = {0: "Car", 1: "Pedestrian"}

# === Tracker ===
tracker = sv.ByteTrack(
    track_activation_threshold=0.25,
    lost_track_buffer=30,
    minimum_matching_threshold=0.8,
    frame_rate=video_info.fps
)

# === Annotators ===
box_annotator = sv.BoxAnnotator()
label_annotator = sv.LabelAnnotator()
line_annotator = sv.LineZoneAnnotator()

# === Estructura de datos para carriles ===
@dataclass
class Lane:
    name: str
    line_waiting: sv.LineZone
    line_exiting: sv.LineZone
    light_status: str = "RED"
    last_switch: float = time.time()
    green_start: float = None
    waiting_ids: set = None
    exiting_ids: set = None
    last_counted_frame: dict = None

    def __post_init__(self):
        self.waiting_ids = set()
        self.exiting_ids = set()
        self.last_counted_frame = {}

    def update_counts(self, car_detections, frame_number):
        waiting_crossed_in, _ = self.line_waiting.trigger(car_detections)
        exiting_crossed_in, _ = self.line_exiting.trigger(car_detections)

        for detection_idx in range(len(car_detections)):
            tracker_id = car_detections.tracker_id[detection_idx]
            if tracker_id is None:
                continue

            if tracker_id in self.last_counted_frame and frame_number - self.last_counted_frame[tracker_id] < 30:
                continue

            if waiting_crossed_in[detection_idx] and tracker_id not in self.waiting_ids:
                self.waiting_ids.add(tracker_id)
                self.last_counted_frame[tracker_id] = frame_number

            if self.light_status == "GREEN" and exiting_crossed_in[detection_idx] and tracker_id not in self.exiting_ids:
                self.exiting_ids.add(tracker_id)
                self.last_counted_frame[tracker_id] = frame_number

        return len(self.waiting_ids), len(self.exiting_ids)

    def update_light(self):
        now = time.time()
        time_in_state = now - self.last_switch
        waiting = len(self.waiting_ids)

        if self.light_status == "RED":
            if waiting >= VEHICLE_THRESHOLD and time_in_state > 5:
                self.light_status = "GREEN"
                self.last_switch = now
                self.green_start = now
                self.exiting_ids = set()
        else:
            if now - self.green_start >= MIN_GREEN_TIME and waiting < 3:
                self.light_status = "RED"
                self.last_switch = now
                self.green_start = None
                self.waiting_ids = set()

        time_left = max(0, MIN_GREEN_TIME - (now - self.green_start)) if self.light_status == "GREEN" else 0
        return self.light_status, time_left

# === Definición de carriles ===
lanes = [
    Lane(
        name="North",
        line_waiting=sv.LineZone(
            start=sv.Point(638, 414), end=sv.Point(369, 439),
            triggering_anchors=[sv.Position.CENTER, sv.Position.BOTTOM_CENTER],
            minimum_crossing_threshold=1
        ),
        line_exiting=sv.LineZone(
            start=sv.Point(837, 573), end=sv.Point(457, 606),
            triggering_anchors=[sv.Position.CENTER, sv.Position.BOTTOM_CENTER],
            minimum_crossing_threshold=1
        )
    )
]

# === Filtro ===
def filter_car_detections(detections):
    return sv.Detections(
        xyxy=detections.xyxy,
        confidence=detections.confidence,
        class_id=detections.class_id,
        tracker_id=detections.tracker_id if detections.tracker_id is not None else None
    )

# === Procesamiento ===
def process_frame(frame: np.ndarray, frame_number: int) -> np.ndarray:
    result = model.predict(frame, confidence=MODEL_CONFIDENCE, overlap=MODEL_OVERLAP).json()
    detections = sv.Detections.from_inference(result)
    all_detections = filter_car_detections(detections)
    tracked_detections = tracker.update_with_detections(all_detections)

    for lane in lanes:
        waiting, exiting = lane.update_counts(tracked_detections, frame_number)
        lane.update_light()

    annotated = frame.copy()
    labels = [
        f"#{tid} {CLASS_NAMES.get(cls, '?')} {conf:.2f}"
        for tid, cls, conf in zip(
            tracked_detections.tracker_id,
            tracked_detections.class_id,
            tracked_detections.confidence
        )
    ]
    annotated = box_annotator.annotate(annotated, tracked_detections)
    annotated = label_annotator.annotate(annotated, tracked_detections, labels)

    for i, lane in enumerate(lanes):
        annotated = line_annotator.annotate(annotated, lane.line_waiting)
        annotated = line_annotator.annotate(annotated, lane.line_exiting)

        color = (0, 255, 0) if lane.light_status == "GREEN" else (0, 0, 255)
        y_offset = 70 + i * 80

        cv2.putText(annotated, f"{lane.name}: {lane.light_status}", (50, y_offset), cv2.FONT_HERSHEY_SIMPLEX, 0.8, color, 2)
        cv2.putText(annotated, f"Waiting: {len(lane.waiting_ids)}", (250, y_offset), cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 255, 0), 2)
        cv2.putText(annotated, f"Exited: {len(lane.exiting_ids)}", (450, y_offset), cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 0, 255), 2)

        if lane.light_status == "GREEN":
            time_left = max(0, MIN_GREEN_TIME - (time.time() - lane.green_start))
            cv2.putText(annotated, f"Time left: {time_left:.1f}s", (650, y_offset), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 255, 255), 2)

        cv2.circle(annotated, (30, y_offset - 10), 10, color, -1)

    return annotated

# === Escritura para el video ===
video_writer = cv2.VideoWriter(
    OUTPUT_PATH,
    cv2.VideoWriter_fourcc(*'mp4v'),
    TARGET_FPS,
    (video_info.width, video_info.height)
)

for i, frame in enumerate(frame_generator):
    if i % int(video_info.fps / TARGET_FPS) != 0:
        continue

    processed = process_frame(frame, i) # Procesa cada frame del video
    video_writer.write(processed) # Crea el nuevo video

    if i % 100 == 0: # Muestra el frame de ejemplo (Cada 100 frames)
        cv2_imshow(processed)

video_writer.release()
cv2.destroyAllWindows()
print(f"✅ Video guardado en {OUTPUT_PATH}")