# Chapter 7: Advanced LangGraph Patterns 실습

이 노트북은 Reflection, Subgraphs, Supervisor 패턴 등 고급 기법을 실습합니다.

## 환경 설정

In [None]:
import os
from dotenv import load_dotenv

load_dotenv()

if not os.getenv("OPENAI_API_KEY"):
    os.environ["OPENAI_API_KEY"] = input("OpenAI API Key를 입력하세요: ")

## 1. Reflection Pattern (자기 성찰)

In [None]:
from typing import TypedDict, List, Literal
from langchain_core.messages import BaseMessage, HumanMessage, AIMessage
from langchain_openai import ChatOpenAI
from langgraph.graph import StateGraph, END, MessagesState
from pydantic import BaseModel, Field

# Reflection State
class ReflectionState(MessagesState):
    draft: str
    critique: str
    revision_count: int
    max_revisions: int
    quality_score: float

# 품질 평가 스키마
class QualityAssessment(BaseModel):
    score: float = Field(description="품질 점수 (0-10)")
    strengths: List[str] = Field(description="강점")
    weaknesses: List[str] = Field(description="약점")
    suggestions: List[str] = Field(description="개선 제안")

# 초안 생성 노드
def generate_draft(state: ReflectionState):
    """초안 생성"""
    messages = state["messages"]
    query = messages[-1].content if messages else ""
    
    llm = ChatOpenAI(model="gpt-4o-mini", temperature=0.7)
    
    draft_prompt = f"""
    다음 요청에 대한 응답을 작성하세요:
    
    {query}
    
    명확하고 구조화된 답변을 제공하세요.
    """
    
    response = llm.invoke(draft_prompt)
    state["draft"] = response.content
    state["revision_count"] = 0
    state["max_revisions"] = 3
    
    print(f"초안 생성 완료 (길이: {len(response.content)}자)")
    return state

# 비평 노드
def critique_draft(state: ReflectionState):
    """초안을 비평하고 개선점 제시"""
    draft = state["draft"]
    original_query = state["messages"][-1].content
    
    llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)
    structured_llm = llm.with_structured_output(QualityAssessment)
    
    critique_prompt = f"""
    원본 요청: {original_query}
    
    작성된 답변:
    {draft}
    
    위 답변을 다음 기준으로 평가하세요:
    1. 정확성: 정보가 정확한가?
    2. 완전성: 모든 요구사항을 다루었는가?
    3. 명확성: 이해하기 쉬운가?
    4. 구조: 논리적으로 구성되었는가?
    5. 유용성: 실제로 도움이 되는가?
    """
    
    assessment = structured_llm.invoke(critique_prompt)
    
    # 비평 텍스트 생성
    critique = f"""
    품질 점수: {assessment.score}/10
    
    강점:
    {chr(10).join(f'- {s}' for s in assessment.strengths)}
    
    약점:
    {chr(10).join(f'- {w}' for w in assessment.weaknesses)}
    
    개선 제안:
    {chr(10).join(f'- {s}' for s in assessment.suggestions)}
    """
    
    state["critique"] = critique
    state["quality_score"] = assessment.score
    
    print(f"비평 완료 - 점수: {assessment.score}/10")
    return state

# 수정 노드
def revise_draft(state: ReflectionState):
    """비평을 바탕으로 초안 수정"""
    draft = state["draft"]
    critique = state["critique"]
    
    llm = ChatOpenAI(model="gpt-4o-mini", temperature=0.5)
    
    revision_prompt = f"""
    원본 답변:
    {draft}
    
    비평 및 제안:
    {critique}
    
    위 비평을 참고하여 답변을 개선하세요.
    약점을 보완하고 제안사항을 반영하세요.
    """
    
    revised = llm.invoke(revision_prompt)
    state["draft"] = revised.content
    state["revision_count"] += 1
    
    print(f"수정 {state['revision_count']}회 완료")
    return state

# 완료 여부 결정
def should_continue_revising(state: ReflectionState) -> Literal["critique", "end"]:
    """추가 수정이 필요한지 결정"""
    score = state.get("quality_score", 0)
    revision_count = state.get("revision_count", 0)
    max_revisions = state.get("max_revisions", 3)
    
    # 품질이 충분하거나 최대 수정 횟수 도달
    if score >= 8.0 or revision_count >= max_revisions:
        return "end"
    return "critique"

