# Lesson 4: Persistence and Streaming

Persistence lets you keep the state
of an agent at a particular point in time.
You can then go back to that state and resume from it
in future interactions.
This is really important for long-running applications.

Likewise, with streaming,
you can emit a stream of signals about what's going on at that exact moment.
So for long-running applications, you know exactly what the agent is doing.

In [1]:
from dotenv import load_dotenv

_ = load_dotenv()

In [2]:
from langgraph.graph import StateGraph, END
from typing import TypedDict, Annotated
import operator
from langchain_core.messages import AnyMessage, SystemMessage, HumanMessage, ToolMessage
from langchain_openai import ChatOpenAI
from langchain_community.tools.tavily_search import TavilySearchResults

In [3]:
tool = TavilySearchResults(max_results=2)

In [4]:
class AgentState(TypedDict):
    messages: Annotated[list[AnyMessage], operator.add]

In [5]:
from langgraph.checkpoint.sqlite import SqliteSaver

memory = SqliteSaver.from_conn_string(":memory:")

In order to deal with persistence,
we've added the concept of a checkpointer to LangGraph.
A checkpointer basically checkpoints
the state after
and between every node.
To add persistence to this agent,
we'll use a SqliteSaver.
This is a really simple checkpointer that uses
SQLite, a built-in database, under the hood.
Here we just use the in-memory database,
so the saved state disappears if we restart this notebook.
But you can easily connect this to an external database,
and there are also other checkpointers that use Redis,
Postgres, and other more persistent databases like that.
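
To make the idea of a checkpointer a bit more concrete, here is a minimal sketch that uses only the standard library. `ToyCheckpointer`, its `put`/`get` methods, and the table layout are all hypothetical names made up for illustration; this is not how `SqliteSaver` is implemented, just a picture of "save a snapshot of the state after each step, keyed by thread".

```python
import json
import sqlite3


class ToyCheckpointer:
    """Hypothetical stand-in for a checkpointer (not LangGraph's SqliteSaver)."""

    def __init__(self, conn_string=":memory:"):
        # ":memory:" keeps everything in RAM, so data is lost on restart.
        self.conn = sqlite3.connect(conn_string)
        self.conn.execute(
            "CREATE TABLE IF NOT EXISTS checkpoints ("
            "thread_id TEXT, step INTEGER, state TEXT, "
            "PRIMARY KEY (thread_id, step))"
        )

    def put(self, thread_id, state):
        # Append a new snapshot; earlier snapshots stay available.
        row = self.conn.execute(
            "SELECT COALESCE(MAX(step), -1) FROM checkpoints WHERE thread_id = ?",
            (thread_id,),
        ).fetchone()
        step = row[0] + 1
        self.conn.execute(
            "INSERT INTO checkpoints VALUES (?, ?, ?)",
            (thread_id, step, json.dumps(state)),
        )
        self.conn.commit()
        return step

    def get(self, thread_id, step=None):
        # Return the latest snapshot, or a specific earlier one if step is given.
        if step is None:
            query = ("SELECT state FROM checkpoints WHERE thread_id = ? "
                     "ORDER BY step DESC LIMIT 1")
            params = (thread_id,)
        else:
            query = "SELECT state FROM checkpoints WHERE thread_id = ? AND step = ?"
            params = (thread_id, step)
        row = self.conn.execute(query, params).fetchone()
        return json.loads(row[0]) if row else None


saver = ToyCheckpointer()
saver.put("1", {"messages": ["What is the weather in sf?"]})
saver.put("1", {"messages": ["What is the weather in sf?", "<tool call>"]})

latest = saver.get("1")          # most recent snapshot for thread "1"
first = saver.get("1", step=0)   # go back to an earlier point in time
missing = saver.get("2")         # a thread that was never written
```

Keeping every snapshot, rather than overwriting the last one, is what makes it possible to go back to an earlier state and resume from there.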

In [6]:
class Agent:
    def __init__(self, model, tools, checkpointer, system=""):
        self.system = system
        graph = StateGraph(AgentState)
        graph.add_node("llm", self.call_openai)
        graph.add_node("action", self.take_action)
        graph.add_conditional_edges("llm", self.exists_action, {True: "action", False: END})
        graph.add_edge("action", "llm")
        graph.set_entry_point("llm")
        self.graph = graph.compile(checkpointer=checkpointer)
        self.tools = {t.name: t for t in tools}
        self.model = model.bind_tools(tools)

    def call_openai(self, state: AgentState):
        messages = state['messages']
        if self.system:
            messages = [SystemMessage(content=self.system)] + messages
        message = self.model.invoke(messages)
        return {'messages': [message]}

    def exists_action(self, state: AgentState):
        result = state['messages'][-1]
        return len(result.tool_calls) > 0

    def take_action(self, state: AgentState):
        tool_calls = state['messages'][-1].tool_calls
        results = []
        for t in tool_calls:
            print(f"Calling: {t}")
            result = self.tools[t['name']].invoke(t['args'])
            results.append(ToolMessage(tool_call_id=t['id'], name=t['name'], content=str(result)))
        print("Back to the model!")
        return {'messages': results}

In [7]:
prompt = """You are a smart research assistant. Use the search engine to look up information. \
You are allowed to make multiple calls (either together or in sequence). \
Only look up information when you are sure of what you want. \
If you need to look up some information before asking a follow up question, you are allowed to do that!
"""
model = ChatOpenAI(model="gpt-4o")
abot = Agent(model, [tool], system=prompt, checkpointer=memory)

## Streaming

First, we might care about streaming
the individual messages.
So this would be the AI message that determines what action to take,
and then the observation message that represents the result
of taking that action.
The second thing we might care about streaming is tokens.
So for each token of the LLM call, we might want to stream the output.

We're now going to add the concept of a thread config.
This is used to keep track of different threads
inside the persistent checkpointer,
which lets us have multiple conversations going on at the same time.

This is really needed for production applications,
where you generally have many users.
The thread config is simply a dictionary with a `configurable` key.
Inside it we set a `thread_id`, which can be any string.
Here we're going to set it to `"1"`.

We're now going to call the graph
not with `invoke`, but with `stream`.
We pass in the same messages dictionary,
and we pass in the thread config as the second argument.
We then get back a stream of events.
These events represent updates to the state over time.
Because our state has only one key, `messages`,
each update we print is a list of new messages.

Each `thread_id` represents one session, similar to a chat ID. LangGraph stores all previous messages for that `thread_id`.
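
Before running the real graph, it may help to see the shape of these events in isolation. The sketch below is a toy generator, not LangGraph itself: `toy_stream` and its placeholder payloads are made up, and it only assumes what the cells in this lesson rely on, namely that each event is a dictionary mapping a node name to that node's update of the `messages` key.

```python
def toy_stream(user_message):
    """Yield one update per node, shaped like {node_name: partial_state}."""
    # Node names mirror the graph in this lesson; the payloads are placeholders.
    yield {"llm": {"messages": [f"tool_call(search, {user_message!r})"]}}
    yield {"action": {"messages": ["<search results>"]}}
    yield {"llm": {"messages": ["<final answer>"]}}


seen = []
for event in toy_stream("What is the weather in sf?"):
    # Same consumption pattern as the notebook cells: one dict per node update.
    for node, update in event.items():
        seen.append(node)
        print(node, "->", update["messages"])
```

Iterating over `event.values()`, as the cells below do, simply drops the node name and keeps the update.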

In [8]:
messages = [HumanMessage(content="What is the weather in sf?")]

In [9]:
thread = {"configurable": {"thread_id": "1"}}

In [10]:
for event in abot.graph.stream({"messages": messages}, thread):
    for v in event.values():
        print(v['messages'])

[AIMessage(content='', additional_kwargs={'tool_calls': [{'id': 'call_mTnplhSZin0eHzj4Jk4nTUpW', 'function': {'arguments': '{"query":"current weather in San Francisco"}', 'name': 'tavily_search_results_json'}, 'type': 'function'}]}, response_metadata={'token_usage': {'completion_tokens': 22, 'prompt_tokens': 151, 'total_tokens': 173, 'completion_tokens_details': {'reasoning_tokens': 0}}, 'model_name': 'gpt-4o', 'system_fingerprint': 'fp_25624ae3a5', 'finish_reason': 'tool_calls', 'logprobs': None}, id='run-2fbd26d8-24d4-43ba-af70-6782c6ecc00d-0', tool_calls=[{'name': 'tavily_search_results_json', 'args': {'query': 'current weather in San Francisco'}, 'id': 'call_mTnplhSZin0eHzj4Jk4nTUpW'}])]
Calling: {'name': 'tavily_search_results_json', 'args': {'query': 'current weather in San Francisco'}, 'id': 'call_mTnplhSZin0eHzj4Jk4nTUpW'}
Back to the model!
[ToolMessage(content='[{\'url\': \'https://www.weatherapi.com/\', \'content\': "{\'location\': {\'name\': \'San Francisco\', \'region\': \

In [11]:
messages = [HumanMessage(content="What about in la?")]
thread = {"configurable": {"thread_id": "1"}}
for event in abot.graph.stream({"messages": messages}, thread):
    for v in event.values():
        print(v)

{'messages': [AIMessage(content='', additional_kwargs={'tool_calls': [{'id': 'call_bybXck117IDFCptjw9YDQ5Wn', 'function': {'arguments': '{"query":"current weather in Los Angeles"}', 'name': 'tavily_search_results_json'}, 'type': 'function'}]}, response_metadata={'token_usage': {'completion_tokens': 22, 'prompt_tokens': 745, 'total_tokens': 767, 'completion_tokens_details': {'reasoning_tokens': 0}}, 'model_name': 'gpt-4o', 'system_fingerprint': 'fp_25624ae3a5', 'finish_reason': 'tool_calls', 'logprobs': None}, id='run-c61ea791-c9dd-4a86-8c25-805b9bbedb5a-0', tool_calls=[{'name': 'tavily_search_results_json', 'args': {'query': 'current weather in Los Angeles'}, 'id': 'call_bybXck117IDFCptjw9YDQ5Wn'}])]}
Calling: {'name': 'tavily_search_results_json', 'args': {'query': 'current weather in Los Angeles'}, 'id': 'call_bybXck117IDFCptjw9YDQ5Wn'}
Back to the model!
{'messages': [ToolMessage(content='[{\'url\': \'https://www.weatherapi.com/\', \'content\': "{\'location\': {\'name\': \'Los Angel

With persistence enabled, the checkpointer stores all the messages for a given `thread_id`, so a follow-up question on the same thread can rely on the earlier turns.

In [12]:
messages = [HumanMessage(content="Which one is warmer?")]
thread = {"configurable": {"thread_id": "1"}}
for event in abot.graph.stream({"messages": messages}, thread):
    for v in event.values():
        print(v)

{'messages': [AIMessage(content="Los Angeles is currently warmer with a temperature of 21.7°C (71.1°F) compared to San Francisco's 18.9°C (66.0°F).", response_metadata={'token_usage': {'completion_tokens': 37, 'prompt_tokens': 1407, 'total_tokens': 1444, 'completion_tokens_details': {'reasoning_tokens': 0}}, 'model_name': 'gpt-4o', 'system_fingerprint': 'fp_25624ae3a5', 'finish_reason': 'stop', 'logprobs': None}, id='run-d2166795-4015-46f5-9abf-899ab09e319b-0')]}


In [13]:
messages = [HumanMessage(content="Which one is warmer?")]
thread = {"configurable": {"thread_id": "2"}}
for event in abot.graph.stream({"messages": messages}, thread):
    for v in event.values():
        print(v)

{'messages': [AIMessage(content='Could you please clarify what you are comparing? Are you referring to two specific locations, times, or objects?', response_metadata={'token_usage': {'completion_tokens': 23, 'prompt_tokens': 149, 'total_tokens': 172, 'completion_tokens_details': {'reasoning_tokens': 0}}, 'model_name': 'gpt-4o', 'system_fingerprint': 'fp_25624ae3a5', 'finish_reason': 'stop', 'logprobs': None}, id='run-18d7f494-fb14-45e6-bebe-4a49688dde4d-0')]}
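
The difference between thread "1" and thread "2" above can be pictured with a small in-memory sketch. Everything here is hypothetical (`store`, `run_turn`); it only imitates the idea that new messages are added onto whatever is already saved for the same `thread_id`, using the same `operator.add` that `AgentState` annotates `messages` with.

```python
import operator

# Hypothetical in-memory store: thread_id -> saved state.
store = {}


def run_turn(thread_id, new_messages):
    """Merge new messages into the saved state for one thread."""
    previous = store.get(thread_id, {"messages": []})
    # List concatenation: old history first, then the new turn.
    state = {"messages": operator.add(previous["messages"], new_messages)}
    store[thread_id] = state
    return state


run_turn("1", ["What is the weather in sf?"])
run_turn("1", ["What about in la?"])
thread_1 = run_turn("1", ["Which one is warmer?"])

# A different thread_id starts from an empty history.
thread_2 = run_turn("2", ["Which one is warmer?"])

print(len(thread_1["messages"]), len(thread_2["messages"]))
```

This is consistent with the outputs above: `prompt_tokens` grows from 151 to 745 to 1407 on thread "1" as history accumulates, while thread "2" starts at 149 and the model has to ask what is being compared.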


## Streaming tokens

But what about streaming the tokens themselves?
For that, we're going to use the `astream_events`
method that comes on all LangChain and LangGraph objects.
`astream_events` is an asynchronous method,
which means that we're going to need to use an async checkpointer.

In [14]:
from langgraph.checkpoint.aiosqlite import AsyncSqliteSaver

memory = AsyncSqliteSaver.from_conn_string(":memory:")
abot = Agent(model, [tool], system=prompt, checkpointer=memory)

In [15]:
messages = [HumanMessage(content="What is the weather in SF?")]
thread = {"configurable": {"thread_id": "4"}}
async for event in abot.graph.astream_events({"messages": messages}, thread, version="v1"):
    kind = event["event"]
    if kind == "on_chat_model_stream":
        content = event["data"]["chunk"].content
        if content:
            # Empty content in the context of OpenAI means
            # that the model is asking for a tool to be invoked.
            # So we only print non-empty content
            print(content, end="|")

Calling: {'name': 'tavily_search_results_json', 'args': {'query': 'current weather in San Francisco'}, 'id': 'call_Ln5Sfh6c9Br3iLLnu3ghaAEE'}
Back to the model!
The| current| weather| in| San| Francisco| is| partly| cloudy| with| a| temperature| of| |18|.|9|°C| (|66|.|0|°F|).| The| wind| is| blowing| from| the| west|-s|outh|west| at| |8|.|9| mph| (|14|.|4| k|ph|),| and| the| humidity| level| is| at| |78|%.| The| atmospheric| pressure| is| |100|9|.|0| mb|.| Visibility| is| good| at| |16| km| (|9| miles|).|
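
The cell above relies on the notebook allowing `async for` at the top level. In a plain Python script, the same loop has to live inside a coroutine that is run with `asyncio.run`. Here is a minimal sketch of that pattern; `fake_token_events` is a made-up stand-in for the real event stream, and its chunks are plain strings rather than message chunks with a `.content` attribute.

```python
import asyncio


async def fake_token_events(text):
    """Hypothetical event source that imitates token-by-token streaming."""
    for token in text.split(" "):
        await asyncio.sleep(0)  # hand control back to the event loop
        yield {"event": "on_chat_model_stream", "data": {"chunk": token}}
    # A non-token event, to show why the loop filters on the event kind.
    yield {"event": "on_chain_end", "data": {}}


async def collect_tokens(text):
    tokens = []
    async for event in fake_token_events(text):
        if event["event"] == "on_chat_model_stream":
            chunk = event["data"]["chunk"]
            if chunk:  # skip empty chunks, as the notebook cell does
                tokens.append(chunk)
    return tokens


# asyncio.run starts an event loop, runs the coroutine, and closes the loop.
tokens = asyncio.run(collect_tokens("The current weather in San Francisco"))
print("|".join(tokens))
```

Filtering on the event kind matters because the stream carries other event types besides token chunks.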