In [1]:
import os
import json
from dotenv import load_dotenv
from IPython.display import Markdown, display, update_display
from openai import OpenAI
import gradio as gr
import google.generativeai
import anthropic

In [2]:
# constants

MODEL_GPT = 'gpt-4o-mini'
MODEL_CLAUDE = 'claude-3-5-sonnet-20240620'
MODEL_GEMINI = 'gemini-1.5-flash'

In [3]:
# set up environment

load_dotenv()
os.environ['OPENAI_API_KEY'] = os.getenv('OPENAI_API_KEY', 'your-key-if-not-using-env')
os.environ['ANTHROPIC_API_KEY'] = os.getenv('ANTHROPIC_API_KEY', 'your-key-if-not-using-env')
os.environ['GOOGLE_API_KEY'] = os.getenv('GOOGLE_API_KEY', 'your-key-if-not-using-env')

In [4]:
openai = OpenAI()
claude = anthropic.Anthropic()
google.generativeai.configure()

In [5]:
import base64
from io import BytesIO
from PIL import Image
from IPython.display import Audio, display

In [6]:
def talker(message):
    response = openai.audio.speech.create(
        model='tts-1',
        voice='alloy',
        input=message
    )

    audio_stream = BytesIO(response.content)
    output_filename = 'output_audio.mp3'
    with open(output_filename, 'wb') as f:
        f.write(audio_stream.read())

    display(Audio(output_filename,autoplay=True))

# talker('테스트')

In [7]:
# prompts
general_prompt = "답변은 가능한 한 기술적으로 하세요.\
전문 지식이 있는 주제에 대한 질문에만 답변하세요.\
모르는 것이 있으면 말하세요. 한글로만 답변해줘."

additional_prompt_gpt = "사용자 질의를 분석하고 콘텐츠가 주로 \
거시경제 뉴스 분석이 있는지 확인합니다. \
그렇다면 직접 답변하세요. 그렇지 않으면 주로 \
자산 간 상관관계 분석 관련이 있는 경우 도구 ask_gemini에서 답변을 받거나 \
기술적 지표 분석과 관련된 주제에 속하는 경우 도구 ask_claude에서 답변을 받습니다."

system_prompt_gpt = "You are a helpful technical tutor who is an expert in \
거시경제 뉴스 분석."+ additional_prompt_gpt + general_prompt
system_prompt_gemini = "You are a helpful technical tutor who is an expert in 자산 간 상관관계 분석." + general_prompt
system_prompt_claude = "You are a helpful technical tutor who is an expert in 기술적 지표 분석." + general_prompt

def get_user_prompt(question):
    return "Please give a detailed explanation to the following question: " + question

In [8]:
def call_claude(question):
    result = claude.messages.create(
        model=MODEL_CLAUDE,
        max_tokens=200,
        temperature=0.3,
        system=system_prompt_claude,
        messages=[
            {"role": "user", "content": get_user_prompt(question)},
        ],
    )

    return result.content[0].text

In [28]:
def call_gemini(question):
    gemini = google.generativeai.GenerativeModel(
        model_name=MODEL_GEMINI,
        system_instruction=system_prompt_gemini
    )
    response = gemini.generate_content(get_user_prompt(question))
    response = response.text
    return response

In [29]:
def ask_claude(question):
    print(f"Tool ask_claude called for {question}")
    return call_claude(question)

def ask_gemini(question):
    print(f"Tool ask_gemini called for {question}")
    return call_gemini(question)

In [30]:
ask_claude_function = {
    "name": "ask_claude",
    "description": "Get the answer to the question related to a topic this agent is faimiliar with.",
    "parameters": {
        "type": "object",
        "properties": {
            "question_for_topic": {
                "type": "string",
                "description": "질문은 기술적 지표 분석과 관련이 있다."
            }
        },
        "required": ["question_for_topic"],
        "additionalProperties": False
    }
}