# Reflection 그래프 생성
def create_reflection_graph():
    workflow = StateGraph(ReflectionState)
    
    # 노드 추가
    workflow.add_node("generate", generate_draft)
    workflow.add_node("critique", critique_draft)
    workflow.add_node("revise", revise_draft)
    
    # 플로우 정의
    workflow.set_entry_point("generate")
    workflow.add_edge("generate", "critique")
    workflow.add_edge("revise", "critique")
    
    # 조건부 엣지
    workflow.add_conditional_edges(
        "critique",
        should_continue_revising,
        {
            "critique": "revise",
            "end": END
        }
    )
    
    return workflow.compile()

# 실행
reflection_graph = create_reflection_graph()

# 테스트
queries = [
    "머신러닝과 딥러닝의 차이점을 설명해주세요.",
    "효과적인 프로젝트 관리 방법론에 대해 설명해주세요."
]

for query in queries:
    print(f"\n{'='*80}")
    print(f"질문: {query}\n")
    
    result = reflection_graph.invoke({
        "messages": [HumanMessage(content=query)]
    })
    
    print(f"\n최종 답변 (점수: {result['quality_score']}/10):")
    print(result["draft"])
    print(f"\n총 수정 횟수: {result['revision_count']}")

## 2. Direct Subgraph Pattern

In [None]:
from langgraph.graph import StateGraph

# 서브그래프용 State
class SubTaskState(TypedDict):
    task: str
    result: str
    status: str

# 메인 그래프 State
class MainTaskState(TypedDict):
    main_task: str
    subtasks: List[str]
    subtask_results: dict
    final_result: str

# 서브그래프 생성 함수
def create_research_subgraph():
    """연구 작업을 수행하는 서브그래프"""
    
    def research(state: SubTaskState):
        task = state["task"]
        print(f"  [서브그래프] 연구 중: {task}")
        
        llm = ChatOpenAI(model="gpt-4o-mini")
        response = llm.invoke(f"다음 주제에 대해 간단히 연구하세요: {task}")
        
        state["result"] = response.content
        state["status"] = "researched"
        return state
    
    def validate(state: SubTaskState):
        print(f"  [서브그래프] 검증 중...")
        # 간단한 검증 로직
        if len(state["result"]) > 50:
            state["status"] = "validated"
        else:
            state["status"] = "needs_more_research"
        return state
    
    workflow = StateGraph(SubTaskState)
    workflow.add_node("research", research)
    workflow.add_node("validate", validate)
    
    workflow.set_entry_point("research")
    workflow.add_edge("research", "validate")
    workflow.add_edge("validate", END)
    
    return workflow.compile()

# 메인 그래프 노드들
def decompose_task(state: MainTaskState):
    """작업을 서브태스크로 분해"""
    main_task = state["main_task"]
    
    llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)
    
    decompose_prompt = f"""
    다음 작업을 3개의 하위 작업으로 분해하세요:
    {main_task}
    
    각 하위 작업을 한 줄로 작성하세요.
    """
    
    response = llm.invoke(decompose_prompt)
    subtasks = [line.strip() for line in response.content.split('\n') if line.strip()][:3]
    
    state["subtasks"] = subtasks
    state["subtask_results"] = {}
    
    print(f"작업 분해 완료:")
    for i, task in enumerate(subtasks, 1):
        print(f"  {i}. {task}")
    
    return state

def execute_subtasks(state: MainTaskState):
    """서브그래프를 사용하여 각 서브태스크 실행"""
    research_graph = create_research_subgraph()
    
    print("\n서브태스크 실행:")
    for subtask in state["subtasks"]:
        print(f"\n처리 중: {subtask}")
        
        # 서브그래프 실행
        subgraph_result = research_graph.invoke({
            "task": subtask,
            "result": "",
            "status": "pending"
        })
        
        state["subtask_results"][subtask] = subgraph_result["result"]
    
    return state

def synthesize_results(state: MainTaskState):
    """서브태스크 결과들을 종합"""
    llm = ChatOpenAI(model="gpt-4o-mini", temperature=0.3)
    
    results_text = "\n\n".join([
        f"{task}:\n{result}"
        for task, result in state["subtask_results"].items()
    ])
    
    synthesis_prompt = f"""
    원본 작업: {state['main_task']}
    
    서브태스크 결과들:
    {results_text}
    
    위 결과들을 종합하여 원본 작업에 대한 통합된 답변을 작성하세요.
    """
    
    response = llm.invoke(synthesis_prompt)
    state["final_result"] = response.content
    
    return state

