In [2]:
import os
import json
from dotenv import load_dotenv
from openai import OpenAI
import gradio as gr
import base64
from io import BytesIO
from PIL import Image
import whisper
from IPython.display import Audio as IPythonAudio, display

# Initialization: load environment variables, then build the API client and speech model
load_dotenv(override=True)

openai_api_key = os.getenv('OPENAI_API_KEY')
# Report whether a key was found, echoing only its first 8 characters
print(
    f"OpenAI API Key exists and begins {openai_api_key[:8]}"
    if openai_api_key
    else "OpenAI API Key not set"
)

MODEL = "gpt-4o-mini"
openai = OpenAI()

# Whisper model used for speech recognition.
# Other available sizes: "base", "small", "medium", or "large".
speech_model = whisper.load_model("tiny")

# System prompt sent at the start of every chat request.
# Every fragment except the last ends with a trailing space so that the
# concatenated sentences stay separated.
system_message = "You are a helpful assistant for an Airline called FlightAI. "
system_message += "Give short, courteous answers, no more than 1 sentence. "
system_message += "Always be accurate. If you don't know the answer, say so. "
system_message += "When booking a flight always check if the desired time is really available."

# Return-ticket prices, keyed by lower-case destination city.
ticket_prices = {"london": "$799", "paris": "$899", "tokyo": "$1400", "berlin": "$499"}

def get_ticket_price(destination_city):
    """Look up the ticket price for a destination city.

    The lookup ignores case and surrounding whitespace. A missing city
    (``None`` or empty) is treated like any other unknown destination
    instead of raising.

    Args:
        destination_city: Name of the city the customer wants to fly to.

    Returns:
        The price string from ``ticket_prices``, or ``"Unknown"`` when the
        city has no entry.
    """
    print(f"Tool get_ticket_price called for {destination_city}")
    # The argument comes from model-generated JSON, so normalise defensively.
    city = str(destination_city or "").strip().lower()
    return ticket_prices.get(city, "Unknown")

# Daily departure times (HH:MM), keyed by lower-case destination city.
flight_times = {"london": ["08:00", "11:42", "13:30", "15:47"], "paris": ["07:30", "10:44", "17:30", "22:37"], "tokyo": ["08:54", "14:59"], "berlin": ["06:52", "09:43", "13:56"]}

def get_flight_times(destination_city):
    """Look up the departure times for a destination city.

    The lookup ignores case and surrounding whitespace. A missing city
    (``None`` or empty) is treated like any other unknown destination
    instead of raising.

    Args:
        destination_city: Name of the city the customer wants to fly to.

    Returns:
        A new list of departure-time strings for the city, or the string
        ``"Unknown"`` when the city has no entry.
    """
    print(f"Tool get_flight_times called for {destination_city}")
    # The argument comes from model-generated JSON, so normalise defensively.
    city = str(destination_city or "").strip().lower()
    times = flight_times.get(city)
    if times is None:
        return "Unknown"
    # Hand back a copy so callers cannot alter the shared schedule.
    return list(times)

def book_flight(destination_city, time, schedule=None):
    """Book a flight, but only if the requested departure actually exists.

    Args:
        destination_city: Name of the city to fly to (case-insensitive).
        time: Requested departure time as ``HH:MM``; a single-digit hour
            such as ``"7:30"`` is zero-padded before the lookup.
        schedule: Optional mapping of lower-case city name to a list of
            departure times. Defaults to the module-level ``flight_times``.

    Returns:
        A confirmation message when the city and time are in the schedule,
        otherwise a message explaining why the booking was refused (listing
        the available times when the city is known).
    """
    print(f"Booking flight to {destination_city} at {time}")
    if schedule is None:
        schedule = flight_times

    # Arguments come from model-generated JSON, so normalise defensively.
    city = str(destination_city or "").strip()
    requested = str(time or "").strip()
    if len(requested) == 4 and requested[1] == ":":
        requested = requested.zfill(5)  # "7:30" -> "07:30"

    available = schedule.get(city.lower())
    if not available:
        return f"Cannot book: no flights to {city.title() or 'that destination'} are available."
    if requested not in available:
        options = ", ".join(available)
        return f"Cannot book: no flight to {city.title()} at {requested}. Available times: {options}."
    return f"Flight to {city.title()} booked for {requested}. Confirmation sent!"

# Tool schema that describes get_ticket_price to the model: a single required
# string argument, destination_city, with no other properties allowed.
price_function = dict(
    name="get_ticket_price",
    description="Get the price of a return ticket to the destination city. Call this whenever you need to know the ticket price, for example when a customer asks 'How much is a ticket to this city'",
    parameters=dict(
        type="object",
        properties=dict(
            destination_city=dict(
                type="string",
                description="The city that the customer wants to travel to",
            ),
        ),
        required=["destination_city"],
        additionalProperties=False,
    ),
)

