In [1]:
import os
from typing import TypedDict, Annotated, Any
from langchain_openai import ChatOpenAI
from langchain_core.messages import BaseMessage, HumanMessage, AIMessage
from langgraph.graph import StateGraph, END
from langgraph.types import Send
from pydantic import BaseModel, Field
import json

# Initialize the two chat-model clients shared by every node in this notebook.
# NOTE(review): ChatOpenAI presumably picks up credentials from the environment
# (e.g. OPENAI_API_KEY) since none are passed here — confirm before running.
# `llm` (temperature 0.7) is used by the planner, executor, generator and refiner nodes.
llm = ChatOpenAI(model="gpt-4o-mini", temperature=0.7)
# `critic_llm` is used only by critic_node; the lower temperature (0.3) is chosen
# for more precise evaluation output.
critic_llm = ChatOpenAI(model="gpt-4o-mini", temperature=0.3)  # Precise for evaluation

In [2]:
# Pydantic models for structured data
class PlanStep(BaseModel):
    """One step of the plan produced by ``planner_node``.

    Instances are built either from the planner LLM's JSON reply or from the
    hard-coded fallback plan, and are consumed one at a time by ``executor_node``.
    """
    # Step number as reported by the planner (the prompt asks for numbering from 1).
    step_number: int
    # Short description of what the step should accomplish; fed into the executor prompt.
    description: str
    # Lifecycle marker; intended values: pending, in_progress, completed.
    # No node shown in this notebook section updates it after creation.
    status: str = "pending"  # pending, in_progress, completed


class ExecutionResult(BaseModel):
    """Output recorded by ``executor_node`` for a single executed plan step."""
    # Copied from the PlanStep that was executed.
    step_number: int
    # Copied from the PlanStep that was executed.
    description: str
    # Raw text content of the executor LLM's reply for this step.
    result: str


class Critique(BaseModel):
    """Structured evaluation of a draft, as produced by ``critic_node``."""
    # Overall quality score; validation constrains it to the inclusive range 1..5.
    score: int = Field(ge=1, le=5, description="Overall quality score")
    # What the critic considers to work well in the draft.
    strengths: list[str]
    # Areas the critic wants improved; passed to the refiner prompt.
    weaknesses: list[str]
    # Free-text improvement guidance; passed to the refiner prompt.
    specific_feedback: str
    
    @property
    def needs_refinement(self) -> bool:
        """Return True when the score is below 4, i.e. the draft should be refined."""
        return self.score < 4


# Hybrid State combining Plan-Execute and Reflection
class HybridState(TypedDict):
    """Shared graph state carried through the plan-execute and reflection phases."""
    # Task input: the user's task text, interpolated into every prompt.
    task: str
    
    # Plan-Execute phase
    # Ordered steps produced by planner_node.
    plan: list[PlanStep]
    # Index into `plan` of the next step to execute; advanced by executor_node.
    current_step_index: int
    # One entry per executed step, appended by executor_node.
    execution_results: list[ExecutionResult]
    # Flag intended to signal that every plan step has been executed.
    all_steps_completed: bool
    
    # Reflection phase
    # Initial output from generator_node.
    draft: str  # Initial output from generator
    # Most recent critique; reset to None by refiner_node until critic_node runs again.
    critique: Critique | None
    # Number of refinement passes performed so far; should_refine caps it at 2.
    refinement_iterations: int
    # Latest refined text, or None when no refinement has happened yet.
    refined_draft: str | None
    # Text chosen by finalizer_node: the refined draft if present, else the draft.
    final_output: str

In [3]:
def _parse_plan_json(raw_text: str) -> list[PlanStep]:
    """Turn the planner LLM's reply into a non-empty list of ``PlanStep`` objects.

    An optional Markdown code fence around the JSON is stripped before decoding.

    Raises:
        ValueError: if the decoded payload is not a non-empty JSON array.
        Any exception raised by ``json.loads`` or by ``PlanStep`` construction
        propagates unchanged to the caller.
    """
    json_str = raw_text
    if "```json" in json_str:
        json_str = json_str.split("```json")[1].split("```")[0].strip()
    elif "```" in json_str:
        json_str = json_str.split("```")[1].split("```")[0].strip()

    plan_data = json.loads(json_str)
    # An empty or non-list payload would otherwise produce a plan with no steps,
    # which makes the rest of the pipeline run on nothing.
    if not isinstance(plan_data, list) or not plan_data:
        raise ValueError("planner reply must be a non-empty JSON array of steps")
    return [PlanStep(**item) for item in plan_data]


