In [7]:

from ultralytics import YOLO
import cv2
import torch
from pathlib import Path
import time
from collections import defaultdict
import pyttsx3  # Text-to-speech library
import threading  # For non-blocking speech

# Initialize the single text-to-speech engine shared by every speak() call.
tts_engine = pyttsx3.init()
tts_engine.setProperty('rate', 150)  # Speed of speech (words per minute)
tts_engine.setProperty('volume', 1.0)  # Volume level (0.0 to 1.0)

# Global variables for tracking and triggers
# object_tracker: class name -> {'count': detections so far, 'last_seen': time.time() of the latest one}.
object_tracker = defaultdict(lambda: {'count': 0, 'last_seen': 0})
# trigger_cooldown: timestamp of the last description per class name, and of the last
# special alert under keys such as 'knife_safety' or 'frigo_check'.
trigger_cooldown = defaultdict(float)
# Written by trigger_action() after every detection; no reader is visible in this file.
last_detection_time = 0
description_cooldown = 5  # seconds between descriptions for the same object type
last_spoken = ""  # Track last spoken phrase to avoid repeats

# Object descriptions database.
# Keys are the class names passed to get_object_description(); a name missing
# from this table gets a generic "A <name> was detected." sentence instead.
# The values are printed and spoken verbatim by trigger_action().
OBJECT_DESCRIPTIONS = {
    'backpack': "A backpack is a bag carried on one's back, typically made of cloth or leather with straps over the shoulders.",
    'bottle-a': "A bottle is a container with a neck that is narrower than the body, used for storing liquids.",
    'bottle-b': "A second type of bottle, possibly different in shape or size from bottle-a.",
    'bowl': "A bowl is a round, deep dish used for preparing or serving food.",
    'casserole': "A casserole is a large, deep dish used both in the oven and as a serving vessel.",
    'chair': "A chair is a piece of furniture designed for sitting, typically with four legs and a back.",
    'cup': "A cup is a small open container used for drinking, usually with a handle.",
    'fork': "A fork is a utensil with prongs used for eating or serving food.",
    'frigo': "A refrigerator is a cooling appliance used to preserve food at low temperatures.",
    'glass': "A glass is a hard, brittle substance made by fusing sand with soda and lime, used for drinking containers.",
    'handbag': "A handbag is a small bag used to carry personal items, typically carried by women.",
    'iphone': "An iPhone is a line of smartphones designed and marketed by Apple Inc.",
    'knife': "A knife is a tool with a cutting edge or blade, used for cutting or as a weapon.",
    'lamp': "A lamp is a device that produces light, typically using electricity or oil.",
    'laptop': "A laptop is a portable computer suitable for use while traveling.",
    'macbook': "A MacBook is a brand of Macintosh laptop computers by Apple Inc.",
    'micro-ondes': "A microwave is an electric oven that heats and cooks food by exposing it to electromagnetic radiation.",
    'oldphone': "An old phone refers to earlier models of telephones, possibly rotary or early mobile phones.",
    'paperbag': "A paper bag is a bag made of paper, usually used for carrying goods.",
    'plate': "A plate is a flat dish for holding food, typically circular and made of china or other materials.",
    'smartphone': "A smartphone is a mobile phone with advanced features beyond basic calling, typically with internet access.",
    'sofa': "A sofa is a long upholstered seat with a back and arms, for two or more people.",
    'spoon': "A spoon is a utensil with a shallow bowl on a handle, used for eating, stirring, and serving food.",
    'table': "A table is a piece of furniture with a flat top and one or more legs, used as a surface for working or eating.",
    'washmachine': "A washing machine is a machine for washing laundry, such as clothing and sheets."
}

def speak(text, priority=1):
    """Speak ``text`` through the shared pyttsx3 engine.

    The call blocks until the utterance has been spoken, because
    ``runAndWait()`` drives the engine loop on the calling thread.

    Args:
        text: Phrase to speak. It is skipped when identical to the phrase
            spoken by the previous successful call.
        priority: Messages with a priority below 2 are dropped while the
            engine reports that it is busy; higher priorities are always
            queued.
    """
    global last_spoken

    # Never repeat the phrase that was spoken last.
    if text == last_spoken:
        return
    # Low-priority chatter must not pile up behind speech already in progress.
    if priority < 2 and tts_engine.isBusy():
        return

    last_spoken = text
    try:
        # Queue the utterance on this thread *before* starting the loop.
        # Queuing it from a separate thread raced with runAndWait(): if the
        # loop's end marker was pushed first, the phrase stayed in the queue
        # and was only spoken during some later call.
        tts_engine.say(text)
        tts_engine.runAndWait()
    except RuntimeError:
        # Best-effort speech: presumably pyttsx3 signalling that its run loop
        # is already active (confirm against the pyttsx3 docs). The message
        # has been printed by the caller, so dropping the audio is acceptable.
        pass

