In [1]:
from ultralytics import YOLO
import cv2
import tkinter as tk
from tkinter import filedialog, messagebox, simpledialog
import time
import threading
import socket
import json
from queue import Queue

In [2]:
# Load YOLOv8 model
model = YOLO('yolov8n.pt')  # Automatically downloads if not present

# Vehicle classes to detect
vehicle_classes = ['car', 'truck', 'bus', 'bicycle', 'motorcycle']

class TrafficSignalController:
    def __init__(self, signal_id, neighbors=None):
        self.signal_id = signal_id
        self.current_signal_time = 10
        self.max_time = 70
        self.min_time = 10
        self.neighbor_signals = {}  # Stores neighbor congestion data
        self.congestion_level = "low"
        
        # Initialize neighbors (simulated)
        if neighbors:
            for neighbor_id in neighbors:
                self.neighbor_signals[neighbor_id] = {
                    'congestion': 'low',
                    'direction': 'outgoing',
                    'last_update': time.time()
                }

    def calculate_signal_time(self, frame_vehicle_count):
        """Adjust signal time based on vehicle count & neighbor congestion"""
        # Base calculation
        if frame_vehicle_count <= 5:
            base_time = 10
        elif 6 <= frame_vehicle_count <= 25:
            base_time = 10 + (frame_vehicle_count - 5) * 2
        else:
            base_time = 50 + (frame_vehicle_count - 25) * 0.8

        # Adjust based on neighbor congestion
        neighbor_adjustment = 0
        current_time = time.time()
        
        for neighbor_id, data in list(self.neighbor_signals.items()):
            # Remove stale entries (>5 sec old)
            if current_time - data['last_update'] > 5:
                continue
                
            if data['congestion'] == 'high' and data['direction'] == 'incoming':
                neighbor_adjustment += 5  # Increase time if neighbor is congested
            elif data['congestion'] == 'low' and data['direction'] == 'outgoing':
                neighbor_adjustment -= 2  # Decrease time if neighbor is clear

        final_time = max(self.min_time, min(base_time + neighbor_adjustment, self.max_time))
        return final_time

    def update_neighbor_status(self, neighbor_id, congestion, direction):
        """Update simulated neighbor's congestion status"""
        if neighbor_id in self.neighbor_signals:
            self.neighbor_signals[neighbor_id] = {
                'congestion': congestion,
                'direction': direction,
                'last_update': time.time()
            }
            # Print the received update
            print(f"Signal {neighbor_id} received update from Signal {self.signal_id}: "
                  f"Congestion = {congestion}, Direction = {direction}")

    def send_congestion_update(self, congestion, direction):
        """Simulate sending congestion status to neighbors"""
        self.congestion_level = congestion
        for neighbor_id in self.neighbor_signals:
            # Print the status being sent before updating
            print(f"Signal {self.signal_id} sending to Signal {neighbor_id}: "
                  f"Congestion = {congestion}, Direction = {direction}")
            # Simulate neighbor receiving the update
            self.update_neighbor_status(neighbor_id, congestion, direction)

class VideoProcessor:
    def __init__(self, signal_id, neighbors=None):
        self.cap = None
        self.processing = False
        self.signal_controller = TrafficSignalController(signal_id, neighbors)
        self.last_frame_details = {
            'frame_number': 0,
            'vehicle_count': 0,
            'signal_time': 10,
            'congestion_level': 'low'
        }
        self.congestion_thresholds = {'low': 10, 'medium': 25, 'high': 40}
        self.direction = 'outgoing'  # Default traffic direction

    def get_congestion_level(self, count):
        if count < self.congestion_thresholds['low']:
            return 'low'
        elif count < self.congestion_thresholds['medium']:
            return 'medium'
        else:
            return 'high'

    def process_video(self, video_source):
        self.cap = cv2.VideoCapture(video_source)
        if not self.cap.isOpened():
            messagebox.showerror("Error", "Could not open video source!")
            return

        self.processing = True
        frame_number = 0
        last_update_time = 0

        while self.processing:
            ret, frame = self.cap.read()
            if not ret:
                break

            frame_number += 1
            results = model(frame)
            result_frame = results[0].plot()

            # Count vehicles
            vehicle_count = sum(1 for result in results for box in result.boxes 
                              if model.names[int(box.cls)] in vehicle_classes)

            congestion = self.get_congestion_level(vehicle_count)
            signal_time = self.signal_controller.calculate_signal_time(vehicle_count)

            # Update last frame details
            self.last_frame_details = {
                'frame_number': frame_number,
                'vehicle_count': vehicle_count,
                'signal_time': signal_time,
                'congestion_level': congestion
            }

            # Send congestion updates (throttled to 1 per second)
            if time.time() - last_update_time > 1:
                print(f"\n--- Frame {frame_number} Update ---")
                print(f"Current congestion at Signal {self.signal_controller.signal_id}: {congestion}")
                self.signal_controller.send_congestion_update(congestion, self.direction)
                last_update_time = time.time()

            # Display info
            self.display_info(result_frame, frame_number, vehicle_count, signal_time, congestion)
            cv2.imshow(f"Signal {self.signal_controller.signal_id}", result_frame)

            if cv2.waitKey(1) & 0xFF == ord('q'):
                self.stop_processing()

        self.cleanup()

    def display_info(self, frame, frame_num, vehicles, signal_time, congestion):
        cv2.putText(frame, f"Signal {self.signal_controller.signal_id}", (10, 30), 
                   cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 255), 2)
        cv2.putText(frame, f"Vehicles: {vehicles}", (10, 70), 
                   cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 0), 2)
        cv2.putText(frame, f"Signal Time: {signal_time:.1f}s", (10, 110), 
                   cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 255), 2)
        color = (0, 0, 255) if congestion == 'high' else (0, 255, 0) if congestion == 'medium' else (255, 255, 0)
        cv2.putText(frame, f"Congestion: {congestion}", (10, 150), 
                   cv2.FONT_HERSHEY_SIMPLEX, 0.7, color, 2)
        cv2.putText(frame, "Press 'q' to stop", (10, 190), 
                   cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 0, 255), 2)

    def stop_processing(self):
        self.processing = False

    def cleanup(self):
        if self.cap:
            self.cap.release()
        cv2.destroyAllWindows()