# Tool schema that describes get_flight_times to the model: a single required
# string argument, destination_city, with no other properties allowed.
flight_times_function = {
    "name": "get_flight_times",
    "description": "Get the times of flights to the destination city. Call this whenever you need to know the flight times, for example when a customer wants to book a flight to a city, so you can offer them the available time options.",
    "parameters": {
        "type": "object",
        "properties": {
            "destination_city": {
                "type": "string",
                "description": "The city that the customer wants to travel to",
            },
        },
        "required": ["destination_city"],
        "additionalProperties": False
    }
}

# Tool schema that describes book_flight to the model: two required string
# arguments (destination_city and time). Extra properties are disallowed,
# matching the other tool schemas in this file.
book_flight_function = {
    "name": "book_flight",
    "description": "Book a flight to a city at a specific time.",
    "parameters": {
        "type": "object",
        "properties": {
            "destination_city": {
                "type": "string",
                "description": "City to fly to"
            },
            "time": {
                "type": "string",
                "description": "Departure time in HH:MM"
            }
        },
        "required": ["destination_city", "time"],
        "additionalProperties": False
    }
}

# Tool list passed to the chat completion call: each schema is wrapped in
# the {"type": "function", "function": ...} envelope, in declaration order.
tools = [
    {"type": "function", "function": schema}
    for schema in (price_function, flight_times_function, book_flight_function)
]

def handle_tool_call(message):
    """Run every tool call requested by an assistant message.

    Args:
        message: Assistant message object exposing ``tool_calls``; each call
            has ``id``, ``function.name`` and ``function.arguments`` (a JSON
            string).

    Returns:
        A ``(tool_responses, city)`` tuple. ``tool_responses`` holds one
        ``role="tool"`` message dict per tool call, in order. ``city`` is the
        ``destination_city`` argument of the last call whose arguments parsed
        to a JSON object, or ``None`` if there was none.
    """
    tool_responses = []
    city = None  # stays None when the message carries no usable tool call
    for tool_call in message.tool_calls or []:
        function_name = tool_call.function.name
        try:
            arguments = json.loads(tool_call.function.arguments or "{}")
        except json.JSONDecodeError:
            arguments = None
        print(f"Handling tool call for function: {function_name}, arguments: {arguments}")

        if not isinstance(arguments, dict):
            # Still answer the call so every tool_call_id gets a reply.
            result = "Invalid arguments"
        else:
            city = arguments.get("destination_city")
            if function_name == "get_ticket_price":
                result = get_ticket_price(city)
            elif function_name == "get_flight_times":
                result = get_flight_times(city)
            elif function_name == "book_flight":
                result = book_flight(city, arguments.get("time"))
            else:
                result = "Unknown function"

        tool_responses.append({
            "role": "tool",
            "tool_call_id": tool_call.id,
            "name": function_name,
            "content": json.dumps(result)
        })
    return tool_responses, city

def talker(message):
    """Convert text to speech, save it as an MP3 and try to play it.

    The audio is written to ``output_audio.mp3`` in the current working
    directory. Failures are reported with ``print`` rather than raised, so a
    speech problem never interrupts the chat flow.

    Args:
        message: Text to speak. Empty or ``None`` input is skipped without
            calling the API.

    Returns:
        None.
    """
    if not message:
        return

    try:
        response = openai.audio.speech.create(
            model="tts-1",
            voice="onyx",
            input=message)

        output_filename = "output_audio.mp3"
        with open(output_filename, "wb") as f:
            f.write(response.content)

        # Play the generated audio if in an environment that supports IPython display
        try:
            display(IPythonAudio(output_filename, autoplay=True))
        except Exception:
            # Playback is best-effort; the file on disk is still usable.
            print(f"Audio response saved to {output_filename}")
    except Exception as e:
        print(f"Error generating speech: {e}")

# Speech-to-text helper
def transcribe_audio(audio_path):
    """Transcribe an audio file with the loaded Whisper model.

    Args:
        audio_path: Path of the audio file handed to ``speech_model.transcribe``.

    Returns:
        The transcribed text with surrounding whitespace removed, or a fixed
        apology string if anything goes wrong during transcription.
    """
    try:
        transcription = speech_model.transcribe(audio_path)
        raw_text = transcription["text"]
        print(f"Transcribed text: {raw_text}")
        cleaned_text = transcription["text"].strip()
    except Exception as e:
        print(f"Error transcribing audio: {e}")
        return "Sorry, I couldn't understand the audio."
    return cleaned_text

