In [82]:
import os
import json
import operator
import warnings
warnings.filterwarnings("ignore")

from dotenv import load_dotenv
from pprint import pprint

from langchain_teddynote import logging
from models import Agent, get_rag_instance

from utils import save_output2json
from prompt import load_system_prompt, load_invoke_input

from langchain_openai import ChatOpenAI
from langchain_core.messages import BaseMessage, FunctionMessage, HumanMessage
from langgraph.graph import END, StateGraph
from langgraph.prebuilt.tool_executor import ToolExecutor, ToolInvocation
from typing import Annotated, Sequence, TypedDict, List
from typing_extensions import TypedDict
import functools

from retriever.retriever_handler import get_retriever
from utils.model_handler import get_llm
from utils.utils import format_docs
from langchain.prompts import PromptTemplate
from langchain_core.output_parsers import CommaSeparatedListOutputParser
from langchain.tools import Tool
from langchain_core.runnables import RunnablePassthrough
from langchain.tools.render import format_tool_to_openai_function
from langchain.prompts import ChatPromptTemplate, MessagesPlaceholder

from pydantic import BaseModel
from typing import Literal

In [83]:
# .env 파일 로드
load_dotenv(dotenv_path=".env")

# API 키 가져오기
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
LANGCHAIN_API_KEY = os.getenv("LANGCHAIN_API_KEY")

# LangSmith 추적 기능을 활성화합니다. (선택적)
os.environ["LANGCHAIN_TRACING_V2"] = "true"

In [84]:
# 각 에이전트와 도구에 대한 다른 노드를 생성할 것입니다. 이 클래스는 그래프의 각 노드 사이에서 전달되는 객체를 정의합니다.
class AgentState(TypedDict):
    sample_names: List[str]
    supervisor_question: str
    messages: Annotated[Sequence[BaseMessage], operator.add]
    sender: str

### SampleNameSearcher

In [85]:
retriever = get_retriever(
            file_folder="./data/raw", 
            file_number=11,
            chunk_size=500, 
            chunk_overlap=100, 
            search_k=10
        )

model = ChatOpenAI(model_name="gpt-4o", temperature=0.1)

sample_name_retriever_prompt = """
You are an expert assistant specializing in extracting information from research papers related to battery technology. Your role is to carefully analyze the provided document.

Document:
{context}
"""

output_parser = CommaSeparatedListOutputParser()
format_instructions = output_parser.get_format_instructions()

prompt = ChatPromptTemplate.from_messages([
    ("system", sample_name_retriever_prompt), 
    ("human", "{sample_name_question}")
])

sample_name_searcher_chain = (
    {
        "context": retriever | format_docs, 
        "sample_name_question": RunnablePassthrough()
    }
    | prompt 
    | model 
    | output_parser
)

In [86]:
sample_name_question = """
Use all of the NCM cathode sample names (e.g., 'NCM-622', 'pristine NCM', 'M-NCM') provided in the electrochemical performance section. You just output sample names. Do Not output like '- NCM622' , just output 'NCM622.
"""

In [87]:
sample_names = sample_name_searcher_chain.invoke(sample_name_question)

In [88]:
sample_names

['NR0', 'NR1', 'NR3', 'NR5']

### Supervisor

In [141]:
from langchain_core.messages import AIMessage

def create_supervisor(model_name, members: list, system_prompt: str=None):            
    options_for_next = ["FINISH"] + members

    # 작업자 선택 응답 모델 정의: 다음 작업자를 선택하거나 작업 완료를 나타냄
    class RouteResponse(BaseModel):
        next: Literal[*options_for_next]

    # ChatPromptTemplate 생성
    prompt = ChatPromptTemplate.from_messages(
        [
            ("system", system_prompt),
            MessagesPlaceholder(variable_name="members"),
            (
                "system",
                "Given the conversation above, who should act next? "
                "Or should we FINISH? Select one of: {options}",
            ),
        ]
    ).partial(options=str(options_for_next), members=", ".join(members))

    llm = get_llm(model_name, temperature=0.1)
    print(prompt)
    return {"supervisor_question": RunnablePassthrough()} | prompt | llm.with_structured_output(RouteResponse)