def start_signal_system():
    root = tk.Tk()
    root.title("Traffic Signal Controller")
    root.geometry("400x250")

    def start_processing():
        try:
            signal_id = int(signal_id_entry.get())
            neighbors = neighbor_entry.get().strip()
            neighbor_ids = [int(n) for n in neighbors.split(',')] if neighbors else []
            
            processor = VideoProcessor(signal_id, neighbor_ids)
            
            if webcam_var.get():
                processor.process_video(0)  # Webcam
            else:
                file_path = filedialog.askopenfilename(filetypes=[("Video Files", "*.mp4;*.avi")])
                if file_path:
                    processor.process_video(file_path)
        except ValueError:
            messagebox.showerror("Error", "Invalid signal/neighbor ID!")

    # GUI Layout
    tk.Label(root, text="Signal ID:").pack()
    signal_id_entry = tk.Entry(root)
    signal_id_entry.pack()

    tk.Label(root, text="Neighbor IDs (comma-separated):").pack()
    neighbor_entry = tk.Entry(root)
    neighbor_entry.pack()

    webcam_var = tk.BooleanVar(value=True)
    tk.Checkbutton(root, text="Use Webcam", variable=webcam_var).pack(pady=5)

    tk.Button(root, text="Start", command=start_processing).pack(pady=10)
    root.mainloop()

if __name__ == "__main__":
    start_signal_system()


0: 480x640 3 persons, 1 chair, 208.9ms
Speed: 22.9ms preprocess, 208.9ms inference, 18.3ms postprocess per image at shape (1, 3, 480, 640)

--- Frame 1 Update ---
Current congestion at Signal 1: low
Signal 1 sending to Signal 2: Congestion = low, Direction = outgoing
Signal 2 received update from Signal 1: Congestion = low, Direction = outgoing

0: 480x640 3 persons, 1 chair, 90.9ms
Speed: 3.4ms preprocess, 90.9ms inference, 2.3ms postprocess per image at shape (1, 3, 480, 640)

0: 480x640 3 persons, 1 chair, 95.8ms
Speed: 3.1ms preprocess, 95.8ms inference, 2.8ms postprocess per image at shape (1, 3, 480, 640)

0: 480x640 3 persons, 1 chair, 90.0ms
Speed: 2.0ms preprocess, 90.0ms inference, 1.4ms postprocess per image at shape (1, 3, 480, 640)

0: 480x640 3 persons, 1 chair, 89.1ms
Speed: 1.8ms preprocess, 89.1ms inference, 1.1ms postprocess per image at shape (1, 3, 480, 640)

0: 480x640 3 persons, 1 chair, 85.4ms
Speed: 1.9ms preprocess, 85.4ms inference, 1.3ms postprocess per imag

In [3]:
# Load YOLOv8 model
model = YOLO('yolov8n.pt')  # Automatically downloads if not present

# Vehicle classes to detect
vehicle_classes = ['car', 'truck', 'bus', 'bicycle', 'motorcycle']

class TrafficSignalController:
    def __init__(self, signal_id, neighbors=None):
        self.signal_id = signal_id
        self.current_signal_time = 10
        self.max_time = 70
        self.min_time = 10
        self.neighbor_signals = {}  # Stores neighbor congestion data
        self.congestion_level = "low"
        
        # Initialize neighbors (simulated)
        if neighbors:
            for neighbor_id in neighbors:
                self.neighbor_signals[neighbor_id] = {
                    'congestion': 'low',
                    'direction': 'outgoing',
                    'last_update': time.time()
                }

    def calculate_signal_time(self, frame_vehicle_count):
        """Adjust signal time based on vehicle count & neighbor congestion"""
        # Check for any high congestion neighbors (regardless of direction)
        has_high_congestion_neighbor = any(
            data['congestion'] == 'high'
            for data in self.neighbor_signals.values()
            if time.time() - data['last_update'] <= 5  # Only consider recent updates
        )

        # If any neighbor has high congestion, enforce max 50 seconds
        if has_high_congestion_neighbor:
            max_allowed = 50
            # Print special message about the restriction
            high_neighbors = [nid for nid, data in self.neighbor_signals.items() 
                            if data['congestion'] == 'high' and time.time() - data['last_update'] <= 5]
            for neighbor_id in high_neighbors:
                print(f"Signal {self.signal_id} received from Signal {neighbor_id}: "
                      f"Congestion = high, Message: \"Don't increase the Signal Time than 50 seconds\"")
        else:
            max_allowed = 70

        # Base calculation with the determined max allowed time
        if frame_vehicle_count <= 5:
            base_time = 10
        elif 6 <= frame_vehicle_count <= 25:
            base_time = 10 + (frame_vehicle_count - 5) * 2
        else:
            base_time = 50 + (frame_vehicle_count - 25) * 0.8

        # Apply the maximum time limit strictly
        final_time = min(base_time, max_allowed)
    
        # Ensure minimum time is respected
        final_time = max(self.min_time, final_time)

        return final_time

    def update_neighbor_status(self, neighbor_id, congestion, direction):
        """Update simulated neighbor's congestion status"""
        if neighbor_id in self.neighbor_signals:
            self.neighbor_signals[neighbor_id] = {
                'congestion': congestion,
                'direction': direction,
                'last_update': time.time()
            }
            # Print the received update
            print(f"Signal {neighbor_id} received update from Signal {self.signal_id}: "
                  f"Congestion = {congestion}, Direction = {direction}")
            # Special message if congestion is high
            if congestion == 'high':
                print(f"Signal {self.signal_id} received from Signal {neighbor_id}: "
                      f"Congestion = high, Message: \"Don't increase the Signal Time than 50 seconds\"")

    def send_congestion_update(self, congestion, direction):
        """Simulate sending congestion status to neighbors"""
        self.congestion_level = congestion
        for neighbor_id in self.neighbor_signals:
            # Print the status being sent before updating
            print(f"Signal {self.signal_id} sending to Signal {neighbor_id}: "
                  f"Congestion = {congestion}, Direction = {direction}")
            # Simulate neighbor receiving the update
            self.update_neighbor_status(neighbor_id, congestion, direction)

