# Lesson 6: Essay Writer

In [1]:
import re

PROJECT_ID = !(gcloud config get-value core/project)
PROJECT_ID = PROJECT_ID[0]

SVC_ACC = !(gcloud config get-value core/account)
SVC_ACC = SVC_ACC[0]

PROJECT_NUMBER=str(re.search(r'\d+', SVC_ACC).group())

LOCATION="us-central1"

FOLDER_NAME="."

In [2]:
from dotenv import load_dotenv

_ = load_dotenv()

from langchain_google_vertexai import ChatVertexAI
llm = ChatVertexAI(model_name= 'gemini-pro')

In [None]:
# %pip install langgraph
# %pip install langgraph-checkpoint-sqlite
%pip install tavily-python
%pip install pygraphviz

Note: you may need to restart the kernel to use updated packages.


In [None]:
from langgraph.graph import StateGraph, END
from typing import TypedDict, Annotated, List
import operator
from langgraph.checkpoint.sqlite import SqliteSaver
from langchain_core.messages import AnyMessage, SystemMessage, HumanMessage, AIMessage, ChatMessage

memory = SqliteSaver.from_conn_string(":memory:")

In [None]:
class AgentState(TypedDict):
    task: str
    plan: str
    draft: str
    critique: str
    content: List[str]
    revision_number: int
    max_revisions: int

In [None]:
# from langchain_openai import ChatOpenAI
# model = ChatOpenAI(model="gpt-3.5-turbo", temperature=0)

In [None]:
PLAN_PROMPT = """You are an expert writer tasked with writing a high level outline of an essay. \
Write such an outline for the user provided topic. Give an outline of the essay along with any relevant notes \
or instructions for the sections."""

In [None]:
WRITER_PROMPT = """You are an essay assistant tasked with writing excellent 5-paragraph essays.\
Generate the best essay possible for the user's request and the initial outline. \
If the user provides critique, respond with a revised version of your previous attempts. \
Utilize all the information below as needed: 

------

{content}"""

In [None]:
REFLECTION_PROMPT = """You are a teacher grading an essay submission. \
Generate critique and recommendations for the user's submission. \
Provide detailed recommendations, including requests for length, depth, style, etc."""

In [None]:
RESEARCH_PLAN_PROMPT = """You are a researcher charged with providing information that can \
be used when writing the following essay. Generate a list of search queries that will gather \
any relevant information. Only generate 3 queries max."""


In [None]:
RESEARCH_CRITIQUE_PROMPT = """You are a researcher charged with providing information that can \
be used when making any requested revisions (as outlined below). \
Generate a list of search queries that will gather any relevant information. Only generate 3 queries max."""


In [None]:
from langchain_core.pydantic_v1 import BaseModel

class Queries(BaseModel):
    queries: List[str]

In [None]:
from tavily import TavilyClient
import os

with open('env2.txt', 'r') as file:
    serper_api_key = file.read().strip()

os.environ["SERPER_API_KEY"] = serper_api_key

tavily = TavilyClient(api_key=serper_api_key)


In [None]:
def plan_node(state: AgentState):
    messages = [
        SystemMessage(content=PLAN_PROMPT), 
        HumanMessage(content=state['task'])
    ]
    response = llm.invoke(messages)
    return {"plan": response.content}

In [None]:
def research_plan_node(state: AgentState):
    queries = model.with_structured_output(Queries).invoke([
        SystemMessage(content=RESEARCH_PLAN_PROMPT),
        HumanMessage(content=state['task'])
    ])
    content = state['content'] or []
    for q in queries.queries:
        response = tavily.search(query=q, max_results=2)
        for r in response['results']:
            content.append(r['content'])
    return {"content": content}

In [None]:
def generation_node(state: AgentState):
    content = "\n\n".join(state['content'] or [])
    user_message = HumanMessage(
        content=f"{state['task']}\n\nHere is my plan:\n\n{state['plan']}")
    messages = [
        SystemMessage(
            content=WRITER_PROMPT.format(content=content)
        ),
        user_message
        ]
    response = model.invoke(messages)
    return {
        "draft": response.content, 
        "revision_number": state.get("revision_number", 1) + 1
    }


In [None]:
def reflection_node(state: AgentState):
    messages = [
        SystemMessage(content=REFLECTION_PROMPT), 
        HumanMessage(content=state['draft'])
    ]
    response = model.invoke(messages)
    return {"critique": response.content}

In [None]:
def research_critique_node(state: AgentState):
    queries = model.with_structured_output(Queries).invoke([
        SystemMessage(content=RESEARCH_CRITIQUE_PROMPT),
        HumanMessage(content=state['critique'])
    ])
    content = state['content'] or []
    for q in queries.queries:
        response = tavily.search(query=q, max_results=2)
        for r in response['results']:
            content.append(r['content'])
    return {"content": content}

In [None]:
def should_continue(state):
    if state["revision_number"] > state["max_revisions"]:
        return END
    return "reflect"

In [None]:
builder = StateGraph(AgentState)

In [None]:
builder.add_node("planner", plan_node)
builder.add_node("generate", generation_node)
builder.add_node("reflect", reflection_node)
builder.add_node("research_plan", research_plan_node)
builder.add_node("research_critique", research_critique_node)

In [None]:
builder.set_entry_point("planner")

In [None]:
builder.add_conditional_edges(
    "generate", 
    should_continue, 
    {END: END, "reflect": "reflect"}
)


In [None]:
builder.add_edge("planner", "research_plan")
builder.add_edge("research_plan", "generate")

builder.add_edge("reflect", "research_critique")
builder.add_edge("research_critique", "generate")

In [None]:
graph = builder.compile(checkpointer=memory)

In [None]:
from IPython.display import Image

Image(graph.get_graph().draw_png())

