In [22]:
import speech_recognition as sr
from rapidfuzz import process

# List of predefined commands
commands = [
    "start navigation",
    "stop navigation",
    "zoom in",
    "zoom out",
    "find shortest route",
    "reroute",
    "exit application"
]


def recognize_speech():
    """Capture speech input and convert it to text."""
    recognizer = sr.Recognizer()
    with sr.Microphone() as source:
        print("Listening for a command...")
        recognizer.adjust_for_ambient_noise(source)  # Helps with background noise
        try:
            audio = recognizer.listen(source, timeout=5)  # Listen with a timeout
            text = recognizer.recognize_google(audio)  # Convert speech to text
            return text.lower()
        except sr.UnknownValueError:
            print("Sorry, could not understand the audio.")
            return None
        except sr.RequestError:
            print("Could not request results from speech recognition service.")
            return None
        except sr.WaitTimeoutError:
            print("Listening timed out.")
            return None


def find_best_match(heard_command):
    """Find the best matching command from the list."""
    if not heard_command:
        return None, None, 0

    best_match, score, _ = process.extractOne(heard_command, commands)
    return heard_command, best_match, score


# Continuous listening loop
print("Voice command system activated. Say a command:")
while True:
    heard_command = recognize_speech()
    if heard_command:
        heard, matched, score = find_best_match(heard_command)
        print(f"Heard: {heard}")
        print(f"Matched Command: {matched}")
        print(f"Match Score: {score}")

        if matched == "exit application":
            print("Exiting program...")
            break


Voice command system activated. Say a command:


AttributeError: Could not find PyAudio; check installation

In [17]:

import whisper
import queue

# Load the Whisper model
model = whisper.load_model("small")  # Use "tiny", "base", "small", "medium", or "large"

# Parameters
sample_rate = 16000  # Whisper works best with 16kHz sample rate
block_size = 1024  # Number of frames per block
channels = 1  # Mono audio
duration = 5  # Maximum duration of each command in seconds

# Queue to hold audio data
audio_queue = queue.Queue()


def audio_callback(indata, frames, time, status):
    """This is called for each audio block from the microphone."""
    if status:
        print(status, file=sys.stderr)
    audio_queue.put(indata.copy())


def play_audio(audio_data):
    """Play back the recorded audio."""
    print("Playing back audio...")
    sd.play(audio_data, samplerate=sample_rate)
    sd.wait()  # Wait until the audio is finished playing
    print("Playback finished.")


def transcribe_audio(audio_data):
    """Transcribe audio data using Whisper."""
    audio_data = audio_data.flatten().astype(np.float32)  # Normalize to [-1, 1]
    result = model.transcribe(audio_data, fp16=False, language="en")  # Disable FP16 if not using a GPU
    return result["text"]


def record_and_transcribe():
    """Record audio, play it back, and transcribe it."""
    try:
        with sd.InputStream(samplerate=sample_rate, blocksize=block_size,
                            channels=channels, callback=audio_callback):
            while True:
                # Collect audio data for the specified duration
                audio_frames = []
                while not audio_queue.empty():
                    audio_queue.get()
                print("Listening... Speak now!")
                for _ in range(int(sample_rate / block_size * duration)):
                    audio_frames.append(audio_queue.get())

                # Combine audio frames into a single array
                audio_data = np.concatenate(audio_frames)

                # Play back the recorded audio
                # play_audio(audio_data)

                # Transcribe the audio
                transcription = transcribe_audio(audio_data)
                print(f"Transcription: {transcription}")

                # Exit if the user says "exit"
                if "exit" in transcription.lower():
                    print("Exiting...")
                    break

    except KeyboardInterrupt:
        print("\nRecording stopped.")


if __name__ == "__main__":
    record_and_transcribe()

Listening... Speak now!
Transcription:  you
Listening... Speak now!
Transcription: 
Listening... Speak now!
Transcription: 
Listening... Speak now!
Transcription: 
Listening... Speak now!
Transcription: 
Listening... Speak now!

Recording stopped.


In [6]:

import whisper
import queue
from collections import deque

# Load the Whisper model
model = whisper.load_model("small")  # Use "tiny", "base", "small", "medium", or "large"