class VideoProcessor:
    def __init__(self, signal_id, neighbors=None):
        self.cap = None
        self.processing = False
        self.signal_controller = TrafficSignalController(signal_id, neighbors)
        self.last_frame_details = {
            'frame_number': 0,
            'vehicle_count': 0,
            'signal_time': 10,
            'congestion_level': 'low'
        }
        self.congestion_thresholds = {'low': 10, 'medium': 25, 'high': 40}
        self.direction = 'outgoing'  # Default traffic direction

    def get_congestion_level(self, count):
        if count < self.congestion_thresholds['low']:
            return 'low'
        elif count < self.congestion_thresholds['medium']:
            return 'medium'
        else:
            return 'high'

    def process_video(self, video_source):
        self.cap = cv2.VideoCapture(video_source)
        if not self.cap.isOpened():
            messagebox.showerror("Error", "Could not open video source!")
            return

        self.processing = True
        frame_number = 0
        last_update_time = 0

        while self.processing:
            ret, frame = self.cap.read()
            if not ret:
                break

            frame_number += 1
            results = model(frame)
            result_frame = results[0].plot()

            # Count vehicles
            vehicle_count = sum(1 for result in results for box in result.boxes 
                              if model.names[int(box.cls)] in vehicle_classes)

            congestion = self.get_congestion_level(vehicle_count)
            signal_time = self.signal_controller.calculate_signal_time(vehicle_count)

            # Update last frame details
            self.last_frame_details = {
                'frame_number': frame_number,
                'vehicle_count': vehicle_count,
                'signal_time': signal_time,
                'congestion_level': congestion
            }

            # Send congestion updates (throttled to 1 per second)
            if time.time() - last_update_time > 1:
                print(f"\n--- Frame {frame_number} Update ---")
                print(f"Current congestion at Signal {self.signal_controller.signal_id}: {congestion}")
                self.signal_controller.send_congestion_update(congestion, self.direction)
                last_update_time = time.time()

            # Display info
            self.display_info(result_frame, frame_number, vehicle_count, signal_time, congestion)
            cv2.imshow(f"Signal {self.signal_controller.signal_id}", result_frame)
            
            # Print the final allocated time for this signal
            print(f"Final allocated time for Signal {self.signal_controller.signal_id}: {signal_time:.1f} seconds")

            if cv2.waitKey(1) & 0xFF == ord('q'):
                self.stop_processing()

        self.cleanup()

    def display_info(self, frame, frame_num, vehicles, signal_time, congestion):
        cv2.putText(frame, f"Signal {self.signal_controller.signal_id}", (10, 30), 
                   cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 255), 2)
        cv2.putText(frame, f"Vehicles: {vehicles}", (10, 70), 
                   cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 0), 2)
        cv2.putText(frame, f"Signal Time: {signal_time:.1f}s", (10, 110), 
                   cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 255), 2)
        color = (0, 0, 255) if congestion == 'high' else (0, 255, 0) if congestion == 'medium' else (255, 255, 0)
        cv2.putText(frame, f"Congestion: {congestion}", (10, 150), 
                   cv2.FONT_HERSHEY_SIMPLEX, 0.7, color, 2)
        cv2.putText(frame, "Press 'q' to stop", (10, 190), 
                   cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 0, 255), 2)

    def stop_processing(self):
        self.processing = False

    def cleanup(self):
        if self.cap:
            self.cap.release()
        cv2.destroyAllWindows()

def start_signal_system():
    root = tk.Tk()
    root.title("Traffic Signal Controller")
    root.geometry("400x250")

    def start_processing():
        try:
            signal_id = int(signal_id_entry.get())
            neighbors = neighbor_entry.get().strip()
            neighbor_ids = [int(n) for n in neighbors.split(',')] if neighbors else []
            
            # Ask for initial congestion status of each neighbor
            neighbor_status = {}
            for neighbor_id in neighbor_ids:
                status = simpledialog.askstring("Neighbor Status", 
                                               f"Enter initial congestion status for Signal {neighbor_id} (low/medium/high):")
                direction = simpledialog.askstring("Traffic Direction", 
                                                 f"Enter traffic direction for Signal {neighbor_id} (incoming/outgoing):")
                neighbor_status[neighbor_id] = {
                    'congestion': status.lower() if status else 'low',
                    'direction': direction.lower() if direction else 'outgoing'
                }
            
            processor = VideoProcessor(signal_id, neighbor_ids)
            
            # Set initial neighbor statuses
            for neighbor_id, status in neighbor_status.items():
                processor.signal_controller.neighbor_signals[neighbor_id] = {
                    'congestion': status['congestion'],
                    'direction': status['direction'],
                    'last_update': time.time()
                }
            
            if webcam_var.get():
                processor.process_video(0)  # Webcam
            else:
                file_path = filedialog.askopenfilename(filetypes=[("Video Files", "*.mp4;*.avi")])
                if file_path:
                    processor.process_video(file_path)
        except ValueError:
            messagebox.showerror("Error", "Invalid signal/neighbor ID!")

    # GUI Layout
    tk.Label(root, text="Signal ID:").pack()
    signal_id_entry = tk.Entry(root)
    signal_id_entry.pack()

    tk.Label(root, text="Neighbor IDs (comma-separated):").pack()
    neighbor_entry = tk.Entry(root)
    neighbor_entry.pack()

    webcam_var = tk.BooleanVar(value=True)
    tk.Checkbutton(root, text="Use Webcam", variable=webcam_var).pack(pady=5)

    tk.Button(root, text="Start", command=start_processing).pack(pady=10)
    root.mainloop()

