An agent that can generate images, talk to the user, and respond to their questions.

In [16]:
import base64
import json
import os
import tempfile
import time
from io import BytesIO

import gradio as gr
import pygame
from dotenv import load_dotenv
from IPython.display import Audio, display
from openai import OpenAI
from PIL import Image
from pydub import AudioSegment


# Load variables from a local .env file (if any) into the process environment.
load_dotenv()

openai_api_key = os.getenv("OPENAI_API_KEY")

# os.getenv returns None when the variable is unset, so test for truthiness
# first; calling len()/startswith() on None would raise instead of printing
# the guidance below.
if openai_api_key and len(openai_api_key) > 10 and openai_api_key.startswith("sk-proj"):
    print("Using OpenAI API key from environment variable.")
else:
    print("Please set the OPENAI_API_KEY environment variable to your OpenAI API key.")
    exit(1)

# Chat model used for every completion request in this file.
MODEL = "gpt-4o-mini"
# Shared client instance used by the chat, speech and image helpers.
openai = OpenAI()

# JavaScript source passed to gr.Blocks(js=...) below. When the page URL does
# not already carry `__theme=dark`, it sets that query parameter and reloads
# the page at the updated URL.
force_dark_mode = """
function force_dark_mode() {
    const url = new URL(window.location);
    if (url.searchParams.get('__theme') !== 'dark') {
        url.searchParams.set('__theme', 'dark');
        window.location.href = url.href;
    }
}
"""

# Ticket price per destination, keyed by lower-case city name.
ticket_prices = {
    "bucharest": 400,
    "paris": 500,
    "london": 600,
    "new york": 700,
    "los angeles": 800,
    "chicago": 900,
}

# Bookable slots per destination: every city is seeded with the same two
# dates, each stored as its own {"date_time": ...} record.
bookings = {}
for city in ticket_prices:
    bookings[city] = [{"date_time": day} for day in ("2023-10-01", "2024-01-20")]


# System prompt sent as the first message of every chat completion.
# It is assembled in pieces: persona and tone, the allowed destinations,
# the pricing instruction, and finally the booking workflow.
system_message = """
You are a helpful assistant working for Frontier Airlines.

Your tone should be calm, courteous, and concise.
- Always respond with a single sentence unless asked for more detail.
- If you do not know the answer, say so politely.
"""

# Join the city names explicitly: interpolating ticket_prices.keys() directly
# would put the literal text "dict_keys([...])" into the prompt.
system_message += (
    "You can give recommendations if you are asked so, but only about these "
    f"destinations: {', '.join(ticket_prices)}. "
)
# Each sentence ends with a space so consecutive sentences do not run together.
system_message += "Ask the user if they would like to know the ticket price for the city they are interested in and provide the price. "
system_message += "Always respond with calm and courtesy."


system_message += """
- After the user showed more interest in a certain city, list available dates for the requested city.
- Ask the customer to pick a date for booking after showing availability.
- If a ticket is not available on the requested date, inform the user and suggest alternatives.
"""


def get_ticket_price(city):
    """Return the ticket price for ``city``, matched case-insensitively.

    Logs the lookup to stdout. Cities that are not in ``ticket_prices``
    yield the string ``"Unknown destination."`` instead of a number.
    """
    print(f"Fetching ticket price for {city}...")
    key = city.lower()
    try:
        return ticket_prices[key]
    except KeyError:
        return "Unknown destination."

def make_booking(city, date_time):
    """Try to book a ticket to ``city`` for ``date_time``.

    Args:
        city: Destination name; matched case-insensitively.
        date_time: Requested date, compared for exact equality with the
            ``"date_time"`` values stored in ``bookings``.

    Returns:
        A human-readable success message when the city is a known destination
        with a slot on that date, otherwise a failure message.
    """
    destination_city = city.lower()
    # Use .get() so an unknown city falls through to the failure message
    # instead of raising KeyError before any check can run.
    slots = bookings.get(destination_city, [])
    is_available = destination_city in ticket_prices and any(
        slot["date_time"] == date_time for slot in slots
    )
    if is_available:
        return f"Booking ticket to {city} for {date_time}... successfully created!"
    return f"Booking failed. No available tickets to {city} for {date_time}."

def availability_ticket(city):
    """List the bookable dates for ``city`` (case-insensitive).

    Returns the ``"date_time"`` value of every slot stored for the city, or
    an empty list when the city has no entry in ``bookings``.
    """
    slots = bookings.get(city.lower(), [])
    return [slot["date_time"] for slot in slots]


