# Persistence and Streaming

In [67]:
import os
import time
from dotenv import load_dotenv
# Load variables from a local .env file (if any) into the process environment,
# then read the model API key from it. `api_key` is None when API_KEY is unset.
_ = load_dotenv()
api_key = os.environ.get("API_KEY")

In [58]:
from contextlib import contextmanager
from langgraph.graph import StateGraph, END
from typing import TypedDict, Annotated
import operator
from langchain_core.messages import AnyMessage, SystemMessage, HumanMessage, ToolMessage
from langchain_google_genai import ChatGoogleGenerativeAI
from langchain_community.tools.tavily_search import TavilySearchResults
from langgraph.checkpoint.sqlite import SqliteSaver

In [3]:
# Search tool handed to the agent below; each call is capped at two results.
tool = TavilySearchResults(max_results=2)

Define the agent's state structure, which holds the messages exchanged.


In [8]:
class AgentState(TypedDict):
    """State carried through the agent graph: the conversation's message list."""

    # `operator.add` is the reducer annotation: message lists returned by graph
    # nodes are concatenated onto the existing list instead of replacing it.
    messages: Annotated[list[AnyMessage], operator.add]

In [59]:
class Agent:
    """Tool-calling agent implemented as a two-node graph loop.

    The graph starts at the "llm" node. If the model's reply contains tool
    calls, control moves to the "action" node, which runs the tools and feeds
    the results back to "llm"; otherwise the graph ends.

    Args:
        model: Chat model exposing `bind_tools(tools)` and `invoke(messages)`.
        tools: Iterable of tools; each must expose `.name` and `.invoke(args)`.
        checkpointer: Checkpoint saver passed to `graph.compile` so that state
            is persisted per thread ID.
        system: Optional system prompt prepended to every model call.
        db_path: SQLite connection string used by `get_checkpointer`
            (defaults to an in-memory database).
    """

    def __init__(self, model, tools, checkpointer, system="", db_path=":memory:"):
        self.system = system
        # Stored so get_checkpointer() has a connection string to open;
        # previously this attribute was read but never assigned.
        self.db_path = db_path
        graph = StateGraph(AgentState)
        graph.add_node("llm", self.call_openai)
        graph.add_node("action", self.take_action)
        graph.add_conditional_edges("llm", self.exists_action, {True: "action", False: END})
        graph.add_edge("action", "llm")
        graph.set_entry_point("llm")
        self.graph = graph.compile(checkpointer=checkpointer)
        self.tools = {t.name: t for t in tools}
        self.model = model.bind_tools(tools)

    @contextmanager
    def get_checkpointer(self):
        """Yield a `SqliteSaver` opened on `self.db_path`.

        The saver is only valid inside the `with` block that consumes this
        context manager.
        """
        with SqliteSaver.from_conn_string(self.db_path) as checkpointer:
            yield checkpointer

    def call_openai(self, state: AgentState):
        """Graph node: invoke the bound model on the conversation so far.

        Despite the name, this calls whichever chat model was passed to the
        constructor. Returns a state update holding the single model reply.
        """
        messages = state['messages']
        if self.system:
            # The system prompt is added per call, not stored in the state.
            messages = [SystemMessage(content=self.system)] + messages
        message = self.model.invoke(messages)
        return {'messages': [message]}

    def exists_action(self, state: AgentState):
        """Routing predicate: True when the latest message requests a tool call."""
        result = state['messages'][-1]
        # Treat a message without a `tool_calls` attribute as "no tool calls".
        return bool(getattr(result, 'tool_calls', None))

    def take_action(self, state: AgentState):
        """Graph node: run every tool call in the latest message.

        Returns a state update with one `ToolMessage` per tool call, in order.
        An unknown tool name does not raise; the model is told to retry.
        """
        tool_calls = state['messages'][-1].tool_calls
        results = []
        for t in tool_calls:
            print(f"Calling: {t}")
            if t['name'] not in self.tools:
                # The model asked for a tool that was never registered: report
                # it back so the model can correct itself instead of crashing
                # the graph with a KeyError.
                print("\n ....bad tool name....")
                result = "bad tool name, retry"
            else:
                result = self.tools[t['name']].invoke(t['args'])
            results.append(ToolMessage(tool_call_id=t['id'], name=t['name'], content=str(result)))
        print("Back to the model!")
        return {'messages': results}

In [68]:
# System prompt for the agent. The trailing backslashes inside the string join
# the first three lines into one line of text.
prompt = """You are a smart research assistant. Use the search engine to look up information. \
You are allowed to make multiple calls (either together or in sequence). \
Only look up information when you are sure of what you want. \
If you need to look up some information before asking a follow up question, you are allowed to do that!
"""
# Gemini chat model, authenticated with the key read from the environment above.
model = ChatGoogleGenerativeAI(api_key=api_key, model="gemini-1.5-pro")


# Persistence demo: an in-memory SQLite checkpointer stores the agent's state
# per thread ID, so follow-up questions on the same thread keep their context.

# Pause between questions. NOTE(review): presumably here to stay under the
# model API's rate limit (the recorded run shows a 429 retry) -- tune as needed.
PAUSE_SECONDS = 30