def planner_node(state: HybridState) -> HybridState:
    """Ask the LLM to break the task into steps and reset the execution state.

    Args:
        state: Current graph state; only ``state['task']`` is read.

    Returns:
        A copy of ``state`` with ``plan`` set, ``current_step_index`` reset to 0,
        ``execution_results`` emptied and ``all_steps_completed`` set to False.
        If the model reply cannot be parsed into a non-empty plan, a generic
        three-step fallback plan is used and the reason is printed.
    """
    print("\n PLANNER: Breaking down the task...")
    
    prompt = f"""You are a strategic planner. Break down this task into 3-4 clear, executable steps.

Task: {state['task']}

For each step, provide a concise description of what needs to be done.
Number the steps starting from 1.

Format your response as a JSON array with this structure:
[
  {{"step_number": 1, "description": "First step description"}},
  {{"step_number": 2, "description": "Second step description"}},
  ...
]"""
    
    response = llm.invoke([HumanMessage(content=prompt)])
    
    try:
        plan = _parse_plan_json(response.content)
    except Exception as exc:
        # Best-effort: keep the pipeline running on a generic plan, but do not
        # swallow KeyboardInterrupt/SystemExit and do not hide why parsing failed.
        print(f"  ! Could not parse plan ({type(exc).__name__}: {exc}); using fallback plan.")
        plan = [
            PlanStep(step_number=1, description="Research the topic"),
            PlanStep(step_number=2, description="Gather key information"),
            PlanStep(step_number=3, description="Create initial summary"),
        ]
    
    print(f"✓ Plan created with {len(plan)} steps:")
    for step in plan:
        print(f"  Step {step.step_number}: {step.description}")
    
    return {
        **state,
        "plan": plan,
        "current_step_index": 0,
        "execution_results": [],
        "all_steps_completed": False,
    }


def executor_node(state: HybridState) -> HybridState:
    """Execute the plan step at ``current_step_index`` and record its result.

    Args:
        state: Current graph state; reads ``plan``, ``current_step_index``,
            ``execution_results`` and ``task``.

    Returns:
        A copy of ``state`` with the new ``ExecutionResult`` appended, the step
        index advanced by one, and ``all_steps_completed`` set to True once the
        last step has run. If there is no step left to run (including an empty
        plan), only ``all_steps_completed`` is set to True.
    """
    plan = state["plan"]
    step_index = state["current_step_index"]

    if step_index >= len(plan):
        print("\n✓ All steps completed!")
        return {
            **state,
            "all_steps_completed": True,
        }
    
    current_step = plan[step_index]
    print(f"\n  EXECUTOR: Step {current_step.step_number}/{len(plan)}")
    print(f"   Description: {current_step.description}")
    
    # Generate execution for this step
    prompt = f"""Complete this step of the task:

Original Task: {state['task']}

Current Step: {current_step.description}

Provide a detailed, concrete result for this step. Be specific and actionable."""
    
    response = llm.invoke([HumanMessage(content=prompt)])
    result_text = response.content
    
    # Store result
    execution_result = ExecutionResult(
        step_number=current_step.step_number,
        description=current_step.description,
        result=result_text
    )
    
    # Build a new list instead of mutating the one held by the incoming state.
    new_results = state["execution_results"] + [execution_result]
    
    print(f"   ✓ Result: {result_text[:150]}...")

    # The router leaves the executor loop as soon as the index reaches the end
    # of the plan, so the guard branch above is never reached for a non-empty
    # plan; the completion flag therefore has to be set here.
    next_index = step_index + 1
    finished = next_index >= len(plan)
    if finished:
        print("\n✓ All steps completed!")
    
    return {
        **state,
        "execution_results": new_results,
        "current_step_index": next_index,
        "all_steps_completed": finished,
    }