if __name__ == "__main__":
    start_signal_system()


0: 384x640 12 persons, 22 cars, 14 motorcycles, 2 trucks, 128.0ms
Speed: 9.1ms preprocess, 128.0ms inference, 7.2ms postprocess per image at shape (1, 3, 384, 640)
Signal 1 received from Signal 2: Congestion = high, Message: "Don't increase the Signal Time than 50 seconds"

--- Frame 1 Update ---
Current congestion at Signal 1: high
Signal 1 sending to Signal 2: Congestion = high, Direction = outgoing
Signal 2 received update from Signal 1: Congestion = high, Direction = outgoing
Signal 1 received from Signal 2: Congestion = high, Message: "Don't increase the Signal Time than 50 seconds"
Final allocated time for Signal 1: 50.0 seconds

0: 384x640 11 persons, 21 cars, 12 motorcycles, 2 trucks, 98.8ms
Speed: 5.5ms preprocess, 98.8ms inference, 1.7ms postprocess per image at shape (1, 3, 384, 640)
Signal 1 received from Signal 2: Congestion = high, Message: "Don't increase the Signal Time than 50 seconds"
Final allocated time for Signal 1: 50.0 seconds

0: 384x640 16 persons, 20 cars, 14


### Basic Concept:
Each traffic signal constantly monitors its own congestion and shares this info with its neighbors. When it receives updates from neighbors, it adjusts its timing to help balance traffic flow across the whole area.

### How It Works Step-by-Step:

1. **Each Signal Knows Its Neighbors**
   - When you start the system, you tell each signal who its neighbors are (like Signal 1 knows Signals 2 and 3 are its neighbors)

2. **Constant Monitoring**
   - Each signal counts vehicles at its intersection every second
   - Determines if congestion is Low/Medium/High based on vehicle count

3. **Sharing Updates**
   - Every second, each signal sends messages like:
     *"Hey Neighbor! I'm Signal 1, my congestion is High, and traffic is coming your way (incoming)"*

4. **Making Adjustments**
   When a signal gets updates from neighbors, it changes its timing like this:
   - **If neighbor has HIGH congestion** and traffic is coming your way:
     *Increase your green time* → Helps clear their backlog
   - **If neighbor has LOW congestion** and traffic is going away from you:
     *Decrease your green time* → Lets other busier signals get more time
   - Uses simple math: Base time (from its own traffic) + Adjustments (from neighbors)

5. **Safety Limits**
   - Never goes below minimum (e.g., 10 seconds)
   - Never goes above maximum (e.g., 70 seconds)

### Simple Example:

Signal 1 (Congestion: High) → Signal 2:
- Signal 2 sees: "Signal 1 is jammed and traffic is coming my way"
- Signal 2 responds: "I'll stay green longer to help clear Signal 1's traffic"

Signal 3 (Congestion: Low) → Signal 2:
- Signal 2 sees: "Signal 3 is clear and traffic is moving away"
- Signal 2 responds: "I can reduce my green time since Signal 3 doesn't need help"

### Final Decision:
Signal 2 combines:
- Its own traffic count
- Signal 1's "help needed" message (+5 seconds)
- Signal 3's "all clear" message (-2 seconds)
= Final timing adjustment

This way, all signals work together like a team to keep traffic flowing smoothly across multiple intersections!

In [None]:


# Load YOLOv8 model
model = YOLO('yolov8n.pt')  # Automatically downloads if not present

# Vehicle classes to detect
vehicle_classes = ['car', 'truck', 'bus', 'bicycle', 'motorcycle']

