# PRACTICE GUIDE: INTEGRATION OF SPEECH RECOGNITION AND UNDERSTANDING

## OVERVIEW

In this practice session, you will integrate the speech recognition and speech understanding components developed in previous modules to create a cohesive system. This integration will form the foundation for a complete voice assistant.

## OBJECTIVES

1. Design a modular architecture for the voice assistant
2. Implement threaded processing for concurrent operations
3. Create a pipeline for audio capture → speech recognition → intent understanding
4. Develop a responsive feedback system

## PREREQUISITES

- Completion of Modules 1-4
- Understanding of basic threading concepts in Python
- Familiarity with the queue module
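
As a quick refresher on the two prerequisites above, the following minimal sketch shows a producer thread and a consumer thread sharing a bounded `queue.Queue`. The function names `produce` and `consume` and the use of `None` as an end-of-stream sentinel are illustrative choices, not part of the exercises in this guide.

```python
import queue
import threading

def produce(q, items):
    """Put each item on the queue, then a None sentinel to signal the end."""
    for item in items:
        q.put(item)          # blocks while the queue is full
    q.put(None)

def consume(q, results):
    """Take items off the queue until the sentinel arrives."""
    while True:
        item = q.get()       # blocks while the queue is empty
        if item is None:
            break
        results.append(item * 2)

q = queue.Queue(maxsize=2)   # small bound so the producer has to wait
results = []
producer = threading.Thread(target=produce, args=(q, range(5)))
consumer = threading.Thread(target=consume, args=(q, results))
producer.start()
consumer.start()
producer.join()
consumer.join()
print(results)  # [0, 2, 4, 6, 8]
```

Because there is a single producer and a single consumer, the FIFO queue preserves the order of the items.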

## PRACTICE EXERCISES

### Exercise 1: Design the System Architecture

1. Draw a diagram (on paper or using a digital tool) that illustrates:
   - Components of the system (audio capture, recognition, understanding)
   - Data flow between components
   - Threading model
   - User feedback mechanisms
   
2. Document your architecture with comments explaining:
   - How components will communicate
   - How to manage processing load
   - Error handling approach
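
One possible way to put such an architecture into code is sketched below: each component is a stage that reads from an input queue, applies a function, and writes to an output queue. The `Stage` class and the `fake_recognize` and `fake_understand` stand-ins are hypothetical and only illustrate the data flow, threading model, and per-item error handling. They are not the classes built in the exercises.

```python
import queue
import threading

class Stage:
    """One pipeline stage: read from in_q, apply fn, write to out_q."""

    def __init__(self, name, fn, in_q, out_q):
        self.name = name
        self.fn = fn
        self.in_q = in_q
        self.out_q = out_q
        self.stop_event = threading.Event()
        self.thread = threading.Thread(target=self._run, daemon=True)

    def _run(self):
        while not self.stop_event.is_set():
            try:
                item = self.in_q.get(timeout=0.1)
            except queue.Empty:
                continue  # wake up regularly so the stop flag is noticed
            try:
                self.out_q.put(self.fn(item))
            except Exception as exc:
                # One bad item is reported and skipped; the stage keeps running
                print(f"[{self.name}] error: {exc}")

    def start(self):
        self.thread.start()

    def stop(self):
        self.stop_event.set()
        self.thread.join(timeout=1.0)

# Stand-ins for the real recognition and understanding components
def fake_recognize(frame):
    return frame.decode("ascii")

def fake_understand(text):
    return {"intent": "greeting" if text == "hello" else "unknown", "text": text}

audio_q = queue.Queue(maxsize=10)  # bounded: limits memory if recognition lags
text_q = queue.Queue()
intent_q = queue.Queue()

stages = [
    Stage("recognition", fake_recognize, audio_q, text_q),
    Stage("understanding", fake_understand, text_q, intent_q),
]
for stage in stages:
    stage.start()

audio_q.put(b"hello")
result = intent_q.get(timeout=2.0)
print(result)

for stage in stages:
    stage.stop()
```

Bounding only the first queue is one way to manage processing load: audio is the high-volume stream, while text and intents arrive far less often.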

### Exercise 2: Implement a Threaded Audio Processor

1. Create a new Python file called `threaded_audio_processor.py`
2. Implement a class that:
   - Captures audio in a background thread
   - Processes audio frames as they become available
   - Uses a queue to pass audio data to the recognition component
   - Can be started and stopped cleanly
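
When choosing the queue size, it helps to work out how much audio one chunk holds. The short calculation below uses the default parameters of this exercise (1024 frames per chunk, 16000 Hz, mono, 16-bit samples, a queue of 10 chunks). The helper names are hypothetical.

```python
def chunk_duration_seconds(chunk_size, rate):
    """Seconds of audio contained in one chunk of chunk_size frames."""
    return chunk_size / rate

def chunk_size_bytes(chunk_size, channels, sample_width_bytes):
    """Bytes returned by one read of chunk_size frames."""
    return chunk_size * channels * sample_width_bytes

chunk_seconds = chunk_duration_seconds(1024, 16000)
buffer_seconds = chunk_seconds * 10             # queue holds up to 10 chunks
bytes_per_chunk = chunk_size_bytes(1024, 1, 2)  # 16-bit PCM = 2 bytes per sample

print(f"One chunk: {chunk_seconds * 1000:.0f} ms, {bytes_per_chunk} bytes")
print(f"Full queue: {buffer_seconds:.2f} s of audio")
```

With these defaults, one chunk is 64 ms of audio and a full queue holds about 0.64 s, so a consumer that stalls for longer than that will cause frames to be dropped.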

In [None]:
import threading
import queue
import pyaudio
import time

class ThreadedAudioProcessor:
    def __init__(self, chunk_size=1024, format=pyaudio.paInt16, 
                 channels=1, rate=16000, buffer_size=10):
        """Initialize the threaded audio processor.
        
        Args:
            chunk_size: Size of audio chunks to process
            format: PyAudio format (default: 16-bit PCM)
            channels: Number of audio channels (default: mono)
            rate: Sample rate in Hz
            buffer_size: Maximum number of audio chunks held in the queue; if it stays full, new chunks are dropped
        """
        # Create a queue for audio frames
        self.audio_queue = queue.Queue(maxsize=buffer_size)
        
        # Store audio parameters
        self.chunk_size = chunk_size
        self.format = format
        self.channels = channels
        self.rate = rate
        
        # Thread control flags
        self.is_running = False
        self.audio_thread = None
        
        # PyAudio objects
        self.p = None
        self.stream = None
        
    def start_processing(self):
        """Start the audio processing in a background thread."""
        if self.audio_thread is not None and self.audio_thread.is_alive():
            print("Audio processing already running!")
            return
            
        # Create and start the thread
        self.is_running = True
        self.audio_thread = threading.Thread(target=self.process_audio_thread)
        self.audio_thread.daemon = True
        self.audio_thread.start()
        print("Audio processing started.")
        
    def process_audio_thread(self):
        """Process audio in a background thread."""
        try:
            # Initialize PyAudio
            self.p = pyaudio.PyAudio()
            
            # Open audio stream
            self.stream = self.p.open(
                format=self.format,
                channels=self.channels,
                rate=self.rate,
                input=True,
                frames_per_buffer=self.chunk_size
            )
            
            print(f"Audio stream opened: {self.rate}Hz, {self.channels} channel(s)")
            
            # Read frames and add to queue
            while self.is_running:
                try:
                    # Read audio data
                    audio_data = self.stream.read(self.chunk_size, exception_on_overflow=False)
                    
                    # Add to queue; wait up to 0.5 s for free space, otherwise queue.Full is raised
                    self.audio_queue.put(audio_data, block=True, timeout=0.5)
                    
                except queue.Full:
                    print("Warning: Audio queue full, dropping frames")
                    
                except Exception as e:
                    print(f"Error capturing audio: {str(e)}")
                    # Short sleep to prevent tight loop if error persists
                    time.sleep(0.1)
            
            print("Audio capture thread stopping...")
            
        except Exception as e:
            print(f"Error in audio thread: {str(e)}")
            
        finally:
            # Clean up resources
            self.cleanup()
                
    def stop_processing(self):
        """Stop audio processing and clean up resources."""
        print("Stopping audio processing...")
        self.is_running = False
        
        # Wait for thread to finish
        if self.audio_thread and self.audio_thread.is_alive():
            self.audio_thread.join(timeout=2.0)
        
        self.cleanup()
        print("Audio processing stopped.")
        
    def cleanup(self):
        """Clean up audio resources."""
        if self.stream is not None:
            self.stream.stop_stream()
            self.stream.close()
            self.stream = None
            
        if self.p is not None:
            self.p.terminate()
            self.p = None
            
    def get_audio_queue(self):
        """Get the queue that contains audio frames."""
        return self.audio_queue
        
    def get_sample_rate(self):
        """Get the audio sample rate."""
        return self.rate

# Test the audio processor
if __name__ == "__main__":
    processor = ThreadedAudioProcessor()
    processor.start_processing()
    
    try:
        # Run for 5 seconds as a test
        print("Recording for 5 seconds...")
        for i in range(5):
            time.sleep(1)
            print(f"Queue size: {processor.get_audio_queue().qsize()}")
            
    finally:
        processor.stop_processing()

### Exercise 3: Create a Threaded Recognition System

1. Create a Python file called `threaded_recognition.py`
2. Implement a class that:
   - Takes audio frames from a queue
   - Processes them with Vosk in a separate thread
   - Outputs recognized text to another queue
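
Vosk's `Result()` and `PartialResult()` methods return JSON strings, with the recognized words under the `"text"` and `"partial"` keys respectively. The sketch below shows how such strings might be turned into messages for the output queue, without needing a model or microphone. The helper `to_message` is hypothetical, and the sample JSON strings are hand-written for illustration rather than captured from a real recognizer.

```python
import json

def to_message(raw_json, final):
    """Convert a Vosk JSON string into a {"type", "text"} message.

    Returns None when the recognizer produced no words.
    """
    key = "text" if final else "partial"
    text = json.loads(raw_json).get(key, "").strip()
    if not text:
        return None
    return {"type": "final" if final else "partial", "text": text}

partial_msg = to_message('{"partial": "turn on"}', final=False)
final_msg = to_message('{"text": "turn on the lights"}', final=True)
empty_msg = to_message('{"text": ""}', final=True)

print(partial_msg)
print(final_msg)
print(empty_msg)
```

Tagging each message with its type lets downstream components ignore partial results and act only on final ones.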

In [None]:
from vosk import Model, KaldiRecognizer
import queue
import json
import threading
import os
import time

class ThreadedRecognition:
    def __init__(self, audio_queue, model_path, sample_rate=16000):
        """Initialize the threaded recognition system.
        
        Args:
            audio_queue: Queue containing audio frames to process
            model_path: Path to Vosk model
            sample_rate: Sample rate of the audio
        """
        # Store the audio queue
        self.audio_queue = audio_queue
        
        # Create a text output queue
        self.text_queue = queue.Queue()
        
        # Store model path and sample rate
        self.model_path = model_path
        self.sample_rate = sample_rate
        
        # Thread control flags
        self.is_running = False
        self.recognition_thread = None
        
        # Vosk objects
        self.model = None
        self.recognizer = None
        
        # Recognition state
        self.last_partial = ""
        
    def initialize_model(self):
        """Initialize the Vosk model and recognizer."""
        if not os.path.exists(self.model_path):
            raise ValueError(f"Model path '{self.model_path}' does not exist!")
            
        print(f"Loading Vosk model from {self.model_path}...")
        self.model = Model(self.model_path)
        self.recognizer = KaldiRecognizer(self.model, self.sample_rate)
        print("Vosk model loaded successfully.")
        
    def start_recognition(self):
        """Start the recognition process in a background thread."""
        if self.recognition_thread is not None and self.recognition_thread.is_alive():
            print("Recognition already running!")
            return
            
        # Initialize the model if needed
        if self.model is None or self.recognizer is None:
            self.initialize_model()
            
        # Create and start the thread
        self.is_running = True
        self.recognition_thread = threading.Thread(target=self.recognition_thread_func)
        self.recognition_thread.daemon = True
        self.recognition_thread.start()
        print("Speech recognition started.")
        
    def recognition_thread_func(self):
        """The main recognition thread function."""
        try:
            while self.is_running:
                try:
                    # Get audio data with timeout
                    audio_data = self.audio_queue.get(timeout=0.5)
                    
                    # Process with Vosk
                    if self.recognizer.AcceptWaveform(audio_data):
                        # We have a final result
                        result = json.loads(self.recognizer.Result())
                        text = result.get("text", "").strip()
                        
                        if text:  # Only if we actually got some text
                            print(f"Recognized: {text}")
                            # Put the result in the output queue
                            self.text_queue.put({"type": "final", "text": text})
                            
                            # Reset the partial text
                            self.last_partial = ""
                    else:
                        # We have a partial result
                        result = json.loads(self.recognizer.PartialResult())
                        partial = result.get("partial", "").strip()
                        
                        if partial and partial != self.last_partial:
                            print(f"Partial: {partial}", end="\r")
                            # Update the partial text
                            self.last_partial = partial
                            # Put the partial result in the queue
                            self.text_queue.put({"type": "partial", "text": partial})
                    
                    # Mark task as done
                    self.audio_queue.task_done()
                    
                except queue.Empty:
                    # No audio data available, just continue
                    pass
                    
                except Exception as e:
                    print(f"Error processing audio: {str(e)}")
                    time.sleep(0.1)  # Prevent tight loop if error persists
                    
            print("Recognition thread stopping...")
            
        except Exception as e:
            print(f"Error in recognition thread: {str(e)}")
            
    def stop_recognition(self):
        """Stop the recognition process."""
        print("Stopping speech recognition...")
        self.is_running = False
        
        # Wait for thread to finish
        if self.recognition_thread and self.recognition_thread.is_alive():
            self.recognition_thread.join(timeout=2.0)
            
        print("Speech recognition stopped.")
        
    def get_text_queue(self):
        """Get the queue that contains recognized text."""
        return self.text_queue

# Test the recognition system
if __name__ == "__main__":
    from threaded_audio_processor import ThreadedAudioProcessor
    
    # Path to your Vosk model (replace with the location on your machine)
    model_path = "/home/luar/AI/voice_assistant/vosk-model-small-en-us-0.15"
    
    # Create audio processor
    audio_processor = ThreadedAudioProcessor()
    audio_queue = audio_processor.get_audio_queue()
    
    # Create recognition system
    recognizer = ThreadedRecognition(audio_queue, model_path)
    
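    # Start both systems. Recognition goes first so the model is loaded
    # before audio frames start filling the bounded queue.
    recognizer.start_recognition()
    audio_processor.start_processing()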
    
    try:
        print("Say something (running for 15 seconds)...")
        # Run for a while to test
        time.sleep(15)
    finally:
        # Stop everything
        recognizer.stop_recognition()
        audio_processor.stop_processing()

### Exercise 4: Implement a Threaded Understanding Manager

1. Create a Python file called `threaded_understanding.py`
2. Implement a class that:
   - Takes recognized text from a queue
   - Processes it to extract intents and entities
   - Maintains conversation context across turns
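
Regex-based intent detection with named groups can be tried out in isolation before adding threads. The sketch below is a reduced, hypothetical pattern table (`PATTERNS`) and helper (`match_intent`). It is not the full set used in this exercise. It illustrates two details that are easy to get wrong: word boundaries (`\b`), so that "hi" does not match inside "which", and skipping optional named groups that did not take part in the match (their value is `None`).

```python
import re

# Reduced, illustrative pattern table: one pattern per intent
PATTERNS = {
    "greeting": r"\b(hello|hi|hey)\b",
    "device_control": (
        r"\b(turn|switch) (?P<action>on|off) (the )?"
        r"(?P<device>[\w\s]+?)( please)?$"
    ),
    "weather_inquiry": r"\bweather( like)?( in (?P<location>[\w\s]+))?",
}

def match_intent(text):
    """Return (intent, entities) for the first matching pattern."""
    text = text.lower().strip()
    for intent, pattern in PATTERNS.items():
        m = re.search(pattern, text)
        if m:
            # Optional groups that did not match are None; leave them out
            entities = {
                name: value.strip()
                for name, value in m.groupdict().items()
                if value is not None
            }
            return intent, entities
    return "unknown", {}

print(match_intent("Turn on the living room lights please"))
print(match_intent("What's the weather like in New York"))
print(match_intent("What's the weather like today"))
print(match_intent("which one"))
```

The lazy quantifier `+?` in the device group, together with the `$` anchor, keeps a trailing "please" out of the device name.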

In [None]:
import queue
import threading
import re
import time

class ThreadedUnderstanding:
    def __init__(self, text_queue):
        """Initialize the threaded understanding manager.
        
        Args:
            text_queue: Queue containing recognized text to process
        """
        # Store the text queue
        self.text_queue = text_queue
        
        # Create an output queue for intents and entities
        self.intent_queue = queue.Queue()
        
        # Thread control flags
        self.is_running = False
        self.understanding_thread = None
        
        # Initialize intent patterns
        self.intent_patterns = {
            "greeting": [
                r"(hello|hi|hey|greetings)( there| assistant| voice assistant)?",
                r"good (morning|afternoon|evening)"
            ],
            "farewell": [
                r"(goodbye|bye|see you( later)?)",
                r"(exit|quit|stop)( assistant| program)?"
            ],
            "weather_inquiry": [
                r"(what|how)('s| is) (the )?weather( like)?( in (?P<location>[\w\s]+))?",
                r"(weather|forecast)( in| for) (?P<location>[\w\s]+)"
            ],
            "time_inquiry": [
                r"what('s| is) (the )?time( now)?",
                r"what time is it",
                r"(tell|give) me the (current )?time"
            ],
            "device_control": [
                r"(turn|switch) (?P<action>on|off) (the )?(?P<device>[\w\s]+?)( please)?$"
            ]
        }
        
        # Initialize context
        self.context = {
            "last_intent": None,
            "entities": {},
            "conversation_history": []
        }
        
    def start_understanding(self):
        """Start the understanding process in a background thread."""
        if self.understanding_thread is not None and self.understanding_thread.is_alive():
            print("Understanding process already running!")
            return
            
        # Create and start the thread
        self.is_running = True
        self.understanding_thread = threading.Thread(target=self.understanding_thread_func)
        self.understanding_thread.daemon = True
        self.understanding_thread.start()
        print("Speech understanding started.")
        
    def understanding_thread_func(self):
        """The main understanding thread function."""
        try:
            while self.is_running:
                try:
                    # Get text data with timeout
                    text_data = self.text_queue.get(timeout=0.5)
                    
                    # Only process final results
                    if text_data["type"] == "final":
                        text = text_data["text"]
                        
                        # Process the text
                        intent, entities = self.detect_intent(text)
                        
                        # Handle context
                        intent, entities = self.apply_context(intent, entities, text)
                        
                        # Update context
                        self.update_context(intent, entities, text)
                        
                        # Add to output queue
                        self.intent_queue.put({
                            "intent": intent,
                            "entities": entities,
                            "text": text
                        })
                        
                        print(f"Intent: {intent}, Entities: {entities}")
                    
                    # Mark task as done
                    self.text_queue.task_done()
                    
                except queue.Empty:
                    # No text data available, just continue
                    pass
                    
                except Exception as e:
                    print(f"Error processing text: {str(e)}")
                    time.sleep(0.1)
                    
            print("Understanding thread stopping...")
            
        except Exception as e:
            print(f"Error in understanding thread: {str(e)}")
            
    def detect_intent(self, text):
        """
        Detect the intent from the given text.
        
        Args:
            text: The text to analyze
            
        Returns:
            A tuple of (intent, entities) where entities is a dictionary
        """
        # Convert text to lowercase for case-insensitive matching
        text = text.lower()
        
        # Check each intent and its patterns
        for intent, patterns in self.intent_patterns.items():
            for pattern in patterns:
                # Anchor on word boundaries so "hi" does not match inside "chicago"
                match = re.search(rf"\b(?:{pattern})\b", text)
                if match:
                    # Extract entities from the match
                    entities = self._extract_entities(intent, match, text)
                    return intent, entities
        
        # No match found, return unknown intent
        return "unknown", {}
        
    def _extract_entities(self, intent, match, text):
        """Extract entities from the regex match."""
        entities = {}
        
        # Extract named groups from the regex match, skipping optional
        # groups that did not participate (their value is None)
        for group_name, value in match.groupdict().items():
            if value is not None:
                entities[group_name] = value.strip()
            
        # Special handling for certain intents
        if intent == "weather_inquiry" and "location" not in entities:
            # Try to extract a location after the word "in"
            location_match = re.search(r"\bin (?P<location>[\w\s]+)$", text.lower())
            if location_match:
                entities["location"] = location_match.group("location").strip()
                
        return entities
        
    def apply_context(self, intent, entities, text):
        """Apply context to handle ambiguous queries."""
        # If this is an unknown intent, see if we can use context
        if intent == "unknown":
            last_intent = self.context.get("last_intent")
            
            # Handle follow-up queries about weather
            if last_intent == "weather_inquiry":
                # Look for time references
                if any(word in text.lower() for word in ["tomorrow", "later", "weekend"]):
                    # This is likely a follow-up about weather
                    intent = "weather_inquiry"
                    entities["location"] = self.context["entities"].get("location", "current location")
                    
                    # Extract time reference
                    if "tomorrow" in text.lower():
                        entities["time"] = "tomorrow"
                    elif "weekend" in text.lower():
                        entities["time"] = "this weekend"
                    else:
                        entities["time"] = "later"
        
        return intent, entities
        
    def update_context(self, intent, entities, text):
        """Update the conversation context."""
        # Update last intent
        self.context["last_intent"] = intent
        
        # Merge entities into the context: new values overwrite old ones,
        # and keys not mentioned in this turn are kept
        for key, value in entities.items():
            self.context["entities"][key] = value
            
        # Add to conversation history
        self.context["conversation_history"].append({
            "text": text,
            "intent": intent,
            "entities": entities.copy(),
            "timestamp": time.time()
        })
        
        # Trim history if too long
        if len(self.context["conversation_history"]) > 10:
            self.context["conversation_history"] = self.context["conversation_history"][-10:]
            
    def stop_understanding(self):
        """Stop the understanding process."""
        print("Stopping speech understanding...")
        self.is_running = False
        
        # Wait for thread to finish
        if self.understanding_thread and self.understanding_thread.is_alive():
            self.understanding_thread.join(timeout=2.0)
            
        print("Speech understanding stopped.")
        
    def get_intent_queue(self):
        """Get the queue that contains detected intents."""
        return self.intent_queue

# Test the understanding system
if __name__ == "__main__":
    # Create a test queue
    text_queue = queue.Queue()
    
    # Create the understanding system
    understanding = ThreadedUnderstanding(text_queue)
    
    # Start the system
    understanding.start_understanding()
    
    try:
        # Put some test phrases in the queue
        test_phrases = [
            "hello there",
            "what's the weather like today",
            "what's the weather like in New York",
            "how about tomorrow",  # follow-up, resolved from the weather context
            "what time is it",
            "turn on the living room lights"
        ]
        
        for phrase in test_phrases:
            print(f"\nTesting phrase: '{phrase}'")
            text_queue.put({"type": "final", "text": phrase})
            time.sleep(1)  # Give time for processing
            
    finally:
        understanding.stop_understanding()

### Exercise 5: Create the Integrated System

Now, create a main file called `voice_assistant.py` that integrates all the components:

1. The ThreadedAudioProcessor for audio capture
2. The ThreadedRecognition for speech-to-text
3. The ThreadedUnderstanding for intent detection
4. A response generator and action executor

This will create a complete, integrated voice assistant.
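
The response generator relies on filling templates with entity values. The sketch below shows one way to do this with `str.format` while leaving the caller's dictionary untouched and falling back gracefully when a value is missing. The helper `fill_template` and its `defaults` parameter are hypothetical names used only for this illustration.

```python
FALLBACK = "I need more information to help with that."

def fill_template(template, entities, defaults=None):
    """Format template with entity values, without modifying entities.

    Values in defaults are used for keys that are missing or None.
    """
    values = dict(defaults or {})
    values.update({k: v for k, v in entities.items() if v is not None})
    try:
        return template.format(**values)
    except KeyError:
        # A placeholder in the template has no value
        return FALLBACK

print(fill_template("The {device} is now {action}.",
                    {"device": "kitchen light", "action": "on"}))
print(fill_template("In {location}, it's {condition}.",
                    {"location": None},
                    defaults={"location": "current location", "condition": "sunny"}))
print(fill_template("The {device} is now {action}.", {"action": "off"}))
```

Copying the entities before adding mock or default values matters here because the same dictionary is later handed to the action executor.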

In [None]:
import threading
import queue
import time
import random
import os

# Import our components
# Note: These are the files created in Exercises 2-4; keep them in the same directory as this script
from threaded_audio_processor import ThreadedAudioProcessor
from threaded_recognition import ThreadedRecognition
from threaded_understanding import ThreadedUnderstanding

class ResponseGenerator:
    """Generates responses based on intents and entities."""
    
    def __init__(self):
        """Initialize response templates."""
        self.response_templates = {
            "greeting": [
                "Hello! How can I help you today?",
                "Hi there! What can I do for you?",
                "Greetings! How may I assist you?"
            ],
            "farewell": [
                "Goodbye! Have a great day!",
                "See you later!",
                "Bye for now!"
            ],
            "weather_inquiry": [
                "The weather in {location} is {condition} with a temperature of {temp}°F.",
                "In {location}, it's {condition} and {temp}°F.",
                "The forecast for {location} shows {condition} conditions and {temp}°F."
            ],
            "time_inquiry": [
                "The current time is {time}.",
                "It's {time} right now.",
                "The time is {time}."
            ],
            "device_control": [
                "I've turned {action} the {device}.",
                "The {device} is now {action}.",
                "{device} turned {action}."
            ],
            "unknown": [
                "I'm not sure I understand. Can you rephrase that?",
                "I don't know how to help with that yet.",
                "I didn't quite catch that. What would you like me to do?"
            ]
        }
        
    def generate_response(self, intent, entities):
        """Generate a response based on intent and entities."""
        # If we don't have templates for this intent, use unknown
        if intent not in self.response_templates:
            intent = "unknown"
            
        # Get a random template for this intent
        templates = self.response_templates[intent]
        template = random.choice(templates)
        
        # Work on a copy so the caller's entities dict is not modified
        values = dict(entities)
        
        # For specific intents, add mock data
        if intent == "weather_inquiry":
            # Mock weather data (a missing or None location falls back to a default)
            values["location"] = values.get("location") or "current location"
            values["condition"] = random.choice(["sunny", "cloudy", "rainy", "clear"])
            values["temp"] = random.randint(65, 85)
            
        elif intent == "time_inquiry":
            # Real time
            values["time"] = time.strftime("%I:%M %p")
            
        # Format the template with entities
        try:
            return template.format(**values)
        except KeyError:
            # If we're missing required entities, return a fallback
            return "I need more information to help with that."


class ActionExecutor:
    """Executes actions based on intents and entities."""
    
    def execute_action(self, intent, entities):
        """Execute an action based on intent and entities."""
        if intent == "device_control":
            device = entities.get("device", "unknown device")
            action = entities.get("action", "unknown action")
            print(f"[Action] Turning {action} {device}")
            return True
            
        elif intent == "weather_inquiry":
            location = entities.get("location", "current location")
            print(f"[Action] Looking up weather for {location}")
            return True
            
        # No action for other intents
        return False


class VoiceAssistant:
    """Main voice assistant class that integrates all components."""
    
    def __init__(self, model_path):
        """Initialize the voice assistant."""
        self.model_path = model_path
        
        # Create component instances
        self.audio_processor = ThreadedAudioProcessor()
        self.audio_queue = self.audio_processor.get_audio_queue()
        
        self.recognizer = ThreadedRecognition(self.audio_queue, self.model_path)
        self.text_queue = self.recognizer.get_text_queue()
        
        self.understanding = ThreadedUnderstanding(self.text_queue)
        self.intent_queue = self.understanding.get_intent_queue()
        
        self.response_generator = ResponseGenerator()
        self.action_executor = ActionExecutor()
        
        # Thread control
        self.is_running = False
        self.response_thread = None
        
    def start(self):
        """Start the voice assistant."""
        if self.is_running:
            print("Voice assistant is already running!")
            return
            
        print("Starting voice assistant...")
        
        # Start consumers before producers so the Vosk model is loaded
        # before audio frames start filling the bounded queue
        self.understanding.start_understanding()
        self.recognizer.start_recognition()
        self.audio_processor.start_processing()
        
        # Start response thread
        self.is_running = True
        self.response_thread = threading.Thread(target=self.response_thread_func)
        self.response_thread.daemon = True
        self.response_thread.start()
        
        print("Voice assistant is now listening. Speak clearly!")
        
    def response_thread_func(self):
        """Thread that generates responses and executes actions."""
        try:
            while self.is_running:
                try:
                    # Get intent data with timeout
                    intent_data = self.intent_queue.get(timeout=0.5)
                    
                    intent = intent_data["intent"]
                    entities = intent_data["entities"]
                    
                    # Generate response
                    response = self.response_generator.generate_response(intent, entities)
                    
                    # Execute action
                    self.action_executor.execute_action(intent, entities)
                    
                    # Print response
                    print(f"\nAssistant: {response}")
                    
                    # Mark task as done
                    self.intent_queue.task_done()
                    
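                except queue.Empty:
                    # No intent data available, just continue
                    pass
                    
                except Exception as e:
                    # Report the error but keep the response thread alive
                    print(f"Error generating response: {str(e)}")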
                
        except Exception as e:
            print(f"Error in response thread: {str(e)}")
            
    def stop(self):
        """Stop the voice assistant."""
        if not self.is_running:
            return
            
        print("\nStopping voice assistant...")
        self.is_running = False
        
        # Stop all components
        self.understanding.stop_understanding()
        self.recognizer.stop_recognition()
        self.audio_processor.stop_processing()
        
        # Wait for response thread
        if self.response_thread and self.response_thread.is_alive():
            self.response_thread.join(timeout=2.0)
            
        print("Voice assistant stopped.")


def main():
    """Main function to run the voice assistant."""
    # Path to your Vosk model (replace with the location on your machine)
    model_path = "/home/luar/AI/voice_assistant/vosk-model-small-en-us-0.15"
    
    # Check if model exists
    if not os.path.exists(model_path):
        print(f"Error: Model not found at {model_path}")
        print("Please download a model from https://alphacephei.com/vosk/models")
        return
    
    # Create and start the voice assistant
    assistant = VoiceAssistant(model_path)
    
    try:
        # Start the assistant
        assistant.start()
        
        # Keep running until interrupted
        print("Press Ctrl+C to exit")
        while True:
            time.sleep(0.1)
            
    except KeyboardInterrupt:
        print("\nInterrupted by user")
        
    finally:
        # Stop the assistant
        assistant.stop()


if __name__ == "__main__":
    main()

## CHALLENGES AND EXTENSIONS

1. **Add Wake Word Detection**: Modify the system to only process speech when triggered by a wake word like "Hey Assistant".

2. **Text-to-Speech Response**: Add a module that converts the assistant's text responses to speech using a TTS library.

3. **Error Recovery**: Improve error handling to make the system more robust against errors in any component.

4. **Performance Optimization**: Profile the system to identify bottlenecks and optimize resource usage.

5. **Voice Activity Detection**: Add VAD to process audio only when speech is detected.

6. **Multiple Intent Support**: Enhance the understanding system to handle multiple intents in a single utterance.
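
As a starting point for the first challenge, a wake word can be approximated by filtering the recognized text rather than the audio. The sketch below is one simple approach under that assumption. The `WakeWordGate` class, its parameters, and the five-second listening window are all hypothetical. A dedicated wake word engine working on audio would normally be more reliable and cheaper to run.

```python
import time

class WakeWordGate:
    """Pass recognized text through only after the wake phrase was heard."""

    def __init__(self, wake_phrase="hey assistant", window_seconds=5.0,
                 clock=time.monotonic):
        self.wake_phrase = wake_phrase
        self.window_seconds = window_seconds
        self.clock = clock        # injectable so the gate can be tested
        self.awake_until = 0.0    # gate is closed until the phrase is heard

    def filter(self, text):
        """Return the command text to process, or None if it is ignored."""
        text = text.lower().strip()
        now = self.clock()
        index = text.find(self.wake_phrase)
        if index != -1:
            # Wake phrase heard: open the gate and keep whatever follows it
            self.awake_until = now + self.window_seconds
            remainder = text[index + len(self.wake_phrase):].strip()
            return remainder or None
        if now < self.awake_until:
            return text           # still inside the listening window
        return None

gate = WakeWordGate()
print(gate.filter("turn on the lights"))                 # ignored: gate closed
print(gate.filter("hey assistant turn on the lights"))   # command after phrase
print(gate.filter("what time is it"))                    # within the window
```

In the integrated system, a gate like this could sit between the text queue and the understanding component, so that only final results that pass the filter are turned into intents.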

## CONCLUSION

You've now built an integrated voice assistant system that combines real-time speech recognition with natural language understanding. This foundation can be extended with additional capabilities like wake word detection, text-to-speech, and more sophisticated understanding.

In the next module, we'll build on this integration to create a complete voice assistant project that can perform useful tasks.