# Parameters
sample_rate = 16000  # Whisper works best with 16kHz sample rate
block_size = 1024  # Number of frames per block
channels = 1  # Mono audio
silence_threshold = 0.2  # Threshold for silence detection
min_silence_duration = 1.5  # Minimum silence duration to consider the user has stopped speaking
trigger_term = "hey computer"  # The trigger term to activate command listening
window_duration = 2.0  # Duration of the moving window in seconds
check_interval = 0.1  # Check the window every 100ms

# Queue to hold audio data
audio_queue = queue.Queue()


def audio_callback(indata, frames, time, status):
    """This is called for each audio block from the microphone."""
    if status:
        print(status, file=sys.stderr)
    audio_queue.put(indata.copy())


def is_silent(audio_data):
    """Check if the audio data is silent based on energy threshold."""
    np.abs(audio_data)
    return np.sqrt(np.mean(audio_data ** 2)) < silence_threshold


def transcribe_audio(audio_data):
    """Transcribe audio data using Whisper."""
    audio_data = audio_data.flatten().astype(np.float32)
    result = model.transcribe(audio_data, fp16=False, language='en')  # Disable FP16 if not using a GPU
    return result["text"].strip().lower()


def listen_for_trigger():
    """Listen for the trigger term using a moving window."""
    print("Listening for trigger term...")
    buffer = deque(maxlen=int(sample_rate * window_duration / block_size))  # Sliding buffer for 2 seconds of audio
    last_check_time = time.time()

    while True:
        # Collect audio data
        if not audio_queue.empty():
            buffer.append(audio_queue.get())

        # Check the window every 100ms
        if time.time() - last_check_time >= check_interval:
            if len(buffer) > 0:
                # Concatenate the audio data in the buffer
                audio_data = np.concatenate(buffer)
                transcription = transcribe_audio(audio_data)
                print(f"Heard: {transcription}")

                # Check if the trigger term is detected
                if trigger_term in transcription:
                    print(f"Trigger term '{trigger_term}' detected!")
                    return True

            last_check_time = time.time()


def listen_for_command():
    """Listen for a command after the trigger term is detected."""
    print("Listening for command...")
    audio_frames = []
    last_speech_time = time.time()

    while True:
        # Collect audio data
        if not audio_queue.empty():
            audio_data = audio_queue.get()
            audio_frames.append(audio_data)

            # Check if the audio is silent
            if is_silent(audio_data):
                if time.time() - last_speech_time > min_silence_duration:
                    # User has stopped speaking
                    break
            else:
                last_speech_time = time.time()

    # Transcribe the command
    audio_data = np.concatenate(audio_frames)
    command = transcribe_audio(audio_data)
    print(f"Command: {command}")
    return command


def process_command(command):
    """Process the transcribed command."""
    print(f"Processing command: {command}")
    # Add your command processing logic here
    if "exit" in command:
        print("Exiting...")
        sys.exit(0)


def main():
    """Main loop to listen for trigger terms and commands."""
    try:
        with sd.InputStream(samplerate=sample_rate, blocksize=block_size,
                            channels=channels, callback=audio_callback):
            while True:
                # Listen for the trigger term using a moving window
                if listen_for_trigger():
                    # Listen for a command
                    command = listen_for_command()
                    # Process the command
                    process_command(command)
    except KeyboardInterrupt:
        print("\nExiting...")


if __name__ == "__main__":
    main()

Listening for trigger term...
Heard: 
Heard: 
Heard: 
Heard: 
Heard: 
Heard: 
Heard: 
Heard: 
Heard: you
Heard: you
Heard: you
Heard: you
Heard: 
Heard: 
Heard: 
Heard: you
Heard: you
Heard: 
Heard: you
Heard: you
Heard: you
Heard: you
Heard: you
Heard: you
Heard: you
Heard: you
Heard: jesus, this is terence town.
Heard: what are the worlds most renowned men?
Heard: 
Heard: right now is an outcast.
Heard: just to show you guys what i mean.
Heard: 
Heard: 

Exiting...


In [None]:
from src.whisper_voice_command import VoiceCommandListener

