In [1]:
import os

In [2]:
from langgraph.graph import StateGraph, END
from typing import TypedDict, List
from langchain_core.messages import SystemMessage, HumanMessage
from pydantic import BaseModel
from tavily import TavilyClient

In [3]:
from langchain_ollama import ChatOllama

model = ChatOllama(
    model="llama3",
    temperature=0
)

In [4]:
from langgraph.checkpoint.memory import MemorySaver

In [6]:
memory = MemorySaver()
tavily = TavilyClient(
    api_key="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"
)

In [7]:
class AgentState(TypedDict) : 
    task: str
    plan: str
    draft: str
    critique: str
    content: str
    content_number: int
    revision_no: int
    max_revision: int

In [8]:
PLAN_PROMPT = """You are an expert writer tasked with writing a high level outline of an essay. \
Write such an outline for the user provided topic. Give an outline of the essay along with any relevant notes \
or instructions for the sections."""

In [9]:
WRITER_PROMPT = """You are an essay assistant tasked with writing excellent 5-paragraph essays.\
Generate the best essay possible for the user's request and the initial outline. \
If the user provides critique, respond with a revised version of your previous attempts. \
Utilize all the information below as needed: 

------

{content}"""

In [10]:
REFLECTION_PROMPT = """You are a teacher grading an essay submission. \
Generate critique and recommendations for the user's submission. \
Provide detailed recommendations, including requests for length, depth, style, etc."""

In [11]:
RESEARCH_PLAN_PROMPT = """You are a researcher charged with providing information that can \
be used when writing the following essay. Generate a list of search queries that will gather \
any relevant information. Only generate 3 queries max."""

In [12]:
RESEARCH_CRITIQUE_PROMPT = """You are a researcher charged with providing information that can \
be used when making any requested revisions (as outlined below). \
Generate a list of search queries that will gather any relevant information. Only generate 3 queries max."""

In [13]:
class Queries(BaseModel):
    queries: List[str]

In [14]:
def plan_node(state: AgentState):
    """Creates a high-level outline for the essay"""
    messages = [
        SystemMessage(content=PLAN_PROMPT), 
        HumanMessage(content=state['task'])
    ]
    response = model.invoke(messages)
    return {"plan": response.content}

In [15]:
def research_plan_node(state: AgentState):
    """Searches the web for information based on the initial plan"""
    queries = model.with_structured_output(Queries).invoke([
        SystemMessage(content=RESEARCH_PLAN_PROMPT),
        HumanMessage(content=state['task'])
    ])

In [18]:
def search_node(state, queries):
    content = state.get("content", [])

    for q in queries.queries:
        response = tavily.search(query=q, max_results=2)
        for r in response.get("results", []):
            content.append(r["content"])

    return {"content": content}


In [19]:
def generation_node(state: AgentState):
    """Generates or revises the essay draft"""
    content = "\n\n".join(state['content'] or [])
    user_message = HumanMessage(
        content=f"{state['task']}\n\nHere is my plan:\n\n{state['plan']}"
    )
    messages = [
        SystemMessage(content=WRITER_PROMPT.format(content=content)),
        user_message
    ]
    response = model.invoke(messages)
    return {
        "draft": response.content, 
        "revision_number": state.get("revision_number", 1) + 1
    }

In [20]:
def reflection_node(state: AgentState):
    """Critiques the current draft and provides feedback"""
    messages = [
        SystemMessage(content=REFLECTION_PROMPT), 
        HumanMessage(content=state['draft'])
    ]
    response = model.invoke(messages)
    return {"critique": response.content}

In [24]:
def research_critique_node(state: AgentState):
    """Searches for additional information based on critique"""
    queries = model.with_structured_output(Queries).invoke([
        SystemMessage(content=RESEARCH_CRITIQUE_PROMPT),
        HumanMessage(content=state['critique'])
    ])
    content = state['content'] or []
    for q in queries.queries:
        response = tavily.search(query=q, max_results=2)
        for r in response['results']:
            content.append(r['content'])
    return {"content": content}

In [25]:
 def should_continue(state):
    """Decides whether to continue revising or finish"""
    if state["revision_number"] > state["max_revisions"]:
        return END
    return "reflect"


In [26]:
builder = StateGraph(AgentState)

# Add all nodes to the graph
builder.add_node("planner", plan_node)
builder.add_node("generate", generation_node)
builder.add_node("reflect", reflection_node)
builder.add_node("research_plan", research_plan_node)
builder.add_node("research_critique", research_critique_node)

<langgraph.graph.state.StateGraph at 0x17091733f90>

In [27]:
builder.set_entry_point("planner")

# Define the workflow edges
builder.add_edge("planner", "research_plan")
builder.add_edge("research_plan", "generate")
builder.add_edge("reflect", "research_critique")
builder.add_edge("research_critique", "generate")

<langgraph.graph.state.StateGraph at 0x17091733f90>

In [28]:
builder.add_conditional_edges(
    "generate", 
    should_continue, 
    {END: END, "reflect": "reflect"}
)


<langgraph.graph.state.StateGraph at 0x17091733f90>

In [29]:
graph = builder.compile(checkpointer=memory)

In [30]:
try:
    from IPython.display import Image, display
    display(Image(graph.get_graph().draw_png()))
    print("‚úì Graph visualization displayed")
except Exception as e:
    print(f"‚ö† Could not display graph: {e}")
    print("  Install graphviz if you want to see the visualization")

‚ö† Could not display graph: Install pygraphviz to draw graphs: `pip install pygraphviz`.
  Install graphviz if you want to see the visualization


In [31]:
thread = {"configurable": {"thread_id": "1"}}

print("\n" + "="*60)
print("STARTING ESSAY GENERATION WORKFLOW")
print("="*60 + "\n")

for s in graph.stream({
    'task': "what is the difference between langchain and langsmith",
    "max_revisions": 2,
    "revision_number": 1,
}, thread):
    print(f"\n{'='*60}")
    print(f"Step: {list(s.keys())[0]}")
    print(f"{'='*60}")
    print(s)

print("\n" + "="*60)
print("WORKFLOW COMPLETED")
print("="*60)


STARTING ESSAY GENERATION WORKFLOW


Step: planner
{'planner': {'plan': "Here's a high-level outline for an essay on the differences between LangChain and LangSmith:\n\nI. Introduction\n\n* Briefly introduce the topic of AI-powered language models, specifically LangChain and LangSmith\n* Provide context: both are popular tools used in natural language processing (NLP) tasks, but they have distinct features and applications\n* Thesis statement: While LangChain and LangSmith share some similarities, their differences lie in their architecture, capabilities, and use cases.\n\nII. Architecture and Capabilities\n\n* Describe the architecture of each model:\n\t+ LangChain: a transformer-based language model that uses a combination of self-attention mechanisms and convolutional neural networks (CNNs) to process input text\n\t+ LangSmith: a BERT-based language model that leverages pre-trained weights from the original BERT model and fine-tunes them for specific tasks\n* Highlight the unique f

KeyError: 'content'

In [None]:
final_state = graph.get_state(thread)
print("\nüìù FINAL ESSAY:\n")
print(final_state.values.get('draft', 'No draft available'))