<a href="https://colab.research.google.com/github/Tharaneesh-s/Smart-Traffic-Signal-System-with-AI-Cloud/blob/main/vehicle%20detection%20and%20it's%20density.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!pip install yt-dlp opencv-python-headless ultralytics supervision

In [None]:
import yt_dlp

# Replace with your YouTube video URLs.
# Each URL is resolved to a direct stream URL by get_best_video_url() below.
# NOTE(review): YOUTUBE_VIDEO_URL3 and YOUTUBE_VIDEO_URL4 are identical, so
# two inputs will analyse the same video - confirm whether that is intended.
YOUTUBE_VIDEO_URL1 = "https://youtu.be/iJZcjZD0fw0?si=ipq66LnIp4-0Hv2a"
YOUTUBE_VIDEO_URL2 = "https://youtu.be/MNn9qKG2UFI?si=W4cklEsDahsk2p4p"
YOUTUBE_VIDEO_URL3 = "https://youtu.be/UVCMOQyIz6A?si=98yfZkzIv1aHGuM0"
YOUTUBE_VIDEO_URL4 = "https://youtu.be/UVCMOQyIz6A?si=98yfZkzIv1aHGuM0"

def get_best_video_url(youtube_url, preferred_format_id='18', debug=False):
    """Resolve a YouTube page URL to a direct mp4 stream URL.

    The video's metadata is fetched with yt-dlp (nothing is downloaded) and
    the list of available formats is searched for an mp4 stream.

    Args:
        youtube_url: URL of the YouTube video page.
        preferred_format_id: yt-dlp format id to look for first.
        debug: When true, print the full metadata dictionary.

    Returns:
        The URL of the preferred mp4 format if it is available; otherwise
        the URL of the best mp4 format that carries both video and audio;
        otherwise None.
    """
    ydl_opts = {
        'format': 'bestvideo[ext=mp4]+bestaudio[ext=m4a]/best[ext=mp4]/best',
        'quiet': True,
        'noplaylist': True,
        'skip_download': True,
    }
    with yt_dlp.YoutubeDL(ydl_opts) as ydl:
        info_dict = ydl.extract_info(youtube_url, download=False)

    if debug:
        print("Info Dictionary:", info_dict)

    # Only mp4 entries that actually expose a URL are usable.
    mp4_formats = [
        fmt for fmt in info_dict.get('formats', [])
        if fmt.get('ext') == 'mp4' and fmt.get('url')
    ]

    for fmt in mp4_formats:
        if fmt.get('format_id') == preferred_format_id:
            return fmt['url']

    # Fallback: yt-dlp lists formats from worst to best, so scan backwards
    # for the best mp4 that has both a video and an audio track.
    for fmt in reversed(mp4_formats):
        if fmt.get('vcodec') != 'none' and fmt.get('acodec') != 'none':
            return fmt['url']

    return None

# Resolve every configured YouTube link to a direct stream URL, in order
video_url1, video_url2, video_url3, video_url4 = [
    get_best_video_url(page_url)
    for page_url in (
        YOUTUBE_VIDEO_URL1,
        YOUTUBE_VIDEO_URL2,
        YOUTUBE_VIDEO_URL3,
        YOUTUBE_VIDEO_URL4,
    )
]

# Show the resolved URLs, one per line
print(video_url1, video_url2, video_url3, video_url4, sep="\n")

In [None]:
!pip install paho-mqtt

In [None]:
import paho.mqtt.client as mqtt
import cv2
from ultralytics import YOLO
import supervision as sv
import threading
import time
import logging

# Suppress unwanted logs from libraries
logging.getLogger('ultralytics').setLevel(logging.ERROR)
logging.getLogger('torch').setLevel(logging.ERROR)

# MQTT Configuration
broker = "test.mosquitto.org"
port = 1883
char_topic = "device/traffic1"    # receives the selected lane letter
num_topic = "device/num"          # receives the opening time in seconds
combined_topic = "device/traffic18"  # receives "<lane>-<seconds>"
qos = 0

# Density weight contributed by one detected object of each type.
# get_vehicle_type() reports class 3 as 'motorcycle'; the table previously
# only had 'motorbike', so motorcycles were silently weighted 0.  Both
# spellings are kept so existing lookups by 'motorbike' still work.
density_values = {
    'person': 0.2,
    'bicycle': 0.5,
    'car': 1.0,
    'motorcycle': 0.7,
    'motorbike': 0.7,
    'bus': 3.0,
    'truck': 2.5,
    'unknown': 0.0
}