# Combined function to handle voice input and processing
def process_voice(audio, history):
    """Transcribe a recording, add it to the chat and get the assistant's reply.

    Args:
        audio: Path of the recorded audio file, or ``None`` when nothing was
            recorded.
        history: Chat history as a list of ``{"role", "content"}`` dicts;
            ``None`` is treated as an empty history once audio is present.

    Returns:
        The chat history. It is returned unchanged when there is no audio or
        the transcription is empty; otherwise it is the result of ``chat``
        after the transcribed user message has been appended.
    """
    if audio is None:
        return history

    if history is None:
        history = []

    # Process the audio file path
    text = transcribe_audio(audio)
    print(f"Processing voice input: {text}")
    if not text:
        # Nothing intelligible was said; don't send an empty user turn.
        return history

    history += [{"role": "user", "content": text}]

    # Process the message through chat
    return chat(text, history)

def chat(message, history):
    """Send the conversation to the model, run requested tools, and reply.

    Args:
        message: The user's new message. May be empty when the message has
            already been appended to ``history``; the most recent user entry
            in ``history`` is then used instead.
        history: Chat history as a list of ``{"role", "content"}`` dicts.
            ``None`` is treated as an empty history. The assistant's reply is
            appended to this list in place.

    Returns:
        The chat history, including the assistant's reply. It is returned
        unchanged when there is no user message to answer.
    """
    if history is None:
        history = []

    # With no explicit message, fall back to the last user message in history
    # (it was put there before chat was called); with none, there is nothing
    # to answer.
    if not message:
        user_messages = [msg for msg in history if msg["role"] == "user"]
        if not user_messages:
            return history
        message = user_messages[-1]["content"]

    # Forward only role/content, in case history entries carry extra
    # UI-only keys that the API does not expect.
    messages = [{"role": "system", "content": system_message}]
    messages += [{"role": msg["role"], "content": msg["content"]} for msg in history]

    # If the last message is already the user's, don't add it again
    last_message = history[-1] if history else None
    if not last_message or last_message["role"] != "user" or last_message["content"] != message:
        messages.append({"role": "user", "content": message})

    response = openai.chat.completions.create(model=MODEL, messages=messages, tools=tools)

    # Let the model chain tools (e.g. check flight times, then book), but cap
    # the number of rounds so a misbehaving model cannot loop forever.
    max_tool_rounds = 5
    rounds = 0
    while response.choices[0].finish_reason == "tool_calls" and rounds < max_tool_rounds:
        assistant_message = response.choices[0].message
        tool_messages, _ = handle_tool_call(assistant_message)

        # Append assistant tool call message and tool responses
        messages.append(assistant_message)
        messages.extend(tool_messages)
        rounds += 1

        if rounds < max_tool_rounds:
            response = openai.chat.completions.create(model=MODEL, messages=messages, tools=tools)
        else:
            # Final round: withhold the tools so the model must answer in text.
            response = openai.chat.completions.create(model=MODEL, messages=messages)

    # content can be None (e.g. a response without text); normalise to "".
    reply = response.choices[0].message.content or ""

    # Only append the assistant's reply if it's not already the last message
    if not history or history[-1]["role"] != "assistant" or history[-1]["content"] != reply:
        history.append({"role": "assistant", "content": reply})

    # Generate speech from text (nothing to speak for an empty reply)
    if reply:
        talker(reply)

    return history

# Build the Gradio interface: a chat window, a text box, a microphone input
# with its own submit button, and a button that clears the chat.
with gr.Blocks() as ui:
    with gr.Row():
        # type="messages": history is exchanged as {"role", "content"} dicts,
        # the same shape that chat and process_voice index into.
        chatbot = gr.Chatbot(height=500, type="messages")
    with gr.Row():
        entry = gr.Textbox(label="Type your message:", placeholder="Ask me about flights...")
    with gr.Row():
        with gr.Column(scale=3):
            # Updated for Gradio 5.23.3
            # type="filepath": process_voice passes this value on to
            # transcribe_audio as the audio path.
            audio_input = gr.Audio(sources=["microphone"], type="filepath", label="Or speak your message")
        with gr.Column(scale=1):
            audio_submit = gr.Button("Submit Voice")
    with gr.Row():
        clear = gr.Button("Clear Chat")

    def do_entry(message, history):
        """Append the typed message to the history and clear the text box.

        Returns a pair of (new text box value, chat history). The text box
        value is always "". The history is returned unchanged when the
        message is empty.
        """
        if not message:
            return "", history
        history += [{"role":"user", "content":message}]
        return "", history

    # Text flow: do_entry first moves the message into the history and blanks
    # the text box, then chat runs. chat presumably receives the now-empty
    # text box value and falls back to the last user message in the history.
    entry.submit(do_entry, inputs=[entry, chatbot], outputs=[entry, chatbot]).then(
       chat, inputs=[entry, chatbot], outputs=[chatbot]
    )

    # Process voice input directly without using .then():
    # process_voice transcribes, appends the user message and calls chat itself.
    audio_submit.click(process_voice, inputs=[audio_input, chatbot], outputs=[chatbot])

    # Clearing sets the chatbot value to None.
    clear.click(lambda: None, inputs=None, outputs=chatbot, queue=False)