def should_continue_execution(state: HybridState) -> str:
    """Route after the executor: loop while steps remain, otherwise generate.

    Returns:
        ``"executor"`` when ``current_step_index`` is still inside the plan,
        ``"generator"`` once every step has been consumed.
    """
    next_step = state["current_step_index"]
    total_steps = len(state["plan"])
    return "executor" if next_step < total_steps else "generator"


def generator_node(state: HybridState) -> HybridState:
    """Synthesize the recorded step results into a first draft.

    Reads ``task`` and ``execution_results`` from the state, asks the LLM for a
    cohesive draft, and returns a copy of the state with ``draft`` set and the
    reflection fields (``refined_draft``, ``refinement_iterations``) reset.
    """
    print("\n GENERATOR: Synthesizing execution results into draft...")

    # One "Step N: ... / Result: ..." entry per executed step, newline-separated.
    step_summaries = []
    for outcome in state["execution_results"]:
        step_summaries.append(
            f"Step {outcome.step_number}: {outcome.description}\nResult: {outcome.result}"
        )
    results_text = "\n".join(step_summaries)

    prompt = f"""You are a skilled content synthesizer. Using the results from all execution steps, create a cohesive, well-organized draft that addresses the original task.

Original Task: {state['task']}

Execution Results:
{results_text}

Create a draft that:
1. Integrates all results naturally
2. Flows logically
3. Is comprehensive but concise

Draft:"""

    llm_reply = llm.invoke([HumanMessage(content=prompt)])
    draft = llm_reply.content.strip()

    print(f"✓ Draft generated ({len(draft)} characters)")

    # Copy the state, then overwrite the reflection-phase fields.
    updated_state = dict(state)
    updated_state.update(
        draft=draft,
        refined_draft=None,
        refinement_iterations=0,
    )
    return updated_state


def _parse_critique_json(raw_text: str) -> Critique:
    """Turn the critic LLM's reply into a validated ``Critique``.

    An optional Markdown code fence around the JSON is stripped before decoding.
    Any exception raised by ``json.loads`` or by ``Critique`` construction
    (including an out-of-range score) propagates unchanged to the caller.
    """
    json_str = raw_text
    if "```json" in json_str:
        json_str = json_str.split("```json")[1].split("```")[0].strip()
    elif "```" in json_str:
        json_str = json_str.split("```")[1].split("```")[0].strip()

    return Critique(**json.loads(json_str))


def critic_node(state: HybridState) -> HybridState:
    """Evaluate the current draft (initial or refined) with the critic LLM.

    Args:
        state: Current graph state; reads ``task``, ``draft``, ``refined_draft``
            and ``refinement_iterations``.

    Returns:
        A copy of ``state`` with ``critique`` set. If the model reply cannot be
        parsed or validated, a neutral fallback critique with score 3 is used
        (which requests refinement) and the reason is printed.
    """
    # Prefer the refined text once one exists; otherwise critique the first draft.
    content_to_critique = state["refined_draft"] or state["draft"]
    
    if state["refinement_iterations"] == 0:
        print("\n CRITIC: Evaluating initial draft...")
    else:
        print(f"\n CRITIC: Evaluating refined draft (iteration {state['refinement_iterations']})...")
    
    prompt = f"""You are a critical evaluator. Evaluate this content for quality.

Original Task: {state['task']}

Content to Evaluate:
{content_to_critique}

Provide evaluation in JSON format:
{{
  "score": <1-5, where 5 is excellent>,
  "strengths": [<list of what works well>],
  "weaknesses": [<list of areas to improve>],
  "specific_feedback": "<detailed feedback for improvement>"
}}"""
    
    response = critic_llm.invoke([HumanMessage(content=prompt)])
    
    try:
        critique = _parse_critique_json(response.content)
    except Exception as exc:
        # Best-effort: keep the loop running on a canned critique, but do not
        # swallow KeyboardInterrupt/SystemExit and do not hide why parsing failed.
        print(f"  ! Could not parse critique ({type(exc).__name__}: {exc}); using fallback critique.")
        critique = Critique(
            score=3,
            strengths=["Has relevant content"],
            weaknesses=["Could be improved"],
            specific_feedback="Needs refinement"
        )
    
    print(f"✓ Score: {critique.score}/5")
    print(f"  Strengths: {', '.join(critique.strengths[:2])}")
    print(f"  Weaknesses: {', '.join(critique.weaknesses[:2])}")
    
    return {
        **state,
        "critique": critique,
    }


