In [1]:
%pip install ultralytics opencv-python numpy


Defaulting to user installation because normal site-packages is not writeableNote: you may need to restart the kernel to use updated packages.




[notice] A new release of pip is available: 24.2 -> 25.0.1
[notice] To update, run: python.exe -m pip install --upgrade pip


In [3]:
import cv2
import threading
import numpy as np
from ultralytics import YOLO
import paho.mqtt.client as mqtt
import time
import smtplib
from email.mime.text import MIMEText
import random
import json

# MQTT Configuration
MQTT_BROKER = "192.168.1.8" 
MQTT_PORT = 1883
MQTT_TOPIC_GENERAL = "detection/general"
MQTT_TOPIC_ACCIDENT = "detection/accident"
MQTT_TOPIC_POTHOLE_SPEEDBUMP = "detection/pothole_speedbump"
MQTT_TOPIC_AMBULANCE = "emergency/ambulance"

# Email Configuration
SMTP_SERVER = "smtp.gmail.com"  
SMTP_PORT = 587
EMAIL_SENDER = "s.s.madhuresh1945@gmail.com" 
EMAIL_PASSWORD = "lpuo jllw uobw iwqn"  
EMAIL_RECIPIENT = "madh22084.cs@rmkec.ac.in"  

# Initialize MQTT Client
mqtt_client = mqtt.Client()
mqtt_client.connect(MQTT_BROKER, MQTT_PORT, 60)
mqtt_client.loop_start()

# Global variables to store detection results
results_general = None
results_accident = None
results_pothole_speedbump = None
frame_lock = threading.Lock()

# Ambulance-related global variables
ambulance_signal_received = False  # Flag to indicate if an ambulance signal is received

# Function to generate pseudo-random speeds and directions
def generate_vehicle_data():
    speeds = [random.randint(30, 80) for _ in range(5)]  # Speeds in km/h
    directions = ["North"]  # Cardinal directions
    return speeds, directions

# Function to generate a pseudo-random location
def generate_random_location():
    locations = [
        "Main Street and 1st Avenue",
        "Highway 401 near Exit 25",
        "Elm Street and Oak Road",
        "Downtown Boulevard and Central Lane",
        "Parkway Drive and Riverfront"
    ]
    return random.choice(locations)

# Function to send email notifications
def send_email(subject, body):
    try:
        msg = MIMEText(body)
        msg['Subject'] = subject
        msg['From'] = EMAIL_SENDER
        msg['To'] = EMAIL_RECIPIENT
        with smtplib.SMTP(SMTP_SERVER, SMTP_PORT) as server:
            server.starttls() 
            server.login(EMAIL_SENDER, EMAIL_PASSWORD)  
            server.sendmail(EMAIL_SENDER, EMAIL_RECIPIENT, msg.as_string())  
        print("Email sent successfully!")
    except Exception as e:
        print(f"Failed to send email: {e}")

# MQTT Callback for Ambulance Signal
def on_message(client, userdata, message):
    global ambulance_signal_received
    if message.topic == MQTT_TOPIC_AMBULANCE:
        payload = message.payload.decode("utf-8")
        if payload == "Ambulance Arriving":
            print("Ambulance signal received! Pausing detection system...")
            ambulance_signal_received = True
        elif payload == "Ambulance Departed":
            print("Ambulance departed. Resuming detection system...")
            ambulance_signal_received = False

# Subscribe to the ambulance topic
mqtt_client.subscribe(MQTT_TOPIC_AMBULANCE)
mqtt_client.on_message = on_message

# Load the YOLOv8 models
model_general = YOLO(r"D:\uyir\objectDetection\New Models\yolov8s.pt")  
model_accident = YOLO(r"D:\uyir\objectDetection\New Models\i1-yolov8s.pt")  
model_pothole_speedbump = YOLO(r"D:\uyir\objectDetection\New Models\humps.pt")  

# Define class mappings for each model
CLASS_NAMES_GENERAL = {
    0: "Person",
    1: "Bicycle",
    2: "Car",
    3: "Bike",
    5: "Bus",
    7: "Truck",
    9: "Traffic Light",
    11: "Stop Sign",
    15: "Cat",
    16: "Dog",
    18: "Sheep",
    19: "Cow"
}
CLASS_NAMES_POTHOLE_SPEEDBUMP = {
    0: "Pothole",
    1: "Speed Breaker"
}
CLASS_NAMES_ACCIDENT = {
    0: "Accident"
}

# Function to run general object detection
def detect_general(frame):
    global results_general
    with frame_lock:
        results_general = model_general(frame)

# Function to run accident detection
def detect_accident(frame):
    global results_accident
    with frame_lock:
        results_accident = model_accident(frame)

# Function to run pothole/speed bump detection
def detect_pothole_speedbump(frame):
    global results_pothole_speedbump
    with frame_lock:
        results_pothole_speedbump = model_pothole_speedbump(frame)

