In [2]:
# Cell 1: Install dependencies and imports
# This cell handles package installation and imports all necessary libraries

# First uninstall any existing OpenCV installations
!pip uninstall opencv-python opencv-python-headless opencv-contrib-python -y

# Install the full version with GUI support
!pip install opencv-python

import cv2
import torch
import time
import threading
import numpy as np
from PIL import Image
from transformers import CLIPProcessor, CLIPModel, BlipProcessor, BlipForQuestionAnswering
import speech_recognition as sr
import pyttsx3
import queue
import easyocr
import os

# Force OpenCV to use a specific backend
os.environ["OPENCV_VIDEOIO_PRIORITY_MSMF"] = "0"  # For Windows

Found existing installation: opencv-python 4.11.0.86
Uninstalling opencv-python-4.11.0.86:
  Successfully uninstalled opencv-python-4.11.0.86




Collecting opencv-python
  Using cached opencv_python-4.11.0.86-cp37-abi3-win_amd64.whl.metadata (20 kB)
Using cached opencv_python-4.11.0.86-cp37-abi3-win_amd64.whl (39.5 MB)
Installing collected packages: opencv-python
Successfully installed opencv-python-4.11.0.86


In [3]:
# Cell 2: Class definition - Initialization and setup
# This contains the class definition with initialization code

class RealtimeVisionAssistant:
    """Webcam assistant combining CLIP, BLIP VQA, EasyOCR and speech I/O.

    Only construction lives in this class body; the remaining methods are
    assigned onto the class after it is defined.
    """

    def __init__(self):
        """Load every model and initialise the runtime state.

        Raises:
            Exception: any error raised while loading a model or creating the
                OCR reader / recognizer is printed and then re-raised.
        """
        print("Initializing models. This may take a moment...")
        try:
            # Vision models
            self.clip_processor = CLIPProcessor.from_pretrained("openai/clip-vit-large-patch14-336")
            self.clip_model = CLIPModel.from_pretrained("openai/clip-vit-large-patch14-336")

            # VQA model
            self.vqa_processor = BlipProcessor.from_pretrained("Salesforce/blip-vqa-capfilt-large")
            self.vqa_model = BlipForQuestionAnswering.from_pretrained("Salesforce/blip-vqa-capfilt-large")

            # OCR
            self.reader = easyocr.Reader(['en'])

            # Speech recognition and synthesis
            self.recognizer = sr.Recognizer()
            self.engine = None  # will init in thread

            # Labels scored against each frame. "dog" is appended because it is
            # listed in important_objects below and could otherwise never match.
            self.object_classes = [
                "person", "bicycle", "car", "motorcycle", "bus", "truck", "traffic light",
                "fire hydrant", "stop sign", "bench", "chair", "couch", "table", "door",
                "stairs", "elevator", "bottle", "cup", "fork", "knife", "spoon", "bowl",
                "laptop", "phone", "keyboard", "microwave", "oven", "sink", "refrigerator", "book",
                "dog"
            ]

            # Labels that are always mentioned in a description.
            # NOTE(review): "obstacle" has no entry in object_classes, so it never
            # matches a detection — decide whether it needs its own label.
            self.important_objects = ["person", "car", "bicycle", "dog", "stairs", "door", "obstacle"]

            print("Models loaded successfully")
        except Exception as e:
            print(f"Error initializing: {e}")
            raise

        # States
        self.current_frame = None
        self.last_processed_frame = None
        self.last_analysis_time = 0
        self.conversation_history = []
        self.running = False
        self.camera = None

        # Queues
        self.speech_queue = queue.Queue()    # text waiting to be spoken
        self.command_queue = queue.Queue()   # recognised voice commands

        # Settings
        self.analysis_interval = 3.0         # minimum seconds between automatic descriptions
        self.auto_describe = False
        self.last_description_time = 0
        self.voice_mode = "casual"

In [4]:
# Cell 3: Camera control methods
# This cell contains methods for starting/stopping the camera

