# Persistence and Streaming

Based on [**this tutorial**](https://learn.deeplearning.ai/courses/ai-agents-in-langgraph/lesson/5/persistence-and-streaming)

# Principles

Agents often work on **longer-running tasks**.

For this type of task, two concepts are important:
- **persistence**,
- **streaming**.

> **Persistence** lets you keep around the state of an agent at a particular point in time. This lets you:
> - go back to that state,
> - resume in that state in future interactions.

> With **streaming**, you can emit a list of signals of what's going on at that exact moment. So, for long running applications, you know exactly what the agent is doing.

In this notebook, we will see these concepts in action.

# Setup

In [2]:
from dotenv import load_dotenv

In [3]:
_ = load_dotenv()

# Imports and Basic Implementation

In [17]:
import operator
from typing import Annotated, TypedDict

from langchain_community.tools.tavily_search import TavilySearchResults
from langchain_core.messages import AnyMessage, HumanMessage, SystemMessage, ToolMessage
from langchain_openai import ChatOpenAI
from langgraph.graph import END, StateGraph
from rich import print as rprint  # Enhance pretty printing for outputs

In [5]:
# Instantiate Search Tool
tool = TavilySearchResults(max_results=2)

In [6]:
# Define AgentState
class AgentState(TypedDict):
    messages: Annotated[list[AnyMessage], operator.add]
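
The `operator.add` annotation on `messages` acts as a reducer: when a node returns `{"messages": [...]}`, the new list is added to the existing one instead of replacing it. Here is a minimal sketch of that merge in plain Python, with strings standing in for message objects (an assumption made to keep the example self-contained):

```python
import operator

# Plain strings stand in for message objects in this sketch.
existing = ["human: What is the weather in sf?"]
update = ["ai: (tool call) search 'weather in San Francisco'"]

# With operator.add as the reducer, an update is concatenated to the
# current list rather than overwriting it.
merged = operator.add(existing, update)
print(merged)
```

This is why each node only needs to return the messages it just produced.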

# Persistence

To handle **persistence**, we will use what LangGraph calls a **checkpointer**.

It basically **checkpoints the state between every node**.

Here, we'll use a `SqliteSaver` **"in memory"**. This means the saved states only live as long as the notebook session and are erased when we leave it. We could keep previous states across sessions by connecting the checkpointer to an external database (some checkpointers, for example, use *Postgres* or *Redis*). We won't do that here, as this is a POC and we only want to illustrate the concept.
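
To make the "in memory" behaviour concrete, here is a small sketch using only the standard `sqlite3` module. The table name `demo_checkpoints` is hypothetical and is not the schema LangGraph uses; the point is only that a `":memory:"` database disappears with its connection.

```python
import sqlite3

# ":memory:" asks SQLite for a database that lives only in RAM.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE demo_checkpoints (thread_id TEXT, state TEXT)")
conn.execute("INSERT INTO demo_checkpoints VALUES (?, ?)", ("1", "some state"))
rows_before = conn.execute("SELECT COUNT(*) FROM demo_checkpoints").fetchone()[0]
conn.close()

# A new connection to ":memory:" gets a fresh, empty database:
# the table created above is gone.
conn2 = sqlite3.connect(":memory:")
tables_after = conn2.execute(
    "SELECT name FROM sqlite_master WHERE type='table'"
).fetchall()
conn2.close()

print(rows_before, tables_after)
```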

In [7]:
from langgraph.checkpoint.sqlite import SqliteSaver

In [8]:
memory = SqliteSaver.from_conn_string(":memory:")

It's then really easy to incorporate it within our `Agent` class.

In [9]:
# Define Agent
class Agent:
    def __init__(self, model, tools, checkpointer, system=""):  # 👈 REFERENCE TO CHECKPOINTER
        self.system = system
        graph = StateGraph(AgentState)
        graph.add_node("llm", self.call_openai)
        graph.add_node("action", self.take_action)
        graph.add_conditional_edges("llm", self.exists_action, {True: "action", False: END})
        graph.add_edge("action", "llm")
        graph.set_entry_point("llm")
        self.graph = graph.compile(checkpointer=checkpointer)  # 👈 REFERENCE TO CHECKPOINTER
        self.tools = {t.name: t for t in tools}
        self.model = model.bind_tools(tools)

    def call_openai(self, state: AgentState):
        messages = state['messages']
        if self.system:
            messages = [SystemMessage(content=self.system)] + messages
        message = self.model.invoke(messages)
        return {'messages': [message]}

    def exists_action(self, state: AgentState):
        result = state['messages'][-1]
        return len(result.tool_calls) > 0

    def take_action(self, state: AgentState):
        tool_calls = state['messages'][-1].tool_calls
        results = []
        for t in tool_calls:
            print(f"Calling: {t}")
            result = self.tools[t['name']].invoke(t['args'])
            results.append(ToolMessage(tool_call_id=t['id'], name=t['name'], content=str(result)))
        print("Back to the model!")
        return {'messages': results}
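
The graph wired in `__init__` amounts to a loop: call the model, run any requested tools, and go back to the model until it stops asking for tools. The sketch below mimics that control flow in plain Python. It is not LangGraph itself, and `fake_model`, `fake_tool` and the dict-based messages are hypothetical stand-ins.

```python
def run_loop(call_model, run_tool, messages):
    """Toy version of the llm -> action -> llm cycle."""
    messages = list(messages)
    while True:
        reply = call_model(messages)       # plays the role of the "llm" node
        messages.append(reply)
        if not reply["tool_calls"]:        # like exists_action returning False
            return messages                # corresponds to reaching END
        for call in reply["tool_calls"]:   # plays the role of the "action" node
            result = run_tool(call)
            messages.append({"role": "tool", "content": result, "tool_calls": []})


def fake_model(messages):
    # Ask for a search first, then answer once a tool result is present.
    if any(m["role"] == "tool" for m in messages):
        return {"role": "ai", "content": "It is sunny.", "tool_calls": []}
    return {
        "role": "ai",
        "content": "",
        "tool_calls": [{"name": "search", "args": {"query": "weather"}}],
    }


def fake_tool(call):
    return f"result for {call['args']['query']}"


history = run_loop(
    fake_model,
    fake_tool,
    [{"role": "human", "content": "Weather?", "tool_calls": []}],
)
print([m["role"] for m in history])
```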

> **NOTE**
>
> For data persistence, other databases can also be used, [**Redis**](https://redis.io/) for example.

In [10]:
prompt = """You are a smart research assistant. Use the search engine to look up information. \
You are allowed to make multiple calls (either together or in sequence). \
Only look up information when you are sure of what you want. \
If you need to look up some information before asking a follow up question, you are allowed to do that!
"""
model = ChatOpenAI(model="gpt-3.5-turbo")  # 👈 Change to -4o for POC
abot = Agent(model, [tool], system=prompt, checkpointer=memory)  # 👈 Passing the memory object previously defined

# Streaming

There are mainly two things that we might care about streaming:
- First, we might care about **streaming the individual messages**, so that would be:
    - the AI message that determines what action to take,
    - the observation message that represents the result of taking that action.
- The second thing we might care about streaming is **tokens**. So, for each token of the LLM call, we might want to stream the output.

To begin, we're just going to start by streaming the messages. We'll do the tokens later on.

In [11]:
messages = [HumanMessage(content="What is the weather in sf?")]

We're now going to add the concept of a **thread config**, which will be used to keep track of different threads inside the persistent checkpointer.

This will allow us to **have multiple conversations going on at the same time**, which is really needed for production applications, where you generally have many users.

This **thread config** is just a `dict` with a `"configurable"` key. Its value is another `dict` whose only key is `"thread_id"`, with a `str` value, here `"1"` (a `uuid` can also be used).
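
As an illustration of the `uuid` option, here is a small sketch of a helper that builds such a config. `new_thread_config` is a hypothetical name introduced for this example, not part of LangGraph.

```python
import uuid


def new_thread_config(thread_id=None):
    """Hypothetical helper: build a thread config, generating an id if none is given."""
    if thread_id is None:
        thread_id = uuid.uuid4()
    # The thread id is stored as a string in both cases.
    return {"configurable": {"thread_id": str(thread_id)}}


fixed = new_thread_config("1")   # same shape as the config used in this notebook
alice = new_thread_config()      # one fresh conversation
bob = new_thread_config()        # another, independent conversation
print(fixed)
```

Generating one id per user or per session is one simple way to keep conversations from sharing state.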

In [15]:
thread = {"configurable": {"thread_id": "1"}}

We'll now call the graph, not with `invoke`, but with `stream`, passing:
- a dictionary holding our `messages` list, as the first argument,
- `thread` as the second argument.

We then get back a **stream** of events, which represent **updates to the state over time**.
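
Judging from how the events are consumed in this notebook, each event is a dict keyed by node name, whose value holds the `messages` that node just produced. The sketch below walks such a stream; `iter_messages` is a hypothetical helper and `fake_events` is made-up data with strings in place of message objects.

```python
def iter_messages(events):
    """Yield (node_name, message) pairs from a stream of update events."""
    for event in events:
        for node_name, update in event.items():
            for message in update["messages"]:
                yield node_name, message


# Made-up events mirroring the llm -> action -> llm sequence.
fake_events = [
    {"llm": {"messages": ["ai: call search"]}},
    {"action": {"messages": ["tool: search result"]}},
    {"llm": {"messages": ["ai: final answer"]}},
]

pairs = list(iter_messages(fake_events))
for node_name, message in pairs:
    print(f"[{node_name}] {message}")
```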



In [16]:
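# Temporarily work around an issue with LangSmith by disabling tracing.
# `!export` only affects a throwaway subshell, so set the variable in the kernel process instead.
import os
os.environ["LANGCHAIN_TRACING_V2"] = "false"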

## Initial Stream

In [21]:
for event in abot.graph.stream({"messages": messages}, thread):
    # rprint(event)
    for v in event.values():
        rprint(v["messages"])

Calling: {'name': 'tavily_search_results_json', 'args': {'query': 'weather in San Francisco'}, 'id': 'call_AOSKlG76P36uUTAwuCHGA3mA'}
Back to the model!


We get back a stream of events:

> - **first, we get an `AIMessage`, which is the first result from the language model**.

```python
AIMessage(
    content='',
    additional_kwargs={
        'tool_calls': [{
            'id': 'call_eqDDo3U0wuFrWTitkRpMCIw4',
            'function': {
                'arguments': '{"query":"weather in San Francisco"}',
                'name': 'tavily_search_results_json'}, 
                'type': 'function'
            }]
        }, 
    response_metadata={
        'token_usage': {'completion_tokens': 21, 'prompt_tokens': 1186, 'total_tokens': 1207}, 
        'model_name': 'gpt-3.5-turbo', 
        'system_fingerprint': None, 
        'finish_reason': 'tool_calls', 
        'logprobs': None
        }, 
    id='run-af070dca-58fc-42d1-9d5b-c2d081e72a86-0', 
    tool_calls=[{
        'name': 'tavily_search_results_json', 
        'args': {'query': 'weather in San Francisco'}, 
        'id': 'call_eqDDo3U0wuFrWTitkRpMCIw4'
        }], 
    usage_metadata={'input_tokens': 1186, 'output_tokens': 21, 'total_tokens': 1207}
)
```
> - **It tells us to call `tavily`, which is then logged by this printout**:

```python
Calling: {
    'name': 'tavily_search_results_json',
    'args': {'query': 'weather in San Francisco'},
    'id': 'call_eqDDo3U0wuFrWTitkRpMCIw4'
}
```
> - **Then, the action is performed, `Back to the model!` is printed, and we get the following `ToolMessage` (I won't parse it here, as doing so wouldn't really improve readability), which is the result of calling `tavily` and, hence, the result of the search**:

```python
ToolMessage(
    content='[{\'url\': \'https://www.weatherapi.com/\', \'content\': "{\'location\': {\'name\': \'San Francisco\', \'region\': \'California\', \'country\': \'United States of America\', \'lat\': 37.78, \'lon\': -122.42, \'tz_id\': \'America/Los_Angeles\', \'localtime_epoch\': 1717751204, \'localtime\': \'2024-06-07 2:06\'}, \'current\': {\'last_updated_epoch\': 1717750800, \'last_updated\': \'2024-06-07 02:00\', \'temp_c\': 12.2, \'temp_f\': 54.0, \'is_day\': 0, \'condition\': {\'text\': \'Clear\', \'icon\': \'//cdn.weatherapi.com/weather/64x64/night/113.png\', \'code\': 1000}, \'wind_mph\': 4.3, \'wind_kph\': 6.8, \'wind_degree\': 10, \'wind_dir\': \'N\', \'pressure_mb\': 1011.0, \'pressure_in\': 29.84, \'precip_mm\': 0.0, \'precip_in\': 0.0, \'humidity\': 93, \'cloud\': 0, \'feelslike_c\': 10.7, \'feelslike_f\': 51.2, \'windchill_c\': 9.8, \'windchill_f\': 49.6, \'heatindex_c\': 11.4, \'heatindex_f\': 52.6, \'dewpoint_c\': 9.3, \'dewpoint_f\': 48.8, \'vis_km\': 14.0, \'vis_miles\': 8.0, \'uv\': 1.0, \'gust_mph\': 14.0, \'gust_kph\': 22.5}}"}, {\'url\': \'https://www.weather.gov/index.php/mtr/\', \'content\': \'Current Conditions showing NA; Customize Your Weather.gov. Enter Your City, ST or ZIP Code ... 2024 at 9:40:09 am PDT Watches, Warnings & Advisories. Zoom Out. Excessive Heat Warning. Gale Warning. Heat Advisory. Small Craft Advisory. ... National Weather Service San Francisco Bay Area, CA 21 Grace Hopper Ave, Stop 5 Monterey, CA 93943-5505\'}]', 
    name='tavily_search_results_json', 
    tool_call_id='call_eqDDo3U0wuFrWTitkRpMCIw4'
)
```
> - **Finally, there's an `AIMessage`, which is the result of the LLM, answering our question**:

```python
AIMessage(
    content='The current weather in San Francisco is clear with a temperature of 54.0°F (12.2°C). The wind is blowing at 4.3 mph from the north, and the humidity is at 93%.', 
    response_metadata={
        'token_usage': {'completion_tokens': 46, 'prompt_tokens': 1732, 'total_tokens': 1778}, 
        'model_name': 'gpt-3.5-turbo', 
        'system_fingerprint': None, 
        'finish_reason': 'stop', 
        'logprobs': None
    }, 
    id='run-de9261f7-2481-4194-b1ac-0e9e5cebaf7b-0', 
    usage_metadata={'input_tokens': 1732, 'output_tokens': 46, 'total_tokens': 1778}
)
```

With this `stream` method, we:
- get back all of these intermediate results, 
- have good visibility into exactly what is going on.

## Follow-Up Question

Let's now call this stream with another message.

This continues the same conversation as before by asking a follow-up question.

**We don't mention the weather, but since this is a conversation, we expect the model to realize that we're still asking about it**.

To indicate that it's the same conversation, we pass **the same `thread_id`**.

In [23]:
messages = [HumanMessage(content="What about in la?")]
thread = {"configurable": {"thread_id": "1"}}
for event in abot.graph.stream({"messages": messages}, thread):
    for v in event.values():
        rprint(v["messages"])

Calling: {'name': 'tavily_search_results_json', 'args': {'query': 'weather in Los Angeles'}, 'id': 'call_sswMVjt7V78OMyoQcW1B4Y0z'}
Back to the model!


And we can notice everything works as expected.

## Comparing Previous Answers

In [24]:
messages = [HumanMessage(content="Which one is warmer?")]
thread = {"configurable": {"thread_id": "1"}}
for event in abot.graph.stream({"messages": messages}, thread):
    for v in event.values():
        rprint(v["messages"])

## Changing `thread_id`

> **NOTE**
> 
> Here, with a new `thread_id`, the agent has no access to the earlier conversation, so...
> - at best, we expect the model to **point out that it lacks context**,
> - at worst, we expect it to **hallucinate, doing its best to fulfill the request without the necessary context**.
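
To see why a different `thread_id` starts from a blank slate, here is a toy sketch of a store that keeps one history per thread. `ToyThreadStore` is a hypothetical stand-in written for this example and is far simpler than a real checkpointer.

```python
class ToyThreadStore:
    """Toy stand-in for a checkpointer: one message history per thread_id."""

    def __init__(self):
        self._histories = {}

    def append(self, config, message):
        thread_id = config["configurable"]["thread_id"]
        self._histories.setdefault(thread_id, []).append(message)

    def history(self, config):
        thread_id = config["configurable"]["thread_id"]
        # Unknown threads simply have an empty history.
        return list(self._histories.get(thread_id, []))


store = ToyThreadStore()
thread_1 = {"configurable": {"thread_id": "1"}}
thread_2 = {"configurable": {"thread_id": "2"}}

store.append(thread_1, "What is the weather in sf?")
store.append(thread_1, "What about in la?")
store.append(thread_2, "Which one is warmer?")

print(store.history(thread_1))
print(store.history(thread_2))
```

In thread `"2"`, the question about which one is warmer arrives with nothing before it to compare.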

In [25]:
messages = [HumanMessage(content="Which one is warmer?")]
thread = {"configurable": {"thread_id": "2"}}
for event in abot.graph.stream({"messages": messages}, thread):
    for v in event.values():
        rprint(v["messages"])

Calling: {'name': 'tavily_search_results_json', 'args': {'query': 'average temperature in Miami'}, 'id': 'call_fT3Ga9y9OQdgiaSVbl7thxc2'}
Calling: {'name': 'tavily_search_results_json', 'args': {'query': 'average temperature in New York'}, 'id': 'call_qUCiBFtUvOJABP7ID4cvppEz'}
Back to the model!


And indeed, we get the worst-case scenario: the model does its best to find two measurements to compare and ends up (without us being able to know why...) fetching temperatures for Miami and New York.

> **NOTE**
>
> For comparison purposes, **it would be interesting to see how a more recent model like `gpt-4o` would perform**...

# Streaming Tokens

In [26]:
try:
    from langgraph.checkpoint.aiosqlite import AsyncSqliteSaver
except ImportError as e:
    print(e)

If the previous code doesn't work, it can be fixed by installing [**aiosqlite**](https://pypi.org/project/aiosqlite/), as mentioned in [**AsyncSqliteSaver's documentation**](https://langchain-ai.github.io/langgraph/reference/checkpoints/#asyncsqlitesaver).

In [28]:
memory = AsyncSqliteSaver.from_conn_string(":memory:")

We'll use a new `"thread_id"` to start a brand new conversation.

We'll also iterate over a different type of event. These events represent updates from the underlying stream.

We want to **look for events that correspond to new tokens**.

This kind of event is called `"on_chat_model_stream"`.

When we see one of these events, we get its content and print it, followed by a pipe delimiter.

When running this, we should see the output streaming to the screen in real time.
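
The filtering logic can be tried without calling a model. The sketch below applies the same checks to made-up events; `extract_tokens` is a hypothetical helper, and `SimpleNamespace` objects stand in for the real message chunks (only their `content` attribute is assumed).

```python
from types import SimpleNamespace


def extract_tokens(events):
    """Collect non-empty token contents from 'on_chat_model_stream' events."""
    tokens = []
    for event in events:
        if event["event"] != "on_chat_model_stream":
            continue  # ignore every other kind of event
        content = event["data"]["chunk"].content
        if content:  # skip empty chunks
            tokens.append(content)
    return tokens


# Made-up events: one unrelated event, one empty chunk, two real tokens.
fake_events = [
    {"event": "on_chain_start", "data": {}},
    {"event": "on_chat_model_stream", "data": {"chunk": SimpleNamespace(content="")}},
    {"event": "on_chat_model_stream", "data": {"chunk": SimpleNamespace(content="The")}},
    {"event": "on_chat_model_stream", "data": {"chunk": SimpleNamespace(content=" weather")}},
]

tokens = extract_tokens(fake_events)
for token in tokens:
    print(token, end="|")
```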

In [29]:
messages = [HumanMessage(content="What is the weather in SF?")]
thread = {"configurable": {"thread_id": "4"}}
async for event in abot.graph.astream_events({"messages": messages}, thread, version="v1"):
    kind = event["event"]
    if kind == "on_chat_model_stream":
        content = event["data"]["chunk"].content
        if content:
            # Empty content in the context of OpenAI means
            # that the model is asking for a tool to be invoked.
            # So we only print non-empty content
            print(content, end="|")

NotImplementedError: The SqliteSaver does not support async methods. Consider using AsyncSqliteSaver instead.
from langgraph.checkpoint.aiosqlite import AsyncSqliteSaver
Note: AsyncSqliteSaver requires the aiosqlite package to use.
Install with:
`pip install aiosqlite`
See https://langchain-ai.github.io/langgraph/reference/checkpoints/#asyncsqlitesaverfor more information.

This error suggests that `abot` is still using the synchronous `SqliteSaver`: the graph was compiled with the earlier `memory` object, and reassigning `memory` afterwards does not change the checkpointer already attached to it. Rebuilding the agent after creating the `AsyncSqliteSaver`, with `abot = Agent(model, [tool], system=prompt, checkpointer=memory)`, should let `astream_events` run.