In [61]:
import cv2
import numpy as np
from ultralytics import YOLO
from collections import defaultdict
import os

# Load YOLOv8 model once globally
model = YOLO('yolov8x.pt')
model.to("cuda") 
# Define vehicle classes
vehicle_classes = {
    2: "car",
    3: "motorcycle",
    5: "bus",
    7: "truck"
}

def process_lane_video(input_path, output_path, min_box_width=10, min_box_height=10, display=True):
    cap = cv2.VideoCapture(input_path)
    original_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    original_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

    # Choose target resolution
    target_width, target_height = 1920, 1080
    if original_width < target_width:
        target_width, target_height = original_width, original_height

    fps = int(cap.get(cv2.CAP_PROP_FPS))
    out = cv2.VideoWriter(output_path, cv2.VideoWriter_fourcc(*'mp4v'), fps, (target_width, target_height))

    seen_ids = set()
    vehicle_counts = defaultdict(int)

    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            break

        if (original_width, original_height) != (target_width, target_height):
            frame = cv2.resize(frame, (target_width, target_height))

        try:
            results = model.track(frame, persist=True, tracker="bytetrack.yaml", conf=0.25, verbose=False)
            boxes = results[0].boxes

            if boxes.id is not None:
                ids = boxes.id.cpu().numpy().astype(int)
                cls = boxes.cls.cpu().numpy().astype(int)
                xyxy = boxes.xyxy.cpu().numpy()

                for box, class_id, track_id in zip(xyxy, cls, ids):
                    if class_id not in vehicle_classes:
                        continue

                    width = box[2] - box[0]
                    height = box[3] - box[1]
                    if width < min_box_width or height < min_box_height:
                        continue

                    vehicle_type = vehicle_classes[class_id]

                    if track_id not in seen_ids:
                        seen_ids.add(track_id)
                        vehicle_counts[vehicle_type] += 1

                    x1, y1, x2, y2 = map(int, box)
                    cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 255, 0), 2)
                    cv2.putText(frame, f"{vehicle_type} {track_id}", (x1, y1 - 10),
                                cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 255, 0), 2)

            # Overlay counts
            y_offset = 30
            for vehicle_type, count in vehicle_counts.items():
                cv2.putText(frame, f"{vehicle_type}: {count}", (10, y_offset),
                            cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 0, 0), 2)
                y_offset += 30

            cv2.putText(frame, f"Total: {sum(vehicle_counts.values())}", (10, y_offset),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 0, 255), 2)

            out.write(frame)

            if display:
                cv2.imshow("Vehicle Counting", frame)
                if cv2.waitKey(1) & 0xFF == ord("q"):
                    break

        except Exception as e:
            print(f"⚠️ Processing error: {e}")
            continue

    cap.release()
    out.release()
    cv2.destroyAllWindows()

    print("\n✅ Final Vehicle Counts:")
    for v, c in vehicle_counts.items():
        print(f"{v}: {c}")
    print(f"🎥 Output video saved to: {output_path}")

    return dict(vehicle_counts)  # Return counts as a regular dict




In [62]:
import cv2
import numpy as np
from collections import defaultdict
from ultralytics import YOLO

# Load YOLOv8 model to GPU
model = YOLO("yolov8x.pt")
model.to("cuda")  # Move model to GPU

# Define vehicle classes
vehicle_classes = {
    2: "car",
    3: "motorcycle",
    5: "bus",
    7: "truck"
}

def process_traffic_image(image_path, display=True, save_output=True, output_path="output.jpg"):
    image = cv2.imread(image_path)
    if image is None:
        print(f"❌ Could not read image from: {image_path}")
        return {}

    # Optional resize to speed up detection
    max_dim = 1280
    h, w = image.shape[:2]
    if max(h, w) > max_dim:
        scale = max_dim / max(h, w)
        image = cv2.resize(image, (int(w * scale), int(h * scale)))

    vehicle_counts = defaultdict(int)

    try:
        results = model.predict(image, conf=0.25, iou=0.6, device="cuda", verbose=False)[0]

        if results.boxes is not None:
            boxes = results.boxes
            cls = boxes.cls.cpu().numpy().astype(int)
            xyxy = boxes.xyxy.cpu().numpy()

            for box, class_id in zip(xyxy, cls):
                if class_id not in vehicle_classes:
                    continue

                vehicle_type = vehicle_classes[class_id]
                vehicle_counts[vehicle_type] += 1

                x1, y1, x2, y2 = map(int, box)
                cv2.rectangle(image, (x1, y1), (x2, y2), (0, 255, 0), 2)
                cv2.putText(image, f"{vehicle_type}", (x1, y1 - 10),
                            cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 255, 0), 2)

        # Display counts on the image
        y_offset = 30
        for vehicle_type, count in vehicle_counts.items():
            cv2.putText(image, f"{vehicle_type}: {count}", (10, y_offset),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 0, 0), 2)
            y_offset += 30

        cv2.putText(image, f"Total: {sum(vehicle_counts.values())}", (10, y_offset),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 0, 255), 2)

        # Show image
        if display:
            cv2.imshow("Traffic Detection", image)
            cv2.waitKey(0)
            cv2.destroyAllWindows()

        # Save output image
        if save_output:
            cv2.imwrite(output_path, image)
            print(f"💾 Image saved to: {output_path}")

    except Exception as e:
        print(f"⚠️ Processing error: {e}")

    print("\n✅ Vehicle Counts from Image:")
    for v, c in vehicle_counts.items():
        print(f"{v}: {c}")

    return dict(vehicle_counts)