if __name__ == "__main__":
    listener = VoiceCommandListener()

    # Set up triggers (terms to be detected, and whether they should be included or not)
    listener.set_triggers([("hey computer", True)])


    # Set a custom command listener
    def custom_command_listener(command):
        print(f"Custom command listener received command: {command}")
        if command == "exit":
            print("Custom exit action triggered.")
            listener.stop()


    listener.set_command_listener(custom_command_listener)

    # Start the voice command listener
    listener.start()

    # Wait for the thread to halt
    listener.wait()

    # Stop the listener after it halts (if necessary)
    listener.stop()

  from .autonotebook import tqdm as notebook_tqdm


Audio thread started.
Waiting for audio thread to halt...
Listening for trigger term...
Heard: you
Heard: which needs technology and so forth.
Heard: is the kepler broad.
Heard: is the radius of the earth.
Heard: you might wonder how the ancients first...
Heard: the earth you have to use a...
Heard: it could be a flat disc.
Heard: most of you find this pretty intuitive.
Heard: and he shared with me a nice proof of
Heard: in 3d there's enough...
Heard: because he knew about lunar eclipses.
Heard: there's a picture i can show you.
Heard: this is visible proof of the other's truth.
Heard: 
Heard: as well in this time period.
Heard: i'm gonna go and try to see if i can get a little bit of a better view of the view from the top. i'm gonna go and try to see if i can get a little bit of a better view of the view from the top. i'm gonna go and try to see if i can get a little bit of a better view from the top. i'm gonna go and try to see if i can get a little bit of a better view from the top.

In [7]:



class SpeechManager:
    def __init__(self):
        """
        Initialize the SpeechManager with a non-blocking TTS engine.
        """
        self.speech_queue = queue.Queue()
        self.is_running = True
        self.should_stop = False

        # Start a thread to process the speech queue
        self.speech_thread = threading.Thread(target=lambda: print("Speach tread"))
        self._process_queue()
        self.speech_thread.daemon = True
        self.speech_thread.start()

    def queue_speech(self, text, prepend=False, interrupt_current=False):
        """
        Queue text for speech synthesis.

        Args:
            text (str): The text to be spoken.
            prepend (bool): If True, add the text to the front of the queue.
            interrupt_current (bool): If True, interrupt the current speech and play this text immediately.
        """

        if interrupt_current:
            self.engine.stop()
            temp_queue = queue.Queue()
            temp_queue.put(text)
            self.speech_queue = temp_queue
        elif prepend:
            # Add the text to the front of the queue
            temp_queue = queue.Queue()
            temp_queue.put(text)
            while not self.speech_queue.empty():
                temp_queue.put(self.speech_queue.get())
            self.speech_queue = temp_queue
        else:
            self.speech_queue.put(text)

    def cancel_all(self):
        """
        Interrupt the current speech.
        """
        self.speech_queue = queue.Queue()
        self.engine.stop()  # Stop the current speech

    def _process_queue(self):
        """
        Process the speech queue in a separate thread.
        """
        try:
            self.engine = pyttsx3.init()
            print("Saying...")
            self.engine.say("How are u?")
            self.engine.runAndWait()
            return
            while not self.should_stop:
                if not self.speech_queue.empty():
                    text = self.speech_queue.get()
                    self._speak(text)
                else:
                    threading.Event().wait(0.1)  # Sleep briefly to avoid busy-waiting
        except Exception as e:
            print("Speach manager error:", e)
        finally:
            self.is_running = False

    def _speak(self, text):
        """
        Speak the given text using the TTS engine in a non-blocking way.
        """

        def _on_start(name):
            print(f"Started speaking: {text}")

        def _on_end(name, completed):
            print(f"Finished speaking: {text}")

        # Register event callbacks
        self.engine.connect('started-utterance', _on_start)
        self.engine.connect('finished-utterance', _on_end)

        # Speak the text
        self.engine.say(text)
        self.engine.iterate()  # Process the speech without blocking

    def stop(self):
        """
        Stop the speech manager and clean up resources.
        """
        self.should_stop = True
        self.engine.stop()

    # Additional methods for configuration
    def set_rate(self, rate):
        """
        Set the speech rate (words per minute).
        """
        self.engine.setProperty('rate', rate)

    def set_volume(self, volume):
        """
        Set the speech volume (0.0 to 1.0).
        """
        self.engine.setProperty('volume', volume)

    def set_voice(self, voice_id):
        """
        Set the voice by ID.
        """
        voices = self.engine.getProperty('voices')
        if 0 <= voice_id < len(voices):
            self.engine.setProperty('voice', voices[voice_id].id)

    def get_available_voices(self):
        """
        Get a list of available voices.
        """
        return self.engine.getProperty('voices')