class TrafficSignalController:
    def __init__(self, signal_id, neighbors=None):
        self.signal_id = signal_id
        self.current_signal_time = 10
        self.max_time = 70
        self.min_time = 10
        self.neighbor_signals = {}  # Stores neighbor congestion data
        self.congestion_level = "low"
        
        # Initialize neighbors (simulated)
        if neighbors:
            for neighbor_id in neighbors:
                self.neighbor_signals[neighbor_id] = {
                    'congestion': 'low',  # Default will be overwritten by user input
                    'direction': 'outgoing',
                    'last_update': time.time()
                }

    def calculate_signal_time(self, frame_vehicle_count):
        """Adjust signal time based on vehicle count & neighbor congestion"""
        # Check for any high congestion neighbors (regardless of direction)
        has_high_congestion_neighbor = any(
            data['congestion'] == 'high'
            for data in self.neighbor_signals.values()
            if time.time() - data['last_update'] <= 5  # Only consider recent updates
        )

        # If any neighbor has high congestion, enforce max 50 seconds
        if has_high_congestion_neighbor:
            max_allowed = 50
            # Print special message about the restriction
            high_neighbors = [nid for nid, data in self.neighbor_signals.items() 
                            if data['congestion'] == 'high' and time.time() - data['last_update'] <= 5]
            for neighbor_id in high_neighbors:
                print(f"Signal {self.signal_id} received from Signal {neighbor_id}: "
                      f"Congestion = high, Message: \"Don't increase the Signal Time than 50 seconds\"")
        else:
            max_allowed = 70

        # Base calculation with the determined max allowed time
        if frame_vehicle_count <= 5:
            base_time = 10
        elif 6 <= frame_vehicle_count <= 25:
            base_time = 10 + (frame_vehicle_count - 5) * 2
        else:
            base_time = 50 + (frame_vehicle_count - 25) * 0.8

        # Apply the maximum time limit strictly
        final_time = min(base_time, max_allowed)
    
        # Ensure minimum time is respected
        final_time = max(self.min_time, final_time)

        return final_time

    def update_neighbor_status(self, neighbor_id, congestion, direction):
        """Update simulated neighbor's congestion status"""
        if neighbor_id in self.neighbor_signals:
            self.neighbor_signals[neighbor_id] = {
                'congestion': congestion,
                'direction': direction,
                'last_update': time.time()
            }
            # Print the received update
            print(f"Signal {self.signal_id} received update from Signal {neighbor_id}: "
                  f"Congestion = {congestion}, Direction = {direction}")
            # Special message if congestion is high
            if congestion == 'high':
                print(f"Signal {self.signal_id} received from Signal {neighbor_id}: "
                      f"Congestion = high, Message: \"Don't increase the Signal Time than 50 seconds\"")

    def send_congestion_update(self, congestion, direction):
        """Simulate sending congestion status to neighbors"""
        self.congestion_level = congestion
        for neighbor_id in self.neighbor_signals:
            # Print the status being sent before updating
            print(f"Signal {self.signal_id} sending to Signal {neighbor_id}: "
                  f"Congestion = {congestion}, Direction = {direction}")
            # Simulate neighbor receiving the update
            self.update_neighbor_status(neighbor_id, congestion, direction)

class VideoProcessor:
    def __init__(self, signal_id, neighbors=None, initial_neighbor_status=None):
        self.cap = None
        self.processing = False
        self.signal_controller = TrafficSignalController(signal_id, neighbors)
        self.last_frame_details = {
            'frame_number': 0,
            'vehicle_count': 0,
            'signal_time': 10,
            'congestion_level': 'low'
        }
        self.congestion_thresholds = {'low': 10, 'medium': 25, 'high': 40}
        self.direction = 'outgoing'  # Default traffic direction

        # Set initial neighbor statuses if provided
        if initial_neighbor_status:
            for neighbor_id, status in initial_neighbor_status.items():
                self.signal_controller.neighbor_signals[neighbor_id] = {
                    'congestion': status['congestion'],
                    'direction': status['direction'],
                    'last_update': time.time()
                }

    def get_congestion_level(self, count):
        if count < self.congestion_thresholds['low']:
            return 'low'
        elif count < self.congestion_thresholds['medium']:
            return 'medium'
        else:
            return 'high'

    def process_video(self, video_source):
        self.cap = cv2.VideoCapture(video_source)
        if not self.cap.isOpened():
            messagebox.showerror("Error", "Could not open video source!")
            return

        self.processing = True
        frame_number = 0
        last_update_time = 0

        while self.processing:
            ret, frame = self.cap.read()
            if not ret:
                break

            frame_number += 1
            results = model(frame)
            result_frame = results[0].plot()

            # Count vehicles
            vehicle_count = sum(1 for result in results for box in result.boxes 
                              if model.names[int(box.cls)] in vehicle_classes)

            congestion = self.get_congestion_level(vehicle_count)
            signal_time = self.signal_controller.calculate_signal_time(vehicle_count)

            # Update last frame details
            self.last_frame_details = {
                'frame_number': frame_number,
                'vehicle_count': vehicle_count,
                'signal_time': signal_time,
                'congestion_level': congestion
            }

            # Send congestion updates (throttled to 1 per second)
            if time.time() - last_update_time > 1:
                print(f"\n--- Frame {frame_number} Update ---")
                print(f"Current congestion at Signal {self.signal_controller.signal_id}: {congestion}")
                self.signal_controller.send_congestion_update(congestion, self.direction)
                last_update_time = time.time()

            # Display info
            self.display_info(result_frame, frame_number, vehicle_count, signal_time, congestion)
            cv2.imshow(f"Signal {self.signal_controller.signal_id}", result_frame)
            
            # Print the final allocated time for this signal
            print(f"Final allocated time for Signal {self.signal_controller.signal_id}: {signal_time:.1f} seconds")

            if cv2.waitKey(1) & 0xFF == ord('q'):
                self.stop_processing()

        self.cleanup()

    def display_info(self, frame, frame_num, vehicles, signal_time, congestion):
        cv2.putText(frame, f"Signal {self.signal_controller.signal_id}", (10, 30), 
                   cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 255), 2)
        cv2.putText(frame, f"Vehicles: {vehicles}", (10, 70), 
                   cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 0), 2)
        cv2.putText(frame, f"Signal Time: {signal_time:.1f}s", (10, 110), 
                   cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 255), 2)
        color = (0, 0, 255) if congestion == 'high' else (0, 255, 0) if congestion == 'medium' else (255, 255, 0)
        cv2.putText(frame, f"Congestion: {congestion}", (10, 150), 
                   cv2.FONT_HERSHEY_SIMPLEX, 0.7, color, 2)
        cv2.putText(frame, "Press 'q' to stop", (10, 190), 
                   cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 0, 255), 2)

    def stop_processing(self):
        self.processing = False

    def cleanup(self):
        if self.cap:
            self.cap.release()
        cv2.destroyAllWindows()

