In [None]:
# @title ### 4. 싱글 패스 플랜 제너레이터 (Single-Path Plan Generator)

# @markdown 설정된 목표를 달성하기 위한 일련의 구체적인 단계를 생성하는 패턴입니다.

# @markdown LangGraph를 이용하여 목표 설정, 태스크 분해, 태스크 실행, 결과 정리의 워크플로우를 구현합니다.


# SinglePathPlanGenerationState 스테이트 모델 정의

class SinglePathPlanGenerationState(BaseModel):

    query: str = Field(..., description="사용자가 입력한 쿼리")

    optimized_goal: str = Field(default="", description="최적화된 목표")

    optimized_response: str = Field(default="", description="최적화된 응답 정의")

    tasks: List[str] = Field(default_factory=list, description="실행할 태스크 리스트")

    current_task_index: int = Field(default=0, description="현재 실행 중인 태스크 번호")

    results: Annotated[List[str], operator.add] = Field(

        default_factory=list, description="실행 완료된 태스크 결과 리스트"

    )

    final_output: str = Field(default="", description="최종 출력 결과")


# SinglePathPlanGenerationAgent (노드 함수 포함) 정의

class SinglePathPlanGenerationAgent:

    def __init__(self, llm: ChatOpenAI):

        self.llm = llm

        self.passive_goal_creator = PassiveGoalCreator(llm=llm)

        self.prompt_optimizer = PromptOptimizer(llm=llm)

        self.response_optimizer = ResponseOptimizer(llm=llm)


    def goal_setting(self, state: SinglePathPlanGenerationState) -> Dict[str, Any]:

        goal: Goal = self.passive_goal_creator.run(query=state.query)

        optimized_goal: OptimizedGoal = self.prompt_optimizer.run(query=goal.text)

        optimized_response: str = self.response_optimizer.run(query=optimized_goal.text)

        return {

            "optimized_goal": optimized_goal.text,

            "optimized_response": optimized_response,

        }


# DecomposedTasks 데이터 모델 정의 (태스크 분해 결과)

class DecomposedTasks(BaseModel):

    values: List[str] = Field(

        default_factory=list,

        min_items=3,

        max_items=5,

        description="3~5개로 분해된 태스크",

    )


# QueryDecomposer 클래스 정의

class QueryDecomposer:

    def __init__(self, llm: ChatOpenAI):

        self.llm = llm

        self.current_date = datetime.now().strftime("%Y-%m-%d")


    def run(self, query: str) -> DecomposedTasks:

        prompt = ChatPromptTemplate.from_template(

            f"""CURRENT DATE: {self.current_date}


태스크: 주어진 목표를 구체적이고 실행 가능한 태스크로 분해해 주세요.

요건: 

1. 다음 행동만으로 목표를 달성할 것 절대 지정된 이외의 행동을 취하지 말 것.

 - 인터넷을 이용하여 목표 달성을 위한 조사를 수행한다.

 - 사용자를 위한 보고서를 생성한다.

2. 각 태스크는 구체적이고 상세하게 기재하며, 단독으로 실행 및 검증 가능한 정보를 포함할 것. 추상적인 표현을 일절 포함하지 말 것.

3. 태스크는 실행 가능한 순서로 리스트화할 것.

4. 태스크는 한국어로 출력할 것.

목표: {query}"""

        )

        chain = prompt | self.llm.with_structured_output(DecomposedTasks)

        return chain.invoke({"query": query})


    def _decompose_query(self, state: SinglePathPlanGenerationState) -> Dict[str, Any]:

        decomposer = QueryDecomposer(llm=self.llm)

        decomposed_tasks: DecomposedTasks = decomposer.run(query=state.optimized_goal)

        return {"tasks": decomposed_tasks.values}


# TaskExecutor 클래스 정의

