In [2]:
import cv2
from ultralytics import YOLO
import pyttsx3
import time


In [3]:
# Detector: the medium YOLOv8 checkpoint. Swap in 'yolov8n.pt' when inference
# speed matters more than accuracy.
model = YOLO('yolov8m.pt')

# Speech output: a single pyttsx3 engine shared by every speak() call.
tts = pyttsx3.init()
# Optional tweak: a rate of 150 keeps the announcements easy to follow.
tts.setProperty('rate', 150)


In [4]:
# Address of the ESP32-CAM stream; edit it to match your board.
stream_url = "http://192.168.1.20:81/stream"
cap = cv2.VideoCapture(stream_url)

# Report the outcome so later cells are not run against a dead capture.
if cap.isOpened():
    print("✅ Connected to ESP32-CAM stream")
else:
    print("❌ Failed to connect to ESP32-CAM stream")


✅ Connected to ESP32-CAM stream


In [5]:
# Only warn about these objects. Entries are compared against model.names,
# so each one must match the detector's label spelling exactly.
DANGEROUS_CLASSES = {
    "knife", "scissors", "truck", "bus", "car",
    # Every other cell in this notebook spells this label "motorcycle";
    # "motorbike" is kept as well in case a label map uses that spelling.
    "motorcycle", "motorbike",
    "dog", "bear", "bottle", "fire hydrant"
}

# Keep track of recently announced objects
recent_detections = set()
last_reset_time = time.time()

def speak(text):
    """Queue `text` on the shared `tts` engine and wait until it has been spoken."""
    tts.say(text)
    tts.runAndWait()


In [6]:
# if cap.isOpened():
#     print("🎥 Press 'q' to stop")
#     while True:
#         ret, frame = cap.read()
#         if not ret:
#             print("⚠️ Failed to grab frame")
#             break

#         # Run YOLO detection
#         results = model(frame, verbose=False)

#         # Annotate frame
#         annotated = results[0].plot()

#         # Display frame
#         cv2.imshow("ESP32-CAM + YOLOv8", annotated)

#         # Handle detections
#         for box in results[0].boxes:
#             cls_id = int(box.cls[0])
#             conf = float(box.conf[0])
#             name = model.names[cls_id]

#             if name in DANGEROUS_CLASSES and name not in recent_detections:
#                 print(f"⚠️ {name} detected ({conf:.2f})")
#                 speak(f"Warning: {name} ahead")
#                 recent_detections.add(name)

#         # Clear memory every 5 seconds
#         if time.time() - last_reset_time > 5:
#             recent_detections.clear()
#             last_reset_time = time.time()

#         # Exit condition
#         if cv2.waitKey(1) & 0xFF == ord('q'):
#             break

# # Cleanup
# cap.release()
# cv2.destroyAllWindows()


# Simple text-to-speech object detection

In [7]:
# For avoiding repeat announcements
recent_objects = set()
last_reset = time.time()

def speak(text):
    """Queue `text` on the shared `tts` engine and wait until it has been spoken."""
    tts.say(text)
    tts.runAndWait()

# try/finally guarantees the camera and the preview window are released even
# when the loop is stopped by a kernel interrupt (KeyboardInterrupt) or an
# unexpected error, not only when 'q' is pressed.
try:
    if cap.isOpened():
        while True:
            ret, frame = cap.read()
            if not ret:
                print("⚠️ Failed to get frame")
                break

            results = model(frame, verbose=False)
            annotated_frame = results[0].plot()
            cv2.imshow("Live Detection", annotated_frame)

            # Loop through detected objects
            for box in results[0].boxes:
                cls_id = int(box.cls[0])
                class_name = model.names[cls_id]

                # Each class name is spoken at most once per reset window.
                if class_name not in recent_objects:
                    print(f"🗣️ Saying: {class_name}")
                    speak(class_name)
                    recent_objects.add(class_name)

            # Reset memory every 5 seconds to allow re-speaking
            if time.time() - last_reset > 5:
                recent_objects.clear()
                last_reset = time.time()

            # Quit when 'q' is pressed
            if cv2.waitKey(1) & 0xFF == ord('q'):
                break
finally:
    # Release camera and window
    cap.release()
    cv2.destroyAllWindows()


KeyboardInterrupt: 

In [None]:
import time
import cv2
import pyttsx3
from ultralytics import YOLO

Run 'pip install torchvision==0.21' to fix torchvision or 'pip install -U torch torchvision' to update both.
For a full compatibility table see https://github.com/pytorch/vision#installation


In [None]:
# Detector: medium-size YOLOv8 checkpoint.
model = YOLO('yolov8m.pt')

# Speech output: one pyttsx3 engine, with the speech speed set to 150.
tts = pyttsx3.init()
tts.setProperty('rate', 150)

In [None]:
# Open the ESP32-CAM stream and report whether the capture is usable.
stream_url = "http://192.168.1.20:81/stream"
cap = cv2.VideoCapture(stream_url)

if cap.isOpened():
    print("✅ Connected to ESP32-CAM stream")
else:
    print("❌ Failed to connect to ESP32-CAM stream")


✅ Connected to ESP32-CAM stream


In [None]:





# Object categories, grouped by theme for readability (set order is irrelevant).
# "dog" and "bear" appear in both dangerous_objects and living_objects; the
# announcement code checks dangerous_objects first, so that wording wins.
dangerous_objects = {
    # sharp tools
    "knife", "scissors",
    # vehicles
    "car", "bus", "truck", "train", "motorcycle", "bicycle", "airplane", "boat",
    # sports gear
    "skateboard", "surfboard", "baseball bat", "baseball glove", "tennis racket",
    # household fixtures and appliances
    "microwave", "oven", "toaster", "sink", "refrigerator", "hair drier", "toilet",
    # street fixtures
    "fire hydrant",
    # animals
    "bear", "dog",
}

obstacle_objects = {
    # furniture
    "bench", "chair", "couch", "dining table", "bed",
    # indoor items and electronics
    "tv", "vase", "potted plant", "remote", "keyboard", "laptop",
    "mouse", "cell phone", "book", "clock",
    # street fixtures
    "parking meter", "stop sign", "traffic light",
    # bags and carried items
    "suitcase", "backpack", "handbag", "umbrella",
}

living_objects = {
    "person",
    "dog", "cat", "bird", "horse", "sheep", "cow",
    "elephant", "bear", "zebra", "giraffe",
}

# Track recent announcements
recent_detections = set()
last_reset_time = time.time()

def speak(text):
    """Queue `text` on the shared `tts` engine and wait until it has been spoken."""
    tts.say(text)
    tts.runAndWait()

# try/finally guarantees the camera and the preview window are released even
# when the loop is stopped by a kernel interrupt or an unexpected error.
try:
    if cap.isOpened():
        print("🎥 Press 'q' to stop")
        while True:
            ret, frame = cap.read()
            if not ret:
                print("⚠️ Failed to grab frame")
                break

            # Run YOLO detection
            results = model(frame, verbose=False)

            # Annotate frame
            annotated = results[0].plot()
            cv2.imshow("ESP32-CAM + YOLOv8", annotated)

            # Process detections
            for box in results[0].boxes:
                cls_id = int(box.cls[0])
                conf = float(box.conf[0])
                name = model.names[cls_id]

                # Each class name is announced at most once per reset window.
                if name not in recent_detections:
                    # Category precedence: dangerous > obstacle > living.
                    if name in dangerous_objects:
                        message = f"Warning: {name} detected. Be careful."
                    elif name in obstacle_objects:
                        message = f"Obstacle ahead: {name}."
                    elif name in living_objects:
                        message = f"{name} nearby."
                    else:
                        message = name  # Fallback

                    print(f"🗣️ Saying: {message} (Confidence: {conf:.2f})")
                    speak(message)
                    recent_detections.add(name)

            # Reset detections every 5 seconds
            if time.time() - last_reset_time > 5:
                recent_detections.clear()
                last_reset_time = time.time()

            # Exit on 'q'
            if cv2.waitKey(1) & 0xFF == ord('q'):
                break
finally:
    # Cleanup
    cap.release()
    cv2.destroyAllWindows()


🎥 Press 'q' to stop
🗣️ Saying: person nearby. (Confidence: 0.31)
🗣️ Saying: person nearby. (Confidence: 0.44)
🗣️ Saying: tie (Confidence: 0.29)
🗣️ Saying: person nearby. (Confidence: 0.48)
🗣️ Saying: person nearby. (Confidence: 0.26)
🗣️ Saying: cat nearby. (Confidence: 0.43)
🗣️ Saying: cat nearby. (Confidence: 0.29)
🗣️ Saying: cat nearby. (Confidence: 0.25)
🗣️ Saying: person nearby. (Confidence: 0.26)
🗣️ Saying: person nearby. (Confidence: 0.46)
🗣️ Saying: Obstacle ahead: laptop. (Confidence: 0.27)
🗣️ Saying: cat nearby. (Confidence: 0.27)
🗣️ Saying: tie (Confidence: 0.26)
🗣️ Saying: cat nearby. (Confidence: 0.30)
🗣️ Saying: Obstacle ahead: remote. (Confidence: 0.30)
🗣️ Saying: tie (Confidence: 0.32)
🗣️ Saying: person nearby. (Confidence: 0.26)
🗣️ Saying: tie (Confidence: 0.38)
🗣️ Saying: cat nearby. (Confidence: 0.28)
🗣️ Saying: tie (Confidence: 0.40)
🗣️ Saying: tie (Confidence: 0.60)
🗣️ Saying: Obstacle ahead: remote. (Confidence: 0.33)
🗣️ Saying: cat nearby. (Confidence: 0.25)
🗣️ Sa

In [None]:
import time
import cv2
import pyttsx3
from ultralytics import YOLO

# Detector
model = YOLO('yolov8m.pt')

# Speech engine shared by speak()
tts = pyttsx3.init()
tts.setProperty('rate', 150)

# Open the ESP32-CAM stream and report whether the capture is usable.
stream_url = "http://192.168.1.20:81/stream"
cap = cv2.VideoCapture(stream_url)

if cap.isOpened():
    print("✅ Connected to ESP32-CAM stream")
else:
    print("❌ Failed to connect to ESP32-CAM stream")

# Object categories, grouped by theme for readability (set order is irrelevant).
# "dog" and "bear" appear in both dangerous_objects and living_objects; the
# announcement code checks dangerous_objects first, so that wording wins.
dangerous_objects = {
    # sharp tools
    "knife", "scissors",
    # vehicles
    "car", "bus", "truck", "train", "motorcycle", "bicycle", "airplane", "boat",
    # sports gear
    "skateboard", "surfboard", "baseball bat", "baseball glove", "tennis racket",
    # household fixtures and appliances
    "microwave", "oven", "toaster", "sink", "refrigerator", "hair drier", "toilet",
    # street fixtures
    "fire hydrant",
    # animals
    "bear", "dog",
}
obstacle_objects = {
    # furniture
    "bench", "chair", "couch", "dining table", "bed",
    # indoor items and electronics
    "tv", "vase", "potted plant", "remote", "keyboard", "laptop",
    "mouse", "cell phone", "book", "clock",
    # street fixtures
    "parking meter", "stop sign", "traffic light",
    # bags and carried items
    "suitcase", "backpack", "handbag", "umbrella",
}
living_objects = {
    "person",
    "dog", "cat", "bird", "horse", "sheep", "cow",
    "elephant", "bear", "zebra", "giraffe",
}

# Cooldown system: last spoken time per object class
spoken_timestamps = {}
cooldown_seconds = 10  # Speak again only after this time per object class

def speak(text):
    """Queue `text` on the shared `tts` engine and wait until it has been spoken."""
    tts.say(text)
    tts.runAndWait()

# try/finally guarantees the camera and the preview window are released even
# when the loop is stopped by a kernel interrupt or an unexpected error.
try:
    if cap.isOpened():
        print("🎥 Press 'q' to stop")
        while True:
            ret, frame = cap.read()
            if not ret:
                print("⚠️ Failed to grab frame")
                break

            results = model(frame, verbose=False)
            annotated = results[0].plot()
            cv2.imshow("ESP32-CAM + YOLOv8", annotated)

            # One timestamp per frame, so every box in it shares the same clock.
            current_time = time.time()
            for box in results[0].boxes:
                cls_id = int(box.cls[0])
                conf = float(box.conf[0])
                name = model.names[cls_id]

                # Check cooldown; a class never spoken before defaults to time 0
                # and is therefore announced immediately.
                last_spoken = spoken_timestamps.get(name, 0)
                if current_time - last_spoken >= cooldown_seconds:
                    # Category precedence: dangerous > obstacle > living.
                    if name in dangerous_objects:
                        message = f"Warning: {name} detected. Be careful."
                    elif name in obstacle_objects:
                        message = f"Obstacle ahead: {name}."
                    elif name in living_objects:
                        message = f"{name} nearby."
                    else:
                        message = name  # fallback

                    print(f"🗣️ {message} (Confidence: {conf:.2f})")
                    speak(message)
                    spoken_timestamps[name] = current_time

            # Exit on 'q'
            if cv2.waitKey(1) & 0xFF == ord('q'):
                break
finally:
    cap.release()
    cv2.destroyAllWindows()


Run 'pip install torchvision==0.21' to fix torchvision or 'pip install -U torch torchvision' to update both.
For a full compatibility table see https://github.com/pytorch/vision#installation
✅ Connected to ESP32-CAM stream
🎥 Press 'q' to stop
🗣️ tie (Confidence: 0.31)
🗣️ person nearby. (Confidence: 0.26)
🗣️ cat nearby. (Confidence: 0.32)
🗣️ person nearby. (Confidence: 0.26)
🗣️ cat nearby. (Confidence: 0.36)
🗣️ tie (Confidence: 0.29)
🗣️ cat nearby. (Confidence: 0.32)
🗣️ Obstacle ahead: keyboard. (Confidence: 0.42)
🗣️ Obstacle ahead: book. (Confidence: 0.31)
🗣️ Obstacle ahead: laptop. (Confidence: 0.48)
🗣️ Obstacle ahead: tv. (Confidence: 0.37)
🗣️ Obstacle ahead: laptop. (Confidence: 0.36)
🗣️ cat nearby. (Confidence: 0.25)
🗣️ Obstacle ahead: chair. (Confidence: 0.39)
🗣️ Obstacle ahead: laptop. (Confidence: 0.26)
🗣️ Obstacle ahead: cell phone. (Confidence: 0.33)
🗣️ person nearby. (Confidence: 0.37)
🗣️ Obstacle ahead: laptop. (Confidence: 0.29)
🗣️ Obstacle ahead: tv. (Confidence: 0.38)
🗣️ 

In [None]:
import time
import cv2
import pyttsx3
from ultralytics import YOLO

# Detector
model = YOLO('yolov8m.pt')

# Speech engine shared by speak()
tts = pyttsx3.init()
tts.setProperty('rate', 150)

# Open the ESP32-CAM stream and report whether the capture is usable.
stream_url = "http://192.168.1.20:81/stream"
cap = cv2.VideoCapture(stream_url)

if cap.isOpened():
    print("✅ Connected to ESP32-CAM stream")
else:
    print("❌ Failed to connect to ESP32-CAM stream")

# Categories, grouped by theme for readability (set order is irrelevant).
# "dog" and "bear" appear in both dangerous_objects and living_objects; the
# announcement code checks dangerous_objects first, so that wording wins.
dangerous_objects = {
    # sharp tools
    "knife", "scissors",
    # vehicles
    "car", "bus", "truck", "train", "motorcycle", "bicycle", "airplane", "boat",
    # sports gear
    "skateboard", "surfboard", "baseball bat", "baseball glove", "tennis racket",
    # household fixtures and appliances
    "microwave", "oven", "toaster", "sink", "refrigerator", "hair drier", "toilet",
    # street fixtures
    "fire hydrant",
    # animals
    "bear", "dog",
}
obstacle_objects = {
    # furniture
    "bench", "chair", "couch", "dining table", "bed",
    # indoor items and electronics
    "tv", "vase", "potted plant", "remote", "keyboard", "laptop",
    "mouse", "cell phone", "book", "clock",
    # street fixtures
    "parking meter", "stop sign", "traffic light",
    # bags and carried items
    "suitcase", "backpack", "handbag", "umbrella",
}
living_objects = {
    "person",
    "dog", "cat", "bird", "horse", "sheep", "cow",
    "elephant", "bear", "zebra", "giraffe",
}

# Thresholds used by the announcement rule
cooldown_seconds = 10
movement_threshold = 50  # pixels

# Per-class history, filled in while the loop runs
spoken_timestamps = {}        # class name -> time it was last spoken
object_locations = {}         # class name -> most recent box centre
prev_frame_classes = set()    # class names seen in the previous frame

def speak(text):
    """Queue `text` on the shared `tts` engine and wait until it has been spoken."""
    engine = tts
    engine.say(text)
    engine.runAndWait()

def get_center(box):
    """Return the (x, y) midpoint of the first box stored in ``box.xyxy``.

    ``box.xyxy[0]`` is unpacked as (x1, y1, x2, y2); the midpoint is the
    average of the two x values and of the two y values.
    """
    left, top, right, bottom = box.xyxy[0]
    center_x = (left + right) / 2
    center_y = (top + bottom) / 2
    return (center_x, center_y)

# try/finally guarantees the camera and the preview window are released even
# when the loop is stopped by a kernel interrupt or an unexpected error.
try:
    if cap.isOpened():
        print("🎥 Press 'q' to stop")
        while True:
            ret, frame = cap.read()
            if not ret:
                print("⚠️ Failed to grab frame")
                break

            results = model(frame, verbose=False)
            annotated = results[0].plot()
            cv2.imshow("ESP32-CAM + YOLOv8", annotated)

            current_time = time.time()
            current_classes = set()

            for box in results[0].boxes:
                cls_id = int(box.cls[0])
                conf = float(box.conf[0])
                name = model.names[cls_id]
                current_classes.add(name)

                # Get object center
                center = get_center(box)

                # Check movement. Locations are keyed by class name, so with
                # several boxes of one class the comparison is against whichever
                # box of that class was processed last. A class with no stored
                # location counts as "moved".
                moved = True
                if name in object_locations:
                    prev_x, prev_y = object_locations[name]
                    moved = (abs(center[0] - prev_x) > movement_threshold) or \
                            (abs(center[1] - prev_y) > movement_threshold)

                object_locations[name] = center

                # Speak only if:
                # - Enough time passed (cooldown)
                # - Object is new (not in prev frame)
                # - Object moved significantly
                last_spoken = spoken_timestamps.get(name, 0)
                if (current_time - last_spoken >= cooldown_seconds and
                    name not in prev_frame_classes and
                    moved):

                    # Category precedence: dangerous > obstacle > living.
                    if name in dangerous_objects:
                        message = f"Warning: {name} detected. Be careful."
                    elif name in obstacle_objects:
                        message = f"Obstacle ahead: {name}."
                    elif name in living_objects:
                        message = f"{name} nearby."
                    else:
                        message = name  # fallback

                    print(f"🗣️ {message} (Conf: {conf:.2f})")
                    speak(message)
                    spoken_timestamps[name] = current_time

            # Update previous frame state
            prev_frame_classes = current_classes

            # Exit on 'q'
            if cv2.waitKey(1) & 0xFF == ord('q'):
                break
finally:
    # Cleanup
    cap.release()
    cv2.destroyAllWindows()


