<a href="https://colab.research.google.com/github/SriramShravya/traffic_prediction/blob/main/robofl.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!pip install ultralytics roboflow


In [None]:
import os

from roboflow import Roboflow

# Download version 1 of the Roboflow project in YOLOv8 format.
#
# The API key is read from the environment instead of being committed with the
# notebook: a key embedded in a public notebook can be used by anyone who reads
# it. Any key that was previously published in this file should be revoked and
# regenerated in the Roboflow account settings.
api_key = os.environ.get("ROBOFLOW_API_KEY")
if not api_key:
    raise RuntimeError(
        "ROBOFLOW_API_KEY is not set. Export it (or set os.environ['ROBOFLOW_API_KEY']) "
        "before running this cell."
    )

rf = Roboflow(api_key=api_key)
project = rf.workspace("traffic-zi58y").project("my-first-project-dcbu4")
version = project.version(1)
# `dataset` is kept as a notebook-level name so later cells can refer to it.
dataset = version.download("yolov8")

In [None]:
# Fine-tune a pretrained YOLOv8 checkpoint on the dataset exported from Roboflow.
from ultralytics import YOLO

# Start from the "small" pretrained weights (chosen by the author as the better
# fit for a small dataset).
model = YOLO("yolov8s.pt")

# Training hyper-parameters, collected in one place so they are easy to tweak.
train_settings = {
    "data": "/content/My-First-Project-1/data.yaml",  # dataset description file
    "epochs": 150,   # upper bound on epochs; raise to 200 if needed
    "imgsz": 640,    # training image size
    "patience": 15,  # stop early when there is no improvement
}

model.train(**train_settings)


In [None]:
!cat /content/My-First-Project-1/data.yaml


In [None]:
# Run validation on the object currently bound to `model`, with no arguments
# overridden. In file order that is the YOLO instance created in the training
# cell; in a notebook it is whichever cell last assigned `model`.
model.val()


In [None]:
from ultralytics import YOLO
import cv2

# Run the fine-tuned detector over a video frame by frame and write a copy of
# the video with the detections drawn on it.

# Load trained YOLOv8 model
model = YOLO("/content/runs/detect/train/weights/best.pt")

# Load input video; fail loudly if it cannot be opened instead of silently
# producing an empty output file.
input_path = "/content/video10s.mp4"
cap = cv2.VideoCapture(input_path)
if not cap.isOpened():
    raise IOError(f"Could not open input video: {input_path}")

# Get video properties
width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
fps = cap.get(cv2.CAP_PROP_FPS)
if fps <= 0:
    # The backend could not report a frame rate; a writer created with 0 FPS
    # does not produce a playable file, so fall back to a conventional value.
    fps = 30.0

# Define output video writer
output_path = "/content/output.mp4"
fourcc = cv2.VideoWriter_fourcc(*"mp4v")  # Codec
out = cv2.VideoWriter(output_path, fourcc, fps, (width, height))

try:
    if not out.isOpened():
        raise IOError(f"Could not open output video for writing: {output_path}")

    # Frame-by-frame inference
    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            break  # end of stream or read failure

        # Predict with model
        results = model.predict(source=frame, conf=0.5, imgsz=640, verbose=False)

        # Draw boxes and labels on a copy of the frame
        annotated_frame = results[0].plot()

        # Write annotated frame to output video
        out.write(annotated_frame)
finally:
    # Release the capture and writer even if inference raises, so the output
    # container is finalized and the file handles are not leaked.
    cap.release()
    out.release()

print(f"✅ Output video saved to: {output_path}")


In [None]:
!pip install opencv-python pandas numpy torchvision


In [None]:
from google.colab.patches import cv2_imshow


In [None]:
from ultralytics import YOLO
import cv2
import torch
import pandas as pd
import numpy as np
from torchvision.ops import box_iou

# Set-up for the combined tracking pass: two detectors, the input/output video
# handles, and the accumulators that the processing loop fills in.

# Load models
auto_model = YOLO("/content/runs/detect/train/weights/best.pt")  # Auto-rickshaw model
base_model = YOLO("yolov8s.pt")  # General detection model

# Input video; stop here if it cannot be opened, otherwise the loop below would
# exit immediately and empty outputs would be reported as saved.
video_path = "/content/video10s.mp4"
cap = cv2.VideoCapture(video_path)
if not cap.isOpened():
    raise IOError(f"Could not open input video: {video_path}")

