In [8]:
# 회사 계정과 연결된 openai api를 사용하기 위한 코드
import openai, time, builtins, json
from box import Box
from datetime import datetime
from pathlib import Path
from pprint import pprint
from const.paths import *

# 객체의 타입을 명확히 알기 위한 import
from openai.types.chat.chat_completion import ChatCompletion
from openai.types.beta.thread import Thread
from openai.types.beta.assistant import Assistant
from openai.types.beta.threads.run import Run
from openai.types.beta.threads.thread_message import ThreadMessage

DATETIME_FORMAT = "%Y%m%d-%H%M%S"
MESSAGE_SEPARATOR = "\n\n------------------------------------\n\n"

RESULT_DIR = RESULT_STORAGE

keys = Box(
    json.load(
        open("const/openai_keys.json", "r", encoding="utf-8")
        )
    )

# 코드 간결화를 위한 객체
class APIClient:

    def __init__(self, api_key):
        self.client = openai.OpenAI(api_key = api_key)
        self._assistant:Assistant = None
        self._thread:Thread = None

    def set_assistant(self, assistant_id):
        self._assistant = self.client.beta.assistants.retrieve(assistant_id)
    
    def set_thread(self, thread_id):
        self._thread = self.client.beta.threads.retrieve(thread_id)
    
    @property
    def assisstant(self):
        return self._assistant
    
    @property
    def thread(self):
        return self._thread

    @property
    def recent_message(self) -> str:
        """
        가장 최근 AI 생성 메세지를 반환\n
        단, 아직 AI가 메세지를 생성하지 않았다면 유저의 최근 메세지를 반환함
        """
        self.check_assistant_and_thread()
        messages:list[ThreadMessage] = list(self.client.beta.threads.messages.list(thread_id=self._thread.id))
        return messages[0].content[0].text.value

    def check_assistant_and_thread(self):
        if self._assistant is None: raise ValueError("Assistant is not set")
        if self._thread is None: raise ValueError("Thread is not set")

    def _wait_on_run(self, run:Run, thread:Thread):
        while run.status == "queued" or run.status == "in_progress":
            run = self.client.beta.threads.runs.retrieve(
                thread_id=thread.id,
                run_id=run.id,
            )
            time.sleep(0.5)
        return run

    def send_message(self, content:str, **kwargs):
        """
        메세지 전송 후 대답을 받기까지 기다리는 코드
        """
        self.check_assistant_and_thread()
        self.client.beta.threads.messages.create(
            thread_id=self._thread.id,
            role="user",
            content=content
        )
        run = self.client.beta.threads.runs.create(
            thread_id=self._thread.id,
            assistant_id=self._assistant.id
        )
        run = self._wait_on_run(run, self._thread)
        return run
    
    def mlist_to_txt(self, result_dir:str|Path):
        result_dir = Path(result_dir)
        txt_file = result_dir / self.make_file_name("txt")
        messages = self.get_mlist(result_dir)
        with builtins.open(txt_file, "w", encoding = "utf-8") as f:
            sender = ""
            for msg in messages:
                if msg.assistant_id: sender = f"{self._assistant.id}"
                else: sender = "user"
                f.write(sender)
                f.write("\n\n")
                f.write(MESSAGE_SEPARATOR.join([c.text.value for c in msg.content]))
                f.write(MESSAGE_SEPARATOR)
        # thread_id를 공유하는 다른 결과 파일 삭제
        for file in RESULT_DIR.glob(f"{self._thread.id}_*.txt"):
            if file != txt_file: file.unlink()
    
    def mlist_to_json(self, result_dir:str|Path):
        result_dir = Path(result_dir)
        json_file = result_dir / self.make_file_name("json")
        messages = self.get_mlist(result_dir)
        with builtins.open(json_file, "w", encoding = "utf-8") as f:
            data = []
            for msg in messages:
                data.append({
                    "sender": "assistant" if msg.assistant_id else "user",
                    "timestamp": datetime.utcfromtimestamp(msg.created_at).strftime(DATETIME_FORMAT),
                    "contents": [c.text.value for c in msg.content]
                })
            # data의 순서를 뒤집어서 저장
            data.reverse()
            json.dump(data, f, ensure_ascii=False, indent=2)
        # thread_id를 공유하는 다른 결과 파일 삭제
        for file in RESULT_DIR.glob(f"{self._thread.id}_*.json"):
            if file != json_file: file.unlink()

    def make_file_name(self, extension:str):
        return f"{self._thread.id}_{datetime.now().strftime(DATETIME_FORMAT)}.{extension}"
    
    # mlist_to_txt와 mlist_to_json 공통 논리를 함수화
    def get_mlist(self, result_dir:Path):
        self.check_assistant_and_thread()
        result_dir.mkdir(exist_ok=True)
        messages = self.client.beta.threads.messages.list(thread_id=self._thread.id)
        return messages