def start_camera(self):
    """Open capture device 0 and flag the assistant as running.

    Returns:
        bool: True when the device reports it is open, False when it does
        not or when opening it raised an exception.
    """
    try:
        self.camera = cv2.VideoCapture(0)
        time.sleep(2)  # pause before probing the device (presumably to let it initialise)
        if not self.camera.isOpened():
            # Release the unusable handle so it is not left dangling on self.camera
            self.camera.release()
            self.camera = None
            self.speak("Camera failed to initialize. Please check connections.")
            return False
        self.running = True
        print("Camera started successfully")
        return True
    except Exception as e:
        print(f"Camera error: {e}")
        self.speak("Camera failed to start")
        return False

def stop_camera(self):
    """Clear the running flag, release the camera and close display windows.

    Closing the windows is best-effort: ordinary errors are reported and
    ignored, while KeyboardInterrupt/SystemExit are no longer swallowed.
    """
    self.running = False
    if self.camera:
        self.camera.release()
    try:
        cv2.destroyAllWindows()
    except Exception as e:
        # Window teardown can fail (e.g. no GUI support); that must not abort shutdown
        print(f"Could not close display windows: {e}")
    print("Camera stopped")

# Add these methods to the class: the functions above are plain module-level
# functions taking `self`, so assigning them here makes them instance methods.
RealtimeVisionAssistant.start_camera = start_camera
RealtimeVisionAssistant.stop_camera = stop_camera

In [5]:
# Cell 4: Speech and voice methods
# This cell handles speech synthesis and voice recognition

def speak(self, text):
    """Print ``text`` with an "Assistant:" prefix and put it on the speech queue."""
    pending = self.speech_queue
    print(f"Assistant: {text}")
    pending.put(text)

def speech_worker(self):
    """Speak queued messages until the assistant stops, then flush the queue.

    Creates the pyttsx3 engine, then repeatedly takes text from
    ``self.speech_queue`` and speaks it while ``self.running`` is true. Once
    the flag is cleared, anything still queued (for example the shutdown
    announcement) is spoken before the worker returns.
    """
    try:
        self.engine = pyttsx3.init()
        self.engine.setProperty('rate', 180)
        while self.running:
            try:
                # Wait briefly for a message; the timeout lets the loop
                # re-check self.running regularly.
                text = self.speech_queue.get(timeout=0.1)
            except queue.Empty:
                continue
            try:
                self.engine.say(text)
                self.engine.runAndWait()
            except Exception as e:
                print(f"Speech error: {e}")
                time.sleep(0.5)

        # Flush messages enqueued just before shutdown so they are not lost
        while True:
            try:
                text = self.speech_queue.get_nowait()
            except queue.Empty:
                break
            self.engine.say(text)
            self.engine.runAndWait()
    except Exception as e:
        print(f"Speech worker error: {e}")

def listen_for_commands(self):
    """Capture speech from the microphone and queue each recognised phrase.

    Runs while ``self.running`` is true. Every phrase recognised by
    ``recognize_google`` is lower-cased and put on ``self.command_queue``.
    Listen timeouts and unintelligible audio are ignored; other errors are
    printed.
    """
    print("Voice recognition active. Listening for commands...")
    recognizer = self.recognizer
    with sr.Microphone() as mic:
        recognizer.adjust_for_ambient_noise(mic, duration=1)
        while self.running:
            try:
                print("Listening...")
                captured = recognizer.listen(mic, timeout=5, phrase_time_limit=5)
                print("Processing speech...")
                try:
                    heard = recognizer.recognize_google(captured).lower()
                    print(f"You said: {heard}")
                    self.command_queue.put(heard)
                except sr.UnknownValueError:
                    # Audio could not be understood; drop it
                    pass
                except sr.RequestError as service_err:
                    print(f"Speech recognition service error: {service_err}")
            except sr.WaitTimeoutError:
                # No phrase started within the timeout; listen again
                pass
            except Exception as err:
                print(f"Listening error: {err}")
                time.sleep(0.5)

