In [None]:
import os
from dotenv import load_dotenv

# Load settings from the .env file in the parent directory into the process environment.
load_dotenv("../.env")

# Azure OpenAI connection settings; each name is None when its variable is not set.
api_endpoint, api_key, api_version, api_deployment_name = (
    os.getenv(variable_name)
    for variable_name in (
        "AZURE_OPENAI_ENDPOINT",
        "AZURE_OPENAI_API_KEY",
        "OPENAI_API_VERSION",
        "OPENAI_GPT_DEPLOYMENT",
    )
)

In [None]:
import time
from pathlib import Path
from typing import Iterable
import json

import requests
from openai import AzureOpenAI
from openai.types import FileObject
from openai.types.beta import Thread
from openai.types.beta.threads import Run
from openai.types.beta.threads.messages import MessageFile

In [None]:
# Single Azure OpenAI client shared by every cell below.
client = AzureOpenAI(
    azure_endpoint=api_endpoint,
    api_key=api_key,
    api_version=api_version,
)

In [None]:
def get_customer_info(id: str) -> str:
    """Fetch one customer record from the customer-info API as a JSON string.

    The endpoint is read from the ``CUSTOMER_INFO_API`` environment variable and
    is queried with ``id`` and ``partitionKeyValue`` both set to ``id``
    (e.g. ``?id=1&partitionKeyValue=1``).

    Args:
        id: Customer identifier. The name shadows the built-in ``id`` but is
            kept because the caller passes it by keyword.

    Returns:
        The response body, decoded as JSON and re-serialised with ``json.dumps``.

    Raises:
        ValueError: If ``CUSTOMER_INFO_API`` is not set.
        requests.RequestException: If the request fails or times out.
    """
    get_customer_api = os.getenv("CUSTOMER_INFO_API")
    if not get_customer_api:
        # Fail with a clear message instead of requesting the URL "None?id=...".
        raise ValueError("CUSTOMER_INFO_API environment variable is not set")

    # Let requests build the query string: the value is URL-encoded (it comes
    # from model-generated tool arguments) and is merged correctly even when the
    # configured endpoint already carries query parameters.
    response = requests.get(
        get_customer_api,
        params={"id": id, "partitionKeyValue": id},
        timeout=30,  # never block the notebook indefinitely on a stalled API
    )
    return json.dumps(response.json())

In [None]:
# Retrieve the existing assistant whose id is stored in OPENAI_ASSISTANT_ID
# (this notebook looks the assistant up; it does not create one).
assistant_id = os.getenv("OPENAI_ASSISTANT_ID")
assistant = client.beta.assistants.retrieve(assistant_id)

In [None]:
# Create one conversation thread; process_message() posts every message to this thread.
thread = client.beta.threads.create()

In [None]:
def call_functions(client: AzureOpenAI, thread: Thread, run: Run) -> None:
    """Execute the tool calls a run is waiting on and submit their outputs.

    Reads the pending tool calls from ``run.required_action.submit_tool_outputs``,
    runs the matching local function for each one, and sends all outputs back
    in a single ``submit_tool_outputs`` request.

    Args:
        client: Client used to submit the tool outputs.
        thread: Thread the run belongs to (only ``thread.id`` is used).
        run: Run whose ``required_action`` lists the tool calls to execute.

    Raises:
        ValueError: If the run requests a function that is not registered here.
    """
    print("Function Calling")
    required_actions = run.required_action.submit_tool_outputs.model_dump()
    print(required_actions)

    # Tool name -> callable taking the decoded arguments dict and returning the
    # output string. Register additional tools here.
    handlers = {
        "get_customer_info": lambda args: get_customer_info(id=args["id"]),
    }

    tool_outputs = []
    for action in required_actions["tool_calls"]:
        func_name = action["function"]["name"]
        # The arguments arrive as a JSON-encoded string and must be decoded first.
        arguments = json.loads(action["function"]["arguments"])

        handler = handlers.get(func_name)
        if handler is None:
            raise ValueError(f"Unknown function: {func_name}")

        tool_outputs.append({"tool_call_id": action["id"], "output": handler(arguments)})

    print("Submitting outputs back to the Assistant...")
    client.beta.threads.runs.submit_tool_outputs(thread_id=thread.id, run_id=run.id, tool_outputs=tool_outputs)

