In [1]:
import os
from typing import Iterator
from agno.agent import Agent, RunResponse
from agno.models.google import Gemini
from kokoro import KPipeline
from IPython.display import display, Audio
import soundfile as sf
import os
import threading
import queue
import re
import numpy as np
import time

  from .autonotebook import tqdm as notebook_tqdm


In [2]:
# Text-to-speech configuration: the Kokoro pipeline for American English ('a')
# and the voice used for every synthesized chunk.
voice = "af_heart"
pipeline = KPipeline(lang_code="a")



  WeightNorm.apply(module, name, dim)


In [3]:
# Gemini-backed agent whose replies are meant to be spoken aloud, hence the
# instruction to avoid markdown and symbols.
# The API key comes from the GOOGLE_API_KEY environment variable instead of being
# hard-coded here, so secrets never end up in the saved notebook.
agent = Agent(
    model=Gemini(id="gemini-2.0-flash", api_key=os.environ.get("GOOGLE_API_KEY")),
    description="You are a voice-assistant model",
    instructions="include proper punctuations for good pronunciation and do not use any markdown or symbols",
)

In [4]:
# Output folder for the per-chunk and full-response WAV files (no error if it exists).
os.makedirs(name="audio_chunks", exist_ok=True)

In [5]:

class RealTimeAudioStreamer:
    """Turn incrementally streamed text into speech and play it as it is produced.

    Text arrives via ``add_text``. A worker thread groups it into sentences and
    synthesizes each complete sentence. A second thread displays the resulting
    audio chunks in the notebook. Every chunk is also written to
    ``<output_dir>/chunk_<n>.wav``. ``stop()`` finally writes
    ``<output_dir>/full_response.wav``, synthesized from the whole response.

    Args:
        tts_pipeline: Callable invoked as ``tts_pipeline(text, voice=..., speed=...)``
            that yields 3-item results whose last item is the audio chunk
            (this notebook passes a ``kokoro.KPipeline``).
        voice: Voice name forwarded to the pipeline.
        speed: Speech speed forwarded to the pipeline.
        output_dir: Existing directory that receives the WAV files.
        sample_rate: Sample rate in Hz, used for playback and for the WAV files.
    """

    def __init__(self, tts_pipeline, voice="af_heart", speed=1.0,
                 output_dir="audio_chunks", sample_rate=24000):
        self.pipeline = tts_pipeline
        self.voice = voice
        self.speed = speed
        self.output_dir = output_dir
        self.sample_rate = sample_rate
        self.text_queue = queue.Queue()   # raw text pieces from add_text()
        self.audio_queue = queue.Queue()  # synthesized chunks awaiting playback
        self.text_processing_thread = None
        self.audio_playback_thread = None
        # Set by stop(): no more text will arrive. The worker drains its queue and exits.
        self.stop_event = threading.Event()
        # Set by stop() once every audio chunk has been queued. Only then may the
        # playback thread treat an empty queue as "finished".
        self.audio_done_event = threading.Event()
        self.full_text = ""
        self.chunk_index = 0
        self.sentence_buffer = ""  # trailing sentence that is not complete yet

    def _synthesize(self, text):
        """Synthesize ``text``, queue each audio chunk for playback and save it to disk."""
        for _, _, audio in self.pipeline(text, voice=self.voice, speed=self.speed):
            self.audio_queue.put(audio)
            audio_path = os.path.join(self.output_dir, f"chunk_{self.chunk_index}.wav")
            sf.write(audio_path, audio, self.sample_rate)
            self.chunk_index += 1

    def process_text_to_audio(self):
        """Worker loop: convert queued text into audio, one complete sentence at a time.

        Runs until ``stop_event`` is set *and* the text queue is empty. The
        trailing partial sentence stays in ``sentence_buffer`` until more text
        arrives or ``stop()`` flushes it.
        """
        while not self.stop_event.is_set() or not self.text_queue.empty():
            try:
                # Short timeout so the loop re-checks stop_event regularly.
                text_chunk = self.text_queue.get(timeout=0.2)
            except queue.Empty:
                continue
            try:
                self.sentence_buffer += text_chunk
                parts = self._split_into_sentences(self.sentence_buffer)
                # The last part is the (possibly empty) incomplete sentence.
                self.sentence_buffer = parts.pop()
                for sentence in parts:
                    if sentence.strip():
                        self._synthesize(sentence)
            except Exception as e:
                print(f"Error in audio processing: {e}")
            finally:
                # Always acknowledge the item, even if processing failed.
                self.text_queue.task_done()

    def play_audio(self, autoplay=True):
        """Playback loop: display queued audio chunks one after another.

        Exits once ``audio_done_event`` is set and the audio queue is empty.
        With ``autoplay`` each displayed widget starts playing as soon as it
        renders, so the loop waits about one chunk duration before showing the
        next one. Otherwise consecutive sentences would overlap.
        """
        while not self.audio_done_event.is_set() or not self.audio_queue.empty():
            try:
                audio = self.audio_queue.get(timeout=0.1)
            except queue.Empty:
                continue
            try:
                display(Audio(data=audio, rate=self.sample_rate, autoplay=autoplay))
                # Assumes mono audio, so len() is the sample count. TODO: confirm
                # this for every pipeline used with this class.
                duration = len(audio) / self.sample_rate if autoplay else 0.0
                # Minimum 50 ms pause to avoid overloading the display.
                time.sleep(max(0.05, duration))
            except Exception as e:
                print(f"Error in audio playback: {e}")
            finally:
                self.audio_queue.task_done()

    def _split_into_sentences(self, text):
        """Split ``text`` after each '.', '!' or '?' that is followed by whitespace.

        Returns the complete sentences followed by the remaining incomplete text
        (which may be ""). The list is never empty. With no sentence end, the
        result is ``[text]``.
        """
        ends = [match.end() for match in re.finditer(r'([.!?])\s+', text)]
        bounds = [0, *ends, len(text)]
        return [text[start:stop] for start, stop in zip(bounds, bounds[1:])]

    def start(self):
        """Start the text-processing and audio-playback threads."""
        self.stop_event.clear()
        self.audio_done_event.clear()

        self.text_processing_thread = threading.Thread(
            target=self.process_text_to_audio,
            daemon=True,
        )
        self.text_processing_thread.start()

        self.audio_playback_thread = threading.Thread(
            target=self.play_audio,
            daemon=True,
        )
        self.audio_playback_thread.start()

    def stop(self):
        """Finish streaming and save the full-response audio.

        Steps:
        1. Drain the pending text.
        2. Speak the trailing partial sentence.
        3. Wait for playback to finish.
        4. Save the full-response audio.

        The worker is joined *before* the trailing sentence is read. This way the
        tail cannot race with a concurrent append and cannot be queued ahead of
        earlier sentences.
        """
        self.stop_event.set()
        if self.text_processing_thread:
            # No timeout: the worker exits as soon as the queue is drained, and
            # flushing while it is still running would corrupt the ordering.
            self.text_processing_thread.join()

        try:
            tail, self.sentence_buffer = self.sentence_buffer, ""
            if tail.strip():
                self._synthesize(tail)
        finally:
            # Everything that will ever be queued is queued now, so playback may finish.
            self.audio_done_event.set()

        if self.audio_playback_thread:
            self.audio_playback_thread.join()

        self._save_full_audio()

    def add_text(self, text):
        """Queue ``text`` for speech synthesis and append it to ``full_text``."""
        self.text_queue.put(text)
        self.full_text += text

    def _save_full_audio(self):
        """Synthesize the whole response in one pass and save it as ``full_response.wav``.

        Uses this streamer's own pipeline, splitting only on newlines. Does
        nothing when no text was received.
        """
        if not self.full_text.strip():
            return
        print("Generating full audio from complete response...")
        full_audio = [
            audio
            for _, _, audio in self.pipeline(
                self.full_text, voice=self.voice, speed=self.speed, split_pattern=r'\n+'
            )
        ]

        if full_audio:
            full_audio_array = np.concatenate(full_audio)
            sf.write(
                os.path.join(self.output_dir, "full_response.wav"),
                full_audio_array,
                self.sample_rate,
            )