fps = cap.get(cv2.CAP_PROP_FPS)
if fps <= 0:
    # Frame rate unavailable from the backend. It is used as a divisor for the
    # timestamps and to size the summary intervals, so it must be positive.
    fps = 30.0
width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

# Output video setup
fourcc = cv2.VideoWriter_fourcc(*'mp4v')
out = cv2.VideoWriter("output2_annotated.mp4", fourcc, fps, (width, height))

# Tracking
frame_idx = 0      # index of the frame currently being processed
output_data = []   # one dict of running counts per processed frame

# Unique ID tracking sets: each holds the track IDs already counted for a class
counted_person_ids = set()
counted_car_ids = set()
counted_bike_ids = set()
counted_auto_ids = set()

# Constants for class IDs (from COCO)
PERSON_ID = 0
CAR_ID = 2
MOTORCYCLE_ID = 3

# Main processing loop: for every frame, run both trackers, draw the
# detections, keep running counts of unique track IDs per class, and append one
# row of counts to `output_data`.
#
# Detections that the tracker has not assigned an ID to are represented with
# the sentinel -1. They are still drawn, but the sentinel is never added to the
# "counted" sets; otherwise every class would be over-counted by one as soon as
# a single untracked detection appeared.

# COCO vehicle classes handled here -> (label drawn on screen, set of counted IDs)
vehicle_classes = {
    CAR_ID: ("Car", counted_car_ids),
    MOTORCYCLE_ID: ("Motorcycle", counted_bike_ids),
}