✅ Connected to ESP32-CAM stream
🎥 Press 'q' to stop
🗣️ Obstacle ahead: book. (Conf: 0.31)
🗣️ cat nearby. (Conf: 0.37)
🗣️ tie (Conf: 0.28)
🗣️ person nearby. (Conf: 0.27)
🗣️ cat nearby. (Conf: 0.30)
🗣️ bird nearby. (Conf: 0.31)
🗣️ person nearby. (Conf: 0.26)
🗣️ cat nearby. (Conf: 0.27)
🗣️ Obstacle ahead: laptop. (Conf: 0.34)
🗣️ Obstacle ahead: tv. (Conf: 0.35)
🗣️ kite (Conf: 0.67)
🗣️ Obstacle ahead: laptop. (Conf: 0.31)
🗣️ cat nearby. (Conf: 0.28)
🗣️ person nearby. (Conf: 0.27)
🗣️ cat nearby. (Conf: 0.29)
🗣️ Obstacle ahead: book. (Conf: 0.41)
🗣️ cat nearby. (Conf: 0.25)
🗣️ person nearby. (Conf: 0.38)
🗣️ tie (Conf: 0.31)
🗣️ bottle (Conf: 0.31)
🗣️ Obstacle ahead: book. (Conf: 0.29)
🗣️ tie (Conf: 0.27)
🗣️ kite (Conf: 0.58)
🗣️ bird nearby. (Conf: 0.25)
🗣️ toothbrush (Conf: 0.55)
🗣️ Obstacle ahead: laptop. (Conf: 0.40)
🗣️ toothbrush (Conf: 0.43)
⚠️ Failed to grab frame


In [None]:
import cv2
import time
import pyttsx3
import openai
import io
import base64
from PIL import Image
from ultralytics import YOLO

# OpenAI API setup: the key is taken from the OPENAI_API_KEY environment
# variable so that no secret is ever stored in the notebook itself.
import os
openai.api_key = os.environ.get("OPENAI_API_KEY")
if not openai.api_key:
    print("❌ OPENAI_API_KEY environment variable not set; GPT descriptions will fail.")

def speak(text):
    """Queue `text` on the shared `tts` engine and wait until it has been spoken."""
    engine = tts
    engine.say(text)
    engine.runAndWait()

def convert_cv2_to_bytes(img):
    """JPEG-encode `img` with OpenCV and return the encoded bytes in an `io.BytesIO`.

    The success flag returned by `cv2.imencode` is not inspected.
    """
    encoded = cv2.imencode('.jpg', img)[1]
    jpeg_bytes = encoded.tobytes()
    return io.BytesIO(jpeg_bytes)

def ask_gpt_about_image(image_bytes_io, user_prompt):
    """Send a JPEG image plus a text prompt to the chat-completions API.

    `image_bytes_io` must expose `getvalue()` (e.g. an `io.BytesIO`); its
    contents are base64-encoded and embedded as a `data:image/jpeg` URL.
    Returns the text content of the first choice in the response. Errors
    raised by the API call are not handled here.
    """
    encoded_jpeg = base64.b64encode(image_bytes_io.getvalue()).decode('utf-8')
    text_part = {"type": "text", "text": user_prompt}
    image_part = {
        "type": "image_url",
        "image_url": {"url": f"data:image/jpeg;base64,{encoded_jpeg}"},
    }
    completion = openai.ChatCompletion.create(
        model="gpt-4-vision-preview",
        messages=[{"role": "user", "content": [text_part, image_part]}],
        max_tokens=200,
    )
    first_choice = completion['choices'][0]
    return first_choice['message']['content']

# Initialize YOLOv8 model
model = YOLO('yolov8m.pt')

# Initialize TTS
tts = pyttsx3.init()
tts.setProperty('rate', 150)

# ESP32-CAM Stream URL
stream_url = "http://192.168.1.24:81/stream"
cap = cv2.VideoCapture(stream_url)

if not cap.isOpened():
    print("❌ Failed to connect to ESP32-CAM stream")
    # Raise SystemExit directly instead of calling exit(): exit() is the
    # interactive-shell helper installed by the `site` module, and inside a
    # Jupyter kernel it shuts the kernel down instead of just stopping the cell.
    raise SystemExit(1)
else:
    print("✅ Connected to ESP32-CAM stream")

# Classes that additionally get a GPT description of the cropped object
high_priority = {
    "person", "dog", "bear",
    "knife", "scissors",
    "truck", "bus", "fire hydrant",
}

# Throttling state for spoken announcements and GPT calls
gpt_cooldown = 30  # seconds
gpt_last_called = {}      # class name -> time of the last successful GPT call
description_cache = {}    # not read anywhere in this cell
recent_detections = set()
last_reset_time = time.time()

print("🎥 Press 'q' to quit")
# try/finally guarantees the camera and the preview window are released even
# when the loop is stopped by a kernel interrupt or an unexpected error.
try:
    while True:
        ret, frame = cap.read()
        if not ret:
            print("⚠️ Failed to grab frame")
            break

        current_time = time.time()
        results = model(frame, verbose=False)
        annotated = results[0].plot()
        cv2.imshow("ESP32-CAM + YOLOv8", annotated)

        frame_height, frame_width = frame.shape[:2]

        for box in results[0].boxes:
            cls_id = int(box.cls[0])
            conf = float(box.conf[0])
            name = model.names[cls_id]

            # Short spoken announcement, at most once per reset window.
            if name not in recent_detections:
                print(f"🔊 {name} detected ({conf:.2f})")
                speak(f"{name} ahead")
                recent_detections.add(name)

            # Handle GPT call for high-priority objects
            if name not in high_priority:
                continue
            last_gpt = gpt_last_called.get(name, 0)
            if current_time - last_gpt <= gpt_cooldown:
                continue

            # Clamp the box to the frame: a negative coordinate would be read
            # as a from-the-end index by the slice below.
            x1, y1, x2, y2 = map(int, box.xyxy[0])
            x1, y1 = max(0, x1), max(0, y1)
            x2, y2 = min(frame_width, x2), min(frame_height, y2)
            if x2 <= x1 or y2 <= y1:
                continue  # empty crop: nothing to encode or describe

            prompt = (
                f"Describe this object to a blind person. Explain what it is and whether it is dangerous, in 1-2 simple sentences."
            )
            try:
                # Encoding sits inside the try so a failed JPEG encode is
                # reported like any other GPT error instead of ending the loop.
                object_crop = frame[y1:y2, x1:x2]
                img_bytes = convert_cv2_to_bytes(object_crop)
                gpt_response = ask_gpt_about_image(img_bytes, prompt)
                print(f"🧠 GPT: {gpt_response}")
                speak(gpt_response)
                gpt_last_called[name] = current_time
            except Exception as e:
                print(f"❌ GPT error: {e}")

        # Allow re-announcement of persistent objects every 5 seconds.
        if time.time() - last_reset_time > 5:
            recent_detections.clear()
            last_reset_time = time.time()

        # Exit on 'q'
        if cv2.waitKey(1) & 0xFF == ord('q'):
            break
finally:
    cap.release()
    cv2.destroyAllWindows()


In [3]:
import cv2
import time
import pyttsx3
import io
import base64
from PIL import Image
from ultralytics import YOLO
import google.generativeai as genai
import os # For accessing environment variables

# --- Google Gemini API setup ---
# The API key is read from the GOOGLE_API_KEY environment variable so that no
# secret is stored in the notebook.
# NOTE(review): a literal API key used to be hardcoded here; treat that key as
# compromised and revoke/rotate it.
try:
    genai.configure(api_key=os.environ["GOOGLE_API_KEY"])
except KeyError:
    print("❌ GOOGLE_API_KEY environment variable not set.")
    print("Please set the GOOGLE_API_KEY environment variable before running this cell.")
    # SystemExit stops this cell; exit() is the interactive `site` helper and
    # would shut a Jupyter kernel down.
    raise SystemExit(1) from None

# Optional: Configure generation parameters for Gemini
# Adjust temperature for creativity (0.0 for deterministic, 1.0 for very creative)
# Other parameters like top_p, top_k can also be set.
gemini_generation_config = genai.types.GenerationConfig(temperature=0.4, max_output_tokens=200)

def speak(text):
    """Speak `text` through the shared `tts` engine, returning once playback is done."""
    engine = tts
    engine.say(text)
    engine.runAndWait()

def convert_cv2_to_bytes(img):
    """JPEG-encode an OpenCV image and return the encoded bytes in an `io.BytesIO`.

    The success flag returned by `cv2.imencode` is not inspected.
    """
    encoded = cv2.imencode('.jpg', img)[1]
    jpeg_bytes = encoded.tobytes()
    return io.BytesIO(jpeg_bytes)

def ask_gemini_about_image(image_bytes_io, user_prompt):
    """
    Ask the 'gemini-1.5-flash-latest' model about an image.

    `image_bytes_io` is opened with PIL and sent, after `user_prompt`, as a
    two-part multimodal request using `gemini_generation_config`.

    Returns the response text. Any exception is caught, printed, and replaced
    by a fixed apology sentence, so callers never see an exception from here.
    """
    try:
        # The image is decoded first, then the request is built (text, image).
        picture = Image.open(image_bytes_io)
        vision_model = genai.GenerativeModel('gemini-1.5-flash-latest')
        reply = vision_model.generate_content(
            [user_prompt, picture],
            generation_config=gemini_generation_config,
        )
        return reply.text
    except Exception as e:
        print(f"❌ Error communicating with Gemini: {e}")
        return "Sorry, I couldn't get a detailed description right now."

# Initialize YOLOv8 model
# 'yolov8l.pt' is the checkpoint loaded here; the earlier cells in this notebook
# used the medium 'yolov8m.pt' checkpoint.
model = YOLO('yolov8l.pt')

# Initialize Text-to-Speech engine
tts = pyttsx3.init()
tts.setProperty('rate', 170) # Adjust speech rate (words per minute)

# ESP32-CAM Stream URL
# IMPORTANT: Replace with the actual IP address of your ESP32-CAM
stream_url = "http://192.168.1.20:81/stream"
cap = cv2.VideoCapture(stream_url)

if not cap.isOpened():
    print(f"❌ Failed to connect to ESP32-CAM stream at {stream_url}")
    print("Please check the IP address and ensure the ESP32-CAM is powered on and streaming.")
    # SystemExit stops this cell; exit() is the interactive `site` helper and
    # would shut a Jupyter kernel down.
    raise SystemExit(1)
else:
    print(f"✅ Connected to ESP32-CAM stream at {stream_url}")

# Classes that warrant a detailed Gemini description
high_priority_objects = {
    # people and animals
    "person", "dog", "cat", "bear",
    # sharp tools
    "knife", "scissors",
    # vehicles
    "car", "truck", "bus", "bicycle", "motorcycle", "train",
    # street fixtures
    "fire hydrant",
}

# Short canned sentences, one per high-priority class. Most follow the same
# "A <name> is detected." template; the hand-written ones are spelled out.
# Insertion order matches the original literal.
local_high_priority_descriptions = {
    "knife": "Warning, a sharp knife is detected.",
    "scissors": "Be careful, scissors are nearby.",
    "person": "A person is present.",
    **{
        label: f"A {label} is detected."
        for label in (
            "dog", "cat", "car", "truck", "bus",
            "bicycle", "motorcycle", "train", "fire hydrant",
        )
    },
    "bear": "Warning, a bear is detected. Proceed with caution.",
}


# --- Cooldown and Detection Management ---

# Tunables
yolo_confidence_threshold_for_gemini = 0.65  # minimum YOLO confidence before Gemini is asked
gemini_cooldown_per_object = 30              # seconds between Gemini descriptions of one class
global_gemini_cooldown = 10                  # seconds between any two Gemini calls
recent_detection_reset_interval = 5          # seconds between clears of `recent_detections`

# Mutable state updated by the main loop
recent_detections = set()            # classes already announced in the current window
gemini_last_called = {}              # class name -> time of its last Gemini call
global_gemini_last_called_time = 0   # time of the most recent Gemini call of any class
last_reset_time = time.time()

print("\n🎥 Press 'q' to quit the stream.")
print("Waiting for objects...")

# try/finally guarantees the capture, the preview window and the TTS engine are
# shut down even when the loop is stopped by a kernel interrupt or an error.
try:
    while True:
        ret, frame = cap.read()
        if not ret:
            print("⚠️ Failed to grab frame. Reconnecting or ending stream.")
            # Attempt to reconnect once; give up if the stream cannot be reopened.
            cap.release()
            time.sleep(2)  # Wait a bit before retrying
            cap = cv2.VideoCapture(stream_url)
            if not cap.isOpened():
                break  # Exit if reconnection fails
            continue

        current_time = time.time()

        # Perform object detection with YOLOv8 and show the annotated frame.
        results = model(frame, verbose=False)
        annotated_frame = results[0].plot()
        cv2.imshow("ESP32-CAM + YOLOv8", annotated_frame)

        for box in results[0].boxes:
            cls_id = int(box.cls[0])
            conf = float(box.conf[0])
            name = model.names[cls_id]

            # 1. Initial YOLO announcement for new detections
            if name not in recent_detections:
                print(f"🔊 {name} detected ({conf:.2f})")
                speak(f"{name} ahead")
                recent_detections.add(name)

            # 2. Gemini description, only for high-priority classes that pass
            #    every gate below. After step 1 the name is always in
            #    `recent_detections`, so no second local announcement is needed
            #    while a cooldown is active.
            if name not in high_priority_objects:
                continue
            if conf < yolo_confidence_threshold_for_gemini:
                continue  # YOLO is not confident enough
            if current_time - global_gemini_last_called_time < global_gemini_cooldown:
                continue  # a Gemini call of any class happened too recently
            last_call_for_this_object = gemini_last_called.get(name, 0)
            if current_time - last_call_for_this_object <= gemini_cooldown_per_object:
                continue  # this class was described too recently

            # Clamp the crop to the frame bounds and reject empty areas.
            x1, y1, x2, y2 = map(int, box.xyxy[0])
            x1, y1, x2, y2 = max(0, x1), max(0, y1), min(frame.shape[1], x2), min(frame.shape[0], y2)
            if (x2 - x1) <= 0 or (y2 - y1) <= 0:
                print(f"DEBUG: Skipping Gemini for {name}, invalid crop area.")
                continue

            prompt = (
                f"Describe this specific detected object to a blind person. "
                f"Explain what it is and whether it is dangerous or something to be aware of. "
                f"Provide the description in 1-2 simple, concise sentences."
            )
            try:
                # Encoding sits inside the try so a failed JPEG encode falls
                # back to the local description instead of ending the loop.
                object_crop = frame[y1:y2, x1:x2]
                img_bytes = convert_cv2_to_bytes(object_crop)
                print(f"Calling Gemini for: {name} (conf: {conf:.2f})...")
                gemini_response = ask_gemini_about_image(img_bytes, prompt)
                print(f"🧠 Gemini: {gemini_response}")
                speak(gemini_response)
                gemini_last_called[name] = current_time  # Update per-object call time
                global_gemini_last_called_time = current_time  # Update global call time
            except Exception as e:
                print(f"❌ Gemini API call error: {e}")
                # Fallback to local description if the call fails
                if name in local_high_priority_descriptions:
                    speak(local_high_priority_descriptions[name])

        # Periodically clear `recent_detections` to allow re-announcements of persistent objects
        if time.time() - last_reset_time > recent_detection_reset_interval:
            recent_detections.clear()
            last_reset_time = time.time()

        # Exit loop if 'q' is pressed
        if cv2.waitKey(1) & 0xFF == ord('q'):
            print("\nExiting application.")
            break
finally:
    # Release video capture and destroy all OpenCV windows
    cap.release()
    cv2.destroyAllWindows()
    tts.stop()  # Ensure TTS engine is properly shut down

Downloading https://github.com/ultralytics/assets/releases/download/v8.3.0/yolov8l.pt to 'yolov8l.pt'...


100%|██████████| 83.7M/83.7M [01:00<00:00, 1.46MB/s]


✅ Connected to ESP32-CAM stream at http://192.168.1.20:81/stream

🎥 Press 'q' to quit the stream.
Waiting for objects...
🔊 tie detected (0.58)
🔊 person detected (0.27)
🔊 tie detected (0.53)
🔊 person detected (0.45)
🔊 person detected (0.49)
🔊 tie detected (0.45)
🔊 tie detected (0.52)
🔊 person detected (0.38)
🔊 tie detected (0.57)
🔊 person detected (0.37)
🔊 tie detected (0.57)
🔊 person detected (0.38)
🔊 tie detected (0.48)
🔊 person detected (0.42)
🔊 person detected (0.50)
🔊 cat detected (0.26)
Calling Gemini for: person (conf: 0.65)...
🧠 Gemini: The image shows a person, likely a man, in a dimly lit room;  there is no immediate danger present, but exercise caution as the context is unclear.

🔊 tie detected (0.26)
🔊 person detected (0.63)
🔊 cat detected (0.32)
⚠️ Failed to grab frame. Reconnecting or ending stream.
🔊 person detected (0.53)
🔊 person detected (0.62)
🔊 cat detected (0.31)


KeyboardInterrupt: 

In [13]:
from dotenv import load_dotenv
load_dotenv()  # This will read .env and set os.environ


True

In [None]:
import cv2
import time
import pyttsx3
import io
import gc
import threading
import logging
import queue
from dataclasses import dataclass, field
from typing import Dict, Set, Optional, List
from PIL import Image
from ultralytics import YOLO
import google.generativeai as genai
import os
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor, as_completed
from requests.exceptions import RequestException
from contextlib import contextmanager