def get_object_description(class_name):
    """Look up the description sentence for a detected class name.

    Falls back to a generic "was detected" sentence when the class has no
    entry in OBJECT_DESCRIPTIONS.
    """
    if class_name in OBJECT_DESCRIPTIONS:
        return OBJECT_DESCRIPTIONS[class_name]
    return f"A {class_name} was detected."

def trigger_action(class_name, confidence, box_xyxy, frame_shape):
    """
    React to a single detection: log it, announce it, and raise any
    class-specific alert.

    Args:
        class_name: Label of the detected object.
        confidence: Detection confidence, compared against 0-1 thresholds.
        box_xyxy: Bounding box as [x1, y1, x2, y2].
        frame_shape: Frame dimensions; index 0 is used as the height and
            index 1 as the width.
    """
    global last_detection_time

    now = time.time()
    x1, y1, x2, y2 = box_xyxy[0], box_xyxy[1], box_xyxy[2], box_xyxy[3]
    frame_h, frame_w = frame_shape[0], frame_shape[1]
    relative_size = ((x2 - x1) * (y2 - y1)) / (frame_h * frame_w)

    # Record the sighting in the shared tracker.
    tracked = object_tracker[class_name]
    tracked['count'] += 1
    tracked['last_seen'] = now

    # Console line plus a low-priority spoken announcement.
    announcement = f"{class_name} detected with {confidence:.0%} confidence"
    print(f"\n{announcement}")
    speak(announcement, priority=1)

    # Classify the box centre into a 3x3 grid over the frame.
    mid_x = (x1 + x2) // 2
    mid_y = (y1 + y2) // 2

    if mid_x < frame_w // 3:
        x_pos = "left"
    elif mid_x > 2 * frame_w // 3:
        x_pos = "right"
    else:
        x_pos = "center"

    if mid_y < frame_h // 3:
        y_pos = "top"
    elif mid_y > 2 * frame_h // 3:
        y_pos = "bottom"
    else:
        y_pos = "middle"

    position = f"{x_pos} {y_pos}"
    print(f"  Position: {position}, Size: {relative_size:.1%}")

    # Position is only spoken for boxes covering more than 10% of the frame.
    if relative_size > 0.1:
        speak(f"located at {x_pos} {y_pos}", priority=1)

    # Full description, rate-limited per class name.
    if now - trigger_cooldown.get(class_name, 0) > description_cooldown:
        description = get_object_description(class_name)
        print(f"\nAI DESCRIPTION: {description}")
        speak(description, priority=2)
        trigger_cooldown[class_name] = now

    # Special alerts: class name -> (minimum confidence, cooldown key,
    # console label, spoken alert, speech priority).
    trigger_cooldown_time = 10  # seconds between allowed special triggers
    laptop_rule = (
        0.8, 'laptop_notice', "DEVICE NOTICE",
        "Laptop detected. Remember to take regular breaks from screen time.", 2,
    )
    phone_rule = (
        0.85, 'phone_notice', "DEVICE NOTICE",
        "Smartphone detected. Consider putting it down if not needed.", 2,
    )
    alert_rules = {
        'knife': (
            0.75, 'knife_safety', "SAFETY NOTICE",
            "Safety warning! Knife detected. Please handle with care.", 3,
        ),
        'frigo': (
            0.7, 'frigo_check', "APPLIANCE NOTICE",
            "Refrigerator detected. Optimal temperature is zero to four degrees Celsius.", 3,
        ),
        'laptop': laptop_rule,
        'macbook': laptop_rule,
        'iphone': phone_rule,
        'smartphone': phone_rule,
    }

    rule = alert_rules.get(class_name)
    if rule is not None:
        min_confidence, cooldown_key, label, alert, alert_priority = rule
        if confidence > min_confidence:
            if now - trigger_cooldown.get(cooldown_key, 0) > trigger_cooldown_time:
                # A refrigerator box over 30% of the frame gets the extra door warning.
                if class_name == 'frigo' and relative_size > 0.3:
                    alert += " Warning: The door appears to be open."
                print(f"\n{label}: {alert}")
                speak(alert, priority=alert_priority)
                trigger_cooldown[cooldown_key] = now

    # Remember when the most recent detection was handled.
    last_detection_time = now