# Example usage
if __name__ == "__main__":
    manager = SpeechManager()

    # Queue some speech
    manager.queue_speech("Hello, how are you?")
    manager.queue_speech("This is a test.", prepend=True)
    manager.queue_speech("I will interrupt now.", interrupt_current=True)

    # Configure TTS
    # manager.set_rate(150)
    # manager.set_volume(0.8)
    # manager.set_voice(0)

    # Keep the main thread alive to allow the speech thread to run
    import time

    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        manager.stop()
        print("Speech manager stopped.")

Saying...
Speach tread
Speech manager stopped.


In [3]:
import pyttsx3

engine = pyttsx3.init()  # object creation

""" RATE"""
rate = engine.getProperty('rate')  # getting details of current speaking rate
print(rate)  #printing current voice rate
engine.setProperty('rate', 125)  # setting up new voice rate

"""VOLUME"""
volume = engine.getProperty('volume')  #getting to know current volume level (min=0 and max=1)
print(volume)  #printing current volume level
engine.setProperty('volume', 1.0)  # setting up volume level  between 0 and 1

"""VOICE"""
voices = engine.getProperty('voices')  #getting details of current voice
#engine.setProperty('voice', voices[0].id)  #changing index, changes voices. o for male
engine.setProperty('voice', voices[100].id)  #changing index, changes voices. 1 for female

engine.say("Hello World!")
engine.say('My current speaking rate is ' + str(rate))
engine.runAndWait()
engine.stop()


125
1.0


In [31]:
from gtts import gTTS

tts = gTTS("Hello, this is a test, is not it?.")
tts.save("output.mp3")


gTTSError: Failed to connect. Probable cause: Unknown

In [None]:
from TTS.api import TTS
import threading


class OfflineTTS:
    def __init__(self, model_name="tts_models/en/ljspeech/tacotron2-DDC"):
        self.tts = TTS(model_name)
        self.speech_queue = queue.Queue()
        self.speaking_thread = threading.Thread(target=self._process_queue, daemon=True)
        self.speaking_thread.start()

    def _process_queue(self):
        while True:
            text = self.speech_queue.get()
            if text is None:
                break
            audio = self.tts.tts(text)
            sd.play(np.array(audio), samplerate=22050)
            sd.wait()
            self.speech_queue.task_done()

    def queue_speech(self, text):
        """Adds text to the speech queue."""
        self.speech_queue.put(text)

    def stop(self):
        """Stops the speech thread."""
        self.speech_queue.put(None)
        self.speaking_thread.join()


# Example Usage
tts = OfflineTTS()
tts.queue_speech("This is an advanced offline text to speech system.")


In [5]:
import whisper
import sounddevice as sd
import numpy as np
import time

# Load the Whisper model (choose 'tiny', 'base', 'small', 'medium', 'large')
model = whisper.load_model("small")

# Audio recording settings
SAMPLE_RATE = 16000  # Whisper expects 16kHz audio
DURATION = 5  # Recording duration in seconds per chunk
my_data = None


def callback(indata, frames, t, status):
    """Callback function to process live audio"""
    if status:
        print(f"Error: {status}", flush=True)

    print(indata.mean())
    # print({'frames': frames, 't': t.inputBufferAdcTime, 'status': status}, t.outputBufferDacTime, t.currentTime)
    # global my_data
    # my_data = {'indata': indata.copy(), 'frames': frames, 't': t.inputBufferAdcTime, 'status': status}


# Start recording stream
with sd.InputStream(callback=callback, samplerate=SAMPLE_RATE, channels=1, dtype="float32", blocksize= 1024) as s:
    print("Listening...")
    while my_data is None:
        pass  # Keep the program running
    s.close()
    print(my_data)


