# Lesson 4: Persistence and Streaming

1. Persistence lets you keep the state of an agent as it was at a particular point in time. You can go back to that state and resume from it in future interactions. This is really important for long-running applications.
2. Likewise, with streaming, you can emit a list of signals describing what's going on at that exact moment. So for long-running applications, you know exactly what the agent is doing.

In [1]:
from dotenv import load_dotenv

# Called only for its side effect; the return value is deliberately discarded via `_`.
# Presumably this loads the API keys used by the OpenAI and Tavily clients below
# from a local .env file — confirm against the deployment setup.
_ = load_dotenv()

In [2]:
from langgraph.graph import StateGraph, END
from typing import TypedDict, Annotated
import operator
from langchain_core.messages import AnyMessage, SystemMessage, HumanMessage, ToolMessage
from langchain_openai import ChatOpenAI
from langchain_community.tools.tavily_search import TavilySearchResults

In [3]:
# Search tool handed to the agent. The recorded runs below show the model calling
# it under the name 'tavily_search_results_json'. max_results=2 presumably caps
# the number of hits returned per query — confirm against the Tavily tool docs.
tool = TavilySearchResults(max_results=2)

In [4]:
class AgentState(TypedDict):
    """State passed between the graph nodes: the conversation's message list."""

    # operator.add is attached as annotation metadata; presumably LangGraph uses it
    # as the reducer, so messages returned by a node are appended to the existing
    # list instead of replacing it — confirm against the LangGraph state docs.
    messages: Annotated[list[AnyMessage], operator.add]

In [5]:
from langgraph.checkpoint.sqlite import SqliteSaver

# Checkpointer the agent graph is compiled with. ":memory:" is SQLite's in-memory
# database name, so checkpoints presumably do not outlive this process.
# NOTE(review): what from_conn_string returns may differ between langgraph
# releases — confirm against the installed version.
memory = SqliteSaver.from_conn_string(":memory:")

In [6]:
class Agent:
    """Tool-calling agent wired up as a two-node LangGraph state graph.

    The ``llm`` node asks the model for the next message. If that message
    contains tool calls, the ``action`` node runs them and control returns to
    ``llm``; otherwise the run ends.
    """

    def __init__(self, model, tools, checkpointer, system=""):
        """Build and compile the graph, then bind the tools to the model.

        Args:
            model: Chat model object; must provide ``bind_tools(tools)``.
            tools: Collection of tool objects, each exposing a ``name``
                attribute and an ``invoke(args)`` method.
            checkpointer: Forwarded to ``graph.compile`` as the checkpointer
                used to persist graph state.
            system: Optional system prompt; when non-empty it is prepended
                to the message list on every model call.
        """
        self.system = system
        graph = StateGraph(AgentState)
        graph.add_node("llm", self.call_openai)
        graph.add_node("action", self.take_action)
        # exists_action returns a bool that selects the next node:
        # True -> run the requested tools, False -> finish.
        graph.add_conditional_edges("llm", self.exists_action, {True: "action", False: END})
        graph.add_edge("action", "llm")
        graph.set_entry_point("llm")
        self.graph = graph.compile(checkpointer=checkpointer)
        # Name -> tool lookup used by take_action to dispatch tool calls.
        self.tools = {t.name: t for t in tools}
        self.model = model.bind_tools(tools)

    def call_openai(self, state: AgentState):
        """Invoke the model on the conversation so far.

        Returns:
            A state update ``{'messages': [message]}`` holding the model's reply.
        """
        messages = state['messages']
        if self.system:
            # The system prompt is prepended for this call only; it is not
            # part of the returned state update.
            messages = [SystemMessage(content=self.system)] + messages
        message = self.model.invoke(messages)
        return {'messages': [message]}

    def exists_action(self, state: AgentState):
        """Return True when the most recent message requests at least one tool call."""
        result = state['messages'][-1]
        return len(result.tool_calls) > 0

    def take_action(self, state: AgentState):
        """Run every tool call requested by the most recent message.

        A tool name that is not registered in ``self.tools`` does not raise:
        the corresponding ``ToolMessage`` carries the text
        ``"bad tool name, retry"`` so the model can correct itself on the
        next ``llm`` step.

        Returns:
            A state update ``{'messages': results}`` with one ``ToolMessage``
            per tool call, in request order.
        """
        tool_calls = state['messages'][-1].tool_calls
        results = []
        for t in tool_calls:
            print(f"Calling: {t}")
            if t['name'] not in self.tools:
                # The model can request a tool that was never registered;
                # report it back instead of failing the run with a KeyError.
                print("\n ....bad tool name....")
                result = "bad tool name, retry"
            else:
                result = self.tools[t['name']].invoke(t['args'])
            # Tool output is stringified before being placed in the message content.
            results.append(ToolMessage(tool_call_id=t['id'], name=t['name'], content=str(result)))
        print("Back to the model!")
        return {'messages': results}

