When building Agents, we often work on longer-running tasks. This requires an understanding of two important concepts: persistence and streaming. Persistence lets you record the state of the Agent at a particular point in time, so that you can go back and resume from that state in future interactions. This is important for long-running applications.

Likewise, with streaming you can emit a series of signals describing what is going on at that moment. So, for long-running applications, you know exactly what the Agent is doing at any instant. Let us outline the steps followed in the Colab notebook attached to this article:

1)	To get started, we create the Agent as before.

2)	We load in the environment variables.

3)	We make the necessary imports

4)	We create the Tavily search tool again

5)	We create the Agent state

6)	We create the Agent

7)	We next add in persistence

8)	To deal with persistence, we need to understand the concept of a checkpointer in LangGraph.

9)	A checkpointer checkpoints the state after and between every node.

10)	To add persistence, we use a SQLite checkpointer (SqliteSaver).

11)	This is a very simple checkpointer that uses SQLite under the hood.

12)	We use just an in-memory database.

13)	If we refresh the notebook, it will disappear.

14)	For a live use case, you will have to connect it to an external database.

15)	There are also checkpointers that use Redis, Postgres and other persistent databases.

16)	Here we initialise the checkpointer and pass it to the graph's compile call.

17)	This is another parameter that we add to the Agent.

18)	We initialise the Agent and pass in the checkpointer.

19)	When we use the Agent, we also add the concept of streaming.

20)	There are two kinds of individual messages that we care about streaming:

a)	First, the AI message that determines what action to take
b)	Next, the observation message that represents the result of taking that action

21)	We also consider the case of streaming the tokens: for each token of the LLM call, we might want to stream the output.

22)	In the notebook, we first stream the messages.

23)	We create a Human message.

24)	We add the thread config, which keeps track of different threads inside the persistent checkpointer.

25)	This allows us to have multiple conversations going on at the same time.

26)	The thread config is a dictionary with a “configurable” key that holds a thread id, which is a string. Here we set it equal to “1”.

27)	We call the graph not with the invoke but with stream

28)	We pass in the messages dictionary as well as the thread config

29)	We ask a question – What is the weather like in San Francisco?

30)	We get back a stream of events: a) an AI message, b) a tool message, c) then a final AI message.

31)	We ask it another question: What about LA?

32)	It may be noticed that the word “weather” does not appear here, but because of persistence the Agent still makes the function call, and we can see the AI message in the stream.

33)	We can call again using the same thread id, asking: “Which is warmer?”

34)	On changing the thread id, one can notice that the language model is not able to answer, because it no longer has the earlier conversation.

35)	The above snippets covered the importance of persistence and showed how we can stream events.

36)	Let us now see streaming of tokens.

37)	We use the astream_events method that comes on all LangChain and LangGraph objects.

38)	astream_events is an asynchronous method, which means that we need an async checkpointer.

39)	We import the async SQLite checkpointer (AsyncSqliteSaver) and pass it to the Agent.

40)	This is the same as before: we are just swapping the synchronous SQLite checkpointer for the asynchronous one.

41)	This allows us to use asynchronous methods on the graph.

42)	We use a new thread id to start the conversation afresh

43)	We also look at a different type of event. The events represent updates from the underlying stream. We look for events corresponding to new tokens.

44)	These events have the type on_chat_model_stream.

45)	When we see these events, we print their content with a pipe delimiter.

46)	When we run this, we can see the tokens streaming in real time.

47)	This becomes very useful in production applications.
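
The persistence steps above can be pictured with a toy example. The sketch below is not LangGraph's implementation; it is a minimal, standard-library-only illustration, and `run_with_checkpoints`, `llm_node` and `action_node` are hypothetical names invented here. It saves a copy of the state after every node, which is what makes it possible to resume from any saved point later.

```python
import copy

def llm_node(state):
    # Hypothetical stand-in for the model call: appends one message.
    return {"messages": state["messages"] + ["ai: I will search"]}

def action_node(state):
    # Hypothetical stand-in for the tool call.
    return {"messages": state["messages"] + ["tool: sunny"]}

def run_with_checkpoints(state, nodes, checkpoints):
    """Run the nodes in order, saving a snapshot of the state after each one."""
    for name, node in nodes:
        state = node(state)
        checkpoints.append((name, copy.deepcopy(state)))
    return state

checkpoints = []
final = run_with_checkpoints(
    {"messages": ["human: weather?"]},
    [("llm", llm_node), ("action", action_node)],
    checkpoints,
)

# Resume from the snapshot taken right after the "llm" node.
name, saved = checkpoints[0]
resumed = action_node(copy.deepcopy(saved))
print(name, len(saved["messages"]), resumed == final)
```

Resuming from the first snapshot and re-running the remaining node gives the same final state as the uninterrupted run, which is the property a real checkpointer relies on.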



In [None]:
# 1. Two important concepts. Persistence and Streaming
# 2. Persistence lets you keep around the state of the Agent
# at a particular point in time.
# 3. This allows you to go back to that state and resume from it at
# any instant
# 4. This is important for long running applications.
# 5. Likewise for streaming, you can emit a list of signals
# of what's going at that moment.
# 6. So, for long running applications we know exactly what the Agent is doing


In [None]:
#7. Let's create the Agent
#8. First, load in the environment variables.

import os
import openai
os.environ["OPENAI_API_KEY"] = ""
openai.api_key = os.environ['OPENAI_API_KEY']
os.environ["TAVILY_API_KEY"] = ""

In [None]:
# 9. We make the necessary imports related to LangGraph, etc.

!pip install langgraph
!pip install langchain-openai
!pip install langchain-community

from langgraph.graph import StateGraph, END
from typing import TypedDict, Annotated
import operator
from langchain_core.messages import AnyMessage, SystemMessage, HumanMessage, ToolMessage
from langchain_openai import ChatOpenAI
from langchain_community.tools.tavily_search import TavilySearchResults



In [None]:
# 10. Create the Tavily Search tool
tool = TavilySearchResults(max_results=2)

In [None]:
# 11. Create the AgentState class
class AgentState(TypedDict):
    messages: Annotated[list[AnyMessage], operator.add]
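
The `operator.add` inside `Annotated` tells LangGraph to combine the `messages` returned by each node with the existing list rather than overwrite it. The sketch below shows only that idea using the standard library; `ToyState` and `apply_update` are hypothetical names, and LangGraph's real merge logic is more involved.

```python
import operator
from typing import Annotated, TypedDict, get_type_hints

class ToyState(TypedDict):
    messages: Annotated[list[str], operator.add]

def apply_update(state, update):
    """Merge a node's partial update into the state using each key's reducer."""
    hints = get_type_hints(ToyState, include_extras=True)
    merged = dict(state)
    for key, value in update.items():
        metadata = getattr(hints[key], "__metadata__", ())
        if metadata:
            reducer = metadata[0]          # here: operator.add
            merged[key] = reducer(state[key], value)
        else:
            merged[key] = value            # no reducer: plain overwrite
    return merged

toy_state = {"messages": ["human: hi"]}
toy_state = apply_update(toy_state, {"messages": ["ai: hello"]})
print(toy_state["messages"])
```

This is why each node in the Agent class returns only its new messages in a list: the reducer appends them to the history.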

In [None]:
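!pip install langgraph-checkpoint-sqlite
import sqlite3
from langgraph.checkpoint.sqlite import SqliteSaver

# In recent releases SqliteSaver.from_conn_string() is a context manager, so
# we build the saver directly from a connection to reuse it across cells.
conn = sqlite3.connect(":memory:", check_same_thread=False)
memory = SqliteSaver(conn)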



In [None]:
# 12. Create the Agent again
class Agent:
    def __init__(self, model, tools, checkpointer, system=""):
        self.system = system
        graph = StateGraph(AgentState)
        graph.add_node("llm", self.call_openai)
        graph.add_node("action", self.take_action)
        graph.add_conditional_edges("llm", self.exists_action, {True: "action", False: END})
        graph.add_edge("action", "llm")
        graph.set_entry_point("llm")
        self.graph = graph.compile(checkpointer=checkpointer)
        self.tools = {t.name: t for t in tools}
        self.model = model.bind_tools(tools)

    def call_openai(self, state: AgentState):
        messages = state['messages']
        if self.system:
            messages = [SystemMessage(content=self.system)] + messages
        message = self.model.invoke(messages)
        return {'messages': [message]}

    def exists_action(self, state: AgentState):
        result = state['messages'][-1]
        return len(result.tool_calls) > 0

    def take_action(self, state: AgentState):
        tool_calls = state['messages'][-1].tool_calls
        results = []
        for t in tool_calls:
            print(f"Calling: {t}")
            result = self.tools[t['name']].invoke(t['args'])
            results.append(ToolMessage(tool_call_id=t['id'], name=t['name'], content=str(result)))
        print("Back to the model!")
        return {'messages': results}
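
The edges wired up in `__init__` form a loop: the `llm` node runs, `exists_action` checks for tool calls, and the graph either visits `action` and returns to `llm` or stops at `END`. A minimal stand-alone sketch of that control flow follows, with a hypothetical `FakeMessage` class and scripted replies in place of the model; `run_loop` is also an invented name.

```python
from dataclasses import dataclass, field

@dataclass
class FakeMessage:
    content: str
    tool_calls: list = field(default_factory=list)

# Scripted model replies: first request a tool, then give the answer.
scripted = [
    FakeMessage("", tool_calls=[{"name": "search", "args": {"query": "weather"}, "id": "1"}]),
    FakeMessage("It is sunny."),
]

def run_loop(replies):
    """Visit llm -> (action -> llm)* until a reply carries no tool calls."""
    visited = []
    replies = iter(replies)
    while True:
        message = next(replies)
        visited.append("llm")
        if len(message.tool_calls) > 0:   # same test as exists_action
            visited.append("action")
            continue
        return visited, message

visited, final_message = run_loop(scripted)
print(visited, final_message.content)
```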


In [None]:
# 13. Next, we are going to add in persistence
# 14. In order to deal with persistence, there is the concept of checkpointer
# in LangGraph
# 15. A checkpointer checkpoints the state after and in between every node
# 16. To add in persistence, we use a SQLite checkpointer (SqliteSaver)
# 17. The checkpointer uses SQLite, a built-in database, under the hood
# 18. We will use an in-memory database
# 19. That is, if we refresh the notebook it will disappear
# 20. This can be easily connected to an external database
# 21. You can use checkpointers that use Redis or Postgres or
# other persistent databases
# 22. It may be noted that we just add another parameter to the Agent
# that is checkpointer
# 23. We then pass the checkpointer in graph compile
# 24. Note, all this is already done above.
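
The notes above say that an in-memory database vanishes while an external one survives. The standard-library sketch below illustrates that difference with plain `sqlite3`; the `notes` table and the `write_then_reopen` helper are hypothetical and have nothing to do with the schema LangGraph uses.

```python
import os
import sqlite3
import tempfile

def write_then_reopen(path):
    """Write one row, close the connection, reopen, and check the table survived."""
    conn = sqlite3.connect(path)
    conn.execute("CREATE TABLE notes (body TEXT)")
    conn.execute("INSERT INTO notes VALUES ('checkpoint')")
    conn.commit()
    conn.close()

    # A fresh connection: ":memory:" gives a brand new empty database,
    # while a file path reopens the data written above.
    conn = sqlite3.connect(path)
    count = conn.execute(
        "SELECT COUNT(*) FROM sqlite_master WHERE name = 'notes'"
    ).fetchone()[0]
    conn.close()
    return count

in_memory = write_then_reopen(":memory:")

with tempfile.TemporaryDirectory() as tmp:
    on_disk = write_then_reopen(os.path.join(tmp, "checkpoints.sqlite"))

print(in_memory, on_disk)
```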

In [None]:
# 25.  We create the Agent and we pass in checkpointer=memory

prompt = """You are a smart research assistant. Use the search engine to look up information. \
You are allowed to make multiple calls (either together or in sequence). \
Only look up information when you are sure of what you want. \
If you need to look up some information before asking a follow up question, you are allowed to do that!
"""
model = ChatOpenAI(model="gpt-4.1")
abot = Agent(model, [tool], system=prompt, checkpointer=memory)

In [None]:
# 26. We are also going to add the concept of streaming
# 27. There are two things that we care about.
# 28. We care about streaming of individual messages
# 29. These are the AI message that determines what action
# to take.
# 30. We then have the observation message that represents the
# result of taking that action
# 31. The second thing that we may want to stream is token.
# 32. We might want to stream the LLM output
# 33. Let us first stream the messages
# 34. We will do the tokens later on.

In [None]:
# 35. Let us create a HumanMessage

messages = [HumanMessage(content="What is the weather in Birmingham?")]


In [None]:
# 36. We add the concept of thread config.
# 37. This will be used to keep track of different threads
# inside the persistent checkpointer
# 28. This will allow us to have multiple conversations going on
# at the same time
# 29. This is really needed for production applications
# 30. Where there are many users
# 31. thread_config is a dictionary with a configurable key
# 32. As part of that we have a thread id which can be set equal to any string
# 33. Here we set that equal to 1

thread = {"configurable": {"thread_id": "1"}}
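
The thread config works like a lookup key into the checkpointer. A minimal sketch, assuming a plain dictionary in place of a real checkpointer (`ToyMemory` and `chat` are hypothetical names, not LangGraph APIs):

```python
from collections import defaultdict

class ToyMemory:
    """Keeps one message history per thread_id."""
    def __init__(self):
        self._threads = defaultdict(list)

    def history(self, config):
        return self._threads[config["configurable"]["thread_id"]]

def chat(memory, config, text):
    """Append the message to its thread and return what the model would see."""
    history = memory.history(config)
    history.append(text)
    return list(history)

memory_store = ToyMemory()
thread_1 = {"configurable": {"thread_id": "1"}}
thread_2 = {"configurable": {"thread_id": "2"}}

chat(memory_store, thread_1, "What is the weather in Birmingham?")
seen_1 = chat(memory_store, thread_1, "What about in Mumbai")
seen_2 = chat(memory_store, thread_2, "Which is warmer?")
print(len(seen_1), len(seen_2))
```

The follow-up on thread "1" sees the earlier question, while the question on thread "2" arrives with no context, which mirrors the behaviour described for a changed thread id.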

In [None]:
# 34. We now call the graph not with invoke but with stream.
# 35. We are going to pass the messages dictionary but also thread config
# as the second parameter.
# 36. We get back a stream of events
# 37. These events represent the updates to the state over time
# 38. Since we know that our state has one key, the messages key, we
# loop through it and print that out.

# 39. When we run the cell we will see that we get a stream of results
# 40. First we get an AI message
# 41. This is the first result from the Language Model asking us
# to call Tavily.
# 42. We then get a tool message which is the result of calling Tavily.
# 43. We then get results from Tavily
# 44. Finally we get the final result from LLM answering the question
# 45. With stream, we have really good visibility of what exactly is going on
# under the hood.


from langgraph.checkpoint.sqlite import SqliteSaver
thread = {"configurable": {"thread_id": "1"}}
with SqliteSaver.from_conn_string(":memory:") as memory:
    abot = Agent(model, [tool], system=prompt, checkpointer=memory)
    for event in abot.graph.stream({"messages": messages}, thread):
        for v in event.values():
            print(v['messages'])
    # Turn 2 – continues same in‑memory conversation
    messages = [HumanMessage(content="What about in Mumbai")]
    for event in abot.graph.stream({"messages": messages}, thread):
        for v in event.values():
            print(v['messages'])


[AIMessage(content='', additional_kwargs={'tool_calls': [{'id': 'call_dPcIZYpyTCML0rlTiZK989pl', 'function': {'arguments': '{"query":"current weather in Birmingham"}', 'name': 'tavily_search_results_json'}, 'type': 'function'}], 'refusal': None}, response_metadata={'token_usage': {'completion_tokens': 21, 'prompt_tokens': 151, 'total_tokens': 172, 'completion_tokens_details': {'accepted_prediction_tokens': 0, 'audio_tokens': 0, 'reasoning_tokens': 0, 'rejected_prediction_tokens': 0}, 'prompt_tokens_details': {'audio_tokens': 0, 'cached_tokens': 0}}, 'model_name': 'gpt-4.1-2025-04-14', 'system_fingerprint': 'fp_51e1070cf2', 'id': 'chatcmpl-C8uFBbRASmn0F2LhP7XL9u1ZO81FZ', 'service_tier': 'default', 'finish_reason': 'tool_calls', 'logprobs': None}, id='run--7f7d814d-439c-4eee-b1f1-e767163583c4-0', tool_calls=[{'name': 'tavily_search_results_json', 'args': {'query': 'current weather in Birmingham'}, 'id': 'call_dPcIZYpyTCML0rlTiZK989pl', 'type': 'tool_call'}], usage_metadata={'input_tokens
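
Each event yielded by `stream` in the cell above is a dictionary keyed by node name, which is why the loop iterates over `event.values()`. The sketch below imitates that shape with a plain generator; `toy_stream` and its canned messages are hypothetical and do not call a model or Tavily.

```python
def toy_stream():
    """Yield one update per node, shaped like {node_name: {"messages": [...]}}."""
    yield {"llm": {"messages": ["AI: call search('weather in Birmingham')"]}}
    yield {"action": {"messages": ["Tool: overcast, 27 C"]}}
    yield {"llm": {"messages": ["AI: It is overcast and about 27 C."]}}

seen_nodes = []
for event in toy_stream():
    for node_name, update in event.items():
        seen_nodes.append(node_name)
        print(node_name, "->", update["messages"])
```

Iterating over `event.items()` instead of `event.values()` also shows which node produced each update, which can help when debugging.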

In [None]:
# 46. Let's call with another message
# 47. This time with "What about Mumbai?"
# 48. This is continuing same question as before
# 49. It's a follow up question
# 50. We do not see anything explicitly about weather
# 51. Since it's a continuation of same conversation
# 52. It should understand the conversation is about weather
# 53. We pass in the same thread id
# 54. In order to show we are continuing at same point
# 55. Done in the same `with` block, otherwise the database closes



In [None]:
# Streaming tokens
# 56. We have covered the importance of persistence
# 57. We have seen how we can stream events
# 58. Let us now see how we can stream tokens themselves
# 59. We will use the astream_events method that comes on all
# LangChain and LangGraph objects
# 60. astream_events is an asynchronous method
# 61. This means we have to use an async checkpointer
# 62. In order to do this, we use AsyncSqliteSaver and pass it to the Agent
# 63. Same as before but we use AsyncSqliteSaver, which allows us to use async
# methods on the Graph
# 64. See streaming real time into the screen
!pip install langgraph-checkpoint-sqlite

from langgraph.checkpoint.sqlite.aio import AsyncSqliteSaver

# AsyncSqliteSaver.from_conn_string() returns an async context manager, so the
# checkpointer and the Agent are created inside `async with` in the next cell.





In [None]:
from langgraph.checkpoint.sqlite.aio import AsyncSqliteSaver

async with AsyncSqliteSaver.from_conn_string(":memory:") as memory:
    abot = Agent(model, [tool], system=prompt, checkpointer=memory)
    messages = [HumanMessage(content="What is the weather in Birmingham?")]
    thread = {"configurable": {"thread_id": "4"}}

    async for event in abot.graph.astream_events({"messages": messages}, thread, version="v1"):
        if event["event"] == "on_chat_model_stream":
            content = event["data"]["chunk"].content
            if content:
                print(content, end="|")



Calling: {'name': 'tavily_search_results_json', 'args': {'query': 'current weather in Birmingham'}, 'id': 'call_iF7shvsZ8PpxcjLOtKfE36Dx', 'type': 'tool_call'}
Back to the model!
The| current| weather| in| Birmingham| is| over|cast| with| a| temperature| of| around| |27|°C|.| The| wind| is| calm| at| |0| km|/h|,| and| humidity| is| at| |45|%.| Clouds| cover| about| |80|%| of| the| sky|.| Visibility| is| good| at| |16| km|.|
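
The filtering pattern above can be tried without an API key. This sketch uses a hypothetical `fake_events` async generator in place of `astream_events`; only the event name `on_chat_model_stream` and the `end="|"` delimiter are taken from the cell above, and the `Chunk` class is a stand-in for the real message chunk.

```python
import asyncio
from dataclasses import dataclass

@dataclass
class Chunk:
    content: str

async def fake_events():
    """Hypothetical event source mixing token events with other event types."""
    yield {"event": "on_chain_start", "data": {}}
    for token in ["The", " current", " weather", ""]:
        await asyncio.sleep(0)  # hand control back to the event loop
        yield {"event": "on_chat_model_stream", "data": {"chunk": Chunk(token)}}
    yield {"event": "on_chain_end", "data": {}}

async def collect_tokens():
    tokens = []
    async for event in fake_events():
        if event["event"] == "on_chat_model_stream":
            content = event["data"]["chunk"].content
            if content:  # skip empty chunks, as in the notebook cell
                print(content, end="|")
                tokens.append(content)
    return tokens

tokens = asyncio.run(collect_tokens())
```

In a notebook, where an event loop is already running, `tokens = await collect_tokens()` would be used in place of `asyncio.run(...)`.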