In [None]:
import cv2
import numpy as np
import torch
import time
from ultralytics import YOLO
from smart_traffic_vision import SmartTrafficVision
from Controller import TrafficController

def draw_traffic_status(frame, controller):
    """Overlay the controller's current signal state on ``frame`` in place.

    Three lines of black text are drawn near the top-left corner: the lane
    that currently has green, the length of the green phase, and the time
    remaining in it.

    Args:
        frame: Image passed straight to ``cv2.putText``; modified in place.
        controller: Object exposing ``current_lane``, ``green_time`` and a
            ``time_left()`` method.
    """
    left, top, spacing = 20, 60, 30

    # All three values are read up front, before anything is drawn.
    status_lines = (
        f"CURRENT GREEN : Lane {controller.current_lane}",
        f"GREEN TIME   : {controller.green_time} sec",
        f"TIME LEFT    : {controller.time_left()} sec",
    )

    baseline = top
    for line in status_lines:
        cv2.putText(
            frame, line, (left, baseline),
            cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 0, 0), 2,
        )
        baseline += spacing  # move down one row for the next line


# =========================
# COMPUTE GREEN TIME
# =========================
def compute_green_time(car_count, *, t_start=1, t_follow=0.5,
                       min_green=2, max_green=10):
    """Return the green-phase duration, in seconds, for a lane.

    The first car needs ``t_start`` seconds to get moving and every
    additional car adds ``t_follow`` seconds; the result is clamped to the
    range ``[min_green, max_green]``.

    Args:
        car_count: Number of vehicles waiting in the lane. Zero or negative
            counts yield ``min_green``.
        t_start: Start-up time for the first vehicle.
        t_follow: Extra time for each additional vehicle.
        min_green: Shortest green time that is ever returned.
        max_green: Longest green time that is ever returned. The default of
            10 s was chosen as a cap for a quick demo.

    Returns:
        The green time in seconds (``int`` or ``float``).

    Raises:
        ValueError: If ``min_green`` is greater than ``max_green``.
    """
    if min_green > max_green:
        raise ValueError("min_green must not exceed max_green")

    if car_count <= 0:
        return min_green  # minimum green time for an empty lane

    time_needed = t_start + (car_count - 1) * t_follow
    return max(min_green, min(max_green, time_needed))