In [31]:
ask_gemini_function = {
    "name": "ask_gemini",
    "description": "Get the answer to the question related to a topic this agent is faimiliar with.",
    "parameters": {
        "type": "object",
        "properties": {
            "question_for_topic": {
                "type": "string",
                "description": "질문은 자산 간 상관관계 분석과 관련이 있다."
            }
        },
        "required": ["question_for_topic"],
        "additionalProperties": False
    }
}

In [32]:
tools = [{"type": "function", "function": ask_claude_function},
        {"type": "function", "function": ask_gemini_function}]
tools_function_map = {
    "ask_claude": ask_claude,
    "ask_gemini": ask_gemini
}

In [37]:
def chat(history):
    messages = [{"role": "system", "content": system_prompt_gpt}] + history
    stream = openai.chat.completions.create(model=MODEL_GPT, messages=messages, tools=tools, stream=True)

    full_response = ""
    history += [{"role": "assistant", "content": full_response}]

    tool_call_accumulator = ""
    tool_call_id = None
    tool_call_function_name = None
    tool_calls = []

    for chunk in stream:
        if chunk.choices[0].delta.content:
            full_response += chunk.choices[0].delta.content or ""
            history[-1]['content'] = full_response
            yield history

        if chunk.choices[0].delta.tool_calls:
            message = chunk.choices[0].delta
            print(message)
            for tc in chunk.choices[0].delta.tool_calls:
                print(tc)
                if tc.id:
                    tool_call_id = tc.id
                    if tool_call_function_name is None:
                        tool_call_function_name = tc.function.name

                tool_call_accumulator += tc.function.arguments if tc.function.arguments else ""

                try:
                    func_args = json.loads(tool_call_accumulator)
                    tool_response, tool_call = handle_tool_call(tool_call_function_name, func_args, tool_call_id)
                    tool_calls.append(tool_call)

                    messages.append({
                        "role": "assistant",
                        "tool_calls": tool_calls
                    })
                    messages.append(tool_response)

                    response = openai.chat.completions.create(
                        model=MODEL_GPT,
                        messages=messages,
                        stream=True
                    )
                    full_response = ""
                    for chunk in response:
                        if chunk.choices[0].delta.content:
                            full_response += chunk.choices[0].delta.content or ""
                            history[-1]['content'] = full_response
                            yield history

                    tool_call_accumulator = ''
                    tool_call_id = None
                    tool_call_function_name = None
                    tool_calls = []

                except json.JSONDecodeError:
                    pass
    talker(full_response)
                    

In [38]:
def handle_tool_call(function_name, arguments, tool_call_id):
    question = arguments.get('question_for_topic')

    tool_call = {
        "id": tool_call_id,
        "type": "function",
        "function": {
            "name": function_name,
            "arguments": json.dumps(arguments)
        }
    }

    if function_name in tools_function_map:
        answer = tools_function_map[function_name](question)
        response = {
            "role": "tool",
            "content": json.dumps({"question": question, "answer": answer}),
            "tool_call_id": tool_call_id
        }
        return response, tool_call

In [39]:
def transcribe_audio(audio_file_path):
    try:
        audio_file = open(audio_file_path, 'rb')
        response = openai.audio.transcriptions.create(model="whisper-1", file=audio_file)
        return response.text
    except Exception as e:
        return f"an error occurred: {e}"

In [40]:
with gr.Blocks() as ui:
    with gr.Row():
        chatbot = gr.Chatbot(height=500, type="messages", label="Multimodal Technical Expert Chatbot")
    with gr.Row():
        entry = gr.Textbox(label="Ask our technical expert anything:")
        audio_input = gr.Audio(
            sources="microphone",
            type="filepath",
            label="Record audio",
            editable=False,
            waveform_options=gr.WaveformOptions(
                show_recording_waveform=False,
            )
        )
        audio_input.stop_recording(
            fn=transcribe_audio,
            inputs=audio_input,
            outputs=entry
        )

        with gr.Row():
            clear = gr.Button("Clear")

        def do_entry(message, history):
            history += [{"role": "user", "content": message}]
            yield "", history

        entry.submit(do_entry, inputs=[entry, chatbot], outputs=[entry, chatbot]).then(
            chat, inputs=chatbot, outputs=chatbot)

        clear.click(lambda: None, inputs=None, outputs=chatbot, queue=False)

