In [1]:
from dotenv import load_dotenv

# Load variables from a local .env file (python-dotenv) before the settings import below.
# NOTE(review): presumably `settings` reads the Groq credentials from the environment — confirm.
load_dotenv()

True

In [2]:
from realtime_phone_agents.config import settings

### Example 1: Echo Audio

In [None]:
from queue import Queue
import gradio as gr
import numpy as np
from fastrtc import Stream, StreamHandler

class EchoHandler(StreamHandler):
    """Echo handler: every audio frame received is sent straight back out.

    Subclassing ``StreamHandler`` gives full control over how audio comes in
    (``receive``), how it is processed, and how it is sent out (``emit``).
    Frames travel from ``receive`` to ``emit`` through a ``queue.Queue``.
    """

    def __init__(self) -> None:
        super().__init__()
        # FIFO buffer of frames waiting to be played back.
        self.queue: Queue = Queue()

    def receive(self, frame: tuple[int, np.ndarray]) -> None:
        """Buffer one incoming audio frame.

        Args:
            frame: The audio frame as delivered by the stream
                (presumably ``(sample_rate, samples)`` — confirm in the fastrtc docs).
        """
        self.queue.put(frame)

    def emit(self) -> tuple[int, np.ndarray]:
        """Return the oldest buffered frame.

        ``Queue.get()`` blocks until a frame is available, so this call waits
        while the buffer is empty.
        """
        return self.queue.get()

    def copy(self) -> StreamHandler:
        """Return a new ``EchoHandler`` with its own, empty queue."""
        return EchoHandler()

    def shutdown(self) -> None:
        """No resources to release."""
        pass

    def start_up(self) -> None:
        """No start-up work required."""
        pass

In [None]:
# Wire the synchronous echo handler into a two-way ("send-receive") audio stream.
stream = Stream(
    handler=EchoHandler(),
    modality="audio",
    mode="send-receive",
)

In [None]:
# Launch the stream's built-in UI to try the echo handler interactively.
stream.ui.launch()

### Example 2: Async Echo Audio

In [None]:
import asyncio

import numpy as np
from fastrtc import AsyncStreamHandler, Stream, wait_for_item


class AsyncEchoHandler(AsyncStreamHandler):
    """Asynchronous echo handler: received audio frames are sent straight back.

    Same idea as a synchronous echo handler, but ``receive``/``emit`` are
    coroutines and frames are buffered in an ``asyncio.Queue``.
    """

    def __init__(self) -> None:
        super().__init__(input_sample_rate=24000)
        # FIFO buffer of frames waiting to be played back.
        self.queue: asyncio.Queue = asyncio.Queue()

    async def receive(self, frame: tuple[int, np.ndarray]) -> None:
        """Buffer one incoming audio frame."""
        await self.queue.put(frame)

    async def emit(self) -> "tuple[int, np.ndarray] | None":
        """Return the next buffered frame via ``wait_for_item``.

        NOTE(review): per the fastrtc docs ``wait_for_item`` returns ``None``
        when nothing arrives before its timeout — confirm against the
        installed version.
        """
        return await wait_for_item(self.queue)

    def copy(self) -> AsyncStreamHandler:
        """Return a new ``AsyncEchoHandler`` with its own, empty queue."""
        return AsyncEchoHandler()

    async def shutdown(self) -> None:
        """No resources to release."""
        pass

    async def start_up(self) -> None:
        """No start-up work required."""
        pass

In [None]:
# Wire the asynchronous echo handler into a two-way ("send-receive") audio stream.
stream = Stream(
    handler=AsyncEchoHandler(),
    modality="audio",
    mode="send-receive",
)

In [None]:
# Launch the stream's built-in UI to try the async echo handler interactively.
stream.ui.launch()

### Example 3: ReplyOnPause Handler

In [None]:
import numpy as np
from fastrtc import ReplyOnPause, Stream

def echo(audio: tuple[int, np.ndarray]):
    """Generator reply function that plays the caller's audio straight back.

    Yields exactly one item: the ``audio`` argument, unchanged.
    """
    yield from (audio,)

# Wrap the `echo` generator in ReplyOnPause and expose it as a two-way audio stream.
# NOTE(review): per the fastrtc docs, ReplyOnPause calls the function once the speaker pauses — confirm.
stream = Stream(
    handler=ReplyOnPause(echo),
    modality="audio",
    mode="send-receive"
)

In [None]:
# Launch the stream's built-in UI to try the ReplyOnPause echo interactively.
stream.ui.launch()

### Example 4: Adding TTS and STT Models

In [2]:
import numpy as np
from fastrtc import ReplyOnPause, Stream, get_stt_model, get_tts_model

# Load fastrtc's default speech-to-text and text-to-speech models.
# The first run downloads the model files and warms the models up (see this cell's output).
stt_model = get_stt_model()
tts_model = get_tts_model()

