# In [1]:
import cv2
import numpy as np
import random
import time
import threading

# Load one pre-trained cascade classifier per vehicle type (cars, two-wheelers,
# buses) from XML files on disk.
# NOTE(review): these are absolute, user-specific Windows paths, so the XML
# files will only be found on the original author's machine. Nothing here
# verifies that a classifier actually loaded -- TODO confirm (e.g. by checking
# `CascadeClassifier.empty()`) before relying on the detections.
car_cascade = cv2.CascadeClassifier(r"C:\Users\91957\Downloads\cars.xml")
two_wheeler_cascade = cv2.CascadeClassifier(r"C:\Users\91957\Downloads\two_wheeler.xml")
bus_cascade = cv2.CascadeClassifier(r"C:\Users\91957\Downloads\Bus_front.xml")

# Pause between consecutive detection passes of the main loop, in seconds.
detection_delay = 3

# Detect cars, two-wheelers and buses in a single BGR frame
def detect_vehicles(image):
    """Run every vehicle cascade over ``image`` and pool the detections.

    The frame is converted from BGR to grayscale once and then handed to the
    car, two-wheeler and bus cascades, in that order, with identical
    detection settings.

    Args:
        image: BGR image accepted by ``cv2.cvtColor``.

    Returns:
        A ``(count, boxes)`` tuple. ``boxes`` is the concatenation of the
        detections reported by the three cascades and ``count`` is its
        length. Hits from different cascades are not de-duplicated.
    """
    grayscale = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)

    detections = []
    for classifier in (car_cascade, two_wheeler_cascade, bus_cascade):
        hits = classifier.detectMultiScale(
            grayscale, scaleFactor=1.1, minNeighbors=5, minSize=(30, 30)
        )
        detections.extend(hits)

    return len(detections), detections

# Map a detected-vehicle count onto a capped traffic-volume score
def simulate_traffic(total_vehicles):
    """Convert a vehicle count into a traffic-volume score.

    The mapping is linear: every vehicle contributes 10 units, and the
    result is capped so it never exceeds 100.

    Args:
        total_vehicles: Number of vehicles detected in the frame.

    Returns:
        ``total_vehicles * 10``, or 100 when that product is 100 or more.
    """
    units_per_vehicle = 10
    volume_cap = 100

    raw_volume = total_vehicles * units_per_vehicle
    if raw_volume < volume_cap:
        return raw_volume
    return volume_cap

# Choose the green-signal duration that matches the traffic volume
def adjust_signal_duration(traffic_volume):
    """Return the green-signal duration (seconds) for a traffic volume.

    Bands:
        * below 30        -> 3  (low traffic, short green)
        * 30 up to < 70   -> 5  (moderate traffic, medium green)
        * 70 and above    -> 10 (high traffic, long green)

    Args:
        traffic_volume: Traffic-volume score to classify.

    Returns:
        The duration in seconds for the band the volume falls into.
    """
    # (exclusive upper bound, green seconds), checked in ascending order.
    bands = ((30, 3), (70, 5))
    for upper_bound, green_seconds in bands:
        if traffic_volume < upper_bound:
            return green_seconds
    return 10

# Wait between detection passes while keeping the OpenCV window responsive
def _wait_for_quit_key(delay_seconds):
    """Wait ``delay_seconds`` inside ``cv2.waitKey``; return True if 'q' is hit.

    Spending the delay in ``cv2.waitKey`` (instead of ``time.sleep``) lets
    the window repaint and lets key presses be seen during the whole delay,
    not only during a 1 ms poll. Keys other than 'q' are ignored and the
    remaining time is waited out.
    """
    deadline = time.monotonic() + delay_seconds
    while True:
        # waitKey(0) would block forever, so always wait at least 1 ms.
        remaining_ms = max(1, int((deadline - time.monotonic()) * 1000))
        if cv2.waitKey(remaining_ms) & 0xFF == ord('q'):
            return True
        if time.monotonic() >= deadline:
            return False


# Function to process frames and display traffic information
def process_frames():
    """Capture camera frames, detect vehicles and display the results.

    Each pass reads one frame from the default camera (index 0), counts the
    vehicles with ``detect_vehicles``, derives a traffic volume and a green
    signal duration from that count, draws the bounding boxes plus two text
    lines onto the frame and shows it in the 'Traffic Monitoring' window.
    The loop then waits ``detection_delay`` seconds and stops when 'q' is
    pressed or when the camera no longer delivers frames.

    The camera and all OpenCV windows are released on every exit path,
    including when an exception is raised inside the loop.
    """
    cap = cv2.VideoCapture(0)  # Use the default camera (change the index if needed)
    try:
        if not cap.isOpened():
            print("Error: could not open camera 0.")
            return

        while True:
            # Capture frame from camera; a failed read yields no usable frame,
            # so stop instead of passing None on to the detector.
            ret, frame = cap.read()
            if not ret or frame is None:
                print("Error: could not read a frame from the camera.")
                break

            # Detect vehicles in the frame
            total_vehicles, vehicle_boxes = detect_vehicles(frame)

            # Simulate traffic volume based on the number of detected vehicles
            traffic_volume = simulate_traffic(total_vehicles)

            # Adjust signal duration based on traffic volume
            signal_duration = adjust_signal_duration(traffic_volume)

            # Draw bounding boxes around detected vehicles
            for (x, y, w, h) in vehicle_boxes:
                cv2.rectangle(frame, (x, y), (x + w, y + h), (255, 0, 0), 2)

            # Display the number of vehicles detected
            cv2.putText(frame, f"Total Vehicles: {total_vehicles}", (10, 30),
                        cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 2, cv2.LINE_AA)

            # Display the signal duration
            cv2.putText(frame, f"Signal Duration: {signal_duration} seconds", (10, 70),
                        cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 2, cv2.LINE_AA)

            # Display the frame with bounding boxes, vehicle count, and signal duration
            cv2.imshow('Traffic Monitoring', frame)

            # Delay before the next detection pass; exit loop if 'q' is pressed
            if _wait_for_quit_key(detection_delay):
                break
    finally:
        # Release the camera and close OpenCV windows
        cap.release()
        cv2.destroyAllWindows()

# Run the process_frames function in a separate thread when this file is
# executed as a script. The thread is started but never joined here, so this
# block returns immediately while the capture loop keeps running.
# NOTE(review): process_frames makes OpenCV GUI calls (imshow/waitKey) from
# this worker thread; some platforms reportedly require GUI work on the main
# thread -- confirm this is intended on the target platform.
if __name__ == "__main__":
    thread = threading.Thread(target=process_frames)
    thread.start()
