# Persistence and Streaming

In [18]:
from dotenv import load_dotenv

# Read settings from a local .env file into the process environment
# (presumably the OpenAI / Tavily API keys used below — confirm against your .env).
# The return value is bound to `_` only so the cell does not echo it.
_ = load_dotenv()

In [19]:
from langgraph.graph import StateGraph, END
from typing import TypedDict, Annotated
import operator
from langchain_core.messages import AnyMessage, SystemMessage, HumanMessage, ToolMessage
from langchain_openai import ChatOpenAI
from langchain_community.tools.tavily_search import TavilySearchResults

In [20]:
# Search tool handed to the agent; max_results limits how many hits each query returns.
MAX_SEARCH_RESULTS = 2
tool = TavilySearchResults(max_results=MAX_SEARCH_RESULTS)

In [21]:
class AgentState(TypedDict):
    """State passed between the nodes of the agent graph.

    The only key is ``messages``. Its annotation carries ``operator.add`` as
    the reducer, which LangGraph uses to merge a node's returned
    ``{'messages': [...]}`` into the existing list by concatenation instead of
    overwriting it.
    """
    # Running conversation history (human, AI and tool messages).
    messages: Annotated[list[AnyMessage], operator.add]

In [22]:
from langgraph.checkpoint.sqlite import SqliteSaver
# Checkpointer backed by an in-memory SQLite database (":memory:"), so saved
# conversation state lives only as long as this kernel.
# NOTE(review): in newer langgraph releases `from_conn_string` is reported to
# return a context manager rather than a saver instance — confirm against the
# installed version before upgrading.
memory = SqliteSaver.from_conn_string(":memory:")

In [23]:
class Agent:
    """Tool-calling agent implemented as a two-node LangGraph loop.

    The graph starts at the ``llm`` node, which asks the model for the next
    message. If that message contains tool calls, the ``action`` node runs
    them and control returns to ``llm``; otherwise the graph ends.

    Args:
        model: Chat model object; must provide ``bind_tools`` and ``invoke``.
        tools: Iterable of tool objects, each exposing ``name`` and ``invoke``.
        checkpointer: Passed to ``StateGraph.compile``; stores the
            conversation state (e.g. a SQLite-backed saver, or one backed by
            an external database).
        system: Optional system prompt prepended to every model call.
    """

    def __init__(self, model, tools, checkpointer, system=""):
        self.system = system
        graph = StateGraph(AgentState)
        graph.add_node("llm", self.call_openai)
        graph.add_node("action", self.take_action)
        # After the model speaks: run tools if it asked for any, else stop.
        graph.add_conditional_edges("llm", self.exists_action, {True: "action", False: END})
        graph.add_edge("action", "llm")
        graph.set_entry_point("llm")

        # The checkpointer saves conversations (here in a SQLite database;
        # an external database can be used as well).
        self.graph = graph.compile(checkpointer=checkpointer)
        # Name -> tool lookup used by take_action to dispatch tool calls.
        self.tools = {t.name: t for t in tools}
        self.model = model.bind_tools(tools)

    def call_openai(self, state: AgentState):
        """Invoke the model on the history and return its reply as a state update."""
        messages = state['messages']
        if self.system:
            # The system prompt is added per call and is not written to the state.
            messages = [SystemMessage(content=self.system)] + messages
        message = self.model.invoke(messages)
        return {'messages': [message]}

    def exists_action(self, state: AgentState):
        """Return True when the latest message requests at least one tool call."""
        result = state['messages'][-1]
        return len(result.tool_calls) > 0

    def take_action(self, state: AgentState):
        """Run every tool call in the latest message and return the results.

        Each tool call gets exactly one ``ToolMessage`` in the returned state
        update. A call naming a tool that was not registered is answered with
        an error message instead of raising ``KeyError``, so the model can
        correct itself on the next turn.
        """
        tool_calls = state['messages'][-1].tool_calls
        results = []
        for t in tool_calls:
            print(f"Calling: {t}")
            if t['name'] not in self.tools:
                # The model can invent a tool name; report it back rather than crash the graph.
                print(f"Unknown tool requested: {t['name']!r}")
                result = (
                    f"Error: unknown tool {t['name']!r}. "
                    f"Available tools: {sorted(self.tools)}. Retry with one of them."
                )
            else:
                result = self.tools[t['name']].invoke(t['args'])
            results.append(ToolMessage(tool_call_id=t['id'], name=t['name'], content=str(result)))
        print("Back to the model!")
        return {'messages': results}