def cleanup_old_objects():
    """Drop tracker entries whose last sighting is more than 30 seconds old."""
    now = time.time()
    # Iterate over a snapshot of the keys so entries can be deleted safely.
    for name in list(object_tracker):
        if now - object_tracker[name]['last_seen'] > 30:
            del object_tracker[name]

def print_detection_summary():
    """Print per-class detection counts and speak the recently repeated ones."""
    if not object_tracker:
        print("\nNo objects detected yet.")
        return

    print("\nDETECTION SUMMARY:")

    # Classes seen within the last 10 seconds and counted more than once.
    recent_items = []
    for label, info in object_tracker.items():
        print(f"  - {label}: detected {info['count']} times")

        seen_recently = time.time() - info['last_seen'] < 10
        if seen_recently and info['count'] > 1:
            recent_items.append(f"{info['count']} {label}s")

    if recent_items:
        speak("Current items: " + ", ".join(recent_items), priority=2)

def _report_failure(message):
    """Print an error message and speak it with priority 3."""
    print(message)
    speak(message, priority=3)


def _dispatch_detections(model, result, frame_shape):
    """Forward every box of one YOLO result to trigger_action()."""
    for box in result.boxes:
        x1, y1, x2, y2 = map(int, box.xyxy[0])
        conf = float(box.conf[0])
        cls_id = int(box.cls[0])
        try:
            class_name = model.names[cls_id]
        except (KeyError, IndexError, TypeError):
            # Unknown id, or no name table at all: fall back to the raw id.
            class_name = f"ClassID:{cls_id}"
        trigger_action(class_name, conf, [x1, y1, x2, y2], frame_shape)