Listening...
3.7608468e-05
-2.3893084e-05
0.00015052049
0.001430799
-0.0018630526
0.0017473453
-0.00022819405
-0.0007436129
-0.002691816
0.0021951543
0.0012475958
0.0014084931
-0.0024839835
-0.002391302
-0.00017613638
0.0020508554
0.0014517119
0.0006688951
-0.0031393073
-0.00027830782
0.00068402616
0.0017732056
0.00060093135
-0.00402719
0.0038303076
-0.0021085497
-0.0026303623
0.0037994944
-0.0008425764
0.0007975693
-0.0017228504
0.0027028946
-0.0001131919
-0.0030194048
0.0012038369
0.001891919
0.00029250327
-0.0015731137
-0.0016451626
0.0049984637
-0.0019552168
-0.0037578563
0.001023693
0.0035909545
-0.001520654
-0.0008967833
-0.0004839748
0.00048493827
0.00017809274
0.001355524
-0.0004700967
3.6451733e-05
0.00013708766
-0.0014031488
0.0012123832
0.00045090192
0.001947539
-0.0012444486
-0.0035801772
0.0018975968
0.000333959
0.00024909782
-0.0003949767
-0.0029249429
0.0035456454
0.00019048993
-0.0026862193
0.0017250844
0.0017865426
-0.0013352802
-0.0025233817
0.0023488617
0.0019271557


KeyboardInterrupt: 

In [1]:
import json
import queue
import sounddevice as sd
import vosk

# Load Vosk model (change path if needed)
MODEL_PATH = "../vosk-model"
model = vosk.Model(MODEL_PATH)

# Create a queue to hold recorded audio
q = queue.Queue()

# Audio recording settings
SAMPLE_RATE = 16000  # Vosk expects 16kHz audio
CHANNELS = 1

# Speech recognizer
recognizer = vosk.KaldiRecognizer(model, SAMPLE_RATE)


def callback(indata, frames, time, status):
    """Callback function to process live audio"""
    if status:
        print(f"Error: {status}", flush=True)
    q.put(bytes(indata))  # Put audio data into the queue


# Start recording stream
with sd.InputStream(callback=callback, samplerate=SAMPLE_RATE, channels=CHANNELS, dtype="int16"):
    print("Listening...")
    while True:
        data = q.get()
        if recognizer.AcceptWaveform(data):
            result = json.loads(recognizer.Result())
            print(f"Transcribed: {result['text']}")


LOG (VoskAPI:ReadDataFiles():model.cc:213) Decoding params beam=10 max-active=3000 lattice-beam=2
LOG (VoskAPI:ReadDataFiles():model.cc:216) Silence phones 1:2:3:4:5:6:7:8:9:10
LOG (VoskAPI:RemoveOrphanNodes():nnet-nnet.cc:948) Removed 0 orphan nodes.
LOG (VoskAPI:RemoveOrphanComponents():nnet-nnet.cc:847) Removing 0 orphan components.
LOG (VoskAPI:ReadDataFiles():model.cc:248) Loading i-vector extractor from ../vosk-model/ivector/final.ie
LOG (VoskAPI:ComputeDerivedVars():ivector-extractor.cc:183) Computing derived variables for iVector extractor
LOG (VoskAPI:ComputeDerivedVars():ivector-extractor.cc:204) Done.
LOG (VoskAPI:ReadDataFiles():model.cc:282) Loading HCL and G from ../vosk-model/graph/HCLr.fst ../vosk-model/graph/Gr.fst
LOG (VoskAPI:ReadDataFiles():model.cc:308) Loading winfo ../vosk-model/graph/phones/word_boundary.int


Listening...
Transcribed: the
Transcribed: he declined here is not pointed towards the sun it's it's it's i'm angel with all of those raise he had ah what could have a norman which is kind of like an ancient protracted was under the portals under
Transcribed: key measure that the sun was occupied seven degrees of of it
Transcribed: critically because he could also noted at this exact moment down and so many many many miles away the sun was directly overhead he deduced that this means the arc length along earth between alexandria insane is about seven degrees this in turn means the ratio between seven degrees and the for three hundred sixty degrees of a circle
Transcribed: must be the same as the ratio of the distance between those two towns and the full circumference of the earth
Transcribed: no keep in mind in the records that we have it's not like he's reporting this distance in miles or kilometers the units they were using back then word stevia were a single unit is something like t