In [9]:
# 주요 변수 선언

api_client = APIClient(keys.client_key)

# 웹페이지에서 생성한 assistant와 thread id를 APIClient에 등록
api_client.set_assistant(keys.assistants.NER_Agent.id)
api_client.set_thread(keys.assistants.NER_Agent.threads[0])
# api_client.set_assistant(keys["assistants"]["QA_Agent"]["id"])
# api_client.set_thread("")

In [None]:
# 인도네시아어 질문 생성, 번역
run = api_client.send_message(
"""
"""
)
api_client.recent_message

In [None]:
# 인도네시아어 질문에 대한 인도네시아어 답변 생성 (번역할 필요 없음)
run = api_client.send_message(
"""
"""
)
api_client.recent_message

In [52]:
# 현재까지의 대화 내용을 출력하기
api_client.mlist_to_txt(RESULT_DIR)

In [10]:
api_client.mlist_to_json(RESULT_DIR)

In [44]:
# 쓸모없어짐
# 답변을 json 형태로 출력
"""
결과물 json 파일 형식
{
    "Doc_ID": "20230808_newsdata_Korea_007413",
    "Filename": "",
    "Title": "Kang Daniel Dipilih Sebagai Wajah Baru untuk Merek Kecantikan Global Mernel", # 제목
    "Text": "Kang Daniel Dipilih Sebagai Wajah Baru untuk Merek Kecantikan Global Mernel", # 본문
    "Pub_Type": "Newspaper",
    "Pub_Subj": "Korea",
    "Pub_date": "2021-01-01",
    "Coll_date": "2023-08-08",
    "data": [
        ...
    ]
}

data에 들어갈 dict 형식
{
    "SEN_ID": "20230808_newsdata_Korea_007413_sen000001",
    "Word_Count": 10,
    "NER_Count": 1,
    "ANNO_ID": "IN_001",
    "Raw_data": "Kang Daniel telah menjadi wajah baru dari merek kecantikan .",
    "Entities_list": [
        "PS-Name-B",
        "PS-Name-I",
        "O",
        "O",
        "O",
        "O",
        "O",
        "O",
        "O",
        "O"
    ],
    "Entities": [
        {
            "entity": "Kang Daniel",
            "entityClass": "PS-Name",
            "entityStart": 0,
            "entityEnd": 1
        }
    ]
}

답변의 토큰 리스트와 태그 리스트를 받고
data에 들어갈 dict 형식 중 Word_Count, NER_Count, Raw_data, Entities_list, Entities 만들기

토큰 리스트 / 태그 리스트 파싱 방식:
1. 답변을 개행 단위로 나누기
2. "토큰"이 포함된 행 다음에 "```"이 포함된 행이 나오면 그 행은 건너뛰고, 다시 "```"가 포함된 행이 나올 때까지의 각 행을 join으로 합쳐서 eval()을 이용해 별도의 변수에 저장
3. 2의 논리를 "태그" 이하에도 적용
"""

