In [None]:
import openai as client

# # 처음 assistant를 생성할때는 아래 코드를 이용하면됨
# assistant = client.beta.assistants.create(
#     name="Book Assistant",
#     instructions="You help users with their question on the files they upload.",
#     model="gpt-3.5-turbo-1106",
#     tools=[{"type":"retrieval"}]
# ) # 2024.03.07 현재 assistant가 beta임

# 한번 생성한 assistant를 재사용할때는 id를 이용해서 사용가능
assistant_id="asst_xxxx"

In [None]:
# thread를 생성 (thread는 메시지의 그룹임). 생성한 thread는 run을 해야 llm에서 처리됨
thread = client.beta.threads.create(
    messages=[
        {
            "role":"user",
            "content":"I want you to help me with this file"
        }
    ]
)
thread

In [None]:
file = client.files.create(
    file=client.file_from_path("../../files/chapter_one.txt"),
    purpose="assistants"
)
file

In [None]:
client.beta.threads.messages.create(
    thread_id=thread.id,
    role="user",
    content="",
    file_ids=[file.id]
)

In [None]:
# thread를 실행하기 위해 run을 생성
run = client.beta.threads.runs.create(
    thread_id=thread.id,
    assistant_id=assistant_id
)
run

In [None]:
import json

# 현재(2024.3월) streaming이 지원되지 않으므로 run의 상태가 어떤지 알기 위해서 계속 물어봐야함
def get_run(run_id, thread_id):
    return client.beta.threads.runs.retrieve(
        run_id=run_id,
        thread_id=thread_id,
    )

def get_messages(thread_id):
    messages = client.beta.threads.messages.list(
        thread_id=thread_id
    )
    messages = list(messages)
    messages.reverse()
    for message in messages:
        if len(message.content) >= 1:
            print(f"{message.role}: {message.content[0].text.value}")
            for annotation in message.content[0].text.annotations:
                print(f"Source: {annotation.file_citation}")
        else:
            print(f"{message.role}: No content")

def send_message(thread_id, content):
    return client.beta.threads.messages.create(
        thread_id=thread_id,
        role="user",
        content=content
    )

def cancel_run(run_id, thread_id):
    return client.beta.threads.runs.cancel(
        thread_id=thread_id,
        run_id=run_id
    )

def delete_thread(thread_id):
    return client.beta.threads.delete(
        thread_id=thread_id
    )

def get_tool_outputs(run_id, thread_id):
    run = get_run(run_id, thread_id)
    outputs = []
    for action in run.required_action.submit_tool_outputs.tool_calls:
        action_id = action.id
        function = action.function
        print(f"Calling function: {function.name} with arg {function.arguments}")
        # ai가 실행하라고 한 함수를 실행. 함수는 미리 functions_map에 담아놨음
        outputs.append({
            "output": functions_map[function.name](json.loads(function.arguments)),
            "tool_call_id":action_id
        })

    return outputs

def submit_tool_outputs(run_id, thread_id):
    outputs = get_tool_outputs(run_id, thread_id)
    return client.beta.threads.runs.submit_tool_outputs(
        run_id = run_id,
        thread_id=thread_id,
        tool_outputs=outputs
    )

In [None]:
get_run(run.id, thread.id).status

In [None]:
get_messages(thread.id)

In [None]:
send_message(thread.id, "Where does he work?")