def start_signal_system():
    root = tk.Tk()
    root.title("Traffic Signal Controller")
    root.geometry("400x250")

    def start_processing():
        try:
            signal_id = int(signal_id_entry.get())
            neighbors = neighbor_entry.get().strip()
            neighbor_ids = [int(n) for n in neighbors.split(',')] if neighbors else []
            
            # Ask for initial congestion status of each neighbor
            neighbor_status = {}
            for neighbor_id in neighbor_ids:
                while True:
                    status = simpledialog.askstring("Neighbor Status", 
                                                  f"Enter initial congestion status for Signal {neighbor_id} (low/medium/high):")
                    if status and status.lower() in ['low', 'medium', 'high']:
                        break
                    messagebox.showwarning("Invalid Input", "Please enter 'low', 'medium', or 'high'")
                
                while True:
                    direction = simpledialog.askstring("Traffic Direction", 
                                                     f"Enter traffic direction for Signal {neighbor_id} (incoming/outgoing):")
                    if direction and direction.lower() in ['incoming', 'outgoing']:
                        break
                    messagebox.showwarning("Invalid Input", "Please enter 'incoming' or 'outgoing'")
                
                neighbor_status[neighbor_id] = {
                    'congestion': status.lower(),
                    'direction': direction.lower()
                }
            
            # Create processor with initial neighbor statuses
            processor = VideoProcessor(signal_id, neighbor_ids, neighbor_status)
            
            if webcam_var.get():
                processor.process_video(0)  # Webcam
            else:
                file_path = filedialog.askopenfilename(filetypes=[("Video Files", "*.mp4;*.avi")])
                if file_path:
                    processor.process_video(file_path)
        except ValueError:
            messagebox.showerror("Error", "Invalid signal/neighbor ID!")

    # GUI Layout
    tk.Label(root, text="Signal ID:").pack()
    signal_id_entry = tk.Entry(root)
    signal_id_entry.pack()

    tk.Label(root, text="Neighbor IDs (comma-separated):").pack()
    neighbor_entry = tk.Entry(root)
    neighbor_entry.pack()

    webcam_var = tk.BooleanVar(value=True)
    tk.Checkbutton(root, text="Use Webcam", variable=webcam_var).pack(pady=5)

    tk.Button(root, text="Start", command=start_processing).pack(pady=10)
    root.mainloop()

if __name__ == "__main__":
    start_signal_system()


0: 384x640 12 persons, 22 cars, 14 motorcycles, 2 trucks, 86.1ms
Speed: 4.6ms preprocess, 86.1ms inference, 1.9ms postprocess per image at shape (1, 3, 384, 640)

--- Frame 1 Update ---
Current congestion at Signal 1: high
Signal 1 sending to Signal 2: Congestion = high, Direction = outgoing
Signal 1 received update from Signal 2: Congestion = high, Direction = outgoing
Signal 1 received from Signal 2: Congestion = high, Message: "Don't increase the Signal Time than 50 seconds"
Final allocated time for Signal 1: 60.4 seconds

0: 384x640 11 persons, 21 cars, 12 motorcycles, 2 trucks, 82.0ms
Speed: 3.1ms preprocess, 82.0ms inference, 1.3ms postprocess per image at shape (1, 3, 384, 640)
Signal 1 received from Signal 2: Congestion = high, Message: "Don't increase the Signal Time than 50 seconds"
Final allocated time for Signal 1: 50.0 seconds

