In [1]:
import cv2
import numpy as np
from ultralytics import YOLO
from collections import defaultdict
import pandas as pd
import time

video_paths = [
    "videos/video21.mp4",
    "videos/video22.mp4",
    "videos/video23.mp4",
    "videos/video24.mp4"
]
model_path = "runs/detect/train4/weights/best.pt"
meters_per_pixel = 0.01
INFER_IMG_SIZE = 640

font = cv2.FONT_HERSHEY_SIMPLEX

colors = {
    0: (0, 255, 0),     
    1: (0, 0, 255),     
    2: (255, 0, 0),     
    3: (0, 255, 255),   
    4: (255, 0, 255)    
}

class_map = {
    0: "car",
    1: "moto",
    2: "bus",
    3: "truck",     
    4: "auto"
}

print("Cell executed ✅")


Cell executed ✅


In [2]:
caps = [cv2.VideoCapture(p) for p in video_paths]
assert all(c.isOpened() for c in caps)

model = YOLO(model_path)
fps = int(caps[0].get(cv2.CAP_PROP_FPS))
frame_w = int(caps[0].get(cv2.CAP_PROP_FRAME_WIDTH))
frame_h = int(caps[0].get(cv2.CAP_PROP_FRAME_HEIGHT))

SUB_W = 480
SUB_H = int(frame_h * (SUB_W / frame_w))
out_w, out_h = SUB_W * 2, SUB_H * 2

fourcc = cv2.VideoWriter_fourcc(*'mp4v')
out = cv2.VideoWriter("output_grid9.mp4", fourcc, fps, (out_w, out_h))

cv2.namedWindow("Multi-Video Analysis", cv2.WINDOW_NORMAL)
cv2.resizeWindow("Multi-Video Analysis", out_w, out_h)

unique_ids = [set() for _ in range(4)]
last_centroids = [dict() for _ in range(4)]
speed_samples = [list() for _ in range(4)]
classwise_counts = [defaultdict(int) for _ in range(4)]

print("Cell executed ✅")


Cell executed ✅


In [3]:
while True:
    frames = []
    rets = []
    for cap in caps:
        ret, frame = cap.read()
        rets.append(ret)
        frames.append(frame if ret else np.zeros((frame_h, frame_w, 3), dtype=np.uint8))
    if not any(rets):
        break

    results = model.track(frames, imgsz=INFER_IMG_SIZE, conf=0.25, iou=0.45, persist=True)
    processed_subframes = []

    for idx, det in enumerate(results):
        sub = cv2.resize(frames[idx], (SUB_W, SUB_H))

        if det.boxes is not None and len(det.boxes) > 0:
            xyxy = det.boxes.xyxy.cpu().numpy()
            cls_ids = det.boxes.cls.cpu().numpy().astype(int)
            track_ids = det.boxes.id.cpu().numpy().astype(int) if det.boxes.id is not None else [-1] * len(cls_ids)

            for (x1, y1, x2, y2), class_idx, track_id in zip(xyxy, cls_ids, track_ids):
                if class_idx == 3:
                    class_idx = 4

                cx = int((x1 + x2) / 2)
                cy = int((y1 + y2) / 2)

                scale_x = SUB_W / frame_w
                scale_y = SUB_H / frame_h
                sx1, sy1 = int(x1 * scale_x), int(y1 * scale_y)
                sx2, sy2 = int(x2 * scale_x), int(y2 * scale_y)
                scx, scy = int(cx * scale_x), int(cy * scale_y)

                color = colors[class_idx]
                label_text = f"{track_id}:{class_map[class_idx]}"
                cv2.rectangle(sub, (sx1, sy1), (sx2, sy2), color, 2)
                cv2.putText(sub, label_text, (sx1, sy1 - 5), font, 0.3, color, 1)

                unique_ids[idx].add(track_id)
                classwise_counts[idx][class_idx] += 1

                prev = last_centroids[idx].get(track_id, None)
                if prev is not None:
                    dx, dy = cx - prev[0], cy - prev[1]
                    pixel_dist = np.hypot(dx, dy)
                    speed_m_s = pixel_dist * meters_per_pixel * fps
                    speed_kmh = (speed_m_s * 3.6) / 6
                    speed_samples[idx].append(speed_kmh)
                    cv2.putText(sub, f"{speed_kmh:.1f} km/h", (sx1, sy2 + 12), font, 0.3, color, 1)

                last_centroids[idx][track_id] = (cx, cy)

        total = len(unique_ids[idx])
        avg_speed = np.mean(speed_samples[idx]) if speed_samples[idx] else 0
        green_time = min(60, 10 + int(total * 0.2 + avg_speed * 0.02))

        stats = [
            f"Video {idx+1}",
            f"Count: {total}",
            f"Avg Speed: {avg_speed:.1f} km/h",
            f"Green Time: {green_time} s"
        ]
        y_offset = 20
        for stat in stats:
            cv2.putText(sub, stat, (10, y_offset), font, 0.5, (0, 0, 0), 3)
            cv2.putText(sub, stat, (10, y_offset), font, 0.5, (255, 255, 255), 1)
            y_offset += 25

        processed_subframes.append(sub)

    top = np.hstack((processed_subframes[0], processed_subframes[1]))
    bottom = np.hstack((processed_subframes[2], processed_subframes[3]))
    grid = np.vstack((top, bottom))

    out.write(grid)
    cv2.imshow("Multi-Video Analysis", grid)
    if cv2.waitKey(1) & 0xFF == ord('q'):
        break