KeyboardInterrupt: 

In [3]:
import os
import sys
import queue
import sounddevice as sd
import numpy  # Make sure NumPy is loaded before it is used in the callback
assert numpy  # avoid "imported but unused" message (W0611)
from vosk import Model, KaldiRecognizer

# Set up the model and recognizer
# model_path = "path_to_your_vosk_model"  # Replace with the path to your Vosk model
# if not os.path.exists(model_path):
#     print(f"Please download a model from https://alphacephei.com/vosk/models and unpack as {model_path}")
#     sys.exit(1)
# 
# model = Model(model_path)
model = Model(lang='en-us')
recognizer = KaldiRecognizer(model, 16000)

# Audio settings
samplerate = 16000
blocksize = 8000
device = None  # Use the default input device

q = queue.Queue()

def callback(indata, frames, time, status):
    """This is called (from a separate thread) for each audio block."""
    if status:
        print(status, file=sys.stderr)
    q.put(bytes(indata))

try:
    with sd.RawInputStream(samplerate=samplerate, blocksize=blocksize, device=device,
                           dtype='int16', channels=1, callback=callback):
        print('#' * 80)
        print('Press Ctrl+C to stop the recording')
        print('#' * 80)

        while True:
            data = q.get()
            if recognizer.AcceptWaveform(data):
                result = recognizer.Result()
                print(result)
            else:
                partial_result = recognizer.PartialResult()
                print(partial_result)

except KeyboardInterrupt:
    print('\nDone')
except Exception as e:
    print(type(e).__name__ + ': ' + str(e))

LOG (VoskAPI:ReadDataFiles():model.cc:213) Decoding params beam=10 max-active=3000 lattice-beam=2
LOG (VoskAPI:ReadDataFiles():model.cc:216) Silence phones 1:2:3:4:5:6:7:8:9:10
LOG (VoskAPI:RemoveOrphanNodes():nnet-nnet.cc:948) Removed 0 orphan nodes.
LOG (VoskAPI:RemoveOrphanComponents():nnet-nnet.cc:847) Removing 0 orphan components.
LOG (VoskAPI:ReadDataFiles():model.cc:248) Loading i-vector extractor from /home/yoni_ash/.cache/vosk/vosk-model-small-en-us-0.15/ivector/final.ie
LOG (VoskAPI:ComputeDerivedVars():ivector-extractor.cc:183) Computing derived variables for iVector extractor
LOG (VoskAPI:ComputeDerivedVars():ivector-extractor.cc:204) Done.
LOG (VoskAPI:ReadDataFiles():model.cc:282) Loading HCL and G from /home/yoni_ash/.cache/vosk/vosk-model-small-en-us-0.15/graph/HCLr.fst /home/yoni_ash/.cache/vosk/vosk-model-small-en-us-0.15/graph/Gr.fst
LOG (VoskAPI:ReadDataFiles():model.cc:308) Loading winfo /home/yoni_ash/.cache/vosk/vosk-model-small-en-us-0.15/graph/phones/word_bound