# Function to draw bounding boxes on the frame and return filtered class names with confidence scores
def draw_boxes(frame, results, class_names, color=(0, 255, 0), label_prefix=""):
    if results is not None:
        detected_classes = []  # List to store detected class names and confidence scores
        for result in results:
            boxes = result.boxes.cpu().numpy()  # Get bounding boxes
            for box in boxes:
                cls_id = int(box.cls[0])  # Class ID
                if cls_id in class_names:  # Check if class ID is in the filtered list
                    x1, y1, x2, y2 = map(int, box.xyxy[0])  # Extract coordinates
                    conf = box.conf[0]  # Confidence score
                    class_name = f"{label_prefix}{class_names[cls_id]}"  # Map class ID to name
                    label = f"{class_name} ({conf:.2f})"
                    cv2.rectangle(frame, (x1, y1), (x2, y2), color, 2)
                    cv2.putText(frame, label, (x1, y1 - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.9, color, 2)
                    detected_classes.append((class_name, conf))  # Add class name and confidence to the list
        return detected_classes  # Return filtered class names with confidence scores
    return []

# Main function to capture webcam feed, run models in parallel, and publish MQTT messages
def main():
    cap = cv2.VideoCapture(0)  
    last_publish_time = time.time()  
    while True:
        ret, frame = cap.read()
        if not ret:
            print("Failed to capture frame")
            break

        # Check if ambulance signal is active
        if ambulance_signal_received:
            print("Ambulance priority active. Skipping detections...")
            time.sleep(1)  # Sleep briefly to avoid high CPU usage
            continue

        # Create threads for each model
        thread_general = threading.Thread(target=detect_general, args=(frame,))
        thread_accident = threading.Thread(target=detect_accident, args=(frame,))
        thread_pothole_speedbump = threading.Thread(target=detect_pothole_speedbump, args=(frame,))

        # Start threads
        thread_general.start()
        thread_accident.start()
        thread_pothole_speedbump.start()

        # Wait for all threads to finish
        thread_general.join()
        thread_accident.join()
        thread_pothole_speedbump.join()

        # Draw bounding boxes and get filtered class names with confidence scores
        general_classes_with_conf = draw_boxes(frame, results_general, CLASS_NAMES_GENERAL, color=(0, 255, 0), label_prefix="")
        accident_classes = draw_boxes(frame, results_accident, CLASS_NAMES_ACCIDENT, color=(255, 0, 0), label_prefix="")
        pothole_speedbump_classes = draw_boxes(frame, results_pothole_speedbump, CLASS_NAMES_POTHOLE_SPEEDBUMP, color=(0, 0, 255), label_prefix="")

        # Generate pseudo speeds and directions
        speeds, directions = generate_vehicle_data()

        # Publish MQTT messages every 5 seconds
        current_time = time.time()
        if current_time - last_publish_time >= 5:
            # Handle general model detections
            if general_classes_with_conf:
                # Sort by confidence score in descending order
                general_classes_with_conf.sort(key=lambda x: x[1], reverse=True)
                highest_conf_class = general_classes_with_conf[0][0]  # Select the class with the highest confidence

                # Simulate speed and direction for the detected object
                speed = random.choice(speeds)
                direction = random.choice(directions)

                # Create a JSON payload
                payload = {
                    "class": highest_conf_class,
                    "speed": speed,
                    "direction": direction
                }

                # Publish the payload as a JSON string
                mqtt_client.publish(MQTT_TOPIC_GENERAL, json.dumps(payload))
                print(f"Published: {payload}")

            # Handle accident detections
            for accident_class_tuple in accident_classes:
                accident_class = accident_class_tuple[0]  # Extract the class name from the tuple
                mqtt_client.publish(MQTT_TOPIC_ACCIDENT, accident_class)
                if "Accident" in accident_class: 
                    random_location = generate_random_location() 
                    email_body = f"Accident detected at {random_location}. Please take necessary actions."
                    send_email("Accident Detected!", email_body)

            # Handle pothole/speed bump detections
            for pothole_speedbump_class_tuple in pothole_speedbump_classes:
                pothole_speedbump_class = pothole_speedbump_class_tuple[0]  # Extract the class name from the tuple
                mqtt_client.publish(MQTT_TOPIC_POTHOLE_SPEEDBUMP, pothole_speedbump_class)

            last_publish_time = current_time  

        # Display the frame with detections
        cv2.imshow("Multi-Model Detection", frame)

        if cv2.waitKey(1) & 0xFF == ord('q'):
            break

    # Release resources
    cap.release()
    cv2.destroyAllWindows()
    mqtt_client.loop_stop()
    mqtt_client.disconnect()

if __name__ == "__main__":
    main()

Failed to capture frame


In [3]:
from ultralytics import YOLO

# Path to your YOLOv8 model
MODEL_PATH = r"D:\uyir\objectDetection\New Models\yolov8s.pt"  # Replace with your model path

# Load the YOLOv8 model
model = YOLO(MODEL_PATH)

# Access and print the class names
class_names = model.names  # This is a dictionary mapping class IDs to class names
print("Class Names:")
for cls_id, cls_name in class_names.items():
    print(f"Class ID: {cls_id}, Class Name: {cls_name}")

Class Names:
Class ID: 0, Class Name: person
Class ID: 1, Class Name: bicycle
Class ID: 2, Class Name: car
Class ID: 3, Class Name: motorcycle
Class ID: 4, Class Name: airplane
Class ID: 5, Class Name: bus
Class ID: 6, Class Name: train
Class ID: 7, Class Name: truck
Class ID: 8, Class Name: boat
Class ID: 9, Class Name: traffic light
Class ID: 10, Class Name: fire hydrant
Class ID: 11, Class Name: stop sign
Class ID: 12, Class Name: parking meter
Class ID: 13, Class Name: bench
Class ID: 14, Class Name: bird
Class ID: 15, Class Name: cat
Class ID: 16, Class Name: dog
Class ID: 17, Class Name: horse
Class ID: 18, Class Name: sheep
Class ID: 19, Class Name: cow
Class ID: 20, Class Name: elephant
Class ID: 21, Class Name: bear
Class ID: 22, Class Name: zebra
Class ID: 23, Class Name: giraffe
Class ID: 24, Class Name: backpack
Class ID: 25, Class Name: umbrella
Class ID: 26, Class Name: handbag
Class ID: 27, Class Name: tie
Class ID: 28, Class Name: suitcase
Class ID: 29, Class Name: fris