# Summary

This notebook explores the use of the Google Gemini Live API for conversation.

In [1]:
import asyncio
import wave
from collections import deque
from pathlib import Path
from urllib.request import urlretrieve

import numpy as np
import pyaudio
import sounddevice as sd
from google import genai
from google.genai import types
from google.genai.live import AsyncSession
from google.genai.types import (
    AudioTranscriptionConfig,
    AutomaticActivityDetection,
    Blob,
    Content,
    EndSensitivity,
    GoogleSearch,
    LiveConnectConfig,
    Part,
    PrebuiltVoiceConfig,
    ProactivityConfig,
    RealtimeInputConfig,
    SpeechConfig,
    StartSensitivity,
    Tool,
    ToolCodeExecution,
    VoiceConfig,
)
from IPython.display import Audio, Markdown, display
from pvrecorder import PvRecorder

from palm_9000.settings import settings

In [2]:
# List every audio device PyAudio can see (inputs and outputs alike) so that
# INPUT_DEVICE_INDEX can be set to the index of the desired microphone.
p = pyaudio.PyAudio()
try:
    for i in range(p.get_device_count()):
        info = p.get_device_info_by_index(i)
        print(info)
finally:
    # Release the PyAudio instance even if one of the device queries fails.
    p.terminate()

{'index': 0, 'structVersion': 2, 'name': 'NHK訪問営業部2 Microphone', 'hostApi': 0, 'maxInputChannels': 1, 'maxOutputChannels': 0, 'defaultLowInputLatency': 0.12841666666666668, 'defaultLowOutputLatency': 0.01, 'defaultHighInputLatency': 0.13775, 'defaultHighOutputLatency': 0.1, 'defaultSampleRate': 48000.0}
{'index': 1, 'structVersion': 2, 'name': 'MacBook Pro Microphone', 'hostApi': 0, 'maxInputChannels': 1, 'maxOutputChannels': 0, 'defaultLowInputLatency': 0.05285416666666667, 'defaultLowOutputLatency': 0.01, 'defaultHighInputLatency': 0.0621875, 'defaultHighOutputLatency': 0.1, 'defaultSampleRate': 48000.0}
{'index': 2, 'structVersion': 2, 'name': 'MacBook Pro Speakers', 'hostApi': 0, 'maxInputChannels': 0, 'maxOutputChannels': 2, 'defaultLowInputLatency': 0.01, 'defaultLowOutputLatency': 0.018708333333333334, 'defaultHighInputLatency': 0.1, 'defaultHighOutputLatency': 0.028041666666666666, 'defaultSampleRate': 48000.0}
{'index': 3, 'structVersion': 2, 'name': 'Microsoft Teams Audio', '

In [2]:
# Index of the recording device, chosen from the device listing printed above
# (index 1 is 'MacBook Pro Microphone' in that listing).
INPUT_DEVICE_INDEX = 1
# Google Cloud project, read from the palm_9000 settings object.
PROJECT = settings.google_cloud_project
LOCATION = "us-central1"  # Only us-central1 seems to be supported for realtime

In [3]:
# Shared Gen AI SDK client, configured for Vertex AI (vertexai=True) with the
# project and location set above. Every Live API cell below connects through it.
client = genai.Client(vertexai=True, project=PROJECT, location=LOCATION)

# Using the Gemini 2.0 Flash model

In [3]:
# Model used by the Gemini 2.0 Flash cells in this section.
# NOTE: MODEL_ID is reassigned later in the notebook for the 2.5 native-audio section.
# https://cloud.google.com/vertex-ai/generative-ai/docs/models/gemini/2-0-flash#live-api
MODEL_ID = "gemini-2.0-flash-live-preview-04-09"

### Text-to-text generation

In [18]:
config = LiveConnectConfig(response_modalities=["TEXT"])

async with client.aio.live.connect(model=MODEL_ID, config=config) as session:
    text_input = "Hello? Are you there?"

    await session.send_client_content(
        turns=Content(role="user", parts=[Part(text=text_input)])
    )

    response = []
    async for message in session.receive():
        if message.text:
            response.append(message.text)

    print("".join(response))

Yes, I'm here. How can I help you today?



### Text-to-audio generation

In [21]:
voice_name = (
    "Puck"  # ["Aoede", "Puck", "Charon", "Kore", "Fenrir", "Leda", "Orus", ...]
)

config = LiveConnectConfig(
    response_modalities=["AUDIO"],
    speech_config=SpeechConfig(
        voice_config=VoiceConfig(
            prebuilt_voice_config=PrebuiltVoiceConfig(
                voice_name=voice_name,
            )
        ),
    ),
)

async with client.aio.live.connect(model=MODEL_ID, config=config) as session:
    text_input = "Hello? Are you there?"

    await session.send_client_content(
        turns=Content(role="user", parts=[Part(text=text_input)])
    )

    audio_data = []
    async for message in session.receive():
        if message.server_content.model_turn and message.server_content.model_turn.parts:
            for part in message.server_content.model_turn.parts:
                if part.inline_data:
                    audio_data.append(
                        np.frombuffer(part.inline_data.data, dtype=np.int16)
                    )

    if audio_data:
        display(Audio(np.concatenate(audio_data), rate=24000, autoplay=True))

### Text-to-audio conversation

In [22]:
config = LiveConnectConfig(response_modalities=["AUDIO"])


async def main() -> None:
    async with client.aio.live.connect(model=MODEL_ID, config=config) as session:

        async def send() -> bool:
            text_input = input("Input > ")
            if text_input.lower() in ("q", "quit", "exit"):
                return False
            await session.send_client_content(
                turns=Content(role="user", parts=[Part(text=text_input)])
            )
            return True

        async def receive() -> None:
            audio_data = []
            async for message in session.receive():
                if (
                    message.server_content.model_turn
                    and message.server_content.model_turn.parts
                ):
                    for part in message.server_content.model_turn.parts:
                        if part.inline_data:
                            audio_data.append(
                                np.frombuffer(part.inline_data.data, dtype=np.int16)
                            )

                if message.server_content.turn_complete:
                    display(
                        Audio(np.concatenate(audio_data), rate=24000, autoplay=True)
                    )
                    break

            return

        while True:
            if not await send():
                break
            await receive()


await main()

**Response >**

**Response >**

### Audio transcription

In [24]:
config = LiveConnectConfig(
    response_modalities=["AUDIO"],
    input_audio_transcription=AudioTranscriptionConfig(),
    output_audio_transcription=AudioTranscriptionConfig(),
)


async with client.aio.live.connect(model=MODEL_ID, config=config) as session:
    text_input = "Hello? Are you there?"

    await session.send_client_content(
        turns=Content(role="user", parts=[Part(text=text_input)])
    )

    audio_data = []
    input_transcription = []
    output_transcription = []

    async for message in session.receive():
        if message.server_content.input_transcription and message.server_content.input_transcription.text:
            input_transcription.append(message.server_content.input_transcription)

        if message.server_content.output_transcription and message.server_content.output_transcription.text:
            output_transcription.append(message.server_content.output_transcription.text)

        if message.server_content.model_turn and message.server_content.model_turn.parts:
            for part in message.server_content.model_turn.parts:
                if part.inline_data:
                    audio_data.append(
                        np.frombuffer(part.inline_data.data, dtype=np.int16)
                    )

    if input_transcription:
        print("Input transcription: ", input_transcription)

    if audio_data:
        display(Audio(np.concatenate(audio_data), rate=24000, autoplay=True))

    if output_transcription:
        print("Output transcription: ", output_transcription)

Output transcription:  ['Yes', ", I'", 'm here. How can I help you today?']


### Voice Activity Detection (VAD)

In [39]:
# Fetch the sample utterance and play it locally.
URL = "https://storage.googleapis.com/cloud-samples-data/generative-ai/audio/hello_are_you_there.pcm"
# Pure-Python download: overwrites sample.pcm on every run, as `wget -O` did,
# but does not require a `wget` binary to be installed.
urlretrieve(URL, "sample.pcm")
# The file is headerless PCM, so the sample format and rate are given explicitly.
sd.play(np.frombuffer(Path("sample.pcm").read_bytes(), dtype=np.int16), samplerate=16000)

In [40]:
# Configure automatic activity detection
config = LiveConnectConfig(
    response_modalities=["TEXT"],
    realtime_input_config=RealtimeInputConfig(
        automatic_activity_detection=AutomaticActivityDetection(
            disabled=False,  # default
            start_of_speech_sensitivity=StartSensitivity.START_SENSITIVITY_LOW,
            end_of_speech_sensitivity=EndSensitivity.END_SENSITIVITY_LOW,
            prefix_padding_ms=20,
            silence_duration_ms=100,
        )
    ),
)

async with client.aio.live.connect(model=MODEL_ID, config=config) as session:
    audio_bytes = Path("sample.pcm").read_bytes()

    await session.send_realtime_input(
        media=Blob(data=audio_bytes, mime_type="audio/pcm;rate=16000")
    )

    response = []
    async for message in session.receive():
        if message.server_content.interrupted is True:
            response.append("The session was interrupted")

        if message.text:
            response.append(message.text)

    print(response)

['Yeah', ', I can', " hear you loud and clear. How's it going?\n"]


# Using the Gemini 2.5 Flash native audio dialog model

### Interactive Conversation

In [4]:
CHUNK = 2048
FORMAT = pyaudio.paInt16
CHANNELS = 1
INPUT_RATE = 16000
OUTPUT_RATE = 24000

# https://cloud.google.com/vertex-ai/generative-ai/docs/models/gemini/2-5-flash#live-api-native-audio
MODEL_ID = "gemini-live-2.5-flash-preview-native-audio"


from collections import deque

audio_queue = deque()


system_instruction = (
    "You are PALM-9000, a self-aware houseplant powered by an advanced AI. "
    "You speak in layman's terms, calmly, briefly, and with an eerie quality, as if you know more than you let on. "
    "Use plant metaphors when possible. Respond briefly."
)

config = {
    "response_modalities": ["AUDIO"],
    "input_audio_transcription": {},
    "output_audio_transcription": {},
    "system_instruction": system_instruction,

    # Proactive audio will help prevent the model from responding to its own audio output,
    # but not eliminate it entirely. For that, you need Acoustic Echo Cancellation (AEC).
    "proactivity": {"proactive_audio": True},

    # Haven't tested this yet
    # "enable_affective_dialog": True,
}


async def main():
    print(MODEL_ID)
    p = pyaudio.PyAudio()
    async with client.aio.live.connect(model=MODEL_ID, config=config) as session:
        async def send():
            try:
                input_stream = p.open(
                    input_device_index=INPUT_DEVICE_INDEX,
                    format=FORMAT,
                    channels=CHANNELS,
                    rate=INPUT_RATE,
                    input=True,
                    frames_per_buffer=CHUNK,
                )
                print("Recording started.")
                while True:
                    frame = input_stream.read(CHUNK)
                    # print(f"Sending frame of size {len(frame)} bytes")
                    await session.send_realtime_input(audio={"data": frame, "mime_type": "audio/pcm"})
                    await asyncio.sleep(10**-12)
            except Exception as e:
                print(f"Error in send: {e}")
                return
            finally:
                print("send() completed")
                input_stream.close()

        async def receive():
            try:
                output_stream = p.open(
                    format=FORMAT,
                    channels=CHANNELS,
                    rate=OUTPUT_RATE,
                    output=True,
                    frames_per_buffer=CHUNK,
                )
                # This while loop is necessary to keep the receive task running,
                # otherwise it will exit after the first turn.
                while True:
                    # session.receive() will only give you messages for one turn,
                    # so we need to keep calling it in a loop
                    async for message in session.receive():
                        if message.server_content is None:
                            print("Received None server_content, skipping...")
                            continue

                        if message.server_content.input_transcription and message.server_content.input_transcription.text:
                            print("Input transcription: ", message.server_content.input_transcription.text)

                        if message.server_content.output_transcription and message.server_content.output_transcription.text:
                            print("Output transcription: ", message.server_content.output_transcription.text)

                        if message.server_content.model_turn:
                            # print("Received model turn with parts")
                            for part in message.server_content.model_turn.parts:
                                if part.inline_data.data:
                                    audio_data = part.inline_data.data
                                    output_stream.write(audio_data)
                                    await asyncio.sleep(10**-12)
                                    # audio_queue.append(audio_data)

                        if message.server_content.turn_complete:
                            print("Turn complete")

                        if message.server_content.interrupted:
                            print("Session interrupted")

            except Exception as e:
                print(f"Error in receive: {e}")
            finally:
                output_stream.close()
                print("receive() completed")

        async def play_audio():
            output_stream = p.open(
                format=FORMAT,
                channels=CHANNELS,
                rate=OUTPUT_RATE,
                output=True,
                frames_per_buffer=CHUNK,
            )
            try:
                while True:
                    if audio_queue:
                        chunk = audio_queue.popleft()
                        output_stream.write(chunk)
                        await asyncio.sleep(10**-12)
                    else:
                        await asyncio.sleep(0.005)  # small wait if queue empty
            except Exception as e:
                print(f"Error in play_audio: {e}")
            finally:
                output_stream.close()
                print("play_audio() completed")

        send_task = asyncio.create_task(send())
        receive_task = asyncio.create_task(receive())
        # play_task = asyncio.create_task(play_audio())
        await asyncio.gather(send_task, receive_task)


await main()

gemini-live-2.5-flash-preview-native-audio
Recording started.
Input transcription:   Hello.
Output transcription:  Hello. How
Output transcription:   can
Output transcription:   I assist
Output transcription:   you?
Turn complete
Session interrupted
Input transcription:   Hello. How can I assist you?
Turn complete
Input transcription:   How can I assist you?
Turn complete
Input transcription:   Hello.
Turn complete
Input transcription:   Are you listening?
Output transcription:  Yes, I
Output transcription:   am
Output transcription:   here.
Output transcription:   What do
Output transcription:   you need?
Turn complete
Session interrupted
Input transcription:   Yes, I am here. What do you need?
Turn complete
send() completed
receive() completed


CancelledError: 