# Tool schema advertised to the model for get_ticket_price(city).
# The descriptions are read by the model, so they are kept free of typos.
price_function = {
    "name": "get_ticket_price",
    "description": "Get the ticket price for the given city. Call this whenever you need to know the ticket price, for example when a customer asks 'How much is a ticket to this city?'",
    "parameters": {
        "type": "object",
        "properties": {
            "city": {
                "type": "string",
                "description": "The name of the city the customer wants to travel to.",
            },
        },
        "required": ["city"],
        "additionalProperties": False,
    },
}

# Tool schema advertised to the model for make_booking(city, date_time).
# The description states that this tool books a ticket (availability lookups
# have their own tool) and spells out the date format, because the backend
# compares the date string for exact equality.
booking_function = {
    "name": "make_booking",
    "description": "Book a ticket to the given city on the given date. Call this once the customer has chosen a destination and a date, for example when a customer says 'Please book me a ticket to this city on this date.'",
    "parameters": {
        "type": "object",
        "properties": {
            "city": {
                "type": "string",
                "description": "The name of the city the customer wants to book a ticket for.",
            },
            "date_time": {
                "type": "string",
                "description": "The date the customer wants to book the ticket for, in YYYY-MM-DD format, exactly as listed by availability_ticket.",
            },
        },
        "required": ["city", "date_time"],
        "additionalProperties": False,
    },
}

# Tool schema advertised to the model for availability_ticket(city).
availability_function = dict(
    name="availability_ticket",
    description="Get the list of available tickets for a specific city. Call this whenever you have to provide flight dates to the customer.",
    parameters=dict(
        type="object",
        properties=dict(
            city=dict(
                type="string",
                description="The name of the city you will provide the list of available tickets for.",
            ),
        ),
        required=["city"],
        additionalProperties=False,
    ),
)

# Tool list passed to the chat completion call: each schema is wrapped in the
# {"type": "function", "function": ...} envelope, in price/booking/availability order.
tools = [
    {"type": "function", "function": schema}
    for schema in (price_function, booking_function, availability_function)
]

def handle_tool_call(message):
    """Execute every tool call requested by the model and build the replies.

    Args:
        message: Assistant message whose ``tool_calls`` entries each expose
            ``id``, ``function.name`` and ``function.arguments`` (a JSON
            string).

    Returns:
        A list of ``{"role": "tool", "content": <JSON string>,
        "tool_call_id": <id>}`` dicts, exactly one per tool call and in the
        same order. Unknown tool names and unparsable arguments produce a
        reply whose content is ``{"error": ...}`` rather than being skipped,
        so that every requested call id receives an answer.
    """
    responses = []
    for tool_call in message.tool_calls or []:
        tool_name = tool_call.function.name

        try:
            arguments = json.loads(tool_call.function.arguments)
        except (json.JSONDecodeError, TypeError):
            arguments = None

        if not isinstance(arguments, dict):
            # Malformed or non-object arguments: report back instead of raising.
            payload = {"error": f"Invalid arguments for tool '{tool_name}'."}
        elif tool_name == "get_ticket_price":
            city = arguments.get("city")
            payload = {"city": city, "price": get_ticket_price(city)}
        elif tool_name == "make_booking":
            city = arguments.get("city")
            date_time = arguments.get("date_time")
            payload = {
                "city": city,
                "date_time": date_time,
                "booking_response": make_booking(city, date_time),
            }
        elif tool_name == "availability_ticket":
            city = arguments.get("city")
            payload = {"city": city, "available_dates": availability_ticket(city)}
        else:
            payload = {"error": f"Unknown tool '{tool_name}'."}

        responses.append({
            "role": "tool",
            "content": json.dumps(payload),
            "tool_call_id": tool_call.id,
        })
    return responses