# 메인 그래프 생성
def create_main_with_subgraph():
    workflow = StateGraph(MainTaskState)
    
    workflow.add_node("decompose", decompose_task)
    workflow.add_node("execute", execute_subtasks)
    workflow.add_node("synthesize", synthesize_results)
    
    workflow.set_entry_point("decompose")
    workflow.add_edge("decompose", "execute")
    workflow.add_edge("execute", "synthesize")
    workflow.add_edge("synthesize", END)
    
    return workflow.compile()

# 실행
main_graph = create_main_with_subgraph()

# 테스트
complex_tasks = [
    "스타트업을 성공적으로 운영하는 방법",
    "기후 변화에 대응하는 개인의 역할"
]

for task in complex_tasks:
    print(f"\n{'='*80}")
    print(f"메인 작업: {task}\n")
    
    result = main_graph.invoke({
        "main_task": task
    })
    
    print(f"\n최종 종합 결과:")
    print(result["final_result"])

## 3. Function Subgraph Pattern

In [None]:
from typing import Callable

# 재사용 가능한 서브그래프 팩토리
class SubgraphFactory:
    @staticmethod
    def create_analysis_subgraph(analysis_type: str):
        """분석 유형에 따른 서브그래프 생성"""
        
        class AnalysisState(TypedDict):
            data: str
            analysis_type: str
            analysis_result: dict
            confidence: float
        
        def analyze(state: AnalysisState):
            llm = ChatOpenAI(model="gpt-4o-mini")
            
            prompts = {
                "sentiment": "감정 분석을 수행하세요 (긍정/부정/중립):",
                "summary": "핵심 내용을 3줄로 요약하세요:",
                "entities": "주요 엔티티(인물, 장소, 조직)를 추출하세요:",
                "topics": "주요 주제를 식별하세요:"
            }
            
            prompt = prompts.get(analysis_type, "분석을 수행하세요:")
            response = llm.invoke(f"{prompt}\n\n{state['data']}")
            
            state["analysis_result"] = {
                "type": analysis_type,
                "result": response.content
            }
            return state
        
        def assess_confidence(state: AnalysisState):
            # 간단한 신뢰도 평가
            result_length = len(str(state["analysis_result"].get("result", "")))
            state["confidence"] = min(result_length / 100, 1.0)
            return state
        
        workflow = StateGraph(AnalysisState)
        workflow.add_node("analyze", analyze)
        workflow.add_node("assess", assess_confidence)
        
        workflow.set_entry_point("analyze")
        workflow.add_edge("analyze", "assess")
        workflow.add_edge("assess", END)
        
        return workflow.compile()
    
    @staticmethod
    def create_processing_subgraph(process_type: str):
        """데이터 처리 서브그래프 생성"""
        
        class ProcessState(TypedDict):
            input_data: str
            processed_data: str
            metadata: dict
        
        def preprocess(state: ProcessState):
            # 전처리
            data = state["input_data"]
            if process_type == "clean":
                data = data.strip().lower()
            elif process_type == "format":
                data = f"Formatted: {data}"
            state["processed_data"] = data
            return state
        
        def add_metadata(state: ProcessState):
            state["metadata"] = {
                "process_type": process_type,
                "timestamp": "2024-01-01",
                "length": len(state["processed_data"])
            }
            return state
        
        workflow = StateGraph(ProcessState)
        workflow.add_node("preprocess", preprocess)
        workflow.add_node("metadata", add_metadata)
        
        workflow.set_entry_point("preprocess")
        workflow.add_edge("preprocess", "metadata")
        workflow.add_edge("metadata", END)
        
        return workflow.compile()

# 메인 파이프라인 State
class PipelineState(TypedDict):
    raw_text: str
    analyses: List[dict]
    processed_versions: List[dict]
    final_report: str