def on_connect(client, userdata, flags, rc, properties=None):
    """Connection callback: print whether the broker accepted the connection.

    A return code equal to 0 is reported as success; any other value is
    printed as a failure together with the code.  The remaining arguments
    are accepted but not used.
    """
    succeeded = rc == 0
    if succeeded:
        status_line = "Connected to MQTT Broker!"
    else:
        status_line = f"Failed to connect, return code {rc}\n"
    print(status_line)

# Create the MQTT v5 client, register the connection callback, connect to
# the broker and start the client's network loop
client = mqtt.Client(protocol=mqtt.MQTTv5)
client.on_connect = on_connect
client.connect(broker, port, keepalive=60)
client.loop_start()

# Detection model used for every lane
model = YOLO("yolov8x.pt", verbose=False)

# Per-lane state, keyed by lane letter
vehicle_counts = dict.fromkeys('ABCD', 0)        # objects counted in the last frame
vehicle_types = {lane: {} for lane in 'ABCD'}    # type name -> count for the last frame
waiting_times = dict.fromkeys('ABCD', 0)         # used as the lane priority score
opened_lanes = []  # lanes already opened in the current cycle

# Bounds (in seconds) for how long a lane may stay open
MIN_OPEN_TIME, MAX_OPEN_TIME = 10, 60

def calculate_opening_time(vehicle_count):
    """Return how long a lane should stay open for the given vehicle count.

    The time starts at MIN_OPEN_TIME, grows by 2 per vehicle and is capped
    at MAX_OPEN_TIME.
    """
    proposed = MIN_OPEN_TIME + vehicle_count * 2
    if proposed < MAX_OPEN_TIME:
        return proposed
    return MAX_OPEN_TIME

# Detector class ID -> type name used when counting detections
_VEHICLE_CLASS_NAMES = {
    0: 'person',
    1: 'bicycle',
    2: 'car',
    3: 'motorcycle',
    5: 'bus',
    7: 'truck',
    # Add more mappings if needed
}


def get_vehicle_type(class_id):
    """Translate a detector class ID into a type name.

    IDs without an entry in the mapping are reported as 'unknown'.
    """
    if class_id in _VEHICLE_CLASS_NAMES:
        return _VEHICLE_CLASS_NAMES[class_id]
    return 'unknown'

def calculate_density(types_count):
    """Return the weighted density of a lane.

    Args:
        types_count: Mapping of type name to number of detections.

    Each count is multiplied by the type's weight from density_values
    (0 for types missing from that table) and the products are summed.
    """
    total_density = 0
    for type_name, how_many in types_count.items():
        weight = density_values.get(type_name, 0)
        total_density += weight * how_many
    return total_density

def process_video_stream(video_url, lane_id):
    """Analyse one frame of a lane's video and record its statistics.

    Opens the video, reads a single frame, runs the detection model on it
    and stores the results in the module-level ``vehicle_counts`` and
    ``vehicle_types`` dictionaries under ``lane_id``.  The count, the
    per-type breakdown and the lane density are printed.

    If the URL is missing, the stream cannot be opened, or no frame can be
    read, a message is printed and the stored values for the lane are left
    unchanged.

    Args:
        video_url: Path or URL accepted by cv2.VideoCapture (may be None
            when URL resolution failed).
        lane_id: Lane letter used as dictionary key and log prefix.
    """
    # A missing URL cannot be opened; report it the same way as a failed open.
    if not video_url:
        print(f"[{lane_id}] Error opening video stream.")
        return

    cap = cv2.VideoCapture(video_url)
    try:
        if not cap.isOpened():
            print(f"[{lane_id}] Error opening video stream.")
            return

        # Only the first frame delivered by the capture is analysed.
        ret, frame = cap.read()
        if not ret:
            print(f"[{lane_id}] End of video stream.")
            return
    finally:
        # Release on every path, including the early returns above.
        cap.release()

    # Process the frame
    result = model(frame, imgsz=1280)[0]
    detections = sv.Detections.from_ultralytics(result)
    detections = detections[detections.confidence > 0.3]
    detections = detections[detections.class_id != 0]  # Exclude class 0 (persons)

    # NOTE(review): only persons are filtered out, so detections of classes
    # that get_vehicle_type() maps to 'unknown' still count towards
    # vehicle_count - confirm whether that is intended.
    vehicle_count = len(detections)
    vehicle_counts[lane_id] = vehicle_count

    # Count vehicle types
    types_count = {}
    for class_id in detections.class_id:
        vehicle_type = get_vehicle_type(class_id)
        types_count[vehicle_type] = types_count.get(vehicle_type, 0) + 1

    vehicle_types[lane_id] = types_count

    # Calculate density
    lane_density = calculate_density(types_count)

    print(f"[{lane_id}] Detected {vehicle_count} vehicles.")
    for vehicle_type, count in types_count.items():
        print(f"[{lane_id}] {vehicle_type}: {count}")
    print(f"[{lane_id}] Density: {lane_density:.2f}")