def run_custom_inference(
    model_path_str='yolo_room_detection_result/cpu_run12/weights/best.pt',
    source_input='0',
    confidence_threshold=0.5,
    iou_threshold=0.45,
):
    """
    Runs inference using a custom-trained YOLOv8 model with:
    - Text-to-speech descriptions
    - AI triggers with vocal alerts
    - Real-time object tracking

    Args:
        model_path_str: Path to the trained weights file.
        source_input: Camera index (int or digit string), an image or video
            file path, or an rtsp/http/https stream URL.
        confidence_threshold: Value passed to the model as ``conf``.
        iou_threshold: Value passed to the model as ``iou``.

    Errors (missing files, unsupported sources, processing failures) are
    printed and spoken; the function then returns instead of raising.
    """
    image_suffixes = ('.png', '.jpg', '.jpeg', '.bmp', '.tif', '.tiff', '.webp')
    video_suffixes = ('.mp4', '.avi', '.mov', '.mkv')
    stream_prefixes = ("rtsp://", "http://", "https://")
    cleanup_interval = 10  # seconds between tracker clean-ups
    max_webcam_read_failures = 30  # consecutive failed reads tolerated on a webcam

    # Load Model
    model_path = Path(model_path_str).resolve()
    if not model_path.exists():
        _report_failure(f"Error: Model weights file not found at {model_path}")
        return

    try:
        model = YOLO(model_path)
        load_msg = f"Loaded custom YOLO model with {len(model.names)} classes"
        print(load_msg)
        speak(load_msg, priority=1)
    except Exception as e:
        _report_failure(f"Error loading YOLO model: {e}")
        return

    device = 'cuda' if torch.cuda.is_available() else 'cpu'
    print(f"Using device: {device}")

    # Source Handling
    is_webcam = False
    is_image = False

    if isinstance(source_input, int):
        processed_source = source_input
        is_webcam = True
    elif isinstance(source_input, str) and source_input.isdigit():
        processed_source = int(source_input)
        is_webcam = True
    elif isinstance(source_input, str):
        source_path = Path(source_input)
        suffix = source_path.suffix.lower()
        is_stream = source_input.lower().startswith(stream_prefixes)
        if suffix in image_suffixes:
            if not source_path.exists():
                _report_failure(f"Image file not found at {source_path}")
                return
            processed_source = str(source_path.resolve())
            is_image = True
        elif suffix in video_suffixes or is_stream:
            # Stream URLs cannot be checked on disk; only local files are.
            if not (is_stream or source_path.exists()):
                _report_failure(f"Video file not found at {source_path}")
                return
            processed_source = source_input
        else:
            _report_failure(f"Unsupported file type: {source_input}")
            return
    else:
        _report_failure(f"Invalid source type: {source_input}")
        return

    # Processing
    if is_image:
        print(f"Processing image: {processed_source}")
        try:
            results = model(processed_source, conf=confidence_threshold, iou=iou_threshold, device=device)

            # A falsy first result is treated as "nothing detected".
            if results and results[0]:
                annotated_frame = results[0].plot()
                _dispatch_detections(model, results[0], results[0].orig_shape)

                print_detection_summary()
                cv2.imshow("YOLOv8 Detection - Image", annotated_frame)
                cv2.waitKey(0)
            else:
                msg = "No objects detected in the image"
                print(msg)
                speak(msg, priority=1)

        except Exception as e:
            _report_failure(f"Image processing error: {e}")
        finally:
            cv2.destroyAllWindows()

    else:  # webcam, video file, or network stream
        cap = None
        try:
            cap = cv2.VideoCapture(processed_source)
            if not cap.isOpened():
                _report_failure(f"Could not open video source: {processed_source}")
                return

            source_name = "Webcam" if is_webcam else "Video"
            window_title = f"YOLOv8 Detection - {source_name}"
            start_msg = f"Starting {source_name} detection. Press Q to quit or S for summary."
            print(start_msg)
            speak(start_msg, priority=1)

            last_cleanup = time.monotonic()
            failed_reads = 0

            while cap.isOpened():
                ret, frame = cap.read()
                if not ret:
                    failed_reads += 1
                    # A webcam may drop a frame now and then: retry a bounded
                    # number of times instead of spinning forever.
                    if is_webcam and failed_reads <= max_webcam_read_failures:
                        time.sleep(0.05)
                        continue
                    end_msg = f"{source_name} stream ended."
                    print(end_msg)
                    speak(end_msg, priority=1)
                    break
                failed_reads = 0

                # Clean up old objects on a real interval. The previous
                # `time.time() % 10 < 0.1` test only fired when a frame
                # happened to land inside a 100 ms window.
                now = time.monotonic()
                if now - last_cleanup >= cleanup_interval:
                    cleanup_old_objects()
                    last_cleanup = now

                # Perform detection
                results = model(frame, conf=confidence_threshold, iou=iou_threshold, device=device, verbose=False)

                if results and results[0]:
                    annotated_frame = results[0].plot()
                    _dispatch_detections(model, results[0], results[0].orig_shape)
                    cv2.imshow(window_title, annotated_frame)
                else:
                    cv2.imshow(window_title, frame)

                key = cv2.waitKey(1) & 0xFF
                if key == ord('q'):
                    print_detection_summary()
                    break_msg = "Stopping object detection."
                    print(break_msg)
                    speak(break_msg, priority=1)
                    break
                elif key == ord('s'):
                    print_detection_summary()

        except Exception as e:
            _report_failure(f"Video processing error: {e}")
        finally:
            if cap is not None:
                cap.release()
            cv2.destroyAllWindows()
            print("Detection stopped.")

# Entry point: print the feature banner, announce start-up aloud, then run
# detection with the configuration defined in run_custom_inference().
if __name__ == '__main__':
    # The banner is printed verbatim, including its leading indentation.
    intro = """
    YOLOv8 Object Detection with Voice Assistant
    Features:
    - Real-time object detection
    - Spoken descriptions of detected objects
    - Safety alerts for dangerous items
    - Device usage reminders
    Press Q to quit, S for summary during detection.
    """
    print(intro)
    speak("Starting YOLO object detection with voice feedback.", priority=1)
    run_custom_inference()


    YOLOv8 Object Detection with Voice Assistant
    Features:
    - Real-time object detection
    - Spoken descriptions of detected objects
    - Safety alerts for dangerous items
    - Device usage reminders
    Press Q to quit, S for summary during detection.
    
Loaded custom YOLO model with 25 classes
Using device: cpu
Starting Webcam detection. Press Q to quit or S for summary.