# Root logging setup: INFO level, "timestamp - level - message" lines.
logging.basicConfig(
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
# Module-level logger used throughout this cell.
logger = logging.getLogger(__name__)

def _default_high_priority_objects() -> List[str]:
    """Build a fresh default list for ``Config.HIGH_PRIORITY_OBJECTS``."""
    return [
        "knife", "scissors", "fire hydrant", "bear", "truck",
        "bus", "dog", "person", "cat", "car", "bicycle",
        "motorcycle", "train", "stop sign", "traffic light",
    ]


@dataclass
class Config:
    """Tunable parameters for the detection pipeline, gathered in one place.

    Field order is part of the positional constructor and must not change.
    Units are not stated in this class; the notes below marked "presumably"
    are assumptions to confirm against the code that reads each field.
    """
    YOLO_MODEL: str = 'yolov8m.pt'                 # YOLO weights file name
    SPEECH_RATE: int = 170                         # TTS speech rate
    GEMINI_COOLDOWN: int = 10                      # presumably seconds - confirm
    OBJECT_COOLDOWN: int = 30                      # presumably seconds - confirm
    CONFIDENCE_THRESHOLD: float = 0.65             # detection confidence cut-off
    STREAM_URL: str = "http://192.168.1.20:81/stream"
    DETECTION_RESET_INTERVAL: int = 5              # presumably seconds - confirm
    PROCESS_EVERY_N_FRAMES: int = 2
    MAX_GEMINI_RETRIES: int = 3
    GEMINI_TIMEOUT: int = 10                       # presumably seconds - confirm
    TARGET_FPS: int = 30
    MAX_RECONNECTION_ATTEMPTS: int = 5
    THREAD_POOL_SIZE: int = 3
    MEMORY_CLEANUP_INTERVAL: int = 100  # frames
    # A factory is used so every Config instance owns its own list.
    HIGH_PRIORITY_OBJECTS: List[str] = field(default_factory=_default_high_priority_objects)

class ResourceManager:
    """Counts processed frames and triggers periodic garbage collection.

    Args:
        memory_cleanup_interval: Number of ``should_cleanup_memory()`` calls
            between two cleanups. Defaults to 100, the value that used to be
            hard-coded, so ``ResourceManager()`` behaves as before.

    Raises:
        ValueError: If ``memory_cleanup_interval`` is smaller than 1 (it is
            used as a modulus, so 0 would raise ZeroDivisionError later).
    """

    def __init__(self, memory_cleanup_interval: int = 100):
        if memory_cleanup_interval < 1:
            raise ValueError("memory_cleanup_interval must be at least 1")
        self.frame_count = 0
        self.memory_cleanup_interval = memory_cleanup_interval

    def should_cleanup_memory(self) -> bool:
        """Advance the frame counter and report whether a cleanup is due.

        Returns:
            True on every ``memory_cleanup_interval``-th call, else False.
        """
        self.frame_count += 1
        return self.frame_count % self.memory_cleanup_interval == 0

    def cleanup_memory(self):
        """Force a garbage-collection pass; failures are logged, not raised."""
        try:
            gc.collect()
            logger.debug(f"Memory cleanup performed at frame {self.frame_count}")
        except Exception as e:
            logger.warning(f"Memory cleanup failed: {e}")

class TTSManager:
    """Queues speech requests and plays them one at a time on a worker thread.

    The pyttsx3 engine is created in ``_init_tts``. If that fails, ``self.tts``
    is None, no worker is started and ``speak_async`` drops requests instead of
    filling a queue nobody drains.

    Args:
        speech_rate: Value assigned to the engine's 'rate' property.
    """

    def __init__(self, speech_rate: int = 170):
        self.speech_rate = speech_rate
        self.tts_queue = queue.Queue()
        self.is_running = True
        self.tts_thread = None
        self._init_tts()

    def _init_tts(self):
        """Create the engine and start the daemon worker; log on failure."""
        try:
            self.tts = pyttsx3.init()
            self.tts.setProperty('rate', self.speech_rate)

            # Start TTS worker thread
            self.tts_thread = threading.Thread(target=self._tts_worker, daemon=True)
            self.tts_thread.start()
            logger.info("TTS engine initialized successfully")
        except Exception as e:
            logger.error(f"Failed to initialize TTS: {e}")
            self.tts = None

    def _tts_worker(self):
        """Consume queued texts until stopped or a ``None`` sentinel arrives."""
        while self.is_running:
            try:
                # The timeout lets the loop re-check is_running regularly.
                text = self.tts_queue.get(timeout=1.0)
            except queue.Empty:
                continue

            try:
                if text is None:  # Shutdown signal
                    break
                if self.tts:
                    self.tts.say(text)
                    self.tts.runAndWait()
            except Exception as e:
                logger.error(f"TTS worker error: {e}")
            finally:
                # Every successful get() is balanced by exactly one task_done(),
                # including failed utterances and the shutdown sentinel.
                self.tts_queue.task_done()

    def speak_async(self, text: str):
        """Queue *text* for speech without blocking the caller.

        The request is dropped when the manager is shutting down or the engine
        could not be initialised (already logged by ``_init_tts``).
        """
        if self.tts is None or not self.is_running:
            return
        try:
            self.tts_queue.put_nowait(text)
        except queue.Full:
            logger.warning("TTS queue full, skipping speech")

    def cleanup(self):
        """Stop the worker thread and the engine; errors are logged only."""
        try:
            self.is_running = False
            self.tts_queue.put(None)  # Shutdown signal

            if self.tts_thread and self.tts_thread.is_alive():
                self.tts_thread.join(timeout=2.0)

            if self.tts and hasattr(self.tts, 'stop'):
                try:
                    self.tts.stop()
                except Exception as e:
                    # Best effort: stop() failing must not abort cleanup, but
                    # KeyboardInterrupt/SystemExit are no longer swallowed.
                    logger.debug(f"TTS stop() failed during cleanup: {e}")

            logger.info("TTS cleanup completed")
        except Exception as e:
            logger.error(f"TTS cleanup error: {e}")

class FPSController:
    """Limits how often the main loop iterates.

    Timing uses ``time.monotonic()`` so that wall-clock adjustments cannot
    produce negative or very large intervals.

    Args:
        target_fps: Maximum iterations per second; must be positive.

    Raises:
        ValueError: If ``target_fps`` is zero or negative.
    """

    def __init__(self, target_fps: int = 30):
        if target_fps <= 0:
            raise ValueError("target_fps must be positive")
        self.target_fps = target_fps
        self.frame_time = 1.0 / target_fps  # minimum seconds per iteration
        self.last_frame_time = time.monotonic()

    def wait_for_next_frame(self):
        """Sleep for whatever remains of the current frame slot, if anything."""
        remaining = self.frame_time - (time.monotonic() - self.last_frame_time)
        if remaining > 0:
            time.sleep(remaining)

        self.last_frame_time = time.monotonic()

class SmartVisionAssistant:
    def __init__(self, config: Config):
        self.config = config
        self.setup_gemini()
        self.setup_models()
        self.setup_tracking()
        self.setup_executors()
        self.setup_managers()
        
    def setup_gemini(self):
        """Configure the Gemini client from the GOOGLE_API_KEY environment variable.

        Sets ``self.gemini_model`` and ``self.generation_config``.

        Raises:
            ValueError: If GOOGLE_API_KEY is unset or empty.
            Exception: Any SDK error during configuration is logged and re-raised.
        """
        key = os.getenv('GOOGLE_API_KEY')
        if key:
            try:
                genai.configure(api_key=key)
                self.gemini_model = genai.GenerativeModel('gemini-1.5-flash-latest')
                self.generation_config = genai.types.GenerationConfig(
                    temperature=0.4,
                    max_output_tokens=200,
                )
                logger.info("Gemini API configured successfully")
            except Exception as e:
                logger.error(f"Failed to configure Gemini API: {e}")
                raise
            return

        # No key available: fail fast with a clear message.
        logger.error("GOOGLE_API_KEY environment variable not set")
        raise ValueError("API key not configured")
    
    def setup_models(self):
        """Load the YOLO model and capture its class-id -> name mapping.

        Sets ``self.yolo_model`` and ``self.model_names``. When the model
        exposes no usable ``names``, 80 placeholder names ("class_0" ...
        "class_79") are used instead.

        Raises:
            Exception: Any loading error is logged and re-raised.
        """
        try:
            self.yolo_model = YOLO(self.config.YOLO_MODEL)

            class_names = getattr(self.yolo_model, 'names', None)
            if not class_names:
                logger.warning("YOLO model has no class names, using fallback")
                # COCO default
                self.model_names = dict((i, f"class_{i}") for i in range(80))
            else:
                self.model_names = class_names
                logger.info(f"YOLO model loaded with {len(self.model_names)} classes")

        except Exception as e:
            logger.error(f"Failed to load YOLO model: {e}")
            raise
    
    def setup_tracking(self):
        """Initialize tracking variables"""
        self.recent_detections: Set[str] = set()
        self.gemini_last_called: Dict[str, float] = {}
        self.global_gemini_last_called_time = 0
        self.last_reset_time = time.time()
        self.frame_count = 0
        self.reconnection_attempts = 0
        
        # Use configurable high-priority objects
        self.high_priority_objects = set(self.config.HIGH_PRIORITY_OBJECTS)
        
        self.local_descriptions = {
            "knife": "Warning: Sharp knife detected. Exercise caution.",
            "scissors": "Scissors present. Handle with care.",
            "person": "Person detected in the area.",
            "dog": "Dog spotted nearby.",
            "cat": "Cat detected in the vicinity.",
            "car": "Vehicle present - stay alert.",
            "truck": "Large truck detected.",
            "bus": "Bus in the area.",
            "bicycle": "Bicycle detected.",
            "motorcycle": "Motorcycle present.",
            "train": "Train detected - maintain safe distance.",
            "fire hydrant": "Fire hydrant located nearby.",
            "bear": "DANGER: Bear detected. Move to safety immediately.",
            "stop sign": "Stop sign ahead.",
            "traffic light": "Traffic light detected."
        }
    
    def setup_executors(self):
        """Create the bounded thread pool that runs the Gemini requests."""
        pool_size = self.config.THREAD_POOL_SIZE
        self.gemini_executor = ThreadPoolExecutor(
            thread_name_prefix="GeminiWorker",
            max_workers=pool_size,
        )
        logger.info(f"Thread pools initialized with {pool_size} workers")
    
    def setup_managers(self):
        """Create the TTS, frame-rate and memory helpers from the config."""
        self.tts_manager = TTSManager(self.config.SPEECH_RATE)
        self.fps_controller = FPSController(self.config.TARGET_FPS)
        self.resource_manager = ResourceManager()
        # Config.MEMORY_CLEANUP_INTERVAL used to be ignored in favour of the
        # manager's built-in 100. The value is a modulus, so keep it >= 1.
        self.resource_manager.memory_cleanup_interval = max(
            1, int(self.config.MEMORY_CLEANUP_INTERVAL)
        )
    
    def get_object_name(self, cls_id: int) -> str:
        """Safely get object name with fallback"""
        try:
            return self.model_names.get(cls_id, f"unknown_class_{cls_id}")
        except Exception:
            return f"class_{cls_id}"
    
    @contextmanager
    def safe_image_processing(self, image_data):
        """Context manager for safe image processing with cleanup"""
        pil_image = None
        try:
            if isinstance(image_data, io.BytesIO):
                image_data.seek(0)
                pil_image = Image.open(image_data)
            yield pil_image
        finally:
            if pil_image:
                pil_image.close()
            if isinstance(image_data, io.BytesIO):
                image_data.close()
    
    def convert_cv2_to_bytes(self, img) -> io.BytesIO:
        """Encode an OpenCV image as JPEG (quality 85) into an in-memory buffer.

        Returns:
            A ``BytesIO`` holding the encoded bytes.

        Raises:
            ValueError: If ``cv2.imencode`` reports that encoding failed.
            Exception: Any other encoding error, after it has been logged.
        """
        try:
            encoded_ok, buffer = cv2.imencode('.jpg', img, [cv2.IMWRITE_JPEG_QUALITY, 85])
            # imencode signals failure through its first return value; the
            # buffer must not be used in that case.
            if not encoded_ok:
                raise ValueError("cv2.imencode could not encode the image as JPEG")
            return io.BytesIO(buffer.tobytes())
        except Exception as e:
            logger.error(f"Image conversion error: {e}")
            raise
    
    def ask_gemini_about_image(self, image_bytes_io: io.BytesIO, prompt: str) -> str:
        """Ask Gemini to describe an image, retrying with exponential backoff.

        Args:
            image_bytes_io: Encoded image; its bytes are copied once, the
                caller's buffer is neither closed nor repositioned.
            prompt: Instruction text sent along with the image.

        Returns:
            The stripped model answer, or one of the fixed failure sentences
            ("Image processing failed", "Unable to get detailed description at
            this time.", "Description service temporarily unavailable.").
        """
        # safe_image_processing closes the buffer it is given, so each attempt
        # needs its own buffer. Reusing the caller's made every retry fail with
        # "I/O operation on closed file".
        try:
            image_payload = image_bytes_io.getvalue()
        except (AttributeError, ValueError) as e:
            logger.warning(f"Gemini request skipped, image buffer unusable: {e}")
            return "Image processing failed"

        last_attempt = self.config.MAX_GEMINI_RETRIES - 1
        for attempt in range(self.config.MAX_GEMINI_RETRIES):
            try:
                with self.safe_image_processing(io.BytesIO(image_payload)) as pil_image:
                    if not pil_image:
                        return "Image processing failed"

                    contents = [prompt, pil_image]
                    response = self.gemini_model.generate_content(
                        contents,
                        generation_config=self.generation_config
                    )

                    if response and response.text:
                        return response.text.strip()
                    logger.warning(f"Empty Gemini response on attempt {attempt + 1}")

            except Exception as e:
                logger.warning(f"Gemini API attempt {attempt + 1} failed: {e}")
                if attempt == last_attempt:
                    return "Unable to get detailed description at this time."

            # Exponential backoff before the next try, after errors and after
            # empty answers alike; nothing to wait for after the final one.
            if attempt < last_attempt:
                time.sleep(2 ** attempt)

        return "Description service temporarily unavailable."
    
    def connect_to_stream(self) -> cv2.VideoCapture:
        """Open the configured stream, retrying with capped exponential backoff.

        A connection only counts as successful once one frame could be read;
        that frame is discarded.

        Returns:
            An opened ``cv2.VideoCapture``.

        Raises:
            ConnectionError: When every attempt of this call failed.
        """
        max_attempts = self.config.MAX_RECONNECTION_ATTEMPTS
        for attempt in range(max_attempts):
            cap = None
            delay = min(2 ** attempt, 10)  # Exponential backoff with cap
            try:
                cap = cv2.VideoCapture(self.config.STREAM_URL)
                if cap.isOpened():
                    # Test if we can actually read a frame
                    ret, _ = cap.read()
                    if ret:
                        logger.info(f"Connected to stream: {self.config.STREAM_URL}")
                        self.reconnection_attempts = 0  # Reset counter on success
                        return cap
                    logger.warning(f"Stream connected but no frames available (attempt {attempt + 1})")
                else:
                    logger.warning(f"Stream connection attempt {attempt + 1} failed")

            except Exception as e:
                logger.error(f"Stream connection error on attempt {attempt + 1}: {e}")
                delay = 2

            # Every failed attempt releases its capture handle, not only the
            # "opened but no frame" case.
            if cap is not None:
                try:
                    cap.release()
                except Exception as e:
                    logger.debug(f"Releasing failed capture raised: {e}")

            # Waiting after the final attempt would only delay the error.
            if attempt < max_attempts - 1:
                time.sleep(delay)

        self.reconnection_attempts += 1
        if self.reconnection_attempts >= self.config.MAX_RECONNECTION_ATTEMPTS:
            raise ConnectionError(f"Failed to connect to stream after {self.config.MAX_RECONNECTION_ATTEMPTS} attempts. Giving up.")
        else:
            raise ConnectionError(f"Failed to connect to stream, attempt {self.reconnection_attempts}")
    
    def process_detections(self, frame, results):
        """Process YOLO detection results"""
        current_time = time.time()
        current_frame_detections = set()
        
        for box in results[0].boxes:
            cls_id = int(box.cls[0])
            conf = float(box.conf[0])
            name = self.get_object_name(cls_id)
            
            current_frame_detections.add(name)
            
            # Initial YOLO announcement
            if name not in self.recent_detections:
                logger.info(f"New detection: {name} (confidence: {conf:.2f})")
                self.tts_manager.speak_async(f"{name} detected")
                self.recent_detections.add(name)
            
            # Handle high-priority objects
            if name in self.high_priority_objects:
                self.handle_high_priority_object(frame, box, name, conf, current_time)
    
    def handle_high_priority_object(self, frame, box, name: str, conf: float, current_time: float):
        """Handle high-priority object detection with controlled threading"""
        # Check confidence threshold
        if conf < self.config.CONFIDENCE_THRESHOLD:
            return
        
        # Check global cooldown
        if current_time - self.global_gemini_last_called_time < self.config.GEMINI_COOLDOWN:
            return
        
        # Check per-object cooldown
        last_call = self.gemini_last_called.get(name, 0)
        if current_time - last_call <= self.config.OBJECT_COOLDOWN:
            return
        
        # Extract object region safely
        try:
            x1, y1, x2, y2 = map(int, box.xyxy[0])
            x1, y1 = max(0, x1), max(0, y1)
            x2, y2 = min(frame.shape[1], x2), min(frame.shape[0], y2)
            
            if (x2 - x1) <= 0 or (y2 - y1) <= 0:
                return
            
            # Create a copy to avoid race conditions
            object_crop = frame[y1:y2, x1:x2].copy()
            
            # Submit to thread pool (non-blocking)
            future = self.gemini_executor.submit(
                self.process_with_gemini,
                object_crop, name, current_time
            )
            
            # Update timestamps immediately to prevent duplicate calls
            self.gemini_last_called[name] = current_time
            self.global_gemini_last_called_time = current_time
            
        except Exception as e:
            logger.error(f"Error handling high-priority object {name}: {e}")
    
    def process_with_gemini(self, object_crop, name: str, timestamp: float):
        """Describe one cropped object via Gemini and speak the result.

        Runs on a thread-pool worker. Whenever Gemini yields no usable text
        the canned sentence from ``local_descriptions`` is spoken instead.

        Args:
            object_crop: Image region containing the object.
            name: Detected class name; used in the prompt and for the fallback.
            timestamp: Time of the detection (not used here).
        """
        # Fixed sentences ask_gemini_about_image returns on failure. Only the
        # last contains "unavailable", so the substring test alone let the
        # other two be spoken as if they were descriptions.
        failure_replies = {
            "Image processing failed",
            "Unable to get detailed description at this time.",
            "Description service temporarily unavailable.",
        }

        img_bytes = None
        try:
            img_bytes = self.convert_cv2_to_bytes(object_crop)
            prompt = (
                f"Describe this {name} to a visually impaired person. "
                f"Focus on safety considerations and important details. "
                f"Keep it concise - 1-2 sentences maximum."
            )

            logger.info(f"Requesting Gemini description for: {name}")
            gemini_response = self.ask_gemini_about_image(img_bytes, prompt)

            usable = (
                bool(gemini_response)
                and gemini_response not in failure_replies
                and "unavailable" not in gemini_response.lower()
            )
            if usable:
                logger.info(f"Gemini response for {name}: {gemini_response}")
                self.tts_manager.speak_async(gemini_response)
            else:
                logger.warning(f"Gemini failed for {name}, using fallback: {gemini_response}")
                # Fallback to local description
                fallback = self.local_descriptions.get(name, f"{name} detected")
                self.tts_manager.speak_async(fallback)

        except Exception as e:
            logger.error(f"Gemini processing error for {name}: {e}")
            # Use local fallback
            fallback = self.local_descriptions.get(name, f"{name} detected")
            self.tts_manager.speak_async(fallback)
        finally:
            # Closing an already closed BytesIO is a no-op, so this is safe
            # whatever happened to the buffer above.
            if img_bytes is not None:
                img_bytes.close()
    
    def should_process_frame(self) -> bool:
        """Determine if current frame should be processed"""
        self.frame_count += 1
        return self.frame_count % self.config.PROCESS_EVERY_N_FRAMES == 0
    
    def reset_recent_detections(self):
        """Periodically reset recent detections"""
        current_time = time.time()
        if current_time - self.last_reset_time > self.config.DETECTION_RESET_INTERVAL:
            self.recent_detections.clear()
            self.last_reset_time = current_time
            logger.debug("Recent detections cleared")
    
    def run(self):
        """Main application loop with comprehensive error handling

        Connects to the stream, then loops until 'q' is pressed, the user
        interrupts, or a reconnection fails. Each iteration paces itself,
        grabs a frame, runs detection on every N-th frame, and performs the
        periodic maintenance. ``cleanup`` always runs on the way out.
        """
        # Defined before the try so the finally clause can pass it to
        # cleanup() even when the initial connection raises.
        cap = None
        try:
            cap = self.connect_to_stream()
            logger.info(f"Starting vision assistance - Target FPS: {self.config.TARGET_FPS}")
            logger.info("Press 'q' to quit")

            while True:
                # Control frame rate
                self.fps_controller.wait_for_next_frame()

                ret, frame = cap.read()
                if not ret:
                    logger.warning("Failed to grab frame, attempting reconnection")
                    cap.release()
                    time.sleep(2)
                    try:
                        cap = self.connect_to_stream()
                        # Nothing to process in this iteration; read a frame
                        # from the new capture on the next pass.
                        continue
                    except ConnectionError as e:
                        logger.error(f"Reconnection failed: {e}")
                        break

                # Process frame selectively
                if self.should_process_frame():
                    try:
                        results = self.yolo_model(frame, verbose=False)
                        self.process_detections(frame, results)

                        # Display annotated frame
                        annotated_frame = results[0].plot()
                        cv2.imshow("Smart Vision Assistant", annotated_frame)

                    except Exception as e:
                        # A failure on one frame is logged and the loop goes on.
                        logger.error(f"Frame processing error: {e}")

                # Periodic maintenance
                self.reset_recent_detections()

                # Memory cleanup
                if self.resource_manager.should_cleanup_memory():
                    self.resource_manager.cleanup_memory()

                # Check for quit command
                key = cv2.waitKey(1) & 0xFF
                if key == ord('q'):
                    logger.info("Quit command received")
                    break

        except KeyboardInterrupt:
            logger.info("Application interrupted by user")
        except Exception as e:
            logger.error(f"Unexpected error: {e}")
        finally:
            self.cleanup(cap)
    
    def cleanup(self, cap):
        """Release capture, windows, thread pool and TTS; never raises.

        Each step is isolated so that one failing step cannot skip the others.

        Args:
            cap: The ``cv2.VideoCapture`` in use, or ``None`` if the stream
                was never opened.
        """
        logger.info("Starting cleanup process...")

        def _release_capture():
            if cap is not None and cap.isOpened():
                cap.release()

        def _shutdown_executor():
            if not hasattr(self, 'gemini_executor'):
                return
            # Executor.shutdown() has no ``timeout`` parameter; passing one
            # raised TypeError and aborted the rest of the cleanup. Do not
            # block on in-flight Gemini calls and drop the queued ones.
            try:
                self.gemini_executor.shutdown(wait=False, cancel_futures=True)
            except TypeError:
                # Python < 3.9: cancel_futures is not available.
                self.gemini_executor.shutdown(wait=False)

        def _stop_tts():
            if hasattr(self, 'tts_manager'):
                self.tts_manager.cleanup()

        steps = (
            ("video capture", _release_capture),
            ("OpenCV windows", cv2.destroyAllWindows),
            ("Gemini executor", _shutdown_executor),
            ("TTS", _stop_tts),
            ("garbage collection", gc.collect),
        )

        failed_steps = 0
        for label, step in steps:
            try:
                step()
            except Exception as e:
                failed_steps += 1
                logger.error(f"Cleanup error ({label}): {e}")

        if failed_steps == 0:
            logger.info("Cleanup completed successfully")
        else:
            logger.warning(f"Cleanup finished with {failed_steps} failed step(s)")

def main():
    """Build the default configuration and run the assistant.

    Returns:
        0 after a normal run, 1 when construction or the run raised.
    """
    try:
        config = Config()

        # Warn only when the weights are neither on disk nor a stock
        # 'yolov8*' name.
        weights = config.YOLO_MODEL
        if not (os.path.exists(weights) or weights.startswith('yolov8')):
            logger.warning(f"YOLO model file not found: {config.YOLO_MODEL}")

        SmartVisionAssistant(config).run()

    except Exception as e:
        logger.error(f"Application failed to start: {e}")
        return 1
    else:
        return 0

# Entry point: propagate main()'s return value as the process exit status.
# SystemExit is raised directly because exit() is an interactive helper
# installed by the site module (and overridden by IPython) and is not meant
# for use in programs.
if __name__ == "__main__":
    raise SystemExit(main())

2025-06-07 00:09:15,133 - INFO - Gemini API configured successfully
2025-06-07 00:09:16,021 - INFO - YOLO model loaded with 80 classes
2025-06-07 00:09:16,022 - INFO - Thread pools initialized with 3 workers
2025-06-07 00:09:16,025 - INFO - TTS engine initialized successfully
2025-06-07 00:09:16,156 - INFO - Connected to stream: http://192.168.1.20:81/stream
2025-06-07 00:09:16,156 - INFO - Starting vision assistance - Target FPS: 30
2025-06-07 00:09:16,156 - INFO - Press 'q' to quit
2025-06-07 00:09:16,788 - INFO - New detection: person (confidence: 0.30)
2025-06-07 00:09:16,971 - INFO - New detection: cat (confidence: 0.30)
2025-06-07 00:09:20,268 - INFO - New detection: tie (confidence: 0.28)
2025-06-07 00:09:21,255 - INFO - New detection: person (confidence: 0.26)
2025-06-07 00:09:21,420 - INFO - New detection: potted plant (confidence: 0.30)
2025-06-07 00:09:30,904 - INFO - New detection: book (confidence: 0.28)
2025-06-07 00:09:31,904 - INFO - New detection: cat (confidence: 0.40

: 

In [1]:
import cv2
import time
import pyttsx3
import io
import threading
import logging
from dataclasses import dataclass
from typing import Dict, Set, Optional
from PIL import Image
from ultralytics import YOLO
import google.generativeai as genai
import os
from requests.exceptions import RequestException

# Configure logging
# Root logger at INFO with "timestamp - LEVEL - message" lines. With this
# level the logger.debug(...) calls further down are not emitted.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

@dataclass
class Config:
    """Configuration class for easy parameter management

    Every field has a default, so ``Config()`` yields a usable instance.
    Cooldowns and intervals are compared against ``time.time()`` differences
    by the code that reads them, i.e. they are expressed in seconds.
    """
    YOLO_MODEL: str = 'yolov8l.pt'  # weights name/path handed to YOLO(...)
    SPEECH_RATE: int = 170  # value assigned to the pyttsx3 'rate' property
    GEMINI_COOLDOWN: int = 10  # minimum seconds between any two Gemini requests
    OBJECT_COOLDOWN: int = 30  # minimum seconds between Gemini requests for the same class name
    CONFIDENCE_THRESHOLD: float = 0.65  # detections below this confidence are never sent to Gemini
    STREAM_URL: str = "http://192.168.1.20:81/stream"  # source opened with cv2.VideoCapture
    DETECTION_RESET_INTERVAL: int = 5  # seconds after which announced class names may be announced again
    PROCESS_EVERY_N_FRAMES: int = 2  # detection runs only on every N-th captured frame
    MAX_GEMINI_RETRIES: int = 3  # attempts per Gemini request
    GEMINI_TIMEOUT: int = 10  # NOTE(review): not referenced by the code in this cell - confirm intended use

class SmartVisionAssistant:
    def __init__(self, config: Config):
        self.config = config
        self.setup_gemini()
        self.setup_models()
        self.setup_tracking()
        
    def setup_gemini(self):
        """Configure the Gemini client from the GOOGLE_API_KEY environment variable.

        Sets ``self.gemini_model`` and ``self.generation_config``.

        Raises:
            ValueError: If GOOGLE_API_KEY is unset or empty.
            Exception: Any SDK error during configuration is logged and re-raised.
        """
        key = os.getenv('GOOGLE_API_KEY')
        if key:
            try:
                genai.configure(api_key=key)
                self.gemini_model = genai.GenerativeModel('gemini-1.5-flash-latest')
                self.generation_config = genai.types.GenerationConfig(
                    temperature=0.4,
                    max_output_tokens=200,
                )
                logger.info("Gemini API configured successfully")
            except Exception as e:
                logger.error(f"Failed to configure Gemini API: {e}")
                raise
            return

        # No key available: fail fast with a clear message.
        logger.error("GOOGLE_API_KEY environment variable not set")
        raise ValueError("API key not configured")
    
    def setup_models(self):
        """Load the YOLO detector, then create the pyttsx3 speech engine.

        Sets ``self.yolo_model`` and ``self.tts``.

        Raises:
            Exception: A failure in either step is logged and re-raised.
        """
        # Step 1: object detector
        try:
            self.yolo_model = YOLO(self.config.YOLO_MODEL)
        except Exception as e:
            logger.error(f"Failed to load YOLO model: {e}")
            raise
        else:
            logger.info(f"YOLO model {self.config.YOLO_MODEL} loaded successfully")

        # Step 2: speech engine
        try:
            self.tts = pyttsx3.init()
            self.tts.setProperty('rate', self.config.SPEECH_RATE)
        except Exception as e:
            logger.error(f"Failed to initialize TTS: {e}")
            raise
        else:
            logger.info("TTS engine initialized")
    
    def setup_tracking(self):
        """Initialize tracking variables"""
        self.recent_detections: Set[str] = set()
        self.gemini_last_called: Dict[str, float] = {}
        self.global_gemini_last_called_time = 0
        self.last_reset_time = time.time()
        self.frame_count = 0
        
        # Enhanced object categories
        self.high_priority_objects = {
            "knife", "scissors", "fire hydrant", "bear", "truck",
            "bus", "dog", "person", "cat", "car", "bicycle", 
            "motorcycle", "train", "stop sign", "traffic light"
        }
        
        self.local_descriptions = {
            "knife": "Warning: Sharp knife detected. Exercise caution.",
            "scissors": "Scissors present. Handle with care.",
            "person": "Person detected in the area.",
            "dog": "Dog spotted nearby.",
            "cat": "Cat detected in the vicinity.",
            "car": "Vehicle present - stay alert.",
            "truck": "Large truck detected.",
            "bus": "Bus in the area.",
            "bicycle": "Bicycle detected.",
            "motorcycle": "Motorcycle present.",
            "train": "Train detected - maintain safe distance.",
            "fire hydrant": "Fire hydrant located nearby.",
            "bear": "DANGER: Bear detected. Move to safety immediately.",
            "stop sign": "Stop sign ahead.",
            "traffic light": "Traffic light detected."
        }
    
    def speak_async(self, text: str):
        """Non-blocking text-to-speech"""
        def _speak():
            try:
                self.tts.say(text)
                self.tts.runAndWait()
            except Exception as e:
                logger.error(f"TTS error: {e}")
        
        threading.Thread(target=_speak, daemon=True).start()
    
    def convert_cv2_to_bytes(self, img) -> io.BytesIO:
        """Encode an OpenCV image as JPEG (quality 85) into an in-memory buffer.

        Returns:
            A ``BytesIO`` holding the encoded bytes.

        Raises:
            ValueError: If ``cv2.imencode`` reports that encoding failed.
            Exception: Any other encoding error, after it has been logged.
        """
        try:
            encoded_ok, buffer = cv2.imencode('.jpg', img, [cv2.IMWRITE_JPEG_QUALITY, 85])
            # imencode signals failure through its first return value; the
            # buffer must not be used in that case.
            if not encoded_ok:
                raise ValueError("cv2.imencode could not encode the image as JPEG")
            return io.BytesIO(buffer.tobytes())
        except Exception as e:
            logger.error(f"Image conversion error: {e}")
            raise
    
    def ask_gemini_about_image(self, image_bytes_io: io.BytesIO, prompt: str) -> str:
        """Ask Gemini to describe an image, retrying with exponential backoff.

        Args:
            image_bytes_io: Encoded image; its bytes are copied once and the
                caller's buffer is left untouched.
            prompt: Instruction text sent along with the image.

        Returns:
            The model's answer text, or a fixed failure sentence once every
            attempt has failed.
        """
        try:
            image_payload = image_bytes_io.getvalue()
        except (AttributeError, ValueError) as e:
            logger.warning(f"Gemini request skipped, image buffer unusable: {e}")
            return "Unable to get detailed description at this time."

        for attempt in range(self.config.MAX_GEMINI_RETRIES):
            try:
                # A fresh buffer per attempt keeps retries independent of
                # whatever closing the image does to its file object. The
                # with-block releases the image on the error path too, where
                # it used to leak.
                with Image.open(io.BytesIO(image_payload)) as pil_image:
                    contents = [prompt, pil_image]
                    response = self.gemini_model.generate_content(
                        contents,
                        generation_config=self.generation_config
                    )
                    return response.text

            except Exception as e:
                logger.warning(f"Gemini API attempt {attempt + 1} failed: {e}")
                if attempt == self.config.MAX_GEMINI_RETRIES - 1:
                    return "Unable to get detailed description at this time."
                time.sleep(2 ** attempt)  # Exponential backoff

        return "Description service temporarily unavailable."
    
    def connect_to_stream(self, max_attempts: int = 3) -> cv2.VideoCapture:
        """Open the configured stream, retrying a limited number of times.

        Args:
            max_attempts: Number of connection tries. Defaults to 3, the
                previously hard-coded value.

        Returns:
            An opened ``cv2.VideoCapture``.

        Raises:
            ConnectionError: When none of the attempts produced an open capture.
        """
        for attempt in range(max_attempts):
            cap = None
            try:
                cap = cv2.VideoCapture(self.config.STREAM_URL)
                if cap.isOpened():
                    logger.info(f"Connected to stream: {self.config.STREAM_URL}")
                    return cap
                logger.warning(f"Stream connection attempt {attempt + 1} failed")
            except Exception as e:
                logger.error(f"Stream connection error: {e}")

            # Release the handle of the failed attempt before trying again.
            if cap is not None:
                try:
                    cap.release()
                except Exception as e:
                    logger.debug(f"Releasing failed capture raised: {e}")

            # Waiting after the final attempt would only delay the error.
            if attempt < max_attempts - 1:
                time.sleep(2)

        raise ConnectionError(f"Failed to connect to stream after {max_attempts} attempts")
    
    def process_detections(self, frame, results):
        """Process YOLO detection results"""
        current_time = time.time()
        current_frame_detections = set()
        
        for box in results[0].boxes:
            cls_id = int(box.cls[0])
            conf = float(box.conf[0])
            name = self.yolo_model.names[cls_id]
            
            current_frame_detections.add(name)
            
            # Initial YOLO announcement
            if name not in self.recent_detections:
                logger.info(f"New detection: {name} (confidence: {conf:.2f})")
                self.speak_async(f"{name} detected")
                self.recent_detections.add(name)
            
            # Handle high-priority objects
            if name in self.high_priority_objects:
                self.handle_high_priority_object(frame, box, name, conf, current_time)
    
    def handle_high_priority_object(self, frame, box, name: str, conf: float, current_time: float):
        """Handle high-priority object detection with Gemini integration"""
        # Check confidence threshold
        if conf < self.config.CONFIDENCE_THRESHOLD:
            return
        
        # Check global cooldown
        if current_time - self.global_gemini_last_called_time < self.config.GEMINI_COOLDOWN:
            return
        
        # Check per-object cooldown
        last_call = self.gemini_last_called.get(name, 0)
        if current_time - last_call <= self.config.OBJECT_COOLDOWN:
            return
        
        # Extract object region
        x1, y1, x2, y2 = map(int, box.xyxy[0])
        x1, y1 = max(0, x1), max(0, y1)
        x2, y2 = min(frame.shape[1], x2), min(frame.shape[0], y2)
        
        if (x2 - x1) <= 0 or (y2 - y1) <= 0:
            return
        
        # Process with Gemini in separate thread
        object_crop = frame[y1:y2, x1:x2].copy()
        threading.Thread(
            target=self.process_with_gemini,
            args=(object_crop, name, current_time),
            daemon=True
        ).start()
    
    def process_with_gemini(self, object_crop, name: str, timestamp: float):
        """Describe one cropped object via Gemini and speak the result.

        Runs on its own thread. Whenever Gemini yields no usable text the
        canned sentence from ``local_descriptions`` is spoken instead. The
        cooldown timestamps are recorded whatever the outcome, so a failing
        service is not asked again for every following frame.

        Args:
            object_crop: Image region containing the object.
            name: Detected class name; used in the prompt and for the fallback.
            timestamp: Time of the detection; stored as the last-call time.
        """
        # Fixed sentences ask_gemini_about_image returns on failure. The first
        # does not contain "unavailable", so the substring test alone let it
        # be spoken as if it were a description.
        failure_replies = {
            "Unable to get detailed description at this time.",
            "Description service temporarily unavailable.",
        }

        img_bytes = None
        try:
            img_bytes = self.convert_cv2_to_bytes(object_crop)
            prompt = (
                f"Describe this {name} to a visually impaired person. "
                f"Focus on safety considerations and important details. "
                f"Keep it concise - 1-2 sentences maximum."
            )

            logger.info(f"Requesting Gemini description for: {name}")
            gemini_response = self.ask_gemini_about_image(img_bytes, prompt)

            usable = (
                bool(gemini_response)
                and gemini_response not in failure_replies
                and "unavailable" not in gemini_response.lower()
            )
            if usable:
                logger.info(f"Gemini response: {gemini_response}")
                self.speak_async(gemini_response)
            else:
                # Fallback to local description
                fallback = self.local_descriptions.get(name, f"{name} detected")
                self.speak_async(fallback)

        except Exception as e:
            logger.error(f"Gemini processing error for {name}: {e}")
            # Use local fallback
            fallback = self.local_descriptions.get(name, f"{name} detected")
            self.speak_async(fallback)
        finally:
            # max() keeps a late-finishing older request from moving a newer
            # timestamp backwards.
            self.gemini_last_called[name] = max(self.gemini_last_called.get(name, 0), timestamp)
            self.global_gemini_last_called_time = max(self.global_gemini_last_called_time, timestamp)
            if img_bytes is not None:
                img_bytes.close()
    
    def should_process_frame(self) -> bool:
        """Determine if current frame should be processed"""
        self.frame_count += 1
        return self.frame_count % self.config.PROCESS_EVERY_N_FRAMES == 0
    
    def reset_recent_detections(self):
        """Periodically reset recent detections"""
        current_time = time.time()
        if current_time - self.last_reset_time > self.config.DETECTION_RESET_INTERVAL:
            self.recent_detections.clear()
            self.last_reset_time = current_time
            logger.debug("Recent detections cleared")
    
    def run(self):
        """Main application loop.

        Connects to the stream, then loops until 'q' is pressed, the user
        interrupts, or a reconnection fails. ``cleanup`` always runs on the
        way out.
        """
        # Bound before the try: if the first connect_to_stream() raises, the
        # finally clause would otherwise hit an UnboundLocalError on ``cap``
        # and replace the real error.
        cap = None
        try:
            cap = self.connect_to_stream()
            logger.info("Starting vision assistance - Press 'q' to quit")

            while True:
                ret, frame = cap.read()
                if not ret:
                    logger.warning("Failed to grab frame, attempting reconnection")
                    cap.release()
                    time.sleep(2)
                    try:
                        cap = self.connect_to_stream()
                        continue
                    except ConnectionError:
                        logger.error("Failed to reconnect, exiting")
                        break

                # Process frame selectively
                if self.should_process_frame():
                    try:
                        results = self.yolo_model(frame, verbose=False)
                        self.process_detections(frame, results)

                        # Display annotated frame
                        annotated_frame = results[0].plot()
                        cv2.imshow("Smart Vision Assistant", annotated_frame)
                    except Exception as e:
                        # A failure on one frame is logged and the loop goes on.
                        logger.error(f"Frame processing error: {e}")

                # Reset detections periodically
                self.reset_recent_detections()

                # Check for quit command
                if cv2.waitKey(1) & 0xFF == ord('q'):
                    logger.info("Quit command received")
                    break

        except KeyboardInterrupt:
            logger.info("Application interrupted by user")
        except Exception as e:
            logger.error(f"Unexpected error: {e}")
        finally:
            self.cleanup(cap)
    
    def cleanup(self, cap):
        """Release the capture, close windows and stop TTS; never raises.

        Each step is isolated so that one failing step (or a missing capture)
        cannot skip the others.

        Args:
            cap: The ``cv2.VideoCapture`` in use, or ``None`` if the stream
                was never opened.
        """
        def _release_capture():
            if cap is not None:
                cap.release()

        def _stop_tts():
            # self.tts is absent when construction failed before setup_models.
            engine = getattr(self, 'tts', None)
            if engine is not None:
                engine.stop()

        steps = (
            ("video capture", _release_capture),
            ("OpenCV windows", cv2.destroyAllWindows),
            ("TTS", _stop_tts),
        )

        failed_steps = 0
        for label, step in steps:
            try:
                step()
            except Exception as e:
                failed_steps += 1
                logger.error(f"Cleanup error ({label}): {e}")

        if failed_steps == 0:
            logger.info("Cleanup completed")

def main():
    """Build the default configuration, create the assistant and run it.

    Any exception raised while constructing or running the assistant is
    logged as a start-up failure instead of propagating.
    """
    try:
        SmartVisionAssistant(Config()).run()
    except Exception as e:
        logger.error(f"Application failed to start: {e}")

# Entry-point guard: start the assistant only when this code is executed
# directly (script or notebook cell), not when it is imported as a module.
if __name__ == "__main__":
    main()

Run 'pip install torchvision==0.21' to fix torchvision or 'pip install -U torch torchvision' to update both.
For a full compatibility table see https://github.com/pytorch/vision#installation


2025-06-07 00:20:56,356 - INFO - Gemini API configured successfully
2025-06-07 00:20:56,539 - INFO - YOLO model yolov8l.pt loaded successfully
2025-06-07 00:20:56,604 - INFO - Imported existing <module 'comtypes.gen' from 'c:\\Users\\asus\\Desktop\\Projects\\recom_env\\Lib\\site-packages\\comtypes\\gen\\__init__.py'>
2025-06-07 00:20:56,605 - INFO - Using writeable comtypes cache directory: 'c:\Users\asus\Desktop\Projects\recom_env\Lib\site-packages\comtypes\gen'
2025-06-07 00:20:56,709 - INFO - TTS engine initialized
2025-06-07 00:20:56,773 - INFO - Connected to stream: http://192.168.1.20:81/stream
2025-06-07 00:20:56,774 - INFO - Starting vision assistance - Press 'q' to quit
2025-06-07 00:21:00,253 - INFO - New detection: person (confidence: 0.45)
2025-06-07 00:21:00,269 - INFO - New detection: cat (confidence: 0.44)
2025-06-07 00:21:00,269 - ERROR - TTS error: run loop already started
2025-06-07 00:21:01,955 - INFO - New detection: person (confidence: 0.67)
2025-06-07 00:21:01,955

In [3]:
import cv2
import time
import pyttsx3
import io
import threading
import logging
from dataclasses import dataclass
from typing import Dict, Set, Optional, List
from collections import defaultdict, deque
from PIL import Image
from ultralytics import YOLO
import google.generativeai as genai
import os
from requests.exceptions import RequestException

# Configure logging: INFO and above, formatted as "<timestamp> - <level> - <message>".
# Note: logging.basicConfig() does nothing when the root logger already has
# handlers, e.g. when this cell is re-run in the same kernel.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
# Module-level logger shared by the classes and functions defined in this cell.
logger = logging.getLogger(__name__)

@dataclass
class Config:
    """Tunable parameters for the vision assistant.

    All time-based values are compared against ``time.time()`` differences,
    i.e. they are expressed in seconds.
    """
    YOLO_MODEL: str = 'yolov8l.pt'  # Weights file passed to YOLO()
    SPEECH_RATE: int = 170  # Value set as the pyttsx3 'rate' property
    GEMINI_COOLDOWN: int = 10  # Minimum gap between any two Gemini requests
    OBJECT_COOLDOWN: int = 30  # Minimum gap between Gemini requests for the same object name
    STREAM_URL: str = "http://192.168.1.20:81/stream"  # Source opened with cv2.VideoCapture
    DETECTION_RESET_INTERVAL: int = 20  # Increased from 5 to 20 seconds; announced objects are cleared at this interval
    PROCESS_EVERY_N_FRAMES: int = 2  # Only every N-th captured frame is run through YOLO
    MAX_GEMINI_RETRIES: int = 3  # Number of attempts per Gemini request
    GEMINI_TIMEOUT: int = 10  # NOTE(review): not referenced by the code in this cell
    
    # New parameters for enhanced detection
    FRAMES_FOR_CONFIRMATION: int = 2  # Object must appear in this many consecutive processed frames
    DETECTION_HISTORY_SIZE: int = 10  # Keep last 10 frames of detections
    MIN_DETECTION_DURATION: float = 2.0  # Minimum duration in seconds
    FALSE_POSITIVE_COOLDOWN: int = 120  # 2 minutes cooldown for false positives

class DetectionTracker:
    """Per-object bookkeeping that decides when a detection gets announced.

    An object name is announced only after it has appeared in enough
    consecutive processed frames, has stayed confirmed for a minimum time,
    is not currently suppressed as a false positive, and has not already
    been announced since the last reset.
    """

    def __init__(self, config: Config):
        """Create empty tracking state; ``config`` supplies all thresholds."""
        self.config = config
        # Rolling window of (detections, timestamp) pairs, one per processed frame.
        self.detection_history: deque = deque(maxlen=config.DETECTION_HISTORY_SIZE)
        # object name -> timestamp at which it first passed the consistency check
        self.confirmed_objects: Dict[str, float] = {}
        # Names already announced since the last reset_announced() call.
        self.announced_objects: Set[str] = set()
        # Names manually flagged as false positives, and when they were flagged.
        self.false_positive_list: Set[str] = set()
        self.false_positive_timestamps: Dict[str, float] = {}
        # object name -> number of consecutive processed frames containing it
        self.frame_detections: Dict[str, int] = defaultdict(int)

    def add_frame_detections(self, detections: Set[str], timestamp: float):
        """Record one processed frame and update the consecutive-frame streaks."""
        self.detection_history.append((detections, timestamp))

        streaks = self.frame_detections
        for name in detections:
            streaks[name] = streaks[name] + 1

        # Anything tracked but missing from this frame loses its streak.
        for name in list(streaks):
            if name not in detections:
                streaks[name] = 0

    def is_detection_consistent(self, obj_name: str) -> bool:
        """Return True when ``obj_name`` has a long enough streak and is not suppressed.

        A flagged false positive is rejected until FALSE_POSITIVE_COOLDOWN
        seconds have passed; after that the flag is dropped.
        """
        flagged = (
            obj_name in self.false_positive_list
            and obj_name in self.false_positive_timestamps
        )
        if flagged:
            elapsed = time.time() - self.false_positive_timestamps[obj_name]
            if elapsed < self.config.FALSE_POSITIVE_COOLDOWN:
                return False
            # Cooldown is over: forget the false-positive flag.
            self.false_positive_list.discard(obj_name)
            self.false_positive_timestamps.pop(obj_name, None)

        return self.frame_detections[obj_name] >= self.config.FRAMES_FOR_CONFIRMATION

    def should_announce(self, obj_name: str, timestamp: float) -> bool:
        """Decide whether ``obj_name`` should be announced at ``timestamp``.

        The first consistent sighting only records the confirmation time;
        the announcement is allowed once MIN_DETECTION_DURATION seconds have
        passed since then.
        """
        # Already announced objects skip the consistency check entirely.
        if obj_name in self.announced_objects or not self.is_detection_consistent(obj_name):
            return False

        if obj_name not in self.confirmed_objects:
            # First time seeing this object consistently: start the clock.
            self.confirmed_objects[obj_name] = timestamp
            return False

        confirmed_for = timestamp - self.confirmed_objects[obj_name]
        return bool(confirmed_for >= self.config.MIN_DETECTION_DURATION)

    def mark_as_announced(self, obj_name: str):
        """Remember that ``obj_name`` has been announced."""
        self.announced_objects.add(obj_name)

    def mark_as_false_positive(self, obj_name: str):
        """Flag ``obj_name`` as a false positive (manual override) as of now."""
        self.false_positive_list.add(obj_name)
        self.false_positive_timestamps[obj_name] = time.time()
        logger.info(f"Marked {obj_name} as false positive")

    def reset_announced(self):
        """Forget announced objects and drop stale confirmations (called periodically)."""
        self.announced_objects.clear()

        now = time.time()
        max_age = self.config.DETECTION_RESET_INTERVAL
        for name in list(self.confirmed_objects):
            if now - self.confirmed_objects[name] > max_age:
                self.confirmed_objects.pop(name, None)

class SmartVisionAssistant:
    def __init__(self, config: Config):
        self.config = config
        self.setup_gemini()
        self.setup_models()
        self.setup_tracking()
        
    def setup_gemini(self):
        """Initialize Gemini API with proper error handling"""
        api_key = os.getenv('GOOGLE_API_KEY')
        if not api_key:
            logger.error("GOOGLE_API_KEY environment variable not set")
            raise ValueError("API key not configured")
        
        try:
            genai.configure(api_key=api_key)
            self.gemini_model = genai.GenerativeModel('gemini-1.5-flash-latest')
            self.generation_config = genai.types.GenerationConfig(
                temperature=0.4, 
                max_output_tokens=200
            )
            logger.info("Gemini API configured successfully")
        except Exception as e:
            logger.error(f"Failed to configure Gemini API: {e}")
            raise
    
    def setup_models(self):
        """Initialize YOLO model and TTS engine"""
        try:
            self.yolo_model = YOLO(self.config.YOLO_MODEL)
            logger.info(f"YOLO model {self.config.YOLO_MODEL} loaded successfully")
        except Exception as e:
            logger.error(f"Failed to load YOLO model: {e}")
            raise
        
        try:
            self.tts = pyttsx3.init()
            self.tts.setProperty('rate', self.config.SPEECH_RATE)
            logger.info("TTS engine initialized")
        except Exception as e:
            logger.error(f"Failed to initialize TTS: {e}")
            raise
    
    def setup_tracking(self):
        """Initialize tracking variables"""
        self.detection_tracker = DetectionTracker(self.config)
        self.gemini_last_called: Dict[str, float] = {}
        self.global_gemini_last_called_time = 0
        self.last_reset_time = time.time()
        self.frame_count = 0
        
        # Enhanced object categories with confidence thresholds
        self.object_confidence_thresholds = {
            # High-priority safety objects - lower threshold
            "knife": 0.60,
            "scissors": 0.60,
            "bear": 0.55,
            "fire hydrant": 0.70,
            
            # Vehicles - medium threshold
            "truck": 0.70,
            "bus": 0.70,
            "car": 0.75,
            "bicycle": 0.70,
            "motorcycle": 0.70,
            "train": 0.65,
            
            # Animals - higher threshold (commonly misdetected)
            "dog": 0.80,
            "cat": 0.85,  # Highest threshold for cats
            "bird": 0.85,
            
            # People and common objects
            "person": 0.75,
            "stop sign": 0.80,
            "traffic light": 0.75,
        }
        
        self.high_priority_objects = {
            "knife", "scissors", "fire hydrant", "bear", "truck",
            "bus", "dog", "person", "cat", "car", "bicycle", 
            "motorcycle", "train", "stop sign", "traffic light"
        }
        
        self.local_descriptions = {
            "knife": "Warning: Sharp knife detected. Exercise caution.",
            "scissors": "Scissors present. Handle with care.",
            "person": "Person detected in the area.",
            "dog": "Dog spotted nearby.",
            "cat": "Cat detected in the vicinity.",
            "car": "Vehicle present - stay alert.",
            "truck": "Large truck detected.",
            "bus": "Bus in the area.",
            "bicycle": "Bicycle detected.",
            "motorcycle": "Motorcycle present.",
            "train": "Train detected - maintain safe distance.",
            "fire hydrant": "Fire hydrant located nearby.",
            "bear": "DANGER: Bear detected. Move to safety immediately.",
            "stop sign": "Stop sign ahead.",
            "traffic light": "Traffic light detected."
        }
    
    def get_confidence_threshold(self, obj_name: str) -> float:
        """Get confidence threshold for specific object"""
        return self.object_confidence_thresholds.get(obj_name, 0.70)  # Default threshold
    
    def speak_async(self, text: str):
        """Non-blocking text-to-speech"""
        def _speak():
            try:
                self.tts.say(text)
                self.tts.runAndWait()
            except Exception as e:
                logger.error(f"TTS error: {e}")
        
        threading.Thread(target=_speak, daemon=True).start()
    
    def convert_cv2_to_bytes(self, img) -> io.BytesIO:
        """Convert OpenCV image to BytesIO with error handling"""
        try:
            _, buffer = cv2.imencode('.jpg', img, [cv2.IMWRITE_JPEG_QUALITY, 85])
            return io.BytesIO(buffer.tobytes())
        except Exception as e:
            logger.error(f"Image conversion error: {e}")
            raise
    
    def ask_gemini_about_image(self, image_bytes_io: io.BytesIO, prompt: str) -> str:
        """Robust Gemini API call with retries"""
        for attempt in range(self.config.MAX_GEMINI_RETRIES):
            try:
                image_bytes_io.seek(0)  # Reset buffer position
                pil_image = Image.open(image_bytes_io)
                
                contents = [prompt, pil_image]
                response = self.gemini_model.generate_content(
                    contents, 
                    generation_config=self.generation_config
                )
                
                # Clean up PIL image
                pil_image.close()
                return response.text
                
            except Exception as e:
                logger.warning(f"Gemini API attempt {attempt + 1} failed: {e}")
                if attempt == self.config.MAX_GEMINI_RETRIES - 1:
                    return "Unable to get detailed description at this time."
                time.sleep(2 ** attempt)  # Exponential backoff
        
        return "Description service temporarily unavailable."
    
    def connect_to_stream(self) -> cv2.VideoCapture:
        """Establish connection to video stream with retries"""
        max_attempts = 3
        for attempt in range(max_attempts):
            try:
                cap = cv2.VideoCapture(self.config.STREAM_URL)
                if cap.isOpened():
                    logger.info(f"Connected to stream: {self.config.STREAM_URL}")
                    return cap
                else:
                    logger.warning(f"Stream connection attempt {attempt + 1} failed")
                    time.sleep(2)
            except Exception as e:
                logger.error(f"Stream connection error: {e}")
                time.sleep(2)
        
        raise ConnectionError(f"Failed to connect to stream after {max_attempts} attempts")
    
    def process_detections(self, frame, results):
        """Process YOLO detection results with enhanced filtering"""
        current_time = time.time()
        current_frame_detections = set()
        valid_detections = []
        
        # First pass: filter by confidence thresholds
        for box in results[0].boxes:
            cls_id = int(box.cls[0])
            conf = float(box.conf[0])
            name = self.yolo_model.names[cls_id]
            
            # Apply object-specific confidence threshold
            threshold = self.get_confidence_threshold(name)
            if conf >= threshold:
                current_frame_detections.add(name)
                valid_detections.append((box, name, conf))
                logger.debug(f"Valid detection: {name} (confidence: {conf:.2f}, threshold: {threshold:.2f})")
        
        # Update detection tracker
        self.detection_tracker.add_frame_detections(current_frame_detections, current_time)
        
        # Second pass: process confirmed detections
        for box, name, conf in valid_detections:
            # Check if object should be announced
            if self.detection_tracker.should_announce(name, current_time):
                logger.info(f"Confirmed detection: {name} (confidence: {conf:.2f})")
                self.speak_async(f"{name} detected")
                self.detection_tracker.mark_as_announced(name)
                
                # Handle high-priority objects with Gemini
                if name in self.high_priority_objects:
                    self.handle_high_priority_object(frame, box, name, conf, current_time)
    
    def handle_high_priority_object(self, frame, box, name: str, conf: float, current_time: float):
        """Handle high-priority object detection with Gemini integration"""
        # Check global cooldown
        if current_time - self.global_gemini_last_called_time < self.config.GEMINI_COOLDOWN:
            return
        
        # Check per-object cooldown
        last_call = self.gemini_last_called.get(name, 0)
        if current_time - last_call <= self.config.OBJECT_COOLDOWN:
            return
        
        # Extract object region
        x1, y1, x2, y2 = map(int, box.xyxy[0])
        x1, y1 = max(0, x1), max(0, y1)
        x2, y2 = min(frame.shape[1], x2), min(frame.shape[0], y2)
        
        if (x2 - x1) <= 0 or (y2 - y1) <= 0:
            return
        
        # Process with Gemini in separate thread
        object_crop = frame[y1:y2, x1:x2].copy()
        threading.Thread(
            target=self.process_with_gemini,
            args=(object_crop, name, current_time),
            daemon=True
        ).start()
    
    def process_with_gemini(self, object_crop, name: str, timestamp: float):
        """Process object with Gemini API (runs in separate thread)"""
        try:
            img_bytes = self.convert_cv2_to_bytes(object_crop)
            prompt = (
                f"Describe this {name} to a visually impaired person. "
                f"Focus on safety considerations and important details. "
                f"Keep it concise - 1-2 sentences maximum."
            )
            
            logger.info(f"Requesting Gemini description for: {name}")
            gemini_response = self.ask_gemini_about_image(img_bytes, prompt)
            
            if gemini_response and "unavailable" not in gemini_response.lower():
                logger.info(f"Gemini response: {gemini_response}")
                self.speak_async(gemini_response)
                self.gemini_last_called[name] = timestamp
                self.global_gemini_last_called_time = timestamp
            else:
                # Fallback to local description
                fallback = self.local_descriptions.get(name, f"{name} detected")
                self.speak_async(fallback)
                
        except Exception as e:
            logger.error(f"Gemini processing error for {name}: {e}")
            # Use local fallback
            fallback = self.local_descriptions.get(name, f"{name} detected")
            self.speak_async(fallback)
    
    def should_process_frame(self) -> bool:
        """Determine if current frame should be processed"""
        self.frame_count += 1
        return self.frame_count % self.config.PROCESS_EVERY_N_FRAMES == 0
    
    def reset_recent_detections(self):
        """Periodically reset recent detections"""
        current_time = time.time()
        if current_time - self.last_reset_time > self.config.DETECTION_RESET_INTERVAL:
            self.detection_tracker.reset_announced()
            self.last_reset_time = current_time
            logger.debug("Recent detections cleared")
    
    def handle_keyboard_input(self, key):
        """Handle keyboard commands"""
        if key == ord('f'):  # 'f' for false positive
            # Mark the most recently detected object as false positive
            # This is a simple implementation - you could enhance it to show a menu
            print("\nMark object as false positive? Enter object name (or 'cancel'):")
            # Note: In a real implementation, you'd want to handle this more elegantly
            # This is just a demonstration of the concept
            pass
    
    def run(self):
        """Main application loop"""
        try:
            cap = self.connect_to_stream()
            logger.info("Starting enhanced vision assistance")
            logger.info("Controls: 'q' to quit, 'f' to mark false positive")
            
            while True:
                ret, frame = cap.read()
                if not ret:
                    logger.warning("Failed to grab frame, attempting reconnection")
                    cap.release()
                    time.sleep(2)
                    try:
                        cap = self.connect_to_stream()
                        continue
                    except ConnectionError:
                        logger.error("Failed to reconnect, exiting")
                        break
                
                # Process frame selectively
                if self.should_process_frame():
                    try:
                        results = self.yolo_model(frame, verbose=False)
                        self.process_detections(frame, results)
                        
                        # Display annotated frame
                        annotated_frame = results[0].plot()
                        cv2.imshow("Enhanced Smart Vision Assistant", annotated_frame)
                    except Exception as e:
                        logger.error(f"Frame processing error: {e}")
                
                # Reset detections periodically
                self.reset_recent_detections()
                
                # Check for quit command
                key = cv2.waitKey(1) & 0xFF
                if key == ord('q'):
                    logger.info("Quit command received")
                    break
                elif key == ord('f'):
                    self.handle_keyboard_input(key)
                    
        except KeyboardInterrupt:
            logger.info("Application interrupted by user")
        except Exception as e:
            logger.error(f"Unexpected error: {e}")
        finally:
            self.cleanup(cap)
    
    def cleanup(self, cap):
        """Clean up resources"""
        try:
            cap.release()
            cv2.destroyAllWindows()
            self.tts.stop()
            logger.info("Cleanup completed")
        except Exception as e:
            logger.error(f"Cleanup error: {e}")

def main():
    """Build the default configuration, create the assistant and run it.

    Any exception raised while constructing or running the assistant is
    logged as a start-up failure instead of propagating.
    """
    try:
        SmartVisionAssistant(Config()).run()
    except Exception as e:
        logger.error(f"Application failed to start: {e}")

# Entry-point guard: start the assistant only when this code is executed
# directly (script or notebook cell), not when it is imported as a module.
if __name__ == "__main__":
    main()

2025-06-07 00:28:20,691 - INFO - Gemini API configured successfully
2025-06-07 00:28:20,942 - INFO - YOLO model yolov8l.pt loaded successfully
2025-06-07 00:28:20,976 - INFO - TTS engine initialized
2025-06-07 00:28:21,011 - INFO - Connected to stream: http://192.168.1.20:81/stream
2025-06-07 00:28:21,012 - INFO - Starting enhanced vision assistance
2025-06-07 00:28:21,012 - INFO - Controls: 'q' to quit, 'f' to mark false positive
2025-06-07 00:28:26,455 - INFO - Confirmed detection: person (confidence: 0.92)
2025-06-07 00:28:26,455 - INFO - Requesting Gemini description for: person
2025-06-07 00:28:29,838 - INFO - Gemini response: He appears to be a man with dark, somewhat unkempt hair and a beard, wearing a gray tank top; the image quality is poor, making it difficult to assess any immediate safety concerns.

2025-06-07 00:28:44,321 - INFO - Confirmed detection: person (confidence: 0.75)
2025-06-07 00:29:09,321 - INFO - Confirmed detection: person (confidence: 0.85)
2025-06-07 00:29:

In [4]:
import cv2
import time
import pyttsx3
import io
import threading
import logging
from dataclasses import dataclass
from typing import Dict, Set, Optional, List
from collections import defaultdict, deque
from PIL import Image
from ultralytics import YOLO
import google.generativeai as genai
import os
from requests.exceptions import RequestException

# Configure logging: INFO and above, formatted as "<timestamp> - <level> - <message>".
# Note: logging.basicConfig() does nothing when the root logger already has
# handlers, e.g. when an earlier cell in the same kernel configured logging.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
# Module-level logger used by the classes defined below in this cell.
logger = logging.getLogger(__name__)

@dataclass
class Config:
    """Tunable parameters for the vision assistant.

    Values compared against ``time.time()`` differences by DetectionTracker
    (reset interval, minimum duration, false-positive cooldown) are in seconds.
    """
    YOLO_MODEL: str = 'yolov8l.pt'  # Weights file passed to YOLO()
    SPEECH_RATE: int = 170  # Value set as the pyttsx3 'rate' property
    GEMINI_COOLDOWN: int = 10  # presumably seconds between Gemini requests; consumer not visible here
    OBJECT_COOLDOWN: int = 30  # presumably a per-object Gemini cooldown; consumer not visible here
    STREAM_URL: str = "http://192.168.1.20:81/stream"  # Address of the video stream
    DETECTION_RESET_INTERVAL: int = 20  # Increased from 5 to 20 seconds; also the maximum age of a confirmation
    PROCESS_EVERY_N_FRAMES: int = 2  # presumably frame-skipping factor; consumer not visible here
    MAX_GEMINI_RETRIES: int = 3  # presumably attempts per Gemini request; consumer not visible here
    GEMINI_TIMEOUT: int = 10  # NOTE(review): no use of this field is visible in this part of the cell
    
    # New parameters for enhanced detection
    FRAMES_FOR_CONFIRMATION: int = 2  # Changed from 3 to 2 as per user request; consecutive frames required
    DETECTION_HISTORY_SIZE: int = 10  # Keep last 10 frames of detections
    MIN_DETECTION_DURATION: float = 2.0  # Minimum duration in seconds
    FALSE_POSITIVE_COOLDOWN: int = 120  # 2 minutes cooldown for false positives

class DetectionTracker:
    """Per-object bookkeeping that decides when a detection gets announced.

    An object name is announced only after it has appeared in enough
    consecutive processed frames, has stayed confirmed for a minimum time,
    is not currently suppressed as a false positive, and has not already
    been announced since the last reset. Gemini descriptions are tracked
    in a separate "already announced" set.
    """

    def __init__(self, config: Config):
        """Create empty tracking state; ``config`` supplies all thresholds."""
        self.config = config
        # Rolling window of (detections, timestamp) pairs, one per processed frame.
        self.detection_history: deque = deque(maxlen=config.DETECTION_HISTORY_SIZE)
        # object name -> timestamp at which it first passed the consistency check
        self.confirmed_objects: Dict[str, float] = {}
        # Names already announced since the last reset_announced() call.
        self.announced_objects: Set[str] = set()
        # Names whose Gemini description was already announced since the last reset.
        self.gemini_announced_objects: Set[str] = set()
        # Names manually flagged as false positives, and when they were flagged.
        self.false_positive_list: Set[str] = set()
        self.false_positive_timestamps: Dict[str, float] = {}
        # object name -> number of consecutive processed frames containing it
        self.frame_detections: Dict[str, int] = defaultdict(int)

    def add_frame_detections(self, detections: Set[str], timestamp: float):
        """Record one processed frame and update the consecutive-frame streaks."""
        self.detection_history.append((detections, timestamp))

        streaks = self.frame_detections
        for name in detections:
            streaks[name] = streaks[name] + 1

        # Anything tracked but missing from this frame loses its streak.
        for name in list(streaks):
            if name not in detections:
                streaks[name] = 0

    def is_detection_consistent(self, obj_name: str) -> bool:
        """Return True when ``obj_name`` has a long enough streak and is not suppressed.

        A flagged false positive is rejected until FALSE_POSITIVE_COOLDOWN
        seconds have passed; after that the flag is dropped.
        """
        flagged = (
            obj_name in self.false_positive_list
            and obj_name in self.false_positive_timestamps
        )
        if flagged:
            elapsed = time.time() - self.false_positive_timestamps[obj_name]
            if elapsed < self.config.FALSE_POSITIVE_COOLDOWN:
                return False
            # Cooldown is over: forget the false-positive flag.
            self.false_positive_list.discard(obj_name)
            self.false_positive_timestamps.pop(obj_name, None)

        return self.frame_detections[obj_name] >= self.config.FRAMES_FOR_CONFIRMATION

    def should_announce_gemini(self, obj_name: str) -> bool:
        """Return True when no Gemini description of ``obj_name`` was announced since the last reset."""
        already_described = obj_name in self.gemini_announced_objects
        return not already_described

    def mark_gemini_as_announced(self, obj_name: str):
        """Remember that a Gemini description of ``obj_name`` has been announced."""
        self.gemini_announced_objects.add(obj_name)

    def should_announce(self, obj_name: str, timestamp: float) -> bool:
        """Decide whether ``obj_name`` should be announced at ``timestamp``.

        The first consistent sighting only records the confirmation time;
        the announcement is allowed once MIN_DETECTION_DURATION seconds have
        passed since then.
        """
        # Already announced objects skip the consistency check entirely.
        if obj_name in self.announced_objects or not self.is_detection_consistent(obj_name):
            return False

        if obj_name not in self.confirmed_objects:
            # First time seeing this object consistently: start the clock.
            self.confirmed_objects[obj_name] = timestamp
            return False

        confirmed_for = timestamp - self.confirmed_objects[obj_name]
        return bool(confirmed_for >= self.config.MIN_DETECTION_DURATION)

    def mark_as_announced(self, obj_name: str):
        """Remember that ``obj_name`` has been announced."""
        self.announced_objects.add(obj_name)

    def mark_as_false_positive(self, obj_name: str):
        """Flag ``obj_name`` as a false positive (manual override) as of now."""
        self.false_positive_list.add(obj_name)
        self.false_positive_timestamps[obj_name] = time.time()
        logger.info(f"Marked {obj_name} as false positive")

    def reset_announced(self):
        """Forget both announcement sets and drop stale confirmations (called periodically)."""
        self.announced_objects.clear()
        self.gemini_announced_objects.clear()  # Reset Gemini announcements too

        now = time.time()
        max_age = self.config.DETECTION_RESET_INTERVAL
        for name in list(self.confirmed_objects):
            if now - self.confirmed_objects[name] > max_age:
                self.confirmed_objects.pop(name, None)

class SmartVisionAssistant:
    """Announce objects seen on a video stream and describe some of them via Gemini.

    Frames read from ``config.STREAM_URL`` are passed through a YOLO model.
    Detections that clear a per-class confidence threshold and the
    ``DetectionTracker`` checks are spoken aloud; objects listed in
    ``high_priority_objects`` additionally get their cropped image sent to
    Gemini on a background thread, limited by a global and a per-object
    cooldown.
    """

    # Exact strings ask_gemini_about_image() returns when it has no description.
    # process_with_gemini() compares against these instead of searching for a
    # keyword, so a genuine description is never mistaken for a failure.
    GEMINI_FAILURE_MESSAGES = (
        "Unable to get detailed description at this time.",
        "Description service temporarily unavailable.",
    )

    def __init__(self, config: "Config"):
        """Store ``config`` and initialise Gemini, the models, and tracking state."""
        self.config = config
        self.setup_gemini()
        self.setup_models()
        self.setup_tracking()

    def setup_gemini(self):
        """Initialize Gemini API with proper error handling.

        Raises:
            ValueError: if the ``GOOGLE_API_KEY`` environment variable is unset.
            Exception: any error raised while configuring the client is
                logged and re-raised.
        """
        api_key = os.getenv('GOOGLE_API_KEY')
        if not api_key:
            logger.error("GOOGLE_API_KEY environment variable not set")
            raise ValueError("API key not configured")

        try:
            genai.configure(api_key=api_key)
            self.gemini_model = genai.GenerativeModel('gemini-1.5-flash-latest')
            self.generation_config = genai.types.GenerationConfig(
                temperature=0.4,
                max_output_tokens=200
            )
            logger.info("Gemini API configured successfully")
        except Exception as e:
            logger.error(f"Failed to configure Gemini API: {e}")
            raise

    def setup_models(self):
        """Initialize YOLO model and TTS engine.

        Failures are logged and re-raised. Also creates the lock that
        ``speak_async`` uses to serialise access to the single TTS engine.
        """
        try:
            self.yolo_model = YOLO(self.config.YOLO_MODEL)
            logger.info(f"YOLO model {self.config.YOLO_MODEL} loaded successfully")
        except Exception as e:
            logger.error(f"Failed to load YOLO model: {e}")
            raise

        try:
            self.tts = pyttsx3.init()
            self.tts.setProperty('rate', self.config.SPEECH_RATE)
            # One engine is shared by every speaking thread; only one of them
            # may be inside say()/runAndWait() at a time.
            self._tts_lock = threading.Lock()
            logger.info("TTS engine initialized")
        except Exception as e:
            logger.error(f"Failed to initialize TTS: {e}")
            raise

    def setup_tracking(self):
        """Initialize tracking variables"""
        self.detection_tracker = DetectionTracker(self.config)
        self.gemini_last_called: Dict[str, float] = {}
        self.global_gemini_last_called_time = 0
        self.last_reset_time = time.time()
        self.frame_count = 0

        # Enhanced object categories with confidence thresholds
        self.object_confidence_thresholds = {
            # High-priority safety objects - lower threshold
            "knife": 0.60,
            "scissors": 0.60,
            "bear": 0.55,
            "fire hydrant": 0.70,

            # Vehicles - medium threshold
            "truck": 0.70,
            "bus": 0.70,
            "car": 0.75,
            "bicycle": 0.70,
            "motorcycle": 0.70,
            "train": 0.65,

            # Animals - higher threshold (commonly misdetected)
            "dog": 0.80,
            "cat": 0.85,  # Highest threshold for cats
            "bird": 0.85,

            # People and common objects
            "person": 0.75,
            "stop sign": 0.80,
            "traffic light": 0.75,
        }

        # Classes that additionally get a Gemini description.
        self.high_priority_objects = {
            "knife", "scissors", "fire hydrant", "bear", "truck",
            "bus", "dog", "person", "cat", "car", "bicycle",
            "motorcycle", "train", "stop sign", "traffic light"
        }

        # Spoken instead of a Gemini description when Gemini yields nothing.
        self.local_descriptions = {
            "knife": "Warning: Sharp knife detected. Exercise caution.",
            "scissors": "Scissors present. Handle with care.",
            "person": "Person detected in the area.",
            "dog": "Dog spotted nearby.",
            "cat": "Cat detected in the vicinity.",
            "car": "Vehicle present - stay alert.",
            "truck": "Large truck detected.",
            "bus": "Bus in the area.",
            "bicycle": "Bicycle detected.",
            "motorcycle": "Motorcycle present.",
            "train": "Train detected - maintain safe distance.",
            "fire hydrant": "Fire hydrant located nearby.",
            "bear": "DANGER: Bear detected. Move to safety immediately.",
            "stop sign": "Stop sign ahead.",
            "traffic light": "Traffic light detected."
        }

    def get_confidence_threshold(self, obj_name: str) -> float:
        """Get confidence threshold for specific object (0.70 when unlisted)."""
        return self.object_confidence_thresholds.get(obj_name, 0.70)  # Default threshold

    def speak_async(self, text: str):
        """Speak ``text`` on a background thread without blocking the caller.

        Each call starts its own daemon thread, but all of them share one
        engine, so the say()/runAndWait() pair runs under ``self._tts_lock``.
        Without it, overlapping calls enter ``runAndWait()`` concurrently.
        """
        def _speak():
            try:
                with self._tts_lock:
                    self.tts.say(text)
                    self.tts.runAndWait()
            except Exception as e:
                logger.error(f"TTS error: {e}")

        threading.Thread(target=_speak, daemon=True).start()

    def convert_cv2_to_bytes(self, img) -> io.BytesIO:
        """Encode an OpenCV image as JPEG (quality 85) into a ``BytesIO``.

        Raises:
            ValueError: if ``cv2.imencode`` reports that encoding failed.
            Exception: any other encoding error is logged and re-raised.
        """
        try:
            ok, buffer = cv2.imencode('.jpg', img, [cv2.IMWRITE_JPEG_QUALITY, 85])
            if not ok:
                # Fail here rather than hand an unusable buffer to the caller.
                raise ValueError("cv2.imencode could not encode the image as JPEG")
            return io.BytesIO(buffer.tobytes())
        except Exception as e:
            logger.error(f"Image conversion error: {e}")
            raise

    def ask_gemini_about_image(self, image_bytes_io: io.BytesIO, prompt: str) -> str:
        """Ask Gemini to describe an image, retrying with exponential backoff.

        Makes up to ``config.MAX_GEMINI_RETRIES`` attempts, sleeping
        ``2 ** attempt`` seconds between them.

        Returns:
            The response text, or one of ``GEMINI_FAILURE_MESSAGES`` when every
            attempt failed (or no attempt was allowed).
        """
        for attempt in range(self.config.MAX_GEMINI_RETRIES):
            try:
                image_bytes_io.seek(0)  # Reset buffer position
                # The context manager releases the image even when the request
                # raises; the caller's buffer stays usable for the next attempt.
                with Image.open(image_bytes_io) as pil_image:
                    response = self.gemini_model.generate_content(
                        [prompt, pil_image],
                        generation_config=self.generation_config
                    )
                    return response.text

            except Exception as e:
                logger.warning(f"Gemini API attempt {attempt + 1} failed: {e}")
                if attempt == self.config.MAX_GEMINI_RETRIES - 1:
                    return self.GEMINI_FAILURE_MESSAGES[0]
                time.sleep(2 ** attempt)  # Exponential backoff

        return self.GEMINI_FAILURE_MESSAGES[1]

    def connect_to_stream(self) -> "cv2.VideoCapture":
        """Open ``config.STREAM_URL``, trying up to three times.

        A capture that failed to open is released before the next attempt, and
        the 2-second pause is skipped after the final attempt.

        Raises:
            ConnectionError: if no attempt produced an opened capture.
        """
        max_attempts = 3
        for attempt in range(max_attempts):
            cap = None
            try:
                cap = cv2.VideoCapture(self.config.STREAM_URL)
                if cap.isOpened():
                    logger.info(f"Connected to stream: {self.config.STREAM_URL}")
                    return cap
                logger.warning(f"Stream connection attempt {attempt + 1} failed")
            except Exception as e:
                logger.error(f"Stream connection error: {e}")

            if cap is not None:
                cap.release()
            if attempt < max_attempts - 1:
                time.sleep(2)

        raise ConnectionError(f"Failed to connect to stream after {max_attempts} attempts")

    def process_detections(self, frame, results):
        """Process YOLO detection results with enhanced filtering.

        Boxes below their class threshold are dropped. The surviving class
        names are fed to the tracker, and every detection the tracker approves
        is announced; approved high-priority objects are passed on to
        ``handle_high_priority_object``.
        """
        current_time = time.time()
        current_frame_detections = set()
        valid_detections = []

        # First pass: filter by confidence thresholds
        for box in results[0].boxes:
            cls_id = int(box.cls[0])
            conf = float(box.conf[0])
            name = self.yolo_model.names[cls_id]

            # Apply object-specific confidence threshold
            threshold = self.get_confidence_threshold(name)
            if conf >= threshold:
                current_frame_detections.add(name)
                valid_detections.append((box, name, conf))
                logger.debug(f"Valid detection: {name} (confidence: {conf:.2f}, threshold: {threshold:.2f})")

        # Update detection tracker
        self.detection_tracker.add_frame_detections(current_frame_detections, current_time)

        # Second pass: process confirmed detections
        for box, name, conf in valid_detections:
            # Check if object should be announced
            if self.detection_tracker.should_announce(name, current_time):
                logger.info(f"Confirmed detection: {name} (confidence: {conf:.2f})")
                self.speak_async(f"{name} detected")
                self.detection_tracker.mark_as_announced(name)

                # Handle high-priority objects with Gemini
                if name in self.high_priority_objects:
                    self.handle_high_priority_object(frame, box, name, conf, current_time)

    def handle_high_priority_object(self, frame, box, name: str, conf: float, current_time: float):
        """Start a background Gemini description for ``name`` if allowed.

        Nothing happens when the tracker already issued a Gemini description
        for this name, when either cooldown is still running, or when the
        clipped bounding box is empty.
        """
        # Check if we should get Gemini description (separate from basic detection announcement)
        if not self.detection_tracker.should_announce_gemini(name):
            return

        # Check global cooldown
        if current_time - self.global_gemini_last_called_time < self.config.GEMINI_COOLDOWN:
            return

        # Check per-object cooldown
        last_call = self.gemini_last_called.get(name, 0)
        if current_time - last_call <= self.config.OBJECT_COOLDOWN:
            return

        # Extract object region, clipped to the frame
        x1, y1, x2, y2 = map(int, box.xyxy[0])
        x1, y1 = max(0, x1), max(0, y1)
        x2, y2 = min(frame.shape[1], x2), min(frame.shape[0], y2)

        if (x2 - x1) <= 0 or (y2 - y1) <= 0:
            return

        # Mark that we're processing this object with Gemini
        self.detection_tracker.mark_gemini_as_announced(name)
        # Start the global cooldown at dispatch time. Waiting for the response
        # would let other objects launch overlapping requests meanwhile.
        self.global_gemini_last_called_time = current_time

        # Process with Gemini in separate thread
        object_crop = frame[y1:y2, x1:x2].copy()
        threading.Thread(
            target=self.process_with_gemini,
            args=(object_crop, name, current_time),
            daemon=True
        ).start()

    def process_with_gemini(self, object_crop, name: str, timestamp: float):
        """Describe ``object_crop`` with Gemini and speak the result.

        Runs on a background thread. A real description is spoken and the
        cooldown timestamps are updated; an empty response, one of
        ``GEMINI_FAILURE_MESSAGES``, or any exception leads to the local
        fallback description being spoken instead.
        """
        try:
            img_bytes = self.convert_cv2_to_bytes(object_crop)
            prompt = (
                f"Describe this {name} to a visually impaired person. "
                f"Focus on safety considerations and important details. "
                f"Keep it concise - 1-2 sentences maximum."
            )

            logger.info(f"Requesting Gemini description for: {name}")
            gemini_response = self.ask_gemini_about_image(img_bytes, prompt)

            if gemini_response and gemini_response.strip() not in self.GEMINI_FAILURE_MESSAGES:
                logger.info(f"Gemini response for {name}: {gemini_response}")
                # Always speak the Gemini response - this is separate from the initial detection
                self.speak_async(gemini_response)
                self.gemini_last_called[name] = timestamp
                self.global_gemini_last_called_time = timestamp
            else:
                # Fallback to local description
                fallback = self.local_descriptions.get(name, f"Detailed description of {name} unavailable")
                logger.info(f"Using fallback description for {name}: {fallback}")
                self.speak_async(fallback)

        except Exception as e:
            logger.error(f"Gemini processing error for {name}: {e}")
            # Use local fallback
            fallback = self.local_descriptions.get(name, f"Unable to describe {name} in detail")
            self.speak_async(fallback)

    def should_process_frame(self) -> bool:
        """Count the frame and report whether it is one to run detection on.

        Every ``config.PROCESS_EVERY_N_FRAMES``-th frame is processed; values
        below 1 are treated as 1 so a zero setting cannot divide by zero.
        """
        self.frame_count += 1
        return self.frame_count % max(1, self.config.PROCESS_EVERY_N_FRAMES) == 0

    def reset_recent_detections(self):
        """Reset the tracker once ``config.DETECTION_RESET_INTERVAL`` seconds have passed."""
        current_time = time.time()
        if current_time - self.last_reset_time > self.config.DETECTION_RESET_INTERVAL:
            self.detection_tracker.reset_announced()
            self.last_reset_time = current_time
            logger.debug("Recent detections cleared")

    def handle_keyboard_input(self, key):
        """Handle keyboard commands.

        NOTE(review): the 'f' branch only prints a prompt; no input is read and
        nothing is marked as a false positive yet.
        """
        if key == ord('f'):  # 'f' for false positive
            # Mark the most recently detected object as false positive
            # This is a simple implementation - you could enhance it to show a menu
            print("\nMark object as false positive? Enter object name (or 'cancel'):")
            # Note: In a real implementation, you'd want to handle this more elegantly
            # This is just a demonstration of the concept
            pass

    def run(self):
        """Main application loop.

        Reads frames until 'q' is pressed, the stream cannot be re-established,
        or an exception/KeyboardInterrupt occurs, then always runs ``cleanup``.
        """
        # Stays None if the initial connection fails, so the finally clause
        # below never touches an unbound name.
        cap = None
        try:
            cap = self.connect_to_stream()
            logger.info("Starting enhanced vision assistance")
            logger.info("Controls: 'q' to quit, 'f' to mark false positive")

            while True:
                ret, frame = cap.read()
                if not ret:
                    logger.warning("Failed to grab frame, attempting reconnection")
                    cap.release()
                    time.sleep(2)
                    try:
                        cap = self.connect_to_stream()
                        continue
                    except ConnectionError:
                        logger.error("Failed to reconnect, exiting")
                        break

                # Process frame selectively
                if self.should_process_frame():
                    try:
                        results = self.yolo_model(frame, verbose=False)
                        self.process_detections(frame, results)

                        # Display annotated frame
                        annotated_frame = results[0].plot()
                        cv2.imshow("Enhanced Smart Vision Assistant", annotated_frame)
                    except Exception as e:
                        logger.error(f"Frame processing error: {e}")

                # Reset detections periodically
                self.reset_recent_detections()

                # Check for quit command
                key = cv2.waitKey(1) & 0xFF
                if key == ord('q'):
                    logger.info("Quit command received")
                    break
                elif key == ord('f'):
                    self.handle_keyboard_input(key)

        except KeyboardInterrupt:
            logger.info("Application interrupted by user")
        except Exception as e:
            logger.error(f"Unexpected error: {e}")
        finally:
            self.cleanup(cap)

    def cleanup(self, cap):
        """Release the stream, close windows, and stop the TTS engine.

        ``cap`` may be None. Each step runs even if an earlier one fails;
        "Cleanup completed" is logged only when all of them succeeded.
        """
        def _release_stream():
            if cap is not None:
                cap.release()

        steps = (
            _release_stream,
            lambda: cv2.destroyAllWindows(),
            lambda: self.tts.stop(),
        )
        clean = True
        for step in steps:
            try:
                step()
            except Exception as e:
                clean = False
                logger.error(f"Cleanup error: {e}")
        if clean:
            logger.info("Cleanup completed")

def main():
    """Build the assistant from a default ``Config`` and run it.

    Any exception raised while constructing or running the assistant is
    logged rather than propagated.
    """
    try:
        SmartVisionAssistant(Config()).run()
    except Exception as e:
        logger.error(f"Application failed to start: {e}")


# Start the assistant when this code is executed as the main module.
if __name__ == "__main__":
    main()

2025-06-07 00:33:34,613 - INFO - Gemini API configured successfully
2025-06-07 00:33:34,756 - INFO - YOLO model yolov8l.pt loaded successfully
2025-06-07 00:33:34,794 - INFO - TTS engine initialized
2025-06-07 00:33:34,856 - INFO - Connected to stream: http://192.168.1.20:81/stream
2025-06-07 00:33:34,856 - INFO - Starting enhanced vision assistance
2025-06-07 00:33:34,857 - INFO - Controls: 'q' to quit, 'f' to mark false positive
2025-06-07 00:34:20,411 - INFO - Confirmed detection: person (confidence: 0.75)
2025-06-07 00:34:20,414 - INFO - Requesting Gemini description for: person
2025-06-07 00:34:23,656 - INFO - Gemini response for person: He appears to be a man with dark, shoulder-length hair, wearing glasses and a gray tank top;  the image quality is poor, making it difficult to assess any immediate safety concerns.

2025-06-07 00:34:42,355 - INFO - Confirmed detection: person (confidence: 0.81)
2025-06-07 00:36:07,996 - INFO - Confirmed detection: laptop (confidence: 0.91)
2025-0

: 

2025-06-07 00:41:56,898 - INFO - Gemini API configured successfully


2025-06-07 00:41:57,038 - INFO - YOLO model yolov8l.pt loaded successfully
2025-06-07 00:41:57,038 - ERROR - Failed to initialize TTS: 'int' object has no attribute 'DETECTION_HISTORY_SIZE'
2025-06-07 00:41:57,038 - ERROR - Application failed to start: 'int' object has no attribute 'DETECTION_HISTORY_SIZE'


In [None]:
# ...existing code...

class ThreadSafeTTS:
    """Thread-safe TTS wrapper to prevent run loop conflicts.

    Outline only: the three methods below are placeholders with empty bodies.
    The complete implementation is the ThreadSafeTTS class defined in the
    following cell.
    """
    def __init__(self, speech_rate: int = 170):
        self.running = True
        self.tts_thread = None
        self.speech_rate = speech_rate
        self.speech_queue = queue.Queue()
        self.start_tts_worker()

    def start_tts_worker(self):
        """Placeholder for the worker start-up; does nothing in this outline."""

    def speak(self, text: str):
        """Placeholder for queueing ``text``; does nothing in this outline."""

    def stop(self):
        """Placeholder for shutting the worker down; does nothing in this outline."""

class DetectionTracker:
    """Enhanced detection tracking with temporal consistency.

    Outline only: just the start of ``__init__`` is shown here; the complete
    class is defined in the following cell.
    """
    def __init__(self, config: Config):
        self.config = config
        # Bounded history: at most DETECTION_HISTORY_SIZE entries are kept.
        history_limit = config.DETECTION_HISTORY_SIZE
        self.detection_history: deque = deque(maxlen=history_limit)
        # ...rest of DetectionTracker code...
# ...rest of your code...

In [6]:
import cv2
import time
import pyttsx3
import io
import threading
import logging
import queue
from dataclasses import dataclass
from typing import Dict, Set, Optional, List
from collections import defaultdict, deque
from PIL import Image
from ultralytics import YOLO
import google.generativeai as genai
import os
from requests.exceptions import RequestException

# Logging setup: INFO and above, each record prefixed with timestamp and level.
logging.basicConfig(
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
logger = logging.getLogger(__name__)

@dataclass
class Config:
    """Tunable settings for the vision assistant.

    Every field has a default and can be overridden per instance.
    """
    # --- Models, speech and stream ---
    YOLO_MODEL: str = "yolov8l.pt"     # weights file handed to YOLO()
    SPEECH_RATE: int = 170             # 'rate' property of the TTS engine
    GEMINI_COOLDOWN: int = 10          # seconds between any two Gemini requests
    OBJECT_COOLDOWN: int = 30          # seconds between Gemini requests for one class
    STREAM_URL: str = "http://192.168.1.20:81/stream"
    DETECTION_RESET_INTERVAL: int = 20  # seconds between announcement resets (was 5)
    PROCESS_EVERY_N_FRAMES: int = 2    # run detection on every N-th frame
    MAX_GEMINI_RETRIES: int = 3        # attempts per Gemini request
    GEMINI_TIMEOUT: int = 10           # presumably seconds; not read by the code in this cell - TODO confirm

    # --- Temporal-consistency filtering ---
    FRAMES_FOR_CONFIRMATION: int = 2   # consecutive frames required (lowered from 3)
    DETECTION_HISTORY_SIZE: int = 10   # number of per-frame detection sets retained
    MIN_DETECTION_DURATION: float = 2.0  # seconds an object must stay confirmed
    FALSE_POSITIVE_COOLDOWN: int = 120  # seconds a false-positive flag stays active

class ThreadSafeTTS:
    """Thread-safe TTS wrapper to prevent run loop conflicts.

    A single daemon worker thread owns the pyttsx3 engine and speaks the texts
    placed on ``speech_queue`` one after another, so callers on any thread can
    request speech through ``speak`` without touching the engine themselves.
    """
    def __init__(self, speech_rate: int = 170):
        self.speech_queue = queue.Queue()
        self.speech_rate = speech_rate
        self.tts_thread = None
        self.running = True
        self.start_tts_worker()

    def start_tts_worker(self):
        """Start the TTS worker thread.

        If the engine cannot be created, the worker logs the error, clears
        ``running`` and exits; ``speak`` then drops text instead of filling a
        queue that nothing will ever read.
        """
        def tts_worker():
            try:
                tts_engine = pyttsx3.init()
                tts_engine.setProperty('rate', self.speech_rate)
            except Exception as e:
                self.running = False
                logger.error(f"Failed to initialize TTS worker: {e}")
                return
            logger.info("TTS worker thread started")

            while self.running:
                try:
                    # The timeout lets the loop re-check ``running`` regularly.
                    text = self.speech_queue.get(timeout=1)
                except queue.Empty:
                    continue
                try:
                    if text is None:  # Shutdown signal
                        break
                    logger.info(f"Speaking: {text}")
                    tts_engine.say(text)
                    tts_engine.runAndWait()
                except Exception as e:
                    logger.error(f"TTS worker error: {e}")
                finally:
                    # Exactly one task_done() per item taken, sentinel included.
                    self.speech_queue.task_done()

        self.tts_thread = threading.Thread(target=tts_worker, daemon=True)
        self.tts_thread.start()

    def speak(self, text: str):
        """Add text to speech queue (ignored once the worker is not running)."""
        if self.running:
            self.speech_queue.put(text)

    def stop(self):
        """Stop the TTS worker, waiting up to two seconds for it to exit."""
        self.running = False
        self.speech_queue.put(None)  # Shutdown signal
        if self.tts_thread and self.tts_thread.is_alive():
            self.tts_thread.join(timeout=2)
class DetectionTracker:
    """Enhanced detection tracking with temporal consistency.

    An object is announced only after it has been seen in
    ``FRAMES_FOR_CONFIRMATION`` consecutive processed frames and has then
    stayed confirmed for ``MIN_DETECTION_DURATION`` seconds. A confirmation is
    discarded when the object was missing for longer than that duration, so
    the clock measures the current sighting rather than an old one.
    """
    def __init__(self, config: "Config"):
        self.config = config
        self.detection_history: deque = deque(maxlen=config.DETECTION_HISTORY_SIZE)
        self.confirmed_objects: Dict[str, float] = {}  # object -> first_detection_time
        self.announced_objects: Set[str] = set()
        self.gemini_announced_objects: Set[str] = set()  # Separate tracking for Gemini descriptions
        self.false_positive_list: Set[str] = set()  # Objects marked as false positives
        self.false_positive_timestamps: Dict[str, float] = {}
        self.frame_detections: Dict[str, int] = defaultdict(int)  # Count consecutive frames
        self._missing_since: Dict[str, float] = {}  # object -> time of first frame without it

    def add_frame_detections(self, detections: Set[str], timestamp: float):
        """Add current frame detections to history and update per-object state.

        Present objects get their consecutive-frame count incremented; absent
        ones are reset to zero. An object that returns after being absent for
        more than ``MIN_DETECTION_DURATION`` seconds loses its confirmation
        timestamp and has to be confirmed again.
        """
        self.detection_history.append((detections, timestamp))

        for obj in detections:
            self.frame_detections[obj] += 1
            missing_since = self._missing_since.pop(obj, None)
            if (missing_since is not None
                    and timestamp - missing_since > self.config.MIN_DETECTION_DURATION):
                # Long gap: the earlier confirmation belongs to a past sighting.
                self.confirmed_objects.pop(obj, None)

        # Reset counts for objects not in current frame (values only, so the
        # dict can be updated while iterating over it).
        for obj in self.frame_detections:
            if obj not in detections:
                self.frame_detections[obj] = 0
                self._missing_since.setdefault(obj, timestamp)

    def is_detection_consistent(self, obj_name: str) -> bool:
        """Check if object has been detected consistently.

        Returns False while a false-positive flag is within its cooldown; an
        expired flag is removed. Otherwise the result is whether the
        consecutive-frame count reached ``FRAMES_FOR_CONFIRMATION``.
        """
        if obj_name in self.false_positive_list:
            # Check if cooldown period has passed
            if obj_name in self.false_positive_timestamps:
                if time.time() - self.false_positive_timestamps[obj_name] < self.config.FALSE_POSITIVE_COOLDOWN:
                    return False
                else:
                    # Remove from false positive list after cooldown
                    self.false_positive_list.discard(obj_name)
                    self.false_positive_timestamps.pop(obj_name, None)

        return self.frame_detections[obj_name] >= self.config.FRAMES_FOR_CONFIRMATION

    def should_announce_gemini(self, obj_name: str) -> bool:
        """Check if Gemini description should be announced (separate from initial detection)"""
        return obj_name not in self.gemini_announced_objects

    def mark_gemini_as_announced(self, obj_name: str):
        """Mark Gemini description as announced"""
        self.gemini_announced_objects.add(obj_name)

    def should_announce(self, obj_name: str, timestamp: float) -> bool:
        """Determine if object should be announced.

        The first consistent sighting records ``timestamp`` and returns False;
        later calls return True once ``MIN_DETECTION_DURATION`` has elapsed
        since then. Already announced or inconsistent objects return False.
        """
        # Don't announce if already announced recently
        if obj_name in self.announced_objects:
            return False

        # Check temporal consistency
        if not self.is_detection_consistent(obj_name):
            return False

        # Check minimum duration for confirmation
        if obj_name in self.confirmed_objects:
            duration = timestamp - self.confirmed_objects[obj_name]
            if duration >= self.config.MIN_DETECTION_DURATION:
                return True
        else:
            # First time seeing this object consistently
            self.confirmed_objects[obj_name] = timestamp

        return False

    def mark_as_announced(self, obj_name: str):
        """Mark object as announced"""
        self.announced_objects.add(obj_name)

    def mark_as_false_positive(self, obj_name: str):
        """Mark object as false positive (manual override)"""
        self.false_positive_list.add(obj_name)
        self.false_positive_timestamps[obj_name] = time.time()
        logger.info(f"Marked {obj_name} as false positive")

    def reset_announced(self):
        """Reset announced objects (called periodically).

        Also drops confirmations older than ``DETECTION_RESET_INTERVAL`` seconds.
        """
        self.announced_objects.clear()
        self.gemini_announced_objects.clear()  # Reset Gemini announcements too
        # Also clean up old confirmed objects
        current_time = time.time()
        old_objects = [obj for obj, timestamp in self.confirmed_objects.items()
                      if current_time - timestamp > self.config.DETECTION_RESET_INTERVAL]
        for obj in old_objects:
            self.confirmed_objects.pop(obj, None)

class SmartVisionAssistant:
    def __init__(self, config: Config):
        """Keep ``config`` and run the three setup stages in order.

        Gemini is configured first, then the YOLO model and TTS wrapper, then
        the tracking state.
        """
        self.config = config
        for initialise in (self.setup_gemini, self.setup_models, self.setup_tracking):
            initialise()
        
    def setup_gemini(self):
        """Configure the Gemini client from the ``GOOGLE_API_KEY`` environment variable.

        On success ``self.gemini_model`` and ``self.generation_config`` are set.

        Raises:
            ValueError: when the environment variable is missing or empty.
            Exception: any client error is logged and re-raised unchanged.
        """
        api_key = os.environ.get('GOOGLE_API_KEY')
        if not api_key:
            logger.error("GOOGLE_API_KEY environment variable not set")
            raise ValueError("API key not configured")

        try:
            genai.configure(api_key=api_key)
            self.gemini_model = genai.GenerativeModel('gemini-1.5-flash-latest')
            self.generation_config = genai.types.GenerationConfig(
                temperature=0.4,
                max_output_tokens=200,
            )
        except Exception as exc:
            logger.error(f"Failed to configure Gemini API: {exc}")
            raise
        else:
            logger.info("Gemini API configured successfully")
    
    def setup_models(self):
        """Load the YOLO model and create the thread-safe TTS wrapper.

        Sets ``self.yolo_model`` from ``config.YOLO_MODEL`` and ``self.tts``
        from ``config.SPEECH_RATE``. An error in either step is logged and
        re-raised.
        """
        try:
            self.yolo_model = YOLO(self.config.YOLO_MODEL)
        except Exception as exc:
            logger.error(f"Failed to load YOLO model: {exc}")
            raise
        else:
            logger.info(f"YOLO model {self.config.YOLO_MODEL} loaded successfully")

        try:
            self.tts = ThreadSafeTTS(self.config.SPEECH_RATE)
        except Exception as exc:
            logger.error(f"Failed to initialize TTS: {exc}")
            raise
        else:
            logger.info("Thread-safe TTS engine initialized")
    
    def setup_tracking(self):
        """Create the tracker, the Gemini rate-limit state, and the lookup tables.

        Sets ``detection_tracker``, ``gemini_last_called``,
        ``global_gemini_last_called_time``, ``last_reset_time``,
        ``frame_count``, ``object_confidence_thresholds``,
        ``high_priority_objects`` and ``local_descriptions``.
        """
        self.detection_tracker = DetectionTracker(self.config)

        # Gemini cooldown bookkeeping and frame counters.
        self.gemini_last_called: Dict[str, float] = {}
        self.global_gemini_last_called_time = 0
        self.last_reset_time = time.time()
        self.frame_count = 0

        # Per-class confidence thresholds, grouped by category.
        safety_thresholds = {      # high-priority safety objects - lower threshold
            "knife": 0.60,
            "scissors": 0.60,
            "bear": 0.55,
            "fire hydrant": 0.70,
        }
        vehicle_thresholds = {     # medium threshold
            "truck": 0.70,
            "bus": 0.70,
            "car": 0.75,
            "bicycle": 0.70,
            "motorcycle": 0.70,
            "train": 0.65,
        }
        animal_thresholds = {      # higher threshold (commonly misdetected)
            "dog": 0.80,
            "cat": 0.85,
            "bird": 0.85,
        }
        general_thresholds = {     # people and common objects
            "person": 0.75,
            "stop sign": 0.80,
            "traffic light": 0.75,
        }
        self.object_confidence_thresholds = {
            **safety_thresholds,
            **vehicle_thresholds,
            **animal_thresholds,
            **general_thresholds,
        }

        # Classes that are also passed to the Gemini description step.
        self.high_priority_objects = {
            "bear",
            "bicycle",
            "bus",
            "car",
            "cat",
            "dog",
            "fire hydrant",
            "knife",
            "motorcycle",
            "person",
            "scissors",
            "stop sign",
            "traffic light",
            "train",
            "truck",
        }

        # Canned descriptions, keyed by class name.
        self.local_descriptions = {
            "knife": "Warning: Sharp knife detected. Exercise caution.",
            "scissors": "Scissors present. Handle with care.",
            "person": "Person detected in the area.",
            "dog": "Dog spotted nearby.",
            "cat": "Cat detected in the vicinity.",
            "car": "Vehicle present - stay alert.",
            "truck": "Large truck detected.",
            "bus": "Bus in the area.",
            "bicycle": "Bicycle detected.",
            "motorcycle": "Motorcycle present.",
            "train": "Train detected - maintain safe distance.",
            "fire hydrant": "Fire hydrant located nearby.",
            "bear": "DANGER: Bear detected. Move to safety immediately.",
            "stop sign": "Stop sign ahead.",
            "traffic light": "Traffic light detected.",
        }
    
    def get_confidence_threshold(self, obj_name: str) -> float:
        """Return the minimum confidence required for ``obj_name``.

        Classes without an entry in ``object_confidence_thresholds`` use 0.60.
        """
        default_threshold = 0.60
        thresholds = self.object_confidence_thresholds
        return thresholds[obj_name] if obj_name in thresholds else default_threshold
    
    def speak_async(self, text: str):
        """Hand ``text`` to ``self.tts.speak``; the return value is discarded."""
        enqueue = self.tts.speak
        enqueue(text)
    
    def convert_cv2_to_bytes(self, img) -> io.BytesIO:
        """Encode an OpenCV image as JPEG (quality 85) into a ``BytesIO``.

        Args:
            img: image accepted by ``cv2.imencode``.

        Returns:
            A ``BytesIO`` positioned at 0 holding the JPEG bytes.

        Raises:
            ValueError: if ``cv2.imencode`` reports that encoding failed.
            Exception: any other encoding error is logged and re-raised.
        """
        try:
            ok, buffer = cv2.imencode('.jpg', img, [cv2.IMWRITE_JPEG_QUALITY, 85])
            if not ok:
                # Fail here instead of passing an unusable buffer downstream.
                raise ValueError("cv2.imencode could not encode the image as JPEG")
            return io.BytesIO(buffer.tobytes())
        except Exception as e:
            logger.error(f"Image conversion error: {e}")
            raise
    
    def ask_gemini_about_image(self, image_bytes_io: io.BytesIO, prompt: str) -> str:
        """Ask Gemini to describe an image, retrying with exponential backoff.

        Makes up to ``config.MAX_GEMINI_RETRIES`` attempts and sleeps
        ``2 ** attempt`` seconds after each failed one except the last.

        Args:
            image_bytes_io: encoded image; rewound before every attempt.
            prompt: text sent together with the image.

        Returns:
            The response text, or "Description service temporarily
            unavailable." when no attempt succeeded. Every failure path uses
            that one message.
        """
        failure_message = "Description service temporarily unavailable."
        max_attempts = self.config.MAX_GEMINI_RETRIES

        for attempt in range(max_attempts):
            try:
                image_bytes_io.seek(0)  # Reset buffer position
                # The context manager releases the image even when the request
                # raises; the caller's buffer stays usable for the next attempt.
                with Image.open(image_bytes_io) as pil_image:
                    response = self.gemini_model.generate_content(
                        [prompt, pil_image],
                        generation_config=self.generation_config
                    )
                    return response.text

            except Exception as e:
                logger.warning(f"Gemini API attempt {attempt + 1} failed: {e}")
                if attempt == max_attempts - 1:
                    return failure_message
                time.sleep(2 ** attempt)  # Exponential backoff

        return failure_message
    
    def connect_to_stream(self) -> "cv2.VideoCapture":
        """Open ``config.STREAM_URL``, trying up to three times.

        A capture that failed to open is released before the next attempt so
        handles do not pile up, and the 2-second pause is skipped after the
        final attempt.

        Returns:
            The opened capture.

        Raises:
            ConnectionError: if no attempt produced an opened capture.
        """
        max_attempts = 3
        for attempt in range(max_attempts):
            cap = None
            try:
                cap = cv2.VideoCapture(self.config.STREAM_URL)
                if cap.isOpened():
                    logger.info(f"Connected to stream: {self.config.STREAM_URL}")
                    return cap
                logger.warning(f"Stream connection attempt {attempt + 1} failed")
            except Exception as e:
                logger.error(f"Stream connection error: {e}")

            if cap is not None:
                cap.release()
            if attempt < max_attempts - 1:
                time.sleep(2)

        raise ConnectionError(f"Failed to connect to stream after {max_attempts} attempts")
    
    def process_detections(self, frame, results):
        """Process YOLO detection results with enhanced filtering.

        Boxes below their class threshold are dropped and the surviving class
        names are fed to the tracker. Each remaining detection is then

        * announced when the tracker's ``should_announce`` approves it, and
        * passed to ``handle_high_priority_object`` when it is a high-priority
          class that the tracker considers consistent.

        The Gemini step stays independent of the spoken announcement but is no
        longer triggered by single-frame detections or by objects currently
        flagged as false positives.

        Args:
            frame: image the detections were computed on.
            results: YOLO output; only ``results[0].boxes`` is read.
        """
        current_time = time.time()
        current_frame_detections = set()
        valid_detections = []

        # First pass: filter by confidence thresholds
        for box in results[0].boxes:
            cls_id = int(box.cls[0])
            conf = float(box.conf[0])
            name = self.yolo_model.names[cls_id]

            # Apply object-specific confidence threshold
            threshold = self.get_confidence_threshold(name)
            if conf >= threshold:
                current_frame_detections.add(name)
                valid_detections.append((box, name, conf))
                logger.debug(f"Valid detection: {name} (confidence: {conf:.2f}, threshold: {threshold:.2f})")

        # Update detection tracker
        self.detection_tracker.add_frame_detections(current_frame_detections, current_time)

        # Second pass: process confirmed detections
        for box, name, conf in valid_detections:
            # Check if object should be announced
            if self.detection_tracker.should_announce(name, current_time):
                logger.info(f"Confirmed detection: {name} (confidence: {conf:.2f})")
                self.speak_async(f"{name} detected")
                self.detection_tracker.mark_as_announced(name)

            # Handle high-priority objects with Gemini (separate from basic
            # announcement), but only once the detection is temporally consistent.
            if (name in self.high_priority_objects
                    and self.detection_tracker.is_detection_consistent(name)):
                self.handle_high_priority_object(frame, box, name, conf, current_time)
    
    def handle_high_priority_object(self, frame, box, name: str, conf: float, current_time: float):
        """Dispatch a background Gemini description for a high-priority detection.

        The request is skipped when the global cooldown or the per-object
        cooldown is still active, when the detection tracker declines a Gemini
        announcement for ``name``, or when the clamped bounding box is empty.

        Both cooldown timestamps are stamped here, at dispatch time, instead of
        only after Gemini answers. Otherwise every detection arriving while a
        request is still in flight (including other objects in the same frame)
        passes the cooldown checks and starts its own request. As a consequence
        a failed request also counts against the cooldowns.

        Args:
            frame: Image the detection came from; indexed as ``frame[y, x]``
                with ``shape[0]`` used as height and ``shape[1]`` as width.
            box: Detection whose ``xyxy[0]`` yields the x1, y1, x2, y2 corners.
            name: Class label of the detected object.
            conf: Detection confidence (not used by this method).
            current_time: Timestamp compared against the stored cooldown times.
        """
        # Check global cooldown
        if current_time - self.global_gemini_last_called_time < self.config.GEMINI_COOLDOWN:
            return

        # Check per-object cooldown
        last_call = self.gemini_last_called.get(name, 0)
        if current_time - last_call <= self.config.OBJECT_COOLDOWN:
            return

        # Check if we should get Gemini description
        if not self.detection_tracker.should_announce_gemini(name):
            return

        # Extract object region, clamped to the frame bounds
        x1, y1, x2, y2 = map(int, box.xyxy[0])
        x1, y1 = max(0, x1), max(0, y1)
        x2, y2 = min(frame.shape[1], x2), min(frame.shape[0], y2)

        if (x2 - x1) <= 0 or (y2 - y1) <= 0:
            return

        # Mark that we're processing this object with Gemini
        self.detection_tracker.mark_gemini_as_announced(name)

        # Reserve the cooldown windows now so that detections seen while this
        # request is in flight cannot start additional requests.
        self.gemini_last_called[name] = current_time
        self.global_gemini_last_called_time = current_time
        logger.info(f"Initiating Gemini processing for: {name}")

        # Process with Gemini in separate thread; copy the crop so the worker
        # does not share memory with the frame buffer.
        object_crop = frame[y1:y2, x1:x2].copy()
        threading.Thread(
            target=self.process_with_gemini,
            args=(object_crop, name, current_time),
            daemon=True
        ).start()
    
    def process_with_gemini(self, object_crop, name: str, timestamp: float):
        """Describe a cropped object via Gemini and speak the result (runs in separate thread).

        On a usable response the text is spoken and both cooldown timestamps
        are set to ``timestamp``. Otherwise a local fallback description for
        ``name`` is spoken and the cooldown timestamps are left untouched.

        A response counts as a failure when it is empty or blank, contains
        "unavailable", or is the give-up message returned after the final
        failed Gemini API attempt ("Unable to get detailed description ...").
        Without the last check that error text would be read out as if it were
        a real description and recorded as a successful call.

        Args:
            object_crop: Image region containing the object.
            name: Class label of the object, used in the prompt and fallbacks.
            timestamp: Time of the originating detection.
        """
        try:
            img_bytes = self.convert_cv2_to_bytes(object_crop)
            prompt = (
                f"Describe this {name} to a visually impaired person. "
                f"Focus on safety considerations and important details. "
                f"Keep it concise - 1-2 sentences maximum."
            )

            logger.info(f"Requesting Gemini description for: {name}")
            gemini_response = self.ask_gemini_about_image(img_bytes, prompt)

            # Recognise every failure sentinel, not just the "unavailable" one.
            normalized = (gemini_response or "").strip().lower()
            is_failure = (
                not normalized
                or "unavailable" in normalized
                or normalized.startswith("unable to get detailed description")
            )

            if not is_failure:
                logger.info(f"Gemini response for {name}: {gemini_response}")
                # Always speak the Gemini response - this is separate from the initial detection
                self.speak_async(gemini_response)
                self.gemini_last_called[name] = timestamp
                self.global_gemini_last_called_time = timestamp
            else:
                # Fallback to local description
                fallback = self.local_descriptions.get(name, f"Detailed description of {name} unavailable")
                logger.info(f"Using fallback description for {name}: {fallback}")
                self.speak_async(fallback)

        except Exception as e:
            logger.error(f"Gemini processing error for {name}: {e}")
            # Use local fallback
            fallback = self.local_descriptions.get(name, f"Unable to describe {name} in detail")
            self.speak_async(fallback)
    
    def should_process_frame(self) -> bool:
        """Advance the frame counter and report whether this frame is due for processing.

        Returns:
            True when the updated ``frame_count`` is an exact multiple of
            ``config.PROCESS_EVERY_N_FRAMES``, False otherwise.
        """
        self.frame_count = self.frame_count + 1
        stride = self.config.PROCESS_EVERY_N_FRAMES
        leftover = self.frame_count % stride
        return leftover == 0
    
    def reset_recent_detections(self):
        """Clear the tracker's announced state once the reset interval has elapsed.

        Does nothing until more than ``config.DETECTION_RESET_INTERVAL`` seconds
        have passed since ``last_reset_time``; when it does reset, it records
        the current time as the new ``last_reset_time``.
        """
        now = time.time()
        elapsed = now - self.last_reset_time
        if not (elapsed > self.config.DETECTION_RESET_INTERVAL):
            return

        self.detection_tracker.reset_announced()
        self.last_reset_time = now
        logger.debug("Recent detections cleared")
    
    def handle_keyboard_input(self, key):
        """React to a key code from the display window.

        Only the 'f' (false positive) key is recognised; every other key is
        ignored. The current behaviour is a placeholder that prints a prompt
        and does not read a reply.

        Args:
            key: Integer key code, compared against ``ord('f')``.
        """
        if key != ord('f'):
            return

        # 'f' for false positive: the intent is to mark the most recently
        # detected object as a false positive. This is only a demonstration of
        # the concept - a fuller implementation could show a menu and would
        # need to collect the answer more elegantly.
        print("\nMark object as false positive? Enter object name (or 'cancel'):")
    
    def run(self):
        """Main application loop: read frames, detect, display, and handle keys.

        Connects to the stream, then loops until 'q' is pressed, the stream
        cannot be re-established, or an error/interrupt occurs. A failed frame
        read triggers one reconnection cycle via ``connect_to_stream``. Only
        frames selected by ``should_process_frame`` are run through YOLO and
        displayed. ``cleanup`` always runs on exit.

        ``cap`` is initialised to None before the ``try`` so that the
        ``finally`` clause has a defined value even when the very first
        connection attempt raises; previously that path raised
        ``UnboundLocalError`` from ``finally``.
        """
        cap = None
        try:
            cap = self.connect_to_stream()
            logger.info("Starting enhanced vision assistance")
            logger.info("Controls: 'q' to quit, 'f' to mark false positive")

            while True:
                ret, frame = cap.read()
                if not ret:
                    logger.warning("Failed to grab frame, attempting reconnection")
                    cap.release()
                    time.sleep(2)
                    try:
                        cap = self.connect_to_stream()
                        continue
                    except ConnectionError:
                        logger.error("Failed to reconnect, exiting")
                        break

                # Process frame selectively
                if self.should_process_frame():
                    try:
                        results = self.yolo_model(frame, verbose=False)
                        self.process_detections(frame, results)

                        # Display annotated frame
                        annotated_frame = results[0].plot()
                        cv2.imshow("Enhanced Smart Vision Assistant", annotated_frame)
                    except Exception as e:
                        # A bad frame must not end the session; log and keep going.
                        logger.error(f"Frame processing error: {e}")

                # Reset detections periodically
                self.reset_recent_detections()

                # Check for quit command
                key = cv2.waitKey(1) & 0xFF
                if key == ord('q'):
                    logger.info("Quit command received")
                    break
                elif key == ord('f'):
                    self.handle_keyboard_input(key)

        except KeyboardInterrupt:
            logger.info("Application interrupted by user")
        except Exception as e:
            logger.error(f"Unexpected error: {e}")
        finally:
            # cap is None here when the initial connection never succeeded.
            self.cleanup(cap)
    
    def cleanup(self, cap):
        """Release the capture, close OpenCV windows, and stop the TTS engine.

        Each step runs in its own guarded block so that a failure in one (for
        example releasing a broken capture) no longer prevents the remaining
        steps from running. Errors are logged, never raised.

        Args:
            cap: Capture object to release, or None when no stream was opened.
        """
        steps = (
            lambda: cap.release() if cap is not None else None,
            lambda: cv2.destroyAllWindows(),
            lambda: self.tts.stop(),  # Stop the thread-safe TTS
        )

        all_ok = True
        for step in steps:
            try:
                step()
            except Exception as e:
                all_ok = False
                logger.error(f"Cleanup error: {e}")

        # Only claim success when every step actually succeeded.
        if all_ok:
            logger.info("Cleanup completed")

def main():
    """Build the configuration and assistant, then run the assistant.

    Any exception raised while constructing or running is logged as a
    start-up failure instead of propagating.
    """
    try:
        settings = Config()
        SmartVisionAssistant(settings).run()
    except Exception as e:
        logger.error(f"Application failed to start: {e}")

# Entry-point guard: start the assistant only when this code is executed
# directly, not when it is imported as a module.
if __name__ == "__main__":
    main()

2025-06-07 00:51:42,115 - INFO - Gemini API configured successfully
2025-06-07 00:51:42,242 - INFO - YOLO model yolov8l.pt loaded successfully
2025-06-07 00:51:42,242 - INFO - Thread-safe TTS engine initialized
2025-06-07 00:51:42,276 - INFO - TTS worker thread started
2025-06-07 00:51:42,295 - INFO - Connected to stream: http://192.168.1.20:81/stream
2025-06-07 00:51:42,296 - INFO - Starting enhanced vision assistance
2025-06-07 00:51:42,296 - INFO - Controls: 'q' to quit, 'f' to mark false positive
2025-06-07 00:52:34,922 - INFO - Initiating Gemini processing for: person
2025-06-07 00:52:34,922 - INFO - Requesting Gemini description for: person
2025-06-07 00:52:38,356 - INFO - Gemini response for person: He appears to be a man with dark, curly hair, glasses, and a beard, wearing earphones;  be mindful of cords potentially creating a tripping hazard.

2025-06-07 00:52:38,356 - INFO - Speaking: He appears to be a man with dark, curly hair, glasses, and a beard, wearing earphones;  be m

: 