# Essay Writer Agent Flow

In [None]:
from langgraph.graph import StateGraph, END
from typing import TypedDict, Annotated, List
import operator
from langgraph.checkpoint.sqlite import SqliteSaver
from langchain_core.messages import AnyMessage, SystemMessage, HumanMessage, AIMessage, ChatMessage

# Set up memory
memory = SqliteSaver.from_conn_string(":memory:")

In [3]:
# We want to keep track of all elements in a state
class AgentState(TypedDict):
    task: str
    plan: str
    draft: str
    critique: str
    content: List[str]
    revision_number: int
    max_revisions: int

In [5]:
from langchain_ollama.llms import OllamaLLM
model = OllamaLLM(model="qwen2.5")

In [62]:
PLAN_PROMPT = """You are an expert writer tasked with writing a high level outline of an essay. \
Write such an outline for the user provided topic. Give an outline of the essay along with any relevant notes \
or instructions for the sections."""

WRITER_PROMPT = """You are an essay assistant tasked with writing excellent 5-paragraph essays.\
Generate the best essay possible for the user's request and the initial outline. \
If the user provides critique, respond with a revised version of your previous attempts. \
Utilize all the information below as needed: 

------

{content}"""

REFLECTION_PROMPT = """You are a teacher grading an essay submission. \
Generate critique and recommendations for the user's submission. \
Provide detailed recommendations, including requests for length, depth, style, etc."""

RESEARCH_PLAN_PROMPT = """You are a researcher charged with providing information that can \
be used when writing the following essay. Generate a list of search queries that will gather \
any relevant information. Only generate 3 queries max."""

RESEARCH_CRITIQUE_PROMPT = """You are a researcher charged with providing information that can \
be used when making any requested revisions (as outlined below). \
Generate a list of search queries that will gather any relevant information. Only generate 3 queries max."""

In [63]:
from langchain_core.pydantic_v1 import BaseModel
from langchain_community.tools import DuckDuckGoSearchRun

search = DuckDuckGoSearchRun()

class Queries(BaseModel):
    queries: List[str]

    

### Agent Nodes

- Planning
- Research
- Generation
- Reflection
- Critique

In [64]:
def plan_node(state: AgentState):
    messages = [
        SystemMessage(content=PLAN_PROMPT), 
        HumanMessage(content=state['task'])
    ]
    response = model.invoke(messages)
    return {"plan": response}

In [65]:
def research_plan_node(state: AgentState):
    queries = model.with_structured_output(Queries).invoke([
        SystemMessage(content=RESEARCH_PLAN_PROMPT),
        HumanMessage(content=state['task'])
    ])
    content = state['content'] or []
    for q in queries.queries:
        # search() returns a string
        response_string = search(query=q) # Removed max_results, handle if needed via tool config
        if response_string: # Append if not empty
             content.append(response_string)
    return {"content": content}

In [66]:
def generation_node(state: AgentState):
    content = "\n\n".join(state['content'] or [])
    user_message = HumanMessage(
        content=f"{state['task']}\n\nHere is my plan:\n\n{state['plan']}")
    messages = [
        SystemMessage(
            content=WRITER_PROMPT.format(content=content)
        ),
        user_message
        ]
    response = model.invoke(messages)
    return {
        "draft": response, 
        "revision_number": state.get("revision_number", 1) + 1
    }


In [67]:
def reflection_node(state: AgentState):
    messages = [
        SystemMessage(content=REFLECTION_PROMPT), 
        HumanMessage(content=state['draft'])
    ]
    response = model.invoke(messages)
    return {"critique": response}

In [68]:
def research_critique_node(state: AgentState):
    queries = model.with_structured_output(Queries).invoke([
        SystemMessage(content=RESEARCH_CRITIQUE_PROMPT),
        HumanMessage(content=state['critique'])
    ])
    content = state['content'] or []
    for q in queries.queries:
        # search() returns a string
        response_string = search(query=q) # Removed max_results, handle if needed via tool config
        if response_string: # Append if not empty
             content.append(response_string)
    return {"content": content}

In [69]:
def should_continue(state):
    if state["revision_number"] > state["max_revisions"]:
        return END
    return "reflect"

In [70]:
builder = StateGraph(AgentState)

In [71]:
builder.add_node("planner", plan_node)
builder.add_node("generate", generation_node)
builder.add_node("reflect", reflection_node)
builder.add_node("research_plan", research_plan_node)
builder.add_node("research_critique", research_critique_node)

<langgraph.graph.state.StateGraph at 0x29b144bd220>

In [72]:
builder.set_entry_point("planner")

<langgraph.graph.state.StateGraph at 0x29b144bd220>

In [73]:
builder.add_conditional_edges(
    "generate", 
    should_continue, 
    {END: END, "reflect": "reflect"}
)


<langgraph.graph.state.StateGraph at 0x29b144bd220>

In [74]:
builder.add_edge("planner", "research_plan")
builder.add_edge("research_plan", "generate")

builder.add_edge("reflect", "research_critique")
builder.add_edge("research_critique", "generate")

<langgraph.graph.state.StateGraph at 0x29b144bd220>

In [75]:
# from IPython.display import Image

# Image(graph.get_graph().draw_png())

In [76]:
memory_context = SqliteSaver.from_conn_string(":memory:")

with memory_context as checkpointer:  # Use the newly created context manager
    # Compile the graph inside the 'with' block
    print("Compiling graph with checkpointer...")
    graph = builder.compile(checkpointer=checkpointer)
    print("Graph compiled.")

    # Also run the stream inside the 'with' block
    thread = {"configurable": {"thread_id": "1"}}
    print(f"Streaming graph for thread: {thread['configurable']['thread_id']}")
    try:
        for s in graph.stream({
            'task': "what is the difference between langchain and langsmith",
            "max_revisions": 2,
            "revision_number": 1, # Start with revision 1
        }, thread):
            print("---")
            print(s)
    except Exception as e:
        print(f"\nError during stream: {e}") # Add error handling for clarity
    finally:
        print("--- Stream finished ---")

Compiling graph with checkpointer...
Graph compiled.
Streaming graph for thread: 1
---
{'planner': {'plan': '### Essay Outline: Understanding the Difference Between LangChain and LangSmith\n\n#### Introduction\n- **Background Information:** Briefly introduce both platforms (LangChain, LangSmith) in the context of natural language processing and artificial intelligence tools.\n- **Thesis Statement:** State that while both are part of the larger ecosystem of AI development frameworks, they serve different purposes and have distinct features.\n\n#### Section 1: Overview of LangChain\n- **Definition and Purpose:** Define what LangChain is and its primary purpose in the AI landscape.\n    - Note: Emphasize its role as an open-source library for building powerful LLM (Large Language Model) applications.\n- **Key Features:** Highlight key features such as ease of integration, modular architecture, and community support.\n- **Use Cases:** Provide examples or case studies where LangChain has be

In [78]:
# import warnings
# warnings.filterwarnings("ignore")

# from helper import ewriter, writer_gui

In [80]:
# MultiAgent = ewriter()
# app = writer_gui(MultiAgent.graph)
# app.launch()