# 동적 서브그래프 실행 노드
def run_multiple_analyses(state: PipelineState):
    """여러 분석 서브그래프 실행"""
    text = state["raw_text"]
    analysis_types = ["sentiment", "summary", "entities", "topics"]
    
    state["analyses"] = []
    
    print("분석 실행:")
    for analysis_type in analysis_types:
        print(f"  - {analysis_type} 분석 중...")
        
        # 동적으로 서브그래프 생성 및 실행
        subgraph = SubgraphFactory.create_analysis_subgraph(analysis_type)
        result = subgraph.invoke({
            "data": text,
            "analysis_type": analysis_type
        })
        
        state["analyses"].append(result["analysis_result"])
    
    return state

def run_processing_pipeline(state: PipelineState):
    """처리 파이프라인 실행"""
    text = state["raw_text"]
    process_types = ["clean", "format"]
    
    state["processed_versions"] = []
    
    print("\n처리 파이프라인:")
    for process_type in process_types:
        print(f"  - {process_type} 처리 중...")
        
        subgraph = SubgraphFactory.create_processing_subgraph(process_type)
        result = subgraph.invoke({
            "input_data": text
        })
        
        state["processed_versions"].append({
            "type": process_type,
            "data": result["processed_data"],
            "metadata": result["metadata"]
        })
    
    return state

def generate_report(state: PipelineState):
    """최종 보고서 생성"""
    llm = ChatOpenAI(model="gpt-4o-mini")
    
    analyses_text = "\n".join([
        f"{a['type']}: {a['result']}"
        for a in state["analyses"]
    ])
    
    report_prompt = f"""
    원본 텍스트에 대한 종합 분석 보고서를 작성하세요.
    
    분석 결과:
    {analyses_text}
    
    간결하고 구조화된 보고서를 작성하세요.
    """
    
    response = llm.invoke(report_prompt)
    state["final_report"] = response.content
    
    return state

# 메인 파이프라인 생성
def create_dynamic_pipeline():
    workflow = StateGraph(PipelineState)
    
    workflow.add_node("analyze", run_multiple_analyses)
    workflow.add_node("process", run_processing_pipeline)
    workflow.add_node("report", generate_report)
    
    workflow.set_entry_point("analyze")
    workflow.add_edge("analyze", "process")
    workflow.add_edge("process", "report")
    workflow.add_edge("report", END)
    
    return workflow.compile()

# 실행
pipeline = create_dynamic_pipeline()

# 테스트
test_texts = [
    """인공지능은 현대 사회에 혁명적인 변화를 가져오고 있습니다. 
    의료, 교육, 금융 등 다양한 분야에서 AI의 활용이 증가하고 있으며,
    이는 효율성 향상과 새로운 기회 창출로 이어지고 있습니다.
    그러나 동시에 일자리 감소와 프라이버시 문제 등의 우려도 제기되고 있습니다."""
]

for text in test_texts:
    print(f"\n{'='*80}")
    print(f"원본 텍스트:\n{text[:100]}...\n")
    
    result = pipeline.invoke({"raw_text": text})
    
    print(f"\n최종 보고서:")
    print(result["final_report"])

## 4. Supervisor Pattern

In [None]:
from typing import Annotated
import operator

# Supervisor State
class SupervisorState(TypedDict):
    task: str
    workers: List[str]
    assignments: dict
    worker_results: Annotated[List[dict], operator.add]
    supervisor_summary: str
    status: str

# Worker 에이전트 생성
def create_worker_agent(name: str, specialty: str):
    """특정 전문 분야를 가진 워커 에이전트 생성"""
    
    def worker_function(task: str) -> dict:
        llm = ChatOpenAI(model="gpt-4o-mini", temperature=0.5)
        
        worker_prompt = f"""
        당신은 {specialty} 전문가인 {name}입니다.
        
        다음 작업을 수행하세요:
        {task}
        
        당신의 전문 분야 관점에서 답변하세요.
        """
        
        response = llm.invoke(worker_prompt)
        
        return {
            "worker": name,
            "specialty": specialty,
            "task": task,
            "result": response.content
        }
    
    return worker_function

