In [14]:
import os
from typing import Annotated, TypedDict
from dotenv import load_dotenv

# LangChain components
from langchain_community.chat_models import ChatOllama
from langchain_community.tools.tavily_search import TavilySearchResults
from langchain_core.messages import BaseMessage
from pydantic import BaseModel, Field

# For pretty printing
from rich.console import Console
from rich.markdown import Markdown
from langchain_tavily import TavilySearch

# LangGraph components
from langgraph.graph import StateGraph
from langgraph.graph.message import AnyMessage, add_messages
from langgraph.prebuilt import ToolNode


# --- API Key and Tracing Setup ---
load_dotenv()
print("Environment variables loaded and tracing is set up.")

Environment variables loaded and tracing is set up.


In [28]:
import json
from typing import TypedDict, Optional, List
from pydantic import BaseModel, Field
from rich.console import Console
import re

# -------------------------------------------------------------------
# Setup
# -------------------------------------------------------------------
console = Console()
from langchain_community.chat_models import ChatOllama
llm = ChatOllama(model="llama3:latest", temperature=0)
 
# -------------------------------------------------------------------
# Schemas
# -------------------------------------------------------------------
class VerificationResult(BaseModel):
    """Schema for the Verifier output."""
    is_successful: bool = Field(description="True if the tool execution was successful and the data is valid.")
    reasoning: str = Field(description="Reasoning for the verification decision.")

def web_search(query: str) -> str:
    """Performs a web search, but is designed to fail for a specific type of  query. You can establish guardrails similar to this for any use cases."""
    console.print(f"--- TOOL: Searching for '{query}'... ---")
    if "employee count" in query.lower():
        console.print("--- TOOL: [bold red] Simulated API failure![/bold red] ---")
        return "Error: Could not retrieve internal data."
    else:
        result = TavilySearch(max_results=2).invoke(query)
        if isinstance(result, (dict, list)):
            return json.dumps(result, indent=2)
        return str(result)


class Plan(BaseModel):
    """Plan schema for the planner."""
    steps: List[str] = Field(
        description="List of queries (max 5).",
        max_items=5
    )

# Define the state for the basic P-E agent
class BasicPEState(TypedDict):
    user_request: str
    plan: Optional[List[str]]
    intermediate_steps: List[str]
    final_answer: Optional[str]

def basic_synthesizer_node(state: BasicPEState):
    console.print("--- (Basic) SYNTHESIZER: Generating final answer... ---")
    context = "\n".join(state["intermediate_steps"])
    prompt = f"Synthesize an answer for '{state['user_request']}' using this data:\n{context}"
    answer = llm.invoke(prompt).content
    return {"final_answer": answer}

class PEVState(TypedDict):
    user_request: str
    plan: Optional[List[str]]
    last_tool_result: Optional[str]
    intermediate_steps: List[str]
    final_answer: Optional[str]
    retries: int

# -------------------------------------------------------------------
# Planner node (manual JSON parsing)
# -------------------------------------------------------------------
def pev_planner_node(state: PEVState):
    retries = state.get("retries", 0)
    # Setting max retries on how many times planner can retry before it fail completely.
    if retries > 4:
        console.print("--- (PEV) PLANNER: Retry limit reached. Stopping. ---")
        return {
            "plan": [],
            "final_answer": "Error: Unable to complete task after" + str({retries}) + "retries."
        }

    console.print(f"--- (PEV) PLANNER: Creating/revising plan (retry {retries})... ---")
    past_context = "\n".join(state["intermediate_steps"])

    base_prompt = f"""
    You are a planning agent. 
    Create a plan to answer: '{state['user_request']}'. 
    Use the 'web_search' tool.

    Rules:
    - Return ONLY valid JSON exactly like: {{ "steps": ["query1", "query2"] }}
    - Maximum 5 steps.
    - Do NOT repeat failed queries or endless variations.
    - Do NOT output explanations, only JSON.

    Previous attempts and results:
    {past_context}
    """

    for attempt in range(2):
        try:
            response = llm.invoke(base_prompt)
            text = response.content if hasattr(response, "content") else str(response)
            clean = text.strip()
            # remove any code fences
            if clean.startswith("```"):
                clean = re.sub(r"^```[a-zA-Z]*\n", "", clean)
                clean = re.sub(r"\n```$", "", clean)
            match = re.search(r"\{.*\}", clean, re.DOTALL)
            if match:
                clean = match.group()
            data = json.loads(clean)
            plan = Plan(**data)
            return {"plan": plan.steps, "retries": retries + 1}
        except Exception as e:
            console.print(f"[red]Planner JSON parsing failed (attempt {attempt+1}): {e}[/red]")
            base_prompt = f"Return ONLY valid JSON with {{'steps': ['...']}}. {base_prompt}"

    return {"plan": [state['user_request']], "retries": retries + 1}