In [142]:
## Supervisor 시스템 프롬프트
supervisor_system_prompt = """
- 당신은 Researcher와 Verifier를 관리하는 지시자입니다. 
- 아래의 작업자들 간의 대화를 조정하여 최종적으로 정확한 답변을 도출해야 합니다.

## 역할 및 프로세스:
1. **Sample Name Searcher**로부터 sample name들을 받습니다.
2. 우리는 질문에 담긴 모든 변수에 대한 정보를 각 sample name에 대해 추출해야 합니다.  
3. 모든 정보를 한 번에 추출하면 정확도가 떨어질 수 있으므로, 추출할 변수들을 4개의 category로 분할합니다.
4. **4명의 Researcher 에이전트**에게 각 변수 category에 대한 질문을 생성하고, 그 질문 리스트를 제공합니다.
5. 반드시 모든 sample name에 대해 각각 정보를 추출해야 합니다. 
6. Researcher들이 정보를 추출한 후, **4명의 Verifier 에이전트**에게 전달하여 검증을 요청합니다.
7. Verifier의 검증 결과를 종합하여 최종 답변을 추론합니다.

## 작업자 관리:
- 당신은 {members} 간의 대화를 조율합니다.
- 아래의 사용자 요청에 따라, 다음 작업을 수행할 작업자를 결정하고 지시해야 합니다.
- 각 작업자는 자신의 작업을 완료하면 결과와 상태를 반환합니다.
- 모든 과정이 완료되면 `FINISH`로 응답해야 합니다.

## 질문 목록:

### Final Answer:
"""

In [143]:
members = [f"Researcher{i}" for i in range(1, 5)] + [f"Verifier{i}" for i in range(1, 5)]

# from langchain.schema import AIMessage

# members = [AIMessage(content=f"Researcher{i}") for i in range(1, 5)] + [AIMessage(content=f"Verifier{i}") for i in range(1, 5)]


supervisor_agent = create_supervisor("gpt-4o", members, supervisor_system_prompt)