In [6]:
def process_stream_to_speech_realtime(prompt: str, play_audio: bool = True):
    """Stream the agent's answer to ``prompt`` and speak it while it arrives.

    Each streamed text chunk goes to a ``RealTimeAudioStreamer``, which
    synthesizes and displays audio sentence by sentence. The streamer is stopped
    exactly once, even if streaming fails part-way. Stopping flushes the last
    sentence and saves the full-response audio.

    Args:
        prompt: User prompt passed to the module-level ``agent``.
        play_audio: Passed as ``autoplay`` to the streamer's playback loop. When
            False, the audio widgets are still displayed but do not play on
            their own.

    Returns:
        The text received: the complete response on success, or the partial
        response if an error interrupted the stream (the error is printed).
    """
    response_stream: Iterator[RunResponse] = agent.run(prompt, stream=True)

    audio_streamer = RealTimeAudioStreamer(pipeline, voice=voice)
    # start() runs self.play_audio as the playback thread with its default
    # autoplay=True. Shadow it on this instance so the caller's flag is honoured.
    default_player = audio_streamer.play_audio
    audio_streamer.play_audio = lambda: default_player(autoplay=play_audio)
    audio_streamer.start()

    try:
        for chunk in response_stream:
            if chunk.content:
                audio_streamer.add_text(chunk.content)
    except Exception as e:
        print(f"Error during streaming: {e}")
    finally:
        # Single stop on both the success and the error path.
        audio_streamer.stop()

    return audio_streamer.full_text


In [7]:
# Example: speak the agent's answer while it streams in, then show the full text.
prompt = "who is david goggins"
full_response = process_stream_to_speech_realtime(prompt=prompt)
print("Complete response:", full_response)

Generating full audio from complete response...
Complete response: David Goggins is an American ultramarathon runner, ultra-distance cyclist, triathlete, motivational speaker, and author. He is a retired United States Navy SEAL and former world record holder for the most pull-ups done in 24 hours.