# Add these methods to the class: speak() enqueues text, speech_worker() and
# listen_for_commands() are the loops later started as background threads.
RealtimeVisionAssistant.speak = speak
RealtimeVisionAssistant.speech_worker = speech_worker
RealtimeVisionAssistant.listen_for_commands = listen_for_commands

In [6]:
# Cell 5: Vision processing methods
# This cell contains methods for object detection, text recognition, and analysis

def detect_text(self, image):
    """Run OCR on ``image`` and return the strings scoring above 0.3.

    Non-ndarray inputs are converted with ``np.array`` first. Any error is
    printed and results in an empty list.
    """
    try:
        pixels = image if isinstance(image, np.ndarray) else np.array(image)
        kept = []
        for _box, string, score in self.reader.readtext(pixels):
            if score > 0.3:
                kept.append(string)
        return kept
    except Exception as err:
        print(f"Text detection error: {err}")
        return []

def identify_objects(self, image):
    """Score ``image`` against every label in ``self.object_classes`` with CLIP.

    Returns:
        list[tuple[str, float]]: ``(label, probability)`` pairs whose softmax
        probability exceeds 0.3, highest first. Any error is printed and
        yields an empty list.
    """
    try:
        prompts = ["a photo of a " + label for label in self.object_classes]
        batch = self.clip_processor(
            text=prompts,
            images=image,
            return_tensors="pt",
            padding=True
        )
        with torch.no_grad():
            result = self.clip_model(**batch)
            probs = result.logits_per_image.softmax(dim=1)
        scored = (
            (label, probs[0][idx].item())
            for idx, label in enumerate(self.object_classes)
        )
        hits = [(label, score) for label, score in scored if score > 0.3]
        return sorted(hits, key=lambda pair: pair[1], reverse=True)
    except Exception as err:
        print(f"Object detection error: {err}")
        return []

def _with_article(noun):
    """Return ``noun`` prefixed with "a"/"an"; plural-only nouns stay bare."""
    if noun in ("stairs",):
        return noun
    article = "an" if noun[:1].lower() in {"a", "e", "i", "o", "u"} else "a"
    return f"{article} {noun}"


def _join_with_and(phrases):
    """Join phrases as "x", "x and y" or "x, y and z"."""
    if len(phrases) == 1:
        return phrases[0]
    return ", ".join(phrases[:-1]) + f" and {phrases[-1]}"


def generate_description(self, image):
    """Build a spoken-style description of ``image`` from OCR and object labels.

    People are mentioned first, then the other labels listed in
    ``self.important_objects``; up to three remaining labels are added only in
    "detailed" voice mode; the first OCR string is mentioned last.

    Args:
        image: frame to analyse, passed unchanged to ``detect_text`` and
            ``identify_objects``; ``None`` short-circuits.

    Returns:
        str: the description, or a fixed fallback sentence when there is no
        image or nothing to report.
    """
    if image is None:
        return "No image available to analyze"
    text_elements = self.detect_text(image)
    labels = [obj for obj, _ in self.identify_objects(image)]

    important = [obj for obj in labels if obj in self.important_objects and obj != "person"]
    other_objects = [obj for obj in labels if obj not in self.important_objects][:3]

    description_parts = []
    num_people = labels.count("person")
    if num_people == 1:
        description_parts.append("There's a person in front of you")
    elif num_people > 1:
        description_parts.append(f"There are about {num_people} people in front of you")

    if important:
        # Every item carries its own article so the sentence reads naturally aloud
        listed = _join_with_and([_with_article(obj) for obj in important])
        description_parts.append(f"I can see {listed}")

    if self.voice_mode == "detailed" and other_objects:
        listed = _join_with_and([_with_article(obj) for obj in other_objects])
        if len(other_objects) == 1:
            description_parts.append(f"There's also {listed}")
        else:
            description_parts.append(f"I can also see {listed}")

    if text_elements:
        first_text = text_elements[0]
        if len(text_elements) == 1 and len(first_text) < 50:
            description_parts.append(f"I can read the text: '{first_text}'")
        else:
            description_parts.append(f"I can see some text that includes '{first_text}'")

    if not description_parts:
        return "I can see an image, but I can't clearly identify specific objects or text in it."
    return ". ".join(description_parts) + "."