def talker(message):
    """Speak ``message`` aloud and block until playback has finished.

    The text is synthesised with the ``tts-1`` model (voice ``onyx``), written
    to a temporary MP3 file and played through ``pygame.mixer``. The temporary
    file is removed afterwards.

    Args:
        message: Text to speak. Empty or ``None`` input is ignored.

    Returns:
        None.
    """
    if not message:
        # Nothing to say (e.g. a reply without text content): skip synthesis.
        return

    # Generate speech
    response = openai.audio.speech.create(
        model="tts-1",
        voice="onyx",
        input=message
    )

    # Convert response to an AudioSegment
    audio = AudioSegment.from_file(BytesIO(response.content), format="mp3")

    # Reserve a temporary path only; the handle is closed before anything
    # else writes to or reads from the file by name.
    with tempfile.NamedTemporaryFile(delete=False, suffix=".mp3") as temp_file:
        temp_path = temp_file.name

    try:
        audio.export(temp_path, format="mp3")

        # Play audio using pygame
        pygame.mixer.init()
        pygame.mixer.music.load(temp_path)
        pygame.mixer.music.play()

        while pygame.mixer.music.get_busy():
            time.sleep(0.1)
    finally:
        # delete=False means nobody else cleans this file up; release the
        # player's handle first so removal can succeed.
        if pygame.mixer.get_init():
            pygame.mixer.music.stop()
            if hasattr(pygame.mixer.music, "unload"):  # pygame >= 2.0
                pygame.mixer.music.unload()
        try:
            os.remove(temp_path)
        except OSError:
            # Best-effort cleanup: a leftover temp file must not break the chat.
            pass
    


def artist(city):
    """Generate one illustrative image of ``city`` and return it as a PIL image.

    Requests a single 1024x1024 picture from the ``dall-e-3`` model in
    base64-JSON form, decodes it, and opens the bytes with ``Image.open``.
    """
    prompt = (
        f"Create a beautiful image of {city}, showing its most famous landmarks "
        "and attractions, with tourists enjoying the scenery."
    )
    result = openai.images.generate(
        model="dall-e-3",
        prompt=prompt,
        size="1024x1024",
        n=1,  # a single image is requested
        response_format="b64_json",  # image comes back as a base64 text string
    )

    encoded = result.data[0].b64_json  # only the first (and only) image is used
    decoded = base64.b64decode(encoded)
    return Image.open(BytesIO(decoded))


def chat_with_tool(history):
    """Produce the assistant's next reply, running any requested tools first.

    Args:
        history: Conversation so far as a list of ``{"role", "content"}``
            dicts. The assistant's reply is appended to this list in place.

    Returns:
        A ``(history, image)`` tuple. ``image`` is always ``None`` here:
        this function does not call ``artist``.
    """
    messages = [{"role": "system", "content": system_message}] + history
    image = None

    completion = openai.chat.completions.create(
        model=MODEL,
        messages=messages,
        tools=tools
    )
    choice = completion.choices[0]

    if choice.finish_reason == "tool_calls":
        # The model asked for tools: append its request and every tool result,
        # then ask again so it can phrase the final answer.
        assistant_message = choice.message
        tool_replies = handle_tool_call(assistant_message)
        messages.append(assistant_message)
        messages.extend(tool_replies)
        completion = openai.chat.completions.create(
            model=MODEL,
            messages=messages
        )
        choice = completion.choices[0]

    # message.content can be None; fall back to a polite message so neither
    # the chat history nor the speech step receives a non-string reply.
    reply = choice.message.content or "I'm sorry, I could not generate a response."
    history += [{"role": "assistant", "content": reply}]

    talker(reply)
    return history, image


def do_entry(message, history):
    """Append the user's message to the chat history and clear the textbox.

    Returns the new textbox value (an empty string) and the updated history.
    """
    history += [{"role": "user", "content": message}]
    return "", history


# Page layout: chat transcript and image side by side, then the input box,
# then the clear button, each in its own row.
with gr.Blocks(js=force_dark_mode) as ui:
    with gr.Row():
        chatbot = gr.Chatbot(height=500, type="messages")
        image_output = gr.Image(height=500)
    with gr.Row():
        entry = gr.Textbox(label="Chat with our AI Assistant:")
    with gr.Row():
        clear = gr.Button("Clear")

    # On submit: first record the user's message, then let the assistant
    # answer and update both the transcript and the image panel.
    entry.submit(
        fn=do_entry,
        inputs=[entry, chatbot],
        outputs=[entry, chatbot],
    ).then(
        fn=chat_with_tool,
        inputs=chatbot,
        outputs=[chatbot, image_output],
    )
    # The clear button resets the transcript by sending it None.
    clear.click(fn=lambda: None, inputs=None, outputs=chatbot, queue=False)

ui.launch(inbrowser=True)

Using OpenAI API key from environment variable.
* Running on local URL:  http://127.0.0.1:7874

To create a public link, set `share=True` in `launch()`.




Fetching ticket price for los angeles...