macbook detected with 66% confidence
  Position: center middle, Size: 99.7%

AI DESCRIPTION: A MacBook is a brand of Macintosh laptop computers by Apple Inc.

iphone detected with 72% confidence
  Position: left middle, Size: 56.1%

AI DESCRIPTION: An iPhone is a line of smartphones designed and marketed by Apple Inc.

laptop detected with 57% confidence
  Position: center middle, Size: 99.8%

AI DESCRIPTION: A laptop is a portable computer suitable for use while traveling.

chair detected with 51% confidence
  Position: center middle, Size: 90.0%

AI DESCRIPTION: A chair is a piece of furniture design

KeyboardInterrupt: 

In [12]:
!pip install pyttsx3

Collecting pyttsx3
  Downloading pyttsx3-2.98-py3-none-any.whl.metadata (3.8 kB)
Collecting comtypes (from pyttsx3)
  Downloading comtypes-1.4.11-py3-none-any.whl.metadata (7.2 kB)
Collecting pypiwin32 (from pyttsx3)
  Downloading pypiwin32-223-py3-none-any.whl.metadata (236 bytes)
Downloading pyttsx3-2.98-py3-none-any.whl (34 kB)
Downloading comtypes-1.4.11-py3-none-any.whl (246 kB)
Using cached pypiwin32-223-py3-none-any.whl (1.7 kB)
Installing collected packages: pypiwin32, comtypes, pyttsx3
Successfully installed comtypes-1.4.11 pypiwin32-223 pyttsx3-2.98



[notice] A new release of pip is available: 25.0.1 -> 25.1.1
[notice] To update, run: python.exe -m pip install --upgrade pip


In [2]:
# detect_objects_cpu.py
# YOLOv8 Custom Object Detection Script (CPU-compatible version)
# Uses a fine-tuned YOLOv8 model for object detection on images, videos, or webcam

from ultralytics import YOLO
import cv2
import torch
from pathlib import Path

def trigger_action(class_name, confidence, box_xyxy, frame_shape):
    """
    Report one detection on the console; hook for custom per-object actions.

    Args:
        class_name (str): Detected class name
        confidence (float): Confidence score (0-1)
        box_xyxy (list): Bounding box coordinates [x1, y1, x2, y2]
        frame_shape (tuple): Frame dimensions (height, width); not used yet
    """
    report = f"Detected: '{class_name}' (Confidence: {confidence:.2f}) at Box: {box_xyxy}"
    print(report)
    # Custom action logic for specific classes can be added below.

def _report_boxes(model, result):
    """Pass every box of one YOLO result to trigger_action()."""
    frame_shape = result.orig_shape
    for box in result.boxes:
        x1, y1, x2, y2 = map(int, box.xyxy[0])
        conf = float(box.conf[0])
        cls_id = int(box.cls[0])
        class_name = model.names.get(cls_id, f"ClassID:{cls_id}")
        trigger_action(class_name, conf, [x1, y1, x2, y2], frame_shape)