# Supervisor 노드
def supervisor_assign_tasks(state: SupervisorState):
    """Supervisor가 작업을 워커들에게 할당"""
    main_task = state["task"]
    
    llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)
    
    # 워커 정의
    workers = [
        {"name": "Researcher", "specialty": "정보 수집 및 연구"},
        {"name": "Analyst", "specialty": "데이터 분석 및 인사이트 도출"},
        {"name": "Writer", "specialty": "문서 작성 및 편집"}
    ]
    
    state["workers"] = [w["name"] for w in workers]
    
    # 각 워커에게 작업 할당
    assign_prompt = f"""
    메인 작업: {main_task}
    
    이 작업을 다음 3명의 워커에게 할당하세요:
    1. Researcher (정보 수집 전문)
    2. Analyst (분석 전문)
    3. Writer (작성 전문)
    
    각 워커에게 구체적인 하위 작업을 할당하세요.
    JSON 형식으로 응답하세요: {{"워커이름": "할당된 작업"}}
    """
    
    response = llm.invoke(assign_prompt)
    
    # 간단한 파싱 (실제로는 구조화된 출력 사용)
    assignments = {
        "Researcher": f"{main_task}에 대한 정보 수집",
        "Analyst": f"{main_task}에 대한 분석",
        "Writer": f"{main_task}에 대한 문서 작성"
    }
    
    state["assignments"] = assignments
    state["worker_results"] = []
    state["status"] = "assigned"
    
    print("Supervisor: 작업 할당 완료")
    for worker, task in assignments.items():
        print(f"  - {worker}: {task}")
    
    return state

def execute_worker_tasks(state: SupervisorState):
    """워커들이 할당된 작업 실행"""
    assignments = state["assignments"]
    
    # 워커 에이전트 생성
    workers = {
        "Researcher": create_worker_agent("Researcher", "정보 수집 및 연구"),
        "Analyst": create_worker_agent("Analyst", "데이터 분석 및 인사이트 도출"),
        "Writer": create_worker_agent("Writer", "문서 작성 및 편집")
    }
    
    print("\n워커들이 작업 수행 중:")
    results = []
    for worker_name, task in assignments.items():
        print(f"  - {worker_name} 작업 중...")
        worker = workers[worker_name]
        result = worker(task)
        results.append(result)
    
    state["worker_results"] = results
    state["status"] = "completed"
    
    return state

def supervisor_review_and_summarize(state: SupervisorState):
    """Supervisor가 결과 검토 및 최종 요약"""
    worker_results = state["worker_results"]
    main_task = state["task"]
    
    llm = ChatOpenAI(model="gpt-4o-mini", temperature=0.3)
    
    # 워커 결과 정리
    results_text = "\n\n".join([
        f"{r['worker']} ({r['specialty']}):\n{r['result']}"
        for r in worker_results
    ])
    
    review_prompt = f"""
    당신은 프로젝트 Supervisor입니다.
    
    메인 작업: {main_task}
    
    워커들의 작업 결과:
    {results_text}
    
    위 결과들을 검토하고 통합하여 최종 보고서를 작성하세요.
    각 워커의 기여를 인정하면서 일관된 결론을 도출하세요.
    """
    
    response = llm.invoke(review_prompt)
    state["supervisor_summary"] = response.content
    state["status"] = "reviewed"
    
    print("\nSupervisor: 검토 및 요약 완료")
    
    return state

# Supervisor 패턴 그래프 생성
def create_supervisor_graph():
    workflow = StateGraph(SupervisorState)
    
    # 노드 추가
    workflow.add_node("assign", supervisor_assign_tasks)
    workflow.add_node("execute", execute_worker_tasks)
    workflow.add_node("review", supervisor_review_and_summarize)
    
    # 플로우 정의
    workflow.set_entry_point("assign")
    workflow.add_edge("assign", "execute")
    workflow.add_edge("execute", "review")
    workflow.add_edge("review", END)
    
    return workflow.compile()

# 실행
supervisor_graph = create_supervisor_graph()

# 테스트
tasks = [
    "신제품 출시 전략 수립",
    "고객 만족도 향상 방안",
    "디지털 전환 로드맵 작성"
]

for task in tasks:
    print(f"\n{'='*80}")
    print(f"메인 작업: {task}\n")
    
    result = supervisor_graph.invoke({"task": task})
    
    print(f"\n최종 Supervisor 보고서:")
    print(result["supervisor_summary"])
    
    print(f"\n상태: {result['status']}")

## 실습 과제

1. 다단계 Reflection이 있는 창의적 글쓰기 시스템
2. 계층적 서브그래프를 활용한 복잡한 프로젝트 관리 시스템
3. 동적 워커 할당이 가능한 적응형 Supervisor 시스템

In [None]:
# 여기에 실습 코드를 작성하세요