################################################################################
Press Ctrl+C to stop the recording
################################################################################
{
  "partial" : ""
}
{
  "partial" : ""
}
{
  "partial" : ""
}
{
  "partial" : ""
}
{
  "partial" : ""
}
{
  "partial" : "did"
}
{
  "partial" : "did you have is that"
}
{
  "partial" : "did you have is that the"
}
{
  "text" : "did you have is that visiting"
}
{
  "partial" : "this"
}
{
  "partial" : "this edition"
}
{
  "text" : "this edition"
}
{
  "partial" : ""
}
{
  "partial" : ""
}
{
  "partial" : ""
}
{
  "partial" : ""
}
{
  "partial" : ""
}
{
  "partial" : "not for this"
}
{
  "partial" : "not for this"
}
{
  "text" : "not for this"
}
{
  "partial" : "leaving"
}
{
  "partial" : "living in"
}
{
  "partial" : "living in"
}
{
  "text" : "living and"
}
{
  "partial" : ""
}
{
  "partial" : ""
}
{
  "partial" : ""
}
{
  "partial" : ""
}
{
  "partial" : ""
}
{
  "partial" : ""
}
{
  "parti

In [2]:
from gtts import gTTS
import os
import tempfile
import pygame
import threading
import queue
import time

class LiveTextToSpeech:
    def __init__(self):
        """Initialize the LiveTextToSpeech class."""
        self.speech_queue = queue.Queue()
        self.current_speech = None
        self.running = False
        self.lock = threading.Lock()
        self.thread = None

    def queue_speech(self, text, prepend=False, interrupt=False):
        """
        Add speech to the queue.
        :param text: The text to convert to speech.
        :param prepend: If True, add to the front of the queue.
        :param interrupt: If True, cancel the currently playing speech and prepend this one.
        """
        with self.lock:
            if interrupt and self.current_speech:
                self._stop_current_speech()
                self.speech_queue.queue.clear()  # Clear the queue if interrupting
                self.speech_queue.put(text)  # Prepend the new speech
            elif prepend:
                # Move all items to a temporary list, add the new item, then re-add the rest
                temp_list = []
                while not self.speech_queue.empty():
                    temp_list.append(self.speech_queue.get())
                self.speech_queue.put(text)
                for item in temp_list:
                    self.speech_queue.put(item)
            else:
                self.speech_queue.put(text)

            if not self.running:
                self._start_processing()

    def cancel_all(self):
        """Cancel all queued speech."""
        with self.lock:
            self.speech_queue.queue.clear()
            if self.current_speech:
                self._stop_current_speech()

    def _start_processing(self):
        """Start processing the speech queue in a separate thread."""
        self.running = True
        self.thread = threading.Thread(target=self._process_queue, daemon=True)
        self.thread.start()

    def _process_queue(self):
        """Process the speech queue."""
        while self.running:
            try:
                text = self.speech_queue.get(timeout=1)
                self._play_speech(text)
                self.speech_queue.task_done()
            except queue.Empty:
                if self.speech_queue.empty():
                    self.running = False
                    break

    def _play_speech(self, text):
        """Convert text to speech and play it."""
        with self.lock:
            self.current_speech = text
            try:
                # Create a temporary file for the speech
                with tempfile.NamedTemporaryFile(suffix=".mp3", delete=False) as temp_file:
                    tts = gTTS(text=text, lang="en")
                    tts.save(temp_file.name)
                    temp_file_path = temp_file.name

                # Play the speech using pygame
                pygame.mixer.init()
                pygame.mixer.music.load(temp_file_path)
                pygame.mixer.music.play()

                # Wait for the speech to finish
                while pygame.mixer.music.get_busy():
                    time.sleep(0.1)

                # Clean up
                pygame.mixer.quit()
                os.remove(temp_file_path)
            except Exception as e:
                print(f"Error playing speech: {e}")
            finally:
                self.current_speech = None

    def _stop_current_speech(self):
        """Stop the currently playing speech."""
        if pygame.mixer.get_init():
            pygame.mixer.music.stop()
            pygame.mixer.quit()
        self.current_speech = None

    def __del__(self):
        """Clean up resources when the object is deleted."""
        self.cancel_all()
        if self.thread:
            self.thread.join()
            
def main():
    tts = LiveTextToSpeech()

    # Queue some speech
    tts.queue_speech("Hello, how are you?")
    tts.queue_speech("This is a test.")
    tts.queue_speech("Goodbye!")

    # Wait for a few seconds
    time.sleep(5)

    # Interrupt with a new speech
    tts.queue_speech("Wait, I have something important to say!", interrupt=True)

    # Wait for the speech to finish
    time.sleep(10)

    # Cancel all speech
    tts.cancel_all()

if __name__ == "__main__":
    main()

pygame 2.6.1 (SDL 2.28.4, Python 3.12.8)
Hello from the pygame community. https://www.pygame.org/contribute.html


In [1]:

text = """In 1869, the stock ticker was invented. It was an electro-mechanical machine consisting of a typewriter, a long pair of wires and a ticker tape printer, and its purpose was to distribute stock prices over long distances in realtime. This concept gradually evolved into the faster, ASCII-based teletype. Teletypes were once connected across the world in a large network, called Telex, which was used for transferring commercial telegrams, but the teletypes weren't connected to any computers yet.

Meanwhile, however, the computers — still quite large and primitive, but able to multitask — were becoming powerful enough to be able to interact with users in realtime. When the command line eventually replaced the old batch processing model, teletypes were used as input and output devices, because they were readily available on the market.

There was a plethora of teletype models around, all slightly different, so some kind of software compatibility layer was called for. In the UNIX world, the approach was to let the operating system kernel handle all the low-level details, such as word length, baud rate, flow control, parity, control codes for rudimentary line editing and so on. Fancy cursor movements, colour output and other advanced features made possible in the late 1970s by solid state video terminals such as the VT-100, were left to the applications."""

import io
import numpy as np
import sounddevice as sd
from gtts import gTTS
from pydub import AudioSegment
import re
import threading
import time

# Function to split text into smaller chunks (sentences/phrases)
def split_text(text):
    return re.split(r'([,.!?;])', text)

# Function to process and play text in chunks
def speak_chunk(sentence):
    """Function to process a single sentence and play it"""
    sentence = sentence.strip()  # Clean leading/trailing spaces

    if sentence:  # Ensure the sentence is not empty
        # Convert text to speech
        tts = gTTS(sentence)
        audio_buffer = io.BytesIO()
        tts.write_to_fp(audio_buffer)
        audio_buffer.seek(0)

        # Load and convert audio
        audio = AudioSegment.from_file(audio_buffer, format="mp3")
        samples = np.array(audio.get_array_of_samples(), dtype=np.float32) / 32768.0

        # Handle stereo
        if audio.channels == 2:
            samples = samples.reshape((-1, 2))

        # Play chunk
        sd.play(samples, samplerate=audio.frame_rate)
        sd.wait()

# Function to process and play text in chunks
def speak_text(text):
    chunks = split_text(text)
    sentence = ""

    for chunk in chunks:
        sentence += chunk
        if chunk.strip() in {".", ",", "!", "?", ";"} or len(sentence) > 20:
            # Buffer next chunk while playing the current one
            threading.Thread(target=speak_chunk, args=(sentence,)).start()
            sentence = ""  # Reset for next chunk

            time.sleep(0.1)  # Small delay to buffer in parallel (can be adjusted)

# Example usage
# text = "Hello! This is a test. Breaking text improves real-time playback."
speak_text(text)



  m = re.match('([su]([0-9]{1,2})p?) \(([0-9]{1,2}) bit\)$', token)
  m2 = re.match('([su]([0-9]{1,2})p?)( \(default\))?$', token)
  elif re.match('(flt)p?( \(default\))?$', token):
  elif re.match('(dbl)p?( \(default\))?$', token):
Exception in thread Thread-7 (speak_chunk):
Traceback (most recent call last):
  File "/usr/lib/python3.12/threading.py", line 1075, in _bootstrap_inner
    self.run()
  File "/home/yoni_ash/.cache/pypoetry/virtualenvs/smart-cane-UziNl_FU-py3.12/lib/python3.12/site-packages/ipykernel/ipkernel.py", line 766, in run_closure
    _threading_Thread_run(self)
  File "/usr/lib/python3.12/threading.py", line 1012, in run
    self._target(*self._args, **self._kwargs)
  File "/tmp/ipykernel_129000/4252342460.py", line 29, in speak_chunk
  File "/home/yoni_ash/.cache/pypoetry/virtualenvs/smart-cane-UziNl_FU-py3.12/lib/python3.12/site-packages/gtts/tts.py", line 316, in write_to_fp
    for idx, decoded in enumerate(self.stream()):
                        ^^^^^^^^^^^^^^

In [None]:
from RealtimeTTS import TextToAudioStream, SystemEngine, AzureEngine, ElevenlabsEngine

engine = SystemEngine() # replace with your TTS engine
stream = TextToAudioStream(engine)
stream.feed("Hello world! How are you today?")
stream.play_async()