def ask(agent, question, thread_id):
    """Stream one user question through `agent` and print each node's messages.

    Args:
        agent: An `Agent` whose compiled graph is available as `agent.graph`.
        question: Text of the user's message.
        thread_id: Conversation ID; reusing it continues the same conversation.
    """
    config = {"configurable": {"thread_id": thread_id}}
    inputs = {"messages": [HumanMessage(content=question)]}
    for event in agent.graph.stream(inputs, config):
        for update in event.values():
            print(update["messages"])


# The checkpointer is only usable inside this `with` block, so every call
# that touches the graph lives here.
with SqliteSaver.from_conn_string(":memory:") as memory:
    abot = Agent(model, [tool], system=prompt, checkpointer=memory)

    # Three turns on thread "1": each follow-up relies on the earlier turns.
    for question in (
        "What is the weather in sf?",
        "What about in la?",
        "Which one is warmer?",
    ):
        ask(abot, question, thread_id="1")
        time.sleep(PAUSE_SECONDS)
        print("\n----\n")

    # Same follow-up on a new thread ID: no prior context is available.
    ask(abot, "Which one is warmer?", thread_id="2")


[AIMessage(content='', additional_kwargs={'function_call': {'name': 'tavily_search_results_json', 'arguments': '{"query": "weather in san francisco"}'}}, response_metadata={'prompt_feedback': {'block_reason': 0, 'safety_ratings': []}, 'finish_reason': 'STOP', 'safety_ratings': []}, id='run-b1e07252-d757-43cb-8670-4edd3376bd20-0', tool_calls=[{'name': 'tavily_search_results_json', 'args': {'query': 'weather in san francisco'}, 'id': 'c19b6b33-312e-4acc-8274-ddc949141b44', 'type': 'tool_call'}], usage_metadata={'input_tokens': 147, 'output_tokens': 13, 'total_tokens': 160, 'input_token_details': {'cache_read': 0}})]
Calling: {'name': 'tavily_search_results_json', 'args': {'query': 'weather in san francisco'}, 'id': 'c19b6b33-312e-4acc-8274-ddc949141b44', 'type': 'tool_call'}
Back to the model!
[ToolMessage(content='[{\'url\': \'https://www.weatherapi.com/\', \'content\': "{\'location\': {\'name\': \'San Francisco\', \'region\': \'California\', \'country\': \'United States of America\', \

Retrying langchain_google_genai.chat_models._chat_with_retry.<locals>._chat_with_retry in 2.0 seconds as it raised ResourceExhausted: 429 Resource has been exhausted (e.g. check quota)..


{'messages': [AIMessage(content='The weather in Los Angeles is sunny with a temperature of 10.6°C (51.1°F). The wind is blowing from the east at 3.8 mph (6.1 kph). The humidity is 38% and there is no cloud cover. The UV index is 0.1.', additional_kwargs={}, response_metadata={'prompt_feedback': {'block_reason': 0, 'safety_ratings': []}, 'finish_reason': 'STOP', 'safety_ratings': []}, id='run-60509f96-8b15-4a07-b501-4ddd74808f72-0', usage_metadata={'input_tokens': 1803, 'output_tokens': 70, 'total_tokens': 1873, 'input_token_details': {'cache_read': 0}})]}

----

{'messages': [AIMessage(content='Los Angeles is warmer than San Francisco.  The temperature in Los Angeles is 10.6°C (51.1°F) while in San Francisco it is 9.6°C (49.3°F).', additional_kwargs={}, response_metadata={'prompt_feedback': {'block_reason': 0, 'safety_ratings': []}, 'finish_reason': 'STOP', 'safety_ratings': []}, id='run-19d25cef-53fe-4c7f-9d84-a8df3607e122-0', usage_metadata={'input_tokens': 1879, 'output_tokens': 51,

## Streaming tokens

In [71]:
# Using async version of the SQLite checkpointer for token streaming.
from langgraph.checkpoint.sqlite.aio import AsyncSqliteSaver

# NOTE: no agent is built in this cell. `AsyncSqliteSaver.from_conn_string` has
# to be entered with `async with` (as the next cell does); assigning its return
# value directly and passing it as `checkpointer` would hand the graph an
# un-entered context manager rather than a usable checkpointer.

In [None]:
async with AsyncSqliteSaver.from_conn_string(":memory:") as memory:

    abot = Agent(model, [tool], system=prompt, checkpointer=memory)

    # Stream tokens asynchronously, printing each token as it is generated.
    messages = [HumanMessage(content="What is the weather in SF?")]
    thread = {"configurable": {"thread_id": "4"}}       # New thread ID for a fresh conversation.
    async for event in abot.graph.astream_events({"messages": messages}, thread, version="v1"):
        kind = event["event"]
        if kind == "on_chat_model_stream":
            content = event["data"]["chunk"].content    # Extract the generated token content.
            if content:
                # Only print non-empty content,
                # as empty content indicates the model is requesting a tool invocation.
                print(content, end="|")                 # Print token with a pipe delimiter for visual separation.

Calling: {'name': 'tavily_search_results_json', 'args': {'query': 'weather in san francisco'}, 'id': '0ed5061d-cc61-400b-91e3-3e8f1ddfcbec', 'type': 'tool_call'}
Back to the model!
The| weather in San Francisco is partly cloudy with a temperature of 12.2|°C (54°F). The wind is blowing from the NNW| at 12.2 km/h.  The humidity is 37%.|