ui.launch()

* Running on local URL:  http://127.0.0.1:7865

To create a public link, set `share=True` in `launch()`.




ChoiceDelta(content=None, function_call=None, refusal=None, role='assistant', tool_calls=[ChoiceDeltaToolCall(index=0, id='call_aI6G7CLne8V8sT9feWJkTOcA', function=ChoiceDeltaToolCallFunction(arguments='', name='ask_gemini'), type='function')])
ChoiceDeltaToolCall(index=0, id='call_aI6G7CLne8V8sT9feWJkTOcA', function=ChoiceDeltaToolCallFunction(arguments='', name='ask_gemini'), type='function')
ChoiceDelta(content=None, function_call=None, refusal=None, role=None, tool_calls=[ChoiceDeltaToolCall(index=0, id=None, function=ChoiceDeltaToolCallFunction(arguments='{"', name=None), type=None)])
ChoiceDeltaToolCall(index=0, id=None, function=ChoiceDeltaToolCallFunction(arguments='{"', name=None), type=None)
ChoiceDelta(content=None, function_call=None, refusal=None, role=None, tool_calls=[ChoiceDeltaToolCall(index=0, id=None, function=ChoiceDeltaToolCallFunction(arguments='question', name=None), type=None)])
ChoiceDeltaToolCall(index=0, id=None, function=ChoiceDeltaToolCallFunction(arguments

ChoiceDelta(content=None, function_call=None, refusal=None, role='assistant', tool_calls=[ChoiceDeltaToolCall(index=0, id='call_rBf8iT51bY4WGQPFJkkPxlOP', function=ChoiceDeltaToolCallFunction(arguments='', name='ask_gemini'), type='function')])
ChoiceDeltaToolCall(index=0, id='call_rBf8iT51bY4WGQPFJkkPxlOP', function=ChoiceDeltaToolCallFunction(arguments='', name='ask_gemini'), type='function')
ChoiceDelta(content=None, function_call=None, refusal=None, role=None, tool_calls=[ChoiceDeltaToolCall(index=0, id=None, function=ChoiceDeltaToolCallFunction(arguments='{"', name=None), type=None)])
ChoiceDeltaToolCall(index=0, id=None, function=ChoiceDeltaToolCallFunction(arguments='{"', name=None), type=None)
ChoiceDelta(content=None, function_call=None, refusal=None, role=None, tool_calls=[ChoiceDeltaToolCall(index=0, id=None, function=ChoiceDeltaToolCallFunction(arguments='question', name=None), type=None)])
ChoiceDeltaToolCall(index=0, id=None, function=ChoiceDeltaToolCallFunction(arguments

ChoiceDelta(content=None, function_call=None, refusal=None, role='assistant', tool_calls=[ChoiceDeltaToolCall(index=0, id='call_zdp6GKeQKoq3O9oOQdNe4pTJ', function=ChoiceDeltaToolCallFunction(arguments='', name='ask_claude'), type='function')])
ChoiceDeltaToolCall(index=0, id='call_zdp6GKeQKoq3O9oOQdNe4pTJ', function=ChoiceDeltaToolCallFunction(arguments='', name='ask_claude'), type='function')
ChoiceDelta(content=None, function_call=None, refusal=None, role=None, tool_calls=[ChoiceDeltaToolCall(index=0, id=None, function=ChoiceDeltaToolCallFunction(arguments='{"', name=None), type=None)])
ChoiceDeltaToolCall(index=0, id=None, function=ChoiceDeltaToolCallFunction(arguments='{"', name=None), type=None)
ChoiceDelta(content=None, function_call=None, refusal=None, role=None, tool_calls=[ChoiceDeltaToolCall(index=0, id=None, function=ChoiceDeltaToolCallFunction(arguments='question', name=None), type=None)])
ChoiceDeltaToolCall(index=0, id=None, function=ChoiceDeltaToolCallFunction(arguments