# Add these methods to the class: OCR, CLIP label scoring, and the description
# builder that combines the two.
RealtimeVisionAssistant.detect_text = detect_text
RealtimeVisionAssistant.identify_objects = identify_objects
RealtimeVisionAssistant.generate_description = generate_description

In [7]:
# Cell 6: Command processing and question answering
# This cell handles user commands and visual question answering

def process_command(self, command):
    """Dispatch a recognised voice command.

    Multi-word phrases are checked before the generic "stop"/"describe"
    keywords. Matching is by substring, so "stop describing" must be handled
    before the shutdown check and "auto describe on/off" before the plain
    "describe" check. Anything unmatched is passed to ``answer_question``.

    Args:
        command: lower-cased text of the spoken command.
    """
    if "auto describe on" in command or "start describing" in command:
        self.auto_describe = True
        self.speak("Automatic descriptions turned on")
        return

    if "auto describe off" in command or "stop describing" in command:
        self.auto_describe = False
        self.speak("Automatic descriptions turned off")
        return

    if "casual mode" in command:
        self.voice_mode = "casual"
        self.speak("Switched to casual description mode")
        return

    if "detailed mode" in command:
        self.voice_mode = "detailed"
        self.speak("Switched to detailed description mode")
        return

    # Generic keywords come last because they are substrings of the phrases above
    if any(x in command for x in ["stop", "quit", "exit"]):
        self.speak("Shutting down assistant")
        self.running = False
        return

    if "describe" in command or "what do you see" in command:
        self.analyze_current_frame(force=True)
        return

    self.answer_question(command)

def answer_question(self, question):
    """Answer a spoken question about the current camera frame.

    Description-style questions go to ``generate_description``, text-related
    ones to ``detect_text``; everything else is answered by the BLIP VQA
    model. The result is always delivered through ``self.speak``.

    Args:
        question: the user's question as text.
    """
    if self.current_frame is None:
        self.speak("I don't have an image to analyze at the moment.")
        return

    # The frame is converted from BGR to RGB before building the PIL image
    pil_image = Image.fromarray(cv2.cvtColor(self.current_frame, cv2.COLOR_BGR2RGB))
    lowered = question.lower()

    if any(keyword in lowered for keyword in ["what do you see", "describe", "what's there"]):
        description = self.generate_description(pil_image)
        self.speak(description)
        return

    if any(keyword in lowered for keyword in ["text", "read", "say", "writing"]):
        text = self.detect_text(pil_image)
        if text:
            # Only the first three strings are read out
            self.speak(f"I can read the following text: {', '.join(text[:3])}")
        else:
            self.speak("I don't see any readable text in view right now.")
        return

    try:
        inputs = self.vqa_processor(pil_image, question, return_tensors="pt")
        with torch.no_grad():
            output = self.vqa_model.generate(**inputs, max_length=50)
        answer = self.vqa_processor.decode(output[0], skip_special_tokens=True)

        if not answer.strip():
            # An empty answer would otherwise be spoken as a dangling "It looks like "
            self.speak("I'm having trouble answering that.")
        elif len(answer) < 5:
            self.speak(f"It looks like {answer}")
        else:
            self.speak(answer)
    except Exception as e:
        print(f"VQA error: {e}")
        self.speak("I'm having trouble answering that.")

