In [1]:
import os
from dotenv import load_dotenv
load_dotenv()
os.environ['GOOGLE_API_KEY']=os.getenv('GOOGLE_API_KEY')

In [3]:
from google import genai

client = genai.Client(http_options= {
      'api_version': 'v1alpha'
})

In [4]:
model_name = "gemini-2.0-flash-exp"

In [5]:
import asyncio
import contextlib
import json
import wave

from IPython import display

from google import genai
from google.genai import types

In [6]:
@contextlib.contextmanager
def wave_file(filename, channels=1, rate=24000, sample_width=2):
    with wave.open(filename, "wb") as wf:
        wf.setnchannels(channels)
        wf.setsampwidth(sample_width)
        wf.setframerate(rate)
        yield wf

In [7]:

import logging
logger = logging.getLogger('Live')
#logger.setLevel('DEBUG')  # Switch between "INFO" and "DEBUG" to toggle debug messages.
logger.setLevel('INFO')

In [8]:
n = 0
async def run(prompt, modality="TEXT", tools=None):
  global n
  if tools is None:
    tools=[]

  config = {
          "tools": tools,
          "response_modalities": [modality]}

  async with client.aio.live.connect(model=model_name, config=config) as session:
    display.display(display.Markdown(prompt))
    display.display(display.Markdown('-------------------------------'))
    await session.send(input=prompt, end_of_turn=True)

    audio = False
    filename = f'audio_{n}.wav'
    with wave_file(filename) as wf:
      async for response in session.receive():
        logger.debug(str(response))
        if text:=response.text:
          display.display(display.Markdown(text))
          continue

        if data:=response.data:
          print('.', end='')
          wf.writeframes(data)
          audio = True
          continue

        server_content = response.server_content
        if server_content is not None:
          handle_server_content(wf, server_content)
          continue

        tool_call = response.tool_call
        if tool_call is not None:
          await handle_tool_call(session, tool_call)


  if audio:
    display.display(display.Audio(filename, autoplay=True))
    n = n+1

In [9]:
def handle_server_content(wf, server_content):
  model_turn = server_content.model_turn
  if model_turn:
    for part in model_turn.parts:
      executable_code = part.executable_code
      if executable_code is not None:
        display.display(display.Markdown('-------------------------------'))
        display.display(display.Markdown(f'``` python\n{executable_code.code}\n```'))
        display.display(display.Markdown('-------------------------------'))

      code_execution_result = part.code_execution_result
      if code_execution_result is not None:
        display.display(display.Markdown('-------------------------------'))
        display.display(display.Markdown(f'```\n{code_execution_result.output}\n```'))
        display.display(display.Markdown('-------------------------------'))

  grounding_metadata = getattr(server_content, 'grounding_metadata', None)
  if grounding_metadata is not None:
    display.display(
        display.HTML(grounding_metadata.search_entry_point.rendered_content))

  return

In [10]:
async def handle_tool_call(session, tool_call):
  for fc in tool_call.function_calls:
    tool_response = types.LiveClientToolResponse(
        function_responses=[types.FunctionResponse(
            name=fc.name,
            id=fc.id,
            response={'result':'ok'},
        )]
    )

    print('\n>>> ', tool_response)
    await session.send(input=tool_response)

In [12]:
await run(prompt="capital of paris?", tools=None, modality = "TEXT")

capital of paris?

-------------------------------

The

 capital of France is **Paris**.