In [7]:
# System prompt for the agent, assembled from adjacent literals; the resulting
# text is a single line terminated by one newline.
prompt = (
    "You are a smart research assistant. Use the search engine to look up information. "
    "You are allowed to make multiple calls (either together or in sequence). "
    "Only look up information when you are sure of what you want. "
    "If you need to look up some information before asking a follow up question, you are allowed to do that!\n"
)
model = ChatOpenAI(model="gpt-4o")
# Agent with the single search tool, persisting its state through `memory`.
abot = Agent(model, [tool], checkpointer=memory, system=prompt)

In [8]:
# First user turn; it is sent to the graph as the initial "messages" value below.
messages = [HumanMessage(content="What is the weather in sf?")]

We'll add a thread configuration to track different threads in the persistent checkpointer, which enables multiple simultaneous conversations in production applications. This thread config is a dictionary with a `configurable` key whose value includes a `thread_id`, which can be set to any string.

In [9]:
# Run config passed alongside the input on each stream call; the follow-up
# questions below reuse thread_id "1" to continue this conversation.
thread = {"configurable": {"thread_id": "1"}}

In [10]:
# Stream the run step by step and print only the messages each step produced.
updates = abot.graph.stream({"messages": messages}, thread)
for update in updates:
    for node_output in update.values():
        print(node_output['messages'])