class TaskExecutor:

    def __init__(self, llm: ChatOpenAI):

        self.llm = llm

        self.tools = [TavilySearchResults(max_results=3)]


    def run(self, task: str) -> str:

        agent = create_react_agent(self.llm, self.tools)

        result = agent.invoke(

            {

                "messages": [

                    (

                        "human",

                        f"""다음 태스크를 실행하고 상세한 답변을 제공해 주세요.


태스크: {task}


요건:

1. 필요에 따라 제공된 도구를 사용하세요.

2. 실행은 철저하고 포괄적으로 수행하세요. 

3. 가능한 한 구체적인 사실이나 데이터를 제공하세요.

4. 발견한 내용을 명확하게 요약하세요."""

                    )

                ]

            }

        )

        return result["messages"][-1].content


    def _execute_task(self, state: SinglePathPlanGenerationState) -> Dict[str, Any]:

        self.task_executor = TaskExecutor(llm=self.llm)

        current_task = state.tasks[state.current_task_index]

        result = self.task_executor.run(task=current_task)

        return {

            "results": [result],

            "current_task_index": state.current_task_index + 1,

        }


# ResultAggregator 클래스 정의

class ResultAggregator:

    def __init__(self, llm: ChatOpenAI):

        self.llm = llm


    def run(self, query: str, response_definition: str, results: List[str]) -> str:

        prompt = ChatPromptTemplate.from_template(

            """주어진 목표:

{query}


조사 결과: 

{results}


주어진 목표에 대해 조사 결과를 활용하여 다음 지시에 기반한 응답을 생성해 주세요.

{response_definition}"""

        )

        results_str = "\n\n".join(

            f"Info {i+1}:\n{result}" for i, result in enumerate(results)

        )

        chain = prompt | self.llm | StrOutputParser()

        return chain.invoke(

            {

                "query": query,

                "results": results_str,

                "response_definition": response_definition,

            }

        )


    def _aggregate_results(self, state: SinglePathPlanGenerationState) -> Dict[str, Any]:

        self.result_aggregator = ResultAggregator(llm=self.llm)

        final_output = self.result_aggregator.run(

            query=state.optimized_goal,

            response_definition=state.optimized_response,

            results=state.results,

        )

        return {"final_output": final_output}


# LangGraph 워크플로우 정의

class SinglePathPlanGeneratorGraph:

    def __init__(self, llm: ChatOpenAI):

        self.llm = llm

        self.agent_nodes = SinglePathPlanGenerationAgent(llm)

        self.query_decomposer_instance = QueryDecomposer(llm)

        self.task_executor_instance = TaskExecutor(llm)

        self.result_aggregator_instance = ResultAggregator(llm)


    def build_graph(self):

        workflow = StateGraph(SinglePathPlanGenerationState)


        workflow.add_node("goal_setting", self.agent_nodes.goal_setting)

        workflow.add_node("decompose_query", self.query_decomposer_instance._decompose_query)

        workflow.add_node("execute_task", self.task_executor_instance._execute_task)

        workflow.add_node("aggregate_results", self.result_aggregator_instance._aggregate_results)


        workflow.set_entry_point("goal_setting")


        workflow.add_edge("goal_setting", "decompose_query")

        workflow.add_edge("decompose_query", "execute_task")


        workflow.add_conditional_edges(

            "execute_task",

            lambda state: "continue" if state.current_task_index < len(state.tasks) else "finish",

            {

                "continue": "execute_task",

                "finish": "aggregate_results",

            },

        )

        workflow.add_edge("aggregate_results", END)


        return workflow.compile()


# 싱글 패스 플랜 제너레이터 실행

single_path_llm = ChatOpenAI(model=settings.openai_smart_model, temperature=settings.temperature)

single_path_graph_builder = SinglePathPlanGeneratorGraph(llm=single_path_llm)

single_path_app = single_path_graph_builder.build_graph()


inputs = {"query": "카레라이스 만드는 법"}

print("\n--- 싱글 패스 플랜 제너레이터 실행 결과 ---")

for s in single_path_app.stream(inputs):

    print(s)


final_state = next(iter(single_path_app.stream(inputs, stream_mode="values")))

print(f"\n최종 보고서:\n{final_state.get('final_output', '최종 출력이 없습니다.')}")

```python