# Start the app; inbrowser=True asks Gradio to open it in a browser tab.
ui.launch(inbrowser=True)

OpenAI API Key exists and begins sk-proj-


100%|█████████████████████████████████████| 72.1M/72.1M [00:17<00:00, 4.31MiB/s]


* Running on local URL:  http://127.0.0.1:7861

To create a public link, set `share=True` in `launch()`.






Transcribed text:  Hello?
Processing voice input: Hello?


ERROR:    Exception in ASGI application
Traceback (most recent call last):
  File "C:\Users\Startklar\anaconda3\envs\llms\Lib\site-packages\uvicorn\protocols\http\httptools_impl.py", line 409, in run_asgi
    result = await app(  # type: ignore[func-returns-value]
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\Startklar\anaconda3\envs\llms\Lib\site-packages\uvicorn\middleware\proxy_headers.py", line 60, in __call__
    return await self.app(scope, receive, send)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\Startklar\anaconda3\envs\llms\Lib\site-packages\fastapi\applications.py", line 1054, in __call__
    await super().__call__(scope, receive, send)
  File "C:\Users\Startklar\anaconda3\envs\llms\Lib\site-packages\starlette\applications.py", line 112, in __call__
    await self.middleware_stack(scope, receive, send)
  File "C:\Users\Startklar\anaconda3\envs\llms\Lib\site-packages\starlette\middleware\errors.py", line 187, in __call__
   

Transcribed text:  I'd like to talk a flight to Paris.
Processing voice input: I'd like to talk a flight to Paris.


ERROR:    Exception in ASGI application
Traceback (most recent call last):
  File "C:\Users\Startklar\anaconda3\envs\llms\Lib\site-packages\uvicorn\protocols\http\httptools_impl.py", line 409, in run_asgi
    result = await app(  # type: ignore[func-returns-value]
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\Startklar\anaconda3\envs\llms\Lib\site-packages\uvicorn\middleware\proxy_headers.py", line 60, in __call__
    return await self.app(scope, receive, send)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\Startklar\anaconda3\envs\llms\Lib\site-packages\fastapi\applications.py", line 1054, in __call__
    await super().__call__(scope, receive, send)
  File "C:\Users\Startklar\anaconda3\envs\llms\Lib\site-packages\starlette\applications.py", line 112, in __call__
    await self.middleware_stack(scope, receive, send)
  File "C:\Users\Startklar\anaconda3\envs\llms\Lib\site-packages\starlette\middleware\errors.py", line 187, in __call__
   

Transcribed text:  What are my options?
Processing voice input: What are my options?
Handling tool call for function: get_flight_times, arguments: {'destination_city': 'Paris'}
Tool get_flight_times called for Paris


ERROR:    Exception in ASGI application
Traceback (most recent call last):
  File "C:\Users\Startklar\anaconda3\envs\llms\Lib\site-packages\uvicorn\protocols\http\httptools_impl.py", line 409, in run_asgi
    result = await app(  # type: ignore[func-returns-value]
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\Startklar\anaconda3\envs\llms\Lib\site-packages\uvicorn\middleware\proxy_headers.py", line 60, in __call__
    return await self.app(scope, receive, send)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\Startklar\anaconda3\envs\llms\Lib\site-packages\fastapi\applications.py", line 1054, in __call__
    await super().__call__(scope, receive, send)
  File "C:\Users\Startklar\anaconda3\envs\llms\Lib\site-packages\starlette\applications.py", line 112, in __call__
    await self.middleware_stack(scope, receive, send)
  File "C:\Users\Startklar\anaconda3\envs\llms\Lib\site-packages\starlette\middleware\errors.py", line 187, in __call__
   

Transcribed text:  Then I'd like to book the flight in the night, 2237.
Processing voice input: Then I'd like to book the flight in the night, 2237.
Handling tool call for function: book_flight, arguments: {'destination_city': 'Paris', 'time': '22:37'}
Booking flight to Paris at 22:37