[AIMessage(content='', additional_kwargs={'tool_calls': [{'id': 'call_lh8ykkDx0R8hGKcI0EzKgSgJ', 'function': {'arguments': '{"query":"current weather in San Francisco"}', 'name': 'tavily_search_results_json'}, 'type': 'function'}]}, response_metadata={'token_usage': {'completion_tokens': 22, 'prompt_tokens': 151, 'total_tokens': 173}, 'model_name': 'gpt-4o', 'system_fingerprint': 'fp_dd932ca5d1', 'finish_reason': 'tool_calls', 'logprobs': None}, id='run-d72f44be-b54a-4220-9609-a3865aff541c-0', tool_calls=[{'name': 'tavily_search_results_json', 'args': {'query': 'current weather in San Francisco'}, 'id': 'call_lh8ykkDx0R8hGKcI0EzKgSgJ'}])]
Calling: {'name': 'tavily_search_results_json', 'args': {'query': 'current weather in San Francisco'}, 'id': 'call_lh8ykkDx0R8hGKcI0EzKgSgJ'}
Back to the model!
[ToolMessage(content='[{\'url\': \'https://www.wunderground.com/hourly/us/ca/san-francisco/94129/date/2024-07-11\', \'content\': \'San Francisco Weather Forecasts. Weather Underground provides

In [11]:
# Follow-up question on the same thread ("1") as the first question.
thread = {"configurable": {"thread_id": "1"}}
messages = [HumanMessage(content="What about in la?")]
updates = abot.graph.stream({"messages": messages}, thread)
for update in updates:
    # Print the whole state update emitted by each step.
    for node_output in update.values():
        print(node_output)

{'messages': [AIMessage(content='', additional_kwargs={'tool_calls': [{'id': 'call_FMjQIiZwFUZ5pFIt9ATAZBcR', 'function': {'arguments': '{"query":"current weather in Los Angeles"}', 'name': 'tavily_search_results_json'}, 'type': 'function'}]}, response_metadata={'token_usage': {'completion_tokens': 22, 'prompt_tokens': 749, 'total_tokens': 771}, 'model_name': 'gpt-4o', 'system_fingerprint': 'fp_dd932ca5d1', 'finish_reason': 'tool_calls', 'logprobs': None}, id='run-671892bc-b913-4d23-928d-916e98c2cc43-0', tool_calls=[{'name': 'tavily_search_results_json', 'args': {'query': 'current weather in Los Angeles'}, 'id': 'call_FMjQIiZwFUZ5pFIt9ATAZBcR'}])]}
Calling: {'name': 'tavily_search_results_json', 'args': {'query': 'current weather in Los Angeles'}, 'id': 'call_FMjQIiZwFUZ5pFIt9ATAZBcR'}
Back to the model!
{'messages': [ToolMessage(content='[{\'url\': \'https://www.wunderground.com/hourly/us/ca/los-Ángeles/90022/date/2024-7-11\', \'content\': \'Los Angeles Weather Forecasts. Weather Unde

In [12]:
# Third turn on thread "1": the question only makes sense given the two earlier turns.
thread = {"configurable": {"thread_id": "1"}}
messages = [HumanMessage(content="Which one is warmer?")]
updates = abot.graph.stream({"messages": messages}, thread)
for update in updates:
    # Print the whole state update emitted by each step.
    for node_output in update.values():
        print(node_output)

{'messages': [AIMessage(content='Los Angeles is currently warmer than San Francisco. The temperature in Los Angeles is approximately 21.7°C (71.1°F), while in San Francisco it is around 20.1°C (68.2°F).', response_metadata={'token_usage': {'completion_tokens': 46, 'prompt_tokens': 1347, 'total_tokens': 1393}, 'model_name': 'gpt-4o', 'system_fingerprint': 'fp_dd932ca5d1', 'finish_reason': 'stop', 'logprobs': None}, id='run-654682a3-8e4d-437c-8087-e307bfb3319d-0')]}


In [13]:
# Same question, but on a different thread ("2"); the recorded output shows the
# model asking for clarification instead of answering.
thread = {"configurable": {"thread_id": "2"}}
messages = [HumanMessage(content="Which one is warmer?")]
updates = abot.graph.stream({"messages": messages}, thread)
for update in updates:
    # Print the whole state update emitted by each step.
    for node_output in update.values():
        print(node_output)

{'messages': [AIMessage(content="Could you please clarify what you're asking about? Are you comparing the temperatures of two specific places, objects, or times? Providing more context will help me give you a precise answer.", response_metadata={'token_usage': {'completion_tokens': 37, 'prompt_tokens': 149, 'total_tokens': 186}, 'model_name': 'gpt-4o', 'system_fingerprint': 'fp_d33f7b429e', 'finish_reason': 'stop', 'logprobs': None}, id='run-e45574ec-721f-4e47-9aa5-608aca12fe3c-0')]}


## Streaming tokens

In [14]:
from langgraph.checkpoint.aiosqlite import AsyncSqliteSaver

# Rebind `memory` to the async saver and rebuild the agent around it; the
# astream_events call in the next cell is consumed with `async for`.
# NOTE(review): what from_conn_string returns may differ between langgraph
# releases — confirm against the installed version.
memory = AsyncSqliteSaver.from_conn_string(":memory:")
abot = Agent(model, [tool], system=prompt, checkpointer=memory)

In [15]:
messages = [HumanMessage(content="What is the weather in SF?")]
thread = {"configurable": {"thread_id": "4"}}
async for event in abot.graph.astream_events({"messages": messages}, thread, version="v1"):
    kind = event["event"]
    if kind == "on_chat_model_stream":
        content = event["data"]["chunk"].content
        if content:
            # Empty content in the context of OpenAI means
            # that the model is asking for a tool to be invoked.
            # So we only print non-empty content
            print(content, end="|")

  warn_beta(


Calling: {'name': 'tavily_search_results_json', 'args': {'query': 'current weather in San Francisco'}, 'id': 'call_c8x1SAl1a3MSNrwqydie5iXW'}
Back to the model!
To| get| the| most| accurate| and| up|-to|-date| weather| information| for| San| Francisco|,| you| can| check| the| following| resources|:

|1|.| [|Weather| Underground|](|https|://|www|.w|und|erground|.com|/hour|ly|/us|/|ca|/s|an|-fr|anc|isco|/|941|17|/date|/|202|4|-|07|-|11|)| -| Provides| local| and| long|-range| weather| forecasts|,| weather| reports|,| maps|,| and| tropical| weather| conditions| for| the| San| Francisco| area|.

|2|.| [|National| Weather| Service|](|https|://|forecast|.weather|.gov|/|zip|city|.php|?|input|string|=|San|+|Franc|isco|,|CA|)| -| Offers| hourly| weather| forecast|,| radar| and| satellite| images|,| and| additional| resources| like| severe| weather| outlook| maps| and| precipitation| maps|.|