0: 384x640 16 persons, 20 cars, 14 motorcycles, 4 trucks, 76.1ms
Speed: 3.6ms preprocess, 76.1ms inference, 1.3ms postprocess per image at shape (

In [4]:
# Load YOLOv8 model
model = YOLO('yolov8n.pt')  # Automatically downloads if not present

# Vehicle classes to detect
vehicle_classes = ['car', 'truck', 'bus', 'bicycle', 'motorcycle']

class TrafficSignalController:
    def __init__(self, signal_id, neighbors=None):
        self.signal_id = signal_id
        self.current_signal_time = 10
        self.max_time = 70
        self.min_time = 10
        self.neighbor_signals = {}  # Stores neighbor congestion data
        self.congestion_level = "low"
        
        # Initialize neighbors (simulated)
        if neighbors:
            for neighbor_id, status in neighbors.items():
                self.neighbor_signals[neighbor_id] = {
                    'congestion': status['congestion'],
                    'direction': status['direction'],
                    'last_update': time.time()
                }

    def calculate_signal_time(self, frame_vehicle_count):
        """Adjust signal time based on vehicle count & neighbor congestion"""
        # Check for any high congestion neighbors with incoming direction
        has_high_incoming_neighbor = any(
            data['congestion'] in ['high', 'medium'] and data['direction'] == 'incoming'
            for data in self.neighbor_signals.values()
            if time.time() - data['last_update'] <= 5  # Only consider recent updates
        )

        # If any incoming neighbor has high/medium congestion, enforce max 50 seconds
        if has_high_incoming_neighbor:
            max_allowed = 50
            # Print special message about the restriction
            high_neighbors = [nid for nid, data in self.neighbor_signals.items() 
                            if data['congestion'] in ['high', 'medium'] and 
                            data['direction'] == 'incoming' and 
                            time.time() - data['last_update'] <= 5]
            for neighbor_id in high_neighbors:
                print(f"Signal {self.signal_id} received from Signal {neighbor_id}: "
                      f"Congestion = {data['congestion']}, Direction = incoming, "
                      f"Message: \"Don't increase the Signal Time beyond 50 seconds\"")
        else:
            max_allowed = 70

        # Base calculation with the determined max allowed time
        if frame_vehicle_count <= 5:
            base_time = 10
        elif 6 <= frame_vehicle_count <= 25:
            base_time = 10 + (frame_vehicle_count - 5) * 2
        else:
            base_time = 50 + (frame_vehicle_count - 25) * 0.8

        # Apply the maximum time limit strictly
        final_time = min(base_time, max_allowed)
    
        # Ensure minimum time is respected
        final_time = max(self.min_time, final_time)

        return final_time

    def update_neighbor_status(self, neighbor_id, congestion, direction):
        """Update simulated neighbor's congestion status"""
        if neighbor_id in self.neighbor_signals:
            self.neighbor_signals[neighbor_id] = {
                'congestion': congestion,
                'direction': direction,
                'last_update': time.time()
            }
            # Print the received update
            print(f"Signal {self.signal_id} received update from Signal {neighbor_id}: "
                  f"Congestion = {congestion}, Direction = {direction}")
            # Special message if congestion is high/medium and direction is incoming
            if congestion in ['high', 'medium'] and direction == 'incoming':
                print(f"Signal {self.signal_id} received from Signal {neighbor_id}: "
                      f"Congestion = {congestion}, Direction = incoming, "
                      f"Message: \"Don't increase the Signal Time beyond 50 seconds\"")

    def send_congestion_update(self, congestion, direction):
        """Simulate sending congestion status to neighbors"""
        self.congestion_level = congestion
        for neighbor_id in self.neighbor_signals:
            # Print the status being sent before updating
            print(f"Signal {self.signal_id} sending to Signal {neighbor_id}: "
                  f"Congestion = {congestion}, Direction = {direction}")
            # Simulate neighbor receiving the update
            self.update_neighbor_status(neighbor_id, congestion, direction)

class VideoProcessor:
    def __init__(self, signal_id, neighbors=None):
        self.cap = None
        self.processing = False
        self.signal_controller = TrafficSignalController(signal_id, neighbors)
        self.last_frame_details = {
            'frame_number': 0,
            'vehicle_count': 0,
            'signal_time': 10,
            'congestion_level': 'low'
        }
        self.congestion_thresholds = {'low': 10, 'medium': 25, 'high': 40}
        self.direction = 'outgoing'  # Default traffic direction

    def get_congestion_level(self, count):
        if count < self.congestion_thresholds['low']:
            return 'low'
        elif count < self.congestion_thresholds['medium']:
            return 'medium'
        else:
            return 'high'

    def process_video(self, video_source):
        self.cap = cv2.VideoCapture(video_source)
        if not self.cap.isOpened():
            messagebox.showerror("Error", "Could not open video source!")
            return

        self.processing = True
        frame_number = 0
        last_update_time = 0

        while self.processing:
            ret, frame = self.cap.read()
            if not ret:
                break

            frame_number += 1
            results = model(frame)
            result_frame = results[0].plot()

            # Count vehicles
            vehicle_count = sum(1 for result in results for box in result.boxes 
                              if model.names[int(box.cls)] in vehicle_classes)

            congestion = self.get_congestion_level(vehicle_count)
            signal_time = self.signal_controller.calculate_signal_time(vehicle_count)

            # Update last frame details
            self.last_frame_details = {
                'frame_number': frame_number,
                'vehicle_count': vehicle_count,
                'signal_time': signal_time,
                'congestion_level': congestion
            }

            # Send congestion updates (throttled to 1 per second)
            if time.time() - last_update_time > 1:
                print(f"\n--- Frame {frame_number} Update ---")
                print(f"Current congestion at Signal {self.signal_controller.signal_id}: {congestion}")
                self.signal_controller.send_congestion_update(congestion, self.direction)
                last_update_time = time.time()

            # Display info
            self.display_info(result_frame, frame_number, vehicle_count, signal_time, congestion)
            cv2.imshow(f"Signal {self.signal_controller.signal_id}", result_frame)
            
            # Print the final allocated time for this signal
            print(f"Final allocated time for Signal {self.signal_controller.signal_id}: {signal_time:.1f} seconds")

            if cv2.waitKey(1) & 0xFF == ord('q'):
                self.stop_processing()

        self.cleanup()

    def display_info(self, frame, frame_num, vehicles, signal_time, congestion):
        cv2.putText(frame, f"Signal {self.signal_controller.signal_id}", (10, 30), 
                   cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 255), 2)
        cv2.putText(frame, f"Vehicles: {vehicles}", (10, 70), 
                   cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 0), 2)
        cv2.putText(frame, f"Signal Time: {signal_time:.1f}s", (10, 110), 
                   cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 255), 2)
        color = (0, 0, 255) if congestion == 'high' else (0, 255, 0) if congestion == 'medium' else (255, 255, 0)
        cv2.putText(frame, f"Congestion: {congestion}", (10, 150), 
                   cv2.FONT_HERSHEY_SIMPLEX, 0.7, color, 2)
        cv2.putText(frame, "Press 'q' to stop", (10, 190), 
                   cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 0, 255), 2)

    def stop_processing(self):
        self.processing = False

    def cleanup(self):
        if self.cap:
            self.cap.release()
        cv2.destroyAllWindows()

