In [70]:
import os  # Standard Python module to interact with the operating system (e.g., env variables, file paths)

from openai import OpenAI  # Official OpenAI client – used for making API calls to GPT-4, GPT-3.5, etc.

from langgraph.graph import StateGraph, END  
# LangGraph is used to define stateful graphs (multi-agent or workflow pipelines)
# StateGraph defines nodes and transitions, END is used to mark terminal states

from typing import TypedDict, Annotated, List
# Built-in Python module for type hints
# TypedDict helps define structured dictionary types
# Annotated lets you attach metadata to types (useful in LangChain input/output schemas)

import operator  
# Built-in Python module providing functional equivalents of operators (like <, ==, etc.)

from langchain_core.messages import AnyMessage, SystemMessage, HumanMessage, ToolMessage
# These are message types in LangChain's core message passing system
# Used for defining and processing chat-style inputs/outputs

from langchain_openai import ChatOpenAI  
# LangChain wrapper for OpenAI's chat models – lets you call GPT-4/3.5 with extra features like streaming

from langchain_community.tools.tavily_search import TavilySearchResults  
# Community-contributed LangChain integration for Tavily (a web search tool for real-time search results)

from langchain_openai import ChatOpenAI
from langgraph.checkpoint.sqlite import SqliteSaver
from langgraph.checkpoint.sqlite.aio import AsyncSqliteSaver
memory = SqliteSaver.from_conn_string("my_agent.db")

In [71]:
class AgentState(TypedDict):
    task: str
    plan: str
    draft: str
    critique: str
    content: List[str]
    revision_number: int
    max_revisions: int

In [72]:
from dotenv import load_dotenv
load_dotenv()

token = os.getenv("GITHUB_TOKEN")
tavily_api_key = os.getenv("TAVILY_API_KEY")

In [73]:
model = ChatOpenAI(
    model="openai/gpt-4.1-mini",
    base_url="https://models.github.ai/inference",
    api_key=token,
)

In [74]:
PLAN_PROMPT = """You are an expert writer tasked with writing a high level outline of an essay. \
Write such an outline for the user provided topic. Give an outline of the essay along with any relevant notes \
or instructions for the sections."""

In [75]:
WRITER_PROMPT = """You are an essay assistant tasked with writing excellent 5-paragraph essays.\
Generate the best essay possible for the user's request and the initial outline. \
If the user provides critique, respond with a revised version of your previous attempts. \
Utilize all the information below as needed: 

------

{content}"""

In [76]:
REFLECTION_PROMPT = """You are a teacher grading an essay submission. \
Generate critique and recommendations for the user's submission. \
Provide detailed recommendations, including requests for length, depth, style, etc."""

In [77]:
RESEARCH_PLAN_PROMPT = """You are a researcher charged with providing information that can \
be used when writing the following essay. Generate a list of search queries that will gather \
any relevant information. Only generate 3 queries max."""


In [78]:
RESEARCH_CRITIQUE_PROMPT = """You are a researcher charged with providing information that can \
be used when making any requested revisions (as outlined below). \
Generate a list of search queries that will gather any relevant information. Only generate 3 queries max."""


In [79]:
from langchain_core.pydantic_v1 import BaseModel

class Queries(BaseModel):
    queries: List[str]

In [80]:
from tavily import TavilyClient
tavily = TavilyClient(api_key=tavily_api_key)

In [81]:
def plan_node(state: AgentState):
    messages = [
        SystemMessage(content=PLAN_PROMPT),
        HumanMessage(content=state['task'])
    ]
    response = model.invoke(messages)
    return {"plan": response.content}

In [82]:
def research_plan_node(state: AgentState):
    # Step 1: Generate research queries based on the task using an AI model
    queries = model.with_structured_output(Queries).invoke([
        SystemMessage(content=RESEARCH_PLAN_PROMPT),  # System instruction: how to generate queries
        HumanMessage(content=state['task'])           # User-provided task, e.g., "Research on AI safety"
    ])

    # Step 2: Get existing content if available, else initialize an empty list
    content = state['content'] or []

    # Step 3: For each generated query, perform a web search and collect results
    for q in queries.queries:
        response = tavily.search(query=q, max_results=2)  # Search using Tavily API, limit to 2 results
        for r in response['results']:
            content.append(r['content'])  # Add the text content of each result to the list

    # Step 4: Return the collected content in a dictionary (used as updated agent state)
    return {"content": content}


In [83]:
def generation_node(state: AgentState):
    # Step 1: Combine all the collected content into a single string with line breaks
    content = "\n\n".join(state['content'] or [])

    # Step 2: Create a message from the user with the task and their current plan
    user_message = HumanMessage(
        content=f"{state['task']}\n\nHere is my plan:\n\n{state['plan']}"
    )

    # Step 3: Prepare the list of messages to send to the model
    messages = [
        SystemMessage(
            content=WRITER_PROMPT.format(content=content)  # Fill in the prompt with the researched content
        ),
        user_message  # Include the user's task and plan
    ]

    # Step 4: Invoke the model with the prepared messages to generate a draft
    response = model.invoke(messages)

    # Step 5: Return the generated draft and update the revision number
    return {
        "draft": response.content,  # The AI-generated draft
        "revision_number": state.get("revision_number", 1) + 1  # Increment revision number (starts at 1 if not present)
    }


In [84]:
def reflection_node(state: AgentState):
    messages = [
        SystemMessage(content=REFLECTION_PROMPT), 
        HumanMessage(content=state['draft'])
    ]
    response = model.invoke(messages)
    return {"critique": response.content}

In [85]:
def research_critique_node(state: AgentState):
    queries = model.with_structured_output(Queries).invoke([
        SystemMessage(content=RESEARCH_CRITIQUE_PROMPT),
        HumanMessage(content=state['critique'])
    ])
    content = state['content'] or []
    for q in queries.queries:
        response = tavily.search(query=q, max_results=2)
        for r in response['results']:
            content.append(r['content'])
    return {"content": content}

In [86]:
def should_continue(state):
    if state["revision_numner"] > state["max_revisions"]:
        return END
    return "reflect"

In [87]:
builder = StateGraph(AgentState)

In [88]:
builder.add_node("planner", plan_node)
builder.add_node("generate", generation_node)
builder.add_node("reflect", reflection_node)
builder.add_node("research_plan", research_plan_node)
builder.add_node("research_critique", research_critique_node)

<langgraph.graph.state.StateGraph at 0x1e60fabf910>

In [89]:
builder.set_entry_point("planner")

<langgraph.graph.state.StateGraph at 0x1e60fabf910>

In [90]:
builder.add_conditional_edges(
    "generate",
    should_continue,
    {END: END, "reflect": "reflect"}
)

<langgraph.graph.state.StateGraph at 0x1e60fabf910>

In [91]:
builder.add_edge("planner", "research_plan")
builder.add_edge("research_plan", "generate")

builder.add_edge("reflect", "research_critique")
builder.add_edge("research_critique", "generate")

<langgraph.graph.state.StateGraph at 0x1e60fabf910>

In [93]:
from langgraph.checkpoint.sqlite import SqliteSaver

with SqliteSaver.from_conn_string("my_agent.db") as checkpointer:
    app = graph
    thread = {"configurable": {"thread_id": "1"}}
    for s in app.stream({
        'task': "what is the difference between langchain and langsmith",
        "max_revisions": 2,
        "revision_number": 1,
    }, thread):
        print(s)


AttributeError: '_GeneratorContextManager' object has no attribute 'get_next_version'