def manage_lane_control(video_urls):
    """Run the lane-control loop: measure the lanes, open one, repeat.

    Every iteration refreshes the per-lane statistics with
    process_video_stream, selects the lane with the largest waiting time
    among the lanes not yet opened in the current cycle, publishes the
    decision to the three MQTT topics and sleeps for the computed opening
    time.  Once every lane has been opened the cycle starts over.  The
    loop never returns.

    Args:
        video_urls: Video sources in lane order A, B, C, D.  Sources are
            paired with lanes by position; surplus entries are ignored.
    """
    # Single definition of the lanes, used for pairing, scoring and the
    # end-of-cycle check.
    lane_ids = ('A', 'B', 'C', 'D')

    while True:
        print("Calculating lane priorities...")

        # Process all video streams and print vehicle information
        for lane_id, video_url in zip(lane_ids, video_urls):
            process_video_stream(video_url, lane_id)

        # Priority of a lane is how long it has been waiting; lanes already
        # opened in this cycle are not eligible.
        available_lanes = {
            lane: waiting_times[lane]
            for lane in lane_ids
            if lane not in opened_lanes
        }
        if available_lanes:
            # On equal scores max() keeps the first lane in lane order.
            max_priority_lane = max(available_lanes, key=available_lanes.get)
            print(f"Lane {max_priority_lane} selected for opening.")

            # Dynamically calculate opening time for the lane
            lane_opening_time = calculate_opening_time(vehicle_counts[max_priority_lane])
            print(f"Lane {max_priority_lane} will remain open for {lane_opening_time} seconds.")

            # Publish MQTT messages
            client.publish(char_topic, max_priority_lane, qos=qos)
            client.publish(num_topic, lane_opening_time, qos=qos)
            combined_message = f"{max_priority_lane}-{lane_opening_time}"
            client.publish(combined_topic, combined_message, qos=qos)
            print(f"Published MQTT messages: {max_priority_lane} to {char_topic}, {lane_opening_time} to {num_topic}, {combined_message} to {combined_topic}")

            # Wait for the lane's calculated opening time
            time.sleep(lane_opening_time)

            # Keep the waiting times meaningful: the lane that was just
            # served starts over, every other lane waited that much longer.
            for lane in lane_ids:
                if lane == max_priority_lane:
                    waiting_times[lane] = 0
                else:
                    waiting_times[lane] += lane_opening_time

            # Mark this lane as opened in the current cycle
            opened_lanes.append(max_priority_lane)

            # If all lanes have been opened, reset for a new cycle
            if len(opened_lanes) == len(lane_ids):
                print("All lanes have been opened, resetting cycle.")
                opened_lanes.clear()

        # Sleep for a short time before rechecking priorities
        time.sleep(1)

def process_videos(video_urls):
    """Reset the lane state and run the lane-control loop until it ends.

    The per-lane dictionaries and the list of already-opened lanes are
    reset, then manage_lane_control is run in a worker thread and this
    function blocks on it.  The MQTT client is stopped and disconnected
    when the wait ends for any reason, including an interrupt.

    Args:
        video_urls: Video sources, passed through to manage_lane_control.
    """
    global vehicle_counts, vehicle_types, waiting_times
    lane_ids = 'ABCD'
    vehicle_counts = dict.fromkeys(lane_ids, 0)
    vehicle_types = {lane: {} for lane in lane_ids}
    waiting_times = dict.fromkeys(lane_ids, 0)
    # Start from a fresh cycle as well, so a re-run does not inherit lanes
    # marked as opened by a previous run.
    opened_lanes.clear()

    # Start lane control management in a separate thread.  It is a daemon
    # because the control loop has no exit condition and would otherwise
    # keep the interpreter alive after an interrupt.
    lane_control_thread = threading.Thread(
        target=manage_lane_control,
        args=(video_urls,),
        daemon=True,
    )
    lane_control_thread.start()

    try:
        lane_control_thread.join()
    finally:
        # Stop MQTT client once done (also when join() is interrupted)
        client.loop_stop()
        client.disconnect()

# Video URLs (replace with actual video paths or URLs).
# Order matters: manage_lane_control pairs the entries with lanes
# A, B, C, D by position.
video_urls = [video_url1, video_url2, video_url3, video_url4]

# Start processing videos.
# process_videos joins the lane-control thread, whose loop is `while True`,
# so this call blocks for as long as that loop keeps running.
process_videos(video_urls)