def make_entity_data(raw_data:str, entities_list:list) -> list:
    entity_data = []
    current_entity = None
    
    for i, (token, entity_tag) in enumerate(zip(raw_data.split(), entities_list)):
        if entity_tag != 'O':
            # 개체명 클래스와 타입 (B/I)을 분리합니다.
            ent_class, ent_type = entity_tag.rsplit('-', 1)
            
            if ent_type == 'B':
                # 새로운 개체명이 시작되었습니다.
                if current_entity:
                    # 이전 개체명 정보를 저장합니다.
                    entity_data.append(current_entity)
                    
                # 새 개체명 정보를 초기화합니다.
                current_entity = {
                    "entity": token,
                    "entityClass": ent_class,
                    "entityStart": i,
                    "entityEnd": i
                }
                
            elif ent_type == 'I' and current_entity and ent_class == current_entity['entityClass']:
                # 이전 개체명이 계속되고 있습니다.
                current_entity['entity'] += " " + token
                current_entity['entityEnd'] = i
                
        else:
            # 개체명이 끝났습니다.
            if current_entity:
                entity_data.append(current_entity)
                current_entity = None
    
    # 마지막 개체명이 있다면 추가합니다.
    if current_entity:
        entity_data.append(current_entity)
    
    return entity_data

def get_list_from_msg(msg:str) -> list[list[str]]:
    """
    msg: 답변의 텍스트
    """
    msg_list:list[str] = msg.split("\n")
    tokens_list = []
    tags_list = []
    for i in range(len(msg_list)):
        if "```" in msg_list[i]:
            raw_list = list(msg_list[i+1:msg_list.index("```", i+1)])
            raw_list = [raw.strip() for raw in raw_list]
            for row in range(0, len(raw_list), 2):
                tokens_list.append(eval(raw_list[row]))
                tags_list.append(eval(raw_list[row+1]))
            break
    return tokens_list, tags_list

def make_dict(tokens_list:list, tags_list:list) -> list[dict] | list[int]:
    """
    목표 json 형식의 Word_Count, NER_Count, Raw_data, Entities_list, Entities 만들기
    ---
    data: 토큰 리스트 혹은 태그 리스트
    """
    if len(tokens_list) != len(tags_list): return [len(tokens_list), len(tags_list)] # raise ValueError("Token list and Tag list must have same length")
    return_list = []
    for tokens, tags in zip(tokens_list, tags_list):
        comp_dict = {}
        if len(tokens) != len(tags): 
            comp_dict["error"] = f"tk : {len(tokens)}, tg : {len(tags)}"
            # return_list.append(comp_dict)
            # continue
        comp_dict["Word_Count"] = len(tokens)
        comp_dict["NER_Count"] = 0
        comp_dict["Raw_data"] = " ".join(tokens)
        comp_dict["Entities_list"] = tags
        comp_dict["Entities"] = make_entity_data(comp_dict["Raw_data"], tags)

        comp_dict["NER_Count"] = len(comp_dict["Entities"])
        
        return_list.append(comp_dict)
    return return_list

json.dump(
    make_dict(
        *get_list_from_msg(api_client.recent_message)
    ),
    open(
        Path(f"result_{datetime.now().strftime(DATETIME_FORMAT)}.json"), 
        "w", encoding="utf-8"),
    indent=4,
    ensure_ascii=False
)


In [88]:
# # assistant api를 활용하기 위한 문서 불러오기
# NER_instruction_filename = "indonesian_NER_instruction.md"
# NER_instruction = client.files.create(
#     file=open("indonesian_NER_instruction.md", "rb"),
#     purpose="assistants"
# )

# tfs_filename = "tagged_files_statistic.json"
# tfs = client.files.create(
#     file=open("tagged_files_statistic.json", "rb"),
#     purpose="assistants"
# )