print("Cell executed ✅")



0: 384x640 8 cars, 1 motorcycle, 18.2ms
1: 384x640 4 cars, 2 motorcycles, 18.2ms
2: 384x640 2 cars, 18.2ms
3: 384x640 2 cars, 2 motorcycles, 2 trucks, 18.2ms
Speed: 3.4ms preprocess, 18.2ms inference, 38.6ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 3 cars, 5.8ms
1: 384x640 4 cars, 1 motorcycle, 5.8ms
2: 384x640 1 car, 5.8ms
3: 384x640 3 cars, 2 motorcycles, 1 bus, 2 trucks, 5.8ms
Speed: 2.9ms preprocess, 5.8ms inference, 2.4ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 2 cars, 5.8ms
1: 384x640 1 car, 5.8ms
2: 384x640 2 cars, 5.8ms
3: 384x640 1 car, 5.8ms
Speed: 3.0ms preprocess, 5.8ms inference, 2.4ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 7 cars, 1 motorcycle, 5.7ms
1: 384x640 3 cars, 2 motorcycles, 5.7ms
2: 384x640 1 car, 5.7ms
3: 384x640 3 cars, 4 motorcycles, 1 bus, 2 trucks, 5.7ms
Speed: 2.9ms preprocess, 5.7ms inference, 2.4ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 7 cars, 1 motorcycle, 5.8ms
1: 384x64

In [4]:
for cap in caps:
    cap.release()
out.release()
cv2.destroyAllWindows()

print("Cell executed ✅")


Cell executed ✅


In [5]:
csv_rows = []
for i in range(4):
    total = len(unique_ids[i])
    avg = np.mean(speed_samples[i]) if speed_samples[i] else 0
    green_time = min(60, 10 + int(total * 0.2 + avg_speed * 0.02))
    print(f"Video {i + 1}: Vehicles: {total}, Avg Speed: {avg:.2f} km/h, Green Time: {green_time} s")
    for cls_id, cnt in classwise_counts[i].items():
        csv_rows.append({
            "Video": f"Video {i+1}",
            "Class": class_map.get(cls_id, "auto"),
            "Count": cnt
        })

df = pd.DataFrame(csv_rows)
df.to_csv("classwise_stats.csv", index=False)

print("Cell executed ✅")


Video 1: Vehicles: 30, Avg Speed: 57.60 km/h, Green Time: 17 s
Video 2: Vehicles: 80, Avg Speed: 63.72 km/h, Green Time: 27 s
Video 3: Vehicles: 181, Avg Speed: 57.61 km/h, Green Time: 47 s
Video 4: Vehicles: 206, Avg Speed: 57.00 km/h, Green Time: 52 s
Cell executed ✅


In [6]:
final_frame = np.zeros((out_h, out_w, 3), dtype=np.uint8)
for i in range(4):
    total = len(unique_ids[i])
    avg = np.mean(speed_samples[i]) if speed_samples[i] else 0
    green_time = min(60, 10 + int(total * 0.2))
    text = f"Video {i+1} → Count: {total}, Avg Speed: {avg:.1f} km/h, Green Time: {green_time} s"
    y = 40 + 60 * i
    cv2.putText(final_frame, text, (20, y), font, 1.0, (255, 255, 255), 2)

for _ in range(fps * 3):
    out.write(final_frame)

print("Cell executed ✅")


Cell executed ✅