def start_signal_system():
    root = tk.Tk()
    root.title("Traffic Signal Controller")
    root.geometry("400x250")

    def start_processing():
        try:
            signal_id = int(signal_id_entry.get())
            neighbors = neighbor_entry.get().strip()
            neighbor_ids = [int(n.strip()) for n in neighbors.split(',')] if neighbors else []
            
            # Ask for initial congestion status and direction of each neighbor
            neighbor_status = {}
            for neighbor_id in neighbor_ids:
                status = simpledialog.askstring("Neighbor Status", 
                                               f"Enter initial congestion status for Signal {neighbor_id} (low/medium/high):")
                direction = simpledialog.askstring("Traffic Direction", 
                                                 f"Enter traffic direction for Signal {neighbor_id} (incoming/outgoing):")
                neighbor_status[neighbor_id] = {
                    'congestion': status.lower() if status else 'low',
                    'direction': direction.lower() if direction else 'outgoing'
                }
            
            processor = VideoProcessor(signal_id, neighbor_status)
            
            # Set traffic direction for this signal
            direction = simpledialog.askstring("Traffic Direction", 
                                             f"Enter traffic direction for Signal {signal_id} (incoming/outgoing):")
            if direction:
                processor.direction = direction.lower()
            
            if webcam_var.get():
                processor.process_video(0)  # Webcam
            else:
                file_path = filedialog.askopenfilename(filetypes=[("Video Files", "*.mp4;*.avi")])
                if file_path:
                    processor.process_video(file_path)
        except ValueError:
            messagebox.showerror("Error", "Invalid signal/neighbor ID!")

    # GUI Layout
    tk.Label(root, text="Signal ID:").pack()
    signal_id_entry = tk.Entry(root)
    signal_id_entry.pack()

    tk.Label(root, text="Neighbor IDs (comma-separated):").pack()
    neighbor_entry = tk.Entry(root)
    neighbor_entry.pack()

    webcam_var = tk.BooleanVar(value=True)
    tk.Checkbutton(root, text="Use Webcam", variable=webcam_var).pack(pady=5)

    tk.Button(root, text="Start", command=start_processing).pack(pady=10)
    root.mainloop()

if __name__ == "__main__":
    start_signal_system()


0: 384x640 12 persons, 22 cars, 14 motorcycles, 2 trucks, 101.5ms
Speed: 6.3ms preprocess, 101.5ms inference, 1.4ms postprocess per image at shape (1, 3, 384, 640)

--- Frame 1 Update ---
Current congestion at Signal 1: high
Signal 1 sending to Signal 2: Congestion = high, Direction = outgoing
Signal 1 received update from Signal 2: Congestion = high, Direction = outgoing
Final allocated time for Signal 1: 60.4 seconds

0: 384x640 11 persons, 21 cars, 12 motorcycles, 2 trucks, 126.2ms
Speed: 4.6ms preprocess, 126.2ms inference, 2.5ms postprocess per image at shape (1, 3, 384, 640)
Final allocated time for Signal 1: 58.0 seconds

0: 384x640 16 persons, 20 cars, 14 motorcycles, 4 trucks, 97.1ms
Speed: 4.3ms preprocess, 97.1ms inference, 1.7ms postprocess per image at shape (1, 3, 384, 640)
Final allocated time for Signal 1: 60.4 seconds

0: 384x640 10 persons, 20 cars, 14 motorcycles, 4 trucks, 66.3ms
Speed: 3.5ms preprocess, 66.3ms inference, 1.5ms postprocess per image at shape (1, 3,

Exception in Tkinter callback
Traceback (most recent call last):
  File "c:\Users\chand\AppData\Local\Programs\Python\Python312\Lib\tkinter\__init__.py", line 1968, in __call__
    return self.func(*args)
           ^^^^^^^^^^^^^^^^
  File "C:\Users\chand\AppData\Local\Temp\ipykernel_9924\4254705668.py", line 224, in start_processing
    processor.process_video(file_path)
  File "C:\Users\chand\AppData\Local\Temp\ipykernel_9924\4254705668.py", line 138, in process_video
    signal_time = self.signal_controller.calculate_signal_time(vehicle_count)
                  ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\chand\AppData\Local\Temp\ipykernel_9924\4254705668.py", line 44, in calculate_signal_time
    f"Congestion = {data['congestion']}, Direction = incoming, "
                    ^^^^
NameError: name 'data' is not defined



0: 384x640 2 persons, 17 cars, 3 buss, 98.0ms
Speed: 3.4ms preprocess, 98.0ms inference, 1.3ms postprocess per image at shape (1, 3, 384, 640)

--- Frame 1 Update ---
Current congestion at Signal 1: medium
Signal 1 sending to Signal 2: Congestion = medium, Direction = incoming
Signal 1 received update from Signal 2: Congestion = medium, Direction = incoming
Signal 1 received from Signal 2: Congestion = medium, Direction = incoming, Message: "Don't increase the Signal Time beyond 50 seconds"
Final allocated time for Signal 1: 40.0 seconds

0: 384x640 4 persons, 17 cars, 4 buss, 89.4ms
Speed: 3.8ms preprocess, 89.4ms inference, 1.5ms postprocess per image at shape (1, 3, 384, 640)


Exception in Tkinter callback
Traceback (most recent call last):
  File "c:\Users\chand\AppData\Local\Programs\Python\Python312\Lib\tkinter\__init__.py", line 1968, in __call__
    return self.func(*args)
           ^^^^^^^^^^^^^^^^
  File "C:\Users\chand\AppData\Local\Temp\ipykernel_9924\4254705668.py", line 224, in start_processing
    processor.process_video(file_path)
  File "C:\Users\chand\AppData\Local\Temp\ipykernel_9924\4254705668.py", line 138, in process_video
    signal_time = self.signal_controller.calculate_signal_time(vehicle_count)
                  ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\chand\AppData\Local\Temp\ipykernel_9924\4254705668.py", line 44, in calculate_signal_time
    f"Congestion = {data['congestion']}, Direction = incoming, "
                    ^^^^
NameError: name 'data' is not defined
