- 테스트

In [None]:
from langgraph.graph import StateGraph, END
from langgraph.prebuilt import ToolNode, tools_condition
from langchain_core.tools import tool
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage, AIMessage, ToolMessage
from typing import TypedDict, List, Annotated
from langchain_core.prompts import ChatPromptTemplate
import operator
from dotenv import load_dotenv
import os

# 환경 변수 로드
load_dotenv()
if not os.getenv("OPENAI_API_KEY"):
    raise ValueError("OPENAI_API_KEY가 .env 파일에 설정되지 않았습니다.")

# 상태 정의
class AgentState(TypedDict):
    messages: Annotated[List, operator.add]  # 메시지 누적
    phase: str  # 현재 단계: "thinking", "finalizing"
    iteration: int  # 루프 횟수 추적

# 도구 정의
@tool
def get_weather(city: str) -> str:
    """Get the given city`s weather"""
    return f"{city}의 날씨는 맑아요!"

# LLM 설정 (스트리밍 활성화)
llm = ChatOpenAI(model="gpt-5-nano", streaming=True)
tools = [get_weather]

# ReAct 프롬프트: 도구 호출과 추론 루프 지원
react_prompt = ChatPromptTemplate.from_messages([
    ("system", """You are a helpful assistant. Respond in ReAct format:
1. Always provide: Thought: [Your step-by-step reasoning]
2. If a tool is needed, use the tool-calling mechanism to invoke it
3. If no tool is needed but reasoning is incomplete, output: Continue: [next reasoning step or sub-question]
4. If reasoning is complete and no tool is needed, output: Final Answer: [final response]
Every response MUST include a Thought in the content field, even when using tool-calling.
Do NOT use tools unless the query explicitly requires external data.
For reasoning tasks, use Continue for intermediate steps."""),
    ("placeholder", "{messages}"),
])

# LLM 바인드 (도구 바인딩)
agent_runnable = react_prompt | llm.bind_tools(tools)

# 에이전트 노드
def agent_node(state: AgentState) -> AgentState:
    messages = state["messages"]
    phase = state.get("phase", "thinking")
    iteration = state.get("iteration", 0) + 1
    
    # 무한 루프 방지
    if iteration > 10:
        return {
            "messages": messages + [AIMessage(content="Thought: Maximum iteration limit reached.\nFinal Answer: Unable to complete reasoning.")],
            "phase": "finalizing",
            "iteration": iteration
        }
    
    result = agent_runnable.invoke({"messages": messages})
    # 부적절한 tool_calls 필터링 (문맥과 무관한 경우)
    # if result.tool_calls:
    #     query = messages[0].content.lower()
    #     valid_tool = any(tool["name"] == "get_weather" and "날씨" in query for tool in result.tool_calls)
    #     if not valid_tool:
    #         result.tool_calls = []  # 부적절한 도구 호출 제거
    #         result.content = f"{result.content}\nContinue: Re-evaluate the query without tools."
    
    new_phase = "finalizing" if result.tool_calls or "Final Answer" in result.content else "thinking"
    return {"messages": [result], "phase": new_phase, "iteration": iteration}

# 도구 노드
tool_node = ToolNode(tools)

# 그래프 정의
workflow = StateGraph(AgentState)
workflow.add_node("agent", agent_node)
workflow.add_node("tools", tool_node)
workflow.set_entry_point("agent")

# 조건부 엣지
def route_agent(state: AgentState):
    last_message = state["messages"][-1]
    content = last_message.content
    if hasattr(last_message, "tool_calls") and last_message.tool_calls:
        return "tools"
    if "Final Answer" in content:
        return END
    return "agent"  # Continue 또는 Thought만 있으면 루프백

workflow.add_conditional_edges(
    "agent",
    route_agent,
    {"agent": "agent", "tools": "tools", END: END}
)
workflow.add_edge("tools", "agent")

# 그래프 컴파일
graph = workflow.compile()

# 비동기 스트리밍 함수
async def stream_react(user_input: str):
    try:
        events = graph.astream_events(
            {"messages": [HumanMessage(content=user_input)], "phase": "thinking", "iteration": 0},
            version="v1"
        )
        async for event in events:
            kind = event["event"]
            if kind == "on_llm_stream":
                chunk = event["data"]["chunk"]
                if chunk.content:
                    yield f"🤔 Thought (streaming): {chunk.content}"
            elif kind == "on_chain_stream":
                if "agent" in event["tags"]:
                    if "messages" in event["data"]["chunk"]:
                        msg = event["data"]["chunk"]["messages"][0]
                        content = msg.content
                        if content:
                            yield f"🤔 Full Thought: {content}"
                        if msg.tool_calls:
                            for call in msg.tool_calls:
                                yield f"🔧 Action: {call['name']}({call['args']})"
                elif "tools" in event["tags"]:
                    if "messages" in event["data"]["chunk"]:
                        for msg in event["data"]["chunk"]["messages"]:
                            yield f"📊 Observation: {msg.content}"
            elif kind == "on_chain_end" and "agent" in event["tags"]:
                final_msg = event["data"]["output"]["messages"][-1]
                content = final_msg.content
                if content and "Final Answer" in content:
                    yield f"✅ Final Answer: {content.split('Final Answer:')[-1].strip()}"
    except Exception as e:
        yield f"❌ Error: {str(e)}"

# 테스트용 main
async def main():
    # user_query = "한 달간의 유럽 배낭여행 계획을 짜줘. 주요 도시는 파리, 로마, 베를린이고, 예산은 500만 원이야."
    user_query = "대구 날씨 알려줘."
    async for step in stream_react(user_query):
        print(step)

await main()