def run_detection(
    model_path='yolo_room_detection_result/cpu_run12/weights/best.pt',
    source_input='0',
    confidence_threshold=0.5,
    iou_threshold=0.45,
):
    """
    Runs object detection using a custom-trained YOLOv8 model.
    Supports images, videos, network streams and webcam feeds; always
    runs on the CPU.

    Args:
        model_path: Path to the trained weights file.
        source_input: Camera index (int or digit string), a path to an
            existing image or video file, or an rtsp/http/https URL.
        confidence_threshold: Minimum detection confidence.
        iou_threshold: NMS IoU threshold.

    Problems are reported on the console and the function returns early
    rather than raising.
    """
    image_suffixes = ('.png', '.jpg', '.jpeg', '.bmp', '.tif', '.tiff', '.webp')
    stream_prefixes = ("rtsp://", "http://", "https://")

    # --- Validate Paths ---
    model_path = Path(model_path).resolve()
    if not model_path.exists():
        print(f"Error: Model not found at '{model_path}'")
        print(f"Current directory: {Path.cwd()}")
        return

    # --- Load Model ---
    try:
        model = YOLO(model_path)
        print(f"\nModel loaded: {model_path}")
        print(f"Classes: {model.names}")
    except Exception as e:
        print(f"Error loading model: {e}")
        return

    # --- Hardware Setup ---
    device = 'cpu'  # Force CPU for consistency with training
    print("\n=== Hardware Setup ===")
    print(f"Using device: {device}")
    if torch.cuda.is_available():
        print("(GPU available but not being used)")

    # --- Process Input Source ---
    print("\n=== Processing Setup ===")
    if isinstance(source_input, int) or (isinstance(source_input, str) and source_input.isdigit()):
        # Accept both 0 and '0' as a camera index.
        source = int(source_input)
        source_type = "webcam"
        print(f"Webcam source: {source}")
    elif isinstance(source_input, str) and source_input.lower().startswith(stream_prefixes):
        # Network streams cannot be checked on disk; hand the URL to OpenCV.
        source = source_input
        source_type = "video"
        print(f"Stream source: {source}")
    elif isinstance(source_input, str) and Path(source_input).exists():
        source = str(Path(source_input).resolve())
        if Path(source_input).suffix.lower() in image_suffixes:
            source_type = "image"
            print(f"Image source: {source}")
        else:
            source_type = "video"
            print(f"Video source: {source}")
    else:
        print(f"Invalid source: {source_input}")
        return

    # --- Run Detection ---
    print("\n=== Starting Detection ===")
    print("Press 'q' to quit during video/webcam processing")

    if source_type == "image":
        try:
            results = model(source, conf=confidence_threshold, iou=iou_threshold, device=device)

            # A falsy first result is treated as "nothing detected".
            if results and results[0]:
                annotated_frame = results[0].plot()
                _report_boxes(model, results[0])

                cv2.imshow("YOLOv8 Detection", annotated_frame)
                cv2.waitKey(0)
            else:
                print("No detections found")

        except Exception as e:
            print(f"Image processing error: {e}")
        finally:
            cv2.destroyAllWindows()

    else:  # Video or webcam
        cap = cv2.VideoCapture(source)
        try:
            # Checked inside the try so the capture is released on this path too.
            if not cap.isOpened():
                print(f"Failed to open {source_type} source")
                return

            while cap.isOpened():
                ret, frame = cap.read()
                if not ret:
                    break

                results = model(frame, conf=confidence_threshold, iou=iou_threshold, device=device, verbose=False)

                if results and results[0]:
                    annotated_frame = results[0].plot()
                    _report_boxes(model, results[0])
                    cv2.imshow("YOLOv8 Detection", annotated_frame)
                else:
                    cv2.imshow("YOLOv8 Detection", frame)

                if cv2.waitKey(1) & 0xFF == ord('q'):
                    break

        except Exception as e:
            print(f"{source_type} processing error: {e}")
        finally:
            cap.release()
            cv2.destroyAllWindows()

    print("\n=== Detection Complete ===")

# Entry point: print a short banner, then run detection with the
# configuration defined in run_detection().
if __name__ == '__main__':
    print("=== YOLOv8 CPU Detection ===")
    print("Note: This script uses CPU by default")
    print("Configure model_path and source_input as needed")
    print("----------------------------")
    run_detection()

=== YOLOv8 CPU Detection ===
Note: This script uses CPU by default
Configure model_path and source_input as needed
----------------------------

Model loaded: C:\Users\adity\Downloads\yolo_room_detection\yolo_room_detection_result\cpu_run12\weights\best.pt
Classes: {0: 'backpack', 1: 'bottle-a', 2: 'bottle-b', 3: 'bowl', 4: 'casserole', 5: 'chair', 6: 'cup', 7: 'fork', 8: 'frigo', 9: 'glass', 10: 'handbag', 11: 'iphone', 12: 'knife', 13: 'lamp', 14: 'laptop', 15: 'macbook', 16: 'micro-ondes', 17: 'oldphone', 18: 'paperbag', 19: 'plate', 20: 'smartphone', 21: 'sofa', 22: 'spoon', 23: 'table', 24: 'washmachine'}

=== Hardware Setup ===
Using device: cpu

=== Processing Setup ===
Webcam source: 0

=== Starting Detection ===
Press 'q' to quit during video/webcam processing
Detected: 'iphone' (Confidence: 0.72) at Box: [322, 179, 639, 479]
Detected: 'iphone' (Confidence: 0.75) at Box: [334, 185, 639, 480]
Detected: 'macbook' (Confidence: 0.50) at Box: [0, 0, 637, 480]
Detected: 'macbook' (C

KeyboardInterrupt: 