input_variables=[] input_types={'members': list[typing.Annotated[typing.Union[typing.Annotated[langchain_core.messages.ai.AIMessage, Tag(tag='ai')], typing.Annotated[langchain_core.messages.human.HumanMessage, Tag(tag='human')], typing.Annotated[langchain_core.messages.chat.ChatMessage, Tag(tag='chat')], typing.Annotated[langchain_core.messages.system.SystemMessage, Tag(tag='system')], typing.Annotated[langchain_core.messages.function.FunctionMessage, Tag(tag='function')], typing.Annotated[langchain_core.messages.tool.ToolMessage, Tag(tag='tool')], typing.Annotated[langchain_core.messages.ai.AIMessageChunk, Tag(tag='AIMessageChunk')], typing.Annotated[langchain_core.messages.human.HumanMessageChunk, Tag(tag='HumanMessageChunk')], typing.Annotated[langchain_core.messages.chat.ChatMessageChunk, Tag(tag='ChatMessageChunk')], typing.Annotated[langchain_core.messages.system.SystemMessageChunk, Tag(tag='SystemMessageChunk')], typing.Annotated[langchain_core.messages.function.FunctionMessageC

### Researcher & Verifier

In [144]:
def create_agent(model_name, tools, system_message: str):
    # 에이전트를 생성합니다.
    functions = [format_tool_to_openai_function(t) for t in tools]
    prompt = ChatPromptTemplate.from_messages(
        [
            (
                "system",
                "You are a helpful AI assistant, collaborating with other assistants."
                " Use the provided tools to progress towards answering the question."                                        
                " If you are unable to fully answer, that's OK, another assistant with different tools "
                " will help where you left off. Execute what you can to make progress."
                " If you or any of the other assistants have the final answer or deliverable,"
                " You have access to the following tools: {tool_names}.\n{system_message}",
            ),
            MessagesPlaceholder(variable_name="messages"),
        ]
    )
    prompt = prompt.partial(system_message=system_message)
    prompt = prompt.partial(tool_names=", ".join(
        [tool.name for tool in tools]))
    llm = get_llm(model_name, temperature=0.1)
    
    return prompt | llm.bind_functions(functions)

In [145]:
## Researcher 시스템 프롬프트
researcher_system_prompt = """
- Supervisor로부터 받은 질문에 해당되는 변수들들를 논문으로부터 검색해서 찾아야 합니다. 
- retriever tool을 사용할 경우 받은 질문에 추가적인 설명을 붙여 query를 만들고 검색해야 합니다. 
- 논문에 나와있지 않는 변수가 있다면 누락하지말고 없다라는 말을 꼭 추가해줘야 합니다.
"""

## Verifier 시스템 프롬프트
verifier_system_prompt = """
- 당신은 Researcher로부터 받은 답변들에 대해 잘못된 부분이 없는지 확인하는 역할을 하는 에이전트 입니다. 
- 답변들을 확인할 경우 필수적으로 retriever tool을 사용해서 확인해야 합니다. 
- 논문에 나와있지 않는 정보가 있다면 누락하지말고 없다라는 말을 꼭 추가해줘야 합니다.
- 잘못된 부분이 있다면 Researcher에게 피드백해야 합니다. 
- 만약 잘못된 부분이 없이 모두 잘 추출되었다면 잘 작성된 Researcher의 답변을 Supervisor에게 전달합니다. 
- Supervisor에게 답변을 전달할 경우 `### Complete Verification` 필수적으로 추가해야 합니다. 

### Complete Verification:
"""

In [146]:
## retriever 설정
retriever = get_retriever(
    file_folder="./data/raw", 
    file_number=11,
    chunk_size=500, 
    chunk_overlap=100, 
    search_k=10
)
retriever_tool = Tool(
    name="retriever",
    func=retriever.get_relevant_documents,
    description="Retrieve relevant documents based on a query."
)     

In [147]:
researcher_agent = create_agent("gpt-4o", [retriever_tool], researcher_system_prompt)
verifier_agent = create_agent("gpt-4o", [retriever_tool], verifier_system_prompt)

### Node

In [148]:
# def sample_name_searcher_node(state):
#     """
#     SampleNameSearcher가 질문을 받고 샘플 이름을 찾아 state에 저장하는 함수.
#     """
#     sample_name_question = state.get("sample_name_question", "")  # 사용자의 질문을 가져옴
#     sample_name = sample_name_searcher_chain.invoke(sample_name_question)  # 질문과 관련된 샘플 이름 검색
#     state["sample_name"] = sample_name  # 결과 저장
    
#     return state  # Supervisor에게 전달할 state 반환

# def supervisor_node(state):
#     """
#     Supervisor가 SampleNameSearcher의 결과를 받아서 다음 단계를 수행하는 함수.
#     """
#     sample_names = state.get("sample_name", "")  # SampleNameSearcher에서 찾은 샘플 이름
#     supervisor_question = state.get("supervisor_question", "")  # 원래 질문

#     # Supervisor가 사용할 입력 데이터 생성
#     combined_input = f"Sample Name: {sample_names}\nQuestion: {supervisor_question}"
    
#     result = supervisor_agent.invoke(combined_input)  # Supervisor 실행
    
#     state["sub_question"] = result  # 결과 저장
#     return state  # 다음 노드로 전달

In [149]:
def agent_node(state, agent, name):
    agent_response = agent.invoke(state)

    if isinstance(agent_response, FunctionMessage):
        pass
    else:
        agent_response = HumanMessage(**agent_response.dict(exclude={"type", "name"}), name=name)
    return {
        "messages": [agent_response],
        "sender": name,
    }

In [150]:
supervisor_node = functools.partial(agent_node, agent=supervisor_agent, name="Supervisor")
# sample_name_searcher_node = functools.partial(agent_node, agent=sample_name_searcher_chain, name="SampleNameSearcher")

# researcher_node = functools.partial(agent_node, agent=researcher_agent, name="Researcher")
# verifier_node = functools.partial(agent_node, agent=verifier_agent, name="Verifier")

# researcher1_node = functools.partial(agent_node, agent=researcher_agent, name="Researcher1")
# verifier1_node = functools.partial(agent_node, agent=verifier_agent, name="Verifier1")

# researcher2_node = functools.partial(agent_node, agent=researcher_agent, name="Researcher2")
# verifier2_node = functools.partial(agent_node, agent=verifier_agent, name="Verifier2")

# researcher3_node = functools.partial(agent_node, agent=researcher_agent, name="Researcher3")
# verifier3_node = functools.partial(agent_node, agent=verifier_agent, name="Verifier3")

# researcher4_node = functools.partial(agent_node, agent=researcher_agent, name="Researcher4")
# verifier4_node = functools.partial(agent_node, agent=verifier_agent, name="Verifier4")

In [151]:
## set tool
tools = [retriever_tool]
tool_executor = ToolExecutor(tools)

In [152]:
def tool_node(state):
    # 그래프에서 도구를 실행하는 함수입니다.
    # 에이전트 액션을 입력받아 해당 도구를 호출하고 결과를 반환합니다.
    messages = state["messages"]
    
    # 계속 조건에 따라 마지막 메시지가 함수 호출을 포함하고 있음을 알 수 있습니다.
    first_message = messages[0]
    last_message = messages[-1]
    
    # ToolInvocation을 함수 호출로부터 구성합니다.
    tool_input = json.loads(last_message.additional_kwargs["function_call"]["arguments"])
    tool_name = last_message.additional_kwargs["function_call"]["name"]
    
    if tool_name == "retriever":
        base_query = tool_input.get("__arg1", "")  # 기존 query 가져오기
        refined_query = f"Context: {first_message.content} | Query: {base_query}"
        tool_input["__arg1"] = refined_query
    
    # 단일 인자 입력은 값으로 직접 전달할 수 있습니다.
    if len(tool_input) == 1 and "__arg1" in tool_input:
        tool_input = next(iter(tool_input.values()))
    
    action = ToolInvocation(
        tool=tool_name,
        tool_input=tool_input,
    )
    
    # 도구 실행자를 호출하고 응답을 받습니다.
    response = tool_executor.invoke(action)
    
    # 응답을 사용하여 FunctionMessage를 생성합니다.
    function_message = FunctionMessage(
        content=f"{tool_name} response: {str(response)}", name=action.tool
    )
    
    # 기존 리스트에 추가될 리스트를 반환합니다.
    return {"messages": [function_message]}

In [153]:
def router(state):
    # 상태 정보를 기반으로 다음 단계를 결정하는 라우터 함수
    messages = state["messages"]
    last_message = messages[-1]
    if "function_call" in last_message.additional_kwargs:
        return "call_tool"
    
    if "### Complete Verification" in last_message.content:
        return "save_answer"
    
    if "### Final Answer" in last_message.content:
        return "output"
    
    return "continue"

### Graph

In [154]:
## graph 구축
workflow = StateGraph(AgentState)
workflow.add_node("Supervisor", supervisor_node)

for i in range(1, 5):
    workflow.add_node(f"Researcher{i}", functools.partial(agent_node, agent=researcher_agent, name=f"Researcher{i}"))
    workflow.add_node(f"Verifier{i}", functools.partial(agent_node, agent=verifier_agent, name=f"Verifier{i}"))
    workflow.add_node(f"call_tool{i}", tool_node)
    workflow.add_edge("Supervisor", f"Researcher{i}")
    workflow.add_conditional_edges(
        f"Researcher{i}",
        router,
        {"continue": f"Verifier{i}", "call_tool": f"call_tool{i}"},
    )
    workflow.add_conditional_edges(
        f"Verifier{i}",
        router,
        {"continue": f"Researcher{i}", "call_tool": f"call_tool{i}", "save_answer": "Supervisor"},
    )
    workflow.add_conditional_edges(
        f"call_tool{i}",
        lambda x: x["sender"],
        {
            f"Researcher{i}": f"Researcher{i}",
            f"Verifier{i}": f"Verifier{i}",
        },
    )
    
workflow.set_entry_point("Supervisor")
workflow.add_edge("Supervisor", END)
graph = workflow.compile()   

In [155]:
print(graph.get_graph().draw_mermaid())
graph.get_graph().draw_mermaid_png(output_file_path="MultiAgentSupervisor.png")

%%{init: {'flowchart': {'curve': 'linear'}}}%%
graph TD;
	__start__([<p>__start__</p>]):::first
	Supervisor(Supervisor)
	Researcher1(Researcher1)
	Verifier1(Verifier1)
	call_tool1(call_tool1)
	Researcher2(Researcher2)
	Verifier2(Verifier2)
	call_tool2(call_tool2)
	Researcher3(Researcher3)
	Verifier3(Verifier3)
	call_tool3(call_tool3)
	Researcher4(Researcher4)
	Verifier4(Verifier4)
	call_tool4(call_tool4)
	__end__([<p>__end__</p>]):::last
	Supervisor --> Researcher1;
	Supervisor --> Researcher2;
	Supervisor --> Researcher3;
	Supervisor --> Researcher4;
	Supervisor --> __end__;
	__start__ --> Supervisor;
	Researcher1 -. &nbsp;continue&nbsp; .-> Verifier1;
	Researcher1 -. &nbsp;call_tool&nbsp; .-> call_tool1;
	Verifier1 -. &nbsp;continue&nbsp; .-> Researcher1;
	Verifier1 -. &nbsp;call_tool&nbsp; .-> call_tool1;
	Verifier1 -. &nbsp;save_answer&nbsp; .-> Supervisor;
	call_tool1 -.-> Researcher1;
	call_tool1 -.-> Verifier1;
	Researcher2 -. &nbsp;continue&nbsp; .-> Verifier2;
	Researcher2 -. 



In [156]:
supervisor_question = f"""
## Sample Names: 
{sample_names}

## Question : 
논문에서 사용된 양극 활물질(CAM, Cathode Active Material)의 화학 조성(Stoichiometry) 정보는 무엇인가요?  
- 상업적으로 사용된 NCM(Nickel Cobalt Manganese)은 무엇인가요?  
- 사용된 리튬 원료(Lithium source)는 무엇인가요?  
- 사용된 합성 방법(Synthesis method)은 무엇인가요?  
- 결정화 방법(Crystallization method)은 무엇인가요?  
- 결정화 공정의 최종 온도(Crystallization final temperature)와 지속 시간(duration)은 각각 얼마인가요?  
- 사용된 도핑(Doping) 기술이 있는 경우, 어떤 원소가 도핑되었나요?  
- 양극 활물질에 적용된 코팅(Coating) 기술이 있다면, 어떤 물질로 코팅되었나요?  

논문에서 보고된 전극(Electrode, half-cell) 조성은 어떻게 되나요?  
- 활성 물질(Active material)과 도전제(Conductive additive), 바인더(Binder)의 비율은 얼마인가요?  
- 사용된 전해질(Electrolyte)의 종류는 무엇인가요?  
- 사용된 첨가제(Additive)는 무엇인가요?  
- NCM의 질량 적재량(Loading density, mass loading of NCM)은 얼마인가요?  

논문에서 보고된 형태학적 특성(Morphological Properties)은 무엇인가요?  
- 입자 크기(Particle size) 정보는 어떻게 되나요?  
- 입자 형태(Particle shape)는 어떻게 기술되었나요?  
- 입자 분포(Particle distribution) 특성은 어떻게 설명되었나요?  
- 코팅층(Coating layer)의 특성과 두께는 어떻게 보고되었나요?  
- 결정 구조(Crystal structure)와 격자 특성(lattice characteristics)은 무엇인가요?  

논문에서 보고된 양극 성능(Cathode Performance)은 어떻게 되나요?  
- 실험에 사용된 전압 범위(Voltage range)와 온도(Temperature)는 얼마인가요?  
- 서로 다른 C-rate에서의 용량(Specific capacity)은 어떻게 보고되었나요?  
  - 0.1C에서의 용량은?  
  - 0.2C에서의 용량은?  
  - 0.5C에서의 용량은?  
  - 1.0C에서의 용량은?  
  - 2.0C에서의 용량은?  
  - 그 외 추가적인 C-rate와 성능 데이터가 있다면 무엇인가요?  
"""

In [157]:
graph.invoke(input={
    "messages": [HumanMessage(content=supervisor_question, name="Supervisor")
]})


ValueError: variable members should be a list of base messages, got Researcher1, Researcher2, Researcher3, Researcher4, Verifier1, Verifier2, Verifier3, Verifier4 of type <class 'str'>