In [None]:
# thread = {"configurable": {"thread_id": "1"}}
# for s in graph.stream({
#     'task': "what is the difference between langchain and langsmith",
#     "max_revisions": 2,
#     "revision_number": 1,
# }, thread, ):
#     print(s)

In [None]:
inputs

## Essay Writer Interface

In [None]:
import warnings
warnings.filterwarnings("ignore")

from helper import ewriter, writer_gui

In [None]:
MultiAgent = ewriter()
app = writer_gui(MultiAgent.graph)
app.launch()

# Main new test

In [None]:
from langchain.tools import tool

# setup the simple tools using LangChain tool decorator
@tool
def add(a: int, b: int) -> int:
    """Add two numbers."""
    return a + b


@tool
def multiply(a: int, b: int) -> int:
    """Multiply two numbers."""
    return a * b


@tool
def square(a: int) -> int:
    """Calculates the square of a number."""
    a = int(a)
    return a * a

# setup the toolkit
toolkit = [add, multiply, square]

In [None]:
from langchain.agents import create_tool_calling_agent

from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder

#define system prompt for tool calling agent
system_prompt = """ You are a mathematical assistant.
        Use your tools to answer questions. If you do not have a tool to
        answer the question, say so. """

tool_calling_prompt = ChatPromptTemplate.from_messages(
    [
        ("system", system_prompt),
        MessagesPlaceholder("chat_history", optional=True),
        ("human", "{input}"),
        MessagesPlaceholder("agent_scratchpad"),
    ]
)

tool_runnable = create_tool_calling_agent(llm, toolkit, prompt  = tool_calling_prompt)
tool_actions = tool_runnable.invoke({"input": "hi! what's 1+1 and  2 times 2", 'intermediate_steps': []})


In [None]:
def run_tool_agent(state):
    agent_outcome = tool_runnable.invoke(state)

    #this agent will overwrite the agent outcome state variable
    return {"agent_outcome": agent_outcome}

In [None]:
from langgraph.prebuilt.tool_executor import ToolExecutor

# tool executor invokes the tool action specified from the agent runnable
# they will become the nodes that will be called when the agent decides on a tool action.

tool_executor = ToolExecutor(toolkit)

# Define the function to execute tools
# This node will run a different tool as specified in the state variable agent_outcome
def execute_tools(state):
    # Get the most recent agent_outcome - this is the key added in the `agent` above
    agent_action = state['agent_outcome']
    if type(agent_action) is not list:
        agent_action = [agent_action]
    steps = []
    #sca only returns an action while tool calling returns a list
    # convert single actions to a list
    
    for action in agent_action:
    # Execute the tool
        output = tool_executor.invoke(action)
        print(f"The agent action is {action}")
        print(f"The tool result is: {output}")
        steps.append((action, str(output)))
    # Return the output
    return {"intermediate_steps": steps}

In [None]:
from typing import TypedDict, Annotated,Union
from langchain_core.agents import AgentAction, AgentFinish
from langchain.agents.output_parsers.tools import ToolAgentAction
from langchain_core.messages import BaseMessage
import operator


class AgentState(TypedDict):
   # The input string from human
   input: str
   # The list of previous messages in the conversation
   chat_history: list[BaseMessage]
   # The outcome of a given call to the agent
   # Needs 'list' as a valid type as the tool agent returns a list.
   # Needs `None` as a valid type, since this is what this will start as
   # this state will be overwritten with the latest everytime the agent is run
   agent_outcome: Union[AgentAction, list, ToolAgentAction, AgentFinish, None]

   # List of actions and corresponding observations
   # These actions should be added onto the existing so we use `operator.add` 
   # to append to the list of past intermediate steps
   intermediate_steps: Annotated[list[Union[tuple[AgentAction, str], tuple[ToolAgentAction, str]]], operator.add]

In [None]:
def should_continue(data):
    # If the agent outcome is an AgentFinish, then we return `exit` string
    # This will be used when setting up the graph to define the flow
    if isinstance(data['agent_outcome'], AgentFinish):
        return "END"
    # Otherwise, an AgentAction is returned
    # Here we return `continue` string
    # This will be used when setting up the graph to define the flow
    else:
        return "CONTINUE"

In [None]:
from langgraph.graph import END, StateGraph

# Define a new graph
workflow = StateGraph(AgentState)

# When nodes are called, the functions for to the tools will be called. 
workflow.add_node("agent", run_tool_agent)


# Add tool invocation node to the graph
workflow.add_node("action", execute_tools)

# Define which node the graph will invoke at start.
workflow.set_entry_point("agent")

# Add flow logic with static edge.
# Each time a tool is invoked and completed we want to 
# return the result to the agent to assess if task is complete or to take further actions 

#each action invocation has an edge leading to the agent node.
workflow.add_edge('action', 'agent')


# Add flow logic with conditional edge.
workflow.add_conditional_edges(
    # first parameter is the starting node for the edge
    "agent",
    # the second parameter specifies the logic function to be run
    # to determine which node the edge will point to given the state.
    should_continue,

    #third parameter defines the mapping between the logic function 
    #output and the nodes on the graph
    # For each possible output of the logic function there must be a valid node.
    {
        # If 'continue' we proceed to the action node.
        "CONTINUE": "action",
        # Otherwise we end invocations with the END node.
        "END": END
    }
)

In [None]:
from langgraph.checkpoint.memory import MemorySaver

memory = MemorySaver()

# Finally, compile the graph! 
# This compiles it into a LangChain Runnable,
app = workflow.compile(checkpointer = memory)

In [None]:
inputs = {"input": "give me 1+1 and then 2 times 2", "chat_history": []} 
config = {"configurable": {"thread_id": "1"}}

for s in app.stream(inputs, config = config):
    print(list(s.values())[0])
    print("----")