try:
    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            break

        # Get predictions from both models (persist=True keeps tracker state
        # between successive calls so IDs carry over from frame to frame)
        auto_result = auto_model.track(source=frame, conf=0.3, imgsz=640, persist=True, verbose=False)[0]
        base_result = base_model.track(source=frame, conf=0.3, imgsz=640, persist=True, verbose=False)[0]

        auto_boxes = auto_result.boxes
        base_boxes = base_result.boxes

        person_boxes = []   # List of (box, id) for persons
        vehicle_boxes = []  # List of (1, 4) box tensors for cars, bikes, autos

        # --- Auto-rickshaw detections -------------------------------------
        has_autos = auto_boxes is not None and len(auto_boxes) > 0
        auto_tensor = auto_boxes.xyxy.cpu() if has_autos else torch.empty((0, 4))
        if has_autos and auto_boxes.id is not None:
            auto_ids = auto_boxes.id.cpu()
        else:
            auto_ids = torch.full((auto_tensor.shape[0],), -1)

        for box, raw_id in zip(auto_tensor, auto_ids):
            x1, y1, x2, y2 = map(int, box.tolist())
            track_id = int(raw_id)
            if track_id >= 0:  # skip the "no track ID" sentinel
                counted_auto_ids.add(track_id)
            cv2.rectangle(frame, (x1, y1), (x2, y2), (255, 0, 255), 2)
            cv2.putText(frame, f"Auto ID:{track_id}", (x1, y1 - 5), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 0, 255), 2)
            vehicle_boxes.append(box.unsqueeze(0))  # Include in vehicle tensor

        # --- General-model detections (persons, cars, motorcycles) --------
        if base_boxes is not None:
            base_cls = base_boxes.cls.cpu()
            base_xyxy = base_boxes.xyxy.cpu()
            base_ids = base_boxes.id.cpu() if base_boxes.id is not None else torch.full((len(base_cls),), -1)

            for i, c in enumerate(base_cls):
                box = base_xyxy[i].unsqueeze(0)
                track_id = int(base_ids[i])
                cls_id = int(c.item())

                # Skip if overlapping with auto: the general model's detection
                # is treated as a duplicate of the auto-rickshaw detection.
                if auto_tensor.shape[0] > 0 and (box_iou(box, auto_tensor) > 0.5).any():
                    continue

                if cls_id == PERSON_ID:
                    # Persons are drawn later, once all vehicles are known.
                    person_boxes.append((box, track_id))
                    continue

                vehicle = vehicle_classes.get(cls_id)
                if vehicle is None:
                    continue  # class not of interest here

                label, counted_ids = vehicle
                if track_id >= 0:  # skip the "no track ID" sentinel
                    counted_ids.add(track_id)

                x1, y1, x2, y2 = map(int, box.squeeze(0).tolist())
                cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 255, 0), 2)
                cv2.putText(frame, f"{label} ID:{track_id}", (x1, y1 - 5), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 255, 0), 2)
                vehicle_boxes.append(box)

        # --- Filter and count only walking persons ------------------------
        if person_boxes:
            vehicle_tensor = torch.cat(vehicle_boxes, dim=0) if vehicle_boxes else torch.empty((0, 4))

            for person_box, track_id in person_boxes:
                is_on_vehicle = False
                if vehicle_tensor.shape[0] > 0:
                    if box_iou(person_box, vehicle_tensor).max().item() > 0.3:
                        is_on_vehicle = True
                    else:
                        # Containment check: a person box lying strictly inside
                        # a vehicle box has a low IoU when the vehicle is much
                        # larger, so test for that case explicitly.
                        px1, py1, px2, py2 = person_box.squeeze(0).tolist()
                        is_on_vehicle = any(
                            px1 > vx1 and py1 > vy1 and px2 < vx2 and py2 < vy2
                            for vx1, vy1, vx2, vy2 in vehicle_tensor.tolist()
                        )

                if is_on_vehicle:
                    continue  # Skip persons on vehicles

                if track_id >= 0:  # skip the "no track ID" sentinel
                    counted_person_ids.add(track_id)

                x1, y1, x2, y2 = map(int, person_box.squeeze(0).tolist())
                cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 0, 255), 2)
                cv2.putText(frame, f"Person ID:{track_id}", (x1, y1 - 5), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 0, 255), 2)

        # Overlay live stats (running totals of unique IDs seen so far)
        stats = (
            f"Persons: {len(counted_person_ids)}  "
            f"Cars: {len(counted_car_ids)}  "
            f"Bikes: {len(counted_bike_ids)}  "
            f"Autos: {len(counted_auto_ids)}"
        )
        cv2.putText(frame, stats, (10, height - 20), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 255, 255), 2)

        out.write(frame)

        # Save data per frame. The timestamp is HH:MM:SS of the frame's position
        # in the video; guard the division in case no frame rate is available.
        elapsed_sec = int(frame_idx // fps) if fps > 0 else 0
        hours, remainder = divmod(elapsed_sec, 3600)
        minutes, seconds = divmod(remainder, 60)
        output_data.append({
            "frame": frame_idx,
            "timestamp": f"{hours:02}:{minutes:02}:{seconds:02}",
            "person_count": len(counted_person_ids),
            "car_count": len(counted_car_ids),
            "motorcycle_count": len(counted_bike_ids),
            "autorickshaw_count": len(counted_auto_ids)
        })

        frame_idx += 1
finally:
    # Always release the handles so the output file is finalized even when a
    # frame fails to process.
    cap.release()
    out.release()

# Export the per-frame counts and a coarser per-interval summary to CSV.

# Save full-frame CSV (one row per processed frame)
df = pd.DataFrame(output_data)
df.to_csv("combined_traffic_counts.csv", index=False)
print("✅ Full-frame CSV saved as combined_traffic_counts.csv")
print("✅ Video saved as output2_annotated.mp4")

# 10-second summary
interval_sec = 10
interval_frames = int(fps * interval_sec)
if interval_frames < 1:
    # No usable frame rate: range() rejects a zero step, so summarize the
    # whole recording as a single interval instead of crashing.
    interval_frames = max(1, len(output_data))

summarized_data = []
for start in range(0, len(output_data), interval_frames):
    chunk = output_data[start:start + interval_frames]
    first_row, last_row = chunk[0], chunk[-1]

    # The per-frame counts are running totals of unique IDs, so the last row of
    # each interval holds the cumulative total up to the end of that interval
    # (not the number of new objects seen within it).
    summarized_data.append({
        "interval_start": first_row['timestamp'],
        "interval_end": last_row['timestamp'],
        "total_persons": last_row['person_count'],
        "total_cars": last_row['car_count'],
        "total_motorcycles": last_row['motorcycle_count'],
        "total_autorickshaws": last_row['autorickshaw_count']
    })

df_summary = pd.DataFrame(summarized_data)
df_summary.to_csv(f"summarized_traffic_every_{interval_sec}s.csv", index=False)
print(f"✅ Summary CSV saved as summarized_traffic_every_{interval_sec}s.csv")