In [None]:
def format_messages(messages: Iterable[MessageFile]) -> None:
    """Print the latest exchange: the most recent user message and the replies after it.

    ``messages`` is consumed from the start and collection stops at the first
    message whose role is ``"user"``, so the function assumes the messages are
    ordered newest first. The collected messages are printed oldest first.

    Text content items are printed as-is. Image-file items are printed as a
    placeholder with their file id, and any other item kind is reported by its
    type, instead of failing on a missing ``.text`` attribute.

    NOTE(review): the annotation says ``MessageFile``, but the code reads
    ``.role`` and ``.content`` from each element, so it appears to expect thread
    message objects. Confirm and correct the annotation.

    Args:
        messages: Messages of a thread, newest first.
    """
    latest_exchange = []

    # Collect everything up to and including the most recent user message.
    for message in messages:
        latest_exchange.append(message)
        if message.role == "user":
            break

    # Walk the collected messages backwards so the user message is printed first.
    for message in reversed(latest_exchange):
        for item in message.content:
            # Items without a ``type`` attribute are treated as text, as before.
            item_type = getattr(item, "type", "text")
            if item_type == "text":
                print(f"{message.role}:\n{item.text.value}\n")
            elif item_type == "image_file":
                print(f"{message.role}:\n[image file: {item.image_file.file_id}]\n")
            else:
                print(f"{message.role}:\n[unsupported content: {item_type}]\n")
            

In [None]:
def process_message(content: str, poll_interval: float = 5) -> None:
    """Post a user message to the notebook's thread, run the assistant, and print the result.

    Uses the module-level ``client``, ``thread`` and ``assistant``. The run is
    polled until it reaches a terminal status; whenever it is in
    ``requires_action`` the requested tools are executed via ``call_functions``.
    On ``completed`` and ``failed`` the latest exchange is printed with
    ``format_messages``; every non-completed ending is reported on stdout.

    Args:
        content: Text of the user message.
        poll_interval: Seconds to wait between status checks while the run has
            neither finished nor requested a tool call.
    """
    client.beta.threads.messages.create(thread_id=thread.id, role="user", content=content)

    run = client.beta.threads.runs.create(
        thread_id=thread.id,
        assistant_id=assistant.id
    )

    # Statuses after which the run no longer changes. "incomplete" is included so
    # that such a run ends the loop instead of being polled forever.
    terminal_statuses = {"completed", "failed", "expired", "cancelled", "incomplete"}

    print("processing...")
    while True:
        run = client.beta.threads.runs.retrieve(thread_id=thread.id, run_id=run.id)
        if run.status == "requires_action":
            # No sleep here: poll again immediately after submitting the outputs.
            call_functions(client, thread, run)
        elif run.status in terminal_statuses:
            break
        else:
            time.sleep(poll_interval)

    if run.status != "completed":
        # Make abnormal endings visible instead of returning silently.
        print(f"Run ended with status: {run.status}")
        last_error = getattr(run, "last_error", None)
        if last_error is not None:
            print(f"Error: {last_error}")

    if run.status in ("completed", "failed"):
        messages = client.beta.threads.messages.list(thread_id=thread.id)
        format_messages(messages)

In [None]:
# Ask a general product question with no extra guidance.
process_message("What can you tell me about your jackets?")

In [None]:
# Same question, but tell the assistant to use products.csv and which columns to read.
process_message("What can you tell me about your jackets? Find products documentation in file products.csv. Use the columns 'name' and 'description' to find information about the product requested")

In [None]:
# Ask about past orders for a given user id.
# NOTE(review): presumably answered through the get_customer_info tool call — verify
# that the assistant has that function registered.
process_message("Can you remember me what products I have ordered before? My user id is 2")