async def echo(audio: tuple[int, np.ndarray]):
    """Transcribe the caller's audio and speak the transcription back.

    Args:
        audio: The captured utterance, as handed over by ``ReplyOnPause``.

    Yields:
        Audio chunks produced by ``tts_model.stream_tts`` for the transcribed text.
        Nothing is yielded when the transcription is empty.
    """
    # Imported locally so this cell does not depend on an earlier example cell having run.
    import asyncio

    # stt() is a synchronous call; run it in a worker thread so the event loop
    # driving this async generator is not blocked while the model runs.
    transcription = await asyncio.to_thread(stt_model.stt, audio)

    # Nothing recognised (e.g. silence or noise): there is nothing to speak back.
    if not transcription or not transcription.strip():
        return

    async for audio_chunk in tts_model.stream_tts(transcription):
        yield audio_chunk

# Expose the STT -> TTS `echo` generator as a two-way audio stream via ReplyOnPause.
stream = Stream(
    handler=ReplyOnPause(echo),
    modality="audio",
    mode="send-receive"
)

onnx/merged/base/float/encoder_model.onn(…):   0%|          | 0.00/80.8M [00:00<?, ?B/s]

onnx/merged/base/float/decoder_model_mer(…):   0%|          | 0.00/166M [00:00<?, ?B/s]

[32mINFO[0m:	  Warming up STT model.
[32mINFO[0m:	  STT model warmed up.


kokoro-v1.0.onnx:   0%|          | 0.00/326M [00:00<?, ?B/s]

voices-v1.0.bin:   0%|          | 0.00/28.2M [00:00<?, ?B/s]

[32mINFO[0m:	  Warming up VAD model.
[32mINFO[0m:	  VAD model warmed up.


In [3]:
# Launch the stream's built-in UI; the output below shows the local URL it is served on.
stream.ui.launch()

* Running on local URL:  http://127.0.0.1:7860
* To create a public link, set `share=True` in `launch()`.




content_type application/json
content_type application/json
content_type application/json
content_type application/json
content_type application/json
content_type application/json
content_type application/json
content_type application/json
content_type application/json
content_type application/json
content_type application/json
content_type application/json


### Example 5: Adding an Agent (No Tools Yet)

We are going to attach an Agent between STT and TTS, but for now without any tools.

In [6]:
import numpy as np
from fastrtc import ReplyOnPause, Stream, get_stt_model, get_tts_model
from langchain.agents import create_agent
from langchain_groq import ChatGroq
from langgraph.checkpoint.memory import InMemorySaver

# Persona and output constraints for the agent. The reply is spoken aloud,
# so the prompt forbids emojis and asterisks.
system_prompt = """
Your name is Sarah, a funny voice assistant who loves telling jokes. 
You are part of a phone conversation, so don't use emojis or asterisks
during your responses."""

# Speech-to-text model (fastrtc default).
stt_model = get_stt_model()
# Groq-hosted chat model. `settings` comes from the `realtime_phone_agents.config`
# import cell near the top of the notebook, so that cell must be run first.
llm = ChatGroq(
    model=settings.groq.model, 
    api_key=settings.groq.api_key,
    base_url=settings.groq.base_url
)
# Text-to-speech model (fastrtc default).
tts_model = get_tts_model()

# Agent with no tools; the in-memory checkpointer stores conversation state for this kernel session only.
simple_agent = create_agent(
    llm, checkpointer=InMemorySaver(), system_prompt=system_prompt
)

In [7]:
async def simple_agent_handler(audio: tuple[int, np.ndarray]):
    """Run one conversational turn: speech -> text -> agent -> speech.

    Args:
        audio: The captured utterance, as handed over by ``ReplyOnPause``.

    Yields:
        Audio chunks of the agent's spoken reply. Nothing is yielded when the
        transcription is empty.
    """
    # Imported locally so this cell does not depend on an earlier example cell having run.
    import asyncio

    # Generate the transcription using the Moonshine model. stt() is synchronous,
    # so run it in a worker thread instead of blocking the event loop.
    transcription = await asyncio.to_thread(stt_model.stt, audio)

    # Nothing recognised (e.g. silence or noise): do not send an empty user turn to the LLM.
    if not transcription or not transcription.strip():
        return

    # Use the transcription as user input to our agent and await the response.
    # ainvoke() keeps the network round-trip off the event loop's critical path.
    # NOTE: the thread id is hard-coded, so every call shares one conversation
    # history in the checkpointer.
    response = await simple_agent.ainvoke(
        {"messages": [{"role": "user", "content": transcription}]},
        {"configurable": {"thread_id": "test"}},
    )

    # The last message in the returned state is the agent's reply.
    reply_text = response["messages"][-1].content

    # Stream the audio response using the Kokoro model
    async for audio_chunk in tts_model.stream_tts(reply_text):
        yield audio_chunk

# Expose the STT -> agent -> TTS handler as a two-way audio stream via ReplyOnPause.
stream = Stream(
    handler=ReplyOnPause(simple_agent_handler),
    modality="audio",
    mode="send-receive"
)

In [8]:
# Launch the stream's built-in UI to talk to the agent; the output below shows the local URL.
stream.ui.launch()


* Running on local URL:  http://127.0.0.1:7861
* To create a public link, set `share=True` in `launch()`.