# =========================
# MAIN
# =========================
def main():
    """Run the smart-traffic demo on ``trafficSystem.mp4``.

    Each frame is passed through the YOLO tracker, the tracked vehicles are
    handed to ``SmartTrafficVision`` for per-lane processing, and the
    ``TrafficController`` state is drawn on the frame. Whenever the current
    phase finishes, the next lane (round-robin) receives a green time
    computed from its vehicle count.

    Keys (read from the OpenCV window):
        s -- start recording the annotated frames to an MP4 file.
        q -- quit.

    Raises:
        RuntimeError: If the input video cannot be opened.
    """
    # One row per lane: [ignored, x_center, y_center, width, height], all
    # expressed as fractions of the frame size.
    normalized_lanes = [
        [0, 0.169010, 0.261111, 0.279687, 0.174074],
        [0, 0.169531, 0.511111, 0.281771, 0.244444],
        [0, 0.834635, 0.274074, 0.285938, 0.220370],
        [0, 0.834375, 0.533333, 0.276042, 0.170370],
        [0, 0.571875, 0.837963, 0.127083, 0.229630],
        [0, 0.437240, 0.834722, 0.120313, 0.215741],
        [0, 0.443229, 0.086574, 0.145833, 0.160185],
        [0, 0.588542, 0.084722, 0.109375, 0.167593],
    ]

    input_video = "trafficSystem.mp4"
    cap = cv2.VideoCapture(input_video)
    out_video = None

    try:
        # Fail early with a clear message instead of loading the model and
        # then silently exiting on the first failed read.
        if not cap.isOpened():
            raise RuntimeError(f"Could not open input video: {input_video}")

        frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

        # Record at the source frame rate so playback speed matches the
        # input; fall back to 20 FPS when the backend reports 0 or NaN.
        source_fps = cap.get(cv2.CAP_PROP_FPS)
        if not source_fps > 0:
            source_fps = 20.0

        lanes = _lane_polygons(normalized_lanes, frame_width, frame_height)

        vision = SmartTrafficVision(lanes)
        controller = TrafficController(green_time=1)

        model = YOLO("Models/epoch_16.pt").to(
            "cuda" if torch.cuda.is_available() else "cpu"
        )

        fourcc = cv2.VideoWriter_fourcc(*'mp4v')

        while True:
            ret, frame = cap.read()
            if not ret:
                break

            results = model.track(
                frame,
                persist=True,
                tracker="bytetrack.yaml",
                conf=0.4,
                verbose=False,
            )
            vehicles = _collect_vehicles(results, model.names)

            frame = vision.process_frame(frame, vehicles)
            draw_traffic_status(frame, controller)

            if controller.is_phase_finished():
                counts = vision.get_lane_counts()
                # Lanes are numbered from 1, so wrap with modulo then add 1.
                next_lane = controller.current_lane % len(lanes) + 1
                cars = counts[f"Lane {next_lane}"]
                new_time = compute_green_time(cars)

                controller.reset(new_time, next_lane)

            cv2.imshow("Smart Traffic System", frame)
            key = cv2.waitKey(1) & 0xFF

            if key == ord('s') and out_video is None:  # Start recording
                out_name = f"traffic_output_{int(time.time())}.mp4"
                # Size the writer from the frame actually being written, in
                # case process_frame changed its dimensions.
                # NOTE(review): assumes the processed frame is a NumPy image.
                height, width = frame.shape[:2]
                writer = cv2.VideoWriter(
                    out_name, fourcc, source_fps, (width, height)
                )
                if writer.isOpened():
                    out_video = writer
                    print(f"Recording video to {out_name}")
                else:
                    writer.release()
                    print(f"Could not open video writer for {out_name}")

            if out_video is not None:
                out_video.write(frame)

            if key == ord('q'):
                break
    finally:
        # Always release the capture, the writer and the window, even when
        # an exception interrupts the loop.
        cap.release()
        if out_video is not None:
            out_video.release()
        cv2.destroyAllWindows()


# Detector class names that are counted as vehicles.
_VEHICLE_CLASSES = frozenset({"car", "bus", "truck", "motorcycle"})


def _lane_polygons(normalized_lanes, frame_width, frame_height):
    """Convert normalized centre/size lane rows to numbered pixel rectangles."""
    lanes = []
    for lane_id, (_, xc, yc, w, h) in enumerate(normalized_lanes, 1):
        x1 = int((xc - w / 2) * frame_width)
        y1 = int((yc - h / 2) * frame_height)
        x2 = int((xc + w / 2) * frame_width)
        y2 = int((yc + h / 2) * frame_height)
        # Corners in order: top-left, top-right, bottom-right, bottom-left.
        lanes.append((lane_id, [(x1, y1), (x2, y1), (x2, y2), (x1, y2)]))
    return lanes


def _collect_vehicles(results, class_names):
    """Build ``SmartTrafficVision.Vehicle`` objects from tracked vehicle boxes."""
    vehicles = []
    for result in results:
        if result.boxes is None:
            continue
        for box in result.boxes:
            # Boxes without a track id cannot be followed across frames.
            if box.id is None:
                continue
            if class_names[int(box.cls[0])] not in _VEHICLE_CLASSES:
                continue
            vehicles.append(
                SmartTrafficVision.Vehicle(
                    int(box.id[0]), tuple(map(int, box.xyxy[0]))
                )
            )
    return vehicles


if __name__ == "__main__":
    main()


Recording video to traffic_output_1766070186.mp4