# -------------------------------------------------------------------
# Executor node
# -------------------------------------------------------------------
def pev_executor_node(state: PEVState):
    if not state.get("plan"):
        console.print("--- (PEV) EXECUTOR: No steps left, skipping execution. ---")
        return {}

    console.print("--- (PEV) EXECUTOR: Running next step... ---")
    next_step = state["plan"][0]
    result = web_search(next_step)
    return {"plan": state["plan"][1:], "last_tool_result": result}

# -------------------------------------------------------------------
# Verifier node (manual JSON parsing)
# -------------------------------------------------------------------
def verifier_node(state: PEVState):
    console.print("--- VERIFIER: Checking last tool result... ---")
    prompt = f"""
    Verify if the following tool output is a successful result or an error message.
    Task: '{state['user_request']}'
    Tool Output: '{state['last_tool_result']}'

    Return ONLY valid JSON in this exact format:
    {{
        "is_successful": true or false,
        "reasoning": "short explanation"
    }}
    """
    response = llm.invoke(prompt)
    text = response.content if hasattr(response, "content") else str(response)
    clean = text.strip()
    if clean.startswith("```"):
        clean = re.sub(r"^```[a-zA-Z]*\n", "", clean)
        clean = re.sub(r"\n```$", "", clean)
    match = re.search(r"\{.*\}", clean, re.DOTALL)
    if match:
        clean = match.group()
    try:
        data = json.loads(clean)
        verification = VerificationResult(**data)
    except Exception as e:
        console.print(f"[red]Verifier JSON parsing failed: {e}[/red]")
        verification = VerificationResult(is_successful=False, reasoning="Could not parse verifier output.")

    console.print(f"--- VERIFIER: Judgment is '{'Pass' if verification.is_successful else 'Fail'}' ---")
    if verification.is_successful:
        return {"intermediate_steps": state["intermediate_steps"] + [state['last_tool_result']]}
    else:
        return {"plan": [], "intermediate_steps": state["intermediate_steps"] + [f"Verification Failed: {state['last_tool_result']}"]}

# -------------------------------------------------------------------
# Synthesizer node (reuse your basic_synthesizer_node)
# -------------------------------------------------------------------
pev_synthesizer_node = basic_synthesizer_node
# -------------------------------------------------------------------
# Router
# -------------------------------------------------------------------
def pev_router(state: PEVState):
    if state.get("final_answer"):
        console.print("--- ROUTER: Final answer available. Moving to synthesizer. ---")
        return "synthesize"

    if not state["plan"]:
        if state["intermediate_steps"] and "Verification Failed" in state["intermediate_steps"][-1]:
            console.print("--- ROUTER: Verification failed. Re-planning... ---")
            return "plan"
        else:
            console.print("--- ROUTER: Plan complete. Moving to synthesizer. ---")
            return "synthesize"
    else:
        console.print("--- ROUTER: Plan has more steps. Continuing execution. ---")
        return "execute"

# -------------------------------------------------------------------
# Build PEV graph
# -------------------------------------------------------------------
from langgraph.graph import StateGraph, END

pev_graph_builder = StateGraph(PEVState)
pev_graph_builder.add_node("plan", pev_planner_node)
pev_graph_builder.add_node("execute", pev_executor_node)
pev_graph_builder.add_node("verify", verifier_node)
pev_graph_builder.add_node("synthesize", pev_synthesizer_node)

pev_graph_builder.set_entry_point("plan")
pev_graph_builder.add_edge("plan", "execute")
pev_graph_builder.add_edge("execute", "verify")
pev_graph_builder.add_conditional_edges("verify", pev_router)
pev_graph_builder.add_edge("synthesize", END)

pev_agent_app = pev_graph_builder.compile()
console.print("[bold green]Planner-Executor-Verifier (PEV) agent compiled successfully.[/bold green]")


/var/folders/fk/np76hvhj1yj81k65qbkq0wv40000gn/T/ipykernel_12642/2158613693.py:37: PydanticDeprecatedSince20: `max_items` is deprecated and will be removed, use `max_length` instead. Deprecated in Pydantic V2.0 to be removed in V3.0. See Pydantic V2 Migration Guide at https://errors.pydantic.dev/2.12/migration/
  steps: List[str] = Field(


In [None]:
query = "What is the latest research on AI, and who are the top 5 leaders in the space?"
console.print(f"[bold green]Testing PEV agent on the same query:[/bold green]\n'{query}'\n")
initial_pev_input = {"user_request": query, "intermediate_steps": [], "retries": 0}
final_pev_output = pev_agent_app.invoke(initial_pev_input)
console.print("\n--- [bold green]Final Outpt from PEV Agent[/bold green] ---")
console.print(Markdown(final_pev_output['final_answer']))