In [63]:
lane_counts = []

In [64]:
img_paths = [
    r"C:\Users\Devesh\Downloads\traffic1.png",
    r"C:\Users\Devesh\Downloads\new2.jpg"

]



for idx, path in enumerate(img_paths):
    output = rf"C:\Users\Devesh\Downloads\output_lane{idx+1}.jpg"
    lane_count = process_traffic_image(path, display=False, save_output=True, output_path=output)
    lane_counts.append(lane_count)
print("\n📊 All Lane Counts:")
print(lane_counts)

💾 Image saved to: C:\Users\Devesh\Downloads\output_lane1.jpg

✅ Vehicle Counts from Image:
car: 19
💾 Image saved to: C:\Users\Devesh\Downloads\output_lane2.jpg

✅ Vehicle Counts from Image:
car: 20
bus: 6
truck: 2
motorcycle: 2

📊 All Lane Counts:
[{'car': 19}, {'car': 20, 'bus': 6, 'truck': 2, 'motorcycle': 2}]


In [65]:
video_paths = [
   r"C:\Users\Devesh\Downloads\5927708-hd_1080_1920_30fps.mp4",
   r"C:\Users\Devesh\Downloads\Untitled video - Made with Clipchamp.mp4"

]



for idx, path in enumerate(video_paths):
    output = rf"C:\Users\Devesh\Downloads\output_lane{idx+1}.mp4"
    lane_count = process_lane_video(path, output, display=False)
    lane_counts.append(lane_count)

print("\n📊 All Lane Counts:")
print(lane_counts)


✅ Final Vehicle Counts:
car: 32
bus: 2
motorcycle: 4
truck: 35
🎥 Output video saved to: C:\Users\Devesh\Downloads\output_lane1.mp4

✅ Final Vehicle Counts:
bus: 7
car: 29
truck: 7
🎥 Output video saved to: C:\Users\Devesh\Downloads\output_lane2.mp4

📊 All Lane Counts:
[{'car': 19}, {'car': 20, 'bus': 6, 'truck': 2, 'motorcycle': 2}, {'car': 32, 'bus': 2, 'motorcycle': 4, 'truck': 35}, {'bus': 7, 'car': 29, 'truck': 7}]


In [66]:
def allocate_signal_times(vehicle_counts_list, base_time_per_vehicle=2, priority_bonus=10, max_total_time=300, min_time_per_lane=20):
    """
    Args:
        vehicle_counts_list: List of dicts, one for each lane.
        base_time_per_vehicle: Seconds allocated per vehicle by default.
        priority_bonus: Extra time for bus-heavy lanes with 5+ more vehicles.
        max_total_time: Maximum total time to distribute (default is 300 seconds).
        min_time_per_lane: Minimum time allocated per lane (default is 20 seconds).

    Returns:
        Dictionary with lane index and allocated time in seconds,
        sorted by descending time (lane needing more time appears first).
    """
    times = []
    total_vehicles_per_lane = []

    # Step 1: Compute base times and total vehicle count
    for counts in vehicle_counts_list:
        total = sum(counts.values())
        total_vehicles_per_lane.append(total)
        time = total * base_time_per_vehicle
        times.append(time)

    # Step 2: Apply priority bonus adjustments
    for i in range(len(vehicle_counts_list)):
        for j in range(i + 1, len(vehicle_counts_list)):
            diff = abs(total_vehicles_per_lane[i] - total_vehicles_per_lane[j])
            if diff >= 5:
                buses_i = vehicle_counts_list[i].get('bus', 0)
                buses_j = vehicle_counts_list[j].get('bus', 0)

                if buses_i > buses_j:
                    times[i] += priority_bonus
                    times[j] -= min(priority_bonus, times[j])
                elif buses_j > buses_i:
                    times[j] += priority_bonus
                    times[i] -= min(priority_bonus, times[i])

    # Step 3: Ensure each lane gets at least 20 seconds
    times = [max(min_time_per_lane, int(t)) for t in times]

    # Step 4: Calculate remaining time to distribute
    total_allocated_time = sum(times)

    # If the total allocated time exceeds max_total_time, scale it down proportionally
    if total_allocated_time > max_total_time:
        scaling_factor = max_total_time / total_allocated_time
        times = [int(t * scaling_factor) for t in times]

    # Step 5: Calculate remaining time after ensuring each lane has 20 seconds
    remaining_time = max_total_time - sum(times)

    # Step 6: Allocate the remaining time proportionally based on vehicle counts
    if remaining_time > 0:
        # Calculate the proportional share for each lane based on vehicle count
        total_vehicles = sum(total_vehicles_per_lane)
        if total_vehicles > 0:
            proportional_shares = [total_vehicles_per_lane[i] / total_vehicles for i in range(len(vehicle_counts_list))]
            additional_times = [int(remaining_time * share) for share in proportional_shares]

            # Add the proportional time to each lane's time
            for i in range(len(times)):
                times[i] += additional_times[i]

    # Step 7: Enforce the maximum time of 300 seconds per lane
    times = [min(300, t) for t in times]

    # Step 8: Create a list of (lane_name, time) and sort by time descending
    lane_times = [(f"lane_{i+1}", t) for i, t in enumerate(times)]
    lane_times.sort(key=lambda x: x[1], reverse=True)

    return dict(lane_times)


In [67]:
ans=allocate_signal_times(lane_counts)

In [68]:
ans

{'lane_3': 119, 'lane_4': 101, 'lane_2': 61, 'lane_1': 17}