In [24]:
# System prompt for the research assistant, assembled from adjacent string
# literals (the final piece carries the trailing newline).
prompt = (
    "You are a smart research assistant. Use the search engine to look up information. "
    "You are allowed to make multiple calls (either together or in sequence). "
    "Only look up information when you are sure of what you want. "
    "If you need to look up some information before asking a follow up question, you are allowed to do that!\n"
)
model = ChatOpenAI(model="gpt-4o")
# Wire the model, the search tool and the checkpointer into one agent.
abot = Agent(model=model, tools=[tool], checkpointer=memory, system=prompt)

In [25]:
# Opening user turn of the conversation.
first_question = "What is the weather in Rio de Janeiro, Brazil?"
messages = [HumanMessage(content=first_question)]

In [26]:
# Run config: the thread_id selects which conversation this run belongs to,
# which allows several conversations to be kept at the same time.
thread = dict(configurable=dict(thread_id="1"))

In [27]:
# Stream the run and print the messages carried by each streamed update.
for update in abot.graph.stream({"messages": messages}, thread):
    for node_output in update.values():
        print(node_output["messages"])

[AIMessage(content='', additional_kwargs={'tool_calls': [{'id': 'call_v2yzUjkZgGuO6MpGf61CpUOB', 'function': {'arguments': '{"query":"current weather in Rio de Janeiro, Brazil"}', 'name': 'tavily_search_results_json'}, 'type': 'function'}]}, response_metadata={'token_usage': {'completion_tokens': 25, 'prompt_tokens': 155, 'total_tokens': 180}, 'model_name': 'gpt-4o-2024-05-13', 'system_fingerprint': 'fp_ce0793330f', 'finish_reason': 'tool_calls', 'logprobs': None}, id='run-66e653ba-eb59-49b4-b462-530d4ae2c827-0', tool_calls=[{'name': 'tavily_search_results_json', 'args': {'query': 'current weather in Rio de Janeiro, Brazil'}, 'id': 'call_v2yzUjkZgGuO6MpGf61CpUOB'}], usage_metadata={'input_tokens': 155, 'output_tokens': 25, 'total_tokens': 180})]
Calling: {'name': 'tavily_search_results_json', 'args': {'query': 'current weather in Rio de Janeiro, Brazil'}, 'id': 'call_v2yzUjkZgGuO6MpGf61CpUOB'}
Back to the model!
[ToolMessage(content='[{\'url\': \'https://www.wunderground.com/weather/br

In [28]:
# Follow-up question sent with thread_id "1" again, i.e. the same conversation
# as the previous turn.
thread = {"configurable": {"thread_id": "1"}}
messages = [HumanMessage(content="What about in La Plata, Argentina?")]
for update in abot.graph.stream({"messages": messages}, thread):
    for node_output in update.values():
        print(node_output)

{'messages': [AIMessage(content='', additional_kwargs={'tool_calls': [{'id': 'call_1ikzSo6QDTfCOWne8w4nogkc', 'function': {'arguments': '{"query":"current weather in La Plata, Argentina"}', 'name': 'tavily_search_results_json'}, 'type': 'function'}]}, response_metadata={'token_usage': {'completion_tokens': 24, 'prompt_tokens': 834, 'total_tokens': 858}, 'model_name': 'gpt-4o-2024-05-13', 'system_fingerprint': 'fp_d576307f90', 'finish_reason': 'tool_calls', 'logprobs': None}, id='run-2e488d77-71ed-4cdb-99b4-0306e9bc512a-0', tool_calls=[{'name': 'tavily_search_results_json', 'args': {'query': 'current weather in La Plata, Argentina'}, 'id': 'call_1ikzSo6QDTfCOWne8w4nogkc'}], usage_metadata={'input_tokens': 834, 'output_tokens': 24, 'total_tokens': 858})]}
Calling: {'name': 'tavily_search_results_json', 'args': {'query': 'current weather in La Plata, Argentina'}, 'id': 'call_1ikzSo6QDTfCOWne8w4nogkc'}
Back to the model!
{'messages': [ToolMessage(content="[{'url': 'https://www.ventusky.com

In [29]:
# Third turn, still on thread_id "1"; the question itself does not name the
# two cities being compared.
thread = {"configurable": {"thread_id": "1"}}
messages = [HumanMessage(content="Which one is warmer?")]
for update in abot.graph.stream({"messages": messages}, thread):
    for node_output in update.values():
        print(node_output)

{'messages': [AIMessage(content='Currently, Rio de Janeiro, Brazil is warmer than La Plata, Argentina.\n\n- **Rio de Janeiro**: 21.1°C (70.0°F)\n- **La Plata**: 1.3°C (34.3°F)', response_metadata={'token_usage': {'completion_tokens': 51, 'prompt_tokens': 2303, 'total_tokens': 2354}, 'model_name': 'gpt-4o-2024-05-13', 'system_fingerprint': 'fp_ce0793330f', 'finish_reason': 'stop', 'logprobs': None}, id='run-b1e5cf97-7730-4edb-833d-8c2c2a3f78bf-0', usage_metadata={'input_tokens': 2303, 'output_tokens': 51, 'total_tokens': 2354})]}


In [30]:
# Same question as the previous cell, but sent under a different thread_id ("2").
thread = {"configurable": {"thread_id": "2"}}
messages = [HumanMessage(content="Which one is warmer?")]
for update in abot.graph.stream({"messages": messages}, thread):
    for node_output in update.values():
        print(node_output)

{'messages': [AIMessage(content='Could you please clarify what you would like to compare in terms of warmth? Are you asking about:\n\n1. The temperature difference between two locations?\n2. The warmth of different types of clothing or materials?\n3. The warmth of different seasons or times of the year?\n\nProviding more context will help me give you a precise answer.', response_metadata={'token_usage': {'completion_tokens': 67, 'prompt_tokens': 149, 'total_tokens': 216}, 'model_name': 'gpt-4o-2024-05-13', 'system_fingerprint': 'fp_ce0793330f', 'finish_reason': 'stop', 'logprobs': None}, id='run-c6c64f4b-d764-4aac-8d1d-ceaf1293685a-0', usage_metadata={'input_tokens': 149, 'output_tokens': 67, 'total_tokens': 216})]}


## Streaming tokens

In [None]:
from langgraph.checkpoint.aiosqlite import AsyncSqliteSaver

# Rebuild the agent on an async SQLite checkpointer (in-memory database) for
# use with the async streaming call below; this rebinds `memory` and `abot`.
memory = AsyncSqliteSaver.from_conn_string(":memory:")
abot = Agent(model=model, tools=[tool], checkpointer=memory, system=prompt)

In [None]:
messages = [HumanMessage(content="What is the weather in SF?")]
thread = {"configurable": {"thread_id": "4"}}
async for event in abot.graph.astream_events({"messages": messages}, thread, version="v1"):
    kind = event["event"]
    if kind == "on_chat_model_stream":
        content = event["data"]["chunk"].content
        if content:
            # Empty content in the context of OpenAI means
            # that the model is asking for a tool to be invoked.
            # So we only print non-empty content
            print(content, end="|")