def should_refine(state: HybridState) -> str:
    """Route after the critic: refine again or hand over to the finalizer.

    Returns:
        ``"refiner"`` only when a critique exists, it asks for refinement, and
        fewer than 2 refinement passes have been made; ``"finalizer"`` otherwise.
    """
    critique = state["critique"]
    if not critique:
        return "finalizer"
    if not critique.needs_refinement:
        return "finalizer"
    # Cap the reflection loop at two refinement passes.
    return "refiner" if state["refinement_iterations"] < 2 else "finalizer"


def refiner_node(state: HybridState) -> HybridState:
    """Rewrite the current draft so that it addresses the latest critique.

    Reads ``task``, ``critique``, ``draft``/``refined_draft`` and
    ``refinement_iterations``; returns a copy of the state with the new
    ``refined_draft``, the iteration counter incremented, and ``critique``
    cleared so the critic evaluates the new text from scratch.
    """
    # Work on the most recent text: a previous refinement wins over the first draft.
    content_to_refine = state["refined_draft"] or state["draft"]
    iterations_so_far = state["refinement_iterations"]
    critique = state["critique"]

    print(f"\n REFINER: Refining based on feedback (iteration {iterations_so_far + 1})...")

    prompt = f"""You are an editor refining content based on critical feedback.

Original Task: {state['task']}

Current Draft:
{content_to_refine}

Critic's Feedback:
- Score: {critique.score}/5
- Weaknesses: {', '.join(critique.weaknesses)}
- Specific Feedback: {critique.specific_feedback}

Improve the draft addressing ALL weaknesses mentioned. Make it better without losing the core content.

Refined Draft:"""

    llm_reply = llm.invoke([HumanMessage(content=prompt)])
    refined = llm_reply.content.strip()

    print(f"✓ Draft refined ({len(refined)} characters)")

    updated_state = dict(state)
    updated_state.update(
        refined_draft=refined,
        refinement_iterations=iterations_so_far + 1,
        critique=None,  # Reset critique for next evaluation
    )
    return updated_state


def finalizer_node(state: HybridState) -> HybridState:
    """Pick the text to deliver and store it as ``final_output``.

    The refined draft is used when one exists (and is non-empty); otherwise the
    original draft is used. The incoming state is not mutated.
    """
    print("\n FINALIZER: Creating final output...")

    chosen_text = state["refined_draft"] or state["draft"]

    finished_state = dict(state)
    finished_state["final_output"] = chosen_text
    return finished_state

In [4]:
# Build the graph: plan -> execute (loop) -> generate -> critique <-> refine -> finalize.
graph = StateGraph(HybridState)

# Add nodes
graph.add_node("planner", planner_node)
graph.add_node("executor", executor_node)
graph.add_node("generator", generator_node)
graph.add_node("critic", critic_node)
graph.add_node("refiner", refiner_node)
graph.add_node("finalizer", finalizer_node)

# Add edges
graph.set_entry_point("planner")
graph.add_edge("planner", "executor")

# Execution loop with conditional.
# The routers return plain `str`, so the possible targets are spelled out in an
# explicit path map; this documents the real edges and keeps the compiled graph
# from treating every node as a potential destination.
graph.add_conditional_edges(
    "executor",
    should_continue_execution,
    {"executor": "executor", "generator": "generator"},
)

# To generation after execution complete
graph.add_edge("generator", "critic")

# Reflection loop with conditional
graph.add_conditional_edges(
    "critic",
    should_refine,
    {"refiner": "refiner", "finalizer": "finalizer"},
)

# Back to critic after refinement (for re-evaluation)
graph.add_edge("refiner", "critic")