def analyze_current_frame(self, force=False):
    """Describe the current frame aloud, rate-limited unless ``force`` is set.

    Without ``force`` the call is a no-op when fewer than
    ``self.analysis_interval`` seconds have elapsed since
    ``self.last_description_time``. The timestamp is refreshed even when no
    frame is available.
    """
    now = time.time()
    if not force:
        elapsed = now - self.last_description_time
        if elapsed < self.analysis_interval:
            return
    self.last_description_time = now

    if self.current_frame is None:
        return
    rgb_frame = cv2.cvtColor(self.current_frame, cv2.COLOR_BGR2RGB)
    spoken_text = self.generate_description(Image.fromarray(rgb_frame))
    self.speak(spoken_text)

# Add these methods to the class: command dispatch, visual question answering,
# and the (optionally rate-limited) scene description trigger.
RealtimeVisionAssistant.process_command = process_command
RealtimeVisionAssistant.answer_question = answer_question
RealtimeVisionAssistant.analyze_current_frame = analyze_current_frame

In [10]:
# Cell 7: Main execution loop
# This cell contains the main run method and execution code

def run(self):
    """Main method to start all components.

    Opens the camera, starts the speech and listening threads, then loops:
    grab a frame, show it, handle one pending voice command, and optionally
    auto-describe. The loop ends when ``self.running`` is cleared, 'q' is
    pressed in the preview window, or the user interrupts; the camera is
    always stopped on the way out.
    """
    if not self.start_camera():
        return

    try:
        # Start speech and listening threads
        threading.Thread(target=self.speech_worker, daemon=True).start()
        threading.Thread(target=self.listen_for_commands, daemon=True).start()

        # Welcome message
        self.speak("Vision assistant is active and ready to help.")
        self.speak("Say 'describe' or 'what do you see' when you want me to describe the scene.")

        # Main processing loop
        while self.running:
            ret, frame = self.camera.read()
            if not ret:
                # Back off instead of spinning while the camera yields no frames
                time.sleep(0.05)
                continue

            self.current_frame = frame

            try:
                # Display the frame
                cv2.imshow('Vision Assistant', frame)
                if cv2.waitKey(1) & 0xFF == ord('q'):  # Press 'q' to quit
                    self.running = False
                    break
            except Exception:
                # If display fails, continue without it. KeyboardInterrupt is
                # not an Exception subclass, so it still reaches the handler below.
                pass

            # Process any pending commands
            if not self.command_queue.empty():
                command = self.command_queue.get()
                self.process_command(command)

            # Auto-describe if enabled
            if self.auto_describe:
                self.analyze_current_frame()

            time.sleep(0.05)

    except KeyboardInterrupt:
        print("Interrupted by user")
    finally:
        self.stop_camera()

# Add the method to the class so it can be called as assistant.run()
RealtimeVisionAssistant.run = run

# Main execution: constructing the assistant loads every model, and run()
# keeps looping until the assistant is stopped.
print("Starting Realtime Vision Assistant...")
assistant = RealtimeVisionAssistant()
assistant.run()

Starting Realtime Vision Assistant...
Initializing models. This may take a moment...


Neither CUDA nor MPS are available - defaulting to CPU. Note: This module is much faster with a GPU.


Models loaded successfully
Camera started successfully
Voice recognition active. Listening for commands...
Assistant: Vision assistant is active and ready to help.
Assistant: Say 'describe' or 'what do you see' when you want me to describe the scene.
Listening...
Processing speech...
Listening...
Processing speech...
Listening...
Processing speech...
You said: describe
Listening...
Listening...
Assistant: There's a person in front of you.
Processing speech...
Listening...
Processing speech...
You said: describe
Listening...
Listening...
Assistant: I can see some text that includes 'cello'.
Processing speech...
Listening...
Processing speech...
You said: detailed mode
Listening...
Assistant: Switched to detailed description mode
Processing speech...
Listening...
Listening...
Processing speech...
You said: describe
Listening...
Listening...
Assistant: There's also a phone. I can see some text that includes 'JUNE2025'.
Processing speech...
Listening...
Processing speech...
You said: where