# Finalize edges
graph.add_edge("finalizer", END)

# Compile
app = graph.compile()

print("✓ Graph compiled successfully!")
print("Flow: Planner → Executor (loop) → Generator → Critic → [Refiner (loop) or Finalizer] → END")

✓ Graph compiled successfully!
Flow: Planner → Executor (loop) → Generator → Critic → [Refiner (loop) or Finalizer] → END


In [6]:
def _preview(text: str, limit: int) -> str:
    """Return ``text`` cut to ``limit`` characters, adding '...' only if it was cut."""
    return f"{text[:limit]}..." if len(text) > limit else text


def run_agent(task: str):
    """Run the complete agent pipeline for ``task`` and print a report.

    Args:
        task: Natural-language description of the work to perform.

    Returns:
        The final graph state returned by ``app.invoke`` (plan, execution
        results, drafts, latest critique and ``final_output``).
    """
    print("\n" + "="*70)
    print(f" TASK: {task}")
    print("="*70)
    
    # Initial state: every key of HybridState gets an explicit empty value so
    # the nodes can index the state without KeyError.
    initial_state: HybridState = {
        "task": task,
        "plan": [],
        "current_step_index": 0,
        "execution_results": [],
        "all_steps_completed": False,
        "draft": "",
        "critique": None,
        "refinement_iterations": 0,
        "refined_draft": None,
        "final_output": "",
    }
    
    # Run the graph
    final_state = app.invoke(initial_state)
    
    # Print comprehensive results
    print("\n" + "="*70)
    print(" COMPLETE RESULTS")
    print("="*70)
    
    print("\n PLAN:")
    for step in final_state["plan"]:
        print(f"  {step.step_number}. {step.description}")
    
    print("\n EXECUTION RESULTS:")
    for result in final_state["execution_results"]:
        print(f"  Step {result.step_number}:")
        print(f"    {_preview(result.result, 200)}")
    
    print("\n INITIAL DRAFT:")
    print(f"  {_preview(final_state['draft'], 300)}")
    
    # The state keeps only the most recent critique (each critic pass overwrites
    # it), so after any refinement this is no longer the critique of the
    # initial draft; label it accordingly.
    print("\n LATEST CRITIQUE:")
    if final_state["critique"]:
        print(f"  Score: {final_state['critique'].score}/5")
        print(f"  Strengths: {', '.join(final_state['critique'].strengths)}")
        print(f"  Weaknesses: {', '.join(final_state['critique'].weaknesses)}")
    
    if final_state["refined_draft"]:
        print(f"\n✨ REFINED VERSION (Iteration {final_state['refinement_iterations']}):")
        print(f"  {_preview(final_state['refined_draft'], 300)}")
    
    print("\n FINAL OUTPUT:")
    print(final_state["final_output"])
    
    print("\n" + "="*70)
    print("✓ Process Complete!")
    print("="*70)
    
    return final_state


# Run test scenario: a single end-to-end demo of the pipeline.
# This invokes the compiled graph, which makes several LLM calls (planner, one
# per plan step, generator, critic and up to two refiner/critic rounds).
# `result` holds the final graph state for inspection in later cells.
task = "Research the benefits of Python programming, create a summary, and make it beginner-friendly"
result = run_agent(task)


 TASK: Research the benefits of Python programming, create a summary, and make it beginner-friendly

 PLANNER: Breaking down the task...
✓ Plan created with 4 steps:
  Step 1: Conduct comprehensive research on the benefits of Python programming by reviewing online articles, tutorials, and educational resources to gather a variety of perspectives.
  Step 2: Organize the collected information into key themes such as ease of learning, versatility, community support, and career opportunities.
  Step 3: Write a summary of the benefits using beginner-friendly language, avoiding technical jargon and providing relatable examples to illustrate each point.
  Step 4: Review and edit the summary for clarity and simplicity, ensuring that it is engaging and accessible for beginners, and then prepare it for distribution.

  EXECUTOR: Step 1/4
   Description: Conduct comprehensive research on the benefits of Python programming by reviewing online articles, tutorials, and educational resources to gath